Tips & Tricks ⛷️

Multiprocessing Backend

It is possible to configure the multiprocessing library used by a worker pool. By default, multiprocessing from the Python standard library will be used. However, this can be changed to the PyTorch implementation of multiprocessing by setting ALSEK_MULTIPROCESSING_BACKEND to 'torch' prior to starting the worker pool.

export ALSEK_MULTIPROCESSING_BACKEND=torch

alsek thread-pool my_project
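
The variable can also be set from Python, provided this happens before Alsek reads it; setting it at the very top of the entry point, before the worker pool is created, is safest. A minimal sketch:

import os

# Must be set before the worker pool is started.
os.environ["ALSEK_MULTIPROCESSING_BACKEND"] = "torch"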

Capturing Status Updates

In some applications it may be desirable to persist status updates to multiple locations, such as both a Redis database and a PostgreSQL database. A short example of how to do this is provided below.

from typing import Optional, Any

from alsek.core.message import Message
from alsek.core.status.sync import StatusTracker
from alsek.core.status.types import TaskStatus

from sqlalchemy import Column, String, create_engine
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

engine = create_engine("...")
Session = sessionmaker(bind=engine)
session = Session()


class Status(Base):
    __tablename__ = "status"

    id = Column(UUID, primary_key=True)
    status = Column(String)

    def __repr__(self) -> str:
        return f"<Status(id='{self.id}', status='{self.status}')>"


# Ensure the "status" table exists before the tracker writes to it.
Base.metadata.create_all(engine)


class CustomStatusTracker(StatusTracker):
    def set(self, message: Message, status: TaskStatus, detail: Optional[Any] = None) -> None:
        # Record the status update in Redis as usual.
        super().set(message, status=status, detail=detail)

        # Mirror the update to PostgreSQL: insert a row on first
        # submission, and update the existing row thereafter.
        if status == TaskStatus.SUBMITTED:
            session.add(Status(id=message.uuid, status=status.name))
        else:
            record = session.get(Status, message.uuid)
            record.status = status.name
        session.commit()

This new CustomStatusTracker() class is a drop-in replacement for StatusTracker().
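
For instance, wherever the application previously constructed a StatusTracker, only the class name needs to change. A minimal sketch; the ... stands in for whatever constructor arguments the application already passes:

# Before: status_tracker = StatusTracker(...)
# After (same arguments; only the class changes):
status_tracker = CustomStatusTracker(...)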

Testing

Testing an application may require a worker pool to be brought online. A small example of how to do this with pytest and multiprocessing is provided below.

First, create a conftest.py file with a background_worker_pool fixture.

import pytest
from multiprocessing import Process
from typing import Iterator

from alsek.core.worker.sync.thread import ThreadWorkerPool

from my_application.tasks import task_1, task_2


def _run_pool() -> None:
    ThreadWorkerPool([task_1, task_2], backoff=None).run()


@pytest.fixture()
def background_worker_pool() -> Iterator[None]:
    # Run the pool in a daemonized process so it cannot outlive the test run.
    process = Process(target=_run_pool, daemon=True)
    process.start()
    yield
    process.terminate()
    process.join()

Now, a worker pool can be brought online simply by including background_worker_pool in the signature of a test.

from my_application.tasks import task_1


def test_processing(background_worker_pool: None) -> None:
    message = task_1.generate()
    ...
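
Note that the pool runs in a separate process and may take a moment to come online; a test that asserts immediately after generating a message can race the worker. A minimal sketch of a guard (the delay is arbitrary and purely illustrative):

import time

# Inside the test body, after generating the message:
time.sleep(0.5)  # allow the pool to start and process the message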

Async Worker Pool Configuration

When using async worker pools, consider these configuration tips for optimal performance:

Concurrency Tuning

# For I/O-bound tasks, you can safely set high concurrency
alsek async-pool my_project --n_tasks 500

# For mixed workloads, start conservatively 
alsek async-pool my_project --n_tasks 100

Memory Management

Async worker pools can accumulate many concurrent tasks. Tune the pruning frequency to trade memory footprint against scheduling overhead:

# More frequent pruning for memory-sensitive environments
alsek async-pool my_project --prune_interval_seconds 0.05

# Less frequent pruning for lower overhead
alsek async-pool my_project --prune_interval_seconds 0.5

Testing Async Tasks

Testing async tasks requires special consideration for the event loop. Here's an example using pytest-asyncio:

import pytest
import asyncio
import contextlib

from alsek.core.broker.asyncio import AsyncBroker
from alsek.core.task.asyncio import AsyncTask
from alsek.core.worker.asyncio.asyncio import AsyncSingleProcessWorkerPool
from alsek.storage.backends.redis.asyncio import AsyncRedisBackend


@pytest.fixture  # assumes pytest-asyncio's "auto" mode; otherwise use @pytest_asyncio.fixture
async def async_worker_pool():
    backend = AsyncRedisBackend()
    broker = AsyncBroker(backend)

    async def test_task(data: str) -> str:
        await asyncio.sleep(0.01)
        return f"processed: {data}"

    task = AsyncTask(test_task, broker=broker)

    pool = AsyncSingleProcessWorkerPool(
        tasks=[task],
        n_tasks=10,
        prune_interval_seconds=0.01,
    )

    # Run the pool in a background task for the duration of the test.
    pool_task = asyncio.create_task(pool.run())
    yield pool, task

    # Cleanup: shut the pool down, then cancel and await its background task.
    await pool.on_shutdown()
    pool_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await pool_task


@pytest.mark.asyncio
async def test_async_processing(
    async_worker_pool: tuple[AsyncSingleProcessWorkerPool, AsyncTask],
) -> None:
    pool, task = async_worker_pool
    message = await task.generate(args=["test_data"])

    # Wait for processing
    await asyncio.sleep(0.1)

    # Verify result
    result = await task.broker.result_store.get(message)
    assert result == "processed: test_data"