Correlation
__regenerate_correlation_values
async
__regenerate_correlation_values(session: AsyncSession, correlation_threshold: int) -> bool
Method to regenerate the number of correlations for the values that have correlations. :return: whether the database was changed :rtype: bool
__regenerate_over_correlating
async
__regenerate_over_correlating(session: AsyncSession, correlation_threshold: int) -> bool
Method to regenerate the number of correlations for the over-correlating values. :return: whether the database was changed :rtype: bool
regenerate_occurrences_job
async
regenerate_occurrences_job(ctx: WrappedContext[None], user: UserData) -> DatabaseChangedResponse
Method to regenerate the occurrences of the correlations in the database. Both over-correlating values and values with correlations are checked. :param user: the user who requested the job :type user: UserData :return: whether the job was successful and whether the database was changed :rtype: DatabaseChangedResponse
clean_excluded_correlations_job
async
clean_excluded_correlations_job(ctx: WrappedContext[None], user: UserData) -> DatabaseChangedResponse
Task to remove excluded values from the correlations in the MISP database. For every excluded value, its correlations are removed. :param user: the user who requested the job :type user: UserData :return: whether the job was successful and whether the database was changed :rtype: DatabaseChangedResponse
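The cleanup rule described above can be illustrated with a minimal in-memory sketch. `CorrelationRow` and `clean_excluded` are hypothetical names used only for illustration; the actual job works on the MISP database through an `AsyncSession`, and its queries are not shown in this reference.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CorrelationRow:
    # Hypothetical stand-in for a stored correlation, not the project's model.
    value: str
    event_id: int


def clean_excluded(
    rows: list[CorrelationRow], excluded: set[str]
) -> tuple[list[CorrelationRow], bool]:
    """Drop every row whose value is excluded and report whether anything changed."""
    kept = [row for row in rows if row.value not in excluded]
    return kept, len(kept) != len(rows)


rows = [
    CorrelationRow("1.2.3.4", 1),
    CorrelationRow("example.com", 2),
    CorrelationRow("1.2.3.4", 3),
]
kept, changed = clean_excluded(rows, {"1.2.3.4"})
print(kept, changed)
```

The boolean plays the role of the "database changed" flag: running the cleanup a second time with the same exclusions leaves the rows untouched and reports no change.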
__process_result
async
__process_result(session: AsyncSession, plugin_name: str, value: str, result: InternPluginResult | None) -> CorrelationResponse
Processes the result of the plugin. :param result: the result of the plugin :type result: InternPluginResult | None :return: a response with the result of the plugin :rtype: CorrelationResponse :raises PluginExecutionException: if the result of the plugin is invalid.
correlation_job
async
correlation_job(ctx: WrappedContext[None], user: UserData, data: CorrelationJobData) -> CorrelationResponse
Method to execute a correlation plugin job. It creates a plugin based on the given data and runs it. Finally, it processes the result and returns a response.
:param user: the user who requested the job :type user: UserData :param data: specifies the value and the plugin to use :type data: CorrelationJobData :return: a response with the result of the correlation by the plugin :rtype: CorrelationResponse
run
async
run(db: AsyncSession, attribute: Attribute, correlation_threshold: int) -> CorrelationResponse
Static method to correlate the value of the given attribute based on the misp_sql database and misp_api interface. :param attribute: the attribute whose value is correlated :type attribute: Attribute :return: relevant information about the correlation :rtype: CorrelationResponse
create_correlations
create_correlations(attributes: list[Attribute], events: list[Event], objects: list[Object], value_id: int) -> list[DefaultCorrelation]
Method to create DefaultCorrelation objects based on the given list of MispEventAttribute and list of AddEditGetEventDetails. For every attribute, a correlation is created with every other attribute in the list (except itself). For this to work, the MispEventAttribute at index i in the list has to be an attribute of the AddEditGetEventDetails at index i in the list of AddEditGetEventDetails.
:param attributes: list of MispEventAttribute to create correlations from :param events: list of the MispEvents the MispEventAttribute occurs in :param value_id: the id of the value for the correlation :return: a list of DefaultCorrelation
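The pairing rule ("every attribute with every other attribute, except itself") can be sketched as follows. This is a simplified illustration, not the project's implementation: `Attr`, `PairCorrelation`, and `pair_up` are hypothetical names, and the real `DefaultCorrelation` model has fields that are not listed in this reference.

```python
from dataclasses import dataclass
from itertools import permutations


@dataclass(frozen=True)
class Attr:
    # Hypothetical, reduced attribute: only the ids needed for pairing.
    id: int
    event_id: int


@dataclass(frozen=True)
class PairCorrelation:
    # Hypothetical stand-in for DefaultCorrelation.
    attribute_id: int
    event_id: int
    other_attribute_id: int
    other_event_id: int
    value_id: int


def pair_up(attributes: list[Attr], value_id: int) -> list[PairCorrelation]:
    """Create one correlation per ordered pair of distinct list positions."""
    return [
        PairCorrelation(a.id, a.event_id, b.id, b.event_id, value_id)
        for a, b in permutations(attributes, 2)
    ]


pairs = pair_up([Attr(1, 10), Attr(2, 20), Attr(3, 30)], value_id=7)
print(len(pairs))
```

`itertools.permutations` pairs by position, so an attribute is never paired with itself, and each pair appears once in each direction. Whether the real method stores both directions or filters pairs further is not stated here.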
get_amount_of_possible_correlations
get_amount_of_possible_correlations(attributes: list[Attribute]) -> int
Method to calculate the number of possible correlations for the given list of Attribute. The number of possible correlations is the number of attributes minus the number of attributes which are in the same event. :param attributes: the attributes to calculate the number of possible correlations for :type attributes: list[Attribute] :return: the number of possible correlations :rtype: int
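The description leaves the exact counting rule open. The sketch below assumes one possible interpretation, namely that attributes in the same event do not correlate with each other, so only unordered pairs of attributes from different events are counted. `Attr` and `count_cross_event_pairs` are hypothetical names, and the real method may count differently.

```python
from dataclasses import dataclass
from itertools import combinations


@dataclass(frozen=True)
class Attr:
    # Hypothetical, reduced attribute: only the ids needed for counting.
    id: int
    event_id: int


def count_cross_event_pairs(attributes: list[Attr]) -> int:
    """Count unordered attribute pairs whose members belong to different events.

    Assumption: pairs inside the same event are not counted.
    """
    return sum(1 for a, b in combinations(attributes, 2) if a.event_id != b.event_id)


attrs = [Attr(1, event_id=1), Attr(2, event_id=1), Attr(3, event_id=2)]
print(count_cross_event_pairs(attrs))
```

With two attributes in event 1 and one in event 2, three pairs exist in total, one of which lies inside event 1, so two pairs remain under this interpretation.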
save_correlations
async
save_correlations(db: AsyncSession, attributes: list[Attribute], value: str) -> set[UUID]
Method to generate DefaultCorrelation objects from the given list of MispEventAttribute and save them in the database. All MispEventAttribute in the list have to be attributes which have the same value and are correlated with each other. :param attributes: the attributes to correlate with each other :type attributes: list[Attribute] :param value: the value on which the correlations are based :type value: str :return: a set of UUIDs representing the events the correlations are associated with :rtype: set[UUID]
ChangeThresholdData
Bases: BaseModel
Data to change the threshold.
ChangeThresholdResponse
Bases: BaseModel
Response for the change of the threshold.
CorrelationJobData
Bases: BaseModel
Data for a correlation plugin job.
CorrelationResponse
Bases: BaseModel
Response for the correlation of a value.
DatabaseChangedResponse
Bases: BaseModel
Response for jobs that only change the database.
InternPluginResult
Bases: BaseModel
Result of a plugin to process by the job.
TopCorrelationsResponse
Bases: BaseModel
Response for the top correlations job.
ENV_CORRELATION_PLUGIN_DIRECTORY
module-attribute
ENV_CORRELATION_PLUGIN_DIRECTORY = 'CORRELATION_PLUGIN_DIRECTORY'
The name of the environment variable that configures the directory where correlation plugins are loaded from.
PLUGIN_DEFAULT_DIRECTORY
module-attribute
PLUGIN_DEFAULT_DIRECTORY: str = ''
The default package used for correlation plugins.
CorrelationConfigData
Bases: BaseSettings
Encapsulates configuration for the correlation worker and its jobs.
plugin_directory
class-attribute
instance-attribute
plugin_directory: str = Field(PLUGIN_DEFAULT_DIRECTORY, validation_alias=ENV_CORRELATION_PLUGIN_DIRECTORY)
The directory where the plugins are stored.
validate_plugin_module
classmethod
validate_plugin_module(value: str) -> str
Validates the plugin_directory. If the module is not valid or could not be found, a default value is assigned. :param value: the plugin_directory value :type value: str :return: the given plugin directory or a default one :rtype: str
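The fallback behaviour can be sketched with the standard library alone. This is only an illustration under two assumptions: that the configured value is treated as an importable module name, and that "not valid or could not be found" corresponds to `importlib.util.find_spec` failing or returning `None`. `validate_plugin_module_sketch` and its `default` parameter are hypothetical, and the real validator is a classmethod on `CorrelationConfigData`.

```python
import importlib.util


def validate_plugin_module_sketch(value: str, default: str) -> str:
    """Return value if it names a locatable module, otherwise return default."""
    try:
        spec = importlib.util.find_spec(value)
    except (ImportError, ValueError):
        # Raised for empty names, relative names, or a missing parent package.
        return default
    # find_spec returns None when a top-level module cannot be found.
    return value if spec is not None else default


print(validate_plugin_module_sketch("json", default="fallback"))
print(validate_plugin_module_sketch("no_such_module_xyz", default="fallback"))
```

Returning a default instead of raising keeps the worker startable with a misconfigured environment variable, at the cost of silently ignoring the bad value.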
top_correlations_job
async
top_correlations_job(ctx: WrappedContext[None], user: UserData) -> TopCorrelationsResponse
Method to get a list of all correlations with their number of occurrences in the database. The list is sorted in descending order of occurrence. :param user: the user who requested the job :type user: UserData :return: TopCorrelationsResponse with the list and whether the job was successful :rtype: TopCorrelationsResponse
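The ordering described above can be sketched on plain data. The input mapping and the name `top_values` are hypothetical; the actual job reads the occurrences from the database and wraps the list in a `TopCorrelationsResponse`.

```python
def top_values(occurrences: dict[str, int]) -> list[tuple[str, int]]:
    """Return (value, occurrence) pairs sorted by occurrence, highest first."""
    return sorted(occurrences.items(), key=lambda item: item[1], reverse=True)


ranking = top_values({"example.com": 1, "1.2.3.4": 5, "evil.exe": 3})
print(ranking)
```

Python's sort is stable, so values with the same occurrence keep their input order. How the real job breaks ties is not specified in this reference.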