MISP Database Adapter
decode_json_response
decode_json_response(response: Response) -> dict
Decodes the JSON response from the MISP API
:param response: response from the MISP API :type response: Response :return: returns the decoded JSON response :rtype: dict
translate_dictionary
translate_dictionary(dictionary: dict, translation_dict: dict[str, str]) -> dict
Translates the keys of a dictionary according to the translation dictionary.
:param dictionary: dictionary to be translated :type dictionary: dict :param translation_dict: translation dictionary including the old key as the key and the new key as the value :type translation_dict: dict[str, str] :return: returns the translated dictionary :rtype: dict
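The key translation can be illustrated with a minimal sketch. This is not the module's implementation. The function name `translate_keys` is made up for this example, and keeping keys that have no entry in `translation_dict` unchanged is an assumption, since the description does not say what happens to them.

```python
def translate_keys(dictionary: dict, translation_dict: dict[str, str]) -> dict:
    """Return a copy of `dictionary` with keys renamed via `translation_dict`."""
    translated = {}
    for key, value in dictionary.items():
        # Assumption: keys without a translation are kept as they are.
        translated[translation_dict.get(key, key)] = value
    return translated


result = translate_keys({"Event": 1, "info": "x"}, {"Event": "event"})
```

Here `result` holds the renamed key `event` next to the untouched key `info`.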
MispAPI
This class is used to communicate with the MISP API.
It encapsulates the communication with the MISP API and provides methods to retrieve and send data. The data is parsed and validated by the MispAPIParser and MispAPIUtils classes and returned as MMISP dataclasses.
__filter_rule_to_parameter
__filter_rule_to_parameter(filter_rules: str) -> dict[str, list[str]]
This method is used to convert the given filter rules string to a dictionary for the API. :param filter_rules: the filter rules to convert :type filter_rules: str :return: returns the filter rules as a parameter for the API :rtype: dict[str, list[str]]
__get_session
async
__get_session(server: Server | None = None) -> Session
This method is used to get the session for the given server. If a session for the given server already exists, it returns the existing session; otherwise it sets up a new session and returns it.
:param server: server to get the session for, if no server is given, the own API is used :type server: Server :return: returns a session to the specified server :rtype: Session
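The reuse-or-create behaviour described above can be sketched as follows. `SessionCache`, `make_session`, and the use of id `0` for the own API are all hypothetical names and choices made for this illustration; the class's real session handling may differ.

```python
from typing import Callable


class SessionCache:
    """Hypothetical stand-in that shows the reuse-or-create pattern only."""

    def __init__(self, factory: Callable[[int], object]) -> None:
        self._factory = factory
        self._sessions: dict[int, object] = {}

    def get(self, server_id: int = 0) -> object:
        # Assumption: id 0 stands for the own API when no server is given.
        if server_id not in self._sessions:
            self._sessions[server_id] = self._factory(server_id)
        return self._sessions[server_id]


created: list[int] = []


def make_session(server_id: int) -> object:
    # Record every creation so that reuse can be observed.
    created.append(server_id)
    return object()


cache = SessionCache(make_session)
first = cache.get(5)
second = cache.get(5)
```

The second lookup returns the object created by the first one, so the factory runs only once per server.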
__get_url
__get_url(path: str, server: Server | None = None) -> str
This method is used to get the url for the given server, adding the given path to the url.
If no server is given, it uses the default url from the config; otherwise it uses the url of the given server.
:param path: path to add to the url :type path: str :param server: remote server to get the url for :type server: Server :return: returns the url for the given server with the path added :rtype: str
__join_path
staticmethod
__join_path(url: str, path: str) -> str
This method is used to join the given path to the given url. It checks whether the path starts with a slash; if it does not, it adds a slash between the url and the path.
:param url: url to join the path to :type url: str :param path: path to join to the url :type path: str :return: returns the url with the path added :rtype: str
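A minimal sketch of the join described above, assuming the url itself carries no trailing slash (the description does not cover that case, and neither does this sketch). The host name is a placeholder.

```python
def join_path(url: str, path: str) -> str:
    """Append `path` to `url`, inserting a slash only when `path` lacks one."""
    if path.startswith("/"):
        return url + path
    return f"{url}/{path}"


with_slash = join_path("https://misp.example", "/events/view/1")
without_slash = join_path("https://misp.example", "events/view/1")
```

Both calls produce the same url, which is the point of the check.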
__send_request
async
__send_request(request: PreparedRequest, server: Server | None = None, **kwargs) -> dict
This method is used to send the given request and return the response.
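:param request: the request to send :type request: PreparedRequest :param server: the server to send the request to, if no server is given, the own API is used :type server: Server :param kwargs: keyword arguments :type kwargs: dict[str, Any] :return: returns the response of the request :rtype: dict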
__setup_api_session
__setup_api_session() -> Session
This method is used to set up the session for the API.
:return: returns the session that was set up :rtype: Session
__setup_remote_api_session
async
__setup_remote_api_session(server_id: int) -> Session
This method is used to set up the session for the remote API.
:param server_id: server id of the remote server to set up the session for :type server_id: int :return: returns the session to the specified server that was set up :rtype: Session
attach_attribute_tag
async
attach_attribute_tag(attribute_id: int, tag_id: int, local: bool, server: Server | None = None) -> bool
Attaches a tag to an attribute
:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used :type server: Server :return: true if the attachment was successful :rtype: bool
attach_event_tag
async
attach_event_tag(event_id: int, tag_id: int, local: bool, server: Server | None = None) -> bool
Attaches a tag to an event
:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the event on, if no server is given, the own API is used :type server: Server :return: true if the attachment was successful :rtype: bool
create_attribute
async
create_attribute(attribute: AddAttributeBody, server: Server | None = None) -> int
Creates the given attribute on the server.
:param attribute: contains the required attributes to create an attribute :type attribute: AddAttributeBody :param server: the server to create the attribute on, if no server is given, the own API is used :type server: Server :return: The attribute id if the creation was successful. -1 otherwise. :rtype: int
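Because failure is signalled by the return value -1 rather than by an exception, callers need to check the result. The sketch below shows one way to do that. `FakeApi` is a hypothetical stand-in that only mimics the documented return contract, a plain dict replaces `AddAttributeBody`, and `create_or_raise` is a made-up helper name.

```python
import asyncio
from typing import Optional


class FakeApi:
    """Hypothetical stand-in for MispAPI; mimics the documented return contract."""

    async def create_attribute(self, attribute: dict, server: Optional[object] = None) -> int:
        # Pretend that attributes without a value are rejected.
        return 42 if attribute.get("value") else -1


async def create_or_raise(api: FakeApi, attribute: dict) -> int:
    attribute_id = await api.create_attribute(attribute)
    if attribute_id == -1:
        # -1 is the documented failure value, so turn it into an explicit error.
        raise RuntimeError("attribute could not be created")
    return attribute_id


new_id = asyncio.run(create_or_raise(FakeApi(), {"value": "198.51.100.7"}))
```

Converting the sentinel into an exception at the call site keeps a failed creation from being mistaken for a valid id later on.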
create_tag
async
create_tag(tag: TagCreateBody, server: Server | None = None) -> int
Creates the given tag on the server :param tag: The tag to create. :type tag: TagCreateBody :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used. :type server: Server :return: the id of the created tag :rtype: int
edit_server
async
edit_server(server_body: EditServer, server_id: int) -> AddServerResponse
Edits the given server on the local server. :param server_body: the server to edit :type server_body: EditServer :param server_id: the id of the server to edit :type server_id: int :return: returns the edited server :rtype: AddServerResponse
get_attribute
async
get_attribute(attribute_id: int, server: Server | None = None) -> GetAttributeAttributes
Returns the attribute with the given attribute_id.
:param attribute_id: the id of the attribute to get :type attribute_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: returns the attribute with the given attribute_id :rtype: GetAttributeAttributes
get_custom_clusters
async
get_custom_clusters(conditions: GalaxyClusterSearchBody, server: Server | None = None) -> list[SearchGalaxyClusterGalaxyClustersDetails]
Returns all custom clusters that match the given conditions from the given server. The limit is set as a constant in the class; if the number of clusters is higher, the method returns only the first n clusters.
:param conditions: the conditions to filter the clusters :type conditions: GalaxyClusterSearchBody :param server: the server to get the clusters from, if no server is given, the own API is used :type server: Server :return: returns all custom clusters that match the given conditions from the given server :rtype: list[SearchGalaxyClusterGalaxyClustersDetails]
get_event
async
get_event(event_id: int | UUID, server: Server | None = None) -> AddEditGetEventDetails
Returns the event with the given event_id from the given server, the own API is used if no server is given.
:param event_id: the id or uuid of the event to get :type event_id: int | UUID :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the event with the given event_id from the given server :rtype: AddEditGetEventDetails
get_event_attributes
async
get_event_attributes(event_id: int, server: Server | None = None) -> list[SearchAttributesAttributesDetails]
Returns all attribute objects of the event with the given event_id.
:param event_id: id of the event :type event_id: int :param server: the server to get the attributes from, if no server is given, the own API is used :type server: Server :return: a list of all attributes :rtype: list[SearchAttributesAttributesDetails]
get_galaxy
async
get_galaxy(galaxy_id: int | str | UUID, server: Server | None = None) -> GetGalaxyResponse
Returns the galaxy with the given galaxy_id. :param galaxy_id: id or uuid of the galaxy to get :type galaxy_id: int | str | UUID :param server: the server to get the galaxy from, if no server is given, the own API is used :type server: Server :return: returns the requested galaxy :rtype: GetGalaxyResponse
get_galaxy_cluster
async
get_galaxy_cluster(cluster_id: int | str, server: Server | None = None) -> GetGalaxyClusterResponse
Returns the galaxy cluster with the given cluster_id from the given server.
:param cluster_id: the id of the cluster to get :type cluster_id: int | str :param server: the server to get the cluster from, if no server is given, the own API is used :type server: Server :return: returns the requested galaxy cluster with the given id from the given server :rtype: GetGalaxyClusterResponse
get_minimal_events
async
get_minimal_events(ignore_filter_rules: bool, server: Server | None = None) -> list[MispMinimalEvent]
Returns all minimal events from the given server. If ignore_filter_rules is set to false, it uses the filter rules from the given server to filter the events. The limit is set as a constant in the class; if the number of events is higher, the method returns only the first n events.
:param ignore_filter_rules: boolean to ignore the filter rules :type ignore_filter_rules: bool :param server: the server to get the events from, if no server is given, the own API is used :type server: Server :return: returns all minimal events from the given server, capped by the limit :rtype: list[MispMinimalEvent]
get_object
async
get_object(object_id: int, server: Server | None = None) -> ObjectWithAttributesResponse
Returns the object with the given object_id.
:param object_id: id of the object :type object_id: int :param server: the server to get the object from, if no server is given, the own API is used :type server: Server :return: The object :rtype: ObjectWithAttributesResponse
get_organisation
async
get_organisation(organisation_id: int | str | UUID, server: Server | None = None) -> GetOrganisationElement
Returns the organisation with the given organisation_id.
:param organisation_id: id of the organisation. Can be an int or a UUID. :type organisation_id: int | str | UUID :param server: the server to get the organisation from, if no server is given, the own API is used :type server: Server :return: returns the organisation with the given organisation_id :rtype: GetOrganisationElement
get_proposals
async
get_proposals(server: Server | None = None) -> list[ShadowAttribute]
Returns all shadow_attributes (proposals) from the given server from the last 90 days.
:param server: the server to get the proposals from, if no server is given, the own API is used :type server: Server :return: returns all proposals from the given server from the last 90 days :rtype: list[ShadowAttribute]
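The 90-day window can be made concrete with a small calculation. The sketch below only computes a cutoff timestamp; how the method actually passes the window to the MISP API is not stated here, and `proposal_cutoff` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def proposal_cutoff(now: datetime, days: int = 90) -> int:
    """Return the Unix timestamp `days` days before `now`."""
    return int((now - timedelta(days=days)).timestamp())


# A fixed reference time keeps the example reproducible.
reference = datetime(2024, 4, 1, tzinfo=timezone.utc)
cutoff = proposal_cutoff(reference)
```

With the reference time above, the cutoff falls on 2 January 2024 (UTC), 90 days earlier.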
get_server_version
async
get_server_version(server: Server | None = None) -> ServerVersion
Returns the version of the given server
:param server: the server to get the version from, if no server is given, the own API is used :type server: Server :return: returns the version of the given server :rtype: ServerVersion
get_sharing_group
async
get_sharing_group(sharing_group_id: int | str, server: Server | None = None) -> ViewUpdateSharingGroupLegacyResponse
Returns the sharing group with the given id.
:param sharing_group_id: id or uuid of the sharing group to get from the API :type sharing_group_id: int | str :param server: the server to get the sharing group from, if no server is given, the own API is used :type server: Server :return: returns the sharing group that got requested :rtype: ViewUpdateSharingGroupLegacyResponse
get_sharing_groups
async
get_sharing_groups(server: Server | None = None) -> list[GetAllSharingGroupsResponseResponseItem]
Returns all sharing groups from the given server, if no server is given, the own API is used.
:param server: the server to get the sharing groups from, if no server is given, the own API is used :type server: Server :return: returns all sharing groups from the given server :rtype: list[GetAllSharingGroupsResponseResponseItem]
get_sightings_from_event
async
get_sightings_from_event(event_id: int, server: Server | None = None) -> list[SightingAttributesResponse]
Returns all sightings from the given event from the given server.
:param event_id: id of the event to get the sightings from :type event_id: int :param server: the server to get the sightings from, if no server is given, the own API is used :type server: Server :return: returns all sightings from the given event from the given server :rtype: list[SightingAttributesResponse]
get_user
async
get_user(user_id: int | None, server: Server | None = None) -> MispUser
Returns the user with the given user_id.
:param user_id: id of the user :type user_id: int | None :param server: the server to get the user from, if no server is given, the own API is used :type server: Server :return: returns the user with the given user_id :rtype: MispUser
modify_attribute_tag_relationship
async
modify_attribute_tag_relationship(attribute_tag_id: int, relationship_type: str, server: Server | None = None) -> bool
Modifies the relationship of the given tag to the given attribute. Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/
:param attribute_tag_id: The ID of the attribute-tag assignment. :type attribute_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool
modify_event_tag_relationship
async
modify_event_tag_relationship(event_tag_id: int, relationship_type: str, server: Server | None = None) -> bool
Modifies the relationship of the given tag to the given event. Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/
:param event_tag_id: The ID of the event-tag assignment. :type event_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool
save_cluster
async
save_cluster(cluster: GetGalaxyClusterResponse | SearchGalaxyClusterGalaxyClustersDetails, server: Server | None = None) -> bool
Saves the given cluster on the given server.
:param cluster: the cluster to save :type cluster: GetGalaxyClusterResponse | SearchGalaxyClusterGalaxyClustersDetails :param server: the server to save the cluster on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool
save_event
async
save_event(event: AddEditGetEventDetails, server: Server | None = None) -> bool
Saves the given event on the given server.
:param event: the event to save :type event: AddEditGetEventDetails :param server: the server to save the event on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool
save_organisation
async
save_organisation(org: AddOrganisation) -> GetOrganisationElement
Saves the given organisation on the local server.
:param org: the organisation to save :type org: AddOrganisation :return: returns the saved organisation :rtype: GetOrganisationElement
save_proposal
async
save_proposal(event: AddEditGetEventDetails, server: Server | None = None) -> bool
Saves the given proposal on the given server.
:param event: the event to save the proposal for :type event: AddEditGetEventDetails :param server: the server to save the proposal on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool
save_sighting
async
save_sighting(sighting: SightingAttributesResponse, server: Server | None = None) -> bool
Saves the given sighting on the given server.
:param sighting: the sighting to save :type sighting: SightingAttributesResponse :param server: the server to save the sighting on, if no server is given, the own API is used :type server: Server :return: returns true if the saving was successful :rtype: bool
update_cluster
async
update_cluster(cluster: PutGalaxyClusterRequest, server: Server | None = None) -> bool
Updates the given cluster on the given server.
:param cluster: the cluster to update :type cluster: PutGalaxyClusterRequest :param server: the server to update the cluster on, if no server is given, the own API is used :type server: Server :return: returns true if the update was successful :rtype: bool
update_event
async
update_event(event: AddEditGetEventDetails, server: Server | None = None) -> bool
Updates the given event on the given server.
:param event: the event to update :type event: AddEditGetEventDetails :param server: the server to update the event on, if no server is given, the own API is used :type server: Server :return: returns true if the update was successful :rtype: bool
Helper module to interact with the MISP database.
add_correlation_value
async
add_correlation_value(session: AsyncSession, value: str) -> int
Adds a new value to correlation_values table or returns the id of the current entry with the same value. :param value: to add or get id of in the correlation_values table :type value: str :return: the id of the value in the correlation_values table :rtype: int
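The add-or-return-existing behaviour can be sketched with the standard library. Note the swap: this example uses a synchronous in-memory `sqlite3` connection instead of an `AsyncSession`, and the two-column table layout is an assumption made for the illustration.

```python
import sqlite3


def add_correlation_value(conn: sqlite3.Connection, value: str) -> int:
    """Return the id of `value`, inserting it first if it is not stored yet."""
    row = conn.execute(
        "SELECT id FROM correlation_values WHERE value = ?", (value,)
    ).fetchone()
    if row is not None:
        return row[0]
    cursor = conn.execute(
        "INSERT INTO correlation_values (value) VALUES (?)", (value,)
    )
    conn.commit()
    return cursor.lastrowid


conn = sqlite3.connect(":memory:")
# Assumed two-column layout; the real table may have more columns.
conn.execute(
    "CREATE TABLE correlation_values (id INTEGER PRIMARY KEY, value TEXT UNIQUE)"
)
first_id = add_correlation_value(conn, "198.51.100.7")
second_id = add_correlation_value(conn, "198.51.100.7")
other_id = add_correlation_value(conn, "example.org")
```

Calling the function twice with the same value yields the same id, while a new value receives a different one.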
add_correlations
async
add_correlations(session: AsyncSession, correlations: list[DefaultCorrelation]) -> bool
Adds a list of correlations to the database. Returns True if at least one correlation was added, False otherwise. Doesn’t add correlations that are already in the database. :param correlations: list of correlations to add :type correlations: list[DefaultCorrelation] :return: true if at least one correlation was added, false otherwise :rtype: bool
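The return contract (True only if at least one new correlation was stored, duplicates skipped) can be shown with an in-memory sketch. A set of id pairs stands in for the database table and for `DefaultCorrelation` objects; both are simplifications made for this example, and `add_new_correlations` is a hypothetical name.

```python
def add_new_correlations(
    stored: set[tuple[int, int]], correlations: list[tuple[int, int]]
) -> bool:
    """Store unseen correlations and report whether anything was added."""
    added = False
    for pair in correlations:
        if pair in stored:
            # Already known, so it is skipped.
            continue
        stored.add(pair)
        added = True
    return added


stored = {(1, 2)}
added_some = add_new_correlations(stored, [(1, 2), (3, 4)])
added_none = add_new_correlations(stored, [(1, 2), (3, 4)])
```

The first call adds one new pair and reports True; repeating it adds nothing and reports False.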
add_over_correlating_value
async
add_over_correlating_value(session: AsyncSession, value: str, count: int) -> bool
Adds a new value to over_correlating_values table or updates the current entry with the same value. Returns True if value was added or updated, False otherwise. :param value: value to add or update :type value: str :param count: occurrence of value :type count: int :return: True if value was added or updated, False otherwise :rtype: bool
delete_correlations
async
delete_correlations(session: AsyncSession, value: str) -> bool
Deletes all correlations with value from database. Returns True if value was in database, False otherwise. :param value: to delete the correlations of :type value: str :return: True if value was in database, False otherwise :rtype: bool
delete_over_correlating_value
async
delete_over_correlating_value(session: AsyncSession, value: str) -> bool
Deletes value from over_correlating_values table. Returns True if value was in table, False otherwise. :param value: row to delete :type value: str :return: true if value was in table, false otherwise :rtype: bool
event_id_exists
async
event_id_exists(session: AsyncSession, event_id: int | str) -> bool
Checks if the event with the given ID exists in the database.
:param session: The database session. :type session: AsyncSession :param event_id: The ID or UUID of the event to check. :type event_id: int | str :return: True if the event exists, False otherwise. :rtype: bool :raises ValueError: If the event ID is not a valid integer or UUID.
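The documented ValueError for identifiers that are neither an integer nor a UUID suggests a validation step before the lookup. The sketch below shows one way such a check could look; `parse_identifier` is a hypothetical helper and not necessarily how the module validates its input.

```python
from typing import Union
from uuid import UUID


def parse_identifier(identifier: Union[int, str]) -> Union[int, UUID]:
    """Classify an identifier as a numeric ID or a UUID, else raise ValueError."""
    if isinstance(identifier, int):
        return identifier
    if identifier.isdigit():
        return int(identifier)
    # UUID() raises ValueError itself for malformed input.
    return UUID(identifier)


numeric = parse_identifier("17")
by_uuid = parse_identifier("123e4567-e89b-12d3-a456-426614174000")
```

The parsed result tells the caller which column to query, the numeric id or the uuid.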
filter_blocked_clusters
async
filter_blocked_clusters(session: AsyncSession, clusters: list[SearchGalaxyClusterGalaxyClustersDetails]) -> list[SearchGalaxyClusterGalaxyClustersDetails]
Get all blocked clusters from database and remove them from clusters list. :param clusters: list of clusters to check :type clusters: list[SearchGalaxyClusterGalaxyClustersDetails] :return: list without blocked clusters :rtype: list[SearchGalaxyClusterGalaxyClustersDetails]
filter_blocked_events
async
filter_blocked_events(session: AsyncSession, events: list[MispMinimalEvent], use_event_blocklist: bool, use_org_blocklist: bool) -> list[MispMinimalEvent]
Removes events that are listed as blocked in the MISP database from the list. If an org is blocked, the events of that org are also removed from the list. Returns the list without the blocked events. :param events: list to remove blocked events from :type events: list[MispMinimalEvent] :param use_event_blocklist: if True, blocked events are removed from the list :type use_event_blocklist: bool :param use_org_blocklist: if True, the events from blocked orgs are removed from the list :type use_org_blocklist: bool :return: the list without the blocked events :rtype: list[MispMinimalEvent]
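How the two flags interact can be sketched in memory. `MinimalEvent` is a hypothetical stand-in for `MispMinimalEvent` with assumed field names, and the blocklists are passed in as plain sets here rather than read from the database.

```python
from dataclasses import dataclass


@dataclass
class MinimalEvent:
    """Hypothetical stand-in with only the fields this sketch needs."""

    uuid: str
    org_c_uuid: str


def filter_blocked(
    events: list[MinimalEvent],
    blocked_event_uuids: set[str],
    blocked_org_uuids: set[str],
    use_event_blocklist: bool,
    use_org_blocklist: bool,
) -> list[MinimalEvent]:
    kept = []
    for event in events:
        if use_event_blocklist and event.uuid in blocked_event_uuids:
            continue
        if use_org_blocklist and event.org_c_uuid in blocked_org_uuids:
            continue
        kept.append(event)
    return kept


events = [
    MinimalEvent("e1", "org-a"),
    MinimalEvent("e2", "org-b"),
    MinimalEvent("e3", "org-a"),
]
both = filter_blocked(events, {"e1"}, {"org-b"}, True, True)
events_only = filter_blocked(events, {"e1"}, {"org-b"}, True, False)
```

Each blocklist is applied only when its flag is set, so disabling the org blocklist keeps the event of the blocked org in the result.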
galaxy_cluster_id_exists
async
galaxy_cluster_id_exists(session: AsyncSession, cluster_id: int | UUID) -> bool
Checks if the galaxy cluster with the given ID exists in the database.
:param session: The database session. :type session: AsyncSession :param cluster_id: The ID or UUID of the galaxy cluster to check. :type cluster_id: int | UUID :return: True if the galaxy cluster exists, False otherwise. :rtype: bool :raises ValueError: If the galaxy cluster ID is not a valid integer or UUID.
galaxy_id_exists
async
galaxy_id_exists(session: AsyncSession, galaxy_id: int | UUID) -> bool
Checks if the galaxy with the given ID exists in the database.
:param session: The database session. :type session: AsyncSession :param galaxy_id: The ID or UUID of the galaxy to check. :type galaxy_id: int | UUID :return: True if the galaxy exists, False otherwise. :rtype: bool :raises ValueError: If the galaxy ID is not a valid integer or UUID.
get_api_authkey
async
get_api_authkey(session: AsyncSession, server_id: int) -> str | None
Method to get the API authentication key of the server with the given ID. :param server_id: The ID of the server. :type server_id: int :return: The API authentication key of the server. :rtype: str | None
get_attribute_tag
async
get_attribute_tag(session: AsyncSession, attribute_tag_id: int) -> AttributeTag | None
Method to get the AttributeTag object with the given ID.
:param attribute_tag_id: The ID of the attribute-tag object. :type attribute_tag_id: int :return: The AttributeTag object or None if it doesn’t exist. :rtype: AttributeTag | None
get_attribute_tag_id
async
get_attribute_tag_id(session: AsyncSession, attribute_id: int, tag_id: int) -> int
Method to get the ID of the attribute-tag object associated with the given attribute-ID and tag-ID.
:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the attribute-tag object or -1 if the object does not exist. :rtype: int
get_attributes_with_same_value
async
get_attributes_with_same_value(session: AsyncSession, value: str) -> list[Attribute]
Method to get all attributes with the same value from database. :param value: to get attributes with :type value: str :return: list of attributes with the same value :rtype: list[Attribute]
get_event_tag_id
async
get_event_tag_id(session: AsyncSession, event_id: int, tag_id: int) -> int
Method to get the ID of the event-tag object associated with the given event-ID and tag-ID.
:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the event-tag object or -1 if the object does not exist. :rtype: int
get_excluded_correlations
async
get_excluded_correlations(session: AsyncSession) -> Sequence[str]
Method to get all values from correlation_exclusions table. :return: all values from correlation_exclusions table :rtype: Sequence[str]
get_number_of_correlations
async
get_number_of_correlations(session: AsyncSession, value: str, only_over_correlating_table: bool) -> int
Returns the number of correlations of value in the database. If only_over_correlating_table is True, only the value in the over_correlating_values table is returned. Otherwise, the number of correlations in the default_correlations table is returned. Attention: It is assumed that the value is in the over_correlating_values table if only_over_correlating_table is True. :param value: to get number of correlations of :type value: str :param only_over_correlating_table: if True, only the value in the over_correlating_values table is returned :type only_over_correlating_table: bool :return: number of correlations of value in the database :rtype: int
get_org_by_name
async
get_org_by_name(session: AsyncSession, org_name: str) -> Organisation | None
Get organisation by name from database.
:param session: The database session. :type session: AsyncSession :param org_name: The name of the organisation to retrieve. :type org_name: str :return: The organisation object if found, None otherwise. :rtype: Organisation | None
get_over_correlating_values
async
get_over_correlating_values(session: AsyncSession) -> list[tuple[str, int]]
Method to get all values from over_correlating_values table with their occurrence. :return: all values from over_correlating_values table with their occurrence :rtype: list[tuple[str, int]]
get_post
async
get_post(session: AsyncSession, post_id: int) -> Post
Method to get a post from database. :param post_id: the id of the post to get :type post_id: int :return: the post with the given id :rtype: Post
get_server
async
get_server(session: AsyncSession, server_id: int) -> Server | None
Returns the server with the given server_id or None if it doesn’t exist.
:param server_id: id of the server :type server_id: int :return: returns the server that got requested or None :rtype: Server | None
get_values_with_correlation
async
get_values_with_correlation(session: AsyncSession) -> list[str]
Method to get all values from correlation_values table. :return: all values from correlation_values table :rtype: list[str]
is_excluded_correlation
async
is_excluded_correlation(session: AsyncSession, value: str) -> bool
Checks if value is in correlation_exclusions table. :param value: to check :type value: str :return: True if value is in correlation_exclusions table, False otherwise :rtype: bool
is_over_correlating_value
async
is_over_correlating_value(session: AsyncSession, value: str) -> bool
Checks if value is in over_correlating_values table. Doesn’t check if value has more correlations in the database than the current threshold. :param value: to check :type value: str :return: True if value is in over_correlating_values table, False otherwise :rtype: bool
set_last_pushed_id
async
set_last_pushed_id(session: AsyncSession, server_id: int, last_pushed_id: int) -> None
Set the last pushed ID for a server in the database.
:param session: The database session. :type session: AsyncSession :param server_id: The ID of the server. :type server_id: int :param last_pushed_id: The last pushed ID to set. :type last_pushed_id: int
sighting_id_exists
async
sighting_id_exists(session: AsyncSession, sighting_id: int | str) -> bool
Checks if the sighting with the given ID exists in the database. :param session: The database session. :type session: AsyncSession :param sighting_id: The ID of the sighting to check. :type sighting_id: int | str :return: True if the sighting exists, False otherwise. :rtype: bool