Tips & Tricks ⛷️
Multiprocessing Backend
It is possible to configure the multiprocessing library used by a worker pool. By default, multiprocessing from the Python standard library is used. However, this can be changed to the PyTorch implementation of multiprocessing by setting ALSEK_MULTIPROCESSING_BACKEND to 'torch' prior to starting the worker pool.
export ALSEK_MULTIPROCESSING_BACKEND=torch
alsek thread-pool my_project
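When the worker pool is launched from a Python script rather than a shell, the same variable can be passed through the child process environment. The sketch below is one way to do that under stated assumptions: `build_worker_env` and `start_worker_pool` are hypothetical helper names (not part of Alsek), and the command is the one from the shell example above, so `alsek` is assumed to be on the PATH.

```python
import os
import subprocess
from typing import Dict, Optional

BACKEND_ENV_VAR = "ALSEK_MULTIPROCESSING_BACKEND"


def build_worker_env(base: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Return a copy of the environment with the torch backend selected."""
    # Copy so the parent process environment is left untouched.
    env = dict(os.environ if base is None else base)
    env[BACKEND_ENV_VAR] = "torch"
    return env


def start_worker_pool(project: str = "my_project") -> subprocess.Popen:
    """Launch the worker pool CLI with the backend variable already set."""
    # Hypothetical helper: mirrors `alsek thread-pool my_project` from the shell example.
    return subprocess.Popen(["alsek", "thread-pool", project], env=build_worker_env())
```

Building the environment in a separate function keeps the variable set before the pool starts, which is the only ordering requirement stated above.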
Capturing Status Updates
In some applications, it may be desirable to persist status updates to multiple locations, for example, a Redis database and a PostgreSQL database. A short example of how to do this is provided below.
from typing import Any, Optional

from alsek.core.message import Message
from alsek.core.status.standard import StatusTracker
from alsek.core.status.types import TaskStatus
from sqlalchemy import Column, String, create_engine
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()
engine = create_engine("...")
Session = sessionmaker(bind=engine)
session = Session()


class Status(Base):
    __tablename__ = "status"

    id = Column(UUID, primary_key=True)
    status = Column(String)

    def __repr__(self) -> str:
        return f"<Status(id='{self.id}', status='{self.status}')>"


class CustomStatusTracker(StatusTracker):
    def set(self, message: Message, status: TaskStatus, detail: Optional[Any] = None) -> None:
        # Let the parent tracker record the status in its own store first
        super().set(message, status=status, detail=detail)

        # Mirror the status to PostgreSQL: create the row on submission, update it afterwards
        if status == TaskStatus.SUBMITTED:
            session.add(Status(id=message.uuid, status=status.name))
        else:
            # Session.get() replaces the legacy Query.get() (SQLAlchemy 1.4+)
            record = session.get(Status, message.uuid)
            record.status = status.name
        session.commit()
This new CustomStatusTracker() class is a drop-in replacement for StatusTracker().
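The tracker above assumes that a row was created when the SUBMITTED status arrived. If that cannot be guaranteed (for instance, the table was emptied between updates), an insert-or-update write avoids depending on a prior lookup. The following is a minimal sketch of that idea only: it uses the standard library's sqlite3 as a stand-in for PostgreSQL, and `make_store`, `record_status`, and `current_status` are hypothetical names that are not part of Alsek. The status strings are plain illustrative values.

```python
import sqlite3
from typing import Optional


def make_store() -> sqlite3.Connection:
    """Create an in-memory table shaped like the `status` table above."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE status (id TEXT PRIMARY KEY, status TEXT)")
    return conn


def record_status(conn: sqlite3.Connection, message_id: str, status_name: str) -> None:
    """Insert the row if it is new, otherwise overwrite its status."""
    # INSERT OR REPLACE keys on the primary key, so no prior lookup is needed.
    conn.execute(
        "INSERT OR REPLACE INTO status (id, status) VALUES (?, ?)",
        (message_id, status_name),
    )
    conn.commit()


def current_status(conn: sqlite3.Connection, message_id: str) -> Optional[str]:
    """Return the stored status name, or None if the message is unknown."""
    row = conn.execute(
        "SELECT status FROM status WHERE id = ?", (message_id,)
    ).fetchone()
    return row[0] if row else None
```

Inside a custom tracker, a write like `record_status` would take the place of the if/else branch, with the message UUID and `status.name` as arguments.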
Testing
Testing an application may require a worker pool to be brought online. A small example of how to do this with pytest and multiprocessing is provided below.
First, create a conftest.py file with a background_worker_pool fixture.
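from multiprocessing import Process
from typing import Iterator

import pytest
from alsek.core.worker.thread import ThreadWorkerPool

from my_application.tasks import task_1, task_2


def _run_pool() -> None:
    ThreadWorkerPool([task_1, task_2], backoff=None).run()


@pytest.fixture()
def background_worker_pool() -> Iterator[None]:
    # Run the worker pool in a separate daemon process while the test executes
    process = Process(target=_run_pool, daemon=True)
    process.start()
    yield
    # Stop the pool and wait for the process to exit once the test has finished
    process.terminate()
    process.join()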
Now, a worker pool can be brought online simply by including background_worker_pool in the signature of a test.
from alsek import task
from my_application.tasks import task_1


def test_processing(background_worker_pool: None) -> None:
    message = task_1.generate()
    ...
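Because the pool runs in a separate process, a test usually has to wait for the work to finish before asserting on its outcome. A small polling helper is one way to do this without a fixed sleep. This is a generic sketch: `wait_for` is a hypothetical helper, and the predicate passed to it (how completion is detected for a given task) is left to the application, since that part is not shown here.

```python
import time
from typing import Callable


def wait_for(
    predicate: Callable[[], bool],
    timeout: float = 10.0,
    interval: float = 0.1,
) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One final check so a result that arrives right at the deadline still counts.
    return predicate()
```

In a test, this could be used as `assert wait_for(lambda: some_condition())`, where `some_condition` is whatever check the application exposes for the task's completion.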