API Reference

docket

docket - A distributed background task system for Python functions.

docket focuses on scheduling future work as seamlessly and efficiently as immediate work.

Agenda

A collection of tasks to be scheduled together on a Docket.

The Agenda allows you to build up a collection of tasks with their arguments, then schedule them all at once using various timing strategies like scattering.

Example

agenda = Agenda()
agenda.add(process_item)(item1)
agenda.add(process_item)(item2)
agenda.add(send_email)(email)
await agenda.scatter(docket, over=timedelta(minutes=50))

Source code in src/docket/agenda.py
class Agenda:
    """A collection of tasks to be scheduled together on a Docket.

    The Agenda allows you to build up a collection of tasks with their arguments,
    then schedule them all at once using various timing strategies like scattering.

    Example:
        >>> agenda = Agenda()
        >>> agenda.add(process_item)(item1)
        >>> agenda.add(process_item)(item2)
        >>> agenda.add(send_email)(email)
        >>> await agenda.scatter(docket, over=timedelta(minutes=50))
    """

    def __init__(self) -> None:
        """Initialize an empty Agenda."""
        self._tasks: list[
            tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]
        ] = []

    def __len__(self) -> int:
        """Return the number of tasks in the agenda."""
        return len(self._tasks)

    def __iter__(
        self,
    ) -> Iterator[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]]:
        """Iterate over tasks in the agenda."""
        return iter(self._tasks)

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
    ) -> Callable[P, None]:
        """Add a task function to the agenda.

        Args:
            function: The task function to add.

        Returns:
            A callable that accepts the task arguments.
        """

    @overload
    def add(
        self,
        function: str,
    ) -> Callable[..., None]:
        """Add a task by name to the agenda.

        Args:
            function: The name of a registered task.

        Returns:
            A callable that accepts the task arguments.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
    ) -> Callable[..., None]:
        """Add a task to the agenda.

        Args:
            function: The task function or name to add.

        Returns:
            A callable that accepts the task arguments and adds them to the agenda.
        """

        def scheduler(*args: Any, **kwargs: Any) -> None:
            self._tasks.append((function, args, kwargs))

        return scheduler

    def clear(self) -> None:
        """Clear all tasks from the agenda."""
        self._tasks.clear()

    async def scatter(
        self,
        docket: Docket,
        over: timedelta,
        start: datetime | None = None,
        jitter: timedelta | None = None,
    ) -> list[Execution]:
        """Scatter the tasks in this agenda over a time period.

        Tasks are distributed evenly across the specified time window,
        optionally with random jitter to prevent thundering herd effects.

        If an error occurs during scheduling, some tasks may have already been
        scheduled successfully before the failure occurred.

        Args:
            docket: The Docket to schedule tasks on.
            over: Time period to scatter tasks over (required).
            start: When to start scattering from. Defaults to now.
            jitter: Maximum random offset to add/subtract from each scheduled time.

        Returns:
            List of Execution objects for the scheduled tasks.

        Raises:
            KeyError: If any task name is not registered with the docket.
            ValueError: If any task is stricken or 'over' is not positive.
        """
        if over.total_seconds() <= 0:
            raise ValueError("'over' parameter must be a positive duration")

        if not self._tasks:
            return []

        if start is None:
            start = datetime.now(timezone.utc)

        # Calculate even distribution over the time period
        task_count = len(self._tasks)

        if task_count == 1:
            # Single task goes in the middle of the window
            schedule_times = [start + over / 2]
        else:
            # Distribute tasks evenly across the window
            # For n tasks, we want n points from start to start+over inclusive
            interval = over / (task_count - 1)
            schedule_times = [start + interval * i for i in range(task_count)]

        # Apply jitter if specified
        if jitter:
            jittered_times: list[datetime] = []
            for schedule_time in schedule_times:
                # Random offset between -jitter and +jitter
                offset = timedelta(
                    seconds=random.uniform(
                        -jitter.total_seconds(), jitter.total_seconds()
                    )
                )
                # Ensure the jittered time doesn't go before start
                jittered_time = max(schedule_time + offset, start)
                jittered_times.append(jittered_time)
            schedule_times = jittered_times

        # Build all Execution objects first, validating as we go
        executions: list[Execution] = []
        for (task_func, args, kwargs), schedule_time in zip(
            self._tasks, schedule_times
        ):
            # Resolve task function if given by name
            if isinstance(task_func, str):
                if task_func not in docket.tasks:
                    raise KeyError(f"Task '{task_func}' is not registered")
                resolved_func = docket.tasks[task_func]
            else:
                # Ensure task is registered
                if task_func not in docket.tasks.values():
                    docket.register(task_func)
                resolved_func = task_func

            # Create execution with unique key
            key = str(uuid7())
            execution = Execution(
                docket=docket,
                function=resolved_func,
                args=args,
                kwargs=kwargs,
                key=key,
                when=schedule_time,
                attempt=1,
            )
            executions.append(execution)

        # Schedule all tasks - if any fail, some tasks may have been scheduled
        for execution in executions:
            scheduler = docket.add(
                execution.function, when=execution.when, key=execution.key
            )
            # Actually schedule the task - if this fails, earlier tasks remain scheduled
            await scheduler(*execution.args, **execution.kwargs)

        return executions
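
Since add() also accepts a registered task's name, an agenda can be built without importing the task functions themselves. A minimal sketch, assuming "process_item" has already been registered on the docket and that item1 and item2 are defined elsewhere:

agenda = Agenda()
agenda.add("process_item")(item1)
agenda.add("process_item")(item2)
# Names are resolved at scatter() time, so an unregistered name
# raises KeyError here rather than at add().
await agenda.scatter(docket, over=timedelta(minutes=5))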

__init__()

Initialize an empty Agenda.

Source code in src/docket/agenda.py
def __init__(self) -> None:
    """Initialize an empty Agenda."""
    self._tasks: list[
        tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]
    ] = []

__iter__()

Iterate over tasks in the agenda.

Source code in src/docket/agenda.py
def __iter__(
    self,
) -> Iterator[tuple[TaskFunction | str, tuple[Any, ...], dict[str, Any]]]:
    """Iterate over tasks in the agenda."""
    return iter(self._tasks)

__len__()

Return the number of tasks in the agenda.

Source code in src/docket/agenda.py
def __len__(self) -> int:
    """Return the number of tasks in the agenda."""
    return len(self._tasks)

add(function)

add(
    function: Callable[P, Awaitable[R]],
) -> Callable[P, None]
add(function: str) -> Callable[..., None]

Add a task to the agenda.

Parameters:

    function (Callable[P, Awaitable[R]] | str): The task function or name to add. Required.

Returns:

    Callable[..., None]: A callable that accepts the task arguments and adds them to the agenda.

Source code in src/docket/agenda.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
) -> Callable[..., None]:
    """Add a task to the agenda.

    Args:
        function: The task function or name to add.

    Returns:
        A callable that accepts the task arguments and adds them to the agenda.
    """

    def scheduler(*args: Any, **kwargs: Any) -> None:
        self._tasks.append((function, args, kwargs))

    return scheduler

clear()

Clear all tasks from the agenda.

Source code in src/docket/agenda.py
def clear(self) -> None:
    """Clear all tasks from the agenda."""
    self._tasks.clear()
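
A small sketch tying the container protocol together, assuming process_item and item1 are defined elsewhere:

agenda = Agenda()
agenda.add(process_item)(item1)
assert len(agenda) == 1
# Each entry is a (function-or-name, args, kwargs) tuple.
for function, args, kwargs in agenda:
    ...
agenda.clear()
assert len(agenda) == 0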

scatter(docket, over, start=None, jitter=None) async

Scatter the tasks in this agenda over a time period.

Tasks are distributed evenly across the specified time window, optionally with random jitter to prevent thundering herd effects.

If an error occurs during scheduling, some tasks may have already been scheduled successfully before the failure occurred.

Parameters:

    docket (Docket): The Docket to schedule tasks on. Required.
    over (timedelta): Time period to scatter tasks over. Required.
    start (datetime | None): When to start scattering from. Defaults to now.
    jitter (timedelta | None): Maximum random offset to add/subtract from each scheduled time. Defaults to None.

Returns:

    list[Execution]: List of Execution objects for the scheduled tasks.

Raises:

    KeyError: If any task name is not registered with the docket.
    ValueError: If any task is stricken or 'over' is not positive.

Source code in src/docket/agenda.py
async def scatter(
    self,
    docket: Docket,
    over: timedelta,
    start: datetime | None = None,
    jitter: timedelta | None = None,
) -> list[Execution]:
    """Scatter the tasks in this agenda over a time period.

    Tasks are distributed evenly across the specified time window,
    optionally with random jitter to prevent thundering herd effects.

    If an error occurs during scheduling, some tasks may have already been
    scheduled successfully before the failure occurred.

    Args:
        docket: The Docket to schedule tasks on.
        over: Time period to scatter tasks over (required).
        start: When to start scattering from. Defaults to now.
        jitter: Maximum random offset to add/subtract from each scheduled time.

    Returns:
        List of Execution objects for the scheduled tasks.

    Raises:
        KeyError: If any task name is not registered with the docket.
        ValueError: If any task is stricken or 'over' is not positive.
    """
    if over.total_seconds() <= 0:
        raise ValueError("'over' parameter must be a positive duration")

    if not self._tasks:
        return []

    if start is None:
        start = datetime.now(timezone.utc)

    # Calculate even distribution over the time period
    task_count = len(self._tasks)

    if task_count == 1:
        # Single task goes in the middle of the window
        schedule_times = [start + over / 2]
    else:
        # Distribute tasks evenly across the window
        # For n tasks, we want n points from start to start+over inclusive
        interval = over / (task_count - 1)
        schedule_times = [start + interval * i for i in range(task_count)]

    # Apply jitter if specified
    if jitter:
        jittered_times: list[datetime] = []
        for schedule_time in schedule_times:
            # Random offset between -jitter and +jitter
            offset = timedelta(
                seconds=random.uniform(
                    -jitter.total_seconds(), jitter.total_seconds()
                )
            )
            # Ensure the jittered time doesn't go before start
            jittered_time = max(schedule_time + offset, start)
            jittered_times.append(jittered_time)
        schedule_times = jittered_times

    # Build all Execution objects first, validating as we go
    executions: list[Execution] = []
    for (task_func, args, kwargs), schedule_time in zip(
        self._tasks, schedule_times
    ):
        # Resolve task function if given by name
        if isinstance(task_func, str):
            if task_func not in docket.tasks:
                raise KeyError(f"Task '{task_func}' is not registered")
            resolved_func = docket.tasks[task_func]
        else:
            # Ensure task is registered
            if task_func not in docket.tasks.values():
                docket.register(task_func)
            resolved_func = task_func

        # Create execution with unique key
        key = str(uuid7())
        execution = Execution(
            docket=docket,
            function=resolved_func,
            args=args,
            kwargs=kwargs,
            key=key,
            when=schedule_time,
            attempt=1,
        )
        executions.append(execution)

    # Schedule all tasks - if any fail, some tasks may have been scheduled
    for execution in executions:
        scheduler = docket.add(
            execution.function, when=execution.when, key=execution.key
        )
        # Actually schedule the task - if this fails, earlier tasks remain scheduled
        await scheduler(*execution.args, **execution.kwargs)

    return executions
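
A minimal sketch of scattering with an explicit start and jitter, assuming an open docket and that process_item, send_email, item1, item2, and email are defined elsewhere:

from datetime import datetime, timedelta, timezone

agenda = Agenda()
agenda.add(process_item)(item1)
agenda.add(process_item)(item2)
agenda.add(send_email)(email)

# Spread the three tasks over ten minutes, starting five minutes from
# now, with up to +/-30 seconds of random offset (never before `start`).
executions = await agenda.scatter(
    docket,
    over=timedelta(minutes=10),
    start=datetime.now(timezone.utc) + timedelta(minutes=5),
    jitter=timedelta(seconds=30),
)

With three tasks, the unjittered times fall at start, start + over/2, and start + over, since n tasks are placed at n evenly spaced points from start to start + over inclusive.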

ConcurrencyLimit

Bases: Dependency

Configures concurrency limits for a task based on specific argument values.

This allows fine-grained control over task execution by limiting concurrent tasks based on the value of specific arguments.

Example:

async def process_customer(
    customer_id: int,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
) -> None:
    # Only one task per customer_id will run at a time
    ...

async def backup_db(
    db_name: str,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=3)
) -> None:
    # Only 3 backup tasks per database name will run at a time
    ...
Source code in src/docket/dependencies.py
class ConcurrencyLimit(Dependency):
    """Configures concurrency limits for a task based on specific argument values.

    This allows fine-grained control over task execution by limiting concurrent
    tasks based on the value of specific arguments.

    Example:

    ```python
    async def process_customer(
        customer_id: int,
        concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
    ) -> None:
        # Only one task per customer_id will run at a time
        ...

    async def backup_db(
        db_name: str,
        concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=3)
    ) -> None:
        # Only 3 backup tasks per database name will run at a time
        ...
    ```
    """

    single: bool = True

    def __init__(
        self, argument_name: str, max_concurrent: int = 1, scope: str | None = None
    ) -> None:
        """
        Args:
            argument_name: The name of the task argument to use for concurrency grouping
            max_concurrent: Maximum number of concurrent tasks per unique argument value
            scope: Optional scope prefix for Redis keys (defaults to docket name)
        """
        self.argument_name = argument_name
        self.max_concurrent = max_concurrent
        self.scope = scope
        self._concurrency_key: str | None = None
        self._initialized: bool = False

    async def __aenter__(self) -> "ConcurrencyLimit":
        execution = self.execution.get()
        docket = self.docket.get()

        # Get the argument value to group by
        try:
            argument_value = execution.get_argument(self.argument_name)
        except KeyError:
            # If argument not found, create a bypass limit that doesn't apply concurrency control
            limit = ConcurrencyLimit(
                self.argument_name, self.max_concurrent, self.scope
            )
            limit._concurrency_key = None  # Special marker for bypassed concurrency
            limit._initialized = True  # Mark as initialized but bypassed
            return limit

        # Create a concurrency key for this specific argument value
        scope = self.scope or docket.name
        self._concurrency_key = (
            f"{scope}:concurrency:{self.argument_name}:{argument_value}"
        )

        limit = ConcurrencyLimit(self.argument_name, self.max_concurrent, self.scope)
        limit._concurrency_key = self._concurrency_key
        limit._initialized = True  # Mark as initialized
        return limit

    @property
    def concurrency_key(self) -> str | None:
        """Redis key used for tracking concurrency for this specific argument value.
        Returns None when concurrency control is bypassed due to missing arguments.
        Raises RuntimeError if accessed before initialization."""
        if not self._initialized:
            raise RuntimeError(
                "ConcurrencyLimit not initialized - use within task context"
            )
        return self._concurrency_key

    @property
    def is_bypassed(self) -> bool:
        """Returns True if concurrency control is bypassed due to missing arguments."""
        return self._initialized and self._concurrency_key is None

concurrency_key property

Redis key used for tracking concurrency for this specific argument value. Returns None when concurrency control is bypassed due to missing arguments. Raises RuntimeError if accessed before initialization.

is_bypassed property

Returns True if concurrency control is bypassed due to missing arguments.
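
Both properties are only meaningful inside a running task, after the dependency has been resolved. A minimal sketch, assuming a hypothetical notify task:

async def notify(
    user_id: int,
    concurrency: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=2),
) -> None:
    if concurrency.is_bypassed:
        # The `user_id` argument couldn't be found on this execution,
        # so no concurrency control applies.
        ...
    else:
        # Redis key grouping executions that share this user_id, e.g.
        # "<docket-name>:concurrency:user_id:42"
        print(concurrency.concurrency_key)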

__init__(argument_name, max_concurrent=1, scope=None)

Parameters:

    argument_name (str): The name of the task argument to use for concurrency grouping. Required.
    max_concurrent (int): Maximum number of concurrent tasks per unique argument value. Defaults to 1.
    scope (str | None): Optional scope prefix for Redis keys (defaults to the docket name).
Source code in src/docket/dependencies.py
def __init__(
    self, argument_name: str, max_concurrent: int = 1, scope: str | None = None
) -> None:
    """
    Args:
        argument_name: The name of the task argument to use for concurrency grouping
        max_concurrent: Maximum number of concurrent tasks per unique argument value
        scope: Optional scope prefix for Redis keys (defaults to docket name)
    """
    self.argument_name = argument_name
    self.max_concurrent = max_concurrent
    self.scope = scope
    self._concurrency_key: str | None = None
    self._initialized: bool = False

Docket

A Docket represents a collection of tasks that may be scheduled for later execution. With a Docket, you can add, replace, and cancel tasks.

Example:

@task
async def my_task(greeting: str, recipient: str) -> None:
    print(f"{greeting}, {recipient}!")

async with Docket() as docket:
    docket.add(my_task)("Hello", recipient="world")
Source code in src/docket/docket.py
class Docket:
    """A Docket represents a collection of tasks that may be scheduled for later
    execution.  With a Docket, you can add, replace, and cancel tasks.
    Example:

    ```python
    @task
    async def my_task(greeting: str, recipient: str) -> None:
        print(f"{greeting}, {recipient}!")

    async with Docket() as docket:
        docket.add(my_task)("Hello", recipient="world")
    ```
    """

    tasks: dict[str, TaskFunction]
    strike_list: StrikeList

    _connection_pool: ConnectionPool
    _cancel_task_script: _cancel_task | None

    def __init__(
        self,
        name: str = "docket",
        url: str = "redis://localhost:6379/0",
        heartbeat_interval: timedelta = timedelta(seconds=2),
        missed_heartbeats: int = 5,
        execution_ttl: timedelta = timedelta(minutes=15),
        result_storage: AsyncKeyValue | None = None,
        enable_internal_instrumentation: bool = False,
    ) -> None:
        """
        Args:
            name: The name of the docket.
            url: The URL of the Redis server or in-memory backend.  For example:
                - "redis://localhost:6379/0"
                - "redis://user:password@localhost:6379/0"
                - "redis://user:password@localhost:6379/0?ssl=true"
                - "rediss://localhost:6379/0"
                - "unix:///path/to/redis.sock"
                - "memory://" (in-memory backend for testing)
            heartbeat_interval: How often workers send heartbeat messages to the docket.
            missed_heartbeats: How many heartbeats a worker can miss before it is
                considered dead.
            execution_ttl: How long to keep completed or failed execution state records
                in Redis before they expire. Defaults to 15 minutes.
            enable_internal_instrumentation: Whether to enable OpenTelemetry spans
                for internal Redis polling operations like strike stream monitoring.
                Defaults to False.
        """
        self.name = name
        self.url = url
        self.heartbeat_interval = heartbeat_interval
        self.missed_heartbeats = missed_heartbeats
        self.execution_ttl = execution_ttl
        self.enable_internal_instrumentation = enable_internal_instrumentation
        self._cancel_task_script = None

        self.result_storage: AsyncKeyValue
        if url.startswith("memory://"):
            self.result_storage = MemoryStore()
        else:
            self.result_storage = RedisStore(
                url=url, default_collection=f"{name}:results"
            )

        from .tasks import standard_tasks

        self.tasks: dict[str, TaskFunction] = {fn.__name__: fn for fn in standard_tasks}

    @property
    def worker_group_name(self) -> str:
        return "docket-workers"

    async def __aenter__(self) -> Self:
        self.strike_list = StrikeList(
            url=self.url,
            name=self.name,
            enable_internal_instrumentation=self.enable_internal_instrumentation,
        )

        self._connection_pool = await connection_pool_from_url(self.url)

        # Connect the strike list to Redis and start monitoring
        await self.strike_list.connect()

        if isinstance(self.result_storage, BaseContextManagerStore):
            await self.result_storage.__aenter__()
        else:
            await self.result_storage.setup()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        if isinstance(self.result_storage, BaseContextManagerStore):
            await self.result_storage.__aexit__(exc_type, exc_value, traceback)

        # Close the strike list (stops monitoring and disconnects)
        await self.strike_list.close()
        del self.strike_list

        await asyncio.shield(self._connection_pool.disconnect())
        del self._connection_pool

    @asynccontextmanager
    async def redis(self) -> AsyncGenerator[Redis, None]:
        r = Redis(connection_pool=self._connection_pool)
        await r.__aenter__()
        try:
            yield r
        finally:
            await asyncio.shield(r.__aexit__(None, None, None))

    def register(self, function: TaskFunction, names: list[str] | None = None) -> None:
        """Register a task with the Docket.

        Args:
            function: The task to register.
            names: Names to register the task under. Defaults to [function.__name__].
        """
        from .dependencies import validate_dependencies

        validate_dependencies(function)

        if not names:
            names = [function.__name__]

        for name in names:
            self.tasks[name] = function

    def register_collection(self, collection_path: str) -> None:
        """
        Register a collection of tasks.

        Args:
            collection_path: A path in the format "module:collection".
        """
        module_name, _, member_name = collection_path.rpartition(":")
        module = importlib.import_module(module_name)
        collection = getattr(module, member_name)
        for function in collection:
            self.register(function)

    def labels(self) -> Mapping[str, str]:
        return {
            "docket.name": self.name,
        }

    @overload
    def add(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[P, Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task function to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def add(
        self,
        function: str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The name of a task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def add(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime | None = None,
        key: str | None = None,
    ) -> Callable[..., Awaitable[Execution]]:
        """Add a task to the Docket.

        Args:
            function: The task to add.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        function_name: str | None = None
        if isinstance(function, str):
            function_name = function
            function = self.tasks[function]
        else:
            self.register(function)

        if when is None:
            when = datetime.now(timezone.utc)

        if key is None:
            key = str(uuid7())

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(
                self,
                function,
                args,
                kwargs,
                key,
                when,
                attempt=1,
                function_name=function_name,
            )

            # Check if task is stricken before scheduling
            if self.strike_list.is_stricken(execution):
                logger.warning(
                    "%r is stricken, skipping schedule of %r",
                    execution.function_name,
                    execution.key,
                )
                TASKS_STRICKEN.add(
                    1,
                    {
                        **self.labels(),
                        **execution.general_labels(),
                        "docket.where": "docket",
                    },
                )
                return execution

            # Schedule atomically (includes state record write)
            await execution.schedule(replace=False)

            TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    @overload
    def replace(
        self,
        function: Callable[P, Awaitable[R]],
        when: datetime,
        key: str,
    ) -> Callable[P, Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task function to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    @overload
    def replace(
        self,
        function: str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The name of a task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """

    def replace(
        self,
        function: Callable[P, Awaitable[R]] | str,
        when: datetime,
        key: str,
    ) -> Callable[..., Awaitable[Execution]]:
        """Replace a previously scheduled task on the Docket.

        Args:
            function: The task to replace.
            when: The time to schedule the task.
            key: The key to schedule the task under.
        """
        function_name: str | None = None
        if isinstance(function, str):
            function_name = function
            function = self.tasks[function]
        else:
            self.register(function)

        async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
            execution = Execution(
                self,
                function,
                args,
                kwargs,
                key,
                when,
                attempt=1,
                function_name=function_name,
            )

            # Check if task is stricken before scheduling
            if self.strike_list.is_stricken(execution):
                logger.warning(
                    "%r is stricken, skipping schedule of %r",
                    execution.function_name,
                    execution.key,
                )
                TASKS_STRICKEN.add(
                    1,
                    {
                        **self.labels(),
                        **execution.general_labels(),
                        "docket.where": "docket",
                    },
                )
                return execution

            # Schedule atomically (includes state record write)
            await execution.schedule(replace=True)

            TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
            TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

            return execution

        return scheduler

    async def schedule(self, execution: Execution) -> None:
        with tracer.start_as_current_span(
            "docket.schedule",
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function_name,
            },
        ):
            # Check if task is stricken before scheduling
            if self.strike_list.is_stricken(execution):
                logger.warning(
                    "%r is stricken, skipping schedule of %r",
                    execution.function_name,
                    execution.key,
                )
                TASKS_STRICKEN.add(
                    1,
                    {
                        **self.labels(),
                        **execution.general_labels(),
                        "docket.where": "docket",
                    },
                )
                return

            # Schedule atomically (includes state record write)
            await execution.schedule(replace=False)

        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

    async def cancel(self, key: str) -> None:
        """Cancel a previously scheduled task on the Docket.

        If the task is scheduled (in the queue or stream), it will be removed.
        If the task is currently running, a cancellation signal will be sent
        to the worker, which will attempt to cancel the asyncio task. This is
        best-effort: if the task completes before the signal is processed,
        the cancellation will have no effect.

        Args:
            key: The key of the task to cancel.
        """
        with tracer.start_as_current_span(
            "docket.cancel",
            attributes={**self.labels(), "docket.key": key},
        ):
            async with self.redis() as redis:
                await self._cancel(redis, key)

                # Publish cancellation signal for running tasks (best-effort)
                cancellation_channel = f"{self.name}:cancel:{key}"
                await redis.publish(cancellation_channel, key)

        TASKS_CANCELLED.add(1, self.labels())

    async def get_execution(self, key: str) -> Execution | None:
        """Get a task Execution from the Docket by its key.

        Args:
            key: The task key.

        Returns:
            The Execution if found, None if the key doesn't exist.

        Example:
            # Claim check pattern: schedule a task, save the key,
            # then retrieve the execution later to check status or get results
            execution = await docket.add(my_task, key="important-task")(args)
            task_key = execution.key

            # Later, retrieve the execution by key
            execution = await docket.get_execution(task_key)
            if execution:
                await execution.get_result()
        """
        import cloudpickle

        async with self.redis() as redis:
            runs_key = f"{self.name}:runs:{key}"
            data = await redis.hgetall(runs_key)

            if not data:
                return None

            # Extract task definition from runs hash
            function_name = data.get(b"function")
            args_data = data.get(b"args")
            kwargs_data = data.get(b"kwargs")

            # TODO: Remove in next breaking release (v0.14.0) - fallback for 0.13.0 compatibility
            # Check parked hash if runs hash incomplete (0.13.0 didn't store task data in runs hash)
            if not function_name or not args_data or not kwargs_data:
                parked_key = self.parked_task_key(key)
                parked_data = await redis.hgetall(parked_key)
                if parked_data:
                    function_name = parked_data.get(b"function")
                    args_data = parked_data.get(b"args")
                    kwargs_data = parked_data.get(b"kwargs")

            if not function_name or not args_data or not kwargs_data:
                return None

            # Look up function in registry, or create a placeholder if not found
            function_name_str = function_name.decode()
            function = self.tasks.get(function_name_str)
            if not function:
                # Create a placeholder function for display purposes (e.g., CLI watch)
                # This allows viewing task state even if function isn't registered
                async def placeholder() -> None:
                    pass  # pragma: no cover

                placeholder.__name__ = function_name_str
                function = placeholder

            # Deserialize args and kwargs
            args = cloudpickle.loads(args_data)
            kwargs = cloudpickle.loads(kwargs_data)

            # Extract scheduling metadata
            when_str = data.get(b"when")
            if not when_str:
                return None
            when = datetime.fromtimestamp(float(when_str.decode()), tz=timezone.utc)

            # Build execution (attempt defaults to 1 for initial scheduling)
            from docket.execution import Execution

            execution = Execution(
                docket=self,
                function=function,
                args=args,
                kwargs=kwargs,
                key=key,
                when=when,
                attempt=1,
            )

            # Sync with current state from Redis
            await execution.sync()

            return execution

    @property
    def queue_key(self) -> str:
        return f"{self.name}:queue"

    @property
    def stream_key(self) -> str:
        return f"{self.name}:stream"

    def known_task_key(self, key: str) -> str:
        return f"{self.name}:known:{key}"

    def parked_task_key(self, key: str) -> str:
        return f"{self.name}:{key}"

    def stream_id_key(self, key: str) -> str:
        return f"{self.name}:stream-id:{key}"

    async def _ensure_stream_and_group(self) -> None:
        """Create stream and consumer group if they don't exist (idempotent).

        This is safe to call from multiple workers racing to initialize - the
        BUSYGROUP error is silently ignored since it just means another worker
        created the group first.
        """
        try:
            async with self.redis() as r:
                await r.xgroup_create(
                    groupname=self.worker_group_name,
                    name=self.stream_key,
                    id="0-0",
                    mkstream=True,
                )
        except redis.exceptions.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise  # pragma: no cover

    async def _cancel(self, redis: Redis, key: str) -> None:
        """Cancel a task atomically.

        Handles cancellation regardless of task location:
        - From the stream (using stored message ID)
        - From the queue (scheduled tasks)
        - Cleans up all associated metadata keys
        """
        if self._cancel_task_script is None:
            self._cancel_task_script = cast(
                _cancel_task,
                redis.register_script(
                    # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key, runs_key
                    # ARGV: task_key, completed_at
                    """
                    local stream_key = KEYS[1]
                    -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                    local known_key = KEYS[2]
                    local parked_key = KEYS[3]
                    local queue_key = KEYS[4]
                    local stream_id_key = KEYS[5]
                    local runs_key = KEYS[6]
                    local task_key = ARGV[1]
                    local completed_at = ARGV[2]

                    -- Get stream ID (check new location first, then legacy)
                    local message_id = redis.call('HGET', runs_key, 'stream_id')

                    -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                    if not message_id then
                        message_id = redis.call('GET', stream_id_key)
                    end

                    -- Delete from stream if message ID exists
                    if message_id then
                        redis.call('XDEL', stream_key, message_id)
                    end

                    -- Clean up legacy keys and parked data
                    redis.call('DEL', known_key, parked_key, stream_id_key)
                    redis.call('ZREM', queue_key, task_key)

                    -- Only set CANCELLED if not already in a terminal state
                    local current_state = redis.call('HGET', runs_key, 'state')
                    if current_state ~= 'completed' and current_state ~= 'failed' and current_state ~= 'cancelled' then
                        redis.call('HSET', runs_key, 'state', 'cancelled', 'completed_at', completed_at)
                    end

                    return 'OK'
                    """
                ),
            )
        cancel_task = self._cancel_task_script

        # Create tombstone with CANCELLED state
        completed_at = datetime.now(timezone.utc).isoformat()
        runs_key = f"{self.name}:runs:{key}"

        # Execute the cancellation script
        await cancel_task(
            keys=[
                self.stream_key,
                self.known_task_key(key),
                self.parked_task_key(key),
                self.queue_key,
                self.stream_id_key(key),
                runs_key,
            ],
            args=[key, completed_at],
        )

        # Apply TTL or delete tombstone based on execution_ttl
        if self.execution_ttl:
            ttl_seconds = int(self.execution_ttl.total_seconds())
            await redis.expire(runs_key, ttl_seconds)
        else:
            # execution_ttl=0 means no observability - delete tombstone immediately
            await redis.delete(runs_key)

    async def strike(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Strike a task from the Docket.

        Args:
            function: The task to strike (function or name), or None for all tasks.
            parameter: The parameter to strike on, or None for entire task.
            operator: The comparison operator to use.
            value: The value to strike on.
        """
        function_name = function.__name__ if callable(function) else function

        instruction = Strike(function_name, parameter, Operator(operator), value)
        with tracer.start_as_current_span(
            "docket.strike",
            attributes={**self.labels(), **instruction.labels()},
        ):
            await self.strike_list.send_instruction(instruction)

    async def restore(
        self,
        function: Callable[P, Awaitable[R]] | str | None = None,
        parameter: str | None = None,
        operator: Operator | LiteralOperator = "==",
        value: Hashable | None = None,
    ) -> None:
        """Restore a previously stricken task to the Docket.

        Args:
            function: The task to restore (function or name), or None for all tasks.
            parameter: The parameter to restore on, or None for entire task.
            operator: The comparison operator to use.
            value: The value to restore on.
        """
        function_name = function.__name__ if callable(function) else function

        instruction = Restore(function_name, parameter, Operator(operator), value)
        with tracer.start_as_current_span(
            "docket.restore",
            attributes={**self.labels(), **instruction.labels()},
        ):
            await self.strike_list.send_instruction(instruction)

    async def wait_for_strikes_loaded(self) -> None:
        """Wait for all existing strikes to be loaded from the stream.

        This method blocks until the strike monitor has completed its initial
        non-blocking read of all existing strike messages. Call this before
        making decisions that depend on the current strike state, such as
        scheduling automatic perpetual tasks.
        """
        await self.strike_list.wait_for_strikes_loaded()

    async def snapshot(self) -> DocketSnapshot:
        """Get a snapshot of the Docket, including which tasks are scheduled or currently
        running, as well as which workers are active.

        Returns:
            A snapshot of the Docket.
        """
        # For memory:// URLs (fakeredis), ensure the group exists upfront. This
        # avoids a fakeredis bug where xpending_range raises TypeError instead
        # of NOGROUP when the consumer group doesn't exist.
        if self.url.startswith("memory://"):
            await self._ensure_stream_and_group()

        running: list[RunningExecution] = []
        future: list[Execution] = []

        async with self.redis() as r:
            async with r.pipeline() as pipeline:
                pipeline.xlen(self.stream_key)

                pipeline.zcard(self.queue_key)

                pipeline.xpending_range(
                    self.stream_key,
                    self.worker_group_name,
                    min="-",
                    max="+",
                    count=1000,
                )

                pipeline.xrange(self.stream_key, "-", "+", count=1000)

                pipeline.zrange(self.queue_key, 0, -1)

                total_stream_messages: int
                total_schedule_messages: int
                pending_messages: list[RedisStreamPendingMessage]
                stream_messages: list[tuple[RedisMessageID, RedisMessage]]
                scheduled_task_keys: list[bytes]

                now = datetime.now(timezone.utc)
                try:
                    (
                        total_stream_messages,
                        total_schedule_messages,
                        pending_messages,
                        stream_messages,
                        scheduled_task_keys,
                    ) = await pipeline.execute()
                except redis.exceptions.ResponseError as e:
                    # Check for NOGROUP error. Also check for XPENDING because
                    # redis-py 7.0 has a bug where pipeline errors lose the
                    # original NOGROUP message (shows "{exception.args}" instead).
                    error_str = str(e)
                    if "NOGROUP" in error_str or "XPENDING" in error_str:
                        await self._ensure_stream_and_group()
                        return await self.snapshot()
                    raise  # pragma: no cover

                for task_key in scheduled_task_keys:
                    pipeline.hgetall(self.parked_task_key(task_key.decode()))

                # Because these are two separate pipeline commands, it's possible that
                # a message has been moved from the schedule to the stream in the
                # meantime, which would end up being an empty `{}` message
                queued_messages: list[RedisMessage] = [
                    m for m in await pipeline.execute() if m
                ]

        total_tasks = total_stream_messages + total_schedule_messages

        pending_lookup: dict[RedisMessageID, RedisStreamPendingMessage] = {
            pending["message_id"]: pending for pending in pending_messages
        }

        for message_id, message in stream_messages:
            execution = await Execution.from_message(self, message)
            if message_id in pending_lookup:
                worker_name = pending_lookup[message_id]["consumer"].decode()
                started = now - timedelta(
                    milliseconds=pending_lookup[message_id]["time_since_delivered"]
                )
                running.append(RunningExecution(execution, worker_name, started))
            else:
                future.append(execution)  # pragma: no cover

        for message in queued_messages:
            execution = await Execution.from_message(self, message)
            future.append(execution)

        workers = await self.workers()

        return DocketSnapshot(now, total_tasks, future, running, workers)

    @property
    def workers_set(self) -> str:
        return f"{self.name}:workers"

    def worker_tasks_set(self, worker_name: str) -> str:
        return f"{self.name}:worker-tasks:{worker_name}"

    def task_workers_set(self, task_name: str) -> str:
        return f"{self.name}:task-workers:{task_name}"

    async def workers(self) -> Collection[WorkerInfo]:
        """Get a list of all workers that have sent heartbeats to the Docket.

        Returns:
            A list of all workers that have sent heartbeats to the Docket.
        """
        workers: list[WorkerInfo] = []

        oldest = datetime.now(timezone.utc).timestamp() - (
            self.heartbeat_interval.total_seconds() * self.missed_heartbeats
        )

        async with self.redis() as r:
            await r.zremrangebyscore(self.workers_set, 0, oldest)

            worker_name_bytes: bytes
            last_seen_timestamp: float

            for worker_name_bytes, last_seen_timestamp in await r.zrange(
                self.workers_set, 0, -1, withscores=True
            ):
                worker_name = worker_name_bytes.decode()
                last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

                task_names: set[str] = {
                    task_name_bytes.decode()
                    for task_name_bytes in cast(
                        set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                    )
                }

                workers.append(WorkerInfo(worker_name, last_seen, task_names))

        return workers

    async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
        """Get a list of all workers that are able to execute a given task.

        Args:
            task_name: The name of the task.

        Returns:
            A list of all workers that are able to execute the given task.
        """
        workers: list[WorkerInfo] = []
        oldest = datetime.now(timezone.utc).timestamp() - (
            self.heartbeat_interval.total_seconds() * self.missed_heartbeats
        )

        async with self.redis() as r:
            await r.zremrangebyscore(self.task_workers_set(task_name), 0, oldest)

            worker_name_bytes: bytes
            last_seen_timestamp: float

            for worker_name_bytes, last_seen_timestamp in await r.zrange(
                self.task_workers_set(task_name), 0, -1, withscores=True
            ):
                worker_name = worker_name_bytes.decode()
                last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

                task_names: set[str] = {
                    task_name_bytes.decode()
                    for task_name_bytes in cast(
                        set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                    )
                }

                workers.append(WorkerInfo(worker_name, last_seen, task_names))

        return workers

    async def clear(self) -> int:
        """Clear all queued and scheduled tasks from the docket.

        This removes all tasks from the stream (immediate tasks) and queue
        (scheduled tasks), along with their associated parked data. Running
        tasks are not affected.

        Returns:
            The total number of tasks that were cleared.
        """
        with tracer.start_as_current_span(
            "docket.clear",
            attributes=self.labels(),
        ):
            async with self.redis() as redis:
                async with redis.pipeline() as pipeline:
                    # Get counts before clearing
                    pipeline.xlen(self.stream_key)
                    pipeline.zcard(self.queue_key)
                    pipeline.zrange(self.queue_key, 0, -1)

                    stream_count: int
                    queue_count: int
                    scheduled_keys: list[bytes]
                    stream_count, queue_count, scheduled_keys = await pipeline.execute()

                # Get keys from stream messages before trimming
                stream_keys: list[str] = []
                if stream_count > 0:
                    # Read all messages from the stream
                    messages = await redis.xrange(self.stream_key, "-", "+")
                    for message_id, fields in messages:
                        # Extract the key field from the message
                        if b"key" in fields:  # pragma: no branch
                            stream_keys.append(fields[b"key"].decode())

                async with redis.pipeline() as pipeline:
                    # Clear all data
                    # Trim stream to 0 messages instead of deleting it to preserve consumer group
                    if stream_count > 0:
                        pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
                    pipeline.delete(self.queue_key)

                    # Clear parked task data and known task keys for scheduled tasks
                    for key_bytes in scheduled_keys:
                        key = key_bytes.decode()
                        pipeline.delete(self.parked_task_key(key))
                        pipeline.delete(self.known_task_key(key))
                        pipeline.delete(self.stream_id_key(key))

                        # Handle runs hash: set TTL or delete based on execution_ttl
                        runs_key = f"{self.name}:runs:{key}"
                        if self.execution_ttl:
                            ttl_seconds = int(self.execution_ttl.total_seconds())
                            pipeline.expire(runs_key, ttl_seconds)
                        else:
                            pipeline.delete(runs_key)

                    # Handle runs hash for immediate tasks from stream
                    for key in stream_keys:
                        runs_key = f"{self.name}:runs:{key}"
                        if self.execution_ttl:
                            ttl_seconds = int(self.execution_ttl.total_seconds())
                            pipeline.expire(runs_key, ttl_seconds)
                        else:
                            pipeline.delete(runs_key)

                    await pipeline.execute()

                    total_cleared = stream_count + queue_count
                    return total_cleared

__init__(name='docket', url='redis://localhost:6379/0', heartbeat_interval=timedelta(seconds=2), missed_heartbeats=5, execution_ttl=timedelta(minutes=15), result_storage=None, enable_internal_instrumentation=False)

Parameters:

    name (str, default 'docket'):
        The name of the docket.

    url (str, default 'redis://localhost:6379/0'):
        The URL of the Redis server or in-memory backend. For example:
        - "redis://localhost:6379/0"
        - "redis://user:password@localhost:6379/0"
        - "redis://user:password@localhost:6379/0?ssl=true"
        - "rediss://localhost:6379/0"
        - "unix:///path/to/redis.sock"
        - "memory://" (in-memory backend for testing)

    heartbeat_interval (timedelta, default timedelta(seconds=2)):
        How often workers send heartbeat messages to the docket.

    missed_heartbeats (int, default 5):
        How many heartbeats a worker can miss before it is considered dead.

    execution_ttl (timedelta, default timedelta(minutes=15)):
        How long to keep completed or failed execution state records in Redis
        before they expire. Defaults to 15 minutes.

    enable_internal_instrumentation (bool, default False):
        Whether to enable OpenTelemetry spans for internal Redis polling
        operations like strike stream monitoring. Defaults to False.
Source code in src/docket/docket.py
def __init__(
    self,
    name: str = "docket",
    url: str = "redis://localhost:6379/0",
    heartbeat_interval: timedelta = timedelta(seconds=2),
    missed_heartbeats: int = 5,
    execution_ttl: timedelta = timedelta(minutes=15),
    result_storage: AsyncKeyValue | None = None,
    enable_internal_instrumentation: bool = False,
) -> None:
    """
    Args:
        name: The name of the docket.
        url: The URL of the Redis server or in-memory backend.  For example:
            - "redis://localhost:6379/0"
            - "redis://user:password@localhost:6379/0"
            - "redis://user:password@localhost:6379/0?ssl=true"
            - "rediss://localhost:6379/0"
            - "unix:///path/to/redis.sock"
            - "memory://" (in-memory backend for testing)
        heartbeat_interval: How often workers send heartbeat messages to the docket.
        missed_heartbeats: How many heartbeats a worker can miss before it is
            considered dead.
        execution_ttl: How long to keep completed or failed execution state records
            in Redis before they expire. Defaults to 15 minutes.
        enable_internal_instrumentation: Whether to enable OpenTelemetry spans
            for internal Redis polling operations like strike stream monitoring.
            Defaults to False.
    """
    self.name = name
    self.url = url
    self.heartbeat_interval = heartbeat_interval
    self.missed_heartbeats = missed_heartbeats
    self.execution_ttl = execution_ttl
    self.enable_internal_instrumentation = enable_internal_instrumentation
    self._cancel_task_script = None

    self.result_storage: AsyncKeyValue
    if url.startswith("memory://"):
        self.result_storage = MemoryStore()
    else:
        self.result_storage = RedisStore(
            url=url, default_collection=f"{name}:results"
        )

    from .tasks import standard_tasks

    self.tasks: dict[str, TaskFunction] = {fn.__name__: fn for fn in standard_tasks}
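
As a sketch of typical construction (hedged: this assumes the async context manager usage shown in the project's examples, and a Redis running locally):

from datetime import timedelta

from docket import Docket

async with Docket(
    name="orders",
    url="redis://localhost:6379/0",
    execution_ttl=timedelta(minutes=5),  # keep run records for 5 minutes
) as docket:
    ...  # register and schedule tasks here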

add(function, when=None, key=None)

add(
    function: Callable[P, Awaitable[R]],
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[P, Awaitable[Execution]]
add(
    function: str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]

Add a task to the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str, required):
        The task to add.

    when (datetime | None, default None):
        The time to schedule the task.

    key (str | None, default None):
        The key to schedule the task under.
Source code in src/docket/docket.py
def add(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime | None = None,
    key: str | None = None,
) -> Callable[..., Awaitable[Execution]]:
    """Add a task to the Docket.

    Args:
        function: The task to add.
        when: The time to schedule the task.
        key: The key to schedule the task under.
    """
    function_name: str | None = None
    if isinstance(function, str):
        function_name = function
        function = self.tasks[function]
    else:
        self.register(function)

    if when is None:
        when = datetime.now(timezone.utc)

    if key is None:
        key = str(uuid7())

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(
            self,
            function,
            args,
            kwargs,
            key,
            when,
            attempt=1,
            function_name=function_name,
        )

        # Check if task is stricken before scheduling
        if self.strike_list.is_stricken(execution):
            logger.warning(
                "%r is stricken, skipping schedule of %r",
                execution.function_name,
                execution.key,
            )
            TASKS_STRICKEN.add(
                1,
                {
                    **self.labels(),
                    **execution.general_labels(),
                    "docket.where": "docket",
                },
            )
            return execution

        # Schedule atomically (includes state record write)
        await execution.schedule(replace=False)

        TASKS_ADDED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

        return execution

    return scheduler
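
For example, from within an async context (send_email here is a hypothetical registered task):

from datetime import datetime, timedelta, timezone

# Run as soon as a worker picks it up, under a generated key:
execution = await docket.add(send_email)("user@example.com")

# Or schedule for later under a stable key; scheduling the same key
# again will not create a duplicate task:
tomorrow = datetime.now(timezone.utc) + timedelta(days=1)
await docket.add(send_email, when=tomorrow, key="welcome-user-42")(
    "user@example.com"
)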

cancel(key) async

Cancel a previously scheduled task on the Docket.

If the task is scheduled (in the queue or stream), it will be removed. If the task is currently running, a cancellation signal will be sent to the worker, which will attempt to cancel the asyncio task. This is best-effort: if the task completes before the signal is processed, the cancellation will have no effect.

Parameters:

    key (str, required):
        The key of the task to cancel.
Source code in src/docket/docket.py
async def cancel(self, key: str) -> None:
    """Cancel a previously scheduled task on the Docket.

    If the task is scheduled (in the queue or stream), it will be removed.
    If the task is currently running, a cancellation signal will be sent
    to the worker, which will attempt to cancel the asyncio task. This is
    best-effort: if the task completes before the signal is processed,
    the cancellation will have no effect.

    Args:
        key: The key of the task to cancel.
    """
    with tracer.start_as_current_span(
        "docket.cancel",
        attributes={**self.labels(), "docket.key": key},
    ):
        async with self.redis() as redis:
            await self._cancel(redis, key)

            # Publish cancellation signal for running tasks (best-effort)
            cancellation_channel = f"{self.name}:cancel:{key}"
            await redis.publish(cancellation_channel, key)

    TASKS_CANCELLED.add(1, self.labels())
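
For example, continuing the hypothetical send_email task above:

await docket.add(send_email, key="welcome-user-42")("user@example.com")

# Remove it before it runs; for a task that is already running this is
# best-effort, as described above:
await docket.cancel("welcome-user-42")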

clear() async

Clear all queued and scheduled tasks from the docket.

This removes all tasks from the stream (immediate tasks) and queue (scheduled tasks), along with their associated parked data. Running tasks are not affected.

Returns:

    int: The total number of tasks that were cleared.

Source code in src/docket/docket.py
async def clear(self) -> int:
    """Clear all queued and scheduled tasks from the docket.

    This removes all tasks from the stream (immediate tasks) and queue
    (scheduled tasks), along with their associated parked data. Running
    tasks are not affected.

    Returns:
        The total number of tasks that were cleared.
    """
    with tracer.start_as_current_span(
        "docket.clear",
        attributes=self.labels(),
    ):
        async with self.redis() as redis:
            async with redis.pipeline() as pipeline:
                # Get counts before clearing
                pipeline.xlen(self.stream_key)
                pipeline.zcard(self.queue_key)
                pipeline.zrange(self.queue_key, 0, -1)

                stream_count: int
                queue_count: int
                scheduled_keys: list[bytes]
                stream_count, queue_count, scheduled_keys = await pipeline.execute()

            # Get keys from stream messages before trimming
            stream_keys: list[str] = []
            if stream_count > 0:
                # Read all messages from the stream
                messages = await redis.xrange(self.stream_key, "-", "+")
                for message_id, fields in messages:
                    # Extract the key field from the message
                    if b"key" in fields:  # pragma: no branch
                        stream_keys.append(fields[b"key"].decode())

            async with redis.pipeline() as pipeline:
                # Clear all data
                # Trim stream to 0 messages instead of deleting it to preserve consumer group
                if stream_count > 0:
                    pipeline.xtrim(self.stream_key, maxlen=0, approximate=False)
                pipeline.delete(self.queue_key)

                # Clear parked task data and known task keys for scheduled tasks
                for key_bytes in scheduled_keys:
                    key = key_bytes.decode()
                    pipeline.delete(self.parked_task_key(key))
                    pipeline.delete(self.known_task_key(key))
                    pipeline.delete(self.stream_id_key(key))

                    # Handle runs hash: set TTL or delete based on execution_ttl
                    runs_key = f"{self.name}:runs:{key}"
                    if self.execution_ttl:
                        ttl_seconds = int(self.execution_ttl.total_seconds())
                        pipeline.expire(runs_key, ttl_seconds)
                    else:
                        pipeline.delete(runs_key)

                # Handle runs hash for immediate tasks from stream
                for key in stream_keys:
                    runs_key = f"{self.name}:runs:{key}"
                    if self.execution_ttl:
                        ttl_seconds = int(self.execution_ttl.total_seconds())
                        pipeline.expire(runs_key, ttl_seconds)
                    else:
                        pipeline.delete(runs_key)

                await pipeline.execute()

                total_cleared = stream_count + queue_count
                return total_cleared
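
A brief sketch:

cleared = await docket.clear()
print(f"Cleared {cleared} queued and scheduled tasks; running tasks were untouched")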

get_execution(key) async

Get a task Execution from the Docket by its key.

Parameters:

    key (str, required):
        The task key.

Returns:

    Execution | None: The Execution if found, None if the key doesn't exist.

Example:

    # Claim check pattern: schedule a task, save the key,
    # then retrieve the execution later to check status or get results
    execution = await docket.add(my_task, key="important-task")(args)
    task_key = execution.key

    # Later, retrieve the execution by key
    execution = await docket.get_execution(task_key)
    if execution:
        await execution.get_result()

Source code in src/docket/docket.py
async def get_execution(self, key: str) -> Execution | None:
    """Get a task Execution from the Docket by its key.

    Args:
        key: The task key.

    Returns:
        The Execution if found, None if the key doesn't exist.

    Example:
        # Claim check pattern: schedule a task, save the key,
        # then retrieve the execution later to check status or get results
        execution = await docket.add(my_task, key="important-task")(args)
        task_key = execution.key

        # Later, retrieve the execution by key
        execution = await docket.get_execution(task_key)
        if execution:
            await execution.get_result()
    """
    import cloudpickle

    async with self.redis() as redis:
        runs_key = f"{self.name}:runs:{key}"
        data = await redis.hgetall(runs_key)

        if not data:
            return None

        # Extract task definition from runs hash
        function_name = data.get(b"function")
        args_data = data.get(b"args")
        kwargs_data = data.get(b"kwargs")

        # TODO: Remove in next breaking release (v0.14.0) - fallback for 0.13.0 compatibility
        # Check parked hash if runs hash incomplete (0.13.0 didn't store task data in runs hash)
        if not function_name or not args_data or not kwargs_data:
            parked_key = self.parked_task_key(key)
            parked_data = await redis.hgetall(parked_key)
            if parked_data:
                function_name = parked_data.get(b"function")
                args_data = parked_data.get(b"args")
                kwargs_data = parked_data.get(b"kwargs")

        if not function_name or not args_data or not kwargs_data:
            return None

        # Look up function in registry, or create a placeholder if not found
        function_name_str = function_name.decode()
        function = self.tasks.get(function_name_str)
        if not function:
            # Create a placeholder function for display purposes (e.g., CLI watch)
            # This allows viewing task state even if function isn't registered
            async def placeholder() -> None:
                pass  # pragma: no cover

            placeholder.__name__ = function_name_str
            function = placeholder

        # Deserialize args and kwargs
        args = cloudpickle.loads(args_data)
        kwargs = cloudpickle.loads(kwargs_data)

        # Extract scheduling metadata
        when_str = data.get(b"when")
        if not when_str:
            return None
        when = datetime.fromtimestamp(float(when_str.decode()), tz=timezone.utc)

        # Build execution (attempt defaults to 1 for initial scheduling)
        from docket.execution import Execution

        execution = Execution(
            docket=self,
            function=function,
            args=args,
            kwargs=kwargs,
            key=key,
            when=when,
            attempt=1,
        )

        # Sync with current state from Redis
        await execution.sync()

        return execution

register(function, names=None)

Register a task with the Docket.

Parameters:

    function (TaskFunction, required):
        The task to register.

    names (list[str] | None, default None):
        Names to register the task under. Defaults to [function.__name__].
Source code in src/docket/docket.py
def register(self, function: TaskFunction, names: list[str] | None = None) -> None:
    """Register a task with the Docket.

    Args:
        function: The task to register.
        names: Names to register the task under. Defaults to [function.__name__].
    """
    from .dependencies import validate_dependencies

    validate_dependencies(function)

    if not names:
        names = [function.__name__]

    for name in names:
        self.tasks[name] = function
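
For example (send_email is a hypothetical task function):

async def send_email(address: str) -> None:
    ...

docket.register(send_email)                      # registered as "send_email"
docket.register(send_email, names=["email_v1"])  # also registered under an alias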

register_collection(collection_path)

Register a collection of tasks.

Parameters:

    collection_path (str, required):
        A path in the format "module:collection".
Source code in src/docket/docket.py
def register_collection(self, collection_path: str) -> None:
    """
    Register a collection of tasks.

    Args:
        collection_path: A path in the format "module:collection".
    """
    module_name, _, member_name = collection_path.rpartition(":")
    module = importlib.import_module(module_name)
    collection = getattr(module, member_name)
    for function in collection:
        self.register(function)
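
For example, given a hypothetical module myapp/tasks.py that exposes a list of task functions:

# myapp/tasks.py (hypothetical)
tasks = [send_email, process_order]

# elsewhere, register everything in that list:
docket.register_collection("myapp.tasks:tasks")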

replace(function, when, key)

replace(
    function: Callable[P, Awaitable[R]],
    when: datetime,
    key: str,
) -> Callable[P, Awaitable[Execution]]
replace(
    function: str, when: datetime, key: str
) -> Callable[..., Awaitable[Execution]]

Replace a previously scheduled task on the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str, required):
        The task to replace.

    when (datetime, required):
        The time to schedule the task.

    key (str, required):
        The key to schedule the task under.
Source code in src/docket/docket.py
def replace(
    self,
    function: Callable[P, Awaitable[R]] | str,
    when: datetime,
    key: str,
) -> Callable[..., Awaitable[Execution]]:
    """Replace a previously scheduled task on the Docket.

    Args:
        function: The task to replace.
        when: The time to schedule the task.
        key: The key to schedule the task under.
    """
    function_name: str | None = None
    if isinstance(function, str):
        function_name = function
        function = self.tasks[function]
    else:
        self.register(function)

    async def scheduler(*args: P.args, **kwargs: P.kwargs) -> Execution:
        execution = Execution(
            self,
            function,
            args,
            kwargs,
            key,
            when,
            attempt=1,
            function_name=function_name,
        )

        # Check if task is stricken before scheduling
        if self.strike_list.is_stricken(execution):
            logger.warning(
                "%r is stricken, skipping schedule of %r",
                execution.function_name,
                execution.key,
            )
            TASKS_STRICKEN.add(
                1,
                {
                    **self.labels(),
                    **execution.general_labels(),
                    "docket.where": "docket",
                },
            )
            return execution

        # Schedule atomically (includes state record write)
        await execution.schedule(replace=True)

        TASKS_REPLACED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_CANCELLED.add(1, {**self.labels(), **execution.general_labels()})
        TASKS_SCHEDULED.add(1, {**self.labels(), **execution.general_labels()})

        return execution

    return scheduler
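
For example, pushing a hypothetical reminder task back by a day while keeping its key:

from datetime import datetime, timedelta, timezone

new_time = datetime.now(timezone.utc) + timedelta(days=1)
await docket.replace(send_reminder, when=new_time, key="reminder-user-42")(
    "user@example.com"
)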

restore(function=None, parameter=None, operator='==', value=None) async

Restore a previously stricken task to the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str | None, default None):
        The task to restore (function or name), or None for all tasks.

    parameter (str | None, default None):
        The parameter to restore on, or None for entire task.

    operator (Operator | LiteralOperator, default '=='):
        The comparison operator to use.

    value (Hashable | None, default None):
        The value to restore on.
Source code in src/docket/docket.py
async def restore(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Restore a previously stricken task to the Docket.

    Args:
        function: The task to restore (function or name), or None for all tasks.
        parameter: The parameter to restore on, or None for entire task.
        operator: The comparison operator to use.
        value: The value to restore on.
    """
    function_name = function.__name__ if callable(function) else function

    instruction = Restore(function_name, parameter, Operator(operator), value)
    with tracer.start_as_current_span(
        "docket.restore",
        attributes={**self.labels(), **instruction.labels()},
    ):
        await self.strike_list.send_instruction(instruction)
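
For example (send_email and customer_id are hypothetical; see strike() below for the matching strikes):

# Lift a strike on the whole task:
await docket.restore(send_email)

# Or lift only a parameter-scoped strike:
await docket.restore(send_email, parameter="customer_id", operator="==", value=123)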

snapshot() async

Get a snapshot of the Docket, including which tasks are scheduled or currently running, as well as which workers are active.

Returns:

    DocketSnapshot: A snapshot of the Docket.

Source code in src/docket/docket.py
async def snapshot(self) -> DocketSnapshot:
    """Get a snapshot of the Docket, including which tasks are scheduled or currently
    running, as well as which workers are active.

    Returns:
        A snapshot of the Docket.
    """
    # For memory:// URLs (fakeredis), ensure the group exists upfront. This
    # avoids a fakeredis bug where xpending_range raises TypeError instead
    # of NOGROUP when the consumer group doesn't exist.
    if self.url.startswith("memory://"):
        await self._ensure_stream_and_group()

    running: list[RunningExecution] = []
    future: list[Execution] = []

    async with self.redis() as r:
        async with r.pipeline() as pipeline:
            pipeline.xlen(self.stream_key)

            pipeline.zcard(self.queue_key)

            pipeline.xpending_range(
                self.stream_key,
                self.worker_group_name,
                min="-",
                max="+",
                count=1000,
            )

            pipeline.xrange(self.stream_key, "-", "+", count=1000)

            pipeline.zrange(self.queue_key, 0, -1)

            total_stream_messages: int
            total_schedule_messages: int
            pending_messages: list[RedisStreamPendingMessage]
            stream_messages: list[tuple[RedisMessageID, RedisMessage]]
            scheduled_task_keys: list[bytes]

            now = datetime.now(timezone.utc)
            try:
                (
                    total_stream_messages,
                    total_schedule_messages,
                    pending_messages,
                    stream_messages,
                    scheduled_task_keys,
                ) = await pipeline.execute()
            except redis.exceptions.ResponseError as e:
                # Check for NOGROUP error. Also check for XPENDING because
                # redis-py 7.0 has a bug where pipeline errors lose the
                # original NOGROUP message (shows "{exception.args}" instead).
                error_str = str(e)
                if "NOGROUP" in error_str or "XPENDING" in error_str:
                    await self._ensure_stream_and_group()
                    return await self.snapshot()
                raise  # pragma: no cover

            for task_key in scheduled_task_keys:
                pipeline.hgetall(self.parked_task_key(task_key.decode()))

            # Because these are two separate pipeline commands, it's possible that
            # a message has been moved from the schedule to the stream in the
            # meantime, which would end up being an empty `{}` message
            queued_messages: list[RedisMessage] = [
                m for m in await pipeline.execute() if m
            ]

    total_tasks = total_stream_messages + total_schedule_messages

    pending_lookup: dict[RedisMessageID, RedisStreamPendingMessage] = {
        pending["message_id"]: pending for pending in pending_messages
    }

    for message_id, message in stream_messages:
        execution = await Execution.from_message(self, message)
        if message_id in pending_lookup:
            worker_name = pending_lookup[message_id]["consumer"].decode()
            started = now - timedelta(
                milliseconds=pending_lookup[message_id]["time_since_delivered"]
            )
            running.append(RunningExecution(execution, worker_name, started))
        else:
            future.append(execution)  # pragma: no cover

    for message in queued_messages:
        execution = await Execution.from_message(self, message)
        future.append(execution)

    workers = await self.workers()

    return DocketSnapshot(now, total_tasks, future, running, workers)
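
A sketch of inspecting a snapshot (hedged: this assumes DocketSnapshot and RunningExecution expose attributes matching the constructor arguments shown above):

snapshot = await docket.snapshot()
print(f"{snapshot.total_tasks} tasks queued or scheduled")
for running in snapshot.running:
    print(running.execution.key, "is running")  # assumed attribute names
for execution in snapshot.future:
    print(execution.key, "runs at", execution.when)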

strike(function=None, parameter=None, operator='==', value=None) async

Strike a task from the Docket.

Parameters:

    function (Callable[P, Awaitable[R]] | str | None, default None):
        The task to strike (function or name), or None for all tasks.

    parameter (str | None, default None):
        The parameter to strike on, or None for entire task.

    operator (Operator | LiteralOperator, default '=='):
        The comparison operator to use.

    value (Hashable | None, default None):
        The value to strike on.
Source code in src/docket/docket.py
async def strike(
    self,
    function: Callable[P, Awaitable[R]] | str | None = None,
    parameter: str | None = None,
    operator: Operator | LiteralOperator = "==",
    value: Hashable | None = None,
) -> None:
    """Strike a task from the Docket.

    Args:
        function: The task to strike (function or name), or None for all tasks.
        parameter: The parameter to strike on, or None for entire task.
        operator: The comparison operator to use.
        value: The value to strike on.
    """
    function_name = function.__name__ if callable(function) else function

    instruction = Strike(function_name, parameter, Operator(operator), value)
    with tracer.start_as_current_span(
        "docket.strike",
        attributes={**self.labels(), **instruction.labels()},
    ):
        await self.strike_list.send_instruction(instruction)
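
For example (send_email and customer_id are hypothetical):

# Stop executing send_email anywhere on this docket:
await docket.strike(send_email)

# Or strike it only for one problematic argument value:
await docket.strike(send_email, parameter="customer_id", operator="==", value=123)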

task_workers(task_name) async

Get a list of all workers that are able to execute a given task.

Parameters:

    task_name (str, required):
        The name of the task.

Returns:

    Collection[WorkerInfo]: A list of all workers that are able to execute the given task.

Source code in src/docket/docket.py
async def task_workers(self, task_name: str) -> Collection[WorkerInfo]:
    """Get a list of all workers that are able to execute a given task.

    Args:
        task_name: The name of the task.

    Returns:
        A list of all workers that are able to execute the given task.
    """
    workers: list[WorkerInfo] = []
    oldest = datetime.now(timezone.utc).timestamp() - (
        self.heartbeat_interval.total_seconds() * self.missed_heartbeats
    )

    async with self.redis() as r:
        await r.zremrangebyscore(self.task_workers_set(task_name), 0, oldest)

        worker_name_bytes: bytes
        last_seen_timestamp: float

        for worker_name_bytes, last_seen_timestamp in await r.zrange(
            self.task_workers_set(task_name), 0, -1, withscores=True
        ):
            worker_name = worker_name_bytes.decode()
            last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

            task_names: set[str] = {
                task_name_bytes.decode()
                for task_name_bytes in cast(
                    set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                )
            }

            workers.append(WorkerInfo(worker_name, last_seen, task_names))

    return workers
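
A sketch (hedged: this assumes WorkerInfo exposes fields matching its constructor arguments above):

for worker in await docket.task_workers("send_email"):
    print(worker.name, "last seen", worker.last_seen)  # assumed field names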

wait_for_strikes_loaded() async

Wait for all existing strikes to be loaded from the stream.

This method blocks until the strike monitor has completed its initial non-blocking read of all existing strike messages. Call this before making decisions that depend on the current strike state, such as scheduling automatic perpetual tasks.

Source code in src/docket/docket.py
async def wait_for_strikes_loaded(self) -> None:
    """Wait for all existing strikes to be loaded from the stream.

    This method blocks until the strike monitor has completed its initial
    non-blocking read of all existing strike messages. Call this before
    making decisions that depend on the current strike state, such as
    scheduling automatic perpetual tasks.
    """
    await self.strike_list.wait_for_strikes_loaded()

workers() async

Get a list of all workers that have sent heartbeats to the Docket.

Returns:

    Collection[WorkerInfo]: A list of all workers that have sent heartbeats to the Docket.

Source code in src/docket/docket.py
async def workers(self) -> Collection[WorkerInfo]:
    """Get a list of all workers that have sent heartbeats to the Docket.

    Returns:
        A list of all workers that have sent heartbeats to the Docket.
    """
    workers: list[WorkerInfo] = []

    oldest = datetime.now(timezone.utc).timestamp() - (
        self.heartbeat_interval.total_seconds() * self.missed_heartbeats
    )

    async with self.redis() as r:
        await r.zremrangebyscore(self.workers_set, 0, oldest)

        worker_name_bytes: bytes
        last_seen_timestamp: float

        for worker_name_bytes, last_seen_timestamp in await r.zrange(
            self.workers_set, 0, -1, withscores=True
        ):
            worker_name = worker_name_bytes.decode()
            last_seen = datetime.fromtimestamp(last_seen_timestamp, timezone.utc)

            task_names: set[str] = {
                task_name_bytes.decode()
                for task_name_bytes in cast(
                    set[bytes], await r.smembers(self.worker_tasks_set(worker_name))
                )
            }

            workers.append(WorkerInfo(worker_name, last_seen, task_names))

    return workers

Execution

Represents a task execution with state management and progress tracking.

Combines task invocation metadata (function, args, when, etc.) with Redis-backed lifecycle state tracking and user-reported progress.
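
As a brief sketch of the caller-side lifecycle (process_order is a hypothetical registered task; get_result is documented below):

from datetime import timedelta

execution = await docket.add(process_order, key="order-42")(order_id=42)

# Wait up to 30 seconds for a worker to finish it, then fetch the result:
result = await execution.get_result(timeout=timedelta(seconds=30))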

Source code in src/docket/execution.py
class Execution:
    """Represents a task execution with state management and progress tracking.

    Combines task invocation metadata (function, args, when, etc.) with
    Redis-backed lifecycle state tracking and user-reported progress.
    """

    def __init__(
        self,
        docket: "Docket",
        function: TaskFunction,
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        key: str,
        when: datetime,
        attempt: int,
        trace_context: opentelemetry.context.Context | None = None,
        redelivered: bool = False,
        function_name: str | None = None,
    ) -> None:
        # Task definition (immutable)
        self._docket = docket
        self._function = function
        self._function_name = function_name or function.__name__
        self._args = args
        self._kwargs = kwargs
        self._key = key

        # Scheduling metadata
        self.when = when
        self.attempt = attempt
        self._trace_context = trace_context
        self._redelivered = redelivered

        # Lifecycle state (mutable)
        self.state: ExecutionState = ExecutionState.SCHEDULED
        self.worker: str | None = None
        self.started_at: datetime | None = None
        self.completed_at: datetime | None = None
        self.error: str | None = None
        self.result_key: str | None = None

        # Progress tracking
        self.progress: ExecutionProgress = ExecutionProgress(docket, key)

        # Redis key
        self._redis_key = f"{docket.name}:runs:{key}"

    # Task definition properties (immutable)
    @property
    def docket(self) -> "Docket":
        """Parent docket instance."""
        return self._docket

    @property
    def function(self) -> TaskFunction:
        """Task function to execute."""
        return self._function

    @property
    def args(self) -> tuple[Any, ...]:
        """Positional arguments for the task."""
        return self._args

    @property
    def kwargs(self) -> dict[str, Any]:
        """Keyword arguments for the task."""
        return self._kwargs

    @property
    def key(self) -> str:
        """Unique task identifier."""
        return self._key

    @property
    def function_name(self) -> str:
        """Name of the task function (from message, may differ from function.__name__ for fallback tasks)."""
        return self._function_name

    # Scheduling metadata properties
    @property
    def trace_context(self) -> opentelemetry.context.Context | None:
        """OpenTelemetry trace context."""
        return self._trace_context

    @property
    def redelivered(self) -> bool:
        """Whether this message was redelivered."""
        return self._redelivered

    def as_message(self) -> Message:
        return {
            b"key": self.key.encode(),
            b"when": self.when.isoformat().encode(),
            b"function": self.function_name.encode(),
            b"args": cloudpickle.dumps(self.args),  # type: ignore[arg-type]
            b"kwargs": cloudpickle.dumps(self.kwargs),  # type: ignore[arg-type]
            b"attempt": str(self.attempt).encode(),
        }

    @classmethod
    async def from_message(
        cls,
        docket: "Docket",
        message: Message,
        redelivered: bool = False,
        fallback_task: TaskFunction | None = None,
    ) -> Self:
        function_name = message[b"function"].decode()
        if not (function := docket.tasks.get(function_name)):
            if fallback_task is None:
                raise ValueError(
                    f"Task function {function_name!r} is not registered with the current docket"
                )
            function = fallback_task

        instance = cls(
            docket=docket,
            function=function,
            args=cloudpickle.loads(message[b"args"]),
            kwargs=cloudpickle.loads(message[b"kwargs"]),
            key=message[b"key"].decode(),
            when=datetime.fromisoformat(message[b"when"].decode()),
            attempt=int(message[b"attempt"].decode()),
            trace_context=propagate.extract(message, getter=message_getter),
            redelivered=redelivered,
            function_name=function_name,
        )
        await instance.sync()
        return instance

    def general_labels(self) -> Mapping[str, str]:
        return {"docket.task": self.function_name}

    def specific_labels(self) -> Mapping[str, str | int]:
        return {
            "docket.task": self.function_name,
            "docket.key": self.key,
            "docket.when": self.when.isoformat(),
            "docket.attempt": self.attempt,
        }

    def get_argument(self, parameter: str) -> Any:
        signature = get_signature(self.function)
        bound_args = signature.bind(*self.args, **self.kwargs)
        return bound_args.arguments[parameter]

    def call_repr(self) -> str:
        arguments: list[str] = []
        function_name = self.function_name

        signature = get_signature(self.function)
        logged_parameters = Logged.annotated_parameters(signature)
        parameter_names = list(signature.parameters.keys())

        for i, argument in enumerate(self.args[: len(parameter_names)]):
            parameter_name = parameter_names[i]
            if logged := logged_parameters.get(parameter_name):
                arguments.append(logged.format(argument))
            else:
                arguments.append("...")

        for parameter_name, argument in self.kwargs.items():
            if logged := logged_parameters.get(parameter_name):
                arguments.append(f"{parameter_name}={logged.format(argument)}")
            else:
                arguments.append(f"{parameter_name}=...")

        return f"{function_name}({', '.join(arguments)}){{{self.key}}}"

    def incoming_span_links(self) -> list[trace.Link]:
        initiating_span = trace.get_current_span(self.trace_context)
        initiating_context = initiating_span.get_span_context()
        return [trace.Link(initiating_context)] if initiating_context.is_valid else []

    async def schedule(
        self, replace: bool = False, reschedule_message: "RedisMessageID | None" = None
    ) -> None:
        """Schedule this task atomically in Redis.

        This performs an atomic operation that:
        - Adds the task to the stream (immediate) or queue (future)
        - Writes the execution state record
        - Tracks metadata for later cancellation

        Usage patterns:
        - Normal add: schedule(replace=False)
        - Replace existing: schedule(replace=True)
        - Reschedule from stream: schedule(reschedule_message=message_id)
          This atomically acknowledges and deletes the stream message, then
          reschedules the task to the queue. Prevents both task loss and
          duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

        Args:
            replace: If True, replaces any existing task with the same key.
                    If False, raises an error if the task already exists.
            reschedule_message: If provided, atomically acknowledges and deletes
                    this stream message ID before rescheduling the task to the queue.
                    Used when a task needs to be rescheduled from an active stream message.
        """
        message: dict[bytes, bytes] = self.as_message()
        propagate.inject(message, setter=message_setter)

        key = self.key
        when = self.when
        known_task_key = self.docket.known_task_key(key)
        is_immediate = when <= datetime.now(timezone.utc)

        async with self.docket.redis() as redis:
            # Lock per task key to prevent race conditions between concurrent operations
            async with redis.lock(f"{known_task_key}:lock", timeout=10):
                # Register script for this connection (not cached to avoid event loop issues)
                schedule_script = cast(
                    _schedule_task,
                    redis.register_script(
                        # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key, runs_key, worker_group_key
                        # ARGV: task_key, when_timestamp, is_immediate, replace, reschedule_message_id, ...message_fields
                        """
                            local stream_key = KEYS[1]
                            -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                            local known_key = KEYS[2]
                            local parked_key = KEYS[3]
                            local queue_key = KEYS[4]
                            local stream_id_key = KEYS[5]
                            local runs_key = KEYS[6]
                            local worker_group_name = KEYS[7]

                            local task_key = ARGV[1]
                            local when_timestamp = ARGV[2]
                            local is_immediate = ARGV[3] == '1'
                            local replace = ARGV[4] == '1'
                            local reschedule_message_id = ARGV[5]

                            -- Extract message fields from ARGV[6] onwards
                            local message = {}
                            local function_name = nil
                            local args_data = nil
                            local kwargs_data = nil

                            for i = 6, #ARGV, 2 do
                                local field_name = ARGV[i]
                                local field_value = ARGV[i + 1]
                                message[#message + 1] = field_name
                                message[#message + 1] = field_value

                                -- Extract task data fields for runs hash
                                if field_name == 'function' then
                                    function_name = field_value
                                elseif field_name == 'args' then
                                    args_data = field_value
                                elseif field_name == 'kwargs' then
                                    kwargs_data = field_value
                                end
                            end

                            -- Handle rescheduling from stream: atomically ACK message and reschedule to queue
                            -- This prevents both task loss (ACK before reschedule) and duplicate execution
                            -- (reschedule before ACK with slow reschedule causing redelivery)
                            if reschedule_message_id ~= '' then
                                -- Acknowledge and delete the message from the stream
                                redis.call('XACK', stream_key, worker_group_name, reschedule_message_id)
                                redis.call('XDEL', stream_key, reschedule_message_id)

                                -- Park task data for future execution
                                redis.call('HSET', parked_key, unpack(message))

                                -- Add to sorted set queue
                                redis.call('ZADD', queue_key, when_timestamp, task_key)

                                -- Update state in runs hash (clear stream_id since task is no longer in stream)
                                redis.call('HSET', runs_key,
                                    'state', 'scheduled',
                                    'when', when_timestamp,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                                redis.call('HDEL', runs_key, 'stream_id')

                                return 'OK'
                            end

                            -- Handle replacement: cancel existing task if needed
                            if replace then
                                -- Get stream ID from runs hash (check new location first)
                                local existing_message_id = redis.call('HGET', runs_key, 'stream_id')

                                -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                if not existing_message_id then
                                    existing_message_id = redis.call('GET', stream_id_key)
                                end

                                if existing_message_id then
                                    redis.call('XDEL', stream_key, existing_message_id)
                                end

                                redis.call('ZREM', queue_key, task_key)
                                redis.call('DEL', parked_key)

                                -- TODO: Remove in next breaking release (v0.14.0) - clean up legacy keys
                                redis.call('DEL', known_key, stream_id_key)

                                -- Note: runs_key is updated below, not deleted
                            else
                                -- Check if task already exists (check new location first, then legacy)
                                local known_exists = redis.call('HEXISTS', runs_key, 'known') == 1
                                if not known_exists then
                                    -- Check if task is currently running (known field deleted at claim time)
                                    local state = redis.call('HGET', runs_key, 'state')
                                    if state == 'running' then
                                        return 'EXISTS'
                                    end
                                    -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                    known_exists = redis.call('EXISTS', known_key) == 1
                                end
                                if known_exists then
                                    return 'EXISTS'
                                end
                            end

                            if is_immediate then
                                -- Add to stream for immediate execution
                                local message_id = redis.call('XADD', stream_key, '*', unpack(message))

                                -- Store state and metadata in runs hash
                                redis.call('HSET', runs_key,
                                    'state', 'queued',
                                    'when', when_timestamp,
                                    'known', when_timestamp,
                                    'stream_id', message_id,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                            else
                                -- Park task data for future execution
                                redis.call('HSET', parked_key, unpack(message))

                                -- Add to sorted set queue
                                redis.call('ZADD', queue_key, when_timestamp, task_key)

                                -- Store state and metadata in runs hash
                                redis.call('HSET', runs_key,
                                    'state', 'scheduled',
                                    'when', when_timestamp,
                                    'known', when_timestamp,
                                    'function', function_name,
                                    'args', args_data,
                                    'kwargs', kwargs_data
                                )
                            end

                            return 'OK'
                            """
                    ),
                )

                await schedule_script(
                    keys=[
                        self.docket.stream_key,
                        known_task_key,
                        self.docket.parked_task_key(key),
                        self.docket.queue_key,
                        self.docket.stream_id_key(key),
                        self._redis_key,
                        self.docket.worker_group_name,
                    ],
                    args=[
                        key,
                        str(when.timestamp()),
                        "1" if is_immediate else "0",
                        "1" if replace else "0",
                        reschedule_message or b"",
                        *[
                            item
                            for field, value in message.items()
                            for item in (field, value)
                        ],
                    ],
                )

        # Update local state based on whether task is immediate, scheduled, or being rescheduled
        if reschedule_message:
            # When rescheduling from stream, task is always parked and queued (never immediate)
            self.state = ExecutionState.SCHEDULED
            await self._publish_state(
                {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
            )
        elif is_immediate:
            self.state = ExecutionState.QUEUED
            await self._publish_state(
                {"state": ExecutionState.QUEUED.value, "when": when.isoformat()}
            )
        else:
            self.state = ExecutionState.SCHEDULED
            await self._publish_state(
                {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
            )

    async def claim(self, worker: str) -> None:
        """Atomically claim task and transition to RUNNING state.

        This consolidates worker operations when claiming a task into a single
        atomic Lua script that:
        - Sets state to RUNNING with worker name and timestamp
        - Initializes progress tracking (current=0, total=100)
        - Deletes known/stream_id fields to allow task rescheduling
        - Cleans up legacy keys for backwards compatibility

        Args:
            worker: Name of the worker claiming the task
        """
        started_at = datetime.now(timezone.utc)
        started_at_iso = started_at.isoformat()

        async with self.docket.redis() as redis:
            claim_script = redis.register_script(
                # KEYS: runs_key, progress_key, known_key, stream_id_key
                # ARGV: worker, started_at_iso
                """
                local runs_key = KEYS[1]
                local progress_key = KEYS[2]
                -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                local known_key = KEYS[3]
                local stream_id_key = KEYS[4]

                local worker = ARGV[1]
                local started_at = ARGV[2]

                -- Update execution state to running
                redis.call('HSET', runs_key,
                    'state', 'running',
                    'worker', worker,
                    'started_at', started_at
                )

                -- Initialize progress tracking
                redis.call('HSET', progress_key,
                    'current', '0',
                    'total', '100'
                )

                -- Delete known/stream_id fields to allow task rescheduling
                redis.call('HDEL', runs_key, 'known', 'stream_id')

                -- TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
                redis.call('DEL', known_key, stream_id_key)

                return 'OK'
                """
            )

            await claim_script(
                keys=[
                    self._redis_key,  # runs_key
                    self.progress._redis_key,  # progress_key
                    f"{self.docket.name}:known:{self.key}",  # legacy known_key
                    f"{self.docket.name}:stream-id:{self.key}",  # legacy stream_id_key
                ],
                args=[worker, started_at_iso],
            )

        # Update local state
        self.state = ExecutionState.RUNNING
        self.worker = worker
        self.started_at = started_at
        self.progress.current = 0
        self.progress.total = 100

        # Publish state change event
        await self._publish_state(
            {
                "state": ExecutionState.RUNNING.value,
                "worker": worker,
                "started_at": started_at_iso,
            }
        )

    async def _mark_as_terminal(
        self,
        state: ExecutionState,
        *,
        error: str | None = None,
        result_key: str | None = None,
    ) -> None:
        """Mark task as having reached a terminal state.

        Args:
            state: The terminal state (COMPLETED, FAILED, or CANCELLED)
            error: Optional error message (for FAILED state)
            result_key: Optional key where the result/exception is stored

        Sets TTL on state data (from docket.execution_ttl), or deletes state
        immediately if execution_ttl is 0. Also deletes progress data.
        """
        completed_at = datetime.now(timezone.utc).isoformat()

        mapping: dict[str, str] = {
            "state": state.value,
            "completed_at": completed_at,
        }
        if error:
            mapping["error"] = error
        if result_key is not None:
            mapping["result_key"] = result_key

        async with self.docket.redis() as redis:
            await redis.hset(self._redis_key, mapping=mapping)
            if self.docket.execution_ttl:
                ttl_seconds = int(self.docket.execution_ttl.total_seconds())
                await redis.expire(self._redis_key, ttl_seconds)
            else:
                await redis.delete(self._redis_key)

        self.state = state
        if result_key is not None:
            self.result_key = result_key

        await self.progress.delete()

        state_data: dict[str, str] = {
            "state": state.value,
            "completed_at": completed_at,
        }
        if error:
            state_data["error"] = error
        await self._publish_state(state_data)

    async def mark_as_completed(self, result_key: str | None = None) -> None:
        """Mark task as completed successfully.

        Args:
            result_key: Optional key where the task result is stored
        """
        await self._mark_as_terminal(ExecutionState.COMPLETED, result_key=result_key)

    async def mark_as_failed(
        self, error: str | None = None, result_key: str | None = None
    ) -> None:
        """Mark task as failed.

        Args:
            error: Optional error message describing the failure
            result_key: Optional key where the exception is stored
        """
        await self._mark_as_terminal(
            ExecutionState.FAILED, error=error, result_key=result_key
        )

    async def mark_as_cancelled(self) -> None:
        """Mark task as cancelled."""
        await self._mark_as_terminal(ExecutionState.CANCELLED)

    async def get_result(
        self,
        *,
        timeout: timedelta | None = None,
        deadline: datetime | None = None,
    ) -> Any:
        """Retrieve the result of this task execution.

        If the execution is not yet complete, this method will wait using
        pub/sub for state updates until completion.

        Args:
            timeout: Optional duration to wait before giving up.
                    If None and deadline is None, waits indefinitely.
            deadline: Optional absolute datetime when to stop waiting.
                     If None and timeout is None, waits indefinitely.

        Returns:
            The result of the task execution, or None if the task returned None.

        Raises:
            ValueError: If both timeout and deadline are provided
            Exception: If the task failed, raises the stored exception
            TimeoutError: If timeout/deadline is reached before execution completes
        """
        # Validate that only one time limit is provided
        if timeout is not None and deadline is not None:
            raise ValueError("Cannot specify both timeout and deadline")

        # Convert timeout to deadline if provided
        if timeout is not None:
            deadline = datetime.now(timezone.utc) + timeout

        # Wait for execution to complete if not already done
        if self.state not in (ExecutionState.COMPLETED, ExecutionState.FAILED):
            # Calculate timeout duration if absolute deadline provided
            timeout_seconds = None
            if deadline is not None:
                timeout_seconds = (
                    deadline - datetime.now(timezone.utc)
                ).total_seconds()
                if timeout_seconds <= 0:
                    raise TimeoutError(
                        f"Timeout waiting for execution {self.key} to complete"
                    )

            try:

                async def wait_for_completion():
                    async for event in self.subscribe():  # pragma: no branch
                        if event["type"] == "state":
                            state = ExecutionState(event["state"])
                            if state in (
                                ExecutionState.COMPLETED,
                                ExecutionState.FAILED,
                            ):
                                # Sync to get latest data including result key
                                await self.sync()
                                break

                # Use asyncio.wait_for to enforce timeout
                await asyncio.wait_for(wait_for_completion(), timeout=timeout_seconds)
            except asyncio.TimeoutError:
                raise TimeoutError(
                    f"Timeout waiting for execution {self.key} to complete"
                )

        # If failed, retrieve and raise the exception
        if self.state == ExecutionState.FAILED:
            if self.result_key:
                # Retrieve serialized exception from result_storage
                result_data = await self.docket.result_storage.get(self.result_key)
                if result_data and "data" in result_data:
                    # Base64-decode and unpickle
                    pickled_exception = base64.b64decode(result_data["data"])
                    exception = cloudpickle.loads(pickled_exception)  # type: ignore[arg-type]
                    raise exception
            # If no stored exception, raise a generic error with the error message
            error_msg = self.error or "Task execution failed"
            raise Exception(error_msg)

        # If completed successfully, retrieve result if available
        if self.result_key:
            result_data = await self.docket.result_storage.get(self.result_key)
            if result_data is not None and "data" in result_data:
                # Base64-decode and unpickle
                pickled_result = base64.b64decode(result_data["data"])
                return cloudpickle.loads(pickled_result)  # type: ignore[arg-type]

        # No result stored - task returned None
        return None

    async def sync(self) -> None:
        """Synchronize instance attributes with current execution data from Redis.

        Updates self.state, execution metadata, and progress data from Redis.
        Resets state to SCHEDULED and the other attributes to None if no data exists.
        """
        async with self.docket.redis() as redis:
            data = await redis.hgetall(self._redis_key)
            if data:
                # Update state
                state_value = data.get(b"state")
                if state_value:
                    if isinstance(state_value, bytes):
                        state_value = state_value.decode()
                    self.state = ExecutionState(state_value)

                # Update metadata
                self.worker = data[b"worker"].decode() if b"worker" in data else None
                self.started_at = (
                    datetime.fromisoformat(data[b"started_at"].decode())
                    if b"started_at" in data
                    else None
                )
                self.completed_at = (
                    datetime.fromisoformat(data[b"completed_at"].decode())
                    if b"completed_at" in data
                    else None
                )
                self.error = data[b"error"].decode() if b"error" in data else None
                self.result_key = (
                    data[b"result_key"].decode() if b"result_key" in data else None
                )
            else:
                # No data exists - reset to defaults
                self.state = ExecutionState.SCHEDULED
                self.worker = None
                self.started_at = None
                self.completed_at = None
                self.error = None
                self.result_key = None

        # Sync progress data
        await self.progress.sync()

    async def _publish_state(self, data: dict) -> None:
        """Publish state change to Redis pub/sub channel.

        Args:
            data: State data to publish
        """
        channel = f"{self.docket.name}:state:{self.key}"
        # Create ephemeral Redis client for publishing
        async with self.docket.redis() as redis:
            # Build payload with all relevant state information
            payload = {
                "type": "state",
                "key": self.key,
                **data,
            }
            await redis.publish(channel, json.dumps(payload))

    async def subscribe(self) -> AsyncGenerator[StateEvent | ProgressEvent, None]:
        """Subscribe to both state and progress updates for this task.

        Emits the current state as the first event, then subscribes to real-time
        state and progress updates via Redis pub/sub.

        Yields:
            Dict containing state or progress update events with a 'type' field:
            - For state events: type="state", state, worker, timestamps, error
            - For progress events: type="progress", current, total, message, updated_at
        """
        # First, emit the current state
        await self.sync()

        # Build initial state event from current attributes
        initial_state: StateEvent = {
            "type": "state",
            "key": self.key,
            "state": self.state,
            "when": self.when.isoformat(),
            "worker": self.worker,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat()
            if self.completed_at
            else None,
            "error": self.error,
        }

        yield initial_state

        progress_event: ProgressEvent = {
            "type": "progress",
            "key": self.key,
            "current": self.progress.current,
            "total": self.progress.total,
            "message": self.progress.message,
            "updated_at": self.progress.updated_at.isoformat()
            if self.progress.updated_at
            else None,
        }

        yield progress_event

        # Then subscribe to real-time updates
        state_channel = f"{self.docket.name}:state:{self.key}"
        progress_channel = f"{self.docket.name}:progress:{self.key}"
        async with self.docket.redis() as redis:
            async with redis.pubsub() as pubsub:
                await pubsub.subscribe(state_channel, progress_channel)
                try:
                    async for message in pubsub.listen():  # pragma: no cover
                        if message["type"] == "message":
                            message_data = json.loads(message["data"])
                            if message_data["type"] == "state":
                                message_data["state"] = ExecutionState(
                                    message_data["state"]
                                )
                            yield message_data
                finally:
                    # Explicitly unsubscribe to ensure clean shutdown
                    await pubsub.unsubscribe(state_channel, progress_channel)

args property

Positional arguments for the task.

docket property

Parent docket instance.

function property

Task function to execute.

function_name property

Name of the task function (from the message; may differ from function.__name__ for fallback tasks).

key property

Unique task identifier.

kwargs property

Keyword arguments for the task.

redelivered property

Whether this message was redelivered.

trace_context property

OpenTelemetry trace context.

claim(worker) async

Atomically claim task and transition to RUNNING state.

This consolidates worker operations when claiming a task into a single atomic Lua script that:

- Sets state to RUNNING with worker name and timestamp
- Initializes progress tracking (current=0, total=100)
- Deletes known/stream_id fields to allow task rescheduling
- Cleans up legacy keys for backwards compatibility

Parameters:

Name Type Description Default
worker str

Name of the worker claiming the task

required
Source code in src/docket/execution.py
async def claim(self, worker: str) -> None:
    """Atomically claim task and transition to RUNNING state.

    This consolidates worker operations when claiming a task into a single
    atomic Lua script that:
    - Sets state to RUNNING with worker name and timestamp
    - Initializes progress tracking (current=0, total=100)
    - Deletes known/stream_id fields to allow task rescheduling
    - Cleans up legacy keys for backwards compatibility

    Args:
        worker: Name of the worker claiming the task
    """
    started_at = datetime.now(timezone.utc)
    started_at_iso = started_at.isoformat()

    async with self.docket.redis() as redis:
        claim_script = redis.register_script(
            # KEYS: runs_key, progress_key, known_key, stream_id_key
            # ARGV: worker, started_at_iso
            """
            local runs_key = KEYS[1]
            local progress_key = KEYS[2]
            -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
            local known_key = KEYS[3]
            local stream_id_key = KEYS[4]

            local worker = ARGV[1]
            local started_at = ARGV[2]

            -- Update execution state to running
            redis.call('HSET', runs_key,
                'state', 'running',
                'worker', worker,
                'started_at', started_at
            )

            -- Initialize progress tracking
            redis.call('HSET', progress_key,
                'current', '0',
                'total', '100'
            )

            -- Delete known/stream_id fields to allow task rescheduling
            redis.call('HDEL', runs_key, 'known', 'stream_id')

            -- TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
            redis.call('DEL', known_key, stream_id_key)

            return 'OK'
            """
        )

        await claim_script(
            keys=[
                self._redis_key,  # runs_key
                self.progress._redis_key,  # progress_key
                f"{self.docket.name}:known:{self.key}",  # legacy known_key
                f"{self.docket.name}:stream-id:{self.key}",  # legacy stream_id_key
            ],
            args=[worker, started_at_iso],
        )

    # Update local state
    self.state = ExecutionState.RUNNING
    self.worker = worker
    self.started_at = started_at
    self.progress.current = 0
    self.progress.total = 100

    # Publish state change event
    await self._publish_state(
        {
            "state": ExecutionState.RUNNING.value,
            "worker": worker,
            "started_at": started_at_iso,
        }
    )
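
A brief usage sketch (illustrative, not from the generated docs): assuming you already hold an Execution instance in a worker, claiming it looks like this.

```python
# Hedged sketch: a worker claiming a task it received from the stream.
# `execution` is assumed to be an Execution instance; the worker name is arbitrary.
await execution.claim("worker-1")

assert execution.state is ExecutionState.RUNNING
assert execution.worker == "worker-1"
```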

get_result(*, timeout=None, deadline=None) async

Retrieve the result of this task execution.

If the execution is not yet complete, this method will wait using pub/sub for state updates until completion.

Parameters:

Name Type Description Default
timeout timedelta | None

Optional duration to wait before giving up. If None and deadline is None, waits indefinitely.

None
deadline datetime | None

Optional absolute datetime when to stop waiting. If None and timeout is None, waits indefinitely.

None

Returns:

Type Description
Any

The result of the task execution, or None if the task returned None.

Raises:

Type Description
ValueError

If both timeout and deadline are provided

Exception

If the task failed, raises the stored exception

TimeoutError

If timeout/deadline is reached before execution completes

Source code in src/docket/execution.py
async def get_result(
    self,
    *,
    timeout: timedelta | None = None,
    deadline: datetime | None = None,
) -> Any:
    """Retrieve the result of this task execution.

    If the execution is not yet complete, this method will wait using
    pub/sub for state updates until completion.

    Args:
        timeout: Optional duration to wait before giving up.
                If None and deadline is None, waits indefinitely.
        deadline: Optional absolute datetime when to stop waiting.
                 If None and timeout is None, waits indefinitely.

    Returns:
        The result of the task execution, or None if the task returned None.

    Raises:
        ValueError: If both timeout and deadline are provided
        Exception: If the task failed, raises the stored exception
        TimeoutError: If timeout/deadline is reached before execution completes
    """
    # Validate that only one time limit is provided
    if timeout is not None and deadline is not None:
        raise ValueError("Cannot specify both timeout and deadline")

    # Convert timeout to deadline if provided
    if timeout is not None:
        deadline = datetime.now(timezone.utc) + timeout

    # Wait for execution to complete if not already done
    if self.state not in (ExecutionState.COMPLETED, ExecutionState.FAILED):
        # Calculate timeout duration if absolute deadline provided
        timeout_seconds = None
        if deadline is not None:
            timeout_seconds = (
                deadline - datetime.now(timezone.utc)
            ).total_seconds()
            if timeout_seconds <= 0:
                raise TimeoutError(
                    f"Timeout waiting for execution {self.key} to complete"
                )

        try:

            async def wait_for_completion():
                async for event in self.subscribe():  # pragma: no branch
                    if event["type"] == "state":
                        state = ExecutionState(event["state"])
                        if state in (
                            ExecutionState.COMPLETED,
                            ExecutionState.FAILED,
                        ):
                            # Sync to get latest data including result key
                            await self.sync()
                            break

            # Use asyncio.wait_for to enforce timeout
            await asyncio.wait_for(wait_for_completion(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Timeout waiting for execution {self.key} to complete"
            )

    # If failed, retrieve and raise the exception
    if self.state == ExecutionState.FAILED:
        if self.result_key:
            # Retrieve serialized exception from result_storage
            result_data = await self.docket.result_storage.get(self.result_key)
            if result_data and "data" in result_data:
                # Base64-decode and unpickle
                pickled_exception = base64.b64decode(result_data["data"])
                exception = cloudpickle.loads(pickled_exception)  # type: ignore[arg-type]
                raise exception
        # If no stored exception, raise a generic error with the error message
        error_msg = self.error or "Task execution failed"
        raise Exception(error_msg)

    # If completed successfully, retrieve result if available
    if self.result_key:
        result_data = await self.docket.result_storage.get(self.result_key)
        if result_data is not None and "data" in result_data:
            # Base64-decode and unpickle
            pickled_result = base64.b64decode(result_data["data"])
            return cloudpickle.loads(pickled_result)  # type: ignore[arg-type]

    # No result stored - task returned None
    return None
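
As a hedged usage sketch, here is how a caller might wait on the method above with a bounded timeout; `execution` is assumed to be an Execution obtained when the task was scheduled.

```python
from datetime import timedelta

try:
    # Blocks (via pub/sub) until the task finishes, for at most 30 seconds.
    result = await execution.get_result(timeout=timedelta(seconds=30))
except TimeoutError:
    print("task still running after 30 seconds")
except Exception as exc:
    # A failed task re-raises its stored exception here.
    print(f"task failed: {exc}")
```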

mark_as_cancelled() async

Mark task as cancelled.

Source code in src/docket/execution.py
async def mark_as_cancelled(self) -> None:
    """Mark task as cancelled."""
    await self._mark_as_terminal(ExecutionState.CANCELLED)

mark_as_completed(result_key=None) async

Mark task as completed successfully.

Parameters:

Name Type Description Default
result_key str | None

Optional key where the task result is stored

None
Source code in src/docket/execution.py
async def mark_as_completed(self, result_key: str | None = None) -> None:
    """Mark task as completed successfully.

    Args:
        result_key: Optional key where the task result is stored
    """
    await self._mark_as_terminal(ExecutionState.COMPLETED, result_key=result_key)

mark_as_failed(error=None, result_key=None) async

Mark task as failed.

Parameters:

Name Type Description Default
error str | None

Optional error message describing the failure

None
result_key str | None

Optional key where the exception is stored

None
Source code in src/docket/execution.py
async def mark_as_failed(
    self, error: str | None = None, result_key: str | None = None
) -> None:
    """Mark task as failed.

    Args:
        error: Optional error message describing the failure
        result_key: Optional key where the exception is stored
    """
    await self._mark_as_terminal(
        ExecutionState.FAILED, error=error, result_key=result_key
    )
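
For orientation, a hedged sketch of how the mark_as_* family fits together in a worker; `execution` and `run_task` are illustrative names, not docket API.

```python
import asyncio

async def finish(execution) -> None:
    """Illustrative only: record the outcome of a task run."""
    try:
        result_key = await run_task(execution)  # hypothetical task runner
        await execution.mark_as_completed(result_key=result_key)
    except asyncio.CancelledError:
        await execution.mark_as_cancelled()
        raise
    except Exception as exc:
        await execution.mark_as_failed(error=str(exc))
```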

schedule(replace=False, reschedule_message=None) async

Schedule this task atomically in Redis.

This performs an atomic operation that:

- Adds the task to the stream (immediate) or queue (future)
- Writes the execution state record
- Tracks metadata for later cancellation

Usage patterns:

- Normal add: schedule(replace=False)
- Replace existing: schedule(replace=True)
- Reschedule from stream: schedule(reschedule_message=message_id). This atomically acknowledges and deletes the stream message, then reschedules the task to the queue. Prevents both task loss and duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

Parameters:

Name Type Description Default
replace bool

If True, replaces any existing task with the same key. If False, raises an error if the task already exists.

False
reschedule_message RedisMessageID | None

If provided, atomically acknowledges and deletes this stream message ID before rescheduling the task to the queue. Used when a task needs to be rescheduled from an active stream message.

None
Source code in src/docket/execution.py
async def schedule(
    self, replace: bool = False, reschedule_message: "RedisMessageID | None" = None
) -> None:
    """Schedule this task atomically in Redis.

    This performs an atomic operation that:
    - Adds the task to the stream (immediate) or queue (future)
    - Writes the execution state record
    - Tracks metadata for later cancellation

    Usage patterns:
    - Normal add: schedule(replace=False)
    - Replace existing: schedule(replace=True)
    - Reschedule from stream: schedule(reschedule_message=message_id)
      This atomically acknowledges and deletes the stream message, then
      reschedules the task to the queue. Prevents both task loss and
      duplicate execution when rescheduling tasks (e.g., due to concurrency limits).

    Args:
        replace: If True, replaces any existing task with the same key.
                If False, raises an error if the task already exists.
        reschedule_message: If provided, atomically acknowledges and deletes
                this stream message ID before rescheduling the task to the queue.
                Used when a task needs to be rescheduled from an active stream message.
    """
    message: dict[bytes, bytes] = self.as_message()
    propagate.inject(message, setter=message_setter)

    key = self.key
    when = self.when
    known_task_key = self.docket.known_task_key(key)
    is_immediate = when <= datetime.now(timezone.utc)

    async with self.docket.redis() as redis:
        # Lock per task key to prevent race conditions between concurrent operations
        async with redis.lock(f"{known_task_key}:lock", timeout=10):
            # Register script for this connection (not cached to avoid event loop issues)
            schedule_script = cast(
                _schedule_task,
                redis.register_script(
                    # KEYS: stream_key, known_key, parked_key, queue_key, stream_id_key, runs_key, worker_group_key
                    # ARGV: task_key, when_timestamp, is_immediate, replace, reschedule_message_id, ...message_fields
                    """
                        local stream_key = KEYS[1]
                        -- TODO: Remove in next breaking release (v0.14.0) - legacy key locations
                        local known_key = KEYS[2]
                        local parked_key = KEYS[3]
                        local queue_key = KEYS[4]
                        local stream_id_key = KEYS[5]
                        local runs_key = KEYS[6]
                        local worker_group_name = KEYS[7]

                        local task_key = ARGV[1]
                        local when_timestamp = ARGV[2]
                        local is_immediate = ARGV[3] == '1'
                        local replace = ARGV[4] == '1'
                        local reschedule_message_id = ARGV[5]

                        -- Extract message fields from ARGV[6] onwards
                        local message = {}
                        local function_name = nil
                        local args_data = nil
                        local kwargs_data = nil

                        for i = 6, #ARGV, 2 do
                            local field_name = ARGV[i]
                            local field_value = ARGV[i + 1]
                            message[#message + 1] = field_name
                            message[#message + 1] = field_value

                            -- Extract task data fields for runs hash
                            if field_name == 'function' then
                                function_name = field_value
                            elseif field_name == 'args' then
                                args_data = field_value
                            elseif field_name == 'kwargs' then
                                kwargs_data = field_value
                            end
                        end

                        -- Handle rescheduling from stream: atomically ACK message and reschedule to queue
                        -- This prevents both task loss (ACK before reschedule) and duplicate execution
                        -- (reschedule before ACK with slow reschedule causing redelivery)
                        if reschedule_message_id ~= '' then
                            -- Acknowledge and delete the message from the stream
                            redis.call('XACK', stream_key, worker_group_name, reschedule_message_id)
                            redis.call('XDEL', stream_key, reschedule_message_id)

                            -- Park task data for future execution
                            redis.call('HSET', parked_key, unpack(message))

                            -- Add to sorted set queue
                            redis.call('ZADD', queue_key, when_timestamp, task_key)

                            -- Update state in runs hash (clear stream_id since task is no longer in stream)
                            redis.call('HSET', runs_key,
                                'state', 'scheduled',
                                'when', when_timestamp,
                                'function', function_name,
                                'args', args_data,
                                'kwargs', kwargs_data
                            )
                            redis.call('HDEL', runs_key, 'stream_id')

                            return 'OK'
                        end

                        -- Handle replacement: cancel existing task if needed
                        if replace then
                            -- Get stream ID from runs hash (check new location first)
                            local existing_message_id = redis.call('HGET', runs_key, 'stream_id')

                            -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                            if not existing_message_id then
                                existing_message_id = redis.call('GET', stream_id_key)
                            end

                            if existing_message_id then
                                redis.call('XDEL', stream_key, existing_message_id)
                            end

                            redis.call('ZREM', queue_key, task_key)
                            redis.call('DEL', parked_key)

                            -- TODO: Remove in next breaking release (v0.14.0) - clean up legacy keys
                            redis.call('DEL', known_key, stream_id_key)

                            -- Note: runs_key is updated below, not deleted
                        else
                            -- Check if task already exists (check new location first, then legacy)
                            local known_exists = redis.call('HEXISTS', runs_key, 'known') == 1
                            if not known_exists then
                                -- Check if task is currently running (known field deleted at claim time)
                                local state = redis.call('HGET', runs_key, 'state')
                                if state == 'running' then
                                    return 'EXISTS'
                                end
                                -- TODO: Remove in next breaking release (v0.14.0) - check legacy location
                                known_exists = redis.call('EXISTS', known_key) == 1
                            end
                            if known_exists then
                                return 'EXISTS'
                            end
                        end

                        if is_immediate then
                            -- Add to stream for immediate execution
                            local message_id = redis.call('XADD', stream_key, '*', unpack(message))

                            -- Store state and metadata in runs hash
                            redis.call('HSET', runs_key,
                                'state', 'queued',
                                'when', when_timestamp,
                                'known', when_timestamp,
                                'stream_id', message_id,
                                'function', function_name,
                                'args', args_data,
                                'kwargs', kwargs_data
                            )
                        else
                            -- Park task data for future execution
                            redis.call('HSET', parked_key, unpack(message))

                            -- Add to sorted set queue
                            redis.call('ZADD', queue_key, when_timestamp, task_key)

                            -- Store state and metadata in runs hash
                            redis.call('HSET', runs_key,
                                'state', 'scheduled',
                                'when', when_timestamp,
                                'known', when_timestamp,
                                'function', function_name,
                                'args', args_data,
                                'kwargs', kwargs_data
                            )
                        end

                        return 'OK'
                        """
                ),
            )

            await schedule_script(
                keys=[
                    self.docket.stream_key,
                    known_task_key,
                    self.docket.parked_task_key(key),
                    self.docket.queue_key,
                    self.docket.stream_id_key(key),
                    self._redis_key,
                    self.docket.worker_group_name,
                ],
                args=[
                    key,
                    str(when.timestamp()),
                    "1" if is_immediate else "0",
                    "1" if replace else "0",
                    reschedule_message or b"",
                    *[
                        item
                        for field, value in message.items()
                        for item in (field, value)
                    ],
                ],
            )

    # Update local state based on whether task is immediate, scheduled, or being rescheduled
    if reschedule_message:
        # When rescheduling from stream, task is always parked and queued (never immediate)
        self.state = ExecutionState.SCHEDULED
        await self._publish_state(
            {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
        )
    elif is_immediate:
        self.state = ExecutionState.QUEUED
        await self._publish_state(
            {"state": ExecutionState.QUEUED.value, "when": when.isoformat()}
        )
    else:
        self.state = ExecutionState.SCHEDULED
        await self._publish_state(
            {"state": ExecutionState.SCHEDULED.value, "when": when.isoformat()}
        )
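
A hedged sketch of the three usage patterns named in the docstring; `execution` and `message_id` are assumed to already exist.

```python
# Normal add: refuses if a task with the same key already exists (see docstring).
await execution.schedule()

# Replace: atomically remove any existing task with the same key, then schedule.
await execution.schedule(replace=True)

# Reschedule from stream: ACK and delete the stream message, park on the queue.
await execution.schedule(reschedule_message=message_id)
```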

subscribe() async

Subscribe to both state and progress updates for this task.

Emits the current state as the first event, then subscribes to real-time state and progress updates via Redis pub/sub.

Yields:

Type Description
AsyncGenerator[StateEvent | ProgressEvent, None]

Dict containing state or progress update events with a 'type' field:

  • For state events: type="state", state, worker, timestamps, error
  • For progress events: type="progress", current, total, message, updated_at
Source code in src/docket/execution.py
async def subscribe(self) -> AsyncGenerator[StateEvent | ProgressEvent, None]:
    """Subscribe to both state and progress updates for this task.

    Emits the current state as the first event, then subscribes to real-time
    state and progress updates via Redis pub/sub.

    Yields:
        Dict containing state or progress update events with a 'type' field:
        - For state events: type="state", state, worker, timestamps, error
        - For progress events: type="progress", current, total, message, updated_at
    """
    # First, emit the current state
    await self.sync()

    # Build initial state event from current attributes
    initial_state: StateEvent = {
        "type": "state",
        "key": self.key,
        "state": self.state,
        "when": self.when.isoformat(),
        "worker": self.worker,
        "started_at": self.started_at.isoformat() if self.started_at else None,
        "completed_at": self.completed_at.isoformat()
        if self.completed_at
        else None,
        "error": self.error,
    }

    yield initial_state

    progress_event: ProgressEvent = {
        "type": "progress",
        "key": self.key,
        "current": self.progress.current,
        "total": self.progress.total,
        "message": self.progress.message,
        "updated_at": self.progress.updated_at.isoformat()
        if self.progress.updated_at
        else None,
    }

    yield progress_event

    # Then subscribe to real-time updates
    state_channel = f"{self.docket.name}:state:{self.key}"
    progress_channel = f"{self.docket.name}:progress:{self.key}"
    async with self.docket.redis() as redis:
        async with redis.pubsub() as pubsub:
            await pubsub.subscribe(state_channel, progress_channel)
            try:
                async for message in pubsub.listen():  # pragma: no cover
                    if message["type"] == "message":
                        message_data = json.loads(message["data"])
                        if message_data["type"] == "state":
                            message_data["state"] = ExecutionState(
                                message_data["state"]
                            )
                        yield message_data
            finally:
                # Explicitly unsubscribe to ensure clean shutdown
                await pubsub.unsubscribe(state_channel, progress_channel)
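
As a hedged consumption sketch: follow a task's events until it reaches a terminal state (note that state events carry ExecutionState values).

```python
async for event in execution.subscribe():
    if event["type"] == "progress":
        print(f"{event['current']}/{event['total']} {event['message'] or ''}")
    elif event["type"] == "state":
        print(f"state: {event['state']}")
        if event["state"] in (
            ExecutionState.COMPLETED,
            ExecutionState.FAILED,
            ExecutionState.CANCELLED,
        ):
            break
```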

sync() async

Synchronize instance attributes with current execution data from Redis.

Updates self.state, execution metadata, and progress data from Redis. Resets state to SCHEDULED and the other attributes to None if no data exists.

Source code in src/docket/execution.py
async def sync(self) -> None:
    """Synchronize instance attributes with current execution data from Redis.

    Updates self.state, execution metadata, and progress data from Redis.
    Resets state to SCHEDULED and the other attributes to None if no data exists.
    """
    async with self.docket.redis() as redis:
        data = await redis.hgetall(self._redis_key)
        if data:
            # Update state
            state_value = data.get(b"state")
            if state_value:
                if isinstance(state_value, bytes):
                    state_value = state_value.decode()
                self.state = ExecutionState(state_value)

            # Update metadata
            self.worker = data[b"worker"].decode() if b"worker" in data else None
            self.started_at = (
                datetime.fromisoformat(data[b"started_at"].decode())
                if b"started_at" in data
                else None
            )
            self.completed_at = (
                datetime.fromisoformat(data[b"completed_at"].decode())
                if b"completed_at" in data
                else None
            )
            self.error = data[b"error"].decode() if b"error" in data else None
            self.result_key = (
                data[b"result_key"].decode() if b"result_key" in data else None
            )
        else:
            # No data exists - reset to defaults
            self.state = ExecutionState.SCHEDULED
            self.worker = None
            self.started_at = None
            self.completed_at = None
            self.error = None
            self.result_key = None

    # Sync progress data
    await self.progress.sync()

ExecutionState

Bases: Enum

Lifecycle states for task execution.

Source code in src/docket/execution.py
class ExecutionState(enum.Enum):
    """Lifecycle states for task execution."""

    SCHEDULED = "scheduled"
    """Task is scheduled and waiting in the queue for its execution time."""

    QUEUED = "queued"
    """Task has been moved to the stream and is ready to be claimed by a worker."""

    RUNNING = "running"
    """Task is currently being executed by a worker."""

    COMPLETED = "completed"
    """Task execution finished successfully."""

    FAILED = "failed"
    """Task execution failed."""

    CANCELLED = "cancelled"
    """Task was explicitly cancelled before completion."""

CANCELLED = 'cancelled' class-attribute instance-attribute

Task was explicitly cancelled before completion.

COMPLETED = 'completed' class-attribute instance-attribute

Task execution finished successfully.

FAILED = 'failed' class-attribute instance-attribute

Task execution failed.

QUEUED = 'queued' class-attribute instance-attribute

Task has been moved to the stream and is ready to be claimed by a worker.

RUNNING = 'running' class-attribute instance-attribute

Task is currently being executed by a worker.

SCHEDULED = 'scheduled' class-attribute instance-attribute

Task is scheduled and waiting in the queue for its execution time.
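
A small illustrative helper built on these states (the set and function names are ours, not docket's):

```python
TERMINAL_STATES = {
    ExecutionState.COMPLETED,
    ExecutionState.FAILED,
    ExecutionState.CANCELLED,
}

def is_finished(execution) -> bool:
    """True once the execution can no longer change state."""
    return execution.state in TERMINAL_STATES
```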

ExponentialRetry

Bases: Retry

Configures exponential retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the minimum and maximum delays between attempts.

Example:

@task
async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies.py
class ExponentialRetry(Retry):
    """Configures exponential retries for a task.  You can specify the total number
    of attempts (or `None` to retry indefinitely), and the minimum and maximum delays
    between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: ExponentialRetry = ExponentialRetry(attempts=3)) -> None:
        ...
    ```
    """

    def __init__(
        self,
        attempts: int | None = 1,
        minimum_delay: timedelta = timedelta(seconds=1),
        maximum_delay: timedelta = timedelta(seconds=64),
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            minimum_delay: The minimum delay between attempts.
            maximum_delay: The maximum delay between attempts.
        """
        super().__init__(attempts=attempts, delay=minimum_delay)
        self.maximum_delay = maximum_delay

    async def __aenter__(self) -> "ExponentialRetry":
        execution = self.execution.get()

        retry = ExponentialRetry(
            attempts=self.attempts,
            minimum_delay=self.delay,
            maximum_delay=self.maximum_delay,
        )
        retry.attempt = execution.attempt

        if execution.attempt > 1:
            backoff_factor = 2 ** (execution.attempt - 1)
            calculated_delay = self.delay * backoff_factor

            if calculated_delay > self.maximum_delay:
                retry.delay = self.maximum_delay
            else:
                retry.delay = calculated_delay

        return retry

__init__(attempts=1, minimum_delay=timedelta(seconds=1), maximum_delay=timedelta(seconds=64))

Parameters:

Name Type Description Default
attempts int | None

The total number of attempts to make. If None, the task will be retried indefinitely.

1
minimum_delay timedelta

The minimum delay between attempts.

timedelta(seconds=1)
maximum_delay timedelta

The maximum delay between attempts.

timedelta(seconds=64)
Source code in src/docket/dependencies.py
def __init__(
    self,
    attempts: int | None = 1,
    minimum_delay: timedelta = timedelta(seconds=1),
    maximum_delay: timedelta = timedelta(seconds=64),
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        minimum_delay: The minimum delay between attempts.
        maximum_delay: The maximum delay between attempts.
    """
    super().__init__(attempts=attempts, delay=minimum_delay)
    self.maximum_delay = maximum_delay
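
To make the backoff concrete, here is an illustrative mirror of the delay calculation in __aenter__ above (just the arithmetic, not docket API): the minimum delay doubles with each attempt and is capped at the maximum.

```python
from datetime import timedelta

def backoff(
    attempt: int,
    minimum: timedelta = timedelta(seconds=1),
    maximum: timedelta = timedelta(seconds=64),
) -> timedelta:
    # Attempt 1 keeps the minimum delay; later attempts double it, capped at maximum.
    if attempt <= 1:
        return minimum
    return min(minimum * 2 ** (attempt - 1), maximum)

# Attempts 1..8 yield delays of 1s, 2s, 4s, 8s, 16s, 32s, 64s, 64s.
```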

Logged

Bases: Annotation

Instructs docket to include arguments to this parameter in the log.

If length_only is True, only the length of the argument will be included in the log.

Example:

@task
def setup_new_customer(
    customer_id: Annotated[int, Logged],
    addresses: Annotated[list[Address], Logged(length_only=True)],
    password: str,
) -> None:
    ...

In the logs, you'd see the task referenced as:

setup_new_customer(customer_id=123, addresses[len 2], password=...)
Source code in src/docket/annotations.py
class Logged(Annotation):
    """Instructs docket to include arguments to this parameter in the log.

    If `length_only` is `True`, only the length of the argument will be included in
    the log.

    Example:

    ```python
    @task
    def setup_new_customer(
        customer_id: Annotated[int, Logged],
        addresses: Annotated[list[Address], Logged(length_only=True)],
        password: str,
    ) -> None:
        ...
    ```

    In the logs, you'd see the task referenced as:

    ```
    setup_new_customer(customer_id=123, addresses[len 2], password=...)
    ```
    """

    length_only: bool = False

    def __init__(self, length_only: bool = False) -> None:
        self.length_only = length_only

    def format(self, argument: Any) -> str:
        if self.length_only:
            if isinstance(argument, (dict, set)):
                return f"{{len {len(argument)}}}"
            elif isinstance(argument, tuple):
                return f"(len {len(argument)})"
            elif hasattr(argument, "__len__"):
                return f"[len {len(argument)}]"

        return repr(argument)

Perpetual

Bases: Dependency

Declare a task that should be run perpetually. Perpetual tasks are automatically rescheduled for the future after they finish (whether they succeed or fail). A perpetual task can be scheduled at worker startup with automatic=True.

Example:

@task
async def my_task(perpetual: Perpetual = Perpetual()) -> None:
    ...
Source code in src/docket/dependencies.py
class Perpetual(Dependency):
    """Declare a task that should be run perpetually.  Perpetual tasks are automatically
    rescheduled for the future after they finish (whether they succeed or fail).  A
    perpetual task can be scheduled at worker startup with `automatic=True`.

    Example:

    ```python
    @task
    async def my_task(perpetual: Perpetual = Perpetual()) -> None:
        ...
    ```
    """

    single = True

    every: timedelta
    automatic: bool

    args: tuple[Any, ...]
    kwargs: dict[str, Any]

    cancelled: bool

    def __init__(
        self,
        every: timedelta = timedelta(0),
        automatic: bool = False,
    ) -> None:
        """
        Args:
            every: The target interval between task executions.
            automatic: If set, this task will be automatically scheduled during worker
                startup and continually through the worker's lifespan.  This ensures
                that the task will always be scheduled despite crashes and other
                adverse conditions.  Automatic tasks must not require any arguments.
        """
        self.every = every
        self.automatic = automatic
        self.cancelled = False

    async def __aenter__(self) -> "Perpetual":
        execution = self.execution.get()
        perpetual = Perpetual(every=self.every)
        perpetual.args = execution.args
        perpetual.kwargs = execution.kwargs
        return perpetual

    def cancel(self) -> None:
        self.cancelled = True

    def perpetuate(self, *args: Any, **kwargs: Any) -> None:
        self.args = args
        self.kwargs = kwargs

__init__(every=timedelta(0), automatic=False)

Parameters:

Name Type Description Default
every timedelta

The target interval between task executions.

timedelta(0)
automatic bool

If set, this task will be automatically scheduled during worker startup and continually through the worker's lifespan. This ensures that the task will always be scheduled despite crashes and other adverse conditions. Automatic tasks must not require any arguments.

False
Source code in src/docket/dependencies.py
def __init__(
    self,
    every: timedelta = timedelta(0),
    automatic: bool = False,
) -> None:
    """
    Args:
        every: The target interval between task executions.
        automatic: If set, this task will be automatically scheduled during worker
            startup and continually through the worker's lifespan.  This ensures
            that the task will always be scheduled despite crashes and other
            adverse conditions.  Automatic tasks must not require any arguments.
    """
    self.every = every
    self.automatic = automatic
    self.cancelled = False
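
A hedged sketch of a self-perpetuating task scheduled automatically at worker startup; check_upstream is a hypothetical helper.

```python
from datetime import timedelta

async def poll_upstream(
    perpetual: Perpetual = Perpetual(every=timedelta(minutes=1), automatic=True),
) -> None:
    healthy = await check_upstream()  # hypothetical helper
    if not healthy:
        perpetual.cancel()  # stop rescheduling this task
```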

Progress

Bases: Dependency

A dependency to report progress updates for the currently executing task.

Tasks can use this to report their current progress (current/total values) and status messages to external observers.

Example:

@task
async def process_records(records: list, progress: Progress = Progress()) -> None:
    await progress.set_total(len(records))
    for i, record in enumerate(records):
        await process(record)
        await progress.increment()
        await progress.set_message(f"Processed {record.id}")
Source code in src/docket/dependencies.py
class Progress(Dependency):
    """A dependency to report progress updates for the currently executing task.

    Tasks can use this to report their current progress (current/total values) and
    status messages to external observers.

    Example:

    ```python
    @task
    async def process_records(records: list, progress: Progress = Progress()) -> None:
        await progress.set_total(len(records))
        for i, record in enumerate(records):
            await process(record)
            await progress.increment()
            await progress.set_message(f"Processed {record.id}")
    ```
    """

    def __init__(self) -> None:
        self._progress: ExecutionProgress | None = None

    async def __aenter__(self) -> "Progress":
        execution = self.execution.get()
        self._progress = execution.progress
        return self

    @property
    def current(self) -> int | None:
        """Current progress value."""
        assert self._progress is not None, "Progress must be used as a dependency"
        return self._progress.current

    @property
    def total(self) -> int:
        """Total/target value for progress tracking."""
        assert self._progress is not None, "Progress must be used as a dependency"
        return self._progress.total

    @property
    def message(self) -> str | None:
        """User-provided status message."""
        assert self._progress is not None, "Progress must be used as a dependency"
        return self._progress.message

    async def set_total(self, total: int) -> None:
        """Set the total/target value for progress tracking."""
        assert self._progress is not None, "Progress must be used as a dependency"
        await self._progress.set_total(total)

    async def increment(self, amount: int = 1) -> None:
        """Atomically increment the current progress value."""
        assert self._progress is not None, "Progress must be used as a dependency"
        await self._progress.increment(amount)

    async def set_message(self, message: str | None) -> None:
        """Update the progress status message."""
        assert self._progress is not None, "Progress must be used as a dependency"
        await self._progress.set_message(message)

current property

Current progress value.

message property

User-provided status message.

total property

Total/target value for progress tracking.

increment(amount=1) async

Atomically increment the current progress value.

Source code in src/docket/dependencies.py
async def increment(self, amount: int = 1) -> None:
    """Atomically increment the current progress value."""
    assert self._progress is not None, "Progress must be used as a dependency"
    await self._progress.increment(amount)

set_message(message) async

Update the progress status message.

Source code in src/docket/dependencies.py
async def set_message(self, message: str | None) -> None:
    """Update the progress status message."""
    assert self._progress is not None, "Progress must be used as a dependency"
    await self._progress.set_message(message)

set_total(total) async

Set the total/target value for progress tracking.

Source code in src/docket/dependencies.py
async def set_total(self, total: int) -> None:
    """Set the total/target value for progress tracking."""
    assert self._progress is not None, "Progress must be used as a dependency"
    await self._progress.set_total(total)

Retry

Bases: Dependency

Configures linear retries for a task. You can specify the total number of attempts (or None to retry indefinitely), and the delay between attempts.

Example:

@task
async def my_task(retry: Retry = Retry(attempts=3)) -> None:
    ...
Source code in src/docket/dependencies.py
class Retry(Dependency):
    """Configures linear retries for a task.  You can specify the total number of
    attempts (or `None` to retry indefinitely), and the delay between attempts.

    Example:

    ```python
    @task
    async def my_task(retry: Retry = Retry(attempts=3)) -> None:
        ...
    ```
    """

    single: bool = True

    def __init__(
        self, attempts: int | None = 1, delay: timedelta = timedelta(0)
    ) -> None:
        """
        Args:
            attempts: The total number of attempts to make.  If `None`, the task will
                be retried indefinitely.
            delay: The delay between attempts.
        """
        self.attempts = attempts
        self.delay = delay
        self.attempt = 1

    async def __aenter__(self) -> "Retry":
        execution = self.execution.get()
        retry = Retry(attempts=self.attempts, delay=self.delay)
        retry.attempt = execution.attempt
        return retry

    def at(self, when: datetime) -> NoReturn:
        now = datetime.now(timezone.utc)
        diff = when - now
        diff = diff if diff.total_seconds() >= 0 else timedelta(0)

        self.in_(diff)

    def in_(self, when: timedelta) -> NoReturn:
        self.delay: timedelta = when
        raise ForcedRetry()

__init__(attempts=1, delay=timedelta(0))

Parameters:

Name Type Description Default
attempts int | None

The total number of attempts to make. If None, the task will be retried indefinitely.

1
delay timedelta

The delay between attempts.

timedelta(0)
Source code in src/docket/dependencies.py
def __init__(
    self, attempts: int | None = 1, delay: timedelta = timedelta(0)
) -> None:
    """
    Args:
        attempts: The total number of attempts to make.  If `None`, the task will
            be retried indefinitely.
        delay: The delay between attempts.
    """
    self.attempts = attempts
    self.delay = delay
    self.attempt = 1
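
A hedged sketch of forcing the next attempt at a caller-chosen delay; push_to_remote is a hypothetical helper.

```python
from datetime import timedelta

async def sync_account(account_id: int, retry: Retry = Retry(attempts=5)) -> None:
    try:
        await push_to_remote(account_id)  # hypothetical helper
    except ConnectionError:
        # in_() raises ForcedRetry internally; the worker reschedules the task.
        retry.in_(timedelta(seconds=30))
```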

StrikeList

A strike list that manages conditions for blocking task execution.

When a URL is provided, the strike list will connect to Redis and monitor a stream for strike/restore instructions. External processes (like Docket) can issue strikes, and all StrikeList instances listening to the same stream will receive and apply those updates.

Example using context manager with Redis:

async with StrikeList(url="redis://localhost:6379/0", name="my-docket") as strikes:
    # External process issues: await docket.strike("my_task", "customer_id", "==", "blocked")

    if strikes.is_stricken({"customer_id": "blocked"}):
        print("Customer is blocked")

Example with Docket (managed internally):

async with Docket(name="my-docket", url="redis://localhost:6379/0") as docket:
    # Docket manages its own StrikeList internally
    await docket.strike(None, "customer_id", "==", "blocked")

Example using explicit connect/close:

strikes = StrikeList(url="redis://localhost:6379/0", name="my-docket")
await strikes.connect()
try:
    if strikes.is_stricken({"customer_id": "blocked"}):
        print("Customer is blocked")
finally:
    await strikes.close()

Example without Redis (local-only):

strikes = StrikeList()  # No URL = no Redis connection
strikes.update(Strike(None, "customer_id", Operator.EQUAL, "blocked"))
if strikes.is_stricken({"customer_id": "blocked"}):
    print("Customer is blocked")

Source code in src/docket/strikelist.py
class StrikeList:
    """A strike list that manages conditions for blocking task execution.

    When a URL is provided, the strike list will connect to Redis and monitor
    a stream for strike/restore instructions. External processes (like Docket)
    can issue strikes, and all StrikeList instances listening to the same
    stream will receive and apply those updates.

    Example using context manager with Redis:
        async with StrikeList(url="redis://localhost:6379/0", name="my-docket") as strikes:
            # External process issues: await docket.strike("my_task", "customer_id", "==", "blocked")

            if strikes.is_stricken({"customer_id": "blocked"}):
                print("Customer is blocked")

    Example with Docket (managed internally):
        async with Docket(name="my-docket", url="redis://localhost:6379/0") as docket:
            # Docket manages its own StrikeList internally
            await docket.strike(None, "customer_id", "==", "blocked")

    Example using explicit connect/close:
        strikes = StrikeList(url="redis://localhost:6379/0", name="my-docket")
        await strikes.connect()
        try:
            if strikes.is_stricken({"customer_id": "blocked"}):
                print("Customer is blocked")
        finally:
            await strikes.close()

    Example without Redis (local-only):
        strikes = StrikeList()  # No URL = no Redis connection
        strikes.update(Strike(None, "customer_id", Operator.EQUAL, "blocked"))
        if strikes.is_stricken({"customer_id": "blocked"}):
            print("Customer is blocked")
    """

    task_strikes: TaskStrikes
    parameter_strikes: ParameterStrikes
    _conditions: list[Callable[["Execution"], bool]]
    _connection_pool: ConnectionPool | None
    _monitor_task: asyncio.Task[NoReturn] | None
    _strikes_loaded: asyncio.Event | None

    def __init__(
        self,
        url: str | None = None,
        name: str = "strikelist",
        enable_internal_instrumentation: bool = False,
    ) -> None:
        """Initialize a StrikeList.

        Args:
            url: Redis connection URL. Use "memory://" for in-memory testing.
                 If None, no Redis connection is made (local-only mode).
            name: Name used as prefix for Redis keys (should match the Docket name
                  if you want to receive strikes from that Docket).
            enable_internal_instrumentation: If True, allows OpenTelemetry spans
                for internal Redis operations. Default False suppresses these spans.
        """
        self.url = url
        self.name = name
        self.enable_internal_instrumentation = enable_internal_instrumentation
        self.task_strikes = {}
        self.parameter_strikes = {}
        self._conditions = [self._matches_task_or_parameter_strike]
        self._connection_pool = None
        self._monitor_task = None
        self._strikes_loaded = None

    @property
    def strike_key(self) -> str:
        """Redis stream key for strike instructions."""
        return f"{self.name}:strikes"

    @contextmanager
    def _maybe_suppress_instrumentation(self) -> Generator[None, None, None]:
        """Suppress OTel auto-instrumentation for internal Redis operations."""
        if not self.enable_internal_instrumentation:
            with suppress_instrumentation():
                yield
        else:
            yield

    async def connect(self) -> None:
        """Connect to Redis and start monitoring for strike updates.

        If no URL was provided during initialization, this is a no-op.
        This method sets up the Redis connection pool and starts a background
        task that monitors the strike stream for updates.
        """
        if self.url is None:
            return  # No Redis connection needed

        if self._connection_pool is not None:
            return  # Already connected

        self._connection_pool = await connection_pool_from_url(self.url)

        self._strikes_loaded = asyncio.Event()
        self._monitor_task = asyncio.create_task(self._monitor_strikes())

    async def close(self) -> None:
        """Close the Redis connection and stop monitoring.

        This method cancels the background monitoring task and disconnects
        from Redis. It is safe to call multiple times.
        """
        if self._monitor_task is not None:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass
            self._monitor_task = None

        self._strikes_loaded = None

        if self._connection_pool is not None:
            await asyncio.shield(self._connection_pool.disconnect())
            self._connection_pool = None

    async def __aenter__(self) -> Self:
        """Async context manager entry - connects to Redis if URL provided."""
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Async context manager exit - closes Redis connection."""
        await self.close()

    def add_condition(self, condition: Callable[["Execution"], bool]) -> None:
        """Adds a temporary condition that indicates an execution is stricken."""
        self._conditions.insert(0, condition)

    def remove_condition(self, condition: Callable[["Execution"], bool]) -> None:
        """Removes a temporary condition that indicates an execution is stricken."""
        assert condition is not self._matches_task_or_parameter_strike
        self._conditions.remove(condition)

    async def wait_for_strikes_loaded(self) -> None:
        """Wait for all existing strikes to be loaded from the stream.

        This method blocks until the strike monitor has completed its initial
        non-blocking read of all existing strike messages. Call this before
        making decisions that depend on the current strike state.

        If not connected to Redis (local-only mode), returns immediately.
        """
        if self._strikes_loaded is None:
            return
        await self._strikes_loaded.wait()

    async def send_instruction(self, instruction: StrikeInstruction) -> None:
        """Send a strike instruction to Redis and update local state.

        Args:
            instruction: The Strike or Restore instruction to send.

        Raises:
            RuntimeError: If not connected to Redis.
        """
        if self._connection_pool is None:
            raise RuntimeError(
                "Cannot send strike instruction: not connected to Redis. "
                "Use connect() or async context manager first."
            )

        async with Redis(connection_pool=self._connection_pool) as r:
            await r.xadd(self.strike_key, instruction.as_message())  # type: ignore[arg-type]

        self.update(instruction)

    async def strike(
        self,
        function: str | None = None,
        parameter: str | None = None,
        operator: "Operator | LiteralOperator" = "==",
        value: Hashable | None = None,
    ) -> None:
        """Issue a strike to block matching tasks or parameters.

        Args:
            function: Task function name to strike, or None for all tasks.
            parameter: Parameter name to match, or None for entire task.
            operator: Comparison operator for the value.
            value: Value to compare against.
        """
        instruction = Strike(function, parameter, Operator(operator), value)
        await self.send_instruction(instruction)

    async def restore(
        self,
        function: str | None = None,
        parameter: str | None = None,
        operator: "Operator | LiteralOperator" = "==",
        value: Hashable | None = None,
    ) -> None:
        """Restore a previously issued strike.

        Args:
            function: Task function name to restore, or None for all tasks.
            parameter: Parameter name to match, or None for entire task.
            operator: Comparison operator for the value.
            value: Value to compare against.
        """
        instruction = Restore(function, parameter, Operator(operator), value)
        await self.send_instruction(instruction)

    def is_stricken(self, target: "Execution | Mapping[str, Any]") -> bool:
        """Check if a target matches any strike condition.

        Args:
            target: Either an Execution object (for Docket/Worker use) or
                   a dictionary of parameter names to values (for standalone use).

        Returns:
            True if any parameter matches a strike condition.
        """
        # Check if this is a dict-like object (Mapping)
        if isinstance(target, Mapping):
            return self._is_dict_stricken(target)

        # Otherwise it's an Execution - use the full condition checking
        return any(condition(target) for condition in self._conditions)

    def _is_dict_stricken(self, params: Mapping[str, Any]) -> bool:
        """Check if a parameter dict matches any strike condition.

        Args:
            params: Dictionary of parameter names to values.

        Returns:
            True if any parameter matches a strike condition.
        """
        for parameter, argument in params.items():
            if parameter not in self.parameter_strikes:
                continue

            for operator, strike_value in self.parameter_strikes[parameter]:
                if self._is_match(argument, operator, strike_value):
                    return True

        return False

    def _matches_task_or_parameter_strike(self, execution: "Execution") -> bool:
        from .execution import get_signature

        function_name = execution.function_name

        # Check if the entire task is stricken (without parameter conditions)
        task_strikes = self.task_strikes.get(function_name, {})
        if function_name in self.task_strikes and not task_strikes:
            return True

        signature = get_signature(execution.function)

        try:
            bound_args = signature.bind(*execution.args, **execution.kwargs)
            bound_args.apply_defaults()
        except TypeError:
            # If we can't make sense of the arguments, just assume the task is fine
            return False

        all_arguments = {
            **bound_args.arguments,
            **{
                k: v
                for k, v in execution.kwargs.items()
                if k not in bound_args.arguments
            },
        }

        for parameter, argument in all_arguments.items():
            for strike_source in [task_strikes, self.parameter_strikes]:
                if parameter not in strike_source:
                    continue

                for operator, strike_value in strike_source[parameter]:
                    if self._is_match(argument, operator, strike_value):
                        return True

        return False

    def _is_match(self, value: Any, operator: Operator, strike_value: Any) -> bool:
        """Determines if a value matches a strike condition."""
        try:
            match operator:
                case "==":
                    return value == strike_value
                case "!=":
                    return value != strike_value
                case ">":
                    return value > strike_value
                case ">=":
                    return value >= strike_value
                case "<":
                    return value < strike_value
                case "<=":
                    return value <= strike_value
                case "between":  # pragma: no branch
                    lower, upper = strike_value
                    return lower <= value <= upper
                case _:  # pragma: no cover
                    raise ValueError(f"Unknown operator: {operator}")
        except (ValueError, TypeError):
            # If we can't make the comparison due to incompatible types, just log the
            # error and assume the task is not stricken
            logger.warning(
                "Incompatible type for strike condition: %r %s %r",
                strike_value,
                operator,
                value,
                exc_info=True,
            )
            return False

    def update(self, instruction: StrikeInstruction) -> None:
        try:
            hash(instruction.value)
        except TypeError:
            logger.warning(
                "Incompatible type for strike condition: %s %r",
                instruction.operator,
                instruction.value,
            )
            return

        if isinstance(instruction, Strike):
            self._strike(instruction)
        elif isinstance(instruction, Restore):  # pragma: no branch
            self._restore(instruction)

    def _strike(self, strike: Strike) -> None:
        if strike.function and strike.parameter:
            try:
                task_strikes = self.task_strikes[strike.function]
            except KeyError:
                task_strikes = self.task_strikes[strike.function] = {}

            try:
                parameter_strikes = task_strikes[strike.parameter]
            except KeyError:
                parameter_strikes = task_strikes[strike.parameter] = set()

            parameter_strikes.add((strike.operator, strike.value))

        elif strike.function:
            try:
                task_strikes = self.task_strikes[strike.function]
            except KeyError:
                task_strikes = self.task_strikes[strike.function] = {}

        elif strike.parameter:  # pragma: no branch
            try:
                parameter_strikes = self.parameter_strikes[strike.parameter]
            except KeyError:
                parameter_strikes = self.parameter_strikes[strike.parameter] = set()

            parameter_strikes.add((strike.operator, strike.value))

    def _restore(self, restore: Restore) -> None:
        if restore.function and restore.parameter:
            try:
                task_strikes = self.task_strikes[restore.function]
            except KeyError:
                return

            try:
                parameter_strikes = task_strikes[restore.parameter]
            except KeyError:
                task_strikes.pop(restore.parameter, None)
                return

            try:
                parameter_strikes.remove((restore.operator, restore.value))
            except KeyError:
                pass

            if not parameter_strikes:
                task_strikes.pop(restore.parameter, None)
                if not task_strikes:
                    self.task_strikes.pop(restore.function, None)

        elif restore.function:
            try:
                task_strikes = self.task_strikes[restore.function]
            except KeyError:
                return

            # If there are no parameter strikes, this was a full task strike
            if not task_strikes:
                self.task_strikes.pop(restore.function, None)

        elif restore.parameter:  # pragma: no branch
            try:
                parameter_strikes = self.parameter_strikes[restore.parameter]
            except KeyError:
                return

            try:
                parameter_strikes.remove((restore.operator, restore.value))
            except KeyError:
                pass

            if not parameter_strikes:
                self.parameter_strikes.pop(restore.parameter, None)

    async def _monitor_strikes(self) -> NoReturn:
        """Background task that monitors Redis for strike updates."""
        from .instrumentation import REDIS_DISRUPTIONS, STRIKES_IN_EFFECT

        last_id = "0-0"
        initial_load_complete = False
        while True:
            try:
                async with Redis(connection_pool=self._connection_pool) as r:
                    while True:
                        with self._maybe_suppress_instrumentation():
                            # Non-blocking for initial load (block=None), then block
                            # for new messages (block=60_000). Note: block=0 means
                            # "block forever" in Redis, not "non-blocking".
                            streams = await r.xread(
                                {self.strike_key: last_id},
                                count=100,
                                block=60_000 if initial_load_complete else None,
                            )

                        # If no messages and we haven't signaled yet, initial load is done
                        if not streams and not initial_load_complete:
                            initial_load_complete = True
                            # _strikes_loaded is always set when _monitor_strikes runs
                            assert self._strikes_loaded is not None
                            self._strikes_loaded.set()
                            continue

                        for _, messages in streams:
                            for message_id, message in messages:
                                last_id = message_id
                                instruction = StrikeInstruction.from_message(message)
                                self.update(instruction)
                                logger.info(
                                    "%s %r",
                                    (
                                        "Striking"
                                        if instruction.direction == "strike"
                                        else "Restoring"
                                    ),
                                    instruction.call_repr(),
                                )

                                STRIKES_IN_EFFECT.add(
                                    1 if instruction.direction == "strike" else -1,
                                    {
                                        "docket.name": self.name,
                                        **instruction.labels(),
                                    },
                                )

            except redis.exceptions.ConnectionError:  # pragma: no cover
                REDIS_DISRUPTIONS.add(1, {"docket": self.name})
                logger.warning("Connection error, sleeping for 1 second...")
                await asyncio.sleep(1)
            except Exception:  # pragma: no cover
                logger.exception("Error monitoring strikes")
                await asyncio.sleep(1)

strike_key property

Redis stream key for strike instructions.

__aenter__() async

Async context manager entry - connects to Redis if URL provided.

Source code in src/docket/strikelist.py
async def __aenter__(self) -> Self:
    """Async context manager entry - connects to Redis if URL provided."""
    await self.connect()
    return self

__aexit__(exc_type, exc_value, traceback) async

Async context manager exit - closes Redis connection.

Source code in src/docket/strikelist.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Async context manager exit - closes Redis connection."""
    await self.close()

__init__(url=None, name='strikelist', enable_internal_instrumentation=False)

Initialize a StrikeList.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | str \| None | Redis connection URL. Use "memory://" for in-memory testing. If None, no Redis connection is made (local-only mode). | None |
| name | str | Name used as prefix for Redis keys (should match the Docket name if you want to receive strikes from that Docket). | 'strikelist' |
| enable_internal_instrumentation | bool | If True, allows OpenTelemetry spans for internal Redis operations. Default False suppresses these spans. | False |
Source code in src/docket/strikelist.py
def __init__(
    self,
    url: str | None = None,
    name: str = "strikelist",
    enable_internal_instrumentation: bool = False,
) -> None:
    """Initialize a StrikeList.

    Args:
        url: Redis connection URL. Use "memory://" for in-memory testing.
             If None, no Redis connection is made (local-only mode).
        name: Name used as prefix for Redis keys (should match the Docket name
              if you want to receive strikes from that Docket).
        enable_internal_instrumentation: If True, allows OpenTelemetry spans
            for internal Redis operations. Default False suppresses these spans.
    """
    self.url = url
    self.name = name
    self.enable_internal_instrumentation = enable_internal_instrumentation
    self.task_strikes = {}
    self.parameter_strikes = {}
    self._conditions = [self._matches_task_or_parameter_strike]
    self._connection_pool = None
    self._monitor_task = None
    self._strikes_loaded = None
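For tests, the documented "memory://" backend avoids a real Redis server. A minimal sketch, assuming the in-memory backend behaves like Redis for the strike stream:

```python
# Minimal sketch: exercise StrikeList against the in-memory backend.
async with StrikeList(url="memory://", name="test-docket") as strikes:
    await strikes.strike(None, "customer_id", "==", "blocked")
    assert strikes.is_stricken({"customer_id": "blocked"})
```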

add_condition(condition)

Adds a temporary condition that indicates an execution is stricken.

Source code in src/docket/strikelist.py
def add_condition(self, condition: Callable[["Execution"], bool]) -> None:
    """Adds a temporary condition that indicates an execution is stricken."""
    self._conditions.insert(0, condition)

close() async

Close the Redis connection and stop monitoring.

This method cancels the background monitoring task and disconnects from Redis. It is safe to call multiple times.

Source code in src/docket/strikelist.py
async def close(self) -> None:
    """Close the Redis connection and stop monitoring.

    This method cancels the background monitoring task and disconnects
    from Redis. It is safe to call multiple times.
    """
    if self._monitor_task is not None:
        self._monitor_task.cancel()
        try:
            await self._monitor_task
        except asyncio.CancelledError:
            pass
        self._monitor_task = None

    self._strikes_loaded = None

    if self._connection_pool is not None:
        await asyncio.shield(self._connection_pool.disconnect())
        self._connection_pool = None

connect() async

Connect to Redis and start monitoring for strike updates.

If no URL was provided during initialization, this is a no-op. This method sets up the Redis connection pool and starts a background task that monitors the strike stream for updates.

Source code in src/docket/strikelist.py
async def connect(self) -> None:
    """Connect to Redis and start monitoring for strike updates.

    If no URL was provided during initialization, this is a no-op.
    This method sets up the Redis connection pool and starts a background
    task that monitors the strike stream for updates.
    """
    if self.url is None:
        return  # No Redis connection needed

    if self._connection_pool is not None:
        return  # Already connected

    self._connection_pool = await connection_pool_from_url(self.url)

    self._strikes_loaded = asyncio.Event()
    self._monitor_task = asyncio.create_task(self._monitor_strikes())

is_stricken(target)

Check if a target matches any strike condition.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| target | Execution \| Mapping[str, Any] | Either an Execution object (for Docket/Worker use) or a dictionary of parameter names to values (for standalone use). | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if any parameter matches a strike condition. |

Source code in src/docket/strikelist.py
def is_stricken(self, target: "Execution | Mapping[str, Any]") -> bool:
    """Check if a target matches any strike condition.

    Args:
        target: Either an Execution object (for Docket/Worker use) or
               a dictionary of parameter names to values (for standalone use).

    Returns:
        True if any parameter matches a strike condition.
    """
    # Check if this is a dict-like object (Mapping)
    if isinstance(target, Mapping):
        return self._is_dict_stricken(target)

    # Otherwise it's an Execution - use the full condition checking
    return any(condition(target) for condition in self._conditions)
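For standalone use, a plain mapping of parameter names to values is enough. A minimal local-only sketch, mirroring the class docstring's example:

```python
# Minimal sketch: standalone checks against a local-only StrikeList.
strikes = StrikeList()  # no URL, purely local
strikes.update(Strike(None, "customer_id", Operator.EQUAL, "blocked"))
assert strikes.is_stricken({"customer_id": "blocked"})
assert not strikes.is_stricken({"customer_id": "ok"})
```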

remove_condition(condition)

Removes a temporary condition that indicates an execution is stricken.

Source code in src/docket/strikelist.py
def remove_condition(self, condition: Callable[["Execution"], bool]) -> None:
    """Removes a temporary condition that indicates an execution is stricken."""
    assert condition is not self._matches_task_or_parameter_strike
    self._conditions.remove(condition)
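Conditions receive the Execution and return True when it should be treated as stricken. A minimal sketch, where the maintenance-window predicate is hypothetical and `strikes` is an existing StrikeList:

```python
# Hypothetical sketch: temporarily treat every send_email execution as stricken.
def block_send_email(execution) -> bool:
    return execution.function_name == "send_email"

strikes.add_condition(block_send_email)
try:
    ...  # work that should skip send_email executions
finally:
    strikes.remove_condition(block_send_email)
```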

restore(function=None, parameter=None, operator='==', value=None) async

Restore a previously issued strike.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| function | str \| None | Task function name to restore, or None for all tasks. | None |
| parameter | str \| None | Parameter name to match, or None for entire task. | None |
| operator | Operator \| LiteralOperator | Comparison operator for the value. | '==' |
| value | Hashable \| None | Value to compare against. | None |
Source code in src/docket/strikelist.py
async def restore(
    self,
    function: str | None = None,
    parameter: str | None = None,
    operator: "Operator | LiteralOperator" = "==",
    value: Hashable | None = None,
) -> None:
    """Restore a previously issued strike.

    Args:
        function: Task function name to restore, or None for all tasks.
        parameter: Parameter name to match, or None for entire task.
        operator: Comparison operator for the value.
        value: Value to compare against.
    """
    instruction = Restore(function, parameter, Operator(operator), value)
    await self.send_instruction(instruction)

send_instruction(instruction) async

Send a strike instruction to Redis and update local state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| instruction | StrikeInstruction | The Strike or Restore instruction to send. | required |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If not connected to Redis. |

Source code in src/docket/strikelist.py
async def send_instruction(self, instruction: StrikeInstruction) -> None:
    """Send a strike instruction to Redis and update local state.

    Args:
        instruction: The Strike or Restore instruction to send.

    Raises:
        RuntimeError: If not connected to Redis.
    """
    if self._connection_pool is None:
        raise RuntimeError(
            "Cannot send strike instruction: not connected to Redis. "
            "Use connect() or async context manager first."
        )

    async with Redis(connection_pool=self._connection_pool) as r:
        await r.xadd(self.strike_key, instruction.as_message())  # type: ignore[arg-type]

    self.update(instruction)
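Most callers should prefer strike() and restore(), which build the instruction for you; sending one directly looks like this (a sketch, assuming Strike and Operator come from docket's public API and `strikes` is connected):

```python
# Minimal sketch: send a pre-built instruction on a connected StrikeList.
instruction = Strike(None, "customer_id", Operator.EQUAL, "blocked")
await strikes.send_instruction(instruction)
```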

strike(function=None, parameter=None, operator='==', value=None) async

Issue a strike to block matching tasks or parameters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| function | str \| None | Task function name to strike, or None for all tasks. | None |
| parameter | str \| None | Parameter name to match, or None for entire task. | None |
| operator | Operator \| LiteralOperator | Comparison operator for the value. | '==' |
| value | Hashable \| None | Value to compare against. | None |
Source code in src/docket/strikelist.py
async def strike(
    self,
    function: str | None = None,
    parameter: str | None = None,
    operator: "Operator | LiteralOperator" = "==",
    value: Hashable | None = None,
) -> None:
    """Issue a strike to block matching tasks or parameters.

    Args:
        function: Task function name to strike, or None for all tasks.
        parameter: Parameter name to match, or None for entire task.
        operator: Comparison operator for the value.
        value: Value to compare against.
    """
    instruction = Strike(function, parameter, Operator(operator), value)
    await self.send_instruction(instruction)
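A minimal sketch of the round trip, assuming a connected StrikeList (a Docket exposes the same strike/restore signature):

```python
# Minimal sketch: block one customer's send_email tasks, then lift the strike.
await strikes.strike("send_email", "customer_id", "==", "blocked")
# ... later
await strikes.restore("send_email", "customer_id", "==", "blocked")
```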

wait_for_strikes_loaded() async

Wait for all existing strikes to be loaded from the stream.

This method blocks until the strike monitor has completed its initial non-blocking read of all existing strike messages. Call this before making decisions that depend on the current strike state.

If not connected to Redis (local-only mode), returns immediately.

Source code in src/docket/strikelist.py
async def wait_for_strikes_loaded(self) -> None:
    """Wait for all existing strikes to be loaded from the stream.

    This method blocks until the strike monitor has completed its initial
    non-blocking read of all existing strike messages. Call this before
    making decisions that depend on the current strike state.

    If not connected to Redis (local-only mode), returns immediately.
    """
    if self._strikes_loaded is None:
        return
    await self._strikes_loaded.wait()
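A minimal sketch, assuming strikes may already exist on the stream when the process starts:

```python
# Minimal sketch: load existing strikes before acting on them.
async with StrikeList(url="redis://localhost:6379/0", name="my-docket") as strikes:
    await strikes.wait_for_strikes_loaded()
    if strikes.is_stricken({"customer_id": "blocked"}):
        print("Customer is blocked")
```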

Timeout

Bases: Dependency

Configures a timeout for a task. You can specify the base timeout, and the task will be cancelled if it exceeds this duration. The timeout may be extended within the context of a single running task.

Example:

@task
async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
    ...
Source code in src/docket/dependencies.py
class Timeout(Dependency):
    """Configures a timeout for a task.  You can specify the base timeout, and the
    task will be cancelled if it exceeds this duration.  The timeout may be extended
    within the context of a single running task.

    Example:

    ```python
    @task
    async def my_task(timeout: Timeout = Timeout(timedelta(seconds=10))) -> None:
        ...
    ```
    """

    single: bool = True

    base: timedelta
    _deadline: float

    def __init__(self, base: timedelta) -> None:
        """
        Args:
            base: The base timeout duration.
        """
        self.base = base

    async def __aenter__(self) -> "Timeout":
        timeout = Timeout(base=self.base)
        timeout.start()
        return timeout

    def start(self) -> None:
        self._deadline = time.monotonic() + self.base.total_seconds()

    def expired(self) -> bool:
        return time.monotonic() >= self._deadline

    def remaining(self) -> timedelta:
        """Get the remaining time until the timeout expires."""
        return timedelta(seconds=self._deadline - time.monotonic())

    def extend(self, by: timedelta | None = None) -> None:
        """Extend the timeout by a given duration.  If no duration is provided, the
        base timeout will be used.

        Args:
            by: The duration to extend the timeout by.
        """
        if by is None:
            by = self.base
        self._deadline += by.total_seconds()

__init__(base)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base | timedelta | The base timeout duration. | required |
Source code in src/docket/dependencies.py
def __init__(self, base: timedelta) -> None:
    """
    Args:
        base: The base timeout duration.
    """
    self.base = base

extend(by=None)

Extend the timeout by a given duration. If no duration is provided, the base timeout will be used.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| by | timedelta \| None | The duration to extend the timeout by. | None |
Source code in src/docket/dependencies.py
def extend(self, by: timedelta | None = None) -> None:
    """Extend the timeout by a given duration.  If no duration is provided, the
    base timeout will be used.

    Args:
        by: The duration to extend the timeout by.
    """
    if by is None:
        by = self.base
    self._deadline += by.total_seconds()

remaining()

Get the remaining time until the timeout expires.

Source code in src/docket/dependencies.py
def remaining(self) -> timedelta:
    """Get the remaining time until the timeout expires."""
    return timedelta(seconds=self._deadline - time.monotonic())
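Putting the pieces together, a long-running task can watch remaining() and call extend() as it makes progress. A sketch, where process_chunk and chunks are hypothetical:

```python
# Hypothetical sketch: extend the timeout while a batch task makes progress.
@task
async def process_batch(
    chunks: list[bytes],
    timeout: Timeout = Timeout(timedelta(seconds=30)),
) -> None:
    for chunk in chunks:
        await process_chunk(chunk)  # hypothetical unit of work
        if timeout.remaining() < timedelta(seconds=5):
            timeout.extend()  # extends by the base duration (another 30s)
```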

Worker

A Worker executes tasks on a Docket. You may run as many workers as you like to work a single Docket.

Example:

async with Docket() as docket:
    async with Worker(docket) as worker:
        await worker.run_forever()
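For tests or embedded use, where the host framework owns signal handling, run_until_finished() drains the docket and returns. A minimal sketch:

```python
# Minimal sketch: drain all currently scheduled work, then return.
async with Docket(name="my-docket", url="redis://localhost:6379/0") as docket:
    async with Worker(docket, concurrency=4) as worker:
        await worker.run_until_finished()
```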
Source code in src/docket/worker.py
class Worker:
    """A Worker executes tasks on a Docket.  You may run as many workers as you like
    to work a single Docket.

    Example:

    ```python
    async with Docket() as docket:
        async with Worker(docket) as worker:
            await worker.run_forever()
    ```
    """

    docket: Docket
    name: str
    concurrency: int
    redelivery_timeout: timedelta
    reconnection_delay: timedelta
    minimum_check_interval: timedelta
    scheduling_resolution: timedelta
    schedule_automatic_tasks: bool
    enable_internal_instrumentation: bool
    fallback_task: TaskFunction

    def __init__(
        self,
        docket: Docket,
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=250),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
        enable_internal_instrumentation: bool = False,
        fallback_task: TaskFunction | None = None,
    ) -> None:
        self.docket = docket
        self.name = name or f"{socket.gethostname()}#{os.getpid()}"
        self.concurrency = concurrency
        self.redelivery_timeout = redelivery_timeout
        self.reconnection_delay = reconnection_delay
        self.minimum_check_interval = minimum_check_interval
        self.scheduling_resolution = scheduling_resolution
        self.schedule_automatic_tasks = schedule_automatic_tasks
        self.enable_internal_instrumentation = enable_internal_instrumentation
        self.fallback_task = fallback_task or default_fallback_task

    @contextmanager
    def _maybe_suppress_instrumentation(self) -> Generator[None, None, None]:
        """Suppress OTel auto-instrumentation for internal Redis operations.

        When enable_internal_instrumentation is False (default), this context manager
        suppresses OpenTelemetry auto-instrumentation spans for internal Redis polling
        operations like XREADGROUP, XAUTOCLAIM, and Lua script evaluations. This prevents
        thousands of noisy spans per minute from overwhelming trace storage.

        Task execution spans and user-facing operations (schedule, cancel, etc.) are
        NOT suppressed.
        """
        if not self.enable_internal_instrumentation:
            with suppress_instrumentation():
                yield
        else:
            yield

    async def __aenter__(self) -> Self:
        self._heartbeat_task = asyncio.create_task(self._heartbeat())
        self._execution_counts: dict[str, int] = {}
        # Track concurrency slots for active tasks so we can refresh them during
        # lease renewal. Maps execution.key → concurrency_key
        self._concurrency_slots: dict[str, str] = {}
        # Track running tasks for cancellation lookup
        self._tasks_by_key: dict[TaskKey, asyncio.Task[None]] = {}
        self._cancellation_listener_task = asyncio.create_task(
            self._cancellation_listener()
        )
        # Events for coordinating worker loop shutdown
        self._worker_stopping = asyncio.Event()
        self._worker_done = asyncio.Event()
        self._worker_done.set()  # Initially done (not running)
        # Signaled when cancellation listener is subscribed and ready
        self._cancellation_ready = asyncio.Event()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # Signal worker loop to stop and wait for it to drain
        self._worker_stopping.set()
        await self._worker_done.wait()

        self._cancellation_listener_task.cancel()
        with suppress(asyncio.CancelledError):
            await self._cancellation_listener_task
        del self._cancellation_listener_task

        self._heartbeat_task.cancel()
        with suppress(asyncio.CancelledError):
            await self._heartbeat_task
        del self._heartbeat_task

        del self._execution_counts
        del self._concurrency_slots
        del self._tasks_by_key
        del self._worker_stopping
        del self._worker_done
        del self._cancellation_ready

    def labels(self) -> Mapping[str, str]:
        return {
            **self.docket.labels(),
            "docket.worker": self.name,
        }

    def _log_context(self) -> Mapping[str, str]:
        return {
            **self.labels(),
            "docket.queue_key": self.docket.queue_key,
            "docket.stream_key": self.docket.stream_key,
        }

    @classmethod
    async def run(
        cls,
        docket_name: str = "docket",
        url: str = "redis://localhost:6379/0",
        name: str | None = None,
        concurrency: int = 10,
        redelivery_timeout: timedelta = timedelta(minutes=5),
        reconnection_delay: timedelta = timedelta(seconds=5),
        minimum_check_interval: timedelta = timedelta(milliseconds=100),
        scheduling_resolution: timedelta = timedelta(milliseconds=250),
        schedule_automatic_tasks: bool = True,
        enable_internal_instrumentation: bool = False,
        until_finished: bool = False,
        healthcheck_port: int | None = None,
        metrics_port: int | None = None,
        tasks: list[str] = ["docket.tasks:standard_tasks"],
        fallback_task: str | None = None,
    ) -> None:
        """Run a worker as the main entry point (CLI).

        This method installs signal handlers for graceful shutdown since it
        assumes ownership of the event loop. When embedding Docket in another
        framework (e.g., FastAPI with uvicorn), use Worker.run_forever() or
        Worker.run_until_finished() directly - those methods do not install
        signal handlers and rely on the framework to handle shutdown signals.
        """
        # Parse fallback_task string if provided (module:function format)
        resolved_fallback_task: TaskFunction | None = None
        if fallback_task:
            module_name, _, member_name = fallback_task.rpartition(":")
            module = importlib.import_module(module_name)
            resolved_fallback_task = getattr(module, member_name)

        with (
            healthcheck_server(port=healthcheck_port),
            metrics_server(port=metrics_port),
        ):
            async with Docket(
                name=docket_name,
                url=url,
                enable_internal_instrumentation=enable_internal_instrumentation,
            ) as docket:
                for task_path in tasks:
                    docket.register_collection(task_path)

                async with (
                    Worker(  # pragma: no branch - context manager exit varies across interpreters
                        docket=docket,
                        name=name,
                        concurrency=concurrency,
                        redelivery_timeout=redelivery_timeout,
                        reconnection_delay=reconnection_delay,
                        minimum_check_interval=minimum_check_interval,
                        scheduling_resolution=scheduling_resolution,
                        schedule_automatic_tasks=schedule_automatic_tasks,
                        enable_internal_instrumentation=enable_internal_instrumentation,
                        fallback_task=resolved_fallback_task,
                    ) as worker
                ):
                    # Install signal handlers for graceful shutdown.
                    # This is only appropriate when we own the event loop (CLI entry point).
                    # Embedded usage should let the framework handle signals.
                    loop = asyncio.get_running_loop()
                    run_task: asyncio.Task[None] | None = None

                    def handle_shutdown(sig_name: str) -> None:  # pragma: no cover
                        logger.info(
                            "Received %s, initiating graceful shutdown...", sig_name
                        )
                        if run_task and not run_task.done():
                            run_task.cancel()

                    if hasattr(signal, "SIGTERM"):  # pragma: no cover
                        loop.add_signal_handler(
                            signal.SIGTERM, lambda: handle_shutdown("SIGTERM")
                        )
                        loop.add_signal_handler(
                            signal.SIGINT, lambda: handle_shutdown("SIGINT")
                        )

                    try:
                        if until_finished:
                            run_task = asyncio.create_task(worker.run_until_finished())
                        else:
                            run_task = asyncio.create_task(
                                worker.run_forever()
                            )  # pragma: no cover
                        await run_task
                    except asyncio.CancelledError:  # pragma: no cover
                        pass
                    finally:
                        if hasattr(signal, "SIGTERM"):  # pragma: no cover
                            loop.remove_signal_handler(signal.SIGTERM)
                            loop.remove_signal_handler(signal.SIGINT)

    async def run_until_finished(self) -> None:
        """Run the worker until there are no more tasks to process."""
        return await self._run(forever=False)

    async def run_forever(self) -> None:
        """Run the worker indefinitely."""
        return await self._run(forever=True)  # pragma: no cover

    _execution_counts: dict[str, int]

    async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
        """
        Run the worker until there are no more tasks to process, but limit specified
        task keys to a maximum number of iterations.

        This is particularly useful for testing self-perpetuating tasks that would
        otherwise run indefinitely.

        Args:
            iterations_by_key: Maps task keys to their maximum allowed executions
        """
        self._execution_counts = {key: 0 for key in iterations_by_key}

        def has_reached_max_iterations(execution: Execution) -> bool:
            key = execution.key

            if key not in iterations_by_key:
                return False

            if self._execution_counts[key] >= iterations_by_key[key]:
                return True

            return False

        self.docket.strike_list.add_condition(has_reached_max_iterations)
        try:
            await self.run_until_finished()
        finally:
            self.docket.strike_list.remove_condition(has_reached_max_iterations)
            self._execution_counts = {}

    async def _run(self, forever: bool = False) -> None:
        self._startup_log()

        while True:
            try:
                async with self.docket.redis() as redis:
                    return await self._worker_loop(redis, forever=forever)
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.warning(
                    "Error connecting to redis, retrying in %s...",
                    self.reconnection_delay,
                    exc_info=True,
                )
                await asyncio.sleep(self.reconnection_delay.total_seconds())

    async def _worker_loop(self, redis: Redis, forever: bool = False):
        self._worker_stopping.clear()
        self._worker_done.clear()

        await self._cancellation_ready.wait()

        if self.schedule_automatic_tasks:
            await self._schedule_all_automatic_perpetual_tasks()

        active_tasks: dict[asyncio.Task[None], RedisMessageID] = {}

        scheduler_task = asyncio.create_task(self._scheduler_loop(redis))
        lease_renewal_task = asyncio.create_task(
            self._renew_leases(redis, active_tasks)
        )
        task_executions: dict[asyncio.Task[None], Execution] = {}
        available_slots = self.concurrency

        log_context = self._log_context()

        async def check_for_work() -> bool:
            logger.debug("Checking for work", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xlen(self.docket.stream_key)
                pipeline.zcard(self.docket.queue_key)
                results: list[int] = await pipeline.execute()
                stream_len = results[0]
                queue_len = results[1]
                return stream_len > 0 or queue_len > 0

        async def get_redeliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting redeliveries", extra=log_context)
            try:
                with self._maybe_suppress_instrumentation():
                    _, redeliveries, *_ = await redis.xautoclaim(
                        name=self.docket.stream_key,
                        groupname=self.docket.worker_group_name,
                        consumername=self.name,
                        min_idle_time=int(
                            self.redelivery_timeout.total_seconds() * 1000
                        ),
                        start_id="0-0",
                        count=available_slots,
                    )
            except ResponseError as e:
                if "NOGROUP" in str(e):
                    await self.docket._ensure_stream_and_group()
                    return await get_redeliveries(redis)
                raise  # pragma: no cover
            return [(b"__redelivery__", redeliveries)]

        async def get_new_deliveries(redis: Redis) -> RedisReadGroupResponse:
            logger.debug("Getting new deliveries", extra=log_context)
            # Use non-blocking read with in-memory backend + manual sleep
            # This is necessary because fakeredis's async blocking operations don't
            # properly yield control to the asyncio event loop
            is_memory = self.docket.url.startswith("memory://")
            try:
                with self._maybe_suppress_instrumentation():
                    result = await redis.xreadgroup(
                        groupname=self.docket.worker_group_name,
                        consumername=self.name,
                        streams={self.docket.stream_key: ">"},
                        block=0
                        if is_memory
                        else int(self.minimum_check_interval.total_seconds() * 1000),
                        count=available_slots,
                    )
            except ResponseError as e:
                if "NOGROUP" in str(e):
                    await self.docket._ensure_stream_and_group()
                    return await get_new_deliveries(redis)
                raise  # pragma: no cover
            if is_memory and not result:
                await asyncio.sleep(self.minimum_check_interval.total_seconds())
            return result

        async def start_task(
            message_id: RedisMessageID,
            message: RedisMessage,
            is_redelivery: bool = False,
        ) -> None:
            execution = await Execution.from_message(
                self.docket,
                message,
                redelivered=is_redelivery,
                fallback_task=self.fallback_task,
            )

            task = asyncio.create_task(self._execute(execution), name=execution.key)
            active_tasks[task] = message_id
            task_executions[task] = execution
            self._tasks_by_key[execution.key] = task

            nonlocal available_slots
            available_slots -= 1

        async def process_completed_tasks() -> None:
            completed_tasks = {task for task in active_tasks if task.done()}
            for task in completed_tasks:
                message_id = active_tasks.pop(task)
                task_executions.pop(task)
                self._tasks_by_key.pop(task.get_name(), None)
                try:
                    await task
                    await ack_message(redis, message_id)
                except ConcurrencyBlocked as e:
                    logger.debug(
                        "🔒 Task %s blocked by concurrency limit, rescheduling",
                        e.execution.key,
                        extra=log_context,
                    )
                    e.execution.when = (
                        datetime.now(timezone.utc) + CONCURRENCY_BLOCKED_RETRY_DELAY
                    )
                    await e.execution.schedule(reschedule_message=message_id)

        async def ack_message(redis: Redis, message_id: RedisMessageID) -> None:
            logger.debug("Acknowledging message", extra=log_context)
            async with redis.pipeline() as pipeline:
                pipeline.xack(
                    self.docket.stream_key,
                    self.docket.worker_group_name,
                    message_id,
                )
                pipeline.xdel(
                    self.docket.stream_key,
                    message_id,
                )
                await pipeline.execute()

        has_work: bool = True
        stopping = self._worker_stopping.is_set

        try:
            while (forever or has_work or active_tasks) and not stopping():
                await process_completed_tasks()

                available_slots = self.concurrency - len(active_tasks)

                if available_slots <= 0:
                    await asyncio.sleep(self.minimum_check_interval.total_seconds())
                    continue

                for source in [get_redeliveries, get_new_deliveries]:
                    for stream_key, messages in await source(redis):
                        is_redelivery = stream_key == b"__redelivery__"
                        for message_id, message in messages:
                            if not message:  # pragma: no cover
                                continue

                            await start_task(message_id, message, is_redelivery)

                    if available_slots <= 0:
                        break

                if not forever and not active_tasks:
                    has_work = await check_for_work()

        except asyncio.CancelledError:
            if active_tasks:  # pragma: no cover
                logger.info(
                    "Shutdown requested, finishing %d active tasks...",
                    len(active_tasks),
                    extra=log_context,
                )
        finally:
            # Drain any remaining active tasks
            if active_tasks:
                await asyncio.gather(*active_tasks, return_exceptions=True)
                await process_completed_tasks()

            self._worker_stopping.set()
            await scheduler_task
            await lease_renewal_task
            self._worker_done.set()

    async def _scheduler_loop(self, redis: Redis) -> None:
        """Loop that moves due tasks from the queue to the stream."""

        stream_due_tasks: _stream_due_tasks = cast(
            _stream_due_tasks,
            redis.register_script(
                # Lua script to atomically move scheduled tasks to the stream
                # KEYS[1]: queue key (sorted set)
                # KEYS[2]: stream key
                # ARGV[1]: current timestamp
                # ARGV[2]: docket name prefix
                """
            local total_work = redis.call('ZCARD', KEYS[1])
            local due_work = 0

            if total_work > 0 then
                local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])

                for i, key in ipairs(tasks) do
                    local hash_key = ARGV[2] .. ":" .. key
                    local task_data = redis.call('HGETALL', hash_key)

                    if #task_data > 0 then
                        local task = {}
                        for j = 1, #task_data, 2 do
                            task[task_data[j]] = task_data[j+1]
                        end

                        redis.call('XADD', KEYS[2], '*',
                            'key', task['key'],
                            'when', task['when'],
                            'function', task['function'],
                            'args', task['args'],
                            'kwargs', task['kwargs'],
                            'attempt', task['attempt']
                        )
                        redis.call('DEL', hash_key)

                        -- Set run state to queued
                        local run_key = ARGV[2] .. ":runs:" .. task['key']
                        redis.call('HSET', run_key, 'state', 'queued')

                        -- Publish state change event to pub/sub
                        local channel = ARGV[2] .. ":state:" .. task['key']
                        local payload = '{"type":"state","key":"' .. task['key'] .. '","state":"queued","when":"' .. task['when'] .. '"}'
                        redis.call('PUBLISH', channel, payload)

                        due_work = due_work + 1
                    end
                end
            end

            if due_work > 0 then
                redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
            end

            return {total_work, due_work}
            """
            ),
        )

        log_context = self._log_context()

        while not self._worker_stopping.is_set():
            try:
                logger.debug("Scheduling due tasks", extra=log_context)
                with self._maybe_suppress_instrumentation():
                    total_work, due_work = await stream_due_tasks(
                        keys=[self.docket.queue_key, self.docket.stream_key],
                        args=[datetime.now(timezone.utc).timestamp(), self.docket.name],
                    )

                if due_work > 0:
                    logger.debug(
                        "Moved %d/%d due tasks from %s to %s",
                        due_work,
                        total_work,
                        self.docket.queue_key,
                        self.docket.stream_key,
                        extra=log_context,
                    )
            except Exception:  # pragma: no cover
                logger.exception(
                    "Error in scheduler loop",
                    exc_info=True,
                    extra=log_context,
                )

            # Wait for worker to stop or scheduling interval to pass
            try:
                await asyncio.wait_for(
                    self._worker_stopping.wait(),
                    timeout=self.scheduling_resolution.total_seconds(),
                )
            except asyncio.TimeoutError:
                pass  # Time to check for due tasks again

        logger.debug("Scheduler loop finished", extra=log_context)

    async def _renew_leases(
        self,
        redis: Redis,
        active_messages: dict[asyncio.Task[None], RedisMessageID],
    ) -> None:
        """Periodically renew leases on messages and concurrency slots.

        Calls XCLAIM with idle=0 to reset the message's idle time, preventing
        XAUTOCLAIM from reclaiming it while we're still processing.

        Also refreshes concurrency slot timestamps to prevent them from being
        garbage collected while tasks are still running.
        """
        renewal_interval = (
            self.redelivery_timeout.total_seconds() / LEASE_RENEWAL_FACTOR
        )

        while not self._worker_stopping.is_set():  # pragma: no branch
            try:
                await asyncio.wait_for(
                    self._worker_stopping.wait(),
                    timeout=renewal_interval,
                )
                break  # Worker is stopping
            except asyncio.TimeoutError:
                pass  # Time to renew leases

            # Snapshot to avoid concurrent modification with main loop
            message_ids = list(active_messages.values())
            concurrency_slots = dict(self._concurrency_slots)
            if not message_ids and not concurrency_slots:
                continue

            try:
                with self._maybe_suppress_instrumentation():
                    # Renew message leases
                    if message_ids:  # pragma: no branch
                        await redis.xclaim(
                            name=self.docket.stream_key,
                            groupname=self.docket.worker_group_name,
                            consumername=self.name,
                            min_idle_time=0,
                            message_ids=message_ids,
                            idle=0,
                        )

                    # Refresh concurrency slot timestamps and TTLs
                    if concurrency_slots:
                        current_time = datetime.now(timezone.utc).timestamp()
                        key_ttl = max(
                            MINIMUM_TTL_SECONDS,
                            int(
                                self.redelivery_timeout.total_seconds()
                                * LEASE_RENEWAL_FACTOR
                            ),
                        )
                        async with redis.pipeline() as pipe:
                            for task_key, concurrency_key in concurrency_slots.items():
                                pipe.zadd(concurrency_key, {task_key: current_time})  # type: ignore
                                pipe.expire(concurrency_key, key_ttl)  # type: ignore
                            await pipe.execute()
            except Exception:
                logger.warning("Failed to renew leases", exc_info=True)

    async def _schedule_all_automatic_perpetual_tasks(self) -> None:
        # Wait for strikes to be fully loaded before scheduling to avoid
        # scheduling struck tasks or missing restored tasks
        await self.docket.wait_for_strikes_loaded()

        async with self.docket.redis() as redis:
            try:
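                # Non-blocking lock: only one worker seeds the automatic
                # perpetual tasks; any other worker bails out via LockError below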
                async with redis.lock(
                    f"{self.docket.name}:perpetual:lock",
                    timeout=AUTOMATIC_PERPETUAL_LOCK_TIMEOUT_SECONDS,
                    blocking=False,
                ):
                    for task_function in self.docket.tasks.values():
                        perpetual = get_single_dependency_parameter_of_type(
                            task_function, Perpetual
                        )
                        if perpetual is None:
                            continue

                        if not perpetual.automatic:
                            continue

                        key = task_function.__name__

                        await self.docket.add(task_function, key=key)()
            except LockError:  # pragma: no cover
                return

    async def _delete_known_task(self, redis: Redis, execution: Execution) -> None:
        logger.debug("Deleting known task", extra=self._log_context())
        # Delete known/stream_id from runs hash to allow task rescheduling
        runs_key = f"{self.docket.name}:runs:{execution.key}"
        await redis.hdel(runs_key, "known", "stream_id")

        # TODO: Remove in next breaking release (v0.14.0) - legacy key cleanup
        known_task_key = self.docket.known_task_key(execution.key)
        stream_id_key = self.docket.stream_id_key(execution.key)
        await redis.delete(known_task_key, stream_id_key)

    async def _execute(self, execution: Execution) -> None:
        log_context = {**self._log_context(), **execution.specific_labels()}
        counter_labels = {**self.labels(), **execution.general_labels()}

        call = execution.call_repr()

        if self.docket.strike_list.is_stricken(execution):
            async with self.docket.redis() as redis:
                await self._delete_known_task(redis, execution)

            logger.warning("🗙 %s", call, extra=log_context)
            TASKS_STRICKEN.add(1, counter_labels | {"docket.where": "worker"})
            return

        if execution.key in self._execution_counts:
            self._execution_counts[execution.key] += 1

        start = time.time()
        punctuality = start - execution.when.timestamp()
        log_context = {**log_context, "punctuality": punctuality}
        duration = 0.0

        TASKS_STARTED.add(1, counter_labels)
        if execution.redelivered:
            TASKS_REDELIVERED.add(1, counter_labels)
        TASKS_RUNNING.add(1, counter_labels)
        TASK_PUNCTUALITY.record(punctuality, counter_labels)

        arrow = "↬" if execution.attempt > 1 else "↪"
        logger.info("%s [%s] %s", arrow, ms(punctuality), call, extra=log_context)

        # Atomically claim task and transition to running state
        # This also initializes progress and cleans up known/stream_id to allow rescheduling
        await execution.claim(self.name)

        dependencies: dict[str, Dependency] = {}
        acquired_concurrency_slot = False

        with tracer.start_as_current_span(
            execution.function_name,
            kind=trace.SpanKind.CONSUMER,
            attributes={
                **self.labels(),
                **execution.specific_labels(),
                "code.function.name": execution.function_name,
            },
            links=execution.incoming_span_links(),
        ) as span:
            try:
                async with resolved_dependencies(self, execution) as dependencies:
                    # Check concurrency limits after dependency resolution
                    concurrency_limit = get_single_dependency_of_type(
                        dependencies, ConcurrencyLimit
                    )
                    if (
                        concurrency_limit and not concurrency_limit.is_bypassed
                    ):  # pragma: no branch - coverage.py on Python 3.10 struggles with this
                        async with self.docket.redis() as redis:
                            # Check if we can acquire a concurrency slot
                            can_start = await self._can_start_task(redis, execution)
                            if not can_start:  # pragma: no branch - 3.10 failure
                                # Task cannot start due to concurrency limits
                                raise ConcurrencyBlocked(execution)
                            acquired_concurrency_slot = True

                    dependency_failures = {
                        k: v
                        for k, v in dependencies.items()
                        if isinstance(v, FailedDependency)
                    }
                    if dependency_failures:
                        raise ExceptionGroup(
                            (
                                "Failed to resolve dependencies for parameter(s): "
                                + ", ".join(dependency_failures.keys())
                            ),
                            [
                                dependency.error
                                for dependency in dependency_failures.values()
                            ],
                        )

                    # Run task with user-specified timeout, or no timeout
                    # Lease renewal keeps messages alive so we don't need implicit timeouts
                    user_timeout = get_single_dependency_of_type(dependencies, Timeout)
                    if user_timeout:
                        user_timeout.start()
                        result = await self._run_function_with_timeout(
                            execution, dependencies, user_timeout
                        )
                    else:
                        result = await execution.function(
                            *execution.args,
                            **{
                                **execution.kwargs,
                                **dependencies,
                            },
                        )

                    duration = log_context["duration"] = time.time() - start
                    TASKS_SUCCEEDED.add(1, counter_labels)

                    span.set_status(Status(StatusCode.OK))

                    rescheduled = await self._perpetuate_if_requested(
                        execution, dependencies, timedelta(seconds=duration)
                    )

                    if rescheduled:
                        # Task was rescheduled - still mark this execution as completed
                        # to set TTL on the runs hash (the new execution has its own entry)
                        await execution.mark_as_completed(result_key=None)
                    else:
                        # Store result if appropriate
                        result_key = None
                        if result is not None and self.docket.execution_ttl:
                            # Serialize and store result
                            pickled_result = cloudpickle.dumps(result)  # type: ignore[arg-type]
                            # Base64-encode for JSON serialization
                            encoded_result = base64.b64encode(pickled_result).decode(
                                "ascii"
                            )
                            result_key = execution.key
                            ttl_seconds = int(self.docket.execution_ttl.total_seconds())
                            await self.docket.result_storage.put(
                                result_key, {"data": encoded_result}, ttl=ttl_seconds
                            )
                        # Mark execution as completed
                        await execution.mark_as_completed(result_key=result_key)

                    arrow = "↫" if rescheduled else "↩"
                    logger.info(
                        "%s [%s] %s", arrow, ms(duration), call, extra=log_context
                    )
            except ConcurrencyBlocked:
                # Re-raise to be handled by process_completed_tasks
                raise
            except asyncio.CancelledError:
                # Task was cancelled externally via docket.cancel()
                duration = log_context["duration"] = time.time() - start
                span.set_status(Status(StatusCode.OK))
                await execution.mark_as_cancelled()
                logger.info(
                    "✗ [%s] %s (cancelled)", ms(duration), call, extra=log_context
                )
            except Exception as e:
                duration = log_context["duration"] = time.time() - start
                TASKS_FAILED.add(1, counter_labels)

                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))

                retried = await self._retry_if_requested(execution, dependencies)
                if not retried:
                    retried = await self._perpetuate_if_requested(
                        execution, dependencies, timedelta(seconds=duration)
                    )

                # Store exception in result_storage
                result_key = None
                if self.docket.execution_ttl:
                    pickled_exception = cloudpickle.dumps(e)  # type: ignore[arg-type]
                    # Base64-encode for JSON serialization
                    encoded_exception = base64.b64encode(pickled_exception).decode(
                        "ascii"
                    )
                    result_key = execution.key
                    ttl_seconds = int(self.docket.execution_ttl.total_seconds())
                    await self.docket.result_storage.put(
                        result_key, {"data": encoded_exception}, ttl=ttl_seconds
                    )

                # Mark execution as failed with error message
                error_msg = f"{type(e).__name__}: {str(e)}"
                await execution.mark_as_failed(error_msg, result_key=result_key)

                arrow = "↫" if retried else "↩"
                logger.exception(
                    "%s [%s] %s", arrow, ms(duration), call, extra=log_context
                )
            finally:
                # Release concurrency slot only if we actually acquired one
                if acquired_concurrency_slot:
                    async with self.docket.redis() as redis:
                        await self._release_concurrency_slot(redis, execution)

                TASKS_RUNNING.add(-1, counter_labels)
                TASKS_COMPLETED.add(1, counter_labels)
                TASK_DURATION.record(duration, counter_labels)

    async def _run_function_with_timeout(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
        timeout: Timeout,
    ) -> Any:
        task_coro = cast(
            Coroutine[None, None, Any],
            execution.function(
                *execution.args,
                **{
                    **execution.kwargs,
                    **dependencies,
                },
            ),
        )
        task = asyncio.create_task(task_coro)
        try:
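            # Re-check the remaining time on each pass so any change to the
            # deadline takes effect; shield() keeps the interim wait_for
            # timeouts from cancelling the task before the Timeout has expired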
            while not task.done():  # pragma: no branch
                remaining = timeout.remaining().total_seconds()
                if timeout.expired():
                    task.cancel()
                    break

                try:
                    result = await asyncio.wait_for(
                        asyncio.shield(task), timeout=remaining
                    )
                    return result
                except asyncio.TimeoutError:
                    continue
        finally:
            if not task.done():  # pragma: no branch
                task.cancel()

        try:
            return await task
        except asyncio.CancelledError:
            raise asyncio.TimeoutError

    async def _retry_if_requested(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
    ) -> bool:
        retry = get_single_dependency_of_type(dependencies, Retry)
        if not retry:
            return False

        if retry.attempts is not None and execution.attempt >= retry.attempts:
            return False

        execution.when = datetime.now(timezone.utc) + retry.delay
        execution.attempt += 1
        # Use replace=True since the task is being rescheduled after failure
        await execution.schedule(replace=True)

        TASKS_RETRIED.add(1, {**self.labels(), **execution.general_labels()})
        return True

    async def _perpetuate_if_requested(
        self,
        execution: Execution,
        dependencies: dict[str, Dependency],
        duration: timedelta,
    ) -> bool:
        perpetual = get_single_dependency_of_type(dependencies, Perpetual)
        if not perpetual:
            return False

        if perpetual.cancelled:
            await self.docket.cancel(execution.key)
            return False

        now = datetime.now(timezone.utc)
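        # Aim for a start-to-start cadence of `every`: subtract this run's
        # duration, but never schedule the next run in the past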
        when = max(now, now + perpetual.every - duration)

        await self.docket.replace(execution.function, when, execution.key)(
            *perpetual.args,
            **perpetual.kwargs,
        )

        TASKS_PERPETUATED.add(1, {**self.labels(), **execution.general_labels()})

        return True

    def _startup_log(self) -> None:
        logger.info("Starting worker %r with the following tasks:", self.name)
        for task_name, task in self.docket.tasks.items():
            logger.info("* %s(%s)", task_name, compact_signature(get_signature(task)))

    @property
    def workers_set(self) -> str:
        return self.docket.workers_set

    def worker_tasks_set(self, worker_name: str) -> str:
        return self.docket.worker_tasks_set(worker_name)

    def task_workers_set(self, task_name: str) -> str:
        return self.docket.task_workers_set(task_name)

    async def _heartbeat(self) -> None:
        while True:
            try:
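                # Workers that have gone `missed_heartbeats` intervals without
                # a heartbeat are considered dead and pruned from the sets below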
                now = datetime.now(timezone.utc).timestamp()
                maximum_age = (
                    self.docket.heartbeat_interval * self.docket.missed_heartbeats
                )
                oldest = now - maximum_age.total_seconds()

                task_names = list(self.docket.tasks)

                async with self.docket.redis() as r:
                    with self._maybe_suppress_instrumentation():
                        async with r.pipeline() as pipeline:
                            pipeline.zremrangebyscore(self.workers_set, 0, oldest)
                            pipeline.zadd(self.workers_set, {self.name: now})

                            for task_name in task_names:
                                task_workers_set = self.task_workers_set(task_name)
                                pipeline.zremrangebyscore(task_workers_set, 0, oldest)
                                pipeline.zadd(task_workers_set, {self.name: now})

                            pipeline.sadd(self.worker_tasks_set(self.name), *task_names)
                            pipeline.expire(
                                self.worker_tasks_set(self.name),
                                max(
                                    maximum_age, timedelta(seconds=MINIMUM_TTL_SECONDS)
                                ),
                            )

                            await pipeline.execute()

                        async with r.pipeline() as pipeline:
                            pipeline.xlen(self.docket.stream_key)
                            pipeline.zcount(self.docket.queue_key, 0, now)
                            pipeline.zcount(self.docket.queue_key, now, "+inf")

                            results: list[int] = await pipeline.execute()

                    stream_depth = results[0]
                    overdue_depth = results[1]
                    schedule_depth = results[2]

                    QUEUE_DEPTH.set(stream_depth + overdue_depth, self.docket.labels())
                    SCHEDULE_DEPTH.set(schedule_depth, self.docket.labels())

            except asyncio.CancelledError:  # pragma: no cover
                return
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )
            except Exception:  # pragma: no cover
                logger.exception(
                    "Error sending worker heartbeat",
                    exc_info=True,
                    extra=self._log_context(),
                )

            await asyncio.sleep(self.docket.heartbeat_interval.total_seconds())

    async def _cancellation_listener(self) -> None:
        """Listen for cancellation signals and cancel matching tasks."""
        cancel_pattern = f"{self.docket.name}:cancel:*"
        log_context = self._log_context()

        while True:
            try:
                async with self.docket.redis() as redis:
                    async with redis.pubsub() as pubsub:
                        await pubsub.psubscribe(cancel_pattern)
                        self._cancellation_ready.set()
                        try:
                            async for message in pubsub.listen():
                                if message["type"] == "pmessage":
                                    await self._handle_cancellation(message)
                        finally:
                            await pubsub.punsubscribe(cancel_pattern)
            except asyncio.CancelledError:
                return
            except ConnectionError:
                REDIS_DISRUPTIONS.add(1, self.labels())
                logger.warning(
                    "Redis connection error in cancellation listener, reconnecting...",
                    extra=log_context,
                )
                await asyncio.sleep(1)
            except Exception:
                logger.exception(
                    "Error in cancellation listener",
                    exc_info=True,
                    extra=log_context,
                )
                await asyncio.sleep(1)

    async def _handle_cancellation(self, message: PubSubMessage) -> None:
        """Handle a cancellation message by cancelling the matching task."""
        data = message["data"]
        key: TaskKey = data.decode() if isinstance(data, bytes) else data

        if task := self._tasks_by_key.get(key):
            logger.info(
                "Cancelling running task %r",
                key,
                extra=self._log_context(),
            )
            task.cancel()

    async def _can_start_task(self, redis: Redis, execution: Execution) -> bool:
        """Check if a task can start based on concurrency limits.

        Uses a Redis sorted set to track concurrency slots per task. Each entry
        is keyed by task_key with the timestamp as the score.

        When XAUTOCLAIM reclaims a message (because the original worker stopped
        renewing its lease), execution.redelivered=True signals that slot takeover
        is safe. If the message is NOT a redelivery and a slot already exists,
        we block to prevent duplicate execution.

        Slots are refreshed during lease renewal (in _renew_leases) every
        redelivery_timeout/4. If all slots are full, we scavenge any slot older
        than redelivery_timeout (meaning it hasn't been refreshed and the worker
        must be dead).
        """
        # Check if task has a concurrency limit dependency
        concurrency_limit = get_single_dependency_parameter_of_type(
            execution.function, ConcurrencyLimit
        )

        if not concurrency_limit:
            return True  # No concurrency limit, can always start

        # Get the concurrency key for this task
        try:
            argument_value = execution.get_argument(concurrency_limit.argument_name)
        except KeyError:
            # If argument not found, let the task fail naturally in execution
            return True

        scope = concurrency_limit.scope or self.docket.name
        concurrency_key = (
            f"{scope}:concurrency:{concurrency_limit.argument_name}:{argument_value}"
        )

        # Lua script for atomic concurrency slot management.
        # Slot takeover requires BOTH redelivery (via XAUTOCLAIM) AND stale slot.
        # Slots are kept alive by periodic refresh in _renew_leases.
        # When full, we scavenge any stale slot (older than redelivery_timeout).
        lua_script = """
        local key = KEYS[1]
        local max_concurrent = tonumber(ARGV[1])
        local task_key = ARGV[2]
        local current_time = tonumber(ARGV[3])
        local is_redelivery = tonumber(ARGV[4])
        local stale_threshold = tonumber(ARGV[5])
        local key_ttl = tonumber(ARGV[6])

        -- Check if this task already has a slot (from a previous delivery attempt)
        local slot_time = redis.call('ZSCORE', key, task_key)
        if slot_time then
            slot_time = tonumber(slot_time)
            if is_redelivery == 1 and slot_time <= stale_threshold then
                -- Redelivery AND slot is stale: original worker stopped renewing,
                -- safe to take over the slot.
                redis.call('ZADD', key, current_time, task_key)
                redis.call('EXPIRE', key, key_ttl)
                return 1
            else
                -- Either not a redelivery, or slot is still fresh (original worker
                -- is just slow, not dead). Don't take over.
                return 0
            end
        end

        -- No existing slot for this task - check if we can acquire a new one
        if redis.call('ZCARD', key) < max_concurrent then
            redis.call('ZADD', key, current_time, task_key)
            redis.call('EXPIRE', key, key_ttl)
            return 1
        end

        -- All slots are full. Scavenge any stale slot (not refreshed recently).
        -- Slots are refreshed every redelivery_timeout/4, so anything older than
        -- redelivery_timeout hasn't been refreshed and the worker must be dead.
        local stale_slots = redis.call('ZRANGEBYSCORE', key, 0, stale_threshold, 'LIMIT', 0, 1)
        if #stale_slots > 0 then
            redis.call('ZREM', key, stale_slots[1])
            redis.call('ZADD', key, current_time, task_key)
            redis.call('EXPIRE', key, key_ttl)
            return 1
        end

        return 0
        """

        current_time = datetime.now(timezone.utc).timestamp()
        stale_threshold = current_time - self.redelivery_timeout.total_seconds()
        key_ttl = max(
            MINIMUM_TTL_SECONDS,
            int(self.redelivery_timeout.total_seconds() * LEASE_RENEWAL_FACTOR),
        )

        result = await redis.eval(  # type: ignore
            lua_script,
            1,
            concurrency_key,
            str(concurrency_limit.max_concurrent),
            execution.key,
            current_time,
            1 if execution.redelivered else 0,
            stale_threshold,
            key_ttl,
        )

        acquired = bool(result)
        if acquired:
            # Track the slot so we can refresh it during lease renewal
            self._concurrency_slots[execution.key] = concurrency_key

        return acquired

    async def _release_concurrency_slot(
        self, redis: Redis, execution: Execution
    ) -> None:
        """Release a concurrency slot when task completes."""
        # Clean up tracking regardless of whether we actually release a slot
        self._concurrency_slots.pop(execution.key, None)

        # Check if task has a concurrency limit dependency
        concurrency_limit = get_single_dependency_parameter_of_type(
            execution.function, ConcurrencyLimit
        )

        if not concurrency_limit:
            return  # No concurrency limit to release

        # Get the concurrency key for this task
        try:
            argument_value = execution.get_argument(concurrency_limit.argument_name)
        except KeyError:
            return  # If argument not found, nothing to release

        scope = concurrency_limit.scope or self.docket.name
        concurrency_key = (
            f"{scope}:concurrency:{concurrency_limit.argument_name}:{argument_value}"
        )

        # Remove this task from the sorted set and delete the key if empty
        lua_script = """
        redis.call('ZREM', KEYS[1], ARGV[1])
        if redis.call('ZCARD', KEYS[1]) == 0 then
            redis.call('DEL', KEYS[1])
        end
        """
        await redis.eval(lua_script, 1, concurrency_key, execution.key)  # type: ignore

run(docket_name='docket', url='redis://localhost:6379/0', name=None, concurrency=10, redelivery_timeout=timedelta(minutes=5), reconnection_delay=timedelta(seconds=5), minimum_check_interval=timedelta(milliseconds=100), scheduling_resolution=timedelta(milliseconds=250), schedule_automatic_tasks=True, enable_internal_instrumentation=False, until_finished=False, healthcheck_port=None, metrics_port=None, tasks=['docket.tasks:standard_tasks'], fallback_task=None) async classmethod

Run a worker as the main entry point (CLI).

This method installs signal handlers for graceful shutdown since it assumes ownership of the event loop. When embedding Docket in another framework (e.g., FastAPI with uvicorn), use Worker.run_forever() or Worker.run_until_finished() directly - those methods do not install signal handlers and rely on the framework to handle shutdown signals.
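
Example (a minimal embedded sketch, not a definitive recipe: it assumes Docket and Worker are importable from the top-level docket package, that Worker's remaining constructor arguments can stay at their defaults, and the FastAPI wiring is illustrative):

import asyncio
from contextlib import asynccontextmanager, suppress

from fastapi import FastAPI

from docket import Docket, Worker

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with Docket(name="docket", url="redis://localhost:6379/0") as docket:
        async with Worker(docket=docket) as worker:
            # No signal handlers here: uvicorn owns the event loop and shutdown
            run_task = asyncio.create_task(worker.run_forever())
            try:
                yield
            finally:
                run_task.cancel()
                with suppress(asyncio.CancelledError):
                    await run_task

app = FastAPI(lifespan=lifespan)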

Source code in src/docket/worker.py
@classmethod
async def run(
    cls,
    docket_name: str = "docket",
    url: str = "redis://localhost:6379/0",
    name: str | None = None,
    concurrency: int = 10,
    redelivery_timeout: timedelta = timedelta(minutes=5),
    reconnection_delay: timedelta = timedelta(seconds=5),
    minimum_check_interval: timedelta = timedelta(milliseconds=100),
    scheduling_resolution: timedelta = timedelta(milliseconds=250),
    schedule_automatic_tasks: bool = True,
    enable_internal_instrumentation: bool = False,
    until_finished: bool = False,
    healthcheck_port: int | None = None,
    metrics_port: int | None = None,
    tasks: list[str] = ["docket.tasks:standard_tasks"],
    fallback_task: str | None = None,
) -> None:
    """Run a worker as the main entry point (CLI).

    This method installs signal handlers for graceful shutdown since it
    assumes ownership of the event loop. When embedding Docket in another
    framework (e.g., FastAPI with uvicorn), use Worker.run_forever() or
    Worker.run_until_finished() directly - those methods do not install
    signal handlers and rely on the framework to handle shutdown signals.
    """
    # Parse fallback_task string if provided (module:function format)
    resolved_fallback_task: TaskFunction | None = None
    if fallback_task:
        module_name, _, member_name = fallback_task.rpartition(":")
        module = importlib.import_module(module_name)
        resolved_fallback_task = getattr(module, member_name)

    with (
        healthcheck_server(port=healthcheck_port),
        metrics_server(port=metrics_port),
    ):
        async with Docket(
            name=docket_name,
            url=url,
            enable_internal_instrumentation=enable_internal_instrumentation,
        ) as docket:
            for task_path in tasks:
                docket.register_collection(task_path)

            async with (
                Worker(  # pragma: no branch - context manager exit varies across interpreters
                    docket=docket,
                    name=name,
                    concurrency=concurrency,
                    redelivery_timeout=redelivery_timeout,
                    reconnection_delay=reconnection_delay,
                    minimum_check_interval=minimum_check_interval,
                    scheduling_resolution=scheduling_resolution,
                    schedule_automatic_tasks=schedule_automatic_tasks,
                    enable_internal_instrumentation=enable_internal_instrumentation,
                    fallback_task=resolved_fallback_task,
                ) as worker
            ):
                # Install signal handlers for graceful shutdown.
                # This is only appropriate when we own the event loop (CLI entry point).
                # Embedded usage should let the framework handle signals.
                loop = asyncio.get_running_loop()
                run_task: asyncio.Task[None] | None = None

                def handle_shutdown(sig_name: str) -> None:  # pragma: no cover
                    logger.info(
                        "Received %s, initiating graceful shutdown...", sig_name
                    )
                    if run_task and not run_task.done():
                        run_task.cancel()

                if hasattr(signal, "SIGTERM"):  # pragma: no cover
                    loop.add_signal_handler(
                        signal.SIGTERM, lambda: handle_shutdown("SIGTERM")
                    )
                    loop.add_signal_handler(
                        signal.SIGINT, lambda: handle_shutdown("SIGINT")
                    )

                try:
                    if until_finished:
                        run_task = asyncio.create_task(worker.run_until_finished())
                    else:
                        run_task = asyncio.create_task(
                            worker.run_forever()
                        )  # pragma: no cover
                    await run_task
                except asyncio.CancelledError:  # pragma: no cover
                    pass
                finally:
                    if hasattr(signal, "SIGTERM"):  # pragma: no cover
                        loop.remove_signal_handler(signal.SIGTERM)
                        loop.remove_signal_handler(signal.SIGINT)

run_at_most(iterations_by_key) async

Run the worker until there are no more tasks to process, but limit specified task keys to a maximum number of iterations.

This is particularly useful for testing self-perpetuating tasks that would otherwise run indefinitely.

Parameters:

    iterations_by_key (Mapping[str, int], required):
        Maps task keys to their maximum allowed executions
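
Example (a sketch: the heartbeat task, its key, and the Perpetual constructor arguments shown here are assumptions for illustration):

from datetime import timedelta

from docket import Docket, Perpetual, Worker

async def heartbeat(perpetual: Perpetual = Perpetual(every=timedelta(seconds=5))) -> None:
    ...

async def test_heartbeat_runs_three_times() -> None:
    async with Docket(name="test-docket", url="redis://localhost:6379/0") as docket:
        await docket.add(heartbeat, key="heartbeat")()
        async with Worker(docket=docket) as worker:
            # "heartbeat" is struck after its third execution, so this call returns
            await worker.run_at_most({"heartbeat": 3})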
Source code in src/docket/worker.py
async def run_at_most(self, iterations_by_key: Mapping[str, int]) -> None:
    """
    Run the worker until there are no more tasks to process, but limit specified
    task keys to a maximum number of iterations.

    This is particularly useful for testing self-perpetuating tasks that would
    otherwise run indefinitely.

    Args:
        iterations_by_key: Maps task keys to their maximum allowed executions
    """
    self._execution_counts = {key: 0 for key in iterations_by_key}

    def has_reached_max_iterations(execution: Execution) -> bool:
        key = execution.key

        if key not in iterations_by_key:
            return False

        if self._execution_counts[key] >= iterations_by_key[key]:
            return True

        return False

    self.docket.strike_list.add_condition(has_reached_max_iterations)
    try:
        await self.run_until_finished()
    finally:
        self.docket.strike_list.remove_condition(has_reached_max_iterations)
        self._execution_counts = {}

run_forever() async

Run the worker indefinitely.

Source code in src/docket/worker.py
async def run_forever(self) -> None:
    """Run the worker indefinitely."""
    return await self._run(forever=True)  # pragma: no cover

run_until_finished() async

Run the worker until there are no more tasks to process.
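
Example (a sketch; process_order and the URL are illustrative names, not part of this API): schedule some work, then drain it:

from docket import Docket, Worker

async def process_order(order_id: int) -> None:
    ...

async def main() -> None:
    async with Docket(name="docket", url="redis://localhost:6379/0") as docket:
        await docket.add(process_order)(order_id=123)
        async with Worker(docket=docket) as worker:
            # Returns once the stream and schedule are both drained
            await worker.run_until_finished()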

Source code in src/docket/worker.py
async def run_until_finished(self) -> None:
    """Run the worker until there are no more tasks to process."""
    return await self._run(forever=False)

CurrentDocket()

A dependency to access the current Docket.

Example:

@task
async def my_task(docket: Docket = CurrentDocket()) -> None:
    assert isinstance(docket, Docket)
Source code in src/docket/dependencies.py
def CurrentDocket() -> Docket:
    """A dependency to access the current Docket.

    Example:

    ```python
    @task
    async def my_task(docket: Docket = CurrentDocket()) -> None:
        assert isinstance(docket, Docket)
    ```
    """
    return cast(Docket, _CurrentDocket())

CurrentExecution()

A dependency to access the current Execution.

Example:

@task
async def my_task(execution: Execution = CurrentExecution()) -> None:
    assert isinstance(execution, Execution)
Source code in src/docket/dependencies.py
def CurrentExecution() -> Execution:
    """A dependency to access the current Execution.

    Example:

    ```python
    @task
    async def my_task(execution: Execution = CurrentExecution()) -> None:
        assert isinstance(execution, Execution)
    ```
    """
    return cast(Execution, _CurrentExecution())

CurrentWorker()

A dependency to access the current Worker.

Example:

@task
async def my_task(worker: Worker = CurrentWorker()) -> None:
    assert isinstance(worker, Worker)
Source code in src/docket/dependencies.py
def CurrentWorker() -> "Worker":
    """A dependency to access the current Worker.

    Example:

    ```python
    @task
    async def my_task(worker: Worker = CurrentWorker()) -> None:
        assert isinstance(worker, Worker)
    ```
    """
    return cast("Worker", _CurrentWorker())

Depends(dependency)

Include a user-defined function as a dependency. Dependencies may be:

- Synchronous functions returning a value
- Asynchronous functions returning a value (awaitable)
- Synchronous context managers (using @contextmanager)
- Asynchronous context managers (using @asynccontextmanager)

If a dependency returns a context manager, it will be entered and exited around the task, giving an opportunity to control the lifetime of a resource.

Important: Synchronous dependencies should NOT include blocking I/O operations (file access, network calls, database queries, etc.). Use async dependencies for any I/O. Sync dependencies are best for:

- Pure computations
- In-memory data structure access
- Configuration lookups from memory
- Non-blocking transformations

Examples:

# Sync dependency - pure computation, no I/O
def get_config() -> dict:
    # Access in-memory config, no I/O
    return {"api_url": "https://api.example.com", "timeout": 30}

# Sync dependency - compute value from arguments
def build_query_params(
    user_id: int = TaskArgument(),
    config: dict = Depends(get_config)
) -> dict:
    # Pure computation, no I/O
    return {"user_id": user_id, "timeout": config["timeout"]}

# Async dependency - I/O operations
async def get_user(user_id: int = TaskArgument()) -> User:
    # Network I/O - must be async
    return await fetch_user_from_api(user_id)

# Async context manager - I/O resource management
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db_connection():
    # I/O operations - must be async
    conn = await db.connect()
    try:
        yield conn
    finally:
        await conn.close()

@task
async def my_task(
    params: dict = Depends(build_query_params),
    user: User = Depends(get_user),
    db: Connection = Depends(get_db_connection),
) -> None:
    await db.execute("UPDATE users SET ...", params)
Source code in src/docket/dependencies.py
def Depends(dependency: DependencyFunction[R]) -> R:
    """Include a user-defined function as a dependency.  Dependencies may be:
    - Synchronous functions returning a value
    - Asynchronous functions returning a value (awaitable)
    - Synchronous context managers (using @contextmanager)
    - Asynchronous context managers (using @asynccontextmanager)

    If a dependency returns a context manager, it will be entered and exited around
    the task, giving an opportunity to control the lifetime of a resource.

    **Important**: Synchronous dependencies should NOT include blocking I/O operations
    (file access, network calls, database queries, etc.). Use async dependencies for
    any I/O. Sync dependencies are best for:
    - Pure computations
    - In-memory data structure access
    - Configuration lookups from memory
    - Non-blocking transformations

    Examples:

    ```python
    # Sync dependency - pure computation, no I/O
    def get_config() -> dict:
        # Access in-memory config, no I/O
        return {"api_url": "https://api.example.com", "timeout": 30}

    # Sync dependency - compute value from arguments
    def build_query_params(
        user_id: int = TaskArgument(),
        config: dict = Depends(get_config)
    ) -> dict:
        # Pure computation, no I/O
        return {"user_id": user_id, "timeout": config["timeout"]}

    # Async dependency - I/O operations
    async def get_user(user_id: int = TaskArgument()) -> User:
        # Network I/O - must be async
        return await fetch_user_from_api(user_id)

    # Async context manager - I/O resource management
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def get_db_connection():
        # I/O operations - must be async
        conn = await db.connect()
        try:
            yield conn
        finally:
            await conn.close()

    @task
    async def my_task(
        params: dict = Depends(build_query_params),
        user: User = Depends(get_user),
        db: Connection = Depends(get_db_connection),
    ) -> None:
        await db.execute("UPDATE users SET ...", params)
    ```
    """
    return cast(R, _Depends(dependency))

TaskArgument(parameter=None, optional=False)

A dependency to access an argument of the currently executing task. This is often useful in dependency functions so they can access the arguments of the task they are injected into.

Example:

async def customer_name(customer_id: int = TaskArgument()) -> str:
    ...look up the customer's name by ID...
    return "John Doe"

@task
async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
    print(f"Hello, {name}!")
Source code in src/docket/dependencies.py
def TaskArgument(parameter: str | None = None, optional: bool = False) -> Any:
    """A dependency to access a argument of the currently executing task.  This is
    often useful in dependency functions so they can access the arguments of the
    task they are injected into.

    Example:

    ```python
    async def customer_name(customer_id: int = TaskArgument()) -> str:
        ...look up the customer's name by ID...
        return "John Doe"

    @task
    async def greet_customer(customer_id: int, name: str = Depends(customer_name)) -> None:
        print(f"Hello, {name}!")
    ```
    """
    return cast(Any, _TaskArgument(parameter, optional))

TaskKey()

A dependency to access the key of the currently executing task.

Example:

@task
async def my_task(key: str = TaskKey()) -> None:
    assert isinstance(key, str)
Source code in src/docket/dependencies.py
def TaskKey() -> str:
    """A dependency to access the key of the currently executing task.

    Example:

    ```python
    @task
    async def my_task(key: str = TaskKey()) -> None:
        assert isinstance(key, str)
    ```
    """
    return cast(str, _TaskKey())

TaskLogger()

A dependency to access a logger for the currently executing task. The logger will automatically inject contextual information such as the worker and docket name, the task key, and the current execution attempt number.

Example:

@task
async def my_task(logger: "LoggerAdapter[Logger]" = TaskLogger()) -> None:
    logger.info("Hello, world!")
Source code in src/docket/dependencies.py
def TaskLogger() -> "logging.LoggerAdapter[logging.Logger]":
    """A dependency to access a logger for the currently executing task.  The logger
    will automatically inject contextual information such as the worker and docket
    name, the task key, and the current execution attempt number.

    Example:

    ```python
    @task
    async def my_task(logger: "LoggerAdapter[Logger]" = TaskLogger()) -> None:
        logger.info("Hello, world!")
    ```
    """
    return cast("logging.LoggerAdapter[logging.Logger]", _TaskLogger())