Production Architecture
Key principles:
- Stateless servers: Store sessions externally (Redis)
- Horizontal scaling: Add more servers to handle load
- Rate limiting: Protect against abuse
- Monitoring: Track latency, errors, token usage, and costs
PRODUCTION SETUP

Users
  |
  v
Load Balancer
  |
  v
Agent Servers (stateless, multiple instances)
  |
  +---> LLM Provider (OpenAI, Anthropic, etc.)
  |
  +---> Redis (session storage)
  |
  +---> Database (long-term memory, logs)

Monitoring: Latency, errors, token usage, costs
Production-Ready Server
Full example with Redis sessions, rate limiting, and logging:
production_server.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelMessagesTypeAdapter
from pydantic_ai.usage import UsageLimits
import redis.asyncio as redis
from pydantic_core import to_json
import logging
import os
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Config
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")  # overridden in docker-compose
MAX_TOKENS = 4000
RATE_LIMIT = 60  # requests per minute

# Globals (initialised during lifespan startup)
redis_client: redis.Redis | None = None
agent: Agent | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client, agent
    # Startup
    redis_client = redis.from_url(REDIS_URL)
    agent = Agent('openai:gpt-4o', instructions='Be helpful.')
    logger.info("Server started")
    yield
    # Shutdown
    await redis_client.close()
    logger.info("Server stopped")
app = FastAPI(lifespan=lifespan)
class ChatRequest(BaseModel):
    message: str = Field(..., max_length=10000)
    session_id: str

class ChatResponse(BaseModel):
    response: str
    tokens_used: int
    latency_ms: float
async def get_session_history(session_id: str) -> list[ModelMessage]:
    """Load session from Redis."""
    data = await redis_client.get(f"session:{session_id}")
    if data:
        return ModelMessagesTypeAdapter.validate_json(data)
    return []

async def save_session_history(session_id: str, messages: list[ModelMessage]):
    """Save session to Redis with 24h TTL."""
    await redis_client.setex(f"session:{session_id}", 86400, to_json(messages))
async def check_rate_limit(session_id: str) -> bool:
    """Check if session is within rate limit."""
    key = f"rate:{session_id}"
    count = await redis_client.get(key)
    if count and int(count) >= RATE_LIMIT:
        return False
    pipe = redis_client.pipeline()
    pipe.incr(key)
    pipe.expire(key, 60)  # refresh the 60s window on each counted request
    await pipe.execute()
    return True
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
start = time.time()
# Rate limiting
if not await check_rate_limit(request.session_id):
raise HTTPException(429, "Rate limit exceeded. Try again in a minute.")
try:
# Load session
history = await get_session_history(request.session_id)
# Run agent with limits
result = await agent.run(
request.message,
message_history=history,
usage_limits=UsageLimits(response_tokens_limit=MAX_TOKENS)
)
# Save session
await save_session_history(request.session_id, result.all_messages())
# Calculate metrics
latency = (time.time() - start) * 1000
tokens = result.usage().total_tokens
# Log for monitoring
logger.info(f"session={request.session_id} latency={latency:.0f}ms tokens={tokens}")
return ChatResponse(response=result.output, tokens_used=tokens, latency_ms=latency)
except Exception as e:
logger.error(f"Error: {e}")
raise HTTPException(500, "Internal error")
@app.get("/health")
async def health():
"""Health check for load balancer."""
try:
await redis_client.ping()
return {"status": "healthy"}
except:
return {"status": "degraded"}Cost Management
Cost Management
Track costs and set limits to prevent runaway spending:
cost_management.py
from pydantic_ai import Agent
from pydantic_ai.usage import UsageLimits

agent = Agent('openai:gpt-4o', instructions='Be helpful.')

# Cost per 1K tokens (example rates, check current pricing)
COSTS = {
    'gpt-4o': {'input': 0.0025, 'output': 0.01},
    'gpt-4o-mini': {'input': 0.00015, 'output': 0.0006},
    'claude-sonnet': {'input': 0.003, 'output': 0.015},
}

def calculate_cost(usage, model: str) -> float:
    """Calculate cost for a single request."""
    rates = COSTS.get(model, {'input': 0, 'output': 0})
    input_cost = (usage.input_tokens / 1000) * rates['input']
    output_cost = (usage.output_tokens / 1000) * rates['output']
    return input_cost + output_cost

# Usage
result = agent.run_sync('Hello')
cost = calculate_cost(result.usage(), 'gpt-4o')
print(f"Cost: ${cost:.4f}")

# Set limits to prevent runaway costs
result = agent.run_sync(
    'Hello',
    usage_limits=UsageLimits(
        request_limit=5,             # Max 5 LLM calls per run
        response_tokens_limit=1000,  # Max tokens per response
        total_tokens_limit=5000,     # Max total tokens per run
    ),
)
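Per-request accounting only tells you what you spent; a running total is what lets you stop spending. Below is a minimal sketch of a daily per-session budget check against the same Redis instance used for sessions; the key format, budget value, and charge_session helper are assumptions for illustration, not library APIs.
budget_guard.py
import datetime

import redis.asyncio as redis

DAILY_BUDGET_USD = 1.00  # assumption: tune to your product's economics

async def charge_session(client: redis.Redis, session_id: str, cost_usd: float) -> bool:
    """Add a request's cost to today's running total for the session.

    Returns False once the session has exhausted its daily budget.
    """
    today = datetime.date.today().isoformat()
    key = f"spend:{session_id}:{today}"
    # INCRBYFLOAT creates the key if it is missing; expire after 48h so old keys clean up
    total = await client.incrbyfloat(key, cost_usd)
    await client.expire(key, 172800)
    return float(total) <= DAILY_BUDGET_USD
The /chat endpoint could call this right after computing the request cost and return a 429 once the budget is exhausted.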
Dockerfile
Containerize your agent for deployment:
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run with gunicorn for production
CMD ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]Docker Compose
Docker Compose
Run your agent with Redis:
docker-compose.yml
version: '3.8'

services:
  agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  redis_data:
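After `docker compose up -d`, deployment and CI scripts usually wait for the stack to come up before sending traffic. Below is a minimal readiness sketch using only the standard library, assuming the /health route and the 8000 port mapping above; the file name is illustrative.
wait_for_healthy.py
import json
import sys
import time
import urllib.request

HEALTH_URL = "http://localhost:8000/health"  # assumption: port mapping from docker-compose.yml
TIMEOUT_S = 60

def main() -> int:
    deadline = time.time() + TIMEOUT_S
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(HEALTH_URL, timeout=2) as resp:
                status = json.load(resp).get("status")
            if status == "healthy":
                print("stack is healthy")
                return 0
        except OSError:
            pass  # server not accepting connections yet
        time.sleep(2)
    print("timed out waiting for a healthy /health response", file=sys.stderr)
    return 1

if __name__ == "__main__":
    sys.exit(main())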
Key Takeaways
1. Stateless servers. Store sessions in Redis or a database, not in process memory.
2. Rate limiting. Protect against abuse and runaway costs.
3. Monitor everything. Latency, tokens, errors, costs.
4. Set usage limits. Prevent a single request from consuming too many resources.
5. Health checks. Let load balancers know when instances are healthy.