module documentation

Workflow-side stream object for Workflow Streams.

Instantiate WorkflowStream once from your workflow's @workflow.init method. The constructor registers the stream signal, update, and query handlers on the current workflow via temporalio.workflow.set_signal_handler, temporalio.workflow.set_update_handler, and temporalio.workflow.set_query_handler.

For workflows that support continue-as-new, include a WorkflowStreamState | None field on the workflow input and pass it as prior_state — it is None on fresh starts and carries accumulated state on continue-as-new.

Workflow-side and client-side topic handles (WorkflowTopicHandle.publish and TopicHandle.publish) both use the synchronous payload converter for per-item Payload construction. The codec chain (e.g. encryption, compression) is not run per item on either side — it runs once at the envelope level when Temporal's SDK encodes the signal/update that carries the batch. Running it per item as well would double-encrypt, because every signal arg already goes through the client's DataConverter.encode at dispatch time.

Type Variable T Undocumented
Function _payload_wire_size Approximate poll-response contribution of a single item.
Constant _MAX_POLL_RESPONSE_BYTES Undocumented
Constant _OFFSET_QUERY Undocumented
Constant _POLL_UPDATE Undocumented
Constant _PUBLISH_SIGNAL Undocumented

Undocumented

Value
TypeVar('T')
def _payload_wire_size(payload: Payload, topic: str) -> int: (source)

Approximate poll-response contribution of a single item.

Wire form is _WorkflowStreamWireItem(topic, base64(proto(Payload)), offset). Base64 inflates by ~4/3; we use the serialized length as a conservative approximation.

_MAX_POLL_RESPONSE_BYTES: int = (source)

Undocumented

Value
1000000
_OFFSET_QUERY: str = (source)

Undocumented

Value
'__temporal_workflow_stream_offset'
_POLL_UPDATE: str = (source)

Undocumented

Value
'__temporal_workflow_stream_poll'
_PUBLISH_SIGNAL: str = (source)

Undocumented

Value
'__temporal_workflow_stream_publish'