class documentation
class TemporalModel(Model):
Constructor: TemporalModel(model_name, task_queue, schedule_to_close_timeout, schedule_to_start_timeout, ...)
A Strands Model that runs stream() as a Temporal activity.
model_name selects which factory the plugin will invoke worker-side; it must match a key in StrandsPlugin(models={...}). Construction of this TemporalModel itself does no I/O, so it is safe to instantiate at module level.
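The name-to-factory lookup described above can be illustrated with a minimal, self-contained sketch. It does not use the real plugin or `TemporalModel`. `FakeModel`, `MODEL_FACTORIES`, `ModelHandle`, and `resolve` are hypothetical names invented for this illustration, and the dictionary only plays the role that `StrandsPlugin(models={...})` plays on the worker.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class FakeModel:
    """Hypothetical stand-in for a real model client (assumption, not a library class)."""
    model_id: str


# Hypothetical worker-side registry: name -> zero-argument factory.
MODEL_FACTORIES: Dict[str, Callable[[], FakeModel]] = {
    "fast": lambda: FakeModel("small-model"),
    "smart": lambda: FakeModel("large-model"),
}


@dataclass(frozen=True)
class ModelHandle:
    """Hypothetical workflow-side handle: stores only a name and performs no I/O."""
    model_name: str


def resolve(handle: ModelHandle) -> FakeModel:
    """Worker-side lookup; fails loudly when the name was never registered."""
    try:
        factory = MODEL_FACTORIES[handle.model_name]
    except KeyError:
        raise LookupError(
            f"no model factory registered for {handle.model_name!r}"
        ) from None
    return factory()


# Safe at module level: constructing the handle only stores a string.
HANDLE = ModelHandle("fast")
```

Because the handle carries only a string, a mismatch between the handle's name and the registry keys surfaces at lookup time on the worker, not at construction time.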
When streaming_topic is set, each StreamEvent is also published to
the named topic on the workflow's
temporalio.contrib.workflow_streams.WorkflowStream for external
consumers.
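The "also published" behaviour can be pictured as a tee over the event stream: every event still reaches the caller, and a copy goes to a side channel when one is configured. The sketch below is only an illustration of that pattern under stated assumptions. `tee_events`, `fake_events`, and the `publish` callback are hypothetical, and nothing here calls the real `WorkflowStream` API or models the batching interval.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Optional


async def tee_events(
    events: AsyncIterator[Any],
    publish: Optional[Callable[[Any], None]],
) -> AsyncIterator[Any]:
    """Yield every event unchanged; also hand it to `publish` when one is given."""
    async for event in events:
        if publish is not None:
            publish(event)  # side channel for external consumers (hypothetical)
        yield event


async def fake_events() -> AsyncIterator[dict]:
    """Hypothetical event source standing in for a model stream."""
    for text in ("Hel", "lo"):
        yield {"text": text}


async def demo() -> tuple:
    """Collect what the caller sees and what the side channel receives."""
    published: list = []
    seen = [event async for event in tee_events(fake_events(), published.append)]
    return seen, published
```

Passing `None` as `publish` corresponds to leaving the topic unset: the caller's view of the stream is the same either way.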
| Method | __init__ | Configure the model name, activity options, and streaming settings. |
| Method | get | Return an empty config; configuration lives on the worker-side model. |
| Async Method | stream | Run the model via the registered Temporal activity and yield events. |
| Method | structured | Not supported; use TemporalAgent(structured_output_model=...) instead. |
| Method | update | No-op; the real model is configured worker-side via the plugin's factories. |
| Instance Variable | _model | Undocumented |
| Instance Variable | _options | Undocumented |
| Instance Variable | _streaming | Undocumented |
| Instance Variable | _streaming | Undocumented |
def __init__(self, model_name: str | None = None, *, task_queue: str | None = None, schedule_to_close_timeout: timedelta | None = None, schedule_to_start_timeout: timedelta | None = None, start_to_close_timeout: timedelta | None = None, heartbeat_timeout: timedelta | None = None, retry_policy: RetryPolicy | None = None, cancellation_type: ActivityCancellationType = ActivityCancellationType.TRY_CANCEL, versioning_intent: VersioningIntent | None = None, summary: str | None = None, priority: Priority = Priority.default, streaming_topic: str | None = None, streaming_batch_interval: timedelta = timedelta(milliseconds=100)):
Configure the model name, activity options, and streaming settings.
async def stream(self, messages: Messages, tool_specs: list[ToolSpec] | None = None, system_prompt: str | None = None, *, tool_choice: ToolChoice | None = None, system_prompt_content: list[SystemContentBlock] | None = None, invocation_state: dict[str, Any] | None = None, **kwargs: Any) -> AsyncIterable[StreamEvent]:
Run the model via the registered Temporal activity and yield events.
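Since `stream()` returns an `AsyncIterable`, callers consume it with `async for`. The sketch below shows that consumption pattern against a stand-in generator, not a real `TemporalModel`. `fake_stream` and `collect_text` are hypothetical, and the event shape (dictionaries with a `contentBlockDelta` key carrying a text delta) is an assumption made for illustration; check the actual `StreamEvent` type before relying on it.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, Dict


async def fake_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for a model stream; the event shape is assumed."""
    yield {"messageStart": {"role": "assistant"}}
    yield {"contentBlockDelta": {"delta": {"text": "Hel"}}}
    yield {"contentBlockDelta": {"delta": {"text": "lo"}}}
    yield {"messageStop": {"stopReason": "end_turn"}}


async def collect_text(events: AsyncIterable[Dict[str, Any]]) -> str:
    """Concatenate text deltas from the stream, ignoring other event kinds."""
    parts = []
    async for event in events:
        delta = event.get("contentBlockDelta", {}).get("delta", {})
        if "text" in delta:
            parts.append(delta["text"])
    return "".join(parts)
```

A caller would run it as `asyncio.run(collect_text(fake_stream()))`; with a real model, the argument would be the iterable returned by `stream(...)`.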