Skip to content

Reference 📚

alsek

Alsek

cli

cli

Command Line Interface

main()

Alsek CLI.

Source code in alsek/cli/cli.py
27
28
29
30
31
@click.group()
@click.version_option(__version__)
def main() -> None:
    """Alsek CLI."""
process_pool(package, queues, task_specific_mode, n_processes, prune_interval, slot_wait_interval, consumer_backoff_factor, consumer_backoff_floor, consumer_backoff_ceiling, log_level)

Start a process-based worker pool.

Source code in alsek/cli/cli.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@main.command()
@click.argument("package", type=str)
@click.option(
    "--queues",
    type=str,
    default=None,
    help="Comma-separated list of queues to consume from.",
)
@click.option(
    "--task_specific_mode",
    is_flag=True,
    help="Monitor tasks specifically, not just queues.",
)
@click.option(
    "--n_processes",
    type=int,
    default=None,
    help="Max number of processes.",
)
@click.option(
    "--prune_interval",
    type=int,
    default=100,
    help="Milliseconds between prune scans.",
)
@click.option(
    "--slot_wait_interval",
    type=int,
    default=100,
    help="Milliseconds to wait when full.",
)
@click.option(
    "--consumer_backoff_factor",
    type=int,
    default=1 * 1000,
    help="Backoff factor in response to passes over the backend "
    "which yield no messages (milliseconds)",
)
@click.option(
    "--consumer_backoff_floor",
    type=int,
    default=1_000,
    # Fixed: trailing space added so the implicitly-concatenated help text
    # does not render as "backendwhich".
    help="Minimum backoff in response to a pass over the backend "
    "which yields no message (milliseconds)",
)
@click.option(
    "--consumer_backoff_ceiling",
    type=int,
    default=30_000,
    # Fixed: trailing space added so the implicitly-concatenated help text
    # does not render as "backendwhich".
    help="Maximum backoff in response to a pass over the backend "
    "which yields no message (milliseconds)",
)
@click.option(
    "--log-level",
    type=click.Choice(LOG_LEVELS, case_sensitive=False),
    default="INFO",
    help="Logging level.",
)
def process_pool(
    package: str,
    queues: Optional[str],
    task_specific_mode: bool,
    n_processes: Optional[int],
    prune_interval: int,
    slot_wait_interval: int,
    consumer_backoff_factor: int,
    consumer_backoff_floor: int,
    consumer_backoff_ceiling: int,
    log_level: str,
) -> None:
    """Start a process-based worker pool."""
    _apply_logging_level(log_level)

    # Build the pool: tasks are discovered by scanning ``package``, and the
    # consumer uses a linear backoff (no zero override, so even the first
    # empty pass incurs a delay).
    pool = ProcessWorkerPool(
        tasks=collect_tasks(package),
        queues=[q.strip() for q in queues.split(",")] if queues else None,
        task_specific_mode=task_specific_mode,
        n_processes=n_processes,
        prune_interval=prune_interval,
        slot_wait_interval=slot_wait_interval,
        backoff=LinearBackoff(
            factor=consumer_backoff_factor,
            floor=consumer_backoff_floor,
            ceiling=consumer_backoff_ceiling,
            zero_override=False,
        ),
    )
    pool.run()
thread_pool(package, queues, task_specific_mode, n_threads, n_processes, n_process_floor, slot_wait_interval, complete_only_on_thread_exit, consumer_backoff_factor, consumer_backoff_floor, consumer_backoff_ceiling, log_level)

Start a thread-based worker pool.

Source code in alsek/cli/cli.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@main.command()
@click.argument("package", type=str)
@click.option(
    "--queues",
    type=str,
    default=None,
    help="Comma-separated list of queues to consume from.",
)
@click.option(
    "--task_specific_mode",
    is_flag=True,
    help="Monitor tasks specifically, not just queues.",
)
@click.option(
    "--n_threads",
    type=int,
    default=8,
    help="Threads per group.",
)
@click.option(
    "--n_processes",
    type=int,
    default=None,
    help="Max process groups.",
)
@click.option(
    "--n_process_floor",
    type=int,
    default=1,
    help="Minimum number of process groups to keep alive.",
)
@click.option(
    "--slot_wait_interval",
    type=int,
    default=50,
    help="Milliseconds to wait when full.",
)
@click.option(
    "--complete_only_on_thread_exit",
    is_flag=True,
    help="Wait for thread exit to mark as complete.",
)
@click.option(
    "--consumer_backoff_factor",
    type=int,
    default=1 * 1000,
    help="Backoff factor in response to passes over the backend "
    "which yield no messages (milliseconds)",
)
@click.option(
    "--consumer_backoff_floor",
    type=int,
    default=1_000,
    # Fixed: trailing space added so the implicitly-concatenated help text
    # does not render as "backendwhich".
    help="Minimum backoff in response to a pass over the backend "
    "which yields no message (milliseconds)",
)
@click.option(
    "--consumer_backoff_ceiling",
    type=int,
    default=30_000,
    # Fixed: trailing space added so the implicitly-concatenated help text
    # does not render as "backendwhich".
    help="Maximum backoff in response to a pass over the backend "
    "which yields no message (milliseconds)",
)
@click.option(
    "--log-level",
    type=click.Choice(LOG_LEVELS, case_sensitive=False),
    default="INFO",
    help="Logging level.",
)
def thread_pool(
    package: str,
    queues: Optional[str],
    task_specific_mode: bool,
    n_threads: int,
    n_processes: Optional[int],
    n_process_floor: int,
    slot_wait_interval: int,
    complete_only_on_thread_exit: bool,
    consumer_backoff_factor: int,
    consumer_backoff_floor: int,
    consumer_backoff_ceiling: int,
    log_level: str,
) -> None:
    """Start a thread-based worker pool."""
    _apply_logging_level(log_level)

    # Build the pool: tasks are discovered by scanning ``package``, and the
    # consumer uses a linear backoff (no zero override, so even the first
    # empty pass incurs a delay).
    pool = ThreadWorkerPool(
        tasks=collect_tasks(package),
        queues=[q.strip() for q in queues.split(",")] if queues else None,
        task_specific_mode=task_specific_mode,
        n_threads=n_threads,
        n_processes=n_processes,
        n_process_floor=n_process_floor,
        slot_wait_interval=slot_wait_interval,
        complete_only_on_thread_exit=complete_only_on_thread_exit,
        backoff=LinearBackoff(
            factor=consumer_backoff_factor,
            floor=consumer_backoff_floor,
            ceiling=consumer_backoff_ceiling,
            zero_override=False,
        ),
    )
    pool.run()

helpers

Helpers

package2path(package)

Convert a Python package name into its corresponding filesystem path.

Source code in alsek/cli/helpers.py
11
12
13
14
15
16
17
18
def package2path(package: str) -> Path:
    """Convert a Python package name into its corresponding filesystem path."""
    spec = find_spec(package)
    origin = None if spec is None else spec.origin
    if origin is None:
        raise ModuleNotFoundError(f"Package '{package}' not found.")

    resolved = Path(origin)
    # Packages resolve to their ``__init__.py``; return the directory instead.
    if resolved.name == "__init__.py":
        return resolved.parent
    return resolved

core

Core

backoff

Backoff Algorithms

Backoff

Bases: ABC

Base backoff class.

Parameters:

Name Type Description Default
floor int

minimum backoff in milliseconds

60 * 1000
ceiling int

maximum backoff in milliseconds

60 * 60 * 1000
zero_override bool

override backoff to zero if the number of incidents is zero.

True
Source code in alsek/core/backoff.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class Backoff(ABC):
    """Base backoff class.

    Args:
        floor (int, optional): minimum backoff in milliseconds
        ceiling (int, optional): maximum backoff in milliseconds
        zero_override (bool): override backoff to zero if the number
            of ``incidents`` is zero.

    Raises:
        ValueError: if ``floor`` is greater than ``ceiling``.

    """

    def __init__(
        self,
        floor: Optional[int] = 60 * 1000,
        ceiling: Optional[int] = 60 * 60 * 1000,
        zero_override: bool = True,
    ) -> None:
        self.floor = floor
        self.ceiling = ceiling
        self.zero_override = zero_override

        # Validate the bounds. One comparison suffices: the former second
        # branch (``ceiling < floor``) was unreachable, being logically
        # identical to ``floor > ceiling``.
        if floor is not None and ceiling is not None and floor > ceiling:
            # Fixed: the message previously omitted the closing parenthesis
            # after the floor value.
            raise ValueError(f"floor ({floor}) greater than ceiling ({ceiling})")

    @property
    @abstractmethod
    def parameters(self) -> dict[str, Optional[int]]:
        """Parameters of the current instance which uniquely
        characterize it.

        Returns:
            params (dict): backoff parameters

        """
        raise NotImplementedError()

    def __repr__(self) -> str:
        return auto_repr(self, **self.parameters)

    @property
    def settings(self) -> BackoffSettingsType:
        """Settings of the current algorithm.

        Returns:
            serialization (BackoffSettingsType): summary
                of the current algorithm and parameters with
                sufficient information to reconstruct it.

        """
        return dict(algorithm=self.__class__.__name__, parameters=self.parameters)

    @abstractmethod
    def formula(self, incidents: int) -> int:
        """Implementation of the formula for computing the backoff.

        Args:
            incidents (int): current number of incidents

        Returns:
            int

        """
        raise NotImplementedError()

    def _clipper(self, amount: int) -> int:
        # Clamp ``amount`` into [floor, ceiling]; either bound may be None
        # (unbounded).
        if self.floor is not None and amount < self.floor:
            return self.floor
        elif self.ceiling is not None and amount > self.ceiling:
            return self.ceiling
        else:
            return amount

    def get(self, incidents: int) -> int:
        """Get the backoff.

        Args:
            incidents (int): current number of incidents

        Returns:
            backoff (int): backoff in milliseconds

        """
        if self.zero_override and incidents == 0:
            # No incidents yet -> no delay (deliberately bypasses the floor).
            return 0
        return self._clipper(self.formula(incidents))
parameters abstractmethod property

Parameters of the current instance which uniquely characterize it.

Returns:

Name Type Description
params dict

backoff parameters

settings property

Settings of the current algorithm.

Returns:

Name Type Description
serialization BackoffSettingsType

summary of the current algorithm and parameters with sufficient information to reconstruct it.

formula(incidents) abstractmethod

Implementation of the formula for computing the backoff.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Type Description
int

int

Source code in alsek/core/backoff.py
70
71
72
73
74
75
76
77
78
79
80
81
@abstractmethod
def formula(self, incidents: int) -> int:
    """Implementation of the formula for computing the backoff.

    Args:
        incidents (int): current number of incidents

    Returns:
        int

    """
    # Subclasses provide the concrete backoff curve
    # (constant, linear, exponential, ...).
    raise NotImplementedError()
get(incidents)

Get the backoff.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Name Type Description
backoff int

backoff in milliseconds

Source code in alsek/core/backoff.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def get(self, incidents: int) -> int:
    """Get the backoff.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    # Zero incidents can optionally short-circuit to no delay at all,
    # bypassing the configured floor.
    if self.zero_override and incidents == 0:
        return 0
    # Otherwise clamp the computed backoff into [floor, ceiling].
    return self._clipper(self.formula(incidents))
ConstantBackoff

Bases: Backoff

Constant backoff.

Parameters:

Name Type Description Default
constant int

amount of time (in milliseconds) to backoff.

60 * 1000
**kwargs Keyword Args

keyword arguments to pass to Backoff

{}
Source code in alsek/core/backoff.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class ConstantBackoff(Backoff):
    """Backoff strategy which always waits the same amount of time.

    Args:
        constant (int): amount of time (in milliseconds) to backoff.
        **kwargs (Keyword Args): keyword arguments to pass to
            ``Backoff``

    """

    def __init__(self, constant: int = 60 * 1000, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.constant = constant

    @property
    def parameters(self) -> dict[str, Optional[int]]:
        """Parameters which uniquely characterize this
        ``ConstantBackoff`` instance.

        Returns:
            params (dict): backoff parameters

        """
        return {
            "constant": self.constant,
            "floor": self.floor,
            "ceiling": self.ceiling,
            "zero_override": self.zero_override,
        }

    def formula(self, incidents: int) -> int:
        """Constant backoff formula.

        Implements:

        $$c$$

        where $c$ is `constant`.

        Args:
            incidents (int): current number of incidents

        Returns:
            backoff (int): backoff in milliseconds

        """
        # The incident count is deliberately ignored.
        return self.constant
parameters property

Parameters of the current ConstantBackoff instance which uniquely characterize it.

Returns:

Name Type Description
params dict

backoff parameters

formula(incidents)

Constant backoff formula.

Implements:

\[c\]

where \(c\) is constant.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Name Type Description
backoff int

backoff in milliseconds

Source code in alsek/core/backoff.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def formula(self, incidents: int) -> int:
    """Constant backoff formula.

    Implements:

    $$c$$

    where $c$ is `constant`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    # The incident count does not influence a constant backoff.
    return self.constant
ExponentialBackoff

Bases: Backoff

Exponential backoff.

Parameters:

Name Type Description Default
base int

the base of the exponential (milliseconds)

4
factor int

factor to multiply the result by

10000
**kwargs Keyword Args

keyword arguments to pass to Backoff

{}
Source code in alsek/core/backoff.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
class ExponentialBackoff(Backoff):
    """Backoff strategy which grows exponentially with the incident count.

    Args:
        base (int): the base of the exponential (milliseconds)
        factor (int): factor to multiply the result by
        **kwargs (Keyword Args): keyword arguments to pass to
            ``Backoff``

    """

    def __init__(self, base: int = 4, factor: int = 10_000, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.base = base
        self.factor = factor

    @property
    def parameters(self) -> dict[str, Optional[int]]:
        """Parameters which uniquely characterize this
        ``ExponentialBackoff`` instance.

        Returns:
            params (dict): backoff parameters

        """
        return {
            "base": self.base,
            "factor": self.factor,
            "floor": self.floor,
            "ceiling": self.ceiling,
            "zero_override": self.zero_override,
        }

    def formula(self, incidents: int) -> int:
        """Exponential backoff formula.

        Implements:

        $$f * (b^{i})$$

        where $f$ is `factor`, $b$ is `base` and $i$ is the number of `incidents`.

        Args:
            incidents (int): current number of incidents

        Returns:
            backoff (int): backoff in milliseconds

        """
        growth = self.base**incidents
        return int(self.factor * growth)
parameters property

Parameters of the current ExponentialBackoff instance which uniquely characterize it.

Returns:

Name Type Description
params dict

backoff parameters

formula(incidents)

Exponential backoff formula.

Implements:

\[f * (b^{i})\]

where \(f\) is factor, \(b\) is base and \(i\) is the number of incidents.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Name Type Description
backoff int

backoff in milliseconds

Source code in alsek/core/backoff.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def formula(self, incidents: int) -> int:
    """Exponential backoff formula.

    Implements:

    $$f * (b^{i})$$

    where $f$ is `factor`, $b$ is `base` and $i$ is the number of `incidents`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    # int() truncates any float that could arise from the multiplication.
    return int(self.factor * (self.base**incidents))
LinearBackoff

Bases: Backoff

Linear backoff.

Parameters:

Name Type Description Default
factor int

amount of time (in milliseconds) to add to backoff after each retry.

30 * 1000
**kwargs Keyword Args

keyword arguments to pass to Backoff

{}
Source code in alsek/core/backoff.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class LinearBackoff(Backoff):
    """Backoff strategy which grows linearly with the incident count.

    Args:
        factor (int): amount of time (in milliseconds) to add to backoff
            after each retry.
        **kwargs (Keyword Args): keyword arguments to pass to
            ``Backoff``

    """

    def __init__(self, factor: int = 30 * 1000, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.factor = factor

    @property
    def parameters(self) -> dict[str, Optional[int]]:
        """Parameters which uniquely characterize this
        ``LinearBackoff`` instance.

        Returns:
            params (dict): backoff parameters

        """
        return {
            "factor": self.factor,
            "floor": self.floor,
            "ceiling": self.ceiling,
            "zero_override": self.zero_override,
        }

    def formula(self, incidents: int) -> int:
        """Linear backoff formula.

        Implements:

        $$f * i$$

        where $f$ is `factor` and $i$ is the number of `incidents`.

        Args:
            incidents (int): current number of incidents

        Returns:
            backoff (int): backoff in milliseconds

        """
        delay = self.factor * incidents
        return int(delay)
parameters property

Parameters of the current LinearBackoff instance which uniquely characterize it.

Returns:

Name Type Description
params dict

backoff parameters

formula(incidents)

Linear backoff formula.

Implements:

\[f * i\]

where \(f\) is factor and \(i\) is the number of incidents.

Parameters:

Name Type Description Default
incidents int

current number of incidents

required

Returns:

Name Type Description
backoff int

backoff in milliseconds

Source code in alsek/core/backoff.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def formula(self, incidents: int) -> int:
    """Linear backoff formula.

    Implements:

    $$f * i$$

    where $f$ is `factor` and $i$ is the number of `incidents`.

    Args:
        incidents (int): current number of incidents

    Returns:
        backoff (int): backoff in milliseconds

    """
    # int() truncates any float that could arise from the multiplication.
    return int(self.factor * incidents)
settings2backoff(settings)

Convert backoff settings to a Backoff instance.

Parameters:

Name Type Description Default
settings BackoffSettingsType

backoff settings of the form {"algorithm": str, "parameters": dict}.

required

Returns:

Name Type Description
backoff Backoff

a backoff instance

Source code in alsek/core/backoff.py
266
267
268
269
270
271
272
273
274
275
276
277
278
def settings2backoff(settings: BackoffSettingsType) -> Backoff:
    """Convert backoff settings to a ``Backoff`` instance.

    Args:
        settings (BackoffSettingsType): backoff settings of
            the form ``{"algorithm": str, "parameters": dict}``.

    Returns:
        backoff (Backoff): a backoff instance

    """
    # Resolve the algorithm class by name, then rebuild it from its parameters.
    algorithm_cls = _get_algorithm(settings["algorithm"])
    parameters = settings["parameters"]
    return algorithm_cls(**parameters)

broker

Broker

Broker

Alsek Broker.

Parameters:

Name Type Description Default
backend Backend

backend for data storage

required
dlq_ttl int

time to live (in milliseconds) for Dead Letter Queue (DLQ). If None, failed messages will not be moved to the DLQ.

DEFAULT_TTL
Source code in alsek/core/broker.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class Broker:
    """Alsek Broker.

    Args:
        backend (Backend): backend for data storage
        dlq_ttl (int, optional): time to live (in milliseconds) for
            Dead Letter Queue (DLQ). If ``None``, failed messages
            will not be moved to the DLQ.

    Raises:
        AttributeError: if ``backend`` is asynchronous.

    """

    def __init__(self, backend: Backend, dlq_ttl: Optional[int] = DEFAULT_TTL) -> None:
        self.backend = backend
        self.dlq_ttl = dlq_ttl

        # Only synchronous backends are supported at this time.
        if self.backend.IS_ASYNC:
            raise AttributeError("Asynchronous backends are not yet supported")

    def __repr__(self) -> str:
        return auto_repr(
            self,
            backend=self.backend,
            dlq_ttl=self.dlq_ttl,
        )

    def exists(self, message: Message) -> bool:
        """Determine if the message exists in the backend.

        Args:
            message (Message): an Alsek message

        Returns:
            exists (bool): whether the message exists.

        """
        name = get_message_name(message)
        return self.backend.exists(name)

    @magic_logger(
        before=lambda message: log.debug("Submitting %s...", message.summary),
        after=lambda input_: log.debug("Submitted %s.", input_["message"].summary),
    )
    def submit(self, message: Message, ttl: int = DEFAULT_TTL) -> None:
        """Submit a message for processing.

        Args:
            message (Message): an Alsek message
            ttl (int): time to live for the submitted message in milliseconds

        Returns:
            None

        Raises:
            MessageAlreadyExistsError: if the message already exists

        """
        name = get_message_name(message)
        try:
            # ``nx=True`` makes the write fail (KeyError) if the key exists,
            # guarding against double submission.
            self.backend.set(name, value=message.data, nx=True, ttl=ttl)
        except KeyError:
            raise MessageAlreadyExistsError(f"'{name}' found in backend")

        # Enqueue the message name in its priority-ordered namespace.
        self.backend.priority_add(
            get_priority_namespace_from_message(message),
            unique_id=name,
            priority=message.priority,
        )

    @magic_logger(
        before=lambda message: log.debug("Retrying %s...", message.summary),
    )
    def retry(self, message: Message) -> None:
        """Retry a message.

        Args:
            message (Message): an Alsek message

        Returns:
            None

        Raises:
            MessageDoesNotExistsError: if the message is not in the backend

        Warning:
            * This method will mutate ``message`` by incrementing it.

        """
        if not self.exists(message):
            raise MessageDoesNotExistsError(
                f"Message '{message.uuid}' not found in backend"
            )

        # We release the lock before setting the message data
        # so that the `linked_lock` field on the message is None.
        message.release_lock(
            not_linked_ok=True,
            target_backend=self.backend,
        )
        message.increment_retries()
        self.backend.set(get_message_name(message), value=message.data)
        log.info(
            "Retrying %s in %s ms...",
            message.summary,
            format(message.get_backoff_duration(), ","),
        )

    @magic_logger(
        before=lambda message: log.info("Removing %s...", message.summary),
        after=lambda input_: log.info("Removed %s.", input_["message"].summary),
    )
    def remove(self, message: Message) -> None:
        """Remove a message from the backend.

        Args:
            message (Message): an Alsek message

        Returns:
            None

        """
        # Drop the message from its priority queue, delete its data, and
        # release any lock linked to it.
        self.backend.priority_remove(
            key=get_priority_namespace_from_message(message),
            unique_id=get_message_name(message),
        )
        self.backend.delete(get_message_name(message), missing_ok=True)
        message.release_lock(
            not_linked_ok=True,
            target_backend=self.backend,
        )

    @magic_logger(
        before=lambda message: log.debug("Acking %s...", message.summary),
        after=lambda input_: log.debug("Acked %s.", input_["message"].summary),
    )
    def ack(self, message: Message) -> None:
        """Acknowledge a message by removing it from the data backend.

        Args:
            message (Message): a message to acknowledge

        Returns:
            None

        Warning:
            * Messages will not be redelivered once acked.

        """
        self.remove(message)

    @magic_logger(
        before=lambda message: log.info("Failing %s...", message.summary),
        after=lambda input_: log.info("Failed %s.", input_["message"].summary),
    )
    def fail(self, message: Message) -> None:
        """Acknowledge and fail a message by removing it from the backend.
        If ``dlq_ttl`` is not null, the messages will be persisted to
        the dead letter queue for the prescribed amount of time.

        Args:
            message (Message): an Alsek message

        Returns:
            None

        """
        self.ack(message)
        if self.dlq_ttl:
            # Keep a copy in the DLQ so the failure can be inspected or
            # replayed until the TTL elapses.
            self.backend.set(
                get_dlq_message_name(message),
                value=message.data,
                ttl=self.dlq_ttl,
            )
            log.debug("Added %s to DLQ.", message.summary)

    @magic_logger(
        # Fixed: previously logged "Failing %s..."/"Failed %s." — a copy-paste
        # from ``fail()`` which made this read-only check log falsehoods.
        before=lambda message: log.info("Checking DLQ for %s...", message.summary),
        after=lambda input_: log.info(
            "Checked DLQ for %s.", input_["message"].summary
        ),
    )
    def in_dlq(self, message: Message) -> bool:
        """Determine if a message is in the dead letter queue.

        Args:
            message (Message): an Alsek message

        Returns:
            bool: whether the message is in the DLQ.

        """
        return self.backend.exists(get_dlq_message_name(message))

    @magic_logger(
        before=lambda message: log.info("Syncing from backend %s...", message.summary),
        after=lambda input_: log.info(
            "Synced from backend %s.",
            input_["message"].summary,
        ),
    )
    def sync_from_backend(self, message: Message) -> Message:
        """Synchronize a message's internal data with that in the backend.

        Args:
            message (Message): an Alsek message

        Returns:
            updated_message (Message): the updated message data

        """
        try:
            # NOTE(review): this relies on ``backend.get`` raising ``KeyError``
            # when the key is missing despite ``default=Empty`` — confirm the
            # ``Empty`` sentinel triggers the raise rather than being returned.
            data = self.backend.get(get_message_name(message), default=Empty)
        except KeyError:
            # Fall back to the DLQ copy of the message.
            data = self.backend.get(get_dlq_message_name(message), default=Empty)
        return Message(**data)

    @magic_logger(
        before=lambda message: log.info("Syncing %s to backend...", message.summary),
        after=lambda input_: log.info(
            "Synced %s to backend.", input_["message"].summary
        ),
    )
    def sync_to_backend(self, message: Message) -> None:
        """Synchronize the data persisted in the backend with the current state of
        ``message`` held in memory.

        This method is the logical inverse of ``sync_from_backend``; any changes
        made to the ``message`` instance are written back to the backend so that
        future reads reflect the most up-to-date information.

        Args:
            message (Message): an Alsek message whose current state should be
                persisted.

        Returns:
            None

        Raises:
            MessageDoesNotExistsError: if the message exists in neither the
                live queue nor the DLQ.

        Warning:
            * This method will mutate ``message`` by updating it
              regardless of whether a lock is linked to it.
              You are responsible for ensuring that any mutation
              of the message's underlying data is only performed
              by the lock owner.

        """
        # Determine which key (regular queue or DLQ) should be updated
        if self.exists(message):
            key = get_message_name(message)
        elif self.in_dlq(message):
            key = get_dlq_message_name(message)
        else:
            raise MessageDoesNotExistsError(
                f"Message '{message.uuid}' not found in backend"
            )

        # Persist the updated message data. We intentionally omit a TTL value
        # to preserve the existing expiry associated with ``key`` (if any).
        self.backend.set(key, value=message.data)
ack(message)

Acknowledge a message by removing it from the data backend.

Parameters:

Name Type Description Default
message Message

a message to acknowledge

required

Returns:

Type Description
None

None

Warning
  • Messages will not be redelivered once acked.
Source code in alsek/core/broker.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@magic_logger(
    before=lambda message: log.debug("Acking %s...", message.summary),
    after=lambda input_: log.debug("Acked %s.", input_["message"].summary),
)
def ack(self, message: Message) -> None:
    """Acknowledge a message by removing it from the data backend.

    Args:
        message (Message): a message to acknowledge

    Returns:
        None

    Warning:
        * Messages will not be redelivered once acked.

    """
    # Acknowledgement is implemented as outright removal of the message.
    self.remove(message)
exists(message)

Determine if the message exists in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
exists bool

whether the message exists.

Source code in alsek/core/broker.py
52
53
54
55
56
57
58
59
60
61
62
63
def exists(self, message: Message) -> bool:
    """Determine if the message exists in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        exists (bool): whether the message exists.

    """
    # The message's backend key is derived from the message itself.
    name = get_message_name(message)
    return self.backend.exists(name)
fail(message)

Acknowledge and fail a message by removing it from the backend. If dlq_ttl is not null, the messages will be persisted to the dead letter queue for the prescribed amount of time.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
None

None

Source code in alsek/core/broker.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
@magic_logger(
    before=lambda message: log.info("Failing %s...", message.summary),
    after=lambda input_: log.info("Failed %s.", input_["message"].summary),
)
def fail(self, message: Message) -> None:
    """Acknowledge and fail a message by removing it from the backend.
    If ``dlq_ttl`` is not null, the message will be persisted to
    the dead letter queue for the prescribed amount of time.

    Args:
        message (Message): an Alsek message

    Returns:
        None

    """
    self.ack(message)
    if self.dlq_ttl:
        self.backend.set(
            get_dlq_message_name(message),
            value=message.data,
            ttl=self.dlq_ttl,
        )
        log.debug("Added %s to DLQ.", message.summary)
in_dlq(message)

Determine if a message is in the dead letter queue.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
bool bool

whether the message is in the DLQ.

Source code in alsek/core/broker.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@magic_logger(
    before=lambda message: log.info("Failing %s...", message.summary),
    after=lambda input_: log.info("Failed %s.", input_["message"].summary),
)
def in_dlq(self, message: Message) -> bool:
    """Determine if a message is in the dead letter queue.

    Args:
        message (Message): an Alsek message

    Returns:
        bool: whether the message is in the DLQ.

    """
    return self.backend.exists(get_dlq_message_name(message))
remove(message)

Remove a message from the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
None

None

Source code in alsek/core/broker.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@magic_logger(
    before=lambda message: log.info("Removing %s...", message.summary),
    after=lambda input_: log.info("Removed %s.", input_["message"].summary),
)
def remove(self, message: Message) -> None:
    """Remove a message from the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        None

    """
    self.backend.priority_remove(
        key=get_priority_namespace_from_message(message),
        unique_id=get_message_name(message),
    )
    self.backend.delete(get_message_name(message), missing_ok=True)
    message.release_lock(
        not_linked_ok=True,
        target_backend=self.backend,
    )
retry(message)

Retry a message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
None

None

Warning
  • This method will mutate message by incrementing it.
Source code in alsek/core/broker.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@magic_logger(
    before=lambda message: log.debug("Retrying %s...", message.summary),
)
def retry(self, message: Message) -> None:
    """Retry a message.

    Args:
        message (Message): an Alsek message

    Returns:
        None

    Warning:
        * This method will mutate ``message`` by incrementing it.

    """
    if not self.exists(message):
        raise MessageDoesNotExistsError(
            f"Message '{message.uuid}' not found in backend"
        )

    # We release the lock before setting the message data
    # so that the `linked_lock` field on the message is None.
    message.release_lock(
        not_linked_ok=True,
        target_backend=self.backend,
    )
    message.increment_retries()
    self.backend.set(get_message_name(message), value=message.data)
    log.info(
        "Retrying %s in %s ms...",
        message.summary,
        format(message.get_backoff_duration(), ","),
    )
submit(message, ttl=DEFAULT_TTL)

Submit a message for processing.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
ttl int

time to live for the submitted message in milliseconds

DEFAULT_TTL

Returns:

Type Description
None

None

Raises:

Type Description
MessageAlreadyExistsError

if the message already exists

Source code in alsek/core/broker.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@magic_logger(
    before=lambda message: log.debug("Submitting %s...", message.summary),
    after=lambda input_: log.debug("Submitted %s.", input_["message"].summary),
)
def submit(self, message: Message, ttl: int = DEFAULT_TTL) -> None:
    """Submit a message for processing.

    Args:
        message (Message): an Alsek message
        ttl (int): time to live for the submitted message in milliseconds

    Returns:
        None

    Raises:
        MessageAlreadyExistsError: if the message already exists

    """
    name = get_message_name(message)
    try:
        self.backend.set(name, value=message.data, nx=True, ttl=ttl)
    except KeyError:
        raise MessageAlreadyExistsError(f"'{name}' found in backend")

    self.backend.priority_add(
        get_priority_namespace_from_message(message),
        unique_id=name,
        priority=message.priority,
    )
sync_from_backend(message)

Synchronize a message's internal data with that in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
updated_message Message

the updated message data

Source code in alsek/core/broker.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
@magic_logger(
    before=lambda message: log.info("Syncing from backend %s...", message.summary),
    after=lambda input_: log.info(
        "Synced from backend %s.",
        input_["message"].summary,
    ),
)
def sync_from_backend(self, message: Message) -> Message:
    """Synchronize a message's internal data with that in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        updated_message (Message): the updated message data

    """
    try:
        data = self.backend.get(get_message_name(message), default=Empty)
    except KeyError:
        data = self.backend.get(get_dlq_message_name(message), default=Empty)
    return Message(**data)
sync_to_backend(message)

Synchronize the data persisted in the backend with the current state of message held in memory.

This method is the logical inverse of sync_from_backend; any changes made to the message instance are written back to the backend so that future reads reflect the most up-to-date information.

Parameters:

Name Type Description Default
message Message

an Alsek message whose current state should be persisted.

required

Returns:

Type Description
None

None

Warning
  • This method will mutate message by updating it regardless of whether a lock is linked to it. You are responsible for ensuring that any mutation of the message's underlying data is only performed by the lock owner.
Source code in alsek/core/broker.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
@magic_logger(
    before=lambda message: log.info("Syncing %s to backend...", message.summary),
    after=lambda input_: log.info(
        "Synced %s to backend.", input_["message"].summary
    ),
)
def sync_to_backend(self, message: Message) -> None:
    """Synchronize the data persisted in the backend with the current state of
    ``message`` held in memory.

    This method is the logical inverse of ``sync_from_backend``; any changes
    made to the ``message`` instance are written back to the backend so that
    future reads reflect the most up-to-date information.

    Args:
        message (Message): an Alsek message whose current state should be
            persisted.

    Returns:
        None

    Warning:
        * This method will mutate ``message`` by updating it
          regardless of whether a lock is linked to it.
          You are responsible for ensuring that any mutation
          of the message's underlying data is only performed
          by the lock owner.

    """
    # Determine which key (regular queue or DLQ) should be updated
    if self.exists(message):
        key = get_message_name(message)
    elif self.in_dlq(message):
        key = get_dlq_message_name(message)
    else:
        raise MessageDoesNotExistsError(
            f"Message '{message.uuid}' not found in backend"
        )

    # Persist the updated message data. We intentionally omit a TTL value
    # to preserve the existing expiry associated with ``key`` (if any).
    self.backend.set(key, value=message.data)

concurrency

Concurrency

Lock

Distributed mutual exclusion (MUTEX) lock for controlling concurrency across machines.

Parameters:

Name Type Description Default
name str

name of the lock

required
backend Backend

backend for data storage

required
ttl int

time to live in milliseconds. If None, the lock will not expire automatically.

60 * 60 * 1000
auto_release bool

if True automatically release the lock on context exit.

True
owner_id str

unique identifier for the lock. Do not change this value unless you know what you are doing.

CURRENT_HOST_OWNER_ID
Warning
  • Locks are global and do not consider queues, unless included in name.
  • When used as a context manager, the lock is not automatically acquired. Lock acquisition requires calling acquire().

Examples:

>>> from alsek import Lock
>>> from alsek.storage.backends.redis.standard import RedisBackend
...
>>> backend = RedisBackend()
...
>>> with Lock("mutex", backend=backend) as lock:
>>>     if lock.acquire():
>>>         print("Acquired lock.")
>>>     else:
>>>         print("Did not acquire lock.")
Source code in alsek/core/concurrency.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class Lock:
    """Distributed mutual exclusion (MUTEX) lock for controlling
    concurrency across machines.

    Args:
        name (str): name of the lock
        backend (Backend): backend for data storage
        ttl (int, optional): time to live in milliseconds.
            If ``None``, the lock will not expire automatically.
        auto_release (bool): if ``True`` automatically release
            the lock on context exit.
        owner_id (str): unique identifier for the lock.
            Do not change this value unless you know what you are doing.

    Warning:
        * Locks are global and do not consider queues, unless
          included in ``name``.
        * When used as a context manager, the lock is *not* automatically
          acquired. Lock acquisition requires calling ``acquire()``.

    Examples:
        >>> from alsek import Lock
        >>> from alsek.storage.backends.redis.standard import RedisBackend
        ...
        >>> backend = RedisBackend()
        ...
        >>> with Lock("mutex", backend=backend) as lock:
        >>>     if lock.acquire():
        >>>         print("Acquired lock.")
        >>>     else:
        >>>         print("Did not acquire lock.")

    """

    def __init__(
        self,
        name: str,
        backend: Backend,
        ttl: Optional[int] = 60 * 60 * 1000,
        auto_release: bool = True,
        owner_id: str = CURRENT_HOST_OWNER_ID,
    ) -> None:
        self.name = name
        self.backend = backend
        self.ttl = ttl
        self.auto_release = auto_release
        self._owner_id = owner_id

        if not isinstance(backend, RedisBackend):
            raise NotImplementedError("Only RedisBackend is supported.")

        self.validate()

        self._lock = redis_lock.Lock(
            backend.conn,
            name=self.full_name,
            expire=None if ttl is None else round(ttl / 1000),
            id=self.owner_id,
        )

    def validate(self) -> None:
        if not self.name:
            raise ValueError("`name` must be provided.")
        elif not self.owner_id:
            raise ValueError("`owner_id` must be provided.")

    @property
    def owner_id(self) -> str:
        return self._owner_id

    def __repr__(self) -> str:
        return auto_repr(
            self,
            name=self.name,
            backend=self.backend,
            ttl=self.ttl,
            auto_release=self.auto_release,
        )

    @property
    def full_name(self) -> str:
        """The full name of the lock including its namespace prefix."""
        return f"{self.backend.namespace}:{self.name}"

    @property
    def holder(self) -> Optional[str]:
        """Name of the owner that currently holds the lock, if any."""
        return self._lock.get_owner_id()

    @property
    def held(self) -> bool:
        """If the lock is held by the current owner."""
        return self.holder == self.owner_id

    def acquire(
        self,
        wait: Optional[int] = None,
        if_already_acquired: IF_ALREADY_ACQUIRED_TYPE = "raise_error",
    ) -> bool:
        """Try to acquire the lock.

        Args:
            wait (int, optional): the amount of time wait to acquire
                the lock (in seconds). If ``None`` do not block.
            if_already_acquired (str): behavior when the lock is
                already held by the current owner.

        Returns:
            acquired (bool): ``True`` if the lock is
                acquired or already acquired by the current owner.

        """
        if if_already_acquired not in get_args(IF_ALREADY_ACQUIRED_TYPE):
            raise ValueError(f"Invalid `on_already_acquired`, got  {if_already_acquired}")  # fmt: skip

        try:
            return self._lock.acquire(blocking=bool(wait), timeout=wait)
        except redis_lock.AlreadyAcquired as error:
            if if_already_acquired == "return_true":
                return True
            elif if_already_acquired == "return_false":
                return False
            else:
                raise error

    def release(self, raise_if_not_acquired: bool = False) -> bool:
        """Release the lock.

        Args:
            raise_if_not_acquired (bool): raise if the lock was not
                acquired for release.

        Returns:
            released (bool): whether the lock was
                found and released.

        """
        try:
            self._lock.release()
            return True
        except redis_lock.NotAcquired as error:
            if raise_if_not_acquired:
                raise error
            else:
                return False

    def __enter__(self) -> Lock:
        """Enter the context and try to acquire the lock.

        Returns:
            lock (Lock): underlying lock object.

        """
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Exit the context. If ``auto_release`` is enabled,
         the lock will be released.

        Args:
            exc_type (Type[BaseException], optional): the type of any exception from within the context
            exc_val (BaseException, optional): value of any exception from
                within the context
            exc_tb (TracebackType, optional): the traceback from the context

        Returns:
            None

        """
        if self.auto_release:
            self.release()
full_name property

The full name of the lock including its namespace prefix.

held property

If the lock is held by the current owner.

holder property

Name of the owner that currently holds the lock, if any.

__enter__()

Enter the context and try to acquire the lock.

Returns:

Name Type Description
lock Lock

underlying lock object.

Source code in alsek/core/concurrency.py
181
182
183
184
185
186
187
188
def __enter__(self) -> Lock:
    """Enter the context and try to acquire the lock.

    Returns:
        lock (Lock): underlying lock object.

    """
    return self
__exit__(exc_type, exc_val, exc_tb)

Exit the context. If auto_release is enabled, the lock will be released.

Parameters:

Name Type Description Default
exc_type Type[BaseException]

the type of any exception from within the context

required
exc_val BaseException

value of any exception from within the context

required
exc_tb TracebackType

the traceback from the context

required

Returns:

Type Description
None

None

Source code in alsek/core/concurrency.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Exit the context. If ``auto_release`` is enabled,
     the lock will be released.

    Args:
        exc_type (Type[BaseException], optional): the type of any exception from within the context
        exc_val (BaseException, optional): value of any exception from
            within the context
        exc_tb (TracebackType, optional): the traceback from the context

    Returns:
        None

    """
    if self.auto_release:
        self.release()
acquire(wait=None, if_already_acquired='raise_error')

Try to acquire the lock.

Parameters:

Name Type Description Default
wait int

the amount of time wait to acquire the lock (in seconds). If None do not block.

None
if_already_acquired str

behavior when the lock is already held by the current owner.

'raise_error'

Returns:

Name Type Description
acquired bool

True if the lock is acquired or already acquired by the current owner.

Source code in alsek/core/concurrency.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def acquire(
    self,
    wait: Optional[int] = None,
    if_already_acquired: IF_ALREADY_ACQUIRED_TYPE = "raise_error",
) -> bool:
    """Try to acquire the lock.

    Args:
        wait (int, optional): the amount of time wait to acquire
            the lock (in seconds). If ``None`` do not block.
        if_already_acquired (str): behavior when the lock is
            already held by the current owner.

    Returns:
        acquired (bool): ``True`` if the lock is
            acquired or already acquired by the current owner.

    """
    if if_already_acquired not in get_args(IF_ALREADY_ACQUIRED_TYPE):
        raise ValueError(f"Invalid `on_already_acquired`, got  {if_already_acquired}")  # fmt: skip

    try:
        return self._lock.acquire(blocking=bool(wait), timeout=wait)
    except redis_lock.AlreadyAcquired as error:
        if if_already_acquired == "return_true":
            return True
        elif if_already_acquired == "return_false":
            return False
        else:
            raise error
release(raise_if_not_acquired=False)

Release the lock.

Parameters:

Name Type Description Default
raise_if_not_acquired bool

raise if the lock was not acquired for release.

False

Returns:

Name Type Description
released bool

whether the lock was found and released.

Source code in alsek/core/concurrency.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def release(self, raise_if_not_acquired: bool = False) -> bool:
    """Release the lock.

    Args:
        raise_if_not_acquired (bool): raise if the lock was not
            acquired for release.

    Returns:
        released (bool): whether the lock was
            found and released.

    """
    try:
        self._lock.release()
        return True
    except redis_lock.NotAcquired as error:
        if raise_if_not_acquired:
            raise error
        else:
            return False
ProcessLock

Bases: Lock

Distributed mutual exclusion (MUTEX) lock for controlling concurrency across processes on the same host.

Source code in alsek/core/concurrency.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class ProcessLock(Lock):
    """Distributed mutual exclusion (MUTEX) lock for controlling
    concurrency across processes on the same host.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs, owner_id="")

    def validate(self) -> None:
        if not self.name:
            raise ValueError("`name` must be provided.")

    @property
    def owner_id(self) -> str:
        # We compute this "fresh" every time so that
        # It's always accurate even if the lock is moved
        # to a different process than it was created in.
        return _get_process_lock_owner_id()
