Workflow-side stream object — append-only log with publish/poll handlers.
Warning
This class is experimental and may change in future versions.
Construct once from @workflow.init; the constructor registers
the stream signal, update, and query handlers on the current
workflow. Raises RuntimeError if a WorkflowStream has
already been registered on the workflow.
Registered handlers:
- __temporal_workflow_stream_publish signal — external publish with dedup
- __temporal_workflow_stream_poll update — long-poll subscription
- __temporal_workflow_stream_offset query — current log length
Note
Because the publish handler is registered dynamically from __init__, on the activation where the stream is constructed the publish signal can be buffered until after class-level signal/update handlers are scheduled. Define such handlers as async and await asyncio.sleep(0) before reading stream state, so the publish signal is processed first.
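A minimal, Temporal-free sketch of why that single yield matters: a task scheduled before the handler runs (standing in here for the buffered publish signal) only becomes visible to the handler after it awaits `asyncio.sleep(0)` once. All names below are illustrative, not part of the library.

```python
import asyncio

async def main() -> list[str]:
    log: list[str] = []

    async def publish() -> None:
        # Stands in for the buffered publish signal being processed.
        log.append("item")

    async def handler() -> list[str]:
        # Without this yield, the handler would read `log` before the
        # already-scheduled publish task has had a chance to run.
        await asyncio.sleep(0)
        return list(log)

    publish_task = asyncio.ensure_future(publish())  # scheduled first
    seen = await handler()
    await publish_task
    return seen

print(asyncio.run(main()))  # prints ['item']
```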
| Kind | Name | Description |
|---|---|---|
| Method | `__init__` | Initialize stream state and register workflow handlers. |
| Async Method | `continue_as_new` | Detach pollers, wait for handlers, continue-as-new with built args. |
| Method | `detach_pollers` | Release waiting pollers and reject new poll updates. |
| Method | `get_state` | Return a serializable snapshot of stream state for continue-as-new. |
| Method | `topic` | Return a typed handle for publishing to `name` from this workflow. |
| Method | `truncate` | Discard log entries before `up_to_offset`. |
| Method | `_on` | Return the current global offset (`base_offset` + log length). |
| Async Method | `_on` | Long-poll: block until new items are available or detaching, then return. |
| Method | `_on` | Receive publications from external clients (activities, starters). |
| Method | `_publish` | Internal publish path used by `WorkflowTopicHandle`. |
| Method | `_validate` | Reject new polls when pollers are detached for continue-as-new. |
| Instance Variable | `_base` | Undocumented |
| Instance Variable | `_detaching` | Undocumented |
| Instance Variable | `_log` | Undocumented |
| Instance Variable | `_publishers` | Undocumented |
| Instance Variable | `_topic` | Undocumented |
Initialize stream state and register workflow handlers.
Must be called directly from the workflow's @workflow.init
method. Calls made from @workflow.run, helper methods, or
signal/update/query handlers raise RuntimeError.
The check inspects the immediate caller's frame and requires the function name to be __init__.
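The caller-frame rule can be sketched with `sys._getframe`. This is an illustration of the documented check, not the library's actual code; `WorkflowStreamSketch`, `GoodWorkflow`, and `build_from_helper` are stand-in names.

```python
import sys

class WorkflowStreamSketch:
    def __init__(self) -> None:
        # Require the immediate caller to be a function named __init__,
        # mirroring the documented frame check.
        if sys._getframe(1).f_code.co_name != "__init__":
            raise RuntimeError(
                "WorkflowStream must be constructed from @workflow.init"
            )

class GoodWorkflow:
    def __init__(self) -> None:
        self.stream = WorkflowStreamSketch()  # allowed: caller is __init__

def build_from_helper() -> WorkflowStreamSketch:
    return WorkflowStreamSketch()  # raises: caller is not named __init__
```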
Note
When carrying state across continue-as-new, type the carrying field as WorkflowStreamState | None, not Any. The default data converter deserializes Any fields as plain dicts, which silently strips the WorkflowStreamState type and breaks the new run.
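A sketch of the recommended typing, using the `WorkflowInput` shape from the example below; `WorkflowStreamState` here is a stand-in dataclass for the library type.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class WorkflowStreamState:  # stand-in for the library type
    base_offset: int = 0

@dataclass
class WorkflowInput:
    items_processed: int = 0
    # Precisely typed: the data converter can rebuild WorkflowStreamState.
    # An `Any`-typed field would round-trip as a plain dict and the new run
    # would hand WorkflowStream a dict instead of a WorkflowStreamState.
    stream_state: Optional[WorkflowStreamState] = None
```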
| Parameters | |
|---|---|
| `prior: WorkflowStreamState \| None` | State carried from a previous run via `get_state` through continue-as-new, or `None` on first start. |

| Raises | |
|---|---|
| `RuntimeError` | If not called directly from a method named `__init__`, or if the stream signal handler is already registered on this workflow (i.e., `WorkflowStream` was instantiated twice). |
`continue_as_new(build_args: Callable[[WorkflowStreamState], Sequence[Any]], *, publisher_ttl: timedelta = timedelta(...)) -> NoReturn`
Detach pollers, wait for handlers, continue-as-new with built args.
Replaces this three-line recipe for the common case where the only continue-as-new parameter that varies is `args`:

```python
self.stream.detach_pollers()
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(args=...)
```

`build_args` is invoked after pollers have been detached, with the post-detach `WorkflowStreamState` as its single argument. The caller threads that state into whatever input dataclass the workflow expects:

```python
await self.stream.continue_as_new(
    lambda state: [
        WorkflowInput(
            items_processed=self.items_processed,
            stream_state=state,
        )
    ]
)
```
Workflows that need to override other CAN parameters (task_queue, retry_policy, run_timeout, etc.) should keep using the explicit detach_pollers / wait_condition / workflow.continue_as_new(...) recipe.
Does not return; workflow.continue_as_new raises an internal exception that the SDK uses to close the run.
| Parameters | |
|---|---|
| `build_args: Callable[[WorkflowStreamState], Sequence[Any]]` | Callable that receives the post-detach stream state and returns the positional args for the new run. |
| `publisher_ttl: timedelta` | Forwarded to `get_state`. |

| Returns | |
|---|---|
| `NoReturn` | Does not return; the workflow continues as new. |
Release waiting pollers and reject new poll updates.
After this call the stream's __temporal_workflow_stream_poll update handler releases its in-flight subscribers on this run: each waiting poll returns its current item batch (often empty) so the consumer can either follow continue-as-new or stop, and new polls are rejected at the validator. Publishes still land in the in-memory log and get_state / continue_as_new remain valid — the stream is being held open just long enough to snapshot state and hand off to the next run.
Call this before await workflow.wait_condition(workflow.all_handlers_finished) and workflow.continue_as_new().
`get_state(*, publisher_ttl: timedelta = timedelta(minutes=15)) -> WorkflowStreamState`
Return a serializable snapshot of stream state for continue-as-new.
Drops dedup state for publishers idle longer than publisher_ttl. The TTL must exceed the max_retry_duration of any client that may still be retrying a failed flush.
| Parameters | |
|---|---|
| `publisher_ttl: timedelta` | Duration after which an idle publisher's dedup state is dropped. Default 15 minutes. |

| Returns | |
|---|---|
| `WorkflowStreamState` | Snapshot suitable for carrying into the next run via continue-as-new. |
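The TTL rule can be illustrated with plain dictionaries. This is a sketch under assumed state layout (`publisher_id -> (last_sequence, last_seen)`); the real internal representation is not documented.

```python
from datetime import datetime, timedelta

def prune_idle_publishers(
    publishers: dict[str, tuple[int, datetime]],
    now: datetime,
    publisher_ttl: timedelta = timedelta(minutes=15),
) -> dict[str, tuple[int, datetime]]:
    # Keep dedup state only for publishers seen within the TTL window.
    return {
        pid: (seq, seen)
        for pid, (seq, seen) in publishers.items()
        if now - seen <= publisher_ttl
    }

now = datetime(2024, 1, 1, 12, 0)
pubs = {
    "fresh": (5, now - timedelta(minutes=5)),   # kept
    "stale": (2, now - timedelta(minutes=30)),  # dropped: idle past the TTL
}
snapshot = prune_idle_publishers(pubs, now)
```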
Return a typed handle for publishing to name from this workflow.
The handle records the topic name and value type so call sites
do not have to repeat them. Each WorkflowStream
instance binds a topic name to exactly one type: a second call
with an unequal type raises RuntimeError. Repeating the
same call with the same type is idempotent and returns an
equivalent handle.
Type uniformity is checked only on this stream instance — it does not coordinate across publishers (other workflows, activities, external clients). The check uses Python equality on the type object; subtype and union-superset relationships are not recognized.
Omitting type (or passing type=typing.Any) is the
documented escape hatch for heterogeneous topics. Pre-built
Payload values can be passed to
WorkflowTopicHandle.publish regardless of the bound
type (zero-copy fast path) — there is no need to bind the
topic to Payload itself.
| Parameters | |
|---|---|
| `name: str` | Topic name. |
| `type: type[...]` | Value type bound to this handle. Defaults to `typing.Any` (heterogeneous topic). |

| Returns | |
|---|---|
| `WorkflowTopicHandle[...]` | `WorkflowTopicHandle` bound to `name` and the resolved type. |

| Raises | |
|---|---|
| `RuntimeError` | If `name` is already bound on this stream to a different type. |
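The binding rule (idempotent on equal types, `RuntimeError` on unequal ones, plain equality on the type object) can be sketched as follows; `TopicRegistrySketch` is an illustrative stand-in, not the library's implementation.

```python
class TopicRegistrySketch:
    """Sketch of the per-stream name -> type binding rule."""

    def __init__(self) -> None:
        self._bindings: dict[str, type] = {}

    def topic(self, name: str, type: type = object) -> tuple[str, type]:
        # First call binds; later calls must pass an equal type object.
        bound = self._bindings.setdefault(name, type)
        if bound != type:
            # Plain equality: subtype / union-superset are not recognized.
            raise RuntimeError(
                f"topic {name!r} already bound to {bound.__name__}"
            )
        return (name, bound)
```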
Discard log entries before up_to_offset.
After truncation, polls requesting an offset before the new base will receive an ApplicationError. All global offsets remain monotonic.
Raises ApplicationError (not ValueError) when up_to_offset is past the end of the log so that callers invoking this from an update handler surface it as an update failure rather than a workflow-task poison pill.
| Parameters | |
|---|---|
| `up_to_offset: int` | The global offset to truncate up to (exclusive). Entries at offsets `[base_offset, up_to_offset)` are discarded. |
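The offset arithmetic can be sketched as below. `ApplicationError` is a stand-in exception here (the real method raises Temporal's `ApplicationError`), and the `(log, base_offset)` tuple is an assumed state shape for illustration.

```python
class ApplicationError(Exception):
    """Stand-in for temporalio.exceptions.ApplicationError."""

def truncate(log: list, base_offset: int, up_to_offset: int) -> tuple[list, int]:
    # Global offsets stay monotonic: entries in [base_offset, up_to_offset)
    # are discarded and the new base becomes up_to_offset.
    if up_to_offset > base_offset + len(log):
        raise ApplicationError("up_to_offset is past the end of the log")
    if up_to_offset <= base_offset:
        return log, base_offset  # nothing to discard
    return log[up_to_offset - base_offset:], up_to_offset
```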
Receive publications from external clients (activities, starters).
Deduplicates using (publisher_id, sequence). If publisher_id is set and the sequence is <= the last seen sequence for that publisher, the entire batch is dropped as a duplicate. Batches are atomic: the dedup decision applies to the whole batch, not individual items.
This block is a polyfill for missing server-side request_id dedup across continue-as-new. If the SDK ever exposes request_id on signals and the server dedups it across CAN, this branch and the _publishers state become redundant. See DESIGN §"Replace workflow-side dedup with server-side request_id" for the migration plan.
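The batch-level dedup rule described above can be sketched as a pure function; `last_seen` stands in for the per-publisher dedup state and the function signature is illustrative.

```python
from typing import Optional

def accept_batch(
    last_seen: dict[str, int],
    publisher_id: Optional[str],
    sequence: int,
) -> bool:
    """Sketch of (publisher_id, sequence) dedup: True means append the batch."""
    if publisher_id is None:
        return True  # no publisher id: no dedup possible
    prev = last_seen.get(publisher_id)
    if prev is not None and sequence <= prev:
        return False  # whole batch dropped as a duplicate, not item by item
    last_seen[publisher_id] = sequence
    return True
```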
Internal publish path used by WorkflowTopicHandle.
Not part of the public API — call
WorkflowTopicHandle.publish instead.