MCP Integration
The AG-Kit Python MCP integration provides seamless bidirectional conversion between AG-Kit’s tool system and the Model Context Protocol (MCP). This allows you to:
Connect to standard MCP servers and use their tools as AG-Kit BaseTool instances
Expose AG-Kit tools as MCP servers for external clients to consume
Support multiple transport protocols, such as stdio, HTTP, SSE, and in-memory
Overview
The MCP integration consists of several key components:
MCPClientTool: Wraps external MCP tools to work within AG-Kit
MCPToolkit: High-level toolkit for managing MCP tools
AGKitMCPServer: Exposes AG-Kit tools as a standard MCP server
MCPClientManager: Manages connections to multiple MCP servers
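To make the wrapping idea behind MCPClientTool concrete, here is a minimal, library-free sketch. It does not use AG-Kit: FakeMCPClient, WrappedTool, and SketchResult are hypothetical stand-ins, and the real classes may behave differently.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for an AG-Kit tool result."""
    success: bool
    data: Any = None
    error: Optional[str] = None


class FakeMCPClient:
    """Hypothetical MCP client that knows a single 'add' tool."""

    async def call_tool(self, name: str, args: dict) -> Any:
        if name != "add":
            raise KeyError(f"unknown tool: {name}")
        return args["a"] + args["b"]


class WrappedTool:
    """Adapts one remote tool to a local invoke() interface."""

    def __init__(self, client: FakeMCPClient, tool_name: str) -> None:
        self.client = client
        self.name = tool_name

    async def invoke(self, input_data: dict) -> SketchResult:
        try:
            data = await self.client.call_tool(self.name, input_data)
            return SketchResult(success=True, data=data)
        except Exception as error:  # report failures as results, not exceptions
            return SketchResult(success=False, error=str(error))


def run_demo() -> tuple:
    client = FakeMCPClient()
    ok = asyncio.run(WrappedTool(client, "add").invoke({"a": 5, "b": 3}))
    bad = asyncio.run(WrappedTool(client, "missing").invoke({}))
    return ok, bad
```

The wrapper's only job in this sketch is to translate a remote call into a local result object, which is why a failed call comes back as a result with success set to False rather than as an exception.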
Quick Start
from ag_kit_py.tools.mcp import MCPClientManager, MCPClientTool
# Create a client manager
client_manager = MCPClientManager()
# Connect to an external MCP server
await client_manager.add_server('math-server', {
    'name': 'math-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'python',
        'args': ['-m', 'math_mcp_server']
    }
})
# Create AG-Kit tools from MCP tools
client_tools = client_manager.create_client_tools('math-server')
# Use the tools in your AG-Kit application
for tool in client_tools:
    result = await tool.invoke({'a': 5, 'b': 3})
    print(result.data)
from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyCustomTool
# Create your AG-Kit tools
calculator_tool = MyCustomTool()
# Create MCP server
server = AGKitMCPServer({
    'name': 'ag-kit-mcp-server',
    'version': '1.0.0',
    'description': 'AG-Kit tools exposed via MCP'
})
# Register tools
server.register_tool(calculator_tool)
# Start server with stdio transport
await server.run({'type': 'stdio'})
Transport Protocols
Stdio Transport
The most common transport for MCP servers, using standard input/output.
Server Configuration
# Start server with stdio
await server.run({'type': 'stdio'})
Client Configuration
await client_manager.add_server('my-server', {
    'name': 'my-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'python',
        'args': ['-m', 'my_mcp_server'],
        'timeout': 10000
    }
})
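The MCP stdio transport exchanges newline-delimited JSON-RPC messages over the child process's standard input and output. The sketch below shows that framing with a hypothetical echo process in place of a real MCP server. It does not use AG-Kit, and the timeout parameter (in seconds here) is an assumption made for illustration.

```python
import asyncio
import json
import sys

# Hypothetical child process: reads one JSON-RPC request per line and
# answers with a result that echoes the requested method name.
CHILD_SOURCE = r"""
import json, sys
for line in sys.stdin:
    request = json.loads(line)
    response = {"jsonrpc": "2.0", "id": request["id"],
                "result": {"echo": request["method"]}}
    sys.stdout.write(json.dumps(response) + "\n")
    sys.stdout.flush()
"""


async def stdio_round_trip(method: str, timeout_s: float = 10.0) -> dict:
    """Spawn the child, send one request line, and read one response line."""
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_SOURCE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    request = {"jsonrpc": "2.0", "id": 1, "method": method}
    process.stdin.write((json.dumps(request) + "\n").encode())
    await process.stdin.drain()
    try:
        line = await asyncio.wait_for(process.stdout.readline(), timeout_s)
    finally:
        process.stdin.close()  # EOF ends the child's read loop
        await process.wait()
    return json.loads(line)


def run_demo() -> dict:
    return asyncio.run(stdio_round_trip("tools/list"))
```

In this sketch the command and args from the transport configuration correspond to the arguments passed to create_subprocess_exec, and the timeout bounds how long the client waits for a response line.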
HTTP Transport (StreamableHTTP)
For web-based MCP servers and clients.
Server Configuration
import uuid
from fastapi import FastAPI, Request
from ag_kit_py.tools.mcp import AGKitMCPServer
app = FastAPI()
server = AGKitMCPServer({
    'name': 'http-server',
    'version': '1.0.0'
})
async def http_setup(server_instance, create_transport):
    # Register the MCP endpoint on the FastAPI app
    @app.post('/mcp')
    async def handle_mcp(request: Request):
        transport = await create_transport({
            'enable_json_response': True,
            'session_id_generator': lambda: str(uuid.uuid4())
        })
        body = await request.json()
        return await transport.handle_request(request, body)
await server.run({
    'type': 'streamableHttp',
    'streamable_http_setup': http_setup
})
# Start FastAPI server
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=3000)
Client Configuration
await client_manager.add_server('http-server', {
    'name': 'http-client',
    'version': '1.0.0',
    'transport': {
        'type': 'streamableHttp',
        'url': 'http://localhost:3000/mcp',
        'timeout': 15000
    }
})
SSE Transport
Server-Sent Events transport for real-time communication.
Server Configuration
from fastapi import FastAPI, Request
from ag_kit_py.tools.mcp import AGKitMCPServer
app = FastAPI()
transport = None  # Set when a client opens the SSE stream
async def sse_setup(server_instance, create_transport_func):
    @app.get('/mcp/sse')
    async def handle_sse(request: Request):
        # `transport` is a module-level variable, hence `global`
        global transport
        transport = await create_transport_func('/mcp/sse', request, {
            'enable_dns_rebinding_protection': False
        })
    @app.post('/mcp/sse')
    async def handle_post(request: Request):
        if transport:
            body = await request.json()
            return await transport.handle_post_message(request, body)
        else:
            return {'error': 'No active SSE connection'}
# `server` is an AGKitMCPServer instance, created as in the HTTP example
await server.run({
    'type': 'sse',
    'sse_setup': sse_setup
})
Client Configuration
await client_manager.add_server('sse-server', {
    'name': 'sse-client',
    'version': '1.0.0',
    'transport': {
        'type': 'sse',
        'url': 'http://localhost:3000/mcp/sse'
    }
})
In-Memory Transport
For testing and same-process communication.
Server Configuration
memory_id = 'my-memory-server'
await server.run({
    'type': 'memory',
    'memory_id': memory_id
})
Client Configuration
await client_manager.add_server('memory-server', {
    'name': 'memory-client',
    'version': '1.0.0',
    'transport': {
        'type': 'memory',
        'memory_id': 'my-memory-server'
    }
})
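The client and server above find each other through the shared memory_id. One way such a pairing could work is a registry of queue pairs keyed by that identifier, as in the sketch below. MemoryHub and its methods are hypothetical and are not part of AG-Kit; the actual in-memory transport may be implemented differently.

```python
import asyncio


class MemoryHub:
    """Hypothetical registry that pairs a server and client by memory_id."""

    def __init__(self) -> None:
        self._channels = {}

    def channel(self, memory_id: str) -> tuple:
        # Both sides get the same (requests, responses) pair for a given id.
        if memory_id not in self._channels:
            self._channels[memory_id] = (asyncio.Queue(), asyncio.Queue())
        return self._channels[memory_id]


async def serve_once(hub: MemoryHub, memory_id: str) -> None:
    """Answer a single request by upper-casing its payload."""
    requests, responses = hub.channel(memory_id)
    message = await requests.get()
    await responses.put(message.upper())


async def request(hub: MemoryHub, memory_id: str, message: str) -> str:
    """Send one message and wait for the reply on the same channel."""
    requests, responses = hub.channel(memory_id)
    await requests.put(message)
    return await responses.get()


async def demo() -> str:
    hub = MemoryHub()
    server_task = asyncio.create_task(serve_once(hub, "my-memory-server"))
    reply = await request(hub, "my-memory-server", "ping")
    await server_task
    return reply


def run_demo() -> str:
    return asyncio.run(demo())
```

In this sketch, a client that used a different memory_id would get its own empty channel and wait indefinitely, which is why the two configurations must use the same value.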
Core Examples
Example 1: Creating and Running an MCP Server
Expose AG-Kit tools as a standard MCP server:
from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyExistingTool
# Create MCP server instance
server = AGKitMCPServer({
    'name': 'my-agkit-server',
    'version': '1.0.0',
    'description': 'AG-Kit tools exposed via MCP'
})
# Register existing AG-Kit tools
my_tool = MyExistingTool()
server.register_tool(my_tool)
# Register multiple tools with shared configuration
server.register_tools([tool1, tool2, tool3], {
    'name_prefix': 'agkit_'  # All tools will be prefixed with 'agkit_'
})
# Start server with different transports
# Option 1: Stdio (most common)
await server.run({'type': 'stdio'})
# Option 2: HTTP
await server.run({
    'type': 'streamableHttp',
    'streamable_http_setup': async_http_setup_function
})
# Option 3: Memory (for testing)
await server.run({'type': 'memory', 'memory_id': 'test-server'})
Example 2: Connecting to External MCP Servers
Use external MCP servers as AG-Kit tools:
from ag_kit_py.tools.mcp import MCPClientManager
# Create client manager
client_manager = MCPClientManager()
# Connect to different types of MCP servers
# Stdio server
await client_manager.add_server('filesystem-server', {
    'name': 'fs-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'npx',
        'args': ['@modelcontextprotocol/server-filesystem', './workspace']
    }
})
# HTTP server
await client_manager.add_server('api-server', {
    'name': 'api-client',
    'version': '1.0.0',
    'transport': {
        'type': 'streamableHttp',
        'url': 'https://api.example.com/mcp'
    }
})
# Create AG-Kit tools from MCP tools
all_client_tools = client_manager.create_client_tools()
# Create specific tool with custom name
specific_tool = client_manager.create_client_tool(
    'filesystem-server',
    'read_file',
    'custom_file_reader'
)
# Use the tools in AG-Kit
result = await specific_tool.invoke({'path': 'README.md'})
print(result.data)
Example 3: Using MCPToolkit
Use the high-level toolkit to manage multiple MCP connections:
from ag_kit_py.tools.mcp import MCPToolkit, create_mcp_toolkit
# Method 1: Create toolkit and add servers dynamically
toolkit = MCPToolkit('my-toolkit')
await toolkit.add_server('filesystem', {
    'name': 'filesystem',
    'version': '1.0.0',
    'transport': {'type': 'stdio', 'command': 'fs-server'}
})
await toolkit.add_server('database', {
    'name': 'db-client',
    'version': '1.0.0',
    'transport': {'type': 'streamableHttp', 'url': 'http://db-server/mcp'}
})
# Method 2: Create toolkit with pre-configured servers
toolkit2 = await create_mcp_toolkit([
    {
        'id': 'filesystem',
        'config': {
            'name': 'fs-client',
            'version': '1.0.0',
            'transport': {'type': 'stdio', 'command': 'fs-server'}
        }
    },
    {
        'id': 'database',
        'config': {
            'name': 'db-client',
            'version': '1.0.0',
            'transport': {'type': 'streamableHttp', 'url': 'http://db-server/mcp'}
        }
    }
], 'multi-server-toolkit')
# Access tools by server
fs_tools = toolkit.get_server_tools('filesystem')
db_tools = toolkit.get_server_tools('database')
# Access all tools
all_tools = toolkit.get_tools()
# Refresh connections
await toolkit.refresh()
# Cleanup when done
await toolkit.destroy()
Example 4: Memory Transport for Testing
Use memory transport for testing MCP integrations:
from ag_kit_py.tools.mcp import AGKitMCPServer, MCPClientManager
import asyncio
# Test setup
memory_id = 'test-memory-transport'
server_info = {
    'name': 'test-server',
    'version': '1.0.0'
}
# Create server (MyTestTool is a placeholder for any AG-Kit BaseTool subclass)
server = AGKitMCPServer(server_info)
server.register_tool(MyTestTool())
# Start server in background
server_task = asyncio.create_task(server.run({'type': 'memory', 'memory_id': memory_id}))
await asyncio.sleep(0.1)  # Give server time to start
# Create client
client_manager = MCPClientManager()
await client_manager.add_server(server_info['name'], {
    'name': 'test-client',
    'version': '1.0.0',
    'transport': {'type': 'memory', 'memory_id': memory_id}
})
# Test tool execution
tools = client_manager.create_client_tools()
test_tool = tools[0]
result = await test_tool.invoke({'test': 'data'})
# Cleanup
await client_manager.disconnect_all()
await server.stop()
server_task.cancel()
try:
    await server_task
except asyncio.CancelledError:
    pass
Example 5: Advanced Connection Management
Handle connection options and error scenarios:
import asyncio
from ag_kit_py.tools.mcp import MCPClientManager
client_manager = MCPClientManager()
# Add server with connection options
await client_manager.add_server('reliable-server', {
    'name': 'reliable-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'external-server',
        'timeout': 10000
    }
}, {
    'auto_reconnect': True,
    'reconnect_delay': 5000,
    'max_reconnect_attempts': 3,
    'heartbeat_interval': 30000
})
# Monitor connection status
while True:
    is_connected = client_manager.is_server_connected('reliable-server')
    print(f'Server connected: {is_connected}')
    await asyncio.sleep(5)
Example 6: Event Handling and Monitoring
Monitor MCP operations with event listeners:
from ag_kit_py.tools.mcp import AGKitMCPServer, MCPClientManager
# Server event monitoring
server = AGKitMCPServer({
    'name': 'monitored-server',
    'version': '1.0.0'
})
def server_event_handler(event):
    event_type = event.type
    if event_type == 'connected':
        print(f"Client connected: {event.data.get('transport')}")
    elif event_type == 'tool_called':
        print(f"Tool {event.data.get('tool_name')} called")
    elif event_type == 'tool_result':
        print(f"Tool returned: {event.data.get('result')}")
    elif event_type == 'error':
        print(f"Server error: {event.data.get('error')}")
server.add_event_listener(server_event_handler)
# Client event monitoring
client_manager = MCPClientManager()
def client_event_handler(event):
    event_type = event.type
    if event_type == 'connected':
        print(f"Connected to server: {event.data.get('server_id')}")
    elif event_type == 'tool_discovered':
        print(f"Discovered tool: {event.data.get('tool_name')}")
    elif event_type == 'disconnected':
        print(f"Disconnected: {event.data.get('reason', 'Unknown')}")
    elif event_type == 'error':
        print(f"Client error: {event.data.get('error')}")
client_manager.add_event_listener(client_event_handler)
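The listener pattern used above can be sketched without AG-Kit. In the following, SketchEvent and SketchEmitter are hypothetical stand-ins that assume events carry a type string and a data dict, as the handlers above do. The listener uses a dispatch table instead of an if/elif chain, which is one way to keep handlers manageable as event types grow.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class SketchEvent:
    """Hypothetical event shape: a type string plus a data dict."""
    type: str
    data: Dict[str, Any] = field(default_factory=dict)


class SketchEmitter:
    """Hypothetical emitter that calls every registered listener in order."""

    def __init__(self) -> None:
        self._listeners: List[Callable[[SketchEvent], None]] = []

    def add_event_listener(self, listener: Callable[[SketchEvent], None]) -> None:
        self._listeners.append(listener)

    def emit(self, event_type: str, **data: Any) -> None:
        event = SketchEvent(event_type, data)
        for listener in self._listeners:
            listener(event)


def make_logger(log: List[str]) -> Callable[[SketchEvent], None]:
    """Build a listener that routes events through a dispatch table."""
    handlers = {
        "connected": lambda e: f"connected via {e.data.get('transport')}",
        "tool_called": lambda e: f"tool {e.data.get('tool_name')} called",
        "error": lambda e: f"error: {e.data.get('error')}",
    }

    def listener(event: SketchEvent) -> None:
        handler = handlers.get(event.type)
        if handler is not None:  # unknown event types are ignored
            log.append(handler(event))

    return listener


def run_demo() -> List[str]:
    log: List[str] = []
    emitter = SketchEmitter()
    emitter.add_event_listener(make_logger(log))
    emitter.emit("connected", transport="stdio")
    emitter.emit("tool_called", tool_name="add")
    emitter.emit("heartbeat")  # no handler registered, so nothing is logged
    return log
```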
Advanced Configuration
Customize how AG-Kit tools are exposed via MCP:
# Register tool with custom configuration
server.register_tool(calculator_tool, {
    'name_prefix': 'math_',  # Prefix tool name
    'description': 'Custom calculator'  # Override description
})
# Register multiple tools with shared config
server.register_tools([tool1, tool2, tool3], {
    'name_prefix': 'utils_'
})
Customize MCP client tools:
import time
from ag_kit_py.tools.mcp import MCPClientTool
from ag_kit_py.tools import ToolResult
def transform_input(input_data):
    return {**input_data, 'timestamp': time.time()}
def transform_output(output_data):
    return {'result': output_data, 'processed': True}
def error_handler(error):
    return ToolResult(
        success=False,
        error=f'Custom error: {str(error)}'
    )
# mcp_client and tool_metadata are placeholders for a connected client
# and the metadata of one of its tools
client_tool = MCPClientTool(
    mcp_client=mcp_client,
    mcp_tool_metadata=tool_metadata,
    config={
        'name': 'custom_tool_name',  # Custom AG-Kit name
        'timeout': 30000,  # Call timeout
        'retries': 3  # Retry attempts
    },
    adapter_config={
        'include_metadata': True,  # Include execution metadata
        'transform_input': transform_input,
        'transform_output': transform_output,
        'error_handler': error_handler
    }
)
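The adapter hooks suggest a pipeline around each call. The sketch below shows one plausible ordering: transform the input, make the call, transform the output, and route any exception to the error handler. The ordering is an assumption, and call_with_adapters and fake_remote_add are hypothetical names that are not part of AG-Kit.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_with_adapters(
    call: Callable[[dict], Awaitable[Any]],
    input_data: dict,
    transform_input: Optional[Callable[[dict], dict]] = None,
    transform_output: Optional[Callable[[Any], Any]] = None,
    error_handler: Optional[Callable[[Exception], Any]] = None,
) -> Any:
    """Apply the hooks in an assumed order: input, call, output, error."""
    try:
        if transform_input is not None:
            input_data = transform_input(input_data)
        output = await call(input_data)
        if transform_output is not None:
            output = transform_output(output)
        return output
    except Exception as error:
        if error_handler is None:
            raise  # without a handler, the caller sees the original exception
        return error_handler(error)


async def fake_remote_add(args: dict) -> int:
    """Hypothetical remote tool; raises KeyError when an operand is missing."""
    return args["a"] + args["b"]


def run_demo() -> tuple:
    ok = asyncio.run(call_with_adapters(
        fake_remote_add,
        {"a": 5},
        transform_input=lambda d: {**d, "b": 3},
        transform_output=lambda out: {"result": out, "processed": True},
    ))
    failed = asyncio.run(call_with_adapters(
        fake_remote_add,
        {"a": 5},
        error_handler=lambda err: {"success": False, "error": f"Custom error: {err!r}"},
    ))
    return ok, failed
```

In this sketch the error handler also receives exceptions raised by the transform functions, because they run inside the same try block.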
Connection Management
Advanced connection options:
await client_manager.add_server('reliable-server', config, {
    'auto_reconnect': True,  # Auto-reconnect on failure
    'reconnect_delay': 5000,  # Delay between reconnect attempts
    'max_reconnect_attempts': 5,  # Maximum reconnect attempts
    'heartbeat_interval': 30000  # Heartbeat interval
})
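To illustrate how reconnect_delay and max_reconnect_attempts might interact, here is a minimal retry loop. It is a sketch and not AG-Kit's implementation: it assumes the delay is in milliseconds and that the initial connection does not count as a reconnect attempt, and connect_with_retries and make_flaky_connect are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retries(
    connect: Callable[[], Awaitable[None]],
    reconnect_delay_ms: int = 5000,
    max_reconnect_attempts: int = 5,
) -> int:
    """Try once, then retry up to max_reconnect_attempts times.

    Returns the number of reconnect attempts that were needed.
    The delay is assumed to be in milliseconds.
    """
    attempt = 0
    while True:
        try:
            await connect()
            return attempt
        except ConnectionError:
            if attempt >= max_reconnect_attempts:
                raise  # give up and surface the last failure
            attempt += 1
            await asyncio.sleep(reconnect_delay_ms / 1000)


def make_flaky_connect(failures: int) -> Callable[[], Awaitable[None]]:
    """Hypothetical connect() that fails a fixed number of times first."""
    remaining = {"count": failures}

    async def connect() -> None:
        if remaining["count"] > 0:
            remaining["count"] -= 1
            raise ConnectionError("server not ready")

    return connect


def run_demo() -> int:
    return asyncio.run(connect_with_retries(
        make_flaky_connect(failures=2),
        reconnect_delay_ms=10,
        max_reconnect_attempts=5,
    ))
```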
Event Handling
Monitor MCP operations:
# Server events
def handle_server_event(event):
    if event.type == 'connected':
        print("Client connected")
    elif event.type == 'tool_called':
        print(f"Tool called: {event.data['tool_name']}")
    elif event.type == 'error':
        print(f"Error: {event.data['error']}")
server.add_event_listener(handle_server_event)
# Client events
def handle_client_event(event):
    if event.type == 'connected':
        print("Connected to server")
    elif event.type == 'tool_discovered':
        print(f"Tool discovered: {event.data['tool_name']}")
    elif event.type == 'disconnected':
        print("Disconnected")
client_manager.add_event_listener(handle_client_event)
Schema Conversion
AG-Kit automatically converts between Pydantic models and MCP JSON schemas:
Pydantic to MCP Schema
from typing import Any
from pydantic import BaseModel, Field
from ag_kit_py.tools.mcp.utils import pydantic_to_json_schema
class UserInput(BaseModel):
    name: str = Field(description='User name')
    age: int = Field(ge=0, le=150)
    email: str | None = None
    tags: list[str]
    preferences: dict[str, Any]
mcp_schema = pydantic_to_json_schema(UserInput)
# Results in MCP-compatible JSON schema
MCP to Pydantic Schema
# MCPClientTool automatically handles MCP schemas
mcp_tool_metadata = {
    'name': 'user_tool',
    'inputSchema': {
        'type': 'object',
        'properties': {
            'name': {'type': 'string'},
            'age': {'type': 'number'}
        },
        'required': ['name', 'age']
    }
}
client_tool = MCPClientTool(client, mcp_tool_metadata)
# Tool now has schema for AG-Kit compatibility
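To show what an inputSchema like the one above constrains, the following sketch checks a payload against its required list and top-level type fields using only the standard library. It is a deliberately small subset of JSON Schema, not the validation AG-Kit or Pydantic performs, and check_input is a hypothetical helper.

```python
# JSON Schema type names mapped to the Python types they accept.
_JSON_TYPES = {
    "string": (str,),
    "number": (int, float),
    "integer": (int,),
    "boolean": (bool,),
    "object": (dict,),
    "array": (list,),
}


def check_input(schema: dict, payload: dict) -> list:
    """Return a list of problems; an empty list means the payload passes."""
    problems = []
    for name in schema.get("required", []):
        if name not in payload:
            problems.append(f"missing required field: {name}")
    for name, rules in schema.get("properties", {}).items():
        json_type = rules.get("type")
        if name not in payload or json_type not in _JSON_TYPES:
            continue  # absent fields and untyped properties are not checked
        value = payload[name]
        # bool is a subclass of int in Python, but JSON treats it separately.
        bool_mismatch = isinstance(value, bool) and json_type != "boolean"
        if bool_mismatch or not isinstance(value, _JSON_TYPES[json_type]):
            problems.append(f"{name}: expected {json_type}")
    return problems


INPUT_SCHEMA = {
    "type": "object",
    "properties": {"name": {"type": "string"}, "age": {"type": "number"}},
    "required": ["name", "age"],
}
```

For example, check_input(INPUT_SCHEMA, {"name": "Ada"}) reports the missing age field, while a payload with both fields of the right types yields an empty list.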
Error Handling
Server Error Handling
from ag_kit_py.tools import BaseTool, ToolResult
from ag_kit_py.tools.mcp import AGKitMCPServer
class SafeTool(BaseTool):
    async def _invoke(self, input_data):
        try:
            # Tool logic here; this placeholder echoes the input
            result = input_data
            return ToolResult(success=True, data=result)
        except Exception as error:
            return ToolResult(
                success=False,
                error=f'Tool execution failed: {str(error)}'
            )
server = AGKitMCPServer({
    'name': 'error-handling-server',
    'version': '1.0.0',
    'error_handling': 'return_error',  # or 'throw'
})
Client Error Handling
def custom_error_handler(error):
    if 'timeout' in str(error):
        return ToolResult(
            success=False,
            error='Operation timed out, please try again'
        )
    return ToolResult(
        success=False,
        error=f'MCP tool error: {str(error)}'
    )
client_tool = MCPClientTool(
    mcp_client=client,
    mcp_tool_metadata=metadata,
    config={'retries': 3, 'timeout': 10000},
    adapter_config={'error_handler': custom_error_handler}
)
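The retries and timeout options suggest a per-attempt time limit with a bounded number of retries. A minimal sketch of that idea with asyncio.wait_for follows. It assumes the timeout is in milliseconds and that only timeouts are retried; neither assumption is confirmed for AG-Kit, and the function names are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_timeout_and_retries(
    call: Callable[[], Awaitable[Any]],
    timeout_ms: int = 10000,
    retries: int = 3,
) -> Any:
    """Run call() with a per-attempt timeout, retrying on timeout only."""
    last_error: Exception = asyncio.TimeoutError()
    for _ in range(retries + 1):  # one initial attempt plus `retries` retries
        try:
            return await asyncio.wait_for(call(), timeout_ms / 1000)
        except asyncio.TimeoutError as error:
            last_error = error
    raise last_error


def make_slow_then_fast(slow_calls: int) -> Callable[[], Awaitable[str]]:
    """Hypothetical tool call that is too slow a fixed number of times."""
    state = {"slow_left": slow_calls}

    async def call() -> str:
        if state["slow_left"] > 0:
            state["slow_left"] -= 1
            await asyncio.sleep(1)  # longer than the demo timeout
        return "ok"

    return call


def run_demo() -> str:
    return asyncio.run(call_with_timeout_and_retries(
        make_slow_then_fast(slow_calls=2), timeout_ms=50, retries=3,
    ))
```

If every attempt times out, the sketch raises the last TimeoutError, which a handler like custom_error_handler could then turn into a failed result.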
API Reference
For complete API documentation, including all interfaces, types, and detailed method signatures, see the MCP API Reference.
AGKitMCPServer - Exposes AG-Kit tools as a standard MCP server
register_tool(tool: BaseTool, config: dict = None) -> dict - Register a single tool
register_tools(tools: list[BaseTool], config: dict = None) -> list[dict] - Register multiple tools
unregister_tool(name: str) -> bool - Remove a tool from the server
run(transport_config: dict) -> None - Start the server with specified transport
stop() -> None - Stop the server and cleanup resources
call_tool(name: str, args: dict) -> dict - Execute a tool directly
list_tools() -> list[dict] - Get list of all registered tools
is_server_running() -> bool - Check if server is running
get_stats() -> dict - Get server statistics
MCPClientManager - Manages connections to external MCP servers
add_server(server_id: str, config: dict, options: dict = None) -> None - Connect to an MCP server
disconnect_server(server_id: str) -> None - Disconnect from a specific server
disconnect_all() -> None - Disconnect from all servers
create_client_tools(server_id: str = None) -> list[MCPClientTool] - Create AG-Kit tool wrappers
create_client_tool(server_id: str, tool_name: str, agkit_tool_name: str = None) -> MCPClientTool - Create specific tool wrapper
call_tool(server_id: str, tool_name: str, args: Any) -> Any - Call an MCP tool directly
is_server_connected(server_id: str) -> bool - Check connection status
get_stats() -> dict - Get client manager statistics
MCPClientTool - Wraps external MCP tools for AG-Kit compatibility
invoke(input_data: Any, context: dict = None) -> ToolResult - Execute the MCP tool
get_mcp_metadata() -> dict - Get original MCP tool metadata
is_connected() -> bool - Check if underlying client is connected
update_config(new_config: dict) -> None - Update tool configuration
MCPToolkit - High-level toolkit for managing MCP tools
add_server(server_id: str, config: dict) -> None - Add an MCP server
remove_server(server_id: str) -> None - Remove a server
get_connected_servers() -> list[str] - Get list of connected servers
get_server_tools(server_id: str) -> list[MCPClientTool] - Get tools from specific server
refresh() -> None - Refresh all connections
destroy() -> None - Cleanup all resources
get_client_manager() -> MCPClientManager - Get underlying client manager
See the complete API documentation for detailed interface definitions, type information, utility functions, and advanced configuration options.
Best Practices
Use appropriate transports: stdio for CLI tools, HTTP for web services, memory for testing
Handle errors gracefully: Return failures as ToolResult values and set retries and timeouts on client tools
Monitor connections: Use event listeners to track connection status
Clean up resources: Call disconnect_all(), stop(), or destroy() when done
Test thoroughly: Use the memory transport to test without external processes
Schema validation: Ensure your schemas are compatible between Pydantic and MCP JSON Schema
Connection management: Use connection options such as auto_reconnect and heartbeat_interval for reliable production deployments
Use async/await properly: Run background servers with asyncio.create_task() and handle CancelledError when cancelling them
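The background-task advice can be sketched with plain asyncio. The example assumes a hypothetical serve_forever coroutine in place of a real server and signals readiness with an asyncio.Event; whether AG-Kit's server exposes such a readiness signal is not stated here.

```python
import asyncio


async def serve_forever(started: asyncio.Event, log: list) -> None:
    """Hypothetical long-running server loop."""
    started.set()
    try:
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("server cleaned up")
        raise  # re-raise so the task is marked as cancelled


async def demo() -> list:
    log: list = []
    started = asyncio.Event()
    server_task = asyncio.create_task(serve_forever(started, log))
    await started.wait()  # wait for readiness instead of a fixed sleep
    log.append("client ran")
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        log.append("task cancelled")
    return log


def run_demo() -> list:
    return asyncio.run(demo())
```

Awaiting the task after cancel() lets the server's cleanup code finish before the program moves on, and catching CancelledError at that point keeps the cancellation from propagating into the caller.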
Troubleshooting
Common Issues
Connection timeouts: Increase timeout values in transport configuration
Schema conversion errors: Ensure Pydantic models use supported types
Tool not found: Check tool registration and naming
Transport errors: Verify transport configuration and server availability
Memory leaks: Always clean up connections and event listeners
CancelledError: Handle task cancellation properly in async contexts
Debug Mode
Enable logging for debugging:
server = AGKitMCPServer({
    'name': 'debug-server',
    'version': '1.0.0',
    'log_level': 'debug'  # Enable debug logging
})
Examples Repository
Find complete working examples at:
Framework Adapters: LangChain & LlamaIndex integration
Code Executor: Execute Python code
Bash Tools: Execute shell commands
Filesystem Tools: File system operations