Skip to content

E-Mail

alert_email_job(ctx, user, data) async

prepares an alert email by filling and rendering a template. afterward it will be sent to all specified users. :param user: the user who requested the job :type user: UserData :param data: contains data for the template and the user ids who will receive the emails. :type data: AlertEmailData

Source code in src/mmisp/worker/jobs/email/alert_email_job.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@queue.task()
@add_ajob_db_log
async def alert_email_job(ctx: WrappedContext[None], user: UserData, data: AlertEmailData) -> None:
    """
    prepares an alert email by filling and rendering a template. afterward it will be sent to all specified users.
    :param user: the user who requested the job
    :type user: UserData
    :param data: contains data for the template and the user ids who will receive the emails.
    :type data: AlertEmailData
    """
    assert sessionmanager is not None

    __TEMPLATE_NAME: str = "alert_email.j2"
    __SUBJECT: str = (
        "[MISP] event: {event_id} - event info: {event_info} - thread level: {thread_level_name} - {tag_name}"
    )
    config: EmailConfigData = EmailConfigData()
    environment: Environment = Environment(loader=FileSystemLoader(Path(p)), autoescape=select_autoescape())

    email_msg: EmailMessage = email.message.EmailMessage()

    async with sessionmanager.session() as session:
        misp_api = MispAPI(session)
        event: AddEditGetEventDetails = await misp_api.get_event(data.event_id)
        thread_level: str = await get_threat_level(session, event.threat_level_id)

        if event.sharing_group_id is not None:
            event_sharing_group: ShortSharingGroup | None = (
                await misp_api.get_sharing_group(event.sharing_group_id)
            ).SharingGroup
        else:
            event_sharing_group = None

        email_msg["From"] = config.mmisp_email_address
        email_msg["Subject"] = __SUBJECT.format(
            event_id=data.event_id,
            event_info=event.info,
            thread_level_name=thread_level,
            tag_name=UtilityEmail.get_email_subject_mark_for_event(event, config.email_subject_string),
        )

        template = environment.get_template(__TEMPLATE_NAME)
        email_msg.set_content(
            template.render(
                mmisp_url=config.mmisp_url,
                event=event,
                event_sharing_group=event_sharing_group,
                event_thread_level=thread_level,
                old_publish_timestamp=data.old_publish,
            )
        )

        await UtilityEmail.send_emails(
            misp_api,
            config.mmisp_email_address,
            config.mmisp_email_username,
            config.mmisp_email_password,
            config.mmisp_smtp_port,
            config.mmisp_smtp_host,
            data.receiver_ids,
            email_msg,
        )

contact_email_job(ctx, requester, data) async

Prepares a contact email by filling and rendering a template. Afterward it will be sent to all specified users. :param requester: is the user who wants to contact the users :type requester: UserData :param data: contains data for the template and the user ids who will receive the emails. :type data: ContactEmailData

Source code in src/mmisp/worker/jobs/email/contact_email_job.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@queue.task()
@add_ajob_db_log
async def contact_email_job(ctx: WrappedContext[None], requester: UserData, data: ContactEmailData) -> None:
    """
    Prepares a contact email by filling and rendering a template. Afterward it will be sent to all specified users.
    :param requester: is the user who wants to contact the users
    :type requester: UserData
    :param data: contains data for the template and the user ids who will receive the emails.
    :type data: ContactEmailData
    """
    assert sessionmanager is not None
    __TEMPLATE_NAME: str = "contact_email.j2"
    __SUBJECT: str = "Need info about event {event_id} - {tag_name}"

    config: EmailConfigData = EmailConfigData()
    environment: Environment = Environment(loader=FileSystemLoader(Path(p)), autoescape=select_autoescape())

    email_msg: EmailMessage = email.message.EmailMessage()

    async with sessionmanager.session() as session:
        misp_api = MispAPI(session)
        requester_misp: MispUser = await misp_api.get_user(requester.user_id)
        event: AddEditGetEventDetails = await misp_api.get_event(data.event_id)

        email_msg["From"] = config.mmisp_email_address
        email_msg["Subject"] = __SUBJECT.format(
            event_id=data.event_id,
            tag_name=UtilityEmail.get_email_subject_mark_for_event(event, config.email_subject_string),
        )

        template = environment.get_template(__TEMPLATE_NAME)
        email_msg.set_content(
            template.render(
                requestor_email=requester_misp.email,
                message=data.message,
                mmisp_url=config.mmisp_url,
                event_id=data.event_id,
            )
        )

        await UtilityEmail.send_emails(
            misp_api,
            config.mmisp_email_address,
            config.mmisp_email_username,
            config.mmisp_email_password,
            config.mmisp_smtp_port,
            config.mmisp_smtp_host,
            data.receiver_ids,
            email_msg,
        )

