Skip to content

Dependencies Guide

Docket tasks include a dependency injection system that provides access to context, configuration, and custom resources. This system is similar to FastAPI's dependency injection but tailored for background task patterns.

Built-in Context Dependencies

Accessing the Current Docket

Tasks often need to schedule more work. The CurrentDocket dependency gives you access to the same docket the worker is processing:

from pathlib import Path
from datetime import datetime, timedelta, timezone
from docket import Docket, CurrentDocket

def now() -> datetime:
    return datetime.now(timezone.utc)

async def poll_for_file(
    file_path: str,
    docket: Docket = CurrentDocket()
) -> None:
    path = Path(file_path)
    if path.exists():
        print(f"File {file_path} found!")
        return

    # Schedule another check in 30 seconds
    await docket.add(
        poll_for_file,
        when=now() + timedelta(seconds=30)
    )(file_path)

This is especially useful for self-perpetuating tasks that create chains of future work.

Getting Your Task Key

Use TaskKey to access the current task's key, which is helpful for creating related work or maintaining task chains:

from docket import CurrentDocket, TaskKey

async def process_data_chunk(
    dataset_id: int,
    chunk: int,
    total_chunks: int,
    key: str = TaskKey(),
    docket: Docket = CurrentDocket()
) -> None:
    print(f"Processing chunk {chunk}/{total_chunks} for dataset {dataset_id}")

    # Process this chunk...
    await process_chunk_data(dataset_id, chunk)

    if chunk < total_chunks:
        # Schedule next chunk with a related key
        next_key = f"dataset-{dataset_id}-chunk-{chunk + 1}"
        await docket.add(
            process_data_chunk,
            key=next_key
        )(dataset_id, chunk + 1, total_chunks)

Worker and Execution Context

Access the current worker and execution details when needed:

from docket import CurrentWorker, CurrentExecution, Worker, Execution

async def diagnostic_task(
    worker: Worker = CurrentWorker(),
    execution: Execution = CurrentExecution()
) -> None:
    print(f"Running on worker: {worker.name}")
    print(f"Task key: {execution.key}")
    print(f"Scheduled at: {execution.when}")
    print(f"Worker concurrency: {worker.concurrency}")

Advanced Retry Patterns

Exponential Backoff

For services that might be overloaded, exponential backoff gives them time to recover:

from docket import ExponentialRetry

async def call_external_api(
    url: str,
    retry: ExponentialRetry = ExponentialRetry(
        attempts=5,
        minimum_delay=timedelta(seconds=1),
        maximum_delay=timedelta(minutes=5)
    )
) -> None:
    # Retries with delays: 1s, 2s, 4s, 8s, 16s (but capped at 5 minutes)
    try:
        response = await http_client.get(url)
        response.raise_for_status()
        print(f"API call succeeded on attempt {retry.attempt}")
    except Exception as e:
        print(f"Attempt {retry.attempt} failed: {e}")
        raise

Unlimited Retries

For critical tasks that must eventually succeed, use attempts=None:

from docket import Retry

async def critical_data_sync(
    source_url: str,
    retry: Retry = Retry(attempts=None, delay=timedelta(minutes=5))
) -> None:
    # This will retry forever with 5-minute delays until it succeeds
    await sync_critical_data(source_url)
    print(f"Critical sync completed after {retry.attempt} attempts")

Both Retry and ExponentialRetry support unlimited retries this way.

Task Timeouts

Prevent tasks from running too long with the Timeout dependency:

from docket import Timeout

async def data_processing_task(
    large_dataset: dict,
    timeout: Timeout = Timeout(timedelta(minutes=10))
) -> None:
    # This task will be cancelled if it runs longer than 10 minutes
    await process_dataset_phase_one(large_dataset)

    # Extend timeout if we need more time for phase two
    timeout.extend(timedelta(minutes=5))
    await process_dataset_phase_two(large_dataset)

The extend() method can take a specific duration or default to the original timeout duration:

async def adaptive_timeout_task(
    timeout: Timeout = Timeout(timedelta(minutes=2))
) -> None:
    await quick_check()

    # Extend by the base timeout (another 2 minutes)
    timeout.extend()
    await longer_operation()

Timeouts work alongside retries. If a task times out, it can be retried according to its retry policy.

Custom Dependencies

