Skip to content

Controller

Controller

log = logging.getLogger(__name__) module-attribute

Encapsulates the logic of the API for the worker router

add_queue_to_worker(id, queue_name) async

Adds a queue to the worker specified by the id.

Parameters:

Name Type Description Default
id str

The id of the worker.

required
queue_name str

The name of the queue to be added.

required

Returns:

Type Description
None

None

Source code in src/mmisp/worker/controller/worker_controller.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def add_queue_to_worker(id: str, queue_name: str) -> None:
    """
    Asks the worker with the given id to add a queue.

    An "add_queue" message carrying the queue name is handed to
    ``connection_manager.send_msg_and_wait`` and its answer is awaited.

    Args:
        id (str): The id of the worker.
        queue_name (str): The name of the queue to be added.

    Returns:
        The "msg" entry of the answer.
        NOTE(review): the signature is annotated ``-> None`` although the
        answer's "msg" value is returned; confirm which one is intended.

    Raises:
        RuntimeError: If the answer equals the string "Timeout".
    """
    reply = await connection_manager.send_msg_and_wait(id, "add_queue", queue_name=queue_name)
    if reply != "Timeout":
        return reply["msg"]
    raise RuntimeError("Connection to node timeout")

check_worker_name(name)

Check if the worker name is valid

Parameters:

Name Type Description Default
name str

The name of the worker.

required

Returns:

Name Type Description
bool bool

True if the worker name is among the currently active connections, otherwise False.

Source code in src/mmisp/worker/controller/worker_controller.py
195
196
197
198
199
200
201
202
203
204
205
206
def check_worker_name(name: str) -> bool:
    """
    Check if the worker name is valid

    A name counts as valid here when it is a key of
    ``connection_manager.active_connections``.

    Args:
        name (str): The name of the worker.

    Returns:
        bool: True if the name is found in the active connections, otherwise False.

    """
    # Membership on the mapping itself is equivalent to testing against .keys().
    return name in connection_manager.active_connections

get_worker_jobqueues(name) async

Get a list of all job queues for the worker specified by the id.

Parameters:

Name Type Description Default
name str

The id of the worker.

required

Returns:

Type Description
list[GetWorkerJobqueue]

list[GetWorkerJobqueue]: A list of job queue objects for the worker.

Source code in src/mmisp/worker/controller/worker_controller.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
async def get_worker_jobqueues(name: str) -> list[GetWorkerJobqueue]:
    """
    Build one job queue entry per member of ``JobEnum`` for the given worker.

    The queues reported by ``get_worker_queues`` for the worker decide whether
    an entry is flagged "active" or "inactive".

    Args:
        name (str): The id of the worker.

    Returns:
        list[GetWorkerJobqueue]: One object per ``JobEnum`` member, in enum order.

    """
    listened_queues = await get_worker_queues(name)
    return [
        GetWorkerJobqueue(
            name=job.value,
            activ="active" if job.value in listened_queues else "inactive",
        )
        for job in JobEnum
    ]

get_worker_list() async

Retrieves a list of all workers along with their status, queues, and job count.

Returns:

Type Description
list[GetWorkerWorkers]

list[GetWorkerWorkers]: A list of worker objects with their status, queues, and job counts.

Source code in src/mmisp/worker/controller/worker_controller.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def get_worker_list() -> list[GetWorkerWorkers]:
    """
    Retrieves a list of all workers along with their status and queues.

    A "currently_listened_queues" message is sent through
    ``connection_manager.send_all_msg_and_wait``. Every client whose answer is
    not the string "Timeout" is reported with status "active" and the "msg"
    entry of its answer as its queues.

    Returns:
        list[GetWorkerWorkers]: One object per answering worker. ``jobCount`` is
        always set to -1 here; no job count is computed in this function.

    """
    # Diagnostic output goes through the module logger instead of stdout.
    log.debug("Active worker connections: %s", list(connection_manager.active_connections.keys()))
    worker_queues = await connection_manager.send_all_msg_and_wait("currently_listened_queues")
    return [
        GetWorkerWorkers(name=client_id, status="active", queues=queues["msg"], jobCount=-1)
        for client_id, queues in worker_queues.items()
        if queues != "Timeout"  # workers that timed out are left out of the list
    ]

get_worker_queues(name) async

Returns the active queues of the specified worker.

Parameters:

Name Type Description Default
name str

The name of the worker.

required

Returns:

Type Description
list[str]

list[str]: A list of active queues for the worker

Source code in src/mmisp/worker/controller/worker_controller.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def get_worker_queues(name: str) -> list[str]:
    """
    Ask the specified worker which queues it currently listens to.

    A "currently_listened_queues" message is handed to
    ``connection_manager.send_msg_and_wait`` and its answer is awaited.

    Args:
        name (str): The name of the worker.

    Returns:
        list[str]: The "msg" entry of the answer, i.e. the worker's active queues.

    Raises:
        RuntimeError: If the answer equals the string "Timeout".

    """
    reply = await connection_manager.send_msg_and_wait(name, "currently_listened_queues")
    if reply != "Timeout":
        return reply["msg"]
    raise RuntimeError("Connection to node timeout")

pause_worker(**kwargs) async

Pauses workers by removing all queues from the workers specified in the names list, preventing jobs from being executed. If names is not set, all workers are addressed.

Parameters:

Name Type Description Default
**kwargs

The names of the workers as a list[str].

