Skip to content

MISP Database Adapter

decode_json_response

decode_json_response(response: Response) -> dict

Decodes the JSON response from the MISP API

:param response: response from the MISP API :type response: Response :return: returns the decoded JSON response :rtype: dict

translate_dictionary

translate_dictionary(dictionary: dict, translation_dict: dict[str, str]) -> dict

translates the keys of a dictionary according to the translation dictionary

:param dictionary: dictionary to be translated :type dictionary: dict :param translation_dict: translation dictionary including the old key as the key and the new key as the value :type translation_dict: dict[str, str] :return: returns the translated dictionary :rtype: dict

MispAPI

This class is used to communicate with the MISP API.

it encapsulates the communication with the MISP API and provides methods to retrieve and send data. the data is parsed and validated by the MispAPIParser and MispAPIUtils classes, and returns the data as MMISP dataclasses.

__filter_rule_to_parameter

__filter_rule_to_parameter(filter_rules: str) -> dict[str, list[str]]

This method is used to convert the given filter rules string to a dictionary for the API. :param filter_rules: the filter rules to convert :type filter_rules: dict :return: returns the filter rules as a parameter for the API :rtype: dict

__get_session async

__get_session(server: Server | None = None) -> Session

This method is used to get the session for the given server_id if a session for the given server_id already exists, it returns the existing session, otherwise it sets up a new session and returns it.

:param server: server to get the session for, if no server is given, the own API is used :type server: Server :return: returns a session to the specified server :rtype: Session

__get_url

__get_url(path: str, server: Server | None = None) -> str

This method is used to get the url for the given server, adding the given path to the url.

if no server is given, it uses the default url from the config, otherwise it uses the url of the given server.

:param path: path to add to the url :type path: str :param server: remote server to get the url for :type server: Server :return: returns the url for the given server with the path added :rtype: str

__join_path staticmethod

__join_path(url: str, path: str) -> str

This method is used to join the given path to the given url. it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

:param url: url to join the path to :type url: str :param path: path to join to the url :type path: str :return: returns the url with the path added :rtype: str

__send_request async

__send_request(request: PreparedRequest, server: Server | None = None, **kwargs) -> dict

This method is used to send the given request and return the response.

:param request: the request to send :type request: PreparedRequest :param kwargs: keyword arguments :type kwargs: dict[str, Any] :return: returns the response of the request :rtype: dict

__setup_api_session

__setup_api_session() -> Session

This method is used to set up the session for the API.

:return: returns the session that was set up :rtype: Session

__setup_remote_api_session async

__setup_remote_api_session(server_id: int) -> Session

This method is used to set up the session for the remote API.

:param server_id: server id of the remote server to set up the session for :type server_id: int :return: returns the session to the specified server that was set up :rtype: Session

attach_attribute_tag async

attach_attribute_tag(attribute_id: int, tag_id: int, local: bool, server: Server | None = None) -> bool

Attaches a tag to an attribute

:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used :type server: Server :return: true if the attachment was successful :rtype: bool

attach_event_tag async

attach_event_tag(event_id: int, tag_id: int, local: bool, server: Server | None = None) -> bool

Attaches a tag to an event

:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the event on, if no server is given, the own API is used :type server: Server :return: :rtype: bool

create_attribute async

create_attribute(attribute: AddAttributeBody, server: Server | None = None) -> int

creates the given attribute on the server

:param attribute: contains the required attributes to creat an attribute :type attribute: AddAttributeBody :param server: the server to create the attribute on, if no server is given, the own API is used :type server: Server :return: The attribute id if the creation was successful. -1 otherwise. :rtype: int

create_tag async

create_tag(tag: TagCreateBody, server: Server | None = None) -> int

Creates the given tag on the server :param tag: The tag to create. :type tag: TagCreateBody :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used. :type server: Server :return: the id of the created tag :rtype: int

edit_server async

edit_server(server_body: EditServer, server_id: int) -> AddServerResponse

Edits the given server on the local server. :param server_body: the server to edit :type server_body: EditServer :return: returns the edited server :rtype: Server

get_attribute async

