← Back to Home

Designing Secure MCP Servers: Preventing Context Injection & Data Exfiltration

mcp-securityllm-systemsproduction-ai
#model-context-protocol#context-injection#data-exfiltration#security-patterns#mcp-servers#llm-security#auth-systems#multi-tenancy#observability#rate-limiting

The Problem: MCP Servers Are Attack Surface, Not Just APIs

Your MCP server just exposed customer data to the wrong tenant. Not through a bug in your authorization logic. Not because someone cracked your database. Because an LLM convinced your server that it was acting on behalf of a different user.

This is the security reality of Model Context Protocol servers that most teams discover too late. MCP servers sit between untrusted LLM outputs and trusted backend systems. Every request comes wrapped in natural language that could be adversarial, every parameter could be an injection attempt, and every response might contain data the requesting context shouldn't see.

Traditional API security isn't enough. You can't just validate JSON schemas and call it secure. MCP servers process semantically rich requests from models that are explicitly designed to be persuasive. When an LLM makes a tool call, you're not just handling a function invocation—you're handling a structured argument generated by a system optimized for linguistic manipulation.

I've seen production MCP servers leak sensitive data because they trusted the "user_id" parameter in a tool call without validating it against the actual authenticated session. I've debugged servers that let models enumerate internal resources by simply asking in different phrasings. I've watched cost explosions from a single malicious prompt that figured out how to trigger expensive database queries in a loop.

The core issue: MCP servers must defend against three threat vectors simultaneously—malicious users crafting adversarial prompts, models making mistakes that bypass security boundaries, and legitimate but dangerous model behaviors that exhaust resources or leak information through side channels.

Most teams building MCP servers treat security as an afterthought. They focus on making the protocol work, getting capabilities exposed, and shipping features. Security review happens right before launch, if at all. By then, the architecture has baked in assumptions that are fundamentally incompatible with secure multi-tenant operation.

This article explains how to design MCP servers that are secure by default, not by accident. Not theory—patterns that survive production.

The Mental Model: MCP Servers Are Context-Aware Proxies Under Adversarial Load

Stop thinking of MCP servers as APIs. Start thinking of them as context-aware proxies operating under adversarial conditions.

An API receives requests, validates inputs, executes operations, returns responses. The trust boundary is clear: authenticate the client, authorize the operation, execute.

MCP servers do something fundamentally different. They receive requests that claim to operate in a specific context, but that context is mediated through an LLM that might be confused, manipulated, or simply wrong. The request says "fetch customer data for user_123" but you need to verify: Does the authenticated session actually have access to user_123? Is this request part of a conversation that established this context? Or is this the model hallucinating a user ID or being tricked into accessing data it shouldn't?

The key abstraction: Every MCP request operates within three distinct security contexts that must all be validated independently:

Authentication context: Who initiated the conversation? This is your traditional OAuth token, API key, or session identifier. This never changes during a conversation.

Authorization context: What resources is this conversation allowed to access? This derives from authentication but can be narrowed based on conversation state. A user who authenticated with read-only permissions shouldn't suddenly get write access because the model asked nicely.

Conversation context: What has the model been explicitly told it can do in this specific conversation? This is the narrowest scope. Even if a user has admin permissions, if this conversation is scoped to "analyze sales data," the model shouldn't be able to "delete all users" just because it has the technical capability.

Most security breaches in MCP systems come from mixing these contexts or validating only one when all three matter.

The other critical mental shift: MCP servers operate under continuous adversarial load, even from legitimate users. Prompt injection isn't just an attack vector—it's Tuesday. Users will accidentally craft prompts that cause models to attempt unauthorized operations. Models will creatively misinterpret instructions in ways that bypass security boundaries. Your server must assume every request might be adversarial while still being usable.

This is fundamentally different from traditional API security, where adversarial requests are anomalies. In MCP systems, they're expected behavior that requires architectural defenses, not just input validation.

Architecture: Layered Defense for Context-Mediated Requests

MCP servers require defense in depth with explicit boundaries between authentication, authorization, and execution.

Figure: Architecture: Layered Defense for Context-Mediated Requests
Figure: Architecture: Layered Defense for Context-Mediated Requests

Each layer must be independent. Authentication failure doesn't reach context validation. Context violations don't reach rate limiting. This prevents bypasses where a flaw in one layer leaks information about another.

Component Responsibilities

Auth Layer verifies the session is valid and identifies the authenticated principal. This is stateless—validate the token, extract claims, pass along. No business logic here.