Create your own dependencies using Depends() for reusable resources and patterns. Dependencies can be either synchronous or asynchronous.

Synchronous Dependencies

Use sync dependencies for pure computations and in-memory operations:

from docket import Depends

# In-memory config lookup - no I/O
def get_config() -> dict:
    """Access configuration from memory."""
    return {"api_url": "https://api.example.com", "timeout": 30}

# Pure computation - no I/O
def build_request_headers(config: dict = Depends(get_config)) -> dict:
    """Construct headers from config."""
    return {
        "User-Agent": "MyApp/1.0",
        "Timeout": str(config["timeout"])
    }

async def call_api(
    headers: dict = Depends(build_request_headers)
) -> None:
    # Headers are computed without blocking
    # Network I/O happens here (async)
    response = await http_client.get(url, headers=headers)

Important: Synchronous dependencies should NOT include blocking I/O operations (file access, network calls, database queries, etc.) as it will block the event loop and prevent tasks from being executed. Use async dependencies for any I/O. Sync dependencies are best for: - Pure computations - In-memory data structure access - Configuration lookups from memory - Non-blocking transformations

Asynchronous Dependencies

from contextlib import asynccontextmanager
from docket import Depends

@asynccontextmanager
async def get_database_connection():
    """Async dependency that returns a database connection."""
    conn = await database.connect()
    try:
        yield conn
    finally:
        await conn.close()

async def process_user_data(
    user_id: int,
    db=Depends(get_database_connection)
) -> None:
    # Database connection is automatically provided and cleaned up
    user = await db.fetch_user(user_id)
    await db.update_user(user_id, {"last_seen": datetime.now()})

Synchronous Context Managers

Use sync context managers only for managing in-memory resources or quick non-blocking operations:

from contextlib import contextmanager
from docket import Depends

# In-memory resource tracking - no I/O
@contextmanager
def track_operation(operation_name: str):
    """Track operation execution without blocking."""
    operations_in_progress.add(operation_name)  # In-memory set
    try:
        yield operation_name
    finally:
        operations_in_progress.remove(operation_name)

async def process_data(
    tracker=Depends(lambda: track_operation("data_processing"))
) -> None:
    # Operation tracked in memory, no blocking
    await perform_async_work()

Mixed Sync and Async Dependencies

You can freely mix synchronous and asynchronous dependencies in the same task. Use sync for computations, async for I/O:

# Sync - in-memory config lookup
def get_local_config() -> dict:
    """Access local config from memory - no I/O."""
    return {"retry_count": 3, "batch_size": 100}

# Async - network I/O
async def get_remote_config() -> dict:
    """Fetch remote config via network - requires I/O."""
    response = await http_client.get("/api/config")
    return await response.json()

# Sync - pure computation
def merge_configs(
    local: dict = Depends(get_local_config),
    remote: dict = Depends(get_remote_config)
) -> dict:
    """Merge configs without blocking - pure computation."""
    return {**local, **remote}

async def process_batch(
    config: dict = Depends(merge_configs)
) -> None:
    # Config is computed/fetched appropriately
    # Now do the actual I/O work
    for i in range(config["batch_size"]):
        await process_item(i, retries=config["retry_count"])

Nested Dependencies

Dependencies can depend on other dependencies, and Docket resolves them in the correct order:

async def get_auth_service(db=Depends(get_database_connection)):
    """A service that depends on the database connection."""
    return AuthService(db)

async def get_user_service(
    db=Depends(get_database_connection),
    auth=Depends(get_auth_service)
):
    """A service that depends on both database and auth service."""
    return UserService(db, auth)

async def update_user_profile(
    user_id: int,
    profile_data: dict,
    user_service=Depends(get_user_service)
) -> None:
    # All dependencies are resolved automatically:
    # db -> auth_service -> user_service -> this task
    await user_service.update_profile(user_id, profile_data)

Dependencies are resolved once per task execution and cached, so if multiple parameters depend on the same resource, only one instance is created. This caching works across both sync and async dependencies.

Dependencies with Built-in Context

Dependencies can access Docket's built-in context dependencies:

async def get_task_logger(
    execution: Execution = CurrentExecution(),
    worker: Worker = CurrentWorker()
) -> LoggerAdapter:
    """Create a logger with task and worker context."""
    logger = logging.getLogger(f"worker.{worker.name}")
    return LoggerAdapter(logger, {
        'task_key': execution.key,
        'worker_name': worker.name
    })

