
Building a Production MCP Server: Architecture, Pitfalls, and Best Practices

ai-engineering, mcp-protocol, production-systems
#mcp-server #model-context-protocol #llm-tooling #agent-infrastructure #production-ai #tool-calling #observability #error-handling

Problem Framing

Most MCP server implementations die in production not because the protocol is complex, but because engineers treat them like stateless API endpoints. They're not. An MCP server sits between your LLM and your critical infrastructure—databases, APIs, file systems—and must handle three concurrent failure modes: the LLM making nonsensical requests, your backend systems timing out, and the client dropping connections mid-execution.

The naive approach looks like this: wrap your existing REST API in MCP tool definitions, deploy it, watch it handle 50 requests fine, then watch it silently fail at request 51 when the LLM decides to call delete_database with a hallucinated parameter. Or worse, it succeeds but your audit logs show nothing because you treated tool execution like a fire-and-forget operation.

The real problem isn't implementing the protocol specification—that's straightforward. The problem is that MCP servers operate in a fundamentally adversarial environment. Your LLM will generate invalid inputs. Your backend will fail at unpredictable times. Your network will partition. And unlike traditional API clients that fail fast and loud, LLM agents will retry with slightly different parameters, creating cascading failures that look like success until you check your database three hours later.

Production MCP servers require thinking about idempotency, partial failure recovery, request validation as a security boundary, and observability that captures both the LLM's intent and the system's actual behavior. The gap between "works in demo" and "works at 3am when your database is slow" is where most implementations fail.

Mental Model

Stop thinking of MCP servers as tool registries. They're protocol translators that sit at a trust boundary. On one side, you have an LLM that speaks in probabilistic tokens. On the other, you have deterministic systems that require exact inputs. The MCP server's job is to maintain invariants across this boundary while gracefully degrading when either side violates expectations.

The correct mental model has three layers. First, the protocol layer handles MCP message framing, capability negotiation, and transport concerns. This layer is stateless and should never make decisions about business logic. It receives JSON-RPC requests and routes them to the appropriate handler.

Second is the validation and transformation layer. This is where you enforce the contract between probabilistic and deterministic systems. Every tool input gets validated against a schema, but not just for type correctness—for semantic validity. An LLM might generate syntactically valid JSON that represents an impossible operation. This layer must catch "delete all users where role=admin" before it reaches your database, even if your tool schema technically allows it.

Third is the execution layer, where the actual work happens. This layer must be designed for partial failure and idempotency. When an LLM calls a tool that performs multiple operations, you need to know which ones succeeded so you can resume or roll back. Unlike REST APIs, where clients handle retries, MCP servers often need to manage this themselves because the LLM will retry with modified parameters, not identical requests.

The key invariant: at any point in the execution pipeline, you should be able to reconstruct what the LLM intended, what actually happened, and whether the two diverged. This means comprehensive logging of both the original request and the transformed, validated version that hit your backend. When things go wrong—and they will—this audit trail is the only way to debug whether the failure was in the LLM's reasoning, your validation logic, or the backend system.
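
One way to make that invariant concrete is a per-request audit record that stores the raw arguments, the validated arguments, and the outcome side by side. A minimal sketch (the field names here are illustrative, not part of the MCP spec):

code
import json
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Optional


@dataclass
class ToolCallAudit:
    # What the LLM asked for, verbatim
    request_id: str
    tool_name: str
    raw_arguments: Dict[str, Any]
    # What actually reached the backend after validation and coercion
    validated_arguments: Optional[Dict[str, Any]] = None
    validation_errors: list = field(default_factory=list)
    # What happened
    backend_status: Optional[str] = None
    result_summary: Optional[str] = None
    started_at: float = field(default_factory=time.time)

    def to_log_line(self) -> str:
        # One structured line per tool call, so intent and outcome can be diffed later
        return json.dumps(asdict(self), default=str, sort_keys=True)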

Think of an MCP server as a runtime type system for LLM behavior. Static schemas catch obvious errors, but production systems need runtime invariant checking, automatic rollback on constraint violations, and detailed provenance tracking. The server isn't just executing tools; it's maintaining the integrity of the boundary between learned and programmed behavior.

Architecture

A production MCP server architecture separates concerns across four primary components: the protocol handler, the tool registry, the execution engine, and the observability layer. Each has distinct responsibilities and failure modes.

Figure: A production MCP server architecture

The protocol handler owns transport concerns—WebSocket connections, stdio pipes, or HTTP endpoints depending on deployment. Its only job is parsing MCP protocol messages, maintaining connection state, and routing requests. It doesn't interpret tool semantics. When a tools/call request arrives, it extracts the tool name and arguments, then delegates to the registry. Connection failures, malformed JSON, and protocol version mismatches get handled here and logged before they propagate.

The tool registry maintains the catalog of available tools and their schemas. In simple implementations, this is a static dictionary. In production, it's dynamic—tools can be enabled/disabled based on client capabilities, feature flags, or runtime configuration. The registry returns not just tool definitions but also metadata: expected latency, cost estimates, required permissions, and whether the tool is idempotent. This metadata drives decisions in downstream layers.
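
A minimal sketch of such a registry might look like the following; the metadata fields are illustrative, chosen to match the decisions downstream layers need to make:

code
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class ToolDefinition:
    name: str
    schema: dict                      # JSON Schema for the tool's input
    handler: Callable                 # async callable that does the actual work
    idempotent: bool = False          # safe to retry without side effects?
    expected_latency_ms: int = 1000   # drives timeout selection downstream
    cost_estimate: float = 0.0        # dollars per call, for budget checks
    required_permission: Optional[str] = None


class ToolRegistry:
    def __init__(self):
        self._tools: Dict[str, ToolDefinition] = {}
        self._disabled: set = set()

    def register(self, tool: ToolDefinition) -> None:
        self._tools[tool.name] = tool

    def set_enabled(self, name: str, enabled: bool) -> None:
        # Feature flags or client capabilities can toggle tools at runtime
        if enabled:
            self._disabled.discard(name)
        else:
            self._disabled.add(name)

    def get(self, name: str) -> Optional[ToolDefinition]:
        if name in self._disabled:
            return None
        return self._tools.get(name)

    def list_tools(self) -> list:
        return [t for n, t in self._tools.items() if n not in self._disabled]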

The validation layer enforces the contract between LLM-generated inputs and your backend systems. Schema validation is table stakes. Production validation includes: semantic constraints (e.g., date ranges must be positive, user IDs must exist), cross-field dependencies (if action=delete, require confirmation=true), and rate limits per tool per client. This layer also handles parameter coercion—LLMs often generate strings when numbers are expected, or vice versa. Decide explicitly whether to coerce or reject.
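
Cross-field rules are easy to express in the same pydantic style used later in this article; a sketch of the delete-requires-confirmation rule (the model and field names are hypothetical):

code
from pydantic import BaseModel, root_validator


class ModifyRecordArgs(BaseModel):
    action: str                 # "read" | "update" | "delete"
    record_id: str
    confirmation: bool = False

    @root_validator
    def destructive_actions_need_confirmation(cls, values):
        # Cross-field rule: deletes must carry an explicit confirmation flag
        if values.get("action") == "delete" and not values.get("confirmation"):
            raise ValueError("action=delete requires confirmation=true")
        return values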

The execution engine is where state management complexity lives. For simple read-only tools, this is straightforward—call the backend, return the result. For tools that mutate state, you need idempotency keys, transaction boundaries, and partial failure recovery. The engine maintains an execution context that tracks: the request ID, which operations succeeded, current retry count, and any intermediate state needed for resume/rollback.

Figure: A production MCP server architecture flow

The observability layer sits orthogonal to execution flow. Every request generates structured events: protocol messages received, validation outcomes, execution start/end, backend latencies, and final results. These events flow to your logging infrastructure—not as debug prints, but as structured JSON with consistent schema. Critical fields include: request_id, tool_name, execution_time_ms, validation_errors, backend_status, and result_summary. When debugging production failures, you need to correlate LLM behavior with system outcomes across potentially thousands of concurrent requests.
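
As a concrete example, a completed tool call might emit an event like the following; the event name and values are illustrative:

code
# One illustrative execution event; every event type shares this core schema.
execution_event = {
    "event": "tool_execution_completed",
    "request_id": "req_8f3a2c",        # correlation ID carried through the whole lifecycle
    "tool_name": "query_database",
    "execution_time_ms": 412,
    "validation_errors": [],            # empty list, not a missing field, when validation passes
    "backend_status": "ok",
    "result_summary": "42 rows returned",
}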

State management deserves explicit architectural attention. Most MCP servers need to maintain: active connections (which clients are connected), in-flight requests (what's currently executing), and execution history (for idempotency checking). Choose your state store based on your deployment model. Single-instance servers can use in-memory state with periodic snapshots. Multi-instance deployments need Redis or similar for shared state. Don't use your application database for this—state store failures should not cascade to backend failures.
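
Keeping the state store behind a small interface makes that deployment choice swappable; a sketch of the interface plus an in-memory variant for single-instance servers (the Redis-backed version appears later):

code
import asyncio
import time
from typing import Any, Dict, Optional, Protocol


class StateStore(Protocol):
    async def get(self, key: str) -> Optional[Any]: ...
    async def set(self, key: str, value: Any, ttl: int = 3600) -> None: ...


class InMemoryStateStore:
    """Single-instance state with TTLs; multi-instance deployments need Redis or similar."""

    def __init__(self):
        self._data: Dict[str, tuple] = {}  # key -> (expires_at, value)
        self._lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        async with self._lock:
            entry = self._data.get(key)
            if entry is None or entry[0] < time.time():
                self._data.pop(key, None)
                return None
            return entry[1]

    async def set(self, key: str, value: Any, ttl: int = 3600) -> None:
        async with self._lock:
            self._data[key] = (time.time() + ttl, value)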

Authentication and authorization sit at the protocol handler level but get enforced in the execution engine. The protocol handler validates the client identity (API key, JWT, mutual TLS). The execution engine checks whether that identity can invoke the requested tool with the provided arguments. This separation matters because authorization often depends on argument values, not just tool names. A client might be allowed to read user profiles but only for users in their organization.
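
A sketch of an argument-aware authorization check, assuming a hypothetical ClientIdentity resolved by the protocol handler:

code
from dataclasses import dataclass


@dataclass
class ClientIdentity:
    client_id: str
    org_id: str
    allowed_tools: set


async def authorize_tool_call(identity: ClientIdentity, tool_name: str, arguments: dict) -> tuple:
    # Coarse check: is the tool allowed for this identity at all?
    if tool_name not in identity.allowed_tools:
        return False, f"Tool not permitted: {tool_name}"

    # Fine-grained check: authorization can depend on argument values.
    # Example rule: profile reads are scoped to the caller's own organization.
    if tool_name == "read_user_profile":
        if arguments.get("org_id") != identity.org_id:
            return False, "Cannot read profiles outside your organization"

    return True, ""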

Implementation

Start with the protocol handler. The MCP specification defines JSON-RPC 2.0 over various transports. Here's a production-grade WebSocket handler that actually handles the error cases that matter:

code
import asyncio
import json
from typing import Dict, Any, Optional

import structlog
from websockets.server import serve, WebSocketServerProtocol
from websockets.exceptions import ConnectionClosed

logger = structlog.get_logger()


class MCPProtocolHandler:
    def __init__(self, tool_registry, execution_engine):
        self.registry = tool_registry
        self.engine = execution_engine
        # Keyed by id(websocket), which is an int
        self.active_connections: Dict[int, WebSocketServerProtocol] = {}

    async def handle_connection(self, websocket: WebSocketServerProtocol):
        connection_id = id(websocket)
        self.active_connections[connection_id] = websocket

        log = logger.bind(connection_id=connection_id)
        log.info("mcp_connection_established")

        try:
            await self._handle_messages(websocket, connection_id, log)
        except ConnectionClosed:
            log.info("mcp_connection_closed")
        except Exception as e:
            log.error("mcp_connection_error", error=str(e))
        finally:
            del self.active_connections[connection_id]

    async def _handle_messages(self, websocket, connection_id, log):
        async for message in websocket:
            try:
                request = json.loads(message)
                response = await self._route_request(request, connection_id, log)
                await websocket.send(json.dumps(response))
            except json.JSONDecodeError:
                error_response = {
                    "jsonrpc": "2.0",
                    "error": {"code": -32700, "message": "Parse error"},
                    "id": None
                }
                await websocket.send(json.dumps(error_response))

    async def _route_request(self, request: Dict, connection_id: int, log) -> Dict:
        method = request.get("method")
        request_id = request.get("id")

        log = log.bind(method=method, request_id=request_id)
        log.info("mcp_request_received")

        if method == "tools/list":
            return await self._handle_list_tools(request_id)
        elif method == "tools/call":
            return await self._handle_tool_call(request, connection_id, log)
        else:
            return {
                "jsonrpc": "2.0",
                "error": {"code": -32601, "message": f"Method not found: {method}"},
                "id": request_id
            }

The validation layer is where most implementations get lazy. Schema validation alone is insufficient. You need semantic validation that understands your domain:

code
from pydantic import BaseModel, validator, Field
from typing import Dict, List, Optional
from datetime import datetime


class ToolValidator:
    def __init__(self):
        self.validators = {}

    def register_tool_validator(self, tool_name: str, validator_func):
        self.validators[tool_name] = validator_func

    async def validate(self, tool_name: str, arguments: Dict) -> tuple[bool, Optional[str], Dict]:
        # First: schema validation
        # (get_tool_schema and _transform_for_backend are assumed helpers on this class)
        tool_schema = self.get_tool_schema(tool_name)
        if not tool_schema:
            return False, f"Unknown tool: {tool_name}", {}

        try:
            validated_args = tool_schema(**arguments)
        except Exception as e:
            return False, f"Schema validation failed: {str(e)}", {}

        # Second: semantic validation
        validator_func = self.validators.get(tool_name)
        if validator_func:
            is_valid, error_msg = await validator_func(validated_args)
            if not is_valid:
                return False, error_msg, {}

        # Third: transform to backend format
        backend_args = self._transform_for_backend(validated_args)

        return True, None, backend_args


# Example: validating a database query tool
class QueryDatabaseArgs(BaseModel):
    query: str
    limit: int = Field(default=100, le=1000)
    timeout_seconds: int = Field(default=30, le=300)

    @validator('query')
    def validate_query_safety(cls, v):
        # Block obvious dangerous patterns
        dangerous_patterns = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER']
        query_upper = v.upper()
        for pattern in dangerous_patterns:
            if pattern in query_upper:
                raise ValueError(f"Query contains forbidden operation: {pattern}")
        return v


async def validate_query_tool(args: QueryDatabaseArgs) -> tuple[bool, Optional[str]]:
    # Additional semantic checks beyond schema
    if len(args.query) > 10000:
        return False, "Query exceeds maximum length"

    # Check if user has permission for referenced tables
    tables = extract_tables_from_query(args.query)
    for table in tables:
        if not await check_table_permission(table):
            return False, f"Insufficient permissions for table: {table}"

    return True, None

The execution engine handles the actual work and must be designed for observability and failure recovery:

code
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, Dict, Optional


class RetryableError(Exception):
    """Raised by tool executors for transient failures that are safe to retry."""


@dataclass
class ExecutionContext:
    request_id: str
    tool_name: str
    arguments: Dict[str, Any]
    started_at: float
    attempt: int = 1
    max_attempts: int = 3


class ExecutionEngine:
    def __init__(self, metrics_client, state_store):
        self.metrics = metrics_client
        self.state = state_store
        self.tool_executors = {}

    def register_executor(self, tool_name: str, executor_func):
        self.tool_executors[tool_name] = executor_func

    async def execute(self, tool_name: str, arguments: Dict, request_id: str) -> Dict:
        ctx = ExecutionContext(
            request_id=request_id,
            tool_name=tool_name,
            arguments=arguments,
            started_at=time.time()
        )

        # Check for duplicate request (idempotency)
        cached_result = await self.state.get(f"result:{request_id}")
        if cached_result:
            self.metrics.increment("execution.cache_hit", tags=[f"tool:{tool_name}"])
            return cached_result

        executor = self.tool_executors.get(tool_name)
        if not executor:
            return {"error": f"No executor registered for {tool_name}"}

        # Execute with retry logic
        result = await self._execute_with_retry(executor, ctx)

        # Cache successful results
        if "error" not in result:
            await self.state.set(
                f"result:{request_id}",
                result,
                ttl=3600  # 1 hour
            )

        # Record metrics
        duration_ms = (time.time() - ctx.started_at) * 1000
        self.metrics.histogram(
            "execution.duration_ms",
            duration_ms,
            tags=[f"tool:{tool_name}", f"success:{'error' not in result}"]
        )

        return result

    async def _execute_with_retry(self, executor, ctx: ExecutionContext) -> Dict:
        last_error = None

        for attempt in range(1, ctx.max_attempts + 1):
            ctx.attempt = attempt

            try:
                result = await asyncio.wait_for(
                    executor(ctx.arguments),
                    timeout=self._get_timeout(ctx.tool_name)
                )
                return result

            except asyncio.TimeoutError:
                last_error = "Execution timeout"
                self.metrics.increment(
                    "execution.timeout",
                    tags=[f"tool:{ctx.tool_name}", f"attempt:{attempt}"]
                )

            except RetryableError as e:
                last_error = str(e)
                if attempt < ctx.max_attempts:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

            except Exception as e:
                # Non-retryable error
                return {"error": f"Execution failed: {str(e)}"}

        return {"error": f"Max retries exceeded. Last error: {last_error}"}

    def _get_timeout(self, tool_name: str) -> float:
        # Tool-specific timeouts
        timeouts = {
            "query_database": 30.0,
            "fetch_url": 10.0,
            "generate_report": 120.0
        }
        return timeouts.get(tool_name, 60.0)

State management for multi-instance deployments requires distributed coordination:

code
import redis.asyncio as redis
import pickle
from typing import Any, Optional


class RedisStateStore:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, decode_responses=False)

    async def get(self, key: str) -> Optional[Any]:
        data = await self.redis.get(key)
        return pickle.loads(data) if data else None

    async def set(self, key: str, value: Any, ttl: int = 3600):
        await self.redis.set(key, pickle.dumps(value), ex=ttl)

    async def acquire_lock(self, resource: str, timeout: int = 10) -> bool:
        """Distributed lock for preventing concurrent execution"""
        lock_key = f"lock:{resource}"
        acquired = await self.redis.set(lock_key, "1", nx=True, ex=timeout)
        return bool(acquired)

    async def release_lock(self, resource: str):
        await self.redis.delete(f"lock:{resource}")

Cost tracking and rate limiting prevent runaway execution:

code
class CostTracker:
    def __init__(self, state_store):
        self.state = state_store
        self.cost_per_tool = {
            "query_database": 0.01,  # dollars
            "call_external_api": 0.05,
            "generate_report": 0.10
        }

    async def check_and_record(self, client_id: str, tool_name: str) -> tuple[bool, str]:
        # Check rate limits
        minute_key = f"ratelimit:{client_id}:{tool_name}:minute"
        minute_count = await self.state.redis.incr(minute_key)

        if minute_count == 1:
            await self.state.redis.expire(minute_key, 60)

        if minute_count > self._get_rate_limit(tool_name):
            return False, "Rate limit exceeded"

        # Check cost budget
        cost = self.cost_per_tool.get(tool_name, 0.01)
        daily_key = f"cost:{client_id}:daily"
        daily_cost = float(await self.state.redis.incrbyfloat(daily_key, cost))

        if daily_cost > await self._get_budget(client_id):
            return False, "Budget exceeded"

        return True, ""

    def _get_rate_limit(self, tool_name: str) -> int:
        limits = {
            "query_database": 60,  # per minute
            "call_external_api": 30,
            "generate_report": 10
        }
        return limits.get(tool_name, 100)

Pitfalls & Failure Modes

The silent failure mode is the most dangerous. An LLM calls a tool, the tool returns an error, but the error message is too generic for the LLM to recover from. The agent retries with slightly modified parameters, fails again, and enters a loop. You won't see this in metrics because each individual request succeeds at the protocol level—it returns valid JSON. But your backend is getting hammered with variations of an impossible request.

Prevention requires error messages that are both machine-readable and actionable. Don't return {"error": "Invalid input"}. Return:

code
{  "error": {    "code": "INVALID_DATE_RANGE",    "message": "Start date must be before end date",    "details": {      "provided_start": "2024-03-15",      "provided_end": "2024-03-10",      "constraint": "start_date < end_date"    },    "suggestions": [      "Swap start_date and end_date values",      "Verify date format is YYYY-MM-DD"    ]  }}

Cost explosions happen when you don't account for LLM behavior patterns. An agent decides to analyze 10,000 documents. Instead of calling your analyze_document tool 10,000 times sequentially, it generates a parallel batch of 500 calls because that's what it learned is "efficient." Your execution engine dutifully spawns 500 concurrent operations, overwhelming your backend. Your database connections saturate, requests start timing out, the LLM sees failures and retries, and now you have 1,000 concurrent requests.

The fix is explicit concurrency control at the execution engine level:

code
class ExecutionEngine:
    def __init__(self, max_concurrent_per_tool: Dict[str, int]):
        self.semaphores = {
            tool: asyncio.Semaphore(limit)
            for tool, limit in max_concurrent_per_tool.items()
        }

    async def execute(self, tool_name: str, arguments: Dict, request_id: str):
        semaphore = self.semaphores.get(tool_name)
        if semaphore:
            async with semaphore:
                return await self._execute_internal(tool_name, arguments, request_id)
        return await self._execute_internal(tool_name, arguments, request_id)

State management failures appear as duplicate operations. An agent calls create_user, the operation succeeds, but the response is lost to a network hiccup; the LLM retries, and now you have two users. Idempotency keys prevent this, but only if you implement them correctly. The key must be stable across retries but unique across distinct operations.

Wrong approach: using a timestamp as the idempotency key. Each retry generates a new timestamp, defeating the purpose.

Right approach: hash the operation parameters together with the client-provided request ID:

code
import hashlib
import json
from typing import Dict


def generate_idempotency_key(tool_name: str, arguments: Dict, request_id: str) -> str:
    # Create a stable representation of the arguments
    stable_args = json.dumps(arguments, sort_keys=True)
    content = f"{tool_name}:{stable_args}:{request_id}"
    return hashlib.sha256(content.encode()).hexdigest()

Validation bypass through parameter injection is common. LLMs learn to encode instructions in parameter values. You validate that a filename parameter doesn't contain path traversal characters, but the LLM generates ../../../../etc/passwd encoded with Unicode escapes or URL encoding. Your validation regex misses it, but your file system doesn't.

Defense requires normalizing inputs before validation:

code
import urllib.parse
import unicodedata


def normalize_string_parameter(value: str) -> str:
    # Decode URL encoding
    decoded = urllib.parse.unquote(value)
    # Normalize unicode
    normalized = unicodedata.normalize('NFKC', decoded)
    # Additional normalizations based on your threat model
    return normalized

Observability gaps manifest as "it worked in staging." The difference is staging doesn't have 50 concurrent clients, doesn't have clients that maintain connections for hours, and doesn't have the variety of inputs production LLMs generate. Your logs show individual requests succeeding but miss the pattern of repeated failures for a specific client or tool combination.

Structured logging with correlation IDs across the entire request lifecycle is non-negotiable:

code
import structlog


def setup_logging():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,   # pulls in bound context (request_id, etc.)
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer()
        ]
    )


# Usage: bind the correlation ID once per request, then every log line carries it.
# (request_id, tool_name, and duration come from the surrounding request handling.)
structlog.contextvars.bind_contextvars(request_id=request_id)
log = structlog.get_logger()
log.info("tool_executed", tool=tool_name, duration_ms=duration)

Summary & Next Steps

Production MCP servers are protocol translators at a trust boundary, not simple API wrappers. The architecture that works separates protocol handling, validation, execution, and observability into distinct layers with clear failure semantics. Validation must be semantic, not just schematic. Execution must handle partial failures, implement idempotency, and enforce rate limits. Observability must capture both LLM intent and system behavior.

The failure modes that matter in production are silent failures, cost explosions, duplicate operations, validation bypass, and correlation gaps in logs. Each has specific technical mitigations but they all share a common pattern: treating the LLM as an adversarial client that will eventually explore every edge case in your system.

Start your production implementation by building the state store and observability layer first. Don't add tool executors until you can track their lifecycle through structured logs and metrics. Implement idempotency before you implement complex operations. Add rate limiting and cost tracking before you expose write operations.

Next steps: implement circuit breakers for backend dependencies, add request prioritization based on tool cost, and build dashboards that show tool success rates segmented by error type. When you start seeing patterns in your failure logs, that's when you know your MCP server is actually ready for production load.
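
As a starting point for the first of those, here is a minimal circuit-breaker sketch (thresholds and naming are illustrative) that the execution engine can consult before calling a backend:

code
import time
from typing import Optional


class CircuitBreaker:
    """Minimal circuit breaker for a single backend dependency (illustrative)."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: let a request through once the cooldown has elapsed
        return time.time() - self.opened_at >= self.reset_timeout

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()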