module documentation

Undocumented

Function build_call_tool_activity Return the per-server {server}-call-tool activity for registration.
Function build_list_tools_activity Return the per-server {server}-list-tools activity for registration.
Async Function get_connection Return the cached session for server, opening one lazily if needed.
Class _CallToolArgs Undocumented
Class _ConnectionRecord A single MCP session held open by a dedicated owner task.
Class _MCPToolInfo Undocumented
Async Function _evict_connection Undocumented
Async Function _paginate_list_tools Undocumented
Function _tool_infos Apply the client's tool filters and project to serializable records.
Constant _CONNECTIONS Undocumented
Constant _MCP_CONNECTION_IDLE Undocumented
def build_call_tool_activity(server: str, client_factory: Callable[[], MCPClient], idle_timeout: timedelta | None = None) -> Callable: (source)

Return the per-server {server}-call-tool activity for registration.

Reuses a worker-process MCP session opened lazily through client_factory. Idle connections are disconnected after idle_timeout (defaults to _MCP_CONNECTION_IDLE).

def build_list_tools_activity(server: str, client_factory: Callable[[], MCPClient], idle_timeout: timedelta | None = None) -> Callable: (source)

Return the per-server {server}-list-tools activity for registration.

Lists the server's tools (applying the client's tool filters) and reuses the same lazily-opened, idle-evicted worker-process MCP session as {server}-call-tool.

async def get_connection(server: str, client_factory: Callable[[], MCPClient], idle_timeout: timedelta) -> tuple[MCPClient, ClientSession, _ConnectionRecord]: (source)

Return the cached session for server, opening one lazily if needed.

Concurrent first-callers dedupe onto a single connect handshake by awaiting the same record. The returned record is acquired; the caller must release() it once the call completes so idle eviction can resume.

async def _evict_connection(server: str): (source)

Undocumented

async def _paginate_list_tools(session: ClientSession) -> list[Tool]: (source)

Undocumented

def _tool_infos(client: MCPClient, tools: Sequence[Tool]) -> list[_MCPToolInfo]: (source)

Apply the client's tool filters and project to serializable records.

Undocumented

Value
{}
_MCP_CONNECTION_IDLE = (source)

Undocumented

Value
timedelta(minutes=5)