Skip to content

Correlation

__regenerate_correlation_values(session, correlation_threshold) async

Method to regenerate the amount of correlations for the values with correlations. :return: if the database was changed :rtype: bool

Source code in src/mmisp/worker/jobs/correlation/regenerate_occurrences_job.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def __regenerate_correlation_values(session: AsyncSession, correlation_threshold: int) -> bool:
    """
    Re-check every value that currently has correlations and repair its stored correlations.

    For each value returned by ``get_values_with_correlation`` one of three actions is taken:

    * more attributes than ``correlation_threshold`` share the value: the correlations are deleted and the
      value is registered via ``add_over_correlating_value``;
    * the stored number of correlations differs from the possible number: the correlations are deleted and
      the correlation job is run again for one attribute carrying the value;
    * both numbers are zero: ``delete_correlations`` is called for the value.

    Values for which no attribute can be found are skipped.

    :param session: the database session used for all queries
    :type session: AsyncSession
    :param correlation_threshold: number of attributes above which a value is treated as over correlating
    :type correlation_threshold: int
    :return: if the database was changed
    :rtype: bool
    """
    database_touched: bool = False

    for value in await get_values_with_correlation(session):
        # Any single attribute with this value is enough to re-trigger the correlation job later on.
        lookup = select(Attribute.id).filter(Attribute.value == value).limit(1)  # type: ignore
        lookup_result = await session.execute(lookup)
        attribute_id = lookup_result.scalars().first()
        if attribute_id is None:
            continue

        stored_count: int = await get_number_of_correlations(session, value, False)
        same_value_attributes: list[Attribute] = await get_attributes_with_same_value(session, value)
        expected_count: int = get_amount_of_possible_correlations(same_value_attributes)
        attribute_total: int = len(same_value_attributes)

        if attribute_total > correlation_threshold:
            await delete_correlations(session, value)
            await add_over_correlating_value(session, value, attribute_total)
            database_touched = True
            continue

        if expected_count != stored_count:
            await delete_correlations(session, value)
            await correlation_job.run(UserData(user_id=0), CorrelationJobData(attribute_id=attribute_id))
            database_touched = True
            continue

        # Here the stored and the possible amount are equal; only the "both zero" case needs handling.
        if expected_count == 0 and stored_count == 0:
            await delete_correlations(session, value)
            database_touched = True

    return database_touched

__regenerate_over_correlating(session, correlation_threshold) async

Method to regenerate the amount of correlations for the over correlating values. :return: if the database was changed :rtype: bool

Source code in src/mmisp/worker/jobs/correlation/regenerate_occurrences_job.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def __regenerate_over_correlating(session: AsyncSession, correlation_threshold: int) -> bool:
    """
    Re-check every over correlating value and bring its entry in line with the current attributes.

    For each ``(value, count)`` entry returned by ``get_over_correlating_values``:

    * if the value still has more attributes than ``correlation_threshold`` and the recorded count is
      outdated, ``add_over_correlating_value`` is called with the current count;
    * if the value no longer exceeds the threshold, its over correlating entry is deleted and the
      correlation job is run for one attribute carrying the value.

    Values for which no attribute can be found are skipped.

    :param session: the database session used for all queries
    :type session: AsyncSession
    :param correlation_threshold: number of attributes above which a value is treated as over correlating
    :type correlation_threshold: int
    :return: if the database was changed
    :rtype: bool
    """
    database_touched: bool = False

    for entry in await get_over_correlating_values(session):
        value, recorded_count = entry[0], entry[1]

        lookup = select(Attribute.id).filter(Attribute.value == value).limit(1)  # type: ignore
        lookup_result = await session.execute(lookup)
        attribute_id = lookup_result.scalars().first()
        if attribute_id is None:
            continue

        same_value_attributes: list[Attribute] = await get_attributes_with_same_value(session, value)
        attribute_total: int = len(same_value_attributes)

        if attribute_total > correlation_threshold:
            # Still over correlating: only rewrite the entry when the recorded count is stale.
            if attribute_total != recorded_count:
                await add_over_correlating_value(session, value, attribute_total)
                database_touched = True
            continue

        # The value dropped to or below the threshold, so it is correlated normally again.
        await delete_over_correlating_value(session, value)
        await correlation_job.run(UserData(user_id=0), CorrelationJobData(attribute_id=attribute_id))
        database_touched = True

    return database_touched

regenerate_occurrences_job(ctx, user) async

Method to regenerate the occurrences of the correlations in the database. Over correlating values and values with correlations are checked. :param user: the user who requested the job :type user: UserData :return: if the job was successful and if the database was changed :rtype: DatabaseChangedResponse

Source code in src/mmisp/worker/jobs/correlation/regenerate_occurrences_job.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@queue.task()
@add_ajob_db_log
async def regenerate_occurrences_job(ctx: WrappedContext[None], user: UserData) -> DatabaseChangedResponse:
    """
    Regenerate the occurrences of the correlations in the database.

    The over correlating values are checked first, afterwards the values that have correlations.

    :param ctx: the task context (not used by the job body)
    :type ctx: WrappedContext[None]
    :param user: the user who requested the job
    :type user: UserData
    :return: if the job was successful and if the database was changed
    :rtype: DatabaseChangedResponse
    """
    assert sessionmanager is not None
    # TODO: get correlation_threshold from redis, or db, anything that can be changed
    # NOTE(review): correlation_job hard-codes a threshold of 20 while this job uses 30 - confirm which
    # value is intended, since this job re-runs correlation_job for individual values.
    correlation_threshold = 30
    async with sessionmanager.session() as session:
        over_correlating_changed: bool = await __regenerate_over_correlating(session, correlation_threshold)
        correlation_values_changed: bool = await __regenerate_correlation_values(session, correlation_threshold)
        return DatabaseChangedResponse(
            success=True,
            database_changed=over_correlating_changed or correlation_values_changed,
        )

clean_excluded_correlations_job(ctx, user) async

Task to clean the excluded correlations from the correlations of the MISP database. For every excluded value the correlations are removed. :param user: the user who requested the job :type user: UserData :return: if the job was successful and if the database was changed :rtype: DatabaseChangedResponse

Source code in src/mmisp/worker/jobs/correlation/clean_excluded_correlations_job.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@queue.task()
async def clean_excluded_correlations_job(ctx: WrappedContext[None], user: UserData) -> DatabaseChangedResponse:
    """
    Clean the excluded correlations from the correlations of the MISP database.

    ``misp_sql.delete_correlations`` is called for every excluded value; the database counts as changed
    as soon as one of those calls reports a truthy result.

    :param ctx: the task context (not used by the job body)
    :type ctx: WrappedContext[None]
    :param user: the user who requested the job
    :type user: UserData
    :return: if the job was successful and if the database was changed
    :rtype: DatabaseChangedResponse
    """
    assert sessionmanager is not None
    async with sessionmanager.session() as session:
        any_removed = False
        for excluded_value in await misp_sql.get_excluded_correlations(session):
            # Every value has to be processed, so the loop must not stop at the first hit.
            removed = await misp_sql.delete_correlations(session, excluded_value)
            if removed:
                any_removed = True
        return DatabaseChangedResponse(success=True, database_changed=any_removed)

__process_result(session, plugin_name, value, result) async

Processes the result of the plugin. :param result: the result of the plugin :type result: InternPluginResult :return: a response with the result of the plugin :rtype: CorrelationResponse :raises: PluginExecutionException: If the result of the plugin is invalid.

Source code in src/mmisp/worker/jobs/correlation/correlation_job.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
async def __process_result(
    session: AsyncSession, plugin_name: str, value: str, result: InternPluginResult | None
) -> CorrelationResponse:
    """
    Turn the internal result of a plugin into the response of the correlation job.

    Correlations are only saved when the plugin reported correlations and delivered more than one
    attribute. With one attribute or none the response always reports that nothing was found.

    :param session: the database session used to save the correlations
    :type session: AsyncSession
    :param plugin_name: the name of the plugin that produced the result
    :type plugin_name: str
    :param value: the value the correlations are based on
    :type value: str
    :param result: the result of the plugin
    :type result: InternPluginResult | None
    :return: a response with the result of the plugin
    :rtype: CorrelationResponse
    :raises: PluginExecutionException: If the result of the plugin is None.
    """
    if result is None:
        raise PluginExecutionException(message="The result of the plugin was None.")

    response: CorrelationResponse = CorrelationResponse(
        success=result.success,
        found_correlations=result.found_correlations,
        is_excluded_value=False,
        is_over_correlating_value=result.is_over_correlating_value,
        plugin_name=plugin_name,
    )

    enough_attributes = len(result.correlations) > 1
    if not enough_attributes:
        # A single attribute (or none) cannot correlate with anything.
        response.found_correlations = False
    elif result.found_correlations:
        response.events = await save_correlations(session, result.correlations, value)
    return response

correlation_job(ctx, user, data) async

Method to execute a correlation plugin job. It creates a plugin based on the given data and runs it. Finally, it processes the result and returns a response.

:param user: the user who requested the job :type user: UserData :param data: specifies the attribute to correlate and the plugin to use :type data: CorrelationJobData :return: a response with the result of the correlation by the plugin :rtype: CorrelationResponse

Source code in src/mmisp/worker/jobs/correlation/correlation_job.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@queue.task()
@add_ajob_db_log
async def correlation_job(ctx: WrappedContext[None], user: UserData, data: CorrelationJobData) -> CorrelationResponse:
    """
    Method to execute a correlation plugin job.
    It loads the attribute given by the job data, creates the requested plugin and runs it.
    Finally, it processes the result and returns a response.

    :param ctx: the task context (not used by the job body)
    :type ctx: WrappedContext[None]
    :param user: the user who requested the job
    :type user: UserData
    :param data: specifies the attribute to correlate and the plugin to use
    :type data: CorrelationJobData
    :return: a response with the result of the correlation by the plugin
    :rtype: CorrelationResponse
    :raises PluginNotFound: if no correlation plugin with the requested name exists
    :raises PluginExecutionException: if the plugin raises any error or returns None
    """
    assert sessionmanager is not None
    async with sessionmanager.session() as db:
        query = select(Attribute).filter(Attribute.id == data.attribute_id)
        attribute = (await db.execute(query)).scalars().one_or_none()

        if attribute is None:
            # Unknown attribute id: report an unsuccessful job instead of raising.
            return CorrelationResponse(
                success=False,
                found_correlations=False,
                is_excluded_value=False,
                is_over_correlating_value=False,
                plugin_name=data.correlation_plugin_name,
            )
        correlation_threshold: int = 20

        if await misp_sql.is_excluded_correlation(db, attribute.value):
            return CorrelationResponse(
                success=True,
                found_correlations=False,
                is_excluded_value=True,
                is_over_correlating_value=False,
                plugin_name=data.correlation_plugin_name,
            )
        try:
            plugin = factory.get_plugin(PluginType.CORRELATION, data.correlation_plugin_name)
        except PluginNotFound as error:
            # Re-raised with a message naming the plugin; the original error is kept as the cause.
            raise PluginNotFound(
                message=f"{PLUGIN_NAME_STRING}{data.correlation_plugin_name} was not found."
            ) from error
        try:
            result: CorrelationResponse | InternPluginResult | None = await plugin.run(
                db, attribute, correlation_threshold
            )
        except PluginExecutionException as error:
            raise PluginExecutionException(
                message=(
                    f"{PLUGIN_NAME_STRING}{data.correlation_plugin_name} and the value {attribute.value}"
                    " was executed but an error occurred."
                )
            ) from error
        except Exception as exception:
            # Any other plugin failure is reported as a PluginExecutionException that carries its text.
            raise PluginExecutionException(
                message=(
                    f"{PLUGIN_NAME_STRING}{data.correlation_plugin_name} and the value {attribute.value}"
                    f" was executed but the following error occurred: {exception}"
                )
            ) from exception
        if isinstance(result, CorrelationResponse):
            # The plugin already produced the final response, so there is nothing left to process.
            return result

        response: CorrelationResponse = await __process_result(
            db, data.correlation_plugin_name, attribute.value, result
        )
        return response

run(db, attribute, correlation_threshold) async

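Static method to correlate the value of the given attribute based on the misp_sql database and misp_api interface. :param db: the database session to work on :type db: AsyncSession :param attribute: the attribute whose value is correlated :type attribute: Attribute :param correlation_threshold: the number of attributes above which the value is treated as over correlating :type correlation_threshold: int :return: relevant information about the correlation :rtype: CorrelationResponse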

Source code in src/mmisp/worker/jobs/correlation/plugins/simple_value.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def run(db: AsyncSession, attribute: Attribute, correlation_threshold: int) -> CorrelationResponse:
    """
    Correlate the value of the given attribute with all attributes that have the same value.

    Excluded values are reported without any further work. If more attributes than
    ``correlation_threshold`` share the value, its correlations are deleted and the value is registered
    as over correlating. Otherwise correlations are saved when more than one attribute has the value.

    :param db: the database session to work on
    :type db: AsyncSession
    :param attribute: the attribute whose value is correlated
    :type attribute: Attribute
    :param correlation_threshold: number of attributes above which the value is over correlating
    :type correlation_threshold: int
    :return: relevant information about the correlation
    :rtype: CorrelationResponse
    """
    value = attribute.value

    excluded = await misp_sql.is_excluded_correlation(db, value)
    if excluded:
        return CorrelationResponse(
            success=True,
            found_correlations=False,
            is_excluded_value=True,
            is_over_correlating_value=False,
            plugin_name=None,
            events=None,
        )

    same_value_attributes: list[Attribute] = await misp_sql.get_attributes_with_same_value(db, value)
    attribute_total: int = len(same_value_attributes)

    # The threshold check has to come first so that the branch order stays the same for any threshold.
    if attribute_total > correlation_threshold:
        await misp_sql.delete_correlations(db, value)
        await misp_sql.add_over_correlating_value(db, value, attribute_total)
        return CorrelationResponse(
            success=True,
            found_correlations=True,
            is_excluded_value=False,
            is_over_correlating_value=True,
            plugin_name=None,
            events=None,
        )

    if attribute_total <= 1:
        # Nothing to correlate with.
        return CorrelationResponse(
            success=True,
            found_correlations=False,
            is_excluded_value=False,
            is_over_correlating_value=False,
            plugin_name=NAME,
            events=None,
        )

    event_uuids: set[UUID] = await save_correlations(db, same_value_attributes, value)
    spans_multiple_events = len(event_uuids) > 1
    return CorrelationResponse(
        success=True,
        found_correlations=spans_multiple_events,
        is_excluded_value=False,
        is_over_correlating_value=False,
        plugin_name=None,
        events=event_uuids,
    )

create_correlations(attributes, events, objects, value_id)

Method to create DefaultCorrelation objects based on the given list of MispEventAttribute and list of AddEditGetEventDetails. For every attribute a correlation is created with every other attribute in the list that belongs to a different event. The MispEventAttribute at place i in the list has to be an attribute of the AddEditGetEventDetails at place i in the list of AddEditGetEventDetails to function properly.

:param attributes: list of MispEventAttribute to create correlations from :param events: list of the MispEvents the MispEventAttribute occurs in :param value_id: the id of the value for the correlation :return: a list of DefaultCorrelation

Source code in src/mmisp/worker/jobs/correlation/utility.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def create_correlations(
    attributes: list[Attribute],
    events: list[Event],
    objects: list[Object],
    value_id: int,
) -> list[DefaultCorrelation]:
    """
    Create DefaultCorrelation objects for every pair of attributes that belong to different events.

    The three lists are combined position by position, so the event and the object at place i have to
    belong to the attribute at place i. Each unordered pair of attributes is looked at once; pairs whose
    attributes have the same ``event_id`` are left out.

    :param attributes: list of attributes to create correlations from
    :param events: list of the events the attributes occur in
    :param objects: list of the objects belonging to the attributes
    :param value_id: the id of the value for the correlation
    :return: a list of DefaultCorrelation
    """
    correlations: list[DefaultCorrelation] = []
    for first, second in combinations(zip(attributes, events, objects), 2):
        first_attribute, first_event, first_object = first
        second_attribute, second_event, second_object = second
        if first_attribute.event_id != second_attribute.event_id:
            correlations.append(
                _create_correlation_from_attributes(
                    first_attribute,
                    first_event,
                    first_object,
                    second_attribute,
                    second_event,
                    second_object,
                    value_id,
                )
            )
    return correlations

get_amount_of_possible_correlations(attributes)

Method to calculate the amount of possible correlations for the given list of Attribute. The amount of possible correlations is the number of attribute pairs whose two attributes belong to different events. :param attributes: the attributes to calculate the amount of possible correlations for :type attributes: list[Attribute] :return: the amount of possible correlations :rtype: int

Source code in src/mmisp/worker/jobs/correlation/utility.py
124
125
126
127
128
129
130
131
132
133
134
def get_amount_of_possible_correlations(attributes: list[Attribute]) -> int:
    """
    Method to calculate the amount of possible correlations for the given list of Attribute.
    The amount of possible correlations is the number of attribute pairs whose two attributes belong to
    different events.

    The result is computed as "all pairs minus the pairs inside the same event", which needs a single pass
    over the attributes instead of enumerating every pair.

    :param attributes: the attributes to calculate the amount of possible correlations for
    :type attributes: list[Attribute]
    :return: the amount of possible correlations
    :rtype: int
    """
    # Imported locally so that the function does not depend on additional module-level imports.
    from collections import Counter
    from math import comb

    attributes_per_event = Counter(attribute.event_id for attribute in attributes)
    total_attributes = sum(attributes_per_event.values())
    same_event_pairs = sum(comb(amount, 2) for amount in attributes_per_event.values())
    return comb(total_attributes, 2) - same_event_pairs

save_correlations(db, attributes, value) async

Method to generate DefaultCorrelation objects from the given list of MispEventAttribute and save them in the database. All MispEventAttribute in the list have to be attributes which have the same value and are correlated with each other. :param attributes: the attributes to correlate with each other :type attributes: list[Attribute] :param value: on which the correlations are based :type value: str :return: a set of UUIDs representing the events the correlation are associated with :rtype: set[UUID]

Source code in src/mmisp/worker/jobs/correlation/utility.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
async def save_correlations(db: AsyncSession, attributes: list[Attribute], value: str) -> set[UUID]:
    """
    Generate DefaultCorrelation objects from the given attributes and save them in the database.

    All attributes in the list have to share the same value. The value is registered first, then the
    correlations between the attributes are created and stored.

    :param db: the database session to work on
    :type db: AsyncSession
    :param attributes: the attributes to correlate with each other
    :type attributes: list[Attribute]
    :param value: on which the correlations are based
    :type value: str
    :return: a set of UUIDs representing the events of the given attributes
    :rtype: set[UUID]
    """
    value_id: int = await misp_sql.add_correlation_value(db, value)

    # Read event and object of each attribute together so both lists stay aligned with `attributes`.
    related = [(attribute.event, attribute.mispobject) for attribute in attributes]
    events: list[Event] = [event for event, _ in related]
    objects: list[Object] = [misp_object for _, misp_object in related]

    await misp_sql.add_correlations(db, create_correlations(attributes, events, objects, value_id))
    return {UUID(event.uuid) for event in events}

ChangeThresholdData

Bases: BaseModel

Data to change the threshold.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
59
60
61
62
63
64
class ChangeThresholdData(BaseModel):
    """
    Data to change the threshold.
    """

    # The threshold value that is requested.
    # NOTE(review): presumably the correlation threshold used by the correlation jobs - confirm.
    new_threshold: int

ChangeThresholdResponse

Bases: BaseModel

Response for the change of the threshold.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
40
41
42
43
44
45
46
47
class ChangeThresholdResponse(BaseModel):
    """
    Response for the change of the threshold.
    """

    # NOTE(review): presumably True when the new threshold was stored - confirm against the job.
    saved: bool
    # NOTE(review): presumably True when the requested threshold passed validation - confirm.
    valid_threshold: bool
    # Optional threshold value; defaults to None when it is not provided.
    new_threshold: Optional[int] = None

CorrelationJobData

Bases: BaseModel

Data for a correlation plugin job.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
50
51
52
53
54
55
56
class CorrelationJobData(BaseModel):
    """
    Data for a correlation plugin job.
    """

    # Id of the attribute that is loaded and correlated by correlation_job.
    attribute_id: int
    # Name of the correlation plugin that correlation_job requests from the plugin factory.
    correlation_plugin_name: str = "ExactValueCorrelationPlugin"

CorrelationResponse

Bases: BaseModel

Response for the correlation of a value.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
 9
10
11
12
13
14
15
16
17
18
19
class CorrelationResponse(BaseModel):
    """
    Response for the correlation of a value.
    """

    # Whether the job ran successfully; correlation_job reports False when the attribute does not exist.
    success: bool
    # Whether correlations were found for the value.
    found_correlations: bool
    # True when the value is an excluded correlation value.
    is_excluded_value: bool
    # True when more attributes share the value than the correlation threshold allows.
    is_over_correlating_value: bool
    # Name of the plugin that produced the response, if set.
    plugin_name: Optional[str] = None
    # UUIDs of the events returned by save_correlations, if correlations were saved.
    events: Optional[set[UUID]] = None

DatabaseChangedResponse

Bases: BaseModel

Response for jobs that only change the database.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
31
32
33
34
35
36
37
class DatabaseChangedResponse(BaseModel):
    """
    Response for jobs that only change the database.
    """

    # Whether the job was successful.
    success: bool
    # Whether the job changed anything in the database.
    database_changed: bool

InternPluginResult

Bases: BaseModel

Result of a plugin to process by the job.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
67
68
69
70
71
72
73
74
75
76
class InternPluginResult(BaseModel):
    """
    Result of a plugin to process by the job.
    """

    # Whether the plugin ran successfully; copied into the CorrelationResponse.
    success: bool
    # Whether the plugin reports correlations; they are only saved if more than one attribute is given.
    found_correlations: bool
    # Whether the plugin classified the value as over correlating; copied into the CorrelationResponse.
    is_over_correlating_value: bool
    # The attributes that are correlated with each other when the result is processed.
    correlations: list[Attribute]
    # Needed so that pydantic accepts the Attribute type in `correlations`
    # (presumably because Attribute is not a pydantic model - confirm).
    model_config = ConfigDict(arbitrary_types_allowed=True)

TopCorrelationsResponse

Bases: BaseModel

Response for the top correlations job.

Source code in src/mmisp/worker/jobs/correlation/job_data.py
22
23
24
25
26
27
28
class TopCorrelationsResponse(BaseModel):
    """
    Response for the top correlations job.
    """

    # Whether the job was successful.
    success: bool
    # (value, number of correlations) pairs; top_correlations_job delivers them sorted by the number in
    # decreasing order and without entries whose number is 0.
    top_correlations: list[tuple[str, int]]

ENV_CORRELATION_PLUGIN_DIRECTORY = 'CORRELATION_PLUGIN_DIRECTORY' module-attribute

The name of the environment variable that configures the directory where correlation plugins are loaded from.

PLUGIN_DEFAULT_DIRECTORY = '' module-attribute

The default package used for correlation plugins.

CorrelationConfigData

Bases: BaseSettings

Encapsulates configuration for the correlation worker and its jobs.

Source code in src/mmisp/worker/jobs/correlation/correlation_config_data.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class CorrelationConfigData(BaseSettings):
    """
    Encapsulates configuration for the correlation worker and its jobs.
    """

    plugin_directory: str = Field(PLUGIN_DEFAULT_DIRECTORY, validation_alias=ENV_CORRELATION_PLUGIN_DIRECTORY)
    """The directory where the plugins are stored."""

    @field_validator("plugin_directory")
    @classmethod
    def validate_plugin_module(cls: Type["CorrelationConfigData"], value: str) -> str:
        """
        Validates the plugin_directory.
        If the directory is not given or does not exist the default value is assigned.

        :param value: The plugin_directory value.
        :type value: str
        :return: The given or a default plugin directory.
        :rtype: str
        """

        plugin_module: str = value.strip()

        # An empty value silently falls back to the default; only a non-existing directory is logged.
        if not plugin_module:
            return PLUGIN_DEFAULT_DIRECTORY

        if os.path.isdir(plugin_module):
            return plugin_module

        _log.error(f"The given plugin directory '{plugin_module}' for correlation plugins does not exist.")
        return PLUGIN_DEFAULT_DIRECTORY

plugin_directory = Field(PLUGIN_DEFAULT_DIRECTORY, validation_alias=ENV_CORRELATION_PLUGIN_DIRECTORY) class-attribute instance-attribute

The directory where the plugins are stored.

validate_plugin_module(value) classmethod

Validates the plugin_directory. If the module is not valid or could not be found a default value is assigned. :param value: The plugin_directory value. :type value: str :return: The given or a default plugin directory.

Source code in src/mmisp/worker/jobs/correlation/correlation_config_data.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@field_validator("plugin_directory")
@classmethod
def validate_plugin_module(cls: Type["CorrelationConfigData"], value: str) -> str:
    """
    Validates the plugin_directory.
    If the directory is not given or does not exist the default value is assigned.

    :param value: The plugin_directory value.
    :type value: str
    :return: The given or a default plugin directory.
    :rtype: str
    """

    plugin_module: str = value.strip()

    # An empty value silently falls back to the default; only a non-existing directory is logged.
    if not plugin_module:
        return PLUGIN_DEFAULT_DIRECTORY

    if os.path.isdir(plugin_module):
        return plugin_module

    _log.error(f"The given plugin directory '{plugin_module}' for correlation plugins does not exist.")
    return PLUGIN_DEFAULT_DIRECTORY

top_correlations_job(ctx, user) async

Method to get a list of all correlations with their occurrence in the database. The list is sorted decreasing by the occurrence. :param user: the user who requested the job :type user: UserData :return: TopCorrelationsResponse with the list and if the job was successful :rtype: TopCorrelationsResponse

Source code in src/mmisp/worker/jobs/correlation/top_correlations_job.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@queue.task()
@add_ajob_db_log
async def top_correlations_job(ctx: WrappedContext[None], user: UserData) -> TopCorrelationsResponse:
    """
    Collect all values that have correlations together with their number of correlations.

    Values without any correlation are dropped and the remaining entries are sorted by their number of
    correlations in decreasing order.

    :param ctx: the task context (not used by the job body)
    :type ctx: WrappedContext[None]
    :param user: the user who requested the job
    :type user: UserData
    :return: TopCorrelationsResponse with the list and if the job was successful
    :rtype: TopCorrelationsResponse
    """
    assert sessionmanager is not None
    async with sessionmanager.session() as session:
        values: list[str] = await misp_sql.get_values_with_correlation(session)

        counted: list[tuple[str, int]] = [
            (value, await misp_sql.get_number_of_correlations(session, value, False)) for value in values
        ]
        # Keep only values that actually have correlations, largest amount first.
        with_correlations = [entry for entry in counted if entry[1] != 0]
        ranking = sorted(with_correlations, key=lambda entry: entry[1], reverse=True)

        return TopCorrelationsResponse(success=True, top_correlations=ranking)