Documentation Index
Fetch the complete documentation index at: https://docs.ag-kit.dev/llms.txt
Use this file to discover all available pages before exploring further.
LangGraph Agent
The LangGraphAgent class provides a LangGraph-based agent implementation that extends BaseAgent. It includes critical stability patches to fix known issues in the ag-ui-langgraph library, particularly around message regeneration and streaming event handling.
Installation
pip install ag_kit_py langgraph langchain-core
Optional dependencies:
# For OpenAI models
pip install langchain-openai
# For Anthropic models
pip install langchain-anthropic
# For community tools and integrations
pip install langchain-community
Quick Start
Basic Chat Agent
from ag_kit_py.agents import LangGraphAgent
from ag_kit_py.providers import create_provider
from langgraph.graph import StateGraph, MessagesState, START, END
# Create provider and model
provider = create_provider("openai")
model = provider.get_langchain_model()
# Define chat node
def chat_node(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# Build workflow
workflow = StateGraph(MessagesState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
workflow.add_edge(END, "chat")
# Compile graph
graph = workflow.compile()
# Create agent (stability patches applied automatically)
agent = LangGraphAgent(
name="ChatAgent",
description="A conversational assistant",
graph=graph
)
With Server
from ag_kit_py.server import AgentServiceApp
def create_agent():
return {"agent": agent}
AgentServiceApp().run(create_agent, port=8000)
LangGraphAgent Class
Class Definition
from ag_kit_py.agents import LangGraphAgent
from langgraph.graph.state import CompiledStateGraph
class LangGraphAgent(BaseAgent):
"""LangGraph Agent implementation extending BaseAgent.
This class wraps the ag-ui-langgraph LangGraphAgent with stability patches
and provides a consistent interface following the AG-Kit agent pattern.
"""
def __init__(
self,
name: str,
description: str,
graph: CompiledStateGraph
):
"""Initialize the LangGraph agent with stability patches.
Args:
name: Human-readable name for the agent
description: Detailed description of the agent's purpose
graph: Compiled LangGraph state graph defining the agent's workflow
"""
Parameters
- name (
str): Human-readable name for the agent
- description (
str): Detailed description of the agent’s purpose and capabilities
- graph (
CompiledStateGraph): Compiled LangGraph state graph defining the agent’s workflow
Methods
run()
Execute the LangGraph agent with the given input.
async def run(
self,
run_input: RunAgentInput
) -> AsyncGenerator[BaseEvent, None]:
"""Execute the LangGraph agent with the given input.
Args:
run_input: Input data for the agent execution
Yields:
BaseEvent: Events representing the agent's execution progress
"""
Example:
from ag_ui.core import RunAgentInput
run_input = RunAgentInput(
messages=[{"role": "user", "content": "Hello"}],
run_id="run-123",
thread_id="thread-456",
state={},
context=[],
tools=[],
forwarded_props={}
)
async for event in agent.run(run_input):
print(f"Event: {event.type}")
destroy()
Clean up resources used by the agent.
def destroy(self) -> None:
"""Clean up resources used by the agent."""
Stability Patches
AG-Kit Python SDK includes critical stability patches that fix known issues in the ag-ui-langgraph library. These patches are applied automatically when creating a LangGraphAgent instance.
Fixed Issues
- Message Regeneration Logic: Prevents “Message ID not found in history” errors
- Checkpoint Error Handling: Improves error handling for missing checkpoints
- Streaming Event Stability: Enhances streaming event processing reliability
Patch Details
The patches modify the following behaviors:
# 1. Prevents automatic regeneration that causes errors
# Only regenerates when explicitly requested, not on automatic detection
# 2. Enhanced error handling for missing checkpoints
# Gracefully degrades instead of crashing when checkpoints are missing
# 3. Improved streaming event processing
# Better handling of edge cases in streaming responses
Automatic Application
Patches are applied automatically on first agent creation:
from ag_kit_py.agents import LangGraphAgent
# First call automatically applies all patches
agent = LangGraphAgent("test", "Test agent", graph)
# Subsequent agents use the patched methods
agent2 = LangGraphAgent("test2", "Another agent", graph2)
Usage Examples
from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from ag_kit_py.providers import create_provider
# Define tools
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
return f"Search results for: {query}"
@tool
def calculate(expression: str) -> str:
"""Calculate a mathematical expression."""
try:
result = eval(expression)
return f"Result: {result}"
except:
return "Calculation error"
tools = [search_web, calculate]
# Create model with tools
provider = create_provider("openai")
model = provider.get_langchain_model().bind_tools(tools)
# Define agent node
def agent_node(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# Define tool node
tool_node = ToolNode(tools)
# Route function
def should_continue(state: MessagesState):
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tools"
return END
# Build workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
# Create agent
agent = LangGraphAgent(
name="ToolAgent",
description="Agent with web search and calculator tools",
graph=workflow.compile()
)
Multi-Step Workflow
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import Literal
from langchain_core.messages import HumanMessage
class WorkflowState(MessagesState):
current_step: str = "analyze"
analysis_result: str = ""
plan: str = ""
def analyze_node(state: WorkflowState):
# Analysis logic
analysis = "Analysis complete"
return {
"current_step": "plan",
"analysis_result": analysis,
"messages": [HumanMessage(content=f"Analysis: {analysis}")]
}
def plan_node(state: WorkflowState):
# Planning logic
plan = f"Plan based on {state['analysis_result']}"
return {
"current_step": "execute",
"plan": plan,
"messages": [HumanMessage(content=f"Plan: {plan}")]
}
def execute_node(state: WorkflowState):
# Execution logic
result = f"Executed plan: {state['plan']}"
return {
"current_step": "done",
"messages": [HumanMessage(content=f"Result: {result}")]
}
def route_step(state: WorkflowState) -> Literal["plan", "execute", "end"]:
step = state.get("current_step", "analyze")
if step == "analyze":
return "plan"
elif step == "plan":
return "execute"
else:
return "end"
# Build workflow
workflow = StateGraph(WorkflowState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)
workflow.add_edge(START, "analyze")
workflow.add_conditional_edges("analyze", route_step)
workflow.add_conditional_edges("plan", route_step)
workflow.add_conditional_edges("execute", route_step)
agent = LangGraphAgent(
name="WorkflowAgent",
description="Multi-step workflow agent",
graph=workflow.compile()
)
Checkpoints and Persistence
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# Memory checkpoint
memory_saver = MemorySaver()
workflow_with_memory = workflow.compile(checkpointer=memory_saver)
# SQLite checkpoint
sqlite_saver = SqliteSaver.from_conn_string("checkpoints.db")
workflow_with_sqlite = workflow.compile(checkpointer=sqlite_saver)
# Create agent with persistence
persistent_agent = LangGraphAgent(
name="PersistentAgent",
description="Agent with persistent state",
graph=workflow_with_sqlite
)
Human-in-the-Loop
from langgraph.graph import StateGraph, MessagesState, START, END, interrupt
from langchain_core.messages import HumanMessage
class InterruptState(MessagesState):
user_approval: bool = False
def approval_node(state: InterruptState):
# Request user approval
interrupt("User approval required to continue")
# Check approval status
if state.get("user_approval", False):
return {"messages": [HumanMessage(content="Approved, continuing")]}
else:
return {"messages": [HumanMessage(content="Waiting for approval")]}
workflow = StateGraph(InterruptState)
workflow.add_node("approval", approval_node)
workflow.add_edge(START, "approval")
workflow.add_edge("approval", END)
agent = LangGraphAgent(
name="InterruptAgent",
description="Agent with human-in-the-loop approval",
graph=workflow.compile()
)
Streaming Responses
from ag_ui.core import RunAgentInput, EventType
async def stream_agent_response(agent, user_message: str):
"""Stream agent responses in real-time."""
run_input = RunAgentInput(
messages=[{"role": "user", "content": user_message}],
run_id="run-123",
thread_id="thread-456",
state={},
context=[],
tools=[],
forwarded_props={}
)
async for event in agent.run(run_input):
if event.type == EventType.TEXT_MESSAGE_CONTENT:
content = event.raw_event["data"]["chunk"]["content"]
print(content, end="", flush=True)
elif event.type == EventType.TOOL_CALL_START:
print(f"\n[Calling tool: {event.tool_call_name}]")
elif event.type == EventType.TOOL_CALL_RESULT:
print(f"[Tool result received]")
# Usage
await stream_agent_response(agent, "What's the weather in Paris?")
Error Handling
Common Errors and Solutions
1. Message ID Not Found
# Fixed by stability patches
# The patches automatically handle this case without manual intervention
2. Missing Checkpoint
# Fixed by stability patches
# Gracefully degrades to normal flow when checkpoint is missing
from langchain_core.tools import tool
@tool
def safe_tool(input: str) -> str:
"""Tool with error handling."""
try:
# Tool logic
return process(input)
except Exception as e:
return f"Tool execution error: {str(e)}"
Debugging
import logging
# Enable detailed logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ag_kit_py.agents")
# Patch application is logged automatically
# Look for: "🔧 MONKEY PATCH APPLIED: LangGraphAgent methods replaced successfully"
1. State Optimization
# Good: Keep state minimal
class OptimizedState(MessagesState):
current_step: str = ""
# Only store necessary fields
# Bad: Avoid large objects in state
class BadState(MessagesState):
large_data: dict = {} # ❌ Avoid
2. Node Optimization
async def optimized_node(state: MessagesState):
# Use async operations
result = await async_operation()
# Return only changed fields
return {"messages": [result]} # ✅ Good
# return state # ❌ Avoid returning entire state
@tool
async def async_tool(query: str) -> str:
"""Use async tools for better performance."""
result = await async_api_call(query)
return result
Best Practices
1. State Design
- Keep state structure simple
- Store only necessary data
- Use type annotations
from typing import TypedDict
from langgraph.graph import MessagesState
class MyState(MessagesState):
"""Well-designed state with type hints."""
current_step: str
result: str
2. Node Implementation
- Use async functions
- Handle exceptions
- Return explicit state updates
async def my_node(state: MyState):
"""Well-implemented node."""
try:
result = await process(state)
return {"result": result}
except Exception as e:
return {"error": str(e)}
- Provide clear tool descriptions
- Implement error handling
- Use typed parameters
@tool
def well_documented_tool(query: str, limit: int = 10) -> str:
"""Search for information.
Args:
query: The search query
limit: Maximum number of results (default: 10)
Returns:
Search results as formatted string
"""
try:
return search(query, limit)
except Exception as e:
return f"Search failed: {e}"
4. Error Handling
- Catch exceptions in nodes
- Provide meaningful error messages
- Implement graceful degradation
def error_handling_node(state: MyState):
"""Node with proper error handling."""
try:
result = risky_operation(state)
return {"result": result}
except ValueError as e:
# Handle specific errors
return {"error": f"Invalid input: {e}"}
except Exception as e:
# Handle unexpected errors
return {"error": f"Unexpected error: {e}"}
Integration with Other Components
Server Integration
from ag_kit_py.server import AgentServiceApp
def create_agent():
return {"agent": agent}
# Quick start
AgentServiceApp().run(create_agent, port=8000)
# Advanced configuration
app = AgentServiceApp()
fastapi_app = app.build(
create_agent,
base_path="/api",
enable_cors=True
)
Provider Integration
from ag_kit_py.providers import create_provider
# Create provider
provider = create_provider("openai")
# Get LangChain model
model = provider.get_langchain_model(
model="gpt-4o-mini",
temperature=0.7
)
# Use in LangGraph workflow
def chat_node(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
Complete Example
"""
Complete example of LangGraph agent with tools, checkpoints, and server.
"""
from ag_kit_py.agents import LangGraphAgent
from ag_kit_py.providers import create_provider
from ag_kit_py.server import AgentServiceApp
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.tools import tool
# Define tools
@tool
def search(query: str) -> str:
"""Search for information."""
return f"Results for: {query}"
@tool
def calculate(expression: str) -> str:
"""Calculate expression."""
try:
return str(eval(expression))
except:
return "Error"
tools = [search, calculate]
# Create provider and model
provider = create_provider("openai")
model = provider.get_langchain_model().bind_tools(tools)
# Define nodes
def agent_node(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
tool_node = ToolNode(tools)
# Route function
def should_continue(state: MessagesState):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# Build workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
# Add persistence
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
graph = workflow.compile(checkpointer=checkpointer)
# Create agent
agent = LangGraphAgent(
name="CompleteAgent",
description="Agent with tools and persistence",
graph=graph
)
# Create agent creator
def create_agent():
return {"agent": agent}
# Run server
if __name__ == "__main__":
AgentServiceApp().run(create_agent, port=8000)