Context Validation checks whether the requested operation is consistent with conversation state. This is stateful—it requires knowing what context was established earlier in the conversation. If this is the first message and the model immediately requests "user_456's private data," that's suspicious.

Rate Limiting operates at multiple granularities: per-session, per-resource-type, per-tenant. This prevents both malicious exhaustion attacks and accidentally expensive model behaviors.

Request Sanitization strips or escapes anything that could be used for injection. Parameter names, resource URIs, query strings—all get validated against allowlists, not blocklists.

Resource Access Control enforces fine-grained permissions. This is where "user has admin role" gets translated to "user can access resources X, Y, Z with operations A, B, C."

Response Sanitization ensures responses don't leak information beyond what the context should see. Even if execution succeeded, maybe the response contains fields this context shouldn't access.

Audit Logging records every decision at every layer. When something goes wrong, you need to reconstruct why it was allowed.

State Management: The Core Security Decision

The hardest architectural choice: stateless vs. stateful context tracking.

Stateless MCP servers validate each request independently using only the authentication token and the request itself. Simpler to scale, no session management, no conversation state to lose.

Stateful MCP servers maintain conversation context across requests. They know what resources were mentioned earlier, what permissions were granted, what operations have already been performed.

Most teams start stateless and regret it when they realize they can't detect when a model is accessing resources it shouldn't have known existed.

The hybrid pattern that works: Stateless auth, stateful context boundaries.

Validate tokens statelessly. Track conversation context in short-lived session state (Redis, memcached, or even local memory with sticky sessions). This gives you security benefits of context tracking without requiring persistent sessions for every request.

Figure: State Management: The Core Security Decision
Figure: State Management: The Core Security Decision

Context state has a TTL (30 minutes is typical). After timeout, the conversation starts fresh with a narrower default context. This limits damage from context pollution attacks where an adversary slowly expands permissions over many turns.

Implementation: Building Production-Grade Secure MCP Servers

Layer 1: Authentication & Multi-Tenant Isolation

Authentication must be bulletproof before anything else runs. This is the foundation.

code
from typing import Optional, Dict, Anyfrom dataclasses import dataclassfrom datetime import datetime, timedeltaimport jwtimport secrets@dataclassclass AuthenticatedContext:    """    Immutable authentication context for an MCP session.    This NEVER changes after initial auth.    """    tenant_id: str    user_id: str    roles: list[str]    scopes: list[str]    authenticated_at: datetime    expires_at: datetime        def is_expired(self) -> bool:        return datetime.utcnow() > self.expires_at        def has_scope(self, required_scope: str) -> bool:        return required_scope in self.scopes        def has_role(self, required_role: str) -> bool:        return required_role in self.rolesclass AuthenticationLayer:    """    Stateless auth validation.    Zero trust: validate every token on every request.    """        def __init__(self, jwt_secret: str, allowed_issuers: list[str]):        self.jwt_secret = jwt_secret        self.allowed_issuers = set(allowed_issuers)        def authenticate(self, token: str) -> Optional[AuthenticatedContext]:        """        Validate token and extract authenticated context.        Returns None if token is invalid.        """        try:            # Decode and verify JWT            payload = jwt.decode(                token,                self.jwt_secret,                algorithms=["HS256"],                options={"verify_exp": True}            )                        # Verify issuer is allowed            if payload.get("iss") not in self.allowed_issuers:                return None                        # Extract claims            return AuthenticatedContext(                tenant_id=payload["tenant_id"],                user_id=payload["sub"],                roles=payload.get("roles", []),                scopes=payload.get("scopes", []),                authenticated_at=datetime.fromtimestamp(payload["iat"]),                expires_at=datetime.fromtimestamp(payload["exp"])            )                    except jwt.InvalidTokenError:            return None        except KeyError:            # Missing required claims            return Noneclass TenantIsolation:    """    Enforce strict tenant boundaries.    Multi-tenancy is where most security failures happen.    """        @staticmethod    def validate_resource_access(        auth_context: AuthenticatedContext,        resource_uri: str    ) -> bool:        """        Critical: Validate resource belongs to authenticated tenant.                Common mistakes:        - Trusting resource_id from request without checking tenant        - Allowing cross-tenant access for admin users        - Not validating derived resources (if user owns resource A,          and resource B is child of A, validate B's tenant too)        """        # Parse tenant from resource URI        # Resource URIs must encode tenant: tenant://{tenant_id}/resource/{id}        if not resource_uri.startswith("tenant://"):            return False                parts = resource_uri.split("/")        if len(parts) < 4:            return False                resource_tenant = parts[2]                # Strict equality check        # NO exceptions for admin users, NO cross-tenant access        return resource_tenant == auth_context.tenant_id

