Lesson 7
45 min

Multi-Agent Communication

Service patterns and building agent APIs

Same Codebase vs Different Codebases

SAME CODEBASE (use delegation):

  • Agents in same project
  • Same team maintains all agents
  • Direct function calls work
  • Example: main_agent calls calculator_agent

DIFFERENT CODEBASES (use API):

  • Agents from different teams
  • Different languages or frameworks
  • External vendor services
  • Need formal contracts and versioning
  • Example: your agent calls Stripe's API
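
One way to read "formal contracts and versioning" is a request schema that carries an explicit version and is validated at the boundary. The sketch below uses only the standard library so it runs anywhere; the `ProductQuery` fields, the `parse_query` helper, and the `"v1"` label are hypothetical. In this lesson's stack the same contract would more likely be a pydantic model.

```python
from dataclasses import dataclass

SUPPORTED_VERSIONS = {"v1"}  # hypothetical version labels agreed between teams


@dataclass(frozen=True)
class ProductQuery:
    """Request contract shared with the other team (hypothetical fields)."""
    version: str
    message: str


def parse_query(payload: dict) -> ProductQuery:
    """Validate an incoming payload against the contract, or raise ValueError."""
    version = payload.get("version")
    if version not in SUPPORTED_VERSIONS:
        raise ValueError(f"unsupported contract version: {version!r}")
    message = payload.get("message")
    if not isinstance(message, str) or not message:
        raise ValueError("'message' must be a non-empty string")
    return ProductQuery(version=version, message=message)
```

Rejecting unknown versions up front lets either team change the schema without silently breaking the other side.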

Basic Agent Server (FastAPI)

Expose your agent as an HTTP API:

agent_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from pydantic_ai import Agent

app = FastAPI()

# Your agent
assistant = Agent(
    'openai:gpt-4o',
    instructions='You are a helpful assistant.'
)

@assistant.tool_plain
def search(query: str) -> str:
    return f"Results for: {query}"

# API models
class QueryRequest(BaseModel):
    message: str
    session_id: str | None = None

class QueryResponse(BaseModel):
    response: str
    tokens_used: int

# Endpoint
@app.post("/chat", response_model=QueryResponse)
async def chat(request: QueryRequest):
    try:
        result = await assistant.run(request.message)
        return QueryResponse(
            response=result.output,
            tokens_used=result.usage().total_tokens
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy"}

# Run with: uvicorn agent_api:app --host 0.0.0.0 --port 8000

Calling Remote Agents

Your agent can call other agents via HTTP:

remote_agent.py
import httpx
from pydantic_ai import Agent

main_agent = Agent(
    'openai:gpt-4o',
    instructions='Use the product_lookup tool for product information.'
)

@main_agent.tool_plain
async def product_lookup(product_id: str) -> dict:
    """Look up product from external service."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            'http://product-service:8001/query',
            json={'message': f'Get info for product {product_id}'},
            timeout=30.0
        )
        response.raise_for_status()
        return response.json()
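
Remote calls fail in ways that in-process calls do not, so a tool like `product_lookup` often gets wrapped in a retry. The helper below is a hedged sketch of retrying with exponential backoff: `with_retries` and its parameters are hypothetical names, and the default exception types are built-ins so the example stays dependency-free. With httpx, one would likely pass `retry_on=(httpx.TransportError,)` instead.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await call(), retrying on the given exceptions with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the failure
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

Only transport-level failures are retried here. An HTTP error status raised by `raise_for_status()` usually signals a problem that a retry will not fix, so it is left to propagate.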

Hub and Spoke Pattern

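A single coordinator agent decides which specialist should handle each query and forwards it. Use this pattern when different types of queries (for example sales, support, and technical questions) need different specialists.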

              +---------------+
              |  Coordinator  |
              +-------+-------+
                      |
      +---------------+---------------+
      |               |               |
      v               v               v
+-----------+   +-----------+   +-----------+
|   Sales   |   |  Support  |   |   Tech    |
|   Agent   |   |   Agent   |   |   Agent   |
+-----------+   +-----------+   +-----------+

Hub Pattern Code

hub_pattern.py
from enum import Enum
from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

class AgentType(str, Enum):
    SALES = "sales"
    SUPPORT = "support"
    TECH = "tech"

@dataclass
class AgentHub:
    agents: dict[AgentType, Agent]

    async def route(self, agent_type: AgentType, message: str) -> str:
        agent = self.agents[agent_type]
        result = await agent.run(message)
        return result.output

# Coordinator decides which specialist to use
router = Agent(
    'openai:gpt-4o',
    deps_type=AgentHub,
    instructions='''Route queries to the right specialist:
    - sales: pricing, deals, purchases
    - support: issues, complaints
    - tech: technical problems'''
)

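@router.tool
async def route_to_specialist(
    ctx: RunContext[AgentHub],
    agent_type: AgentType,
    query: str
) -> str:
    """Send query to specialist agent."""
    # Typed as AgentType so an invalid value fails argument validation
    # instead of raising ValueError inside the tool
    return await ctx.deps.route(agent_type, query)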
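
The listing above does not show how the hub is populated or exercised. The sketch below illustrates the routing step on its own, using plain async functions as stand-ins for `agent.run` so it runs without a model; `make_specialist`, `SPECIALISTS`, and the echo replies are hypothetical. With the lesson's classes, the equivalent wiring would be an `AgentHub(agents={...})` passed to the coordinator as `router.run(query, deps=hub)`.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable

Specialist = Callable[[str], Awaitable[str]]  # stand-in for a specialist agent's run()


class AgentType(str, Enum):
    SALES = "sales"
    SUPPORT = "support"
    TECH = "tech"


def make_specialist(name: str) -> Specialist:
    """Build a fake specialist that echoes which agent handled the query."""
    async def handle(message: str) -> str:
        return f"[{name}] {message}"
    return handle


# One stand-in specialist per agent type.
SPECIALISTS: dict[AgentType, Specialist] = {t: make_specialist(t.value) for t in AgentType}


async def route(agent_type: str, message: str) -> str:
    """Route to a specialist, returning a readable error for unknown types."""
    try:
        key = AgentType(agent_type)
    except ValueError:
        valid = ", ".join(t.value for t in AgentType)
        return f"Unknown specialist {agent_type!r}; choose one of: {valid}"
    return await SPECIALISTS[key](message)


if __name__ == "__main__":
    print(asyncio.run(route("sales", "What does the pro plan cost?")))
```

Returning a message for an unknown type, rather than raising, gives the coordinator something it can read and correct on its next step.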

Event-Driven Pattern

Agents react to events instead of being called directly. Use when actions trigger other actions and loose coupling is needed.

event_driven.py
from dataclasses import dataclass
from pydantic_ai import Agent

@dataclass
class Event:
    type: str
    data: dict

class EventBus:
    def __init__(self):
        self.handlers = {}

    def subscribe(self, event_type: str, handler):
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    async def publish(self, event: Event):
        for handler in self.handlers.get(event.type, []):
            await handler(event)

# Setup
bus = EventBus()
order_agent = Agent('openai:gpt-4o', instructions='Process orders.')
notification_agent = Agent('openai:gpt-4o', instructions='Send notifications.')

async def handle_new_order(event: Event):
    await order_agent.run(f"Process: {event.data}")
    # Trigger next event
    await bus.publish(Event(
        type='order_processed',
        data={'order_id': event.data['id'], 'status': 'complete'}
    ))

async def handle_order_processed(event: Event):
    await notification_agent.run(f"Notify customer: {event.data}")

bus.subscribe('new_order', handle_new_order)
bus.subscribe('order_processed', handle_order_processed)
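
The listing above wires up the handlers but never publishes an event. The following self-contained sketch shows the chain being triggered. It repeats a compact version of the bus so it runs on its own, and the handlers append to a `log` list as a stand-in for the `order_agent` and `notification_agent` calls. The `log` list, the `main` function, and the order id `42` are illustrative.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Event:
    type: str
    data: dict


class EventBus:
    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    async def publish(self, event):
        # Handlers run one after another; publish returns when all have finished.
        for handler in self.handlers.get(event.type, []):
            await handler(event)


bus = EventBus()
log: list[str] = []  # records the order in which handlers ran


async def handle_new_order(event: Event):
    log.append(f"processed order {event.data['id']}")  # stand-in for order_agent.run
    await bus.publish(Event("order_processed", {"order_id": event.data["id"]}))


async def handle_order_processed(event: Event):
    log.append(f"notified for order {event.data['order_id']}")  # stand-in for notification_agent.run


bus.subscribe("new_order", handle_new_order)
bus.subscribe("order_processed", handle_order_processed)


async def main() -> list[str]:
    await bus.publish(Event("new_order", {"id": 42}))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that this bus awaits each handler inline, so publishing `new_order` returns only after the notification handler has also finished. The publisher is decoupled from who handles the event, but not from how long handling takes.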

Key Takeaways

  1. Same codebase = Delegation. Use agent-as-tool pattern.
  2. Different codebases = API. Expose agents as HTTP endpoints.
  3. Hub pattern for routing. Central coordinator routes to specialists.
  4. Events for decoupling. Actions trigger other actions asynchronously.