get_attribute(attribute_id: int, server: Server | None = None) -> GetAttributeAttributes

Returns the attribute with the given attribute_id.

:param attribute_id: the id of the attribute to get :type attribute_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: returns the attribute with the given attribute_id :rtype: GetAttributeAttributes

get_custom_clusters async

get_custom_clusters(conditions: GalaxyClusterSearchBody, server: Server | None = None) -> list[SearchGalaxyClusterGalaxyClustersDetails]

Returns all custom clusters that match the given conditions from the given server. the limit is set as a constant in the class, if the amount of clusters is higher, the method will return only the first n clusters.

:param conditions: the conditions to filter the clusters :type conditions: JsonType :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns all custom clusters that match the given conditions from the given server :rtype: list[SearchGalaxyClusterGalaxyClustersDetails]

get_event async

get_event(event_id: int | UUID, server: Server | None = None) -> AddEditGetEventDetails

Returns the event with the given event_id from the given server, the own API is used if no server is given.

:param event_id: the id of the event to get :type event_id: int :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the event with the given event_id from the given server :rtype: AddEditGetEventDetails

get_event_attributes async

get_event_attributes(event_id: int, server: Server | None = None) -> list[SearchAttributesAttributesDetails]

Returns all attribute object of the given event, represented by given event_id.

:param event_id: of the event :type event_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: a list of all attributes :rtype: list[SearchAttributesAttributesDetails]

get_galaxy async

get_galaxy(galaxy_id: int | str | UUID, server: Server | None = None) -> GetGalaxyResponse

Returns the galaxy with the given galaxy_id. :param galaxy_id: id or uuid of the galaxy to get :type galaxy_id: int | str :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the requested galaxy :rtype: GetGalaxyResponse

get_galaxy_cluster async

get_galaxy_cluster(cluster_id: int | str, server: Server | None = None) -> GetGalaxyClusterResponse

Returns the galaxy cluster with the given cluster_id from the given server.

:param cluster_id: the id of the cluster to get :type cluster_id: int :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the requested galaxy cluster with the given id from the given server :rtype: GetGalaxyClusterResponse

get_minimal_events async

get_minimal_events(ignore_filter_rules: bool, server: Server | None = None) -> list[MispMinimalEvent]

Returns all minimal events from the given server. if ignore_filter_rules is set to false, it uses the filter rules from the given server to filter the events. the limit is set as a constant in the class, if the amount of events is higher, the method will return only the first n events.

:param ignore_filter_rules: boolean to ignore the filter rules :type ignore_filter_rules: bool :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: return all minimal events from the given server, capped by the limit :rtype: list[MispMinimalEvent]

get_object async

get_object(object_id: int, server: Server | None = None) -> ObjectWithAttributesResponse

Returns the object with the given object_id.

:param object_id: id of the object :type object_id: int :param server: the server to get the object from, if no server is given, the own API is used :type server: Server :return: The object :rtype: ObjectWithAttributesResponse

get_organisation async

get_organisation(organisation_id: int | str | UUID, server: Server | None = None) -> GetOrganisationElement

Returns the organisation with the given organisation_id.

:param organisation_id: id of the organisation. Can be an int or an uuid. :type organisation_id: int | str :param server: the server to get the organisation from, if no server is given, the own API is used :type server: Server :return: returns the organisation with the given organisation_id :rtype: dict

get_proposals async

get_proposals(server: Server | None = None) -> list[ShadowAttribute]

Returns all shadow_attributes (proposals) from the given server from the last 90 days.

:param server: the server to get the proposals from, if no server is given, the own API is used :type server: Server :return: returns all proposals from the given server from the last 90 days :rtype: list[ShadowAttribute]

get_server_version async

get_server_version(server: Server | None = None) -> ServerVersion

Returns the version of the given server

:param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the version of the given server :rtype: ServerVersion

get_sharing_group async

get_sharing_group(sharing_group_id: int | str, server: Server | None = None) -> ViewUpdateSharingGroupLegacyResponse

Returns the sharing group with the given id.