Critical production considerations:

Never trust user IDs or tenant IDs from request parameters. Extract them exclusively from validated auth tokens. I've debugged three separate incidents where "user_id" was a request parameter that the model could manipulate.

Token validation must be synchronous and fast. If JWT verification takes >10ms, you'll notice latency. Consider local key caching or JWT parsing optimizations. But never skip validation for performance.

Multi-tenant isolation is non-negotiable. Admin users shouldn't bypass tenant boundaries. Support staff shouldn't either. Build separate admin APIs with explicit cross-tenant operations if needed, but your MCP server operates in single-tenant mode always.

Layer 2: Conversation Context & Boundary Enforcement

Context tracking prevents confused deputy attacks where the model accesses resources it shouldn't know about.

code
from typing import Set, Optionalfrom datetime import datetime, timedeltaimport redisimport json@dataclassclass ConversationContext:    """    Tracks what resources and capabilities this conversation has accessed.    This is your defense against context expansion attacks.    """    conversation_id: str    tenant_id: str    user_id: str        # Resources explicitly mentioned or accessed    accessed_resources: Set[str]        # Operations performed (for audit trail)    operation_history: list[Dict[str, Any]]        # Context boundaries    max_resources_per_conversation: int = 100    allowed_resource_patterns: Set[str] = None        created_at: datetime = None    last_accessed: datetime = None        def can_access_resource(self, resource_uri: str) -> bool:        """        Check if resource access is consistent with conversation state.        """        # Too many resources? Possible enumeration attack        if len(self.accessed_resources) >= self.max_resources_per_conversation:            return False                # Resource pattern allowlist        if self.allowed_resource_patterns:            if not any(                self._matches_pattern(resource_uri, pattern)                for pattern in self.allowed_resource_patterns            ):                return False                return True        def record_access(self, resource_uri: str, operation: str):        """Track resource access for audit and anomaly detection"""        self.accessed_resources.add(resource_uri)        self.operation_history.append({            "resource": resource_uri,            "operation": operation,            "timestamp": datetime.utcnow().isoformat()        })        self.last_accessed = datetime.utcnow()class ConversationContextManager:    """    Manages conversation state with TTL and boundaries.    Uses Redis for distributed deployments.    """        def __init__(self, redis_client: redis.Redis, default_ttl: int = 1800):        self.redis = redis_client        self.default_ttl = default_ttl        def get_context(        self,        conversation_id: str,        auth_context: AuthenticatedContext    ) -> Optional[ConversationContext]:        """        Fetch conversation context, validating it belongs to auth context.        """        key = f"mcp:context:{conversation_id}"        data = self.redis.get(key)                if not data:            return None                context = ConversationContext(**json.loads(data))                # Validate context matches auth        if (context.tenant_id != auth_context.tenant_id or            context.user_id != auth_context.user_id):            # Possible session hijacking attempt            return None                return context        def create_context(        self,        conversation_id: str,        auth_context: AuthenticatedContext,        allowed_patterns: Optional[Set[str]] = None    ) -> ConversationContext:        """        Create new conversation context with explicit boundaries.        """        context = ConversationContext(            conversation_id=conversation_id,            tenant_id=auth_context.tenant_id,            user_id=auth_context.user_id,            accessed_resources=set(),            operation_history=[],            allowed_resource_patterns=allowed_patterns,            created_at=datetime.utcnow(),            last_accessed=datetime.utcnow()        )                self._save_context(context)        return context        def _save_context(self, context: ConversationContext):        """Persist context with TTL"""        key = f"mcp:context:{context.conversation_id}"                # Serialize (sets need special handling)        data = {            **context.__dict__,            "accessed_resources": list(context.accessed_resources),            "allowed_resource_patterns": list(context.allowed_resource_patterns)                 if context.allowed_resource_patterns else None        }                self.redis.setex(            key,            self.default_ttl,            json.dumps(data, default=str)        )

Production patterns:

Context TTL should be short—30 minutes maximum. Long-lived contexts accumulate permissions and accessed resources, expanding attack surface. Fresh contexts force re-establishing security boundaries.

