Guided Tour 🥾
This document provides a guided tour of Alsek.
Please feel free to feed the functions. 🙂
Backends
Alsek currently provides 'out of the box' support for Redis.
Redis
from alsek.storage.backends.redis import RedisBackend
# Note: by default, `RedisBackend()` will attempt to
# connect to an instance of Redis running on localhost.
backend = RedisBackend()
Lazy Initialization
The RedisBackend supports lazy initialization. In this mode, an attempt to establish a connection with the database will be deferred until the first time it is absolutely needed (e.g., for a SET, GET, DELETE, etc.). This can be useful in applications such as REST APIs where the backend may not be available precisely when the application boots up. A small example of this mode is provided below.
from redis import Redis
from alsek.storage.backends import LazyClient
from alsek.storage.backends.redis import RedisBackend
# Initialize the backend with LazyClient
lazy_backend = RedisBackend(LazyClient(lambda: Redis()))
# Run an operation.
# This will cause LazyClient to evaluate the
# lambda and return a usable client connection.
lazy_backend.count("queues:math_ops")
Namespaces
Backends store all of their data in a predefined namespace. This is simply a prefix that will be prepended to all "keys" (names) the backend writes to storage.
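For illustration, a minimal sketch is shown below. The namespace parameter name is an assumption about the backend's constructor, so verify it against the API reference.
from alsek.storage.backends.redis import RedisBackend
# Hypothetical: with a custom namespace, a key such as
# "queues:math_ops" would be stored as "my_app:queues:math_ops".
backend = RedisBackend(namespace="my_app")  # `namespace` keyword assumed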
Serializers
Prior to being written into the backend, Alsek requires that data is first serialized. When data is read from the backend, it is deserialized prior to use.
By default, Alsek uses JSON serialization, as implemented in alsek.storage.serialization.JsonSerializer(). However, another approach can be used, provided it is supported by the backend.
To use a different serialization procedure, one must:
- create a new serializer on top of the base Serializer() class, and
- pass the new serializer to the relevant backend at initialization time.
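As a concrete illustration, a minimal pickle-based serializer is sketched below. The forward()/reverse() method names and the serializer keyword argument are assumptions about Alsek's interfaces; consult the base Serializer() class for the actual hooks.
import base64
import pickle
from typing import Any
from alsek.storage.backends.redis import RedisBackend
from alsek.storage.serialization import Serializer
class PickleSerializer(Serializer):
    # `forward()` (serialize) and `reverse()` (deserialize) are
    # assumed hook names; match them to the actual base class.
    def forward(self, obj: Any) -> str:
        return base64.b64encode(pickle.dumps(obj)).decode("utf-8")
    def reverse(self, obj: str) -> Any:
        return pickle.loads(base64.b64decode(obj))
backend = RedisBackend(serializer=PickleSerializer())  # keyword assumed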
Messages
Messages are the "lingua franca" of Alsek. This is because the data they contain can be "understood" by all parts of the library, from brokers and result stores to tasks and worker pools.
Properties
Messages have a lot of helpful functionality, the most notable of which is explored here.
from alsek import Broker, task
from alsek.storage.backends.redis.standard import RedisBackend
broker = Broker(RedisBackend())
@task(broker)
def simple_task() -> int:
return 99
message = simple_task.generate()
First, let's print the message:
from alsek import Message
Message(
task_name='simple_task',
queue='alsek_queue',
args=(),
kwargs={},
metadata=None,
exception_details=None,
result_ttl=None,
uuid='67de0e4c-9816-11eb-8cb0-acde48001122',
progenitor_uuid=None,
retries=0,
timeout=3600000,
created_at='2021-04-08 05:59:16.424000',
updated_at='2021-04-08 05:59:16.424000',
delay=0,
previous_result=None,
previous_message_uuid=None,
callback_message_data=None,
backoff_settings={'algorithm': 'ExponentialBackoff', 'parameters': {'base': 4, 'factor': 10000, 'floor': 60000, 'ceiling': 3600000, 'zero_override': True}},
mechanism='thread',
)
Next, we can take a quick look at the key properties of the message object.
# Dictionary containing all of the data that can be,
# and will be, persisted to the backend by the broker.
message.data
# Output: {'task_name': 'simple_task', ...}
# High-level summary of the message
message.summary
# Output: Message(uuid='67de0e4c-9816-11eb-8cb0-acde48001122', queue='alsek_queue', task='simple_task')
# UTC Timestamp for when the message will next be ready for processing
message.ready_at
# Output: 1617861556424
# Time until the message is ready for processing
message.ttr
# Output: 0
# UUIDs which have descended or will descend from this message
message.descendant_uuids
# Output: None
# Whether the message is ready for processing
message.ready
# Output: True
Brokers
A message broker is responsible for adding and managing tasks on the backend.
In Alsek, the Broker class provides the following main methods:
- exists(): whether a message exists on the backend
- submit(): submit a message to a queue
- retry(): retry a message on a queue
- remove(): remove a message from the backend
- fail(): remove a message and move it to the Dead Letter Queue (DLQ), if enabled
Notably, Broker also exposes:
- ack(): acknowledge the message. (This is a convenience method and is functionally the same as remove().)
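A short, illustrative sketch of these methods is given below, reusing the broker and message from the "Messages" section above. Exact signatures may differ slightly, so consult the API reference.
# Reusing `broker` and `message` from the "Messages" section.
# (Note that `generate()` typically handles submission for you.)
broker.exists(message)   # True if the message is on the backend
broker.retry(message)    # retry the message on its queue
broker.ack(message)      # acknowledge (functionally the same as remove())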
Tasks
In this section we will take a closer look at the capabilities of Alsek tasks.
Note
The original behaviour of a function is preserved after it has been decorated with task. This is illustrated in the example below.
from alsek import task
@task(...)
def add(a: int, b: int) -> int:
return a + b
assert add(1, b=1) == 2 # True
Mechanisms
Tasks can be executed on a Worker Pool using either a 'thread' or 'process' mechanism.
We can specify which mechanism the worker pool should use when we construct the task.
from alsek import task
@task(..., mechanism="thread")
def my_task() -> int:
return 99
While the default mechanism is 'process', threads can be used in cases where lower overhead is desirable, or the task in question is largely I/O bound.
Danger
Many implementations of Python use a Global Interpreter Lock (GIL), including the most common one: CPython. In such implementations, only one thread can do work at any one time within any given Python process. As a consequence, using mechanism="thread" carries the risk of interfering with the threads used by the Worker Pool itself (see below). In the worst case, a task specified with mechanism="thread" may never relinquish the GIL and, as a result, the process in which that thread runs will cease to function (along with any other tasks running in threads in that process).
While this problem is relatively uncommon in practice, it is an important risk to keep in mind when building your application.
Timeouts
All Alsek tasks must include a timeout (in milliseconds), which is used to safeguard against hanging tasks. The default timeout is 3,600,000 milliseconds (1 hour).
Tasks which exceed their timeout will be shut down by the worker pool. If a task is eligible to be retried (see below) against a TimeoutError, it will be. Otherwise, the corresponding message will be failed and deleted.
from alsek import task
@task(..., timeout=90 * 1000) # lower timeout to 90 seconds
def my_task() -> int:
return 99
Warning
Enforcement of timeouts for tasks which use mechanism='thread' is only available for CPython (see below).
Warning
Timeout enforcement for tasks which use mechanism='thread' is not as reliable as it is for tasks which use mechanism='process'. This is because Alsek effectuates timeouts in thread tasks by asynchronously setting a TimeoutError inside them. In order for this error to be raised, the thread in question must first acquire the GIL (see above). If the thread never acquires the GIL, the error will never be raised. Conversely, Alsek implements timeouts for process tasks by directly terminating them, which is generally extremely reliable. The reasons for this dichotomy are beyond Alsek's control and stem from the implementation details of CPython itself.
As above, while this problem is relatively uncommon in practice, it is an important risk to keep in mind when building your application.
Priority
As with timeouts, priority values can be set for each message.
Let's take a look at an example.
from alsek import task
@task(..., queue="my_queue")
def task_a() -> str:
return "A!"
message_1 = task_a.generate(priority=1)
message_2 = task_a.generate(priority=0)
In Alsek, priority is inverted. That is, lower integers correspond to higher priority. Thus, in the example above, message_2 will take priority over message_1.
Note
Alsek implements intra-queue message priority. In other words, priority is enforced within, but not between, queues (which themselves can be prioritized).
Triggers
Alsek supports cron, date, and interval triggers. Let's explore this using the example below.
from alsek import Broker, task
from alsek.storage.backends.redis.standard import RedisBackend
from apscheduler.triggers.interval import IntervalTrigger
broker = Broker(RedisBackend())
@task(broker, trigger=IntervalTrigger(hours=1))
def check_system_usage() -> int:
return 99
The result will be a task of a special type: TriggerTask. Like Task, we can generate an instance of the task by calling generate().
message = check_system_usage.generate()
The task will now be submitted to the broker every hour for as long as the Python process in which it was created is alive.
There are three main ways we can interrupt these kinds of tasks.
First, we can pause the scheduler:
check_system_usage.pause()
(Note that this can be undone by running check_system_usage.resume().)
Second, we can clear the scheduled task:
check_system_usage.clear()
Finally, we can shut down the underlying scheduler:
check_system_usage.shutdown()
Warning
Function parameters are not permitted for tasks which use a trigger.
Message Passing
The message itself will be passed to tasks which include a message parameter.
from alsek import Broker, Message, task
from alsek.storage.backends.redis.standard import RedisBackend
broker = Broker(RedisBackend())
@task(broker)
def my_task(message: Message) -> None: # note: type hints are optional
print(message.uuid)
Warning
The message will not be passed to the task if:
- a "message" key is included in kwargs, e.g., my_task.generate(kwargs={"message": "string"})
- a type hint that does not resolve to Message is used for message.
Callbacks
When a task completes, another task can be automatically triggered through the use of a callback.
To see this, let's contrive two simple tasks: add_1() and print_result():
from alsek import Broker, Message, task
from alsek.storage.backends.redis.standard import RedisBackend
broker = Broker(RedisBackend())
@task(broker)
def add_1(number: int) -> int:
return number + 1
@task(broker)
def print_result(message: Message) -> None:
print(message.previous_result)
In order to make print_result() execute when add_1() completes, we simply need to pass a generated message to callback. It's also advisable to set submit=False so that submission to the broker is deferred until after the first message completes.
add_1.generate(
args=(1,),
callback=print_result.generate(submit=False)
)
As a convenience, we can also use deferred mode, which instructs the next call of generate() to skip submitting the message to the broker.
add_1.generate(
args=(1,),
callback=print_result.defer().generate()
)
Nested
Callbacks of arbitrary depth are also supported.
To see this, let's add another task into the mix.
@task(broker)
def add_1_previous(message: Message) -> int:
return message.previous_result + 1
add_1.generate(
args=(1,),
callback=(
add_1_previous.defer().generate(
callback=add_1_previous.defer().generate(
callback=add_1_previous.defer().generate(
callback=print_result.defer().generate()
)
),
)
),
)
While the code above will "work", it is very difficult to read. A better solution is to use a tuple of messages.
add_1.generate(
args=(1,),
callback=(
add_1_previous.defer().generate(),
add_1_previous.defer().generate(),
add_1_previous.defer().generate(),
print_result.defer().generate()
),
)
Internally, a message nesting procedure will be run against the tuple passed to callback. As a result, the two different approaches to multiple callbacks shown above are functionally identical.
Note
The internal process described above for nesting a flat tuple of callbacks will update the callback_message_data fields in the original messages.
Note
Deferred mode is automatically cancelled by generate() before it returns.
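For instance, a tiny sketch of this behaviour, reusing print_result from the example above:
# Deferred mode only applies to the next generate() call.
m1 = print_result.defer().generate()  # not submitted to the broker
m2 = print_result.generate()          # submitted as usual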
Note
The progenitor for a callback message is considered to be the root callback.
Warning
Each callback message's previous_result and progenitor_uuid fields will be set on the worker pool after successful execution of the previous message, and are not available prior to this.
Danger
While it is valid for a task with a trigger to have callbacks, callbacks should not include tasks with triggers.
Control
If a callback is present for a message, it will be executed by default. However, it is possible to override this behaviour by reasoning about the original message itself, the result of the task, or both.
from typing import Any
from alsek import Message
from alsek.core.task import Task, task
class CustomTask1(Task):
def do_callback(self, message: Message, result: Any) -> bool:
if result > 1:
return True
else:
return False
@task(..., base_task=CustomTask1)
def simple_task() -> int:
return 99
Warning
The do_callback() method is only evaluated for messages which contain a callback.
Pre/Post Ops
The pre_op() and post_op() methods of Task can be used to perform operations before and/or after the function itself executes, respectively. To do this, a new base_task must be created.
from typing import Any
from alsek import Message
from alsek.core.task import Task, task
class CustomTask2(Task):
def pre_op(self, message: Message) -> None:
print(f"About to process {message.summary}!")
def post_op(self, message: Message, result: Any) -> None:
print(f"Processed {message.summary} and got '{result}'!")
@task(..., base_task=CustomTask2)
def simple_task() -> int:
return 99
Retries
The number of times a task will be retried is determined by max_retries by default. In cases where this is not sufficiently sophisticated to determine if a message should be retried, the do_retry() method of the Task class can be overridden.
from alsek import Message
from alsek.core.task import Task, task
class CustomTask3(Task):
def do_retry(self, message: Message, exception: BaseException) -> bool:
if isinstance(exception, ZeroDivisionError):
return False
elif self.max_retries is None:
return True
else:
return message.retries < self.max_retries
@task(..., base_task=CustomTask3)
def simple_task() -> int:
return 99
Backoff
Rather than reprocessing a task immediately after it fails, Alsek uses a backoff procedure. By default, ExponentialBackoff() is used with "sensible" defaults. However, the type of backoff algorithm as well as its parameters are extremely customizable.
from alsek import task
from alsek.core.backoff import (
ConstantBackoff,
ExponentialBackoff,
LinearBackoff,
)
@task(..., backoff=ConstantBackoff(constant=30 * 1000))
def task_a() -> int:
return 99
@task(..., backoff=LinearBackoff(factor=30 * 1000))
def task_b() -> int:
return 99
@task(..., backoff=ExponentialBackoff(ceiling=90 * 1000))
def task_c() -> int:
return 99
Note
Backoff duration is determined by the number of 'incidents'. Here, an incident is a failed attempt to process the message.
Note
Setting backoff=None is functionally equivalent to ConstantBackoff(constant=0, floor=0, ceiling=0, zero_override=True).
Status Tracking
The status of tasks can be tracked using StatusTracker().
from alsek import Broker, StatusTracker, task
from alsek.storage.backends.redis import RedisBackend
backend = RedisBackend("<connection_url>")
broker = Broker(backend)
status_tracker = StatusTracker(backend)
@task(broker, status_tracker=status_tracker)
def sum_n(n: int) -> int:
return int(n * (n + 1) / 2)
message = sum_n.generate(kwargs={"n": 100})
The status can be checked using .get():
status_tracker.get(message)
and can be any one of the following:
<TaskStatus.UNKNOWN: 0>
<TaskStatus.SUBMITTED: 1>
<TaskStatus.RUNNING: 2>
<TaskStatus.RETRYING: 3>
<TaskStatus.FAILED: 4>
<TaskStatus.SUCCEEDED: 5>
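For example, one might poll until a terminal status is reached. A minimal sketch follows; the import path for TaskStatus is an assumption, so adjust it to the actual module.
import time
from alsek.core.status import TaskStatus  # import path assumed
# Poll until the task reaches a terminal state.
while status_tracker.get(message) not in (TaskStatus.FAILED, TaskStatus.SUCCEEDED):
    time.sleep(0.5)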
Note
StatusTracker() can be paired with StatusTrackerIntegrityScanner(), which will periodically scan for message statuses which have become invalid. Specifically, a scan will be performed to check for messages with statuses which are non-terminal (i.e., not TaskStatus.FAILED or TaskStatus.SUCCEEDED) and which no longer exist in the broker. Any messages meeting these criteria will have their status updated to TaskStatus.UNKNOWN. Status information can become corrupt in this way in cases where a worker pool is unable to update the message status before exiting (i.e., in the event of an ungraceful shutdown) and the message is never subsequently retried.
The frequency of status integrity scans can be changed by altering the trigger parameter of StatusTrackerIntegrityScanner().
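A hedged sketch of wiring this up is shown below; the import path and constructor signature are assumptions based on the description above, so verify them against the API reference before use.
from apscheduler.triggers.interval import IntervalTrigger
from alsek.tools import StatusTrackerIntegrityScanner  # import path assumed
# Scan every 10 minutes (constructor parameters assumed).
scanner = StatusTrackerIntegrityScanner(
    status_tracker,
    broker,
    trigger=IntervalTrigger(minutes=10),
)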
Result Storage
Task results can be persisted to a backend using ResultStore().
from typing import Dict
from alsek import Broker, task
from alsek.storage.backends.redis import RedisBackend
from alsek.storage.result import ResultStore
backend = RedisBackend("<connection_url>")
broker = Broker(backend)
result_store = ResultStore(backend)
@task(broker, result_store=result_store)
def valuable_output() -> Dict[str, int]:
return {"a": 1, "b": 2, "c": 3}
Warning
In order for data to be persisted via result_store, it must be of a type supported by the backend's serializer.
Warning
By default, results are automatically deleted once they are fetched. To disable this behavior, set keep=True when invoking get().
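To make this concrete, a short sketch of fetching the stored result is given below. It assumes a worker pool has already processed the message; the timeout parameter (in milliseconds, as used later in this tour) blocks until the result arrives.
message = valuable_output.generate()
# ...once a worker pool has processed the message...
result = result_store.get(message, timeout=30 * 1000)
# {'a': 1, 'b': 2, 'c': 3}
# To read the result without deleting it:
# result = result_store.get(message, keep=True)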
Triggers & Result Storage
We can request result storage for tasks with triggers, just as we did with a standard task above. However, fetching the results of a task requires us to know its uuid. While it is possible to collect this information (e.g., via pre_op() or post_op()), it is often far easier to simply store the progenitor message or, at the least, its uuid. With this information, we can obtain all descendant messages.
from random import randint
from apscheduler.triggers.interval import IntervalTrigger
from alsek import Broker, task
from alsek.storage.backends.redis.standard import RedisBackend
from alsek.storage.result import ResultStore
backend = RedisBackend()
broker = Broker(backend)
result_storage = ResultStore(backend)
@task(broker, result_store=result_storage, trigger=IntervalTrigger(seconds=10))
def harvest_data() -> int:
data = randint(0, 100)
return data
# Start
message = harvest_data.generate()
# Get all of the results as a list. By setting `descendants=True` we
# will also collect data for any descendant messages which have completed.
results = result_storage.get(message, timeout=30 * 1000, descendants=True)
print(results)
# [3, 5, 88, ...]
Note
Metadata for each result can be included by specifying with_metadata=True.
Warning
The order of results when descendants=True is determined by the time at which the data was written to the backend, not when the corresponding task completed. While this difference is usually very small, if this is not appropriate for your application, you must include timestamp information in the output of the task function and re-sort the results accordingly, as sketched below.
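A minimal sketch of that re-sorting approach, reusing the setup above (the task name harvest_data_timestamped is illustrative):
import time
@task(broker, result_store=result_storage, trigger=IntervalTrigger(seconds=10))
def harvest_data_timestamped() -> dict:
    # Emit a timestamp alongside the data so results can be re-sorted.
    return {"data": randint(0, 100), "ts": time.time()}
message = harvest_data_timestamped.generate()
results = result_storage.get(message, timeout=30 * 1000, descendants=True)
ordered = sorted(results, key=lambda r: r["ts"])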
Result Iteration
The ResultPool() class provides an intuitive means of iterating over stored results. To see how, we can define a task, just as we have done several times before, and create an instance of the ResultPool() class.
from alsek import Broker, task
from alsek.storage.backends.redis import RedisBackend
from alsek.storage.result import ResultStore
from alsek.tools import ResultPool
backend = RedisBackend("<connection_url>")
broker = Broker(backend)
result_store = ResultStore(backend)
result_pool = ResultPool(result_store)
@task(broker, result_store=result_store)
def sum_n(n: int) -> int:
return int(n * (n + 1) / 2)
From here we can use the istream() method of result_pool to iterate over message results as they become available.
for message, result in result_pool.istream(
sum_n.generate(kwargs=dict(n=10)),
sum_n.generate(kwargs=dict(n=100)),
sum_n.generate(kwargs=dict(n=1000)),
descendants=False, # enable if any messages contain callbacks
):
print(f"The result of message '{message.uuid}' is {result}.")
Result pools can be used in applications, or for interactive distributed computing.
Note
If your use case requires a guarantee that results will be yielded in the same order in which the messages were provided, use the stream() method instead.
Concurrency
Alsek's concurrency Lock() provides a straightforward way to limit simultaneity across a distributed application to a single task, as shown here:
from alsek import Lock, task
from alsek.storage.backends.redis.standard import RedisBackend
backend = RedisBackend()
@task(...)
def send_data() -> None:
with Lock("send_data", backend=backend) as lock:
if lock.acquire(strict=False):
print("Sending data...")
else:
print("Failed to acquire lock")
Consumers
As their name suggests, consumers pull messages inserted by the broker onto workers. A concurrency lock (similar to what is shown above) is used to ensure that one, and only one, consumer can hold a message at any given time.
Standard use of Alsek does not typically entail direct interaction with consumers, as they are managed by Worker Pools (see below). However, in the interest of completeness, an illustrative example of working with consumers is provided below.
from alsek import Broker
from alsek.core.consumer import Consumer
from alsek.storage.backends.redis import RedisBackend
broker = Broker(RedisBackend())
consumer = Consumer(broker)
for message in consumer.stream():
print(f"Got {message.summary}")
Note
Consumers back off following one or more passes over the backend that do not yield any ready messages. By default, LinearBackoff() is used.
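A hedged sketch of customizing this behaviour is shown below; the backoff keyword argument on Consumer is an assumption based on the note above, so verify it against the actual signature.
from alsek.core.backoff import LinearBackoff
# `backoff` keyword assumed; check the Consumer constructor.
consumer = Consumer(broker, backoff=LinearBackoff(factor=5 * 1000))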
Worker Pools
Alsek provides two distinct worker pool implementations for processing tasks: ThreadWorkerPool and ProcessWorkerPool. Each offers different performance characteristics and scaling capabilities.
Thread Worker Pool
The ThreadWorkerPool uses a hierarchical architecture with process groups that each manage multiple threads:
from alsek import Broker
from alsek.core.worker.thread import ThreadWorkerPool
from alsek.storage.backends.redis import RedisBackend
backend = RedisBackend()
broker = Broker(backend)
pool = ThreadWorkerPool(
tasks=[task_a, task_b], # tasks to process
n_threads=8, # threads per process group
n_processes=4, # maximum number of process groups
)
pool.run()
Key features of the ThreadWorkerPool:
- Elastic Scaling: Automatically creates new process groups as needed and prunes them when they're no longer required
- Hierarchical Design: Each process group manages its own set of threads
- Total Capacity: The maximum number of concurrent tasks is n_threads × n_processes
- Resource Efficiency: Ideal for I/O-bound tasks or when lower overhead is desired
The ThreadWorkerPool will dynamically scale up to n_processes process groups, each managing up to n_threads threads, for a maximum capacity of n_threads × n_processes. When a message needs to be processed, it is assigned to an available process group, which then executes it on one of its threads.
Process Worker Pool
The ProcessWorkerPool uses a direct approach where each task runs in its own dedicated process:
from alsek import Broker
from alsek.core.worker.process import ProcessWorkerPool
from alsek.storage.backends.redis import RedisBackend
backend = RedisBackend()
broker = Broker(backend)
pool = ProcessWorkerPool(
tasks=[task_a, task_b], # tasks to process
n_processes=4, # maximum number of processes
prune_interval=100, # milliseconds between prune scans
)
pool.run()
Key features of the ProcessWorkerPool:
- Process Isolation: Each task runs in its own dedicated process
- Fixed Capacity: Limited to a maximum of n_processes concurrent tasks
- Background Pruning: Uses a background scheduler to periodically prune spent futures
- Resource Safety: Ideal for CPU-bound tasks or when process isolation is necessary
The ProcessWorkerPool will execute each task in its own dedicated process, up to the maximum of n_processes concurrent processes. This offers stronger isolation between tasks but with slightly higher overhead compared to threads.
Performance Considerations
When choosing between worker pool types, consider:
- I/O vs CPU Bound Tasks: For I/O-bound tasks (e.g., network requests, database operations), ThreadWorkerPool often offers better performance. For CPU-bound tasks, ProcessWorkerPool can better utilize multiple cores.
- Memory Usage: ThreadWorkerPool is typically more memory-efficient as threads share memory within their process group.
- Isolation Needs: If tasks need strong isolation from each other, ProcessWorkerPool provides better separation.
- Elasticity: If your workload has variable demand, ThreadWorkerPool's elastic scaling may be more efficient.
- Shutdown Reliability: ProcessWorkerPool offers more robust shutdown logic since it can directly terminate processes, avoiding potential GIL-related issues where a thread might never relinquish control and prevent proper shutdown (see above).
Note
Both worker pool types use timeouts to manage hanging tasks and provide similar retry and backoff capabilities.
Warning
For tasks using mechanism='thread', be aware of Python's Global Interpreter Lock (GIL), which can limit true parallelism within a single process group.
CLI
Alsek's command line interface (CLI) provides an easy way to bring a pool of workers online to process the tasks we have defined.
Alsek offers two types of worker pools:
- thread-pool: uses a hierarchical architecture with process groups that each manage multiple threads
- process-pool: uses a direct approach where each task runs in its own process
Each worker pool relies on a Consumer to pull messages written to the backend by the Broker.
When the worker pool reaches capacity, it will pause the stream of data from the consumer.
This is done to both reduce load on the backend and to allow other worker pools (perhaps
running on different machines) to acquire messages for processing.
Basics
The CLI requires that we inform it where tasks can be found.
To start with a simple case, let's imagine our one and only task is located in the current working directory in a file titled my_tasks.py.
Then, starting a worker pool against this task can be accomplished by running:
# For a process-based worker pool
alsek process-pool my_tasks
# For a thread-based worker pool
alsek thread-pool my_tasks
Nested files
Alternatively, we might have a file of task definitions inside a directory, like this:
my_project
├── __init__.py
├── singletons.py
└── my_tasks.py
Starting a pool with this kind of structure can be accomplished by passing the dot-separated "path" to the file:
alsek process-pool my_project.my_tasks
Recursive
We can also simply specify the directory where the task definitions live, and it will be scanned recursively in order to recover all task definitions.
alsek thread-pool my_project
Advanced options
Alsek's CLI includes several dials to achieve fine-grained control over the worker pool. We won't cover all of them here, but there are at least a few worth highlighting.
Queue Selection
The -qu/--queues option allows you to limit the queues which will be consumed by the worker pool. It can be set using a comma-separated list.
alsek process-pool my_project -qu queue_a
alsek thread-pool my_project -qu queue_a,queue_b,queue_z
Worker Capacity
Thread-based Worker Pools
For thread-based worker pools, you can control both the number of threads per process group and the maximum number of process groups:
# Configure 8 threads per process group
alsek thread-pool my_project --n_threads 8
# Configure a maximum of 4 process groups
alsek thread-pool my_project --n_processes 4
The total maximum capacity of a thread-based worker pool is n_threads × n_processes.
Thread pools support elastic scaling, automatically creating new process groups up to the configured maximum as demand rises, and pruning them when they are no longer required.
Process-based Worker Pools
For process-based worker pools, you can control the maximum number of concurrent processes:
# Configure a maximum of 4 processes
alsek process-pool my_project --n_processes 4
Performance Tuning
Both worker pool types support additional performance tuning options:
# Configure slot wait interval (milliseconds to wait when pool is full)
alsek thread-pool my_project --slot_wait_interval 50
# For process-based worker pools, configure prune interval
alsek process-pool my_project --prune_interval 100
# For thread-based worker pools, wait for thread exit to mark as complete
alsek thread-pool my_project --complete_only_on_thread_exit
Note
The worker pool's Consumer will respect the order in which queues are listed for the -qu/--queues option. If this option is not specified, queues will be consumed in alphabetical order.
Note
Worker pools scale up and down dynamically based on load.
Note
The full documentation for Alsek's CLI can be obtained by running:
alsek --help
Warning
If a worker pool encounters a message which refers to an unknown task, an error will be logged and the message will be failed.