posts_email_job(ctx, user, data) async

Prepares a posts email by filling and rendering a template. Afterward it will be sent to all specified users. :param user: the user who requested the job :type user: UserData :param data: contains data for the template and the user ids who will receive the emails. :type data: PostsEmailData

Source code in src/mmisp/worker/jobs/email/posts_email_job.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@queue.task()
@add_ajob_db_log
async def posts_email_job(ctx: WrappedContext[None], user: UserData, data: PostsEmailData) -> None:
    """
    Prepares a posts email by filling and rendering a template. Afterward it will be sent to all specified users.
    :param user: the user who requested the job
    :type user: UserData
    :param data: contains data for the template and the user ids who will receive the emails.
    :type data: PostsEmailData
    """
    assert sessionmanager is not None
    __SUBJECT: str = "New post in discussion: {thread_id} - {tlp}"
    __TEMPLATE_NAME: str = "posts_email.j2"

    # environment: Environment = email_worker.environment
    # config: EmailConfigData = email_worker.config
    #    self.__misp_api: MispAPI = MispAPI()
    config: EmailConfigData = EmailConfigData()
    environment: Environment = Environment(loader=FileSystemLoader(Path(p)), autoescape=select_autoescape())

    email_msg: EmailMessage = email.message.EmailMessage()

    async with sessionmanager.session() as session:
        misp_api = MispAPI(session)
        post: Post = await get_post(session, data.post_id)

        email_msg["From"] = config.mmisp_email_address
        email_msg["Subject"] = __SUBJECT.format(thread_id=post.thread_id, tlp=config.email_subject_string)
        template = environment.get_template(__TEMPLATE_NAME)
        email_msg.set_content(
            template.render(
                title=data.title,
                mmisp_url=config.mmisp_url,
                thread_id=post.thread_id,
                post_id=data.post_id,
                message=data.message,
            )
        )

        await UtilityEmail.send_emails(
            misp_api,
            config.mmisp_email_address,
            config.mmisp_email_username,
            config.mmisp_email_password,
            config.mmisp_smtp_port,
            config.mmisp_smtp_host,
            data.receiver_ids,
            email_msg,
        )

AlertEmailData

Bases: BaseModel

Encapsulates the necessary data to send and create an alert email.

Source code in src/mmisp/worker/jobs/email/job_data.py
 6
 7
 8
 9
10
11
12
13
14
15
16
class AlertEmailData(BaseModel):
    """
    Encapsulates the necessary data to send and create an alert email.
    """

    receiver_ids: list[int]
    """The ids of the receivers"""
    event_id: int
    """The id of the event which triggered the alert"""
    old_publish: datetime
    """The timestamp of old publishing"""

event_id instance-attribute

The id of the event which triggered the alert

old_publish instance-attribute

The timestamp of old publishing

receiver_ids instance-attribute

The ids of the receivers

ContactEmailData

Bases: BaseModel

Encapsulates the necessary data to send and create a contact email.

Source code in src/mmisp/worker/jobs/email/job_data.py
19
20
21
22
23
24
25
26
27
28
29
class ContactEmailData(BaseModel):
    """
    Encapsulates the necessary data to send and create a contact email.
    """

    event_id: int
    """The id of the event which the user wants to know more about"""
    message: str
    """The custom message of the user"""
    receiver_ids: list[int]
    """The ids of the receivers"""

