Skip to content

Enrichment

EnrichAttributeData

Bases: BaseModel

Encapsulates the necessary data to create an enrich-attribute job.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class EnrichAttributeData(BaseModel):
    """
    Encapsulates the necessary data to create an enrich-attribute job.
    """

    # No configuration overrides are passed to ConfigDict here.
    model_config = ConfigDict()

    # Typed as NonNegativeInt (presumably pydantic's constrained int; confirm the import).
    attribute_id: NonNegativeInt
    """The ID of the attribute to enrich."""
    # Plugin names as plain strings; this model declares no validator that checks
    # them against a plugin registry.
    enrichment_plugins: list[str]
    """The list of enrichment plugins to use for enrichment"""

attribute_id instance-attribute

The ID of the attribute to enrich.

enrichment_plugins instance-attribute

The list of enrichment plugins to use for enrichment

EnrichEventData

Bases: BaseModel

Encapsulates the data needed for an enrich-event job.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
17
18
19
20
21
22
23
24
25
26
27
class EnrichEventData(BaseModel):
    """
    Encapsulates the data needed for an enrich-event job.
    """

    # No configuration overrides are passed to ConfigDict here.
    model_config = ConfigDict()

    # NOTE(review): declared as a plain int, whereas EnrichAttributeData.attribute_id
    # uses NonNegativeInt. Confirm whether negative event IDs should be rejected here too.
    event_id: int
    """The ID of the event to enrich."""
    # Plugin names as plain strings; this model declares no validator that checks
    # them against a plugin registry.
    enrichment_plugins: list[str]
    """The list of enrichment plugins to use for enrichment"""

enrichment_plugins instance-attribute

The list of enrichment plugins to use for enrichment

event_id instance-attribute

The ID of the event to enrich.

EnrichEventResult

Bases: BaseModel

Encapsulates the result of an enrich-event job.

Contains the number of created attributes.

Source code in src/mmisp/worker/jobs/enrichment/job_data.py
30
31
32
33
34
35
36
37
38
class EnrichEventResult(BaseModel):
    """
    Encapsulates the result of an enrich-event job.

    Contains the number of created attributes.
    """

    # Defaults to 0, so a result can be constructed without arguments.
    created_attributes: NonNegativeInt = 0
    """The number of created attributes."""

created_attributes = 0 class-attribute instance-attribute

The number of created attributes.

enrich_attribute(db, misp_attribute, enrichment_plugins) async

Enriches the given event attribute with the specified plugins and returns the created attributes and tags.

:param misp_attribute: The attribute to enrich. :type misp_attribute: Attribute :param enrichment_plugins: The plugins to use for enriching the attribute. :type enrichment_plugins: list[str] :return: The created Attributes and Tags. :rtype: EnrichAttributeResult

Source code in src/mmisp/worker/jobs/enrichment/enrich_attribute_job.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def enrich_attribute(
    db: AsyncSession, misp_attribute: Attribute, enrichment_plugins: list[str]
) -> EnrichAttributeResult:
    """
    Runs the requested enrichment plugins against one attribute and collects their results.

    Plugins that are not registered, or whose accepted input types do not include the
    attribute's type, are skipped. A plugin whose execution raises is logged and skipped;
    the remaining plugins are still executed.

    :param db: The database session that is passed on to every plugin run.
    :type db: AsyncSession
    :param misp_attribute: The attribute to enrich.
    :type misp_attribute: Attribute
    :param enrichment_plugins: The names of the plugins to use for enriching the attribute.
    :type enrichment_plugins: list[str]
    :return: The accumulated results of all plugins that returned a truthy result.
    :rtype: EnrichAttributeResult
    """

    combined: EnrichAttributeResult = EnrichAttributeResult()

    for plugin_name in enrichment_plugins:
        registered = factory.is_plugin_registered(PluginType.ENRICHMENT, plugin_name)
        if not registered:
            _logger.warning(f"Plugin '{plugin_name}' is not registered. Cannot be used for enrichment.")
            continue

        plugin: EnrichmentPlugin = factory.get_plugin(PluginType.ENRICHMENT, plugin_name)

        # Only run plugins that declare the attribute's type as a supported input.
        compatible = misp_attribute.type in plugin.ATTRIBUTE_TYPES_INPUT
        if not compatible:
            _logger.info(
                f"Plugin {plugin_name} is not compatible with attribute type {misp_attribute.type}. "
                f"Plugin execution will be skipped."
            )
            continue

        # A failing plugin must not abort the whole enrichment: log it and move on.
        try:
            outcome: EnrichAttributeResult = await plugin.run(db, misp_attribute)
        except Exception as exception:
            _logger.exception(f"Execution of plugin '{plugin_name}' failed. {exception}")
        else:
            if outcome:
                combined.append(outcome)

    return combined