:param sharing_group_id: id or uuid of the sharing group to get from the API :type sharing_group_id: int | str :param server: the server to get the sharing group from, if no server is given, the own API is used :type server: Server :return: returns the sharing group that got requested :rtype: ViewUpdateSharingGroupLegacyResponse

get_sharing_groups async

get_sharing_groups(server: Server | None = None) -> list[GetAllSharingGroupsResponseResponseItem]

Returns all sharing groups from the given server, if no server is given, the own API is used.

:param server: the server to get the sharing groups from, if no server is given, the own API is used :type server: Server :return: returns all sharing groups from the given server :rtype: list[GetAllSharingGroupsResponseResponseItem]

get_sightings_from_event async

get_sightings_from_event(event_id: int, server: Server | None = None) -> list[SightingAttributesResponse]

Returns all sightings from the given event from the given server.

:param event_id: id of the event to get the sightings from :type event_id: id :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns all sightings from the given event from the given server :rtype: list[SightingAttributesResponse]

get_user async

get_user(user_id: int | None, server: Server | None = None) -> MispUser

Returns the user with the given user_id.

:param user_id: id of the user :type user_id: int :param server: the server to get the user from, if no server is given, the own API is used :type server: Server :return: returns the user with the given user_id :rtype: MispUser

modify_attribute_tag_relationship async

modify_attribute_tag_relationship(attribute_tag_id: int, relationship_type: str, server: Server | None = None) -> bool

Modifies the relationship of the given tag to the given attribute Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

:param attribute_tag_id: The ID of the attribute-tag assignment. :type attribute_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool

modify_event_tag_relationship async

modify_event_tag_relationship(event_tag_id: int, relationship_type: str, server: Server | None = None) -> bool

Modifies the relationship of the given tag to the given event Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

:param event_tag_id: The ID of the event-tag assignment. :type event_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool

save_cluster async

save_cluster(cluster: GetGalaxyClusterResponse | SearchGalaxyClusterGalaxyClustersDetails, server: Server | None = None) -> bool

Saves the given cluster on the given server.

:param cluster: the cluster to save :type cluster: GetGalaxyClusterResponse :param server: the server to save the cluster on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool

save_event async

save_event(event: AddEditGetEventDetails, server: Server | None = None) -> bool

Saves the given event on the given server.

:param event: the event to save :type event: AddEditGetEventDetails :param server: the server to save the event on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool

save_organisation async

save_organisation(org: AddOrganisation) -> GetOrganisationElement

Saves the given organisation on the local server.

:param org: the organisation to save :type org: AddOrganisation :return: returns the saved organisation :rtype: GetOrganisationElement

save_proposal async

save_proposal(event: AddEditGetEventDetails, server: Server | None = None) -> bool

Saves the given proposal on the given server.

:param event: the event to save the proposal for :type event: AddEditGetEventDetails :param server: the server to save the proposal on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool

save_sighting async

save_sighting(sighting: SightingAttributesResponse, server: Server | None = None) -> bool

Saves the given sighting on the given server.

:param sighting: the sighting to save :type sighting: SightingAttributesResponse :param server: the server to save the sighting on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool

update_cluster async

update_cluster(cluster: PutGalaxyClusterRequest, server: Server | None = None) -> bool

Updates the given cluster on the given server.

:param cluster: the cluster to update :type cluster: PutGalaxyClusterRequest :param server: the server to update the cluster on, if no server is given, the own API is used :type server: Server :return: returns true if the update was successful :rtype: bool

update_event async

update_event(event: AddEditGetEventDetails, server: Server | None = None) -> bool

Updates the given event on the given server.

:param event: the event to update :type event: AddEditGetEventDetails :param server: the server to update the event on, if no server is given, the own API is used :type server: Server :return: returns true if the update was successful :rtype: bool

helper module to interact with misp database

add_correlation_value async

add_correlation_value(session: AsyncSession, value: str) -> int

Adds a new value to correlation_values table or returns the id of the current entry with the same value. :param value: to add or get id of in the correlation_values table :type value: str :return: the id of the value in the correlation_values table :rtype: int

add_correlations async

add_correlations(session: AsyncSession, correlations: list[DefaultCorrelation]) -> bool

