
class WorkflowStream: (source)

Constructor: WorkflowStream(prior_state)


Workflow-side stream object — append-only log with publish/poll handlers.

Warning

This class is experimental and may change in future versions.

Construct once from @workflow.init; the constructor registers the stream signal, update, and query handlers on the current workflow. Raises RuntimeError if a WorkflowStream has already been registered on the workflow.

Registered handlers:

  • __temporal_workflow_stream_publish signal — external publish with dedup
  • __temporal_workflow_stream_poll update — long-poll subscription
  • __temporal_workflow_stream_offset query — current log length

Note

Because the publish handler is registered dynamically from __init__, on the activation where the stream is constructed the publish signal can be buffered until after class-level signal/update handlers are scheduled. Define such handlers as async and await asyncio.sleep(0) before reading stream state, so the publish signal is processed first.
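The hazard above is ordinary asyncio task scheduling. A minimal plain-asyncio sketch (not Temporal code) of why a single await asyncio.sleep(0) lets a later-scheduled publish task run before the handler reads state:

```python
import asyncio

order = []

async def class_level_handler():
    # Yield once so tasks scheduled after us (the buffered publish
    # signal) are processed before we read stream state.
    await asyncio.sleep(0)
    order.append("read_state")

async def buffered_publish():
    order.append("publish")

async def main():
    # The class-level handler task is created first, the publish task
    # second, mirroring the activation where the stream is constructed.
    handler = asyncio.ensure_future(class_level_handler())
    publish = asyncio.ensure_future(buffered_publish())
    await asyncio.gather(handler, publish)

asyncio.run(main())
```

Without the await asyncio.sleep(0), the handler would append "read_state" before the publish task ever ran.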

Method __init__ Initialize stream state and register workflow handlers.
Async Method continue_as_new Detach pollers, wait for handlers, continue-as-new with built args.
Method detach_pollers Release waiting pollers and reject new poll updates.
Method get_state Return a serializable snapshot of stream state for continue-as-new.
Method topic Return a typed handle for publishing to name from this workflow.
Method truncate Discard log entries before up_to_offset.
Method _on_offset Return the current global offset (base_offset + log length).
Async Method _on_poll Long-poll: block until new items available or detaching, then return.
Method _on_publish Receive publications from external clients (activities, starters).
Method _publish_to_topic Internal publish path used by WorkflowTopicHandle.
Method _validate_poll Reject new polls when pollers are detached for continue-as-new.
Instance Variable _base_offset Undocumented
Instance Variable _detaching Undocumented
Instance Variable _log Undocumented
Instance Variable _publishers Undocumented
Instance Variable _topic_types Undocumented
def __init__(self, prior_state: WorkflowStreamState | None = None): (source)

Initialize stream state and register workflow handlers.

Must be called directly from the workflow's @workflow.init method. Calls made from @workflow.run, helper methods, or signal/update/query handlers raise RuntimeError.

The check inspects the immediate caller's frame and requires the function name to be __init__.

Note

When carrying state across continue-as-new, type the carrying field as WorkflowStreamState | None, not Any. The default data converter deserializes Any fields as plain dicts, which silently strips the WorkflowStreamState type and breaks the new run.

Parameters
prior_state (WorkflowStreamState | None): State carried from a previous run via get_state through continue-as-new, or None on first start.
Raises
RuntimeError: If not called directly from a method named __init__, or if the stream signal handler is already registered on this workflow (i.e., WorkflowStream was instantiated twice).
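The caller-frame check can be pictured with a toy re-creation (hypothetical; the SDK's actual guard may differ in detail):

```python
import sys

class StreamDemo:
    """Toy model of the constructor guard; not the SDK's implementation."""

    def __init__(self):
        # Inspect the immediate caller's frame and require that the
        # calling function is named __init__.
        caller = sys._getframe(1).f_code.co_name
        if caller != "__init__":
            raise RuntimeError(f"construct from __init__, not {caller!r}")

class GoodWorkflow:
    def __init__(self):
        self.stream = StreamDemo()  # passes: caller frame is __init__

class BadWorkflow:
    def run(self):
        return StreamDemo()  # fails: caller frame is named 'run'

good = GoodWorkflow()
try:
    BadWorkflow().run()
    rejected = False
except RuntimeError:
    rejected = True
```

Note that an indirect call through a helper fails the same way, because only the immediate caller's function name is inspected.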
async def continue_as_new(self, build_args: Callable[[WorkflowStreamState], Sequence[Any]], *, publisher_ttl: timedelta = timedelta(seconds=900)) -> NoReturn: (source)

Detach pollers, wait for handlers, continue-as-new with built args.

Replaces this three-line recipe for the common case where the only continue-as-new parameter that varies is args:

self.stream.detach_pollers()
await workflow.wait_condition(workflow.all_handlers_finished)
workflow.continue_as_new(args=...)

build_args is invoked after pollers have been detached, with the post-detach WorkflowStreamState as its single argument. The caller threads that state into whatever input dataclass the workflow expects:

await self.stream.continue_as_new(lambda state: [WorkflowInput(
    items_processed=self.items_processed,
    stream_state=state,
)])

Workflows that need to override other CAN parameters (task_queue, retry_policy, run_timeout, etc.) should keep using the explicit detach_pollers / wait_condition / workflow.continue_as_new(...) recipe.

Does not return; workflow.continue_as_new raises an internal exception that the SDK uses to close the run.

Parameters
build_args (Callable[[WorkflowStreamState], Sequence[Any]]): Callable that receives the post-detach stream state and returns the positional args for the new run.
publisher_ttl (timedelta): Forwarded to get_state.
Returns
NoReturn: Never returns normally; workflow.continue_as_new closes the run by raising an internal exception.
def detach_pollers(self): (source)

Release waiting pollers and reject new poll updates.

After this call, the stream's __temporal_workflow_stream_poll update handler releases its in-flight subscribers on this run: each waiting poll returns its current item batch (often empty) so the consumer can either follow continue-as-new or stop, and new polls are rejected at the validator. Publishes still land in the in-memory log, and get_state / continue_as_new remain valid; the stream is held open just long enough to snapshot state and hand off to the next run.

Call this before await workflow.wait_condition(workflow.all_handlers_finished) and workflow.continue_as_new().

def get_state(self, *, publisher_ttl: timedelta = timedelta(seconds=900)) -> WorkflowStreamState: (source)

Return a serializable snapshot of stream state for continue-as-new.

Drops dedup state for publishers idle longer than publisher_ttl. The TTL must exceed the max_retry_duration of any client that may still be retrying a failed flush.

Parameters
publisher_ttl (timedelta): Duration after which an idle publisher's dedup state is dropped. Default 15 minutes.
Returns
WorkflowStreamState: Snapshot suitable for carrying into the next run through continue-as-new.
@overload
def topic(self, name: str) -> WorkflowTopicHandle[Any]:
@overload
def topic(self, name: str, *, type: type[T]) -> WorkflowTopicHandle[T]:
(source)

Return a typed handle for publishing to name from this workflow.

The handle records the topic name and value type so call sites do not have to repeat them. Each WorkflowStream instance binds a topic name to exactly one type: a second call with an unequal type raises RuntimeError. Repeating the same call with the same type is idempotent and returns an equivalent handle.

Type uniformity is checked only on this stream instance — it does not coordinate across publishers (other workflows, activities, external clients). The check uses Python equality on the type object; subtype and union-superset relationships are not recognized.

Omitting type (or passing type=typing.Any) is the documented escape hatch for heterogeneous topics. Pre-built Payload values can be passed to WorkflowTopicHandle.publish regardless of the bound type (zero-copy fast path) — there is no need to bind the topic to Payload itself.

Parameters
name (str): Topic name.
type (type[T] | None): Value type bound to this handle. Defaults to typing.Any (heterogeneous topic).
Returns
WorkflowTopicHandle[T] | WorkflowTopicHandle[Any]: WorkflowTopicHandle bound to name and the resolved type.
Raises
RuntimeError: If name is already bound on this stream to a different type.
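The binding rules can be modeled with a small sketch (hypothetical registry class; not the SDK's code):

```python
from typing import Any

class TopicRegistry:
    """Toy model of the per-stream name-to-type binding described above."""

    def __init__(self):
        self._topic_types = {}

    def bind(self, name, type=Any):
        bound = self._topic_types.setdefault(name, type)
        # Plain equality on the type object: subtype and union-superset
        # relationships are deliberately not recognized.
        if bound != type:
            raise RuntimeError(
                f"topic {name!r} already bound to {bound!r}, not {type!r}"
            )
        return bound

reg = TopicRegistry()
reg.bind("orders", int)
reg.bind("orders", int)      # idempotent: rebinding the same type is fine
reg.bind("events")           # separate topic defaults to typing.Any
try:
    reg.bind("orders", str)  # unequal type on the same name
    conflict = False
except RuntimeError:
    conflict = True
```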
def truncate(self, up_to_offset: int): (source)

Discard log entries before up_to_offset.

After truncation, polls requesting an offset before the new base will receive an ApplicationError. All global offsets remain monotonic.

Raises ApplicationError (not ValueError) when up_to_offset is past the end of the log so that callers invoking this from an update handler surface it as an update failure rather than a workflow-task poison pill.

Parameters
up_to_offset (int): The global offset to truncate up to (exclusive). Entries at offsets [base_offset, up_to_offset) are discarded.
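The offset bookkeeping can be sketched as follows (hypothetical field names; the real class keeps equivalent state in _base_offset and _log):

```python
class LogDemo:
    """Toy append-only log with a moving base offset; not the SDK's code."""

    def __init__(self):
        self.base_offset = 0
        self.log = []

    def append(self, item):
        self.log.append(item)

    def global_offset(self):
        # Global offsets keep growing across truncations.
        return self.base_offset + len(self.log)

    def truncate(self, up_to_offset):
        # Drop entries at global offsets [base_offset, up_to_offset);
        # the real method raises ApplicationError past the end of the log.
        if up_to_offset > self.global_offset():
            raise ValueError("up_to_offset past end of log")
        del self.log[: max(0, up_to_offset - self.base_offset)]
        self.base_offset = max(self.base_offset, up_to_offset)

log = LogDemo()
for item in ("a", "b", "c", "d"):
    log.append(item)
log.truncate(2)  # discards "a" and "b" (global offsets 0 and 1)
```

After the truncation the base offset is 2 and the global offset is still 4, so offsets remain monotonic.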
def _on_offset(self) -> int: (source)

Return the current global offset (base_offset + log length).

async def _on_poll(self, payload: PollInput) -> PollResult: (source)

Long-poll: block until new items available or detaching, then return.

def _on_publish(self, payload: PublishInput): (source)

Receive publications from external clients (activities, starters).

Deduplicates using (publisher_id, sequence). If publisher_id is set and the sequence is <= the last seen sequence for that publisher, the entire batch is dropped as a duplicate. Batches are atomic: the dedup decision applies to the whole batch, not individual items.

This block is a polyfill for missing server-side request_id dedup across continue-as-new. If the SDK ever exposes request_id on signals and the server dedups it across CAN, this branch and the _publishers state become redundant. See DESIGN §"Replace workflow-side dedup with server-side request_id" for the migration plan.
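The dedup rule reduces to a small decision function (hypothetical helper for illustration; the real handler also appends the accepted batch to the log):

```python
def accept_batch(publishers, publisher_id, sequence):
    # Dedup on (publisher_id, sequence); the decision covers the whole
    # batch, never individual items.
    if publisher_id is None:
        return True  # no dedup key: always accepted
    last = publishers.get(publisher_id)
    if last is not None and sequence <= last:
        return False  # duplicate or stale retry: drop the entire batch
    publishers[publisher_id] = sequence
    return True

seen = {}
first = accept_batch(seen, "client-a", 1)  # accepted
retry = accept_batch(seen, "client-a", 1)  # dropped: same sequence
newer = accept_batch(seen, "client-a", 2)  # accepted
anon = accept_batch(seen, None, 1)         # no publisher_id: accepted
```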

def _publish_to_topic(self, topic: str, value: Any): (source)

Internal publish path used by WorkflowTopicHandle.

Not part of the public API — call WorkflowTopicHandle.publish instead.

def _validate_poll(self, _payload: PollInput): (source)

Reject new polls when pollers are detached for continue-as-new.

_base_offset: int = (source)

Undocumented

_detaching: bool = (source)

Undocumented

_log = (source)

Undocumented

_publishers = (source)

Undocumented

_topic_types: dict[str, type[Any]] = (source)

Undocumented