class WorkflowStreamClient:
Constructors: WorkflowStreamClient.create(client, workflow_id, batch_interval, max_batch_size, ...), WorkflowStreamClient.from_within_activity(batch_interval, max_batch_size, max_retry_duration), WorkflowStreamClient(handle, client, batch_interval, max_batch_size, ...)
Client for publishing to and subscribing from a workflow stream.
Warning
This class is experimental and may change in future versions.
Create via create (explicit client + workflow id),
from_within_activity (infer both from the current activity
context), or by passing a handle directly to the constructor.
For publishing, bind a typed topic handle and use the client as an async context manager to get automatic batching:
client = WorkflowStreamClient.create(temporal_client, workflow_id)
events = client.topic("events", type=MyEvent)
async with client:
    events.publish(my_event)
    events.publish(another_event, force_flush=True)
    ...  # more publishing
# Buffer is flushed automatically on context manager exit.
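The buffering behavior above (auto-flush when the buffer reaches max_batch_size, plus a final flush on context-manager exit) can be illustrated with a standalone sketch. The `BatchingPublisher` class below is a hypothetical stand-in, not the real client:

```python
class BatchingPublisher:
    """Toy stand-in illustrating size-triggered batching (not the real client)."""

    def __init__(self, flush_fn, max_batch_size=3):
        self._flush_fn = flush_fn          # called with each flushed batch
        self._max_batch_size = max_batch_size
        self._buffer = []

    def publish(self, value, *, force_flush=False):
        self._buffer.append(value)
        # Auto-flush when the buffer reaches max_batch_size, or on request.
        if force_flush or len(self._buffer) >= self._max_batch_size:
            self.flush()

    def flush(self):
        if self._buffer:
            self._flush_fn(list(self._buffer))
            self._buffer.clear()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Buffer is flushed automatically on context-manager exit.
        self.flush()


batches = []
with BatchingPublisher(batches.append, max_batch_size=2) as pub:
    pub.publish("a")
    pub.publish("b")   # buffer reaches max_batch_size -> auto-flush
    pub.publish("c")   # stays buffered...
# ...and is flushed on exit: batches == [["a", "b"], ["c"]]
```

The real client adds an interval-based flusher task on top of this; the size trigger and exit flush shown here mirror the documented contract.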
For subscribing:
client = WorkflowStreamClient.create(temporal_client, workflow_id)
async for item in client.subscribe(["events"], result_type=MyEvent):
    process(item.data)
| Class Method | create | Create a stream client from a Temporal client and workflow ID. |
| Class Method | from_within_activity | Create a stream client targeting the current activity's parent workflow. |
| Async Method | __aenter__ | Start the background flusher task. |
| Async Method | __aexit__ | Stop the flusher and flush any remaining buffered entries. |
| Method | __init__ | Create a stream client from a workflow handle. |
| Async Method | flush | Flush buffered (and pending) items and wait for server confirmation. |
| Async Method | get | Query the current global offset (base_offset + log length). |
| Async Method | subscribe | Async iterator that polls for new items. |
| Method | topic | Return a typed handle for publishing to and subscribing from name. |
| Method | _encode | Convert buffered (topic, value) pairs to wire entries. |
| Async Method | _flush | Send buffered or pending messages to the workflow via signal. |
| Async Method | _follow | Check if the workflow continued-as-new and re-target the handle. |
| Method | _payload | Return the sync payload converter for per-item encode/decode. |
| Method | _publish | Internal publish path used by TopicHandle. |
| Async Method | _run | Background task: wait for timer OR force_flush wakeup, then flush. |
| Async Method | _workflow | Return True if the workflow has reached a terminal state. |
| Instance Variable | _batch | Undocumented |
| Instance Variable | _buffer | Undocumented |
| Instance Variable | _client | Undocumented |
| Instance Variable | _flush | Undocumented |
| Instance Variable | _flush | Undocumented |
| Instance Variable | _flush | Undocumented |
| Instance Variable | _handle | Undocumented |
| Instance Variable | _max | Undocumented |
| Instance Variable | _max | Undocumented |
| Instance Variable | _pending | Undocumented |
| Instance Variable | _pending | Undocumented |
| Instance Variable | _pending | Undocumented |
| Instance Variable | _publisher | Undocumented |
| Instance Variable | _sequence | Undocumented |
| Instance Variable | _topic | Undocumented |
| Instance Variable | _workflow | Undocumented |
create(client: Client, workflow_id: str, *, batch_interval: timedelta = ..., max_batch_size: int | None = None, max_retry_duration: timedelta = timedelta(minutes=10)) -> WorkflowStreamClient
Create a stream client from a Temporal client and workflow ID.
Use this when the caller has an explicit Client and
workflow_id in hand (starters, BFFs, other workflows'
activities). For code running inside an activity that targets
its own parent workflow, see from_within_activity.
A client created through this method follows continue-as-new chains in subscribe() and uses the client's payload converter for per-item Payload construction.
| Parameters | |
| client: Client | Temporal client. |
| workflow_id: str | ID of the workflow hosting the stream. |
| batch_interval: timedelta | Interval between automatic flushes. |
| max_batch_size: int | None | Auto-flush when buffer reaches this size. |
| max_retry_duration: timedelta | Maximum time to retry a failed flush before raising TimeoutError. Default: 10 minutes. |
| Returns | |
WorkflowStreamClient | Undocumented |
from_within_activity(*, batch_interval: timedelta = ..., max_batch_size: int | None = None, max_retry_duration: timedelta = timedelta(minutes=10)) -> WorkflowStreamClient
Create a stream client targeting the current activity's parent workflow.
Must be called from within an activity that was scheduled by a workflow. The Temporal client and parent workflow id are taken from the activity context.
Standalone activities — those started directly via
temporalio.client.Client.start_activity rather than
from a workflow — have no parent workflow, so this method
raises. Use create from a standalone activity,
passing activity.client() and the target workflow id
explicitly (typically threaded through the activity's input).
| Parameters | |
| batch_interval: timedelta | Interval between automatic flushes. |
| max_batch_size: int | None | Auto-flush when buffer reaches this size. |
| max_retry_duration: timedelta | Maximum time to retry a failed flush before raising TimeoutError. Default: 10 minutes. |
| Returns | |
WorkflowStreamClient | Undocumented |
__init__(handle: WorkflowHandle[Any, Any], *, client: Client | None = None, batch_interval: timedelta = ..., max_batch_size: int | None = None, max_retry_duration: timedelta = timedelta(minutes=10))
Create a stream client from a workflow handle.
Prefer create — it enables continue-as-new following
in subscribe() and supplies the Client needed to
reach the data converter chain.
| Parameters | |
handle: WorkflowHandle[Any, Any] | Workflow handle to the workflow hosting the stream. |
client:Client | None | Temporal client whose payload converter will be used to turn published values into Payload objects and to decode subscriptions when result_type is set. The codec chain is not applied per item (doing so would double-encrypt — see module docstring). If None, the default payload converter is used. |
| batch_interval: timedelta | Interval between automatic flushes. |
| max_batch_size: int | None | Auto-flush when buffer reaches this size. |
| max_retry_duration: timedelta | Maximum time to retry a failed flush before raising TimeoutError. Must be less than the workflow's publisher_ttl (default 15 minutes) to preserve exactly-once delivery. Default: 10 minutes. |
Flush buffered (and pending) items and wait for server confirmation.
Returns once the items buffered at call time have been signaled to the workflow and acknowledged by the server. Returns immediately if there is nothing to send.
This is in addition to the declarative force_flush=True on
TopicHandle.publish and to the automatic flush on
context-manager exit. Use this when you need a synchronization
point — proof that prior publications have reached the
server — at a moment that does not naturally correspond to a
specific event.
Safe to call concurrently with topic-handle publishes and with the background flusher: the flush lock serializes signal sends. Items added concurrently after entry may piggyback on this flush or be deferred to a subsequent one.
| Raises | |
TimeoutError | If a pending batch from a prior failure cannot be sent within max_retry_duration. The pending batch is dropped; subsequent publications use a fresh sequence. |
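The retry behavior described for flush, keep retrying a failed send until max_retry_duration elapses, then raise TimeoutError and drop the pending batch, can be sketched in isolation. `flush_with_retry` below is a hypothetical helper, not the client's actual code:

```python
import time

def flush_with_retry(send, batch, *, max_retry_duration=1.0, backoff=0.01):
    """Retry `send(batch)` until it succeeds or the retry budget is spent.

    On timeout the pending batch is dropped (subsequent publications would
    start a fresh sequence), mirroring the documented TimeoutError semantics.
    """
    deadline = time.monotonic() + max_retry_duration
    while True:
        try:
            send(batch)
            return
        except ConnectionError:
            if time.monotonic() >= deadline:
                batch.clear()  # drop the pending batch
                raise TimeoutError(
                    "pending batch could not be sent within max_retry_duration"
                )
            time.sleep(backoff)


# A send that fails twice and then succeeds:
attempts = []
def flaky_send(batch):
    attempts.append(list(batch))
    if len(attempts) < 3:
        raise ConnectionError

flush_with_retry(flaky_send, ["evt-1", "evt-2"])
assert len(attempts) == 3  # two failures, then one successful send
```

In the real client the send is the signal to the workflow and the lock around it is what makes concurrent flush calls safe; the budget-then-drop shape is the part mirrored here.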
Async iterator that polls for new items.
Automatically follows continue-as-new chains when the client
was created via create.
| Parameters | |
| topics: str | list[str] | None | Topic filter. A single topic name, a list of topic names, or None. None or empty list means all topics. |
| from_offset: int | Global offset to start reading from. |
| result_type: type | None | Optional target type. Each yielded WorkflowStreamItem has its data decoded via the client's sync payload converter. When omitted, the converter's default Any decoding is used (for the stock JSON converter that means a Python primitive, dict, or list). Pass result_type=temporalio.common.RawValue for an opaque RawValue wrapping the original Payload — useful for heterogeneous topics where the caller dispatches on Payload.metadata or wants to forward the bytes without decoding. |
| poll_interval: timedelta | Minimum interval between polls to avoid overwhelming the workflow when items arrive faster than the poll round-trip. Defaults to 100ms. |
| Returns | |
AsyncIterator[WorkflowStreamItem] | Undocumented |
| Yields | |
WorkflowStreamItem for each matching item. |
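The polling loop behind subscribe, read forward from a global offset, yield items matching the topic filter, and wait at least the poll interval between polls, can be sketched against an in-memory log. This is a hypothetical stand-in for the real query/re-target protocol:

```python
import asyncio

async def subscribe(log, topics=None, *, from_offset=0, poll_interval=0.01):
    """Yield (topic, value) items from `log` starting at `from_offset`.

    `log` is a plain list standing in for the workflow's append-only stream;
    `topics=None` means all topics, mirroring the documented filter.
    """
    offset = from_offset
    while True:
        while offset < len(log):
            topic, value = log[offset]
            offset += 1
            if topics is None or topic in topics:
                yield topic, value
        # Minimum interval between polls, per the poll_interval parameter.
        await asyncio.sleep(poll_interval)

async def demo():
    log = [("events", 1), ("metrics", 2), ("events", 3)]
    items = []
    async for item in subscribe(log, ["events"]):
        items.append(item)
        if len(items) == 2:
            break
    return items

print(asyncio.run(demo()))  # [('events', 1), ('events', 3)]
```

The real subscribe additionally re-targets the handle when the workflow continues as new; the offset bookkeeping and topic filtering shown here follow the documented parameter semantics.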
Return a typed handle for publishing to and subscribing from name.
The handle records the topic name and value type so call sites
do not have to repeat them. Each WorkflowStreamClient
instance binds a topic name to exactly one type: a second call
with an unequal type raises RuntimeError. Repeating the
same call with the same type is idempotent and returns an
equivalent handle.
Type uniformity is checked only on this client instance — it does not coordinate across processes. The check uses Python equality on the type object; subtype and union-superset relationships are not recognized.
Omitting type (or passing type=typing.Any) is the
documented escape hatch for heterogeneous topics or
dynamic-topic forwarders: the handle accepts any value, and
subscribers receive the converter's default decoded value.
Pre-built Payload values can be passed to
TopicHandle.publish regardless of the bound type
(zero-copy fast path) — there is no need to bind the topic to
Payload itself, and doing so would break the subscribe
path (use result_type=RawValue on
WorkflowStreamClient.subscribe if you need raw
payloads on a subscriber).
| Parameters | |
name:str | Topic name. |
type: type[T] | Value type bound to this handle. Used as the result_type when subscribing through the handle. Defaults to typing.Any (heterogeneous topic). |
| Returns | |
TopicHandle[T] | TopicHandle bound to name and the resolved type. |
| Raises | |
RuntimeError | If name is already bound on this client to a different type. |
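The binding contract, same name with the same type is idempotent, same name with a different type raises RuntimeError, and the check uses plain equality on the type object, can be shown with a minimal sketch. `TopicRegistry` is a toy registry, not the real TopicHandle machinery:

```python
import typing

class TopicRegistry:
    """Toy per-instance topic -> type registry mirroring the documented check."""

    def __init__(self):
        self._types = {}

    def topic(self, name, type=typing.Any):
        # Equality check on the type object; subtypes are NOT recognized.
        bound = self._types.setdefault(name, type)
        if bound != type:
            raise RuntimeError(
                f"topic {name!r} already bound to {bound!r}, not {type!r}"
            )
        return (name, type)

reg = TopicRegistry()
assert reg.topic("events", int) == reg.topic("events", int)  # idempotent
rebind_failed = False
try:
    reg.topic("events", str)  # different type for the same name
except RuntimeError:
    rebind_failed = True
assert rebind_failed
```

Because the check is per-instance equality, `reg.topic("events", bool)` would also raise even though bool subclasses int, matching the note that subtype relationships are not recognized.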
Convert buffered (topic, value) pairs to wire entries.
Non-Payload values go through the sync payload converter so the resulting Payload carries encoding metadata for result_type= decode on the consumer side. Pre-built Payloads bypass conversion.
Send buffered or pending messages to the workflow via signal.
On failure, the pending batch and sequence are kept for retry. Only advances the confirmed sequence on success.
Check if the workflow continued-as-new and re-target the handle.
Returns True if the handle was updated (caller should retry).
Return the sync payload converter for per-item encode/decode.
Uses the configured client's payload converter when available; otherwise falls back to the default. The codec chain (e.g. encryption, compression) is intentionally not invoked here — it runs once at the envelope level when the signal/update goes over the wire. See module docstring.
Internal publish path used by TopicHandle.
Not part of the public API — call
TopicHandle.publish instead.