Skip to content

MISP Database Adapter

decode_json_response(response)

Decodes the JSON response from the MISP API

:param response: response from the MISP API :type response: Response :return: returns the decoded JSON response :rtype: dict

Source code in src/mmisp/worker/misp_database/misp_api_utils.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def decode_json_response(response: Response) -> dict:
    """
    Decodes the JSON response from the MISP API

    :param response: response from the MISP API
    :type response: Response
    :return: returns the decoded JSON response
    :rtype: dict
    """
    response_dict: dict
    try:
        response_dict = response.json()
    except JSONDecodeError as json_error:
        print(response.text)
        raise InvalidAPIResponse(f"Invalid API response: {json_error}")

    return response_dict

translate_dictionary(dictionary, translation_dict)

translates the keys of a dictionary according to the translation dictionary

:param dictionary: dictionary to be translated :type dictionary: dict :param translation_dict: translation dictionary including the old key as the key and the new key as the value :type translation_dict: dict[str, str] :return: returns the translated dictionary :rtype: dict

Source code in src/mmisp/worker/misp_database/misp_api_utils.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def translate_dictionary(dictionary: dict, translation_dict: dict[str, str]) -> dict:
    """
    translates the keys of a dictionary according to the translation dictionary

    :param dictionary: dictionary to be translated
    :type dictionary: dict
    :param translation_dict: translation dictionary including the old key as the key and the new key as the value
    :type translation_dict: dict[str, str]
    :return: returns the translated dictionary
    :rtype: dict
    """
    translated_dict: dict = {}
    for key in dictionary.keys():
        if key in translation_dict:
            new_key: str = translation_dict[key]
            translated_dict[new_key] = dictionary[key]
        else:
            translated_dict[key] = dictionary[key]

    return translated_dict

MispAPI

This class is used to communicate with the MISP API.

it encapsulates the communication with the MISP API and provides methods to retrieve and send data. the data is parsed and validated by the MispAPIParser and MispAPIUtils classes, and returns the data as MMISP dataclasses.

