Reference 📚
Alsek
cli
special
cli
Command Line Interface
core
special
Core
backoff
Backoff Algorithms
Backoff
Base backoff class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
floor |
int |
minimum backoff in milliseconds |
required |
ceiling |
int |
maximum backoff in milliseconds |
required |
zero_override |
bool |
override backoff to zero if the number of incidents is zero |
required |
parameters: Dict[str, Optional[int]]
property
readonly
Parameters of the current instance which uniquely characterize it.
Returns:
Type | Description |
---|---|
Dict[str, Optional[int]] |
params (dict): backoff parameters |
settings: Dict[str, Any]
property
readonly
Settings of the current algorithm.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
serialization (BackoffSettingsType): summary of the current algorithm and parameters with sufficient information to reconstruct it. |
formula(self, incidents)
Implementation of the formula for computing the backoff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incidents |
int |
current number of incidents |
required |
Returns:
Type | Description |
---|---|
int |
int |
Source code in alsek/core/backoff.py
@abstractmethod
def formula(self, incidents: int) -> int:
    """Implementation of the formula for computing the backoff.

    Args:
        incidents (int): current number of incidents

    Returns:
        int
    """
    raise NotImplementedError()
get(self, incidents)
Get the backoff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incidents |
int |
current number of incidents |
required |
Returns:
Type | Description |
---|---|
int |
backoff (int): backoff in milliseconds |
Source code in alsek/core/backoff.py
def get(self, incidents: int) -> int:
    """Get the backoff.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds
    """
    if self.zero_override and incidents == 0:
        return 0
    return self._clipper(self.formula(incidents))
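The control flow of get() can be sketched as a standalone function. This is only an illustration under stated assumptions: `clip` and `get_backoff` are hypothetical names, and `_clipper` is assumed to bound the value between floor and ceiling, treating `None` as "no bound".

```python
from typing import Callable, Optional


def clip(value: int, floor: Optional[int], ceiling: Optional[int]) -> int:
    """Bound ``value`` to [floor, ceiling]; ``None`` means no bound (assumption)."""
    if floor is not None:
        value = max(value, floor)
    if ceiling is not None:
        value = min(value, ceiling)
    return value


def get_backoff(
    formula: Callable[[int], int],
    incidents: int,
    floor: Optional[int] = None,
    ceiling: Optional[int] = None,
    zero_override: bool = True,
) -> int:
    """Mirror the control flow of ``Backoff.get()`` shown above."""
    if zero_override and incidents == 0:
        return 0  # no incidents yet, so no waiting
    return clip(formula(incidents), floor, ceiling)


def linear(incidents: int) -> int:
    """Illustrative formula: 1000 ms per incident."""
    return 1000 * incidents


print(get_backoff(linear, 0, floor=500, ceiling=5000))   # 0
print(get_backoff(linear, 3, floor=500, ceiling=5000))   # 3000
print(get_backoff(linear, 10, floor=500, ceiling=5000))  # 5000
```

With zero_override disabled, the first call would instead be clipped up to the floor.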
ConstantBackoff
Constant backoff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
constant |
int |
amount of time (in milliseconds) to backoff. |
required |
**kwargs |
Keyword Args |
keyword arguments to pass to
|
required |
parameters: Dict[str, Optional[int]]
property
readonly
Parameters of the current ConstantBackoff instance which uniquely characterize it.
Returns:
Type | Description |
---|---|
Dict[str, Optional[int]] |
params (dict): backoff parameters |
formula(self, incidents)
Constant backoff formula.
Implements:
\[c\]
where \(c\) is constant.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incidents |
int |
current number of incidents |
required |
Returns:
Type | Description |
---|---|
int |
backoff (int): backoff in milliseconds |
Source code in alsek/core/backoff.py
def formula(self, incidents: int) -> int:
    """Constant backoff formula.

    Implements:

    $$c$$

    where $c$ is `constant`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds
    """
    return self.constant
ExponentialBackoff
Exponential backoff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base |
int |
the base of the exponential (milliseconds) |
required |
factor |
int |
factor to multiply the result by |
required |
**kwargs |
Keyword Args |
keyword arguments to pass to
|
required |
parameters: Dict[str, Optional[int]]
property
readonly
Parameters of the current ExponentialBackoff instance which uniquely characterize it.
Returns:
Type | Description |
---|---|
Dict[str, Optional[int]] |
params (dict): backoff parameters |
formula(self, incidents)
Exponential backoff formula.
Implements:
\[f * (b^{i})\]
where \(f\) is factor, \(b\) is base and \(i\) is the number of incidents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incidents |
int |
current number of incidents |
required |
Returns:
Type | Description |
---|---|
int |
backoff (int): backoff in milliseconds |
Source code in alsek/core/backoff.py
def formula(self, incidents: int) -> int:
    """Exponential backoff formula.

    Implements:

    $$f * (b^{i})$$

    where $f$ is `factor`, $b$ is `base` and $i$ is the number of `incidents`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds
    """
    return int(self.factor * (self.base ** incidents))
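A quick worked example of the exponential formula may help when choosing parameters. The values `base=4` and `factor=10_000` below are illustrative only and are not claimed to be Alsek's defaults; `exponential_backoff` is a hypothetical standalone function.

```python
def exponential_backoff(incidents: int, base: int = 4, factor: int = 10_000) -> int:
    """Compute f * (b ** i), as in the formula above."""
    return int(factor * (base ** incidents))


for i in range(4):
    print(i, exponential_backoff(i))
# 0 10000
# 1 40000
# 2 160000
# 3 640000
```

The value grows quickly, which is why a ceiling is useful in practice.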
LinearBackoff
Linear backoff.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
factor |
int |
amount of time (in milliseconds) to add to backoff after each retry. |
required |
**kwargs |
Keyword Args |
keyword arguments to pass to
|
required |
parameters: Dict[str, Optional[int]]
property
readonly
Parameters of the current LinearBackoff instance which uniquely characterize it.
Returns:
Type | Description |
---|---|
Dict[str, Optional[int]] |
params (dict): backoff parameters |
formula(self, incidents)
Linear backoff formula.
Implements:
\[f * i\]
where \(f\) is factor and \(i\) is the number of incidents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incidents |
int |
current number of incidents |
required |
Returns:
Type | Description |
---|---|
int |
backoff (int): backoff in milliseconds |
Source code in alsek/core/backoff.py
def formula(self, incidents: int) -> int:
    """Linear backoff formula.

    Implements:

    $$f * i$$

    where $f$ is `factor` and $i$ is the number of `incidents`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds
    """
    return int(self.factor * incidents)
settings2backoff(settings)
Convert backoff settings to a Backoff instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings |
Dict[str, Any] |
backoff settings of the form {"algorithm": str, "parameters": dict} |
required |
Returns:
Type | Description |
---|---|
Backoff |
backoff (Backoff): a backoff instance |
Source code in alsek/core/backoff.py
def settings2backoff(settings: BackoffSettingsType) -> Backoff:
    """Convert backoff settings to a ``Backoff`` instance.

    Args:
        settings (BackoffSettingsType): backoff settings of
            the form ``{"algorithm": str, "parameters": dict}``.

    Returns:
        backoff (Backoff): a backoff instance
    """
    algorithm = _get_algorithm(settings["algorithm"])
    return algorithm(**settings["parameters"])
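The round trip between a settings dictionary and an instance can be sketched without Alsek itself. Everything here is hypothetical: `LinearSketch` stands in for a backoff class, and the `ALGORITHMS` registry is only a guess at what a lookup such as `_get_algorithm` might do, since its implementation is not shown in this reference.

```python
from typing import Any, Dict, Type


class LinearSketch:
    """Stand-in for a backoff class (hypothetical, not Alsek's)."""

    def __init__(self, factor: int) -> None:
        self.factor = factor

    @property
    def settings(self) -> Dict[str, Any]:
        # Enough information to rebuild an equivalent instance.
        return {"algorithm": type(self).__name__, "parameters": {"factor": self.factor}}

    def formula(self, incidents: int) -> int:
        return int(self.factor * incidents)


# Hypothetical registry mapping algorithm names to classes.
ALGORITHMS: Dict[str, Type[LinearSketch]] = {"LinearSketch": LinearSketch}


def settings_to_backoff(settings: Dict[str, Any]) -> LinearSketch:
    """Rebuild an instance from ``{"algorithm": str, "parameters": dict}``."""
    algorithm = ALGORITHMS[settings["algorithm"]]
    return algorithm(**settings["parameters"])


original = LinearSketch(factor=250)
rebuilt = settings_to_backoff(original.settings)
print(rebuilt.formula(4))  # 1000
```

Storing plain dictionaries rather than objects keeps the settings serializable, which is presumably why messages carry backoff_settings instead of a Backoff instance.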
broker
Broker
Broker
Alsek Broker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
backend |
Backend |
backend for data storage |
required |
dlq_ttl |
int |
time to live (in milliseconds) for
Dead Letter Queue (DLQ). If |
required |
ack(self, message)
Acknowledge a message by removing it from the data backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
a message to acknowledge |
required |
Returns:
Type | Description |
---|---|
None |
None |
Warning
- Messages will not be redelivered once acked.
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Acking %s...", message.summary),
    after=lambda input_: log.debug("Acked %s.", input_["message"].summary),
)
exists(self, message)
Determine if the message exists in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
bool |
exists (bool): whether or not the message exists. |
Source code in alsek/core/broker.py
def exists(self, message: Message) -> bool:
    """Determine if the message exists in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        exists (bool): whether or not the message exists.
    """
    name = self.get_message_name(message)
    return self.backend.exists(name)
fail(self, message)
Acknowledge and fail a message by removing it from the backend.
If dlq_ttl is not null, the message will be persisted to the dead letter queue for the prescribed amount of time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Failing %s...", message.summary),
    after=lambda input_: log.debug("Failed %s.", input_["message"].summary),
)
get_message_name(self, message)
Get the name for message in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
str |
name (str): message-specific name |
Source code in alsek/core/broker.py
def get_message_name(self, message: Message) -> str:
    """Get the name for ``message`` in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        name (str): message-specific name
    """
    subnamespace = self.get_subnamespace(message.queue, message.task_name)
    return f"{subnamespace}:messages:{message.uuid}"
get_subnamespace(queue=None, task_name=None)
staticmethod
Get the subnamespace for a given queue and (optionally) task_name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue |
Optional[str] |
the name of the queue |
None |
task_name |
Optional[str] |
name of the task |
None |
Returns:
Type | Description |
---|---|
str |
subnamespace (str): queue-specific namespace |
Exceptions:
Type | Description |
---|---|
ValueError |
if task_name is provided and queue is not |
Source code in alsek/core/broker.py
@staticmethod
def get_subnamespace(
    queue: Optional[str] = None,
    task_name: Optional[str] = None,
) -> str:
    """Get the subnamespace for a given ``queue``
    and (optionally) ``task_name``.

    Args:
        queue (str, optional): the name of the queue
        task_name (str, optional): name of the task

    Returns:
        subnamespace (str): queue-specific namespace

    Raises:
        ValueError: if ``task_name`` is provided and ``queue`` is not.
    """
    if queue is None and task_name is not None:
        raise ValueError("`queue` must be provided if `task_name` is not None")
    if queue and task_name:
        return f"queues:{queue}:tasks:{task_name}"
    elif queue:
        return f"queues:{queue}"
    else:
        return "queues"
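To make the naming scheme concrete, the function above is reproduced below as a free function and called with made-up values. The queue name `emails`, task name `send` and uuid `abc-123` are purely illustrative.

```python
from typing import Optional


def get_subnamespace(queue: Optional[str] = None, task_name: Optional[str] = None) -> str:
    """Standalone copy of the static method shown above."""
    if queue is None and task_name is not None:
        raise ValueError("`queue` must be provided if `task_name` is not None")
    if queue and task_name:
        return f"queues:{queue}:tasks:{task_name}"
    elif queue:
        return f"queues:{queue}"
    else:
        return "queues"


print(get_subnamespace())                  # queues
print(get_subnamespace("emails"))          # queues:emails
print(get_subnamespace("emails", "send"))  # queues:emails:tasks:send

# get_message_name() appends ":messages:<uuid>" to the subnamespace.
message_name = f"{get_subnamespace('emails', 'send')}:messages:abc-123"
print(message_name)  # queues:emails:tasks:send:messages:abc-123
```

Because names are hierarchical, every message of a queue shares the prefix of that queue.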
nack(self, message)
Do not acknowledge a message and render it eligible for redelivery.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
a message to not acknowledge |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/broker.py
@magic_logger(  # noqa
    before=lambda message: log.debug("Nacking %s...", message.summary),
    after=lambda input_: log.debug("Nacked %s.", input_["message"].summary),
)
remove(self, message)
Remove a message from the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Removing %s...", message.summary),
    after=lambda input_: log.debug("Removed %s.", input_["message"].summary),
)
retry(self, message)
Retry a message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
None |
None |
Warning
- This method will mutate message by incrementing its number of retries.
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Retrying %s...", message.summary),
)
def retry(self, message: Message) -> None:
    """Retry a message.

    Args:
        message (Message): an Alsek message

    Returns:
        None

    Warning:
        * This method will mutate ``message`` by incrementing it.
    """
    if not self.exists(message):
        raise MessageDoesNotExistsError(
            f"Message '{message.uuid}' not found in backend"
        )
    message.increment()
    self.backend.set(self.get_message_name(message), value=message.data)
    self.nack(message)
    log.info(
        "Retrying %s in %s ms...",
        message.summary,
        format(message.get_backoff_duration(), ","),
    )
submit(self, message, ttl=604800000)
Submit a message for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
ttl |
int |
time to live for the submitted message in milliseconds |
604800000 |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
MessageAlreadyExistsError |
if the message already exists |
Source code in alsek/core/broker.py
@magic_logger(
    before=lambda message: log.debug("Submitting %s...", message.summary),
    after=lambda input_: log.debug("Submitted %s.", input_["message"].summary),
)
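The default ttl of 604800000 milliseconds is easier to read once converted. The short check below uses only the standard library; `days_to_ms` is a hypothetical helper for computing other values.

```python
from datetime import timedelta

DEFAULT_TTL_MS = 604_800_000  # default `ttl` of submit(), in milliseconds

# Convert the default to a human-readable duration.
print(timedelta(milliseconds=DEFAULT_TTL_MS))  # 7 days, 0:00:00


def days_to_ms(days: float) -> int:
    """Convert a number of days to whole milliseconds."""
    return int(timedelta(days=days).total_seconds() * 1000)


print(days_to_ms(1))  # 86400000
```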
concurrency
Concurrency
Lock
Distributed mutual exclusion (MUTEX) lock.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the lock |
required |
backend |
Backend |
backend for data storage |
required |
ttl |
int |
time to live in milliseconds.
If |
required |
auto_release |
bool |
if |
required |
Warning
- Locks are global and do not consider queues, unless included in name.
- When used as a context manager, the lock is not automatically acquired. Lock acquisition requires calling acquire().
Examples:
>>> from alsek import Lock
>>> from alsek.storage.backends.disk import DiskCacheBackend
...
>>> backend = DiskCacheBackend()
...
>>> with Lock("mutex", backend=backend) as lock:
...     if lock.acquire(strict=False):
...         print("Acquired lock.")
...     else:
...         print("Did not acquire lock.")
held: bool
property
readonly
If the lock is held by the current host.
holder: Optional[str]
property
readonly
Name of the host that currently holds the lock, if any.
long_name: str
property
readonly
Subnamespace for the lock.
__enter__(self)
special
Enter the context. The lock is not acquired automatically; call acquire() to do so.
Returns:
Type | Description |
---|---|
Lock |
lock (Lock): underlying lock object. |
Source code in alsek/core/concurrency.py
def __enter__(self) -> Lock:
    """Enter the context. The lock is not acquired automatically.

    Returns:
        lock (Lock): underlying lock object.
    """
    return self
__exit__(self, exc_type, exc_val, exc_tb)
special
Exit the context. If auto_release is enabled, the lock will be released.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exc_type |
Optional[Type[BaseException]] |
the type of any exception raised within the context |
required |
exc_val |
Optional[BaseException] |
value of any exception from within the context |
required |
exc_tb |
Optional[TracebackType] |
the traceback from the context |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/concurrency.py
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Exit the context. If ``auto_release`` is enabled,
    the lock will be released.

    Args:
        exc_type (Type[BaseException], optional): the type of any exception
            raised within the context
        exc_val (BaseException, optional): value of any exception from
            within the context
        exc_tb (TracebackType, optional): the traceback from the context

    Returns:
        None
    """
    if self.auto_release:
        self.release()
acquire(self, strict=True)
Try to acquire the lock.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
strict |
bool |
if True, return False if the lock is already held |
True |
Returns:
Type | Description |
---|---|
bool |
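acquired (bool): True if the lock is newly acquired, or if it is already held by the current host and strict is False |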
Warning
- When strict is True, True is only returned if execution of this method resulted in acquisition of the lock. This means that False will be returned if the current host already holds the lock.
Source code in alsek/core/concurrency.py
def acquire(self, strict: bool = True) -> bool:
    """Try to acquire the lock.

    Args:
        strict (bool): if ``True`` return ``False`` if
            the lock is already held.

    Returns:
        acquired (bool): ``True`` if the lock is newly
            acquired, or if it is already held by the
            current host and ``strict`` is ``False``.

    Warning:
        * When ``strict`` is ``True``, ``True`` is only returned if
          execution of this method resulted in acquisition of the lock.
          This means that ``False`` will be returned if the current
          host already holds the lock.
    """
    if self.held:
        return not strict
    try:
        self.backend.set(self.long_name, value=gethostname(), nx=True, ttl=self.ttl)
        return True
    except KeyError:
        return False
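The effect of strict is easiest to see with a toy backend. The sketch below is not Alsek code: `InMemoryBackend` and `LockSketch` are hypothetical, the `host` argument exists only to simulate two machines in one process, and ttl handling is omitted. It assumes, as the listing above suggests, that a set with nx=True raises KeyError when the key already exists.

```python
from socket import gethostname
from typing import Dict, Optional


class InMemoryBackend:
    """Toy backend: set(..., nx=True) raises KeyError if the key exists."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}

    def set(self, name: str, value: str, nx: bool = False) -> None:
        if nx and name in self.store:
            raise KeyError(name)
        self.store[name] = value

    def get(self, name: str) -> Optional[str]:
        return self.store.get(name)


class LockSketch:
    def __init__(self, name: str, backend: InMemoryBackend, host: Optional[str] = None) -> None:
        self.name = name
        self.backend = backend
        self.host = host or gethostname()

    @property
    def held(self) -> bool:
        # Held only if the stored holder is this host.
        return self.backend.get(self.name) == self.host

    def acquire(self, strict: bool = True) -> bool:
        if self.held:
            return not strict
        try:
            self.backend.set(self.name, value=self.host, nx=True)
            return True
        except KeyError:
            return False


backend = InMemoryBackend()
lock_a = LockSketch("mutex", backend, host="host-a")
lock_b = LockSketch("mutex", backend, host="host-b")

print(lock_a.acquire())              # True: newly acquired
print(lock_a.acquire())              # False: already held and strict=True
print(lock_a.acquire(strict=False))  # True: already held and strict=False
print(lock_b.acquire(strict=False))  # False: held by another host
```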
release(self)
Release the lock.
Returns:
Type | Description |
---|---|
bool |
released (bool): whether or not the lock was found and released. |
Source code in alsek/core/concurrency.py
def release(self) -> bool:
    """Release the lock.

    Returns:
        released (bool): whether or not the lock was
            found and released.
    """
    if self.held:
        self.backend.delete(self.long_name, missing_ok=True)
        return True
    else:
        return False
consumer
Consumer
Consumer
Tool for consuming messages generated by the broker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker |
Broker |
an Alsek broker |
required |
subset |
List[str], Dict[str, List[str]] |
subset of messages to consume Must be one of the following
|
required |
backoff |
Backoff |
backoff to use in response to passes over the backend which did not yield any actionable messages. |
required |
Notes
- If subset is a list or dict, queue priority is derived from the order of the items. Items which appear earlier are given higher priority.
- If subset is a dict, task priority is derived from the order of task names in the value associated with each key (queue).
Warning
- If subset is of type dict, task names not included in any of the values will be ignored.
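The ordering rules for subset can be illustrated with a small helper. `expand_subset` is hypothetical and is not part of Alsek; it assumes that a subset is ultimately translated into subnamespaces of the form produced by Broker.get_subnamespace(), and the queue and task names are made up.

```python
from typing import Dict, List, Union

Subset = Union[List[str], Dict[str, List[str]]]


def expand_subset(subset: Subset) -> List[str]:
    """Return subnamespaces in priority order (earlier means higher priority)."""
    if isinstance(subset, dict):
        # Dicts preserve insertion order, so queue order and task order both carry over.
        return [
            f"queues:{queue}:tasks:{task}"
            for queue, tasks in subset.items()
            for task in tasks
        ]
    return [f"queues:{queue}" for queue in subset]


print(expand_subset(["high", "low"]))
# ['queues:high', 'queues:low']
print(expand_subset({"high": ["charge", "refund"], "low": ["report"]}))
# ['queues:high:tasks:charge', 'queues:high:tasks:refund', 'queues:low:tasks:report']
```

In the dict form, a task that is not listed under any queue never appears in the result, which matches the warning above.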
stream(self)
Generate a stream of messages to process from the data backend.
Returns:
Type | Description |
---|---|
Iterable[alsek.core.message.Message] |
stream (Iterable[Message]): an iterable of messages to process |
Source code in alsek/core/consumer.py
def stream(self) -> Iterable[Message]:
    """Generate a stream of messages to process from
    the data backend.

    Returns:
        stream (Iterable[Message]): an iterable of messages to process
    """
    while not self.stop_signal.received:
        for message in self._poll():
            yield message
        self.stop_signal.wait(self.backoff.get(self._empty_passes))
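The poll-then-wait loop above can be approximated with the standard library. This is a sketch under assumptions: a `threading.Event` stands in for the stop signal, `poll` and `backoff_ms` are hypothetical callables, and the counting of empty passes is a guess at bookkeeping that the listing delegates to `_poll()`.

```python
import threading
from typing import Callable, Iterator, List


def stream(
    poll: Callable[[], List[str]],
    stop: threading.Event,
    backoff_ms: Callable[[int], int],
) -> Iterator[str]:
    """Yield messages until ``stop`` is set, waiting longer after empty passes."""
    empty_passes = 0
    while not stop.is_set():
        messages = poll()
        # Track consecutive passes which produced nothing (assumed bookkeeping).
        empty_passes = 0 if messages else empty_passes + 1
        for message in messages:
            yield message
        stop.wait(backoff_ms(empty_passes) / 1000)  # Event.wait() takes seconds


stop = threading.Event()
batches = [["m1", "m2"], [], ["m3"]]


def poll() -> List[str]:
    if not batches:
        stop.set()  # nothing left: ask the loop to finish
        return []
    return batches.pop(0)


result = list(stream(poll, stop, backoff_ms=lambda n: min(10 * n, 50)))
print(result)  # ['m1', 'm2', 'm3']
```

Waiting on the stop signal rather than sleeping lets a shutdown request interrupt the backoff immediately.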
futures
Futures
ProcessTaskFuture
Future for task execution in a separate process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
Task |
a task to perform |
required |
message |
Message |
a message to run |
required |
patience |
int |
time to wait (in milliseconds) after issuing a SIGTERM signal to the process at shutdown. If the process is still active after this time, a SIGKILL will be issued. |
required |
complete: bool
property
readonly
Whether or not the task has finished.
stop(self, exception)
Stop the future.
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/futures.py
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None
    """
    if self._process.ident is None:  # type: ignore
        log.error(
            "Process task future for %s did not start.",
            self.message.summary,
        )
        return None
    self._shutdown()
    if self._wrapper_exit_queue.empty():
        self._wrapper_exit_queue.put(1)
        try:
            raise exception(f"Stopped process {self._process.ident}")  # type: ignore
        except BaseException as error:
            log.error("Error processing %s.", self.message.summary, exc_info=True)
            _retry_future_handler(self.task, self.message, exception=error)
TaskFuture
Future for background task execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
Task |
a task to perform |
required |
message |
Message |
a message to run |
required |
complete: bool
property
readonly
Whether or not the task has finished.
time_limit_exceeded: bool
property
readonly
Whether or not the task has been running longer than the allowed time window.
stop(self, exception)
Stop the future.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exception |
Type[BaseException] |
exception type to raise. |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/futures.py
@abstractmethod
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None
    """
    raise NotImplementedError()
ThreadTaskFuture
Future for task execution in a separate thread.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
Task |
a task to perform |
required |
message |
Message |
a message to run |
required |
complete: bool
property
readonly
Whether or not the task has finished.
stop(self, exception)
Stop the future.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exception |
Type[BaseException] |
exception type to raise. |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/futures.py
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None
    """
    if self._thread.ident is None:
        log.error(
            "Thread task future for %s did not start.",
            self.message.summary,
        )
        return None
    elif python_implementation() != "CPython":
        log.error(
            f"Unable to raise exception {exception} in thread {self._thread.ident}. "
            f"Unsupported platform '{python_implementation()}'."
        )
        return None
    thread_raise(self._thread.ident, exception=exception)
    if not self._wrapper_exit:
        self._wrapper_exit = True
        _retry_future_handler(
            self.task,
            message=self.message,
            exception=exception(f"Stopped thread {self._thread.ident}"),
        )
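The CPython restriction above probably stems from how an exception can be injected into another thread. The sketch below shows one common way to do this through the CPython C API function `PyThreadState_SetAsyncExc`; whether Alsek's `thread_raise` works this way is an assumption, and `thread_raise_sketch` and `StopRequested` are hypothetical names.

```python
import ctypes
import threading
import time
from typing import List, Type


def thread_raise_sketch(ident: int, exception: Type[BaseException]) -> None:
    """Ask CPython to raise ``exception`` inside the thread with id ``ident``."""
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(ident), ctypes.py_object(exception)
    )
    if affected == 0:
        raise ValueError(f"No thread with ident {ident}")


class StopRequested(Exception):
    """Exception type used only for this demonstration."""


outcome: List[str] = []


def worker() -> None:
    try:
        while True:
            time.sleep(0.01)  # the exception is delivered between bytecode instructions
    except StopRequested:
        outcome.append("stopped")


thread = threading.Thread(target=worker, daemon=True)
thread.start()
time.sleep(0.05)  # give the worker time to enter its loop
thread_raise_sketch(thread.ident, StopRequested)
thread.join(timeout=2)
print(outcome)  # ['stopped'] on CPython
```

The exception is only delivered once the target thread executes Python bytecode again, so a thread blocked in a long C call will not stop promptly.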
message
Message
Message
Alsek Message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name |
str |
the name of the task for which the message is intended |
required |
queue |
str |
the queue for which the message was intended.
If |
required |
args |
list, tuple |
positional arguments to pass to
the task's function during the execution of |
required |
kwargs |
dict |
keyword arguments to pass to
the task's function during the execution of |
required |
metadata |
dict |
a dictionary of user-defined message metadata. This can store any data types supported by the backend's serializer. |
required |
result_ttl |
int |
time to live (in milliseconds) for the
result in the result store. If a result store is provided and
this parameter is |
required |
uuid |
str |
universal unique identifier for the message.
If |
required |
progenitor_uuid |
str |
universal unique identifier for the message from which this message descended. (This field is only set for tasks with triggers and/or callbacks.) |
required |
retries |
int |
number of retries |
required |
timeout |
int |
the maximum amount of time (in milliseconds) a task is permitted to run against this message. |
required |
created_at |
int |
UTC timestamp (in milliseconds) for when the message was created |
required |
updated_at |
int |
UTC timestamp (in milliseconds) for when the message was last updated |
required |
delay |
int |
delay before the message becomes ready |
required |
previous_result |
any |
the output of any previously executed task. (This will only be non-null in cases where callbacks are used.) |
required |
previous_message_uuid |
str |
universal unique identifier for the preceding message. (This will only be non-null in cases where callbacks are used.) |
required |
callback_message_data |
dict |
data to construct a new message as part of a callback operation |
required |
backoff_settings |
dict |
parameters to control backoff. Expected to be of the form {"algorithm": str, "parameters": dict} |
required |
mechanism |
str |
mechanism for executing the task. Must be either "process" or "thread". |
required |
Notes
- While not recommended, timeout can be disabled, in effect, by setting it to a very large integer.
data: Dict[str, Any]
property
readonly
Underlying message data.
descendant_uuids: Optional[List[str]]
property
readonly
A list of uuids which have descended or will descend from this message.
ready: bool
property
readonly
Whether the message is currently ready for processing.
ready_at: int
property
readonly
Timestamp denoting when the message will be ready for processing.
summary: str
property
readonly
High-level summary of the message object.
ttr: int
property
readonly
Time to ready in milliseconds.
clone(self)
Create an exact copy of the current message.
Returns:
Type | Description |
---|---|
Message |
clone (Message): the cloned message |
Source code in alsek/core/message.py
def clone(self) -> Message:
    """Create an exact copy of the current message.

    Returns:
        clone (Message): the cloned message
    """
    return Message(**deepcopy(self.data))
duplicate(self, uuid=None)
Create a duplicate of the current message, changing only uuid.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
Optional[str] |
universal unique identifier for the new message.
If |
None |
Returns:
Type | Description |
---|---|
Message |
duplicate_message (Message): the duplicate message |
Warning
- Linked locks are not conserved
Source code in alsek/core/message.py
def duplicate(self, uuid: Optional[str] = None) -> Message:
    """Create a duplicate of the current message, changing only ``uuid``.

    Args:
        uuid (str, optional): universal unique identifier for the new message.
            If ``None``, one will be generated automatically.

    Returns:
        duplicate_message (Message): the duplicate message

    Warning:
        * Linked locks are not conserved
    """
    return self.clone().update(uuid=uuid or _make_uuid())
get_backoff_duration(self)
Get the amount of time to backoff (wait) before the message is eligible for processing again, should it fail.
Returns:
Type | Description |
---|---|
int |
duration (int): duration of the backoff in milliseconds |
Source code in alsek/core/message.py
def get_backoff_duration(self) -> int:
    """Get the amount of time to backoff (wait)
    before the message is eligible for processing again,
    should it fail.

    Returns:
        duration (int): duration of the backoff in milliseconds
    """
    return settings2backoff(self.backoff_settings).get(self.retries)
increment(self)
Update a message by increasing the number of retries.
Returns:
Type | Description |
---|---|
Message |
message (Message): the updated message |
Notes
- updated_at will be updated to the current time.
Warning
- Changes are not automatically persisted to the backend.
Source code in alsek/core/message.py
def increment(self) -> Message:
    """Update a message by increasing the number
    of retries.

    Returns:
        message (Message): the updated message

    Notes:
        * ``updated_at`` will be updated to the
          current time.

    Warning:
        * Changes are *not* automatically persisted to the backend.
    """
    return self.update(retries=self.retries + 1, updated_at=utcnow_timestamp_ms())
update(self, **data)
Update the data in the current message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**data |
Any |
key value pairs of data to update |
{} |
Returns:
Type | Description |
---|---|
Message |
updated_message (Message): the updated message |
Warning
- This method operates 'in place'. To avoid changing the current message, first call .clone(), e.g., message.clone().update(...).
- Changes are not automatically persisted to the backend.
Source code in alsek/core/message.py
def update(self, **data: Any) -> Message:
    """Update the ``data`` in the current message.

    Args:
        **data (Keyword Args): key value pairs of
            data to update

    Returns:
        updated_message (Message): the updated message

    Warning:
        * This method operates 'in place'. To avoid changing the current
          message, first call ``.clone()``, e.g., ``message.clone().update(...)``.
        * Changes are *not* automatically persisted to the backend.
    """
    for k, v in data.items():
        if k in self.data:
            setattr(self, k, v)
        else:
            raise KeyError(f"Unsupported key '{k}'")
    return self
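The difference between updating in place and updating a clone can be shown with a reduced stand-in. `MessageSketch` is hypothetical and carries only two fields; it copies the update() and clone() logic from the listings above and is not the real Message class.

```python
from copy import deepcopy
from typing import Any, Dict


class MessageSketch:
    """Tiny stand-in for Message with two fields (hypothetical)."""

    def __init__(self, uuid: str, retries: int = 0) -> None:
        self.uuid = uuid
        self.retries = retries

    @property
    def data(self) -> Dict[str, Any]:
        return {"uuid": self.uuid, "retries": self.retries}

    def clone(self) -> "MessageSketch":
        return MessageSketch(**deepcopy(self.data))

    def update(self, **data: Any) -> "MessageSketch":
        for k, v in data.items():
            if k in self.data:
                setattr(self, k, v)
            else:
                raise KeyError(f"Unsupported key '{k}'")
        return self


message = MessageSketch(uuid="abc-123")
copy = message.clone().update(retries=1)  # the original is untouched
print(message.retries, copy.retries)      # 0 1

message.update(retries=5)                 # mutates in place
print(message.retries)                    # 5

try:
    message.update(colour="red")          # keys outside `data` are rejected
except KeyError as error:
    print("rejected:", error)
```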
status
Status Tracking
StatusTracker
Alsek Status Tracker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker |
Broker |
broker used by tasks. |
required |
ttl |
int |
time to live (in milliseconds) for the status |
required |
integrity_scan_trigger |
CronTrigger, DateTrigger, IntervalTrigger |
trigger which determines how often to scan for messages with non-terminal
statuses (i.e., |
required |
delete(self, message, check=True)
Delete the status of message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
check |
bool |
check that it is safe to delete the status. This is done by ensuring that the current status of message is terminal (i.e., TaskStatus.FAILED or TaskStatus.SUCCEEDED). |
True |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
ValidationError |
if check is True and the status of message is not TaskStatus.FAILED or TaskStatus.SUCCEEDED |
Source code in alsek/core/status.py
def delete(self, message: Message, check: bool = True) -> None:
    """Delete the status of ``message``.

    Args:
        message (Message): an Alsek message
        check (bool): check that it is safe to delete the status.
            This is done by ensuring that the current status of ``message``
            is terminal (i.e., ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``).

    Returns:
        None

    Raises:
        ValidationError: if ``check`` is ``True`` and the status of
            ``message`` is not ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``.
    """
    if check and self.get(message) not in TERMINAL_TASK_STATUSES:
        raise ValidationError(f"Message '{message.uuid}' in a non-terminal state")
    self._backend.delete(self.get_storage_name(message), missing_ok=False)
exists(self, message)
Check if a status for message exists in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Source code in alsek/core/status.py
def exists(self, message: Message) -> bool:
    """Check if a status for ``message`` exists in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        bool
    """
    return self._backend.exists(self.get_storage_name(message))
get(self, message)
Get the status of message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
TaskStatus |
status (TaskStatus): the status of message |
Source code in alsek/core/status.py
def get(self, message: Message) -> TaskStatus:
    """Get the status of ``message``.

    Args:
        message (Message): an Alsek message

    Returns:
        status (TaskStatus): the status of ``message``
    """
    status_name = self._backend.get(self.get_storage_name(message))
    return TaskStatus[status_name]
set(self, message, status)
Set a status for message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
status |
TaskStatus |
a status to set |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/status.py
def set(self, message: Message, status: TaskStatus) -> None:
    """Set a ``status`` for ``message``.

    Args:
        message (Message): an Alsek message
        status (TaskStatus): a status to set

    Returns:
        None
    """
    self._backend.set(
        self.get_storage_name(message),
        value=status.name,
        ttl=self.ttl if status == TaskStatus.SUBMITTED else None,
    )
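The set(), get() and delete() methods above store a status by name and look it up again through the enum. The sketch below reproduces that round trip with a plain dictionary as the backend. The member list of `TaskStatus` is an assumption (only SUBMITTED, FAILED and SUCCEEDED are named in this reference), the storage key is made up, and `ValueError` is used in place of Alsek's ValidationError.

```python
from enum import Enum
from typing import Dict


class TaskStatus(Enum):
    # Assumed members; RUNNING is included only for illustration.
    SUBMITTED = 1
    RUNNING = 2
    FAILED = 3
    SUCCEEDED = 4


TERMINAL_TASK_STATUSES = (TaskStatus.FAILED, TaskStatus.SUCCEEDED)

store: Dict[str, str] = {}  # stand-in for the backend


def set_status(name: str, status: TaskStatus) -> None:
    store[name] = status.name  # persisted by name, e.g. "RUNNING"


def get_status(name: str) -> TaskStatus:
    return TaskStatus[store[name]]  # look the member up by name


def delete_status(name: str, check: bool = True) -> None:
    if check and get_status(name) not in TERMINAL_TASK_STATUSES:
        raise ValueError(f"'{name}' is in a non-terminal state")
    del store[name]


set_status("status:abc-123", TaskStatus.RUNNING)
print(get_status("status:abc-123"))  # TaskStatus.RUNNING

try:
    delete_status("status:abc-123")
except ValueError as error:
    print(error)  # 'status:abc-123' is in a non-terminal state

set_status("status:abc-123", TaskStatus.SUCCEEDED)
delete_status("status:abc-123")
print(store)  # {}
```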
TaskStatus
Alsek task statuses.
task
Task
Task
Alsek Task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function |
callable |
function to use for the main operation |
required |
broker |
Broker |
an Alsek broker |
required |
name |
str |
the name of the task. If |
required |
queue |
str |
the name of the queue to generate the task on.
If |
required |
priority |
int |
priority of the task. Tasks with lower values will be executed before tasks with higher values. |
required |
timeout |
int |
the maximum amount of time (in milliseconds) this task is permitted to run. |
required |
max_retries |
int |
maximum number of allowed retries |
required |
backoff |
Backoff |
backoff algorithm and parameters to use when computing delay between retries |
required |
result_store |
ResultStore |
store for persisting task results |
required |
status_tracker |
StatusTracker |
store for persisting task statuses |
required |
mechanism |
str |
mechanism for executing the task. Must be either "process" or "thread". |
required |
Notes
- do_retry() can be overridden in cases where max_retries is not sufficiently complex to determine if a retry should occur.
Warning
- Timeouts are not supported for mechanism='thread' on Python implementations other than CPython.
deferred: bool
property
readonly
Whether or not deferred mode is currently enabled.
name: str
property
readonly
Name of the task.
cancel_defer(self)
Cancel "deferred" mode.
Returns:
Type | Description |
---|---|
Task |
task (Task): the current task |
Source code in alsek/core/task.py
def cancel_defer(self) -> Task:
    """Cancel "deferred" mode.

    Returns:
        task (Task): the current task
    """
    self._deferred = False
    return self
defer(self)
Enter "deferred" mode.
Do not submit the next message created by generate()
to the broker.
Returns:
Type | Description |
---|---|
Task |
task (Task): the current task |
Warning
- Deferred mode is automatically cancelled by
generate()
prior to it returning.
Source code in alsek/core/task.py
def defer(self) -> Task:
"""Enter "deferred" mode.
Do not submit the next message created by ``generate()``
to the broker.
Returns:
task (Task): the current task
Warning:
* Deferred mode is automatically cancelled by ``generate()``
prior to it returning.
"""
self._deferred = True
return self
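The interaction between defer(), cancel_defer() and generate() can be sketched without a broker. The following is a minimal illustration only: SketchTask and its submitted list are hypothetical stand-ins (not part of Alsek) that mirror just the deferred-mode branch of generate() shown in this reference.

```python
from typing import Any, List


class SketchTask:
    """Hypothetical stand-in that mirrors only the deferred-mode logic."""

    def __init__(self) -> None:
        self._deferred = False
        self.submitted: List[dict] = []  # stands in for the broker

    def defer(self) -> "SketchTask":
        self._deferred = True
        return self

    def cancel_defer(self) -> "SketchTask":
        self._deferred = False
        return self

    def generate(self, submit: bool = True, **kwargs: Any) -> dict:
        message = {"kwargs": kwargs}
        if self._deferred:
            # Deferred mode applies to a single message, then switches off.
            self.cancel_defer()
        elif submit:
            self.submitted.append(message)
        return message


task = SketchTask()
first = task.defer().generate(a=1)  # created, but not submitted
second = task.generate(a=2)  # submitted as usual
```

Because defer() returns the task itself, the call can be chained directly in front of generate(), and only the first message created afterwards is withheld.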
do_callback(self, message, result)
Whether or not to submit the callback provided.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
message with the callback |
required |
result |
Any |
output of |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Warning
- If the task message does not have a callback, this method will not be invoked.
Source code in alsek/core/task.py
def do_callback(self, message: Message, result: Any) -> bool: # noqa
"""Whether or not to submit the callback provided.
Args:
message (Message): message with the callback
result (Any): output of ``op()``
Returns:
bool
Warning:
* If the task message does not have a callback,
this method will *not* be invoked.
"""
return True
do_retry(self, message, exception)
Whether or not a failed task should be retried.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
message which failed |
required |
exception |
BaseException |
the exception which was raised |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Source code in alsek/core/task.py
def do_retry(self, message: Message, exception: BaseException) -> bool: # noqa
"""Whether or not a failed task should be retried.
Args:
message (Message): message which failed
exception (BaseException): the exception which was raised
Returns:
bool
"""
return self.max_retries is None or message.retries < self.max_retries
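As a rough illustration of overriding do_retry(), the sketch below retries only exceptions that are plausibly transient and defers to the default rule otherwise. SketchMessage, SketchTask and TransientOnlyTask are hypothetical stand-ins; only the body of the base do_retry() is taken from the source above.

```python
from typing import Optional


class SketchMessage:
    """Hypothetical message carrying only a retry counter."""

    def __init__(self, retries: int = 0) -> None:
        self.retries = retries


class SketchTask:
    """Hypothetical base mirroring the default ``do_retry()`` shown above."""

    def __init__(self, max_retries: Optional[int] = 3) -> None:
        self.max_retries = max_retries

    def do_retry(self, message: SketchMessage, exception: BaseException) -> bool:
        return self.max_retries is None or message.retries < self.max_retries


class TransientOnlyTask(SketchTask):
    """Retry only errors that are likely to succeed on a later attempt."""

    def do_retry(self, message: SketchMessage, exception: BaseException) -> bool:
        if not isinstance(exception, (ConnectionError, TimeoutError)):
            return False  # e.g. a ValueError will not fix itself
        return super().do_retry(message, exception)


task = TransientOnlyTask(max_retries=2)
```

Calling super().do_retry() keeps the max_retries limit in force, so the override only narrows the set of retried failures.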
execute(self, message)
Execute the task against a message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
message to process |
required |
Returns:
Type | Description |
---|---|
Any |
result (Any): output of |
Source code in alsek/core/task.py
def execute(self, message: Message) -> Any:
"""Execute the task against a message.
Args:
message (Message): message to process
Returns:
result (Any): output of ``op()``
"""
self.pre_op(message)
result = self.op(message)
self.post_op(message, result=result)
return result
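The hook order used by execute() (pre_op, then op, then post_op) can be illustrated with a small stand-in. SketchTask and AuditedTask below are hypothetical classes, not Alsek classes, and the message is reduced to a plain integer for brevity.

```python
from typing import Any, List


class SketchTask:
    """Hypothetical stand-in that mirrors ``execute()`` shown above."""

    def pre_op(self, message: Any) -> None:
        pass

    def op(self, message: Any) -> Any:
        return message * 2

    def post_op(self, message: Any, result: Any) -> None:
        pass

    def execute(self, message: Any) -> Any:
        self.pre_op(message)
        result = self.op(message)
        self.post_op(message, result=result)
        return result


class AuditedTask(SketchTask):
    """Record an event before and after the main operation."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def pre_op(self, message: Any) -> None:
        self.events.append(f"start:{message}")

    def post_op(self, message: Any, result: Any) -> None:
        self.events.append(f"done:{message}->{result}")


audited = AuditedTask()
output = audited.execute(21)
```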
generate(self, args=None, kwargs=None, metadata=None, result_ttl=None, uuid=None, timeout_override=None, delay=None, previous_result=None, callback=None, submit=True, **options)
Generate an instance of the task for processing.
This method generates a new message for the task and submits it to the broker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
Optional[Union[List[Any], Tuple[Any, ...]]] |
positional arguments to pass to |
None |
kwargs |
Optional[Dict[Any, Any]] |
keyword arguments to pass to |
None |
metadata |
Optional[Dict[Any, Any]] |
a dictionary of user-defined message metadata. This can store any data types supported by the backend's serializer. |
None |
result_ttl |
Optional[int] |
time to live (in milliseconds) for the result in the result store. If a result store is provided and |
None |
uuid |
Optional[str] |
universal unique identifier for the message.
If |
None |
timeout_override |
Optional[int] |
override the default maximum runtime (in milliseconds) for instances of this task. |
None |
delay |
Optional[int] |
delay before message is ready |
None |
previous_result |
Any |
result of a previous task. |
None |
callback |
Optional[Union[Message, Tuple[Message, ...]]] |
one or more messages to be submitted to the broker after the proceeding message has been successfully processed by a worker. |
None |
submit |
bool |
if |
True |
options |
Any |
options to use when submitting
the message via the broker. See |
{} |
Returns:
Type | Description |
---|---|
Message |
message (Message): message generated for the task |
Warning
- submit is overridden to False if deferred mode is active
- uuid is refreshed after the first event when using a trigger.
Source code in alsek/core/task.py
def generate(
self,
args: Optional[Union[List[Any], Tuple[Any, ...]]] = None,
kwargs: Optional[Dict[Any, Any]] = None,
metadata: Optional[Dict[Any, Any]] = None,
result_ttl: Optional[int] = None,
uuid: Optional[str] = None,
timeout_override: Optional[int] = None,
delay: Optional[int] = None,
previous_result: Any = None,
callback: Optional[Union[Message, Tuple[Message, ...]]] = None,
submit: bool = True,
**options: Any,
) -> Message:
"""Generate an instance of the task for processing.
This method generates a new message for the task and
submits it to the broker.
Args:
args (list, tuple, optional): positional arguments to pass to ``function``
kwargs (dict, optional): keyword arguments to pass to ``function``
metadata (dict, optional): a dictionary of user-defined message metadata.
This can store any data types supported by the backend's serializer.
result_ttl (int, optional): time to live (in milliseconds) for the
result in the result store. If a result store is provided and
this parameter is ``None``, the result will be persisted indefinitely.
uuid (str, optional): universal unique identifier for the message.
If ``None``, one will be generated automatically.
timeout_override (int, optional): override the default maximum runtime
(in milliseconds) for instances of this task.
delay (int, optional): delay before message is ready
previous_result (Any): result of a previous task.
callback (Message, Tuple[Message, ...], optional): one or more messages
to be submitted to the broker after the proceeding message has been
successfully processed by a worker.
submit (bool): if ``True`` submit the task to the broker
options (Keyword Args): options to use when submitting
the message via the broker. See ``Broker.submit()``.
Returns:
message (Message): message generated for the task
Warning:
* ``submit`` is overridden to ``False`` if deferred mode is active
* ``uuid`` is refreshed after the first event when using a trigger.
"""
if result_ttl and not self.result_store:
raise ValidationError(f"`result_ttl` invalid. No result store set.")
message = Message(
task_name=self.name,
queue=self.queue,
args=args,
kwargs=kwargs,
metadata=metadata,
result_ttl=result_ttl,
uuid=uuid,
timeout=timeout_override or self.timeout,
delay=delay,
previous_result=previous_result,
callback_message_data=_parse_callback(callback).data if callback else None,
backoff_settings=self.backoff.settings,
mechanism=self.mechanism,
)
if self._deferred:
self.cancel_defer()
elif submit:
self._submit(message, **options)
self._update_status(message, status=TaskStatus.SUBMITTED)
return message
op(self, message)
Pass message
data to function
for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
message to perform the operation against |
required |
Returns:
Type | Description |
---|---|
Any |
result (Any): output of the function |
Notes
- If, and only if, the signature of function contains a message parameter, message itself will be passed along with any args and kwargs contained in the message.
Warning
- message will not be passed in cases where a "message" exists in message.kwargs.
Source code in alsek/core/task.py
def op(self, message: Message) -> Any:
"""Pass ``message`` data to ``function`` for processing.
Args:
message (Message): message to perform the operation against
Returns:
result (Any): output of the function
Notes:
* If, and only if, the signature of ``function`` contains
a ``message`` parameter, ``message`` itself will be passed
along with any ``args`` and ``kwargs`` contained in the message.
Warning:
* ``message`` will not be passed in cases where a "message"
exists in ``message.kwargs``.
"""
if "message" not in message.kwargs and _expects_message(self.function):
return self.function(*message.args, **message.kwargs, message=message)
else:
return self.function(*message.args, **message.kwargs)
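The injection rule in op() can be reproduced with inspect.signature. In the sketch below, expects_message is a hypothetical equivalent of the private _expects_message helper (whose body is not shown in this reference), and call is an illustrative free function rather than an Alsek API.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def expects_message(function: Callable[..., Any]) -> bool:
    # Hypothetical equivalent of the private ``_expects_message`` helper.
    return "message" in inspect.signature(function).parameters


def call(
    function: Callable[..., Any],
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    message: Any,
) -> Any:
    # Inject ``message`` only if the function asks for it and the
    # caller has not already supplied a "message" keyword argument.
    if "message" not in kwargs and expects_message(function):
        return function(*args, **kwargs, message=message)
    return function(*args, **kwargs)


def plain(a: int, b: int) -> int:
    return a + b


def aware(a: int, b: int, message: Any) -> tuple:
    return (a + b, message)
```

With these definitions, call(aware, (1, 2), {"message": "user"}, "msg") passes the user-supplied value through untouched, which is the case the warning above describes.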
post_op(self, message, result)
Operation to perform after running op.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
message |
required |
result |
Any |
output of |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/task.py
def post_op(self, message: Message, result: Any) -> None:
"""Operation to perform after running ``op``.
Args:
message (Message): message ``op()`` was run against
result (Any): output of ``op()``
Returns:
None
"""
pre_op(self, message)
Operation to perform before running op.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
message |
required |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/task.py
def pre_op(self, message: Message) -> None:
"""Operation to perform before running ``op``.
Args:
message (Message): message ``op`` will be run against.
Returns:
None
"""
TriggerTask
Triggered Task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function |
callable |
function to use for the main operation |
required |
trigger |
CronTrigger, DateTrigger, IntervalTrigger |
trigger for task execution. |
required |
broker |
Broker |
an Alsek broker |
required |
name |
str |
the name of the task. If |
required |
queue |
str |
the name of the queue to generate the task on.
If |
required |
priority |
int |
priority of the task. Tasks with lower values will be executed before tasks with higher values. |
required |
timeout |
int |
the maximum amount of time (in milliseconds) this task is permitted to run. |
required |
max_retries |
int |
maximum number of allowed retries |
required |
backoff |
Backoff |
backoff algorithm and parameters to use when computing delay between retries |
required |
result_store |
ResultStore |
store for persisting task results |
required |
status_tracker |
StatusTracker |
store for persisting task statuses |
required |
mechanism |
str |
mechanism for executing the task. Must be either "process" or "thread". |
required |
Warnings
- The signature of
function
cannot contain parameters
Exceptions:
Type | Description |
---|---|
* ``SchedulingError`` |
if the signature of |
generated: bool
property
readonly
If the task has been generated.
clear(self)
Clear the currently scheduled task.
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
AttributeError |
if a task has not yet been generated |
Source code in alsek/core/task.py
def clear(self) -> None:
"""Clear the currently scheduled task.
Returns:
None
Raises
* AttributeError: if a task has not yet
been generated
"""
try:
self.scheduler.remove_job(self.name)
except JobLookupError:
raise AttributeError("Task not generated")
pause(self)
Pause the underlying scheduler.
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/task.py
def pause(self) -> None:
"""Pause the underlying scheduler.
Returns:
None
"""
self.scheduler.pause()
resume(self)
Resume the underlying scheduler.
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/task.py
def resume(self) -> None:
"""Resume the underlying scheduler.
Returns:
None
"""
self.scheduler.resume()
shutdown(self)
Shutdown the underlying scheduler.
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/core/task.py
def shutdown(self) -> None:
"""Shutdown the underlying scheduler.
Returns:
None
"""
self.scheduler.shutdown()
task(broker, name=None, queue=None, priority=0, timeout=3600000, max_retries=3, backoff=ExponentialBackoff(base=4, factor=10000, floor=60000, ceiling=3600000, zero_override=True), trigger=None, result_store=None, status_tracker=None, mechanism='process', base_task=None)
Wrapper for task construction.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker |
Broker |
an Alsek broker |
required |
name |
Optional[str] |
the name of the task. If |
None |
queue |
Optional[str] |
the name of the queue to generate the task on.
If |
None |
priority |
int |
priority of the task. Tasks with lower values will be executed before tasks with higher values. |
0 |
timeout |
int |
the maximum amount of time (in milliseconds) this task is permitted to run. |
3600000 |
max_retries |
int |
maximum number of allowed retries |
3 |
backoff |
Optional[Backoff] |
backoff algorithm and parameters to use when computing delay between retries |
ExponentialBackoff(base=4, factor=10000, floor=60000, ceiling=3600000, zero_override=True) |
trigger |
Optional[Union[CronTrigger, DateTrigger, IntervalTrigger]] |
trigger for task execution. |
None |
result_store |
Optional[ResultStore] |
store for persisting task results |
None |
status_tracker |
Optional[StatusTracker] |
store for persisting task statuses |
None |
mechanism |
str |
mechanism for executing the task. Must be either "process" or "thread". |
'process' |
base_task |
Optional[Type[Task]] |
base to use for task construction.
If |
None |
Returns:
Type | Description |
---|---|
Callable[..., Task] |
wrapper (callable): task-wrapped function |
Exceptions:
Type | Description |
---|---|
* ValueError |
if a |
Examples:
>>> from alsek import Broker, task
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>> backend = DiskCacheBackend()
>>> broker = Broker(backend)
>>> @task(broker)
... def add(a: int, b: int) -> int:
... return a + b
Source code in alsek/core/task.py
def task(
broker: Broker,
name: Optional[str] = None,
queue: Optional[str] = None,
priority: int = 0,
timeout: int = DEFAULT_TASK_TIMEOUT,
max_retries: int = DEFAULT_MAX_RETRIES,
backoff: Optional[Backoff] = ExponentialBackoff(),
trigger: Optional[Union[CronTrigger, DateTrigger, IntervalTrigger]] = None,
result_store: Optional[ResultStore] = None,
status_tracker: Optional[StatusTracker] = None,
mechanism: str = DEFAULT_MECHANISM,
base_task: Optional[Type[Task]] = None,
) -> Callable[..., Task]:
"""Wrapper for task construction.
Args:
broker (Broker): an Alsek broker
name (str, optional): the name of the task. If ``None``,
the class name will be used.
queue (str, optional): the name of the queue to generate the task on.
If ``None``, the default queue will be used.
priority (int): priority of the task. Tasks with lower values
will be executed before tasks with higher values.
timeout (int): the maximum amount of time (in milliseconds)
this task is permitted to run.
max_retries (int, optional): maximum number of allowed retries
backoff (Backoff, optional): backoff algorithm and parameters to use when computing
delay between retries
trigger (CronTrigger, DateTrigger, IntervalTrigger, optional): trigger
for task execution.
result_store (ResultStore, optional): store for persisting task results
status_tracker (StatusTracker, optional): store for persisting task statuses
mechanism (str): mechanism for executing the task. Must
be either "process" or "thread".
base_task (Type[Task]): base to use for task construction.
If ``None``, a base task will be selected automatically.
Returns:
wrapper (callable): task-wrapped function
Raises:
* ValueError: if a ``trigger`` is provided but not supported by ``base_task``
Examples:
>>> from alsek import Broker, task
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>> backend = DiskCacheBackend()
>>> broker = Broker(backend)
>>> @task(broker)
... def add(a: int, b: int) -> int:
... return a + b
"""
parsed_base_task = _parse_base_task(base_task, trigger=trigger)
base_task_signature = inspect.signature(parsed_base_task.__init__)
if trigger and "trigger" not in base_task_signature.parameters:
raise ValueError(f"Trigger not supported by {parsed_base_task}")
def wrapper(function: Callable[..., Any]) -> Task:
return parsed_base_task( # type: ignore
function=function,
name=name,
broker=broker,
queue=queue,
priority=priority,
timeout=timeout,
max_retries=max_retries,
backoff=backoff,
mechanism=mechanism,
result_store=result_store,
status_tracker=status_tracker,
**( # noqa (handled above)
dict(trigger=trigger)
if "trigger" in base_task_signature.parameters
else dict()
),
)
return wrapper
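The decorator-factory pattern used by task() can be sketched in isolation. Everything named below (SketchTask, SketchTriggerTask, sketch_task) is hypothetical, and the automatic base selection is an assumption standing in for the private _parse_base_task helper, whose body is not shown in this reference.

```python
import inspect
from typing import Any, Callable, Optional, Type


class SketchTask:
    def __init__(self, function: Callable[..., Any], name: Optional[str] = None) -> None:
        self.function = function
        self.name = name


class SketchTriggerTask(SketchTask):
    def __init__(
        self,
        function: Callable[..., Any],
        trigger: Any,
        name: Optional[str] = None,
    ) -> None:
        super().__init__(function, name=name)
        self.trigger = trigger


def sketch_task(
    name: Optional[str] = None,
    trigger: Any = None,
    base_task: Optional[Type[SketchTask]] = None,
) -> Callable[..., SketchTask]:
    # Assumption: pick a trigger-aware base only when a trigger is given.
    if base_task is None:
        base_task = SketchTriggerTask if trigger else SketchTask
    params = inspect.signature(base_task.__init__).parameters
    if trigger and "trigger" not in params:
        raise ValueError(f"Trigger not supported by {base_task}")

    def wrapper(function: Callable[..., Any]) -> SketchTask:
        # Forward ``trigger`` only to bases whose __init__ accepts it.
        extra = dict(trigger=trigger) if "trigger" in params else dict()
        return base_task(function=function, name=name, **extra)

    return wrapper


@sketch_task()
def add(a: int, b: int) -> int:
    return a + b
```

After decoration, add is a task object rather than a plain function; the original callable remains reachable as add.function.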
worker
Worker Pool
WorkerPool
Pool of Alsek workers.
Generate a pool of workers to service tasks.
The reference broker is extracted from tasks and
therefore must be common among all tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks |
Collection[Task] |
one or more tasks to handle. This
must include all tasks the worker may encounter by listening
to |
required |
queues |
List[str] |
the names of one or more queues to
consume messages from. If |
required |
max_threads |
int |
the maximum number of tasks with |
required |
max_processes |
int |
the maximum number of tasks with |
required |
management_interval |
int |
amount of time (in milliseconds) between maintenance scans of background task execution. |
required |
slot_wait_interval |
int |
amount of time (in milliseconds) to wait between checks to determine worker availability for pending tasks. |
required |
**kwargs |
Keyword Args |
Keyword arguments to pass to |
required |
Exceptions:
Type | Description |
---|---|
NoTasksFoundError |
if no tasks are provided |
MultipleBrokersError |
if multiple brokers are used by the collected tasks. |
run(self)
Run the worker pool.
This method coordinates the following:
1. Starting a background thread to monitor background tasks
2a. Recovering messages from the data backend
2b. Processing recovered messages as places in the pool become available.
Returns:
Type | Description |
---|---|
None |
None |
Warning
- This method is blocking.
Source code in alsek/core/worker.py
@magic_logger(
before=lambda: log.info("Alsek v%s worker pool booting up...", __version__),
after=lambda: log.info("Graceful shutdown complete."),
)
exceptions
Exceptions
AlsekError
Base Alsek error.
MessageAlreadyExistsError
Message already exists in backend.
MessageDoesNotExistsError
Message does not exist in backend.
MultipleBrokersError
Multiple brokers in use.
NoTasksFoundError
No tasks found.
SchedulingError
Error scheduling work.
TaskNameCollisionError
Duplicate task detected.
TerminationError
Alsek termination error.
ValidationError
Data validation failed.
storage
special
Storage
backends
special
Backend
Backend
Backend base class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace |
str |
prefix to use when inserting names in the backend |
required |
serializer |
Serializer |
tool for encoding and decoding values written into the backend. |
required |
clear_namespace(self, raise_on_error=True)
Clear all items in backend under the current namespace.
Returns:
Type | Description |
---|---|
int |
count (int): number of items cleared raise_on_error (bool): raise if a delete operation fails |
Exceptions:
Type | Description |
---|---|
KeyError |
if |
Source code in alsek/storage/backends/__init__.py
def clear_namespace(self, raise_on_error: bool = True) -> int:
"""Clear all items in backend under the current namespace.
Returns:
count (int): number of items cleared
raise_on_error (bool): raise if a delete operation fails
Raises:
KeyError: if ``raise_on_error`` and a delete operation fails
"""
count: int = 0
for name in self.scan():
try:
self.delete(name, missing_ok=False)
count += 1
except KeyError as error:
if raise_on_error:
raise error
else:
log.warning("Unable to delete %r", name)
return count
count(self, pattern=None)
Count the number of items in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Optional[str] |
pattern to limit count to |
None |
Returns:
Type | Description |
---|---|
int |
count (int): number of matching names |
Source code in alsek/storage/backends/__init__.py
def count(self, pattern: Optional[str] = None) -> int:
"""Count the number of items in the backend.
Args:
pattern (str, optional): pattern to limit count to
Returns:
count (int): number of matching names
"""
return sum(1 for _ in self.scan(pattern))
delete(self, name, missing_ok=False)
Delete a name
from the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
missing_ok |
bool |
if |
False |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
KeyError |
if |
Source code in alsek/storage/backends/__init__.py
@abstractmethod
def delete(self, name: str, missing_ok: bool = False) -> None:
"""Delete a ``name`` from the backend.
Args:
name (str): name of the item
missing_ok (bool): if ``True``, do not raise for missing
Returns:
None
Raises:
KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.
"""
raise NotImplementedError()
exists(self, name)
Check if name
exists in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Source code in alsek/storage/backends/__init__.py
@abstractmethod
def exists(self, name: str) -> bool:
"""Check if ``name`` exists in the backend.
Args:
name (str): name of the item
Returns:
bool
"""
raise NotImplementedError()
full_name(self, name)
Get an item's complete name, including the namespace in which it exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of an item |
required |
Returns:
Type | Description |
---|---|
str |
full_name (str): a name of the form |
Notes
- If
name
is already the full name, this method will collapse to a no-op.
Source code in alsek/storage/backends/__init__.py
def full_name(self, name: str) -> str:
"""Get an item's complete name, including the namespace
in which it exists.
Args:
name (str): the name of an item
Returns:
full_name (str): a name of the form ``"{NAMESPACE}:{name}"``
Notes:
* If ``name`` is already the full name, this method
will collapse to a no-op.
"""
if name.startswith(f"{self.namespace}:"):
return name
else:
return f"{self.namespace}:{name}"
get(self, name)
Get name
from the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
Returns:
Type | Description |
---|---|
Any |
Any |
Source code in alsek/storage/backends/__init__.py
@abstractmethod
def get(self, name: str) -> Any:
"""Get ``name`` from the backend.
Args:
name (str): name of the item
Returns:
Any
"""
raise NotImplementedError()
in_namespace(self, name)
Determine if name
belongs to the current namespace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
a name (key) |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Warning
name
should be a complete (i.e., full) name.
Source code in alsek/storage/backends/__init__.py
def in_namespace(self, name: str) -> bool:
"""Determine if ``name`` belongs to the current namespace.
Args:
name (str): a name (key)
Returns:
bool
Warning:
* ``name`` should be a complete (i.e., _full_) name.
"""
return name.startswith(f"{self.namespace}:")
scan(self, pattern=None)
Scan the backend for matching names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Optional[str] |
pattern to limit search to |
None |
Returns:
Type | Description |
---|---|
Iterable[str] |
matches_stream (Iterable[str]): a stream of matching names |
Source code in alsek/storage/backends/__init__.py
@abstractmethod
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
"""Scan the backend for matching names.
Args:
pattern (str, optional): pattern to limit search to
Returns:
matches_stream (Iterable[str]): a stream of matching names
"""
raise NotImplementedError()
set(self, name, value, nx=False, ttl=None)
Set name
to value
in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
value |
Any |
value to set for |
required |
nx |
bool |
if |
False |
ttl |
Optional[int] |
time to live for the entry in milliseconds |
None |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
KeyError |
if |
Source code in alsek/storage/backends/__init__.py
@abstractmethod
def set(
self,
name: str,
value: Any,
nx: bool = False,
ttl: Optional[int] = None,
) -> None:
"""Set ``name`` to ``value`` in the backend.
Args:
name (str): name of the item
value (Any): value to set for ``name``
nx (bool): if ``True`` the item must not exist prior to being set
ttl (int, optional): time to live for the entry in milliseconds
Returns:
None
Raises:
KeyError: if ``nx`` is ``True`` and ``name`` already exists
"""
raise NotImplementedError()
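To make the set() contract concrete (nx must fail for existing names, ttl is given in milliseconds), here is a minimal dict-backed sketch. MemoryStore is a hypothetical class for illustration, not an Alsek backend, and it omits namespacing and serialization.

```python
import time
from typing import Any, Dict, Optional, Tuple


class MemoryStore:
    """Hypothetical dict-backed store following the ``set()`` contract above."""

    def __init__(self) -> None:
        # name -> (value, absolute expiry on the monotonic clock or None)
        self._data: Dict[str, Tuple[Any, Optional[float]]] = {}

    def exists(self, name: str) -> bool:
        entry = self._data.get(name)
        if entry is None:
            return False
        _, expires_at = entry
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[name]  # lazily evict expired entries
            return False
        return True

    def set(
        self,
        name: str,
        value: Any,
        nx: bool = False,
        ttl: Optional[int] = None,
    ) -> None:
        if nx and self.exists(name):
            raise KeyError(f"Name '{name}' already exists")
        # ``ttl`` is in milliseconds; the clock works in seconds.
        expires_at = None if ttl is None else time.monotonic() + ttl / 1000
        self._data[name] = (value, expires_at)


store = MemoryStore()
store.set("a", 1)
store.set("b", 2, ttl=1)  # expires after one millisecond
```

Note that the check-then-write in set() is not atomic, so a sketch like this is only safe with a single writer.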
short_name(self, name)
Get an item's short name, without the namespace in which it exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the full name of an item |
required |
Returns:
Type | Description |
---|---|
str |
short_name (str): |
Notes
- If
name
is already the short name, this method will collapse to a no-op.
Source code in alsek/storage/backends/__init__.py
def short_name(self, name: str) -> str:
"""Get an item's short name, without the namespace
in which it exists.
Args:
name (str): the full name of an item
Returns:
short_name (str): ``name`` without the namespace prefix
Notes:
* If ``name`` is already the short name, this method
will collapse to a no-op.
"""
return re.sub(rf"^{self.namespace}:", repl="", string=name)
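The three naming helpers (full_name(), short_name() and in_namespace()) round-trip as sketched below. SketchNames is a hypothetical container that copies only those method bodies from the source shown in this reference.

```python
import re


class SketchNames:
    """Hypothetical holder for the namespace helpers shown above."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace

    def full_name(self, name: str) -> str:
        if name.startswith(f"{self.namespace}:"):
            return name  # already a full name
        return f"{self.namespace}:{name}"

    def short_name(self, name: str) -> str:
        # Strip the namespace prefix only when it starts the string.
        return re.sub(rf"^{self.namespace}:", repl="", string=name)

    def in_namespace(self, name: str) -> bool:
        return name.startswith(f"{self.namespace}:")


names = SketchNames("alsek")
full = names.full_name("queue:a")
short = names.short_name(full)
```

Since the namespace is interpolated into a regular expression as-is, this sketch assumes a namespace without regex metacharacters.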
LazyClient
Lazy client.
Wrapper for lazy client initialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client_func |
callable |
a callable which returns a backend client. |
required |
get(self)
Execute client_func.
Returns:
Type | Description |
---|---|
Any |
client (Any): a backend client |
Source code in alsek/storage/backends/__init__.py
def get(self) -> Any:
"""Execute ``client_func``.
Returns:
client (Any): a backend client
"""
return self.client_func()
disk
Disk Backend
DiskCacheBackend
DiskCache Backend.
Backend powered by DiskCache.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
conn |
str, Path, DiskCache, LazyClient |
a directory, |
required |
name_match_func |
callable |
a callable to determine if
a name matches a specified pattern when scanning
the backend. If |
required |
namespace |
str |
prefix to use when inserting names in the backend |
required |
serializer |
Serializer |
tool for encoding and decoding values written into the backend. |
required |
Warning
DiskCache persists data to a local (SQLite) database and does not implement 'server-side' "if not exist" on SET (nx) support. For these reasons, DiskCacheBackend() is recommended for development and testing purposes only.
conn: Cache
property
readonly
Connection to the backend.
delete(self, name, missing_ok=False)
Delete a name
from the disk backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
missing_ok |
bool |
if |
False |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
KeyError |
if |
Source code in alsek/storage/backends/disk.py
def delete(self, name: str, missing_ok: bool = False) -> None:
"""Delete a ``name`` from the disk backend.
Args:
name (str): name of the item
missing_ok (bool): if ``True``, do not raise for missing
Returns:
None
Raises:
KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found
or if ``name`` has a non-expired TTL.
"""
try:
self.conn.__delitem__(self.full_name(name), retry=False)
except KeyError:
if not missing_ok:
raise KeyError(f"No name '{name}' found")
destroy(self)
Destroy the cache.
Returns:
Type | Description |
---|---|
None |
None |
Warning
- This method recursively deletes the cache directory. Use with caution.
- The backend will not be usable following execution of this method.
Source code in alsek/storage/backends/disk.py
def destroy(self) -> None:
"""Destroy the cache.
Returns:
None
Warning:
* This method recursively deletes the cache directory.
Use with caution.
* The backend will not be usable following execution
of this method.
"""
shutil.rmtree(self.conn.directory)
exists(self, name)
Check if name
exists in the disk backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Source code in alsek/storage/backends/disk.py
def exists(self, name: str) -> bool:
"""Check if ``name`` exists in the disk backend.
Args:
name (str): name of the item
Returns:
bool
"""
return self.full_name(name) in self.conn
get(self, name)
Get name
from the disk backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
Returns:
Type | Description |
---|---|
Any |
Any |
Source code in alsek/storage/backends/disk.py
def get(self, name: str) -> Any:
"""Get ``name`` from the disk backend.
Args:
name (str): name of the item
Returns:
Any
"""
encoded = self.conn.get(self.full_name(name))
return self.serializer.reverse(encoded)
scan(self, pattern=None)
Scan the disk backend for matching names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Optional[str] |
pattern to limit search to |
None |
Returns:
Type | Description |
---|---|
Iterable[str] |
name_stream (Iterable[str]): a stream of matching names |
Source code in alsek/storage/backends/disk.py
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
"""Scan the disk backend for matching names.
Args:
pattern (str): pattern to limit search to
Returns:
name_stream (Iterable[str]): a stream of matching names
"""
for name in filter(self.in_namespace, self.conn):
short_name = self.short_name(name)
if self.name_match_func(pattern, short_name):
yield short_name
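The scan loop filters keys by namespace first and then applies name_match_func to the short name. The sketch below shows that flow over a plain list of keys. The glob-style matcher built on fnmatch is an assumption chosen for illustration; the default name_match_func is not shown in this reference.

```python
from fnmatch import fnmatchcase
from typing import Iterable, Iterator, Optional


def glob_match(pattern: Optional[str], name: str) -> bool:
    # Hypothetical matcher: no pattern means every name matches.
    return pattern is None or fnmatchcase(name, pattern)


def scan(
    keys: Iterable[str],
    namespace: str,
    pattern: Optional[str] = None,
) -> Iterator[str]:
    prefix = f"{namespace}:"
    for key in keys:
        if key.startswith(prefix):  # same test as in_namespace()
            short = key[len(prefix):]
            if glob_match(pattern, short):
                yield short


keys = ["alsek:queue:a", "alsek:status:b", "other:queue:c"]
everything = list(scan(keys, "alsek"))
queues_only = list(scan(keys, "alsek", "queue:*"))
```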
set(self, name, value, nx=False, ttl=None)
Set name
to value
in the disk backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
value |
Any |
value to set for |
required |
nx |
bool |
if |
False |
ttl |
Optional[int] |
time to live for the entry in milliseconds |
None |
Returns:
Type | Description |
---|---|
None |
None |
Warning
- nx is implemented client-side for DiskCache. For this reason, it should not be relied upon for critical tasks.
Exceptions:
Type | Description |
---|---|
KeyError |
if |
Source code in alsek/storage/backends/disk.py
def set(
self,
name: str,
value: Any,
nx: bool = False,
ttl: Optional[int] = None,
) -> None:
"""Set ``name`` to ``value`` in the disk backend.
Args:
name (str): name of the item
value (Any): value to set for ``name``
nx (bool): if ``True`` the item must not exist prior to being set
ttl (int, optional): time to live for the entry in milliseconds
Returns:
None
Warning:
* ``nx`` is implemented client-side for ``DiskCache``.
For this reason, it should not be relied upon for
critical tasks.
Raises:
KeyError: if ``nx`` is ``True`` and ``name`` already exists
"""
if nx and self.exists(name):
raise KeyError(f"Name '{name}' already exists")
self.conn.set(
self.full_name(name),
value=self.serializer.forward(value),
expire=ttl if ttl is None else ttl / 1000,
)
redis
Redis Backend
RedisBackend
Redis Backend.
Backend powered by Redis.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
conn |
str, Redis, LazyClient |
a connection url, |
required |
namespace |
str |
prefix to use when inserting names in the backend |
required |
serializer |
Serializer |
tool for encoding and decoding values written into the backend. |
required |
Warning
- If conn is a Redis() object, decode_responses is expected to be set to True.
conn: Redis
property
readonly
Connection to the backend.
delete(self, name, missing_ok=False)
Delete a name
from the Redis backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
missing_ok |
bool |
if |
False |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
KeyError |
if missing_ok is False and name is not found. |
Source code in alsek/storage/backends/redis.py
def delete(self, name: str, missing_ok: bool = False) -> None:
"""Delete a ``name`` from the Redis backend.
Args:
name (str): name of the item
missing_ok (bool): if ``True``, do not raise for missing
Returns:
None
Raises:
KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.
"""
found = self.conn.delete(self.full_name(name))
if not missing_ok and not found:
raise KeyError(f"No name '{name}' found")
exists(self, name)
Check if name exists in the Redis backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
Returns:
Type | Description |
---|---|
bool |
bool |
Source code in alsek/storage/backends/redis.py
def exists(self, name: str) -> bool:
"""Check if ``name`` exists in the Redis backend.
Args:
name (str): name of the item
Returns:
bool
"""
return bool(self.conn.exists(self.full_name(name)))
get(self, name)
Get name from the Redis backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
Returns:
Type | Description |
---|---|
Any |
Any |
Source code in alsek/storage/backends/redis.py
def get(self, name: str) -> Any:
"""Get ``name`` from the Redis backend.
Args:
name (str): name of the item
Returns:
Any
"""
encoded = self.conn.get(self.full_name(name))
return self.serializer.reverse(encoded)
scan(self, pattern=None)
Scan the backend for matching names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Optional[str] |
pattern to match against |
None |
Returns:
Type | Description |
---|---|
Iterable[str] |
names_stream (Iterable[str]): a stream of matching names |
Source code in alsek/storage/backends/redis.py
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
"""Scan the backend for matching names.
Args:
pattern (str): pattern to match against
Returns:
names_stream (Iterable[str]): a stream of matching names
"""
match = self.full_name(pattern or "*")
yield from map(self.short_name, self.conn.scan_iter(match))
set(self, name, value, nx=False, ttl=None)
Set name to value in the Redis backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of the item |
required |
value |
Any |
value to set for name |
required |
nx |
bool |
if True, the item must not exist prior to being set |
False |
ttl |
Optional[int] |
time to live for the entry in milliseconds |
None |
Returns:
Type | Description |
---|---|
None |
None |
Exceptions:
Type | Description |
---|---|
KeyError |
if nx is True and name already exists |
Source code in alsek/storage/backends/redis.py
def set(
self,
name: str,
value: Any,
nx: bool = False,
ttl: Optional[int] = None,
) -> None:
"""Set ``name`` to ``value`` in the Redis backend.
Args:
name (str): name of the item
value (Any): value to set for ``name``
nx (bool): if ``True`` the item must not exist prior to being set
ttl (int, optional): time to live for the entry in milliseconds
Returns:
None
Raises:
KeyError: if ``nx`` is ``True`` and ``name`` already exists
"""
response = self.conn.set(
self.full_name(name),
value=self.serializer.forward(value),
px=ttl,
nx=nx,
keepttl=ttl is None, # type: ignore
)
if nx and response is None:
raise KeyError(f"Name '{name}' already exists")
result
Result Storage
ResultStore
Alsek Result Store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
backend |
Backend |
backend for data storage |
required |
Warning
- In order for a result to be stored, it must be serializable by the serializer used by backend.
delete(self, message, descendants=True, missing_ok=False)
Delete any data for message from the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message. |
required |
descendants |
bool |
if True, also delete results for descendants. |
True |
missing_ok |
bool |
if True, do not raise for missing |
False |
Returns:
Type | Description |
---|---|
int |
count (int): number of results deleted |
Source code in alsek/storage/result.py
def delete(
self,
message: Message,
descendants: bool = True,
missing_ok: bool = False,
) -> int:
"""Delete any data for ``message`` from the backend.
Args:
message (Message): an Alsek message.
descendants (bool): if ``True`` also delete results for descendants.
missing_ok (bool): if ``True``, do not raise for missing
Returns:
count (int): number of results deleted
"""
count: int = 0
for name in self._get_all_storage_names(message, descendants=descendants):
self.backend.delete(name, missing_ok=missing_ok)
count += 1
return count
exists(self, message, descendants=False)
Whether or not data for message exists in the store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
descendants |
bool |
if True, this method will return True iff all of the descendants of message also exist. |
False |
Returns:
Type | Description |
---|---|
bool |
bool |
Source code in alsek/storage/result.py
def exists(self, message: Message, descendants: bool = False) -> bool:
"""Whether or not data for ``message`` exists in the store.
Args:
message (Message): an Alsek message
descendants (bool): if ``True``, this method will return ``True``
iff all of the descendants of ``message`` also exist.
Returns:
bool
"""
names = self._get_all_storage_names(message, descendants=descendants)
return all(self.backend.exists(n) for n in names)
get(self, message, timeout=0, keep=False, with_metadata=False, descendants=False)
Get the result for message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message. |
required |
timeout |
int |
amount of time (in milliseconds) to wait for the result to become available |
0 |
keep |
bool |
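whether or not to keep the result after fetching it. Defaults to False to conserve storage space. |
False |
with_metadata |
bool |
if True, return results of the form {"result": <result>, "uuid": str, "timestamp": int}, where "result" is the result persisted to the backend, "uuid" is the uuid of the message associated with the result and "timestamp" is the time at which the result was written to the backend. |
False |
descendants |
bool |
if True, also fetch results for descendants. |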
False |
Returns:
Type | Description |
---|---|
Union[Any, List[Any]] |
result (Any, List[Any]): the stored result. If descendants is True, a list of results will be returned. |
Exceptions:
Type | Description |
---|---|
KeyError |
if results are not available for message |
TimeoutError |
if results are not available for message following timeout. |
Notes
- The order of results when descendants=True is determined by the time at which the data was written to the backend.
- timeout only applies to message, even if descendants=True.
Warning
- If a message has a progenitor, the progenitor_uuid field in the message must be set.
Examples:
>>> from alsek import Message
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>> from alsek.storage.result import ResultStore
>>> backend = DiskCacheBackend()
>>> result_store = ResultStore(backend)
>>> result_store.get(Message(uuid="..."))
Source code in alsek/storage/result.py
def get(
self,
message: Message,
timeout: int = 0,
keep: bool = False,
with_metadata: bool = False,
descendants: bool = False,
) -> Union[Any, List[Any]]:
"""Get the result for ``message``.
Args:
message (Message): an Alsek message.
timeout (int): amount of time (in milliseconds) to wait
for the result to become available
keep (bool): whether or not to keep the result after fetching it.
Defaults to ``False`` to conserve storage space.
with_metadata (bool): if ``True`` return results of the form
``{"result": <result>, "uuid": str, "timestamp": int}``, where
"result" is the result persisted to the backend, "uuid" is the uuid
of the message associated with the result and "timestamp" is the
time at which the result was written to the backend.
descendants (bool): if ``True`` also fetch results for descendants.
Returns:
result (Any, List[Any]): the stored result. If ``descendants``
is ``True`` a list of results will be returned.
Raises:
KeyError: if results are not available for ``message``
TimeoutError: if results are not available for ``message``
following ``timeout``.
Notes:
* The order of results when ``descendants=True`` is determined
by the time at which the data was written to the backend.
* ``timeout`` only applies to ``message``, even if ``descendants=True``.
Warning:
* If a message has a progenitor, the ``progenitor_uuid`` field in the
``message`` must be set.
Examples:
>>> from alsek import Message
>>> from alsek.storage.backends.disk import DiskCacheBackend
>>> from alsek.storage.result import ResultStore
>>> backend = DiskCacheBackend()
>>> result_store = ResultStore(backend)
>>> result_store.get(Message(uuid="..."))
"""
if not self.exists(message, descendants=descendants):
if timeout:
waiter(
lambda: self.exists(message, descendants=descendants),
timeout=timeout,
timeout_msg=f"Timeout waiting on result for {message.summary}",
sleep_interval=_GET_RESULT_WAIT_SLEEP_INTERVAL,
)
else:
raise KeyError(f"No results for {message.uuid}")
names = self._get_all_storage_names(message, descendants=descendants)
results = self._get_engine(names, with_metadata=with_metadata)
if not keep:
for n in names:
self.backend.delete(n)
return results if descendants else results[0]
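The `timeout` branch above polls until the result exists or the time budget runs out. The `waiter` helper itself is not shown in this reference, so the following is only a sketch of that polling pattern under assumed behaviour. The name `wait_for` and its defaults are hypothetical.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    timeout: int,
    sleep_interval: int = 10,
) -> bool:
    """Poll ``condition`` until it is true or ``timeout`` ms have elapsed.

    Hypothetical helper; it is not Alsek's ``waiter``.
    """
    deadline = time.monotonic() + timeout / 1000
    while not condition():
        if time.monotonic() >= deadline:
            raise TimeoutError("Timeout waiting on result")
        time.sleep(sleep_interval / 1000)
    return True


# A condition that becomes true after roughly 50 ms.
ready_at = time.monotonic() + 0.05
arrived = wait_for(lambda: time.monotonic() >= ready_at, timeout=1000)

# A condition that never becomes true exhausts the budget.
try:
    wait_for(lambda: False, timeout=30)
    timed_out = False
except TimeoutError:
    timed_out = True
```

With `timeout=0`, `get()` skips polling and raises `KeyError` immediately when the result is missing, as the listing above shows.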
get_storage_name(message)
staticmethod
Get the name for message in the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message |
required |
Returns:
Type | Description |
---|---|
str |
name (str): message-specific name |
Source code in alsek/storage/result.py
@staticmethod
def get_storage_name(message: Message) -> str:
"""Get the name for ``message`` in the backend.
Args:
message (Message): an Alsek message
Returns:
name (str): message-specific name
"""
if message.progenitor_uuid:
return f"results:{message.progenitor_uuid}:descendants:{message.uuid}"
else:
return f"results:{message.uuid}"
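To make the naming scheme concrete, here is a small sketch that reproduces the two string formats from the listing above with plain arguments instead of a `Message`. The function `storage_name` and the example uuids are illustrative only.

```python
from typing import Optional


def storage_name(uuid: str, progenitor_uuid: Optional[str] = None) -> str:
    """Mirror the two name formats used by ``get_storage_name``."""
    if progenitor_uuid:
        return f"results:{progenitor_uuid}:descendants:{uuid}"
    return f"results:{uuid}"


root = storage_name("root-1")
child = storage_name("child-7", progenitor_uuid="root-1")
```

Here `root` is `results:root-1` and `child` is `results:root-1:descendants:child-7`, so a descendant's name embeds the uuid of its progenitor.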
set(self, message, result, nx=True)
Store a result for message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message |
Message |
an Alsek message. |
required |
result |
Any |
the result to persist |
required |
nx |
bool |
if True, the item must not exist prior to being set |
True |
Returns:
Type | Description |
---|---|
None |
None |
Source code in alsek/storage/result.py
def set(self, message: Message, result: Any, nx: bool = True) -> None:
"""Store a ``result`` for ``message``.
Args:
message (Message): an Alsek message.
result (Any): the result to persist
nx (bool): if ``True`` the item must not exist prior to being set
Returns:
None
"""
self.backend.set(
self.get_storage_name(message),
value={"result": result, "timestamp": utcnow_timestamp_ms()},
nx=nx,
ttl=message.result_ttl,
)
serialization
Serialization
JsonSerializer
JSON serialization.
forward(obj)
staticmethod
Encode an object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Any |
an object to encode |
required |
Returns:
Type | Description |
---|---|
Any |
encoded (Any): JSON encoded object |
Source code in alsek/storage/serialization.py
@staticmethod
def forward(obj: Any) -> Any:
"""Encode an object.
Args:
obj (Any): an object to encode
Returns:
encoded (Any): JSON encoded object
"""
return json.dumps(obj)
reverse(obj)
staticmethod
Decode an object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Any |
an object to decode |
required |
Returns:
Type | Description |
---|---|
Any |
decoded (Any): JSON decoded object |
Source code in alsek/storage/serialization.py
@staticmethod
def reverse(obj: Any) -> Any:
"""Decode an object.
Args:
obj (Any): an object to decode
Returns:
decoded (Any): JSON decoded object
"""
if obj is None:
return None
return json.loads(obj)
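Because `forward` and `reverse` delegate to the standard `json` module, a round trip does not always return the same Python types. The sketch below uses `json` directly with a made-up payload to show one such case.

```python
import json
from typing import Any


def reverse(obj: Any) -> Any:
    """Same logic as ``JsonSerializer.reverse``: None passes through."""
    if obj is None:
        return None
    return json.loads(obj)


payload = {"result": (1, 2), "timestamp": 1700000000000}
encoded = json.dumps(payload)
decoded = reverse(encoded)
missing = reverse(None)
```

The tuple `(1, 2)` comes back as the list `[1, 2]`, since JSON has no tuple type. Objects that `json.dumps` cannot encode, such as sets, raise `TypeError` before anything is written.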
Serializer
Base Serializer Class.
forward(obj)
staticmethod
Encode an object for backend serialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Any |
an object to encode |
required |
Returns:
Type | Description |
---|---|
Any |
encoded (Any): encoded object |
Source code in alsek/storage/serialization.py
@staticmethod
@abstractmethod
def forward(obj: Any) -> Any:
"""Encode an object for backend serialization.
Args:
obj (Any): an object to encode
Returns:
encoded (Any): encoded object
"""
raise NotImplementedError()
reverse(obj)
staticmethod
Decode an object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Any |
an object to decode |
required |
Returns:
Type | Description |
---|---|
Any |
decoded (Any): decoded object |
Source code in alsek/storage/serialization.py
@staticmethod
@abstractmethod
def reverse(obj: Any) -> Any:
"""Decode an object.
Args:
obj (Any): an object to decode
Returns:
decoded (Any): decoded object
"""
raise NotImplementedError()
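A custom serializer only needs to supply the two static methods documented above. The following sketch defines a local stand-in for the base class so that it runs on its own; in practice one would presumably subclass `alsek.storage.serialization.Serializer` instead. The class `Base64PickleSerializer` is hypothetical and not part of the documented API.

```python
import base64
import pickle
from abc import ABC, abstractmethod
from typing import Any


class Serializer(ABC):
    """Local stand-in mirroring the interface documented above."""

    @staticmethod
    @abstractmethod
    def forward(obj: Any) -> Any:
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def reverse(obj: Any) -> Any:
        raise NotImplementedError()


class Base64PickleSerializer(Serializer):
    """Hypothetical serializer: pickle, then base64 so the value is text."""

    @staticmethod
    def forward(obj: Any) -> Any:
        return base64.b64encode(pickle.dumps(obj)).decode("ascii")

    @staticmethod
    def reverse(obj: Any) -> Any:
        # Treat a missing value the same way JsonSerializer does.
        if obj is None:
            return None
        return pickle.loads(base64.b64decode(obj))


serializer = Base64PickleSerializer()
encoded = serializer.forward({1, 2, 3})  # a set, which JSON cannot encode
decoded = serializer.reverse(encoded)
```

Unpickling executes arbitrary code embedded in the data, so a serializer like this is only appropriate when everything written to the backend is trusted.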
tools
special
Tools
iteration
Result Iteration
ResultPool
Tooling for iterating over task results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
result_store |
ResultStore |
store where task results are persisted |
required |
Examples:
>>> from alsek.storage.result import ResultStore
>>> from alsek.tools.iteration import ResultPool
...
>>> result_store = ResultStore(...)
>>> pool = ResultPool(result_store)
...
>>> messages = [...]
>>> for message, result in pool.istream(*messages):
... pass
istream(self, *messages, wait=5000, descendants=False, **kwargs)
Stream the results of one or more messages. Results are yielded in the order in which they become available. (This may differ from the order in which messages are provided.)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*messages |
Message |
one or more messages to iterate over |
() |
wait |
int |
time to wait (in milliseconds) between checks for available results |
5000 |
descendants |
bool |
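if True, wait for and return the results of all descendant (callback) messages. |
False |
**kwargs |
Any |
keyword arguments to pass to result_store.get(). |
{} |
Returns:
Type | Description |
---|---|
Iterable[Tuple[alsek.core.message.Message, Any]] |
results (iterable): an iterable of results of the form (Message, result). |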
Warning
- By default, result_store does not keep messages once they have been collected. As a result, providing messages for which the corresponding results have already been collected (and deleted) will cause this method to loop indefinitely. In order to loop over messages multiple times set keep=True.
Source code in alsek/tools/iteration.py
def istream(
self,
*messages: Message,
wait: int = 5 * 1000,
descendants: bool = False,
**kwargs: Any,
) -> Iterable[Tuple[Message, Any]]:
"""Stream the results of one or more messages. Results are yielded
in the order in which they become available. (This may differ from
the order in which messages are provided.)
Args:
*messages (Message): one or more messages to iterate over
wait (int): time to wait (in milliseconds) between checks for
available results
descendants (bool): if ``True``, wait for and return
the results of all descendant (callback) messages.
**kwargs (Keyword Args): keyword arguments to pass to
``result_store.get()``.
Returns:
results (iterable): an iterable of results of the form
``(Message, result)``.
Warning:
* By default, ``result_store`` does not keep messages once
they have been collected. As a result, providing messages
for which the corresponding results have already been collected
(and deleted) will cause this method to loop indefinitely.
In order to loop over messages multiple times set ``keep=True``.
"""
yield from self._engine(
messages,
wait=wait,
break_on_error=False,
descendants=descendants,
**kwargs,
)
stream(self, *messages, wait=5000, descendants=False, **kwargs)
Stream the results of one or more messages. The order of the results is guaranteed to match the order of messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*messages |
Message |
one or more messages to iterate over |
() |
wait |
int |
time to wait (in milliseconds) between checks for available results |
5000 |
descendants |
bool |
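if True, wait for and return the results of all descendant (callback) messages. |
False |
**kwargs |
Any |
keyword arguments to pass to result_store.get(). |
{} |
Returns:
Type | Description |
---|---|
Iterable[Tuple[alsek.core.message.Message, Any]] |
results (iterable): an iterable of results of the form (Message, result). |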
Warning
- By default, result_store does not keep messages once they have been collected. As a result, providing messages for which the corresponding results have already been collected (and deleted) will cause this method to loop indefinitely. In order to loop over messages multiple times set keep=True.
Source code in alsek/tools/iteration.py
def stream(
self,
*messages: Message,
wait: int = 5 * 1000,
descendants: bool = False,
**kwargs: Any,
) -> Iterable[Tuple[Message, Any]]:
"""Stream the results of one or more messages. The order of the
results is guaranteed to match the order of ``messages``.
Args:
*messages (Message): one or more messages to iterate over
wait (int): time to wait (in milliseconds) between checks for
available results
descendants (bool): if ``True``, wait for and return
the results of all descendant (callback) messages.
**kwargs (Keyword Args): keyword arguments to pass to
``result_store.get()``.
Returns:
results (iterable): an iterable of results of the form
``(Message, result)``.
Warning:
* By default, ``result_store`` does not keep messages once
they have been collected. As a result, providing messages
for which the corresponding results have already been collected
(and deleted) will cause this method to loop indefinitely.
In order to loop over messages multiple times set ``keep=True``.
"""
yield from self._engine(
messages,
wait=wait,
break_on_error=True,
descendants=descendants,
**kwargs,
)
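The difference between `istream()` and `stream()` is purely one of ordering. The internal `_engine` is not shown in this reference, so the following is a self-contained simulation of the two documented orderings rather than a depiction of how Alsek implements them. The `READY_AT` table and both function names are invented for illustration.

```python
from typing import Dict, Iterator, List, Tuple

# Hypothetical completion times (in arbitrary ticks) for three messages.
READY_AT: Dict[str, int] = {"a": 3, "b": 1, "c": 2}


def in_availability_order(names: List[str]) -> Iterator[Tuple[str, int]]:
    """Yield each item as soon as it is ready (istream-like ordering)."""
    pending = list(names)
    tick = 0
    while pending:
        tick += 1
        for name in list(pending):
            if READY_AT[name] <= tick:
                pending.remove(name)
                yield name, tick


def in_input_order(names: List[str]) -> Iterator[Tuple[str, int]]:
    """Yield items strictly in input order (stream-like ordering)."""
    tick = 0
    for name in names:
        # Later items cannot be yielded before the current one is ready.
        tick = max(tick, READY_AT[name])
        yield name, tick


available_first = list(in_availability_order(["a", "b", "c"]))
input_first = list(in_input_order(["a", "b", "c"]))
```

In this simulation the availability-ordered variant yields `b`, `c`, `a` at ticks 1, 2 and 3, while the input-ordered variant yields `a`, `b`, `c` but only from tick 3 onward. Preserving input order means a slow first message delays everything behind it.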