Same Codebase vs Different Codebases
SAME CODEBASE (use delegation):
- Agents in same project
- Same team maintains all agents
- Direct function calls work
- Example: main_agent calls calculator_agent
DIFFERENT CODEBASES (use API):
- Agents from different teams
- Different languages or frameworks
- External vendor services
- Need formal contracts and versioning
- Example: your agent calls Stripe's API
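The "formal contracts and versioning" point can be made concrete with a small sketch. It uses only the standard library; the `schema_version` field, the `SUPPORTED_VERSIONS` set, and the rule that a missing version means "1" are all assumptions for illustration, not something the services in this lesson define.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical set of contract versions this service is willing to accept.
SUPPORTED_VERSIONS = {"1", "2"}


@dataclass(frozen=True)
class AgentRequest:
    schema_version: str
    message: str
    session_id: Optional[str] = None


def parse_request(payload: dict) -> AgentRequest:
    """Validate an incoming payload against the contract."""
    # Assumption: callers that send no version are treated as version "1".
    version = str(payload.get("schema_version", "1"))
    if version not in SUPPORTED_VERSIONS:
        raise ValueError(f"unsupported schema_version: {version}")
    message = payload.get("message")
    if not isinstance(message, str) or not message:
        raise ValueError("message must be a non-empty string")
    return AgentRequest(version, message, payload.get("session_id"))


request = parse_request({"schema_version": "2", "message": "hello"})
print(request)
```

Rejecting unknown versions at the boundary lets the two teams change their code independently: a caller on an unsupported version fails fast with a clear error instead of being half-understood.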
Basic Agent Server (FastAPI)
Expose your agent as an HTTP API:
agent_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from pydantic_ai import Agent

app = FastAPI()

# Your agent
assistant = Agent(
    'openai:gpt-4o',
    instructions='You are a helpful assistant.'
)

@assistant.tool_plain
def search(query: str) -> str:
    return f"Results for: {query}"

# API models
class QueryRequest(BaseModel):
    message: str
    session_id: str | None = None

class QueryResponse(BaseModel):
    response: str
    tokens_used: int

# Endpoint
@app.post("/chat", response_model=QueryResponse)
async def chat(request: QueryRequest):
    try:
        result = await assistant.run(request.message)
        return QueryResponse(
            response=result.output,
            tokens_used=result.usage().total_tokens
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy"}
# Run with: uvicorn agent_api:app --host 0.0.0.0 --port 8000

Calling Remote Agents
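Before wrapping a remote call in an agent tool, it helps to see the bare request and response exchange. The sketch below is self-contained: a stub server built on the standard library stands in for the FastAPI service and echoes the message instead of calling a model. `StubChatHandler` and `call_chat` are hypothetical names; only the `/chat` path and the `message`, `session_id`, `response`, and `tokens_used` fields come from the server code above.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class StubChatHandler(BaseHTTPRequestHandler):
    """Stand-in for the real service: echoes the message, reports zero tokens."""

    def do_POST(self):
        if self.path != "/chat":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        body = json.dumps({
            "response": f"echo: {payload['message']}",
            "tokens_used": 0,
        }).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # suppress per-request logging in the demo


def call_chat(host: str, port: int, message: str, timeout: float = 5.0) -> dict:
    """POST a message to /chat and return the decoded JSON body."""
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request(
            "POST",
            "/chat",
            body=json.dumps({"message": message, "session_id": None}).encode(),
            headers={"Content-Type": "application/json"},
        )
        response = conn.getresponse()
        if response.status != 200:
            raise RuntimeError(f"chat service returned {response.status}")
        return json.loads(response.read())
    finally:
        conn.close()


# Port 0 asks the OS for any free port, so the demo does not collide with 8000.
server = HTTPServer(("127.0.0.1", 0), StubChatHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
reply = call_chat("127.0.0.1", server.server_port, "hello")
server.shutdown()
server.server_close()
print(reply)
```

Pointing `call_chat` at the host and port where the real service runs should work the same way, as long as that service keeps the request and response shape shown in `QueryRequest` and `QueryResponse`.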
Your agent can call other agents via HTTP:
remote_agent.py
import httpx
from pydantic_ai import Agent, RunContext

main_agent = Agent(
    'openai:gpt-4o',
    instructions='Use the product_lookup tool for product information.'
)

@main_agent.tool_plain
async def product_lookup(product_id: str) -> dict:
    """Look up product from external service."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            'http://product-service:8001/query',
            json={'message': f'Get info for product {product_id}'},
            timeout=30.0
        )
        response.raise_for_status()
        return response.json()

Hub and Spoke Pattern
One coordinator routes to specialists. Use when different types of queries need different specialists.
              +---------------+
              |  Coordinator  |
              +-------+-------+
                      |
      +---------------+---------------+
      |               |               |
      v               v               v
+-----------+   +-----------+   +-----------+
|   Sales   |   |  Support  |   |   Tech    |
|   Agent   |   |   Agent   |   |   Agent   |
+-----------+   +-----------+   +-----------+
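Stripped of the model calls, the diagram is a lookup from a specialist name to a handler. The sketch below uses plain async functions as stand-ins for the three agents so it runs without model access. `Specialist`, `SPOKES`, `dispatch`, and the fallback to support for unknown names are assumptions for illustration, not part of the original design.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable


class Specialist(str, Enum):
    SALES = "sales"
    SUPPORT = "support"
    TECH = "tech"


# Stand-ins for the specialist agents; each just tags the query it received.
async def sales(query: str) -> str:
    return f"[sales] {query}"


async def support(query: str) -> str:
    return f"[support] {query}"


async def tech(query: str) -> str:
    return f"[tech] {query}"


SPOKES: dict[Specialist, Callable[[str], Awaitable[str]]] = {
    Specialist.SALES: sales,
    Specialist.SUPPORT: support,
    Specialist.TECH: tech,
}


async def dispatch(name: str, query: str) -> str:
    """Hub logic: resolve the specialist name, then forward the query."""
    try:
        target = Specialist(name)
    except ValueError:
        # Assumed policy: unknown names go to support rather than failing.
        target = Specialist.SUPPORT
    return await SPOKES[target](query)


print(asyncio.run(dispatch("tech", "server is down")))
print(asyncio.run(dispatch("billing", "refund request")))
```

The explicit handling of unknown names matters once a model chooses the route, because the name arrives as free text and converting it to an enum raises `ValueError` when it does not match a member.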
Hub Pattern Code
hub_pattern.py
from enum import Enum
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

class AgentType(str, Enum):
    SALES = "sales"
    SUPPORT = "support"
    TECH = "tech"

@dataclass
class AgentHub:
    agents: dict[AgentType, Agent]

    async def route(self, agent_type: AgentType, message: str) -> str:
        agent = self.agents[agent_type]
        result = await agent.run(message)
        return result.output

# Coordinator decides which specialist to use
router = Agent(
    'openai:gpt-4o',
    deps_type=AgentHub,
    instructions='''Route queries to the right specialist:
- sales: pricing, deals, purchases
- support: issues, complaints
- tech: technical problems'''
)

@router.tool
async def route_to_specialist(
    ctx: RunContext[AgentHub],
    agent_type: str,
    query: str
) -> str:
    """Send query to specialist agent."""
    return await ctx.deps.route(AgentType(agent_type), query)

Event-Driven Pattern
Agents react to events instead of being called directly. Use when actions trigger other actions and loose coupling is needed.
event_driven.py
from dataclasses import dataclass
from pydantic_ai import Agent

@dataclass
class Event:
    type: str
    data: dict

class EventBus:
    def __init__(self):
        self.handlers = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    async def publish(self, event: Event):
        for handler in self.handlers.get(event.type, []):
            await handler(event)

# Setup
bus = EventBus()
order_agent = Agent('openai:gpt-4o', instructions='Process orders.')
notification_agent = Agent('openai:gpt-4o', instructions='Send notifications.')

async def handle_new_order(event: Event):
    result = await order_agent.run(f"Process: {event.data}")
    # Trigger next event
    await bus.publish(Event(
        type='order_processed',
        data={'order_id': event.data['id'], 'status': 'complete'}
    ))

async def handle_order_processed(event: Event):
    await notification_agent.run(f"Notify customer: {event.data}")

bus.subscribe('new_order', handle_new_order)
bus.subscribe('order_processed', handle_order_processed)

Key Takeaways
1. Same codebase = Delegation. Use the agent-as-tool pattern.
2. Different codebases = API. Expose agents as HTTP endpoints.
3. Hub pattern for routing. A central coordinator routes to specialists.
4. Events for decoupling. Actions trigger other actions asynchronously.
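
One detail about the last takeaway: the `EventBus` in the event-driven example awaits each handler in turn, so `publish` returns only after the whole chain of follow-up events has finished. If the publisher should not wait, one option is to put events on a queue and run handlers from a separate loop. The sketch below shows that variant with stub handlers in place of agents; `QueuedEventBus` and `drain` are hypothetical names, and a long-running service would more likely use a background worker task than an explicit drain call.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Event:
    type: str
    data: dict


class QueuedEventBus:
    def __init__(self):
        self.handlers = defaultdict(list)
        self.queue: asyncio.Queue = asyncio.Queue()

    def subscribe(self, event_type: str, handler) -> None:
        self.handlers[event_type].append(handler)

    async def publish(self, event: Event) -> None:
        # Only enqueues; no handler runs before this returns.
        await self.queue.put(event)

    async def drain(self) -> None:
        """Run handlers until the queue is empty, including follow-up events."""
        while not self.queue.empty():
            event = await self.queue.get()
            for handler in self.handlers[event.type]:
                await handler(event)


async def main() -> list[str]:
    bus = QueuedEventBus()
    log: list[str] = []

    async def handle_new_order(event: Event) -> None:
        log.append(f"processed order {event.data['id']}")
        await bus.publish(Event("order_processed", {"order_id": event.data["id"]}))

    async def handle_order_processed(event: Event) -> None:
        log.append(f"notified customer for order {event.data['order_id']}")

    bus.subscribe("new_order", handle_new_order)
    bus.subscribe("order_processed", handle_order_processed)

    await bus.publish(Event("new_order", {"id": 42}))
    log.append("publish returned")  # recorded before any handler has run
    await bus.drain()
    return log


log = asyncio.run(main())
print(log)
```

The log shows "publish returned" first, followed by the order and notification steps, which is the ordering the direct-await bus cannot produce.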