event_id instance-attribute

The id of the event which the user wants to know more about

message instance-attribute

The custom message of the user

receiver_ids instance-attribute

The ids of the receivers

PostsEmailData

Bases: BaseModel

Encapsulates the necessary data to send and create a posts email.

Source code in src/mmisp/worker/jobs/email/job_data.py
32
33
34
35
36
37
38
39
40
41
42
43
44
class PostsEmailData(BaseModel):
    """
    Encapsulates the necessary data to send and create a posts email.
    """

    post_id: int
    """The id of the post where something new was posted"""
    title: str
    """The title of the post"""
    message: str
    """The message which was posted at the post"""
    receiver_ids: list[int]
    """The ids of the receivers"""

message instance-attribute

The message which was posted at the post

post_id instance-attribute

The id of the post where something new was posted

receiver_ids instance-attribute

The ids of the receivers

title instance-attribute

The title of the post

SmtpClient

Provides methods to build an SMTP connection to send emails.

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class SmtpClient:
    """
    Provides methods to build an SMTP connection to send emails.
    """

    def __init__(self: Self, host: str, port: int) -> None:
        """
        Initializes a new SMTP object.
        :param port: is the port of the SMTP server
        :type port: int
        :param host: is the host of the SMTP server
        :type host: str
        """
        self.__smtp: SMTP = smtplib.SMTP(host, port)

    def open_smtp_connection(self: Self, email: str, password: str) -> None:
        """
        Connects to the SMTP server and logs in with the misp email.
        If no password is given, the connection will be established without a password.
        :param email: is the email of misp
        :type email: str
        :param password: is the password of the email
        :type password: str
        """
        self.__smtp.ehlo()
        self.__smtp.starttls()
        self.__smtp.ehlo()
        self.__smtp.login(user=email, password=password)

    def send_email(self: Self, from_addr: str, to_addr: str, email: str) -> None:
        """
        Sends an email.
        :param from_addr: is the address of the sender (misp email9
        :type from_addr: str
        :param to_addr: is the address of the receiver (user)
        :type to_addr: str
        :param email: is the content of the email
        :type email: str
        """
        try:
            self.__smtp.sendmail(from_addr, to_addr, email)
        except smtplib.SMTPRecipientsRefused as e:
            log.warning(f"Email to {to_addr} was refused: {e}")

    def close_smtp_connection(self: Self) -> None:
        """
        Closes the SMTP Connection.
        """
        self.__smtp.close()

__init__(host, port)

Initializes a new SMTP object. :param port: is the port of the SMTP server :type port: int :param host: is the host of the SMTP server :type host: str

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
14
15
16
17
18
19
20
21
22
def __init__(self: Self, host: str, port: int) -> None:
    """
    Initializes a new SMTP object.
    :param port: is the port of the SMTP server
    :type port: int
    :param host: is the host of the SMTP server
    :type host: str
    """
    self.__smtp: SMTP = smtplib.SMTP(host, port)

close_smtp_connection()

Closes the SMTP Connection.

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
53
54
55
56
57
def close_smtp_connection(self: Self) -> None:
    """
    Closes the SMTP Connection.
    """
    self.__smtp.close()

open_smtp_connection(email, password)

Connects to the SMTP server and logs in with the misp email. If no password is given, the connection will be established without a password. :param email: is the email of misp :type email: str :param password: is the password of the email :type password: str

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
24
25
26
27
28
29
30
31
32
33
34
35
36
def open_smtp_connection(self: Self, email: str, password: str) -> None:
    """
    Connects to the SMTP server and logs in with the misp email.
    If no password is given, the connection will be established without a password.
    :param email: is the email of misp
    :type email: str
    :param password: is the password of the email
    :type password: str
    """
    self.__smtp.ehlo()
    self.__smtp.starttls()
    self.__smtp.ehlo()
    self.__smtp.login(user=email, password=password)

send_email(from_addr, to_addr, email)

