
API Reference

docket

docket - A distributed background task system for Python functions.

docket focuses on scheduling future work as seamlessly and efficiently as immediate work.

Agenda

A collection of tasks to be scheduled together on a Docket.

The Agenda allows you to build up a collection of tasks with their arguments, then schedule them all at once using various timing strategies like scattering.

Example

agenda = Agenda()
agenda.add(process_item)(item1)
agenda.add(process_item)(item2)
agenda.add(send_email)(email)
await agenda.scatter(docket, over=timedelta(minutes=50))

Source code in src/docket/agenda.py
class Agenda:
    """A collection of tasks to be scheduled together on a Docket.

    The Agenda allows you to build up a collection of tasks with their arguments,
    then schedule them all at once using various timing strategies like scattering.

    Example:
        >>> agenda = Agenda()
        >>> agenda.add(process_item)(item1)
        >>> agenda.add(process_item)(item2)
        >>> agenda.add(send_email)(email)
        >>> await agenda.scatter(docket, over=timedelta(minutes=50))
    """

    def __init__(self) -> None:
        """Initialize an empty Agenda."""
        self._tasks: list[
            tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]
        ] = []

    def __len__(self) -> int:
        """Return the number of tasks in the agenda."""
        return len(self._tasks)

    def __iter__(
        self,
    ) -> Iterator[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]]:
        """Iterate over tasks in the agenda."""
        return iter(self._tasks)

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
    ) -> Callable[P, None]:
        """Add a task function to the agenda.

        Args:
            function: The task function to add.

        Returns:
            A callable that accepts the task arguments.
        """

    @overload
    def add(
        self,
        function: str,
    ) -> Callable[..., None]:
        """Add a task by name to the agenda.

        Args:
            function: The name of a registered task.

        Returns:
            A callable that accepts the task arguments.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
    ) -> Callable[..., None]:
        """Add a task to the agenda.

        Args:
            function: The task function or name to add.

        Returns:
            A callable that accepts the task arguments and adds them to the agenda.
        """

        def scheduler(*args: Any, **kwargs: Any) -> None:
            self._tasks.append((function, args, kwargs))

        return scheduler

    def clear(self) -> None:
        """Clear all tasks from the agenda."""
        self._tasks.clear()

    async def scatter(
        self,
        docket: Docket,
        over: timedelta,
        start: datetime | None = None,
        jitter: timedelta | None = None,
    ) -> list[Execution]:
        """Scatter the tasks in this agenda over a time period.

        Tasks are distributed evenly across the specified time window,
        optionally with random jitter to prevent thundering herd effects.

        If an error occurs during scheduling, some tasks may have already been
        scheduled successfully before the failure occurred.

        Args:
            docket: The Docket to schedule tasks on.
            over: Time period to scatter tasks over (required).
            start: When to start scattering from. Defaults to now.
            jitter: Maximum random offset to add/subtract from each scheduled time.

        Returns:
            List of Execution objects for the scheduled tasks.

        Raises:
            KeyError: If any task name is not registered with the docket.
            ValueError: If any task is stricken or 'over' is not positive.
        """
        if over.total_seconds() <= 0:
            raise ValueError("'over' parameter must be a positive duration")

        if not self._tasks:
            return []

        if start is None:
            start = datetime.now(timezone.utc)

        # Calculate even distribution over the time period
        task_count = len(self._tasks)

        if task_count == 1:
            # Single task goes in the middle of the window
            schedule_times = [start + over / 2]
        else:
            # Distribute tasks evenly across the window
            # For n tasks, we want n points from start to start+over inclusive
            interval = over / (task_count - 1)
            schedule_times = [start + interval * i for i in range(task_count)]

        # Apply jitter if specified
        if jitter:
            jittered_times: list[datetime] = []
            for schedule_time in schedule_times:
                # Random offset between -jitter and +jitter
                offset = timedelta(
                    seconds=random.uniform(
                        -jitter.total_seconds(), jitter.total_seconds()
                    )
                )
                # Ensure the jittered time doesn't go before start
                jittered_time = max(schedule_time + offset, start)
                jittered_times.append(jittered_time)
            schedule_times = jittered_times

        # Build all Execution objects first, validating as we go
        executions: list[Execution] = []
        for (task_func, args, kwargs), schedule_time in zip(
            self._tasks, schedule_times
        ):
            # Resolve task function if given by name
            if isinstance(task_func, str):
                if task_func not in docket.tasks:
                    raise KeyError(f"Task '{task_func}' is not registered")
                resolved_func = docket.tasks[task_func]
            else:
                # Ensure task is registered
                if task_func not in docket.tasks.values():
                    docket.register(task_func)
                resolved_func = task_func

            # Create execution with unique key
            key = str(uuid7())
            execution = Execution(
                function=resolved_func,
                args=args,
                kwargs=kwargs,
                when=schedule_time,
                key=key,
                attempt=1,
            )
            executions.append(execution)

        # Schedule all tasks - if any fail, some tasks may have been scheduled
        for execution in executions:
            scheduler = docket.add(
                execution.function, when=execution.when, key=execution.key
            )
            # Actually schedule the task - if this fails, earlier tasks remain scheduled
            await scheduler(*execution.args, **execution.kwargs)

        return executions

__init__()

Initialize an empty Agenda.

Source code in src/docket/agenda.py
def __init__(self) -> None:
    """Initialize an empty Agenda."""
    self._tasks: list[
        tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]
    ] = []

__iter__()

Iterate over tasks in the agenda.

Source code in src/docket/agenda.py
def __iter__(
    self,
) -> Iterator[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]]:
    """Iterate over tasks in the agenda."""
    return iter(self._tasks)

__len__()

Return the number of tasks in the agenda.

Source code in src/docket/agenda.py
def __len__(self) -> int:
    """Return the number of tasks in the agenda."""
    return len(self._tasks)

add(function)

add(
    function: Callable[P, Awaitable[R]],
) -> Callable[P, None]
add(function: str) -> Callable[..., None]

Add a task to the agenda.

Parameters:

    function (Callable[P, Awaitable[R]] | str): The task function or name to add. Required.

Returns:

    Callable[..., None]: A callable that accepts the task arguments and adds them to the agenda.

Source code in src/docket/agenda.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
) -> Callable[..., None]:
    """Add a task to the agenda.

    Args:
        function: The task function or name to add.

    Returns:
        A callable that accepts the task arguments and adds them to the agenda.
    """

    def scheduler(*args: Any, **kwargs: Any) -> None:
        self._tasks.append((function, args, kwargs))

    return scheduler
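
As a sketch, tasks can be added either by function reference or by the name of an already-registered task (process_item and item_id are illustrative, not part of the library):

agenda = Agenda()

# By function reference: the arguments are captured for later scheduling
agenda.add(process_item)(item_id=1)

# By registered name: resolved against docket.tasks when the agenda is scattered
agenda.add("process_item")(item_id=2)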

clear()

Clear all tasks from the agenda.

Source code in src/docket/agenda.py
def clear(self) -> None:
    """Clear all tasks from the agenda."""
    self._tasks.clear()

scatter(docket, over, start=None, jitter=None) async

Scatter the tasks in this agenda over a time period.

Tasks are distributed evenly across the specified time window, optionally with random jitter to prevent thundering herd effects.

If an error occurs during scheduling, some tasks may have already been scheduled successfully before the failure occurred.
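
A minimal usage sketch (process_item, item1, item2, and item3 are illustrative, not part of the library) that spreads three tasks over thirty minutes with up to one minute of jitter:

agenda = Agenda()
agenda.add(process_item)(item1)
agenda.add(process_item)(item2)
agenda.add(process_item)(item3)

async with Docket() as docket:
    # Tasks are spaced evenly across the 30-minute window, each shifted
    # by a random offset of at most one minute (never before the start time).
    executions = await agenda.scatter(
        docket,
        over=timedelta(minutes=30),
        jitter=timedelta(minutes=1),
    )
    print([execution.when for execution in executions])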

Parameters:

    docket (Docket): The Docket to schedule tasks on. Required.
    over (timedelta): Time period to scatter tasks over. Required.
    start (datetime | None): When to start scattering from. Defaults to now. Default: None.
    jitter (timedelta | None): Maximum random offset to add/subtract from each scheduled time. Default: None.

Returns:

    list[Execution]: List of Execution objects for the scheduled tasks.

Raises:

    KeyError: If any task name is not registered with the docket.
    ValueError: If any task is stricken or 'over' is not positive.

Source code in src/docket/agenda.py
async def scatter(
    self,
    docket: Docket,
    over: timedelta,
    start: datetime | None = None,
    jitter: timedelta | None = None,
) -> list[Execution]:
    """Scatter the tasks in this agenda over a time period.

    Tasks are distributed evenly across the specified time window,
    optionally with random jitter to prevent thundering herd effects.

    If an error occurs during scheduling, some tasks may have already been
    scheduled successfully before the failure occurred.

    Args:
        docket: The Docket to schedule tasks on.
        over: Time period to scatter tasks over (required).
        start: When to start scattering from. Defaults to now.
        jitter: Maximum random offset to add/subtract from each scheduled time.

    Returns:
        List of Execution objects for the scheduled tasks.

    Raises:
        KeyError: If any task name is not registered with the docket.
        ValueError: If any task is stricken or 'over' is not positive.
    """
    if over.total_seconds() <= 0:
        raise ValueError("'over' parameter must be a positive duration")

    if not self._tasks:
        return []

    if start is None:
        start = datetime.now(timezone.utc)

    # Calculate even distribution over the time period
    task_count = len(self._tasks)

    if task_count == 1:
        # Single task goes in the middle of the window
        schedule_times = [start + over / 2]
    else:
        # Distribute tasks evenly across the window
        # For n tasks, we want n points from start to start+over inclusive
        interval = over / (task_count - 1)
        schedule_times = [start + interval * i for i in range(task_count)]

    # Apply jitter if specified
    if jitter:
        jittered_times: list[datetime] = []
        for schedule_time in schedule_times:
            # Random offset between -jitter and +jitter
            offset = timedelta(
                seconds=random.uniform(
                    -jitter.total_seconds(), jitter.total_seconds()
                )
            )
            # Ensure the jittered time doesn't go before start
            jittered_time = max(schedule_time + offset, start)
            jittered_times.append(jittered_time)
        schedule_times = jittered_times

    # Build all Execution objects first, validating as we go
    executions: list[Execution] = []
    for (task_func, args, kwargs), schedule_time in zip(
        self._tasks, schedule_times
    ):
        # Resolve task function if given by name
        if isinstance(task_func, str):
            if task_func not in docket.tasks:
                raise KeyError(f"Task '{task_func}' is not registered")
            resolved_func = docket.tasks[task_func]
        else:
            # Ensure task is registered
            if task_func not in docket.tasks.values():
                docket.register(task_func)
            resolved_func = task_func

        # Create execution with unique key
        key = str(uuid7())
        execution = Execution(
            function=resolved_func,
            args=args,
            kwargs=kwargs,
            when=schedule_time,
            key=key,
            attempt=1,
        )
        executions.append(execution)

    # Schedule all tasks - if any fail, some tasks may have been scheduled
    for execution in executions:
        scheduler = docket.add(
            execution.function, when=execution.when, key=execution.key
        )
        # Actually schedule the task - if this fails, earlier tasks remain scheduled
        await scheduler(*execution.args, **execution.kwargs)

    return executions

ConcurrencyLimit

Bases: Dependency

Configures concurrency limits for a task based on specific argument values.

This allows fine-grained control over task execution by limiting concurrent tasks based on the value of specific arguments.

Example:

async def process_customer(
    customer_id: int,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
) -> None:
    # Only one task per customer_id will run at a time
    ...

async def backup_db(
    db_name: str,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=3)
) -> None:
    # Only 3 backup tasks per database name will run at a time
    ...
Source code in src/docket/dependencies.py
class ConcurrencyLimit(Dependency):
    """Configures concurrency limits for a task based on specific argument values.

    This allows fine-grained control over task execution by limiting concurrent
    tasks based on the value of specific arguments.

    Example:

    ```python
    async def process_customer(
        customer_id: int,
        concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
    ) -> None:
        # Only one task per customer_id will run at a time
        ...

    async def backup_db(
        db_name: str,
        concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=3)
    ) -> None:
        # Only 3 backup tasks per database name will run at a time
        ...
    ```
    """

    single: bool = True

    def __init__(
        self, argument_name: str, max_concurrent: int = 1, scope: str | None = None
    ) -> None:
        """
        Args:
            argument_name: The name of the task argument to use for concurrency grouping
            max_concurrent: Maximum number of concurrent tasks per unique argument value
            scope: Optional scope prefix for Redis keys (defaults to docket name)
        """
        self.argument_name = argument_name
        self.max_concurrent = max_concurrent
        self.scope = scope
        self._concurrency_key: str | None = None
        self._initialized: bool = False

    async def __aenter__(self) -> "ConcurrencyLimit":
        execution = self.execution.get()
        docket = self.docket.get()

        # Get the argument value to group by
        try:
            argument_value = execution.get_argument(self.argument_name)
        except KeyError:
            # If argument not found, create a bypass limit that doesn't apply concurrency control
            limit = ConcurrencyLimit(
                self.argument_name, self.max_concurrent, self.scope
            )
            limit._concurrency_key = None  # Special marker for bypassed concurrency
            limit._initialized = True  # Mark as initialized but bypassed
            return limit

        # Create a concurrency key for this specific argument value
        scope = self.scope or docket.name
        self._concurrency_key = (
            f"{scope}:concurrency:{self.argument_name}:{argument_value}"
        )

        limit = ConcurrencyLimit(self.argument_name, self.max_concurrent, self.scope)
        limit._concurrency_key = self._concurrency_key
        limit._initialized = True  # Mark as initialized
        return limit

    @property
    def concurrency_key(self) -> str | None:
        """Redis key used for tracking concurrency for this specific argument value.
        Returns None when concurrency control is bypassed due to missing arguments.
        Raises RuntimeError if accessed before initialization."""
        if not self._initialized:
            raise RuntimeError(
                "ConcurrencyLimit not initialized - use within task context"
            )
        return self._concurrency_key

    @property
    def is_bypassed(self) -> bool:
        """Returns True if concurrency control is bypassed due to missing arguments."""
        return self._initialized and self._concurrency_key is None

concurrency_key property

Redis key used for tracking concurrency for this specific argument value. Returns None when concurrency control is bypassed due to missing arguments. Raises RuntimeError if accessed before initialization.

is_bypassed property

Returns True if concurrency control is bypassed due to missing arguments.
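
A brief sketch of how a task might inspect these properties at runtime (the task and argument names are illustrative):

async def process_customer(
    customer_id: int,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1),
) -> None:
    if concurrency.is_bypassed:
        # "customer_id" was not supplied, so no concurrency grouping applies
        ...
    else:
        # e.g. "docket:concurrency:customer_id:42"
        print(concurrency.concurrency_key)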

__init__(argument_name, max_concurrent=1, scope=None)

Parameters:

    argument_name (str): The name of the task argument to use for concurrency grouping. Required.
    max_concurrent (int): Maximum number of concurrent tasks per unique argument value. Default: 1.
    scope (str | None): Optional scope prefix for Redis keys (defaults to the docket name). Default: None.
Source code in src/docket/dependencies.py
def __init__(
    self, argument_name: str, max_concurrent: int = 1, scope: str | None = None
) -> None:
    """
    Args:
        argument_name: The name of the task argument to use for concurrency grouping
        max_concurrent: Maximum number of concurrent tasks per unique argument value
        scope: Optional scope prefix for Redis keys (defaults to docket name)
    """
    self.argument_name = argument_name
    self.max_concurrent = max_concurrent
    self.scope = scope
    self._concurrency_key: str | None = None
    self._initialized: bool = False

Docket

A Docket represents a collection of tasks that may be scheduled for later execution. With a Docket, you can add, replace, and cancel tasks.

Example:

@task
async def my_task(greeting: str, recipient: str) -> None:
    print(f"{greeting}, {recipient}!")

async with Docket() as docket:
    docket.add(my_task)("Hello", recipient="world")
Source code in src/docket/docket.py
class Docket:
    """A Docket represents a collection of tasks that may be scheduled for later
    execution.  With a Docket, you can add, replace, and cancel tasks.
    Example:

    ```python
    @task
    async def my_task(greeting: str, recipient: str) -> None:
        print(f"{greeting}, {recipient}!")

    async with Docket() as docket:
        docket.add(my_task)("Hello", recipient="world")
    ```
    """

    tasks: dict[str, TaskFunction]
    strike_list: StrikeList

    _monitor_strikes_task: asyncio.Task[None]
    _connection_pool: ConnectionPool
    _schedule_task_script: _schedule_task | None
    _cancel_task_script: _cancel_task | None

    def __init__(
        self,
        name: str = "docket",
        url: str = "redis://localhost:6379/0",
        heartbeat_interval: timedelta = timedelta(seconds=2),
        missed_heartbeats: int = 5,
    ) -> None:
        """
        Args:
            name: The name of the docket.
            url: The URL of the Redis server.  For example:
                - "redis://localhost:6379/0"
                - "redis://user:password@localhost:6379/0"
                - "redis://user:password@localhost:6379/0?ssl=true"
                - "rediss://localhost:6379/0"
                - "unix:///path/to/redis.sock"
            heartbeat_interval: How often workers send heartbeat messages to the docket.
            missed_heartbeats: How many heartbeats a worker can miss before it is
                considered dead.
        """
        self.name = name
        self.url = url
        self.heartbeat_interval = heartbeat_interval
        self.missed_heartbeats = missed_heartbeats
        self._schedule_task_script = None
        self._cancel_task_script = None

    @property
    def worker_group_name(self) -> str:
        return "docket-workers"

    async def __aenter__(self) -> Self:
        from .tasks import standard_tasks

        self.tasks = {fn.__name__: fn for fn in standard_tasks}
        self.strike_list = StrikeList()

        self._connection_pool = ConnectionPool.from_url(self.url)  # type: ignore
        self._monitor_strikes_task = asyncio.create_task(self._monitor_strikes())

        # Ensure that the stream and worker group exist
        try:
            async with self.redis() as r:
                await r.xgroup_create(
                    groupname=self.worker_group_name,
                    name=self.stream_key,
                    id="0-0",
                    mkstream=True,
                )
        except redis.exceptions.RedisError as e:
            if "BUSYGROUP" not in repr(e):
                raise

        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        del self.tasks
        del self.strike_list

        self._monitor_strikes_task.cancel()
        try:
            await self._monitor_strikes_task
        except asyncio.CancelledError:
            pass

        await asyncio.shield(self._connection_pool.disconnect())
        del self._connection_pool

    @asynccontextmanager
    async def redis(self) -> AsyncGenerator[Redis, None]:
        r = Redis(connection_pool=self._connection_pool)
        await r.__aenter__()
        try:
            yield r
        finally:
            await asyncio.shield(r.__aexit__(None, None, None))

    def register(self, function: TaskFunction) -> None:
        """Register a task with the Docket.

        Args:
            function: The task to register.
        """
        from .dependencies import validate_dependencies

        validate_dependencies(function)

        self.tasks[function.__name__] = function

    def register_collection(self, collection_path: str) -> None:
        """
        Register a collection of tasks.

        Args:
            collection_path: A path in the format "module:collection".
        """
        module_name, _, member_name = collection_path.rpartition(":")
        module = importlib.import_module(module_name)
        collection = getattr(module, member_name)
        for function in collection:
            self.register(function)

    def labels(self) -> Mapping[str, str]:
        return {
            "docket.name": self.name,
        }

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[P, Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task function to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def add(
        self,
        function: str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The name of a task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        if isinstance(function, str):
            function = self.tasks[function]
        else:
            self.register(function)

        if when is None:
            when = datetime.now(timezone.utc)

        if key is None:
            key = str(uuid7())

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(function, args, kwargs, when, key, attempt=1)

            async with self.redis() as redis:
                await self._schedule(redis, execution, replace=False)

            TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    @overload
    def replace(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime,
        key: str,
    ) -> Callable[P, Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task function to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def replace(
        self,
        function: str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The name of a task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def replace(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        if isinstance(function, str):
            function = self.tasks[function]

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(function, args, kwargs, when, key, attempt=1)

            async with self.redis() as redis:
                await self._schedule(redis, execution, replace=True)

            TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    async def schedule(self, execution: Execution) -> None:
        with tracer.start_as_current_span(
            "docket.schedule",
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function.__name__,
            },
        ):
            async with self.redis() as redis:
                await self._schedule(redis, execution, replace=False)

        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

    async def cancel(self, key: str) -> None:
        """Cancel a previously scheduled task on the Docket.

        Args:
            key: The key of the task to cancel.
        """
        with tracer.start_as_current_span(
            "docket.cancel",
            attributes={**self.labels(), "docket.key": key},
        ):
            async with self.redis() as redis:
                await self._cancel(redis, key)

        TASKS_CANCELLED.add(1, self.labels())

    @property
    def queue_key(self) -> str:
        return f"{self.name}:queue"

    @property
    def stream_key(self) -> str:
        return f"{self.name}:stream"

    def known_task_key(self, key: str) -> str:
        return f"{self.name}:known:{key}"

    def parked_task_key(self, key: str) -> str:
        return f"{self.name}:{key}"

    def stream_id_key(self, key: str) -> str:
        return f"{self.name}:stream-id:{key}"

    async def _schedule(
        self,
        redis: Redis,
        execution: Execution,
        replace: bool = False,
    ) -> None:
        """Schedule a task atomically.

        Handles:
        - Checking for task existence
        - Cancelling existing tasks when replacing
        - Adding tasks to stream (immediate) or queue (future)
        - Tracking stream message IDs for later cancellation
        """
        if self.strike_list.is_stricken(execution):
            logger.warning(
                "%r is stricken, skipping schedule of %r",
                execution.function.__name__,
                execution.key,
            )
            TASKS_STRICKEN.add(
                1,
                {
                    **self.labels(),
                    **execution.specific_labels(),
                    "docket.where": "docket",
                },
            )
            return

        message: dict[bytes, bytes] = execution.as_message()
        propagate.inject(message, setter=message_setter)

        key = execution.key
        when = execution.when
        known_task_key = self.known_task_key(key)
        is_immediate = when <= datetime.now(timezone.utc)

        # Lock per task key to prevent race conditions between concurrent operations
        async with redis.lock(f"{known_task_key}:lock", timeout=10):
            if self._schedule_task_script is None:
                self._schedule_task_script = cast(
                    _schedule_task,
                    redis.register_script(
                        # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key
                        # ARGV: task_key, when_timestamp, is_immediate, replace, ...message_fields
                        """
                        local stream_key = KEYS[1]
                        local known_key = KEYS[2]
                        local parked_key = KEYS[3]
                        local queue_key = KEYS[4]
                        local stream_id_key = KEYS[5]

                        local task_key = ARGV[1]
                        local when_timestamp = ARGV[2]
                        local is_immediate = ARGV[3] == '1'
                        local replace = ARGV[4] == '1'

                        -- Extract message fields from ARGV[5] onwards
                        local message = {}
                        for i = 5, #ARGV, 2 do
                            message[#message + 1] = ARGV[i]     -- field name
                            message[#message + 1] = ARGV[i + 1] -- field value
                        end

                        -- Handle replacement: cancel existing task if needed
                        if replace then
                            local existing_message_id = redis.call('GET', stream_id_key)
                            if existing_message_id then
                                redis.call('XDEL', stream_key, existing_message_id)
                            end
                            redis.call('DEL', known_key, parked_key, stream_id_key)
                            redis.call('ZREM', queue_key, task_key)
                        else
                            -- Check if task already exists
                            if redis.call('EXISTS', known_key) == 1 then
                                return 'EXISTS'
                            end
                        end

                        if is_immediate then
                            -- Add to stream and store message ID for later cancellation
                            local message_id = redis.call('XADD', stream_key, '*', unpack(message))
                            redis.call('SET', known_key, when_timestamp)
                            redis.call('SET', stream_id_key, message_id)
                            return message_id
                        else
                            -- Add to queue with task data in parked hash
                            redis.call('SET', known_key, when_timestamp)
                            redis.call('HSET', parked_key, unpack(message))
                            redis.call('ZADD', queue_key, when_timestamp, task_key)
                            return 'QUEUED'
                        end
                        """
                    ),
                )
            schedule_task = self._schedule_task_script

            await schedule_task(
                keys=[
                    self.stream_key,
                    known_task_key,
                    self.parked_task_key(key),
                    self.queue_key,
                    self.stream_id_key(key),
                ],
                args=[
                    key,
                    str(when.timestamp()),
                    "1" if is_immediate else "0",
                    "1" if replace else "0",
                    *[
                        item
                        for field, value in message.items()
                        for item in (field, value)
                    ],
                ],
            )

    async def _cancel(self, redis: Redis, key: str) -> None:
        """Cancel a task atomically.

        Handles cancellation regardless of task location:
        - From the stream (using stored message ID)
        - From the queue (scheduled tasks)
        - Cleans up all associated metadata keys
        """
        if self._cancel_task_script is None:
            self._cancel_task_script = cast(
                _cancel_task,
                redis.register_script(
                    # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key
                    # ARGV: task_key
                    """
                    local stream_key = KEYS[1]
                    local known_key = KEYS[2]
                    local parked_key = KEYS[3]
                    local queue_key = KEYS[4]
                    local stream_id_key = KEYS[5]
                    local task_key = ARGV[1]

                    -- Delete from stream if message ID exists
                    local message_id = redis.call('GET', stream_id_key)
                    if message_id then
                        redis.call('XDEL', stream_key, message_id)
                    end

                    -- Clean up all task-related keys
                    redis.call('DEL', known_key, parked_key, stream_id_key)
                    redis.call('ZREM', queue_key, task_key)

                    return 'OK'
                    """
                ),
            )
        cancel_task = self._cancel_task_script

        # Execute the cancellation script
        await cancel_task(
            keys=[
                self.stream_key,
                self.known_task_key(key),
                self.parked_task_key(key),
                self.queue_key,
                self.stream_id_key(key),
            ],
            args=[key],
        )

    @property
    def strike_key(self) -> str:
        return f"{self.name}:strikes"

    async def strike(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Strike a task from the Docket.

        Args:
            function: The task to strike.
            parameter: The parameter to strike on.
            operator: The operator to use.
            value: The value to strike on.
        """
        if not isinstance(function, (str, type(None))):
            function = function.__name__

        operator = Operator(operator)

        strike = Strike(function, parameter, operator, value)
        return await self._send_strike_instruction(strike)

    async def restore(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Restore a previously stricken task to the Docket.

        Args:
            function: The task to restore.
            parameter: The parameter to restore on.
            operator: The operator to use.
            value: The value to restore on.
        """
        if not isinstance(function, (str, type(None))):
            function = function.__name__

        operator = Operator(operator)

        restore = Restore(function, parameter, operator, value)
        return await self._send_strike_instruction(restore)

    async def _send_strike_instruction(self, instruction: StrikeInstruction) -> None:
        with tracer.start_as_current_span(
            f"docket.{instruction.direction}",
            attributes={
                **self.labels(),
                **instruction.labels(),
            },
        ):
            async with self.redis() as redis:
                message = instruction.as_message()
                await redis.xadd(self.strike_key, message)  # type: ignore[arg-type]
            self.strike_list.update(instruction)

    async def _monitor_strikes(self) -> NoReturn:
        last_id = "0-0"
        while True:
            try:
                async with self.redis() as r:
                    while True:
                        streams: RedisReadGroupResponse = await r.xread(
                            {self.strike_key: last_id},
                            count=100,
                            block=60_000,
                        )
                        for _, messages in streams:
                            for message_id, message in messages:
                                last_id = message_id
                                instruction = StrikeInstruction.from_message(message)
                                self.strike_list.update(instruction)
                                logger.info(
                                    "%s %r",
                                    (
                                        "Striking"
                                        if instruction.direction == "strike"
                                        else "Restoring"
                                    ),
                                    instruction.call_repr(),
                                    extra=self.labels(),
                                )

                                STRIKES_IN_EFFECT.add(
                                    1 if instruction.direction == "strike" else -1,
                                    {
                                        **self.labels(),
                                        **instruction.labels(),
                                    },
                                )

            except redis.exceptions.ConnectionError:  # pragma: no cover
                REDIS_DISRUPTIONS.add(1, {"docket": self.name})
                logger.warning("Connection error, sleeping for 1 second...")
                await asyncio.sleep(1)
            except Exception:  # pragma: no cover
                logger.exception("Error monitoring strikes")
                await asyncio.sleep(1)

    async def snapshot(self) -> DocketSnapshot:
        """Get a snapshot of the Docket, including which tasks are scheduled or currently
        running, as well as which workers are active.

        Returns:
            A snapshot of the Docket.
        """
        running: list[RunningExecution] = []
        future: list[Execution] = []

        async with self.redis() as r:
            async with r.pipeline() as pipeline:
                pipeline.xlen(self.stream_key)

                pipeline.zcard(self.queue_key)

                pipeline.xpending_range(
                    self.stream_key,
                    self.worker_group_name,
                    min="-",
                    max="+",
                    count=1000,
                )

                pipeline.xrange(self.stream_key, "-", "+", count=1000)

                pipeline.zrange(self.queue_key, 0, -1)

                total_stream_messages: int
                total_schedule_messages: int
                pending_messages: list[RedisStreamPendingMessage]
                stream_messages: list[tuple[RedisMessageID, RedisMessage]]
                scheduled_task_keys: list[bytes]

                now = datetime.now(timezone.utc)
                (
                    total_stream_messages,
                    total_schedule_messages,
                    pending_messages,
                    stream_messages,
                    scheduled_task_keys,
                ) = await pipeline.execute()

                for task_key in scheduled_task_keys:
                    pipeline.hgetall(self.parked_task_key(task_key.decode()))

                # Because these are two separate pipeline commands, it's possible that
                # a message has been moved from the schedule to the stream in the
                # meantime, which would end up being an empty `{}` message
                queued_messages: list[RedisMessage] = [
                    m for m in await pipeline.execute() if m
                ]

        total_tasks = total_stream_messages + total_schedule_messages

        pending_lookup: dict[RedisMessageID, RedisStreamPendingMessage] = {
            pending["message_id"]: pending for pending in pending_messages
        }

        for message_id, message in stream_messages:
            function = self.tasks[message[b"function"].decode()]
            execution = Execution.from_message(function, message)
            if message_id in pending_lookup:
                worker_name = pending_lookup[message_id]["consumer"].decode()
                started = now - timedelta(
                    milliseconds=pending_lookup[message_id]["time_since_delivered"]
                )
                running.append(RunningExecution(execution, worker_name, started))
            else:
                future.append(execution)  # pragma: no cover

        for message in queued_messages:
            function = self.tasks[message[b"function"].decode()]
            execution = Execution.from_message(function, message)
            future.append(execution)

        workers = await self.workers()

        return DocketSnapshot(now, total_tasks, future, running, workers)

    @property
    def workers_set(self) -> str:
        return f"{self.name}:workers"

    def worker_tasks_set(self, worker_name: str) -> str:
        return f"{self.name}:worker-tasks:{worker_name}"

    def task_workers_set(self, task_name: str) -> str:
        return f"{self.name}:task-workers:{task_name}"

    async def workers(self) -> Collection[WorkerInfo]:
        """Get a list of all workers that have sent heartbeats to the Docket.

        Returns:
            A list of all workers that have sent heartbeats to the Docket.
        """
        workers: list[WorkerInfo] = []

        oldest = datetime.now(timezone.utc).timestamp() - (
            self.heartbeat_interval.total_seconds() * self.missed_heartbeats
        )

        async with self.redis() as r:
            await r.zremrangebyscore(self.workers_set, 0, oldest)

            worker_name_bytes: bytes
            last_seen_timestamp: float

            for worker_name_bytes, last_seen_timestamp in await r.zrange(
                self.workers_set, 0, -1, withscores=True
            ):
                worker_name = worker_name_bytes.decode()
                last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

                task_names: set[str] = {
                    task_name_bytes.decode()
                    for task_name_bytes in cast(
                        set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                    )
                }

                workers.append(WorkerInfo(worker_name, last_seen, task_names))

        return workers

    async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
        """Get a list of all workers that are able to execute a given task.

        Args:
            task_name: The name of the task.

        Returns:
            A list of all workers that are able to execute the given task.
        """
        workers: list[WorkerInfo] = []
        oldest = datetime.now(timezone.utc).timestamp() - (
            self.heartbeat_interval.total_seconds() * self.missed_heartbeats
        )

        async with self.redis() as r:
            await r.zremrangebyscore(self.task_workers_set(task_name), 0, oldest)

            worker_name_bytes: bytes
            last_seen_timestamp: float

            for worker_name_bytes, last_seen_timestamp in await r.zrange(
                self.task_workers_set(task_name), 0, -1, withscores=True
            ):
                worker_name = worker_name_bytes.decode()
                last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

                task_names: set[str] = {
                    task_name_bytes.decode()
                    for task_name_bytes in cast(
                        set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                    )
                }

                workers.append(WorkerInfo(worker_name, last_seen, task_names))

        return workers

    async def clear(self) -> int:
        """Clear all pending and scheduled tasks from the docket.

        This removes all tasks from the stream (immediate tasks) and queue
        (scheduled tasks), along with their associated parked data. Running
        tasks are not affected.

        Returns:
            The total number of tasks that were cleared.
        """
        with tracer.start_as_current_span(
            "docket.clear",
            attributes=self.labels(),
        ):
            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    # Get counts before clearing
                    pipeline.xlen(self.stream_key)
                    pipeline.zcard(self.queue_key)
                    pipeline.zrange(self.queue_key, 0, -1)

                    stream_count: int
                    queue_count: int
                    scheduled_keys: list[bytes]
                    stream_count, queue_count, scheduled_keys = await pipeline.execute()

                    # Clear all data
                    # Trim stream to 0 messages instead of deleting it to preserve consumer group
                    if stream_count > 0:
                        pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
                    pipeline.delete(self.queue_key)

                    # Clear parked task data and known task keys
                    for key_bytes in scheduled_keys:
                        key = key_bytes.decode()
                        pipeline.delete(self.parked_task_key(key))
                        pipeline.delete(self.known_task_key(key))
                        pipeline.delete(self.stream_id_key(key))

                    await pipeline.execute()

                    total_cleared = stream_count + queue_count
                    return total_cleared

__init__(name='docket', url='redis://localhost:6379/0', heartbeat_interval=timedelta(seconds=2), missed_heartbeats=5)

Parameters:

    name (str): The name of the docket. Default: 'docket'.
    url (str): The URL of the Redis server, for example:
        - "redis://localhost:6379/0"
        - "redis://user:password@localhost:6379/0"
        - "redis://user:password@localhost:6379/0?ssl=true"
        - "rediss://localhost:6379/0"
        - "unix:///path/to/redis.sock"
        Default: 'redis://localhost:6379/0'.
    heartbeat_interval (timedelta): How often workers send heartbeat messages to the docket. Default: timedelta(seconds=2).
    missed_heartbeats (int): How many heartbeats a worker can miss before it is considered dead. Default: 5.
Source code in src/docket/docket.py
def __init__(
    self,
    name: str = "docket",
    url: str = "redis://localhost:6379/0",
    heartbeat_interval: timedelta = timedelta(seconds=2),
    missed_heartbeats: int = 5,
) -> None:
    """
    Args:
        name: The name of the docket.
        url: The URL of the Redis server.  For example:
            - "redis://localhost:6379/0"
            - "redis://user:password@localhost:6379/0"
            - "redis://user:password@localhost:6379/0?ssl=true"
            - "rediss://localhost:6379/0"
            - "unix:///path/to/redis.sock"
        heartbeat_interval: How often workers send heartbeat messages to the docket.
        missed_heartbeats: How many heartbeats a worker can miss before it is
            considered dead.
    """
    self.name = name
    self.url = url
    self.heartbeat_interval = heartbeat_interval
    self.missed_heartbeats = missed_heartbeats
    self._schedule_task_script = None
    self._cancel_task_script = None

add(function, when=None, key=None)

add(
    function: Callable[P, Awaitable[R]],
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[P, Awaitable[Execution]]
add(
    function: str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]

Add a task to the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str): The task to add. Required.
    when (datetime | None): The time to schedule the task. Default: None.
    key (str | None): The key to schedule the task under. Default: None.
Source code in src/docket/docket.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]:
    """Add a task to the Docket.

    Args:
        function: The task to add.
        when: The time to schedule the task.
        key: The key to schedule the task under.
    """
    if isinstance(function, str):
        function = self.tasks[function]
    else:
        self.register(function)

    if when is None:
        when = datetime.now(timezone.utc)

    if key is None:
        key = str(uuid7())

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(function, args, kwargs, when, key, attempt=1)

        async with self.redis() as redis:
            await self._schedule(redis, execution, replace=False)

        TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

        return execution

    return scheduler
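
A usage sketch, reusing my_task from the class example above and an illustrative key, that schedules the task to run an hour from now:

from datetime import datetime, timedelta, timezone

when = datetime.now(timezone.utc) + timedelta(hours=1)
execution = await docket.add(my_task, when=when, key="greet-world")(
    "Hello", recipient="world"
)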

cancel(key) async

Cancel a previously scheduled task on the Docket.

Parameters:

    key (str): The key of the task to cancel. Required.
Source code in src/docket/docket.py
async def cancel(self, key: str) -> None:
    """Cancel a previously scheduled task on the Docket.

    Args:
        key: The key of the task to cancel.
    """
    with tracer.start_as_current_span(
        "docket.cancel",
        attributes={**self.labels(), "docket.key": key},
    ):
        async with self.redis() as redis:
            await self._cancel(redis, key)

    TASKS_CANCELLED.add(1, self.labels())
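
A one-line sketch, assuming a task was previously scheduled under the illustrative key "greet-world":

await docket.cancel("greet-world")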

clear() async

Clear all pending and scheduled tasks from the docket.

This removes all tasks from the stream (immediate tasks) and queue (scheduled tasks), along with their associated parked data. Running tasks are not affected.

Returns:

    int: The total number of tasks that were cleared.

Source code in src/docket/docket.py
async def clear(self) -> int:
    """Clear all pending and scheduled tasks from the docket.

    This removes all tasks from the stream (immediate tasks) and queue
    (scheduled tasks), along with their associated parked data. Running
    tasks are not affected.

    Returns:
        The total number of tasks that were cleared.
    """
    with tracer.start_as_current_span(
        "docket.clear",
        attributes=self.labels(),
    ):
        async with self.redis() as redis:
            async with redis.pipeline() as pipeline:
                # Get counts before clearing
                pipeline.xlen(self.stream_key)
                pipeline.zcard(self.queue_key)
                pipeline.zrange(self.queue_key, 0, -1)

                stream_count: int
                queue_count: int
                scheduled_keys: list[bytes]
                stream_count, queue_count, scheduled_keys = await pipeline.execute()

                # Clear all data
                # Trim stream to 0 messages instead of deleting it to preserve consumer group
                if stream_count > 0:
                    pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
                pipeline.delete(self.queue_key)

                # Clear parked task data and known task keys
                for key_bytes in scheduled_keys:
                    key = key_bytes.decode()
                    pipeline.delete(self.parked_task_key(key))
                    pipeline.delete(self.known_task_key(key))
                    pipeline.delete(self.stream_id_key(key))

                await pipeline.execute()

                total_cleared = stream_count + queue_count
                return total_cleared
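
A brief sketch of draining a docket, for example between test runs:

cleared = await docket.clear()
print(f"Removed {cleared} pending or scheduled tasks")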

register(function)

Register a task with the Docket.

Parameters:

    function (TaskFunction): The task to register. Required.
Source code in src/docket/docket.py
def register(self, function: TaskFunction) -> None:
    """Register a task with the Docket.

    Args:
        function: The task to register.
    """
    from .dependencies import validate_dependencies

    validate_dependencies(function)

    self.tasks[function.__name__] = function

register_collection(collection_path)

Register a collection of tasks.

Parameters:

Name Type Description Default
collection_path str

A path in the format "module:collection".

required
Source code in src/docket/docket.py
def register_collection(self, collection_path: str) -> None:
    """
    Register a collection of tasks.

    Args:
        collection_path: A path in the format "module:collection".
    """
    module_name, _, member_name = collection_path.rpartition(":")
    module = importlib.import_module(module_name)
    collection = getattr(module, member_name)
    for function in collection:
        self.register(function)
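
A usage sketch; "myapp.tasks:ALL_TASKS" is a hypothetical module path pointing at an iterable of task functions (the worker registers "docket.tasks:standard_tasks" by default):

async with Docket() as docket:
    docket.register_collection("myapp.tasks:ALL_TASKS")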

replace(function, when, key)

replace(
    function: Callable[P, Awaitable[R]],
    when: datetime,
    key: str,
) -> Callable[P, Awaitable[Execution]]
replace(
    function: str, when: datetime, key: str
) -> Callable[..., Awaitable[Execution]]

Replace a previously scheduled task on the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str

The task to replace.

required
when datetime

The time to schedule the task.

required
key str

The key to schedule the task under.

required
Source code in src/docket/docket.py
def replace(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime,
    key: str,
) -> Callable[..., Awaitable[Execution]]:
    """Replace a previously scheduled task on the Docket.

    Args:
        function: The task to replace.
        when: The time to schedule the task.
        key: The key to schedule the task under.
    """
    if isinstance(function, str):
        function = self.tasks[function]

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(function, args, kwargs, when, key, attempt=1)

        async with self.redis() as redis:
            await self._schedule(redis, execution, replace=True)

        TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

        return execution

    return scheduler
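
A usage sketch (send_email, its argument, and the key are hypothetical); replacing reschedules the task under the same key at the new time:

async with Docket() as docket:
    tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
    await docket.replace(send_email, when=tomorrow, key="welcome-email-123")(customer_id=123)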

restore(function=None, parameter=None, operator='==', value=None) async

Restore a previously stricken task to the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str | None

The task to restore.

None
parameter str | None

The parameter to restore on.

None
operator Operator | LiteralOperator

The operator to use.

'=='
value Hashable | None

The value to restore on.

None
Source code in src/docket/docket.py
async def restore(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Restore a previously stricken task to the Docket.

    Args:
        function: The task to restore.
        parameter: The parameter to restore on.
        operator: The operator to use.
        value: The value to restore on.
    """
    if not isinstance(function, (str, type(None))):
        function = function.__name__

    operator = Operator(operator)

    restore = Restore(function, parameter, operator, value)
    return await self._send_strike_instruction(restore)
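
A usage sketch (send_email and the parameter/value are hypothetical); restore() reverses a matching strike():

async with Docket() as docket:
    # Restore the whole task...
    await docket.restore(send_email)
    # ...or only executions where a parameter matches a value
    await docket.restore("send_email", "customer_id", "==", 123)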

snapshot() async

Get a snapshot of the Docket, including which tasks are scheduled or currently running, as well as which workers are active.

Returns:

Type Description
DocketSnapshot

A snapshot of the Docket.

Source code in src/docket/docket.py
async def snapshot(self) -> DocketSnapshot:
    """Get a snapshot of the Docket, including which tasks are scheduled or currently
    running, as well as which workers are active.

    Returns:
        A snapshot of the Docket.
    """
    running: list[RunningExecution] = []
    future: list[Execution] = []

    async with self.redis() as r:
        async with r.pipeline() as pipeline:
            pipeline.xlen(self.stream_key)

            pipeline.zcard(self.queue_key)

            pipeline.xpending_range(
                self.stream_key,
                self.worker_group_name,
                min="-",
                max="+",
                count=1000,
            )

            pipeline.xrange(self.stream_key, "-", "+", count=1000)

            pipeline.zrange(self.queue_key, 0, -1)

            total_stream_messages: int
            total_schedule_messages: int
            pending_messages: list[RedisStreamPendingMessage]
            stream_messages: list[tuple[RedisMessageID, RedisMessage]]
            scheduled_task_keys: list[bytes]

            now = datetime.now(timezone.utc)
            (
                total_stream_messages,
                total_schedule_messages,
                pending_messages,
                stream_messages,
                scheduled_task_keys,
            ) = await pipeline.execute()

            for task_key in scheduled_task_keys:
                pipeline.hgetall(self.parked_task_key(task_key.decode()))

            # Because these are two separate pipeline commands, it's possible that
            # a message has been moved from the schedule to the stream in the
            # meantime, which would end up being an empty `{}` message
            queued_messages: list[RedisMessage] = [
                m for m in await pipeline.execute() if m
            ]

    total_tasks = total_stream_messages + total_schedule_messages

    pending_lookup: dict[RedisMessageID, RedisStreamPendingMessage] = {
        pending["message_id"]: pending for pending in pending_messages
    }

    for message_id, message in stream_messages:
        function = self.tasks[message[b"function"].decode()]
        execution = Execution.from_message(function, message)
        if message_id in pending_lookup:
            worker_name = pending_lookup[message_id]["consumer"].decode()
            started = now - timedelta(
                milliseconds=pending_lookup[message_id]["time_since_delivered"]
            )
            running.append(RunningExecution(execution, worker_name, started))
        else:
            future.append(execution)  # pragma: no cover

    for message in queued_messages:
        function = self.tasks[message[b"function"].decode()]
        execution = Execution.from_message(function, message)
        future.append(execution)

    workers = await self.workers()

    return DocketSnapshot(now, total_tasks, future, running, workers)
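
A minimal usage sketch; the attribute names on DocketSnapshot are not shown on this page, so treat them as assumptions inferred from the constructor call above:

async with Docket() as docket:
    snapshot = await docket.snapshot()
    # e.g. snapshot.running, snapshot.future, snapshot.workers (assumed field names)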

strike(function=None, parameter=None, operator='==', value=None) async

Strike a task from the Docket.

Parameters:

Name Type Description Default
function Callable[P, Awaitable[R]] | str | None

The task to strike.

None
parameter str | None

The parameter to strike on.

None
operator Operator | LiteralOperator

The operator to use.

'=='
value Hashable | None

The value to strike on.

None
Source code in src/docket/docket.py
async def strike(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Strike a task from the Docket.

    Args:
        function: The task to strike.
        parameter: The parameter to strike on.
        operator: The operator to use.
        value: The value to strike on.
    """
    if not isinstance(function, (str, type(None))):
        function = function.__name__

    operator = Operator(operator)

    strike = Strike(function, parameter, operator, value)
    return await self._send_strike_instruction(strike)
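
A usage sketch (send_email and the parameter/value are hypothetical):

async with Docket() as docket:
    # Strike the task entirely...
    await docket.strike(send_email)
    # ...or only executions where a parameter matches a value
    await docket.strike("send_email", "customer_id", "==", 123)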

task_workers(task_name) async

Get a list of all workers that are able to execute a given task.

Parameters:

Name Type Description Default
task_name str

The name of the task.

required

Returns:

Type Description
Collection[WorkerInfo]

A list of all workers that are able to execute the given task.

Source code in src/docket/docket.py
async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
    """Get a list of all workers that are able to execute a given task.

    Args:
        task_name: The name of the task.

    Returns:
        A list of all workers that are able to execute the given task.
    """
    workers: list[WorkerInfo] = []
    oldest = datetime.now(timezone.utc).timestamp() - (
        self.heartbeat_interval.total_seconds() * self.missed_heartbeats
    )

    async with self.redis() as r:
        await r.zremrangebyscore(self.task_workers_set(task_name), 0, oldest)

        worker_name_bytes: bytes
        last_seen_timestamp: float

        for worker_name_bytes, last_seen_timestamp in await r.zrange(
            self.task_workers_set(task_name), 0, -1, withscores=True
        ):
            worker_name = worker_name_bytes.decode()
            last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

            task_names: set[str] = {
                task_name_bytes.decode()
                for task_name_bytes in cast(
                    set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                )
            }

            workers.append(WorkerInfo(worker_name, last_seen, task_names))

    return workers

workers() async

Get a list of all workers that have sent heartbeats to the Docket.

Returns:

Type Description
Collection[WorkerInfo]

A list of all workers that have sent heartbeats to the Docket.

Source code in src/docket/docket.py
async def workers(self) -> Collection[WorkerInfo]:
    """Get a list of all workers that have sent heartbeats to the Docket.

    Returns:
        A list of all workers that have sent heartbeats to the Docket.
    """
    workers: list[WorkerInfo] = []

    oldest = datetime.now(timezone.utc).timestamp() - (
        self.heartbeat_interval.total_seconds() * self.missed_heartbeats
    )

    async with self.redis() as r:
        await r.zremrangebyscore(self.workers_set, 0, oldest)

        worker_name_bytes: bytes
        last_seen_timestamp: float

        for worker_name_bytes, last_seen_timestamp in await r.zrange(
            self.workers_set, 0, -1, withscores=True
        ):
            worker_name = worker_name_bytes.decode()
            last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

            task_names: set[str] = {
                task_name_bytes.decode()
                for task_name_bytes in cast(
                    set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                )
            }

            workers.append(WorkerInfo(worker_name, last_seen, task_names))

    return workers
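
A usage sketch covering both workers() and task_workers() (send_email is a hypothetical task name):

async with Docket() as docket:
    everyone = await docket.workers()
    senders = await docket.task_workers("send_email")
    print(f"{len(senders)} of {len(everyone)} workers can run send_email")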

ExponentialRetry

Bases: Retry

Configures exponential retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the minimum and maximum delays between attempts.

Example:

@task
async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies.py
class ExponentialRetry(Retry):
    """Configures exponential retries for a task.  You can specify the total number
    of attempts (or `None` to retry indefinitely), and the minimum and maximum delays
    between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
        ...
    ```
    """

    def __init__(
        self,
        attempts: int | None = 1,
        minimum_delay: timedelta = timedelta(seconds=1),
        maximum_delay: timedelta = timedelta(seconds=64),
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            minimum_delay: The minimum delay between attempts.
            maximum_delay: The maximum delay between attempts.
        """
        super().__init__(attempts=attempts, delay=minimum_delay)
        self.maximum_delay = maximum_delay

    async def __aenter__(self) -> "ExponentialRetry":
        execution = self.execution.get()

        retry = ExponentialRetry(
            attempts=self.attempts,
            minimum_delay=self.delay,
            maximum_delay=self.maximum_delay,
        )
        retry.attempt = execution.attempt

        if execution.attempt > 1:
            backoff_factor = 2 ** (execution.attempt - 1)
            calculated_delay = self.delay * backoff_factor

            if calculated_delay > self.maximum_delay:
                retry.delay = self.maximum_delay
            else:
                retry.delay = calculated_delay

        return retry

__init__(attempts=1, minimum_delay=timedelta(seconds=1), maximum_delay=timedelta(seconds=64))

Parameters:

Name Type Description Default
attempts int | None

The total number of attempts to make. If None, the task will be retried indefinitely.

1
minimum_delay timedelta

The minimum delay between attempts.

timedelta(seconds=1)
maximum_delay timedelta

The maximum delay between attempts.

timedelta(seconds=64)
Source code in src/docket/dependencies.py
def __init__(
    self,
    attempts: int | None = 1,
    minimum_delay: timedelta = timedelta(seconds=1),
    maximum_delay: timedelta = timedelta(seconds=64),
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        minimum_delay: The minimum delay between attempts.
        maximum_delay: The maximum delay between attempts.
    """
    super().__init__(attempts=attempts, delay=minimum_delay)
    self.maximum_delay = maximum_delay
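
From the backoff logic above, the delay before attempt n (for n > 1) is minimum_delay * 2 ** (n - 1), capped at maximum_delay; with the defaults that yields 2s, 4s, 8s, ... up to 64s. A quick sketch of that progression (not library API, just the arithmetic):

from datetime import timedelta

minimum, maximum = timedelta(seconds=1), timedelta(seconds=64)
for attempt in range(2, 9):
    delay = min(minimum * 2 ** (attempt - 1), maximum)
    print(f"before attempt {attempt}: wait {delay.total_seconds():.0f}s")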

Logged

Bases: Annotation

Instructs docket to include arguments to this parameter in the log.

If length_only is True, only the length of the argument will be included in the log.

Example:

@task
def setup_new_customer(
    customer_id: Annotated[int, Logged],
    addresses: Annotated[list[Address], Logged(length_only=True)],
    password: str,
) -> None:
    ...

In the logs, you'd see the task referenced as:

setup_new_customer(customer_id=123, addresses[len 2], password=...)
Source code in src/docket/annotations.py
class Logged(Annotation):
    """Instructs docket to include arguments to this parameter in the log.

    If `length_only` is `True`, only the length of the argument will be included in
    the log.

    Example:

    ```python
    @task
    def setup_new_customer(
        customer_id: Annotated[int, Logged],
        addresses: Annotated[list[Address], Logged(length_only=True)],
        password: str,
    ) -> None:
        ...
    ```

    In the logs, you'd see the task referenced as:

    ```
    setup_new_customer(customer_id=123, addresses[len 2], password=...)
    ```
    """

    length_only: bool = False

    def __init__(self, length_only: bool = False) -> None:
        self.length_only = length_only

    def format(self, argument: Any) -> str:
        if self.length_only:
            if isinstance(argument, (dict, set)):
                return f"{{len {len(argument)}}}"
            elif isinstance(argument, tuple):
                return f"(len {len(argument)})"
            elif hasattr(argument, "__len__"):
                return f"[len {len(argument)}]"

        return repr(argument)

Perpetual

Bases: Dependency

Declare a task that should be run perpetually. Perpetual tasks are automatically rescheduled for the future after they finish (whether they succeed or fail). A perpetual task can be scheduled at worker startup by passing automatic=True.

Example:

@task
async def my_task(perpetual: Perpetual = Perpetual()) -> None:
    ...
Source code in src/docket/dependencies.py
class Perpetual(Dependency):
    """Declare a task that should be run perpetually.  Perpetual tasks are automatically
    rescheduled for the future after they finish (whether they succeed or fail).  A
    perpetual task can be scheduled at worker startup by passing `automatic=True`.

    Example:

    ```python
    @task
    async def my_task(perpetual: Perpetual = Perpetual()) -> None:
        ...
    ```
    """

    single = True

    every: timedelta
    automatic: bool

    args: tuple[Any, ...]
    kwargs: dict[str, Any]

    cancelled: bool

    def __init__(
        self,
        every: timedelta = timedelta(0),
        automatic: bool = False,
    ) -> None:
        """
        Args:
            every: The target interval between task executions.
            automatic: If set, this task will be automatically scheduled during worker
                startup and continually through the worker's lifespan.  This ensures
                that the task will always be scheduled despite crashes and other
                adverse conditions.  Automatic tasks must not require any arguments.
        """
        self.every = every
        self.automatic = automatic
        self.cancelled = False

    async def __aenter__(self) -> "Perpetual":
        execution = self.execution.get()
        perpetual = Perpetual(every=self.every)
        perpetual.args = execution.args
        perpetual.kwargs = execution.kwargs
        return perpetual

    def cancel(self) -> None:
        self.cancelled = True

    def perpetuate(self, *args: Any, **kwargs: Any) -> None:
        self.args = args
        self.kwargs = kwargs

__init__(every=timedelta(0), automatic=False)

Parameters:

Name Type Description Default
every timedelta

The target interval between task executions.

timedelta(0)
automatic bool

If set, this task will be automatically scheduled during worker startup and continually through the worker's lifespan. This ensures that the task will always be scheduled despite crashes and other adverse conditions. Automatic tasks must not require any arguments.

False
Source code in src/docket/dependencies.py
def __init__(
    self,
    every: timedelta = timedelta(0),
    automatic: bool = False,
) -> None:
    """
    Args:
        every: The target interval between task executions.
        automatic: If set, this task will be automatically scheduled during worker
            startup and continually through the worker's lifespan.  This ensures
            that the task will always be scheduled despite crashes and other
            adverse conditions.  Automatic tasks must not require any arguments.
    """
    self.every = every
    self.automatic = automatic
    self.cancelled = False
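
A sketch of the two controls above: cancel() stops future rescheduling, and perpetuate() changes the arguments used for the next run (poll_feed, feed_is_gone, and next_page are hypothetical):

@task
async def poll_feed(url: str, perpetual: Perpetual = Perpetual(every=timedelta(minutes=5))) -> None:
    if await feed_is_gone(url):
        perpetual.cancel()  # do not reschedule this task again
        return
    perpetual.perpetuate(await next_page(url))  # run next time with new arguments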

Retry

Bases: Dependency

Configures linear retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the delay between attempts.

Example:

@task
async def my_task(retry: Retry = Retry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies.py
class Retry(Dependency):
    """Configures linear retries for a task.  You can specify the total number of
    attempts (or `None` to retry indefinitely), and the delay between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: Retry = Retry(attempts=3)) -> None:
        ...
    ```
    """

    single: bool = True

    def __init__(
        self, attempts: int | None = 1, delay: timedelta = timedelta(0)
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            delay: The delay between attempts.
        """
        self.attempts = attempts
        self.delay = delay
        self.attempt = 1

    async def __aenter__(self) -> "Retry":
        execution = self.execution.get()
        retry = Retry(attempts=self.attempts, delay=self.delay)
        retry.attempt = execution.attempt
        return retry

    def at(self, when: datetime) -> NoReturn:
        now = datetime.now(timezone.utc)
        diff = when - now
        diff = diff if diff.total_seconds() >= 0 else timedelta(0)

        self.in_(diff)

    def in_(self, when: timedelta) -> NoReturn:
        self.delay: timedelta = when
        raise ForcedRetry()

__init__(attempts=1, delay=timedelta(0))

Parameters:

Name Type Description Default
attempts int | None

The total number of attempts to make. If None, the task will be retried indefinitely.

1
delay timedelta

The delay between attempts.

timedelta(0)
Source code in src/docket/dependencies.py
def __init__(
    self, attempts: int | None = 1, delay: timedelta = timedelta(0)
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        delay: The delay between attempts.
    """
    self.attempts = attempts
    self.delay = delay
    self.attempt = 1
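
The at() and in_() methods in the class source above force a retry at a specific time, regardless of the configured delay. A sketch (fetch_rate_limited and its response are hypothetical):

@task
async def sync_account(account_id: int, retry: Retry = Retry(attempts=5)) -> None:
    response = await fetch_rate_limited(account_id)
    if response.status_code == 429:
        # Wait out the server's cool-down before the next attempt
        retry.in_(timedelta(seconds=30))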

Timeout

Bases: Dependency

Configures a timeout for a task. You can specify the base timeout, and the task will be cancelled if it exceeds this duration. The timeout may be extended within the context of a single running task.

Example:

@task
async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
    ...
Source code in src/docket/dependencies.py
class Timeout(Dependency):
    """Configures a timeout for a task.  You can specify the base timeout, and the
    task will be cancelled if it exceeds this duration.  The timeout may be extended
    within the context of a single running task.

    Example:

    ```python
    @task
    async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
        ...
    ```
    """

    single: bool = True

    base: timedelta
    _deadline: float

    def __init__(self, base: timedelta) -> None:
        """
        Args:
            base: The base timeout duration.
        """
        self.base = base

    async def __aenter__(self) -> "Timeout":
        timeout = Timeout(base=self.base)
        timeout.start()
        return timeout

    def start(self) -> None:
        self._deadline = time.monotonic() + self.base.total_seconds()

    def expired(self) -> bool:
        return time.monotonic() >= self._deadline

    def remaining(self) -> timedelta:
        """Get the remaining time until the timeout expires."""
        return timedelta(seconds=self._deadline - time.monotonic())

    def extend(self, by: timedelta | None = None) -> None:
        """Extend the timeout by a given duration.  If no duration is provided, the
        base timeout will be used.

        Args:
            by: The duration to extend the timeout by.
        """
        if by is None:
            by = self.base
        self._deadline += by.total_seconds()

__init__(base)

Parameters:

Name Type Description Default
base timedelta

The base timeout duration.

required
Source code in src/docket/dependencies.py
def __init__(self, base: timedelta) -> None:
    """
    Args:
        base: The base timeout duration.
    """
    self.base = base

extend(by=None)

Extend the timeout by a given duration. If no duration is provided, the base timeout will be used.

Parameters:

Name Type Description Default
by timedelta | None

The duration to extend the timeout by.

None
Source code in src/docket/dependencies.py
def extend(self, by: timedelta | None = None) -> None:
    """Extend the timeout by a given duration.  If no duration is provided, the
    base timeout will be used.

    Args:
        by: The duration to extend the timeout by.
    """
    if by is None:
        by = self.base
    self._deadline += by.total_seconds()

remaining()

Get the remaining time until the timeout expires.

Source code in src/docket/dependencies.py
def remaining(self) -> timedelta:
    """Get the remaining time until the timeout expires."""
    return timedelta(seconds=self._deadline - time.monotonic())
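
A sketch of extending the deadline from inside a running task (import_data, batches, and process are hypothetical):

@task
async def import_data(batches: list[bytes], timeout: Timeout = Timeout(timedelta(seconds=30))) -> None:
    for batch in batches:
        await process(batch)
        timeout.extend()  # grant another base interval for each completed batch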

Worker

A Worker executes tasks on a Docket. You may run as many workers as you like to work a single Docket.

Example:

async with Docket() as docket:
    async with Worker(docket) as worker:
        await worker.run_forever()
Source code in src/docket/worker.py
class Worker:
    """A Worker executes tasks on a Docket.  You may run as many workers as you like
    to work a single Docket.

    Example:

    ```python
    async with Docket() as docket:
        async with Worker(docket) as worker:
            await worker.run_forever()
    ```
    """

    docket: Docket
    name: str
    concurrency: int
    redelivery_timeout: timedelta
    reconnection_delay: timedelta
    minimum_check_interval: timedelta
    scheduling_resolution: timedelta
    schedule_automatic_tasks: bool

    def __init__(
        self,
        docket: Docket,
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=250),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
    ) -> None:
        self.docket = docket
        self.name = name or f"{socket.gethostname()}#{os.getpid()}"
        self.concurrency = concurrency
        self.redelivery_timeout = redelivery_timeout
        self.reconnection_delay = reconnection_delay
        self.minimum_check_interval = minimum_check_interval
        self.scheduling_resolution = scheduling_resolution
        self.schedule_automatic_tasks = schedule_automatic_tasks

    async def __aenter__(self) -> Self:
        self._heartbeat_task = asyncio.create_task(self._heartbeat())
        self._execution_counts = {}
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        del self._execution_counts

        self._heartbeat_task.cancel()
        try:
            await self._heartbeat_task
        except asyncio.CancelledError:
            pass
        del self._heartbeat_task

    def labels(self) -> Mapping[str, str]:
        return {
            **self.docket.labels(),
            "docket.worker": self.name,
        }

    def _log_context(self) -> Mapping[str, str]:
        return {
            **self.labels(),
            "docket.queue_key": self.docket.queue_key,
            "docket.stream_key": self.docket.stream_key,
        }

    @classmethod
    async def run(
        cls,
        docket_name: str = "docket",
        url: str = "redis://localhost:6379/0",
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=100),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
        until_finished: bool = False,
        healthcheck_port: int | None = None,
        metrics_port: int | None = None,
        tasks: list[str] = ["docket.tasks:standard_tasks"],
    ) -> None:
        with (
            healthcheck_server(port=healthcheck_port),
            metrics_server(port=metrics_port),
        ):
            async with Docket(name=docket_name, url=url) as docket:
                for task_path in tasks:
                    docket.register_collection(task_path)

                async with Worker(
                    docket=docket,
                    name=name,
                    concurrency=concurrency,
                    redelivery_timeout=redelivery_timeout,
                    reconnection_delay=reconnection_delay,
                    minimum_check_interval=minimum_check_interval,
                    scheduling_resolution=scheduling_resolution,
                    schedule_automatic_tasks=schedule_automatic_tasks,
                ) as worker:
                    if until_finished:
                        await worker.run_until_finished()
                    else:
                        await worker.run_forever()  # pragma: no cover

    async def run_until_finished(self) -> None:
        """Run the worker until there are no more tasks to process."""
        return await self._run(forever=False)

    async def run_forever(self) -> None:
        """Run the worker indefinitely."""
        return await self._run(forever=True)  # pragma: no cover

    _execution_counts: dict[str, int]

    async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
        """
        Run the worker until there are no more tasks to process, but limit specified
        task keys to a maximum number of iterations.

        This is particularly useful for testing self-perpetuating tasks that would
        otherwise run indefinitely.

        Args:
            iterations_by_key: Maps task keys to their maximum allowed executions
        """
        self._execution_counts = {key: 0 for key in iterations_by_key}

        def has_reached_max_iterations(execution: Execution) -> bool:
            key = execution.key

            if key not in iterations_by_key:
                return False

            if self._execution_counts[key] >= iterations_by_key[key]:
                return True

            return False

        self.docket.strike_list.add_condition(has_reached_max_iterations)
        try:
            await self.run_until_finished()
        finally:
            self.docket.strike_list.remove_condition(has_reached_max_iterations)
            self._execution_counts = {}

    async def _run(self, forever: bool = False) -> None:
        self._startup_log()

        while True:
            try:
                async with self.docket.redis() as redis:
                    return await self._worker_loop(redis, forever=forever)
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.warning(
                    "Error connecting to redis, retrying in %s...",
                    self.reconnection_delay,
                    exc_info=True,
                )
                await asyncio.sleep(self.reconnection_delay.total_seconds())

    async def _worker_loop(self, redis: Redis, forever: bool = False):
        worker_stopping = asyncio.Event()

        if self.schedule_automatic_tasks:
            await self._schedule_all_automatic_perpetual_tasks()

        scheduler_task = asyncio.create_task(
            self._scheduler_loop(redis, worker_stopping)
        )

        active_tasks: dict[asyncio.Task[None], RedisMessageID] = {}
        task_executions: dict[asyncio.Task[None], Execution] = {}
        available_slots = self.concurrency

        log_context = self._log_context()

        async def check_for_work() -> bool:
            logger.debug("Checking for work", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xlen(self.docket.stream_key)
                pipeline.zcard(self.docket.queue_key)
                results: list[int] = await pipeline.execute()
                stream_len = results[0]
                queue_len = results[1]
                return stream_len > 0 or queue_len > 0

        async def get_redeliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting redeliveries", extra=log_context)
            _, redeliveries, *_ = await redis.xautoclaim(
                name=self.docket.stream_key,
                groupname=self.docket.worker_group_name,
                consumername=self.name,
                min_idle_time=int(self.redelivery_timeout.total_seconds() * 1000),
                start_id="0-0",
                count=available_slots,
            )
            return [(b"__redelivery__", redeliveries)]

        async def get_new_deliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting new deliveries", extra=log_context)
            return await redis.xreadgroup(
                groupname=self.docket.worker_group_name,
                consumername=self.name,
                streams={self.docket.stream_key: ">"},
                block=int(self.minimum_check_interval.total_seconds() * 1000),
                count=available_slots,
            )

        def start_task(
            message_id: RedisMessageID,
            message: RedisMessage,
            is_redelivery: bool = False,
        ) -> bool:
            function_name = message[b"function"].decode()
            if not (function := self.docket.tasks.get(function_name)):
                logger.warning(
                    "Task function %r not found",
                    function_name,
                    extra=log_context,
                )
                return False

            execution = Execution.from_message(function, message)
            execution.redelivered = is_redelivery

            task = asyncio.create_task(self._execute(execution), name=execution.key)
            active_tasks[task] = message_id
            task_executions[task] = execution

            nonlocal available_slots
            available_slots -= 1

            return True

        async def process_completed_tasks() -> None:
            completed_tasks = {task for task in active_tasks if task.done()}
            for task in completed_tasks:
                message_id = active_tasks.pop(task)
                task_executions.pop(task)
                await task
                await ack_message(redis, message_id)

        async def ack_message(redis: Redis, message_id: RedisMessageID) -> None:
            logger.debug("Acknowledging message", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xack(
                    self.docket.stream_key,
                    self.docket.worker_group_name,
                    message_id,
                )
                pipeline.xdel(
                    self.docket.stream_key,
                    message_id,
                )
                await pipeline.execute()

        has_work: bool = True

        try:
            while forever or has_work or active_tasks:
                await process_completed_tasks()

                available_slots = self.concurrency - len(active_tasks)

                if available_slots <= 0:
                    await asyncio.sleep(self.minimum_check_interval.total_seconds())
                    continue

                for source in [get_redeliveries, get_new_deliveries]:
                    for stream_key, messages in await source(redis):
                        is_redelivery = stream_key == b"__redelivery__"
                        for message_id, message in messages:
                            if not message:  # pragma: no cover
                                continue

                            task_started = start_task(
                                message_id, message, is_redelivery
                            )
                            if not task_started:
                                # Other errors - delete and ack
                                await self._delete_known_task(redis, message)
                                await ack_message(redis, message_id)

                    if available_slots <= 0:
                        break

                if not forever and not active_tasks:
                    has_work = await check_for_work()

        except asyncio.CancelledError:
            if active_tasks:  # pragma: no cover
                logger.info(
                    "Shutdown requested, finishing %d active tasks...",
                    len(active_tasks),
                    extra=log_context,
                )
        finally:
            if active_tasks:
                await asyncio.gather(*active_tasks, return_exceptions=True)
                await process_completed_tasks()

            worker_stopping.set()
            await scheduler_task

    async def _scheduler_loop(
        self,
        redis: Redis,
        worker_stopping: asyncio.Event,
    ) -> None:
        """Loop that moves due tasks from the queue to the stream."""

        stream_due_tasks: _stream_due_tasks = cast(
            _stream_due_tasks,
            redis.register_script(
                # Lua script to atomically move scheduled tasks to the stream
                # KEYS[1]: queue key (sorted set)
                # KEYS[2]: stream key
                # ARGV[1]: current timestamp
                # ARGV[2]: docket name prefix
                """
            local total_work = redis.call('ZCARD', KEYS[1])
            local due_work = 0

            if total_work > 0 then
                local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])

                for i, key in ipairs(tasks) do
                    local hash_key = ARGV[2] .. ":" .. key
                    local task_data = redis.call('HGETALL', hash_key)

                    if #task_data > 0 then
                        local task = {}
                        for j = 1, #task_data, 2 do
                            task[task_data[j]] = task_data[j+1]
                        end

                        redis.call('XADD', KEYS[2], '*',
                            'key', task['key'],
                            'when', task['when'],
                            'function', task['function'],
                            'args', task['args'],
                            'kwargs', task['kwargs'],
                            'attempt', task['attempt']
                        )
                        redis.call('DEL', hash_key)
                        due_work = due_work + 1
                    end
                end
            end

            if due_work > 0 then
                redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
            end

            return {total_work, due_work}
            """
            ),
        )

        total_work: int = sys.maxsize

        log_context = self._log_context()

        while not worker_stopping.is_set() or total_work:
            try:
                logger.debug("Scheduling due tasks", extra=log_context)
                total_work, due_work = await stream_due_tasks(
                    keys=[self.docket.queue_key, self.docket.stream_key],
                    args=[datetime.now(timezone.utc).timestamp(), self.docket.name],
                )

                if due_work > 0:
                    logger.debug(
                        "Moved %d/%d due tasks from %s to %s",
                        due_work,
                        total_work,
                        self.docket.queue_key,
                        self.docket.stream_key,
                        extra=log_context,
                    )
            except Exception:  # pragma: no cover
                logger.exception(
                    "Error in scheduler loop",
                    exc_info=True,
                    extra=log_context,
                )
            finally:
                await asyncio.sleep(self.scheduling_resolution.total_seconds())

        logger.debug("Scheduler loop finished", extra=log_context)

    async def _schedule_all_automatic_perpetual_tasks(self) -> None:
        async with self.docket.redis() as redis:
            try:
                async with redis.lock(
                    f"{self.docket.name}:perpetual:lock", timeout=10, blocking=False
                ):
                    for task_function in self.docket.tasks.values():
                        perpetual = get_single_dependency_parameter_of_type(
                            task_function, Perpetual
                        )
                        if perpetual is None:
                            continue

                        if not perpetual.automatic:
                            continue

                        key = task_function.__name__

                        await self.docket.add(task_function, key=key)()
            except LockError:  # pragma: no cover
                return

    async def _delete_known_task(
        self, redis: Redis, execution_or_message: Execution | RedisMessage
    ) -> None:
        if isinstance(execution_or_message, Execution):
            key = execution_or_message.key
        elif bytes_key := execution_or_message.get(b"key"):
            key = bytes_key.decode()
        else:  # pragma: no cover
            return

        logger.debug("Deleting known task", extra=self._log_context())
        known_task_key = self.docket.known_task_key(key)
        stream_id_key = self.docket.stream_id_key(key)
        await redis.delete(known_task_key, stream_id_key)

    async def _execute(self, execution: Execution) -> None:
        log_context = {**self._log_context(), **execution.specific_labels()}
        counter_labels = {**self.labels(), **execution.general_labels()}

        call = execution.call_repr()

        if self.docket.strike_list.is_stricken(execution):
            async with self.docket.redis() as redis:
                await self._delete_known_task(redis, execution)

            logger.warning("🗙 %s", call, extra=log_context)
            TASKS_STRICKEN.add(1, counter_labels | {"docket.where": "worker"})
            return

        if execution.key in self._execution_counts:
            self._execution_counts[execution.key] += 1

        start = time.time()
        punctuality = start - execution.when.timestamp()
        log_context = {**log_context, "punctuality": punctuality}
        duration = 0.0

        TASKS_STARTED.add(1, counter_labels)
        if execution.redelivered:
            TASKS_REDELIVERED.add(1, counter_labels)
        TASKS_RUNNING.add(1, counter_labels)
        TASK_PUNCTUALITY.record(punctuality, counter_labels)

        arrow = "↬" if execution.attempt > 1 else "↪"
        logger.info("%s [%s] %s", arrow, ms(punctuality), call, extra=log_context)

        dependencies: dict[str, Dependency] = {}

        with tracer.start_as_current_span(
            execution.function.__name__,
            kind=trace.SpanKind.CONSUMER,
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function.__name__,
            },
            links=execution.incoming_span_links(),
        ) as span:
            try:
                async with resolved_dependencies(self, execution) as dependencies:
                    # Check concurrency limits after dependency resolution
                    concurrency_limit = get_single_dependency_of_type(
                        dependencies, ConcurrencyLimit
                    )
                    if concurrency_limit and not concurrency_limit.is_bypassed:
                        async with self.docket.redis() as redis:
                            # Check if we can acquire a concurrency slot
                            if not await self._can_start_task(redis, execution):
                                # Task cannot start due to concurrency limits - reschedule
                                logger.debug(
                                    "🔒 Task %s blocked by concurrency limit, rescheduling",
                                    execution.key,
                                    extra=log_context,
                                )
                                # Reschedule for a few milliseconds in the future
                                when = datetime.now(timezone.utc) + timedelta(
                                    milliseconds=50
                                )
                                await self.docket.add(execution.function, when=when)(
                                    *execution.args, **execution.kwargs
                                )
                                return
                            else:
                                # Successfully acquired slot
                                pass

                    # Preemptively reschedule the perpetual task for the future, or clear
                    # the known task key for this task
                    rescheduled = await self._perpetuate_if_requested(
                        execution, dependencies
                    )
                    if not rescheduled:
                        async with self.docket.redis() as redis:
                            await self._delete_known_task(redis, execution)

                    dependency_failures = {
                        k: v
                        for k, v in dependencies.items()
                        if isinstance(v, FailedDependency)
                    }
                    if dependency_failures:
                        raise ExceptionGroup(
                            (
                                "Failed to resolve dependencies for parameter(s): "
                                + ", ".join(dependency_failures.keys())
                            ),
                            [
                                dependency.error
                                for dependency in dependency_failures.values()
                            ],
                        )

                    # Apply timeout logic - either user's timeout or redelivery timeout
                    user_timeout = get_single_dependency_of_type(dependencies, Timeout)
                    if user_timeout:
                        # If user timeout is longer than redelivery timeout, limit it
                        if user_timeout.base > self.redelivery_timeout:
                            # Create a new timeout limited by redelivery timeout
                            # Remove the user timeout from dependencies to avoid conflicts
                            limited_dependencies = {
                                k: v
                                for k, v in dependencies.items()
                                if not isinstance(v, Timeout)
                            }
                            limited_timeout = Timeout(self.redelivery_timeout)
                            limited_timeout.start()
                            await self._run_function_with_timeout(
                                execution, limited_dependencies, limited_timeout
                            )
                        else:
                            # User timeout is within redelivery timeout, use as-is
                            await self._run_function_with_timeout(
                                execution, dependencies, user_timeout
                            )
                    else:
                        # No user timeout - apply redelivery timeout as hard limit
                        redelivery_timeout = Timeout(self.redelivery_timeout)
                        redelivery_timeout.start()
                        await self._run_function_with_timeout(
                            execution, dependencies, redelivery_timeout
                        )

                    duration = log_context["duration"] = time.time() - start
                    TASKS_SUCCEEDED.add(1, counter_labels)

                    span.set_status(Status(StatusCode.OK))

                    rescheduled = await self._perpetuate_if_requested(
                        execution, dependencies, timedelta(seconds=duration)
                    )

                    arrow = "↫" if rescheduled else "↩"
                    logger.info(
                        "%s [%s] %s", arrow, ms(duration), call, extra=log_context
                    )
            except Exception as e:
                duration = log_context["duration"] = time.time() - start
                TASKS_FAILED.add(1, counter_labels)

                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))

                retried = await self._retry_if_requested(execution, dependencies)
                if not retried:
                    retried = await self._perpetuate_if_requested(
                        execution, dependencies, timedelta(seconds=duration)
                    )

                arrow = "↫" if retried else "↩"
                logger.exception(
                    "%s [%s] %s", arrow, ms(duration), call, extra=log_context
                )
            finally:
                # Release concurrency slot if we acquired one
                if dependencies:
                    concurrency_limit = get_single_dependency_of_type(
                        dependencies, ConcurrencyLimit
                    )
                    if concurrency_limit and not concurrency_limit.is_bypassed:
                        async with self.docket.redis() as redis:
                            await self._release_concurrency_slot(redis, execution)

                TASKS_RUNNING.add(-1, counter_labels)
                TASKS_COMPLETED.add(1, counter_labels)
                TASK_DURATION.record(duration, counter_labels)

    async def _run_function_with_timeout(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
        timeout: Timeout,
    ) -> None:
        task_coro = cast(
            Coroutine[None, None, None],
            execution.function(
                *execution.args,
                **{
                    **execution.kwargs,
                    **dependencies,
                },
            ),
        )
        task = asyncio.create_task(task_coro)
        try:
            while not task.done():  # pragma: no branch
                remaining = timeout.remaining().total_seconds()
                if timeout.expired():
                    task.cancel()
                    break

                try:
                    await asyncio.wait_for(asyncio.shield(task), timeout=remaining)
                    return
                except asyncio.TimeoutError:
                    continue
        finally:
            if not task.done():
                task.cancel()

            try:
                await task
            except asyncio.CancelledError:
                raise asyncio.TimeoutError

    async def _retry_if_requested(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
    ) -> bool:
        retry = get_single_dependency_of_type(dependencies, Retry)
        if not retry:
            return False

        if retry.attempts is not None and execution.attempt >= retry.attempts:
            return False

        execution.when = datetime.now(timezone.utc) + retry.delay
        execution.attempt += 1
        await self.docket.schedule(execution)

        TASKS_RETRIED.add(1, {**self.labels(), **execution.specific_labels()})
        return True

    async def _perpetuate_if_requested(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
        duration: timedelta | None = None,
    ) -> bool:
        perpetual = get_single_dependency_of_type(dependencies, Perpetual)
        if not perpetual:
            return False

        if perpetual.cancelled:
            await self.docket.cancel(execution.key)
            return False

        now = datetime.now(timezone.utc)
        when = max(now, now + perpetual.every - (duration or timedelta(0)))

        await self.docket.replace(execution.function, when, execution.key)(
            *perpetual.args,
            **perpetual.kwargs,
        )

        if duration is not None:
            TASKS_PERPETUATED.add(1, {**self.labels(), **execution.specific_labels()})

        return True

    def _startup_log(self) -> None:
        logger.info("Starting worker %r with the following tasks:", self.name)
        for task_name, task in self.docket.tasks.items():
            logger.info("* %s(%s)", task_name, compact_signature(get_signature(task)))

    @property
    def workers_set(self) -> str:
        return self.docket.workers_set

    def worker_tasks_set(self, worker_name: str) -> str:
        return self.docket.worker_tasks_set(worker_name)

    def task_workers_set(self, task_name: str) -> str:
        return self.docket.task_workers_set(task_name)

    async def _heartbeat(self) -> None:
        while True:
            await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())
            try:
                now = datetime.now(timezone.utc).timestamp()
                maximum_age = (
                    self.docket.heartbeat_interval * self.docket.missed_heartbeats
                )
                oldest = now - maximum_age.total_seconds()

                task_names = list(self.docket.tasks)

                async with self.docket.redis() as r:
                    async with r.pipeline() as pipeline:
                        pipeline.zremrangebyscore(self.workers_set, 0, oldest)
                        pipeline.zadd(self.workers_set, {self.name: now})

                        for task_name in task_names:
                            task_workers_set = self.task_workers_set(task_name)
                            pipeline.zremrangebyscore(task_workers_set, 0, oldest)
                            pipeline.zadd(task_workers_set, {self.name: now})

                        pipeline.sadd(self.worker_tasks_set(self.name), *task_names)
                        pipeline.expire(
                            self.worker_tasks_set(self.name),
                            max(maximum_age, timedelta(seconds=1)),
                        )

                        await pipeline.execute()

                    async with r.pipeline() as pipeline:
                        pipeline.xlen(self.docket.stream_key)
                        pipeline.zcount(self.docket.queue_key, 0, now)
                        pipeline.zcount(self.docket.queue_key, now, "+inf")

                        results: list[int] = await pipeline.execute()
                        stream_depth = results[0]
                        overdue_depth = results[1]
                        schedule_depth = results[2]

                        QUEUE_DEPTH.set(
                            stream_depth + overdue_depth, self.docket.labels()
                        )
                        SCHEDULE_DEPTH.set(schedule_depth, self.docket.labels())

            except asyncio.CancelledError:  # pragma: no cover
                return
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )
            except Exception:
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )

    async def _can_start_task(self, redis: Redis, execution: Execution) -> bool:
        """Check if a task can start based on concurrency limits."""
        # Check if task has a concurrency limit dependency
        concurrency_limit = get_single_dependency_parameter_of_type(
            execution.function, ConcurrencyLimit
        )

        if not concurrency_limit:
            return True  # No concurrency limit, can always start

        # Get the concurrency key for this task
        try:
            argument_value = execution.get_argument(concurrency_limit.argument_name)
        except KeyError:
            # If argument not found, let the task fail naturally in execution
            return True

        scope = concurrency_limit.scope or self.docket.name
        concurrency_key = (
            f"{scope}:concurrency:{concurrency_limit.argument_name}:{argument_value}"
        )

        # Use Redis sorted set with timestamps to track concurrency and handle expiration
        lua_script = """
        local key = KEYS[1]
        local max_concurrent = tonumber(ARGV[1])
        local worker_id = ARGV[2]
        local task_key = ARGV[3]
        local current_time = tonumber(ARGV[4])
        local expiration_time = tonumber(ARGV[5])

        -- Remove expired entries
        local expired_cutoff = current_time - expiration_time
        redis.call('ZREMRANGEBYSCORE', key, 0, expired_cutoff)

        -- Get current count
        local current = redis.call('ZCARD', key)

        if current < max_concurrent then
            -- Add this worker's task to the sorted set with current timestamp
            redis.call('ZADD', key, current_time, worker_id .. ':' .. task_key)
            return 1
        else
            return 0
        end
        """

        current_time = datetime.now(timezone.utc).timestamp()
        expiration_seconds = self.redelivery_timeout.total_seconds()

        result = await redis.eval(  # type: ignore
            lua_script,
            1,
            concurrency_key,
            str(concurrency_limit.max_concurrent),
            self.name,
            execution.key,
            current_time,
            expiration_seconds,
        )

        return bool(result)

    async def _release_concurrency_slot(
        self, redis: Redis, execution: Execution
    ) -> None:
        """Release a concurrency slot when task completes."""
        # Check if task has a concurrency limit dependency
        concurrency_limit = get_single_dependency_parameter_of_type(
            execution.function, ConcurrencyLimit
        )

        if not concurrency_limit:
            return  # No concurrency limit to release

        # Get the concurrency key for this task
        try:
            argument_value = execution.get_argument(concurrency_limit.argument_name)
        except KeyError:
            return  # If argument not found, nothing to release

        scope = concurrency_limit.scope or self.docket.name
        concurrency_key = (
            f"{scope}:concurrency:{concurrency_limit.argument_name}:{argument_value}"
        )

        # Remove this worker's task from the sorted set
        await redis.zrem(concurrency_key, f"{self.name}:{execution.key}")  # type: ignore
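
The two methods above acquire and release a per-argument concurrency slot keyed on "{scope}:concurrency:{argument_name}:{value}", where the scope defaults to the docket name. A minimal sketch of a task such a limit would govern follows; the ConcurrencyLimit constructor and import path are assumed from the attributes the worker reads (argument_name, max_concurrent, scope), so treat the details as illustrative rather than authoritative:

```python
from docket import Docket, Worker
from docket.dependencies import ConcurrencyLimit  # import path assumed

async def sync_customer(
    customer_id: int,
    # Assumed constructor: at most two concurrent executions per distinct
    # customer_id value, tracked in the sorted set described above.
    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=2),
) -> None:
    ...

async def main() -> None:
    async with Docket(name="example") as docket:
        docket.register(sync_customer)
        # Both executions share the same customer_id, so they are subject to
        # the same concurrency key when the worker runs its start check.
        await docket.add(sync_customer)(customer_id=42)
        await docket.add(sync_customer)(customer_id=42)
        async with Worker(docket) as worker:
            await worker.run_until_finished()
```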

run_at_most(iterations_by_key) async

Run the worker until there are no more tasks to process, but limit specified task keys to a maximum number of iterations.

This is particularly useful for testing self-perpetuating tasks that would otherwise run indefinitely.

Parameters:

Name               Type                Description                                          Default
iterations_by_key  Mapping[str, int]   Maps task keys to their maximum allowed executions   required
Source code in src/docket/worker.py
async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
    """
    Run the worker until there are no more tasks to process, but limit specified
    task keys to a maximum number of iterations.

    This is particularly useful for testing self-perpetuating tasks that would
    otherwise run indefinitely.

    Args:
        iterations_by_key: Maps task keys to their maximum allowed executions
    """
    self._execution_counts = {key: 0 for key in iterations_by_key}

    def has_reached_max_iterations(execution: Execution) -> bool:
        key = execution.key

        if key not in iterations_by_key:
            return False

        if self._execution_counts[key] >= iterations_by_key[key]:
            return True

        return False

    self.docket.strike_list.add_condition(has_reached_max_iterations)
    try:
        await self.run_until_finished()
    finally:
        self.docket.strike_list.remove_condition(has_reached_max_iterations)
        self._execution_counts = {}
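
A hedged sketch of the testing pattern this enables: a self-perpetuating task capped at three executions. The Perpetual constructor and the key= argument to docket.add are assumed for illustration (only their attribute usage appears in the worker source above):

```python
from datetime import timedelta

from docket import Docket, Worker
from docket.dependencies import Perpetual  # import path assumed

async def poll_feed(
    # Assumed constructor: reschedule this task roughly every second.
    perpetual: Perpetual = Perpetual(every=timedelta(seconds=1)),
) -> None:
    ...  # the Perpetual dependency makes the worker reschedule this task

async def test_poll_feed_runs_three_times() -> None:
    async with Docket(name="test") as docket:
        docket.register(poll_feed)
        await docket.add(poll_feed, key="poll-feed")()  # key= argument assumed
        async with Worker(docket) as worker:
            # Allow the "poll-feed" key at most three executions, then drain and return.
            await worker.run_at_most({"poll-feed": 3})
```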

run_forever() async

Run the worker indefinitely.

Source code in src/docket/worker.py
async def run_forever(self) -> None:
    """Run the worker indefinitely."""
    return await self._run(forever=True)  # pragma: no cover

run_until_finished() async

Run the worker until there are no more tasks to process.

Source code in src/docket/worker.py
async def run_until_finished(self) -> None:
    """Run the worker until there are no more tasks to process."""
    return await self._run(forever=False)
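
In tests this is the usual way to drain everything that is currently due and then return; a brief sketch with illustrative task and argument names:

```python
from docket import Docket, Worker

async def send_welcome_email(address: str) -> None:
    ...

async def test_sends_welcome_email() -> None:
    async with Docket(name="test") as docket:
        docket.register(send_welcome_email)
        await docket.add(send_welcome_email)("alice@example.com")
        async with Worker(docket) as worker:
            # Processes the task scheduled above, then returns instead of blocking.
            await worker.run_until_finished()
```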

CurrentDocket()

A dependency to access the current Docket.

Example:

@task
async def my_task(docket: Docket = CurrentDocket()) -> None:
    assert isinstance(docket, Docket)
Source code in src/docket/dependencies.py
def CurrentDocket() -> Docket:
    """A dependency to access the current Docket.

    Example:

    ```python
    @task
    async def my_task(docket: Docket = CurrentDocket()) -> None:
        assert isinstance(docket, Docket)
    ```
    """
    return cast(Docket, _CurrentDocket())

CurrentExecution()

A dependency to access the current Execution.

Example:

@task
async def my_task(execution: Execution = CurrentExecution()) -> None:
    assert isinstance(execution, Execution)
Source code in src/docket/dependencies.py
def CurrentExecution() -> Execution:
    """A dependency to access the current Execution.

    Example:

    ```python
    @task
    async def my_task(execution: Execution = CurrentExecution()) -> None:
        assert isinstance(execution, Execution)
    ```
    """
    return cast(Execution, _CurrentExecution())

CurrentWorker()

A dependency to access the current Worker.

Example:

@task
async def my_task(worker: Worker = CurrentWorker()) -> None:
    assert isinstance(worker, Worker)
Source code in src/docket/dependencies.py
def CurrentWorker() -> "Worker":
    """A dependency to access the current Worker.

    Example:

    ```python
    @task
    async def my_task(worker: Worker = CurrentWorker()) -> None:
        assert isinstance(worker, Worker)
    ```
    """
    return cast("Worker", _CurrentWorker())

Depends(dependency)

Include a user-defined function as a dependency. Dependencies may either return a value or an async context manager. If it returns a context manager, the dependency will be entered and exited around the task, giving an opportunity to control the lifetime of a resource, like a database connection.

Example:

async def my_dependency() -> str:
    return "Hello, world!"

@task
async def my_task(dependency: str = Depends(my_dependency)) -> None:
    print(dependency)
Source code in src/docket/dependencies.py
def Depends(dependency: DependencyFunction[R]) -> R:
    """Include a user-defined function as a dependency.  Dependencies may either return
    a value or an async context manager.  If it returns a context manager, the
    dependency will be entered and exited around the task, giving an opportunity to
    control the lifetime of a resource, like a database connection.

    Example:

    ```python

    async def my_dependency() -> str:
        return "Hello, world!"

    @task
    async def my_task(dependency: str = Depends(my_dependency)) -> None:
        print(dependency)

    ```
    """
    return cast(R, _Depends(dependency))
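
As the docstring notes, a dependency may instead return an async context manager, which is entered before the task runs and exited afterwards; a minimal sketch with a hypothetical database resource (the top-level Depends import is assumed):

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

from docket import Depends  # top-level export assumed

class Database:
    """Hypothetical resource standing in for a real client."""

    async def save(self, record: dict[str, Any]) -> None: ...
    async def close(self) -> None: ...

@asynccontextmanager
async def database() -> AsyncIterator[Database]:
    db = Database()
    try:
        yield db          # the yielded value is what gets injected into the task
    finally:
        await db.close()  # runs after the task finishes, even on failure

async def store_record(
    record: dict[str, Any],
    db: Database = Depends(database),
) -> None:
    await db.save(record)
```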

TaskArgument(parameter=None, optional=False)

A dependency to access an argument of the currently executing task. This is often useful in dependency functions so they can access the arguments of the task they are injected into.

Example:

async def customer_name(customer_id: int = TaskArgument()) -> str:
    ...look up the customer's name by ID...
    return "John Doe"

@task
async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
    print(f"Hello, {name}!")
Source code in src/docket/dependencies.py
def TaskArgument(parameter: str | None = None, optional: bool = False) -> Any:
    """A dependency to access a argument of the currently executing task.  This is
    often useful in dependency functions so they can access the arguments of the
    task they are injected into.

    Example:

    ```python
    async def customer_name(customer_id: int = TaskArgument()) -> str:
        ...look up the customer's name by ID...
        return "John Doe"

    @task
    async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
        print(f"Hello, {name}!")
    ```
    """
    return cast(Any, _TaskArgument(parameter, optional))
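
When the dependency's own parameter name differs from the task argument it needs, the parameter argument can name the task argument explicitly; optional=True is assumed here to resolve a missing argument to None rather than raising. A hedged sketch (top-level imports assumed):

```python
from docket import Depends, TaskArgument  # top-level exports assumed

async def customer_segment(
    cid: int = TaskArgument("customer_id"),  # reads the task's customer_id argument
    region: str | None = TaskArgument("region", optional=True),  # assumed: None if absent
) -> str:
    return "enterprise" if cid < 1000 else "self-serve"

async def tailor_offer(
    customer_id: int,
    region: str | None = None,
    segment: str = Depends(customer_segment),
) -> None:
    print(f"Offer for customer {customer_id}: segment {segment}")
```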

TaskKey()

A dependency to access the key of the currently executing task.

Example:

@task
async def my_task(key: str = TaskKey()) -> None:
    assert isinstance(key, str)
Source code in src/docket/dependencies.py
def TaskKey() -> str:
    """A dependency to access the key of the currently executing task.

    Example:

    ```python
    @task
    async def my_task(key: str = TaskKey()) -> None:
        assert isinstance(key, str)
    ```
    """
    return cast(str, _TaskKey())

TaskLogger()

A dependency to access a logger for the currently executing task. The logger will automatically inject contextual information such as the worker and docket name, the task key, and the current execution attempt number.

Example:

@task
async def my_task(logger: LoggerAdapter[Logger] = TaskLogger()) -> None:
    logger.info("Hello, world!")
Source code in src/docket/dependencies.py
def TaskLogger() -> logging.LoggerAdapter[logging.Logger]:
    """A dependency to access a logger for the currently executing task.  The logger
    will automatically inject contextual information such as the worker and docket
    name, the task key, and the current execution attempt number.

    Example:

    ```python
    @task
    async def my_task(logger: LoggerAdapter[Logger] = TaskLogger()) -> None:
        logger.info("Hello, world!")
    ```
    """
    return cast(logging.LoggerAdapter[logging.Logger], _TaskLogger())