ThreadLock

Bases: Lock

Distributed mutual exclusion (MUTEX) lock for controlling concurrency across processes and threads on the same host.

Source code in alsek/core/concurrency.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class ThreadLock(Lock):
    """Distributed mutual exclusion (MUTEX) lock for controlling
    concurrency across processes and threads on the same host.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs, owner_id="")

    def validate(self) -> None:
        if not self.name:
            raise ValueError("`name` must be provided.")

    @property
    def owner_id(self) -> str:
        # We compute this "fresh" every time so that
        # It's always accurate even if the lock is moved
        # to a different thread than it was created in.
        return _get_thread_lock_owner_id()

consumer

Consumer

Consumer

Tool for consuming messages generated by the broker.

Parameters:

Name Type Description Default
broker Broker

an Alsek broker

required
subset (list[str], dict[str, list[str]])

subset of messages to consume Must be one of the following

* ``None``: consume messages from all queues and tasks
* ``list``: a list of queues of the form ``["queue_a", "queue_b", "queue_c", ...]``
* ``dict``: a dictionary of queues and tasks of the form
    ``{"queue_a": ["task_name_a", "task_name_b", "task_name_c", ...], ...}``
None
backoff Backoff

backoff to use in response to passes over the backend which did not yield any actionable messages.

LinearBackoff(1 * 1000, floor=1000, ceiling=30000, zero_override=False)
Notes
  • If subset is a list or dict, queue priority is derived from the order of the items. Items which appear earlier are given higher priority.
  • If subset is a dict, task priority is derived from the order of task names in the value associated with each key (queue).
Warning
  • If subset is of type dict, task names not included in any of the values will be ignored.
Source code in alsek/core/consumer.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class Consumer:
    """Tool for consuming messages generated by the broker.

    Args:
        broker (Broker): an Alsek broker
        subset (list[str], dict[str, list[str]], optional): subset of messages to consume
            Must be one of the following

                * ``None``: consume messages from all queues and tasks
                * ``list``: a list of queues of the form ``["queue_a", "queue_b", "queue_c", ...]``
                * ``dict``: a dictionary of queues and tasks of the form
                    ``{"queue_a": ["task_name_a", "task_name_b", "task_name_c", ...], ...}``

        backoff (Backoff, optional): backoff to use in response to passes over the backend
            which did not yield any actionable messages.

    Notes:
        * If ``subset`` is a ``list`` or ``dict``, queue priority is derived from the
          order of the items. Items which appear earlier are given higher priority.
        * If ``subset`` is a ``dict``, task priority is derived from the order of
          task names in the value associated with each key (queue).

    Warning:
        * If ``subset`` is of type ``dict``, task names not included
          in any of the values will be ignored.

    """

    def __init__(
        self,
        broker: Broker,
        subset: Optional[Union[list[str], dict[str, list[str]]]] = None,
        backoff: Optional[Backoff] = LinearBackoff(
            1 * 1000,
            floor=1000,
            ceiling=30_000,
            zero_override=False,
        ),
    ) -> None:
        self.subset = subset
        self.broker = broker
        self.backoff = backoff or ConstantBackoff(0, floor=0, ceiling=0)

        self._empty_passes: int = 0
        self.stop_signal = StopSignalListener()

    def _scan_subnamespaces(self) -> Iterable[str]:
        if not self.subset:
            subnamespaces = [get_subnamespace(None)]
        elif isinstance(self.subset, list):
            subnamespaces = [get_subnamespace(q) for q in self.subset]
        else:
            subnamespaces = [
                get_subnamespace(q, task_name=t)
                for (q, tasks) in self.subset.items()
                for t in tasks
            ]

        for s in subnamespaces:
            if self.stop_signal.received:
                break
            for i in self.broker.backend.scan(f"{get_priority_namespace(s)}*"):
                if self.stop_signal.received:
                    break
                yield i

    def _poll(self) -> list[Message]:
        # NOTE: with this approach, we 'drain' / exhaust queues in
        #       the order they're provided, and then drain the next.
        #       So if we had queues A,B,C we'd drain A, then drain B
        #       and, finally, drain C.
        # ToDo: implement a 'flat' option that moves to the next queue
        #       So, A then B then C, round and round.
        output: list[Message] = list()

        def main_loop() -> None:
            for s in self._scan_subnamespaces():
                for name in self.broker.backend.priority_iter(s):
                    message_data = self.broker.backend.get(name)
                    if message_data is None:
                        # Message data can be None if it has been deleted (by a TTL or
                        # another worker) between the `priority_iter()` and `get()` operations.
                        continue

                    message = Message(**message_data)
                    if message.ready and not self.stop_signal.received:
                        with MessageMutex(message, self.broker.backend) as lock:
                            if lock.acquire(if_already_acquired="return_false"):
                                output.append(message.link_lock(lock, override=True))

        try:
            main_loop()
        except KeyboardInterrupt:
            pass
        except redis.exceptions.ConnectionError as error:
            if not self.stop_signal.received:
                raise error

        self._empty_passes = 0 if output else self._empty_passes + 1
        return _dedup_messages(output)

    def stream(self) -> Iterable[Message]:
        """Generate a stream of messages to process from
        the data backend.

        Returns:
            stream (Iterable[Message]): an iterable of messages to process

        """
        while not self.stop_signal.received:
            for message in self._poll():
                yield message
            self.stop_signal.wait(self.backoff.get(self._empty_passes))
stream()

Generate a stream of messages to process from the data backend.

Returns:

Name Type Description
stream Iterable[Message]

an iterable of messages to process

Source code in alsek/core/consumer.py
148
149
150
151
152
153
154
155
156
157
158
159
def stream(self) -> Iterable[Message]:
    """Generate a stream of messages to process from
    the data backend.

    Returns:
        stream (Iterable[Message]): an iterable of messages to process

    """
    while not self.stop_signal.received:
        for message in self._poll():
            yield message
        self.stop_signal.wait(self.backoff.get(self._empty_passes))

futures

Futures

ProcessTaskFuture

Bases: TaskFuture

Future for task execution in a separate process.

Parameters:

Name Type Description Default
task Task

a task to perform

required
message Message

a message to run task against

required
patience int

time to wait (in milliseconds) after issuing a SIGTERM signal to the process at shutdown. If the process is still active after this time, a SIGKILL will be issued.

1 * 1000
Source code in alsek/core/futures.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
class ProcessTaskFuture(TaskFuture):
    """Future for task execution in a separate process.

    Args:
        task (Task): a task to perform
        message (Message): a message to run ``task`` against
        patience (int): time to wait (in milliseconds) after issuing
            a SIGTERM signal to the process at shutdown. If the process
            is still active after this time, a SIGKILL will be issued.

    """

    def __init__(self, task: Task, message: Message, patience: int = 1 * 1000) -> None:
        super().__init__(task, message=message)
        self.patience = patience

        # Sentinel channel between the worker process and this (parent)
        # object: non-empty means the wrapper has finished (or was told
        # it is being terminated). Checked in `stop()` and `_wrapper()`
        # to avoid handling the same future's outcome twice.
        self._wrapper_exit_queue: Queue = Queue()
        self._process = Process(
            target=self._wrapper,
            args=(
                # Task & message are encoded so they can cross the
                # process boundary.
                _process_future_encoder(task, message=message),
                get_logger().level,
                self._wrapper_exit_queue,
            ),
            daemon=True,
        )
        self._process.start()

        # Note: this must go here b/c the scan depends on
        #   `.complete`, which in turn depends on `_process`.
        self._revocation_scan_thread.start()

    @property
    def complete(self) -> bool:
        """Whether the task has finished."""
        return not self._process.is_alive()

    @staticmethod
    def _wrapper(
        encoded_data: bytes,
        log_level: int,
        wrapper_exit_queue: Queue,
    ) -> None:
        # Runs in the *child* process: decode, execute, then route the
        # outcome through the appropriate future handler.
        setup_logging(log_level)
        task, message = _process_future_decoder(encoded_data)
        log.info("Received %s...", message.summary)
        task.update_status(message, status=TaskStatus.RUNNING)

        result, exception = None, None
        try:
            task.pre_op(message)
            result = task.execute(message)
            if task.is_revoked(message):
                log.info(
                    "Result for %s recovered after revocation. Discarding.",
                    message.summary,
                )
                return None

            message.update(exception_details=None)  # clear any existing errors
            log.info("Successfully processed %s.", message.summary)
        except BaseException as error:
            log.error("Error processing %s.", message.summary, exc_info=True)
            exception = error
            message.update(exception_details=parse_exception(exception).as_dict())

        # Post op is called here so that exception_details can be set
        task.post_op(message, result=result)

        # A non-empty queue means `stop()` already claimed this future's
        # outcome; skip the handlers to avoid double processing.
        if not wrapper_exit_queue.empty():
            log.debug("Process task future finished after termination.")
        elif exception is not None:
            _error_encountered_future_handler(
                task,
                message=message,
                exception=exception,
                update_exception_on_message=False,
            )
        else:
            _complete_future_handler(task, message=message, result=result)

        wrapper_exit_queue.put(1)

    def _shutdown(self) -> None:
        # Graceful first (SIGTERM), then forceful (SIGKILL) if the
        # process is still alive after `patience` milliseconds.
        self._process.terminate()
        self._process.join(self.patience / 1000)
        if self._process.is_alive():
            self._process.kill()

    def stop(self, exception: Type[BaseException]) -> None:
        """Stop the future.

        Args:
            exception (Type[BaseException]): exception type to raise.

        Returns:
            None

        """
        if self._process.ident is None:  # type: ignore
            log.error(
                "Process task future for %s did not start.",
                self.message.summary,
            )
            return None

        self._shutdown()
        # An empty queue means the wrapper has not yet reported an
        # outcome; claim it here so the error handler runs exactly once.
        if self._wrapper_exit_queue.empty():
            self._wrapper_exit_queue.put(1)
            try:
                raise exception(f"Stopped process {self._process.ident}")  # type: ignore
            except BaseException as error:
                log.error("Error processing %s.", self.message.summary, exc_info=True)
                _error_encountered_future_handler(
                    self.task, self.message, exception=error
                )
complete property

Whether the task has finished.

stop(exception)

Stop the future.

Returns:

Type Description
None

None

Source code in alsek/core/futures.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Terminates the underlying process and, if the wrapper has not yet
    reported an outcome, records the stop as an error for the task.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None

    """
    # `ident is None` means the process never started; nothing to stop.
    if self._process.ident is None:  # type: ignore
        log.error(
            "Process task future for %s did not start.",
            self.message.summary,
        )
        return None

    self._shutdown()
    # An empty queue means the wrapper has not reported an outcome;
    # claim it so the error handler runs exactly once.
    if self._wrapper_exit_queue.empty():
        self._wrapper_exit_queue.put(1)
        try:
            raise exception(f"Stopped process {self._process.ident}")  # type: ignore
        except BaseException as error:
            log.error("Error processing %s.", self.message.summary, exc_info=True)
            _error_encountered_future_handler(
                self.task, self.message, exception=error
            )
TaskFuture

Bases: ABC

Future for background task execution.

Parameters:

Name Type Description Default
task Task

a task to perform

required
message Message

a message to run task against

required
Source code in alsek/core/futures.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class TaskFuture(ABC):
    """Future for background task execution.

    Args:
        task (Task): a task to perform
        message (Message): a message to run ``task`` against

    """

    def __init__(self, task: Task, message: Message) -> None:
        self.task = task
        self.message = message

        # Creation time is used by `time_limit_exceeded` to enforce
        # the message's timeout.
        self.created_at = utcnow_timestamp_ms()

        # Background thread that evicts this future if its task is
        # revoked. Note: it is *started* by concrete subclasses (after
        # the machinery `.complete` depends on exists).
        self._revocation_stop_event = Event()
        self._revocation_scan_thread = Thread(
            target=self._revocation_scan,
            daemon=True,
        )

    @property
    @abstractmethod
    def complete(self) -> bool:
        """Whether the task has finished."""
        raise NotImplementedError()

    @property
    def time_limit_exceeded(self) -> bool:
        """Whether task has been running longer
        than the allowed time window."""
        if self.complete:
            return False
        return (utcnow_timestamp_ms() - self.created_at) > self.message.timeout

    @abstractmethod
    def stop(self, exception: Type[BaseException]) -> None:
        """Stop the future.

        Args:
            exception (Type[BaseException]): exception type to raise.

        Returns:
            None

        """
        raise NotImplementedError()

    @suppress_exception(
        TerminationError,
        on_suppress=lambda error: log.info("Termination Detected"),
    )
    def _revocation_scan(self, check_interval: int | float = 0.5) -> None:
        # Poll for revocation every `check_interval` seconds until the
        # future completes or `clean_up()` sets the stop event.
        while not self.complete and not self._revocation_stop_event.is_set():
            if self.task.is_revoked(self.message):
                log.info(
                    "Evicting '%s' due to task revocation...",
                    self.message.summary,
                )
                self.stop(RevokedError)
                _handle_failure(
                    task=self.task,
                    message=self.message,
                    exception=RevokedError(f"Task '{self.task.name}' was revoked."),
                )
                log.info("Evicted '%s'.", self.message.summary)
                break
            self._revocation_stop_event.wait(check_interval)

    def clean_up(self, ignore_errors: bool = False) -> None:
        # Signal the revocation scan thread to exit.
        # NOTE(review): `join(timeout=0)` returns immediately and does not
        # wait for the thread; it is a daemon thread, so this is best-effort.
        try:
            self._revocation_stop_event.set()
            self._revocation_scan_thread.join(timeout=0)
        except BaseException as error:  # noqa
            log.error(
                "Clean up error encountered for task %s with message %s.",
                self.task.name,
                self.message.summary,
            )
            if not ignore_errors:
                raise error
complete abstractmethod property

Whether the task has finished.

time_limit_exceeded property

Whether task has been running longer than the allowed time window.

stop(exception) abstractmethod

Stop the future.

Parameters:

Name Type Description Default
exception Type[BaseException]

exception type to raise.

required

Returns:

Type Description
None

None