Sends an email. :param from_addr: is the address of the sender (misp email9 :type from_addr: str :param to_addr: is the address of the receiver (user) :type to_addr: str :param email: is the content of the email :type email: str

Source code in src/mmisp/worker/jobs/email/utility/smtp_client.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def send_email(self: Self, from_addr: str, to_addr: str, email: str) -> None:
    """
    Sends an email.
    :param from_addr: is the address of the sender (misp email9
    :type from_addr: str
    :param to_addr: is the address of the receiver (user)
    :type to_addr: str
    :param email: is the content of the email
    :type email: str
    """
    try:
        self.__smtp.sendmail(from_addr, to_addr, email)
    except smtplib.SMTPRecipientsRefused as e:
        log.warning(f"Email to {to_addr} was refused: {e}")

EmailConfigData

Bases: BaseSettings

Encapsulates configuration for the email worker and its jobs.

Source code in src/mmisp/worker/jobs/email/utility/email_config_data.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class EmailConfigData(BaseSettings):
    """
    Encapsulates configuration for the email worker and its jobs.
    """

    mmisp_url: str = Field("http://127.0.0.1", validation_alias=ENV_URL)
    """The url of MISP"""
    email_subject_string: str = Field("tlp", validation_alias=ENV_EMAIL_SUBJECT_STRING)
    """The tlp string to search for an email subject"""
    mmisp_email_address: str = Field("misp@localhost", validation_alias=ENV_EMAIL_ADDRESS)
    """The email of MISP"""
    mmisp_email_username: str = Field("misp", validation_alias=ENV_EMAIL_USERNAME)
    """The username of the MISP email"""
    mmisp_email_password: str = Field("", validation_alias=ENV_EMAIL_PASSWORD)
    """The password of the MISP email"""
    mmisp_smtp_port: NonNegativeInt = Field(25, validation_alias=ENV_SMTP_PORT)
    """The port of the SMTP server"""
    mmisp_smtp_host: str = Field("localhost", validation_alias=ENV_SMTP_HOST)
    """The host of the SMTP server"""

email_subject_string = Field('tlp', validation_alias=ENV_EMAIL_SUBJECT_STRING) class-attribute instance-attribute

The tlp string to search for an email subject

mmisp_email_address = Field('misp@localhost', validation_alias=ENV_EMAIL_ADDRESS) class-attribute instance-attribute

The email of MISP

mmisp_email_password = Field('', validation_alias=ENV_EMAIL_PASSWORD) class-attribute instance-attribute

The password of the MISP email

mmisp_email_username = Field('misp', validation_alias=ENV_EMAIL_USERNAME) class-attribute instance-attribute

The username of the MISP email

mmisp_smtp_host = Field('localhost', validation_alias=ENV_SMTP_HOST) class-attribute instance-attribute

The host of the SMTP server

mmisp_smtp_port = Field(25, validation_alias=ENV_SMTP_PORT) class-attribute instance-attribute

The port of the SMTP server

mmisp_url = Field('http://127.0.0.1', validation_alias=ENV_URL) class-attribute instance-attribute

The url of MISP

UtilityEmail

Provides functionality to built emails.

Source code in src/mmisp/worker/jobs/email/utility/utility_email.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class UtilityEmail:
    """
    Provides functionality to built emails.
    """

    @staticmethod
    def get_email_subject_mark_for_event(event: AddEditGetEventDetails, email_subject_string: str) -> str:
        """
        Returns the tlp tag of the given event as a subject for emails.

        :param event: the event to get the subject for
        :type event: AddEditGetEventDetails
        :param email_subject_string: is the tlp string to search
        :type email_subject_string: str
        :return: the tlp tag of the event
        :rtype: str
        """
        if event.Tag is not None:
            for tag in event.Tag:
                if email_subject_string in tag.name:
                    return tag.name

        return email_subject_string

    @staticmethod
    async def send_emails(
        misp_api: MispAPI,
        misp_email_address: str,
        email_username: str,
        email_password: str,
        smtp_port: int,
        smtp_host: str,
        receiver_ids: list[int],
        email_msg: EmailMessage,
    ) -> None:
        """
        Sends emails to the given users by opening an SMTP connection

        :param misp_email_address: is the email of misp
        :type misp_email_address: str
        :param email_username: is the username of misp
        :type email_username: str
        :param email_password: is the password of misp
        :type email_password: str
        :param smtp_port: is the port of the SMTP server
        :type smtp_port: int
        :param smtp_host: is the host of the SMTP server
        :type smtp_host: str
        :param receiver_ids: are the ids of the users who get the email
        :type receiver_ids: list[int]
        :param email_msg: is the email which will be sent
        :type email_msg: EmailMessage
        """
        smtp_client: SmtpClient = SmtpClient(smtp_host, smtp_port)

        smtp_client.open_smtp_connection(email_username, email_password)

        for receiver_id in receiver_ids:
            user: MispUser = await misp_api.get_user(receiver_id)
            email_msg["To"] = user.email
            smtp_client.send_email(misp_email_address, user.email, email_msg.as_string())
            del email_msg["To"]

        smtp_client.close_smtp_connection()

get_email_subject_mark_for_event(event, email_subject_string) staticmethod

Returns the tlp tag of the given event as a subject for emails.

:param event: the event to get the subject for :type event: AddEditGetEventDetails :param email_subject_string: is the tlp string to search :type email_subject_string: str :return: the tlp tag of the event :rtype: str

Source code in src/mmisp/worker/jobs/email/utility/utility_email.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@staticmethod
def get_email_subject_mark_for_event(event: AddEditGetEventDetails, email_subject_string: str) -> str:
    """
    Returns the tlp tag of the given event as a subject for emails.

    :param event: the event to get the subject for
    :type event: AddEditGetEventDetails
    :param email_subject_string: is the tlp string to search
    :type email_subject_string: str
    :return: the tlp tag of the event
    :rtype: str
    """
    if event.Tag is not None:
        for tag in event.Tag:
            if email_subject_string in tag.name:
                return tag.name

    return email_subject_string

send_emails(misp_api, misp_email_address, email_username, email_password, smtp_port, smtp_host, receiver_ids, email_msg) async staticmethod

Sends emails to the given users by opening an SMTP connection

:param misp_email_address: is the email of misp :type misp_email_address: str :param email_username: is the username of misp :type email_username: str :param email_password: is the password of misp :type email_password: str :param smtp_port: is the port of the SMTP server :type smtp_port: int :param smtp_host: is the host of the SMTP server :type smtp_host: str :param receiver_ids: are the ids of the users who get the email :type receiver_ids: list[int] :param email_msg: is the email which will be sent :type email_msg: EmailMessage

Source code in src/mmisp/worker/jobs/email/utility/utility_email.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@staticmethod
async def send_emails(
    misp_api: MispAPI,
    misp_email_address: str,
    email_username: str,
    email_password: str,
    smtp_port: int,
    smtp_host: str,
    receiver_ids: list[int],
    email_msg: EmailMessage,
) -> None:
    """
    Sends emails to the given users by opening an SMTP connection

    :param misp_email_address: is the email of misp
    :type misp_email_address: str
    :param email_username: is the username of misp
    :type email_username: str
    :param email_password: is the password of misp
    :type email_password: str
    :param smtp_port: is the port of the SMTP server
    :type smtp_port: int
    :param smtp_host: is the host of the SMTP server
    :type smtp_host: str
    :param receiver_ids: are the ids of the users who get the email
    :type receiver_ids: list[int]
    :param email_msg: is the email which will be sent
    :type email_msg: EmailMessage
    """
    smtp_client: SmtpClient = SmtpClient(smtp_host, smtp_port)

    smtp_client.open_smtp_connection(email_username, email_password)

    for receiver_id in receiver_ids:
        user: MispUser = await misp_api.get_user(receiver_id)
        email_msg["To"] = user.email
        smtp_client.send_email(misp_email_address, user.email, email_msg.as_string())
        del email_msg["To"]

    smtp_client.close_smtp_connection()