enrich_attribute_job(ctx, user_data, data) async

Provides an implementation of the enrich-attribute job.

Takes a Misp event-attribute as input and runs specified plugins to enrich the attribute.

Parameters:

Name Type Description Default
user_data UserData

The user who created the job. (not used)

required
data EnrichAttributeData

The data needed for the enrichment process.

required

Returns: The created Attributes and Tags.

Source code in src/mmisp/worker/jobs/enrichment/enrich_attribute_job.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@queue.task()
@add_ajob_db_log
async def enrich_attribute_job(
    ctx: WrappedContext[None], user_data: UserData, data: EnrichAttributeData
) -> EnrichAttributeResult:
    """
    Provides an implementation of the enrich-attribute job.

    Looks up the attribute identified by ``data.attribute_id`` and runs the requested
    enrichment plugins on it. If no such attribute exists, an empty result is returned.

    Args:
      ctx: The wrapped job context. (not referenced in the function body)
      user_data: The user who created the job. (not used)
      data: The data needed for the enrichment process.
    Returns:
        The created Attributes and Tags.
    """
    assert sessionmanager is not None
    async with sessionmanager.session() as db:
        lookup = select(Attribute).filter(Attribute.id == data.attribute_id)
        rows = await db.execute(lookup)
        target = rows.scalars().one_or_none()

        if target is not None:
            return await enrich_attribute(db, target, data.enrichment_plugins)

        # Unknown attribute ID: nothing to enrich.
        return EnrichAttributeResult()

ENV_ENRICHMENT_PLUGIN_DIRECTORY = 'ENRICHMENT_PLUGIN_DIRECTORY' module-attribute

The name of the environment variable that configures the directory where enrichment plugins are loaded from.

EnrichmentConfigData

Bases: BaseSettings

Encapsulates configuration for the enrichment worker and its jobs.

Source code in src/mmisp/worker/jobs/enrichment/enrichment_config_data.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class EnrichmentConfigData(BaseSettings):
    """
    Encapsulates configuration for the enrichment worker and its jobs.
    """

    plugin_directory: str = Field(_PLUGIN_DEFAULT_DIRECTORY, validation_alias=ENV_ENRICHMENT_PLUGIN_DIRECTORY)
    """The directory where the plugins are stored."""

    @field_validator("plugin_directory")
    @classmethod
    def validate_plugin_module(cls: Type["EnrichmentConfigData"], value: str) -> str:
        """
        Validates the plugin_directory.

        Surrounding whitespace is stripped from the value. If the remaining value names an
        existing directory it is returned; otherwise the default directory is returned.
        A non-empty value that is not an existing directory is additionally logged as an error.

        :param value: The plugin_directory value.
        :type value: str
        :return: The given or a default plugin directory.
        :rtype: str
        """

        directory: str = value.strip()

        # Nothing configured: silently use the default.
        if not directory:
            return _PLUGIN_DEFAULT_DIRECTORY

        if os.path.isdir(directory):
            return directory

        # Configured but missing: report it, then fall back to the default.
        _log.error(f"The given plugin directory {directory} for enrichment plugins does not exist.")
        return _PLUGIN_DEFAULT_DIRECTORY

plugin_directory = Field(_PLUGIN_DEFAULT_DIRECTORY, validation_alias=ENV_ENRICHMENT_PLUGIN_DIRECTORY) class-attribute instance-attribute

The directory where the plugins are stored.

validate_plugin_module(value) classmethod

Validates the plugin_directory. If the directory is empty or does not exist, a default value is assigned. :param value: The plugin_directory value. :type value: str :return: The given or a default plugin directory.

