
From Chaos to Orchestration: Building Production-Grade Multi-Agent Systems with Pydantic AI

Tags: Pydantic AI, Multi-Agent Systems, Agent Orchestration, AI Agents, MCP, Python AI Framework, LLMOps

After architecting eleven specialized agents across separate repositories for financial analysis systems, I’ve learned that connecting multi-agent systems isn’t just about making agents talk. It’s about building resilient, observable, and cost-effective orchestration that scales under production load. With its recent V1.0 release, Pydantic AI has matured into the framework I wish I’d had when building those early A2A protocols and MCP integrations. Here’s what I’ve discovered about building multi-agent systems that actually survive contact with real users and production workloads.

Why Multi-Agent Architecture Matters in Production

Single monolithic agents break down when you need institutional-grade capabilities. I learned this the hard way when our initial investment thesis agent tried to handle market research, SEC filing analysis, and risk assessment in one bloated prompt. The token costs were astronomical, debugging was impossible, and the failure modes were unpredictable.

Pydantic AI supports four orchestration patterns that I’ve now implemented across financial, e-commerce, and content systems. Each pattern serves specific production needs: agent delegation for hierarchical tool-based workflows, programmatic handoff for complex business logic, parallel execution for latency-sensitive fan-out, and graph-based orchestration for sophisticated state machines.

The framework’s production-first philosophy shows in every design decision. Type safety prevents runtime failures. Native async support handles concurrent workflows without blocking. Comprehensive usage tracking prevents runaway costs. Most importantly, the dependency injection system makes testing and deployment straightforward—something you appreciate when you’re managing dozens of agents across multiple environments.

Architecture Patterns That Scale

Core Communication Primitives

Pydantic AI’s agent communication revolves around three fundamental concepts that I use in every production system. Agents are stateless containers that encapsulate models, prompts, tools, and output validation—think of them as functions that happen to call LLMs. RunContext provides type-safe dependency injection, carrying database connections, API clients, and configuration between agents while tracking usage across the entire workflow. Tools decorated with @agent.tool enable delegation, creating clean interfaces for agent-to-agent communication.

This is what I like about it: Pydantic AI does not force you into a specific orchestration pattern. It gives you primitives that compose naturally. Need a router? Build an agent with tools that delegate to specialists. Need sequential processing? Pass RunContext between agent calls. Need parallel execution? Use asyncio with shared usage tracking. The framework stays out of your way.

graph TB
    subgraph P1["🔀 Pattern 1: DELEGATION"]
        direction TB
        PA["Parent Agent"]
        CA1["Specialist A"]
        CA2["Specialist B"]
        CA3["Specialist C"]
        PA -->|"tool call"| CA1
        PA -->|"tool call"| CA2
        PA -->|"tool call"| CA3
    end

    subgraph P2["➡️ Pattern 2: HANDOFF"]
        direction TB
        HA["Agent A"] -->|"result"| HB["Agent B"]
        HB -->|"result"| HC["Agent C"]
    end

    subgraph P3["⚡ Pattern 3: PARALLEL"]
        direction TB
        PP["Coordinator"]
        PP1["Worker 1"]
        PP2["Worker 2"]
        PP3["Worker 3"]
        PP -->|"async"| PP1
        PP -->|"async"| PP2
        PP -->|"async"| PP3
    end

    subgraph P4["🔄 Pattern 4: GRAPH"]
        direction TB
        GA["Start"] -->|"if score > 0.6"| GB["Deep Analysis"]
        GA -->|"if score ≤ 0.6"| GC["Early Exit"]
        GB --> GD["Risk Check"]
    end

    style PA fill:#7E57C2,stroke:#4527A0,color:#fff
    style CA1 fill:#AB47BC,stroke:#6A1B9A,color:#fff
    style CA2 fill:#BA68C8,stroke:#8E24AA,color:#fff
    style CA3 fill:#CE93D8,stroke:#7B1FA2,color:#fff
    style HA fill:#29B6F6,stroke:#0277BD,color:#fff
    style HB fill:#4FC3F7,stroke:#0288D1,color:#fff
    style HC fill:#81D4FA,stroke:#0277BD,color:#fff
    style PP fill:#FF7043,stroke:#D84315,color:#fff
    style PP1 fill:#FF8A65,stroke:#BF360C,color:#fff
    style PP2 fill:#FFA726,stroke:#E65100,color:#fff
    style PP3 fill:#FFB74D,stroke:#EF6C00,color:#fff
    style GA fill:#66BB6A,stroke:#2E7D32,color:#fff
    style GB fill:#81C784,stroke:#388E3C,color:#fff
    style GC fill:#EF5350,stroke:#C62828,color:#fff
    style GD fill:#A5D6A7,stroke:#2E7D32,color:#333

Pattern 1: Agent Delegation

This is my go-to pattern for 80% of multi-agent workflows. A parent agent uses other agents as tools, maintaining usage tracking and dependency sharing throughout the hierarchy. I’ve implemented this in healthcare systems where a triage agent routes patient queries to symptom checkers, appointment schedulers, or prescription assistants based on intent classification.

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass
from pydantic import BaseModel, Field

# DatabaseConnection and SymptomAnalysisAPI are application-defined types
@dataclass
class HealthcareDependencies:
    patient_db: DatabaseConnection
    symptom_api: SymptomAnalysisAPI
    patient_id: str
    security_context: dict

class DiagnosticOutput(BaseModel):
    assessment: str = Field(description='Clinical assessment')
    urgency_level: int = Field(ge=1, le=5, description='Urgency from 1-5')
    requires_specialist: bool = Field(description='Needs specialist referral')
    recommended_actions: list[str] = Field(default_factory=list)

# Router agent with medical triage logic
triage_agent = Agent(
    'openai:gpt-4o',
    deps_type=HealthcareDependencies,
    system_prompt='Route medical inquiries based on symptom severity and patient history'
)

# Specialized diagnostic agent
diagnostic_agent = Agent(
    'openai:gpt-4o',
    deps_type=HealthcareDependencies,
    output_type=DiagnosticOutput,
    system_prompt='Analyze symptoms and provide structured medical assessment'
)

@triage_agent.tool
async def analyze_symptoms(ctx: RunContext[HealthcareDependencies], symptoms: str) -> DiagnosticOutput:
    """Delegate to diagnostic agent with shared medical context and usage tracking"""
    result = await diagnostic_agent.run(
        symptoms,
        deps=ctx.deps,  # Pass medical database and API access
        usage=ctx.usage  # Track tokens across medical consultation
    )
    return result.output

The delegation pattern handles 90% of our medical query routing with proper urgency classification. The example passes ctx.deps through unchanged for brevity; in our deployment each specialist agent receives only the dependencies it needs: diagnostic agents get symptom databases, scheduling agents get calendar access, and prescription agents get pharmacy integrations. Scoping dependencies this way keeps access boundaries explicit across agents, which supports HIPAA compliance but does not guarantee it on its own.

Pattern 2: Programmatic Handoff

When business logic is complex, programmatic handoff gives you full control over agent orchestration. Unlike delegation where control returns to the parent, handoff lets your application code manage state transitions and decision points. I use this pattern extensively in financial analysis workflows where market research agents feed sector analysis agents, which then inform risk assessment agents, with our application logic validating regulatory compliance at each step.

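from pydantic_ai.usage import RunUsage

# The agents, FinancialDependencies, and AnalysisReport are defined elsewhere in the application
async def financial_analysis_pipeline(ticker: str, deps: FinancialDependencies) -> AnalysisReport:
    """Sequential analysis with business logic controlling handoffs"""
    usage = RunUsage()  # Shared accumulator; RunUsage supersedes the deprecated Usage class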

    # Step 1: Market research
    market_result = await market_research_agent.run(
        f"Analyze market conditions for {ticker}",
        deps=deps,
        usage=usage
    )

    # Business logic: Check if market conditions warrant further analysis
    if market_result.output.risk_score > 0.7:
        return AnalysisReport(recommendation="HIGH_RISK", reason="Market volatility too high")

    # Step 2: Sector analysis with market context
    sector_result = await sector_agent.run(
        f"Sector analysis for {ticker} given market conditions: {market_result.output.summary}",
        deps=deps,
        usage=usage
    )

    # Step 3: Final recommendation synthesis
    final_analysis = await synthesis_agent.run(
        f"Investment recommendation for {ticker} based on market: {market_result.output} and sector: {sector_result.output}",
        deps=deps,
        usage=usage
    )

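    return AnalysisReport(
        ticker=ticker,
        recommendation=final_analysis.output.recommendation,
        # The usage object reports token counts, not dollars, so cost comes
        # from an application-defined helper that applies your pricing
        total_cost=estimate_cost(usage),
        confidence=final_analysis.output.confidence
    )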

This pattern is essential when you need regulatory compliance, cost controls, or complex decision trees between agents. Our financial analysis pipeline includes SEC compliance checks, risk limit validation, and audit logging at each handoff point—requirements that pure agent delegation can’t handle cleanly.
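The handoff-with-gates idea can be sketched without any agent framework at all. In the sketch below, `market_step` and `sector_step` are hypothetical stand-ins for agent runs, and `AuditLog` is an assumed in-memory recorder, not part of Pydantic AI. The point is only that application code, not an agent, owns the decision to stop or continue and records each transition.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class AuditLog:
    """In-memory audit trail; a real system would persist these entries."""
    entries: list[str] = field(default_factory=list)

    def record(self, step: str, detail: str) -> None:
        self.entries.append(f"{step}: {detail}")


async def market_step(ticker: str) -> float:
    """Stand-in for an agent run; returns a hypothetical risk score."""
    return 0.9 if ticker == "RISKY" else 0.3


async def sector_step(ticker: str) -> str:
    """Stand-in for a second agent run."""
    return f"sector outlook for {ticker}"


async def pipeline(ticker: str, audit: AuditLog, risk_limit: float = 0.7) -> str:
    risk = await market_step(ticker)
    audit.record("market", f"risk={risk}")
    if risk > risk_limit:  # gate owned by application code, not by an agent
        audit.record("gate", "stopped: risk limit exceeded")
        return "HIGH_RISK"
    summary = await sector_step(ticker)
    audit.record("sector", summary)
    return "PROCEED"


audit = AuditLog()
outcome = asyncio.run(pipeline("RISKY", audit))
```

Because every gate is ordinary Python, each one can be unit tested without invoking a model.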

Pattern 3: Parallel Execution

Production systems need parallelization to meet SLA requirements. I’ve seen 4x latency improvements using asyncio patterns with proper resource management. The key insight: share usage tracking and dependency state while controlling concurrency with semaphores.

import asyncio
from pydantic_ai.usage import RunUsage, UsageLimits

async def parallel_content_optimization_workflow(website_url: str, target_keywords: list[str]) -> ContentOptimization:
    """Execute SEO analysis agents in parallel with resource controls"""
    shared_usage = RunUsage()  # RunUsage supersedes the deprecated Usage class
    shared_deps = SEODependencies(
        analytics_api=analytics_client,
        keyword_db=keyword_database,
        website_url=website_url
    )
    semaphore = asyncio.Semaphore(6)  # Prevent API rate limiting

    async def execute_seo_agent(agent, analysis_type, target):
        async with semaphore:
            return await agent.run(
                f"Analyze {analysis_type} for {target}",
                deps=shared_deps,
                usage=shared_usage,
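                usage_limits=UsageLimits(
                    # Limits are checked against shared_usage, so they cap the
                    # combined total of all four agents, not each agent separately
                    request_limit=20,  # Roughly 5 requests per agent
                    total_tokens_limit=8000  # Roughly 2,000 tokens per agent
                )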
            )

    # Launch parallel SEO analysis tasks
    tasks = [
        execute_seo_agent(technical_seo_agent, "technical issues", website_url),
        execute_seo_agent(content_seo_agent, "content optimization", website_url),
        execute_seo_agent(keyword_research_agent, "keyword opportunities", target_keywords),
        execute_seo_agent(competitor_agent, "competitive analysis", target_keywords)
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle partial failures gracefully; keep each agent's output, not the run-result wrapper
    successful_results = [r.output for r in results if not isinstance(r, Exception)]

    # Synthesize recommendations from successful analyses
    optimization_plan = await seo_synthesis_agent.run(
        f"Create SEO optimization plan from: {successful_results}",
        deps=shared_deps,
        usage=shared_usage
    )

    return ContentOptimization(
        analyses=successful_results,
        optimization_plan=optimization_plan.output,
        # Application-defined helper: the usage object holds token counts, not dollars
        total_cost=estimate_cost(shared_usage),
        failed_analyses=len(results) - len(successful_results)
    )

This pattern consistently delivers 60-70% latency reduction compared to sequential execution. The semaphore prevents API throttling, usage limits cap token spend for the workflow, and graceful failure handling ensures partial results are still useful. I’ve deployed this in SEO analysis systems processing thousands of websites daily.
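The concurrency mechanics are easy to verify in isolation. The following is a minimal sketch using only asyncio; `bounded_gather`, `ok`, and `broken` are hypothetical names, and the workers are stubs rather than agents. It shows the two behaviours the pattern relies on: the semaphore bounds how many jobs are in flight, and `return_exceptions=True` turns a failed job into a value in the result list instead of cancelling the batch.

```python
import asyncio


async def bounded_gather(jobs, limit: int):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job):
        async with semaphore:
            return await job()

    # Exceptions are returned in place, in the same order as the jobs
    return await asyncio.gather(*(guarded(j) for j in jobs), return_exceptions=True)


async def ok(name: str) -> str:
    await asyncio.sleep(0.01)  # simulate network latency
    return f"{name}: done"


async def broken() -> str:
    raise RuntimeError("upstream API unavailable")


async def main():
    results = await bounded_gather(
        [lambda: ok("technical"), broken, lambda: ok("keywords")], limit=2
    )
    succeeded = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return succeeded, failed


succeeded, failed = asyncio.run(main())
```

Keeping the failed entries, instead of discarding them, makes it possible to log which analysis was missing from the synthesized result.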

Pattern 4: Graph-Based Orchestration

For workflows with conditional branching and state persistence, Pydantic AI's pydantic-graph library provides graph-based orchestration. The library is still evolving (check the latest docs for current API), but the concept is straightforward: define nodes as processing steps, connect them with conditional edges, and let the graph runtime handle execution order and state.

I use this approach in investment research workflows where agents need to iterate, backtrack, and explore multiple analysis paths based on intermediate findings.

In practice, the same structure can be written as plain async code, where each analysis step acts as a node and each conditional acts as an edge:

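import asyncio
from dataclasses import dataclass

from pydantic_ai.usage import RunUsage

@dataclass
class InvestmentContext:
    sector: str
    criteria: dict
    threshold: float
    deps: FinancialDependencies  # Application-defined
    usage: RunUsage  # Shared across every agent in the workflow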

async def investment_research_pipeline(context: InvestmentContext):
    """Graph-style orchestration using conditional handoffs between agents"""

    # Step 1: Initial screening
    screen_result = await screening_agent.run(
        f"Screen stocks in {context.sector} with criteria: {context.criteria}",
        deps=context.deps,
        usage=context.usage
    )

    if screen_result.output.score <= context.threshold:
        return {"recommendation": "NO_ACTION", "reason": "Failed initial screen"}

    # Step 2: Parallel fundamental + technical analysis (conditional on screening)
    fundamental_task = fundamental_agent.run(
        f"Fundamental analysis for {screen_result.output.stocks}",
        deps=context.deps, usage=context.usage
    )
    technical_task = technical_agent.run(
        f"Technical analysis for {screen_result.output.stocks}",
        deps=context.deps, usage=context.usage
    )
    fundamental_result, technical_result = await asyncio.gather(
        fundamental_task, technical_task
    )

    # Step 3: Risk assessment only if fundamental score passes threshold
    if fundamental_result.output.score > 0.6:
        risk_result = await risk_agent.run(
            f"Risk assessment given fundamentals: {fundamental_result.output} "
            f"and technicals: {technical_result.output}",
            deps=context.deps, usage=context.usage
        )
        return await generate_recommendation(
            screen_result, fundamental_result, technical_result, risk_result
        )

    return {"recommendation": "HOLD", "reason": "Weak fundamentals"}

The key value here is explicit conditional branching with early exits. Real investment research does not follow a fixed pipeline — some paths terminate early, some branch into parallel analysis, and some loop back for deeper research. Modeling this as a graph (even without the formal graph library) keeps the logic readable and testable.
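When a workflow does need to loop back, a small explicit state machine keeps that logic visible. The sketch below is a plain-Python illustration and does not use the pydantic-graph API; the node names, the integer percentage score, and the `max_steps` guard are all assumptions made for the example. Each node mutates shared state and returns the name of the next node, or `None` to stop.

```python
from dataclasses import dataclass, field


@dataclass
class ResearchState:
    score: int  # screening score as a percentage (hypothetical scale)
    depth: int = 0
    trail: list[str] = field(default_factory=list)


def screen(state):
    state.trail.append("screen")
    return "analyze" if state.score > 60 else "exit"


def analyze(state):
    state.trail.append("analyze")
    state.depth += 1
    state.score += 10  # pretend deeper research refined the score
    # Loop back until the score is convincing or the depth budget is spent
    return "risk" if state.score >= 90 or state.depth >= 3 else "analyze"


def risk(state):
    state.trail.append("risk")
    return None


def early_exit(state):
    state.trail.append("exit")
    return None


NODES = {"screen": screen, "analyze": analyze, "risk": risk, "exit": early_exit}


def run_graph(state, start="screen", max_steps=20):
    """Follow node transitions until a node returns None."""
    node = start
    for _ in range(max_steps):
        node = NODES[node](state)
        if node is None:
            return state
    raise RuntimeError("graph did not terminate")


deep = run_graph(ResearchState(score=70))
shallow = run_graph(ResearchState(score=50))
```

The recorded `trail` doubles as a cheap execution trace, which makes branch coverage straightforward to assert in tests.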

For more complex graph needs with formal state machines, LangGraph is a strong alternative — I cover that in building multi-agent systems with LangGraph.

Production-Ready Implementation Strategies

Dependency Injection That Actually Works

After managing eleven agents across production environments, I’ve learned that dependency injection makes or breaks system maintainability. The cardinal rule: never include agents in dependencies. Agents are stateless and global—dependencies should contain only shared resources and configuration.

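from dataclasses import dataclass

import aioboto3
import asyncpg
import httpx
from redis import asyncio as aioredis  # The standalone aioredis project was merged into redis-py

@dataclass
class ProductionDependencies:
    # Infrastructure resources
    postgres_db: asyncpg.Pool
    redis_cache: aioredis.Redis
    s3_client: aioboto3.Session  # An aioboto3 session; S3 clients are created from it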

    # API clients with authentication
    external_apis: dict[str, httpx.AsyncClient]

    # Configuration and context
    tenant_id: str
    user_permissions: PermissionSet
    environment: str
    correlation_id: str

    # Never include agents here - they're stateless and global
    # ❌ analysis_agent: Agent  # Don't do this

Hierarchical dependency structures enable complex resource sharing while maintaining security boundaries. Service-level dependencies contain infrastructure, agent-level dependencies add specific configuration, and request-level dependencies include user context. This pattern has proven essential for multi-tenant systems where different agents need different permission levels.
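One way to express the three layers is to nest frozen dataclasses, so each level can read the levels beneath it but cannot mutate them. This is a sketch under assumptions: the class names, fields, and the `can_use` check are hypothetical, and plain strings stand in for real connection pools and clients.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ServiceDeps:
    """Process-wide infrastructure, created once at startup."""
    db_dsn: str
    cache_url: str


@dataclass(frozen=True)
class AgentDeps:
    """Per-agent configuration layered on top of the service resources."""
    service: ServiceDeps
    allowed_tools: frozenset[str]


@dataclass(frozen=True)
class RequestDeps:
    """Per-request user context layered on top of the agent configuration."""
    agent: AgentDeps
    tenant_id: str
    correlation_id: str

    def can_use(self, tool: str) -> bool:
        return tool in self.agent.allowed_tools


service = ServiceDeps(db_dsn="postgresql://localhost/app", cache_url="redis://localhost")
analyst = AgentDeps(service=service, allowed_tools=frozenset({"read_filings"}))
request = RequestDeps(agent=analyst, tenant_id="tenant-a", correlation_id="req-1")
```

Only the request layer is rebuilt per call; the service and agent layers are shared, which keeps per-request construction cheap.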

Usage Tracking That Prevents Disasters

Token costs in multi-agent systems can spiral quickly. I learned this during a runaway loop that cost $400 in ten minutes. Now every production system includes comprehensive usage tracking and limits.

import asyncio

from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import RunUsage, UsageLimits

async def cost_controlled_agent_execution(query: str, deps: Dependencies) -> Result:
    """Execute agents with cost controls and monitoring"""
    usage = RunUsage()

    try:
        # UsageLimits has no duration setting, so the timeout is enforced with asyncio
        result = await asyncio.wait_for(
            primary_agent.run(
                query,
                deps=deps,
                usage=usage,
                usage_limits=UsageLimits(
                    request_limit=50,  # Prevent infinite loops
                    total_tokens_limit=10000,  # Cost ceiling
                ),
            ),
            timeout=300,  # Timeout protection, in seconds
        )

        # Log usage for monitoring and billing
        await log_usage_metrics(
            agent="primary_agent",
            correlation_id=deps.correlation_id,
            tokens_used=usage.total_tokens,
            cost=estimate_cost(usage),  # Application-defined helper; usage holds token counts only
            success=True
        )

        return result

    except UsageLimitExceeded:
        # Handle cost overruns gracefully, reporting the usage accumulated so far
        await alert_cost_overrun(deps.correlation_id, usage)
        return fallback_response(query, reason="cost_limit_exceeded")

Always pass ctx.usage when delegating to maintain comprehensive cost tracking across agent hierarchies. Set reasonable usage limits—I typically use 100 requests and 50,000 tokens for complex workflows. Monitor usage patterns to optimize model selection and identify expensive operations.
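Turning token counts into dollars is arithmetic over a price table. Here is a minimal sketch of such a helper; the model names and per-million-token prices are invented for illustration and should be replaced with your provider's current rates. `TokenCounts` is a hypothetical stand-in for any usage object that exposes `input_tokens` and `output_tokens`.

```python
from dataclasses import dataclass

# Hypothetical USD prices per million tokens; not real provider pricing
PRICES_PER_MILLION = {
    "small-model": {"input": 0.50, "output": 1.50},
    "large-model": {"input": 5.00, "output": 15.00},
}


@dataclass
class TokenCounts:
    input_tokens: int
    output_tokens: int


def estimate_cost(usage, model: str = "large-model") -> float:
    """Convert token counts to dollars; `usage` needs input_tokens and output_tokens."""
    price = PRICES_PER_MILLION[model]
    return (
        usage.input_tokens * price["input"] + usage.output_tokens * price["output"]
    ) / 1_000_000


# 40,000 input tokens at $5/M plus 10,000 output tokens at $15/M
cost = estimate_cost(TokenCounts(input_tokens=40_000, output_tokens=10_000))
```

With these assumed prices the example works out to $0.20 for input plus $0.15 for output, which shows why output-heavy agents dominate the bill even at modest volumes.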

Error Resilience That Actually Helps

Production multi-agent systems need multiple layers of error resilience. Circuit breakers prevent cascade failures, retry policies handle transient errors, and fallback agents provide graceful degradation.

from pydantic_ai.exceptions import UsageLimitExceeded
from tenacity import (
    AsyncRetrying,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)

class ResilientAgentOrchestrator:
    def __init__(self):
        # CircuitBreaker is an application-defined class; one instance per agent
        self.circuit_breakers = {}

    def _retry_policy(self) -> AsyncRetrying:
        # Build a fresh policy per call so concurrent executions do not share retry state
        return AsyncRetrying(
            wait=wait_exponential(multiplier=1, max=60),
            stop=stop_after_attempt(3),
            retry=retry_if_not_exception_type(UsageLimitExceeded),  # Retrying cannot fix a spent budget
            reraise=True
        )

    async def execute_with_resilience(self, agent_name: str, agent, query: str, deps, fallback_agent=None):
        """Execute agent with circuit breaker, retry, and fallback"""
        # setdefault stores the breaker, so failure counts persist between calls
        circuit = self.circuit_breakers.setdefault(agent_name, CircuitBreaker(failure_threshold=5))

        if not circuit.can_execute():
            if fallback_agent:
                return await self._execute_fallback(fallback_agent, query, deps, "circuit_open")
            raise RuntimeError(f"Agent {agent_name} circuit breaker open")

        try:
            async for attempt in self._retry_policy():
                with attempt:
                    result = await agent.run(query, deps=deps)
                    circuit.record_success()
                    return result

        except Exception as e:
            # Reached only after the retries are exhausted
            circuit.record_failure()
            if fallback_agent and not isinstance(e, UsageLimitExceeded):
                return await self._execute_fallback(fallback_agent, query, deps, str(e))
            raise

    async def _execute_fallback(self, fallback_agent, query, deps, reason):
        """Execute fallback with degraded context"""
        fallback_query = f"[Fallback - {reason}] Provide basic response to: {query}"
        return await fallback_agent.run(fallback_query, deps=deps)

Circuit breakers have saved us from cascade failures multiple times. When OpenAI had that outage last month, our circuit breakers automatically switched to Anthropic models while logging the degradation. Fallback agents using cheaper models kept the system operational at reduced capability rather than complete failure.
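The orchestrator above relies on a `CircuitBreaker` with `can_execute`, `record_success`, and `record_failure`. One possible shape for it is sketched here. The `reset_timeout` cool-down and the injectable `clock` are assumptions added for the example, not details from the production system. The breaker opens after a run of consecutive failures and lets a trial call through once the cool-down has elapsed.

```python
import time


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures, then allows a
    trial call once `reset_timeout` seconds have passed."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at = None

    def can_execute(self) -> bool:
        if self._opened_at is None:
            return True  # closed: calls flow normally
        # Half-open: permit a trial call after the cool-down
        return self._clock() - self._opened_at >= self.reset_timeout

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # open, or re-open after a failed trial


now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
open_immediately = not breaker.can_execute()
```

Injecting the clock avoids `sleep` calls in tests and makes the half-open transition deterministic.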

Integration Patterns That Actually Work

MCP Integration for Enterprise Systems

Model Context Protocol integration has become essential for connecting agents to enterprise systems. I’ve implemented MCP servers for financial data feeds, regulatory compliance systems, and audit logging that handle thousands of daily requests with enterprise security requirements.

import httpx
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerSSE

# Production MCP server configuration for financial data.
# MCPServerSSE replaces the deprecated MCPServerHTTP name; use
# MCPServerStreamableHTTP if the server speaks the Streamable HTTP transport.
financial_mcp = MCPServerSSE(
    url='https://findata.internal.company.com/mcp',
    tool_prefix='findata',
    http_client=httpx.AsyncClient(
        verify='/etc/ssl/certs/company-internal-ca.pem',
        headers={
            # financial_api_token is loaded from your secret store
            'Authorization': f'Bearer {financial_api_token}',
            # Client-level headers are static: a literal '{correlation_id}' would be
            # sent verbatim, so per-request IDs must be set in application code
        },
        timeout=httpx.Timeout(60.0),  # Financial queries can be slow
        limits=httpx.Limits(max_connections=20)  # Connection pooling
    )
)

# Agent with financial MCP tools
financial_analyst_agent = Agent(
    'openai:gpt-4o',
    toolsets=[financial_mcp],
    system_prompt='''You are a financial analyst with access to internal market data systems.
    Use findata tools to retrieve accurate, real-time financial information.
    Always cite data sources and include timestamps for time-sensitive data.'''
)

async def assess_rebalancing_risk():
    # MCP sampling allows servers to request additional LLM analysis,
    # for example extra inference for complex calculations
    result = await financial_analyst_agent.run(
        "Generate risk assessment for portfolio rebalancing proposal",
    )
    return result.output

The key insight: MCP servers should encapsulate complex authentication, data validation, and business logic while exposing simple tool interfaces to agents. Our financial MCP server handles real-time market data subscriptions, regulatory compliance validation, and audit trail generation—complexity that individual agents shouldn’t manage.

Performance Optimization That Matters

Model selection dramatically impacts both performance and cost. I’ve implemented adaptive routing that analyzes query complexity to optimize model choice automatically.

class AdaptiveModelRouter:
    def __init__(self):
        self.complexity_analyzer = Agent(
            'openai:gpt-3.5-turbo',  # Cheap model for classification
            output_type=ComplexityAnalysis,
            system_prompt='Analyze query complexity and recommend appropriate model tier'
        )

        self.model_tiers = {
            'simple': 'openai:gpt-3.5-turbo',
            'moderate': 'anthropic:claude-3-haiku-20240307',
            'complex': 'openai:gpt-4o',
            'critical': 'anthropic:claude-3-opus-20240229'
        }

    async def route_query(self, query: str, agent_config: dict) -> Agent:
        """Route query to appropriate model based on complexity analysis"""
        analysis = await self.complexity_analyzer.run(
            f"Analyze complexity of query: {query}"
        )

        tier = analysis.output.complexity_tier
        selected_model = self.model_tiers[tier]

        # Copy the config and pull out system_prompt so it is not passed twice
        config = dict(agent_config)
        base_prompt = config.pop('system_prompt', '')

        return Agent(
            selected_model,
            **config,
            system_prompt=f"{base_prompt}\n\nModel selected for {tier} complexity query."
        )

This routing achieves 40-60% cost reduction without quality degradation. Simple queries like “What is the current stock price?” route to GPT-3.5, while complex financial analysis uses GPT-4o. The complexity analyzer itself costs pennies but saves dollars on routing decisions.

Monitoring and Observability

Pydantic Logfire Integration

Pydantic Logfire provides production-grade observability that I wish existed in our early multi-agent deployments. OpenTelemetry compliance means it integrates with existing monitoring infrastructure while providing AI-specific semantic conventions.

import hashlib

import logfire

# Configure structured logging for multi-agent workflows
logfire.configure(
    send_to_logfire='if-token-present',
    console=logfire.ConsoleOptions(colors='auto', verbose=True),
    service_name='financial-analysis-agents',
    service_version=app_version,
    environment=environment
)
logfire.instrument_pydantic_ai()  # Emit spans for agent runs, model requests, and tool calls

async def monitored_agent_execution(agent, agent_name: str, query: str, deps: Dependencies):
    """Execute agent with comprehensive observability"""
    # A SHA-256 digest is stable across processes, unlike the built-in hash()
    query_hash = hashlib.sha256(query.encode()).hexdigest()[:16]
    with logfire.span('agent_execution', agent=agent_name, query_hash=query_hash):
        try:
            result = await agent.run(query, deps=deps)
            usage = result.usage()  # usage() is a method on the run result

            logfire.info(
                'Agent execution successful',
                agent=agent_name,
                tokens_used=usage.total_tokens,
                cost=estimate_cost(usage),  # Application-defined helper
                response_length=len(str(result.output))
            )

            return result

        except Exception as e:
            logfire.error(
                'Agent execution failed',
                agent=agent_name,
                error=str(e),
                error_type=type(e).__name__
            )
            raise

Real-time trace visualization shows agent handoffs, tool calls, and decision paths—invaluable for debugging complex workflows. Cost tracking at the agent level identifies optimization opportunities. Our typical production deployment shows 30-40% cost reduction through monitoring-driven optimization.

Message History and Debugging

Message history preservation enables conversation continuity and audit compliance. The result.all_messages() method captures the complete interaction history. It has no content-stripping option, so the sanitized copy below keeps only message structure; for telemetry, InstrumentationSettings(include_content=False) keeps prompts and completions out of exported spans.

import hashlib

async def auditable_agent_interaction(query: str, deps: Dependencies) -> AuditableResult:
    """Execute agent with complete audit trail"""
    result = await compliance_agent.run(query, deps=deps)

    # Capture full interaction history for audit
    full_history = result.all_messages()

    # Sanitized history for logging: message and part kinds only, no content
    sanitized_history = [
        {'kind': message.kind, 'parts': [part.part_kind for part in message.parts]}
        for message in full_history
    ]

    cost = estimate_cost(result.usage())  # Application-defined helper

    # Store audit trail
    await audit_store.record_interaction(
        correlation_id=deps.correlation_id,
        agent_name="compliance_agent",
        query_hash=hashlib.sha256(query.encode()).hexdigest(),  # Stable across processes, unlike hash()
        result_summary=result.output.summary,
        full_history=full_history,
        sanitized_history=sanitized_history,
        cost=cost
    )

    return AuditableResult(
        output=result.output,
        audit_id=audit_store.last_audit_id,
        cost=cost
    )
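If logs need to retain more structure than message kinds, a recursive redaction pass over serialized messages is one option. The sketch below assumes messages have already been converted to plain dicts and lists; the `SENSITIVE_KEYS` set and the sample history are hypothetical. Sensitive values are replaced with a short fingerprint, so identical content can still be correlated across log lines without being readable.

```python
import hashlib

SENSITIVE_KEYS = {"content", "args"}  # hypothetical field names to redact


def fingerprint(text: str) -> str:
    """Short digest that is stable across processes, unlike the built-in hash()."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]


def redact(value):
    """Return a copy with sensitive fields replaced by a fingerprint of their content."""
    if isinstance(value, dict):
        return {
            key: f"<redacted:{fingerprint(str(item))}>" if key in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


history = [
    {"kind": "request", "parts": [{"part_kind": "user-prompt", "content": "account 0000-1111"}]},
    {"kind": "response", "parts": [{"part_kind": "text", "content": "Noted."}]},
]
sanitized = redact(history)
```

The function builds new containers and never mutates its input, so the full history can still be written to the restricted audit store afterwards.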

Testing Strategies That Work

Multi-agent systems require testing at multiple levels. Unit tests with TestModel prevent API calls during development, integration tests verify end-to-end functionality with cheap models, and production smoke tests ensure deployment success.

import pytest
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel
from pydantic_ai.usage import UsageLimits

# TestDependencies, mock_db, and test_dependencies are application-defined fixtures
class TestMultiAgentWorkflow:
    def setup_method(self):
        # Mock model for unit testing; custom_output_text fixes the response
        self.test_model = TestModel(custom_output_text='Route to product support')

        # Configure test agents with mock model
        self.test_triage_agent = Agent(
            self.test_model,
            deps_type=TestDependencies,
            system_prompt='Route test queries'
        )

    @pytest.mark.asyncio
    async def test_agent_delegation_flow(self):
        """Test agent delegation without API calls"""
        deps = TestDependencies(
            database=mock_db,
            customer_id="test_customer"
        )

        result = await self.test_triage_agent.run(
            "I have a product question",
            deps=deps
        )

        assert "product support" in result.output.lower()
        # TestModel never contacts a provider, but it still reports synthetic
        # token counts, so assert on the request count instead of zero tokens
        assert result.usage().requests == 1

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_real_agent_workflow(self):
        """Integration test with cheap models"""
        cheap_agent = Agent(
            'openai:gpt-3.5-turbo',  # Cheap for testing
            deps_type=ProductionDependencies,
            system_prompt='Test agent workflow'
        )

        result = await cheap_agent.run(
            "Test query for integration",
            deps=test_dependencies,
            usage_limits=UsageLimits(total_tokens_limit=100)  # Cost control
        )

        assert result.output is not None
        assert result.usage().total_tokens <= 100  # Stayed within the token budget

Production smoke tests run after deployment with limited token budgets to verify system health without significant cost. I typically allocate $5 for post-deployment validation across all agent workflows.

Real-World Lessons Learned

After two years building production multi-agent systems, several patterns have proven essential. Start simple with single agents and add complexity incrementally—I’ve seen too many projects fail because they tried to build graph orchestration on day one. Monitor everything from the beginning because debugging distributed agent failures without observability is impossible. Plan for partial failures because agents will fail and your system needs graceful degradation.

Cost management requires constant attention. Set usage limits on everything, monitor token consumption patterns, and optimize model selection based on actual performance data. Our financial analysis system now costs 60% less than our initial implementation while providing better results through careful model routing and prompt optimization.

Security boundaries matter more than you think. Never pass sensitive data through agent system prompts—use dependency injection and tool parameters instead. Implement proper authentication at the MCP layer. Log everything for audit compliance but sanitize sensitive content. I’ve seen HIPAA violations from poorly designed agent communication patterns.

Pydantic AI is my current go-to for new multi-agent systems. The V1.0 stability commitment means I can build on it without worrying about breaking changes every month. Type safety catches bugs before they hit production. Async support and usage tracking handle the operational concerns I used to solve with custom code.

If you are building agents that need to work reliably under production load, start simple — one agent, one workflow, proper monitoring. Add complexity only when you have the observability to understand what is happening. That is the lesson from two years of building these systems.
