Lesson 8
60 min

Production Deployment

Deploy agents with proper architecture and monitoring

Production Architecture

Key principles:

  1. Stateless servers: Store sessions externally (Redis)
  2. Horizontal scaling: Add more servers to handle load
  3. Rate limiting: Protect against abuse
  4. Monitoring: Track everything (see the metrics sketch after the diagram below)
+-------------------------------------------------------------+
|                      PRODUCTION SETUP                        |
|                                                              |
|  Users                                                       |
|    |                                                         |
|    v                                                         |
|  Load Balancer                                               |
|    |                                                         |
|    v                                                         |
|  Agent Servers (stateless, multiple instances)               |
|    |                                                         |
|    +---> LLM Provider (OpenAI, Anthropic, etc.)              |
|    |                                                         |
|    +---> Redis (session storage)                             |
|    |                                                         |
|    +---> Database (long-term memory, logs)                   |
|                                                              |
|  Monitoring: Latency, errors, token usage, costs             |
+-------------------------------------------------------------+
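
The server in the next section reports latency, token counts, and errors through the standard logging module, which is enough to get started. If you want to expose those numbers to a metrics scraper instead, a minimal sketch using the prometheus_client package (not used elsewhere in this lesson; metric names are illustrative) could look like this:

metrics.py
from prometheus_client import Counter, Histogram, start_http_server

# Hypothetical metric names; rename to fit your conventions
REQUESTS = Counter("agent_requests_total", "Chat requests handled", ["status"])
TOKENS = Counter("agent_tokens_total", "Total tokens consumed")
LATENCY = Histogram("agent_request_latency_seconds", "End-to-end request latency")

# Serve /metrics on a separate port for the scraper to poll
start_http_server(9100)

# Inside the /chat handler you would then record, for example:
#   REQUESTS.labels(status="ok").inc()
#   TOKENS.inc(result.usage().total_tokens)
#   LATENCY.observe(time.time() - start)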

Production-Ready Server

Full example with Redis sessions, rate limiting, and logging:

production_server.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelMessagesTypeAdapter
from pydantic_ai.usage import UsageLimits
import redis.asyncio as redis
from pydantic_core import to_json
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Config
REDIS_URL = "redis://localhost:6379"
MAX_TOKENS = 4000
RATE_LIMIT = 60  # requests per minute

# Globals
redis_client: redis.Redis | None = None
agent: Agent | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client, agent

    # Startup
    redis_client = redis.from_url(REDIS_URL)
    agent = Agent('openai:gpt-4o', instructions='Be helpful.')
    logger.info("Server started")

    yield

    # Shutdown
    await redis_client.close()
    logger.info("Server stopped")

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    message: str = Field(..., max_length=10000)
    session_id: str

class ChatResponse(BaseModel):
    response: str
    tokens_used: int
    latency_ms: float

async def get_session_history(session_id: str) -> list[ModelMessage]:
    """Load session from Redis."""
    data = await redis_client.get(f"session:{session_id}")
    if data:
        return ModelMessagesTypeAdapter.validate_json(data)
    return []

async def save_session_history(session_id: str, messages: list[ModelMessage]):
    """Save session to Redis with 24h TTL."""
    await redis_client.setex(f"session:{session_id}", 86400, to_json(messages))

async def check_rate_limit(session_id: str) -> bool:
    """Check if session is within rate limit."""
    key = f"rate:{session_id}"
    count = await redis_client.get(key)

    if count and int(count) >= RATE_LIMIT:
        return False

    pipe = redis_client.pipeline()
    pipe.incr(key)
    pipe.expire(key, 60)
    await pipe.execute()
    return True

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    start = time.time()

    # Rate limiting
    if not await check_rate_limit(request.session_id):
        raise HTTPException(429, "Rate limit exceeded. Try again in a minute.")

    try:
        # Load session
        history = await get_session_history(request.session_id)

        # Run agent with limits
        result = await agent.run(
            request.message,
            message_history=history,
            usage_limits=UsageLimits(response_tokens_limit=MAX_TOKENS)
        )

        # Save session
        await save_session_history(request.session_id, result.all_messages())

        # Calculate metrics
        latency = (time.time() - start) * 1000
        tokens = result.usage().total_tokens

        # Log for monitoring
        logger.info(f"session={request.session_id} latency={latency:.0f}ms tokens={tokens}")

        return ChatResponse(response=result.output, tokens_used=tokens, latency_ms=latency)

    except Exception as e:
        logger.error(f"Error: {e}")
        raise HTTPException(500, "Internal error")

@app.get("/health")
async def health():
    """Health check for load balancer."""
    try:
        await redis_client.ping()
        return {"status": "healthy"}
    except Exception:
        # Non-200 status so the load balancer takes this instance out of rotation
        raise HTTPException(503, "degraded: Redis unreachable")
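
Once the server is running (for example with uvicorn production_server:app), you can exercise it with a short client script. The snippet below uses httpx and an invented session_id; both are illustrative and not part of the server code above:

client_example.py
import httpx

BASE_URL = "http://localhost:8000"  # assumes the server above is running locally

# Reusing the same session_id keeps the conversation history in Redis
payload = {"message": "Hello, what can you do?", "session_id": "user-123"}

resp = httpx.post(f"{BASE_URL}/chat", json=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
print(data["response"])
print(f"tokens={data['tokens_used']} latency={data['latency_ms']:.0f}ms")

# The /health endpoint is what the load balancer polls
print(httpx.get(f"{BASE_URL}/health").json())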

Cost Management

Track costs and set limits to prevent runaway spending:

cost_management.py
from pydantic_ai import Agent
from pydantic_ai.usage import UsageLimits

# Cost per 1K tokens (example rates, check current pricing)
COSTS = {
    'gpt-4o': {'input': 0.0025, 'output': 0.01},
    'gpt-4o-mini': {'input': 0.00015, 'output': 0.0006},
    'claude-sonnet': {'input': 0.003, 'output': 0.015},
}

def calculate_cost(usage, model: str) -> float:
    """Calculate cost for a single request."""
    rates = COSTS.get(model, {'input': 0, 'output': 0})
    input_cost = (usage.input_tokens / 1000) * rates['input']
    output_cost = (usage.output_tokens / 1000) * rates['output']
    return input_cost + output_cost

# Usage
agent = Agent('openai:gpt-4o')
result = agent.run_sync('Hello')
cost = calculate_cost(result.usage(), 'gpt-4o')
print(f"Cost: ${cost:.4f}")

# Set limits to prevent runaway costs
result = agent.run_sync(
    'Hello',
    usage_limits=UsageLimits(
        request_limit=5,             # Max 5 LLM calls per run
        response_tokens_limit=1000,  # Max tokens per response
        total_tokens_limit=5000      # Max total tokens per run
    )
)
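
The same calculate_cost helper can feed a hard spending cap. Below is a sketch of a per-session daily budget tracked in Redis; the key name, budget value, and charge_session helper are illustrative and not part of the server above:

budget_guard.py
import redis.asyncio as redis

DAILY_BUDGET_USD = 5.00  # illustrative cap per session per day

async def charge_session(redis_client: redis.Redis, session_id: str, cost: float) -> bool:
    """Accumulate spend for a session and report whether it is still within budget."""
    key = f"spend:{session_id}"
    total = await redis_client.incrbyfloat(key, cost)
    await redis_client.expire(key, 86400)  # window resets 24h after the last charge
    return float(total) <= DAILY_BUDGET_USD

In the /chat handler you would call calculate_cost after the agent run and reject further requests (for example with a 429) once charge_session returns False.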

Dockerfile

Containerize your agent for deployment:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run with gunicorn for production
CMD ["gunicorn", "production_server:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]

Docker Compose

Run your agent with Redis:

docker-compose.yml
version: '3.8'

services:
  agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:

Key Takeaways

  1. Stateless servers: Store sessions in Redis or a database, not in memory.
  2. Rate limiting: Protect against abuse and runaway costs.
  3. Monitor everything: Latency, tokens, errors, costs.
  4. Set usage limits: Prevent a single request from consuming too many resources.
  5. Health checks: Let load balancers know when instances are healthy.