Source code in src/mmisp/worker/jobs/enrichment/enrichment_config_data.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@field_validator("plugin_directory")
@classmethod
def validate_plugin_module(cls: Type["EnrichmentConfigData"], value: str) -> str:
    """
    Validates the plugin_directory.

    Surrounding whitespace is stripped from the value. If the remaining value names an
    existing directory it is returned; otherwise the default directory is returned.
    A non-empty value that is not an existing directory is additionally logged as an error.

    :param value: The plugin_directory value.
    :type value: str
    :return: The given or a default plugin directory.
    :rtype: str
    """

    directory: str = value.strip()

    # Nothing configured: silently use the default.
    if not directory:
        return _PLUGIN_DEFAULT_DIRECTORY

    if os.path.isdir(directory):
        return directory

    # Configured but missing: report it, then fall back to the default.
    _log.error(f"The given plugin directory {directory} for enrichment plugins does not exist.")
    return _PLUGIN_DEFAULT_DIRECTORY

enrich_event_job(ctx, user_data, data) async

Encapsulates a Job enriching a given MISP Event.

Job fetches MISP Attributes from a given Event and executes the specified enrichment plugins for each of these attributes. Newly created Attributes and Tags are attached to the Event in the MISP-Database.

:param user_data: The user who created the job. (not used) :type user_data: UserData :param data: The event id and enrichment plugins. :return: The number of newly created attributes. :rtype: EnrichEventResult

Source code in src/mmisp/worker/jobs/enrichment/enrich_event_job.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@queue.task()
@add_ajob_db_log
async def enrich_event_job(ctx: WrappedContext[None], user_data: UserData, data: EnrichEventData) -> EnrichEventResult:
    """
    Encapsulates a Job enriching a given MISP Event.

    Job fetches MISP Attributes from a given Event and executes the specified enrichment plugins
    for each of these attributes.
    Newly created Attributes and Tags are attached to the Event in the MISP-Database.

    An HTTPException while writing a single attribute or event tag is logged and that item is
    skipped; an APIException aborts the job with a JobException.

    :param ctx: The wrapped job context. (not referenced in the function body)
    :type ctx: WrappedContext[None]
    :param user_data: The user who created the job. (not used)
    :type user_data: UserData
    :param data: The event id and enrichment plugins.
    :type data: EnrichEventData
    :return: The number of newly created attributes.
    :rtype: EnrichEventResult
    :raises JobException: If the MISP-API reports an APIException while creating an attribute or event tag.
    """
    assert sessionmanager is not None
    async with sessionmanager.session() as db:
        api: MispAPI = MispAPI(db)
        query = select(Attribute).filter(Attribute.event_id == data.event_id)
        res = await db.execute(query)
        attributes = res.scalars().all()

        # Diagnostic output goes through the module logger rather than stdout.
        _logger.debug(f"enrich_event_job parsed_attributes: {len(attributes)}")

        created_attributes: int = 0
        for attribute in attributes:
            # Run plugins
            result: EnrichAttributeResult = await enrich_attribute(db, attribute, data.enrichment_plugins)
            _logger.debug(f"enrich_event_job enrich_attribute_result: {result} for attribute: {attribute}")

            # Write created attributes to database
            for new_attribute in result.attributes:
                try:
                    await _create_attribute(db, api, new_attribute)
                    created_attributes += 1
                except HTTPException as http_exception:
                    # Best effort: a single failed attribute does not abort the job.
                    _logger.exception(f"Could not create attribute with MISP-API. {http_exception}")
                    continue
                except APIException as api_exception:
                    raise JobException(
                        f"Could not create attribute {new_attribute} with MISP-API: {api_exception}."
                    ) from api_exception

            # Write created event tags to database
            for new_tag in result.event_tags:
                try:
                    await _write_event_tag(db, api, data.event_id, new_tag)
                except HTTPException as http_exception:
                    # Best effort: a single failed tag does not abort the job.
                    _logger.exception(f"Could not create event tag with MISP-API. {http_exception}")
                    continue
                except APIException as api_exception:
                    raise JobException(
                        f"Could not create event tag {new_tag} with MISP-API: {api_exception}."
                    ) from api_exception

        return EnrichEventResult(created_attributes=created_attributes)