{}
Source code in src/mmisp/worker/controller/worker_controller.py
81
82
83
84
85
86
87
88
89
90
async def pause_worker(**kwargs) -> None:
    """
    Sends a "remove_all_queues" message to the workers given in the names list,
    so that they stop executing jobs. If names is not set, ``client_ids=None`` is
    passed on, which is meant to address all workers.

    Args:
        **kwargs: May contain ``names``, the names of the workers as a list[str].
    """
    await connection_manager.send_all_msg_and_wait(
        "remove_all_queues",
        client_ids=kwargs.get("names"),
    )

remove_queue_from_worker(id, queue_name) async

Removes a queue from the worker specified by the id.

Parameters:

Name Type Description Default
id str

The id of the worker.

required
queue_name str

The name of the queue to be removed.

required

Returns:

Type Description
None

None

Source code in src/mmisp/worker/controller/worker_controller.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
async def remove_queue_from_worker(id: str, queue_name: str) -> None:
    """
    Asks the worker with the given id to drop a queue.

    A "remove_queue" message carrying the queue name is handed to
    ``connection_manager.send_msg_and_wait`` and its answer is awaited.

    Args:
        id (str): The id of the worker.
        queue_name (str): The name of the queue to be removed.

    Returns:
        The "msg" entry of the answer.
        NOTE(review): the signature is annotated ``-> None`` although the
        answer's "msg" value is returned; confirm which one is intended.

    Raises:
        RuntimeError: If the answer equals the string "Timeout".

    """
    reply = await connection_manager.send_msg_and_wait(id, "remove_queue", queue_name=queue_name)
    if reply != "Timeout":
        return reply["msg"]
    raise RuntimeError("Connection to node timeout")

reset_worker_queues(**kwargs) async

Adds all queues back to the workers specified in the names list. If names is not set, all workers are addressed.

Parameters:

Name Type Description Default
**kwargs

The names of the workers as a list[str].

{}
Source code in src/mmisp/worker/controller/worker_controller.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def reset_worker_queues(**kwargs) -> None:
    """
    Sends a "reset_queues" message to the workers given in the names list, so
    that all queues are added back to them. If names is not set,
    ``client_ids=None`` is passed on, which is meant to address all workers.

    Args:
         **kwargs: May contain ``names``, the names of the workers as a list[str].
    """
    await connection_manager.send_all_msg_and_wait(
        "reset_queues",
        client_ids=kwargs.get("names"),
    )

cancel_job(queue, job_id) async

Revokes a given job.

Parameters:

Name Type Description Default
job_id str

The ID of the job

required

Returns:

Type Description
bool

Whether the revoke action was successful.

Source code in src/mmisp/worker/controller/job_controller.py
44
45
46
47
48
49
50
51
52
53
54
55
56
async def cancel_job(queue: Worker, job_id: str) -> bool:
    """
    Aborts the job with the given id on the given queue.

    Args:
      queue: The queue the job belongs to; it is entered as an async context
        manager for the duration of the call.
      job_id: The ID of the job

    Returns:
      The result of ``queue.abort_by_id``, i.e. whether the revoke action was successful.
    """
    async with queue:
        aborted = await queue.abort_by_id(job_id)
        return aborted

create_job(queue, function, *args, **kwargs) async

Enqueues a given task.

Parameters:

Name Type Description Default
queue Worker

The Queue to enqueue the task in

required
function RegisteredTask

The function to enqueue

required
args

Arguments passed to the job.

()
kwargs

Arguments passed to the job.

{}

Returns:

Type Description
CreateJobResponse

The job_id of the created job and a success status.

Source code in src/mmisp/worker/controller/job_controller.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def create_job(queue: Worker, function: RegisteredTask, *args, **kwargs) -> CreateJobResponse:
    """
    Enqueues the given task while the queue's async context is open.

    Args:
        queue: The Queue to enqueue the task in; entered as an async context manager.
        function: The registered task whose ``enqueue`` method is called.
        args: Positional arguments forwarded to the job.
        kwargs: Keyword arguments forwarded to the job.

    Returns:
        A response holding the id of the enqueued task and ``success=True``.
    """
    async with queue:
        enqueued = await function.enqueue(*args, **kwargs)
        response = CreateJobResponse(job_id=enqueued.id, success=True)
        return response

get_job_result(queue, job_id) async

Returns the result of the specified job

Parameters:

Name Type Description Default
job_id str

is the id of the job

required

Returns:

Type Description
Any

The Task result, i.e. the return of job executed

Source code in src/mmisp/worker/controller/job_controller.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
async def get_job_result(queue: Worker, job_id: str) -> Any:
    """
    Fetches the result of a finished job.

    Args:
      queue: The queue the job belongs to; entered as an async context manager.
      job_id: is the id of the job

    Returns:
      The ``result`` attribute of the object returned by ``queue.result_by_id``,
      i.e. the return of the job executed.

    Raises:
      JobNotFinishedException: If the job's status is not ``TaskStatus.DONE``.
    """
    async with queue:
        status = await queue.status_by_id(job_id)

        # Only a job in state DONE has a result that can be fetched.
        if status == TaskStatus.DONE:
            finished = await queue.result_by_id(job_id)
            return finished.result

        raise JobNotFinishedException

get_job_status(queue, job_id) async

Returns the status of the given job.

Parameters:

Name Type Description Default
job_id str

The ID of the job.

required

Returns:

Type Description
TaskStatus

The status of the job.

Source code in src/mmisp/worker/controller/job_controller.py
10
11
12
13
14
15
16
17
18
19
20
21
async def get_job_status(queue: Worker, job_id: str) -> TaskStatus:
    """
    Looks up the status of the given job on the given queue.

    Args:
      queue: The queue the job belongs to; entered as an async context manager.
      job_id: The ID of the job.

    Returns:
      The status reported by ``queue.status_by_id``.
    """
    async with queue:
        status = await queue.status_by_id(job_id)
        return status