Resource access limits prevent enumeration attacks. If a conversation has accessed 100 different customer records, that's suspicious. Most legitimate conversations access 5-10 resources.

Allowed resource patterns constrain what the conversation can touch. If a user says "analyze my sales data," the conversation context should be scoped to tenant://{tenant_id}/sales/*. Any attempt to access tenant://{tenant_id}/hr/* gets blocked regardless of user permissions.

Layer 3: Rate Limiting & Resource Exhaustion Protection

Models can accidentally (or intentionally) exhaust resources. Rate limiting must operate at multiple granularities.

code
from collections import defaultdictfrom datetime import datetime, timedeltaimport timeclass MultiGranularityRateLimiter:    """    Rate limiting at tenant, user, conversation, and resource levels.    Prevents both malicious attacks and expensive model mistakes.    """        def __init__(self, redis_client: redis.Redis):        self.redis = redis_client                # Limits: (window_seconds, max_requests)        self.limits = {            "tenant": (60, 1000),      # 1000 requests per minute per tenant            "user": (60, 100),         # 100 requests per minute per user            "conversation": (60, 50),  # 50 requests per minute per conversation            "resource": (60, 20)       # 20 accesses per minute per resource        }        def check_limits(        self,        tenant_id: str,        user_id: str,        conversation_id: str,        resource_uri: Optional[str] = None    ) -> tuple[bool, Optional[str]]:        """        Check all applicable rate limits.        Returns (allowed, reason_if_denied)        """        checks = [            ("tenant", tenant_id),            ("user", f"{tenant_id}:{user_id}"),            ("conversation", conversation_id)        ]                if resource_uri:            checks.append(("resource", resource_uri))                for limit_type, identifier in checks:            allowed = self._check_limit(limit_type, identifier)            if not allowed:                return False, f"Rate limit exceeded: {limit_type}"                return True, None        def _check_limit(self, limit_type: str, identifier: str) -> bool:        """        Sliding window rate limit check using Redis sorted sets.        """        window_seconds, max_requests = self.limits[limit_type]        key = f"ratelimit:{limit_type}:{identifier}"        now = time.time()        window_start = now - window_seconds                # Remove old entries        self.redis.zremrangebyscore(key, 0, window_start)                # Count requests in current window        current_count = self.redis.zcard(key)                if current_count >= max_requests:            return False                # Add current request        self.redis.zadd(key, {str(now): now})        self.redis.expire(key, window_seconds)                return Trueclass ResourceCostTracking:    """    Track cost per operation for budget enforcement.    Some operations (large DB queries, expensive API calls) cost more.    """        def __init__(self, redis_client: redis.Redis):        self.redis = redis_client                # Cost in arbitrary units        self.operation_costs = {            "read_resource": 1,            "list_resources": 5,            "search_resources": 10,            "write_resource": 2,            "execute_query": 20        }        def check_budget(        self,        tenant_id: str,        operation: str,        daily_budget: int = 10000    ) -> bool:        """        Check if tenant has budget for this operation.        Resets daily.        """        cost = self.operation_costs.get(operation, 1)        key = f"budget:{tenant_id}:{datetime.utcnow().strftime('%Y-%m-%d')}"                current = int(self.redis.get(key) or 0)                if current + cost > daily_budget:            return False                # Increment budget usage        self.redis.incrby(key, cost)        self.redis.expire(key, 86400)  # 24 hours                return True

Production patterns:

Separate rate limits by granularity. Tenant limits prevent one customer from affecting others. User limits prevent individuals from exhausting tenant quotas. Conversation limits catch runaway model behavior.

Resource-level limits are critical. Without them, a model can repeatedly query the same expensive resource. I've seen $10k database bills from a single conversation that kept re-fetching the same large dataset.

Cost-based limiting catches what request-based limiting misses. A hundred cheap operations might be fine. A hundred expensive database queries are not. Track cost in abstract units and enforce budgets.

Layer 4: Request & Response Sanitization

Injection attacks work because you trust structured data from an LLM. Don't.

