module documentation

Undocumented

Function build_call_tool_activity Return the per-server {server}-call-tool activity for registration.
Function clear_cache Drop the cached tool list for server.
Async Function get_connection Return the cached session for server, opening one lazily if needed.
Async Function populate_cache Connect to the MCP server, list tools, fill _TOOL_CACHE.
Class _CallToolArgs Undocumented
Class _ConnectionRecord A single MCP session held open by a dedicated owner task.
Class _MCPToolInfo Undocumented
Function _agent_tool_for_filtering Undocumented
Async Function _evict_connection Undocumented
Async Function _list_mcp_tools Undocumented
Constant _CONNECTIONS Undocumented
Constant _MCP_CONNECTION_IDLE Undocumented
Constant _TOOL_CACHE Undocumented
def build_call_tool_activity(server: str, client_factory: Callable[[], MCPClient], idle_timeout: timedelta | None = None) -> Callable: (source)

Return the per-server {server}-call-tool activity for registration.

Reuses a worker-process MCP session opened lazily through client_factory. Idle connections are disconnected after idle_timeout (defaults to _MCP_CONNECTION_IDLE).

def clear_cache(server: str): (source)

Drop the cached tool list for server.

async def get_connection(server: str, client_factory: Callable[[], MCPClient], idle_timeout: timedelta) -> tuple[MCPClient, ClientSession, _ConnectionRecord]: (source)

Return the cached session for server, opening one lazily if needed.

Concurrent first-callers dedupe onto a single connect handshake by awaiting the same record. The returned record is acquired; the caller must release() it once the call completes so idle eviction can resume.

async def populate_cache(server: str, client_factory: Callable[[], MCPClient]): (source)

Connect to the MCP server, list tools, fill _TOOL_CACHE.

def _agent_tool_for_filtering(client: MCPClient, tool: Tool) -> MCPAgentTool: (source)

Undocumented

async def _evict_connection(server: str): (source)

Undocumented

async def _list_mcp_tools(client: MCPClient) -> Sequence[Tool]: (source)

Undocumented

Undocumented

Value
{}
_MCP_CONNECTION_IDLE = (source)

Undocumented

Value
timedelta(minutes=5)

Undocumented

Value
{}