Adds a list of correlations to the database. Returns True if at least one correlation was added, False otherwise. Doesn’t add correlations that are already in the database. :param correlations: list of correlations to add :type correlations: list[DefaultCorrelation] :return: true if at least one correlation was added, false otherwise :rtype: bool

add_over_correlating_value async

add_over_correlating_value(session: AsyncSession, value: str, count: int) -> bool

Adds a new value to over_correlating_values table or updates the current entry with the same value. Returns True if value was added or updated, False otherwise. :param value: add or update :type value: str :param count: occurrence of value :type count: int :return: True if value was added or updated, False otherwise :rtype: bool

delete_correlations async

delete_correlations(session: AsyncSession, value: str) -> bool

Deletes all correlations with value from database. Returns True if value was in database, False otherwise. :param value: to delete the correlations of :type value: str :return: True if value was in database, False otherwise :rtype: bool

delete_over_correlating_value async

delete_over_correlating_value(session: AsyncSession, value: str) -> bool

Deletes value from over_correlating_values table. Returns True if value was in table, False otherwise. :param value: row to delete :type value: str :return: true if value was in table, false otherwise :rtype: bool

event_id_exists async

event_id_exists(session: AsyncSession, event_id: int | str) -> bool

Checks if the event with the given ID exists in the database.

:param session: The database session. :type session: AsyncSession :param event_id: The ID or UUID of the event to check. :type event_id: int | str :return: True if the event exists, False otherwise. :rtype: bool :raises ValueError: If the event ID is not a valid integer or UUID.

filter_blocked_clusters async

filter_blocked_clusters(session: AsyncSession, clusters: list[SearchGalaxyClusterGalaxyClustersDetails]) -> list[SearchGalaxyClusterGalaxyClustersDetails]

Get all blocked clusters from database and remove them from clusters list. :param clusters: list of clusters to check :type clusters: list[GetGalaxyClusterResponse] :return: list without blocked clusters :rtype: list[MispGalaxyCluster]

filter_blocked_events async

filter_blocked_events(session: AsyncSession, events: list[MispMinimalEvent], use_event_blocklist: bool, use_org_blocklist: bool) -> list[MispMinimalEvent]

Clear the list from events that are listed as blocked in the misp database. Also, if the org is blocked, the events in the org are removed from the list. Return the list without the blocked events. :param events: list to remove blocked events from :type events: list[AddEditGetEventDetails] :param use_event_blocklist: if True, blocked events are removed from the list :type use_event_blocklist: bool :param use_org_blocklist: if True, the events from blocked orgs are removed from the list :type use_org_blocklist: bool :return: the list without the blocked events :rtype: list[MispEvent]

galaxy_cluster_id_exists async

galaxy_cluster_id_exists(session: AsyncSession, cluster_id: int | UUID) -> bool

Checks if the galaxy cluster with the given ID exists in the database.

:param session: The database session. :type session: AsyncSession :param cluster_id: The ID of the galaxy cluster to check. :type cluster_id: int | str :return: True if the galaxy cluster exists, False otherwise. :rtype: bool :raises ValueError: If the galaxy cluster ID is not a valid integer or UUID.

galaxy_id_exists async

galaxy_id_exists(session: AsyncSession, galaxy_id: int | UUID) -> bool

Checks if the galaxy with the given ID exists in the database.

:param session: The database session. :type session: AsyncSession :param galaxy_id: The ID of the galaxy to check. :type galaxy_id: int | str :return: True if the galaxy exists, False otherwise. :rtype: bool :raises ValueError: If the galaxy ID is not a valid integer or UUID.

get_api_authkey async

get_api_authkey(session: AsyncSession, server_id: int) -> str | None

Method to get the API authentication key of the server with the given ID. :param server_id: The ID of the server. :type server_id: int :return: The API authentication key of the server. :rtype: str

get_attribute_tag async

get_attribute_tag(session: AsyncSession, attribute_tag_id: int) -> AttributeTag | None

Method to get the AttributeTag object with the given ID.