code
import refrom typing import Any, Dictfrom urllib.parse import urlparseclass RequestSanitizer:    """    Sanitize all inputs before execution.    Never trust LLM-generated parameters.    """        # Allowlist patterns for common fields    SAFE_PATTERNS = {        "resource_id": r'^[a-zA-Z0-9_-]{1,64}$',        "operation": r'^[a-z_]{1,32}$',        "field_name": r'^[a-zA-Z_][a-zA-Z0-9_]{0,63}$'    }        @classmethod    def sanitize_resource_uri(cls, uri: str) -> Optional[str]:        """        Validate resource URI format and extract safe components.        """        # Must match expected format        if not uri.startswith("tenant://"):            return None                parsed = urlparse(uri)                # Validate each component        tenant_id = parsed.netloc        if not re.match(cls.SAFE_PATTERNS["resource_id"], tenant_id):            return None                path_parts = [p for p in parsed.path.split("/") if p]        for part in path_parts:            if not re.match(cls.SAFE_PATTERNS["resource_id"], part):                return None                # Reconstruct sanitized URI        return f"tenant://{tenant_id}/{'/'.join(path_parts)}"        @classmethod    def sanitize_parameters(cls, params: Dict[str, Any]) -> Dict[str, Any]:        """        Strip or escape dangerous parameter values.        """        sanitized = {}                for key, value in params.items():            # Validate key format            if not re.match(cls.SAFE_PATTERNS["field_name"], key):                continue                        # Sanitize value based on type            if isinstance(value, str):                # Strip control characters                sanitized[key] = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', value)            elif isinstance(value, (int, float, bool)):                sanitized[key] = value            elif isinstance(value, dict):                # Recursive sanitization                sanitized[key] = cls.sanitize_parameters(value)            else:                # Unknown type, skip                continue                return sanitizedclass ResponseSanitizer:    """    Sanitize responses to prevent data leakage.    """        def __init__(self, field_redaction_rules: Dict[str, callable]):        self.redaction_rules = field_redaction_rules        def sanitize_response(        self,        response: Dict[str, Any],        auth_context: AuthenticatedContext    ) -> Dict[str, Any]:        """        Remove or redact fields based on context permissions.        """        sanitized = {}                for field, value in response.items():            # Apply redaction rules            if field in self.redaction_rules:                value = self.redaction_rules[field](value, auth_context)                        if isinstance(value, dict):                sanitized[field] = self.sanitize_response(value, auth_context)            else:                sanitized[field] = value                return sanitized# Example redaction rulesdef redact_pii(value: str, auth_context: AuthenticatedContext) -> str:    """Redact PII unless user has pii_read scope"""    if not auth_context.has_scope("pii_read"):        # Replace with masked version        return "***REDACTED***"    return valuedef redact_sensitive_fields(value: Any, auth_context: AuthenticatedContext) -> Any:    """Remove sensitive fields entirely"""    if not auth_context.has_role("admin"):        return None    return value

Production considerations:

Allowlists beat blocklists every time. Don't try to filter out "bad" characters. Define exactly what's allowed and reject everything else.

Sanitize recursively. Parameters can be nested. Models can put injection attempts in nested objects. Traverse the entire structure.

Response sanitization is as important as request sanitization. Just because your backend returned a field doesn't mean this context should see it. Apply field-level permissions based on scopes and roles.

Layer 5: Observability & Audit Logging

Security that can't be audited is security theater.

code
import structlogfrom typing import Dict, Anyfrom datetime import datetimeclass MCPAuditLogger:    """    Comprehensive audit logging for security analysis.    Every decision must be reconstructable.    """        def __init__(self):        self.logger = structlog.get_logger()        def log_request(        self,        conversation_id: str,        auth_context: AuthenticatedContext,        operation: str,        resource_uri: str,        parameters: Dict[str, Any],        request_id: str    ):        """Log every incoming request with full context"""        self.logger.info(            "mcp_request",            request_id=request_id,            conversation_id=conversation_id,            tenant_id=auth_context.tenant_id,            user_id=auth_context.user_id,            operation=operation,            resource_uri=resource_uri,            parameter_keys=list(parameters.keys()),            timestamp=datetime.utcnow().isoformat()        )        def log_authorization_decision(        self,        request_id: str,        decision: str,        reason: str,        checked_permissions: list[str]    ):        """Log every authorization decision with reasoning"""        self.logger.info(            "mcp_authz_decision",            request_id=request_id,            decision=decision,            reason=reason,            checked_permissions=checked_permissions,            timestamp=datetime.utcnow().isoformat()        )        def log_context_violation(        self,        request_id: str,        conversation_id: str,        violation_type: str,        details: Dict[str, Any]    ):        """Log potential security violations"""        self.logger.warning(            "mcp_context_violation",            request_id=request_id,            conversation_id=conversation_id,            violation_type=violation_type,            details=details,            timestamp=datetime.utcnow().isoformat()        )        def log_rate_limit_exceeded(        self,        request_id: str,        limit_type: str,        identifier: str    ):        """Track rate limit violations for attack detection"""        self.logger.warning(            "mcp_rate_limit_exceeded",            request_id=request_id,            limit_type=limit_type,            identifier=identifier,            timestamp=datetime.utcnow().isoformat()        )

