
Reference 📚

Alsek

cli special

cli

Command Line Interface

core special

Core

backoff

Backoff Algorithms

Backoff

Base backoff class.

Parameters:

Name Type Description Default
floor int

minimum backoff in milliseconds

required
ceiling int

maximum backoff in milliseconds

required
zero_override bool

override backoff to zero if the number of incidents is zero.

required
parameters: Dict[str, Optional[int]] property readonly

Parameters of the current instance which uniquely characterize it.

Returns:

Type Description
Dict[str, Optional[int]]

params (dict): backoff parameters

settings: Dict[str, Any] property readonly

Settings of the current algorithm.

Returns:

Type Description
Dict[str, Any]

serialization (BackoffSettingsType): summary of the current algorithm and parameters with sufficient information to reconstruct it.

formula(self, incidents)

Implementation of the formula for computing the backoff.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Type Description
int

int

Source code in alsek/core/backoff.py
@abstractmethod
def formula(self, incidents: int) -> int:
    """Implementation of the formula for computing the backoff.

    Args:
        incidents (int): current number of incidents

    Returns:
        int

    """
    raise NotImplementedError()
get(self, incidents)

Get the backoff.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Type Description
int

backoff (int): backoff in milliseconds

Source code in alsek/core/backoff.py
def get(self, incidents: int) -> int:
    """Get the backoff.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    if self.zero_override and incidents == 0:
        return 0
    return self._clipper(self.formula(incidents))
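
The interplay of `zero_override`, `floor` and `ceiling` in `get()` can be tried out without Alsek installed. The following is a minimal stand-alone sketch, not Alsek's implementation: `_clipper` is not shown on this page, so the `clip` helper below assumes it clamps the formula's output to `[floor, ceiling]` and ignores a bound that is `None`. The names `clip`, `get_backoff` and `linear` are hypothetical.

```python
from typing import Callable, Optional


def clip(value: int, floor: Optional[int], ceiling: Optional[int]) -> int:
    """Clamp ``value`` to [floor, ceiling]; a bound of None is ignored (assumed behaviour)."""
    if floor is not None:
        value = max(value, floor)
    if ceiling is not None:
        value = min(value, ceiling)
    return value


def get_backoff(
    formula: Callable[[int], int],
    incidents: int,
    floor: Optional[int] = None,
    ceiling: Optional[int] = None,
    zero_override: bool = True,
) -> int:
    """Mirror the control flow of ``Backoff.get()`` shown above."""
    if zero_override and incidents == 0:
        return 0
    return clip(formula(incidents), floor, ceiling)


def linear(incidents: int) -> int:
    """Illustrative formula: 1000 ms per incident."""
    return 1000 * incidents


delays = [get_backoff(linear, i, floor=500, ceiling=2500) for i in range(5)]
print(delays)  # [0, 1000, 2000, 2500, 2500]
```

With `zero_override=False` the same call for zero incidents would return the floor (500) under these assumptions, because the formula's output of 0 is clamped upward.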

ConstantBackoff

Constant backoff.

Parameters:

Name Type Description Default
constant int

amount of time (in milliseconds) to backoff.

required
**kwargs Keyword Args

keyword arguments to pass to Backoff

required
parameters: Dict[str, Optional[int]] property readonly

Parameters of the current ConstantBackoff instance which uniquely characterize it.

Returns:

Type Description
Dict[str, Optional[int]]

params (dict): backoff parameters

formula(self, incidents)

Constant backoff formula.

Implements:

\[c\]

where \(c\) is constant.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Type Description
int

backoff (int): backoff in milliseconds

Source code in alsek/core/backoff.py
def formula(self, incidents: int) -> int:
    """Constant backoff formula.

    Implements:

    $$c$$

    where $c$ is `constant`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    return self.constant

ExponentialBackoff

Exponential backoff.

Parameters:

Name Type Description Default
base int

the base of the exponential (milliseconds)

required
factor int

factor to multiply the result by

required
**kwargs Keyword Args

keyword arguments to pass to Backoff

required
parameters: Dict[str, Optional[int]] property readonly

Parameters of the current ExponentialBackoff instance which uniquely characterize it.

Returns:

Type Description
Dict[str, Optional[int]]

params (dict): backoff parameters

formula(self, incidents)

Exponential backoff formula.

Implements:

\[f * (b^{i})\]

where \(f\) is factor, \(b\) is base and \(i\) is the number of incidents.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Type Description
int

backoff (int): backoff in milliseconds

Source code in alsek/core/backoff.py
def formula(self, incidents: int) -> int:
    """Exponential backoff formula.

    Implements:

    $$f * (b^{i})$$

    where $f$ is `factor`, $b$ is `base` and $i$ is the number of `incidents`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    return int(self.factor * (self.base ** incidents))
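
To make the growth rate concrete, here is a small stand-alone sketch that evaluates the same expression outside of Alsek. The function name and the values `base=2` and `factor=100` are illustrative assumptions, and clipping to `floor`/`ceiling` is deliberately left out.

```python
def exponential_backoff(incidents: int, base: int = 2, factor: int = 100) -> int:
    """Evaluate f * (b ** i) with illustrative defaults."""
    return int(factor * (base ** incidents))


schedule = [exponential_backoff(i) for i in range(6)]
print(schedule)  # [100, 200, 400, 800, 1600, 3200]
```

Each additional incident doubles the delay here, which is why a `ceiling` is usually worth setting alongside an exponential formula.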

LinearBackoff

Linear backoff.

Parameters:

Name Type Description Default
factor int

amount of time (in milliseconds) to add to backoff after each retry.

required
**kwargs Keyword Args

keyword arguments to pass to Backoff

required
parameters: Dict[str, Optional[int]] property readonly

Parameters of the current LinearBackoff instance which uniquely characterize it.

Returns:

Type Description
Dict[str, Optional[int]]

params (dict): backoff parameters

formula(self, incidents)

Linear backoff formula.

Implements:

\[f * i\]

where \(f\) is factor and \(i\) is the number of incidents.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Type Description
int

backoff (int): backoff in milliseconds

Source code in alsek/core/backoff.py
def formula(self, incidents: int) -> int:
    """Linear backoff formula.

    Implements:

    $$f * i$$

    where $f$ is `factor` and $i$ is the number of `incidents`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    return int(self.factor * incidents)
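
Since each retry adds a fixed amount, the total time spent waiting across n retries is f * n(n + 1) / 2. A short stand-alone sketch, with a hypothetical `factor` of 250 ms and no `floor`/`ceiling` clipping applied:

```python
def linear_backoff(incidents: int, factor: int = 250) -> int:
    """Evaluate f * i (the factor value is illustrative)."""
    return int(factor * incidents)


retries = 4
per_retry = [linear_backoff(i) for i in range(1, retries + 1)]
total_wait = sum(per_retry)

print(per_retry)   # [250, 500, 750, 1000]
print(total_wait)  # 2500
```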

settings2backoff(settings)

Convert backoff settings to a Backoff instance.

Parameters:

Name Type Description Default
settings Dict[str, Any]

backoff settings of the form {"algorithm": str, "parameters": dict}.

required

Returns:

Type Description
Backoff

backoff (Backoff): a backoff instance

Source code in alsek/core/backoff.py
def settings2backoff(settings: BackoffSettingsType) -> Backoff:
    """Convert backoff settings to a ``Backoff`` instance.

    Args:
        settings (BackoffSettingsType): backoff settings of
            the form ``{"algorithm": str, "parameters": dict}``.

    Returns:
        backoff (Backoff): a backoff instance

    """
    algorithm = _get_algorithm(settings["algorithm"])
    return algorithm(**settings["parameters"])
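
The round trip from an instance to its settings and back can be sketched as follows. This is a stand-alone illustration under stated assumptions: `_get_algorithm()` is private and not shown on this page, so a plain dictionary (`REGISTRY`) stands in for it, and `ConstantSketch` is a hypothetical class rather than Alsek's `ConstantBackoff`.

```python
from typing import Any, Dict, Type


class ConstantSketch:
    """Hypothetical backoff that exposes ``settings`` in the documented form."""

    def __init__(self, constant: int) -> None:
        self.constant = constant

    @property
    def settings(self) -> Dict[str, Any]:
        return {
            "algorithm": type(self).__name__,
            "parameters": {"constant": self.constant},
        }

    def get(self, incidents: int) -> int:
        return self.constant


# Assumed stand-in for the private `_get_algorithm()` lookup.
REGISTRY: Dict[str, Type[ConstantSketch]] = {"ConstantSketch": ConstantSketch}


def settings_to_backoff(settings: Dict[str, Any]) -> ConstantSketch:
    """Look up the class by name, then rebuild it from its parameters."""
    algorithm = REGISTRY[settings["algorithm"]]
    return algorithm(**settings["parameters"])


original = ConstantSketch(constant=750)
rebuilt = settings_to_backoff(original.settings)
print(rebuilt.get(3))  # 750
```

Because the settings are plain dictionaries, they can travel with a serialized message and be turned back into an object only when a delay needs to be computed.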

broker

Broker

Broker

Alsek Broker.

Parameters:

Name Type Description Default
backend Backend

backend for data storage

required
dlq_ttl int

time to live (in milliseconds) for Dead Letter Queue (DLQ). If None, failed messages will not be moved to the DLQ.

required
ack(self, message)

Acknowledge a message by removing it from the data backend.

Parameters:

Name Type Description Default
message Message

a message to acknowledge

required

Returns:

Type Description
None

None

Warning

  • Messages will not be redelivered once acked.
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Acking %s...", message.summary),
    after=lambda input_: log.debug("Acked %s.", input_["message"].summary),
)
exists(self, message)

Determine if the message exists in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
bool

exists (bool): whether or not the message exists.

Source code in alsek/core/broker.py
def exists(self, message: Message) -> bool:
    """Determine if the message exists in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        exists (bool): whether or not the message exists.

    """
    name = self.get_message_name(message)
    return self.backend.exists(name)
fail(self, message)

Acknowledge and fail a message by removing it from the backend. If dlq_ttl is not null, the message will be persisted to the dead letter queue for the prescribed amount of time.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
None

None

Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Failing %s...", message.summary),
    after=lambda input_: log.debug("Failed %s.", input_["message"].summary),
)
get_message_name(self, message)

Get the name for message in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
str

name (str): message-specific name

Source code in alsek/core/broker.py
def get_message_name(self, message: Message) -> str:
    """Get the name for ``message`` in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        name (str): message-specific name

    """
    subnamespace = self.get_subnamespace(message.queue, message.task_name)
    return f"{subnamespace}:messages:{message.uuid}"
get_subnamespace(queue=None, task_name=None) staticmethod

Get the subnamespace for a given queue and (optionally) task_name.

Parameters:

Name Type Description Default
queue Optional[str]

the name of the queue

None
task_name Optional[str]

name of the task

None

Returns:

Type Description
str

subnamespace (str): queue-specific namespace

Exceptions:

Type Description
ValueError

if task_name is provided and queue is not.

Source code in alsek/core/broker.py
@staticmethod
def get_subnamespace(
    queue: Optional[str] = None,
    task_name: Optional[str] = None,
) -> str:
    """Get the subnamespace for a given ``queue``
    and (optionally) ``task_name``.

    Args:
        queue (str, optional): the name of the queue
        task_name (str, optional): name of the task

    Returns:
        subnamespace (str): queue-specific namespace

    Raises:
        ValueError: if ``task_name`` is provided and ``queue`` is not.

    """
    if queue is None and task_name is not None:
        raise ValueError("`queue` must be provided if `task_name` is not None")

    if queue and task_name:
        return f"queues:{queue}:tasks:{task_name}"
    elif queue:
        return f"queues:{queue}"
    else:
        return "queues"
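
The key layout produced by `get_subnamespace()` and `get_message_name()` can be explored with a stand-alone copy of the logic above. The queue name, task name and UUID used here are made up for illustration, and `message_name` is a hypothetical free function that takes plain strings instead of a `Message`.

```python
from typing import Optional


def get_subnamespace(queue: Optional[str] = None, task_name: Optional[str] = None) -> str:
    """Stand-alone copy of the static method shown above."""
    if queue is None and task_name is not None:
        raise ValueError("`queue` must be provided if `task_name` is not None")
    if queue and task_name:
        return f"queues:{queue}:tasks:{task_name}"
    elif queue:
        return f"queues:{queue}"
    return "queues"


def message_name(queue: str, task_name: str, uuid: str) -> str:
    """Same pattern as ``get_message_name()``, using plain strings."""
    return f"{get_subnamespace(queue, task_name)}:messages:{uuid}"


print(get_subnamespace())                        # queues
print(get_subnamespace("emails"))                # queues:emails
print(get_subnamespace("emails", "send"))        # queues:emails:tasks:send
print(message_name("emails", "send", "abc123"))  # queues:emails:tasks:send:messages:abc123
```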
nack(self, message)

Do not acknowledge a message and render it eligible for redelivery.

Parameters:

Name Type Description Default
message Message

a message to not acknowledge

required

Returns:

Type Description
None

None

Source code in alsek/core/broker.py
@magic_logger(  # noqa
    before=lambda message: log.debug("Nacking %s...", message.summary),
    after=lambda input_: log.debug("Nacked %s.", input_["message"].summary),
)
remove(self, message)

Remove a message from the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
None

None

Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Removing %s...", message.summary),
    after=lambda input_: log.debug("Removed %s.", input_["message"].summary),
)
retry(self, message)

Retry a message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
None

None

Warning

  • This method will mutate message by incrementing its retry count.
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Retrying %s...", message.summary),
)
def retry(self, message: Message) -> None:
    """Retry a message.

    Args:
        message (Message): an Alsek message

    Returns:
        None

    Warning:
        * This method will mutate ``message`` by incrementing it.

    """
    if not self.exists(message):
        raise MessageDoesNotExistsError(
            f"Message '{message.uuid}' not found in backend"
        )

    message.increment()
    self.backend.set(self.get_message_name(message), value=message.data)
    self.nack(message)
    log.info(
        "Retrying %s in %s ms...",
        message.summary,
        format(message.get_backoff_duration(), ","),
    )
submit(self, message, ttl=604800000)

Submit a message for processing.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
ttl int

time to live for the submitted message in milliseconds

604800000

Returns:

Type Description
None

None

Exceptions:

Type Description
MessageAlreadyExistsError

if the message already exists

Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Submitting %s...", message.summary),
    after=lambda input_: log.debug("Submitted %s.", input_["message"].summary),
)

concurrency

Concurrency

Lock

Distributed mutual exclusion (MUTEX) lock.

Parameters:

Name Type Description Default
name str

name of the lock

required
backend Backend

backend for data storage

required
ttl int

time to live in milliseconds. If None, the lock will not expire automatically.

required
auto_release bool

if True automatically release the lock on context exit.

required

Warning

  • Locks are global and do not consider queues, unless included in name.
  • When used as a context manager, the lock is not automatically acquired. Lock acquisition requires calling acquire().

Examples:

>>> from alsek import Lock
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>>
>>> backend = DiskCacheBackend()
>>>
>>> with Lock("mutex", backend=backend) as lock:
...     if lock.acquire(strict=False):
...         print("Acquired lock.")
...     else:
...         print("Did not acquire lock.")
held: bool property readonly

If the lock is held by the current host.

holder: Optional[str] property readonly

Name of the host that currently holds the lock, if any.

long_name: str property readonly

Subnamespace for the lock.

__enter__(self) special

Enter the context. The lock is not acquired automatically; call acquire() to acquire it.

Returns:

Type Description
Lock

lock (Lock): underlying lock object.

Source code in alsek/core/concurrency.py
def __enter__(self) -> Lock:
    """Enter the context. The lock is not acquired automatically.

    Returns:
        lock (Lock): underlying lock object.

    """
    return self
__exit__(self, exc_type, exc_val, exc_tb) special

Exit the context. If auto_release is enabled, the lock will be released.

Parameters:

Name Type Description Default
exc_type Optional[Type[BaseException]]

type of any exception from within the context

required
exc_val Optional[BaseException]

value of any exception from within the context

required
exc_tb Optional[TracebackType]

the traceback from the context

required

Returns:

Type Description
None

None

Source code in alsek/core/concurrency.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Exit the context. If ``auto_release`` is enabled,
     the lock will be released.

    Args:
        exc_type (Type[BaseException], optional): type of any exception
            from within the context
        exc_val (BaseException, optional): value of any exception from
            within the context
        exc_tb (TracebackType, optional): the traceback from the context

    Returns:
        None

    """
    if self.auto_release:
        self.release()
acquire(self, strict=True)

Try to acquire the lock.

Parameters:

Name Type Description Default
strict bool

if True, return False if the lock is already held by the current host.

True

Returns:

Type Description
bool

acquired (bool): True if the lock was acquired by this call or, when strict is False, if it is already held by the current host.

Warning

  • With strict=True, True is only returned if execution of this method resulted in acquisition of the lock. This means that False will be returned if the current host already holds the lock.
Source code in alsek/core/concurrency.py
def acquire(self, strict: bool = True) -> bool:
    """Try to acquire the lock.

    Args:
        strict (bool): if ``True``, return ``False`` if
            the lock is already held by the current host.

    Returns:
        acquired (bool): ``True`` if the lock was acquired
            by this call or, when ``strict`` is ``False``,
            if it is already held by the current host.

    Warning:
        * With ``strict=True``, ``True`` is only returned if
          execution of this method resulted in acquisition of
          the lock. This means that ``False`` will be returned
          if the current host already holds the lock.

    """
    if self.held:
        return not strict

    try:
        self.backend.set(self.long_name, value=gethostname(), nx=True, ttl=self.ttl)
        return True
    except KeyError:
        return False
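
The effect of `strict` is easiest to see with two hosts competing for the same name. The sketch below is a stand-alone approximation: `MemoryBackend` is a hypothetical in-memory stand-in whose `set(..., nx=True)` raises `KeyError` when the name already exists (matching the exception caught above), the host is passed in explicitly instead of coming from `gethostname()`, and TTL handling is omitted.

```python
from typing import Dict, Optional


class MemoryBackend:
    """Hypothetical in-memory stand-in for an Alsek backend."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set(self, name: str, value: str, nx: bool = False) -> None:
        # With nx=True, refuse to overwrite an existing name.
        if nx and name in self._data:
            raise KeyError(f"Name '{name}' already exists")
        self._data[name] = value

    def get(self, name: str) -> Optional[str]:
        return self._data.get(name)


def acquire(backend: MemoryBackend, name: str, host: str, strict: bool = True) -> bool:
    """Follow the same branches as ``Lock.acquire()`` shown above."""
    if backend.get(name) == host:  # already held by this host
        return not strict
    try:
        backend.set(name, value=host, nx=True)
        return True
    except KeyError:  # held by a different host
        return False


backend = MemoryBackend()
results = [
    acquire(backend, "mutex", host="host-a"),                # fresh acquisition
    acquire(backend, "mutex", host="host-a"),                # strict, already held
    acquire(backend, "mutex", host="host-a", strict=False),  # lenient, already held
    acquire(backend, "mutex", host="host-b", strict=False),  # held by another host
]
print(results)  # [True, False, True, False]
```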
release(self)

Release the lock.

Returns:

Type Description
bool

released (bool): whether or not the lock was found and released.

Source code in alsek/core/concurrency.py
def release(self) -> bool:
    """Release the lock.

    Returns:
        released (bool): whether or not the lock was
            found and released.

    """
    if self.held:
        self.backend.delete(self.long_name, missing_ok=True)
        return True
    else:
        return False

consumer

Consumer

Consumer

Tool for consuming messages generated by the broker.

Parameters:

Name Type Description Default
broker Broker

an Alsek broker

required
subset List[str], Dict[str, List[str]]

subset of messages to consume. Must be one of the following:

  • None: consume messages from all queues and tasks
  • list: a list of queues of the form ["queue_a", "queue_b", "queue_c", ...]
  • dict: a dictionary of queues and tasks of the form {"queue_a": ["task_name_a", "task_name_b", "task_name_c", ...], ...}

required
backoff Backoff

backoff to use in response to passes over the backend which did not yield any actionable messages.

required

Notes

  • If subset is a list or dict, queue priority is derived from the order of the items. Items which appear earlier are given higher priority.
  • If subset is a dict, task priority is derived from the order of task names in the value associated with each key (queue).

Warning

  • If subset is of type dict, task names not included in any of the values will be ignored.
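
One way to picture the priority rules is to expand each form of `subset` into an ordered list of backend subnamespaces, highest priority first. The helper below is a hypothetical sketch, not the consumer's actual implementation (which is not shown on this page); it reuses the `queues:{queue}:tasks:{task_name}` key layout documented for `Broker.get_subnamespace()` and relies on Python dictionaries preserving insertion order.

```python
from typing import Dict, List, Optional, Union

Subset = Optional[Union[List[str], Dict[str, List[str]]]]


def ordered_subnamespaces(subset: Subset) -> List[str]:
    """Expand ``subset`` into subnamespaces, earliest (highest priority) first."""
    if subset is None:
        return ["queues"]  # everything
    if isinstance(subset, list):
        return [f"queues:{queue}" for queue in subset]
    return [
        f"queues:{queue}:tasks:{task_name}"
        for queue, task_names in subset.items()
        for task_name in task_names
    ]


print(ordered_subnamespaces(None))
print(ordered_subnamespaces(["queue_a", "queue_b"]))
print(ordered_subnamespaces({"queue_b": ["task_2", "task_1"], "queue_a": ["task_3"]}))
```

In the dictionary case, `queue_b` outranks `queue_a` because it appears first, and within `queue_b`, `task_2` outranks `task_1`.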
stream(self)

Generate a stream of messages to process from the data backend.

Returns:

Type Description
Iterable[alsek.core.message.Message]

stream (Iterable[Message]): an iterable of messages to process

Source code in alsek/core/consumer.py
def stream(self) -> Iterable[Message]:
    """Generate a stream of messages to process from
    the data backend.

    Returns:
        stream (Iterable[Message]): an iterable of messages to process

    """
    while not self.stop_signal.received:
        for message in self._poll():
            yield message
        self.stop_signal.wait(self.backoff.get(self._empty_passes))
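
The loop above follows a common poll-and-back-off pattern, which can be sketched with only the standard library. Everything here is an assumption made for illustration: a `threading.Event` stands in for the stop signal, `poll` and `backoff_ms` are hypothetical callables, and the count of empty passes is tracked inside the loop (how Alsek maintains `_empty_passes` is not shown on this page).

```python
import threading
from typing import Callable, Iterator, List


def stream(
    poll: Callable[[], List[str]],
    backoff_ms: Callable[[int], int],
    stop: threading.Event,
) -> Iterator[str]:
    """Yield polled items until ``stop`` is set, sleeping longer after empty passes."""
    empty_passes = 0
    while not stop.is_set():
        messages = poll()
        empty_passes = 0 if messages else empty_passes + 1
        yield from messages
        # Event.wait() takes seconds; it returns early if ``stop`` is set.
        stop.wait(backoff_ms(empty_passes) / 1000)


stop = threading.Event()
batches = [["a", "b"], [], ["c"]]


def poll() -> List[str]:
    batch = batches.pop(0)
    if not batches:
        stop.set()  # request shutdown once the demo data runs out
    return batch


consumed = list(stream(poll, backoff_ms=lambda n: min(10 * n, 50), stop=stop))
print(consumed)  # ['a', 'b', 'c']
```

Waiting on the stop signal rather than calling `time.sleep()` lets a shutdown request interrupt the backoff immediately.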

futures

Futures

ProcessTaskFuture

Future for task execution in a separate process.

Parameters:

Name Type Description Default
task Task

a task to perform

required
message Message

a message to run task against

required
patience int

time to wait (in milliseconds) after issuing a SIGTERM signal to the process at shutdown. If the process is still active after this time, a SIGKILL will be issued.

required
complete: bool property readonly

Whether or not the task has finished.

stop(self, exception)

Stop the future.

Returns:

Type Description
None

None

Source code in alsek/core/futures.py
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Returns:
        None

    """
    if self._process.ident is None:  # type: ignore
        log.error(
            "Process task future for %s did not start.",
            self.message.summary,
        )
        return None

    self._shutdown()
    if self._wrapper_exit_queue.empty():
        self._wrapper_exit_queue.put(1)
        try:
            raise exception(f"Stopped process {self._process.ident}")  # type: ignore
        except BaseException as error:
            log.error("Error processing %s.", self.message.summary, exc_info=True)
            _retry_future_handler(self.task, self.message, exception=error)

TaskFuture

Future for background task execution.

Parameters:

Name Type Description Default
task Task

a task to perform

required
message Message

a message to run task against

required
complete: bool property readonly

Whether or not the task has finished.

time_limit_exceeded: bool property readonly

Whether or not task has been running longer than the allowed time window.

stop(self, exception)

Stop the future.

Parameters:

Name Type Description Default
exception Type[BaseException]

exception type to raise.

required

Returns:

Type Description
None

None

Source code in alsek/core/futures.py
@abstractmethod
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None

    """
    raise NotImplementedError()

ThreadTaskFuture

Future for task execution in a separate thread.

Parameters:

Name Type Description Default
task Task

a task to perform

required
message Message

a message to run task against

required
complete: bool property readonly

Whether or not the task has finished.

stop(self, exception)

Stop the future.

Parameters:

Name Type Description Default
exception Type[BaseException]

exception type to raise.

required

Returns:

Type Description
None

None

Source code in alsek/core/futures.py
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None

    """
    if self._thread.ident is None:
        log.error(
            "Thread task future for %s did not start.",
            self.message.summary,
        )
        return None
    elif python_implementation() != "CPython":
        log.error(
            f"Unable to raise exception {exception} in thread {self._thread.ident}. "
            f"Unsupported platform '{python_implementation()}'."
        )
        return None

    thread_raise(self._thread.ident, exception=exception)
    if not self._wrapper_exit:
        self._wrapper_exit = True
        _retry_future_handler(
            self.task,
            message=self.message,
            exception=exception(f"Stopped thread {self._thread.ident}"),
        )

message

Message

Message

Alsek Message.

Parameters:

Name Type Description Default
task_name str

the name of the task for which the message is intended

required
queue str

the queue for which the message was intended. If None the default queue will be set.

required
args list, tuple

positional arguments to pass to the task's function during the execution of op()

required
kwargs dict

keyword arguments to pass to the task's function during the execution of op()

required
metadata dict

a dictionary of user-defined message metadata. This can store any data types supported by the backend's serializer.

required
result_ttl int

time to live (in milliseconds) for the result in the result store. If a result store is provided and this parameter is None, the result will be persisted indefinitely.

required
uuid str

universal unique identifier for the message. If None, one will be generated automatically.

required
progenitor_uuid str

universal unique identifier for the message from which this message descended. (This field is only set for tasks with triggers and/or callbacks.)

required
retries int

number of retries

required
timeout int

the maximum amount of time (in milliseconds) a task is permitted to run against this message.

required
created_at int

UTC timestamp (in milliseconds) for when the message was created

required
updated_at int

UTC timestamp (in milliseconds) for when the message was last updated

required
delay int

delay before the message becomes ready

required
previous_result any

the output of any previously executed task. (This will only be non-null in cases where callbacks are used.)

required
previous_message_uuid str

universal unique identifier for the preceding message. (This will only be non-null in cases where callbacks are used.)

required
callback_message_data dict

data to construct a new message as part of a callback operation

required
backoff_settings dict

parameters to control backoff. Expected to be of the form {"algorithm": str, "parameters": dict}.

required
mechanism str

mechanism for executing the task. Must be either "process" or "thread".

required

Notes

  • While not recommended, timeout can be disabled, in effect, by setting it to a very large integer.
data: Dict[str, Any] property readonly

Underlying message data.

descendant_uuids: Optional[List[str]] property readonly

A list of uuids which have descended, or will descend, from this message.

ready: bool property readonly

If the message is currently ready for processing.

ready_at: int property readonly

Timestamp denoting when the message will be ready for processing.

summary: str property readonly

High-level summary of the message object.

ttr: int property readonly

Time to ready in milliseconds.

clone(self)

Create an exact copy of the current message.

Returns:

Type Description
Message

clone (Message): the cloned message

Source code in alsek/core/message.py
def clone(self) -> Message:
    """Create an exact copy of the current message.

    Returns:
        clone (Message): the cloned message

    """
    return Message(**deepcopy(self.data))
duplicate(self, uuid=None)

Create a duplicate of the current message, changing only uuid.

Parameters:

Name Type Description Default
uuid Optional[str]

universal unique identifier for the new message. If None, one will be generated automatically.

None

Returns:

Type Description
Message

duplicate_message (Message): the duplicate message

Warning

  • Linked locks are not conserved
Source code in alsek/core/message.py
def duplicate(self, uuid: Optional[str] = None) -> Message:
    """Create a duplicate of the current message, changing only ``uuid``.

    Args:
        uuid (str, optional): universal unique identifier for the new message.
            If ``None``, one will be generated automatically.

    Returns:
        duplicate_message (Message): the duplicate message

    Warning:
        * Linked locks are not conserved

    """
    return self.clone().update(uuid=uuid or _make_uuid())
get_backoff_duration(self)

Get the amount of time to backoff (wait) before the message is eligible for processing again, should it fail.

Returns:

Type Description
int

duration (int): duration of the backoff in milliseconds

Source code in alsek/core/message.py
def get_backoff_duration(self) -> int:
    """Get the amount of time to backoff (wait)
    before the message is eligible for processing again,
    should it fail.

    Returns:
        duration (int): duration of the backoff in milliseconds

    """
    return settings2backoff(self.backoff_settings).get(self.retries)
increment(self)

Update a message by increasing the number of retries.

Returns:

Type Description
Message

message (Message): the updated message

Notes

  • updated_at will be updated to the current time.

Warning

  • Changes are not automatically persisted to the backend.
Source code in alsek/core/message.py
def increment(self) -> Message:
    """Update a message by increasing the number
    of retries.

    Returns:
        message (Message): the updated message

    Notes:
        * ``updated_at`` will be updated to the
           current time.

    Warning:
        * Changes are *not* automatically persisted to the backend.

    """
    return self.update(retries=self.retries + 1, updated_at=utcnow_timestamp_ms())
update(self, **data)

Update the data in the current message.

Parameters:

Name Type Description Default
**data Any

key value pairs of data to update

{}

Returns:

Type Description
Message

updated_message (Message): the updated message

Warning

  • This method operates 'in place'. To avoid changing the current message, first call .clone(), e.g., message.clone().update(...).
  • Changes are not automatically persisted to the backend.
Source code in alsek/core/message.py
def update(self, **data: Any) -> Message:
    """Update the ``data`` in the current message.

    Args:
        **data (Keyword Args): key value pairs of
            data to update

    Returns:
        updated_message (Message): the updated message

    Warning:
        * This method operates 'in place'. To avoid changing the current
          message, first call ``.clone()``, e.g., ``message.clone().update(...)``.
        * Changes are *not* automatically persisted to the backend.

    """
    for k, v in data.items():
        if k in self.data:
            setattr(self, k, v)
        else:
            raise KeyError(f"Unsupported key '{k}'")
    return self
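
The difference between updating in place and updating a clone can be demonstrated with a reduced stand-in. `MessageSketch` is a hypothetical class with only two fields; it copies the `clone()` and `update()` logic shown on this page but is not Alsek's `Message`.

```python
from copy import deepcopy
from typing import Any, Dict


class MessageSketch:
    """Hypothetical two-field stand-in for ``Message``."""

    def __init__(self, task_name: str, retries: int = 0) -> None:
        self.task_name = task_name
        self.retries = retries

    @property
    def data(self) -> Dict[str, Any]:
        return {"task_name": self.task_name, "retries": self.retries}

    def clone(self) -> "MessageSketch":
        return MessageSketch(**deepcopy(self.data))

    def update(self, **data: Any) -> "MessageSketch":
        for k, v in data.items():
            if k in self.data:
                setattr(self, k, v)
            else:
                raise KeyError(f"Unsupported key '{k}'")
        return self


original = MessageSketch("send_email")
copied = original.clone().update(retries=1)  # original is left untouched
same = original.update(retries=5)            # mutates and returns the original

print(copied.retries, original.retries, same is original)  # 1 5 True
```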

status

Status Tracking

StatusTracker

Alsek Status Tracker.

Parameters:

Name Type Description Default
broker Broker

broker used by tasks.

required
ttl int

time to live (in milliseconds) for the status

required
integrity_scan_trigger CronTrigger, DateTrigger, IntervalTrigger

trigger which determines how often to scan for messages with non-terminal statuses (i.e., statuses other than TaskStatus.FAILED or TaskStatus.SUCCEEDED) that no longer exist in the broker. Entries which meet these criteria will have their status set to TaskStatus.UNKNOWN.

required
delete(self, message, check=True)

Delete the status of message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
check bool

check that it is safe to delete the status. This is done by ensuring that the current status of message is terminal (i.e., TaskStatus.FAILED or TaskStatus.SUCCEEDED).

True

Returns:

Type Description
None

None

Exceptions:

Type Description
ValidationError

if check is True and the status of message is not TaskStatus.FAILED or TaskStatus.SUCCEEDED.

Source code in alsek/core/status.py
def delete(self, message: Message, check: bool = True) -> None:
    """Delete the status of ``message``.

    Args:
        message (Message): an Alsek message
        check (bool): check that it is safe to delete the status.
            This is done by ensuring that the current status of ``message``
            is terminal (i.e., ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``).

    Returns:
        None

    Raises:
        ValidationError: if ``check`` is ``True`` and the status of
            ``message`` is not ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``.

    """
    if check and self.get(message) not in TERMINAL_TASK_STATUSES:
        raise ValidationError(f"Message '{message.uuid}' in a non-terminal state")
    self._backend.delete(self.get_storage_name(message), missing_ok=False)
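
The guard in `delete()` can be illustrated with a dictionary acting as the status store. This is a stand-alone sketch under stated assumptions: `StatusSketch` is a hypothetical enum containing only some of the statuses named on this page, a plain `dict` replaces the backend, and `ValueError` stands in for Alsek's `ValidationError`.

```python
from enum import Enum
from typing import Dict


class StatusSketch(Enum):
    """Hypothetical subset of task statuses."""

    SUBMITTED = 1
    FAILED = 2
    SUCCEEDED = 3


TERMINAL = {StatusSketch.FAILED, StatusSketch.SUCCEEDED}


def delete_status(store: Dict[str, StatusSketch], uuid: str, check: bool = True) -> None:
    """Delete a status, refusing by default if it is not terminal."""
    if check and store[uuid] not in TERMINAL:
        raise ValueError(f"Message '{uuid}' in a non-terminal state")
    del store[uuid]  # raises KeyError if absent, similar to missing_ok=False


store = {"m1": StatusSketch.SUCCEEDED, "m2": StatusSketch.SUBMITTED}
delete_status(store, "m1")  # terminal, so deletion is allowed

try:
    delete_status(store, "m2")  # non-terminal, so the check blocks it
    blocked = False
except ValueError:
    blocked = True

delete_status(store, "m2", check=False)  # explicit opt-out of the check
print(blocked, store)  # True {}
```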
exists(self, message)

Check if a status for message exists in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
bool

bool

Source code in alsek/core/status.py
def exists(self, message: Message) -> bool:
    """Check if a status for ``message`` exists in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        bool

    """
    return self._backend.exists(self.get_storage_name(message))
get(self, message)

Get the status of message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
TaskStatus

status (TaskStatus): the status of message

Source code in alsek/core/status.py
def get(self, message: Message) -> TaskStatus:
    """Get the status of ``message``.

    Args:
        message (Message): an Alsek message

    Returns:
        status (TaskStatus): the status of ``message``

    """
    status_name = self._backend.get(self.get_storage_name(message))
    return TaskStatus[status_name]
set(self, message, status)

Set a status for message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
status TaskStatus

a status to set

required

Returns:

Type Description
None

None

Source code in alsek/core/status.py
def set(self, message: Message, status: TaskStatus) -> None:
    """Set a ``status`` for ``message``.

    Args:
        message (Message): an Alsek message
        status (TaskStatus): a status to set

    Returns:
        None

    """
    self._backend.set(
        self.get_storage_name(message),
        value=status.name,
        ttl=self.ttl if status == TaskStatus.SUBMITTED else None,
    )
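
The status methods above store a status by its enum member name and look it up again by name. The following is a minimal sketch of that round trip, not the library's implementation: `InMemoryStatusTracker`, the dict store, the three-member `TaskStatus` stand-in and the use of `ValueError` in place of `ValidationError` are all assumptions made for illustration.

```python
from enum import Enum
from typing import Dict, Optional, Tuple


class TaskStatus(Enum):
    # Stand-in enum limited to the members named in this reference.
    SUBMITTED = 1
    FAILED = 2
    SUCCEEDED = 3


TERMINAL = {TaskStatus.FAILED, TaskStatus.SUCCEEDED}


class InMemoryStatusTracker:
    """Hypothetical stand-in for a status tracker, backed by a dict."""

    def __init__(self, ttl: Optional[int] = 60_000) -> None:
        self.ttl = ttl
        # key -> (status name, ttl in milliseconds or None)
        self._store: Dict[str, Tuple[str, Optional[int]]] = {}

    def set(self, uuid: str, status: TaskStatus) -> None:
        # Store the member *name*; only SUBMITTED entries carry a TTL.
        ttl = self.ttl if status == TaskStatus.SUBMITTED else None
        self._store[uuid] = (status.name, ttl)

    def get(self, uuid: str) -> TaskStatus:
        status_name, _ = self._store[uuid]
        return TaskStatus[status_name]  # look the member up by name

    def delete(self, uuid: str, check: bool = True) -> None:
        # ValueError stands in for the library's ValidationError.
        if check and self.get(uuid) not in TERMINAL:
            raise ValueError(f"Message '{uuid}' in a non-terminal state")
        del self._store[uuid]


tracker = InMemoryStatusTracker()
tracker.set("abc", TaskStatus.SUBMITTED)
submitted_ttl = tracker._store["abc"][1]
tracker.set("abc", TaskStatus.SUCCEEDED)
final_status = tracker.get("abc")
final_ttl = tracker._store["abc"][1]
tracker.delete("abc")  # allowed: SUCCEEDED is terminal
```

Storing the name rather than the member keeps the persisted value a plain string, which any serializer can handle.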

TaskStatus

Alsek task statuses.

task

Task

Task

Alsek Task.

Parameters:

Name Type Description Default
function callable

function to use for the main operation

required
broker Broker

an Alsek broker

required
name str

the name of the task. If None, the class name will be used.

required
queue str

the name of the queue to generate the task on. If None, the default queue will be used.

required
priority int

priority of the task. Tasks with lower values will be executed before tasks with higher values.

required
timeout int

the maximum amount of time (in milliseconds) this task is permitted to run.

required
max_retries int

maximum number of allowed retries

required
backoff Backoff

backoff algorithm and parameters to use when computing delay between retries

required
result_store ResultStore

store for persisting task results

required
status_tracker StatusTracker

store for persisting task statuses

required
mechanism str

mechanism for executing the task. Must be either "process" or "thread".

required

Notes

  • do_retry() can be overridden in cases where max_retries is not sufficiently complex to determine if a retry should occur.

Warning

  • Timeouts are not supported for mechanism='thread' on Python implementations other than CPython.
deferred: bool property readonly

Whether or not deferred mode is currently enabled.

name: str property readonly

Name of the task.

cancel_defer(self)

Cancel "deferred" mode.

Returns:

Type Description
Task

task (Task): the current task

Source code in alsek/core/task.py
def cancel_defer(self) -> Task:
    """Cancel "deferred" mode.

    Returns:
        task (Task): the current task

    """
    self._deferred = False
    return self
defer(self)

Enter "deferred" mode.

Do not submit the next message created by generate() to the broker.

Returns:

Type Description
Task

task (Task): the current task

Warning

  • Deferred mode is automatically cancelled by generate() prior to it returning.
Source code in alsek/core/task.py
def defer(self) -> Task:
    """Enter "deferred" mode.

    Do not submit the next message created by ``generate()``
    to the broker.

    Returns:
        task (Task): the current task

    Warning:
        * Deferred mode is automatically cancelled by ``generate()``
          prior to it returning.

    """
    self._deferred = True
    return self
do_callback(self, message, result)

Whether or not to submit the callback provided.

Parameters:

Name Type Description Default
message Message

message with the callback

required
result Any

output of op()

required

Returns:

Type Description
bool

bool

Warning

  • If the task message does not have a callback, this method will not be invoked.
Source code in alsek/core/task.py
def do_callback(self, message: Message, result: Any) -> bool:  # noqa
    """Whether or not to submit the callback provided.

    Args:
        message (Message): message with the callback
        result (Any): output of ``op()``

    Returns:
        bool

    Warning:
        * If the task message does not have a callback,
          this method will *not* be invoked.

    """
    return True
do_retry(self, message, exception)

Whether or not a failed task should be retried.

Parameters:

Name Type Description Default
message Message

message which failed

required
exception BaseException

the exception which was raised

required

Returns:

Type Description
bool

bool

Source code in alsek/core/task.py
def do_retry(self, message: Message, exception: BaseException) -> bool:  # noqa
    """Whether or not a failed task should be retried.

    Args:
        message (Message): message which failed
        exception (BaseException): the exception which was raised

    Returns:
        bool

    """
    return self.max_retries is None or message.retries < self.max_retries
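
As the notes on `Task` suggest, `do_retry()` can be overridden when a retry count alone is not enough. The sketch below shows one way that might look. `FakeMessage`, `BaseTask` and `NoRetryOnValueError` are hypothetical stand-ins, not Alsek classes; only the default rule (`max_retries is None or message.retries < max_retries`) is taken from the listing above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in exposing only the ``retries`` counter."""

    retries: int = 0


class BaseTask:
    """Stand-in carrying the default retry rule shown above."""

    def __init__(self, max_retries: Optional[int] = 3) -> None:
        self.max_retries = max_retries

    def do_retry(self, message: FakeMessage, exception: BaseException) -> bool:
        return self.max_retries is None or message.retries < self.max_retries


class NoRetryOnValueError(BaseTask):
    """Never retry a ValueError; otherwise defer to the default rule."""

    def do_retry(self, message: FakeMessage, exception: BaseException) -> bool:
        if isinstance(exception, ValueError):
            return False
        return super().do_retry(message, exception)


task = NoRetryOnValueError(max_retries=3)
retry_on_timeout = task.do_retry(FakeMessage(retries=1), TimeoutError())
retry_on_value_error = task.do_retry(FakeMessage(retries=1), ValueError())
retry_when_exhausted = task.do_retry(FakeMessage(retries=3), TimeoutError())
```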
execute(self, message)

Execute the task against a message.

Parameters:

Name Type Description Default
message Message

message to process

required

Returns:

Type Description
Any

result (Any): output of op()

Source code in alsek/core/task.py
def execute(self, message: Message) -> Any:
    """Execute the task against a message.

    Args:
        message (Message): message to process

    Returns:
        result (Any): output of ``op()``

    """
    self.pre_op(message)
    result = self.op(message)
    self.post_op(message, result=result)
    return result
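
`execute()` runs `pre_op()`, `op()` and `post_op()` in that order, so subclasses can hook in before and after the main operation. A minimal sketch of that pattern follows. `SketchTask`, `AuditedTask` and the dict-shaped message are assumptions made for illustration rather than Alsek types.

```python
from typing import Any, Callable, Dict, List


class SketchTask:
    """Hypothetical stand-in with the same three-step execute()."""

    def __init__(self, function: Callable[..., Any]) -> None:
        self.function = function

    def pre_op(self, message: Dict[str, Any]) -> None:
        pass  # no-op by default

    def op(self, message: Dict[str, Any]) -> Any:
        return self.function(*message["args"], **message["kwargs"])

    def post_op(self, message: Dict[str, Any], result: Any) -> None:
        pass  # no-op by default

    def execute(self, message: Dict[str, Any]) -> Any:
        self.pre_op(message)
        result = self.op(message)
        self.post_op(message, result=result)
        return result


class AuditedTask(SketchTask):
    """Records which hooks ran, and in what order."""

    def __init__(self, function: Callable[..., Any]) -> None:
        super().__init__(function)
        self.events: List[str] = []

    def pre_op(self, message: Dict[str, Any]) -> None:
        self.events.append("pre")

    def post_op(self, message: Dict[str, Any], result: Any) -> None:
        self.events.append(f"post:{result}")


task = AuditedTask(lambda a, b: a + b)
result = task.execute({"args": (2, 3), "kwargs": {}})
```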
generate(self, args=None, kwargs=None, metadata=None, result_ttl=None, uuid=None, timeout_override=None, delay=None, previous_result=None, callback=None, submit=True, **options)

Generate an instance of the task for processing.

This method generates a new message for the task and submits it to the broker.

Parameters:

Name Type Description Default
args Optional[Union[List[Any], Tuple[Any, ...]]]

positional arguments to pass to function

None
kwargs Optional[Dict[Any, Any]]

keyword arguments to pass to function

None
metadata Optional[Dict[Any, Any]]

a dictionary of user-defined message metadata. This can store any data types supported by the backend's serializer.

None
result_ttl Optional[int]

time to live (in milliseconds) for the result in the result store. If a result store is provided and this parameter is None, the result will be persisted indefinitely.

None
uuid Optional[str]

universal unique identifier for the message. If None, one will be generated automatically.

None
timeout_override Optional[int]

override the default maximum runtime (in milliseconds) for instances of this task.

None
delay Optional[int]

delay before message is ready

None
previous_result Any

result of a previous task.

None
callback Optional[Union[Message, Tuple[Message, ...]]]

one or more messages to be submitted to the broker after the preceding message has been successfully processed by a worker.

None
submit bool

if True submit the task to the broker

True
options Any

options to use when submitting the message via the broker. See Broker.submit().

{}

Returns:

Type Description
Message

message (Message): message generated for the task

Warning

  • submit is overridden to False if deferred mode is active
  • uuid is refreshed after the first event when using a trigger.
Source code in alsek/core/task.py
def generate(
    self,
    args: Optional[Union[List[Any], Tuple[Any, ...]]] = None,
    kwargs: Optional[Dict[Any, Any]] = None,
    metadata: Optional[Dict[Any, Any]] = None,
    result_ttl: Optional[int] = None,
    uuid: Optional[str] = None,
    timeout_override: Optional[int] = None,
    delay: Optional[int] = None,
    previous_result: Any = None,
    callback: Optional[Union[Message, Tuple[Message, ...]]] = None,
    submit: bool = True,
    **options: Any,
) -> Message:
    """Generate an instance of the task for processing.

    This method generates a new message for the task and
    submits it to the broker.

    Args:
        args (list, tuple, optional): positional arguments to pass to ``function``
        kwargs (dict, optional): keyword arguments to pass to ``function``
        metadata (dict, optional): a dictionary of user-defined message metadata.
            This can store any data types supported by the backend's serializer.
        result_ttl (int, optional): time to live (in milliseconds) for the
            result in the result store. If a result store is provided and
            this parameter is ``None``, the result will be persisted indefinitely.
        uuid (str, optional): universal unique identifier for the message.
            If ``None``, one will be generated automatically.
        timeout_override (int, optional): override the default maximum runtime
            (in milliseconds) for instances of this task.
        delay (int, optional): delay before message is ready
        previous_result (Any): result of a previous task.
        callback (Message, Tuple[Message, ...], optional): one or more messages
            to be submitted to the broker after the preceding message has been
            successfully processed by a worker.
        submit (bool): if ``True`` submit the task to the broker
        options (Keyword Args): options to use when submitting
            the message via the broker. See ``Broker.submit()``.

    Returns:
        message (Message): message generated for the task

    Warning:
        * ``submit`` is overridden to ``False`` if deferred mode is active
        * ``uuid`` is refreshed after the first event when using a trigger.

    """
    if result_ttl and not self.result_store:
        raise ValidationError(f"`result_ttl` invalid. No result store set.")

    message = Message(
        task_name=self.name,
        queue=self.queue,
        args=args,
        kwargs=kwargs,
        metadata=metadata,
        result_ttl=result_ttl,
        uuid=uuid,
        timeout=timeout_override or self.timeout,
        delay=delay,
        previous_result=previous_result,
        callback_message_data=_parse_callback(callback).data if callback else None,
        backoff_settings=self.backoff.settings,
        mechanism=self.mechanism,
    )
    if self._deferred:
        self.cancel_defer()
    elif submit:
        self._submit(message, **options)
        self._update_status(message, status=TaskStatus.SUBMITTED)
    return message
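
The interaction between `defer()` and `generate()` can be sketched as follows. This is a simplified model under stated assumptions: `SketchTask` is hypothetical, a plain list stands in for the broker, and messages are plain dicts. Only the `if self._deferred ... elif submit` branch mirrors the listing above.

```python
from typing import Any, Dict, List


class SketchTask:
    """Hypothetical stand-in reproducing only the defer/submit branch."""

    def __init__(self) -> None:
        self._deferred = False
        self.submitted: List[Dict[str, Any]] = []  # stands in for the broker

    def defer(self) -> "SketchTask":
        self._deferred = True
        return self

    def cancel_defer(self) -> "SketchTask":
        self._deferred = False
        return self

    def generate(self, submit: bool = True, **kwargs: Any) -> Dict[str, Any]:
        message = {"kwargs": kwargs}
        if self._deferred:
            self.cancel_defer()  # deferred mode lasts for one message only
        elif submit:
            self.submitted.append(message)
        return message


task = SketchTask()
first = task.defer().generate(a=1)  # built, but not submitted
second = task.generate(a=2)         # deferred mode was already cancelled
```

Because `defer()` returns the task itself, the two calls chain naturally, and the message built in deferred mode can be passed on (for example as a callback) without ever reaching the broker directly.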
op(self, message)

Pass message data to function for processing.

Parameters:

Name Type Description Default
message Message

message to perform the operation against

required

Returns:

Type Description
Any

result (Any): output of the function

Notes

  • If, and only if, the signature of function contains a message parameter, message itself will be passed along with any args and kwargs contained in the message.

Warning

  • message will not be passed in cases where a "message" exists in message.kwargs.
Source code in alsek/core/task.py
def op(self, message: Message) -> Any:
    """Pass ``message`` data to ``function`` for processing.

    Args:
        message (Message): message to perform the operation against

    Returns:
        result (Any): output of the function

    Notes:
        * If, and only if, the signature of ``function`` contains
          a ``message`` parameter, ``message`` itself will be passed
          along with any ``args`` and ``kwargs`` contained in the message.

    Warning:
        * ``message`` will not be passed in cases where a "message"
          exists in ``message.kwargs``.

    """
    if "message" not in message.kwargs and _expects_message(self.function):
        return self.function(*message.args, **message.kwargs, message=message)
    else:
        return self.function(*message.args, **message.kwargs)
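
The message-injection rule in `op()` can be illustrated with a small, self-contained sketch. The body of `_expects_message` is not shown in this reference, so `expects_message` below is an assumed implementation based on `inspect.signature`; `call`, `add` and `add_and_tag` are hypothetical names, and a string stands in for the message.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def expects_message(function: Callable[..., Any]) -> bool:
    # Assumed behaviour: true when the signature has a "message" parameter.
    return "message" in inspect.signature(function).parameters


def call(
    function: Callable[..., Any],
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    message: Any,
) -> Any:
    # Inject the message only if the function asks for it and the
    # caller has not already supplied a "message" keyword argument.
    if "message" not in kwargs and expects_message(function):
        return function(*args, **kwargs, message=message)
    return function(*args, **kwargs)


def add(a: int, b: int) -> int:
    return a + b


def add_and_tag(a: int, b: int, message: Any) -> str:
    return f"{message}:{a + b}"


plain = call(add, (1, 2), {}, message="msg-1")
injected = call(add_and_tag, (1, 2), {}, message="msg-1")
user_supplied = call(add_and_tag, (1, 2), {"message": "mine"}, message="msg-1")
```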
post_op(self, message, result)

Operation to perform after running op.

Parameters:

Name Type Description Default
message Message

message op() was run against

required
result Any

output of op()

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
def post_op(self, message: Message, result: Any) -> None:
    """Operation to perform after running ``op``.

    Args:
        message (Message): message ``op()`` was run against
        result (Any): output of ``op()``

    Returns:
        None

    """
pre_op(self, message)

Operation to perform before running op.

Parameters:

Name Type Description Default
message Message

message op will be run against.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
def pre_op(self, message: Message) -> None:
    """Operation to perform before running ``op``.

    Args:
        message (Message): message ``op`` will be run against.

    Returns:
        None

    """

TriggerTask

Triggered Task.

Parameters:

Name Type Description Default
function callable

function to use for the main operation

required
trigger CronTrigger, DateTrigger, IntervalTrigger

trigger for task execution.

required
broker Broker

an Alsek broker

required
name str

the name of the task. If None, the class name will be used.

required
queue str

the name of the queue to generate the task on. If None, the default queue will be used.

required
priority int

priority of the task. Tasks with lower values will be executed before tasks with higher values.

required
timeout int

the maximum amount of time (in milliseconds) this task is permitted to run.

required
max_retries int

maximum number of allowed retries

required
backoff Backoff

backoff algorithm and parameters to use when computing delay between retries

required
result_store ResultStore

store for persisting task results

required
status_tracker StatusTracker

store for persisting task statuses

required
mechanism str

mechanism for executing the task. Must be either "process" or "thread".

required

Warnings

  • The signature of function cannot contain parameters

Exceptions:

Type Description
SchedulingError

if the signature of function includes parameters.

generated: bool property readonly

If the task has been generated.

clear(self)

Clear the currently scheduled task.

Returns:

Type Description
None

None

Exceptions:

Type Description
AttributeError

if a task has not yet been generated

Source code in alsek/core/task.py
def clear(self) -> None:
    """Clear the currently scheduled task.

    Returns:
        None

    Raises:
        AttributeError: if a task has not yet been generated

    """
    try:
        self.scheduler.remove_job(self.name)
    except JobLookupError:
        raise AttributeError("Task not generated")
pause(self)

Pause the underlying scheduler.

Returns:

Type Description
None

None

Source code in alsek/core/task.py
def pause(self) -> None:
    """Pause the underlying scheduler.

    Returns:
        None

    """
    self.scheduler.pause()
resume(self)

Resume the underlying scheduler.

Returns:

Type Description
None

None

Source code in alsek/core/task.py
def resume(self) -> None:
    """Resume the underlying scheduler.

    Returns:
        None

    """
    self.scheduler.resume()
shutdown(self)

Shutdown the underlying scheduler.

Returns:

Type Description
None

None

Source code in alsek/core/task.py
def shutdown(self) -> None:
    """Shutdown the underlying scheduler.

    Returns:
        None

    """
    self.scheduler.shutdown()

task(broker, name=None, queue=None, priority=0, timeout=3600000, max_retries=3, backoff=ExponentialBackoff(base=4, factor=10000, floor=60000, ceiling=3600000, zero_override=True), trigger=None, result_store=None, status_tracker=None, mechanism='process', base_task=None)

Wrapper for task construction.

Parameters:

Name Type Description Default
broker Broker

an Alsek broker

required
name Optional[str]

the name of the task. If None, the class name will be used.

None
queue Optional[str]

the name of the queue to generate the task on. If None, the default queue will be used.

None
priority int

priority of the task. Tasks with lower values will be executed before tasks with higher values.

0
timeout int

the maximum amount of time (in milliseconds) this task is permitted to run.

3600000
max_retries int

maximum number of allowed retries

3
backoff Optional[Backoff]

backoff algorithm and parameters to use when computing delay between retries

ExponentialBackoff(base=4, factor=10000, floor=60000, ceiling=3600000, zero_override=True)
trigger Optional[Union[CronTrigger, DateTrigger, IntervalTrigger]]

trigger for task execution.

None
result_store Optional[ResultStore]

store for persisting task results

None
status_tracker Optional[StatusTracker]

store for persisting task statuses

None
mechanism str

mechanism for executing the task. Must be either "process" or "thread".

'process'
base_task Optional[Type[Task]]

base to use for task construction. If None, a base task will be selected automatically.

None

Returns:

Type Description
Callable[..., Task]

wrapper (callable): task-wrapped function

Exceptions:

Type Description
ValueError

if a trigger is provided but is not supported by base_task

Examples:

>>> from alsek import Broker, task
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>> backend = DiskCacheBackend()
>>> broker = Broker(backend)
>>> @task(broker)
... def add(a: int, b: int) -> int:
...     return a + b
Source code in alsek/core/task.py
def task(
    broker: Broker,
    name: Optional[str] = None,
    queue: Optional[str] = None,
    priority: int = 0,
    timeout: int = DEFAULT_TASK_TIMEOUT,
    max_retries: int = DEFAULT_MAX_RETRIES,
    backoff: Optional[Backoff] = ExponentialBackoff(),
    trigger: Optional[Union[CronTrigger, DateTrigger, IntervalTrigger]] = None,
    result_store: Optional[ResultStore] = None,
    status_tracker: Optional[StatusTracker] = None,
    mechanism: str = DEFAULT_MECHANISM,
    base_task: Optional[Type[Task]] = None,
) -> Callable[..., Task]:
    """Wrapper for task construction.

    Args:
        broker (Broker): an Alsek broker
        name (str, optional): the name of the task. If ``None``,
            the class name will be used.
        queue (str, optional): the name of the queue to generate the task on.
            If ``None``, the default queue will be used.
        priority (int): priority of the task. Tasks with lower values
            will be executed before tasks with higher values.
        timeout (int): the maximum amount of time (in milliseconds)
            this task is permitted to run.
        max_retries (int, optional): maximum number of allowed retries
        backoff (Backoff, optional): backoff algorithm and parameters to use when computing
            delay between retries
        trigger (CronTrigger, DateTrigger, IntervalTrigger, optional): trigger
            for task execution.
        result_store (ResultStore, optional): store for persisting task results
        status_tracker (StatusTracker, optional): store for persisting task statuses
        mechanism (str): mechanism for executing the task. Must
            be either "process" or "thread".
        base_task (Type[Task]): base to use for task construction.
            If ``None``, a base task will be selected automatically.

    Returns:
        wrapper (callable): task-wrapped function

    Raises:
        * ValueError: if a ``trigger`` is provided but is not supported by ``base_task``

    Examples:
        >>> from alsek import Broker, task
        >>> from alsek.storage.backends.disk import DiskCacheBackend

        >>> backend = DiskCacheBackend()
        >>> broker = Broker(backend)

        >>> @task(broker)
        ... def add(a: int, b: int) -> int:
        ...     return a + b

    """
    parsed_base_task = _parse_base_task(base_task, trigger=trigger)
    base_task_signature = inspect.signature(parsed_base_task.__init__)

    if trigger and "trigger" not in base_task_signature.parameters:
        raise ValueError(f"Trigger not supported by {parsed_base_task}")

    def wrapper(function: Callable[..., Any]) -> Task:
        return parsed_base_task(  # type: ignore
            function=function,
            name=name,
            broker=broker,
            queue=queue,
            priority=priority,
            timeout=timeout,
            max_retries=max_retries,
            backoff=backoff,
            mechanism=mechanism,
            result_store=result_store,
            status_tracker=status_tracker,
            **(  # noqa (handled above)
                dict(trigger=trigger)
                if "trigger" in base_task_signature.parameters
                else dict()
            ),
        )

    return wrapper
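
The wrapper inspects the base class's `__init__` signature to decide whether a `trigger` argument can be forwarded. A minimal sketch of that technique, checking for the parameter by name, is given below. `PlainTask`, `ScheduledTask` and `make_task` are hypothetical, and the rule for choosing a base automatically is an assumption, since `_parse_base_task` is not shown in this reference.

```python
import inspect
from typing import Any, Callable, Optional, Type


class PlainTask:
    def __init__(self, function: Callable[..., Any]) -> None:
        self.function = function


class ScheduledTask(PlainTask):
    def __init__(self, function: Callable[..., Any], trigger: Any) -> None:
        super().__init__(function)
        self.trigger = trigger


def make_task(
    trigger: Optional[Any] = None,
    base: Optional[Type[PlainTask]] = None,
) -> Callable[[Callable[..., Any]], PlainTask]:
    # Assumed selection rule: use a trigger-aware base when a trigger is given.
    chosen = base or (ScheduledTask if trigger else PlainTask)
    parameters = inspect.signature(chosen.__init__).parameters
    if trigger and "trigger" not in parameters:
        raise ValueError(f"Trigger not supported by {chosen}")

    def wrapper(function: Callable[..., Any]) -> PlainTask:
        # Forward ``trigger`` only if the chosen base accepts it.
        extra = {"trigger": trigger} if "trigger" in parameters else {}
        return chosen(function=function, **extra)

    return wrapper


@make_task()
def add(a: int, b: int) -> int:
    return a + b


@make_task(trigger="every-minute")  # a string stands in for a real trigger
def tick() -> None:
    return None


try:
    make_task(trigger="every-minute", base=PlainTask)
    rejected = False
except ValueError:
    rejected = True
```

Validating the combination when the decorator is created, rather than when the function is wrapped, surfaces a misconfiguration as early as possible.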

worker

Worker Pool

WorkerPool

Pool of Alsek workers.

Generate a pool of workers to service tasks. The reference broker is extracted from tasks and therefore must be common among all tasks.

Parameters:

Name Type Description Default
tasks Collection[Task]

one or more tasks to handle. This must include all tasks the worker may encounter by listening to queues.

required
queues List[str]

the names of one or more queues to consume messages from. If None, all queues will be consumed.

required
max_threads int

the maximum number of tasks with mechanism="thread" supported at any one time.

required
max_processes int

the maximum number of tasks with mechanism="process" supported at any one time. If None, max(1, cpu_count() - 1) will be used.

required
management_interval int

amount of time (in milliseconds) between maintenance scans of background task execution.

required
slot_wait_interval int

amount of time (in milliseconds) to wait between checks to determine worker availability for pending tasks.

required
**kwargs Keyword Args

Keyword arguments to pass to Consumer().

required

Exceptions:

Type Description
NoTasksFoundError

if no tasks are provided

MultipleBrokersError

if multiple brokers are used by the collected tasks.

run(self)

Run the worker pool.

This method coordinates the following:

1.  Starting a background thread to monitor background tasks
2a. Recovering messages from the data backend
2b. Processing recovered messages as places in the pool become available.

Returns:

Type Description
None

None

Warning

  • This method is blocking.
Source code in alsek/core/worker.py
@magic_logger(
    before=lambda: log.info("Alsek v%s worker pool booting up...", __version__),
    after=lambda: log.info("Graceful shutdown complete."),
)

exceptions

Exceptions

AlsekError

Base Alsek error.

MessageAlreadyExistsError

Message already exists in backend.

MessageDoesNotExistsError

Message does not exist in backend.

MultipleBrokersError

Multiple brokers in use.

NoTasksFoundError

No tasks found.

SchedulingError

Error scheduling work.

TaskNameCollisionError

Duplicate task detected.

TerminationError

Alsek termination error.

ValidationError

Data validation failed.

storage special

Storage

backends special

Backend

Backend

Backend base class.

Parameters:

Name Type Description Default
namespace str

prefix to use when inserting names in the backend

required
serializer Serializer

tool for encoding and decoding values written into the backend.

required
clear_namespace(self, raise_on_error=True)

Clear all items in backend under the current namespace.

Parameters:

Name Type Description Default
raise_on_error bool

raise if a delete operation fails

True

Returns:

Type Description
int

count (int): number of items cleared

Exceptions:

Type Description
KeyError

if raise_on_error and a delete operation fails

Source code in alsek/storage/backends/__init__.py
def clear_namespace(self, raise_on_error: bool = True) -> int:
    """Clear all items in backend under the current namespace.

    Args:
        raise_on_error (bool): raise if a delete operation fails

    Returns:
        count (int): number of items cleared

    Raises:
        KeyError: if ``raise_on_error`` and a delete operation fails

    """
    count: int = 0
    for name in self.scan():
        try:
            self.delete(name, missing_ok=False)
            count += 1
        except KeyError as error:
            if raise_on_error:
                raise error
            else:
                log.warning("Unable to delete %r", name)
    return count
count(self, pattern=None)

Count the number of items in the backend.

Parameters:

Name Type Description Default
pattern Optional[str]

pattern to limit count to

None

Returns:

Type Description
int

count (int): number of matching names

Source code in alsek/storage/backends/__init__.py
def count(self, pattern: Optional[str] = None) -> int:
    """Count the number of items in the backend.

    Args:
        pattern (str, optional): pattern to limit count to

    Returns:
        count (int): number of matching names

    """
    return sum(1 for _ in self.scan(pattern))
delete(self, name, missing_ok=False)

Delete a name from the backend.

Parameters:

Name Type Description Default
name str

name of the item

required
missing_ok bool

if True, do not raise for missing

False

Returns:

Type Description
None

None

Exceptions:

Type Description
KeyError

if missing_ok is False and name is not found.

Source code in alsek/storage/backends/__init__.py
@abstractmethod
def delete(self, name: str, missing_ok: bool = False) -> None:
    """Delete a ``name`` from the backend.

    Args:
        name (str): name of the item
        missing_ok (bool): if ``True``, do not raise for missing

    Returns:
        None

    Raises:
        KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.

    """
    raise NotImplementedError()
exists(self, name)

Check if name exists in the backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
bool

bool

Source code in alsek/storage/backends/__init__.py
@abstractmethod
def exists(self, name: str) -> bool:
    """Check if ``name`` exists in the backend.

    Args:
        name (str): name of the item

    Returns:
        bool

    """
    raise NotImplementedError()
full_name(self, name)

Get an item's complete name, including the namespace in which it exists.

Parameters:

Name Type Description Default
name str

the name of an item

required

Returns:

Type Description
str

full_name (str): a name of the form "{NAMESPACE}:{name}"

Notes

  • If name is already the full name, this method will collapse to a no-op.
Source code in alsek/storage/backends/__init__.py
def full_name(self, name: str) -> str:
    """Get an item's complete name, including the namespace
    in which it exists.

    Args:
        name (str): the name of an item

    Returns:
        full_name (str): a name of the form ``"{NAMESPACE}:{name}"``

    Notes:
        * If ``name`` is already the full name, this method
          will collapse to a no-op.

    """
    if name.startswith(f"{self.namespace}:"):
        return name
    else:
        return f"{self.namespace}:{name}"
get(self, name)

Get name from the backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
Any

Any

Source code in alsek/storage/backends/__init__.py
@abstractmethod
def get(self, name: str) -> Any:
    """Get ``name`` from the backend.

    Args:
        name (str): name of the item

    Returns:
        Any

    """
    raise NotImplementedError()
in_namespace(self, name)

Determine if name belongs to the current namespace.

Parameters:

Name Type Description Default
name str

a name (key)

required

Returns:

Type Description
bool

bool

Warning

  • name should be a complete (i.e., full) name.
Source code in alsek/storage/backends/__init__.py
def in_namespace(self, name: str) -> bool:
    """Determine if ``name`` belongs to the current namespace.

    Args:
        name (str): a name (key)

    Returns:
        bool

    Warning:
        * ``name`` should be a complete (i.e., _full_) name.

    """
    return name.startswith(f"{self.namespace}:")
scan(self, pattern=None)

Scan the backend for matching names.

Parameters:

Name Type Description Default
pattern Optional[str]

pattern to limit search to

None

Returns:

Type Description
Iterable[str]

matches_stream (Iterable[str]): a stream of matching names

Source code in alsek/storage/backends/__init__.py
@abstractmethod
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
    """Scan the backend for matching names.

    Args:
        pattern (str, optional): pattern to limit search to

    Returns:
        matches_stream (Iterable[str]): a stream of matching names

    """
    raise NotImplementedError()
set(self, name, value, nx=False, ttl=None)

Set name to value in the backend.

Parameters:

Name Type Description Default
name str

name of the item

required
value Any

value to set for name

required
nx bool

if True the item must not exist prior to being set

False
ttl Optional[int]

time to live for the entry in milliseconds

None

Returns:

Type Description
None

None

Exceptions:

Type Description
KeyError

if nx is True and name already exists

Source code in alsek/storage/backends/__init__.py
@abstractmethod
def set(
    self,
    name: str,
    value: Any,
    nx: bool = False,
    ttl: Optional[int] = None,
) -> None:
    """Set ``name`` to ``value`` in the backend.

    Args:
        name (str): name of the item
        value (Any): value to set for ``name``
        nx (bool): if ``True`` the item must not exist prior to being set
        ttl (int, optional): time to live for the entry in milliseconds

    Returns:
        None

    Raises:
        KeyError: if ``nx`` is ``True`` and ``name`` already exists

    """
    raise NotImplementedError()
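
To make the abstract contract above more concrete, here is a rough sketch of an in-memory backend that honours the documented `nx` and `missing_ok` semantics. `DictBackend` and the `"demo"` namespace are hypothetical; serialization is skipped, TTLs are accepted but ignored, and `scan()` does no pattern matching, so this is an illustration and not a usable backend.

```python
from typing import Any, Dict, Iterable, Optional


class DictBackend:
    """Hypothetical in-memory backend; TTLs are accepted but ignored."""

    def __init__(self, namespace: str = "demo") -> None:
        self.namespace = namespace
        self._data: Dict[str, Any] = {}

    def full_name(self, name: str) -> str:
        prefix = f"{self.namespace}:"
        return name if name.startswith(prefix) else prefix + name

    def exists(self, name: str) -> bool:
        return self.full_name(name) in self._data

    def set(
        self,
        name: str,
        value: Any,
        nx: bool = False,
        ttl: Optional[int] = None,
    ) -> None:
        if nx and self.exists(name):
            raise KeyError(f"Name '{name}' already exists")
        self._data[self.full_name(name)] = value

    def get(self, name: str) -> Any:
        return self._data[self.full_name(name)]

    def delete(self, name: str, missing_ok: bool = False) -> None:
        try:
            del self._data[self.full_name(name)]
        except KeyError:
            if not missing_ok:
                raise KeyError(f"No name '{name}' found")

    def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
        # Pattern handling is omitted: every name in the namespace is yielded.
        prefix = f"{self.namespace}:"
        for name in list(self._data):
            if name.startswith(prefix):
                yield name[len(prefix):]

    def count(self, pattern: Optional[str] = None) -> int:
        return sum(1 for _ in self.scan(pattern))


backend = DictBackend()
backend.set("a", 1)
backend.set("b", 2, nx=True)  # fine: "b" did not exist yet
try:
    backend.set("a", 99, nx=True)
    nx_raised = False
except KeyError:
    nx_raised = True
backend.delete("missing", missing_ok=True)  # silently ignored
```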
short_name(self, name)

Get an item's short name, without the namespace in which it exists.

Parameters:

Name Type Description Default
name str

the full name of an item

required

Returns:

Type Description
str

short_name (str): name without the namespace prefix

Notes

  • If name is already the short name, this method will collapse to a no-op.
Source code in alsek/storage/backends/__init__.py
def short_name(self, name: str) -> str:
    """Get an item's short name, without the namespace
    in which it exists.

    Args:
        name (str): the full name of an item

    Returns:
        short_name (str): ``name`` without the namespace prefix

    Notes:
        * If ``name`` is already the short name, this method
          will collapse to a no-op.

    """
    return re.sub(rf"^{self.namespace}:", repl="", string=name)
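
The namespace helpers are designed to be idempotent: applying `full_name()` or `short_name()` twice gives the same result as applying it once. The sketch below demonstrates this with free functions and a hypothetical `"demo"` namespace. Note that the use of `re.escape()` is an addition of this sketch (it guards against namespaces containing regex metacharacters) and does not appear in the listing above.

```python
import re

NAMESPACE = "demo"  # hypothetical namespace, chosen for illustration


def full_name(name: str) -> str:
    prefix = f"{NAMESPACE}:"
    return name if name.startswith(prefix) else f"{prefix}{name}"


def short_name(name: str) -> str:
    # re.escape() is an addition of this sketch, not part of the listing.
    return re.sub(rf"^{re.escape(NAMESPACE)}:", "", name)


def in_namespace(name: str) -> bool:
    return name.startswith(f"{NAMESPACE}:")


once = full_name("status:42")
twice = full_name(once)       # already a full name, so unchanged
stripped = short_name(twice)  # back to the short form
```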

LazyClient

Lazy client.

Wrapper for lazy client initialization.

Parameters:

Name Type Description Default
client_func callable

a callable which returns a backend client.

required
get(self)

Execute client_func.

Returns:

Type Description
Any

client (Any): a backend client

Source code in alsek/storage/backends/__init__.py
def get(self) -> Any:
    """Execute ``client_func``.

    Returns:
        client (Any): a backend client

    """
    return self.client_func()

disk

Disk Backend

DiskCacheBackend

DiskCache Backend.

Backend powered by DiskCache.

Parameters:

Name Type Description Default
conn str, Path, DiskCache, LazyClient

a directory, DiskCache() object or LazyClient.

required
name_match_func callable

a callable to determine if a name matches a specified pattern when scanning the backend. If None, one will be selected automatically.

required
namespace str

prefix to use when inserting names in the backend

required
serializer Serializer

tool for encoding and decoding values written into the backend.

required

Warning

DiskCache persists data to a local (SQLite) database and does not implement server-side "if not exists" (nx) support on SET. For these reasons, DiskCacheBackend() is recommended for development and testing purposes only.

conn: Cache property readonly

Connection to the backend.

delete(self, name, missing_ok=False)

Delete a name from the disk backend.

Parameters:

Name Type Description Default
name str

name of the item

required
missing_ok bool

if True, do not raise for missing

False

Returns:

Type Description
None

None

Exceptions:

Type Description
KeyError

if missing_ok is False and name is not found or if name has a non-expired TTL.

Source code in alsek/storage/backends/disk.py
def delete(self, name: str, missing_ok: bool = False) -> None:
    """Delete a ``name`` from the disk backend.

    Args:
        name (str): name of the item
        missing_ok (bool): if ``True``, do not raise for missing

    Returns:
        None

    Raises:
        KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found
            or if ``name`` has a non-expired TTL.

    """
    try:
        self.conn.__delitem__(self.full_name(name), retry=False)
    except KeyError:
        if not missing_ok:
            raise KeyError(f"No name '{name}' found")
destroy(self)

Destroy the cache.

Returns:

Type Description
None

None

Warning

  • This method recursively deletes the cache directory. Use with caution.
  • The backend will not be usable following execution of this method.
Source code in alsek/storage/backends/disk.py
def destroy(self) -> None:
    """Destroy the cache.

    Returns:
        None

    Warning:
        * This method recursively deletes the cache directory.
          Use with caution.
        * The backend will not be usable following execution
          of this method.

    """
    shutil.rmtree(self.conn.directory)
exists(self, name)

Check if name exists in the disk backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
bool

bool

Source code in alsek/storage/backends/disk.py
def exists(self, name: str) -> bool:
    """Check if ``name`` exists in the disk backend.

    Args:
        name (str): name of the item

    Returns:
        bool

    """
    return self.full_name(name) in self.conn
get(self, name)

Get name from the disk backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
Any

Any

Source code in alsek/storage/backends/disk.py
def get(self, name: str) -> Any:
    """Get ``name`` from the disk backend.

    Args:
        name (str): name of the item

    Returns:
        Any

    """
    encoded = self.conn.get(self.full_name(name))
    return self.serializer.reverse(encoded)
scan(self, pattern=None)

Scan the disk backend for matching names.

Parameters:

Name Type Description Default
pattern Optional[str]

pattern to limit search to

None

Returns:

Type Description
Iterable[str]

name_stream (Iterable[str]): a stream of matching names

Source code in alsek/storage/backends/disk.py
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
    """Scan the disk backend for matching names.

    Args:
        pattern (str): pattern to limit search to

    Returns:
        name_stream (Iterable[str]): a stream of matching names

    """
    for name in filter(self.in_namespace, self.conn):
        short_name = self.short_name(name)
        if self.name_match_func(pattern, short_name):
            yield short_name
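
The scan above first keeps only keys in the backend's namespace and then applies `name_match_func(pattern, short_name)` to each short name. The following is a minimal sketch of a matcher with that call shape. It assumes glob-style patterns and a `namespace:name` key layout; neither is confirmed by this page, and `glob_match` and `scan_names` are hypothetical helpers, not part of Alsek.

```python
from fnmatch import fnmatchcase
from typing import Iterable, Iterator, Optional


def glob_match(pattern: Optional[str], name: str) -> bool:
    """Hypothetical matcher: a missing pattern matches every name."""
    return pattern is None or fnmatchcase(name, pattern)


def scan_names(
    keys: Iterable[str],
    namespace: str,
    pattern: Optional[str] = None,
) -> Iterator[str]:
    """Yield short names inside ``namespace`` that satisfy ``pattern``."""
    prefix = f"{namespace}:"  # assumed key layout, for illustration only
    for key in keys:
        if not key.startswith(prefix):
            continue  # key belongs to another namespace
        short_name = key[len(prefix):]
        if glob_match(pattern, short_name):
            yield short_name


keys = ["app:results:1", "app:results:2", "app:locks:1", "other:results:9"]
matches = list(scan_names(keys, namespace="app", pattern="results:*"))
```

Because every key in the cache is visited, the cost of a scan like this grows with the total number of stored names, not with the number of matches.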
set(self, name, value, nx=False, ttl=None)

Set name to value in the disk backend.

Parameters:

Name Type Description Default
name str

name of the item

required
value Any

value to set for name

required
nx bool

if True the item must not exist prior to being set

False
ttl Optional[int]

time to live for the entry in milliseconds

None

Returns:

Type Description
None

None

Warning

  • nx is implemented client-side for DiskCache. For this reason, it should not be relied upon for critical tasks.

Exceptions:

Type Description
KeyError

if nx is True and name already exists

Source code in alsek/storage/backends/disk.py
def set(
    self,
    name: str,
    value: Any,
    nx: bool = False,
    ttl: Optional[int] = None,
) -> None:
    """Set ``name`` to ``value`` in the disk backend.

    Args:
        name (str): name of the item
        value (Any): value to set for ``name``
        nx (bool): if ``True`` the item must not exist prior to being set
        ttl (int, optional): time to live for the entry in milliseconds

    Returns:
        None

    Warning:
        * ``nx`` is implemented client-side for ``DiskCache``.
           For this reason, it should not be relied upon for
           critical tasks.

    Raises:
        KeyError: if ``nx`` is ``True`` and ``name`` already exists

    """
    if nx and self.exists(name):
        raise KeyError(f"Name '{name}' already exists")

    self.conn.set(
        self.full_name(name),
        value=self.serializer.forward(value),
        expire=ttl if ttl is None else ttl / 1000,
    )
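
Two details of the method above may be easier to see in isolation: the TTL is converted from milliseconds to seconds before it is passed as `expire`, and `nx` is a check followed by a separate write. The sketch below reproduces both with an in-memory stand-in. `ToyStore` and `ttl_to_expire_seconds` are hypothetical names used only for illustration.

```python
from typing import Any, Dict, Optional


def ttl_to_expire_seconds(ttl: Optional[int]) -> Optional[float]:
    """Convert a TTL in milliseconds to seconds, preserving ``None``."""
    return ttl if ttl is None else ttl / 1000


class ToyStore:
    """In-memory stand-in for a key-value store (illustration only)."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    def set(self, name: str, value: Any, nx: bool = False) -> None:
        # Step 1: check. Step 2: write. The two steps are not atomic, so
        # another writer could insert ``name`` between them.
        if nx and name in self.data:
            raise KeyError(f"Name '{name}' already exists")
        self.data[name] = value


store = ToyStore()
store.set("job", "first", nx=True)
try:
    store.set("job", "second", nx=True)
    second_write_rejected = False
except KeyError:
    second_write_rejected = True
```

In a single process the second write is rejected as expected. The warning above concerns concurrent writers, where both could pass the check before either performs the write.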

redis

Redis Backend

RedisBackend

Redis Backend.

Backend powered by Redis.

Parameters:

Name Type Description Default
conn str, Redis, LazyClient

a connection url, Redis() object or LazyClient.

required
namespace str

prefix to use when inserting names in the backend

required
serializer Serializer

tool for encoding and decoding values written into the backend.

required

Warning

  • If conn is a Redis() object, decode_responses is expected to be set to True.
conn: Redis property readonly

Connection to the backend.

delete(self, name, missing_ok=False)

Delete a name from the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required
missing_ok bool

if True, do not raise for missing

False

Returns:

Type Description
None

None

Exceptions:

Type Description
KeyError

if missing_ok is False and name is not found.

Source code in alsek/storage/backends/redis.py
def delete(self, name: str, missing_ok: bool = False) -> None:
    """Delete a ``name`` from the Redis backend.

    Args:
        name (str): name of the item
        missing_ok (bool): if ``True``, do not raise for missing

    Returns:
        None

    Raises:
        KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.

    """
    found = self.conn.delete(self.full_name(name))
    if not missing_ok and not found:
        raise KeyError(f"No name '{name}' found")
exists(self, name)

Check if name exists in the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
bool

bool

Source code in alsek/storage/backends/redis.py
def exists(self, name: str) -> bool:
    """Check if ``name`` exists in the Redis backend.

    Args:
        name (str): name of the item

    Returns:
        bool

    """
    return bool(self.conn.exists(self.full_name(name)))
get(self, name)

Get name from the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
Any

Any

Source code in alsek/storage/backends/redis.py
def get(self, name: str) -> Any:
    """Get ``name`` from the Redis backend.

    Args:
        name (str): name of the item

    Returns:
        Any

    """
    encoded = self.conn.get(self.full_name(name))
    return self.serializer.reverse(encoded)
scan(self, pattern=None)

Scan the backend for matching names.

Parameters:

Name Type Description Default
pattern Optional[str]

pattern to match against

None

Returns:

Type Description
Iterable[str]

names_stream (Iterable[str]): a stream of matching names

Source code in alsek/storage/backends/redis.py
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
    """Scan the backend for matching names.

    Args:
        pattern (str): pattern to match against

    Returns:
        names_stream (Iterable[str]): a stream of matching names

    """
    match = self.full_name(pattern or "*")
    yield from map(self.short_name, self.conn.scan_iter(match))
set(self, name, value, nx=False, ttl=None)

Set name to value in the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required
value Any

value to set for name

required
nx bool

if True the item must not exist prior to being set

False
ttl Optional[int]

time to live for the entry in milliseconds

None

Returns:

Type Description
None

None

Exceptions:

Type Description
KeyError

if nx is True and name already exists

Source code in alsek/storage/backends/redis.py
def set(
    self,
    name: str,
    value: Any,
    nx: bool = False,
    ttl: Optional[int] = None,
) -> None:
    """Set ``name`` to ``value`` in the Redis backend.

    Args:
        name (str): name of the item
        value (Any): value to set for ``name``
        nx (bool): if ``True`` the item must not exist prior to being set
        ttl (int, optional): time to live for the entry in milliseconds

    Returns:
        None

    Raises:
        KeyError: if ``nx`` is ``True`` and ``name`` already exists

    """
    response = self.conn.set(
        self.full_name(name),
        value=self.serializer.forward(value),
        px=ttl,
        nx=nx,
        keepttl=ttl is None,  # type: ignore
    )
    if nx and response is None:
        raise KeyError(f"Name '{name}' already exists")
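
The call above maps the method's arguments onto three options of redis-py's `set()`: `px` (expiry in milliseconds), `nx` (only set if the key does not exist) and `keepttl` (retain the key's existing TTL). The sketch below isolates that mapping so it can be inspected without a Redis server. `redis_set_options` is a hypothetical helper, not part of Alsek or redis-py.

```python
from typing import Any, Dict, Optional


def redis_set_options(ttl: Optional[int], nx: bool) -> Dict[str, Any]:
    """Build the keyword options used for the SET call shown above."""
    return {
        "px": ttl,  # expiry in milliseconds, or None for no new expiry
        "nx": nx,  # only write if the key is absent
        "keepttl": ttl is None,  # with no new TTL, keep any existing one
    }


with_ttl = redis_set_options(ttl=5000, nx=True)
without_ttl = redis_set_options(ttl=None, nx=False)
```

Since `keepttl` is true only when `ttl` is `None`, the two expiry options are never requested together. Unlike the disk backend, the existence check here is performed by the server as part of the SET command itself.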

result

Result Storage

ResultStore

Alsek Result Store.

Parameters:

Name Type Description Default
backend Backend

backend for data storage

required

Warning

  • In order for a result to be stored, it must be serializable by the serializer used by backend.
delete(self, message, descendants=True, missing_ok=False)

Delete any data for message from the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message.

required
descendants bool

if True also delete results for descendants.

True
missing_ok bool

if True, do not raise for missing

False

Returns:

Type Description
int

count (int): number of results deleted

Source code in alsek/storage/result.py
def delete(
    self,
    message: Message,
    descendants: bool = True,
    missing_ok: bool = False,
) -> int:
    """Delete any data for ``message`` from the backend.

    Args:
        message (Message): an Alsek message.
        descendants (bool): if ``True`` also delete results for descendants.
        missing_ok (bool): if ``True``, do not raise for missing

    Returns:
        count (int): number of results deleted

    """
    count: int = 0
    for name in self._get_all_storage_names(message, descendants=descendants):
        self.backend.delete(name, missing_ok=missing_ok)
        count += 1
    return count
exists(self, message, descendants=False)

Whether or not data for message exists in the store.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
descendants bool

if True, this method will return True iff all of the descendants of message also exist.

False

Returns:

Type Description
bool

bool

Source code in alsek/storage/result.py
def exists(self, message: Message, descendants: bool = False) -> bool:
    """Whether or not data for ``message`` exists in the store.

    Args:
        message (Message): an Alsek message
        descendants (bool): if ``True``, this method will return ``True``
            iff all of the descendants of ``message`` also exist.

    Returns:
        bool

    """
    names = self._get_all_storage_names(message, descendants=descendants)
    return all(self.backend.exists(n) for n in names)
get(self, message, timeout=0, keep=False, with_metadata=False, descendants=False)

Get the result for message.

Parameters:

Name Type Description Default
message Message

an Alsek message.

required
timeout int

amount of time (in milliseconds) to wait for the result to become available

0
keep bool

whether or not to keep the result after fetching it. Defaults to False to conserve storage space.

False
with_metadata bool

if True return results of the form {"result": <result>, "uuid": str, "timestamp": int}, where "result" is the result persisted to the backend, "uuid" is the uuid of the message associated with the result and "timestamp" is the time at which the result was written to the backend.

False
descendants bool

if True also fetch results for descendants.

False

Returns:

Type Description
Union[Any, List[Any]]

result (Any, List[Any]): the stored result. If descendants is True a list of results will be returned.

Exceptions:

Type Description
KeyError

if results are not available for message

TimeoutError

if results are not available for message following timeout.

Notes

  • The order of results when descendants=True is determined by the time at which the data was written to the backend.
  • timeout only applies to message, even if descendants=True.

Warning

  • If a message has a progenitor, the progenitor_uuid field in the message must be set.

Examples:

>>> from alsek import Message
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>> from alsek.storage.result import ResultStore
>>> backend = DiskCacheBackend()
>>> result_store = ResultStore(backend)
>>> result_store.get(Message(uuid="..."))
Source code in alsek/storage/result.py
def get(
    self,
    message: Message,
    timeout: int = 0,
    keep: bool = False,
    with_metadata: bool = False,
    descendants: bool = False,
) -> Union[Any, List[Any]]:
    """Get the result for ``message``.

    Args:
        message (Message): an Alsek message.
        timeout (int): amount of time (in milliseconds) to wait
            for the result to become available
        keep (bool): whether or not to keep the result after fetching it.
            Defaults to ``False`` to conserve storage space.
        with_metadata (bool): if ``True`` return results of the form
            ``{"result": <result>, "uuid": str, "timestamp": int}``, where
            "result" is the result persisted to the backend, "uuid" is the uuid
            of the message associated with the result and "timestamp" is the
            time at which the result was written to the backend.
        descendants (bool): if ``True`` also fetch results for descendants.

    Returns:
        result (Any, List[Any]): the stored result. If ``descendants``
            is ``True`` a list of results will be returned.

    Raises:
        KeyError: if results are not available for ``message``
        TimeoutError: if results are not available for ``message``
            following ``timeout``.

    Notes:
        * The order of results when ``descendants=True`` is determined
          by the time at which the data was written to the backend.
        * ``timeout`` only applies to ``message``, even if ``descendants=True``.

    Warning:
        * If a message has a progenitor, the ``progenitor_uuid`` field in the
          ``message`` must be set.

    Examples:
        >>> from alsek import Message
        >>> from alsek.storage.backends.disk import DiskCacheBackend
        >>> from alsek.storage.result import ResultStore

        >>> backend = DiskCacheBackend()
        >>> result_store = ResultStore(backend)

        >>> result_store.get(Message(uuid="..."))

    """
    if not self.exists(message, descendants=descendants):
        if timeout:
            waiter(
                lambda: self.exists(message, descendants=descendants),
                timeout=timeout,
                timeout_msg=f"Timeout waiting on result for {message.summary}",
                sleep_interval=_GET_RESULT_WAIT_SLEEP_INTERVAL,
            )
        else:
            raise KeyError(f"No results for {message.uuid}")

    names = self._get_all_storage_names(message, descendants=descendants)
    results = self._get_engine(names, with_metadata=with_metadata)
    if not keep:
        for n in names:
            self.backend.delete(n)

    return results if descendants else results[0]
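
When `timeout` is non-zero, the method above polls `exists()` until the result appears or the timeout elapses. The following is a minimal sketch of that polling pattern. `wait_until` is a hypothetical stand-in written for this example; it is not Alsek's `waiter()`, whose implementation is not shown on this page.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    timeout_ms: int,
    sleep_interval_ms: int = 10,
) -> bool:
    """Poll ``condition`` until it returns True or ``timeout_ms`` elapses."""
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout_ms} ms")
        time.sleep(sleep_interval_ms / 1000)


calls = {"n": 0}


def ready_on_third_call() -> bool:
    """Simulate a result that becomes available on the third check."""
    calls["n"] += 1
    return calls["n"] >= 3


became_ready = wait_until(ready_on_third_call, timeout_ms=1000)

try:
    wait_until(lambda: False, timeout_ms=30)
    timed_out = False
except TimeoutError:
    timed_out = True
```

With `timeout=0` no polling takes place, which is why a missing result raises `KeyError` immediately rather than `TimeoutError`.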
get_storage_name(message) staticmethod

Get the name for message in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
str

name (str): message-specific name

Source code in alsek/storage/result.py
@staticmethod
def get_storage_name(message: Message) -> str:
    """Get the name for ``message`` in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        name (str): message-specific name

    """
    if message.progenitor_uuid:
        return f"results:{message.progenitor_uuid}:descendants:{message.uuid}"
    else:
        return f"results:{message.uuid}"
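
The naming scheme above can be tried out without a backend. The sketch below copies the two branches of `get_storage_name()` and applies them to a stand-in message; `FakeMessage` is a hypothetical class carrying only the two fields the scheme reads, and the uuid values are made up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMessage:
    """Stand-in with only the two fields the naming scheme reads."""

    uuid: str
    progenitor_uuid: Optional[str] = None


def storage_name(message: FakeMessage) -> str:
    """Mirror the two branches of ``get_storage_name()`` shown above."""
    if message.progenitor_uuid:
        return f"results:{message.progenitor_uuid}:descendants:{message.uuid}"
    return f"results:{message.uuid}"


root = FakeMessage(uuid="a1")
child = FakeMessage(uuid="b2", progenitor_uuid="a1")

root_name = storage_name(root)
child_name = storage_name(child)
```

Here `root_name` is `results:a1` and `child_name` is `results:a1:descendants:b2`, so a descendant's name begins with its progenitor's name. This is also why a descendant whose `progenitor_uuid` is unset would be stored under an unrelated name.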
set(self, message, result, nx=True)

Store a result for message.

Parameters:

Name Type Description Default
message Message

an Alsek message.

required
result Any

the result to persist

required
nx bool

if True the item must not exist prior to being set

True

Returns:

Type Description
None

None

Source code in alsek/storage/result.py
def set(self, message: Message, result: Any, nx: bool = True) -> None:
    """Store a ``result`` for ``message``.

    Args:
        message (Message): an Alsek message.
        result (Any): the result to persist
        nx (bool): if ``True`` the item must not exist prior to being set

    Returns:
        None

    """
    self.backend.set(
        self.get_storage_name(message),
        value={"result": result, "timestamp": utcnow_timestamp_ms()},
        nx=nx,
        ttl=message.result_ttl,
    )

serialization

Serialization

JsonSerializer

JSON serialization.

forward(obj) staticmethod

Encode an object.

Parameters:

Name Type Description Default
obj Any

an object to encode

required

Returns:

Type Description
Any

encoded (Any): JSON encoded object

Source code in alsek/storage/serialization.py
@staticmethod
def forward(obj: Any) -> Any:
    """Encode an object.

    Args:
        obj (Any): an object to encode

    Returns:
        encoded (Any): JSON encoded object

    """
    return json.dumps(obj)
reverse(obj) staticmethod

Decode an object.

Parameters:

Name Type Description Default
obj Any

an object to decode

required

Returns:

Type Description
Any

decoded (Any): JSON decoded object

Source code in alsek/storage/serialization.py
@staticmethod
def reverse(obj: Any) -> Any:
    """Decode an object.

    Args:
        obj (Any): an object to decode

    Returns:
        decoded (Any): JSON decoded object

    """
    if obj is None:
        return None
    return json.loads(obj)
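
A JSON round trip does not preserve every Python type, which matters when deciding what a task may return. The sketch below repeats the two methods above as plain functions and shows a tuple coming back as a list, `None` passing through `reverse()` untouched, and a `set` being rejected. The payload values are made up for illustration.

```python
import json
from typing import Any


def forward(obj: Any) -> str:
    """Encode ``obj`` as a JSON string."""
    return json.dumps(obj)


def reverse(obj: Any) -> Any:
    """Decode a JSON string, passing ``None`` through unchanged."""
    if obj is None:
        return None
    return json.loads(obj)


payload = {"result": (1, 2), "timestamp": 1700000000000}
decoded = reverse(forward(payload))  # the tuple comes back as a list

try:
    forward({1, 2, 3})  # sets are not JSON serializable
    set_encoded = True
except TypeError:
    set_encoded = False
```

The `None` check in `reverse()` lets a backend `get()` for a missing name return `None` instead of raising inside `json.loads`.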

Serializer

Base Serializer Class.

forward(obj) staticmethod

Encode an object for backend serialization.

Parameters:

Name Type Description Default
obj Any

an object to encode

required

Returns:

Type Description
Any

encoded (Any): encoded object

Source code in alsek/storage/serialization.py
@staticmethod
@abstractmethod
def forward(obj: Any) -> Any:
    """Encode an object for backend serialization.

    Args:
        obj (Any): an object to encode

    Returns:
        encoded (Any): encoded object

    """
    raise NotImplementedError()
reverse(obj) staticmethod

Decode an object.

Parameters:

Name Type Description Default
obj Any

an object to decode

required

Returns:

Type Description
Any

decoded (Any): decoded object

Source code in alsek/storage/serialization.py
@staticmethod
@abstractmethod
def reverse(obj: Any) -> Any:
    """Decode an object.

    Args:
        obj (Any): an object to decode

    Returns:
        decoded (Any): decoded object

    """
    raise NotImplementedError()
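
A custom serializer only needs to supply the two static methods above. The following sketch shows one possible subclass that pickles the object and base64-encodes the bytes so the stored value is text. To stay self-contained it defines a local `Serializer` stand-in rather than importing Alsek's; `PickleBase64Serializer` is a hypothetical example, not a class this page documents.

```python
import base64
import pickle
from abc import ABC, abstractmethod
from typing import Any


class Serializer(ABC):
    """Local stand-in mirroring the two-method interface above."""

    @staticmethod
    @abstractmethod
    def forward(obj: Any) -> Any:
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def reverse(obj: Any) -> Any:
        raise NotImplementedError()


class PickleBase64Serializer(Serializer):
    """Hypothetical serializer: pickle, then base64 so the value is text."""

    @staticmethod
    def forward(obj: Any) -> str:
        return base64.b64encode(pickle.dumps(obj)).decode("ascii")

    @staticmethod
    def reverse(obj: Any) -> Any:
        if obj is None:
            return None  # mirror JsonSerializer's handling of missing values
        return pickle.loads(base64.b64decode(obj))


serializer = PickleBase64Serializer()
encoded = serializer.forward({1, 2, 3})
decoded = serializer.reverse(encoded)
```

Unpickling executes code embedded in the payload, so an approach like this is only reasonable when everything written to the backend comes from a trusted source.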

tools special

Tools

iteration

Result Iteration

ResultPool

Tooling for iterating over task results.

Parameters:

Name Type Description Default
result_store ResultStore

store where task results are persisted

required

Examples:

>>> from alsek.storage.result import ResultStore
>>> from alsek.tools.iteration import ResultPool
...
>>> result_store = ResultStore(...)
>>> pool = ResultPool(result_store)
...
>>> messages = [...]
>>> for message, result in pool.istream(*messages):
...     pass
istream(self, *messages, wait=5000, descendants=False, **kwargs)

Stream the results of one or more messages. Results are yielded in the order in which they become available. (This may differ from the order in which messages are provided.)

Parameters:

Name Type Description Default
*messages Message

one or more messages to iterate over

()
wait int

time to wait (in milliseconds) between checks for available results

5000
descendants bool

if True, wait for and return the results of all descendant (callback) messages.

False
**kwargs Any

keyword arguments to pass to result_store.get().

{}

Returns:

Type Description
Iterable[Tuple[alsek.core.message.Message, Any]]

results (iterable): an iterable of results of the form (Message, result).

Warning

  • By default, result_store does not keep messages once they have been collected. As a result, providing messages for which the corresponding results have already been collected (and deleted) will cause this method to loop indefinitely. In order to loop over messages multiple times set keep=True.
Source code in alsek/tools/iteration.py
def istream(
    self,
    *messages: Message,
    wait: int = 5 * 1000,
    descendants: bool = False,
    **kwargs: Any,
) -> Iterable[Tuple[Message, Any]]:
    """Stream the results of one or more messages. Results are yielded
    in the order in which they become available. (This may differ from
    the order in which messages are provided.)

    Args:
        *messages (Message): one or more messages to iterate over
        wait (int): time to wait (in milliseconds) between checks for
            available results
        descendants (bool): if ``True``, wait for and return
            the results of all descendant (callback) messages.
        **kwargs (Keyword Args): keyword arguments to pass to
            ``result_store.get()``.

    Returns:
        results (iterable): an iterable of results of the form
            ``(Message, result)``.

    Warning:
        * By default, ``result_store`` does not keep messages once
          they have been collected. As a result, providing messages
          for which the corresponding results have already been collected
          (and deleted) will cause this method to loop indefinitely.
          In order to loop over messages multiple times set ``keep=True``.

    """
    yield from self._engine(
        messages,
        wait=wait,
        break_on_error=False,
        descendants=descendants,
        **kwargs,
    )
stream(self, *messages, wait=5000, descendants=False, **kwargs)

Stream the results of one or more messages. The order of the results is guaranteed to match the order of messages.

Parameters:

Name Type Description Default
*messages Message

one or more messages to iterate over

()
wait int

time to wait (in milliseconds) between checks for available results

5000
descendants bool

if True, wait for and return the results of all descendant (callback) messages.

False
**kwargs Any

keyword arguments to pass to result_store.get().

{}

Returns:

Type Description
Iterable[Tuple[alsek.core.message.Message, Any]]

results (iterable): an iterable of results of the form (Message, result).

Warning

  • By default, result_store does not keep messages once they have been collected. As a result, providing messages for which the corresponding results have already been collected (and deleted) will cause this method to loop indefinitely. In order to loop over messages multiple times set keep=True.
Source code in alsek/tools/iteration.py
def stream(
    self,
    *messages: Message,
    wait: int = 5 * 1000,
    descendants: bool = False,
    **kwargs: Any,
) -> Iterable[Tuple[Message, Any]]:
    """Stream the results of one or more messages. The order of the
    results is guaranteed to match the order of ``messages``.

    Args:
        *messages (Message): one or more messages to iterate over
        wait (int): time to wait (in milliseconds) between checks for
            available results
        descendants (bool): if ``True``, wait for and return
            the results of all descendant (callback) messages.
        **kwargs (Keyword Args): keyword arguments to pass to
            ``result_store.get()``.

    Returns:
        results (iterable): an iterable of results of the form
            ``(Message, result)``.

    Warning:
        * By default, ``result_store`` does not keep messages once
          they have been collected. As a result, providing messages
          for which the corresponding results have already been collected
          (and deleted) will cause this method to loop indefinitely.
          In order to loop over messages multiple times set ``keep=True``.

    """
    yield from self._engine(
        messages,
        wait=wait,
        break_on_error=True,
        descendants=descendants,
        **kwargs,
    )
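
The difference between `stream()` and `istream()` is only the order in which results are yielded. The sketch below is a conceptual simulation of the two orderings and does not reproduce `_engine()`, whose implementation is not shown here. `ready_at` is a made-up table giving the tick at which each result becomes available, and `in_order` and `as_available` are hypothetical helpers.

```python
from typing import Dict, Iterator, Sequence, Tuple


def in_order(
    names: Sequence[str], ready_at: Dict[str, int]
) -> Iterator[Tuple[str, int]]:
    """Yield in input order, waiting on each name in turn (stream-like)."""
    tick = 0
    for name in names:
        tick = max(tick, ready_at[name])  # wait until this one is ready
        yield name, tick


def as_available(
    names: Sequence[str], ready_at: Dict[str, int]
) -> Iterator[Tuple[str, int]]:
    """Yield each name as soon as it is ready (istream-like)."""
    pending = list(names)
    tick = 0
    while pending:
        for name in list(pending):  # iterate over a copy while removing
            if ready_at[name] <= tick:
                pending.remove(name)
                yield name, tick
        tick += 1


ready_at = {"a": 3, "b": 1, "c": 2}
ordered = list(in_order(["a", "b", "c"], ready_at))
unordered = list(as_available(["a", "b", "c"], ready_at))
```

In this simulation the ordered variant yields nothing until tick 3 because it waits on `"a"`, while the unordered variant yields `"b"` at tick 1 and `"c"` at tick 2. A name that never becomes ready would keep the `as_available` loop running forever, which is the situation the warning above describes.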