class documentation

Interceptor for workers.

This should be extended by any worker interceptors.

Method intercept_activity Method called for intercepting an activity.
Method workflow_interceptor_class Class that will be instantiated and used to intercept workflows.

Method called for intercepting an activity.

Parameters
next:ActivityInboundInterceptorThe underlying inbound interceptor this interceptor should delegate to.
Returns
ActivityInboundInterceptorThe new interceptor that will be used to for the activity.
def workflow_interceptor_class(self, input: WorkflowInterceptorClassInput) -> type[WorkflowInboundInterceptor] | None: (source)

Class that will be instantiated and used to intercept workflows.

This method is called on workflow start. The class must have the same init as WorkflowInboundInterceptor.__init__. The input can be altered to do things like add additional extern functions.

Parameters
input:WorkflowInterceptorClassInputInput to this method that contains mutable properties that can be altered by this interceptor.
Returns
type[WorkflowInboundInterceptor] | NoneThe class to construct to intercept each workflow.