async def important_task(
    data: dict,
    logger=Depends(get_task_logger)
) -> None:
    logger.info("Starting important task")
    await process_important_data(data)
    logger.info("Important task completed")

TaskArgument: Accessing Task Parameters

Dependencies can access the task's input arguments using TaskArgument:

from docket import TaskArgument

async def get_user_context(user_id: int = TaskArgument()) -> dict:
    """Dependency that fetches user context based on task argument."""
    user = await fetch_user(user_id)
    return {
        'user': user,
        'permissions': await fetch_user_permissions(user_id),
        'preferences': await fetch_user_preferences(user_id)
    }

async def send_personalized_email(
    user_id: int,
    message: str,
    user_context=Depends(get_user_context)
) -> None:
    # user_context is populated based on the user_id argument
    email = personalize_email(message, user_context['preferences'])
    await send_email(user_context['user'].email, email)

You can access arguments by name or make them optional:

async def get_optional_config(
    config_name: str | None = TaskArgument("config", optional=True)
) -> dict:
    """Get configuration if provided, otherwise use defaults."""
    if config_name:
        return await load_config(config_name)
    return DEFAULT_CONFIG

async def flexible_task(
    data: dict,
    config: str | None = None,  # Optional argument
    resolved_config=Depends(get_optional_config)
) -> None:
    # resolved_config will be loaded config or defaults
    await process_data(data, resolved_config)

Dependency Error Handling

When dependencies fail, the entire task fails with detailed error information:

async def unreliable_dependency():
    if random.random() < 0.5:
        raise ValueError("Service unavailable")
    return "success"

async def dependent_task(
    value=Depends(unreliable_dependency)
) -> None:
    print(f"Got value: {value}")

If unreliable_dependency fails, the task won't execute and the error will be logged with context about which dependency failed. This prevents tasks from running with incomplete or invalid dependencies.

Dependency Guidelines

Choose Sync vs Async Appropriately

Use synchronous dependencies for: - Pure computations (math, string manipulation, data transformations) - In-memory data structure access (dicts, lists, sets) - Configuration lookups from memory - Non-blocking operations that complete instantly

Use asynchronous dependencies for: - Network I/O (HTTP requests, API calls) - File I/O (reading/writing files) - Database queries - Any operation that involves await - Resource management requiring async cleanup

# ✅ Good: Sync for pure computation
def calculate_batch_size(item_count: int) -> int:
    return min(item_count, 1000)

# ✅ Good: Async for I/O
async def fetch_user_data(user_id: int) -> dict:
    return await api_client.get(f"/users/{user_id}")

# ❌ Bad: Sync with blocking I/O
def load_config_from_file() -> dict:
    with open("config.json") as f:  # Blocks the event loop!
        return json.load(f)

# ✅ Good: Use async for file I/O instead
async def load_config_from_file() -> dict:
    async with aiofiles.open("config.json") as f:
        return json.loads(await f.read())

Design for Reusability

Create dependencies that can be used across multiple tasks:

# Good: Reusable across many tasks
async def get_api_client():
    return APIClient(api_key=os.getenv("API_KEY"))

# Less ideal: Too specific to one task
async def get_user_api_client_for_profile_updates():
    return APIClient(api_key=os.getenv("API_KEY"), timeout=30)

Keep Dependencies Focused

Each dependency should have a single responsibility:

# Good: Focused dependencies
async def get_database():
    return await database.connect()

async def get_cache():
    return redis.Redis()

# Less ideal: Too many responsibilities
async def get_all_services():
    return {
        'db': await database.connect(),
        'cache': redis.Redis(),
        'api': APIClient(),
        'metrics': MetricsClient()
    }

Handle Resource Cleanup

Always use context managers or try/finally for resource cleanup:

# Good: Automatic cleanup
async def get_database():
    conn = await database.connect()
    try:
        yield conn
    finally:
        await conn.close()

# Risky: Manual cleanup required
async def get_database_no_cleanup():
    return await database.connect()  # Who closes this?

The dependency injection system supports flexible task design while maintaining clear separation of concerns. Dependencies can be simple values, complex services, or entire subsystems that your tasks need to operate effectively.