Source code in src/mmisp/worker/misp_database/misp_api.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
class MispAPI:
    """
    This class is used to communicate with the MISP API.

    it encapsulates the communication with the MISP API and provides methods to retrieve and send data.
    the data is parsed and validated by the MispAPIParser and MispAPIUtils classes,
    and returns the data as MMISP dataclasses.
    """

    __HEADERS: dict = {"Accept": "application/json", "Content-Type": "application/json", "Authorization": ""}
    __LIMIT: int = 1000

    def __init__(self: Self, db: AsyncSession) -> None:
        self.__config: MispAPIConfigData = misp_api_config_data
        self._db = db

    def __setup_api_session(self: Self) -> Session:
        """
        This method is used to set up the session for the API.

        :return:  returns the session that was set up
        :rtype: Session
        """
        print("Auth Key is:", self.__config.key)
        if not self.__config.key:
            raise ValueError("Authorization cannot be empty")

        session = Session()
        session.headers.update(self.__HEADERS)
        session.headers.update({"Authorization": f"{self.__config.key}"})
        return session

    async def __setup_remote_api_session(self: Self, server_id: int) -> Session:
        """
        This method is used to set up the session for the remote API.

        :param server_id: server id of the remote server to set up the session for
        :type server_id: int
        :return: returns the session to the specified server that was set up
        :rtype: Session
        """

        key: str | None = await get_api_authkey(self._db, server_id)
        if key is None:
            raise APIException(f"API key for server {server_id} is not available.")

        session = Session()
        session.headers.update(self.__HEADERS)
        session.headers.update({"Authorization": f"{key}"})
        return session

    async def __get_session(self: Self, server: Server | None = None) -> Session:
        """
        This method is used to get the session for the given server_id
        if a session for the given server_id already exists, it returns the existing session,
        otherwise it sets up a new session and returns it.

        :param server: server to get the session for, if no server is given, the own API is used
        :type server: Server
        :return: returns a session to the specified server
        :rtype: Session
        """

        server_id: int = server.id if server is not None else 0
        if server_id == 0:
            return self.__setup_api_session()
        else:
            return await self.__setup_remote_api_session(server_id)

    def __get_url(self: Self, path: str, server: Server | None = None) -> str:
        """
        This method is used to get the url for the given server, adding the given path to the url.

        if no server is given, it uses the default url from the config,
        otherwise it uses the url of the given server.

        :param path: path to add to the url
        :type path: str
        :param server: remote server to get the url for
        :type server: Server
        :return: returns the url for the given server with the path added
        :rtype: str
        """
        url: str
        if server:
            url = server.url
        else:
            if self.__config.url.endswith("/"):
                url = self.__config.url[:-1]
            else:
                url = self.__config.url

        return self.__join_path(url, path)

    @staticmethod
    def __join_path(url: str, path: str) -> str:
        """
        This method is used to join the given path to the given url.
        it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

        :param url: url to join the path to
        :type url: str
        :param path: path to join to the url
        :type path: str
        :return: returns the url with the path added
        :rtype: str
        """

        if path.startswith("/"):
            return url + path
        else:
            return f"{url}/{path}"

    async def __send_request(self: Self, request: PreparedRequest, server: Server | None = None, **kwargs) -> dict:
        """
        This method is used to send the given request and return the response.

        :param request: the request to send
        :type request: PreparedRequest
        :param kwargs: keyword arguments
        :type kwargs: dict[str, Any]
        :return: returns the response of the request
        :rtype: dict
        """
        print("Request is: ", request)
        print(request.method)
        print(request.headers)
        print(request.url)
        if request.method == "POST":
            print(request.body)
        response: Response

        if "timeout" not in kwargs:
            kwargs["timeout"] = (self.__config.connect_timeout, self.__config.read_timeout)

        try:
            response = (await self.__get_session(server)).send(request, **kwargs)
        except (ConnectionError, TimeoutError, TooManyRedirects) as api_exception:
            _log.warning(f"API not available. The request could not be made. ==> {api_exception}")
            raise APIException(f"API not available. The request could not be made. ==> {api_exception}")

        try:
            response.raise_for_status()
        except requests.HTTPError as http_err:
            # Füge hier eine detaillierte Fehlerausgabe hinzu
            error_details = (
                f"HTTP Error occurred: {http_err}\n"
                f"URL: {request.url}\n"
                f"Status Code: {response.status_code}\n"
                f"Response Text: {response.text}\n"
                f"Headers: {response.headers}"
            )
            _log.error(error_details)
            raise APIException(error_details) from http_err

        if response.status_code != codes.ok:
            raise requests.HTTPError(response, response.text)

        return misp_api_utils.decode_json_response(response)

    async def get_user(self: Self, user_id: int, server: Server | None = None) -> MispUser:
        """
        Returns the user with the given user_id.

        :param user_id: id of the user
        :type user_id: int
        :param server: the server to get the user from, if no server is given, the own API is used
        :type server: Server
        :return: returns the user with the given user_id
        :rtype: MispUser
        """
        url: str = self.__get_url(f"/admin/users/view/{user_id}", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        get_user_element_responds: GetUsersElement = GetUsersElement.model_validate(response)
        user_dict: dict = get_user_element_responds.User.model_dump()
        user_dict["role"] = get_user_element_responds.Role.model_dump()

        try:
            return MispUser.model_validate(user_dict)
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. MISP user could not be parsed: {value_error}")

    async def get_object(self: Self, object_id: int, server: Server | None = None) -> ObjectWithAttributesResponse:
        """
        Returns the object with the given object_id.

        :param object_id:  id of the object
        :type object_id: int
        :param server: the server to get the object from, if no server is given, the own API is used
        :type server: Server
        :return: The object
        :rtype: ObjectWithAttributesResponse
        """
        if object_id == 0:
            #  for correlation to give back an empty object
            return ObjectWithAttributesResponse(
                id=0, uuid="", name="", distribution=AttributeDistributionLevels.OWN_ORGANIZATION, sharing_group_id=0
            )

        url: str = self.__get_url(f"objects/view/{object_id}", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)

        try:
            return ObjectResponse.model_validate(response).Object
        except ValueError as value_error:
            raise InvalidAPIResponse(
                f"Invalid API response. MISP ObjectWithAttributesResponse could not be parsed: {value_error}"
            )

    async def get_sharing_group(
        self: Self, sharing_group_id: int, server: Server | None = None
    ) -> ViewUpdateSharingGroupLegacyResponse:
        """
        Returns the sharing group with the given sharing_group_id

        :param sharing_group_id: id of the sharing group to get from the API
        :type sharing_group_id: int
        :param server: the server to get the sharing group from, if no server is given, the own API is used
        :type server: Server
        :return: returns the sharing group that got requested
        :rtype: ViewUpdateSharingGroupLegacyResponse
        """

        url: str = self.__get_url(f"/sharing_groups/view/{sharing_group_id}", server)
        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        try:
            return ViewUpdateSharingGroupLegacyResponse.parse_obj(response)
        except ValueError as value_error:
            raise InvalidAPIResponse(
                f"Invalid API response. MISP ViewUpdateSharingGroupLegacyResponse could not be parsed: {value_error}"
            )

    async def get_event(self: Self, event_id: int | UUID, server: Server | None = None) -> AddEditGetEventDetails:
        """
        Returns the event with the given event_id from the given server,
         the own API is used if no server is given.

        :param event_id: the id of the event to get
        :type event_id: int
        :param server: the server to get the event from, if no server is given, the own API is used
        :type server: Server
        :return: returns the event with the given event_id from the given server
        :rtype: AddEditGetEventDetails
        """
        url: str = self.__get_url(f"/events/view/{event_id}", server)
        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        try:
            return AddEditGetEventDetails.parse_obj(response["Event"])
        except ValueError as value_error:
            raise InvalidAPIResponse(
                f"Invalid API response. AddEditGetEventDetails"
                f"{json.dumps(response['Event'])} could not be parsed: {value_error}"
            )

    async def get_sharing_groups(
        self: Self, server: Server | None = None
    ) -> list[GetAllSharingGroupsResponseResponseItem]:
        """
        Returns all sharing groups from the given server, if no server is given, the own API is used.

        :param server: the server to get the sharing groups from, if no server is given, the own API is used
        :type server: Server
        :return: returns all sharing groups from the given server
        :rtype: list[GetAllSharingGroupsResponseResponseItem]
        """
        url: str = self.__get_url("/sharing_groups", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        print(f"get_sharing_groups: response={response}")

        try:
            return GetAllSharingGroupsResponse.parse_obj(response).response
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. MISP Sharing Group could not be parsed: {value_error}")

    async def get_attribute(self: Self, attribute_id: int, server: Server | None = None) -> GetAttributeAttributes:
        """
        Returns the attribute with the given attribute_id.

        :param attribute_id: the id of the attribute to get
        :type attribute_id: int
        :param server: the server to get the attribute from, if no server is given, the own API is used
        :type server: Server
        :return: returns the attribute with the given attribute_id
        :rtype: GetAttributeAttributes
        """

        url: str = self.__get_url(f"/attributes/{attribute_id}", server)

        request: Request = Request("GET", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)

        try:
            return GetAttributeResponse.parse_obj(response).Attribute
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. MISP Attribute could not be parsed: {value_error}")

    async def get_event_attributes(
        self: Self, event_id: int, server: Server | None = None
    ) -> list[SearchAttributesAttributesDetails]:
        """
        Returns all attribute object of the given event, represented by given event_id.

        :param event_id: of the event
        :type event_id: int
        :param server: the server to get the attribute from, if no server is given, the own API is used
        :type server: Server
        :return: a list of all attributes
        :rtype: list[SearchAttributesAttributesDetails]
        """

        url: str = self.__get_url("/attributes/restSearch", server)
        body: SearchAttributesBody = SearchAttributesBody(
            eventid=event_id, with_attachments=True, include_event_uuid=True
        )
        request: Request = Request("POST", url, json=body.model_dump(mode="json"))
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)

        try:
            return SearchAttributesResponse.model_validate(response).response.Attribute
        except ValueError as value_error:
            raise InvalidAPIResponse(f"Invalid API response. Event Attributes could not be parsed: {value_error}")

    async def create_attribute(self: Self, attribute: AddAttributeBody, server: Server | None = None) -> int:
        """
        creates the given attribute on the server

        :param attribute: contains the required attributes to creat an attribute
        :type attribute: AddAttributeBody
        :param server: the server to create the attribute on, if no server is given, the own API is used
        :type server: Server
        :return: The attribute id if the creation was successful. -1 otherwise.
        :rtype: int
        """
        if attribute.uuid is None:
            attribute.uuid = uuid()

        if attribute.deleted is None:
            attribute.deleted = False

        url: str = self.__get_url(f"/attributes/add/{attribute.event_id}", server)

        request: Request = Request("POST", url, json=attribute.model_dump(mode="json"))
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        response: dict = await self.__send_request(prepared_request, server)
        if "Attribute" in response:
            return int(response["Attribute"]["id"])

        return -1

    async def create_tag(self: Self, tag: TagCreateBody, server: Server | None = None) -> int:
        """
        Creates the given tag on the server
        :param tag: The tag to create.
        :type tag: TagCreateBody
        :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used.
        :type server: Server
        :return: the id of the created tag
        :rtype: int
        """

        url: str = self.__get_url("/tags/add", server)
        request: Request = Request("POST", url, json=tag.model_dump(mode="json"))
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        response: dict = await self.__send_request(prepared_request, server)
        return int(response["Tag"]["id"])

    async def attach_attribute_tag(
        self: Self, attribute_id: int, tag_id: int, local: bool, server: Server | None = None
    ) -> bool:
        """
        Attaches a tag to an attribute

        :param attribute_id: The ID of the attribute.
        :type attribute_id: int
        :param tag_id: The ID of the tag.
        :type tag_id: int
        :param local: If the tag is to be attached only locally.
        :type local: bool
        :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used
        :type server: Server
        :return: true if the attachment was successful
        :rtype: bool
        """

        url: str = self.__get_url(
            f"/attributes/addTag/{attribute_id}/{tag_id}/local:{local}",
            server,
        )
        request: Request = Request("POST", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
        await self.__send_request(prepared_request, server)

        return True

    async def attach_event_tag(
        self: Self, event_id: int, tag_id: int, local: bool, server: Server | None = None
    ) -> bool:
        """
        Attaches a tag to an event

        :param event_id: The ID of the event.
        :type event_id: int
        :param tag_id: The ID of the tag.
        :type tag_id: int
        :param local: If the tag is to be attached only locally.
        :type local: bool
        :param server: the server to attach the tag to the event on, if no server is given, the own API is used
        :type server: Server
        :return:
        :rtype: bool
        """

        url: str = self.__get_url(f"/events/addTag/{event_id}/{tag_id}/local:{local}", server)
        request: Request = Request("POST", url)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        await self.__send_request(prepared_request, server)
        return True

    async def modify_event_tag_relationship(
        self: Self, event_tag_id: int, relationship_type: str, server: Server | None = None
    ) -> bool:
        """
        Modifies the relationship of the given tag to the given event
        Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

        :param event_tag_id: The ID of the event-tag assignment.
        :type event_tag_id: int
        :param relationship_type: The relationship type to set.
        :type relationship_type: str
        :param server: the server to modify the relationship on, if no server is given, the own API is used
        :type server: Server
        :return: returns true if the modification was successful
        :rtype: bool
        """

        url: str = self.__get_url(f"/tags/modifyTagRelationship/event/{event_tag_id}", server)
        body: dict = {"Tag": {"relationship_type": relationship_type}}

        request: Request = Request("POST", url, json=body)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        response: dict = await self.__send_request(prepared_request, server)
        print(response)
        return response["saved"] == "true" and response["success"] == "true"

    async def modify_attribute_tag_relationship(
        self: Self, attribute_tag_id: int, relationship_type: str, server: Server | None = None
    ) -> bool:
        """
        Modifies the relationship of the given tag to the given attribute
        Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

        :param attribute_tag_id: The ID of the attribute-tag assignment.
        :type attribute_tag_id: int
        :param relationship_type: The relationship type to set.
        :type relationship_type: str
        :param server: the server to modify the relationship on, if no server is given, the own API is used
        :type server: Server
        :return: returns true if the modification was successful
        :rtype: bool
        """

        url: str = self.__get_url(f"/tags/modifyTagRelationship/attribute/{attribute_tag_id}", server)
        body = {"Tag": {"relationship_type": relationship_type}}

        request: Request = Request("POST", url, json=body)
        prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

        response: dict = await self.__send_request(prepared_request, server)
        print(f"bananenbieger: modify_attribute_tag_relationship: response={response}")
        return response["saved"] is True and response["success"] is True

__get_session(server=None) async

This method is used to get the session for the given server_id if a session for the given server_id already exists, it returns the existing session, otherwise it sets up a new session and returns it.

:param server: server to get the session for, if no server is given, the own API is used :type server: Server :return: returns a session to the specified server :rtype: Session

Source code in src/mmisp/worker/misp_database/misp_api.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
async def __get_session(self: Self, server: Server | None = None) -> Session:
    """
    This method is used to get the session for the given server_id
    if a session for the given server_id already exists, it returns the existing session,
    otherwise it sets up a new session and returns it.

    :param server: server to get the session for, if no server is given, the own API is used
    :type server: Server
    :return: returns a session to the specified server
    :rtype: Session
    """

    server_id: int = server.id if server is not None else 0
    if server_id == 0:
        return self.__setup_api_session()
    else:
        return await self.__setup_remote_api_session(server_id)

__get_url(path, server=None)

This method is used to get the url for the given server, adding the given path to the url.

if no server is given, it uses the default url from the config, otherwise it uses the url of the given server.

:param path: path to add to the url :type path: str :param server: remote server to get the url for :type server: Server :return: returns the url for the given server with the path added :rtype: str

Source code in src/mmisp/worker/misp_database/misp_api.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def __get_url(self: Self, path: str, server: Server | None = None) -> str:
    """
    This method is used to get the url for the given server, adding the given path to the url.

    if no server is given, it uses the default url from the config,
    otherwise it uses the url of the given server.

    :param path: path to add to the url
    :type path: str
    :param server: remote server to get the url for
    :type server: Server
    :return: returns the url for the given server with the path added
    :rtype: str
    """
    url: str
    if server:
        url = server.url
    else:
        if self.__config.url.endswith("/"):
            url = self.__config.url[:-1]
        else:
            url = self.__config.url

    return self.__join_path(url, path)

__join_path(url, path) staticmethod

This method is used to join the given path to the given url. it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

:param url: url to join the path to :type url: str :param path: path to join to the url :type path: str :return: returns the url with the path added :rtype: str

Source code in src/mmisp/worker/misp_database/misp_api.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@staticmethod
def __join_path(url: str, path: str) -> str:
    """
    This method is used to join the given path to the given url.
    it checks if the path starts with a slash, if it does not, it also adds a slash to the url.

    :param url: url to join the path to
    :type url: str
    :param path: path to join to the url
    :type path: str
    :return: returns the url with the path added
    :rtype: str
    """

    if path.startswith("/"):
        return url + path
    else:
        return f"{url}/{path}"

__send_request(request, server=None, **kwargs) async

This method is used to send the given request and return the response.

:param request: the request to send :type request: PreparedRequest :param kwargs: keyword arguments :type kwargs: dict[str, Any] :return: returns the response of the request :rtype: dict

Source code in src/mmisp/worker/misp_database/misp_api.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
async def __send_request(self: Self, request: PreparedRequest, server: Server | None = None, **kwargs) -> dict:
    """
    This method is used to send the given request and return the response.

    :param request: the request to send
    :type request: PreparedRequest
    :param kwargs: keyword arguments
    :type kwargs: dict[str, Any]
    :return: returns the response of the request
    :rtype: dict
    """
    print("Request is: ", request)
    print(request.method)
    print(request.headers)
    print(request.url)
    if request.method == "POST":
        print(request.body)
    response: Response

    if "timeout" not in kwargs:
        kwargs["timeout"] = (self.__config.connect_timeout, self.__config.read_timeout)

    try:
        response = (await self.__get_session(server)).send(request, **kwargs)
    except (ConnectionError, TimeoutError, TooManyRedirects) as api_exception:
        _log.warning(f"API not available. The request could not be made. ==> {api_exception}")
        raise APIException(f"API not available. The request could not be made. ==> {api_exception}")

    try:
        response.raise_for_status()
    except requests.HTTPError as http_err:
        # Füge hier eine detaillierte Fehlerausgabe hinzu
        error_details = (
            f"HTTP Error occurred: {http_err}\n"
            f"URL: {request.url}\n"
            f"Status Code: {response.status_code}\n"
            f"Response Text: {response.text}\n"
            f"Headers: {response.headers}"
        )
        _log.error(error_details)
        raise APIException(error_details) from http_err

    if response.status_code != codes.ok:
        raise requests.HTTPError(response, response.text)

    return misp_api_utils.decode_json_response(response)

__setup_api_session()

This method is used to set up the session for the API.

:return: returns the session that was set up :rtype: Session

Source code in src/mmisp/worker/misp_database/misp_api.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __setup_api_session(self: Self) -> Session:
    """
    This method is used to set up the session for the API.

    :return:  returns the session that was set up
    :rtype: Session
    """
    print("Auth Key is:", self.__config.key)
    if not self.__config.key:
        raise ValueError("Authorization cannot be empty")

    session = Session()
    session.headers.update(self.__HEADERS)
    session.headers.update({"Authorization": f"{self.__config.key}"})
    return session

__setup_remote_api_session(server_id) async

This method is used to set up the session for the remote API.

:param server_id: server id of the remote server to set up the session for :type server_id: int :return: returns the session to the specified server that was set up :rtype: Session

Source code in src/mmisp/worker/misp_database/misp_api.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def __setup_remote_api_session(self: Self, server_id: int) -> Session:
    """
    This method is used to set up the session for the remote API.

    :param server_id: server id of the remote server to set up the session for
    :type server_id: int
    :return: returns the session to the specified server that was set up
    :rtype: Session
    """

    key: str | None = await get_api_authkey(self._db, server_id)
    if key is None:
        raise APIException(f"API key for server {server_id} is not available.")

    session = Session()
    session.headers.update(self.__HEADERS)
    session.headers.update({"Authorization": f"{key}"})
    return session

attach_attribute_tag(attribute_id, tag_id, local, server=None) async

Attaches a tag to an attribute

:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used :type server: Server :return: true if the attachment was successful :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
async def attach_attribute_tag(
    self: Self, attribute_id: int, tag_id: int, local: bool, server: Server | None = None
) -> bool:
    """
    Attaches a tag to an attribute

    :param attribute_id: The ID of the attribute.
    :type attribute_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :param local: If the tag is to be attached only locally.
    :type local: bool
    :param server: the server to attach the tag to the attribute on, if no server is given, the own API is used
    :type server: Server
    :return: true if the attachment was successful
    :rtype: bool
    """

    url: str = self.__get_url(
        f"/attributes/addTag/{attribute_id}/{tag_id}/local:{local}",
        server,
    )
    request: Request = Request("POST", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    await self.__send_request(prepared_request, server)

    return True

attach_event_tag(event_id, tag_id, local, server=None) async

Attaches a tag to an event

:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :param local: If the tag is to be attached only locally. :type local: bool :param server: the server to attach the tag to the event on, if no server is given, the own API is used :type server: Server :return: :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
async def attach_event_tag(
    self: Self, event_id: int, tag_id: int, local: bool, server: Server | None = None
) -> bool:
    """
    Attaches a tag to an event

    :param event_id: The ID of the event.
    :type event_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :param local: If the tag is to be attached only locally.
    :type local: bool
    :param server: the server to attach the tag to the event on, if no server is given, the own API is used
    :type server: Server
    :return:
    :rtype: bool
    """

    url: str = self.__get_url(f"/events/addTag/{event_id}/{tag_id}/local:{local}", server)
    request: Request = Request("POST", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    await self.__send_request(prepared_request, server)
    return True

create_attribute(attribute, server=None) async

creates the given attribute on the server

:param attribute: contains the required attributes to creat an attribute :type attribute: AddAttributeBody :param server: the server to create the attribute on, if no server is given, the own API is used :type server: Server :return: The attribute id if the creation was successful. -1 otherwise. :rtype: int

Source code in src/mmisp/worker/misp_database/misp_api.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
async def create_attribute(self: Self, attribute: AddAttributeBody, server: Server | None = None) -> int:
    """
    creates the given attribute on the server

    :param attribute: contains the required attributes to creat an attribute
    :type attribute: AddAttributeBody
    :param server: the server to create the attribute on, if no server is given, the own API is used
    :type server: Server
    :return: The attribute id if the creation was successful. -1 otherwise.
    :rtype: int
    """
    if attribute.uuid is None:
        attribute.uuid = uuid()

    if attribute.deleted is None:
        attribute.deleted = False

    url: str = self.__get_url(f"/attributes/add/{attribute.event_id}", server)

    request: Request = Request("POST", url, json=attribute.model_dump(mode="json"))
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    if "Attribute" in response:
        return int(response["Attribute"]["id"])

    return -1

create_tag(tag, server=None) async

Creates the given tag on the server :param tag: The tag to create. :type tag: TagCreateBody :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used. :type server: Server :return: the id of the created tag :rtype: int

Source code in src/mmisp/worker/misp_database/misp_api.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
async def create_tag(self: Self, tag: TagCreateBody, server: Server | None = None) -> int:
    """
    Creates the given tag on the server
    :param tag: The tag to create.
    :type tag: TagCreateBody
    :param server: The server to create the tag on. If no server is given, the own MMISP-API Server is used.
    :type server: Server
    :return: the id of the created tag
    :rtype: int
    """

    url: str = self.__get_url("/tags/add", server)
    request: Request = Request("POST", url, json=tag.model_dump(mode="json"))
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    response: dict = await self.__send_request(prepared_request, server)
    return int(response["Tag"]["id"])

get_attribute(attribute_id, server=None) async

Returns the attribute with the given attribute_id.

:param attribute_id: the id of the attribute to get :type attribute_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: returns the attribute with the given attribute_id :rtype: GetAttributeAttributes

Source code in src/mmisp/worker/misp_database/misp_api.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
async def get_attribute(self: Self, attribute_id: int, server: Server | None = None) -> GetAttributeAttributes:
    """
    Returns the attribute with the given attribute_id.

    :param attribute_id: the id of the attribute to get
    :type attribute_id: int
    :param server: the server to get the attribute from, if no server is given, the own API is used
    :type server: Server
    :return: returns the attribute with the given attribute_id
    :rtype: GetAttributeAttributes
    """

    url: str = self.__get_url(f"/attributes/{attribute_id}", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)

    try:
        return GetAttributeResponse.parse_obj(response).Attribute
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. MISP Attribute could not be parsed: {value_error}")

get_event(event_id, server=None) async

Returns the event with the given event_id from the given server, the own API is used if no server is given.

:param event_id: the id of the event to get :type event_id: int :param server: the server to get the event from, if no server is given, the own API is used :type server: Server :return: returns the event with the given event_id from the given server :rtype: AddEditGetEventDetails

Source code in src/mmisp/worker/misp_database/misp_api.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
async def get_event(self: Self, event_id: int | UUID, server: Server | None = None) -> AddEditGetEventDetails:
    """
    Returns the event with the given event_id from the given server,
     the own API is used if no server is given.

    :param event_id: the id of the event to get
    :type event_id: int
    :param server: the server to get the event from, if no server is given, the own API is used
    :type server: Server
    :return: returns the event with the given event_id from the given server
    :rtype: AddEditGetEventDetails
    """
    url: str = self.__get_url(f"/events/view/{event_id}", server)
    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    try:
        return AddEditGetEventDetails.parse_obj(response["Event"])
    except ValueError as value_error:
        raise InvalidAPIResponse(
            f"Invalid API response. AddEditGetEventDetails"
            f"{json.dumps(response['Event'])} could not be parsed: {value_error}"
        )

get_event_attributes(event_id, server=None) async

Returns all attribute object of the given event, represented by given event_id.

:param event_id: of the event :type event_id: int :param server: the server to get the attribute from, if no server is given, the own API is used :type server: Server :return: a list of all attributes :rtype: list[SearchAttributesAttributesDetails]

Source code in src/mmisp/worker/misp_database/misp_api.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
async def get_event_attributes(
    self: Self, event_id: int, server: Server | None = None
) -> list[SearchAttributesAttributesDetails]:
    """
    Returns all attribute object of the given event, represented by given event_id.

    :param event_id: of the event
    :type event_id: int
    :param server: the server to get the attribute from, if no server is given, the own API is used
    :type server: Server
    :return: a list of all attributes
    :rtype: list[SearchAttributesAttributesDetails]
    """

    url: str = self.__get_url("/attributes/restSearch", server)
    body: SearchAttributesBody = SearchAttributesBody(
        eventid=event_id, with_attachments=True, include_event_uuid=True
    )
    request: Request = Request("POST", url, json=body.model_dump(mode="json"))
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)

    try:
        return SearchAttributesResponse.model_validate(response).response.Attribute
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. Event Attributes could not be parsed: {value_error}")

get_object(object_id, server=None) async

Returns the object with the given object_id.

:param object_id: id of the object :type object_id: int :param server: the server to get the object from, if no server is given, the own API is used :type server: Server :return: The object :rtype: ObjectWithAttributesResponse

Source code in src/mmisp/worker/misp_database/misp_api.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
async def get_object(self: Self, object_id: int, server: Server | None = None) -> ObjectWithAttributesResponse:
    """
    Returns the object with the given object_id.

    :param object_id:  id of the object
    :type object_id: int
    :param server: the server to get the object from, if no server is given, the own API is used
    :type server: Server
    :return: The object
    :rtype: ObjectWithAttributesResponse
    """
    if object_id == 0:
        #  for correlation to give back an empty object
        return ObjectWithAttributesResponse(
            id=0, uuid="", name="", distribution=AttributeDistributionLevels.OWN_ORGANIZATION, sharing_group_id=0
        )

    url: str = self.__get_url(f"objects/view/{object_id}", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)

    try:
        return ObjectResponse.model_validate(response).Object
    except ValueError as value_error:
        raise InvalidAPIResponse(
            f"Invalid API response. MISP ObjectWithAttributesResponse could not be parsed: {value_error}"
        )

get_sharing_group(sharing_group_id, server=None) async

Returns the sharing group with the given sharing_group_id

:param sharing_group_id: id of the sharing group to get from the API :type sharing_group_id: int :param server: the server to get the sharing group from, if no server is given, the own API is used :type server: Server :return: returns the sharing group that got requested :rtype: ViewUpdateSharingGroupLegacyResponse

Source code in src/mmisp/worker/misp_database/misp_api.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def get_sharing_group(
    self: Self, sharing_group_id: int, server: Server | None = None
) -> ViewUpdateSharingGroupLegacyResponse:
    """
    Returns the sharing group with the given sharing_group_id

    :param sharing_group_id: id of the sharing group to get from the API
    :type sharing_group_id: int
    :param server: the server to get the sharing group from, if no server is given, the own API is used
    :type server: Server
    :return: returns the sharing group that got requested
    :rtype: ViewUpdateSharingGroupLegacyResponse
    """

    url: str = self.__get_url(f"/sharing_groups/view/{sharing_group_id}", server)
    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    try:
        return ViewUpdateSharingGroupLegacyResponse.parse_obj(response)
    except ValueError as value_error:
        raise InvalidAPIResponse(
            f"Invalid API response. MISP ViewUpdateSharingGroupLegacyResponse could not be parsed: {value_error}"
        )

get_sharing_groups(server=None) async

Returns all sharing groups from the given server, if no server is given, the own API is used.

:param server: the server to get the sharing groups from, if no server is given, the own API is used :type server: Server :return: returns all sharing groups from the given server :rtype: list[GetAllSharingGroupsResponseResponseItem]

Source code in src/mmisp/worker/misp_database/misp_api.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
async def get_sharing_groups(
    self: Self, server: Server | None = None
) -> list[GetAllSharingGroupsResponseResponseItem]:
    """
    Returns all sharing groups from the given server, if no server is given, the own API is used.

    :param server: the server to get the sharing groups from, if no server is given, the own API is used
    :type server: Server
    :return: returns all sharing groups from the given server
    :rtype: list[GetAllSharingGroupsResponseResponseItem]
    """
    url: str = self.__get_url("/sharing_groups", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    print(f"get_sharing_groups: response={response}")

    try:
        return GetAllSharingGroupsResponse.parse_obj(response).response
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. MISP Sharing Group could not be parsed: {value_error}")

get_user(user_id, server=None) async

Returns the user with the given user_id.

:param user_id: id of the user :type user_id: int :param server: the server to get the user from, if no server is given, the own API is used :type server: Server :return: returns the user with the given user_id :rtype: MispUser

Source code in src/mmisp/worker/misp_database/misp_api.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
async def get_user(self: Self, user_id: int, server: Server | None = None) -> MispUser:
    """
    Returns the user with the given user_id.

    :param user_id: id of the user
    :type user_id: int
    :param server: the server to get the user from, if no server is given, the own API is used
    :type server: Server
    :return: returns the user with the given user_id
    :rtype: MispUser
    """
    url: str = self.__get_url(f"/admin/users/view/{user_id}", server)

    request: Request = Request("GET", url)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)
    response: dict = await self.__send_request(prepared_request, server)
    get_user_element_responds: GetUsersElement = GetUsersElement.model_validate(response)
    user_dict: dict = get_user_element_responds.User.model_dump()
    user_dict["role"] = get_user_element_responds.Role.model_dump()

    try:
        return MispUser.model_validate(user_dict)
    except ValueError as value_error:
        raise InvalidAPIResponse(f"Invalid API response. MISP user could not be parsed: {value_error}")

modify_attribute_tag_relationship(attribute_tag_id, relationship_type, server=None) async

Modifies the relationship of the given tag to the given attribute Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

:param attribute_tag_id: The ID of the attribute-tag assignment. :type attribute_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
async def modify_attribute_tag_relationship(
    self: Self, attribute_tag_id: int, relationship_type: str, server: Server | None = None
) -> bool:
    """
    Modifies the relationship of the given tag to the given attribute
    Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

    :param attribute_tag_id: The ID of the attribute-tag assignment.
    :type attribute_tag_id: int
    :param relationship_type: The relationship type to set.
    :type relationship_type: str
    :param server: the server to modify the relationship on, if no server is given, the own API is used
    :type server: Server
    :return: returns true if the modification was successful
    :rtype: bool
    """

    url: str = self.__get_url(f"/tags/modifyTagRelationship/attribute/{attribute_tag_id}", server)
    body = {"Tag": {"relationship_type": relationship_type}}

    request: Request = Request("POST", url, json=body)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    response: dict = await self.__send_request(prepared_request, server)
    print(f"bananenbieger: modify_attribute_tag_relationship: response={response}")
    return response["saved"] is True and response["success"] is True

modify_event_tag_relationship(event_tag_id, relationship_type, server=None) async

Modifies the relationship of the given tag to the given event Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

:param event_tag_id: The ID of the event-tag assignment. :type event_tag_id: int :param relationship_type: The relationship type to set. :type relationship_type: str :param server: the server to modify the relationship on, if no server is given, the own API is used :type server: Server :return: returns true if the modification was successful :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_api.py
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
async def modify_event_tag_relationship(
    self: Self, event_tag_id: int, relationship_type: str, server: Server | None = None
) -> bool:
    """
    Modifies the relationship of the given tag to the given event
    Endpoint documented at: https://www.misp-project.org/2022/10/10/MISP.2.4.164.released.html/

    :param event_tag_id: The ID of the event-tag assignment.
    :type event_tag_id: int
    :param relationship_type: The relationship type to set.
    :type relationship_type: str
    :param server: the server to modify the relationship on, if no server is given, the own API is used
    :type server: Server
    :return: returns true if the modification was successful
    :rtype: bool
    """

    url: str = self.__get_url(f"/tags/modifyTagRelationship/event/{event_tag_id}", server)
    body: dict = {"Tag": {"relationship_type": relationship_type}}

    request: Request = Request("POST", url, json=body)
    prepared_request: PreparedRequest = (await self.__get_session(server)).prepare_request(request)

    response: dict = await self.__send_request(prepared_request, server)
    print(response)
    return response["saved"] == "true" and response["success"] == "true"

helper module to interact with misp database

add_correlation_value(session, value) async

Adds a new value to correlation_values table or returns the id of the current entry with the same value. :param value: to add or get id of in the correlation_values table :type value: str :return: the id of the value in the correlation_values table :rtype: int

Source code in src/mmisp/worker/misp_database/misp_sql.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
async def add_correlation_value(session: AsyncSession, value: str) -> int:
    """
    Adds a new value to correlation_values table or returns the id of the current entry with the same value.
    :param value: to add or get id of in the correlation_values table
    :type value: str
    :return: the id of the value in the correlation_values table
    :rtype: int
    """
    statement = select(CorrelationValue).where(CorrelationValue.value == value)
    result: CorrelationValue | None = (await session.execute(statement)).scalars().first()
    if not result:
        new_value: CorrelationValue = CorrelationValue(value=value)
        session.add(new_value)
        await session.commit()
        await session.refresh(new_value)
        return new_value.id
    else:
        return result.id

add_correlations(session, correlations) async

Adds a list of correlations to the database. Returns True if at least one correlation was added, False otherwise. Doesn't add correlations that are already in the database. :param correlations: list of correlations to add :type correlations: list[DefaultCorrelation] :return: true if at least one correlation was added, false otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
async def add_correlations(session: AsyncSession, correlations: list[DefaultCorrelation]) -> bool:
    """
    Adds a list of correlations to the database. Returns True if at least one correlation was added,
    False otherwise.
    Doesn't add correlations that are already in the database.
    :param correlations: list of correlations to add
    :type correlations: list[DefaultCorrelation]
    :return: true if at least one correlation was added, false otherwise
    :rtype: bool
    """
    changed: bool = False
    for correlation in correlations:
        attribute_id1 = correlation.attribute_id
        attribute_id2 = correlation.attribute_id_1
        search_statement_1 = select(DefaultCorrelation.id).where(
            and_(
                DefaultCorrelation.attribute_id == attribute_id1,
                DefaultCorrelation.attribute_id_1 == attribute_id2,
            )
        )
        search_statement_2 = select(DefaultCorrelation.id).where(
            and_(
                DefaultCorrelation.attribute_id == attribute_id2,
                DefaultCorrelation.attribute_id_1 == attribute_id1,
            )
        )
        search_result_1: int | None = (await session.execute(search_statement_1)).scalars().first()
        search_result_2: int | None = (await session.execute(search_statement_2)).scalars().first()

        if search_result_1 or search_result_2:
            continue
        session.add(correlation)
        changed = True
    if changed:
        await session.commit()
    return changed

add_over_correlating_value(session, value, count) async

Adds a new value to over_correlating_values table or updates the current entry with the same value. Returns True if value was added or updated, False otherwise. :param value: add or update :type value: str :param count: occurrence of value :type count: int :return: True if value was added or updated, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
async def add_over_correlating_value(session: AsyncSession, value: str, count: int) -> bool:
    """
    Adds a new value to over_correlating_values table or updates the current entry with the same value.
    Returns True if value was added or updated, False otherwise.
    :param value: add or update
    :type value: str
    :param count: occurrence of value
    :type count: int
    :return: True if value was added or updated, False otherwise
    :rtype: bool
    """
    statement = select(OverCorrelatingValue).where(OverCorrelatingValue.value == value)
    result: OverCorrelatingValue | None = (await session.execute(statement)).scalars().first()
    if result is not None:
        result.occurrence = count
        session.add(result)
    else:
        ocv: OverCorrelatingValue = OverCorrelatingValue(value=value, occurrence=count)
        session.add(ocv)

    await session.commit()
    return True

delete_correlations(session, value) async

Deletes all correlations with value from database. Returns True if value was in database, False otherwise. :param value: to delete the correlations of :type value: str :return: True if value was in database, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
async def delete_correlations(session: AsyncSession, value: str) -> bool:
    """
    Deletes all correlations with value from database. Returns True if value was in database, False otherwise.
    :param value: to delete the correlations of
    :type value: str
    :return: True if value was in database, False otherwise
    :rtype: bool
    """
    statement_value_id = select(CorrelationValue).where(CorrelationValue.value == value)
    correlation_value: CorrelationValue | None = (await session.execute(statement_value_id)).scalars().first()

    if correlation_value:
        delete_statement_value = delete(CorrelationValue).where(CorrelationValue.value == value)
        await session.execute(delete_statement_value)

        delete_statement_correlations = delete(DefaultCorrelation).where(
            DefaultCorrelation.value_id == correlation_value.id
        )
        await session.execute(delete_statement_correlations)

        return True
    else:
        return False

delete_over_correlating_value(session, value) async

Deletes value from over_correlating_values table. Returns True if value was in table, False otherwise. :param value: row to delete :type value: str :return: true if value was in table, false otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
async def delete_over_correlating_value(session: AsyncSession, value: str) -> bool:
    """
    Deletes value from over_correlating_values table. Returns True if value was in table, False otherwise.
    :param value: row to delete
    :type value: str
    :return: true if value was in table, false otherwise
    :rtype: bool
    """
    result = await is_over_correlating_value(session, value)
    if result:
        statement = delete(OverCorrelatingValue).where(OverCorrelatingValue.value == value)
        await session.execute(statement)
        return True
    return False

filter_blocked_clusters(session, clusters) async

Get all blocked clusters from database and remove them from clusters list. :param clusters: list of clusters to check :type clusters: list[GetGalaxyClusterResponse] :return: list without blocked clusters :rtype: list[MispGalaxyCluster]

Source code in src/mmisp/worker/misp_database/misp_sql.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
async def filter_blocked_clusters(
    session: AsyncSession, clusters: list[GetGalaxyClusterResponse]
) -> list[GetGalaxyClusterResponse]:
    """
    Get all blocked clusters from database and remove them from clusters list.
    :param clusters: list of clusters to check
    :type clusters: list[GetGalaxyClusterResponse]
    :return: list without blocked clusters
    :rtype: list[MispGalaxyCluster]
    """
    for cluster in clusters:
        statement = select(GalaxyClusterBlocklist).where(GalaxyClusterBlocklist.cluster_uuid == cluster.uuid)
        result = (await session.execute(statement)).scalars().all()
        if len(result) > 0:
            clusters.remove(cluster)
    return clusters

filter_blocked_events(session, events, use_event_blocklist, use_org_blocklist) async

Clear the list from events that are listed as blocked in the misp database. Also, if the org is blocked, the events in the org are removed from the list. Return the list without the blocked events. :param events: list to remove blocked events from :type events: list[AddEditGetEventDetails] :param use_event_blocklist: if True, blocked events are removed from the list :type use_event_blocklist: bool :param use_org_blocklist: if True, the events from blocked orgs are removed from the list :type use_org_blocklist: bool :return: the list without the blocked events :rtype: list[MispEvent]

Source code in src/mmisp/worker/misp_database/misp_sql.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async def filter_blocked_events(
    session: AsyncSession, events: list[MispMinimalEvent], use_event_blocklist: bool, use_org_blocklist: bool
) -> list[MispMinimalEvent]:
    """
    Clear the list from events that are listed as blocked in the misp database. Also, if the org is blocked, the
    events in the org are removed from the list. Return the list without the blocked events.
    :param events: list to remove blocked events from
    :type events: list[AddEditGetEventDetails]
    :param use_event_blocklist: if True, blocked events are removed from the list
    :type use_event_blocklist: bool
    :param use_org_blocklist: if True, the events from blocked orgs are removed from the list
    :type use_org_blocklist: bool
    :return: the list without the blocked events
    :rtype: list[MispEvent]
    """
    if use_org_blocklist:
        for event in events:
            statement = select(EventBlocklist).where(OrgBlocklist.org_uuid == event.org_c_uuid)
            result = (await session.execute(statement)).scalars().all()
            if len(result) > 0:
                events.remove(event)
    if use_event_blocklist:
        for event in events:
            statement = select(EventBlocklist).where(EventBlocklist.event_uuid == event.uuid)
            result = (await session.execute(statement)).scalars().all()
            if len(result) > 0:
                events.remove(event)
    return events

get_api_authkey(session, server_id) async

Method to get the API authentication key of the server with the given ID. :param server_id: The ID of the server. :type server_id: int :return: The API authentication key of the server. :rtype: str

Source code in src/mmisp/worker/misp_database/misp_sql.py
25
26
27
28
29
30
31
32
33
34
35
async def get_api_authkey(session: AsyncSession, server_id: int) -> str | None:
    """
    Method to get the API authentication key of the server with the given ID.
    :param server_id: The ID of the server.
    :type server_id: int
    :return: The API authentication key of the server.
    :rtype: str
    """
    statement = select(Server.authkey).where(Server.id == server_id)
    result: str | None = (await session.execute(statement)).scalars().first()
    return result

get_attribute_tag(session, attribute_tag_id) async

Method to get the AttributeTag object with the given ID.

:param attribute_tag_id: The ID of the attribute-tag object. :type attribute_tag_id: int :return: The AttributeTag object or None if it doesn't exist. :rtype: AttributeTag | None

Source code in src/mmisp/worker/misp_database/misp_sql.py
379
380
381
382
383
384
385
386
387
388
389
390
async def get_attribute_tag(session: AsyncSession, attribute_tag_id: int) -> AttributeTag | None:
    """
    Method to get the AttributeTag object with the given ID.

    :param attribute_tag_id: The ID of the attribute-tag object.
    :type attribute_tag_id: int
    :return: The AttributeTag object or None if it doesn't exist.
    :rtype: AttributeTag | None
    """

    statement = select(AttributeTag).where(AttributeTag.id == attribute_tag_id)
    return (await session.execute(statement)).scalars().first()

get_attribute_tag_id(session, attribute_id, tag_id) async

Method to get the ID of the attribute-tag object associated with the given attribute-ID and tag-ID.

:param attribute_id: The ID of the attribute. :type attribute_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the attribute-tag object or -1 if the object does not exist. :rtype: int

Source code in src/mmisp/worker/misp_database/misp_sql.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
async def get_attribute_tag_id(session: AsyncSession, attribute_id: int, tag_id: int) -> int:
    """
    Method to get the ID of the attribute-tag object associated with the given attribute-ID and tag-ID.

    :param attribute_id: The ID of the attribute.
    :type attribute_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :return: The ID of the attribute-tag object or -1 if the object does not exist.
    :rtype: int
    """

    statement = select(AttributeTag.id).where(
        and_(AttributeTag.attribute_id == attribute_id, AttributeTag.tag_id == tag_id)
    )
    search_result: int | None = (await session.execute(statement)).scalar()
    if search_result:
        return search_result
    else:
        return -1

get_attributes_with_same_value(session, value) async

Method to get all attributes with the same value from database. :param value: to get attributes with :type value: str :return: list of attributes with the same value :rtype: list[Attribute]

Source code in src/mmisp/worker/misp_database/misp_sql.py
86
87
88
89
90
91
92
93
94
95
96
async def get_attributes_with_same_value(session: AsyncSession, value: str) -> list[Attribute]:
    """
    Method to get all attributes with the same value from database.
    :param value: to get attributes with
    :type value: str
    :return: list of attributes with the same value
    :rtype: list[Attribute]
    """
    statement = select(Attribute).where(and_(Attribute.value == value, Attribute.disable_correlation == false()))  # type: ignore
    result: list[Attribute] = list((await session.execute(statement)).scalars().all())
    return result

get_event_tag_id(session, event_id, tag_id) async

Method to get the ID of the event-tag object associated with the given event-ID and tag-ID.

:param event_id: The ID of the event. :type event_id: int :param tag_id: The ID of the tag. :type tag_id: int :return: The ID of the event-tag object or -1 if the object does not exist. :rtype: int

Source code in src/mmisp/worker/misp_database/misp_sql.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
async def get_event_tag_id(session: AsyncSession, event_id: int, tag_id: int) -> int:
    """
    Method to get the ID of the event-tag object associated with the given event-ID and tag-ID.

    :param event_id: The ID of the event.
    :type event_id: int
    :param tag_id: The ID of the tag.
    :type tag_id: int
    :return: The ID of the event-tag object or -1 if the object does not exist.
    :rtype: int
    """

    statement = select(EventTag.id).where(and_(EventTag.event_id == event_id, EventTag.tag_id == tag_id))
    search_result: int | None = (await session.execute(statement)).scalar()
    if search_result:
        return search_result
    else:
        return -1

get_excluded_correlations(session) async

Method to get all values from correlation_exclusions table. :return: all values from correlation_exclusions table :rtype: list[str]

Source code in src/mmisp/worker/misp_database/misp_sql.py
120
121
122
123
124
125
126
127
async def get_excluded_correlations(session: AsyncSession) -> Sequence[str]:
    """
    Method to get all values from correlation_exclusions table.
    :return: all values from correlation_exclusions table
    :rtype: list[str]
    """
    statement = select(CorrelationExclusions.value)
    return (await session.execute(statement)).scalars().all()

get_number_of_correlations(session, value, only_over_correlating_table) async

Returns the number of correlations of value in the database. If only_over_correlating_table is True, only the value in the over_correlating_values table is returned. Else the number of correlations in the default_correlations table is returned Attention: It is assumed that the value is in the over_correlating_values table if only_over_correlating_table is True. :param value: to get number of correlations of :type value: str :param only_over_correlating_table: if True, only the value in the over_correlating_values table is returned :type only_over_correlating_table: bool :return: number of correlations of value in the database

Source code in src/mmisp/worker/misp_database/misp_sql.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
async def get_number_of_correlations(session: AsyncSession, value: str, only_over_correlating_table: bool) -> int:
    """
    Returns the number of correlations of value in the database. If only_over_correlating_table is True, only the
    value in the over_correlating_values table is returned. Else the number of  correlations in the
    default_correlations table is returned
    Attention: It is assumed that the value is in the over_correlating_values table if only_over_correlating_table
     is True.
    :param value: to get number of correlations of
    :type value: str
    :param only_over_correlating_table: if True, only the value in the over_correlating_values table is returned
    :type only_over_correlating_table: bool
    :return: number of correlations of value in the database
    """
    if only_over_correlating_table:
        statement = select(OverCorrelatingValue.occurrence).where(OverCorrelatingValue.value == value)
        result: int | None = (await session.execute(statement)).scalars().first()
        if result:
            return result
        raise ValueError(f"Value {value} not in over_correlating_values table")
    else:
        search_statement = select(CorrelationValue.id).where(CorrelationValue.value == value)
        value_id: int | None = (await session.execute(search_statement)).scalars().first()
        if value_id:
            statement = select(DefaultCorrelation.id).where(DefaultCorrelation.value_id == value_id)
            all_elements: Sequence = (await session.execute(statement)).scalars().all()
            return len(all_elements)
        else:
            return 0

get_over_correlating_values(session) async

Method to get all values from over_correlating_values table with their occurrence. :return: all values from over_correlating_values table with their occurrence :rtype: list[tuple[str, int]]

Source code in src/mmisp/worker/misp_database/misp_sql.py
110
111
112
113
114
115
116
117
async def get_over_correlating_values(session: AsyncSession) -> list[tuple[str, int]]:
    """
    Method to get all values from over_correlating_values table with their occurrence.
    :return: all values from over_correlating_values table with their occurrence
    :rtype: list[tuple[str, int]]
    """
    statement = select(OverCorrelatingValue.value, OverCorrelatingValue.occurrence)
    return cast(list[tuple[str, int]], (await session.execute(statement)).all())

get_post(session, post_id) async

Method to get a post from database. :param post_id: the id of the post to get :type post_id: int :return: the post with the given id :rtype: MispPost

Source code in src/mmisp/worker/misp_database/misp_sql.py
138
139
140
141
142
143
144
145
146
147
148
149
150
async def get_post(session: AsyncSession, post_id: int) -> Post:
    """
    Method to get a post from database.
    :param post_id: the id of the post to get
    :type post_id: int
    :return: the post with the given id
    :rtype: MispPost
    """
    statement = select(Post).where(Post.id == post_id)
    result: Post | None = (await session.execute(statement)).scalars().first()
    if result:
        return result
    raise ValueError(f"Post with ID {post_id} doesn't exist.")

get_values_with_correlation(session) async

" Method to get all values from correlation_values table. :return: all values from correlation_values table :rtype: list[str]

Source code in src/mmisp/worker/misp_database/misp_sql.py
 99
100
101
102
103
104
105
106
107
async def get_values_with_correlation(session: AsyncSession) -> list[str]:
    """ "
    Method to get all values from correlation_values table.
    :return: all values from correlation_values table
    :rtype: list[str]
    """
    statement = select(CorrelationValue.value)
    result: Sequence = (await session.execute(statement)).scalars().all()
    return list(result)

is_excluded_correlation(session, value) async

Checks if value is in correlation_exclusions table. :param value: to check :type value: str :return: True if value is in correlation_exclusions table, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
153
154
155
156
157
158
159
160
161
162
163
164
165
async def is_excluded_correlation(session: AsyncSession, value: str) -> bool:
    """
    Checks if value is in correlation_exclusions table.
    :param value: to check
    :type value: str
    :return: True if value is in correlation_exclusions table, False otherwise
    :rtype: bool
    """
    statement = select(CorrelationExclusions.id).where(CorrelationExclusions.value == value)
    result = (await session.execute(statement)).first()
    if result:
        return True
    return False

is_over_correlating_value(session, value) async

Checks if value is in over_correlating_values table. Doesn't check if value has more correlations in the database than the current threshold. :param value: to check :type value: str :return: True if value is in over_correlating_values table, False otherwise :rtype: bool

Source code in src/mmisp/worker/misp_database/misp_sql.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def is_over_correlating_value(session: AsyncSession, value: str) -> bool:
    """
    Checks if value is in over_correlating_values table. Doesn't check if value has more correlations in the
    database than the current threshold.
    :param value: to check
    :type value: str
    :return: True if value is in over_correlating_values table, False otherwise
    :rtype: bool
    """
    statement = select(OverCorrelatingValue).where(OverCorrelatingValue.value == value)
    result: OverCorrelatingValue | None = (await session.execute(statement)).scalars().first()
    if result:
        return True
    return False