Correlation

__regenerate_correlation_values async

__regenerate_correlation_values(session: AsyncSession, correlation_threshold: int) -> bool

Regenerates the number of correlations for the values that have correlations.

:return: whether the database was changed
:rtype: bool

__regenerate_over_correlating async

__regenerate_over_correlating(session: AsyncSession, correlation_threshold: int) -> bool

Regenerates the number of correlations for the over-correlating values.

:return: whether the database was changed
:rtype: bool

regenerate_occurrences_job async

regenerate_occurrences_job(ctx: WrappedContext[None], user: UserData) -> DatabaseChangedResponse

Regenerates the occurrences of the correlations in the database. Both over-correlating values and values with correlations are checked.

:param user: the user who requested the job
:type user: UserData
:return: whether the job was successful and whether the database was changed
:rtype: DatabaseChangedResponse

clean_excluded_correlations_job async

clean_excluded_correlations_job(ctx: WrappedContext[None], user: UserData) -> DatabaseChangedResponse

Task that removes the excluded correlations from the correlations of the MISP database. For every excluded value, its correlations are removed.

:param user: the user who requested the job
:type user: UserData
:return: whether the job was successful and whether the database was changed
:rtype: DatabaseChangedResponse
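The cleanup rule can be illustrated with a minimal in-memory sketch. It assumes correlations are held in a plain dict keyed by value; the real job works against the database, and the function name `remove_excluded` and the dict layout are hypothetical.

```python
def remove_excluded(correlations: dict[str, list[int]], excluded: set[str]) -> bool:
    """Drop the correlations of every excluded value.

    Returns True if at least one entry was removed, mirroring the
    "database was changed" flag of DatabaseChangedResponse.
    """
    changed = False
    for value in excluded:
        if value in correlations:
            del correlations[value]
            changed = True
    return changed


# Hypothetical data: value -> ids of correlated attributes.
store = {"8.8.8.8": [1, 2], "example.com": [3, 4]}
print(remove_excluded(store, {"8.8.8.8"}))  # True
print(store)                                # {'example.com': [3, 4]}
```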

__process_result async

__process_result(session: AsyncSession, plugin_name: str, value: str, result: InternPluginResult | None) -> CorrelationResponse

Processes the result of the plugin.

:param result: the result of the plugin
:type result: InternPluginResult
:return: a response with the result of the plugin
:rtype: CorrelationResponse
:raises PluginExecutionException: if the result of the plugin is invalid

correlation_job async

correlation_job(ctx: WrappedContext[None], user: UserData, data: CorrelationJobData) -> CorrelationResponse

Method to execute a correlation plugin job. It creates a plugin based on the given data and runs it. Finally, it processes the result and returns a response.

:param user: the user who requested the job
:type user: UserData
:param data: specifies the value and the plugin to use
:type data: CorrelationJobData
:return: a response with the result of the correlation by the plugin
:rtype: CorrelationResponse

run async

run(db: AsyncSession, attribute: Attribute, correlation_threshold: int) -> CorrelationResponse

Static method to correlate the given value using the misp_sql database and the misp_api interface.

:param value: the value to correlate
:type value: str
:return: relevant information about the correlation
:rtype: CorrelationResponse

create_correlations

create_correlations(attributes: list[Attribute], events: list[Event], objects: list[Object], value_id: int) -> list[DefaultCorrelation]

Creates DefaultCorrelation objects from the given list of MispEventAttribute and the given list of AddEditGetEventDetails. For every attribute, a correlation is created with every other attribute in the list (but not with itself). For this to work, the MispEventAttribute at index i must be an attribute of the AddEditGetEventDetails at index i of the event list.

:param attributes: list of MispEventAttribute to create correlations from
:param events: list of the MispEvents the MispEventAttribute occurs in
:param value_id: the id of the value for the correlation
:return: a list of DefaultCorrelation
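The pairing rule (every attribute with every other attribute, never with itself) can be sketched as follows. The `Attr` and `Correlation` dataclasses are hypothetical stand-ins for the real models, and producing one correlation per ordered pair is an assumption, not something the text above confirms.

```python
from dataclasses import dataclass
from itertools import permutations


@dataclass(frozen=True)
class Attr:
    """Hypothetical stand-in for an attribute and the event it belongs to."""
    id: int
    event_id: int


@dataclass(frozen=True)
class Correlation:
    """Hypothetical stand-in for DefaultCorrelation."""
    attribute_id: int
    event_id: int
    other_attribute_id: int
    other_event_id: int
    value_id: int


def pair_up(attributes: list[Attr], value_id: int) -> list[Correlation]:
    # permutations(..., 2) yields every ordered pair of distinct list
    # positions, so no attribute is ever paired with itself.
    return [
        Correlation(a.id, a.event_id, b.id, b.event_id, value_id)
        for a, b in permutations(attributes, 2)
    ]


correlations = pair_up([Attr(1, 10), Attr(2, 20), Attr(3, 30)], value_id=7)
print(len(correlations))  # 6
```

With n attributes this produces n * (n - 1) correlations, which is why values shared by many attributes grow expensive quickly.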

get_amount_of_possible_correlations

get_amount_of_possible_correlations(attributes: list[Attribute]) -> int

Calculates the number of possible correlations for the given list of Attribute. The number of possible correlations is the number of attributes minus the number of attributes that are in the same event.

:param attributes: the attributes to calculate the number of possible correlations for
:type attributes: list[Attribute]
:return: the number of possible correlations
:rtype: int
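One plausible reading of this rule, shown as a sketch rather than the actual implementation: for each attribute, count the attributes in the list that belong to a different event, then sum those counts. The `Attr` class and the function name `possible_correlations` are hypothetical.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class Attr:
    """Hypothetical stand-in for an attribute; only event_id matters here."""
    id: int
    event_id: int


def possible_correlations(attributes: list[Attr]) -> int:
    # How many attributes each event contributes to the list.
    per_event = Counter(a.event_id for a in attributes)
    total = len(attributes)
    # For each attribute: all attributes minus those in its own event
    # (the attribute itself is part of that count).
    return sum(total - per_event[a.event_id] for a in attributes)


# Two attributes in event 10 and one in event 20.
print(possible_correlations([Attr(1, 10), Attr(2, 10), Attr(3, 20)]))  # 4
```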

save_correlations async

save_correlations(db: AsyncSession, attributes: list[Attribute], value: str) -> set[UUID]

Generates DefaultCorrelation objects from the given list of MispEventAttribute and saves them in the database. All MispEventAttribute in the list must have the same value and be correlated with each other.

:param attributes: the attributes to correlate with each other
:type attributes: list[Attribute]
:param value: the value on which the correlations are based
:type value: str
:return: a set of UUIDs representing the events the correlations are associated with
:rtype: set[UUID]

ChangeThresholdData

Bases: BaseModel

Data to change the threshold.

ChangeThresholdResponse

Bases: BaseModel

Response for the change of the threshold.

CorrelationJobData

Bases: BaseModel

Data for a correlation plugin job.

CorrelationResponse

Bases: BaseModel

Response for the correlation of a value.

DatabaseChangedResponse

Bases: BaseModel

Response for jobs that only change the database.

InternPluginResult

Bases: BaseModel

Result of a plugin to process by the job.

TopCorrelationsResponse

Bases: BaseModel

Response for the top correlations job.

ENV_CORRELATION_PLUGIN_DIRECTORY module-attribute

ENV_CORRELATION_PLUGIN_DIRECTORY = 'CORRELATION_PLUGIN_DIRECTORY'

The name of the environment variable that configures the directory where correlation plugins are loaded from.

PLUGIN_DEFAULT_DIRECTORY module-attribute

PLUGIN_DEFAULT_DIRECTORY: str = ''

The default package used for correlation plugins.

CorrelationConfigData

Bases: BaseSettings

Encapsulates configuration for the correlation worker and its jobs.

plugin_directory class-attribute instance-attribute

plugin_directory: str = Field(PLUGIN_DEFAULT_DIRECTORY, validation_alias=ENV_CORRELATION_PLUGIN_DIRECTORY)

The directory where the plugins are stored.

validate_plugin_module classmethod

validate_plugin_module(value: str) -> str

Validates the plugin_directory. If the module is not valid or could not be found, a default value is assigned.

:param value: the plugin_directory value
:type value: str
:return: the given or a default plugin directory
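The fallback behaviour can be sketched with the standard library alone. This assumes the configured value is treated as an importable module name; the function name `resolve_plugin_module`, the explicit `default` parameter, and the use of `importlib.util.find_spec` are illustrative choices, not the validator's actual code.

```python
import importlib.util


def resolve_plugin_module(value: str, default: str) -> str:
    """Return value if it names an importable module, else the default."""
    if not value:
        return default
    try:
        spec = importlib.util.find_spec(value)
    except (ImportError, ValueError):
        # Raised e.g. when a parent package of a dotted name is missing.
        spec = None
    return value if spec is not None else default


print(resolve_plugin_module("json", default="os"))            # json
print(resolve_plugin_module("no_such_module_xyz", default="os"))  # os
```

Falling back instead of raising keeps the settings object loadable when the environment variable is unset or points to a missing module.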

top_correlations_job async

top_correlations_job(ctx: WrappedContext[None], user: UserData) -> TopCorrelationsResponse

Returns a list of all correlations with their number of occurrences in the database. The list is sorted in descending order of occurrence.

:param user: the user who requested the job
:type user: UserData
:return: TopCorrelationsResponse with the list and whether the job was successful
:rtype: TopCorrelationsResponse
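The ordering can be illustrated with a small sketch. It assumes the occurrences are available as a mapping from value to count; the function name `rank_by_occurrence` and the data are hypothetical, and the real job reads the counts from the database.

```python
def rank_by_occurrence(counts: dict[str, int]) -> list[tuple[str, int]]:
    """Return (value, occurrence) pairs, highest occurrence first."""
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)


print(rank_by_occurrence({"example.com": 1, "8.8.8.8": 5, "evil.exe": 3}))
# [('8.8.8.8', 5), ('evil.exe', 3), ('example.com', 1)]
```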