
API Reference

docket

docket - A distributed background task system for Python functions.

docket focuses on scheduling future work as seamlessly and efficiently as immediate work.

Docket

A Docket represents a collection of tasks that may be scheduled for later execution. With a Docket, you can add, replace, and cancel tasks. Example:

@task
async def my_task(greeting: str, recipient: str) -> None:
    print(f"{greeting}, {recipient}!")

async with Docket() as docket:
    docket.add(my_task)("Hello", recipient="world")
Source code in src/docket/docket.py
class Docket:
    """A Docket represents a collection of tasks that may be scheduled for later
    execution.  With a Docket, you can add, replace, and cancel tasks.
    Example:

    ```python
    @task
    async def my_task(greeting: str, recipient: str) -> None:
        print(f"{greeting}, {recipient}!")

    async with Docket() as docket:
        docket.add(my_task)("Hello", recipient="world")
    ```
    """

    tasks: dict[str, TaskFunction]
    strike_list: StrikeList

    _monitor_strikes_task: asyncio.Task[None]
    _connection_pool: ConnectionPool

    def __init__(
        self,
        name: str = "docket",
        url: str = "redis://localhost:6379/0",
        heartbeat_interval: timedelta = timedelta(seconds=2),
        missed_heartbeats: int = 5,
    ) -> None:
        """
        Args:
            name: The name of the docket.
            url: The URL of the Redis server.  For example:
                - "redis://localhost:6379/0"
                - "redis://user:password@localhost:6379/0"
                - "redis://user:password@localhost:6379/0?ssl=true"
                - "rediss://localhost:6379/0"
                - "unix:///path/to/redis.sock"
            heartbeat_interval: How often workers send heartbeat messages to the docket.
            missed_heartbeats: How many heartbeats a worker can miss before it is
                considered dead.
        """
        self.name = name
        self.url = url
        self.heartbeat_interval = heartbeat_interval
        self.missed_heartbeats = missed_heartbeats

    @property
    def worker_group_name(self) -> str:
        return "docket-workers"

    async def __aenter__(self) -> Self:
        from .tasks import standard_tasks

        self.tasks = {fn.__name__: fn for fn in standard_tasks}
        self.strike_list = StrikeList()

        self._connection_pool = ConnectionPool.from_url(self.url)  # type: ignore
        self._monitor_strikes_task = asyncio.create_task(self._monitor_strikes())

        # Ensure that the stream and worker group exist
        try:
            async with self.redis() as r:
                await r.xgroup_create(
                    groupname=self.worker_group_name,
                    name=self.stream_key,
                    id="0-0",
                    mkstream=True,
                )
        except redis.exceptions.RedisError as e:
            if "BUSYGROUP" not in repr(e):
                raise

        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        del self.tasks
        del self.strike_list

        self._monitor_strikes_task.cancel()
        try:
            await self._monitor_strikes_task
        except asyncio.CancelledError:
            pass

        await asyncio.shield(self._connection_pool.disconnect())
        del self._connection_pool

    @asynccontextmanager
    async def redis(self) -> AsyncGenerator[Redis, None]:
        r = Redis(connection_pool=self._connection_pool)
        await r.__aenter__()
        try:
            yield r
        finally:
            await asyncio.shield(r.__aexit__(None, None, None))

    def register(self, function: TaskFunction) -> None:
        """Register a task with the Docket.

        Args:
            function: The task to register.
        """
        from .dependencies import validate_dependencies

        validate_dependencies(function)

        self.tasks[function.__name__] = function

    def register_collection(self, collection_path: str) -> None:
        """
        Register a collection of tasks.

        Args:
            collection_path: A path in the format "module:collection".
        """
        module_name, _, member_name = collection_path.rpartition(":")
        module = importlib.import_module(module_name)
        collection = getattr(module, member_name)
        for function in collection:
            self.register(function)

    def labels(self) -> Mapping[str, str]:
        return {
            "docket.name": self.name,
        }

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[P, Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task function to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def add(
        self,
        function: str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The name of a task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        if isinstance(function, str):
            function = self.tasks[function]
        else:
            self.register(function)

        if when is None:
            when = datetime.now(timezone.utc)

        if key is None:
            key = str(uuid7())

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(function, args, kwargs, when, key, attempt=1)

            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    await self._schedule(redis, pipeline, execution, replace=False)
                    await pipeline.execute()

            TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    @overload
    def replace(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime,
        key: str,
    ) -> Callable[P, Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task function to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def replace(
        self,
        function: str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The name of a task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def replace(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        if isinstance(function, str):
            function = self.tasks[function]

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(function, args, kwargs, when, key, attempt=1)

            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    await self._schedule(redis, pipeline, execution, replace=True)
                    await pipeline.execute()

            TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    async def schedule(self, execution: Execution) -> None:
        with tracer.start_as_current_span(
            "docket.schedule",
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function.__name__,
            },
        ):
            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    await self._schedule(redis, pipeline, execution, replace=False)
                    await pipeline.execute()

        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

    async def cancel(self, key: str) -> None:
        """Cancel a previously scheduled task on the Docket.

        Args:
            key: The key of the task to cancel.
        """
        with tracer.start_as_current_span(
            "docket.cancel",
            attributes={**self.labels(), "docket.key": key},
        ):
            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    await self._cancel(pipeline, key)
                    await pipeline.execute()

        TASKS_CANCELLED.add(1, self.labels())

    @property
    def queue_key(self) -> str:
        return f"{self.name}:queue"

    @property
    def stream_key(self) -> str:
        return f"{self.name}:stream"

    def known_task_key(self, key: str) -> str:
        return f"{self.name}:known:{key}"

    def parked_task_key(self, key: str) -> str:
        return f"{self.name}:{key}"

    async def _schedule(
        self,
        redis: Redis,
        pipeline: Pipeline,
        execution: Execution,
        replace: bool = False,
    ) -> None:
        if self.strike_list.is_stricken(execution):
            logger.warning(
                "%r is stricken, skipping schedule of %r",
                execution.function.__name__,
                execution.key,
            )
            TASKS_STRICKEN.add(
                1,
                {
                    **self.labels(),
                    **execution.specific_labels(),
                    "docket.where": "docket",
                },
            )
            return

        message: dict[bytes, bytes] = execution.as_message()
        propagate.inject(message, setter=message_setter)

        key = execution.key
        when = execution.when
        known_task_key = self.known_task_key(key)

        async with redis.lock(f"{known_task_key}:lock", timeout=10):
            if replace:
                await self._cancel(pipeline, key)
            else:
                # if the task is already in the queue or stream, retain it
                if await redis.exists(known_task_key):
                    logger.debug(
                        "Task %r is already in the queue or stream, not scheduling",
                        key,
                        extra=self.labels(),
                    )
                    return

            pipeline.set(known_task_key, when.timestamp())

            if when <= datetime.now(timezone.utc):
                pipeline.xadd(self.stream_key, message)  # type: ignore[arg-type]
            else:
                pipeline.hset(self.parked_task_key(key), mapping=message)  # type: ignore[arg-type]
                pipeline.zadd(self.queue_key, {key: when.timestamp()})

    async def _cancel(self, pipeline: Pipeline, key: str) -> None:
        pipeline.delete(self.known_task_key(key))
        pipeline.delete(self.parked_task_key(key))
        pipeline.zrem(self.queue_key, key)

    @property
    def strike_key(self) -> str:
        return f"{self.name}:strikes"

    async def strike(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Strike a task from the Docket.

        Args:
            function: The task to strike.
            parameter: The parameter to strike on.
            operator: The operator to use.
            value: The value to strike on.
        """
        if not isinstance(function, (str, type(None))):
            function = function.__name__

        operator = Operator(operator)

        strike = Strike(function, parameter, operator, value)
        return await self._send_strike_instruction(strike)

    async def restore(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Restore a previously stricken task to the Docket.

        Args:
            function: The task to restore.
            parameter: The parameter to restore on.
            operator: The operator to use.
            value: The value to restore on.
        """
        if not isinstance(function, (str, type(None))):
            function = function.__name__

        operator = Operator(operator)

        restore = Restore(function, parameter, operator, value)
        return await self._send_strike_instruction(restore)

    async def _send_strike_instruction(self, instruction: StrikeInstruction) -> None:
        with tracer.start_as_current_span(
            f"docket.{instruction.direction}",
            attributes={
                **self.labels(),
                **instruction.labels(),
            },
        ):
            async with self.redis() as redis:
                message = instruction.as_message()
                await redis.xadd(self.strike_key, message)  # type: ignore[arg-type]
            self.strike_list.update(instruction)

    async def _monitor_strikes(self) -> NoReturn:
        last_id = "0-0"
        while True:
            try:
                async with self.redis() as r:
                    while True:
                        streams: RedisReadGroupResponse = await r.xread(
                            {self.strike_key: last_id},
                            count=100,
                            block=60_000,
                        )
                        for _, messages in streams:
                            for message_id, message in messages:
                                last_id = message_id
                                instruction = StrikeInstruction.from_message(message)
                                self.strike_list.update(instruction)
                                logger.info(
                                    "%s %r",
                                    (
                                        "Striking"
                                        if instruction.direction == "strike"
                                        else "Restoring"
                                    ),
                                    instruction.call_repr(),
                                    extra=self.labels(),
                                )

                                STRIKES_IN_EFFECT.add(
                                    1 if instruction.direction == "strike" else -1,
                                    {
                                        **self.labels(),
                                        **instruction.labels(),
                                    },
                                )

            except redis.exceptions.ConnectionError:  # pragma: no cover
                REDIS_DISRUPTIONS.add(1, {"docket": self.name})
                logger.warning("Connection error, sleeping for 1 second...")
                await asyncio.sleep(1)
            except Exception:  # pragma: no cover
                logger.exception("Error monitoring strikes")
                await asyncio.sleep(1)

    async def snapshot(self) -> DocketSnapshot:
        """Get a snapshot of the Docket, including which tasks are scheduled or currently
        running, as well as which workers are active.

        Returns:
            A snapshot of the Docket.
        """
        running: list[RunningExecution] = []
        future: list[Execution] = []

        async with self.redis() as r:
            async with r.pipeline() as pipeline:
                pipeline.xlen(self.stream_key)

                pipeline.zcard(self.queue_key)

                pipeline.xpending_range(
                    self.stream_key,
                    self.worker_group_name,
                    min="-",
                    max="+",
                    count=1000,
                )

                pipeline.xrange(self.stream_key, "-", "+", count=1000)

                pipeline.zrange(self.queue_key, 0, -1)

                total_stream_messages: int
                total_schedule_messages: int
                pending_messages: list[RedisStreamPendingMessage]
                stream_messages: list[tuple[RedisMessageID, RedisMessage]]
                scheduled_task_keys: list[bytes]

                now = datetime.now(timezone.utc)
                (
                    total_stream_messages,
                    total_schedule_messages,
                    pending_messages,
                    stream_messages,
                    scheduled_task_keys,
                ) = await pipeline.execute()

                for task_key in scheduled_task_keys:
                    pipeline.hgetall(self.parked_task_key(task_key.decode()))

                # Because these are two separate pipeline commands, it's possible that
                # a message has been moved from the schedule to the stream in the
                # meantime, which would end up being an empty `{}` message
                queued_messages: list[RedisMessage] = [
                    m for m in await pipeline.execute() if m
                ]

        total_tasks = total_stream_messages + total_schedule_messages

        pending_lookup: dict[RedisMessageID, RedisStreamPendingMessage] = {
            pending["message_id"]: pending for pending in pending_messages
        }

        for message_id, message in stream_messages:
            function = self.tasks[message[b"function"].decode()]
            execution = Execution.from_message(function, message)
            if message_id in pending_lookup:
                worker_name = pending_lookup[message_id]["consumer"].decode()
                started = now - timedelta(
                    milliseconds=pending_lookup[message_id]["time_since_delivered"]
                )
                running.append(RunningExecution(execution, worker_name, started))
            else:
                future.append(execution)  # pragma: no cover

        for message in queued_messages:
            function = self.tasks[message[b"function"].decode()]
            execution = Execution.from_message(function, message)
            future.append(execution)

        workers = await self.workers()

        return DocketSnapshot(now, total_tasks, future, running, workers)

    @property
    def workers_set(self) -> str:
        return f"{self.name}:workers"

    def worker_tasks_set(self, worker_name: str) -> str:
        return f"{self.name}:worker-tasks:{worker_name}"

    def task_workers_set(self, task_name: str) -> str:
        return f"{self.name}:task-workers:{task_name}"

    async def workers(self) -> Collection[WorkerInfo]:
        """Get a list of all workers that have sent heartbeats to the Docket.

        Returns:
            A list of all workers that have sent heartbeats to the Docket.
        """
        workers: list[WorkerInfo] = []

        oldest = datetime.now(timezone.utc).timestamp() - (
            self.heartbeat_interval.total_seconds() * self.missed_heartbeats
        )

        async with self.redis() as r:
            await r.zremrangebyscore(self.workers_set, 0, oldest)

            worker_name_bytes: bytes
            last_seen_timestamp: float

            for worker_name_bytes, last_seen_timestamp in await r.zrange(
                self.workers_set, 0, -1, withscores=True
            ):
                worker_name = worker_name_bytes.decode()
                last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

                task_names: set[str] = {
                    task_name_bytes.decode()
                    for task_name_bytes in cast(
                        set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                    )
                }

                workers.append(WorkerInfo(worker_name, last_seen, task_names))

        return workers

    async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
        """Get a list of all workers that are able to execute a given task.

        Args:
            task_name: The name of the task.

        Returns:
            A list of all workers that are able to execute the given task.
        """
        workers: list[WorkerInfo] = []
        oldest = datetime.now(timezone.utc).timestamp() - (
            self.heartbeat_interval.total_seconds() * self.missed_heartbeats
        )

        async with self.redis() as r:
            await r.zremrangebyscore(self.task_workers_set(task_name), 0, oldest)

            worker_name_bytes: bytes
            last_seen_timestamp: float

            for worker_name_bytes, last_seen_timestamp in await r.zrange(
                self.task_workers_set(task_name), 0, -1, withscores=True
            ):
                worker_name = worker_name_bytes.decode()
                last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

                task_names: set[str] = {
                    task_name_bytes.decode()
                    for task_name_bytes in cast(
                        set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                    )
                }

                workers.append(WorkerInfo(worker_name, last_seen, task_names))

        return workers

__init__(name='docket', url='redis://localhost:6379/0', heartbeat_interval=timedelta(seconds=2), missed_heartbeats=5)

Parameters:

    name (str, default 'docket'): The name of the docket.
    url (str, default 'redis://localhost:6379/0'): The URL of the Redis server. For example:
        - "redis://localhost:6379/0"
        - "redis://user:password@localhost:6379/0"
        - "redis://user:password@localhost:6379/0?ssl=true"
        - "rediss://localhost:6379/0"
        - "unix:///path/to/redis.sock"
    heartbeat_interval (timedelta, default timedelta(seconds=2)): How often workers send heartbeat messages to the docket.
    missed_heartbeats (int, default 5): How many heartbeats a worker can miss before it is considered dead.

Source code in src/docket/docket.py
def __init__(
    self,
    name: str = "docket",
    url: str = "redis://localhost:6379/0",
    heartbeat_interval: timedelta = timedelta(seconds=2),
    missed_heartbeats: int = 5,
) -> None:
    """
    Args:
        name: The name of the docket.
        url: The URL of the Redis server.  For example:
            - "redis://localhost:6379/0"
            - "redis://user:password@localhost:6379/0"
            - "redis://user:password@localhost:6379/0?ssl=true"
            - "rediss://localhost:6379/0"
            - "unix:///path/to/redis.sock"
        heartbeat_interval: How often workers send heartbeat messages to the docket.
        missed_heartbeats: How many heartbeats a worker can miss before it is
            considered dead.
    """
    self.name = name
    self.url = url
    self.heartbeat_interval = heartbeat_interval
    self.missed_heartbeats = missed_heartbeats

add(function, when=None, key=None)

add(
    function: Callable[P, Awaitable[R]],
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[P, Awaitable[Execution]]
add(
    function: str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]

Add a task to the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str, required): The task to add.
    when (datetime | None, default None): The time to schedule the task.
    key (str | None, default None): The key to schedule the task under.

Source code in src/docket/docket.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]:
    """Add a task to the Docket.

    Args:
        function: The task to add.
        when: The time to schedule the task.
        key: The key to schedule the task under.
    """
    if isinstance(function, str):
        function = self.tasks[function]
    else:
        self.register(function)

    if when is None:
        when = datetime.now(timezone.utc)

    if key is None:
        key = str(uuid7())

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(function, args, kwargs, when, key, attempt=1)

        async with self.redis() as redis:
            async with redis.pipeline() as pipeline:
                await self._schedule(redis, pipeline, execution, replace=False)
                await pipeline.execute()

        TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

        return execution

    return scheduler
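
Calling add returns a scheduler; invoking that scheduler with the task's arguments is what actually enqueues the work. A minimal sketch, assuming the my_task function from the example at the top of the page and a Redis server at the default URL (the "greet-world" key is purely illustrative):

```python
from datetime import datetime, timedelta, timezone

async with Docket() as docket:
    # Run as soon as a worker picks it up
    await docket.add(my_task)("Hello", recipient="world")

    # Run in five minutes, under a key we can later use to replace or cancel it
    when = datetime.now(timezone.utc) + timedelta(minutes=5)
    await docket.add(my_task, when=when, key="greet-world")("Hello", recipient="world")
```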

cancel(key) async

Cancel a previously scheduled task on the Docket.

Parameters:

    key (str, required): The key of the task to cancel.

Source code in src/docket/docket.py
async def cancel(self, key: str) -> None:
    """Cancel a previously scheduled task on the Docket.

    Args:
        key: The key of the task to cancel.
    """
    with tracer.start_as_current_span(
        "docket.cancel",
        attributes={**self.labels(), "docket.key": key},
    ):
        async with self.redis() as redis:
            async with redis.pipeline() as pipeline:
                await self._cancel(pipeline, key)
                await pipeline.execute()

    TASKS_CANCELLED.add(1, self.labels())
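
Cancellation is keyed, so it only applies to tasks that were scheduled under a key you know. A brief sketch, reusing the illustrative "greet-world" key from the add example above:

```python
async with Docket() as docket:
    await docket.cancel("greet-world")
```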

register(function)

Register a task with the Docket.

Parameters:

    function (TaskFunction, required): The task to register.

Source code in src/docket/docket.py
def register(self, function: TaskFunction) -> None:
    """Register a task with the Docket.

    Args:
        function: The task to register.
    """
    from .dependencies import validate_dependencies

    validate_dependencies(function)

    self.tasks[function.__name__] = function

register_collection(collection_path)

Register a collection of tasks.

Parameters:

    collection_path (str, required): A path in the format "module:collection".

Source code in src/docket/docket.py
def register_collection(self, collection_path: str) -> None:
    """
    Register a collection of tasks.

    Args:
        collection_path: A path in the format "module:collection".
    """
    module_name, _, member_name = collection_path.rpartition(":")
    module = importlib.import_module(module_name)
    collection = getattr(module, member_name)
    for function in collection:
        self.register(function)
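
The collection path is "module:attribute", where the attribute is any iterable of task functions. A short sketch with a hypothetical myapp.tasks module that exposes a list named all_tasks:

```python
# myapp/tasks.py (hypothetical) defines: all_tasks = [send_welcome_email, sync_invoices]

async with Docket() as docket:
    docket.register_collection("myapp.tasks:all_tasks")
```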

replace(function, when, key)

replace(
    function: Callable[P, Awaitable[R]],
    when: datetime,
    key: str,
) -> Callable[P, Awaitable[Execution]]
replace(
    function: str, when: datetime, key: str
) -> Callable[..., Awaitable[Execution]]

Replace a previously scheduled task on the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str, required): The task to replace.
    when (datetime, required): The time to schedule the task.
    key (str, required): The key to schedule the task under.

Source code in src/docket/docket.py
def replace(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime,
    key: str,
) -> Callable[..., Awaitable[Execution]]:
    """Replace a previously scheduled task on the Docket.

    Args:
        function: The task to replace.
        when: The time to schedule the task.
        key: The key to schedule the task under.
    """
    if isinstance(function, str):
        function = self.tasks[function]

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(function, args, kwargs, when, key, attempt=1)

        async with self.redis() as redis:
            async with redis.pipeline() as pipeline:
                await self._schedule(redis, pipeline, execution, replace=True)
                await pipeline.execute()

        TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

        return execution

    return scheduler
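
Replacing cancels whatever is scheduled under the key and schedules the new execution in its place, which makes it a convenient way to push a reminder back. A sketch, again using the illustrative "greet-world" key:

```python
from datetime import datetime, timedelta, timezone

async with Docket() as docket:
    tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
    await docket.replace(my_task, when=tomorrow, key="greet-world")("Hello", recipient="world")
```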

restore(function=None, parameter=None, operator='==', value=None) async

Restore a previously stricken task to the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str | None, default None): The task to restore.
    parameter (str | None, default None): The parameter to restore on.
    operator (Operator | LiteralOperator, default '=='): The operator to use.
    value (Hashable | None, default None): The value to restore on.

Source code in src/docket/docket.py
async def restore(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Restore a previously stricken task to the Docket.

    Args:
        function: The task to restore.
        parameter: The parameter to restore on.
        operator: The operator to use.
        value: The value to restore on.
    """
    if not isinstance(function, (str, type(None))):
        function = function.__name__

    operator = Operator(operator)

    restore = Restore(function, parameter, operator, value)
    return await self._send_strike_instruction(restore)

snapshot() async

Get a snapshot of the Docket, including which tasks are scheduled or currently running, as well as which workers are active.

Returns:

    DocketSnapshot: A snapshot of the Docket.

Source code in src/docket/docket.py
async def snapshot(self) -> DocketSnapshot:
    """Get a snapshot of the Docket, including which tasks are scheduled or currently
    running, as well as which workers are active.

    Returns:
        A snapshot of the Docket.
    """
    running: list[RunningExecution] = []
    future: list[Execution] = []

    async with self.redis() as r:
        async with r.pipeline() as pipeline:
            pipeline.xlen(self.stream_key)

            pipeline.zcard(self.queue_key)

            pipeline.xpending_range(
                self.stream_key,
                self.worker_group_name,
                min="-",
                max="+",
                count=1000,
            )

            pipeline.xrange(self.stream_key, "-", "+", count=1000)

            pipeline.zrange(self.queue_key, 0, -1)

            total_stream_messages: int
            total_schedule_messages: int
            pending_messages: list[RedisStreamPendingMessage]
            stream_messages: list[tuple[RedisMessageID, RedisMessage]]
            scheduled_task_keys: list[bytes]

            now = datetime.now(timezone.utc)
            (
                total_stream_messages,
                total_schedule_messages,
                pending_messages,
                stream_messages,
                scheduled_task_keys,
            ) = await pipeline.execute()

            for task_key in scheduled_task_keys:
                pipeline.hgetall(self.parked_task_key(task_key.decode()))

            # Because these are two separate pipeline commands, it's possible that
            # a message has been moved from the schedule to the stream in the
            # meantime, which would end up being an empty `{}` message
            queued_messages: list[RedisMessage] = [
                m for m in await pipeline.execute() if m
            ]

    total_tasks = total_stream_messages + total_schedule_messages

    pending_lookup: dict[RedisMessageID, RedisStreamPendingMessage] = {
        pending["message_id"]: pending for pending in pending_messages
    }

    for message_id, message in stream_messages:
        function = self.tasks[message[b"function"].decode()]
        execution = Execution.from_message(function, message)
        if message_id in pending_lookup:
            worker_name = pending_lookup[message_id]["consumer"].decode()
            started = now - timedelta(
                milliseconds=pending_lookup[message_id]["time_since_delivered"]
            )
            running.append(RunningExecution(execution, worker_name, started))
        else:
            future.append(execution)  # pragma: no cover

    for message in queued_messages:
        function = self.tasks[message[b"function"].decode()]
        execution = Execution.from_message(function, message)
        future.append(execution)

    workers = await self.workers()

    return DocketSnapshot(now, total_tasks, future, running, workers)
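
A snapshot is a point-in-time view that is handy for dashboards and health checks. A minimal sketch; the field names are an assumption based on the DocketSnapshot constructor call shown above:

```python
async with Docket() as docket:
    snap = await docket.snapshot()
    # Assumed fields, mirroring DocketSnapshot(now, total_tasks, future, running, workers)
    print(f"{snap.total_tasks} tasks: {len(snap.running)} running, {len(snap.future)} scheduled")
    print(f"{len(snap.workers)} active workers")
```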

strike(function=None, parameter=None, operator='==', value=None) async

Strike a task from the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str | None, default None): The task to strike.
    parameter (str | None, default None): The parameter to strike on.
    operator (Operator | LiteralOperator, default '=='): The operator to use.
    value (Hashable | None, default None): The value to strike on.

Source code in src/docket/docket.py
async def strike(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Strike a task from the Docket.

    Args:
        function: The task to strike.
        parameter: The parameter to strike on.
        operator: The operator to use.
        value: The value to strike on.
    """
    if not isinstance(function, (str, type(None))):
        function = function.__name__

    operator = Operator(operator)

    strike = Strike(function, parameter, operator, value)
    return await self._send_strike_instruction(strike)
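
Strikes act as filters: you can strike an entire task by name, or only the executions whose arguments match a condition, and restore lifts the same filter again. A sketch; the setup_new_customer task and its customer_id parameter are hypothetical:

```python
async with Docket() as docket:
    # Stop running my_task entirely
    await docket.strike(my_task)

    # Or skip only executions for one particular customer
    await docket.strike("setup_new_customer", "customer_id", "==", 12345)

    # Lift both filters again
    await docket.restore(my_task)
    await docket.restore("setup_new_customer", "customer_id", "==", 12345)
```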

task_workers(task_name) async

Get a list of all workers that are able to execute a given task.

Parameters:

    task_name (str, required): The name of the task.

Returns:

    Collection[WorkerInfo]: A list of all workers that are able to execute the given task.

Source code in src/docket/docket.py
async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
    """Get a list of all workers that are able to execute a given task.

    Args:
        task_name: The name of the task.

    Returns:
        A list of all workers that are able to execute the given task.
    """
    workers: list[WorkerInfo] = []
    oldest = datetime.now(timezone.utc).timestamp() - (
        self.heartbeat_interval.total_seconds() * self.missed_heartbeats
    )

    async with self.redis() as r:
        await r.zremrangebyscore(self.task_workers_set(task_name), 0, oldest)

        worker_name_bytes: bytes
        last_seen_timestamp: float

        for worker_name_bytes, last_seen_timestamp in await r.zrange(
            self.task_workers_set(task_name), 0, -1, withscores=True
        ):
            worker_name = worker_name_bytes.decode()
            last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

            task_names: set[str] = {
                task_name_bytes.decode()
                for task_name_bytes in cast(
                    set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                )
            }

            workers.append(WorkerInfo(worker_name, last_seen, task_names))

    return workers

workers() async

Get a list of all workers that have sent heartbeats to the Docket.

Returns:

    Collection[WorkerInfo]: A list of all workers that have sent heartbeats to the Docket.

Source code in src/docket/docket.py
async def workers(self) -> Collection[WorkerInfo]:
    """Get a list of all workers that have sent heartbeats to the Docket.

    Returns:
        A list of all workers that have sent heartbeats to the Docket.
    """
    workers: list[WorkerInfo] = []

    oldest = datetime.now(timezone.utc).timestamp() - (
        self.heartbeat_interval.total_seconds() * self.missed_heartbeats
    )

    async with self.redis() as r:
        await r.zremrangebyscore(self.workers_set, 0, oldest)

        worker_name_bytes: bytes
        last_seen_timestamp: float

        for worker_name_bytes, last_seen_timestamp in await r.zrange(
            self.workers_set, 0, -1, withscores=True
        ):
            worker_name = worker_name_bytes.decode()
            last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

            task_names: set[str] = {
                task_name_bytes.decode()
                for task_name_bytes in cast(
                    set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                )
            }

            workers.append(WorkerInfo(worker_name, last_seen, task_names))

    return workers
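
Both workers() and task_workers() prune entries whose last heartbeat is older than heartbeat_interval * missed_heartbeats before returning, so the results only include workers currently considered alive. A brief sketch:

```python
async with Docket() as docket:
    alive = await docket.workers()
    print(f"{len(alive)} workers are alive")

    capable = await docket.task_workers("my_task")
    print(f"{len(capable)} of them can run my_task")
```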

ExponentialRetry

Bases: Retry

Configures exponential retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the minimum and maximum delays between attempts.

Example:

@task
async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies.py
class ExponentialRetry(Retry):
    """Configures exponential retries for a task.  You can specify the total number
    of attempts (or `None` to retry indefinitely), and the minimum and maximum delays
    between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
        ...
    ```
    """

    def __init__(
        self,
        attempts: int | None = 1,
        minimum_delay: timedelta = timedelta(seconds=1),
        maximum_delay: timedelta = timedelta(seconds=64),
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            minimum_delay: The minimum delay between attempts.
            maximum_delay: The maximum delay between attempts.
        """
        super().__init__(attempts=attempts, delay=minimum_delay)
        self.minimum_delay = minimum_delay
        self.maximum_delay = maximum_delay

    async def __aenter__(self) -> "ExponentialRetry":
        execution = self.execution.get()

        retry = ExponentialRetry(
            attempts=self.attempts,
            minimum_delay=self.minimum_delay,
            maximum_delay=self.maximum_delay,
        )
        retry.attempt = execution.attempt

        if execution.attempt > 1:
            backoff_factor = 2 ** (execution.attempt - 1)
            calculated_delay = self.minimum_delay * backoff_factor

            if calculated_delay > self.maximum_delay:
                retry.delay = self.maximum_delay
            else:
                retry.delay = calculated_delay

        return retry

__init__(attempts=1, minimum_delay=timedelta(seconds=1), maximum_delay=timedelta(seconds=64))

Parameters:

    attempts (int | None, default 1): The total number of attempts to make. If None, the task will be retried indefinitely.
    minimum_delay (timedelta, default timedelta(seconds=1)): The minimum delay between attempts.
    maximum_delay (timedelta, default timedelta(seconds=64)): The maximum delay between attempts.

Source code in src/docket/dependencies.py
def __init__(
    self,
    attempts: int | None = 1,
    minimum_delay: timedelta = timedelta(seconds=1),
    maximum_delay: timedelta = timedelta(seconds=64),
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        minimum_delay: The minimum delay between attempts.
        maximum_delay: The maximum delay between attempts.
    """
    super().__init__(attempts=attempts, delay=minimum_delay)
    self.minimum_delay = minimum_delay
    self.maximum_delay = maximum_delay
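
From the code above, the delay associated with attempt n is minimum_delay * 2**(n - 1), capped at maximum_delay, so the waits roughly double between attempts. A sketch of a task using backoff; the flaky_call helper is hypothetical:

```python
from datetime import timedelta

@task
async def sync_account(
    account_id: int,
    retry: ExponentialRetry = ExponentialRetry(
        attempts=5,
        minimum_delay=timedelta(seconds=2),
        maximum_delay=timedelta(minutes=1),
    ),
) -> None:
    # Any exception raised here lets docket reschedule the task with the
    # next (doubled) delay, up to the configured number of attempts.
    await flaky_call(account_id)
```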

Logged

Bases: Annotation

Instructs docket to include arguments to this parameter in the log.

If length_only is True, only the length of the argument will be included in the log.

Example:

@task
def setup_new_customer(
    customer_id: Annotated[int, Logged],
    addresses: Annotated[list[Address], Logged(length_only=True)],
    password: str,
) -> None:
    ...

In the logs, you'd see the task referenced as:

setup_new_customer(customer_id=123, addresses[len 2], password=...)
Source code in src/docket/annotations.py
class Logged(Annotation):
    """Instructs docket to include arguments to this parameter in the log.

    If `length_only` is `True`, only the length of the argument will be included in
    the log.

    Example:

    ```python
    @task
    def setup_new_customer(
        customer_id: Annotated[int, Logged],
        addresses: Annotated[list[Address], Logged(length_only=True)],
        password: str,
    ) -> None:
        ...
    ```

    In the logs, you'd see the task referenced as:

    ```
    setup_new_customer(customer_id=123, addresses[len 2], password=...)
    ```
    """

    length_only: bool = False

    def __init__(self, length_only: bool = False) -> None:
        self.length_only = length_only

    def format(self, argument: Any) -> str:
        if self.length_only:
            if isinstance(argument, (dict, set)):
                return f"{{len {len(argument)}}}"
            elif isinstance(argument, tuple):
                return f"(len {len(argument)})"
            elif hasattr(argument, "__len__"):
                return f"[len {len(argument)}]"

        return repr(argument)

Perpetual

Bases: Dependency

Declare a task that should be run perpetually. Perpetual tasks are automatically rescheduled for the future after they finish (whether they succeed or fail). A perpetual task can be scheduled at worker startup with the automatic=True.

Example:

@task
async def my_task(perpetual: Perpetual = Perpetual()) -> None:
    ...
Source code in src/docket/dependencies.py
class Perpetual(Dependency):
    """Declare a task that should be run perpetually.  Perpetual tasks are automatically
    rescheduled for the future after they finish (whether they succeed or fail).  A
    perpetual task can be scheduled at worker startup by passing `automatic=True`.

    Example:

    ```python
    @task
    async def my_task(perpetual: Perpetual = Perpetual()) -> None:
        ...
    ```
    """

    single = True

    every: timedelta
    automatic: bool

    args: tuple[Any, ...]
    kwargs: dict[str, Any]

    cancelled: bool

    def __init__(
        self,
        every: timedelta = timedelta(0),
        automatic: bool = False,
    ) -> None:
        """
        Args:
            every: The target interval between task executions.
            automatic: If set, this task will be automatically scheduled during worker
                startup and continually through the worker's lifespan.  This ensures
                that the task will always be scheduled despite crashes and other
                adverse conditions.  Automatic tasks must not require any arguments.
        """
        self.every = every
        self.automatic = automatic
        self.cancelled = False

    async def __aenter__(self) -> "Perpetual":
        execution = self.execution.get()
        perpetual = Perpetual(every=self.every)
        perpetual.args = execution.args
        perpetual.kwargs = execution.kwargs
        return perpetual

    def cancel(self) -> None:
        self.cancelled = True

    def perpetuate(self, *args: Any, **kwargs: Any) -> None:
        self.args = args
        self.kwargs = kwargs

__init__(every=timedelta(0), automatic=False)

Parameters:

    every (timedelta, default timedelta(0)): The target interval between task executions.
    automatic (bool, default False): If set, this task will be automatically scheduled during worker startup and continually through the worker's lifespan. This ensures that the task will always be scheduled despite crashes and other adverse conditions. Automatic tasks must not require any arguments.

Source code in src/docket/dependencies.py
def __init__(
    self,
    every: timedelta = timedelta(0),
    automatic: bool = False,
) -> None:
    """
    Args:
        every: The target interval between task executions.
        automatic: If set, this task will be automatically scheduled during worker
            startup and continually through the worker's lifespan.  This ensures
            that the task will always be scheduled despite crashes and other
            adverse conditions.  Automatic tasks must not require any arguments.
    """
    self.every = every
    self.automatic = automatic
    self.cancelled = False
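
A perpetual task reschedules itself after every run; based on the attributes shown above, cancel() flags the task so it is not rescheduled, and perpetuate() lets the next run carry different arguments. A sketch of a hypothetical polling task (fetch_since is an assumed helper):

```python
from datetime import timedelta

@task
async def poll_feed(
    cursor: str = "",
    perpetual: Perpetual = Perpetual(every=timedelta(seconds=30), automatic=True),
) -> None:
    new_cursor, items = await fetch_since(cursor)  # hypothetical helper
    if not items:
        perpetual.cancel()  # stop rescheduling this task
        return
    perpetual.perpetuate(new_cursor)  # the next run starts from the new cursor
```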

Retry

Bases: Dependency

Configures linear retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the delay between attempts.

Example:

@task
async def my_task(retry: Retry = Retry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies.py
class Retry(Dependency):
    """Configures linear retries for a task.  You can specify the total number of
    attempts (or `None` to retry indefinitely), and the delay between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: Retry = Retry(attempts=3)) -> None:
        ...
    ```
    """

    single: bool = True

    def __init__(
        self, attempts: int | None = 1, delay: timedelta = timedelta(0)
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            delay: The delay between attempts.
        """
        self.attempts = attempts
        self.delay = delay
        self.attempt = 1

    async def __aenter__(self) -> "Retry":
        execution = self.execution.get()
        retry = Retry(attempts=self.attempts, delay=self.delay)
        retry.attempt = execution.attempt
        return retry

__init__(attempts=1, delay=timedelta(0))

Parameters:

    attempts (int | None, default 1): The total number of attempts to make. If None, the task will be retried indefinitely.
    delay (timedelta, default timedelta(0)): The delay between attempts.

Source code in src/docket/dependencies.py
def __init__(
    self, attempts: int | None = 1, delay: timedelta = timedelta(0)
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        delay: The delay between attempts.
    """
    self.attempts = attempts
    self.delay = delay
    self.attempt = 1
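
Inside the task body, retry.attempt reports which attempt is currently running, which is useful for logging or for falling back on the last try. A brief sketch; the deliver helper is hypothetical:

```python
from datetime import timedelta

@task
async def send_email(
    address: str,
    retry: Retry = Retry(attempts=3, delay=timedelta(seconds=10)),
) -> None:
    print(f"attempt {retry.attempt} of {retry.attempts}")
    await deliver(address)  # hypothetical helper; raising an exception triggers a retry
```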

Timeout

Bases: Dependency

Configures a timeout for a task. You can specify the base timeout, and the task will be cancelled if it exceeds this duration. The timeout may be extended within the context of a single running task.

Example:

@task
async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
    ...
Source code in src/docket/dependencies.py
class Timeout(Dependency):
    """Configures a timeout for a task.  You can specify the base timeout, and the
    task will be cancelled if it exceeds this duration.  The timeout may be extended
    within the context of a single running task.

    Example:

    ```python
    @task
    async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
        ...
    ```
    """

    single: bool = True

    base: timedelta
    _deadline: float

    def __init__(self, base: timedelta) -> None:
        """
        Args:
            base: The base timeout duration.
        """
        self.base = base

    async def __aenter__(self) -> "Timeout":
        timeout = Timeout(base=self.base)
        timeout.start()
        return timeout

    def start(self) -> None:
        self._deadline = time.monotonic() + self.base.total_seconds()

    def expired(self) -> bool:
        return time.monotonic() >= self._deadline

    def remaining(self) -> timedelta:
        """Get the remaining time until the timeout expires."""
        return timedelta(seconds=self._deadline - time.monotonic())

    def extend(self, by: timedelta | None = None) -> None:
        """Extend the timeout by a given duration.  If no duration is provided, the
        base timeout will be used.

        Args:
            by: The duration to extend the timeout by.
        """
        if by is None:
            by = self.base
        self._deadline += by.total_seconds()

__init__(base)

Parameters:

    base (timedelta, required): The base timeout duration.

Source code in src/docket/dependencies.py
def __init__(self, base: timedelta) -> None:
    """
    Args:
        base: The base timeout duration.
    """
    self.base = base

extend(by=None)

Extend the timeout by a given duration. If no duration is provided, the base timeout will be used.

Parameters:

    by (timedelta | None, default None): The duration to extend the timeout by.

Source code in src/docket/dependencies.py
def extend(self, by: timedelta | None = None) -> None:
    """Extend the timeout by a given duration.  If no duration is provided, the
    base timeout will be used.

    Args:
        by: The duration to extend the timeout by.
    """
    if by is None:
        by = self.base
    self._deadline += by.total_seconds()

remaining()

Get the remaining time until the timeout expires.

Source code in src/docket/dependencies.py
def remaining(self) -> timedelta:
    """Get the remaining time until the timeout expires."""
    return timedelta(seconds=self._deadline - time.monotonic())
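
Within a long-running task you can check remaining() and push the deadline out with extend(), for example while a batch is still making steady progress. A sketch; the process helper is hypothetical:

```python
from datetime import timedelta

@task
async def import_rows(
    rows: list[dict],
    timeout: Timeout = Timeout(timedelta(seconds=30)),
) -> None:
    for row in rows:
        await process(row)  # hypothetical helper
        if timeout.remaining() < timedelta(seconds=5):
            timeout.extend()  # extends by the base duration (30 seconds here)
```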

Worker

A Worker executes tasks on a Docket. You may run as many workers as you like to work a single Docket.

Example:

async with Docket() as docket:
    async with Worker(docket) as worker:
        await worker.run_forever()
Source code in src/docket/worker.py
class Worker:
    """A Worker executes tasks on a Docket.  You may run as many workers as you like
    to work a single Docket.

    Example:

    ```python
    async with Docket() as docket:
        async with Worker(docket) as worker:
            await worker.run_forever()
    ```
    """

    docket: Docket
    name: str
    concurrency: int
    redelivery_timeout: timedelta
    reconnection_delay: timedelta
    minimum_check_interval: timedelta
    scheduling_resolution: timedelta
    schedule_automatic_tasks: bool

    def __init__(
        self,
        docket: Docket,
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=250),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
    ) -> None:
        self.docket = docket
        self.name = name or f"{socket.gethostname()}#{os.getpid()}"
        self.concurrency = concurrency
        self.redelivery_timeout = redelivery_timeout
        self.reconnection_delay = reconnection_delay
        self.minimum_check_interval = minimum_check_interval
        self.scheduling_resolution = scheduling_resolution
        self.schedule_automatic_tasks = schedule_automatic_tasks

    async def __aenter__(self) -> Self:
        self._heartbeat_task = asyncio.create_task(self._heartbeat())
        self._execution_counts = {}
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        del self._execution_counts

        self._heartbeat_task.cancel()
        try:
            await self._heartbeat_task
        except asyncio.CancelledError:
            pass
        del self._heartbeat_task

    def labels(self) -> Mapping[str, str]:
        return {
            **self.docket.labels(),
            "docket.worker": self.name,
        }

    def _log_context(self) -> Mapping[str, str]:
        return {
            **self.labels(),
            "docket.queue_key": self.docket.queue_key,
            "docket.stream_key": self.docket.stream_key,
        }

    @classmethod
    async def run(
        cls,
        docket_name: str = "docket",
        url: str = "redis://localhost:6379/0",
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=100),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
        until_finished: bool = False,
        metrics_port: int | None = None,
        tasks: list[str] = ["docket.tasks:standard_tasks"],
    ) -> None:
        with metrics_server(port=metrics_port):
            async with Docket(name=docket_name, url=url) as docket:
                for task_path in tasks:
                    docket.register_collection(task_path)

                async with Worker(
                    docket=docket,
                    name=name,
                    concurrency=concurrency,
                    redelivery_timeout=redelivery_timeout,
                    reconnection_delay=reconnection_delay,
                    minimum_check_interval=minimum_check_interval,
                    scheduling_resolution=scheduling_resolution,
                    schedule_automatic_tasks=schedule_automatic_tasks,
                ) as worker:
                    if until_finished:
                        await worker.run_until_finished()
                    else:
                        await worker.run_forever()  # pragma: no cover

    async def run_until_finished(self) -> None:
        """Run the worker until there are no more tasks to process."""
        return await self._run(forever=False)

    async def run_forever(self) -> None:
        """Run the worker indefinitely."""
        return await self._run(forever=True)  # pragma: no cover

    _execution_counts: dict[str, int]

    async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
        """
        Run the worker until there are no more tasks to process, but limit specified
        task keys to a maximum number of iterations.

        This is particularly useful for testing self-perpetuating tasks that would
        otherwise run indefinitely.

        Args:
            iterations_by_key: Maps task keys to their maximum allowed executions
        """
        self._execution_counts = {key: 0 for key in iterations_by_key}

        def has_reached_max_iterations(execution: Execution) -> bool:
            key = execution.key

            if key not in iterations_by_key:
                return False

            if self._execution_counts[key] >= iterations_by_key[key]:
                return True

            return False

        self.docket.strike_list.add_condition(has_reached_max_iterations)
        try:
            await self.run_until_finished()
        finally:
            self.docket.strike_list.remove_condition(has_reached_max_iterations)
            self._execution_counts = {}

    async def _run(self, forever: bool = False) -> None:
        self._startup_log()

        while True:
            try:
                async with self.docket.redis() as redis:
                    return await self._worker_loop(redis, forever=forever)
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.warning(
                    "Error connecting to redis, retrying in %s...",
                    self.reconnection_delay,
                    exc_info=True,
                )
                await asyncio.sleep(self.reconnection_delay.total_seconds())

    async def _worker_loop(self, redis: Redis, forever: bool = False):
        worker_stopping = asyncio.Event()

        if self.schedule_automatic_tasks:
            await self._schedule_all_automatic_perpetual_tasks()

        scheduler_task = asyncio.create_task(
            self._scheduler_loop(redis, worker_stopping)
        )

        active_tasks: dict[asyncio.Task[None], RedisMessageID] = {}
        available_slots = self.concurrency

        log_context = self._log_context()

        async def check_for_work() -> bool:
            logger.debug("Checking for work", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xlen(self.docket.stream_key)
                pipeline.zcard(self.docket.queue_key)
                results: list[int] = await pipeline.execute()
                stream_len = results[0]
                queue_len = results[1]
                return stream_len > 0 or queue_len > 0

        async def get_redeliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting redeliveries", extra=log_context)
            _, redeliveries, *_ = await redis.xautoclaim(
                name=self.docket.stream_key,
                groupname=self.docket.worker_group_name,
                consumername=self.name,
                min_idle_time=int(self.redelivery_timeout.total_seconds() * 1000),
                start_id="0-0",
                count=available_slots,
            )
            return [(b"__redelivery__", redeliveries)]

        async def get_new_deliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting new deliveries", extra=log_context)
            return await redis.xreadgroup(
                groupname=self.docket.worker_group_name,
                consumername=self.name,
                streams={self.docket.stream_key: ">"},
                block=int(self.minimum_check_interval.total_seconds() * 1000),
                count=available_slots,
            )

        def start_task(message_id: RedisMessageID, message: RedisMessage) -> bool:
            function_name = message[b"function"].decode()
            if not (function := self.docket.tasks.get(function_name)):
                logger.warning(
                    "Task function %r not found",
                    function_name,
                    extra=log_context,
                )
                return False

            execution = Execution.from_message(function, message)

            task = asyncio.create_task(self._execute(execution), name=execution.key)
            active_tasks[task] = message_id

            nonlocal available_slots
            available_slots -= 1

            return True

        async def process_completed_tasks() -> None:
            completed_tasks = {task for task in active_tasks if task.done()}
            for task in completed_tasks:
                message_id = active_tasks.pop(task)
                await task
                await ack_message(redis, message_id)

        async def ack_message(redis: Redis, message_id: RedisMessageID) -> None:
            logger.debug("Acknowledging message", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xack(
                    self.docket.stream_key,
                    self.docket.worker_group_name,
                    message_id,
                )
                pipeline.xdel(
                    self.docket.stream_key,
                    message_id,
                )
                await pipeline.execute()

        has_work: bool = True

        try:
            while forever or has_work or active_tasks:
                await process_completed_tasks()

                available_slots = self.concurrency - len(active_tasks)

                if available_slots <= 0:
                    await asyncio.sleep(self.minimum_check_interval.total_seconds())
                    continue

                for source in [get_redeliveries, get_new_deliveries]:
                    for _, messages in await source(redis):
                        for message_id, message in messages:
                            if not message:  # pragma: no cover
                                continue

                            if not start_task(message_id, message):
                                await self._delete_known_task(redis, message)
                                await ack_message(redis, message_id)

                    if available_slots <= 0:
                        break

                if not forever and not active_tasks:
                    has_work = await check_for_work()

        except asyncio.CancelledError:
            if active_tasks:  # pragma: no cover
                logger.info(
                    "Shutdown requested, finishing %d active tasks...",
                    len(active_tasks),
                    extra=log_context,
                )
        finally:
            if active_tasks:
                await asyncio.gather(*active_tasks, return_exceptions=True)
                await process_completed_tasks()

            worker_stopping.set()
            await scheduler_task

    async def _scheduler_loop(
        self,
        redis: Redis,
        worker_stopping: asyncio.Event,
    ) -> None:
        """Loop that moves due tasks from the queue to the stream."""

        stream_due_tasks: _stream_due_tasks = cast(
            _stream_due_tasks,
            redis.register_script(
                # Lua script to atomically move scheduled tasks to the stream
                # KEYS[1]: queue key (sorted set)
                # KEYS[2]: stream key
                # ARGV[1]: current timestamp
                # ARGV[2]: docket name prefix
                """
            local total_work = redis.call('ZCARD', KEYS[1])
            local due_work = 0

            if total_work > 0 then
                local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])

                for i, key in ipairs(tasks) do
                    local hash_key = ARGV[2] .. ":" .. key
                    local task_data = redis.call('HGETALL', hash_key)

                    if #task_data > 0 then
                        local task = {}
                        for j = 1, #task_data, 2 do
                            task[task_data[j]] = task_data[j+1]
                        end

                        redis.call('XADD', KEYS[2], '*',
                            'key', task['key'],
                            'when', task['when'],
                            'function', task['function'],
                            'args', task['args'],
                            'kwargs', task['kwargs'],
                            'attempt', task['attempt']
                        )
                        redis.call('DEL', hash_key)
                        due_work = due_work + 1
                    end
                end
            end

            if due_work > 0 then
                redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
            end

            return {total_work, due_work}
            """
            ),
        )

        total_work: int = sys.maxsize

        log_context = self._log_context()

        while not worker_stopping.is_set() or total_work:
            try:
                logger.debug("Scheduling due tasks", extra=log_context)
                total_work, due_work = await stream_due_tasks(
                    keys=[self.docket.queue_key, self.docket.stream_key],
                    args=[datetime.now(timezone.utc).timestamp(), self.docket.name],
                )

                if due_work > 0:
                    logger.debug(
                        "Moved %d/%d due tasks from %s to %s",
                        due_work,
                        total_work,
                        self.docket.queue_key,
                        self.docket.stream_key,
                        extra=log_context,
                    )
            except Exception:  # pragma: no cover
                logger.exception(
                    "Error in scheduler loop",
                    exc_info=True,
                    extra=log_context,
                )
            finally:
                await asyncio.sleep(self.scheduling_resolution.total_seconds())

        logger.debug("Scheduler loop finished", extra=log_context)

    async def _schedule_all_automatic_perpetual_tasks(self) -> None:
        async with self.docket.redis() as redis:
            try:
                async with redis.lock(
                    f"{self.docket.name}:perpetual:lock", timeout=10, blocking=False
                ):
                    for task_function in self.docket.tasks.values():
                        perpetual = get_single_dependency_parameter_of_type(
                            task_function, Perpetual
                        )
                        if perpetual is None:
                            continue

                        if not perpetual.automatic:
                            continue

                        key = task_function.__name__

                        await self.docket.add(task_function, key=key)()
            except LockError:  # pragma: no cover
                return

    async def _delete_known_task(
        self, redis: Redis, execution_or_message: Execution | RedisMessage
    ) -> None:
        if isinstance(execution_or_message, Execution):
            key = execution_or_message.key
        elif bytes_key := execution_or_message.get(b"key"):
            key = bytes_key.decode()
        else:  # pragma: no cover
            return

        logger.debug("Deleting known task", extra=self._log_context())
        known_task_key = self.docket.known_task_key(key)
        await redis.delete(known_task_key)

    async def _execute(self, execution: Execution) -> None:
        log_context = {**self._log_context(), **execution.specific_labels()}
        counter_labels = {**self.labels(), **execution.general_labels()}

        call = execution.call_repr()

        if self.docket.strike_list.is_stricken(execution):
            async with self.docket.redis() as redis:
                await self._delete_known_task(redis, execution)

            logger.warning("🗙 %s", call, extra=log_context)
            TASKS_STRICKEN.add(1, counter_labels | {"docket.where": "worker"})
            return

        if execution.key in self._execution_counts:
            self._execution_counts[execution.key] += 1

        start = time.time()
        punctuality = start - execution.when.timestamp()
        log_context = {**log_context, "punctuality": punctuality}
        duration = 0.0

        TASKS_STARTED.add(1, counter_labels)
        TASKS_RUNNING.add(1, counter_labels)
        TASK_PUNCTUALITY.record(punctuality, counter_labels)

        arrow = "↬" if execution.attempt > 1 else "↪"
        logger.info("%s [%s] %s", arrow, ms(punctuality), call, extra=log_context)

        dependencies: dict[str, Dependency] = {}

        with tracer.start_as_current_span(
            execution.function.__name__,
            kind=trace.SpanKind.CONSUMER,
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function.__name__,
            },
            links=execution.incoming_span_links(),
        ):
            try:
                async with resolved_dependencies(self, execution) as dependencies:
                    # Preemptively reschedule the perpetual task for the future, or clear
                    # the known task key for this task
                    rescheduled = await self._perpetuate_if_requested(
                        execution, dependencies
                    )
                    if not rescheduled:
                        async with self.docket.redis() as redis:
                            await self._delete_known_task(redis, execution)

                    dependency_failures = {
                        k: v
                        for k, v in dependencies.items()
                        if isinstance(v, FailedDependency)
                    }
                    if dependency_failures:
                        raise ExceptionGroup(
                            (
                                "Failed to resolve dependencies for parameter(s): "
                                + ", ".join(dependency_failures.keys())
                            ),
                            [
                                dependency.error
                                for dependency in dependency_failures.values()
                            ],
                        )

                    if timeout := get_single_dependency_of_type(dependencies, Timeout):
                        await self._run_function_with_timeout(
                            execution, dependencies, timeout
                        )
                    else:
                        await execution.function(
                            *execution.args,
                            **{
                                **execution.kwargs,
                                **dependencies,
                            },
                        )

                    duration = log_context["duration"] = time.time() - start
                    TASKS_SUCCEEDED.add(1, counter_labels)

                    rescheduled = await self._perpetuate_if_requested(
                        execution, dependencies, timedelta(seconds=duration)
                    )

                    arrow = "↫" if rescheduled else "↩"
                    logger.info(
                        "%s [%s] %s", arrow, ms(duration), call, extra=log_context
                    )
            except Exception:
                duration = log_context["duration"] = time.time() - start
                TASKS_FAILED.add(1, counter_labels)

                retried = await self._retry_if_requested(execution, dependencies)
                if not retried:
                    retried = await self._perpetuate_if_requested(
                        execution, dependencies, timedelta(seconds=duration)
                    )

                arrow = "↫" if retried else "↩"
                logger.exception(
                    "%s [%s] %s", arrow, ms(duration), call, extra=log_context
                )
            finally:
                TASKS_RUNNING.add(-1, counter_labels)
                TASKS_COMPLETED.add(1, counter_labels)
                TASK_DURATION.record(duration, counter_labels)

    async def _run_function_with_timeout(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
        timeout: Timeout,
    ) -> None:
        task_coro = cast(
            Coroutine[None, None, None],
            execution.function(*execution.args, **execution.kwargs, **dependencies),
        )
        task = asyncio.create_task(task_coro)
        try:
            while not task.done():  # pragma: no branch
                remaining = timeout.remaining().total_seconds()
                if timeout.expired():
                    task.cancel()
                    break

                try:
                    await asyncio.wait_for(asyncio.shield(task), timeout=remaining)
                    return
                except asyncio.TimeoutError:
                    continue
        finally:
            if not task.done():
                task.cancel()

            try:
                await task
            except asyncio.CancelledError:
                raise asyncio.TimeoutError

    async def _retry_if_requested(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
    ) -> bool:
        retry = get_single_dependency_of_type(dependencies, Retry)
        if not retry:
            return False

        if retry.attempts is not None and execution.attempt >= retry.attempts:
            return False

        execution.when = datetime.now(timezone.utc) + retry.delay
        execution.attempt += 1
        await self.docket.schedule(execution)

        TASKS_RETRIED.add(1, {**self.labels(), **execution.specific_labels()})
        return True

    async def _perpetuate_if_requested(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
        duration: timedelta | None = None,
    ) -> bool:
        perpetual = get_single_dependency_of_type(dependencies, Perpetual)
        if not perpetual:
            return False

        if perpetual.cancelled:
            await self.docket.cancel(execution.key)
            return False

        now = datetime.now(timezone.utc)
        when = max(now, now + perpetual.every - (duration or timedelta(0)))

        await self.docket.replace(execution.function, when, execution.key)(
            *perpetual.args,
            **perpetual.kwargs,
        )

        if duration is not None:
            TASKS_PERPETUATED.add(1, {**self.labels(), **execution.specific_labels()})

        return True

    def _startup_log(self) -> None:
        logger.info("Starting worker %r with the following tasks:", self.name)
        for task_name, task in self.docket.tasks.items():
            logger.info("* %s(%s)", task_name, compact_signature(get_signature(task)))

    @property
    def workers_set(self) -> str:
        return self.docket.workers_set

    def worker_tasks_set(self, worker_name: str) -> str:
        return self.docket.worker_tasks_set(worker_name)

    def task_workers_set(self, task_name: str) -> str:
        return self.docket.task_workers_set(task_name)

    async def _heartbeat(self) -> None:
        while True:
            await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())
            try:
                now = datetime.now(timezone.utc).timestamp()
                maximum_age = (
                    self.docket.heartbeat_interval * self.docket.missed_heartbeats
                )
                oldest = now - maximum_age.total_seconds()

                task_names = list(self.docket.tasks)

                async with self.docket.redis() as r:
                    async with r.pipeline() as pipeline:
                        pipeline.zremrangebyscore(self.workers_set, 0, oldest)
                        pipeline.zadd(self.workers_set, {self.name: now})

                        for task_name in task_names:
                            task_workers_set = self.task_workers_set(task_name)
                            pipeline.zremrangebyscore(task_workers_set, 0, oldest)
                            pipeline.zadd(task_workers_set, {self.name: now})

                        pipeline.sadd(self.worker_tasks_set(self.name), *task_names)
                        pipeline.expire(
                            self.worker_tasks_set(self.name),
                            max(maximum_age, timedelta(seconds=1)),
                        )

                        await pipeline.execute()

                    async with r.pipeline() as pipeline:
                        pipeline.xlen(self.docket.stream_key)
                        pipeline.zcount(self.docket.queue_key, 0, now)
                        pipeline.zcount(self.docket.queue_key, now, "+inf")

                        results: list[int] = await pipeline.execute()
                        stream_depth = results[0]
                        overdue_depth = results[1]
                        schedule_depth = results[2]

                        QUEUE_DEPTH.set(
                            stream_depth + overdue_depth, self.docket.labels()
                        )
                        SCHEDULE_DEPTH.set(schedule_depth, self.docket.labels())

            except asyncio.CancelledError:  # pragma: no cover
                return
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )
            except Exception:
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )
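
The constructor parameters shown above control how many tasks run concurrently, how long an unacknowledged message waits before redelivery, and how often the worker polls Redis. A minimal tuning sketch, assuming Docket and Worker are importable from the top-level docket package and that the docket name and Redis URL are illustrative:

```python
import asyncio
from datetime import timedelta

from docket import Docket, Worker


async def main() -> None:
    async with Docket(name="orders", url="redis://localhost:6379/0") as docket:
        async with Worker(
            docket,
            concurrency=25,                               # run up to 25 tasks at once
            redelivery_timeout=timedelta(minutes=2),      # reclaim stalled messages sooner
            minimum_check_interval=timedelta(seconds=1),  # poll Redis less often
        ) as worker:
            await worker.run_forever()


asyncio.run(main())
```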

run_at_most(iterations_by_key) async

Run the worker until there are no more tasks to process, but limit specified task keys to a maximum number of iterations.

This is particularly useful for testing self-perpetuating tasks that would otherwise run indefinitely.

Parameters:

iterations_by_key (Mapping[str, int], required): Maps task keys to their maximum allowed executions
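
A testing sketch of these semantics: the self-rescheduling `ping` task below is illustrative and the top-level imports are assumed; the worker stops once the "ping" key has run three times and no other work remains.

```python
from docket import CurrentDocket, Docket, Worker, task


@task
async def ping(docket: Docket = CurrentDocket()) -> None:
    # Re-schedules itself under the same key, so it would otherwise run forever.
    await docket.add(ping, key="ping")()


async def test_ping_is_bounded() -> None:
    async with Docket() as docket:
        await docket.add(ping, key="ping")()
        async with Worker(docket) as worker:
            # Strike the "ping" key after three executions, then drain and return.
            await worker.run_at_most({"ping": 3})
```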
Source code in src/docket/worker.py
async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
    """
    Run the worker until there are no more tasks to process, but limit specified
    task keys to a maximum number of iterations.

    This is particularly useful for testing self-perpetuating tasks that would
    otherwise run indefinitely.

    Args:
        iterations_by_key: Maps task keys to their maximum allowed executions
    """
    self._execution_counts = {key: 0 for key in iterations_by_key}

    def has_reached_max_iterations(execution: Execution) -> bool:
        key = execution.key

        if key not in iterations_by_key:
            return False

        if self._execution_counts[key] >= iterations_by_key[key]:
            return True

        return False

    self.docket.strike_list.add_condition(has_reached_max_iterations)
    try:
        await self.run_until_finished()
    finally:
        self.docket.strike_list.remove_condition(has_reached_max_iterations)
        self._execution_counts = {}

run_forever() async

Run the worker indefinitely.

Source code in src/docket/worker.py
async def run_forever(self) -> None:
    """Run the worker indefinitely."""
    return await self._run(forever=True)  # pragma: no cover

run_until_finished() async

Run the worker until there are no more tasks to process.
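
A sketch of draining a docket in a one-shot job or test; the `send_welcome_email` task and the top-level imports are illustrative:

```python
from docket import Docket, Worker, task


@task
async def send_welcome_email(user_id: str) -> None:
    print(f"welcoming {user_id}")


async def drain() -> None:
    async with Docket() as docket:
        await docket.add(send_welcome_email)("user-123")
        async with Worker(docket) as worker:
            # Process everything that is currently due, then return.
            await worker.run_until_finished()
```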

Source code in src/docket/worker.py
async def run_until_finished(self) -> None:
    """Run the worker until there are no more tasks to process."""
    return await self._run(forever=False)

CurrentDocket()

A dependency to access the current Docket.

Example:

@task
async def my_task(docket: Docket = CurrentDocket()) -> None:
    assert isinstance(docket, Docket)
Source code in src/docket/dependencies.py
def CurrentDocket() -> Docket:
    """A dependency to access the current Docket.

    Example:

    ```python
    @task
    async def my_task(docket: Docket = CurrentDocket()) -> None:
        assert isinstance(docket, Docket)
    ```
    """
    return cast(Docket, _CurrentDocket())
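
Beyond the assertion above, the injected Docket is typically used to schedule follow-up work from inside a task. A sketch with illustrative tasks and assumed top-level imports:

```python
from docket import CurrentDocket, Docket, task


@task
async def notify_shipped(order_id: str) -> None:
    print(f"order {order_id} shipped")


@task
async def process_order(order_id: str, docket: Docket = CurrentDocket()) -> None:
    # ...do the work, then enqueue a follow-up task on the same docket...
    await docket.add(notify_shipped)(order_id)
```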

CurrentExecution()

A dependency to access the current Execution.

Example:

@task
async def my_task(execution: Execution = CurrentExecution()) -> None:
    assert isinstance(execution, Execution)
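
The injected Execution carries the task's key, scheduled time, and attempt number (as used by the worker above), which makes attempt-aware behavior straightforward. A sketch with assumed top-level imports:

```python
from docket import CurrentExecution, Execution, task


@task
async def flaky_call(execution: Execution = CurrentExecution()) -> None:
    # Fall back to a cheaper code path after the first attempt.
    if execution.attempt > 1:
        print(f"retry #{execution.attempt} for {execution.key}")
```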
Source code in src/docket/dependencies.py
def CurrentExecution() -> Execution:
    """A dependency to access the current Execution.

    Example:

    ```python
    @task
    async def my_task(execution: Execution = CurrentExecution()) -> None:
        assert isinstance(execution, Execution)
    ```
    """
    return cast(Execution, _CurrentExecution())

CurrentWorker()

A dependency to access the current Worker.

Example:

@task
async def my_task(worker: Worker = CurrentWorker()) -> None:
    assert isinstance(worker, Worker)
Source code in src/docket/dependencies.py
def CurrentWorker() -> "Worker":
    """A dependency to access the current Worker.

    Example:

    ```python
    @task
    async def my_task(worker: Worker = CurrentWorker()) -> None:
        assert isinstance(worker, Worker)
    ```
    """
    return cast("Worker", _CurrentWorker())

Depends(dependency)

Include a user-defined function as a dependency. A dependency may either return a value or an async context manager. If it returns a context manager, the dependency will be entered and exited around the task, giving an opportunity to control the lifetime of a resource, like a database connection.

Example:

async def my_dependency() -> str:
    return "Hello, world!"

@task
async def my_task(dependency: str = Depends(my_dependency)) -> None:
    print(dependency)
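
For the context-manager case described above, one way to write such a dependency is an async generator wrapped with contextlib.asynccontextmanager, so that calling it returns an async context manager; whether docket accepts this form directly is an assumption based on the description, and the Connection type is illustrative.

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator

from docket import Depends, task


class Connection:
    async def close(self) -> None: ...


@asynccontextmanager
async def database() -> AsyncIterator[Connection]:
    conn = Connection()      # acquired before the task runs
    try:
        yield conn
    finally:
        await conn.close()   # released after the task finishes


@task
async def sync_customers(db: Connection = Depends(database)) -> None:
    print("syncing customers with an open connection", db)
```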
Source code in src/docket/dependencies.py
def Depends(dependency: DependencyFunction[R]) -> R:
    """Include a user-defined function as a dependency.  Dependencies may either return
    a value or an async context manager.  If it returns a context manager, the
    dependency will be entered and exited around the task, giving an opportunity to
    control the lifetime of a resource, like a database connection.

    Example:

    ```python

    async def my_dependency() -> str:
        return "Hello, world!"

    @task
    async def my_task(dependency: str = Depends(my_dependency)) -> None:
        print(dependency)

    ```
    """
    return cast(R, _Depends(dependency))

TaskArgument(parameter=None, optional=False)

A dependency to access an argument of the currently executing task. This is often useful in dependency functions so they can access the arguments of the task they are injected into.

Example:

async def customer_name(customer_id: int = TaskArgument()) -> str:
    # ...look up the customer's name by ID...
    return "John Doe"

@task
async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
    print(f"Hello, {name}!")
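
When a dependency's own parameter name does not match the task argument it needs, the argument name can be passed explicitly as the first parameter to TaskArgument (per the signature above). A sketch with illustrative tasks and assumed top-level imports:

```python
from docket import Depends, TaskArgument, task


async def customer_name(cid: int = TaskArgument("customer_id")) -> str:
    # The task argument is named explicitly, since this parameter is called "cid".
    return f"customer-{cid}"


@task
async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
    print(f"Hello, {name}!")
```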
Source code in src/docket/dependencies.py
def TaskArgument(parameter: str | None = None, optional: bool = False) -> Any:
    """A dependency to access a argument of the currently executing task.  This is
    often useful in dependency functions so they can access the arguments of the
    task they are injected into.

    Example:

    ```python
    async def customer_name(customer_id: int = TaskArgument()) -> str:
        # ...look up the customer's name by ID...
        return "John Doe"

    @task
    async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
        print(f"Hello, {name}!")
    ```
    """
    return cast(Any, _TaskArgument(parameter, optional))

TaskKey()

A dependency to access the key of the currently executing task.

Example:

@task
async def my_task(key: str = TaskKey()) -> None:
    assert isinstance(key, str)
Source code in src/docket/dependencies.py
def TaskKey() -> str:
    """A dependency to access the key of the currently executing task.

    Example:

    ```python
    @task
    async def my_task(key: str = TaskKey()) -> None:
        assert isinstance(key, str)
    ```
    """
    return cast(str, _TaskKey())

TaskLogger()

A dependency to access a logger for the currently executing task. The logger will automatically inject contextual information such as the worker and docket name, the task key, and the current execution attempt number.

Example:

@task
async def my_task(logger: LoggerAdapter[Logger] = TaskLogger()) -> None:
    logger.info("Hello, world!")
Source code in src/docket/dependencies.py
def TaskLogger() -> logging.LoggerAdapter[logging.Logger]:
    """A dependency to access a logger for the currently executing task.  The logger
    will automatically inject contextual information such as the worker and docket
    name, the task key, and the current execution attempt number.

    Example:

    ```python
    @task
    async def my_task(logger: LoggerAdapter[Logger] = TaskLogger()) -> None:
        logger.info("Hello, world!")
    ```
    """
    return cast(logging.LoggerAdapter[logging.Logger], _TaskLogger())