class WorkflowStreamClient

Client for publishing to and subscribing from a workflow stream.

Warning

This class is experimental and may change in future versions.

Create via create (explicit client + workflow id), via from_within_activity (both inferred from the current activity context), or by passing a handle directly to the constructor.

For publishing, bind a typed topic handle and use the client as an async context manager to get automatic batching:

client = WorkflowStreamClient.create(temporal_client, workflow_id)
events = client.topic("events", type=MyEvent)
async with client:
    events.publish(my_event)
    events.publish(another_event, force_flush=True)
    ...  # more publishing
# Buffer is flushed automatically on context manager exit.

For subscribing:

client = WorkflowStreamClient.create(temporal_client, workflow_id)
async for item in client.subscribe(["events"], result_type=MyEvent):
    process(item.data)

Class Method create Create a stream client from a Temporal client and workflow ID.
Class Method from_within_activity Create a stream client targeting the current activity's parent workflow.
Async Method __aenter__ Start the background flusher task.
Async Method __aexit__ Stop the flusher and flush any remaining buffered entries.
Method __init__ Create a stream client from a workflow handle.
Async Method flush Flush buffered (and pending) items and wait for server confirmation.
Async Method get_offset Query the current global offset (base_offset + log length).
Async Method subscribe Async iterator that polls for new items.
Method topic Return a typed handle for publishing to and subscribing from name.
Method _encode_buffer Convert buffered (topic, value) pairs to wire entries.
Async Method _flush Send buffered or pending messages to the workflow via signal.
Async Method _follow_continue_as_new Check if the workflow continued-as-new and re-target the handle.
Method _payload_converter Return the sync payload converter for per-item encode/decode.
Method _publish_to_topic Internal publish path used by TopicHandle.
Async Method _run_flusher Background task: wait for timer OR force_flush wakeup, then flush.
Async Method _workflow_in_terminal_state Return True if the workflow has reached a terminal state.
Instance Variable _batch_interval Interval between automatic flushes.
Instance Variable _buffer Buffered (topic, value) pairs awaiting the next flush.
Instance Variable _client Optional Temporal client; supplies the payload converter and enables continue-as-new following.
Instance Variable _flush_event Event that wakes the background flusher for a forced flush.
Instance Variable _flush_lock Lock serializing signal sends across flush paths.
Instance Variable _flush_task Background flusher task started by __aenter__.
Instance Variable _handle Handle to the workflow hosting the stream.
Instance Variable _max_batch_size Buffer size that triggers an automatic flush, if set.
Instance Variable _max_retry_duration Maximum time to retry a failed flush before raising TimeoutError.
Instance Variable _pending Encoded batch kept for retry after a failed flush.
Instance Variable _pending_seq Sequence number of the pending batch.
Instance Variable _pending_since Time at which the pending batch first failed to send, or None.
Instance Variable _publisher_id Unique publisher identifier used for exactly-once delivery.
Instance Variable _sequence Confirmed batch sequence counter.
Instance Variable _topic_types Mapping of topic name to its bound value type.
Instance Variable _workflow_id ID of the workflow hosting the stream.
def create(cls, client: Client, workflow_id: str, *, batch_interval: timedelta = timedelta(seconds=2), max_batch_size: int | None = None, max_retry_duration: timedelta = timedelta(seconds=600)) -> WorkflowStreamClient: (source)

Create a stream client from a Temporal client and workflow ID.

Use this when the caller has an explicit Client and workflow_id in hand (starters, BFFs, other workflows' activities). For code running inside an activity that targets its own parent workflow, see from_within_activity.

A client created through this method follows continue-as-new chains in subscribe() and uses the client's payload converter for per-item Payload construction.

Parameters
client: Client
    Temporal client.
workflow_id: str
    ID of the workflow hosting the stream.
batch_interval: timedelta
    Interval between automatic flushes.
max_batch_size: int | None
    Auto-flush when the buffer reaches this size.
max_retry_duration: timedelta
    Maximum time to retry a failed flush before raising TimeoutError. Default: 10 minutes.
Returns
WorkflowStreamClient
    A new client targeting workflow_id.
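
For example, a sketch tuning the batching knobs (the import path and workflow id are assumptions, not part of this API):

from datetime import timedelta

from temporalio.client import Client

from workflow_stream import WorkflowStreamClient  # import path is an assumption

async def publish_metrics(temporal_client: Client) -> None:
    # Flush every 500 ms or as soon as 100 items are buffered; give up on a
    # failed batch after 5 minutes (keep this under the workflow's
    # publisher_ttl; see __init__).
    client = WorkflowStreamClient.create(
        temporal_client,
        "metrics-stream-wf",  # hypothetical workflow id
        batch_interval=timedelta(milliseconds=500),
        max_batch_size=100,
        max_retry_duration=timedelta(minutes=5),
    )
    metrics = client.topic("metrics")  # no bound type: accepts any value
    async with client:
        metrics.publish({"cpu": 0.42})
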
def from_within_activity(cls, *, batch_interval: timedelta = timedelta(seconds=2), max_batch_size: int | None = None, max_retry_duration: timedelta = timedelta(seconds=600)) -> WorkflowStreamClient: (source)

Create a stream client targeting the current activity's parent workflow.

Must be called from within an activity that was scheduled by a workflow. The Temporal client and parent workflow id are taken from the activity context.

Standalone activities — those started directly via temporalio.client.Client.start_activity rather than from a workflow — have no parent workflow, so this method raises. Use create from a standalone activity, passing activity.client() and the target workflow id explicitly (typically threaded through the activity's input).

Parameters
batch_interval: timedelta
    Interval between automatic flushes.
max_batch_size: int | None
    Auto-flush when the buffer reaches this size.
max_retry_duration: timedelta
    Maximum time to retry a failed flush before raising TimeoutError. Default: 10 minutes.
Returns
WorkflowStreamClient
    A new client targeting the activity's parent workflow.
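
A sketch of the intended call site, inside an activity scheduled by the stream-hosting workflow (the import path, activity name, and topic are assumptions):

from temporalio import activity

from workflow_stream import WorkflowStreamClient  # import path is an assumption

@activity.defn
async def report_progress(total: int) -> None:
    # Client and parent workflow id are inferred from the activity context;
    # this raises if the activity was started standalone.
    client = WorkflowStreamClient.from_within_activity()
    progress = client.topic("progress")
    async with client:
        for i in range(total):
            activity.heartbeat(i)
            progress.publish({"done": i, "total": total})
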
async def __aenter__(self) -> Self: (source)

Start the background flusher task.

async def __aexit__(self, *_exc: object): (source)

Stop the flusher and flush any remaining buffered entries.

def __init__(self, handle: WorkflowHandle[Any, Any], *, client: Client | None = None, batch_interval: timedelta = timedelta(seconds=2), max_batch_size: int | None = None, max_retry_duration: timedelta = timedelta(seconds=600)): (source)

Create a stream client from a workflow handle.

Prefer create — it enables continue-as-new following in subscribe() and supplies the Client needed to reach the data converter chain.

Parameters
handle: WorkflowHandle[Any, Any]
    Workflow handle to the workflow hosting the stream.
client: Client | None
    Temporal client whose payload converter will be used to turn published values into Payload objects and to decode subscriptions when result_type is set. The codec chain is not applied per item (doing so would double-encrypt; see module docstring). If None, the default payload converter is used.
batch_interval: timedelta
    Interval between automatic flushes.
max_batch_size: int | None
    Auto-flush when the buffer reaches this size.
max_retry_duration: timedelta
    Maximum time to retry a failed flush before raising TimeoutError. Must be less than the workflow's publisher_ttl (default 15 minutes) to preserve exactly-once delivery. Default: 10 minutes.
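
If a handle is already in hand, direct construction looks like this (a sketch; the workflow id is hypothetical, and passing client= keeps payload conversion consistent):

from temporalio.client import Client

from workflow_stream import WorkflowStreamClient  # import path is an assumption

async def attach(temporal_client: Client) -> WorkflowStreamClient:
    # Prefer create(); handle-based clients cannot follow continue-as-new.
    handle = temporal_client.get_workflow_handle("stream-wf")
    return WorkflowStreamClient(handle, client=temporal_client)
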
async def flush(self): (source)

Flush buffered (and pending) items and wait for server confirmation.

Returns once the items buffered at call time have been signaled to the workflow and acknowledged by the server. Returns immediately if there is nothing to send.

This is in addition to the declarative force_flush=True on TopicHandle.publish and to the automatic flush on context-manager exit. Use this when you need a synchronization point — proof that prior publications have reached the server — at a moment that does not naturally correspond to a specific event.

Safe to call concurrently with topic-handle publishes and with the background flusher: the flush lock serializes signal sends. Items added concurrently after entry may piggyback on this flush or be deferred to a subsequent one.

Raises
TimeoutError
    If a pending batch from a prior failure cannot be sent within max_retry_duration. The pending batch is dropped; subsequent publications use a fresh sequence.
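
Continuing the publishing example above, a sketch of flush as a barrier between two phases (client and events as bound earlier; the batch names are hypothetical):

async def two_phase(batch_one: list, batch_two: list) -> None:
    async with client:
        for record in batch_one:
            events.publish(record)
        # Synchronization point: returns only once everything published
        # above has been signaled to the workflow and acknowledged.
        await client.flush()
        for record in batch_two:
            events.publish(record)
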
async def get_offset(self) -> int: (source)

Query the current global offset (base_offset + log length).
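
One use is snapshotting a starting point before triggering work, then reading only items appended afterwards (a sketch continuing the subscribing example above; trigger_work is hypothetical):

start = await client.get_offset()
await trigger_work()  # hypothetical: causes the workflow to publish
async for item in client.subscribe(["events"], from_offset=start, result_type=MyEvent):
    process(item.data)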

@overload
async def subscribe(self, topics: str | list[str] | None = ..., from_offset: int = ..., *, result_type: type[T], poll_cooldown: timedelta = ...) -> AsyncIterator[WorkflowStreamItem[T]]:
@overload
async def subscribe(self, topics: str | list[str] | None = ..., from_offset: int = ..., *, result_type: None = None, poll_cooldown: timedelta = ...) -> AsyncIterator[WorkflowStreamItem[Any]]:
(source)

Async iterator that polls for new items.

Automatically follows continue-as-new chains when the client was created via create.

Parameters
topics: str | list[str] | None
    Topic filter. A single topic name, a list of topic names, or None. None or an empty list means all topics.
from_offset: int
    Global offset to start reading from.
result_type: type | None
    Optional target type. Each yielded WorkflowStreamItem has its data decoded via the client's sync payload converter. When omitted, the converter's default Any decoding is used (for the stock JSON converter that means a Python primitive, dict, or list). Pass result_type=temporalio.common.RawValue for an opaque RawValue wrapping the original Payload, useful for heterogeneous topics where the caller dispatches on Payload.metadata or wants to forward the bytes without decoding.
poll_cooldown: timedelta
    Minimum interval between polls, to avoid overwhelming the workflow when items arrive faster than the poll round trip. Defaults to 100 ms.
Returns
AsyncIterator[WorkflowStreamItem[Any]]
    Async iterator over matching items.
Yields
WorkflowStreamItem for each matching item.
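
For the RawValue escape hatch described under result_type, a sketch of forwarding items without decoding (forward_bytes is hypothetical; the metadata key follows Temporal's payload conventions):

from temporalio.common import RawValue

from workflow_stream import WorkflowStreamClient  # import path is an assumption

async def forward_all(client: WorkflowStreamClient) -> None:
    # None selects all topics; RawValue skips per-item decoding entirely.
    async for item in client.subscribe(None, result_type=RawValue):
        payload = item.data.payload  # the original Payload, metadata intact
        encoding = payload.metadata.get("encoding", b"").decode()
        forward_bytes(encoding, payload.data)  # hypothetical sink
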
@overload
def topic(self, name: str) -> TopicHandle[Any]:
@overload
def topic(self, name: str, *, type: type[T]) -> TopicHandle[T]:
(source)

Return a typed handle for publishing to and subscribing from name.

The handle records the topic name and value type so call sites do not have to repeat them. Each WorkflowStreamClient instance binds a topic name to exactly one type: a second call with an unequal type raises RuntimeError. Repeating the same call with the same type is idempotent and returns an equivalent handle.

Type uniformity is checked only on this client instance — it does not coordinate across processes. The check uses Python equality on the type object; subtype and union-superset relationships are not recognized.

Omitting type (or passing type=typing.Any) is the documented escape hatch for heterogeneous topics or dynamic-topic forwarders: the handle accepts any value, and subscribers receive the converter's default decoded value. Pre-built Payload values can be passed to TopicHandle.publish regardless of the bound type (zero-copy fast path) — there is no need to bind the topic to Payload itself, and doing so would break the subscribe path (use result_type=RawValue on WorkflowStreamClient.subscribe if you need raw payloads on a subscriber).

Parameters
name: str
    Topic name.
type: type[T] | None
    Value type bound to this handle. Used as the result_type when subscribing through the handle. Defaults to typing.Any (heterogeneous topic).
Returns
TopicHandle[T] | TopicHandle[Any]
    TopicHandle bound to name and the resolved type.
Raises
RuntimeError
    If name is already bound on this client to a different type.
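
A sketch of the binding rules above (the event classes are hypothetical; client as created earlier):

from dataclasses import dataclass

@dataclass
class MyEvent:  # hypothetical value type
    name: str

@dataclass
class OtherEvent:  # hypothetical, structurally similar but a distinct type
    name: str

events = client.topic("events", type=MyEvent)        # binds "events" to MyEvent
events_again = client.topic("events", type=MyEvent)  # idempotent: equivalent handle
audit = client.topic("audit")                        # no type: accepts any value
client.topic("events", type=OtherEvent)              # raises RuntimeError
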
def _encode_buffer(self, entries: list[tuple[str, Any]]) -> list[PublishEntry]: (source)

Convert buffered (topic, value) pairs to wire entries.

Non-Payload values go through the sync payload converter so that the resulting Payload carries encoding metadata for result_type decoding on the consumer side. Pre-built Payloads bypass conversion.

async def _flush(self): (source)

Send buffered or pending messages to the workflow via signal.

On failure, the pending batch and its sequence number are kept for retry; the confirmed sequence advances only on success.

async def _follow_continue_as_new(self) -> bool: (source)

Check if the workflow continued-as-new and re-target the handle.

Returns True if the handle was updated (caller should retry).

def _payload_converter(self) -> PayloadConverter: (source)

Return the sync payload converter for per-item encode/decode.

Uses the configured client's payload converter when available; otherwise falls back to the default. The codec chain (e.g. encryption, compression) is intentionally not invoked here — it runs once at the envelope level when the signal/update goes over the wire. See module docstring.

def _publish_to_topic(self, topic: str, value: Any, *, force_flush: bool = False): (source)

Internal publish path used by TopicHandle.

Not part of the public API — call TopicHandle.publish instead.

async def _run_flusher(self): (source)

Background task: wait for timer OR force_flush wakeup, then flush.

async def _workflow_in_terminal_state(self) -> bool: (source)

Return True if the workflow has reached a terminal state.

Used by subscribe() to distinguish "workflow finished — stream is done" from "wrong workflow id" when a poll RPC returns NOT_FOUND.

_batch_interval = (source)

Interval between automatic flushes.

_buffer: list[tuple[str, Any]] = (source)

Buffered (topic, value) pairs awaiting the next flush.

_client: Client | None = (source)

Optional Temporal client; supplies the payload converter and enables continue-as-new following.

_flush_event = (source)

Event that wakes the background flusher for a forced flush.

_flush_lock = (source)

Lock serializing signal sends across flush paths.

_flush_task = (source)

Background flusher task started by __aenter__.

_handle: WorkflowHandle[Any, Any] = (source)

Handle to the workflow hosting the stream.

_max_batch_size = (source)

Buffer size that triggers an automatic flush, if set.

_max_retry_duration = (source)

Maximum time to retry a failed flush before raising TimeoutError.

_pending = (source)

Encoded batch kept for retry after a failed flush.

_pending_seq: int = (source)

Sequence number of the pending batch.

_pending_since: float | None = (source)

Time at which the pending batch first failed to send, or None.

_publisher_id: str = (source)

Unique publisher identifier; paired with sequence numbers for exactly-once delivery.

_sequence: int = (source)

Confirmed batch sequence counter; advances only after a successful flush.

_topic_types: dict[str, type[Any]] = (source)

Mapping of topic name to the value type bound via topic().

_workflow_id = (source)

ID of the workflow hosting the stream.