Log everything. Request IDs, auth context, operations attempted, authorization decisions, rate limit checks, resource access patterns. When you're investigating a breach, you need complete reconstruction.

Use structured logging. JSON logs are searchable and analyzable. Free-form text logs are useless at scale.

Separate audit logs from application logs. Audit logs have different retention requirements and compliance implications. Store them separately with appropriate access controls.

Pitfalls & Failure Modes

Context Injection Through Parameter Manipulation

Models will attempt to access resources by manipulating parameters. Not maliciously—just because that's how they solve problems.

Symptom: User asks "show me customer data" and the model generates a request with customer_id="admin" or customer_id="../../../sensitive/data".

Detection: Spike in 403 authorization failures. Unusual resource URIs in logs. Parameter values that don't match expected patterns.

Prevention: Never trust resource identifiers from request parameters. Always derive them from authenticated context and validate against conversation state.

Conversation Context Expansion Attacks

Attacker slowly expands conversation permissions over many turns, eventually accessing resources outside original scope.

Symptom: Conversation starts with "analyze sales data" but ends with accessing HR records. Gradual permission creep over 20+ requests.

Detection: Track resources accessed per conversation. Alert when count exceeds normal bounds or resource types deviate from initial scope.

Prevention: Explicit conversation context boundaries set at creation. Hard limits on total resources accessed. Context TTL forces reset.

Rate Limit Bypass Through Conversation Rotation

Attacker creates new conversations to reset conversation-level rate limits, effectively bypassing per-conversation restrictions.

Symptom: Same user creating dozens of conversations in short time windows. Each conversation stays under per-conversation limits but aggregate usage is excessive.

Detection: Track conversation creation rate per user. Alert on >5 new conversations per minute.

Prevention: User-level rate limits must exist independently. Conversation limits catch mistakes; user limits catch attacks.

Response Data Leakage Through Error Messages

Error messages leak information about resources the context shouldn't know exist.

Symptom: Request for unauthorized resource returns "Resource exists but access denied" vs. "Resource not found". Attacker learns which resources exist.

Detection: Review error messages in audit logs. Check if unauthorized requests get different errors than missing-resource requests.

Prevention: Uniform error responses for all authorization failures. Don't distinguish "resource exists but forbidden" from "resource doesn't exist."

Multi-Tenant Isolation Bypass Through Shared Resources

Resources that span tenants (shared configurations, global settings) create bypass opportunities.

Symptom: User accesses data from different tenant through shared resource references.

Detection: Analyze resource access patterns for cross-tenant traversal. Graph database queries showing unexpected tenant relationships.

Prevention: Avoid shared resources entirely. If unavoidable, implement separate access control layer for cross-tenant resources. Never trust tenant ID from resource URI for shared resources.

Summary & Next Steps

Secure MCP servers require defense at every layer: authentication, authorization, context validation, rate limiting, sanitization, and audit logging. Each layer must operate independently—failure in one doesn't expose others.

The core insight: MCP servers mediate between untrusted LLM outputs and trusted backend systems. Every request is potentially adversarial. Design for that reality.

Build security in from the start. Retrofitting security onto an existing MCP server means rewriting architecture. Authentication and context tracking aren't features you add later—they're foundational design decisions.

Start with these implementations:

This week: Implement stateless JWT authentication with strict tenant isolation. No request reaches execution without validated auth context.

Next week: Add conversation context tracking with explicit resource boundaries and TTL. Track what each conversation has accessed.

Next sprint: Multi-granularity rate limiting and cost-based budgets. Protect against both attacks and expensive mistakes.

Within month: Comprehensive audit logging with structured output. Build dashboards for anomaly detection on authorization failures, rate limits, and context violations.

Test your security. Write integration tests that attempt context injection, rate limit bypass, and cross-tenant access. If your tests don't fail, your security isn't working.

The goal isn't perfect security—no production system achieves that. The goal is making attacks visible, limiting blast radius when they succeed, and maintaining complete audit trails for investigation.

MCP servers are the new attack surface in LLM systems. Build them like you're expecting adversarial load, because you are.