:param attribute_tag_id: The ID of the attribute-tag object. :type attribute_tag_id: int :return: The AttributeTag object or None if it doesn’t exist. :rtype: AttributeTag | None

get_attribute_tag_id async

get_attribute_tag_id(session: AsyncSession, attribute_id: int, tag_id: int) -> int

Method to get the ID of the attribute-tag object associated with the given attribute-ID and tag-ID.

:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the attribute-tag object or -1 if the object does not exist. :rtype: int

get_attributes_with_same_value async

get_attributes_with_same_value(session: AsyncSession, value: str) -> list[Attribute]

Method to get all attributes with the same value from database. :param value: to get attributes with :type value: str :return: list of attributes with the same value :rtype: list[Attribute]

get_event_tag_id async

get_event_tag_id(session: AsyncSession, event_id: int, tag_id: int) -> int

Method to get the ID of the event-tag object associated with the given event-ID and tag-ID.

:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the event-tag object or -1 if the object does not exist. :rtype: int

get_excluded_correlations async

get_excluded_correlations(session: AsyncSession) -> Sequence[str]

Method to get all values from correlation_exclusions table. :return: all values from correlation_exclusions table :rtype: list[str]

get_number_of_correlations async

get_number_of_correlations(session: AsyncSession, value: str, only_over_correlating_table: bool) -> int

Returns the number of correlations of value in the database. If only_over_correlating_table is True, only the value in the over_correlating_values table is returned. Else the number of correlations in the default_correlations table is returned Attention: It is assumed that the value is in the over_correlating_values table if only_over_correlating_table is True. :param value: to get number of correlations of :type value: str :param only_over_correlating_table: if True, only the value in the over_correlating_values table is returned :type only_over_correlating_table: bool :return: number of correlations of value in the database

get_org_by_name async

get_org_by_name(session: AsyncSession, org_name: str) -> Organisation | None

Get organisation by name from database.

:param session: The database session. :type session: AsyncSession :param org_name: The name of the organisation to retrieve. :type org_name: str :return: The organisation object if found, None otherwise. :rtype: Organisation | None

get_over_correlating_values async

get_over_correlating_values(session: AsyncSession) -> list[tuple[str, int]]

Method to get all values from over_correlating_values table with their occurrence. :return: all values from over_correlating_values table with their occurrence :rtype: list[tuple[str, int]]

get_post async

get_post(session: AsyncSession, post_id: int) -> Post

Method to get a post from database. :param post_id: the id of the post to get :type post_id: int :return: the post with the given id :rtype: MispPost

get_server async

get_server(session: AsyncSession, server_id: int) -> Server | None

Returns the server with the given server_id or none if it doesn’t exist.

:param server_id: id of the server :type server_id: int :return: returns the server that got requested or None :rtype: Server

get_values_with_correlation async

get_values_with_correlation(session: AsyncSession) -> list[str]

” Method to get all values from correlation_values table. :return: all values from correlation_values table :rtype: list[str]

is_excluded_correlation async

is_excluded_correlation(session: AsyncSession, value: str) -> bool

Checks if value is in correlation_exclusions table. :param value: to check :type value: str :return: True if value is in correlation_exclusions table, False otherwise :rtype: bool

is_over_correlating_value async

is_over_correlating_value(session: AsyncSession, value: str) -> bool

Checks if value is in over_correlating_values table. Doesn’t check if value has more correlations in the database than the current threshold. :param value: to check :type value: str :return: True if value is in over_correlating_values table, False otherwise :rtype: bool

set_last_pushed_id async

set_last_pushed_id(session: AsyncSession, server_id: int, last_pushed_id: int) -> None

Set the last pushed ID for a server in the database.

:param session: The database session. :type session: AsyncSession :param server_id: The ID of the server. :type server_id: int :param last_pushed_id: The last pushed ID to set. :type last_pushed_id: int

sighting_id_exists async

sighting_id_exists(session: AsyncSession, sighting_id: int | str) -> bool

Checks if the sighting with the given ID exists in the database. :param session: The database session. :type session: AsyncSession :param sighting_id: The ID of the sighting to check. :type sighting_id: int | str :return: True if the sighting exists, False otherwise.