Source code in alsek/core/futures.py
182
183
184
185
186
187
188
189
190
191
192
193
@abstractmethod
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None

    """
    # Concrete futures (thread- and process-based) implement
    # the actual interruption mechanics.
    raise NotImplementedError()
ThreadTaskFuture

Bases: TaskFuture

Future for task execution in a separate thread.

Parameters:

Name Type Description Default
task Task

a task to perform

required
message Message

a message to run task against

required
complete_only_on_thread_exit bool

if True, only mark the future as complete when the thread formally exits (i.e., is not alive). Pro: more rigorous — avoids marking the task complete until the thread fully terminates. Useful when you need strict control over thread lifecycle (e.g., for resource management). Con: may lead to hanging if the thread doesn't terminate quickly (e.g., when using thread_raise() during revocation). This can also temporarily result in more than the allotted number of threads running, because it entails treating a thread as expired regardless of its actual status.

False
Source code in alsek/core/futures.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
class ThreadTaskFuture(TaskFuture):
    """Future for task execution in a separate thread.

    Args:
        task (Task): a task to perform
        message (Message): a message to run ``task`` against
        complete_only_on_thread_exit (bool): if ``True``, only mark the future
            as complete when the thread formally exits (i.e., is not alive).
            Pro: more rigorous — avoids marking the task complete until the thread fully terminates.
            Useful when you need strict control over thread lifecycle (e.g., for resource management).
            Con: may lead to hanging if the thread doesn't terminate quickly (e.g., when using
            `thread_raise()` during revocation). This can also temporarily result in more than the
            allotted number of threads running, because it entails treating a thread as
            expired regardless of its actual status.

    """

    def __init__(
        self,
        task: Task,
        message: Message,
        complete_only_on_thread_exit: bool = False,
    ) -> None:
        super().__init__(task, message=message)
        self.complete_only_on_thread_exit = complete_only_on_thread_exit

        # Flag set when the wrapper finishes (or when `stop()` claims the
        # outcome); used to avoid double-handling the task's result.
        self._wrapper_exit: bool = False
        self._thread = Thread(target=self._wrapper, daemon=True)
        self._thread.start()

        # Note: this must go here b/c the scan depends on
        #   `.complete`, which in turn depends on `_thread`.
        self._revocation_scan_thread.start()

    @property
    def complete(self) -> bool:
        """Whether the task has finished."""
        thread_alive = self._thread.is_alive()
        if self.complete_only_on_thread_exit:
            return not thread_alive
        else:
            # If _wrapper_exit is True, consider the task complete even if the thread is still running
            # This ensures the future gets removed from the worker pool's _futures list
            # and new tasks can be polled even if a revoked task's thread is still running
            return self._wrapper_exit or not thread_alive

    def _wrapper(self) -> None:
        # Runs in the worker thread: execute the task and route the
        # outcome through the appropriate future handler.
        log.info("Received %s...", self.message.summary)
        self.task.update_status(self.message, status=TaskStatus.RUNNING)

        result, exception = None, None
        try:
            self.task.pre_op(self.message)
            result = self.task.execute(self.message)
            if self.task.is_revoked(self.message):
                log.info(
                    "Result for %s recovered after revocation. Discarding.",
                    self.message.summary,
                )
                return None

            self.message.update(exception_details=None)  # clear any existing errors
            log.info("Successfully processed %s.", self.message.summary)
        except BaseException as error:
            log.error("Error processing %s.", self.message.summary, exc_info=True)
            exception = error
            self.message.update(exception_details=parse_exception(exception).as_dict())

        # Post op is called here so that exception_details can be set
        self.task.post_op(self.message, result=result)

        # If `stop()` already set `_wrapper_exit`, the outcome was
        # handled there; skip the handlers to avoid double processing.
        if self._wrapper_exit:
            log.debug("Thread task future finished after termination.")
        elif exception is not None:
            _error_encountered_future_handler(
                task=self.task,
                message=self.message,
                exception=exception,
            )
        else:
            _complete_future_handler(self.task, self.message, result=result)

        self._wrapper_exit = True

    def stop(self, exception: Type[BaseException]) -> None:
        """Stop the future.

        Args:
            exception (Type[BaseException]): exception type to raise.

        Returns:
            None

        """
        # `ident is None` means the thread never started; nothing to stop.
        if self._thread.ident is None:
            log.error(
                "Thread task future for %s did not start.",
                self.message.summary,
            )
            return None
        elif python_implementation() != "CPython":
            # Raising an exception in another thread relies on a
            # CPython-specific mechanism.
            log.error(
                f"Unable to raise exception {exception} in thread {self._thread.ident}. "
                f"Unsupported platform '{python_implementation()}'."
            )
            return None

        thread_raise(self._thread.ident, exception=exception)
        if not self._wrapper_exit:
            # Claim the outcome so `_wrapper()` does not handle it again.
            self._wrapper_exit = True
            _error_encountered_future_handler(
                self.task,
                message=self.message,
                exception=exception(f"Stopped thread {self._thread.ident}"),
            )
complete property

Whether the task has finished.

stop(exception)

Stop the future.

Parameters:

Name Type Description Default
exception Type[BaseException]

exception type to raise.

required

Returns:

Type Description
None

None

Source code in alsek/core/futures.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
def stop(self, exception: Type[BaseException]) -> None:
    """Stop the future.

    Raises ``exception`` inside the worker thread and, if the wrapper
    has not yet reported an outcome, records the stop as an error.

    Args:
        exception (Type[BaseException]): exception type to raise.

    Returns:
        None

    """
    # `ident is None` means the thread never started; nothing to stop.
    if self._thread.ident is None:
        log.error(
            "Thread task future for %s did not start.",
            self.message.summary,
        )
        return None
    elif python_implementation() != "CPython":
        # Raising an exception in another thread relies on a
        # CPython-specific mechanism.
        log.error(
            f"Unable to raise exception {exception} in thread {self._thread.ident}. "
            f"Unsupported platform '{python_implementation()}'."
        )
        return None

    thread_raise(self._thread.ident, exception=exception)
    if not self._wrapper_exit:
        # Claim the outcome so the wrapper does not handle it again.
        self._wrapper_exit = True
        _error_encountered_future_handler(
            self.task,
            message=self.message,
            exception=exception(f"Stopped thread {self._thread.ident}"),
        )

message

Message

Message

Alsek Message.

Parameters:

Name Type Description Default
task_name str

the name of the task for which the message is intended

required
queue str

the queue for which the message was intended. If None the default queue will be set.

None
args (list, tuple)

positional arguments to pass to the task's function during the execution of op()

None
kwargs dict

keyword arguments to pass to the task's function during the execution of op()

None
priority int

priority of the message within the task. Messages with lower values will be executed before messages with higher values.

0
metadata dict

a dictionary of user-defined message metadata. This can store any data types supported by the backend's serializer.

None
exception_details dict

information about any exception raised while executing this message. See ExceptionDetails().

None
result_ttl int

time to live (in milliseconds) for the result in the result store. If a result store is provided and this parameter is None, the result will be persisted indefinitely.

None
uuid str

universal unique identifier for the message. If None, one will be generated automatically.

None
progenitor_uuid str

universal unique identifier for the message from which this message descended. (This field is only set in for tasks with triggers and/or callbacks.)

None
retries int

number of retries

0
timeout int

the maximum amount of time (in milliseconds) a task is permitted to run against this message.

DEFAULT_TASK_TIMEOUT
created_at int

UTC timestamp (in milliseconds) for when the message was created

None
updated_at int

UTC timestamp (in milliseconds) for when the message was last updated

None
delay int

delay before the message becomes ready (in milliseconds).

None
previous_result any

the output of any previously executed task. (This will only be non-null in cases where callbacks are used.)

None
previous_message_uuid str

universal unique identifier of the preceding message (This will only be non-null in cases where callbacks are used.)

None
callback_message_data dict

data to construct a new message as part of a callback operation

None
backoff_settings dict

parameters to control backoff. Expected to be of the form {"algorithm": str, "parameters": dict}.

None
mechanism SupportedMechanismType

mechanism for executing the task. Must be either "process" or "thread".

DEFAULT_MECHANISM
Notes
  • While not recommended, timeout can be disabled, in effect, by setting it to a very large integer.
Source code in alsek/core/message.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
class Message:
    """Alsek Message.

    Args:
        task_name (str): the name of the task for which
            the message is intended
        queue (str, optional): the queue for which the message was intended.
            If ``None`` the default queue will be set.
        args (list, tuple, optional): positional arguments to pass to
            the task's function during the execution of ``op()``
        kwargs (dict, optional): keyword arguments to pass to
            the task's function during the execution of ``op()``
        priority (int): priority of the message within the task.
            Messages with lower values will be executed before messages with higher values.
        metadata (dict, optional): a dictionary of user-defined message metadata.
            This can store any data types supported by the backend's serializer.
        exception_details (dict, optional): information about any exception raised
            while executing this message. See ``ExceptionDetails()``.
        result_ttl (int, optional): time to live (in milliseconds) for the
            result in the result store. If a result store is provided and
            this parameter is ``None``, the result will be persisted indefinitely.
        uuid (str, optional): universal unique identifier for the message.
            If ``None``, one will be generated automatically.
        progenitor_uuid (str, optional): universal unique identifier for the message
            from which this message descended. (This field is only set for tasks
            with triggers and/or callbacks.)
        retries (int): number of retries
        timeout (int): the maximum amount of time (in milliseconds)
            a task is permitted to run against this message.
        created_at (int): UTC timestamp (in milliseconds) for
            when the message was created
        updated_at (int): UTC timestamp (in milliseconds) for
            when the message was last updated
        delay (int): delay before the message becomes ready (in milliseconds).
        previous_result (any, optional): the output of any
            previously executed task. (This will only be non-null
            in cases where callbacks are used.)
        previous_message_uuid (str, optional): universal unique identifier
            of the preceding message. (This will only be
            non-null in cases where callbacks are used.)
        callback_message_data (dict, optional): data to construct
            a new message as part of a callback operation
        backoff_settings (dict, optional): parameters to control
            backoff. Expected to be of the form
            ``{"algorithm": str, "parameters": dict}``.
        mechanism (SupportedMechanismType): mechanism for executing the task. Must
            be either "process" or "thread".
        linked_lock (LinkedLock, optional): a concurrency lock linked
            to this message. See ``link_lock()``.

    Notes:
        * While *not* recommended, ``timeout`` can be disabled,
          in effect, by setting it to a very large integer.

    """

    def __init__(
        self,
        task_name: str,
        queue: Optional[str] = None,
        args: Optional[Union[list[Any], tuple[Any, ...]]] = None,
        kwargs: Optional[dict[Any, Any]] = None,
        priority: int = 0,
        metadata: Optional[dict[Any, Any]] = None,
        exception_details: Optional[Union[dict[str, Any], ExceptionDetails]] = None,
        result_ttl: Optional[int] = None,
        uuid: Optional[str] = None,
        progenitor_uuid: Optional[str] = None,
        retries: int = 0,
        timeout: int = DEFAULT_TASK_TIMEOUT,
        created_at: Optional[int] = None,
        updated_at: Optional[int] = None,
        delay: Optional[int] = None,
        previous_result: Optional[Any] = None,
        previous_message_uuid: Optional[str] = None,
        callback_message_data: Optional[dict[str, Any]] = None,
        backoff_settings: Optional[dict[str, Any]] = None,
        mechanism: SupportedMechanismType = DEFAULT_MECHANISM,
        linked_lock: Optional[LinkedLock] = None,
    ) -> None:
        self.task_name = task_name
        self.queue = queue or DEFAULT_QUEUE
        # Args are normalized to a tuple so the message is hashable/stable.
        self.args = tuple(args) if args else tuple()
        self.kwargs = kwargs or dict()
        self.priority = priority
        self.metadata = metadata
        self._exception_details = exception_details
        self.result_ttl = result_ttl
        self.retries = retries
        self.timeout = timeout
        self.uuid = uuid or _make_uuid()
        self.progenitor_uuid = progenitor_uuid
        self.delay = delay or 0
        self.previous_result = previous_result
        self.previous_message_uuid = previous_message_uuid
        self.callback_message_data = callback_message_data
        self.backoff_settings = backoff_settings or ExponentialBackoff().settings
        self.mechanism = mechanism
        self.linked_lock = linked_lock

        # Timestamps must be provided together (e.g., when rebuilding a
        # persisted message) or not at all (a brand-new message).
        if created_at is None and updated_at is None:
            self.created_at = self.updated_at = utcnow_timestamp_ms()
        elif created_at is None or updated_at is None:
            raise ValueError("Time data is corrupt")
        else:
            self.created_at, self.updated_at = created_at, updated_at

    @property
    def exception_details(self) -> Optional[ExceptionDetails]:
        """information about any exception raised."""
        # The backing field may hold a dict (e.g., deserialized data)
        # or an ExceptionDetails instance; normalize on access.
        if self._exception_details is None:
            return None
        elif isinstance(self._exception_details, ExceptionDetails):
            return self._exception_details
        elif isinstance(self._exception_details, dict):
            return ExceptionDetails(**self._exception_details)
        else:
            raise ValueError("Unexpected `exception_details` type")

    @exception_details.setter
    def exception_details(
        self,
        value: Optional[Union[ExceptionDetails, dict[str, Any]]],
    ) -> None:
        """Set information about any exception raised."""
        if isinstance(value, (ExceptionDetails, dict, type(None))):
            self._exception_details = value
        else:
            raise TypeError("`exception_details` is invalid")

    @property
    def data(self) -> dict[str, Any]:
        """Underlying message data."""
        return dict(
            task_name=self.task_name,
            queue=self.queue,
            args=self.args,
            kwargs=self.kwargs,
            priority=self.priority,
            metadata=self.metadata,
            exception_details=(
                None
                if self.exception_details is None
                else self.exception_details.as_dict()
            ),
            result_ttl=self.result_ttl,
            uuid=self.uuid,
            progenitor_uuid=self.progenitor_uuid,
            retries=self.retries,
            timeout=self.timeout,
            created_at=self.created_at,
            updated_at=self.updated_at,
            delay=self.delay,
            previous_result=self.previous_result,
            previous_message_uuid=self.previous_message_uuid,
            callback_message_data=self.callback_message_data,
            backoff_settings=self.backoff_settings,
            mechanism=self.mechanism,
            linked_lock=self.linked_lock,
        )

    def __repr__(self) -> str:
        params = self.data
        # Render timestamps as human-readable datetimes in the repr.
        for k in ("created_at", "updated_at"):
            params[k] = from_timestamp_ms(params[k])
        return auto_repr(self, **params)

    @property
    def summary(self) -> str:
        """High-level summary of the message object."""
        return auto_repr(
            self,
            new_line_threshold=None,
            uuid=self.uuid,
            queue=self.queue,
            task=self.task_name,
        )

    def get_backoff_duration(self) -> int:
        """Get the amount of time to backoff (wait)
        before the message is eligible for processing again,
        should it fail.

        Returns:
            duration (int): duration of the backoff in milliseconds

        """
        return settings2backoff(self.backoff_settings).get(self.retries)

    @property
    def ready_at(self) -> int:
        """Timestamp denoting when the message will be ready for processing."""
        return self.created_at + self.delay + self.get_backoff_duration()

    @property
    def ready(self) -> bool:
        """If the messages is currently ready for processing."""
        return self.ready_at <= utcnow_timestamp_ms()

    @property
    def ttr(self) -> int:
        """Time to ready in milliseconds."""
        if self.ready:
            return 0
        return max(self.ready_at - utcnow_timestamp_ms(), 0)

    @property
    def descendant_uuids(self) -> Optional[list[str]]:
        """A list of uuids which have or will descend from this message."""
        if self.callback_message_data:
            return list(_collect_callback_uuids(self.callback_message_data))
        else:
            return None

    def link_lock(self, lock: Lock, override: bool = False) -> Message:
        """Link a lock to the current message.

        Links are formed against the ``long_name`` of ``lock``.

        Args:
            lock (Lock): a concurrency lock
            override (bool): if ``True`` replace any existing lock

        Returns:
            message (Message): the updated message

        Warning:
            * Lock links are formed in memory and are
              never persisted to the data backend.

        """
        if self.linked_lock and not override:
            # NOTE(review): f-string has no placeholders; a plain string
            # literal would do.
            raise AttributeError(f"Message already linked to a lock")
        else:
            self.linked_lock = LinkedLock(
                name=lock.name,
                owner_id=lock.owner_id,
            )
        return self

    def release_lock(self, not_linked_ok: bool, target_backend: Backend) -> bool:
        """Release the lock linked to the message.

        Args:
            not_linked_ok (bool): if ``True`` do not raise if no lock is found
            target_backend (Backend): a backend to release the lock from.

        Returns:
            success (bool): if the lock was released successfully.

        Raises:
            AttributeError: if no lock is associated with the message
                and ``not_linked_ok`` is not ``True``.

        """
        log.info("Releasing lock for %s...", self.summary)
        if self.linked_lock:
            # ToDo: the backend passed into might not be the same
            #   one that was used to create the lock. Without also
            #   saving the backend information along with 'name' and
            #   'owner_id' we have no way of knowing that. Fix.
            try:
                Lock(
                    self.linked_lock["name"],
                    backend=target_backend,
                    owner_id=self.linked_lock["owner_id"],
                ).release(raise_if_not_acquired=True)
                log.info("Released lock for %s.", self.summary)
                self.linked_lock = None
                return True
            except redis_lock.NotAcquired:  # noqa
                log.critical(
                    "Failed to release lock for %s",
                    self.summary,
                    exc_info=True,
                )
                return False
        elif not_linked_ok:
            return False
        else:
            raise AttributeError("No lock linked to message")

    def clone(self) -> Message:
        """Create an exact copy of the current message.

        Returns:
            clone (Message): the cloned message

        """
        return Message(**deepcopy(self.data))

    def update(self, **data: Any) -> Message:
        """Update the ``data`` in the current message.

        Args:
            **data (Keyword Args): key value pairs of
                data to update

        Returns:
            updated_message (Message): the updated message

        Warning:
            * This method operates 'in place'. To avoid changing the current
              message, first call ``.clone()``, e.g., ``message.clone().update(...)``.
            * Changes are *not* automatically persisted to the backend.

        """
        for k, v in data.items():
            # Only keys present in `data` (the message schema) may be set.
            if k in self.data:
                setattr(self, k, v)
            else:
                raise KeyError(f"Unsupported key '{k}'")
        return self

    def add_to_metadata(self, **data: Any) -> Message:
        """Adds metadata to the current instance by merging provided data into the
        existing metadata. The function performs a non-inplace merge operation,
        ensuring the original metadata is not directly altered unless returned
        and reassigned.

        Args:
            **data: Key-value pairs to merge into the existing metadata.

        Returns:
            Message: The updated instance with the merged metadata.
        """
        if not data:
            raise ValueError("No data provided to add to metadata.")

        self.metadata = dict_merge_update_into_origin(
            origin=self.metadata or dict(),
            update=data,
            inplace=False,
        )
        return self

    def duplicate(self, uuid: Optional[str] = None) -> Message:
        """Create a duplicate of the current message, changing only ``uuid``.

        Args:
            uuid (str, optional): universal unique identifier for the new message.
                If ``None``, one will be generated automatically.

        Returns:
            duplicate_message (Message): the duplicate message

        Warning:
            * Linked locks are not conserved

        """
        return self.clone().update(uuid=uuid or _make_uuid())

    def increment_retries(self) -> Message:
        """Update a message by increasing the number
        of retries.

        Returns:
            message (Message): the updated message

        Notes:
            * ``updated_at`` will be updated to the
               current time.

        Warning:
            * Changes are *not* automatically persisted to the backend.

        """
        return self.update(
            retries=self.retries + 1,
            updated_at=utcnow_timestamp_ms(),
        )
data property

Underlying message data.

descendant_uuids property

A list of uuids which have or will descend from this message.

exception_details property writable

information about any exception raised.

ready property

If the messages is currently ready for processing.

ready_at property

Timestamp denoting when the message will be ready for processing.

summary property

High-level summary of the message object.

ttr property

Time to ready in milliseconds.

add_to_metadata(**data)

Adds metadata to the current instance by merging provided data into the existing metadata. The function performs a non-inplace merge operation, ensuring the original metadata is not directly altered unless returned and reassigned.

Parameters:

Name Type Description Default
**data Any

Key-value pairs to merge into the existing metadata.

{}

Returns:

Name Type Description
Message Message

The updated instance with the merged metadata.

Source code in alsek/core/message.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def add_to_metadata(self, **data: Any) -> Message:
    """Merge ``data`` into this message's metadata.

    The merge is performed non-inplace, so the original metadata mapping is
    never mutated directly; the merged result is assigned back to
    ``self.metadata``.

    Args:
        **data: Key-value pairs to merge into the existing metadata.

    Returns:
        Message: The updated instance with the merged metadata.

    Raises:
        ValueError: if no key-value pairs are provided.
    """
    if not data:
        raise ValueError("No data provided to add to metadata.")

    existing = self.metadata if self.metadata else dict()
    self.metadata = dict_merge_update_into_origin(
        origin=existing,
        update=data,
        inplace=False,
    )
    return self
clone()

Create an exact copy of the current message.

Returns:

Name Type Description
clone Message

the cloned message

Source code in alsek/core/message.py
326
327
328
329
330
331
332
333
def clone(self) -> Message:
    """Create an exact copy of the current message.

    Returns:
        clone (Message): the cloned message

    """
    # Deep-copy the underlying data so the clone shares no mutable state.
    data_copy = deepcopy(self.data)
    return Message(**data_copy)
duplicate(uuid=None)

Create a duplicate of the current message, changing only uuid.

Parameters:

Name Type Description Default
uuid str

universal unique identifier for the new message. If None, one will be generated automatically.

None

Returns:

Name Type Description
duplicate_message Message

the duplicate message

Warning
  • Linked locks are not conserved
Source code in alsek/core/message.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def duplicate(self, uuid: Optional[str] = None) -> Message:
    """Create a duplicate of the current message, changing only ``uuid``.

    Args:
        uuid (str, optional): universal unique identifier for the new message.
            If ``None``, one will be generated automatically.

    Returns:
        duplicate_message (Message): the duplicate message

    Warning:
        * Linked locks are not conserved

    """
    new_uuid = uuid or _make_uuid()
    return self.clone().update(uuid=new_uuid)
get_backoff_duration()

Get the amount of time to backoff (wait) before the message is eligible for processing again, should it fail.

Returns:

Name Type Description
duration int

duration of the backoff in milliseconds

Source code in alsek/core/message.py
222
223
224
225
226
227
228
229
230
231
def get_backoff_duration(self) -> int:
    """Get the amount of time to backoff (wait) before the message is
    eligible for processing again, should it fail.

    Returns:
        duration (int): duration of the backoff in milliseconds

    """
    backoff = settings2backoff(self.backoff_settings)
    return backoff.get(self.retries)
increment_retries()

Update a message by increasing the number of retries.

Returns:

Name Type Description
message Message

the updated message

Notes
  • updated_at will be updated to the current time.
Warning
  • Changes are not automatically persisted to the backend.
Source code in alsek/core/message.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def increment_retries(self) -> Message:
    """Update a message by increasing the number of retries.

    Returns:
        message (Message): the updated message

    Notes:
        * ``updated_at`` will be updated to the current time.

    Warning:
        * Changes are *not* automatically persisted to the backend.

    """
    changes = {
        "retries": self.retries + 1,
        "updated_at": utcnow_timestamp_ms(),
    }
    return self.update(**changes)

Link a lock to the current message.

Links are formed against the long_name of lock.

Parameters:

Name Type Description Default
lock Lock

a concurrency lock

required
override bool

if True replace any existing lock

False

Returns:

Name Type Description
message Message

the updated message

Warning
  • Lock links are formed in memory and are never persisted to the data backend.
Source code in alsek/core/message.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def link_lock(self, lock: Lock, override: bool = False) -> Message:
    """Link a lock to the current message.

    Links are formed against the ``long_name`` of ``lock``.

    Args:
        lock (Lock): a concurrency lock
        override (bool): if ``True`` replace any existing lock

    Returns:
        message (Message): the updated message

    Raises:
        AttributeError: if a lock is already linked and ``override``
            is not ``True``.

    Warning:
        * Lock links are formed in memory and are
          never persisted to the data backend.

    """
    if self.linked_lock and not override:
        # Guard clause: never silently clobber an existing link.
        # (Plain string here; the original used an f-string with no
        # placeholders, a lint defect.)
        raise AttributeError("Message already linked to a lock")
    self.linked_lock = LinkedLock(
        name=lock.name,
        owner_id=lock.owner_id,
    )
    return self
release_lock(not_linked_ok, target_backend)

Release the lock linked to the message.

Parameters:

Name Type Description Default
not_linked_ok bool

if True do not raise if no lock is found

required
target_backend Backend

a backend to release the lock from.

required

Returns:

Name Type Description
success bool

if the lock was released successfully.

Raises:

Type Description
AttributeError

if no lock is associated with the message and not_linked_ok is not True.

Source code in alsek/core/message.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def release_lock(self, not_linked_ok: bool, target_backend: Backend) -> bool:
    """Release the lock linked to the message.

    Args:
        not_linked_ok (bool): if ``True`` do not raise if no lock is found
        target_backend (Backend): a backend to release the lock from.

    Returns:
        success (bool): if the lock was released successfully.

    Raises:
        AttributeError: if no lock is associated with the message
            and ``not_linked_ok`` is not ``True``.

    """
    log.info("Releasing lock for %s...", self.summary)
    if self.linked_lock:
        # ToDo: the backend passed into might not be the same
        #   one that was used to create the lock. Without also
        #   saving the backend information along with 'name' and
        #   'owner_id' we have no way of knowing that. Fix.
        try:
            # Rebuild the lock from its stored name/owner so it can be
            # released on behalf of the original owner.
            Lock(
                self.linked_lock["name"],
                backend=target_backend,
                owner_id=self.linked_lock["owner_id"],
            ).release(raise_if_not_acquired=True)
            log.info("Released lock for %s.", self.summary)
            self.linked_lock = None
            return True
        except redis_lock.NotAcquired:  # noqa
            # Lock was not held (e.g., expired or released elsewhere);
            # report failure instead of propagating the exception.
            log.critical(
                "Failed to release lock for %s",
                self.summary,
                exc_info=True,
            )
            return False
    elif not_linked_ok:
        return False
    else:
        raise AttributeError("No lock linked to message")
update(**data)

Update the data in the current message.

Parameters:

Name Type Description Default
**data Keyword Args

key value pairs of data to update

{}

Returns:

Name Type Description
updated_message Message

the updated message

Warning
  • This method operates 'in place'. To avoid changing the current message, first call .clone(), e.g., message.clone().update(...).
  • Changes are not automatically persisted to the backend.
Source code in alsek/core/message.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def update(self, **data: Any) -> Message:
    """Update the ``data`` in the current message.

    Args:
        **data (Keyword Args): key value pairs of
            data to update

    Returns:
        updated_message (Message): the updated message

    Warning:
        * This method operates 'in place'. To avoid changing the current
          message, first call ``.clone()``, e.g., ``message.clone().update(...)``.
        * Changes are *not* automatically persisted to the backend.

    """
    for key, value in data.items():
        if key not in self.data:
            raise KeyError(f"Unsupported key '{key}'")
        setattr(self, key, value)
    return self

status

Status Tracking

StatusTracker

Alsek Status Tracker.

Parameters:

Name Type Description Default
backend Backend

backend to persists results to. (In almost all cases, this should be the same backend used by the Broker).

required
ttl int

time to live (in milliseconds) for the status

DEFAULT_TTL
enable_pubsub bool

if True automatically publish PUBSUB updates. If None determine automatically given the capabilities of the backend used by broker.

None
Source code in alsek/core/status.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class StatusTracker:
    """Alsek Status Tracker.

    Args:
        backend (Backend): backend to persists results to. (In almost all cases, this
            should be the same backend used by the Broker).
        ttl (int, optional): time to live (in milliseconds) for the status
        enable_pubsub (bool, optional): if ``True`` automatically publish PUBSUB updates.
            If ``None`` determine automatically given the capabilities of the backend
            used by ``broker``.

    Raises:
        AssertionError: if ``enable_pubsub`` is ``True`` but the backend
            does not support PUBSUB.

    """

    def __init__(
        self,
        backend: Backend,
        ttl: Optional[int] = DEFAULT_TTL,
        enable_pubsub: Optional[bool] = None,
    ) -> None:
        self.backend = backend
        self.ttl = ttl
        # Fall back to the backend's advertised capability when not specified.
        self.enable_pubsub = backend.SUPPORTS_PUBSUB if enable_pubsub is None else enable_pubsub  # fmt: skip

        if enable_pubsub and not backend.SUPPORTS_PUBSUB:
            raise AssertionError("Backend does not support PUBSUB")

    def serialize(self) -> dict[str, Any]:
        """Serialize this tracker so it can be rebuilt via ``deserialize()``."""
        return {
            "backend": self.backend.encode(),
            "ttl": self.ttl,
            "enable_pubsub": self.enable_pubsub,
        }

    @staticmethod
    def deserialize(data: dict[str, Any]) -> StatusTracker:
        """Reconstruct a ``StatusTracker`` from the output of ``serialize()``."""
        # NOTE(review): dill.loads() can execute arbitrary code on load;
        # only deserialize data from trusted sources.
        backend_data = dill.loads(data["backend"])
        backend = backend_data["backend"]._from_settings(backend_data["settings"])
        return StatusTracker(
            backend=backend,
            ttl=data["ttl"],
            enable_pubsub=data["enable_pubsub"],
        )

    @staticmethod
    def get_storage_name(message: Message) -> str:
        """Get the key for the status information about the message

        Args:
            message (Message): an Alsek message

        Returns:
            name (string): the key for the status information

        Raises:
            ValidationError: if ``queue``, ``task_name`` or ``uuid`` is not set.

        """
        if not message.queue or not message.task_name or not message.uuid:
            raise ValidationError("Required attributes not set for message")
        return f"status:{message.queue}:{message.task_name}:{message.uuid}"

    @staticmethod
    def get_pubsub_name(message: Message) -> str:
        """Get the channel for status updates about the message.

        Args:
            message (Message): an Alsek message

        Returns:
            name (string): the channel for the status information

        Raises:
            ValidationError: if ``queue``, ``task_name`` or ``uuid`` is not set.

        """
        if not message.queue or not message.task_name or not message.uuid:
            raise ValidationError("Required attributes not set for message")
        return f"channel:{message.queue}:{message.task_name}:{message.uuid}"

    def exists(self, message: Message) -> bool:
        """Check if a status for ``message`` exists in the backend.

        Args:
            message (Message): an Alsek message

        Returns:
            bool

        """
        return self.backend.exists(self.get_storage_name(message))

    def publish_update(self, message: Message, update: StatusUpdate) -> None:
        """Publish a PUBSUB update for a message.

        Args:
            message (Message): an Alsek message
            update (StatusUpdate): a status to publish

        Returns:
            None

        """
        self.backend.pub(
            self.get_pubsub_name(message),
            value=update.as_dict(),  # converting to dict makes this serializer-agnostic
        )

    def listen_to_updates(
        self,
        message: Message,
        auto_exit: bool = True,
    ) -> Iterable[StatusUpdate]:
        """Listen to PUBSUB updates for ``message``.

        Args:
            message (Message): an Alsek message
            auto_exit (bool): if ``True`` stop listening if a terminal status for the
                task is encountered (succeeded or failed).

        Returns:
            stream (Iterable[StatusUpdate]): A stream of updates from the pubsub channel

        Raises:
            ValueError: if PUBSUB is not enabled for this tracker.

        """
        if not self.enable_pubsub:
            raise ValueError("PUBSUB not enabled")

        for i in self.backend.sub(self.get_pubsub_name(message)):
            # Only 'message' events carry a status payload; skip everything
            # else (e.g., subscription acknowledgements).
            if i.get("type", "").lower() == "message":
                update = StatusUpdate(
                    status=TaskStatus[i["data"]["status"]],  # noqa
                    details=i["data"]["details"],  # noqa
                )
                yield update
                if auto_exit and update.status in TERMINAL_TASK_STATUSES:
                    break

    def set(
        self,
        message: Message,
        status: TaskStatus,
        details: Optional[Any] = None,
    ) -> None:
        """Set a ``status`` for ``message``.

        Args:
            message (Message): an Alsek message
            status (TaskStatus): a status to set
            details (Any, optional): additional information about the status (e.g., progress percentage)

        Returns:
            None

        """
        update = StatusUpdate(status=status, details=details)
        self.backend.set(
            self.get_storage_name(message),
            value=update.as_dict(),
            # Only the initial SUBMITTED write carries a TTL.
            # NOTE(review): confirm backend semantics of ttl=None on later
            # writes (preserve vs. clear the existing expiry).
            ttl=self.ttl if status == TaskStatus.SUBMITTED else None,
        )
        if self.enable_pubsub:
            self.publish_update(message, update=update)

    def get(self, message: Message) -> StatusUpdate:
        """Get the status of ``message``.

        Args:
            message (Message): an Alsek message

        Returns:
            status (StatusUpdate): the status of ``message``

        Raises:
            KeyError: if no status is stored for ``message``.

        """
        if value := self.backend.get(self.get_storage_name(message)):
            return StatusUpdate(
                status=TaskStatus[value["status"]],  # noqa
                details=value["details"],
            )
        else:
            raise KeyError(f"No status found for message '{message.summary}'")

    def wait_for(
        self,
        message: Message,
        status: TaskStatus | tuple[TaskStatus, ...] | list[TaskStatus],
        timeout: Optional[float] = 5.0,
        poll_interval: float = 0.05,
    ) -> bool:
        """Wait for a message to reach a desired status.

        Args:
            message (Message): the message to monitor
            status (TaskStatus, tuple[TaskStatus...], list[TaskStatus]): the target status
            timeout (float, optional): max time to wait (in seconds). None means wait forever.
            poll_interval (float): how often to check (in seconds)

        Returns:
            bool: True if desired status reached, False if timed out
        """
        if not isinstance(status, TaskStatus) and not isinstance(status, (list, tuple)):
            raise ValueError(f"Invalid status type: {type(status)}")

        def is_match(current_status: TaskStatus) -> bool:
            # Equality for a single TaskStatus; membership for a collection.
            if isinstance(status, TaskStatus):
                return current_status == status
            elif isinstance(status, (list, tuple)):
                return current_status in status
            else:
                raise ValueError(f"Invalid status type: {type(status)}")

        # NOTE(review): time.time() is wall-clock; a monotonic clock would be
        # immune to system clock adjustments during the wait.
        deadline = None if timeout is None else time.time() + timeout
        while True:
            try:
                if is_match(self.get(message).status):
                    return True
            except KeyError:
                # No status stored yet; keep polling until the deadline.
                pass
            if deadline is not None and time.time() > deadline:
                return False
            time.sleep(poll_interval)

    def delete(self, message: Message, check: bool = True) -> None:
        """Delete the status of ``message``.

        Args:
            message (Message): an Alsek message
            check (bool): check that it is safe to delete the status.
                This is done by ensuring that the current status of ``message``
                is terminal (i.e., ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``).

        Returns:
            None

        Raises:
            ValidationError: if ``check`` is ``True`` and the status of
                ``message`` is not ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``.

        """
        if check and self.get(message).status not in TERMINAL_TASK_STATUSES:
            raise ValidationError(f"Message '{message.uuid}' in a non-terminal state")
        self.backend.delete(self.get_storage_name(message), missing_ok=False)
delete(message, check=True)

Delete the status of message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
check bool

check that it is safe to delete the status. This is done by ensuring that the current status of message is terminal (i.e., TaskStatus.FAILED or TaskStatus.SUCCEEDED).

True

Returns:

Type Description
None

None

Raises:

Type Description
ValidationError

if check is True and the status of message is not TaskStatus.FAILED or TaskStatus.SUCCEEDED.

Source code in alsek/core/status.py
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def delete(self, message: Message, check: bool = True) -> None:
    """Delete the status of ``message``.

    Args:
        message (Message): an Alsek message
        check (bool): check that it is safe to delete the status.
            This is done by ensuring that the current status of ``message``
            is terminal (i.e., ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``).

    Returns:
        None

    Raises:
        ValidationError: if ``check`` is ``True`` and the status of
            ``message`` is not ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``.

    """
    if check:
        current = self.get(message).status
        if current not in TERMINAL_TASK_STATUSES:
            raise ValidationError(f"Message '{message.uuid}' in a non-terminal state")
    self.backend.delete(self.get_storage_name(message), missing_ok=False)
exists(message)

Check if a status for message exists in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
bool

bool

Source code in alsek/core/status.py
131
132
133
134
135
136
137
138
139
140
141
def exists(self, message: Message) -> bool:
    """Check if a status for ``message`` exists in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        bool

    """
    storage_name = self.get_storage_name(message)
    return self.backend.exists(storage_name)
get(message)

Get the status of message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
status StatusUpdate

the status of message

Source code in alsek/core/status.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def get(self, message: Message) -> StatusUpdate:
    """Get the status of ``message``.

    Args:
        message (Message): an Alsek message

    Returns:
        status (StatusUpdate): the status of ``message``

    Raises:
        KeyError: if no status is stored for ``message``.

    """
    raw = self.backend.get(self.get_storage_name(message))
    if not raw:
        raise KeyError(f"No status found for message '{message.summary}'")
    return StatusUpdate(
        status=TaskStatus[raw["status"]],  # noqa
        details=raw["details"],
    )
get_pubsub_name(message) staticmethod

Get the channel for status updates about the message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
name string

the channel for the status information

Source code in alsek/core/status.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@staticmethod
def get_pubsub_name(message: Message) -> str:
    """Get the channel for status updates about the message.

    Args:
        message (Message): an Alsek message

    Returns:
        name (string): the channel for the status information

    """
    if not message.queue or not message.task_name or not message.uuid:
        raise ValidationError("Required attributes not set for message")
    return f"channel:{message.queue}:{message.task_name}:{message.uuid}"
get_storage_name(message) staticmethod

Get the key for the status information about the message

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
name string

the key for the status information

Source code in alsek/core/status.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@staticmethod
def get_storage_name(message: Message) -> str:
    """Get the key for the status information about the message

    Args:
        message (Message): an Alsek message

    Returns:
        name (string): the key for the status information

    """
    if not message.queue or not message.task_name or not message.uuid:
        raise ValidationError("Required attributes not set for message")
    return f"status:{message.queue}:{message.task_name}:{message.uuid}"
listen_to_updates(message, auto_exit=True)

Listen to PUBSUB updates for message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
auto_exit bool

if True stop listening if a terminal status for the task is encountered (succeeded or failed).

True

Returns:

Name Type Description
stream Iterable[StatusUpdate]

A stream of updates from the pubsub channel

Source code in alsek/core/status.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def listen_to_updates(
    self,
    message: Message,
    auto_exit: bool = True,
) -> Iterable[StatusUpdate]:
    """Listen to PUBSUB updates for ``message``.

    Args:
        message (Message): an Alsek message
        auto_exit (bool): if ``True`` stop listening if a terminal status for the
            task is encountered (succeeded or failed).

    Returns:
        stream (Iterable[StatusUpdate]): A stream of updates from the pubsub channel

    Raises:
        ValueError: if PUBSUB is not enabled for this tracker.

    """
    if not self.enable_pubsub:
        raise ValueError("PUBSUB not enabled")

    channel = self.get_pubsub_name(message)
    for payload in self.backend.sub(channel):
        # Only 'message' events carry a status payload; skip the rest.
        if payload.get("type", "").lower() != "message":
            continue
        update = StatusUpdate(
            status=TaskStatus[payload["data"]["status"]],  # noqa
            details=payload["data"]["details"],  # noqa
        )
        yield update
        if auto_exit and update.status in TERMINAL_TASK_STATUSES:
            break
publish_update(message, update)

Publish a PUBSUB update for a message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
update StatusUpdate

a status to publish

required

Returns:

Type Description
None

None

Source code in alsek/core/status.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def publish_update(self, message: Message, update: StatusUpdate) -> None:
    """Publish a PUBSUB update for a message.

    Args:
        message (Message): an Alsek message
        update (StatusUpdate): a status to publish

    Returns:
        None

    """
    channel = self.get_pubsub_name(message)
    # Publishing a plain dict keeps the payload serializer-agnostic.
    self.backend.pub(channel, value=update.as_dict())
set(message, status, details=None)

Set a status for message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
status TaskStatus

a status to set

required
details Any

additional information about the status (e.g., progress percentage)

None

Returns:

Type Description
None

None

Source code in alsek/core/status.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def set(
    self,
    message: Message,
    status: TaskStatus,
    details: Optional[Any] = None,
) -> None:
    """Set a ``status`` for ``message``.

    Args:
        message (Message): an Alsek message
        status (TaskStatus): a status to set
        details (Any, optional): additional information about the status (e.g., progress percentage)

    Returns:
        None

    """
    update = StatusUpdate(status=status, details=details)
    # Only the initial SUBMITTED write carries a TTL.
    expiry = self.ttl if status == TaskStatus.SUBMITTED else None
    self.backend.set(
        self.get_storage_name(message),
        value=update.as_dict(),
        ttl=expiry,
    )
    if self.enable_pubsub:
        self.publish_update(message, update=update)
wait_for(message, status, timeout=5.0, poll_interval=0.05)

Wait for a message to reach a desired status.

Parameters:

Name Type Description Default
message Message

the message to monitor

required
status TaskStatus, tuple[TaskStatus...], list[TaskStatus]

the target status

required
timeout float

max time to wait (in seconds). None means wait forever.

5.0
poll_interval float

how often to check (in seconds)

0.05

Returns:

Name Type Description
bool bool

True if desired status reached, False if timed out

Source code in alsek/core/status.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def wait_for(
    self,
    message: Message,
    status: TaskStatus | tuple[TaskStatus, ...] | list[TaskStatus],
    timeout: Optional[float] = 5.0,
    poll_interval: float = 0.05,
) -> bool:
    """Wait for a message to reach a desired status.

    Args:
        message (Message): the message to monitor
        status (TaskStatus, tuple[TaskStatus...], list[TaskStatus]): the target status
        timeout (float, optional): max time to wait (in seconds). None means wait forever.
        poll_interval (float): how often to check (in seconds)

    Returns:
        bool: True if desired status reached, False if timed out

    Raises:
        ValueError: if ``status`` is neither a ``TaskStatus`` nor a
            list/tuple of them.
    """
    if not isinstance(status, (TaskStatus, list, tuple)):
        raise ValueError(f"Invalid status type: {type(status)}")

    def is_match(current_status: TaskStatus) -> bool:
        # ``status`` was validated above: a bare TaskStatus is an equality
        # check, a list/tuple is a membership check.
        if isinstance(status, TaskStatus):
            return current_status == status
        return current_status in status

    # Use a monotonic clock for the deadline so the timeout is immune to
    # wall-clock adjustments (NTP jumps, DST, manual changes).
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            if is_match(self.get(message).status):
                return True
        except KeyError:
            # No status recorded yet; keep polling until the deadline.
            pass
        if deadline is not None and time.monotonic() > deadline:
            return False
        time.sleep(poll_interval)
StatusTrackerIntegryScanner

Tool to ensure the integrity of statuses scanning a StatusTracker() with non-terminal statuses (i.e., TaskStatus.FAILED or TaskStatus.SUCCEEDED) that no longer exist in the broker. Entries which meet this criteria will have their status set to TaskStatus.UNKNOWN.

Parameters:

Name Type Description Default
status_tracker StatusTracker

status tracker to scan for messages with non-terminal status

required
trigger (CronTrigger, DateTrigger, IntervalTrigger)

trigger which determines how often to perform the scan.

IntervalTrigger(hours=1)
Source code in alsek/core/status.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
class StatusTrackerIntegryScanner:
    """Tool to ensure the integrity of statuses by scanning a ``StatusTracker()``
    for entries with non-terminal statuses (i.e., statuses other than
    ``TaskStatus.FAILED`` or ``TaskStatus.SUCCEEDED``) whose messages no longer
    exist in the broker. Entries which meet these criteria will have
    their status set to ``TaskStatus.UNKNOWN``.

    Args:
        status_tracker (StatusTracker): status tracker to scan for messages with non-terminal status
        broker (Broker): broker used to check whether a message still exists
        trigger (CronTrigger, DateTrigger, IntervalTrigger, optional):
            trigger which determines how often to perform the scan.

    """

    def __init__(
        self,
        status_tracker: StatusTracker,
        broker: Broker,
        trigger: Union[CronTrigger, DateTrigger, IntervalTrigger] = IntervalTrigger(hours=1),  # fmt: skip
    ) -> None:
        self.status_tracker = status_tracker
        self.broker = broker
        self.trigger = trigger

        # Recurring scans run in a background thread when a trigger is set.
        self.scheduler: BackgroundScheduler = BackgroundScheduler()
        if trigger:
            self.scheduler.start()
            self.scheduler.add_job(
                self.scan,
                trigger=trigger,
                id="integrity_scan",
            )

    def scan(self) -> None:
        """Run the integrity scan.

        Returns:
            None

        """
        # "status*" matches the keys written by StatusTracker.get_storage_name().
        for name in self.status_tracker.backend.scan("status*"):
            message = _name2message(name)
            status = self.status_tracker.get(message).status
            if (
                status is not None
                and status not in TERMINAL_TASK_STATUSES
                and not self.broker.exists(message)
            ):
                self.status_tracker.set(message, status=TaskStatus.UNKNOWN)
scan()

Run the integrity scan.

Returns:

Type Description
None

None

Source code in alsek/core/status.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def scan(self) -> None:
    """Run the integrity scan.

    Returns:
        None

    """
    for name in self.status_tracker.backend.scan("status*"):
        message = _name2message(name)
        status = self.status_tracker.get(message).status
        # Skip entries that are unset or already terminal.
        if status is None or status in TERMINAL_TASK_STATUSES:
            continue
        # Non-terminal entry whose message vanished from the broker.
        if not self.broker.exists(message):
            self.status_tracker.set(message, status=TaskStatus.UNKNOWN)
StatusUpdate

Bases: NamedTuple

Status information.

Source code in alsek/core/status.py
40
41
42
43
44
45
46
47
48
49
50
class StatusUpdate(NamedTuple):
    """A single status report: the status itself plus optional details."""

    status: TaskStatus
    details: Optional[Any]

    def as_dict(self) -> dict[str, Any]:
        """Return the update as a plain, serializer-agnostic dict."""
        return {"status": self.status.name, "details": self.details}
TaskStatus

Bases: Enum

Alsek task statuses.

Source code in alsek/core/status.py
26
27
28
29
30
31
32
33
34
class TaskStatus(Enum):
    """Alsek task statuses."""

    UNKNOWN = 0  # set by the integrity scanner when a non-terminal message vanished
    SUBMITTED = 1  # the only status persisted with a TTL by StatusTracker.set()
    RUNNING = 2
    RETRYING = 3
    FAILED = 4  # terminal
    SUCCEEDED = 5  # terminal

task

Task

Task

Alsek Task.

Parameters:

Name Type Description Default
function callable

function to use for the main operation

required
broker Broker

an Alsek broker

required
name str

the name of the task. If None, the function's name will be used.

None
queue str

the name of the queue to generate the task on. If None, the default queue will be used.

None
timeout int

the maximum amount of time (in milliseconds) this task is permitted to run.

DEFAULT_TASK_TIMEOUT
max_retries int

maximum number of allowed retries

DEFAULT_MAX_RETRIES
backoff Backoff

backoff algorithm and parameters to use when computing delay between retries

ExponentialBackoff()
result_store ResultStore

store for persisting task results

None
status_tracker StatusTracker

store for persisting task statuses

None
mechanism SupportedMechanismType

mechanism for executing the task. Must be either "process" or "thread".

DEFAULT_MECHANISM
no_positional_args bool

if True, the task will not accept positional arguments.

False
Notes
  • do_retry() can be overridden in cases where max_retries is not sufficiently complex to determine if a retry should occur.
Warning
  • Timeouts are not supported for mechanism='thread' on Python implementations other than CPython.
Source code in alsek/core/task.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
class Task:
    """Alsek Task.

    Args:
        function (callable): function to use for the main operation
        broker (Broker): an Alsek broker
        name (str, optional): the name of the task. If ``None``,
            the class name will be used.
        queue (str, optional): the name of the queue to generate the task on.
            If ``None``, the default queue will be used.
        timeout (int): the maximum amount of time (in milliseconds)
            this task is permitted to run.
        max_retries (int, optional): maximum number of allowed retries
        backoff (Backoff, optional): backoff algorithm and parameters to use when computing
            delay between retries
        result_store (ResultStore, optional): store for persisting task results
        status_tracker (StatusTracker, optional): store for persisting task statuses
        mechanism (SupportedMechanismType): mechanism for executing the task. Must
            be either "process" or "thread".
        no_positional_args (bool): if ``True``, the task will not accept positional arguments.

    Notes:
        * ``do_retry()`` can be overridden in cases where ``max_retries``
           is not sufficiently complex to determine if a retry should occur.

    Warning:
        * Timeouts are not supported for ``mechanism='thread'`` on Python
          implementations other than CPython.

    """

    def __init__(
        self,
        function: Callable[..., Any],
        broker: Broker,
        name: Optional[str] = None,
        queue: Optional[str] = None,
        timeout: int = DEFAULT_TASK_TIMEOUT,
        max_retries: Optional[int] = DEFAULT_MAX_RETRIES,
        backoff: Optional[Backoff] = ExponentialBackoff(),
        result_store: Optional[ResultStore] = None,
        status_tracker: Optional[StatusTracker] = None,
        mechanism: SupportedMechanismType = DEFAULT_MECHANISM,
        no_positional_args: bool = False,
    ) -> None:
        self.function = function
        self.broker = broker
        self.queue = queue or DEFAULT_QUEUE
        self.timeout = timeout
        self._name = name
        self.max_retries = max_retries
        self.backoff = backoff or ConstantBackoff(
            constant=0,
            floor=0,
            ceiling=0,
            zero_override=True,
        )
        self.result_store = result_store
        self.status_tracker = status_tracker
        self.mechanism = mechanism
        self.no_positional_args = no_positional_args

        if mechanism not in SUPPORTED_MECHANISMS:
            raise ValueError(f"Unsupported mechanism '{mechanism}'")

        self._deferred: bool = False

    def serialize(self) -> dict[str, Any]:
        settings = gather_init_params(self, ignore=("broker",))

        # Broker
        settings["broker"] = gather_init_params(self.broker, ignore=("backend",))
        settings["broker"]["backend"] = self.broker.backend.encode()

        # Status Tracker
        if self.status_tracker:
            settings["status_tracker"] = self.status_tracker.serialize()

        # Result Store
        if self.result_store:
            settings["result_store"] = self.result_store.serialize()

        return dict(task=self.__class__, settings=settings)

    @staticmethod
    def deserialize(data: dict[str, Any]) -> Task:
        def unwind_settings(settings: dict[str, Any]) -> dict[str, Any]:
            backend_data = dill.loads(settings["broker"]["backend"])

            # Broker
            settings["broker"]["backend"] = backend_data["backend"]._from_settings(
                backend_data["settings"]
            )
            settings["broker"] = Broker(**settings["broker"])

            # Status Tracker
            if status_tracker_settings := settings.get("status_tracker"):
                settings["status_tracker"] = StatusTracker.deserialize(status_tracker_settings)  # fmt: skip

            # Result Store
            if result_store_settings := settings.get("result_store"):
                settings["result_store"] = ResultStore.deserialize(
                    result_store_settings
                )

            return settings

        rebuilt_task = data["task"](**unwind_settings(data["settings"]))
        return cast(Task, rebuilt_task)

    @property
    def name(self) -> str:
        """Name of the task."""
        return self._name if self._name else self.function.__name__

    def __repr__(self) -> str:
        return auto_repr(
            self,
            function=self.function,
            broker=self.broker,
            name=self.name,
            queue=self.queue,
            max_retries=self.max_retries,
            backoff=self.backoff,
            mechanism=self.mechanism,
            deferred_mode="enabled" if self._deferred else "disabled",
        )

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.function(*args, **kwargs)

    def update_status(self, message: Message, status: TaskStatus) -> None:
        """Update the status of a message.

        Args:
            message (Message): message to update the status of.
            status (TaskStatus): status to update the message to.

        Returns:
            None

        """
        if self.status_tracker:
            log.debug(f"Setting status of '{message.uuid}' to {status}...")
            self.status_tracker.set(message, status=status)

    @property
    def deferred(self) -> bool:
        """Whether deferred mode is currently enabled."""
        return self._deferred

    def defer(self) -> Task:
        """Enter "deferred" mode.

        Do not submit the next message created by ``generate()``
        to the broker.

        Returns:
            task (Task): the current task

        Warning:
            * Deferred mode is automatically cancelled by ``generate()``
              prior to it returning.

        """
        self._deferred = True
        return self

    def cancel_defer(self) -> Task:
        """Cancel "deferred" mode.

        Returns:
            task (Task): the current task

        """
        self._deferred = False
        return self

    def on_submit(self, message: Message) -> None:
        """Handles the action to be performed when a message is submitted.
        This method processes  the provided message and executes the required
        behavior upon submission.

        Args:
            message (Message): The message object submitted for processing.

        Returns:
            None

        """

    def _submit(self, message: Message, **options: Any) -> None:
        self.broker.submit(message, **options)

    def generate(
        self,
        args: Optional[Union[list[Any], tuple[Any, ...]]] = None,
        kwargs: Optional[dict[Any, Any]] = None,
        priority: int = 0,
        metadata: Optional[dict[Any, Any]] = None,
        result_ttl: Optional[int] = None,
        uuid: Optional[str] = None,
        timeout_override: Optional[int] = None,
        delay: Optional[int] = None,
        previous_result: Any = None,
        callback: Optional[Union[Message, tuple[Message, ...]]] = None,
        queue: Optional[str] = None,
        submit: bool = True,
        **options: Any,
    ) -> Message:
        """Generate an instance of the task for processing.

        This method generates a new message for the task and
        submit it to the broker.

        Args:
            args (list, tuple, optional): positional arguments to pass to ``function``
            kwargs (dict, optional): keyword arguments to pass to ``function``
            priority (int): priority of the message within the task.
                Messages with lower values will be executed before messages with higher values.
            metadata (dict, optional): a dictionary of user-defined message metadata.
                This can store any data types supported by the backend's serializer.
            result_ttl (int, optional): time to live (in milliseconds) for the
                result in the result store. If a result store is provided and
            this parameter is ``None``, the result will be persisted indefinitely.
            uuid (str, optional): universal unique identifier for the message.
                If ``None``, one will be generated automatically.
            timeout_override (int, optional): override the default maximum runtime
                (in milliseconds) for instances of this task.
            set maximum amount of time (in milliseconds)
                this message is permitted to run. This will override the default
                for the task.
            delay (int, optional): delay before message is ready (in milliseconds)
            previous_result (Any): result of a previous task.
            callback (Message, tuple[Message, ...], optional): one or more messages
                to be submitted to the broker after the proceeding message has been
                successfully processed by a worker.
            queue (str, optional): queue to use for the task. If none, the default
                queue for this task will be used.
            submit (bool): if ``True`` submit the task to the broker
            options (Keyword Args): options to use when submitting
                the message via the broker. See ``Broker.submit()``.

        Returns:
            message (Message): message generated for the task

        Warning:
            * ``submit`` is overridden to ``False`` if deferred mode is active
            * ``uuid`` is refreshed after the first event when using a trigger.
            * If manually overriding ``queue`` such that it differs from the default
              for this task, Worker Pools built using ``task_specific_mode=True`` will
              fail acknowledge its existence.

        """
        if result_ttl and not self.result_store:
            raise ValidationError(f"`result_ttl` invalid. No result store set.")
        elif args and self.no_positional_args:
            raise ValidationError(f"Task does not accept positional arguments.")
        elif not isinstance(priority, int) or priority < 0:
            raise ValueError("`priority` must be an int greater than or equal to zero")

        message = Message(
            task_name=self.name,
            queue=queue or self.queue,
            args=args,
            kwargs=kwargs,
            priority=priority,
            metadata=metadata,
            result_ttl=result_ttl,
            uuid=uuid,
            timeout=timeout_override or self.timeout,
            delay=delay,
            previous_result=previous_result,
            callback_message_data=_parse_callback(callback).data if callback else None,
            backoff_settings=self.backoff.settings,
            mechanism=self.mechanism,
        )
        if self._deferred:
            self.cancel_defer()
        elif submit:
            self._submit(message, **options)
            self.update_status(message, status=TaskStatus.SUBMITTED)
            self.on_submit(message)
        return message

    def pre_op(self, message: Message) -> None:
        """Operation to perform before running ``op``.

        Args:
            message (Message): message ``op`` will be run against.

        Returns:
            None

        """

    def op(self, message: Message) -> Any:
        """Pass ``message`` data to ``function`` for processing.

        Args:
            message (Message): message to perform the operation against

        Returns:
            result (Any): output of the function

        Notes:
            * If, and only if, the signature of ``function`` contains
              a ``message`` parameter, ``message`` itself will be passed
              along with any ``args`` and ``kwargs`` contained in the message.

        Warning:
            * ``message`` will not be passed in cases where a "message"
              exists in ``message.kwargs``.

        """
        if "message" not in message.kwargs and _expects_message(self.function):
            return self.function(*message.args, **message.kwargs, message=message)
        else:
            return self.function(*message.args, **message.kwargs)

    def post_op(self, message: Message, result: Any) -> None:
        """Operation to perform after running ``op``.

        Args:
            message (Message): message ``op()`` was run against
            result (Any): output of ``op()``

        Returns:
            None

        """

    def on_revocation(
        self, message: Message, exception: Optional[BaseException], result: Any
    ) -> None:
        """Handles the event when a message is revoked and logs the associated exception.

        Args:
            message: The message instance that was revoked.
            exception: The exception instance that represents the reason for the
                revocation.
            result (Any): The result of the revoked operation. This is only provided
                if the task succeeds

        Returns:
            None

        """

    def on_retry(self, message: Message, exception: BaseException) -> None:
        """Handles the retry logic when a processing failure occurs.

        Args:
            message: The message object that failed during processing and is
                subject to a retry attempt.
            exception: The exception instance that was raised during the
                failure of processing the message.

        Returns:
            None

        """

    def on_failure(self, message: Message, exception: BaseException) -> None:
        """
        Handles the actions to be performed when an operation fails.

        Args:
            message: The message object containing the details of the failed
                operation.
            exception: The exception object associated with the failure, providing
                additional context or details about what caused the failure.

        Returns:
            None

        """

    def on_success(self, message: Message, result: Any) -> None:
        """Handles successful outcomes of an operation by processing the given message
        and corresponding result.

        Args:
            message: An instance of the Message class that contains relevant information
                about the operation.
            result: The result of the completed operation, which can be of any type.

        Returns:
            None

        """

    def execute(self, message: Message) -> Any:
        """Execute the task against a message.

        Args:
            message (Message): message to process

        Returns:
            result (Any): output of ``op()``

        """
        return self.op(message)

    def do_retry(self, message: Message, exception: BaseException) -> bool:  # noqa
        """Whether a failed task should be retried.

        Args:
            message (Message): message which failed
            exception (BaseException): the exception which was raised

        Returns:
            bool

        """
        if self.is_revoked(message):
            return False
        return self.max_retries is None or message.retries < self.max_retries

    def do_callback(self, message: Message, result: Any) -> bool:  # noqa
        """Whether or to submit the callback provided.

        Args:
            message (Message): message with the callback
            result (Any): output of ``op()``

        Returns:
            bool

        Warning:
            * If the task message does not have a callback this
              method will *not* be invoked.

        """
        return True

    @staticmethod
    def _make_revoked_key_name(message: Message) -> str:
        return f"revoked:{get_message_name(message)}"

    @magic_logger(
        before=lambda message: log.info("Revoking %s...", message.summary),
        after=lambda input_: log.debug("Revoked %s.", input_["message"].summary),
    )
    def revoke(self, message: Message, skip_if_running: bool = False) -> None:
        """Revoke the task.

        Args:
            message (Message): message to revoke
            skip_if_running (bool): if ``True``, skip revoking the task if it is currently RUNNING.
                Notes: requires ``status_tracker`` to be set.

        Returns:
            None

        """
        log.info("Revoking %s...", message.summary)
        if (
            self.status_tracker
            and self.status_tracker.get(message).status in TERMINAL_TASK_STATUSES
        ):
            log.info("Message is already terminal: %s", message.summary)
            return
        elif skip_if_running and not self.status_tracker:
            raise AttributeError("`skip_if_running` requires `status_tracker` to be set")  # fmt: skip
        elif skip_if_running and (
            (status_update := self.status_tracker.get(message))
            and status_update.status == TaskStatus.RUNNING
        ):
            log.info("Message is currently running: %s", message.summary)
            return

        self.broker.backend.set(
            self._make_revoked_key_name(message),
            value=True,
            ttl=DEFAULT_TTL,
        )
        message.update(
            exception_details=ExceptionDetails(
                name=get_exception_name(RevokedError),
                text="Task Revoked",
                traceback=None,
            ).as_dict()
        )
        self.update_status(message, status=TaskStatus.FAILED)
        self.broker.fail(message)

    def is_revoked(self, message: Message) -> bool:
        """Check if a message is revoked.

        Args:
            message (Message): an Alsek message

        Returns:
            None

        """
        if self.broker.backend.get(self._make_revoked_key_name(message)):
            return True
        else:
            return False
deferred property

Whether deferred mode is currently enabled.

name property

Name of the task.

cancel_defer()

Cancel "deferred" mode.

Returns:

Name Type Description
task Task

the current task

Source code in alsek/core/task.py
277
278
279
280
281
282
283
284
285
def cancel_defer(self) -> Task:
    """Cancel "deferred" mode.

    Returns:
        task (Task): the current task

    """
    self._deferred = False
    return self
defer()

Enter "deferred" mode.

Do not submit the next message created by generate() to the broker.

Returns:

Name Type Description
task Task

the current task

Warning
  • Deferred mode is automatically cancelled by generate() prior to it returning.
Source code in alsek/core/task.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def defer(self) -> Task:
    """Enter "deferred" mode.

    Do not submit the next message created by ``generate()``
    to the broker.

    Returns:
        task (Task): the current task

    Warning:
        * Deferred mode is automatically cancelled by ``generate()``
          prior to it returning.

    """
    self._deferred = True
    return self
do_callback(message, result)

Whether or not to submit the provided callback.

Parameters:

Name Type Description Default
message Message

message with the callback

required
result Any

output of op()

required

Returns:

Type Description
bool

bool

Warning
  • If the task message does not have a callback this method will not be invoked.
Source code in alsek/core/task.py
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
def do_callback(self, message: Message, result: Any) -> bool:  # noqa
    """Whether or to submit the callback provided.

    Args:
        message (Message): message with the callback
        result (Any): output of ``op()``

    Returns:
        bool

    Warning:
        * If the task message does not have a callback this
          method will *not* be invoked.

    """
    return True
do_retry(message, exception)

Whether a failed task should be retried.

Parameters:

Name Type Description Default
message Message

message which failed

required
exception BaseException

the exception which was raised

required

Returns:

Type Description
bool

bool

Source code in alsek/core/task.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
def do_retry(self, message: Message, exception: BaseException) -> bool:  # noqa
    """Whether a failed task should be retried.

    Args:
        message (Message): message which failed
        exception (BaseException): the exception which was raised

    Returns:
        bool

    """
    if self.is_revoked(message):
        return False
    return self.max_retries is None or message.retries < self.max_retries
execute(message)

Execute the task against a message.

Parameters:

Name Type Description Default
message Message

message to process

required

Returns:

Name Type Description
result Any

output of op()

Source code in alsek/core/task.py
501
502
503
504
505
506
507
508
509
510
511
def execute(self, message: Message) -> Any:
    """Execute the task against a message.

    Args:
        message (Message): message to process

    Returns:
        result (Any): output of ``op()``

    """
    return self.op(message)
generate(args=None, kwargs=None, priority=0, metadata=None, result_ttl=None, uuid=None, timeout_override=None, delay=None, previous_result=None, callback=None, queue=None, submit=True, **options)

Generate an instance of the task for processing.

This method generates a new message for the task and submits it to the broker.

Parameters:

Name Type Description Default
args (list, tuple)

positional arguments to pass to function

None
kwargs dict

keyword arguments to pass to function

None
priority int

priority of the message within the task. Messages with lower values will be executed before messages with higher values.

0
metadata dict

a dictionary of user-defined message metadata. This can store any data types supported by the backend's serializer.

None
result_ttl int

time to live (in milliseconds) for the result in the result store. If a result store is provided and this parameter is None, the result will be persisted indefinitely.

None
uuid str

universal unique identifier for the message. If None, one will be generated automatically.

None
timeout_override int

override the default maximum runtime (in milliseconds) for instances of this task.

None
delay int

delay before message is ready (in milliseconds)

None
previous_result Any

result of a previous task.

None
callback (Message, tuple[Message, ...])

one or more messages to be submitted to the broker after the proceeding message has been successfully processed by a worker.

None
queue str

queue to use for the task. If none, the default queue for this task will be used.

None
submit bool

if True submit the task to the broker

True
options Keyword Args

options to use when submitting the message via the broker. See Broker.submit().

{}

Returns:

Name Type Description
message Message

message generated for the task

Warning
  • submit is overridden to False if deferred mode is active
  • uuid is refreshed after the first event when using a trigger.
  • If manually overriding queue such that it differs from the default for this task, Worker Pools built using task_specific_mode=True will fail to acknowledge its existence.
Source code in alsek/core/task.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def generate(
    self,
    args: Optional[Union[list[Any], tuple[Any, ...]]] = None,
    kwargs: Optional[dict[Any, Any]] = None,
    priority: int = 0,
    metadata: Optional[dict[Any, Any]] = None,
    result_ttl: Optional[int] = None,
    uuid: Optional[str] = None,
    timeout_override: Optional[int] = None,
    delay: Optional[int] = None,
    previous_result: Any = None,
    callback: Optional[Union[Message, tuple[Message, ...]]] = None,
    queue: Optional[str] = None,
    submit: bool = True,
    **options: Any,
) -> Message:
    """Generate an instance of the task for processing.

    This method generates a new message for the task and
    submit it to the broker.

    Args:
        args (list, tuple, optional): positional arguments to pass to ``function``
        kwargs (dict, optional): keyword arguments to pass to ``function``
        priority (int): priority of the message within the task.
            Messages with lower values will be executed before messages with higher values.
        metadata (dict, optional): a dictionary of user-defined message metadata.
            This can store any data types supported by the backend's serializer.
        result_ttl (int, optional): time to live (in milliseconds) for the
            result in the result store. If a result store is provided and
        this parameter is ``None``, the result will be persisted indefinitely.
        uuid (str, optional): universal unique identifier for the message.
            If ``None``, one will be generated automatically.
        timeout_override (int, optional): override the default maximum runtime
            (in milliseconds) for instances of this task.
        set maximum amount of time (in milliseconds)
            this message is permitted to run. This will override the default
            for the task.
        delay (int, optional): delay before message is ready (in milliseconds)
        previous_result (Any): result of a previous task.
        callback (Message, tuple[Message, ...], optional): one or more messages
            to be submitted to the broker after the proceeding message has been
            successfully processed by a worker.
        queue (str, optional): queue to use for the task. If none, the default
            queue for this task will be used.
        submit (bool): if ``True`` submit the task to the broker
        options (Keyword Args): options to use when submitting
            the message via the broker. See ``Broker.submit()``.

    Returns:
        message (Message): message generated for the task

    Warning:
        * ``submit`` is overridden to ``False`` if deferred mode is active
        * ``uuid`` is refreshed after the first event when using a trigger.
        * If manually overriding ``queue`` such that it differs from the default
          for this task, Worker Pools built using ``task_specific_mode=True`` will
          fail acknowledge its existence.

    """
    if result_ttl and not self.result_store:
        raise ValidationError(f"`result_ttl` invalid. No result store set.")
    elif args and self.no_positional_args:
        raise ValidationError(f"Task does not accept positional arguments.")
    elif not isinstance(priority, int) or priority < 0:
        raise ValueError("`priority` must be an int greater than or equal to zero")

    message = Message(
        task_name=self.name,
        queue=queue or self.queue,
        args=args,
        kwargs=kwargs,
        priority=priority,
        metadata=metadata,
        result_ttl=result_ttl,
        uuid=uuid,
        timeout=timeout_override or self.timeout,
        delay=delay,
        previous_result=previous_result,
        callback_message_data=_parse_callback(callback).data if callback else None,
        backoff_settings=self.backoff.settings,
        mechanism=self.mechanism,
    )
    if self._deferred:
        self.cancel_defer()
    elif submit:
        self._submit(message, **options)
        self.update_status(message, status=TaskStatus.SUBMITTED)
        self.on_submit(message)
    return message
is_revoked(message)

Check if a message is revoked.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Type Description
bool

True if the message has been revoked, otherwise False.

Source code in alsek/core/task.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
def is_revoked(self, message: Message) -> bool:
    """Check if a message is revoked.

    Args:
        message (Message): an Alsek message

    Returns:
        None

    """
    if self.broker.backend.get(self._make_revoked_key_name(message)):
        return True
    else:
        return False
on_failure(message, exception)

Handles the actions to be performed when an operation fails.

Parameters:

Name Type Description Default
message Message

The message object containing the details of the failed operation.

required
exception BaseException

The exception object associated with the failure, providing additional context or details about what caused the failure.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
def on_failure(self, message: Message, exception: BaseException) -> None:
    """Hook invoked when message processing fails permanently.

    Subclasses may override this no-op to add custom failure handling.

    Args:
        message: the message whose processing failed.
        exception: the error that caused the failure, providing context
            about what went wrong.

    Returns:
        None

    """
on_retry(message, exception)

Handles the retry logic when a processing failure occurs.

Parameters:

Name Type Description Default
message Message

The message object that failed during processing and is subject to a retry attempt.

required
exception BaseException

The exception instance that was raised during the failure of processing the message.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
458
459
460
461
462
463
464
465
466
467
468
469
470
def on_retry(self, message: Message, exception: BaseException) -> None:
    """Hook invoked when a failed message is scheduled for a retry.

    Subclasses may override this no-op to add custom retry handling.

    Args:
        message: the message that failed and will be retried.
        exception: the error raised while processing the message.

    Returns:
        None

    """
on_revocation(message, exception, result)

Handles the event when a message is revoked and logs the associated exception.

Parameters:

Name Type Description Default
message Message

The message instance that was revoked.

required
exception Optional[BaseException]

The exception instance that represents the reason for the revocation.

required
result Any

The result of the revoked operation. This is only provided if the task succeeds

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
def on_revocation(
    self, message: Message, exception: Optional[BaseException], result: Any
) -> None:
    """Hook invoked when a message is revoked.

    Subclasses may override this no-op to add custom revocation handling.

    Args:
        message: the message that was revoked.
        exception: the error explaining the revocation, if any.
        result: output of the revoked operation; only populated when
            the task actually succeeded.

    Returns:
        None

    """
on_submit(message)

Handles the action to be performed when a message is submitted. This method processes the provided message and executes the required behavior upon submission.

Parameters:

Name Type Description Default
message Message

The message object submitted for processing.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
287
288
289
290
291
292
293
294
295
296
297
298
def on_submit(self, message: Message) -> None:
    """Hook invoked immediately after a message is submitted.

    Subclasses may override this no-op to react to submissions.

    Args:
        message (Message): the message that was submitted for processing.

    Returns:
        None

    """
on_success(message, result)

Handles successful outcomes of an operation by processing the given message and corresponding result.

Parameters:

Name Type Description Default
message Message

An instance of the Message class that contains relevant information about the operation.

required
result Any

The result of the completed operation, which can be of any type.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
487
488
489
490
491
492
493
494
495
496
497
498
499
def on_success(self, message: Message, result: Any) -> None:
    """Hook invoked when a message is processed successfully.

    Subclasses may override this no-op to react to successful runs.

    Args:
        message: the message that completed without error.
        result: the value produced by the completed operation.

    Returns:
        None

    """
op(message)

Pass message data to function for processing.

Parameters:

Name Type Description Default
message Message

message to perform the operation against

required

Returns:

Name Type Description
result Any

output of the function

Notes
  • If, and only if, the signature of function contains a message parameter, message itself will be passed along with any args and kwargs contained in the message.
Warning
  • message will not be passed in cases where a "message" exists in message.kwargs.
Source code in alsek/core/task.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def op(self, message: Message) -> Any:
    """Invoke ``function`` with the data carried by ``message``.

    Args:
        message (Message): message to perform the operation against

    Returns:
        result (Any): output of the function

    Notes:
        * ``message`` itself is forwarded to ``function`` if, and only
          if, the signature of ``function`` declares a ``message``
          parameter.

    Warning:
        * ``message`` will not be forwarded when ``message.kwargs``
          already contains a "message" entry.

    """
    call_kwargs = dict(message.kwargs)
    if _expects_message(self.function) and "message" not in message.kwargs:
        call_kwargs["message"] = message
    return self.function(*message.args, **call_kwargs)
post_op(message, result)

Operation to perform after running op.

Parameters:

Name Type Description Default
message Message

message op() was run against

required
result Any

output of op()

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
429
430
431
432
433
434
435
436
437
438
439
def post_op(self, message: Message, result: Any) -> None:
    """Hook executed immediately after ``op`` has run.

    Subclasses may override this no-op to add post-processing.

    Args:
        message (Message): message ``op()`` was run against
        result (Any): output of ``op()``

    Returns:
        None

    """
pre_op(message)

Operation to perform before running op.

Parameters:

Name Type Description Default
message Message

message op will be run against.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
394
395
396
397
398
399
400
401
402
403
def pre_op(self, message: Message) -> None:
    """Hook executed immediately before ``op`` runs.

    Subclasses may override this no-op to add pre-processing.

    Args:
        message (Message): message ``op`` will be run against.

    Returns:
        None

    """
revoke(message, skip_if_running=False)

Revoke the task.

Parameters:

Name Type Description Default
message Message

message to revoke

required
skip_if_running bool

if True, skip revoking the task if it is currently RUNNING. Notes: requires status_tracker to be set.

False

Returns:

Type Description
None

None

Source code in alsek/core/task.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
@magic_logger(
    before=lambda message: log.info("Revoking %s...", message.summary),
    after=lambda input_: log.debug("Revoked %s.", input_["message"].summary),
)
def revoke(self, message: Message, skip_if_running: bool = False) -> None:
    """Revoke the task.

    Args:
        message (Message): message to revoke
        skip_if_running (bool): if ``True``, skip revoking the task if it is currently RUNNING.
            Notes: requires ``status_tracker`` to be set.

    Returns:
        None

    """
    log.info("Revoking %s...", message.summary)
    # A message already in a terminal state cannot be (re)revoked; bail out.
    if (
        self.status_tracker
        and self.status_tracker.get(message).status in TERMINAL_TASK_STATUSES
    ):
        log.info("Message is already terminal: %s", message.summary)
        return
    # `skip_if_running` needs status information, which only the tracker has.
    elif skip_if_running and not self.status_tracker:
        raise AttributeError("`skip_if_running` requires `status_tracker` to be set")  # fmt: skip
    elif skip_if_running and (
        (status_update := self.status_tracker.get(message))
        and status_update.status == TaskStatus.RUNNING
    ):
        log.info("Message is currently running: %s", message.summary)
        return

    # Persist a revocation flag so workers can detect it via is_revoked().
    self.broker.backend.set(
        self._make_revoked_key_name(message),
        value=True,
        ttl=DEFAULT_TTL,
    )
    # Record the revocation on the message as an explanatory exception payload,
    # then mark the message failed both in the tracker and with the broker.
    message.update(
        exception_details=ExceptionDetails(
            name=get_exception_name(RevokedError),
            text="Task Revoked",
            traceback=None,
        ).as_dict()
    )
    self.update_status(message, status=TaskStatus.FAILED)
    self.broker.fail(message)
update_status(message, status)

Update the status of a message.

Parameters:

Name Type Description Default
message Message

message to update the status of.

required
status TaskStatus

status to update the message to.

required

Returns:

Type Description
None

None

Source code in alsek/core/task.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def update_status(self, message: Message, status: TaskStatus) -> None:
    """Persist a new status for ``message``, if a status tracker is configured.

    Args:
        message (Message): message whose status should be recorded.
        status (TaskStatus): the status to record for the message.

    Returns:
        None

    """
    # Without a tracker there is nowhere to record the status.
    if not self.status_tracker:
        return
    log.debug(f"Setting status of '{message.uuid}' to {status}...")
    self.status_tracker.set(message, status=status)
TriggerTask

Bases: Task

Triggered Task.

Parameters:

Name Type Description Default
function callable

function to use for the main operation

required
trigger (CronTrigger, DateTrigger, IntervalTrigger)

trigger for task execution.

required
broker Broker

an Alsek broker

required
name str

the name of the task. If None, the class name will be used.

None
queue str

the name of the queue to generate the task on. If None, the default queue will be used.

None
timeout int

the maximum amount of time (in milliseconds) this task is permitted to run.

DEFAULT_TASK_TIMEOUT
max_retries int

maximum number of allowed retries

DEFAULT_MAX_RETRIES
backoff Backoff

backoff algorithm and parameters to use when computing delay between retries

ExponentialBackoff()
result_store ResultStore

store for persisting task results

None
status_tracker StatusTracker

store for persisting task statuses

None
mechanism SupportedMechanismType

mechanism for executing the task. Must be either "process" or "thread".

DEFAULT_MECHANISM
no_positional_args bool

if True, the task will not accept positional arguments.

False

Raises:

Type Description
* ``SchedulingError``

if the signature of function includes parameters.

Source code in alsek/core/task.py
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
class TriggerTask(Task):
    """Triggered Task.

    Args:
        function (callable): function to use for the main operation
        trigger (CronTrigger, DateTrigger, IntervalTrigger): trigger
            for task execution.
        broker (Broker): an Alsek broker
        name (str, optional): the name of the task. If ``None``,
            the class name will be used.
        queue (str, optional): the name of the queue to generate the task on.
            If ``None``, the default queue will be used.
        timeout (int): the maximum amount of time (in milliseconds)
            this task is permitted to run.
        max_retries (int, optional): maximum number of allowed retries
        backoff (Backoff, optional): backoff algorithm and parameters to use when computing
            delay between retries
        result_store (ResultStore, optional): store for persisting task results
        status_tracker (StatusTracker, optional): store for persisting task statuses
        mechanism (SupportedMechanismType): mechanism for executing the task. Must
            be either "process" or "thread".
        no_positional_args (bool): if ``True``, the task will not accept positional arguments.

    Warnings:
        * The signature of ``function`` cannot contain parameters

    Raises:
        * ``SchedulingError``: if the signature of ``function`` includes
            parameters.

    """

    def __init__(
        self,
        function: Callable[..., Any],
        trigger: Union[CronTrigger, DateTrigger, IntervalTrigger],
        broker: Broker,
        name: Optional[str] = None,
        queue: Optional[str] = None,
        timeout: int = DEFAULT_TASK_TIMEOUT,
        max_retries: Optional[int] = DEFAULT_MAX_RETRIES,
        backoff: Optional[Backoff] = ExponentialBackoff(),
        result_store: Optional[ResultStore] = None,
        status_tracker: Optional[StatusTracker] = None,
        mechanism: SupportedMechanismType = DEFAULT_MECHANISM,
        no_positional_args: bool = False,
    ) -> None:
        # Triggered tasks are generated by the scheduler rather than invoked
        # with caller-supplied arguments, so reject functions with parameters.
        # (Error message grammar fixed: "cannot includes" -> "cannot include".)
        if inspect.signature(function).parameters:
            raise SchedulingError("Function signature cannot include parameters")
        super().__init__(
            function=function,
            broker=broker,
            name=name,
            queue=queue,
            timeout=timeout,
            max_retries=max_retries,
            backoff=backoff,
            result_store=result_store,
            status_tracker=status_tracker,
            mechanism=mechanism,
            no_positional_args=no_positional_args,
        )
        self.trigger = trigger

        # Each triggered task owns (and immediately starts) its own scheduler.
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()

    def serialize(self) -> dict[str, Any]:
        """Serialize the task, including its ``trigger`` in the settings."""
        serialized_task = super().serialize()
        serialized_task["settings"]["trigger"] = self.trigger
        return serialized_task

    @property
    def _job(self) -> Optional[Job]:
        # Scheduled jobs are registered under the task's name (see _submit()).
        return self.scheduler.get_job(self.name)

    def _submit(self, message: Message, **options: Any) -> None:
        # At most one scheduled job may exist per task.
        if self._job:
            raise SchedulingError("Task already scheduled")

        self.scheduler.add_job(
            _MultiSubmit(
                message=message,
                broker=self.broker,
                on_submit=self.on_submit,
                callback_op=partial(self.update_status, status=TaskStatus.SUBMITTED),
                options=options,
            ),
            trigger=self.trigger,
            id=self.name,
        )

    @property
    def generated(self) -> bool:
        """If the task has been generated."""
        return bool(self._job)

    def clear(self) -> None:
        """Clear the currently scheduled task.

        Returns:
            None

        Raises:
            * AttributeError: if a task has not yet
                been generated

        """
        try:
            self.scheduler.remove_job(self.name)
        except JobLookupError:
            raise AttributeError("Task not generated")

    def pause(self) -> None:
        """Pause the underlying scheduler.

        Returns:
            None

        """
        self.scheduler.pause()

    def resume(self) -> None:
        """Resume the underlying scheduler.

        Returns:
            None

        """
        self.scheduler.resume()

    def shutdown(self) -> None:
        """Shutdown the underlying scheduler.

        Returns:
            None

        """
        self.scheduler.shutdown()
generated property

If the task has been generated.

clear()

Clear the currently scheduled task.

Returns:

Type Description
None

None

Raises * AttributeError: if a task has not yet been generated

Source code in alsek/core/task.py
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
def clear(self) -> None:
    """Remove the job currently scheduled for this task.

    Returns:
        None

    Raises:
        * AttributeError: if a task has not yet
            been generated

    """
    job_id = self.name
    try:
        self.scheduler.remove_job(job_id)
    except JobLookupError:
        raise AttributeError("Task not generated")
pause()

Pause the underlying scheduler.

Returns:

Type Description
None

None

Source code in alsek/core/task.py
725
726
727
728
729
730
731
732
def pause(self) -> None:
    """Temporarily halt the underlying scheduler.

    Returns:
        None

    """
    self.scheduler.pause()
resume()

Resume the underlying scheduler.

Returns:

Type Description
None

None

Source code in alsek/core/task.py
734
735
736
737
738
739
740
741
def resume(self) -> None:
    """Restart the underlying scheduler after a pause.

    Returns:
        None

    """
    self.scheduler.resume()
shutdown()

Shutdown the underlying scheduler.

Returns:

Type Description
None

None

Source code in alsek/core/task.py
743
744
745
746
747
748
749
750
def shutdown(self) -> None:
    """Permanently stop the underlying scheduler.

    Returns:
        None

    """
    self.scheduler.shutdown()
task(broker, name=None, queue=None, timeout=DEFAULT_TASK_TIMEOUT, max_retries=DEFAULT_MAX_RETRIES, backoff=ExponentialBackoff(), trigger=None, result_store=None, status_tracker=None, mechanism=DEFAULT_MECHANISM, no_positional_args=False, base_task=None)

Wrapper for task construction.

Parameters:

Name Type Description Default
broker Broker

an Alsek broker

required
name str

the name of the task. If None, the class name will be used.

None
queue str

the name of the queue to generate the task on. If None, the default queue will be used.

None
timeout int

the maximum amount of time (in milliseconds) this task is permitted to run.

DEFAULT_TASK_TIMEOUT
max_retries int

maximum number of allowed retries

DEFAULT_MAX_RETRIES
backoff Backoff

backoff algorithm and parameters to use when computing delay between retries

ExponentialBackoff()
trigger (CronTrigger, DateTrigger, IntervalTrigger)

trigger for task execution.

None
result_store ResultStore

store for persisting task results

None
status_tracker StatusTracker

store for persisting task statuses

None
mechanism SupportedMechanismType

mechanism for executing the task. Must be either "process" or "thread".

DEFAULT_MECHANISM
no_positional_args bool

if True, the task will not accept positional arguments.

False
base_task Type[Task]

base to use for task construction. If None, a base task will be selected automatically.

None

Returns:

Name Type Description
wrapper callable

task-wrapped function

Raises:

Type Description
* ValueError

if a trigger is provided but not supported by base_task

Examples:

>>> from alsek import Broker, task
>>> from alsek.storage.backends.redis.standard import RedisBackend
>>> backend = RedisBackend()
>>> broker = Broker(backend)
>>> @task(broker)
... def add(a: int, b: int) -> int:
...     return a + b
Source code in alsek/core/task.py
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
def task(
    broker: Broker,
    name: Optional[str] = None,
    queue: Optional[str] = None,
    timeout: int = DEFAULT_TASK_TIMEOUT,
    max_retries: Optional[int] = DEFAULT_MAX_RETRIES,
    backoff: Optional[Backoff] = ExponentialBackoff(),
    trigger: Optional[Union[CronTrigger, DateTrigger, IntervalTrigger]] = None,
    result_store: Optional[ResultStore] = None,
    status_tracker: Optional[StatusTracker] = None,
    mechanism: SupportedMechanismType = DEFAULT_MECHANISM,
    no_positional_args: bool = False,
    base_task: Optional[Type[Task]] = None,
) -> Callable[..., Task]:
    """Wrapper for task construction.

    Args:
        broker (Broker): an Alsek broker
        name (str, optional): the name of the task. If ``None``,
            the class name will be used.
        queue (str, optional): the name of the queue to generate the task on.
            If ``None``, the default queue will be used.
        timeout (int): the maximum amount of time (in milliseconds)
            this task is permitted to run.
        max_retries (int, optional): maximum number of allowed retries
        backoff (Backoff, optional): backoff algorithm and parameters to use when computing
            delay between retries
        trigger (CronTrigger, DateTrigger, IntervalTrigger, optional): trigger
            for task execution.
        result_store (ResultStore, optional): store for persisting task results
        status_tracker (StatusTracker, optional): store for persisting task statuses
        mechanism (SupportedMechanismType): mechanism for executing the task. Must
            be either "process" or "thread".
        no_positional_args (bool): if ``True``, the task will not accept positional arguments.
        base_task (Type[Task]): base to use for task construction.
            If ``None``, a base task will be selected automatically.

    Returns:
        wrapper (callable): task-wrapped function

    Raises:
        * ValueError: if a ``trigger`` is provided but not supported by ``base_task``

    Examples:
        >>> from alsek import Broker, task
        >>> from alsek.storage.backends.redis.standard import RedisBackend

        >>> backend = RedisBackend()
        >>> broker = Broker(backend)

        >>> @task(broker)
        ... def add(a: int, b: int) -> int:
        ...     return a + b

    """
    # NOTE(review): the ``backoff`` default is a single shared instance across
    # all calls of ``task()`` — safe only if ExponentialBackoff is immutable;
    # TODO confirm.
    parsed_base_task = _parse_base_task(base_task, trigger=trigger)
    base_task_signature = inspect.signature(parsed_base_task.__init__)

    # Fail fast when a trigger was supplied but the chosen base cannot accept it.
    if trigger and "trigger" not in base_task_signature.parameters:
        raise ValueError(f"Trigger not supported by {parsed_base_task}")

    def wrapper(function: Callable[..., Any]) -> Task:
        return parsed_base_task(  # type: ignore
            function=function,
            name=name,
            broker=broker,
            queue=queue,
            timeout=timeout,
            max_retries=max_retries,
            backoff=backoff,
            mechanism=mechanism,
            no_positional_args=no_positional_args,
            result_store=result_store,
            status_tracker=status_tracker,
            **(  # noqa (handled above)
                dict(trigger=trigger)
                if "trigger" in base_task_signature.parameters
                else dict()
            ),
        )

    return wrapper

worker

process

Process Worker Pool

ProcessWorkerPool

Bases: BaseWorkerPool

Fixed-size pool that runs each task in its own forked process (via ProcessTaskFuture).

Parameters:

Name Type Description Default
n_processes int

Maximum number of live ProcessTaskFutures.

None
prune_interval int

Number of milliseconds between background runs of a scan to prune spent futures.

100
Source code in alsek/core/worker/process.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class ProcessWorkerPool(BaseWorkerPool):
    """Fixed-size pool that runs each task in its *own* forked process
    (via `ProcessTaskFuture`).

    Args:
        n_processes (int): Maximum number of live `ProcessTaskFuture`s.
        prune_interval (int): Number of milliseconds between background
            runs of a scan to prune spent futures.

    """

    def __init__(
        self,
        n_processes: Optional[int] = None,
        prune_interval: int = 100,
        **kwargs: Any,
    ) -> None:
        super().__init__(mechanism="process", **kwargs)
        # Default the process ceiling to a CPU-derived count when unspecified.
        self.n_processes = n_processes or smart_cpu_count()
        self.prune_interval = prune_interval

        # Live futures; pruned periodically by the background thread below.
        self._futures: List[ProcessTaskFuture] = list()
        self._shutdown_event = threading.Event()

        # Use a simple daemon thread instead of APScheduler
        self._prune_thread = threading.Thread(
            target=self._prune_loop,
            daemon=True,
            name="ProcessPool-Pruner",
        )
        self._prune_thread.start()

    def _prune_loop(self) -> None:
        """Background thread that periodically prunes futures."""
        while not self._shutdown_event.is_set():
            try:
                self.prune()
            except Exception as e:
                # Keep the pruner alive; a single failed scan should not
                # stop future pruning.
                log.exception("Error during future pruning: %s", e)

            # Sleep until next interval or shutdown
            self._shutdown_event.wait(self.prune_interval / 1000)

    def on_boot(self) -> None:
        """Log the pool configuration, then delegate to the base boot hook."""
        # NOTE(review): n_processes is interpolated twice here ("workers" and
        # "max processes"); confirm the first placeholder is intentional.
        log.info(
            "Starting process-based worker pool with up to %s workers (%s max process%s)...",
            self.n_processes,
            self.n_processes,
            "" if self.n_processes == 1 else "es",
        )
        super().on_boot()

    def has_slot(self) -> bool:
        # A slot is free while the live-future count is below the ceiling.
        return len(self._futures) < self.n_processes

    def prune(self) -> None:
        """Prune spent futures."""
        # Futures past their time limit are stopped and cleaned up; completed
        # futures are simply dropped; everything else is retained.
        kept: list[ProcessTaskFuture] = list()
        for f in self._futures:
            if f.time_limit_exceeded:
                f.stop(TimeoutError)
                f.clean_up(ignore_errors=True)
            elif not f.complete:
                kept.append(f)
        self._futures = kept

    def on_shutdown(self) -> None:
        """Terminate everything that is still alive."""
        # Stop the pruner loop first so it does not race with the teardown.
        self._shutdown_event.set()

        for f in self._futures:
            if not f.complete:
                f.stop(TerminationError)
                f.clean_up(ignore_errors=True)
        self._futures.clear()

    def submit_message(self, message: Message) -> bool:
        """Submit a single message"""
        # Returns True only when a slot was available and a future was started.
        submitted = False
        if self.has_slot():
            self._futures.append(
                ProcessTaskFuture(
                    task=self._task_map[message.task_name],
                    message=message,
                )
            )
            submitted = True
        return submitted
on_shutdown()

Terminate everything that is still alive.

Source code in alsek/core/worker/process.py
88
89
90
91
92
93
94
95
96
def on_shutdown(self) -> None:
    """Terminate everything that is still alive."""
    # Stop background activity first, then tear down remaining futures.
    self._shutdown_event.set()

    incomplete = [future for future in self._futures if not future.complete]
    for future in incomplete:
        future.stop(TerminationError)
        future.clean_up(ignore_errors=True)
    self._futures.clear()
prune()

Prune spent futures.

Source code in alsek/core/worker/process.py
77
78
79
80
81
82
83
84
85
86
def prune(self) -> None:
    """Drop completed futures and terminate any that exceeded their time limit."""
    survivors: list[ProcessTaskFuture] = []
    for future in self._futures:
        if future.time_limit_exceeded:
            # Overdue: terminate and release its resources.
            future.stop(TimeoutError)
            future.clean_up(ignore_errors=True)
            continue
        if not future.complete:
            survivors.append(future)
    self._futures = survivors
submit_message(message)

Submit a single message

Source code in alsek/core/worker/process.py
 98
 99
100
101
102
103
104
105
106
107
108
109
def submit_message(self, message: Message) -> bool:
    """Start a future for ``message`` if a slot is free.

    Returns ``True`` when the message was accepted, ``False`` otherwise.
    """
    if not self.has_slot():
        return False
    self._futures.append(
        ProcessTaskFuture(
            task=self._task_map[message.task_name],
            message=message,
        )
    )
    return True
thread

Thread Worker Pool

ProcessGroup
Source code in alsek/core/worker/thread.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class ProcessGroup:
    """A single child process hosting a group of worker threads.

    Work is handed to the child process through a bounded queue whose
    capacity is ``n_threads`` — the queue's fullness therefore doubles
    as the group's slot count (see ``has_slot``).
    """

    def __init__(
        self,
        n_threads: int,
        complete_only_on_thread_exit: bool,
        slot_wait_interval_seconds: float,
    ) -> None:
        self._n_threads = n_threads
        self.complete_only_on_thread_exit = complete_only_on_thread_exit
        self.slot_wait_interval_seconds = slot_wait_interval_seconds

        # Bounded work queue: at most one pending payload per thread.
        self.queue: Queue = Queue(maxsize=n_threads)
        self.shutdown_event: Event = Event()
        # Daemon child process running the thread worker loop; the current
        # log level is forwarded so the child logs consistently.
        self.process = Process(
            target=_start_thread_worker,
            args=(
                self.queue,
                self.shutdown_event,
                n_threads,
                slot_wait_interval_seconds,
                get_logger().level,
            ),
            daemon=True,
        )
        self.process.start()

    def has_slot(self) -> bool:
        """Whether the group can accept another payload right now."""
        return not self.queue.full()

    def submit(self, task: Task, message: Message) -> bool:
        """Serialize and enqueue ``task``/``message``; ``False`` if the queue is full."""
        payload = (
            task.serialize(),
            message.data,
            self.complete_only_on_thread_exit,
        )
        try:
            # Non-blocking put: callers poll has_slot()/retry rather than wait.
            self.queue.put(dill.dumps(payload), block=False)
            return True
        except queue.Full:
            return False

    def stop(self, timeout: int | float = 2) -> None:
        """Stop the group of threads in this process group.

        Args:
            timeout (int, float): the time to wait in seconds

        Returns:
            None

        """
        # 1. Signal
        self.shutdown_event.set()
        # 2. Wait a bit for graceful exit
        self.process.join(timeout=timeout)
        # 3. Hard kill if still alive
        if self.process.is_alive():
            self.process.kill()
stop(timeout=2)

Stop the group of threads in this process group.

Parameters:

Name Type Description Default
timeout (int, float)

the time to wait in seconds

2

Returns:

Type Description
None

None

Source code in alsek/core/worker/thread.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def stop(self, timeout: int | float = 2) -> None:
    """Stop the group of threads in this process group.

    Args:
        timeout (int, float): seconds to wait for the child process to
            exit gracefully before it is forcibly killed.

    Returns:
        None

    """
    # 1. Signal
    self.shutdown_event.set()
    # 2. Wait a bit for graceful exit
    self.process.join(timeout=timeout)
    # 3. Hard kill if still alive
    if self.process.is_alive():
        self.process.kill()
ThreadWorkerPool

Bases: BaseWorkerPool

Elastic thread-based pool.

Parameters:

Name Type Description Default
n_threads int

the number of threads to use per group.

8
n_processes int

the number of process groups to use

None
n_process_floor int

the minimum number of processes to have active at any given time, regardless of load.

1
complete_only_on_thread_exit bool

if True, only mark the future as complete when the thread formally exits (i.e., is not alive). Pro: more rigorous — avoids marking the task complete until the thread fully terminates. Useful when you need strict control over thread lifecycle (e.g., for resource management). Con: may lead to hanging if the thread doesn’t terminate quickly (e.g., when using thread_raise() during revocation). This can also temporarily result in more than the allotted number of threads running, because it entails treating a thread as expired regardless of its actual status.

False
**kwargs Keyword Args

Keyword arguments to pass to BaseWorkerPool().

{}
Notes
  • Spawns a new process (ThreadProcessGroup) only when all existing groups are saturated and the hard ceiling n_processes hasn’t been hit.
  • Each group runs up to n_threads true ThreadTaskFutures concurrently.
  • Total worker capacity is n_threads * n_processes.
Source code in alsek/core/worker/thread.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
class ThreadWorkerPool(BaseWorkerPool):
    """Elastic thread-based pool.

    Args:
        n_threads (int): the number of threads to use per group.
        n_processes (int, optional): the number of process groups to use.
            Defaults to ``smart_cpu_count()`` when not provided.
        n_process_floor (int): the minimum number of processes to have active
            at any given time, regardless of load.
        complete_only_on_thread_exit (bool): if ``True``, only mark the future
            as complete when the thread formally exits (i.e., is not alive).
            Pro: more rigorous — avoids marking the task complete until the thread fully terminates.
            Useful when you need strict control over thread lifecycle (e.g., for resource management).
            Con: may lead to hanging if the thread doesn't terminate quickly (e.g., when using
            `thread_raise()` during revocation). This can also temporarily result in more than the
            allotted number of threads running, because it entails treating a thread as
            expired regardless of its actual status.
        **kwargs (Keyword Args): Keyword arguments to pass to ``BaseWorkerPool()``.

    Raises:
        ValueError: if ``n_threads`` or ``n_processes`` is not positive,
            or if ``n_process_floor`` exceeds ``n_processes``.

    Notes:
        * Spawns a new **process** (ThreadProcessGroup) only when all existing
          groups are saturated and the hard ceiling `n_processes` hasn't been hit.
        * Each group runs up to `n_threads` true ThreadTaskFutures concurrently.
        * Total worker capacity is ``n_threads * n_processes``.

    """

    def __init__(
        self,
        n_threads: int = 8,
        n_processes: Optional[int] = None,
        n_process_floor: int = 1,
        complete_only_on_thread_exit: bool = False,
        **kwargs: Any,
    ) -> None:
        super().__init__(mechanism="thread", **kwargs)
        self.n_threads = n_threads
        # NOTE: an explicit 0 is falsy and falls through to smart_cpu_count(),
        # so the `<= 0` check below effectively guards smart_cpu_count() only.
        self.n_processes = n_processes or smart_cpu_count()
        self.n_process_floor = n_process_floor
        self.complete_only_on_thread_exit = complete_only_on_thread_exit

        # Fix: these were placeholder-less f-strings; plain messages now also
        # include the offending values so validation failures are actionable.
        if self.n_threads <= 0:
            raise ValueError(f"n_threads must be > 0, got {self.n_threads}")
        elif self.n_processes <= 0:
            raise ValueError(f"n_processes must be > 0, got {self.n_processes}")
        if self.n_process_floor > self.n_processes:
            raise ValueError(
                f"n_process_floor ({self.n_process_floor}) must be "
                f"<= n_processes ({self.n_processes})."
            )

        # NOTE(review): 'progress' appears to be a historical typo for
        # 'process'; the attribute name is kept for backward compatibility.
        self._progress_groups: List[ProcessGroup] = list()

    def _make_new_process_group(self) -> Optional[ProcessGroup]:
        """Create, register, and return a new ``ProcessGroup``.

        Returns ``None`` when the pool is shutting down or the process
        ceiling (``n_processes``) has already been reached.
        """
        if (
            self.stop_signal.exit_event.is_set()
            or len(self._progress_groups) >= self.n_processes
        ):
            return None

        log.debug("Starting new process group...")
        new_process_group = ProcessGroup(
            n_threads=self.n_threads,
            complete_only_on_thread_exit=self.complete_only_on_thread_exit,
            slot_wait_interval_seconds=self._slot_wait_interval_seconds,
        )
        self._progress_groups.append(new_process_group)
        return new_process_group

    def on_boot(self) -> None:
        """Log pool capacity and pre-provision ``n_process_floor`` groups."""
        log.info(
            "Starting thread-based worker pool with up to %s workers (%s max thread%s and %s max process%s)...",
            self.n_threads * self.n_processes,
            self.n_threads,
            "" if self.n_threads == 1 else "s",
            self.n_processes,
            "" if self.n_processes == 1 else "es",
        )
        if self.n_process_floor:
            log.info(
                "Provisioning initial collection of %s processes...",
                self.n_process_floor,
            )
            for _ in range(self.n_process_floor):
                self._make_new_process_group()
        super().on_boot()

    def on_shutdown(self) -> None:
        """Stop all futures in the pool."""
        for g in self._progress_groups:
            g.stop()

    def prune(self) -> None:
        """Prune exited process groups, but only enforce floor if NOT shutting down."""
        # 1. Drop groups whose processes have exited. ``join(timeout=0)``
        #    always returns None (falsy) but reaps the child if it has
        #    finished, so the predicate reduces to ``is_alive()`` post-reap.
        self._progress_groups = [
            g
            for g in self._progress_groups
            if g.process.join(timeout=0) or g.process.is_alive()
        ]

        # 2. Top back up to at least ``n_process_floor`` live groups.
        missing = max(0, self.n_process_floor - len(self._progress_groups))
        for _ in range(missing):
            self._make_new_process_group()

    def _acquire_group(self) -> Optional[ProcessGroup]:
        """Return a group with a free slot, growing the pool if permitted."""
        for g in self._progress_groups:
            if g.has_slot():
                return g

        if len(self._progress_groups) < self.n_processes:
            return self._make_new_process_group()
        return None

    def submit_message(self, message: Message) -> bool:
        """Submit a single message.

        Args:
            message (Message): the message to run.

        Returns:
            submitted (bool): whether a slot was found and the message
                handed to a process group.
        """
        submitted = False
        if group := self._acquire_group():  # we have a slot → run it
            submitted = group.submit(
                task=self._task_map[message.task_name],
                message=message,
            )
        return submitted
on_shutdown()

Stop all futures in the pool.

Source code in alsek/core/worker/thread.py
272
273
274
275
def on_shutdown(self) -> None:
    """Stop all futures in the pool."""
    for g in self._progress_groups:
        g.stop()
prune()

Prune exited process groups, but only enforce floor if NOT shutting down.

Source code in alsek/core/worker/thread.py
277
278
279
280
281
282
283
284
285
286
287
288
289
def prune(self) -> None:
    """Prune exited process groups, but only enforce floor if NOT shutting down."""
    # 1. Filter out any groups whose processes have exited
    self._progress_groups = [
        g
        for g in self._progress_groups
        if g.process.join(timeout=0) or g.process.is_alive()
    ]

    # 2. Otherwise, ensure at least n_process_floor
    missing = max(0, self.n_process_floor - len(self._progress_groups))
    for _ in range(missing):
        self._make_new_process_group()
submit_message(message)

Submit a single message

Source code in alsek/core/worker/thread.py
300
301
302
303
304
305
306
307
308
def submit_message(self, message: Message) -> bool:
    """Submit a single message"""
    submitted = False
    if group := self._acquire_group():  # we have a slot → run it
        submitted = group.submit(
            task=self._task_map[message.task_name],
            message=message,
        )
    return submitted
ThreadsInProcessGroup

• Runs inside a forked process • Accepts work items via Queue • Spawns at most n_threads ThreadTaskFutures concurrently

Source code in alsek/core/worker/thread.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class ThreadsInProcessGroup:
    """Thread scheduler that runs inside a single forked worker process.

    • Runs inside a forked process
    • Accepts work items via `Queue`
    • Spawns at most `n_threads` ThreadTaskFutures concurrently

    Args:
        q (Queue): inter-process queue from which serialized work items
            (dill payloads) are pulled.
        shutdown_event (Event): inter-process flag used to request shutdown.
        n_threads (int): maximum number of concurrently live futures.
        slot_wait_interval_seconds (float): seconds to block while waiting
            for a free slot (or for new work) before re-checking shutdown.
        log_level (int): logging level to configure inside this process.
    """

    def __init__(
        self,
        q: Queue,
        shutdown_event: Event,
        n_threads: int,
        slot_wait_interval_seconds: float,
        log_level: int = logging.INFO,
    ) -> None:
        self.q = q
        self.shutdown_event = shutdown_event
        self.n_threads = n_threads
        self.slot_wait_interval_seconds = slot_wait_interval_seconds
        self.log_level = log_level

        # Logging must be (re)configured here because this object runs in a
        # freshly forked process, which does not inherit handler setup.
        setup_logging(self.log_level)

        # Futures that are currently running (or finished but not yet reaped).
        self._live: list[ThreadTaskFuture] = list()

    def _prune(self) -> None:
        """Reap completed futures and stop any that exceeded their time limit."""
        kept: list[ThreadTaskFuture] = list()
        for f in self._live:
            if f.time_limit_exceeded:
                # Timed out: raise TimeoutError inside the future and discard it.
                f.stop(TimeoutError)
                f.clean_up(ignore_errors=True)
            elif not f.complete:
                kept.append(f)
        self._live = kept

    def _has_capacity(self) -> bool:
        """Return ``True`` while fewer than ``n_threads`` futures are live."""
        return len(self._live) < self.n_threads

    def _spawn_future(self, payload: bytes) -> None:
        """Deserialize a work item and launch it as a ``ThreadTaskFuture``."""
        # Payload layout: (serialized task, message dict, complete-on-exit flag).
        task_dict, msg_dict, exit_flag = dill.loads(payload)
        self._live.append(
            ThreadTaskFuture(
                task=Task.deserialize(task_dict),
                message=Message(**msg_dict),
                complete_only_on_thread_exit=exit_flag,
            )
        )

    def _stop_all_live_futures(self) -> None:
        """Stop every still-incomplete future (used during shutdown)."""
        for f in self._live:
            if not f.complete:
                f.stop(TerminationError)
                f.clean_up(ignore_errors=True)

    @suppress_exception(
        KeyboardInterrupt,
        on_suppress=lambda error: log.info("Keyboard Interrupt Detected"),
    )
    def run(self) -> None:
        """Main loop: pull and run work items until shutdown is requested."""
        try:
            while not self.shutdown_event.is_set():
                # 1. reap finished / timed-out futures
                self._prune()

                # 2. Throttle if thread slots are full
                if not self._has_capacity():
                    # Wait *either* for a slot OR the shutdown flag
                    self.shutdown_event.wait(self.slot_wait_interval_seconds)
                    continue

                # 3. Try to pull one unit of work
                try:
                    payload = self.q.get(timeout=self.slot_wait_interval_seconds)
                except queue.Empty:
                    continue

                # 4. Launch a new ThreadTaskFuture
                self._spawn_future(payload)
        finally:
            # Always stop leftover futures, even on error or interrupt.
            self._stop_all_live_futures()

defaults

Defaults

exceptions

Exceptions

AlsekError

Bases: Exception

Base Alsek error.

Source code in alsek/exceptions.py
8
9
class AlsekError(Exception):
    """Base Alsek error.

    Root of the library's exception hierarchy; catch this to handle
    any Alsek-specific failure.
    """

MessageAlreadyExistsError

Bases: AlsekError

Message already exists in backend.

Source code in alsek/exceptions.py
12
13
class MessageAlreadyExistsError(AlsekError):
    """Message already exists in backend (duplicate write)."""

MessageDoesNotExistsError

Bases: AlsekError

Message does not exist in backend.

Source code in alsek/exceptions.py
16
17
class MessageDoesNotExistsError(AlsekError):
    """Message does not exist in backend (lookup of a missing message)."""

MultipleBrokersError

Bases: AlsekError

Multiple brokers in use.

Source code in alsek/exceptions.py
20
21
class MultipleBrokersError(AlsekError):
    """Multiple brokers in use where exactly one was expected."""

NoTasksFoundError

Bases: AlsekError

No tasks found.

Source code in alsek/exceptions.py
24
25
class NoTasksFoundError(AlsekError):
    """No tasks found (e.g., when collecting tasks from a package)."""

RevokedError

Bases: AlsekError

Alsek task revoked error.

Source code in alsek/exceptions.py
44
45
class RevokedError(AlsekError):
    """Alsek task revoked error (task was revoked before/while running)."""

SchedulingError

Bases: AlsekError

Error scheduling work.

Source code in alsek/exceptions.py
32
33
class SchedulingError(AlsekError):
    """Error scheduling work for future execution."""

TaskNameCollisionError

Bases: AlsekError

Duplicate task detected.

Source code in alsek/exceptions.py
36
37
class TaskNameCollisionError(AlsekError):
    """Duplicate task detected (two tasks registered under one name)."""

TerminationError

Bases: AlsekError

Alsek termination error.

Source code in alsek/exceptions.py
40
41
class TerminationError(AlsekError):
    """Alsek termination error (raised inside futures being stopped)."""

ValidationError

Bases: AlsekError

Data validation failed.

Source code in alsek/exceptions.py
28
29
class ValidationError(AlsekError):
    """Data validation failed."""

storage

Storage

backends

Backend

BaseBackend

Bases: ABC

Backend base class.

Parameters:

Name Type Description Default
namespace str

prefix to use when inserting names in the backend

DEFAULT_NAMESPACE
serializer Serializer

tool for encoding and decoding values written into the backend.

JsonSerializer()
Source code in alsek/storage/backends/__init__.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class BaseBackend(ABC):
    """Backend base class.

    Args:
        namespace (str): prefix to use when inserting
            names in the backend
        serializer (Serializer): tool for encoding and decoding
            values written into the backend.

    """

    IS_ASYNC: bool = False
    SUPPORTS_PUBSUB: bool = False

    def __init__(
        self,
        namespace: str = DEFAULT_NAMESPACE,
        # NOTE(review): this default is evaluated once at import time, so all
        # backends constructed without an explicit serializer share one
        # JsonSerializer instance — presumably it is stateless; confirm.
        serializer: Serializer = JsonSerializer(),
    ) -> None:
        self.namespace = namespace
        self.serializer = serializer

    def __repr__(self) -> str:
        return auto_repr(
            self,
            namespace=self.namespace,
            serializer=self.serializer,
        )

    def encode(self) -> bytes:
        """Serialize this backend (its class plus constructor settings) with dill."""
        data = dict(backend=self.__class__, settings=gather_init_params(self))
        return cast(bytes, dill.dumps(data))

    @classmethod
    def _from_settings(cls, settings: dict[str, Any]) -> Backend:
        """Alternate constructor: rebuild a backend from ``encode()`` settings."""
        return cls(**settings)

    def in_namespace(self, name: str) -> bool:
        """Determine if ``name`` belongs to the current namespace.

        Args:
            name (str): a name (key)

        Returns:
            bool

        Warning:
            * ``name`` should be a complete (i.e., _full_) name.

        """
        return name.startswith(f"{self.namespace}:")

    def full_name(self, name: str) -> str:
        """Get an item's complete name, including the namespace
        in which it exists.

        Args:
            name (str): the name of an item

        Returns:
            full_name (str): a name of the form ``"{NAMESPACE}:{name}"``

        Notes:
            * If ``name`` is already the full name, this method
              will collapse to a no-op.

        """
        if name.startswith(f"{self.namespace}:"):
            return name
        else:
            return f"{self.namespace}:{name}"

    def short_name(self, name: str) -> str:
        """Get an item's short name, without the namespace
        in which it exists.

        Args:
            name (str): the full name of an item

        Returns:
            short_name (str): ``name`` without the namespace prefix

        Notes:
            * If ``name`` is already the short name, this method
              will collapse to a no-op.

        """
        # Fix: previously ``re.sub(rf"^{self.namespace}:", ...)``, which
        # misbehaves when the namespace contains regex metacharacters
        # (e.g. "."). A plain prefix strip is both correct and clearer.
        return name.removeprefix(f"{self.namespace}:")

    @abstractmethod
    def exists(self, name: str) -> bool:
        """Check if ``name`` exists in the backend.

        Args:
            name (str): name of the item

        Returns:
            bool

        """
        raise NotImplementedError()

    @abstractmethod
    def set(
        self,
        name: str,
        value: Any,
        nx: bool = False,
        ttl: Optional[int] = None,
    ) -> None:
        """Set ``name`` to ``value`` in the backend.

        Args:
            name (str): name of the item
            value (Any): value to set for ``name``
            nx (bool): if ``True`` the item must not exist prior to being set
            ttl (int, optional): time to live for the entry in milliseconds

        Returns:
            None

        Raises:
            KeyError: if ``nx`` is ``True`` and ``name`` already exists

        """
        raise NotImplementedError()

    @abstractmethod
    def get(self, name: str, default: Optional[Union[Any, Type[Empty]]] = None) -> Any:
        """Get ``name`` from the backend.

        Args:
            name (str): name of the item
            default (Any, Type[Empty], optional): default value for ``name``

        Returns:
            Any

        """
        raise NotImplementedError()

    @abstractmethod
    def delete(self, name: str, missing_ok: bool = False) -> None:
        """Delete a ``name`` from the backend.

        Args:
            name (str): name of the item
            missing_ok (bool): if ``True``, do not raise for missing

        Returns:
            None

        Raises:
            KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.

        """
        raise NotImplementedError()

    @abstractmethod
    def priority_add(self, key: str, unique_id: str, priority: int | float) -> None:
        """Add an item to a priority-sorted set.

        Args:
            key (str): The name of the sorted set.
            unique_id (str): The item's (Message's) unique identifier
            priority (float): The numeric priority score (decide if lower or higher means higher priority).

        Returns:
            None

        """
        raise NotImplementedError()

    @abstractmethod
    def priority_get(self, key: str) -> Optional[str]:
        """
        Get (peek) the highest-priority item without removing it.

        Args:
            key (str): The name of the sorted set.

        Returns:
            item (str, optional): The member with the highest priority, or None if empty.

        """
        raise NotImplementedError()

    # Not abstract: optional capability — subclasses that support iteration
    # override this; the default simply raises.
    def priority_iter(self, key: str) -> Iterable[str]:
        """Iterate over the items in a priority-sorted set.

        Args:
            key (str): The name of the sorted set.

        Returns:
            priority (Iterable[str]): An iterable of members in the sorted set, sorted by priority.

        """
        raise NotImplementedError()

    @abstractmethod
    def priority_remove(self, key: str, unique_id: str) -> None:
        """Remove an item from a priority-sorted set.

        Args:
            key (str): The name of the sorted set.
            unique_id (str): The item's (Message's) unique identifier

        Returns:
            None

        """
        raise NotImplementedError()

    # Not abstract: pub/sub is optional (see ``SUPPORTS_PUBSUB``).
    def pub(self, channel: str, value: Any) -> None:
        """Publish to a channel.

        Args:
            channel (str): channel name
            value (Any): value to publish

        Returns:
            None

        """
        raise NotImplementedError()

    # Not abstract: pub/sub is optional (see ``SUPPORTS_PUBSUB``).
    def sub(self, channel: str) -> Iterable[str | dict[str, Any]]:
        """Subscribe to a channel.

        Args:
            channel (str): channel name

        Returns:
            Iterable[str | dict[str, Any]]

        """
        raise NotImplementedError()

    @abstractmethod
    def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
        """Scan the backend for matching names.

        Args:
            pattern (str, optional): pattern to limit search to

        Returns:
            matches_stream (Iterable[str]): a stream of matching names

        """
        raise NotImplementedError()

    @abstractmethod
    def count(self, pattern: Optional[str] = None) -> int:
        """Count the number of items in the backend.

        Args:
            pattern (str, optional): pattern to limit count to

        Returns:
            count (int): number of matching names

        """
        raise NotImplementedError()

    @abstractmethod
    def clear_namespace(self, raise_on_error: bool = True) -> int:
        """Clear all items in backend under the current namespace.

        Args:
             raise_on_error (bool): raise if a delete operation fails

        Returns:
            count (int): number of items cleared

        Raises:
            KeyError: if ``raise_on_error`` and a delete operation fails

        """
        raise NotImplementedError()
clear_namespace(raise_on_error=True) abstractmethod

Clear all items in backend under the current namespace.

Parameters:

Name Type Description Default
raise_on_error bool

raise if a delete operation fails

True

Returns:

Name Type Description
count int

number of items cleared

Raises:

Type Description
KeyError

if raise_on_error and a delete operation fails

Source code in alsek/storage/backends/__init__.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
@abstractmethod
def clear_namespace(self, raise_on_error: bool = True) -> int:
    """Clear all items in backend under the current namespace.

    Args:
         raise_on_error (bool): raise if a delete operation fails

    Returns:
        count (int): number of items cleared

    Raises:
        KeyError: if ``raise_on_error`` and a delete operation fails

    """
    raise NotImplementedError()
count(pattern=None) abstractmethod

Count the number of items in the backend.

Parameters:

Name Type Description Default
pattern str

pattern to limit count to

None

Returns:

Name Type Description
count int

number of matching names

Source code in alsek/storage/backends/__init__.py
300
301
302
303
304
305
306
307
308
309
310
311
@abstractmethod
def count(self, pattern: Optional[str] = None) -> int:
    """Count the number of items in the backend.

    Args:
        pattern (str, optional): pattern to limit count to

    Returns:
        count (int): number of matching names

    """
    raise NotImplementedError()
delete(name, missing_ok=False) abstractmethod

Delete a name from the backend.

Parameters:

Name Type Description Default
name str

name of the item

required
missing_ok bool

if True, do not raise for missing

False

Returns:

Type Description
None

None

Raises:

Type Description
KeyError

if missing_ok is False and name is not found.

Source code in alsek/storage/backends/__init__.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@abstractmethod
def delete(self, name: str, missing_ok: bool = False) -> None:
    """Delete a ``name`` from the backend.

    Args:
        name (str): name of the item
        missing_ok (bool): if ``True``, do not raise for missing

    Returns:
        None

    Raises:
        KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.

    """
    raise NotImplementedError()
exists(name) abstractmethod

Check if name exists in the backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
bool

bool

Source code in alsek/storage/backends/__init__.py
138
139
140
141
142
143
144
145
146
147
148
149
@abstractmethod
def exists(self, name: str) -> bool:
    """Check if ``name`` exists in the backend.

    Args:
        name (str): name of the item

    Returns:
        bool

    """
    raise NotImplementedError()
full_name(name)

Get an item's complete name, including the namespace in which it exists.

Parameters:

Name Type Description Default
name str

the name of an item

required

Returns:

Name Type Description
full_name str

a name of the form "{NAMESPACE}:{name}"

Notes
  • If name is already the full name, this method will collapse to a no-op.
Source code in alsek/storage/backends/__init__.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def full_name(self, name: str) -> str:
    """Get an item's complete name, including the namespace
    in which it exists.

    Args:
        name (str): the name of an item

    Returns:
        full_name (str): a name of the form ``"{NAMESPACE}:{name}"``

    Notes:
        * If ``name`` is already the full name, this method
          will collapse to a no-op.

    """
    if name.startswith(f"{self.namespace}:"):
        return name
    else:
        return f"{self.namespace}:{name}"
get(name, default=None) abstractmethod

Get name from the backend.

Parameters:

Name Type Description Default
name str

name of the item

required
default (Any, Type[Empty])

default value for name

None

Returns:

Type Description
Any

Any

Source code in alsek/storage/backends/__init__.py
176
177
178
179
180
181
182
183
184
185
186
187
188
@abstractmethod
def get(self, name: str, default: Optional[Union[Any, Type[Empty]]] = None) -> Any:
    """Get ``name`` from the backend.

    Args:
        name (str): name of the item
        default (Any, Type[Empty], optional): default value for ``name``

    Returns:
        Any

    """
    raise NotImplementedError()
in_namespace(name)

Determine if name belongs to the current namespace.

Parameters:

Name Type Description Default
name str

a name (key)

required

Returns:

Type Description
bool

bool

Warning
  • name should be a complete (i.e., full) name.
Source code in alsek/storage/backends/__init__.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def in_namespace(self, name: str) -> bool:
    """Determine if ``name`` belong to the current namespace.

    Args:
        name (str): a name (key)

    Returns:
        bool

    Warning:
        * ``name`` should be a complete (i.e., _full_) name.

    """
    return name.startswith(f"{self.namespace}:")
priority_add(key, unique_id, priority) abstractmethod

Add an item to a priority-sorted set.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required
unique_id str

The item's (Message's) unique identifier

required
priority float

The numeric priority score (decide if lower or higher means higher priority).

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/__init__.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@abstractmethod
def priority_add(self, key: str, unique_id: str, priority: int | float) -> None:
    """Add an item to a priority-sorted set.

    Args:
        key (str): The name of the sorted set.
        unique_id (str): The item's (Message's) unique identifier
        priority (float): The numeric priority score (decide if lower or higher means higher priority).

    Returns:
        None

    """
    raise NotImplementedError()
priority_get(key) abstractmethod

Get (peek) the highest-priority item without removing it.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required

Returns:

Name Type Description
item (str, optional)

The member with the highest priority, or None if empty.

Source code in alsek/storage/backends/__init__.py
222
223
224
225
226
227
228
229
230
231
232
233
234
@abstractmethod
def priority_get(self, key: str) -> Optional[str]:
    """
    Get (peek) the highest-priority item without removing it.

    Args:
        key (str): The name of the sorted set.

    Returns:
        item (str, optional): The member with the highest priority, or None if empty.

    """
    raise NotImplementedError()
priority_iter(key)

Iterate over the items in a priority-sorted set.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required

Returns:

Name Type Description
priority Iterable[str]

An iterable of members in the sorted set, sorted by priority.

Source code in alsek/storage/backends/__init__.py
236
237
238
239
240
241
242
243
244
245
246
def priority_iter(self, key: str) -> Iterable[str]:
    """Iterate over the items in a priority-sorted set.

    Args:
        key (str): The name of the sorted set.

    Returns:
        priority (Iterable[str]): An iterable of members in the sorted set, sorted by priority.

    """
    raise NotImplementedError()
priority_remove(key, unique_id) abstractmethod

Remove an item from a priority-sorted set.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required
unique_id str

The item's (Message's) unique identifier

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/__init__.py
248
249
250
251
252
253
254
255
256
257
258
259
260
@abstractmethod
def priority_remove(self, key: str, unique_id: str) -> None:
    """Remove an item from a priority-sorted set.

    Args:
        key (str): The name of the sorted set.
        unique_id (str): The item's (Message's) unique identifier

    Returns:
        None

    """
    raise NotImplementedError()
pub(channel, value)

Publish to a channel.

Parameters:

Name Type Description Default
channel str

channel name

required
value Any

value to publish

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/__init__.py
262
263
264
265
266
267
268
269
270
271
272
273
def pub(self, channel: str, value: Any) -> None:
    """Publish to a channel.

    Args:
        channel (str): channel name
        value (Any): value to publish

    Returns:
        None

    """
    raise NotImplementedError()
scan(pattern=None) abstractmethod

Scan the backend for matching names.

Parameters:

Name Type Description Default
pattern str

pattern to limit search to

None

Returns:

Name Type Description
matches_stream Iterable[str]

a stream of matching names

Source code in alsek/storage/backends/__init__.py
287
288
289
290
291
292
293
294
295
296
297
298
@abstractmethod
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
    """Scan the backend for matching names.

    Args:
        pattern (str, optional): pattern to limit search to

    Returns:
        matches_stream (Iterable[str]): a stream of matching name

    """
    raise NotImplementedError()
set(name, value, nx=False, ttl=None) abstractmethod

Set name to value in the backend.

Parameters:

Name Type Description Default
name str

name of the item

required
value Any

value to set for name

required
nx bool

if True the item must not exist prior to being set

False
ttl int

time to live for the entry in milliseconds

None

Returns:

Type Description
None

None

Raises:

Type Description
KeyError

if nx is True and name already exists

Source code in alsek/storage/backends/__init__.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@abstractmethod
def set(
    self,
    name: str,
    value: Any,
    nx: bool = False,
    ttl: Optional[int] = None,
) -> None:
    """Set ``name`` to ``value`` in the backend.

    Args:
        name (str): name of the item
        value (Any): value to set for ``name``
        nx (bool): if ``True`` the item must not exist prior to being set
        ttl (int, optional): time to live for the entry in milliseconds

    Returns:
        None

    Raises:
        KeyError: if ``nx`` is ``True`` and ``name`` already exists

    """
    raise NotImplementedError()
short_name(name)

Get an item's short name, without the namespace in which it exists.

Parameters:

Name Type Description Default
name str

the full name of an item

required

Returns:

Name Type Description
short_name str

name without the namespace prefix

Notes
  • If name is already the short name, this method will collapse to a no-op.
Source code in alsek/storage/backends/__init__.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def short_name(self, name: str) -> str:
    """Get an item's short name, without the namespace
    in which it exists.

    Args:
        name (str): the full name of an item

    Returns:
        short_name (str): ``name`` without the namespace prefix

    Notes:
        * If ``name`` is already the short name, this method
          will collapse to a no-op.

    """
    return re.sub(rf"^{self.namespace}:", repl="", string=name)
sub(channel)

Subscribe to a channel.

Parameters:

Name Type Description Default
channel str

channel name

required

Returns:

Type Description
Iterable[str | dict[str, Any]]

Iterable[str | dict[str, Any]]

Source code in alsek/storage/backends/__init__.py
275
276
277
278
279
280
281
282
283
284
285
def sub(self, channel: str) -> Iterable[str | dict[str, Any]]:
    """Subscribe to a channel.

    Args:
        channel (str): channel name

    Returns:
        Iterable[str | dict[str, Any]]

    """
    raise NotImplementedError()
LazyClient

Lazy client.

Wrapper for lazy client initialization.

Parameters:

Name Type Description Default
client_func callable

a callable which returns a backend client.

required
Source code in alsek/storage/backends/__init__.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class LazyClient:
    """Lazy client.

    Wrapper for lazy client initialization.

    Args:
        client_func (callable): a callable which returns
            a backend client.

    """

    def __init__(self, client_func: Callable[[], Any]) -> None:
        self.client_func = client_func

    def get(self) -> Any:
        """Execute ``client_func``.

        Returns:
            client (Any): a backend client

        """
        return self.client_func()
get()

Execute client_func.

Returns:

Name Type Description
client Any

a backend client

Source code in alsek/storage/backends/__init__.py
39
40
41
42
43
44
45
46
def get(self) -> Any:
    """Execute ``client_func``.

    Returns:
        client (Any): a backend client

    """
    return self.client_func()
redis

Redis

asyncio

Asynchronous Redis Backend

RedisAsyncBackend

Bases: AsyncBackend

Asynchronous Redis Backend.

This backend is powered by Redis and provides asynchronous support for Redis operations.

Parameters:

Name Type Description Default
conn Optional[Union[str, AsyncRedis, LazyClient]]

A connection URL, an AsyncRedis instance, or a LazyClient. If None, a default AsyncRedis instance is created.

None
**kwargs Any

Additional keyword arguments passed to the base class initializer.

{}
Source code in alsek/storage/backends/redis/asyncio.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
class RedisAsyncBackend(AsyncBackend):
    """Asynchronous Redis Backend.

    This backend is powered by Redis and provides asynchronous support
    for Redis operations.

    Args:
        conn (Optional[Union[str, AsyncRedis, LazyClient]]): A connection URL,
            an `AsyncRedis` instance, or a `LazyClient`. If `None`, a default
            `AsyncRedis` instance is created.
        **kwargs: Additional keyword arguments passed to the base class initializer.

    """

    IS_ASYNC: bool = True

    def __init__(
        self,
        conn: Optional[Union[str, RedisAsync, LazyClient]] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self._conn = self._conn_parse(conn)

    @staticmethod
    def _conn_parse(
        conn: Optional[Union[str, RedisAsync, LazyClient]]
    ) -> Union[RedisAsync, LazyClient]:
        """Parse the connection parameter to obtain an AsyncRedis or LazyClient instance.

        Args:
            conn (Optional[Union[str, AsyncRedis, LazyClient]]): The connection parameter.

        Returns:
            Union[AsyncRedis, LazyClient]: An `AsyncRedis` instance or a `LazyClient` wrapping one.

        Raises:
            ValueError: If the connection parameter is of an unsupported type.

        """
        if isinstance(conn, LazyClient):
            return conn
        elif conn is None:
            return RedisAsync(decode_responses=True)
        elif isinstance(conn, RedisAsync):
            return conn
        elif isinstance(conn, str):
            return RedisAsync.from_url(conn, decode_responses=True)
        else:
            raise ValueError(f"Unsupported `conn` {conn}")

    @property
    def conn(self) -> RedisAsync:
        """Asynchronous Redis connection.

        Returns:
            AsyncRedis: The asynchronous Redis client instance.

        """
        if isinstance(self._conn, LazyClient):
            self._conn = self._conn.get()
        return self._conn

    def __repr__(self) -> str:
        return auto_repr(
            self,
            conn=self.conn,
            namespace=self.namespace,
            serializer=self.serializer,
        )

    def encode(self) -> bytes:
        data: dict[str, Any] = dict(
            backend=self.__class__,
            settings=gather_init_params(self, ignore=("conn",)),
        )
        data["settings"]["conn"] = dict(
            connection_class=self.conn.connection_pool.connection_class,
            max_connections=self.conn.connection_pool.max_connections,
            connection_kwargs=self.conn.connection_pool.connection_kwargs,
        )
        return dill.dumps(data)

    @classmethod
    def _from_settings(cls, settings: dict[str, Any]) -> RedisAsyncBackend:
        settings["conn"] = RedisAsync(
            connection_pool=AsyncConnectionPool(
                connection_class=settings["conn"]["connection_class"],
                max_connections=settings["conn"]["max_connections"],
                **settings["conn"]["connection_kwargs"],
            )
        )
        return cls(**settings)

    async def exists(self, name: str) -> bool:
        """Check if a key exists in the Redis backend asynchronously.

        Args:
            name (str): The name of the key to check.

        Returns:
            bool: `True` if the key exists, `False` otherwise.

        """
        return await self.conn.exists(self.full_name(name))

    async def set(
        self,
        name: str,
        value: Any,
        nx: bool = False,
        ttl: Optional[int] = None,
    ) -> None:
        """Set a value for a key in the Redis backend asynchronously.

        Args:
            name (str): The name of the key.
            value (Any): The value to set.
            nx (bool, optional): If `True`, only set the key if it does not already exist.
            ttl (Optional[int], optional): Time to live for the key in milliseconds.

        Raises:
            KeyError: If `nx` is `True` and the key already exists.

        """
        response = await self.conn.set(
            self.full_name(name),
            value=self.serializer.forward(value),
            px=ttl,
            nx=nx,
            keepttl=ttl is None,
        )
        if nx and not response:
            raise KeyError(f"Name '{name}' already exists")

    async def get(
        self,
        name: str,
        default: Optional[Union[Any, Type[Empty]]] = Empty,
    ) -> Any:
        """Get the value of a key from the Redis backend asynchronously.

        Args:
            name (str): The name of the key.
            default (Optional[Union[Any, Type[Empty]]], optional): Default value if the key does not exist.

        Returns:
            Any: The value of the key.

        Raises:
            KeyError: If the key does not exist and no default is provided.

        """
        return await self._get_engine(
            lambda: self.conn.get(self.full_name(name)),
            default=default,
        )

    async def delete(self, name: str, missing_ok: bool = False) -> None:
        """Delete a key from the Redis backend asynchronously.

        Args:
            name (str): The name of the key to delete.
            missing_ok (bool, optional): If `True`, do not raise an error if the key does not exist.

        Raises:
            KeyError: If the key does not exist and `missing_ok` is `False`.

        """
        found = await self.conn.delete(self.full_name(name))
        if not missing_ok and not found:
            raise KeyError(f"No name '{name}' found")

    async def pub(self, channel: str, value: Any) -> None:
        """Publish a message to a Redis channel asynchronously.

        Args:
            channel (str): The name of the channel.
            value (Any): The message to publish.

        Returns:
            None

        """
        await self.conn.publish(
            channel=channel,
            message=self.serializer.forward(value),
        )

    async def sub(self, channel: str) -> AsyncIterable[dict[str, Any]]:
        """Subscribe to a Redis channel and asynchronously yield messages.

        Args:
            channel (str): The name of the channel to subscribe to.

        Yields:
            dict[str, Any]: A dictionary representing the message data.

        """
        pubsub = self.conn.pubsub()
        await pubsub.subscribe(channel)
        try:
            async for message in pubsub.listen():
                if message.get("type") == "message" and message.get("data"):
                    yield parse_sub_data(message, serializer=self.serializer)
        finally:
            await pubsub.unsubscribe(channel)
            await pubsub.close()

    async def scan(self, pattern: Optional[str] = None) -> AsyncIterable[str]:
        """Asynchronously scan the Redis backend for keys matching a pattern.

        Args:
            pattern (Optional[str], optional): The pattern to match keys against. Defaults to '*'.

        Yields:
            str: The names of matching keys without the namespace prefix.

        """
        match = self.full_name(pattern or "*")
        async for key in self.conn.scan_iter(match):
            yield self.short_name(key)

    async def priority_add(
        self,
        key: str,
        unique_id: str,
        priority: int | float,
    ) -> None:
        """Add an item to a priority-sorted set asynchronously.

        Args:
            key (str): The name of the sorted set.
            unique_id (str): The item's unique identifier.
            priority (int | float): The numeric priority score.

        Returns:
            None

        """
        await self.conn.zadd(
            self.full_name(key),
            mapping={unique_id: priority},
        )

    async def priority_get(self, key: str) -> Optional[str]:
        """Peek the highest-priority item in a sorted set asynchronously.

        Args:
            key (str): The name of the sorted set.

        Returns:
            item (str, optional): The ID of the highest-priority item, or None if empty.

        """
        results: list[str] = await self.conn.zrange(
            self.full_name(key),
            start=0,
            end=0,
        )
        return results[0] if results else None

    async def priority_iter(self, key: str) -> AsyncIterable[str]:
        """Iterate over all items in a priority-sorted set asynchronously.

        Args:
            key (str): The name of the sorted set.

        Yields:
            item (str): Member of the sorted set, in priority order.

        """
        # ToDo: switch to this for better async:
        #   async for item in self.conn.zscan_iter(self.full_name(key)):
        #       yield item[0]  # item is a (member, score) tuple
        for item in await self.conn.zrange(self.full_name(key), 0, -1):
            yield item

    async def priority_remove(self, key: str, unique_id: str) -> None:
        """Remove an item from a priority-sorted set asynchronously.

        Args:
            key (str): The name of the sorted set.
            unique_id (str): The ID of the item to remove.

        Returns:
            None

        """
        await self.conn.zrem(
            self.full_name(key),
            unique_id,
        )
conn property

Asynchronous Redis connection.

Returns:

Name Type Description
AsyncRedis Redis

The asynchronous Redis client instance.

delete(name, missing_ok=False) async

Delete a key from the Redis backend asynchronously.

Parameters:

Name Type Description Default
name str

The name of the key to delete.

required
missing_ok bool

If True, do not raise an error if the key does not exist.

False

Raises:

Type Description
KeyError

If the key does not exist and missing_ok is False.

Source code in alsek/storage/backends/redis/asyncio.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
async def delete(self, name: str, missing_ok: bool = False) -> None:
    """Delete a key from the Redis backend asynchronously.

    Args:
        name (str): The name of the key to delete.
        missing_ok (bool, optional): If `True`, do not raise an error if the key does not exist.

    Raises:
        KeyError: If the key does not exist and `missing_ok` is `False`.

    """
    found = await self.conn.delete(self.full_name(name))
    if not missing_ok and not found:
        raise KeyError(f"No name '{name}' found")
exists(name) async

Check if a key exists in the Redis backend asynchronously.

Parameters:

Name Type Description Default
name str

The name of the key to check.

required

Returns:

Name Type Description
bool bool

True if the key exists, False otherwise.

Source code in alsek/storage/backends/redis/asyncio.py
119
120
121
122
123
124
125
126
127
128
129
async def exists(self, name: str) -> bool:
    """Check if a key exists in the Redis backend asynchronously.

    Args:
        name (str): The name of the key to check.

    Returns:
        bool: `True` if the key exists, `False` otherwise.

    """
    return await self.conn.exists(self.full_name(name))
get(name, default=Empty) async

Get the value of a key from the Redis backend asynchronously.

Parameters:

Name Type Description Default
name str

The name of the key.

required
default Optional[Union[Any, Type[Empty]]]

Default value if the key does not exist.

Empty

Returns:

Name Type Description
Any Any

The value of the key.

Raises:

Type Description
KeyError

If the key does not exist and no default is provided.

Source code in alsek/storage/backends/redis/asyncio.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def get(
    self,
    name: str,
    default: Optional[Union[Any, Type[Empty]]] = Empty,
) -> Any:
    """Get the value of a key from the Redis backend asynchronously.

    Args:
        name (str): The name of the key.
        default (Optional[Union[Any, Type[Empty]]], optional): Default value if the key does not exist.

    Returns:
        Any: The value of the key.

    Raises:
        KeyError: If the key does not exist and no default is provided.

    """
    return await self._get_engine(
        lambda: self.conn.get(self.full_name(name)),
        default=default,
    )
priority_add(key, unique_id, priority) async

Add an item to a priority-sorted set asynchronously.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required
unique_id str

The item's unique identifier.

required
priority int | float

The numeric priority score.

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/redis/asyncio.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
async def priority_add(
    self,
    key: str,
    unique_id: str,
    priority: int | float,
) -> None:
    """Add an item to a priority-sorted set asynchronously.

    Args:
        key (str): The name of the sorted set.
        unique_id (str): The item's unique identifier.
        priority (int | float): The numeric priority score.

    Returns:
        None

    """
    await self.conn.zadd(
        self.full_name(key),
        mapping={unique_id: priority},
    )
priority_get(key) async

Peek the highest-priority item in a sorted set asynchronously.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required

Returns:

Name Type Description
item (str, optional)

The ID of the highest-priority item, or None if the set is empty.

Source code in alsek/storage/backends/redis/asyncio.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
async def priority_get(self, key: str) -> Optional[str]:
    """Peek the highest-priority item in a sorted set asynchronously.

    Args:
        key (str): The name of the sorted set.

    Returns:
        item (str, optional): The ID of the highest-priority item, or None if empty.

    """
    results: list[str] = await self.conn.zrange(
        self.full_name(key),
        start=0,
        end=0,
    )
    return results[0] if results else None
priority_iter(key) async

Iterate over all items in a priority-sorted set asynchronously.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required

Yields:

Name Type Description
item str

Member of the sorted set, in priority order.

Source code in alsek/storage/backends/redis/asyncio.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
async def priority_iter(self, key: str) -> AsyncIterable[str]:
    """Iterate over all items in a priority-sorted set asynchronously.

    Args:
        key (str): The name of the sorted set.

    Yields:
        item (str): Member of the sorted set, in priority order.

    """
    # ToDo: switch to this for better async:
    #   async for item in self.conn.zscan_iter(self.full_name(key)):
    #       yield item[0]  # item is a (member, score) tuple
    for item in await self.conn.zrange(self.full_name(key), 0, -1):
        yield item
priority_remove(key, unique_id) async

Remove an item from a priority-sorted set asynchronously.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required
unique_id str

The ID of the item to remove.

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/redis/asyncio.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
async def priority_remove(self, key: str, unique_id: str) -> None:
    """Remove an item from a priority-sorted set asynchronously.

    Args:
        key (str): The name of the sorted set.
        unique_id (str): The ID of the item to remove.

    Returns:
        None

    """
    await self.conn.zrem(
        self.full_name(key),
        unique_id,
    )
pub(channel, value) async

Publish a message to a Redis channel asynchronously.

Parameters:

Name Type Description Default
channel str

The name of the channel.

required
value Any

The message to publish.

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/redis/asyncio.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
async def pub(self, channel: str, value: Any) -> None:
    """Publish a message to a Redis channel asynchronously.

    Args:
        channel (str): The name of the channel.
        value (Any): The message to publish.

    Returns:
        None

    """
    await self.conn.publish(
        channel=channel,
        message=self.serializer.forward(value),
    )
scan(pattern=None) async

Asynchronously scan the Redis backend for keys matching a pattern.

Parameters:

Name Type Description Default
pattern Optional[str]

The pattern to match keys against. Defaults to '*'.

None

Yields:

Name Type Description
str AsyncIterable[str]

The names of matching keys without the namespace prefix.

Source code in alsek/storage/backends/redis/asyncio.py
234
235
236
237
238
239
240
241
242
243
244
245
246
async def scan(self, pattern: Optional[str] = None) -> AsyncIterable[str]:
    """Asynchronously scan the Redis backend for keys matching a pattern.

    Args:
        pattern (Optional[str], optional): The pattern to match keys against. Defaults to '*'.

    Yields:
        str: The names of matching keys without the namespace prefix.

    """
    match = self.full_name(pattern or "*")
    async for key in self.conn.scan_iter(match):
        yield self.short_name(key)
set(name, value, nx=False, ttl=None) async

Set a value for a key in the Redis backend asynchronously.

Parameters:

Name Type Description Default
name str

The name of the key.

required
value Any

The value to set.

required
nx bool

If True, only set the key if it does not already exist.

False
ttl Optional[int]

Time to live for the key in milliseconds.

None

Raises:

Type Description
KeyError

If nx is True and the key already exists.

Source code in alsek/storage/backends/redis/asyncio.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
async def set(
    self,
    name: str,
    value: Any,
    nx: bool = False,
    ttl: Optional[int] = None,
) -> None:
    """Set a value for a key in the Redis backend asynchronously.

    Args:
        name (str): The name of the key.
        value (Any): The value to set.
        nx (bool, optional): If `True`, only set the key if it does not already exist.
        ttl (Optional[int], optional): Time to live for the key in milliseconds.

    Raises:
        KeyError: If `nx` is `True` and the key already exists.

    """
    response = await self.conn.set(
        self.full_name(name),
        value=self.serializer.forward(value),
        px=ttl,
        nx=nx,
        keepttl=ttl is None,
    )
    if nx and not response:
        raise KeyError(f"Name '{name}' already exists")
sub(channel) async

Subscribe to a Redis channel and asynchronously yield messages.

Parameters:

Name Type Description Default
channel str

The name of the channel to subscribe to.

required

Yields:

Type Description
AsyncIterable[dict[str, Any]]

dict[str, Any]: A dictionary representing the message data.

Source code in alsek/storage/backends/redis/asyncio.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def sub(self, channel: str) -> AsyncIterable[dict[str, Any]]:
    """Subscribe to a Redis channel and asynchronously yield messages.

    Args:
        channel (str): The name of the channel to subscribe to.

    Yields:
        dict[str, Any]: A dictionary representing the message data.

    """
    pubsub = self.conn.pubsub()
    await pubsub.subscribe(channel)
    try:
        async for message in pubsub.listen():
            if message.get("type") == "message" and message.get("data"):
                yield parse_sub_data(message, serializer=self.serializer)
    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.close()
standard

Redis Backend

RedisBackend

Bases: Backend

Redis Backend.

Backend powered by Redis.

Parameters:

Name Type Description Default
conn (str, Redis, LazyClient)

a connection URL, a Redis() object, or a LazyClient.

None
namespace str

prefix to use when inserting names in the backend.

DEFAULT_NAMESPACE
serializer Serializer

tool for encoding and decoding values written into the backend.

JsonSerializer()
Warning
  • If conn is a Redis() object, decode_responses is expected to be set to True.
Source code in alsek/storage/backends/redis/standard.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class RedisBackend(Backend):
    """Redis Backend.

    Backend powered by Redis.

    Args:
        conn (str, Redis, LazyClient, optional): a connection url, ``Redis()`` object
            or ``LazyClient``.
        namespace (str): prefix to use when inserting
            names in the backend
        serializer (Serializer): tool for encoding and decoding
            values written into the backend.

    Warning:
        * If ``conn`` is a ``Redis()`` object, ``decode_responses``
          is expected to be set to ``True``.

    """

    SUPPORTS_PUBSUB: bool = True

    def __init__(
        self,
        conn: Optional[Union[str, Redis, LazyClient]] = None,
        namespace: str = DEFAULT_NAMESPACE,
        serializer: Serializer = JsonSerializer(),
    ) -> None:
        super().__init__(namespace, serializer=serializer)
        self._conn = self._conn_parse(conn)

    @staticmethod
    def _conn_parse(
        conn: Optional[Union[str, Redis, LazyClient]]
    ) -> Union[Redis, LazyClient]:
        if isinstance(conn, LazyClient):
            return conn

        if conn is None:
            return Redis(decode_responses=True)
        elif isinstance(conn, Redis):
            return conn
        elif isinstance(conn, str):
            return Redis.from_url(conn, decode_responses=True)
        else:
            raise ValueError(f"Unsupported `conn` {conn}")

    @property
    def conn(self) -> Redis:
        """Connection to the backend."""
        if isinstance(self._conn, LazyClient):
            self._conn = self._conn.get()
        return cast(Redis, self._conn)

    def __repr__(self) -> str:
        return auto_repr(
            self,
            conn=self.conn,
            namespace=self.namespace,
            serializer=self.serializer,
        )

    def encode(self) -> bytes:
        data: dict[str, Any] = dict(
            backend=self.__class__,
            settings=gather_init_params(self, ignore=("conn",)),
        )
        data["settings"]["conn"] = dict(
            connection_class=self.conn.connection_pool.connection_class,
            max_connections=self.conn.connection_pool.max_connections,
            connection_kwargs=self.conn.connection_pool.connection_kwargs,
        )
        return cast(bytes, dill.dumps(data))

    @classmethod
    def _from_settings(cls, settings: dict[str, Any]) -> RedisBackend:
        settings["conn"] = Redis(
            connection_pool=ConnectionPool(
                connection_class=settings["conn"]["connection_class"],
                max_connections=settings["conn"]["max_connections"],
                **settings["conn"]["connection_kwargs"],
            )
        )
        return cls(**settings)

    def exists(self, name: str) -> bool:
        """Check if ``name`` exists in the Redis backend.

        Args:
            name (str): name of the item

        Returns:
            bool

        """
        return bool(self.conn.exists(self.full_name(name)))

    def set(
        self,
        name: str,
        value: Any,
        nx: bool = False,
        ttl: Optional[int] = None,
    ) -> None:
        """Set ``name`` to ``value`` in the Redis backend.

        Args:
            name (str): name of the item
            value (Any): value to set for ``name``
            nx (bool): if ``True`` the item must not exist prior to being set
            ttl (int, optional): time to live for the entry in milliseconds

        Returns:
            None

        Raises:
            KeyError: if ``nx`` is ``True`` and ``name`` already exists

        """
        response = self.conn.set(
            self.full_name(name),
            value=self.serializer.forward(value),
            px=ttl,
            nx=nx,
            keepttl=ttl is None,  # type: ignore
        )
        if nx and response is None:
            raise KeyError(f"Name '{name}' already exists")

    def get(self, name: str, default: Optional[Union[Any, Type[Empty]]] = None) -> Any:
        """Get ``name`` from the Redis backend.

        Args:
            name (str): name of the item
            default (Any, Type[Empty], optional): default value for ``name``

        Returns:
            Any

        """
        return self._get_engine(
            lambda: self.conn.__getitem__(self.full_name(name)),
            default=default,
        )

    def delete(self, name: str, missing_ok: bool = False) -> None:
        """Delete a ``name`` from the Redis backend.

        Args:
            name (str): name of the item
            missing_ok (bool): if ``True``, do not raise for missing

        Returns:
            None

        Raises:
            KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.

        """
        found = self.conn.delete(self.full_name(name))
        if not missing_ok and not found:
            raise KeyError(f"No name '{name}' found")

    def priority_add(self, key: str, unique_id: str, priority: int | float) -> None:
        """Add an item to a priority-sorted set.

        Args:
            key (str): The name of the sorted set.
            unique_id (str): The item's (Message's) unique identifier
            priority (float): The numeric priority score (decide if lower or higher means higher priority).

        Returns:
            None

        """
        self.conn.zadd(
            self.full_name(key),
            mapping={unique_id: priority},
        )

    def priority_get(self, key: str) -> Optional[str]:
        """Get (peek) the highest-priority item without removing it.

        Args:
            key (str): The name of the sorted set.

        Returns:
            item (str, optional): The member with the highest priority, or None if empty.

        """
        results: list[str] = self.conn.zrange(
            self.full_name(key),
            start=0,
            end=0,
        )
        return results[0] if results else None

    def priority_iter(self, key: str) -> Iterable[str]:
        """Iterate over the items in a priority-sorted set.

        Args:
            key (str): The name of the sorted set.

        Returns:
            priority (Iterable[str]): An iterable of members in the sorted set, sorted by priority.

        """
        yield from self.conn.zrange(self.full_name(key), 0, -1)

    def priority_remove(self, key: str, unique_id: str) -> None:
        """Remove an item from a priority-sorted set.

        Args:
            key (str): The name of the sorted set.
            unique_id (str): The item's (Message's) unique identifier

        Returns:
            None

        """
        self.conn.zrem(
            self.full_name(key),
            unique_id,
        )

    def pub(self, channel: str, value: Any) -> None:
        self.conn.publish(
            channel=channel,
            message=self.serializer.forward(value),
        )

    def sub(self, channel: str) -> Iterable[str | dict[str, Any]]:
        pubsub = self.conn.pubsub()
        pubsub.subscribe(channel)
        try:
            for message in pubsub.listen():
                if message.get("type") == "message" and message.get("data"):
                    yield parse_sub_data(message, serializer=self.serializer)
        finally:
            pubsub.unsubscribe(channel)
            pubsub.close()

    def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
        """Scan the backend for matching names.

        Args:
            pattern (str): pattern to match against

        Returns:
            names_stream (Iterable[str]): a stream of matching name

        """
        match = self.full_name(pattern or "*")
        yield from map(self.short_name, self.conn.scan_iter(match))
conn property

Connection to the backend.

delete(name, missing_ok=False)

Delete a name from the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required
missing_ok bool

if True, do not raise for missing

False

Returns:

Type Description
None

None

Raises:

Type Description
KeyError

If missing_ok is False and name is not found.

Source code in alsek/storage/backends/redis/standard.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def delete(self, name: str, missing_ok: bool = False) -> None:
    """Remove ``name`` from the Redis backend.

    Args:
        name (str): name of the item
        missing_ok (bool): if ``True``, silently ignore a missing name

    Returns:
        None

    Raises:
        KeyError: if ``missing_ok`` is ``False`` and ``name`` is not found.

    """
    deleted_count = self.conn.delete(self.full_name(name))
    if deleted_count or missing_ok:
        return
    raise KeyError(f"No name '{name}' found")
exists(name)

Check if name exists in the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required

Returns:

Type Description
bool

bool

Source code in alsek/storage/backends/redis/standard.py
112
113
114
115
116
117
118
119
120
121
122
def exists(self, name: str) -> bool:
    """Determine whether ``name`` is present in the Redis backend.

    Args:
        name (str): name of the item

    Returns:
        bool: ``True`` if the name exists, otherwise ``False``.

    """
    full = self.full_name(name)
    return bool(self.conn.exists(full))
get(name, default=None)

Get name from the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required
default (Any, Type[Empty])

default value for name

None

Returns:

Type Description
Any

Any

Source code in alsek/storage/backends/redis/standard.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def get(self, name: str, default: Optional[Union[Any, Type[Empty]]] = None) -> Any:
    """Fetch ``name`` from the Redis backend.

    Args:
        name (str): name of the item
        default (Any, Type[Empty], optional): default value for ``name``

    Returns:
        Any

    """
    # Delegate missing-key / default handling to the shared engine helper.
    def fetch() -> Any:
        return self.conn[self.full_name(name)]

    return self._get_engine(fetch, default=default)
priority_add(key, unique_id, priority)

Add an item to a priority-sorted set.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required
unique_id str

The item's (Message's) unique identifier

required
priority float

The numeric priority score (decide if lower or higher means higher priority).

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/redis/standard.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def priority_add(self, key: str, unique_id: str, priority: int | float) -> None:
    """Add an item to a priority-sorted set.

    Args:
        key (str): The name of the sorted set.
        unique_id (str): The item's (Message's) unique identifier
        priority (float): The numeric priority score (decide if lower or higher means higher priority).

    Returns:
        None

    """
    self.conn.zadd(
        self.full_name(key),
        mapping={unique_id: priority},
    )
priority_get(key)

Get (peek) the highest-priority item without removing it.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required

Returns:

Name Type Description
item (str, optional)

The member with the highest priority, or None if empty.

Source code in alsek/storage/backends/redis/standard.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def priority_get(self, key: str) -> Optional[str]:
    """Peek at the highest-priority member of a sorted set without removal.

    Args:
        key (str): name of the sorted set.

    Returns:
        item (str, optional): the member with the highest priority,
            or ``None`` when the set is empty.

    """
    top: list[str] = self.conn.zrange(self.full_name(key), start=0, end=0)
    if top:
        return top[0]
    return None
priority_iter(key)

Iterate over the items in a priority-sorted set.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required

Returns:

Name Type Description
priority Iterable[str]

An iterable of members in the sorted set, sorted by priority.

Source code in alsek/storage/backends/redis/standard.py
224
225
226
227
228
229
230
231
232
233
234
def priority_iter(self, key: str) -> Iterable[str]:
    """Yield every member of a priority-sorted set, in priority order.

    Args:
        key (str): name of the sorted set.

    Returns:
        priority (Iterable[str]): members of the sorted set, in score order.

    """
    for member in self.conn.zrange(self.full_name(key), 0, -1):
        yield member
priority_remove(key, unique_id)

Remove an item from a priority-sorted set.

Parameters:

Name Type Description Default
key str

The name of the sorted set.

required
unique_id str

The item's (Message's) unique identifier

required

Returns:

Type Description
None

None

Source code in alsek/storage/backends/redis/standard.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def priority_remove(self, key: str, unique_id: str) -> None:
    """Delete a single member from a priority-sorted set.

    Args:
        key (str): name of the sorted set.
        unique_id (str): unique identifier of the item (Message) to remove.

    Returns:
        None

    """
    target = self.full_name(key)
    self.conn.zrem(target, unique_id)
scan(pattern=None)

Scan the backend for matching names.

Parameters:

Name Type Description Default
pattern str

pattern to match against

None

Returns:

Name Type Description
names_stream Iterable[str]

a stream of matching names

Source code in alsek/storage/backends/redis/standard.py
269
270
271
272
273
274
275
276
277
278
279
280
def scan(self, pattern: Optional[str] = None) -> Iterable[str]:
    """Scan the backend for names matching ``pattern``.

    Args:
        pattern (str, optional): pattern to match against.
            Defaults to matching all names.

    Returns:
        names_stream (Iterable[str]): a stream of matching names

    """
    match_expr = self.full_name(pattern if pattern else "*")
    for raw_name in self.conn.scan_iter(match_expr):
        yield self.short_name(raw_name)
set(name, value, nx=False, ttl=None)

Set name to value in the Redis backend.

Parameters:

Name Type Description Default
name str

name of the item

required
value Any

value to set for name

required
nx bool

if True the item must not exist prior to being set

False
ttl int

time to live for the entry in milliseconds

None

Returns:

Type Description
None

None

Raises:

Type Description
KeyError

if nx is True and name already exists

Source code in alsek/storage/backends/redis/standard.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def set(
    self,
    name: str,
    value: Any,
    nx: bool = False,
    ttl: Optional[int] = None,
) -> None:
    """Write ``value`` under ``name`` in the Redis backend.

    Args:
        name (str): name of the item
        value (Any): value to set for ``name``
        nx (bool): if ``True`` the item must not exist prior to being set
        ttl (int, optional): time to live for the entry in milliseconds

    Returns:
        None

    Raises:
        KeyError: if ``nx`` is ``True`` and ``name`` already exists

    """
    serialized_value = self.serializer.forward(value)
    # ``keepttl`` preserves any existing TTL when no new one is supplied.
    outcome = self.conn.set(
        self.full_name(name),
        value=serialized_value,
        px=ttl,
        nx=nx,
        keepttl=ttl is None,  # type: ignore
    )
    # Redis returns None (instead of True) when NX blocks the write.
    if nx and outcome is None:
        raise KeyError(f"Name '{name}' already exists")

result

Result Storage

ResultStore

Alsek Result Store.

Parameters:

Name Type Description Default
backend Backend

backend for data storage

required
Warning
  • In order for a result to be stored, it must be serializable by the serializer used by backend.
Source code in alsek/storage/result.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class ResultStore:
    """Alsek Result Store.

    Args:
        backend (Backend): backend for data storage

    Warning:
        * In order for a result to be stored, it must be
          serializable by the ``serializer`` used by ``backend``.

    """

    def __init__(self, backend: Backend) -> None:
        self.backend = backend

    def serialize(self) -> dict[str, Any]:
        # Encode the backend so this store can be reconstructed later
        # (inverse operation: ``deserialize``).
        return {
            "backend": self.backend.encode(),
        }

    @staticmethod
    def deserialize(data: dict[str, Any]) -> ResultStore:
        # Rebuild the backend from its encoded settings payload.
        backend_data = dill.loads(data["backend"])
        backend = backend_data["backend"]._from_settings(backend_data["settings"])
        return ResultStore(
            backend=backend,
        )

    @staticmethod
    def _get_stable_prefix(message: Message) -> str:
        """Get a prefix that does not change based on
        whether the message has a progenitor."""
        return f"results:{message.progenitor_uuid if message.progenitor_uuid else message.uuid}"

    @staticmethod
    def get_storage_name(message: Message) -> str:
        """Get the name for ``message`` in the backend.

        Args:
            message (Message): an Alsek message

        Returns:
            name (str): message-specific name

        """
        if message.uuid is None:
            raise ValueError("Message does not have a uuid")

        # Descendant results are nested under their progenitor's namespace.
        if message.progenitor_uuid:
            return f"results:{message.progenitor_uuid}:descendants:{message.uuid}"
        else:
            return f"results:{message.uuid}"

    def _get_all_storage_names(self, message: Message, descendants: bool) -> list[str]:
        # Resolve the storage names for ``message`` and (optionally) its
        # descendants. When descendant uuids are not recorded on the message,
        # fall back to scanning the backend by stable prefix.
        if descendants:
            if message.descendant_uuids:
                descendant_names = [
                    self.get_storage_name(
                        Message(message.task_name, uuid=u, progenitor_uuid=message.uuid)
                    )
                    for u in message.descendant_uuids
                ]
                return [*descendant_names, self.get_storage_name(message)]
            else:
                # NOTE(review): the sort key compares full storage names (e.g.
                # "results:<uuid>") against ``message.uuid``; confirm this
                # actually orders the progenitor's own result last as intended.
                return sorted(
                    self.backend.scan(f"{self._get_stable_prefix(message)}*"),
                    key=lambda i: 1 if i == message.uuid else 0,
                )
        else:
            return [self.get_storage_name(message)]

    @staticmethod
    def _extract_uuid(storage_name: str) -> str:
        # The uuid is always the final ":"-separated segment of a storage name.
        return storage_name.rsplit(":", 1)[-1]

    def exists(self, message: Message, descendants: bool = False) -> bool:
        """Whether data for ``message`` exists in the store.

        Args:
            message (Message): an Alsek message
            descendants (bool): if ``True``, this method will return ``True``
                iff all of the descendants of ``message`` also exist.

        Returns:
            bool

        """
        names = self._get_all_storage_names(message, descendants=descendants)
        return all(self.backend.exists(n) for n in names)

    def set(self, message: Message, result: Any, nx: bool = True) -> None:
        """Store a ``result`` for ``message``.

        Args:
            message (Message): an Alsek message.
            result (Any): the result to persist
            nx (bool): if ``True`` the item must not exist prior to being set

        Returns:
            None

        """
        self.backend.set(
            self.get_storage_name(message),
            value={"result": result, "timestamp": utcnow_timestamp_ms()},
            nx=nx,
            ttl=message.result_ttl,
        )

    def _get_engine(self, names: Iterable[str], with_metadata: bool) -> list[Any]:
        # Fetch each stored payload, ordered by write timestamp; strip the
        # metadata wrapper unless the caller asked for it.
        def bundle_data(n: str) -> dict[str, Any]:
            data: dict[str, Any] = self.backend.get(n)
            if with_metadata:
                data["uuid"] = self._extract_uuid(n)
            return data

        results = sorted(
            [bundle_data(n) for n in names],
            key=lambda d: d["timestamp"],  # type: ignore
        )
        return results if with_metadata else [r["result"] for r in results]

    def get(
        self,
        message: Message,
        timeout: int = 0,
        keep: bool = False,
        with_metadata: bool = False,
        descendants: bool = False,
    ) -> Union[Any, list[Any]]:
        """Get the result for ``message``.

        Args:
            message (Message): an Alsek message.
            timeout (int): amount of time (in milliseconds) to wait
                for the result to become available
            keep (bool): whether to keep the result after fetching it.
                Defaults to ``False`` to conserve storage space.
            with_metadata (bool): if ``True`` return results of the form
                ``{"result": <result>, "uuid": str, "timestamp": int}``, where
                "result" is the result persisted to the backend, "uuid" is the
                uuid of the message associated with the result and "timestamp"
                is the time at which the result was written to the backend.
            descendants (bool): if ``True`` also fetch results for descendants.

        Returns:
            result (Any, list[Any]): the stored result. If ``descendants``
                is ``True`` a list of results will be returned.

        Raises:
            KeyError: if results are not available for ``message``
            TimeoutError: if results are not available for ``message``
                following ``timeout``.

        Notes:
            * The order of results when ``descendants=True`` is determined
              by the time at which the data was written to the backend.

        Warning:
            * If a message has a progenitor, the ``progenitor_uuid`` field in the
              ``message`` must be set.

        Examples:
            >>> from alsek import Message
            >>> from alsek.storage.backends.redis.standard import RedisBackend
            >>> from alsek.storage.result import ResultStore

            >>> backend = RedisBackend()
            >>> result_store = ResultStore(backend)

            >>> result_store.get(Message(uuid="..."))

        """
        if not self.exists(message, descendants=descendants):
            if timeout:
                waiter(
                    lambda: self.exists(message, descendants=descendants),
                    timeout=timeout,
                    timeout_msg=f"Timeout waiting on result for {message.summary}",
                    sleep_interval=_GET_RESULT_WAIT_SLEEP_INTERVAL,
                )
            else:
                raise KeyError(f"No results for {message.uuid}")

        names = self._get_all_storage_names(message, descendants=descendants)
        results = self._get_engine(names, with_metadata=with_metadata)
        # Results are removed after a successful read unless ``keep=True``.
        if not keep:
            for n in names:
                self.backend.delete(n)

        return results if descendants else results[0]

    def delete(
        self,
        message: Message,
        descendants: bool = True,
        missing_ok: bool = False,
    ) -> int:
        """Delete any data for ``message`` from the backend.

        Args:
            message (Message): an Alsek message.
            descendants (bool): if ``True`` also delete results for descendants.
            missing_ok (bool): if ``True``, do not raise for missing

        Returns:
            count (int): number of results deleted

        """
        count: int = 0
        for name in self._get_all_storage_names(message, descendants=descendants):
            self.backend.delete(name, missing_ok=missing_ok)
            count += 1
        return count
delete(message, descendants=True, missing_ok=False)

Delete any data for message from the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message.

required
descendants bool

if True also delete results for descendants.

True
missing_ok bool

if True, do not raise for missing

False

Returns:

Name Type Description
count int

number of results deleted

Source code in alsek/storage/result.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def delete(
    self,
    message: Message,
    descendants: bool = True,
    missing_ok: bool = False,
) -> int:
    """Remove all stored data for ``message`` from the backend.

    Args:
        message (Message): an Alsek message.
        descendants (bool): if ``True`` also delete results for descendants.
        missing_ok (bool): if ``True``, do not raise for missing

    Returns:
        count (int): number of results deleted

    """
    names = self._get_all_storage_names(message, descendants=descendants)
    for name in names:
        self.backend.delete(name, missing_ok=missing_ok)
    return len(names)
exists(message, descendants=False)

Whether data for message exists in the store.

Parameters:

Name Type Description Default
message Message

an Alsek message

required
descendants bool

if True, this method will return True iff all of the descendants of message also exist.

False

Returns:

Type Description
bool

bool

Source code in alsek/storage/result.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def exists(self, message: Message, descendants: bool = False) -> bool:
    """Whether data for ``message`` exists in the store.

    Args:
        message (Message): an Alsek message
        descendants (bool): if ``True``, this method will return ``True``
            iff all of the descendants of ``message`` also exist.

    Returns:
        bool

    """
    return all(
        self.backend.exists(name)
        for name in self._get_all_storage_names(message, descendants=descendants)
    )
get(message, timeout=0, keep=False, with_metadata=False, descendants=False)

Get the result for message.

Parameters:

Name Type Description Default
message Message

an Alsek message.

required
timeout int

amount of time (in milliseconds) to wait for the result to become available

0
keep bool

whether to keep the result after fetching it. Defaults to False to conserve storage space.

False
with_metadata bool

if True return results of the form {"result": <result>, "uuid": str, "timestamp": int}, where "result" is the result persisted to the backend, "uuid" is the uuid of the message associated with the result and "timestamp" is the time at which the result was written to the backend.

False
descendants bool

if True also fetch results for descendants.

False

Returns:

Name Type Description
result (Any, list[Any])

the stored result. If descendants is True a list of results will be returned.

Raises:

Type Description
KeyError

if results are not available for message

TimeoutError

if results are not available for message following timeout.

Notes
  • The order of results when descendants=True is determined by the time at which the data was written to the backend.
Warning
  • If a message has a progenitor, the progenitor_uuid field in the message must be set.

Examples:

>>> from alsek import Message
>>> from alsek.storage.backends.redis.standard import RedisBackend
>>> from alsek.storage.result import ResultStore
>>> backend = RedisBackend()
>>> result_store = ResultStore(backend)
>>> result_store.get(Message(uuid="..."))
Source code in alsek/storage/result.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def get(
    self,
    message: Message,
    timeout: int = 0,
    keep: bool = False,
    with_metadata: bool = False,
    descendants: bool = False,
) -> Union[Any, list[Any]]:
    """Get the result for ``message``.

    Args:
        message (Message): an Alsek message.
        timeout (int): amount of time (in milliseconds) to wait
            for the result to become available
        keep (bool): whether to keep the result after fetching it.
            Defaults to ``False`` to conserve storage space.
        with_metadata (bool): if ``True`` return results of the form
            ``{"result": <result>, "uuid": str, "timestamp": int}``, where
            "result" is the result persisted to the backend, "uuid" is the
            uuid of the message associated with the result and "timestamp"
            is the time at which the result was written to the backend.
        descendants (bool): if ``True`` also fetch results for descendants.

    Returns:
        result (Any, list[Any]): the stored result. If ``descendants``
            is ``True`` a list of results will be returned.

    Raises:
        KeyError: if results are not available for ``message``
        TimeoutError: if results are not available for ``message``
            following ``timeout``.

    Notes:
        * The order of results when ``descendants=True`` is determined
          by the time at which the data was written to the backend.

    Warning:
        * If a message has a progenitor, the ``progenitor_uuid`` field in the
          ``message`` must be set.

    Examples:
        >>> from alsek import Message
        >>> from alsek.storage.backends.redis.standard import RedisBackend
        >>> from alsek.storage.result import ResultStore

        >>> backend = RedisBackend()
        >>> result_store = ResultStore(backend)

        >>> result_store.get(Message(uuid="..."))

    """
    if not self.exists(message, descendants=descendants):
        if timeout:
            waiter(
                lambda: self.exists(message, descendants=descendants),
                timeout=timeout,
                timeout_msg=f"Timeout waiting on result for {message.summary}",
                sleep_interval=_GET_RESULT_WAIT_SLEEP_INTERVAL,
            )
        else:
            raise KeyError(f"No results for {message.uuid}")

    names = self._get_all_storage_names(message, descendants=descendants)
    results = self._get_engine(names, with_metadata=with_metadata)
    # Results are removed after a successful read unless ``keep=True``.
    if not keep:
        for n in names:
            self.backend.delete(n)

    return results if descendants else results[0]
get_storage_name(message) staticmethod

Get the name for message in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
name str

message-specific name

Source code in alsek/storage/result.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@staticmethod
def get_storage_name(message: Message) -> str:
    """Build the backend storage name for ``message``.

    Args:
        message (Message): an Alsek message

    Returns:
        name (str): message-specific name

    """
    if message.uuid is None:
        raise ValueError("Message does not have a uuid")

    # Descendant results live under their progenitor's namespace.
    name = f"results:{message.uuid}"
    if message.progenitor_uuid:
        name = f"results:{message.progenitor_uuid}:descendants:{message.uuid}"
    return name
set(message, result, nx=True)

Store a result for message.

Parameters:

Name Type Description Default
message Message

an Alsek message.

required
result Any

the result to persist

required
nx bool

if True the item must not exist prior to being set

True

Returns:

Type Description
None

None

Source code in alsek/storage/result.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def set(self, message: Message, result: Any, nx: bool = True) -> None:
    """Persist ``result`` for ``message``.

    Args:
        message (Message): an Alsek message.
        result (Any): the result to persist
        nx (bool): if ``True`` the item must not exist prior to being set

    Returns:
        None

    """
    payload = {"result": result, "timestamp": utcnow_timestamp_ms()}
    self.backend.set(
        self.get_storage_name(message),
        value=payload,
        nx=nx,
        ttl=message.result_ttl,
    )

serialization

Serialization

BinarySerializer

Bases: Serializer

Binary serialization.

Source code in alsek/storage/serialization.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class BinarySerializer(Serializer):
    """Binary serialization."""

    @staticmethod
    def forward(obj: Any) -> Any:
        """Serialize ``obj`` to a base64-encoded string.

        Args:
            obj (Any): an object to encode

        Returns:
            encoded (Any): base64 encoded ``dill``-serialized object

        """
        return b64encode(dill.dumps(obj)).decode("utf-8")

    @staticmethod
    def reverse(obj: Any) -> Any:
        """Reconstruct an object from its base64-encoded form.

        Args:
            obj (Any): an object to decode

        Returns:
            decoded (Any): ``dill``-deserialized object from base64 encoded string

        """
        if obj is None:
            return None
        return dill.loads(b64decode(obj.encode("utf-8")))
forward(obj) staticmethod

Encode an object.

Parameters:

Name Type Description Default
obj Any

an object to encode

required

Returns:

Name Type Description
encoded Any

base64 encoded dill-serialized object

Source code in alsek/storage/serialization.py
87
88
89
90
91
92
93
94
95
96
97
98
99
@staticmethod
def forward(obj: Any) -> Any:
    """Serialize ``obj`` to a base64-encoded string.

    Args:
        obj (Any): an object to encode

    Returns:
        encoded (Any): base64 encoded ``dill``-serialized object

    """
    return b64encode(dill.dumps(obj)).decode("utf-8")
reverse(obj) staticmethod

Decode an object.

Parameters:

Name Type Description Default
obj Any

an object to decode

required

Returns:

Name Type Description
decoded Any

dill-deserialized object from base64 encoded string

Source code in alsek/storage/serialization.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@staticmethod
def reverse(obj: Any) -> Any:
    """Reconstruct an object from its base64-encoded form.

    Args:
        obj (Any): an object to decode

    Returns:
        decoded (Any): ``dill``-deserialized object from base64 encoded string

    """
    if obj is None:
        return None
    return dill.loads(b64decode(obj.encode("utf-8")))
JsonSerializer

Bases: Serializer

JSON serialization.

Source code in alsek/storage/serialization.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class JsonSerializer(Serializer):
    """JSON serialization."""

    @staticmethod
    def forward(obj: Any) -> Any:
        """Serialize ``obj`` as a JSON string.

        Args:
            obj (Any): an object to encode

        Returns:
            encoded (Any): JSON encoded object

        """
        return json.dumps(obj)

    @staticmethod
    def reverse(obj: Any) -> Any:
        """Parse a JSON string back into an object.

        Args:
            obj (Any): an object to decode

        Returns:
            decoded (Any): JSON decoded object

        """
        return None if obj is None else json.loads(obj)
forward(obj) staticmethod

Encode an object.

Parameters:

Name Type Description Default
obj Any

an object to encode

required

Returns:

Name Type Description
encoded Any

JSON encoded object

Source code in alsek/storage/serialization.py
55
56
57
58
59
60
61
62
63
64
65
66
@staticmethod
def forward(obj: Any) -> Any:
    """Serialize ``obj`` as a JSON string.

    Args:
        obj (Any): an object to encode

    Returns:
        encoded (Any): JSON encoded object

    """
    return json.dumps(obj)
reverse(obj) staticmethod

Decode an object.

Parameters:

Name Type Description Default
obj Any

an object to decode

required

Returns:

Name Type Description
decoded Any

JSON decoded object

Source code in alsek/storage/serialization.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@staticmethod
def reverse(obj: Any) -> Any:
    """Parse a JSON string back into an object.

    Args:
        obj (Any): an object to decode

    Returns:
        decoded (Any): JSON decoded object

    """
    return None if obj is None else json.loads(obj)
Serializer

Bases: ABC

Base Serializer Class.

Source code in alsek/storage/serialization.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Serializer(ABC):
    """Base Serializer Class.

    Subclasses must implement both directions of the transform:
    ``forward`` (object -> stored form) and ``reverse`` (stored form -> object).
    """

    def __repr__(self) -> str:
        return auto_repr(self)

    @staticmethod
    @abstractmethod
    def forward(obj: Any) -> Any:
        """Encode an object for backend serialization.

        Args:
            obj (Any): an object to encode

        Returns:
            encoded (Any): encoded object

        Raises:
            NotImplementedError: must be implemented by subclasses.

        """
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def reverse(obj: Any) -> Any:
        """Decode an object.

        Args:
            obj (Any): an object to decode

        Returns:
            decoded (Any): decoded object

        Raises:
            NotImplementedError: must be implemented by subclasses.

        """
        raise NotImplementedError()
forward(obj) abstractmethod staticmethod

Encode an object for backend serialization.

Parameters:

Name Type Description Default
obj Any

an object to encode

required

Returns:

Name Type Description
encoded Any

encoded object

Source code in alsek/storage/serialization.py
23
24
25
26
27
28
29
30
31
32
33
34
35
@staticmethod
@abstractmethod
def forward(obj: Any) -> Any:
    """Encode an object for backend serialization.

    Args:
        obj (Any): an object to encode

    Returns:
        encoded (Any): encoded object

    Raises:
        NotImplementedError: must be implemented by subclasses.

    """
    raise NotImplementedError()
reverse(obj) abstractmethod staticmethod

Decode an object.

Parameters:

Name Type Description Default
obj Any

an object to decode

required

Returns:

Name Type Description
decoded Any

decoded object

Source code in alsek/storage/serialization.py
37
38
39
40
41
42
43
44
45
46
47
48
49
@staticmethod
@abstractmethod
def reverse(obj: Any) -> Any:
    """Decode an object.

    Args:
        obj (Any): an object to decode

    Returns:
        decoded (Any): decoded object

    Raises:
        NotImplementedError: must be implemented by subclasses.

    """
    raise NotImplementedError()

tools

Tools

iteration

Result Iteration

ResultPool

Tooling for iterating over task results.

Parameters:

Name Type Description Default
result_store ResultStore

store where task results are persisted

required

Examples:

>>> from alsek.storage.result import ResultStore
>>> from alsek.tools.iteration import ResultPool
...
>>> result_store = ResultStore(...)
>>> pool = ResultPool(result_store)
...
>>> messages = [...]
>>> for uuid, result in pool.istream(*messages):
...     pass
Source code in alsek/tools/iteration.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class ResultPool:
    """Tooling for iterating over task results.

    Args:
        result_store (ResultStore): store where task results are persisted

    Examples:
        >>> from alsek.storage.result import ResultStore
        >>> from alsek.tools.iteration import ResultPool
        ...
        >>> result_store = ResultStore(...)
        >>> pool = ResultPool(result_store)
        ...
        >>> messages = [...]
        >>> for uuid, result in pool.istream(*messages):
        ...     pass

    """

    def __init__(self, result_store: ResultStore) -> None:
        self.result_store = result_store

        # Listen for stop signals so iteration can be interrupted cleanly
        # without forcing the process to exit.
        self.stop_signal = StopSignalListener(exit_override=False)

    @staticmethod
    def _validate(messages: tuple[Message, ...]) -> None:
        # Duplicate UUIDs would make the index bookkeeping in ``_engine()``
        # ambiguous, so reject them up front.
        if has_duplicates([m.uuid for m in messages]):
            raise ValidationError("Duplicate messages detected")

    def _engine(
        self,
        messages: tuple[Message, ...],
        wait: int,
        break_on_error: bool,
        **kwargs: Any,
    ) -> Iterable[tuple[Message, Any]]:
        """Poll the result store until every message's result has been
        yielded, or a stop signal is received.

        Args:
            messages (tuple[Message, ...]): messages to fetch results for
            wait (int): milliseconds to pause between polling passes
            break_on_error (bool): if ``True``, end the current pass at the
                first unavailable result (this preserves input order)
            **kwargs (Keyword Args): forwarded to ``result_store.get()``

        """
        self._validate(messages)

        outstanding = list(range(len(messages)))
        while outstanding and not self.stop_signal.received:
            to_drop = set()
            for i in outstanding:
                try:
                    yield messages[i], self.result_store.get(messages[i], **kwargs)
                    to_drop.add(i)
                except (KeyError, TimeoutError):
                    # Result not available (yet).
                    if break_on_error:
                        break

            outstanding = _idx_drop(outstanding, indexes=to_drop)
            # Only pause when there is still work outstanding.
            self.stop_signal.wait(wait if outstanding else 0)

    def istream(
        self,
        *messages: Message,
        wait: int = 5 * 1000,
        descendants: bool = False,
        **kwargs: Any,
    ) -> Iterable[tuple[Message, Any]]:
        """Stream the results of one or more messages. Results are yielded
        in the order in which they become available. (This may differ from
        the order in which messages are provided.)

        Args:
            *messages (Message): one or more messages to iterate over
            wait (int): time to wait (in milliseconds) between checks for
                available results
            descendants (bool): if ``True``, wait for and return
                the results of all descendant (callback) messages.
            **kwargs (Keyword Args): keyword arguments to pass to
                ``result_store.get()``.

        Returns:
            results (iterable): an iterable of results of the form
                ``(Message, result)``.

        Warning:
            * By default, ``result_store`` does not keep messages once
              they have been collected. As a result, providing messages
              for which the corresponding results have already been collected
              (and deleted) will cause this method to loop indefinitely.
              In order to loop over messages multiple times set ``keep=True``.

        """
        yield from self._engine(
            messages,
            wait=wait,
            break_on_error=False,
            descendants=descendants,
            **kwargs,
        )

    def stream(
        self,
        *messages: Message,
        wait: int = 5 * 1000,
        descendants: bool = False,
        **kwargs: Any,
    ) -> Iterable[tuple[Message, Any]]:
        """Stream the results of one or more messages. The order of the
        results are guaranteed to match the order of ``messages``.

        Args:
            *messages (Message): one or more messages to iterate over
            wait (int): time to wait (in milliseconds) between checks for
                available results
            descendants (bool): if ``True``, wait for and return
                the results of all descendant (callback) messages.
            **kwargs (Keyword Args): keyword arguments to pass to
                ``result_store.get()``.

        Returns:
            results (iterable): an iterable of results of the form
                ``(Message, result)``.

        Warning:
            * By default, ``result_store`` does not keep messages once
              they have been collected. As a result, providing messages
              for which the corresponding results have already been collected
              (and deleted) will cause this method to loop indefinitely.
              In order to loop over messages multiple times set ``keep=True``.

        """
        yield from self._engine(
            messages,
            wait=wait,
            break_on_error=True,
            descendants=descendants,
            **kwargs,
        )
istream(*messages, wait=5 * 1000, descendants=False, **kwargs)

Stream the results of one or more messages. Results are yielded in the order in which they become available. (This may differ from the order in which messages are provided.)

Parameters:

Name Type Description Default
*messages Message

one or more messages to iterate over

()
wait int

time to wait (in milliseconds) between checks for available results

5 * 1000
descendants bool

if True, wait for and return the results of all descendant (callback) messages.

False
**kwargs Keyword Args

keyword arguments to pass to result_store.get().

{}

Returns:

Name Type Description
results iterable

an iterable of results of the form (Message, result).

Warning
  • By default, result_store does not keep messages once they have been collected. As a result, providing messages for which the corresponding results have already been collected (and deleted) will cause this method to loop indefinitely. In order to loop over messages multiple times set keep=True.
Source code in alsek/tools/iteration.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def istream(
    self,
    *messages: Message,
    wait: int = 5 * 1000,
    descendants: bool = False,
    **kwargs: Any,
) -> Iterable[tuple[Message, Any]]:
    """Stream the results of one or more messages. Results are yielded
    in the order in which they become available. (This may differ from
    the order in which messages are provided.)

    Args:
        *messages (Message): one or more messages to iterate over
        wait (int): time to wait (in milliseconds) between checks for
            available results
        descendants (bool): if ``True``, wait for and return
            the results of all descendant (callback) messages.
        **kwargs (Keyword Args): keyword arguments to pass to
            ``result_store.get()``.

    Returns:
        results (iterable): an iterable of results of the form
            ``(Message, result)``.

    Warning:
        * By default, ``result_store`` does not keep messages once
          they have been collected. As a result, providing messages
          for which the corresponding results have already been collected
          (and deleted) will cause this method to loop indefinitely.
          In order to loop over messages multiple times set ``keep=True``.

    """
    yield from self._engine(
        messages,
        wait=wait,
        break_on_error=False,
        descendants=descendants,
        **kwargs,
    )
stream(*messages, wait=5 * 1000, descendants=False, **kwargs)

Stream the results of one or more messages. The order of the results are guaranteed to match the order of messages.

Parameters:

Name Type Description Default
*messages Message

one or more messages to iterate over

()
wait int

time to wait (in milliseconds) between checks for available results

5 * 1000
descendants bool

if True, wait for and return the results of all descendant (callback) messages.

False
**kwargs Keyword Args

keyword arguments to pass to result_store.get().

{}

Returns:

Name Type Description
results iterable

an iterable of results of the form (Message, result).

Warning
  • By default, result_store does not keep messages once they have been collected. As a result, providing messages for which the corresponding results have already been collected (and deleted) will cause this method to loop indefinitely. In order to loop over messages multiple times set keep=True.
Source code in alsek/tools/iteration.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def stream(
    self,
    *messages: Message,
    wait: int = 5 * 1000,
    descendants: bool = False,
    **kwargs: Any,
) -> Iterable[tuple[Message, Any]]:
    """Stream results for ``messages``, yielding them strictly in the
    order the messages were supplied.

    Args:
        *messages (Message): one or more messages to iterate over
        wait (int): time to wait (in milliseconds) between checks for
            available results
        descendants (bool): if ``True``, wait for and return
            the results of all descendant (callback) messages.
        **kwargs (Keyword Args): keyword arguments to pass to
            ``result_store.get()``.

    Returns:
        results (iterable): an iterable of ``(Message, result)`` pairs.

    Warning:
        * By default, ``result_store`` does not keep messages once
          they have been collected. As a result, providing messages
          for which the corresponding results have already been collected
          (and deleted) will cause this method to loop indefinitely.
          In order to loop over messages multiple times set ``keep=True``.

    """
    # ``break_on_error=True`` makes the engine end each polling pass at the
    # first unavailable result, which is what preserves input order.
    ordered = self._engine(
        messages,
        wait=wait,
        break_on_error=True,
        descendants=descendants,
        **kwargs,
    )
    for pair in ordered:
        yield pair

types

Types

Empty

Empty sentinel.

Source code in alsek/types.py
13
14
class Empty:
    """Sentinel class used to mark the absence of a value."""

utils

aggregation

Aggregation Utils

gather_init_params(obj, ignore=None)

Extract the parameters passed to an object's __init__().

Parameters:

Name Type Description Default
obj object
required
ignore tuple

parameters in __init__() to ignore

None

Returns:

Name Type Description
params dict

parameters from __init__().

Source code in alsek/utils/aggregation.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def gather_init_params(
    obj: Any,
    ignore: Optional[tuple[str, ...]] = None,
) -> dict[str, Any]:
    """Extract the parameters passed to an object's ``__init__()``.

    Assumes each ``__init__()`` parameter is mirrored as an attribute of
    the same name on ``obj``.

    Args:
        obj (object):
        ignore (tuple, optional): parameters in ``__init__()`` to ignore

    Returns:
        params (dict): parameters from ``__init__()``.

    """
    excluded = set(ignore) if ignore else set()
    signature = inspect.signature(obj.__init__)
    return {
        name: getattr(obj, name)
        for name in signature.parameters
        if name not in excluded
    }

checks

Check Utils

has_duplicates(itera)

Determine if itera contains duplicates.

Parameters:

Name Type Description Default
itera Collection

a sized iterable

required

Returns:

Type Description
bool

bool

Source code in alsek/utils/checks.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def has_duplicates(itera: Collection[Any]) -> bool:
    """Determine if ``itera`` contains duplicates.

    Args:
        itera (Collection): a sized iterable

    Returns:
        bool

    """
    # A ``Collection`` is sized, so any collapse in cardinality after
    # deduplication means at least one element appeared more than once.
    return len(set(itera)) < len(itera)

decorators

Decorators

helpers

Helpers

logging

Logging

get_logger()

Get the Alsek logger.

Returns:

Name Type Description
logger Logger

Alsek logger

Source code in alsek/utils/logging.py
19
20
21
22
23
24
25
26
def get_logger() -> logging.Logger:
    """Get the Alsek logger.

    Returns:
        logger (logging.Logger): Alsek logger

    """
    alsek_logger = logging.getLogger("alsek")
    return alsek_logger
magic_logger(before=lambda: None, after=lambda: None)

Logging decorator.

Parameters:

Name Type Description Default
before callable

function to log a message before function execution. This callable will be passed only those parameters which it shares with the decorated function.

lambda: None
after callable

function to log a message after function execution. This callable will be passed: * input_: a dictionary where the key is a parameter accepted by the wrapped function and the value is the value passed. If not present in the signature of after this data will not be provided. * output: the output of the function. If not present in the signature of after this data will not be provided.

lambda: None

Returns:

Name Type Description
wrapper callable

wrapped function

Examples:

>>> import logging
>>> from alsek.utils.logging import magic_logger
>>> log = logging.getLogger(__name__)
>>> @magic_logger(
>>>    before=lambda a: log.debug(a),
>>>    after=lambda input_, output: log.info(output)
>>> )
>>> def add99(a: int) -> int:
>>>    return a + 99
Source code in alsek/utils/logging.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def magic_logger(
    before: Callable[..., Any] = lambda: None,
    after: Callable[..., Any] = lambda: None,
) -> Callable[..., Any]:
    """Logging decorator.

    Args:
        before (callable): function to log a message before function
            execution. This callable will be passed only those parameters
            which it shares with the decorated function.
        after (callable): function to log a message after function
            execution. This callable will be passed:
                * ``input_``: a dictionary where the key is a parameter accepted
                  by the wrapped function and the value is the value passed. If not
                  present in the signature of ``after`` this data will not be provided.
                * ``output``: the output of the function. If not present in the signature
                  of ``after`` this data will not be provided.

    Returns:
        wrapper (callable): wrapped ``function``

    Examples:
        >>> import logging
        >>> from alsek.utils.logging import magic_logger

        >>> log = logging.getLogger(__name__)

        >>> @magic_logger(
        >>>    before=lambda a: log.debug(a),
        >>>    after=lambda input_, output: log.info(output)
        >>> )
        >>> def add99(a: int) -> int:
        >>>    return a + 99

    """

    def wrapper(function_raw: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(function_raw)
        def inner(*args_raw: Any, **kwargs_raw: Any) -> Any:
            # Normalize the call so that bound/unbound forms resolve to a
            # consistent (function, args, kwargs) triple.
            function, args, kwargs = _magic_parser(
                function_raw,
                args_raw=args_raw,
                kwargs_raw=kwargs_raw,
            )
            # Flatten positional and keyword arguments into one mapping so
            # ``before``/``after`` can be matched against it by name.
            full_kwargs = _merge_args_kwargs(function, args=args, kwargs=kwargs)
            _run_func(before, **full_kwargs)
            output = function(*args, **kwargs)
            _run_func(after, input_=full_kwargs, output=output)
            return output

        return inner

    return wrapper
setup_logging(level)

Setup Alsek-style logging.

Parameters:

Name Type Description Default
level int

logging level to use

required

Returns:

Type Description
None

None

Source code in alsek/utils/logging.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def setup_logging(level: int) -> None:
    """Setup Alsek-style logging.

    Args:
        level (int): logging level to use

    Returns:
        None

    """
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter(fmt=LOGGING_FORMAT, datefmt=LOGGING_DATEFMT)
    )
    alsek_logger = get_logger()
    alsek_logger.handlers = [stream_handler]
    alsek_logger.setLevel(level)
    alsek_logger.propagate = False  # keep records from bubbling to the root logger

namespacing

Namespacing

get_dlq_message_name(message)

Get the name for message in the backend's dead letter queue (DLQ).

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
dlq_name str

message-specific name in the DLQ

Source code in alsek/utils/namespacing.py
115
116
117
118
119
120
121
122
123
124
125
def get_dlq_message_name(message: Message) -> str:
    """Get the name for ``message`` in the backend's dead letter queue (DLQ).

    Args:
        message (Message): an Alsek message

    Returns:
        dlq_name (str): message-specific name in the DLQ

    """
    return ":".join((DLQ_NAMESPACE_KEY, get_message_name(message)))
get_message_name(message)

Get the name for message in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
name str

message-specific name

Source code in alsek/utils/namespacing.py
61
62
63
64
65
66
67
68
69
70
71
72
def get_message_name(message: Message) -> str:
    """Get the name for ``message`` in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        name (str): message-specific name

    """
    # The message's UUID is appended to its queue/task namespace.
    return f"{get_messages_namespace(message)}:{message.uuid}"
get_message_signature(message)

Get the signature for message in the backend.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
signature str

message-specific signature.

Source code in alsek/utils/namespacing.py
75
76
77
78
79
80
81
82
83
84
85
def get_message_signature(message: Message) -> str:
    """Get the signature for ``message`` in the backend.

    Args:
        message (Message): an Alsek message

    Returns:
        signature (str): message-specific signature.

    """
    # The retry count makes the signature unique per delivery attempt.
    base_name = get_message_name(message)
    return f"{base_name}:retry:{message.retries}"
get_messages_namespace(message)

Get the namespace for a message.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
namespace str

the namespace for the message

Source code in alsek/utils/namespacing.py
47
48
49
50
51
52
53
54
55
56
57
58
def get_messages_namespace(message: Message) -> str:
    """Get the namespace for a message.

    Args:
        message (Message): an Alsek message

    Returns:
        namespace (str): the namespace for the message

    """
    return ":".join(
        (get_subnamespace(message.queue, message.task_name), MESSAGES_NAMESPACE_KEY)
    )
get_priority_namespace(subnamespace)

Get the namespace for a message's priority information.

Parameters:

Name Type Description Default
subnamespace str

the namespace for the message

required

Returns:

Name Type Description
priority_namespace str

the namespace for priority information

Source code in alsek/utils/namespacing.py
88
89
90
91
92
93
94
95
96
97
98
def get_priority_namespace(subnamespace: str) -> str:
    """Get the namespace for a message's priority information.

    Args:
        subnamespace (str): the namespace for the message

    Returns:
        priority_namespace (str): the namespace for priority information

    """
    return ":".join((PRIORITY_NAMESPACE_KEY, subnamespace))
get_priority_namespace_from_message(message)

Get the namespace for message's priority information.

Parameters:

Name Type Description Default
message Message

an Alsek message

required

Returns:

Name Type Description
namespace str

the fully-qualified priority queue name

Source code in alsek/utils/namespacing.py
101
102
103
104
105
106
107
108
109
110
111
112
def get_priority_namespace_from_message(message: Message) -> str:
    """Get the namespace for message's priority information.

    Args:
        message (Message): an Alsek message

    Returns:
        namespace (str): the fully-qualified priority queue name

    """
    return get_priority_namespace(
        get_subnamespace(message.queue, message.task_name)
    )
get_subnamespace(queue=None, task_name=None)

Get the subnamespace for a given queue and (optionally) task_name.

Parameters:

Name Type Description Default
queue str

the name of the queue

None
task_name str

name of the task

None

Returns:

Name Type Description
subnamespace str

queue-specific namespace

Raises:

Type Description
ValueError

if task_name is provided and queue is not.

Source code in alsek/utils/namespacing.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def get_subnamespace(
    queue: Optional[str] = None,
    task_name: Optional[str] = None,
) -> str:
    """Get the subnamespace for a given ``queue``
    and (optionally) ``task_name``.

    Args:
        queue (str, optional): the name of the queue
        task_name (str, optional): name of the task

    Returns:
        subnamespace (str): queue-specific namespace

    Raises:
        ValueError: if ``task_name`` is provided and ``queue`` is not.

    """
    # A task-level namespace only makes sense inside a queue-level one.
    if queue is None and task_name is not None:
        raise ValueError("`queue` must be provided if `task_name` is not None")

    if queue and task_name:
        return f"{QUEUES_NAMESPACE_KEY}:{queue}:{TASK_NAMESPACE_KEY}:{task_name}"
    elif queue:
        return f"{QUEUES_NAMESPACE_KEY}:{queue}"
    else:
        return f"{QUEUES_NAMESPACE_KEY}"

parsing

Parsing

ExceptionDetails

Bases: NamedTuple

Source code in alsek/utils/parsing.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class ExceptionDetails(NamedTuple):
    """Parsed exception information: class name, message text, and traceback."""

    name: str
    text: Optional[str] = None
    traceback: Optional[str] = None

    def as_dict(self) -> dict[str, str]:
        """Convert the NamedTuple to a dictionary

        Returns:
            dict

        """
        return dict(self._asdict())

    def as_exception(self, strict: bool = True) -> BaseException:
        """Return parsed exception information as a Python exception.

        Args:
            strict (bool): if ``True`` do not coerce failures to
                import the correct error

        Returns:
            BaseException

        Warnings:
            This will not include the original traceback.

        """
        try:
            exception_class = _get_exception_class(self.name)
            return exception_class(self.text)
        except (ImportError, AttributeError, TypeError):
            if strict:
                raise
            # Fall back to a generic Exception carrying the original name.
            return Exception(f"{self.name}: {self.text}")
as_dict()

Convert the NamedTuple to a dictionary

Returns:

Type Description
dict[str, str]

dict

Source code in alsek/utils/parsing.py
44
45
46
47
48
49
50
51
def as_dict(self) -> dict[str, str]:
    """Convert the NamedTuple to a dictionary

    Returns:
        dict

    """
    return dict(self._asdict())
as_exception(strict=True)

Return parsed exception information as a Python exception.

Parameters:

Name Type Description Default
strict bool

if True do not coerce failures to import the correct error

True

Returns:

Type Description
BaseException

BaseException

Source code in alsek/utils/parsing.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def as_exception(self, strict: bool = True) -> BaseException:
    """Return parsed exception information as a Python exception.

    Args:
        strict (bool): if ``True`` do not coerce failures to
            import the correct error

    Returns:
        BaseException

    Warnings:
        This will not include the original traceback.

    """
    try:
        exception_class = _get_exception_class(self.name)
        return exception_class(self.text)
    except (ImportError, AttributeError, TypeError):
        if strict:
            raise
        # Fall back to a generic Exception carrying the original name.
        return Exception(f"{self.name}: {self.text}")
get_exception_name(exception)

Get the name of an exception as a string.

Parameters:

Name Type Description Default
exception (BaseException, Type[BaseException])

Exception class

required

Returns:

Name Type Description
name str

the exception name

Source code in alsek/utils/parsing.py
24
25
26
27
28
29
30
31
32
33
34
35
36
def get_exception_name(exception: Union[BaseException, Type[BaseException]]) -> str:
    """Get the name of an exception as a string.

    Args:
        exception (BaseException, Type[BaseException]): Exception class

    Returns:
        name (str): the exception name

    """
    # Accept either an instance or the class itself.
    cls = exception if isinstance(exception, type) else type(exception)
    if cls.__module__ == "builtins":
        return cls.__qualname__
    return f"{cls.__module__}.{cls.__qualname__}"
parse_exception(error)

Extracts the exception type, exception message, and exception traceback from an error.

Parameters:

Name Type Description Default
error BaseException

The exception to extract details from.

required

Returns:

Name Type Description
details ExceptionDetails

A named tuple containing the exception information

Source code in alsek/utils/parsing.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def parse_exception(error: BaseException) -> ExceptionDetails:
    """Extracts the exception type, exception message, and exception
    traceback from an error.

    Args:
        error (BaseException): The exception to extract details from.

    Returns:
        details (ExceptionDetails): A named tuple containing the exception information

    """
    tb_lines = traceback.format_exception(
        type(error),
        value=error,
        tb=error.__traceback__,
    )
    return ExceptionDetails(
        name=get_exception_name(error),
        text=str(error),
        traceback="".join(tb_lines),
    )

printing

Printing Utils

auto_repr(obj, new_line_threshold=5, **params)

Autogenerate a class repr string.

Parameters:

Name Type Description Default
obj object

an object to generate a repr for

required
new_line_threshold int

number of params required to split the parameters over multiple lines.

5
**params Keyword Args

parameters to include in the repr string

{}

Returns:

Name Type Description
repr str

repr string

Source code in alsek/utils/printing.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def auto_repr(obj: object, new_line_threshold: Optional[int] = 5, **params: Any) -> str:
    """Autogenerate a class repr string.

    Args:
        obj (object): an object to generate a repr for
        new_line_threshold (int, optional): number of ``params``
            required to split the parameters over multiple lines.
        **params (Keyword Args): parameters to include in the
            repr string

    Returns:
        repr (str): repr string

    """
    # Spill onto multiple lines only when the parameter count exceeds
    # the threshold (and a threshold was given at all).
    multiline = new_line_threshold is not None and len(params) > new_line_threshold
    if multiline:
        start, join_on, end = "\n    ", ",\n    ", "\n"
    else:
        start, join_on, end = "", ", ", ""
    body = _format_params(params, join_on=join_on)
    return f"{obj.__class__.__name__}({start}{body}{end})"

scanning

Helpers

collect_tasks(module)

Recursively collect all tasks in module.

Parameters:

Name Type Description Default
module (str, ModuleType)

name of a module

required

Returns:

Name Type Description
tasks tuple[Task, ...]

collected tasks

Raises:

Type Description
NoTasksFoundError

if no tasks can be found

Source code in alsek/utils/scanning.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@magic_logger(
    before=lambda module: log.debug("Scanning %r for tasks...", module),
    after=lambda output: log.debug(
        "Found %s task%s.",
        len(output),
        "s" if len(output) > 1 else "",
    ),
)
def collect_tasks(module: str | ModuleType) -> tuple[Task, ...]:
    """Recursively collect all tasks in ``module``.

    Args:
        module (str, ModuleType): name of a module

    Returns:
        tasks (tuple[Task, ...]): collected tasks

    Raises:
        NoTasksFoundError: if no tasks can be found

    """
    # Make modules in the current working directory importable by name.
    sys.path.append(os.getcwd())
    if isinstance(module, str):
        module = import_module(module)
    elif not isinstance(module, ModuleType):
        raise TypeError(f"Unsupported input type, got {type(module)}")

    all_tasks: dict[str, Task] = dict()
    for m in _enumerate_modules(module):
        for name, task in getmembers(m, predicate=_is_task):
            if name in all_tasks:
                # The same task may legitimately be re-exported; only two
                # *distinct* tasks sharing one name is an error.
                if task != all_tasks[name]:
                    raise TaskNameCollisionError(f"Multiple tasks '{name}'")
            else:
                all_tasks[name] = task

    if all_tasks:
        return tuple(v for _, v in all_tasks.items())
    else:
        raise NoTasksFoundError("No tasks found")

sorting

Sorting

dict_sort(dictionary, key=None)

Sort a dictionary by key.

Parameters:

Name Type Description Default
dictionary dict[Any, Any]
required
key callable

a callable which consumes a key and returns an object which supports the less than comparison operator.

None

Returns:

Name Type Description
sorted_dictionary dict

dictionary sorted

Source code in alsek/utils/sorting.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def dict_sort(
    dictionary: dict[Any, Any],
    key: Optional[Callable[[Any], Any]] = None,
) -> dict[Any, Any]:
    """Sort a dictionary by key.

    Args:
        dictionary: the dictionary to sort
        key (callable): a callable which consumes a key
            and returns an object which supports the
            less than comparison operator.

    Returns:
        sorted_dictionary (dict): ``dictionary`` sorted

    """
    # Default to the identity function when no key transform is given.
    key_func: Callable[[Any], Any] = key if key is not None else (lambda k: k)
    ordered_items = sorted(dictionary.items(), key=lambda item: key_func(item[0]))
    return dict(ordered_items)

string

String Utils

smart_join(items, limit=None, delimiter=', ')

Joins a list of strings with a delimiter, limiting the number of items to display and optionally appending a continuation indicator or providing a grammatically correct conjunction for the last two items.

Parameters:

Name Type Description Default
items list[str]

A list of strings to be joined.

required
limit Optional[int]

The maximum number of items to include in the joined string. If None, join all items without limiting.

None
delimiter str

The string used to separate the items in the joined output.

', '

Returns:

Name Type Description
str str

A string containing the joined items, formatted according to the

str

specified delimiter and limits.

Raises:

Type Description
ValueError

If the items list is empty.

Source code in alsek/utils/string.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def smart_join(
    items: list[str],
    limit: Optional[int] = None,
    delimiter: str = ", ",
) -> str:
    """Joins a list of strings with a delimiter, limiting the number of items to display
    and optionally appending a continuation indicator or providing a grammatically
    correct conjunction for the last two items.

    Args:
        items (list[str]): A list of strings to be joined.
        limit (Optional[int]): The maximum number of items to include in the joined
            string. If None, join all items without limiting.
        delimiter (str): The string used to separate the items in the joined output.

    Returns:
        str: A string containing the joined items, formatted according to the
        specified delimiter and limits.

    Raises:
        ValueError: If the `items` list is empty.

    """
    if not items:
        raise ValueError("No items to join")
    if len(items) == 1:
        return items[0]
    if limit is not None and len(items) > limit:
        # Too many items: truncate and signal continuation.
        return delimiter.join(items[:limit]) + "..."
    # Join everything, with a conjunction before the final item.
    return f"{delimiter.join(items[:-1])} and {items[-1]}"

system

System Utils

StopSignalListener

Tool for listening for stop signals.

Parameters:

Name Type Description Default
stop_signals tuple[int, ...]

one or more stop signals to listen for.

DEFAULT_STOP_SIGNALS
exit_override bool

trigger an immediate and non-graceful shutdown of the current process if two or more SIGTERM or SIGINT signals are received.

True
Source code in alsek/utils/system.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class StopSignalListener:
    """Tool for listening for stop signals.

    Args:
        stop_signals (tuple[int, ...], optional): one or more stop
            signals to listen for.
        exit_override (bool): trigger an immediate and non-graceful shutdown
            of the current process if two or more SIGTERM or SIGINT signals
            are received.

    """

    def __init__(
        self,
        stop_signals: tuple[int, ...] = DEFAULT_STOP_SIGNALS,
        exit_override: bool = True,
    ) -> None:
        self.stop_signals = stop_signals
        self.exit_override = exit_override

        self.exit_event = Event()
        # Register a single handler for every configured stop signal.
        for stop_signal in self.stop_signals:
            signal.signal(stop_signal, self._signal_handler)

    def _signal_handler(self, signum: int, *args: Any) -> None:  # noqa
        log.debug("Received stop signal %s...", Signals(signum).name)
        if self.exit_override and self.received:
            # Second stop signal: abandon graceful shutdown immediately.
            sys.exit(1)
        self.exit_event.set()

    def wait(self, timeout: Optional[int]) -> None:
        """Wait for a stop signal to be received.

        Args:
            timeout (int, optional): amount of time
                (in milliseconds) to wait; ``None`` blocks indefinitely.

        Returns:
            None

        """
        if timeout is None:
            self.exit_event.wait(None)
        else:
            # Event.wait() takes seconds; convert from milliseconds.
            self.exit_event.wait(timeout / 1000)

    @property
    def received(self) -> bool:
        """Whether a stop signal has been received."""
        return self.exit_event.is_set()
received property

Whether a stop signal has been received.

wait(timeout)

Wait for a stop signal to be received.

Parameters:

Name Type Description Default
timeout int

amount of time (in milliseconds) to wait

required

Returns:

Type Description
None

None

Source code in alsek/utils/system.py
86
87
88
89
90
91
92
93
94
95
96
97
def wait(self, timeout: Optional[int]) -> None:
    """Wait for a stop signal to be received.

    Args:
        timeout (int, optional): amount of time
            (in milliseconds) to wait; ``None`` blocks indefinitely.

    Returns:
        None

    """
    # Event.wait() expects seconds (or None for no timeout).
    seconds = None if timeout is None else timeout / 1000
    self.exit_event.wait(seconds)
smart_cpu_count()

Count the number of CPUs, with one reserved for the main process.

Returns:

Name Type Description
count int

number of cpus

Source code in alsek/utils/system.py
105
106
107
108
109
110
111
112
113
def smart_cpu_count() -> int:
    """Count the number of CPUs, with one reserved
    for the main process.

    Returns:
        count (int): number of cpus

    """
    # Never report fewer than one worker CPU, even on single-core hosts.
    available = cpu_count()
    return available - 1 if available > 1 else 1
thread_raise(ident, exception)

Raise an exception in a thread asynchronously.

Parameters:

Name Type Description Default
ident int

ident of the thread

required
exception Type[BaseException]

type of exception to raise in the thread

required
References
  • https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc
Warning
  • Intended for use with CPython only
Source code in alsek/utils/system.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def thread_raise(ident: int, exception: Type[BaseException]) -> None:
    """Raise an exception in a thread asynchronously.

    Args:
        ident (int): ident of the thread
        exception (Type[BaseException]): type of exception to raise in the thread

    References:
        * https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc

    Warning:
        * Intended for use with CPython only

    """
    n = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        _cast_ident_to_ctype(ident),
        ctypes.py_object(exception),
    )
    if n > 1:
        # Per the CPython C-API docs, a return value greater than one means
        # the pending exception was set in more than one thread state; revert
        # it by calling again with a NULL exception so no unintended thread
        # is interrupted.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            _cast_ident_to_ctype(ident),
            None,
        )
    if n != 1:
        # n == 0 means no thread with ``ident`` was found.
        log.warning(f"Raising {exception} in thread {ident} modified {n} threads")

temporal

Temporal Utils

from_timestamp_ms(timestamp)

Construct datetime object from UTC timestamp in milliseconds.

Parameters:

Name Type Description Default
timestamp int

UTC time in milliseconds

required

Returns:

Type Description
datetime

datetime

Source code in alsek/utils/temporal.py
21
22
23
24
25
26
27
28
29
30
31
def from_timestamp_ms(timestamp: int) -> datetime:
    """Construct datetime object from UTC timestamp in milliseconds.

    Args:
        timestamp (int): UTC time in milliseconds

    Returns:
        datetime

    """
    # NOTE(review): datetime.fromtimestamp() interprets the epoch value in the
    # host's *local* timezone and returns a naive datetime, so despite the
    # docstring this only yields UTC wall time when the host clock is UTC.
    # Its companion utcnow_timestamp_ms() has a complementary local-time
    # offset, so the round trip appears to cancel out — confirm before
    # changing either function in isolation.
    return datetime.fromtimestamp(timestamp / 1000)
time_ms()

Get the current time since the Epoch in milliseconds.

Returns:

Name Type Description
time int

current time in milliseconds

Source code in alsek/utils/temporal.py
34
35
36
37
38
39
40
41
def time_ms() -> int:
    """Get the current time since the Epoch in milliseconds.

    Returns:
        time (int): current time in milliseconds

    """
    # time.time() yields fractional seconds; scale and truncate to whole ms.
    now_seconds = time.time()
    return int(now_seconds * 1000)
utcnow_timestamp_ms()

UTC timestamp in milliseconds.

Returns:

Name Type Description
timestamp int

UTC time in milliseconds

Source code in alsek/utils/temporal.py
11
12
13
14
15
16
17
18
def utcnow_timestamp_ms() -> int:
    """UTC timestamp in milliseconds.

    Returns:
        timestamp (int): UTC time in milliseconds

    """
    # NOTE(review): .timestamp() on the *naive* datetime returned by utcnow()
    # is interpreted in local time, so on non-UTC hosts the result is offset
    # from the true Unix epoch by the local UTC offset. datetime.utcnow() is
    # also deprecated as of Python 3.12. However, from_timestamp_ms() applies
    # the complementary local-time conversion, so the pair currently round-
    # trips — confirm callers before fixing either function alone.
    return int(datetime.utcnow().timestamp() * 1000)

waiting

Waiting

waiter(condition, sleep_interval=1 * 1000, timeout=None, timeout_msg=None)

Wait for condition.

Parameters:

Name Type Description Default
condition callable

condition to wait for

required
sleep_interval int

time (in milliseconds) to sleep between checks of condition.

1 * 1000
timeout int

maximum amount of time (in milliseconds) this function can wait for condition to evaluate to True.

None
timeout_msg str

message to display in the event of a timeout

None

Returns:

Type Description
bool

bool

Source code in alsek/utils/waiting.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def waiter(
    condition: Callable[[], bool],
    sleep_interval: int = 1 * 1000,
    timeout: Optional[int] = None,
    timeout_msg: Optional[str] = None,
) -> bool:
    """Wait for ``condition``.

    Args:
        condition (callable): condition to wait for
        sleep_interval (int): time (in milliseconds) to sleep
            between checks of ``condition``.
        timeout (int, optional): maximum amount of time (in milliseconds)
            this function can wait for ``condition`` to evaluate
            to ``True``.
        timeout_msg (str, optional): message to display in the
            event of a timeout

    Returns:
        bool

    """
    started_at = time_ms()
    listener = StopSignalListener()
    # Poll until the condition holds, a stop signal arrives, or we time out.
    while not listener.received:
        if condition():
            return True
        if timeout is not None and (time_ms() - started_at) > timeout:
            raise TimeoutError(timeout_msg or "")
        # Never sleep past the timeout horizon.
        listener.wait(min(sleep_interval, timeout) if timeout else sleep_interval)
    return False