Tips & Tricks ⛷️
Multiprocessing Backend
It is possible to configure the multiprocessing library used by a worker pool. By default, multiprocessing from the Python standard library will be used. However, this can be changed to the PyTorch implementation of multiprocessing by setting ALSEK_MULTIPROCESSING_BACKEND to 'torch' prior to starting the worker pool.
export ALSEK_MULTIPROCESSING_BACKEND=torch
alsek thread-pool my_project
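The same selection can also be made from within Python, provided the variable is set before the worker pool is started (per the note above). A minimal sketch:

import os

# Select the PyTorch multiprocessing implementation. Per the note above,
# this must happen before the worker pool is started.
os.environ["ALSEK_MULTIPROCESSING_BACKEND"] = "torch"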
Capturing Status Updates
In some applications, it may be desirable to persist status updates to multiple locations; for example, a Redis database and a PostgreSQL database. A short example of how to do this is provided below.
from typing import Any, Optional

from alsek.core.message import Message
from alsek.core.status.sync import StatusTracker
from alsek.core.status.types import TaskStatus
from sqlalchemy import Column, String, create_engine
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()
engine = create_engine("...")
Session = sessionmaker(bind=engine)
session = Session()


class Status(Base):
    __tablename__ = "status"

    id = Column(UUID, primary_key=True)
    status = Column(String)

    def __repr__(self) -> str:
        return f"<Status(id='{self.id}', status='{self.status}')>"


class CustomStatusTracker(StatusTracker):
    def set(self, message: Message, status: TaskStatus, detail: Optional[Any] = None) -> None:
        # Record the update with the standard tracker (e.g., Redis).
        super().set(message, status=status, detail=detail)
        # Mirror the update to PostgreSQL.
        if status == TaskStatus.SUBMITTED:
            session.add(Status(id=message.uuid, status=status.name))
        else:
            record = session.get(Status, message.uuid)
            record.status = status.name
        session.commit()
This new CustomStatusTracker() class is a drop-in replacement for StatusTracker().
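Using it is then just a matter of constructing CustomStatusTracker anywhere a StatusTracker would otherwise be constructed. A minimal sketch (the constructor arguments are elided here; pass whatever your version of StatusTracker() accepts):

# Construct the subclass exactly as StatusTracker() would be constructed
# (constructor arguments elided; they are unchanged by the subclass).
status_tracker = CustomStatusTracker(...)

# Updates now fan out to both Redis and PostgreSQL.
status_tracker.set(message, status=TaskStatus.RUNNING)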
Testing
Testing an application may require a worker pool to be brought online. A small example of how to do this with pytest and multiprocessing is provided below.
First, create a conftest.py file with a background_worker_pool fixture.
from multiprocessing import Process
from typing import Iterator

import pytest

from alsek.core.worker.sync.thread import ThreadWorkerPool
from my_application.tasks import task_1, task_2


def _run_pool() -> None:
    ThreadWorkerPool([task_1, task_2], backoff=None).run()


@pytest.fixture()
def background_worker_pool() -> Iterator[None]:
    # Run the worker pool in a background process for the duration of the test.
    process = Process(target=_run_pool, daemon=True)
    process.start()
    yield
    process.terminate()
Now, a worker pool can be brought online simply by including background_worker_pool in the signature of a test.
from my_application.tasks import task_1


def test_processing(background_worker_pool: None) -> None:
    message = task_1.generate()
    ...
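If task_1 was defined with a result store, the elided portion of the test can poll for the task's output rather than sleeping for a fixed interval. A sketch, assuming ResultStore lives at alsek.storage.result and that its get() raises KeyError while no result exists yet (assumptions; verify both against your version):

import time

from alsek.storage.result import ResultStore


def wait_for_result(result_store: ResultStore, message, attempts: int = 50, delay: float = 0.1):
    # Poll until the worker pool commits a result, or give up.
    for _ in range(attempts):
        try:
            return result_store.get(message)
        except KeyError:
            time.sleep(delay)
    raise TimeoutError(f"No result for message '{message.uuid}'")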
Async Worker Pool Configuration
When using async worker pools, consider these configuration tips for optimal performance:
Concurrency Tuning
# For I/O-bound tasks, you can safely set high concurrency
alsek async-pool my_project --n_tasks 500
# For mixed workloads, start conservatively
alsek async-pool my_project --n_tasks 100
Memory Management
Async worker pools can accumulate many concurrent tasks. Adjust pruning frequency for memory efficiency:
# More frequent pruning for memory-sensitive environments
alsek async-pool my_project --prune_interval_seconds 0.05
# Less frequent pruning for lower overhead
alsek async-pool my_project --prune_interval_seconds 0.5
Testing Async Tasks
Testing async tasks requires special consideration for the event loop. Here's an example using pytest-asyncio:
import asyncio

import pytest

from alsek.core.broker.asyncio import AsyncBroker
from alsek.core.task.asyncio import AsyncTask
from alsek.core.worker.asyncio.asyncio import AsyncSingleProcessWorkerPool
from alsek.storage.backends.redis.asyncio import AsyncRedisBackend


@pytest.fixture
async def async_worker_pool():
    backend = AsyncRedisBackend()
    broker = AsyncBroker(backend)

    async def test_task(data: str) -> str:
        await asyncio.sleep(0.01)
        return f"processed: {data}"

    task = AsyncTask(test_task, broker=broker)
    pool = AsyncSingleProcessWorkerPool(
        tasks=[task],
        n_tasks=10,
        prune_interval_seconds=0.01,
    )
    # Start the pool in a background task.
    pool_task = asyncio.create_task(pool.run())
    yield pool, task
    # Cleanup: shut the pool down and cancel the background task.
    await pool.on_shutdown()
    pool_task.cancel()


@pytest.mark.asyncio
async def test_async_processing(
    async_worker_pool: tuple[AsyncSingleProcessWorkerPool, AsyncTask],
) -> None:
    pool, task = async_worker_pool
    message = await task.generate(args=["test_data"])
    # Wait for processing.
    await asyncio.sleep(0.1)
    # Verify the result.
    result = await task.broker.result_store.get(message)
    assert result == "processed: test_data"
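The fixed asyncio.sleep(0.1) above can make the test flaky on slow machines. One alternative is to poll the result store until the result arrives, again assuming a KeyError is raised while no result exists yet (an assumption; substitute the exception your version raises):

async def wait_for_result(task: AsyncTask, message, attempts: int = 50, delay: float = 0.05):
    # Poll the result store until the worker commits a result, or give up.
    for _ in range(attempts):
        try:
            return await task.broker.result_store.get(message)
        except KeyError:
            await asyncio.sleep(delay)
    raise TimeoutError(f"No result for message '{message.uuid}'")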