
Implementing MCP with LangGraph: A Practical Walkthrough


The Problem: Context and Control Flow Are Tangled

You've built a LangGraph agent that works great in demos. Then you deploy it and discover your context management is a mess. Conversation history grows unbounded. You're fetching the same database results three times because you're storing raw data in graph state. Your agent spends more time shuffling context around than actually reasoning. Token costs are 3x what you projected.

The root cause: you're treating LangGraph state as both execution state and data storage. These are different concerns with different lifecycle requirements. Execution state needs to be small, fast to serialize, and easy to reason about. Data storage needs to be efficient, cacheable, and potentially huge.

Most teams discover this the hard way. They start with a simple graph that passes everything through state. The query goes in, results come out, everything seems fine. Then they add more nodes. More data sources. More complex workflows. State objects balloon to hundreds of KB. Serialization becomes a bottleneck. Debugging becomes impossible because state dumps are massive.

The typical "fix" makes things worse: they add caching logic inside nodes, build custom context managers, implement ad-hoc versioning schemes. Now they have three different ways context flows through the system, none of them consistent, all of them buggy.

Model Context Protocol offers a better abstraction, but most examples show MCP as a replacement for LangGraph, not as a complement. That's wrong. MCP handles context assembly and versioning. LangGraph handles execution flow and decision-making. They solve different problems and work better together.

The integration point is subtle and critical: MCP sits between LangGraph state and the actual data sources. It's not part of your graph—it's infrastructure your graph uses. Get this placement wrong and you've just added complexity without solving the underlying problem.

This article shows you how to integrate MCP with LangGraph correctly. Not theory—patterns I've shipped to production and maintained.

The Mental Model: MCP as Context Infrastructure, Not Graph Nodes

Stop thinking of MCP servers as tools your agent calls. They're not nodes in your graph. They're infrastructure that provides context to your graph's decision-making nodes.

The key distinction: LangGraph manages execution flow. MCP manages context availability.

A LangGraph node doesn't "call MCP to get data." It declares what context it needs, and MCP ensures that context is available when the node executes. This is dependency injection for agent systems.

Think about how you'd build a web service. You don't put database queries inside your HTTP handlers as inline strings. You inject a database client that handles connection pooling, retry logic, and caching. The handler just calls db.query() without caring about those details.

MCP works the same way. Your LangGraph nodes declare context dependencies. The MCP layer provides that context. The node doesn't know or care whether context came from a database, an API, or a cache. It just processes the data.
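
A minimal sketch of that dependency-injection idea, using only the standard library. `ContextProvider`, `StaticProvider`, and `summarize_node` are hypothetical names for illustration, not part of LangGraph or any MCP SDK.

```python
import asyncio
from typing import Any, Dict, Protocol


class ContextProvider(Protocol):
    """Anything that can turn a context key into data (hypothetical interface)."""

    async def get(self, key: str) -> Any: ...


class StaticProvider:
    """Stand-in provider backed by a dict; a real one would talk to MCP servers."""

    def __init__(self, data: Dict[str, Any]):
        self._data = data

    async def get(self, key: str) -> Any:
        return self._data[key]


async def summarize_node(state: Dict[str, Any], context: ContextProvider) -> Dict[str, Any]:
    # The node names the context it needs; it does not know where that context lives.
    customer = await context.get(state["customer_key"])
    summary = f"{customer['name']} has {customer['open_tickets']} open tickets"
    return {**state, "summary": summary}


provider = StaticProvider({"customer:42": {"name": "Ada", "open_tickets": 2}})
result = asyncio.run(summarize_node({"customer_key": "customer:42"}, provider))
```

Swapping `StaticProvider` for a cached or remote provider changes nothing in the node, which is the point of the analogy.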

The critical placement: MCP sits before the Observe step in your agent loop.

Traditional agent patterns (ReAct, Plan-Execute) have explicit steps: Think, Act, Observe, Repeat. In LangGraph terms, these become nodes. The mistake is modeling context fetching as an Action node. It isn't an action. Context fetching is infrastructure that happens before a node's logic runs.

Here's the right mental model:

  1. LangGraph node begins execution
  2. Node declares context requirements to MCP layer
  3. MCP assembles required context (in parallel, with caching)
  4. Context is injected into node execution
  5. Node processes and updates graph state
  6. Next node begins
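
The six steps above can be sketched in a few lines. This is an illustration under stated assumptions, not a LangGraph or MCP API: `requires`, `assemble`, and `fetch_from_source` are hypothetical helpers, and the "source" is simulated in memory.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

FETCH_COUNT = {"n": 0}          # counts simulated source hits
_CACHE: Dict[str, Any] = {}     # hypothetical in-process cache


async def fetch_from_source(key: str) -> Any:
    """Simulated data source; a real layer would call an MCP server here."""
    FETCH_COUNT["n"] += 1
    await asyncio.sleep(0)
    return f"data for {key}"


async def assemble(keys: List[str]) -> Dict[str, Any]:
    """Step 3: resolve all requirements in parallel, reusing cached values."""
    async def one(key: str) -> Any:
        if key not in _CACHE:
            _CACHE[key] = await fetch_from_source(key)
        return _CACHE[key]

    values = await asyncio.gather(*(one(k) for k in keys))
    return dict(zip(keys, values))


def requires(*keys: str):
    """Step 2: a node declares its context requirements up front."""
    def wrap(node: Callable[[dict, Dict[str, Any]], Awaitable[dict]]):
        async def run(state: dict) -> dict:
            context = await assemble(list(keys))   # step 3
            return await node(state, context)      # steps 4 and 5
        return run
    return wrap


@requires("orders", "profile")
async def decide(state: dict, context: Dict[str, Any]) -> dict:
    # Step 5: only small values and keys go back into state, never the data.
    return {**state, "context_keys": sorted(context), "current_step": "decide"}


async def main() -> dict:
    state = await decide({"current_step": "start"})
    return await decide(state)   # the second run is served from the cache


final_state = asyncio.run(main())
```

Running the node twice hits the simulated source only twice in total (once per key), because the second run is served entirely from the cache.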

Notice: MCP is not in the graph flow. It's a layer beneath it. Your graph state contains context references, not context data. When a node needs data, it resolves those references through MCP.

The versioning insight: context must be versioned independently from execution state.

Your graph state evolves as the agent works through a task. Context evolves independently as sources update. If you store raw context in graph state, you're coupling these lifecycles. When you checkpoint a graph for later resumption, you're serializing potentially gigabytes of context data. When context updates (database changed, API returned new data), you have stale context in state with no way to refresh.

MCP separates these concerns. Graph state stores context versions: "customer_data@v123". MCP resolves versions to actual data. If data changes, you get a new version. If you checkpoint the graph, you checkpoint version references, not data. When you resume, MCP resolves those references again, and you choose whether to pin the checkpointed version or accept the latest one.
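
To make the "customer_data@v123" idea concrete, here is a small sketch of a store that keeps every version of a key. `VersionedStore` and `parse_ref` are hypothetical names, and the sketch assumes the context layer can retain older versions, which a real MCP server may or may not support.

```python
from typing import Any, Dict, Tuple


def parse_ref(ref: str) -> Tuple[str, str]:
    """Split 'customer_data@v123' into ('customer_data', 'v123')."""
    key, sep, version = ref.rpartition("@")
    if not sep or not key or not version:
        raise ValueError(f"not a versioned reference: {ref!r}")
    return key, version


class VersionedStore:
    """Hypothetical context store that keeps every version of every key."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}
        self._latest: Dict[str, str] = {}

    def put(self, key: str, version: str, value: Any) -> str:
        self._data[(key, version)] = value
        self._latest[key] = version
        return f"{key}@{version}"

    def resolve(self, ref: str) -> Any:
        return self._data[parse_ref(ref)]

    def latest_ref(self, key: str) -> str:
        return f"{key}@{self._latest[key]}"


store = VersionedStore()
checkpointed_ref = store.put("customer_data", "v123", {"tier": "silver"})
store.put("customer_data", "v124", {"tier": "gold"})   # the source changed later
```

The checkpoint holds only the short string in `checkpointed_ref`. On resume, `store.resolve(checkpointed_ref)` returns the pinned data, while `store.latest_ref("customer_data")` gives the fresh reference.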

This is how you build agents that handle long-running tasks, resume after interruption, and adapt to changing data without architectural contortions.

Architecture: LangGraph State Graph with MCP Context Layer

The architecture has three distinct layers that must remain separate.

Figure: Architecture: LangGraph State Graph with MCP Context Layer

LangGraph Execution Layer

This is your state machine. Nodes, edges, conditional routing, checkpoints. This layer knows nothing about where context comes from. It just declares requirements and receives results.

Responsibilities:

  • Execute workflow logic
  • Make routing decisions
  • Update execution state
  • Trigger checkpoints
  • Handle errors and retries

What it does NOT do:

  • Fetch data directly from databases or APIs
  • Cache results
  • Manage data versions
  • Handle connection pooling or rate limiting

MCP Context Layer

This provides context to nodes on demand. It handles the messy details of data fetching, caching, versioning, and failure recovery.

Responsibilities:

  • Resolve context references to actual data
  • Cache frequently accessed context
  • Version context for consistency
  • Handle MCP protocol communication
  • Aggregate context from multiple sources
  • Apply access control and tenant isolation

What it does NOT do:

  • Make workflow decisions
  • Update graph state directly
  • Know about node execution order

Graph State

This is the minimal data structure passed between nodes. It's small, fast to serialize, and contains references, not data.

What goes in graph state:

  • Current workflow step
  • Decision history
  • Context references (keys/versions, not actual data)
  • Error states
  • Checkpoint metadata

What does NOT go in graph state:

  • Raw database results
  • API responses
  • File contents
  • Anything larger than a few KB

The boundary is strict. If a node produces a large result, it stores that result in the MCP layer and puts a reference in state. The next node uses that reference to retrieve data through MCP.
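
One way to enforce that boundary is a helper that decides, per result, whether it is small enough to sit in state. This is a sketch: `MAX_INLINE_BYTES` is an assumed threshold for "a few KB", and `_BLOB_STORE` stands in for whatever storage sits behind your MCP layer.

```python
import json
import uuid
from typing import Any, Dict

MAX_INLINE_BYTES = 4 * 1024          # assumed threshold for "a few KB"
_BLOB_STORE: Dict[str, Any] = {}     # stand-in for storage behind the MCP layer


def to_state_value(result: Any) -> Dict[str, Any]:
    """Return a small inline value, or a reference if the result is large."""
    size = len(json.dumps(result).encode("utf-8"))
    if size <= MAX_INLINE_BYTES:
        return {"inline": result}
    key = f"result:{uuid.uuid4().hex}"
    _BLOB_STORE[key] = result
    return {"ref": key, "size_bytes": size}


def from_state_value(value: Dict[str, Any]) -> Any:
    """Next node: read the inline value or follow the reference."""
    return value["inline"] if "inline" in value else _BLOB_STORE[value["ref"]]


small = to_state_value({"status": "ok"})
large = to_state_value({"rows": ["x" * 100] * 200})
```

Here `small` stays inline, while `large` becomes a short reference dict regardless of how big the underlying result is.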

Implementation: Building the Integration

Step 1: Define Graph State with Context References

Your graph state must contain references, not data.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Annotated, Dict, List, Optional, TypedDict


@dataclass
class ContextReference:
    """
    A reference to context managed by MCP.
    This is what goes in graph state.
    """
    key: str                    # Unique identifier
    version: str                # Version hash
    source: str                 # Which MCP server
    expires_at: Optional[datetime] = None
    metadata: Optional[Dict] = None

    def to_dict(self) -> Dict:
        """Return a JSON-friendly dict for checkpointing and logging."""
        return {
            "key": self.key,
            "version": self.version,
            "source": self.source,
            "expires_at": self.expires_at.isoformat() if self.expires_at else None,
            "metadata": self.metadata or {},
        }


class AgentState(TypedDict):
    """
    LangGraph state: small, serializable, reference-based.
    """
    # Workflow control
    current_step: str
    steps_completed: List[str]
    decision_path: List[str]

    # Context references (NOT actual data)
    context_refs: Annotated[List[ContextReference], "Context references managed by MCP"]

    # Execution metadata
    user_id: str
    session_id: str
    started_at: datetime

    # Results (small summaries, not full data)
    summary: Optional[str]
    final_response: Optional[str]

    # Error tracking
    errors: List[Dict]
```

Production considerations:

State must serialize quickly. If checkpointing takes >100ms, you're storing too much. Use references.
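
A quick way to check this is to time serialization yourself. The sketch below uses JSON as a stand-in serializer, which is an assumption: your checkpointer may serialize differently, so treat the numbers as relative, not absolute. `measure_checkpoint` is a hypothetical helper.

```python
import json
import time
from typing import Any, Dict, Tuple


def measure_checkpoint(state: Dict[str, Any]) -> Tuple[int, float]:
    """Return (serialized size in bytes, serialization time in milliseconds)."""
    start = time.perf_counter()
    payload = json.dumps(state, default=str).encode("utf-8")
    elapsed_ms = (time.perf_counter() - start) * 1000
    return len(payload), elapsed_ms


reference_state = {
    "current_step": "process",
    "context_refs": [{"key": "orders", "version": "a1b2c3", "source": "database"}],
}
# The same state with raw rows stuffed into it, as in the anti-pattern.
data_state = {
    **reference_state,
    "orders": [{"id": i, "note": "x" * 200} for i in range(5000)],
}

ref_size, ref_ms = measure_checkpoint(reference_state)
data_size, data_ms = measure_checkpoint(data_state)
```

The reference-based state serializes to well under a kilobyte, while the data-carrying state is over a megabyte. Compare `ref_ms` and `data_ms` on your own hardware to see the latency gap.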

Context references include versions. When you resume from checkpoint, you know exactly which version of context the agent was using. You can decide whether to use the same version (consistency) or fetch latest (freshness).

Expiration times prevent stale context. If a reference expires, MCP forces a refresh. This handles cases where context changes frequently but you don't want to fetch on every access.
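
The expiry rule can be sketched as two small predicates. `is_expired` and `should_use_cache` are hypothetical helpers, and `now` is passed in explicitly so the logic is easy to test. The datetimes are naive UTC, matching the `ContextReference` fields above.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_expired(expires_at: Optional[datetime], now: datetime) -> bool:
    """References without an expiry never expire; others expire at expires_at."""
    return expires_at is not None and now >= expires_at


def should_use_cache(expires_at: Optional[datetime], now: datetime, in_cache: bool) -> bool:
    """Serve from cache only when the entry exists and the reference is still live."""
    return in_cache and not is_expired(expires_at, now)


created = datetime(2024, 1, 1, 12, 0, 0)
expires = created + timedelta(seconds=300)

fresh = should_use_cache(expires, created + timedelta(seconds=299), in_cache=True)
stale = should_use_cache(expires, created + timedelta(seconds=300), in_cache=True)
```

When `should_use_cache` returns False, the context layer fetches again and issues a new reference.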

Step 2: Implement MCP Context Manager

This sits between your graph and MCP servers.

```python
import asyncio
import hashlib
import json
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

# ContextReference is defined in Step 1.


class MCPContextManager:
    """
    Manages context lifecycle independently of graph execution.
    This is your MCP integration layer.
    """

    def __init__(self, mcp_clients: Dict[str, Any], cache_ttl: int = 300):
        self.mcp_clients = mcp_clients
        self.cache_ttl = cache_ttl

        # In-memory cache for hot context
        # In production: Redis or similar
        self._cache: Dict[str, tuple[Any, datetime]] = {}

        # Version tracking: "source:key" -> last version this manager recorded
        self._versions: Dict[str, str] = {}

    async def resolve_context(
        self,
        ref: ContextReference,
        use_cache: bool = True
    ) -> Any:
        """
        Resolve a context reference to actual data.
        This is called by graph nodes, not by the graph itself.
        """
        cache_key = f"{ref.source}:{ref.key}:{ref.version}"

        # Check cache first
        if use_cache and cache_key in self._cache:
            data, cached_at = self._cache[cache_key]
            if datetime.utcnow() - cached_at < timedelta(seconds=self.cache_ttl):
                return data

        # Cache miss: fetch from MCP server.
        # Note: this fetches the resource as it is now, which may be newer
        # than ref.version if the source changed since the reference was made.
        client = self.mcp_clients.get(ref.source)
        if not client:
            raise ValueError(f"Unknown MCP source: {ref.source}")

        data = await client.fetch_resource(ref.key)

        # Cache the result
        self._cache[cache_key] = (data, datetime.utcnow())

        return data

    async def create_context_reference(
        self,
        source: str,
        key: str,
        ttl_seconds: Optional[int] = None
    ) -> ContextReference:
        """
        Create a new context reference with versioning.
        Called when a node produces context that should be available to later nodes.
        """
        # Fetch current data to compute version
        client = self.mcp_clients[source]
        data = await client.fetch_resource(key)

        # Version is hash of content
        version = self._compute_version(data)

        # Store version mapping
        version_key = f"{source}:{key}"
        self._versions[version_key] = version

        expires_at = None
        if ttl_seconds:
            expires_at = datetime.utcnow() + timedelta(seconds=ttl_seconds)

        return ContextReference(
            key=key,
            version=version,
            source=source,
            expires_at=expires_at,
            metadata={"created_at": datetime.utcnow().isoformat()}
        )

    async def resolve_multiple(
        self,
        refs: List[ContextReference],
        parallel: bool = True
    ) -> Dict[str, Any]:
        """
        Resolve multiple context references efficiently.
        Parallel fetching is the default; MCP shines here.
        """
        if not parallel:
            results = {}
            for ref in refs:
                results[ref.key] = await self.resolve_context(ref)
            return results

        # Parallel resolution. Every coroutine is passed to gather, so
        # duplicate keys cannot leave a coroutine un-awaited.
        results = await asyncio.gather(
            *(self.resolve_context(ref) for ref in refs),
            return_exceptions=True
        )

        # Handle failures gracefully: a failed source maps to None
        return {
            ref.key: None if isinstance(result, Exception) else result
            for ref, result in zip(refs, results)
        }

    def _compute_version(self, data: Any) -> str:
        """
        Compute version hash from data.
        This ensures version changes when content changes.
        """
        # Serialize data deterministically; default=str covers values such
        # as datetimes that json cannot encode natively
        serialized = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(serialized.encode()).hexdigest()[:16]

    async def refresh_if_stale(self, ref: ContextReference) -> ContextReference:
        """
        Check if context version has changed and return updated reference.
        This is how you detect context updates.
        """
        # Note: _versions is only updated by create_context_reference, so
        # this compares against the last version this manager recorded.
        # It does not poll the source for changes it has not yet seen.
        version_key = f"{ref.source}:{ref.key}"
        current_version = self._versions.get(version_key)

        if current_version != ref.version:
            # Version changed, create new reference
            return await self.create_context_reference(
                ref.source,
                ref.key,
                ttl_seconds=300
            )

        return ref
```

Production considerations:

The cache is critical for performance. Fetching from MCP on every node execution kills latency. Cache hit rates should be >80% for stable workloads.
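
To know whether you are above that 80% mark, the cache has to count hits and misses. A minimal sketch, with `CountingTTLCache` as a hypothetical helper and an injectable clock so TTL behavior can be tested:

```python
import time
from typing import Any, Callable, Dict, Tuple


class CountingTTLCache:
    """Tiny TTL cache that tracks hits and misses (hypothetical helper)."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[Any, float]] = {}
        self.hits = 0
        self.misses = 0

    def get_or_load(self, key: str, load: Callable[[], Any]) -> Any:
        entry = self._entries.get(key)
        if entry is not None and self._clock() - entry[1] < self._ttl:
            self.hits += 1
            return entry[0]
        # Missing or expired: load the value and remember when we stored it.
        self.misses += 1
        value = load()
        self._entries[key] = (value, self._clock())
        return value

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


cache = CountingTTLCache(ttl_seconds=300)
for _ in range(10):
    cache.get_or_load("database:orders:a1b2", lambda: {"rows": 3})
```

Ten lookups of the same key produce one miss and nine hits. Export `hit_rate` as a metric and alert when it drops.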

Versioning is content-based hashing. This automatically detects changes. Alternative: use timestamps or explicit version IDs if MCP servers provide them.
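
The property that matters is that equal content always hashes to the same version, regardless of key order. A small sketch with a hypothetical `content_version` function; `default=str` is an assumption for values JSON cannot encode natively:

```python
import hashlib
import json
from typing import Any


def content_version(data: Any) -> str:
    """Hash a canonical JSON encoding so equal content yields equal versions."""
    canonical = json.dumps(data, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


v1 = content_version({"id": 7, "tier": "gold"})
v2 = content_version({"tier": "gold", "id": 7})      # same content, different key order
v3 = content_version({"id": 7, "tier": "platinum"})  # changed content
```

`v1` and `v2` are identical, and `v3` differs. Truncating to 16 hex characters keeps references short at the cost of a slightly higher, still negligible, collision risk.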

Parallel resolution is default. If a node needs five context references, fetch all five simultaneously. MCP is designed for this.

Error handling is graceful. If one context source fails, others still succeed. Nodes get partial context and can decide how to proceed.
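
Both properties, parallelism and partial failure, come from `asyncio.gather` with `return_exceptions=True`. The sketch below simulates three sources with a hypothetical `fetch` coroutine, one of which fails:

```python
import asyncio
import time
from typing import Any, Dict, List


async def fetch(source: str, delay: float, fail: bool = False) -> Any:
    """Simulated MCP fetch; delay and fail are knobs for the demonstration."""
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{source} unavailable")
    return {"source": source}


async def resolve_all(specs: List[tuple]) -> Dict[str, Any]:
    """Fetch every source concurrently; a failed source maps to None."""
    results = await asyncio.gather(
        *(fetch(*spec) for spec in specs), return_exceptions=True
    )
    return {
        spec[0]: None if isinstance(res, Exception) else res
        for spec, res in zip(specs, results)
    }


async def timed_demo():
    specs = [("database", 0.1), ("api", 0.1, True), ("files", 0.1)]
    start = time.perf_counter()
    resolved = await resolve_all(specs)
    return resolved, time.perf_counter() - start


resolved, elapsed = asyncio.run(timed_demo())
```

The three fetches overlap, so the total time is close to one delay, not three. The failing `api` source comes back as `None` while the others succeed.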

Step 3: Integrate with LangGraph Nodes

Your nodes declare context needs, not how to fetch it.

```python
from datetime import datetime
from typing import Any, Dict, List, Tuple

from langgraph.graph import StateGraph, END

# AgentState and ContextReference come from Step 1, MCPContextManager from Step 2.


class MCPAwareLangGraph:
    """
    LangGraph integration with MCP context management.
    """

    def __init__(self, context_manager: MCPContextManager):
        self.context_manager = context_manager
        self.graph = self._build_graph()

    def _build_graph(self):
        """Build and compile the state graph with MCP-aware nodes."""
        workflow = StateGraph(AgentState)

        # Define nodes
        workflow.add_node("analyze", self._analyze_node)
        workflow.add_node("fetch_data", self._fetch_data_node)
        workflow.add_node("process", self._process_node)
        workflow.add_node("synthesize", self._synthesize_node)

        # Define edges
        workflow.set_entry_point("analyze")
        workflow.add_edge("analyze", "fetch_data")
        workflow.add_edge("fetch_data", "process")
        workflow.add_edge("process", "synthesize")
        workflow.add_edge("synthesize", END)

        return workflow.compile()

    async def _analyze_node(self, state: AgentState) -> AgentState:
        """
        Analyze query and determine what context is needed.
        This node puts references in state, never the data itself.
        """
        # Determine required context sources
        # This is fast business logic, not data fetching
        required_sources = self._determine_required_sources(state)

        # Create context references. create_context_reference reads each
        # resource once to compute its content hash, but only the
        # reference is kept in state.
        context_refs = []
        for source, key in required_sources:
            ref = await self.context_manager.create_context_reference(
                source=source,
                key=key,
                ttl_seconds=300
            )
            context_refs.append(ref)

        return {
            **state,
            "current_step": "analyze",
            "steps_completed": state.get("steps_completed", []) + ["analyze"],
            "context_refs": context_refs
        }

    async def _fetch_data_node(self, state: AgentState) -> AgentState:
        """
        Resolve context references to actual data.
        This is where MCP actually fetches.
        """
        refs = state.get("context_refs", [])

        # Parallel fetch through MCP. The result is deliberately discarded:
        # this call warms the context manager's cache for later nodes.
        await self.context_manager.resolve_multiple(refs, parallel=True)

        # DON'T put data in state.
        # Nodes that need the data resolve the references themselves;
        # state just tracks that fetching is complete.
        return {
            **state,
            "current_step": "fetch_data",
            "steps_completed": state.get("steps_completed", []) + ["fetch_data"]
        }

    async def _process_node(self, state: AgentState) -> AgentState:
        """
        Process context data.
        This node resolves references as needed.
        """
        refs = state.get("context_refs", [])

        # Resolve only the context we need for this node
        # Other nodes might need different context
        needed_refs = [r for r in refs if r.source in ["database", "api"]]
        context_data = await self.context_manager.resolve_multiple(needed_refs)

        # Do actual processing
        # This is where your business logic goes
        processed_result = self._process_business_logic(context_data)

        # Store large result back in MCP
        result_ref = await self._store_result_in_mcp(processed_result)

        # Update state with reference to result
        updated_refs = state.get("context_refs", []) + [result_ref]

        return {
            **state,
            "current_step": "process",
            "steps_completed": state.get("steps_completed", []) + ["process"],
            "context_refs": updated_refs
        }

    async def _synthesize_node(self, state: AgentState) -> AgentState:
        """
        Final synthesis using all available context.
        """
        refs = state.get("context_refs", [])

        # Resolve all context for final synthesis
        all_context = await self.context_manager.resolve_multiple(refs)

        # Generate final response
        # In production: call your LLM here with assembled context
        final_response = self._generate_final_response(all_context, state)

        return {
            **state,
            "current_step": "synthesize",
            "steps_completed": state.get("steps_completed", []) + ["synthesize"],
            "final_response": final_response
        }

    async def _store_result_in_mcp(self, result: Any) -> ContextReference:
        """
        Store large results in MCP instead of state.
        """
        # Generate unique key for this result
        result_key = f"result_{datetime.utcnow().timestamp()}"

        # Store in MCP (implementation depends on your MCP server)
        # For now: add to cache. Caveat: with cache-only storage the result
        # cannot be resolved after the cache TTL lapses unless a "results"
        # client is registered with the context manager.
        version = self.context_manager._compute_version(result)
        cache_key = f"results:{result_key}:{version}"
        self.context_manager._cache[cache_key] = (result, datetime.utcnow())

        return ContextReference(
            key=result_key,
            version=version,
            source="results",
            metadata={"type": "processed_result"}
        )

    # Application-specific hooks referenced above. These are placeholders;
    # supply your own implementations.
    def _determine_required_sources(self, state: AgentState) -> List[Tuple[str, str]]:
        """Return (source, key) pairs for the context this run needs."""
        raise NotImplementedError

    def _process_business_logic(self, context_data: Dict[str, Any]) -> Any:
        """Turn resolved context into a processed result."""
        raise NotImplementedError

    def _generate_final_response(self, all_context: Dict[str, Any], state: AgentState) -> str:
        """Produce the final response, typically via an LLM call."""
        raise NotImplementedError
```

Production considerations:

Nodes never store data directly in state. They create references and store data in the MCP layer. This keeps state small and checkpoints fast.

Context resolution is lazy. If a node doesn't need certain context, it doesn't resolve those references. This prevents unnecessary fetching.
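
Lazy resolution can be sketched as a wrapper that loads a value only on first access. `LazyContext` is a hypothetical helper, and it is synchronous for brevity; the same idea applies to async loaders.

```python
from typing import Any, Callable, Dict, List


class LazyContext:
    """Resolve a reference only when a node first asks for it (hypothetical helper)."""

    def __init__(self, loaders: Dict[str, Callable[[], Any]]):
        self._loaders = loaders
        self._resolved: Dict[str, Any] = {}
        self.resolved_keys: List[str] = []   # records what was actually loaded

    def get(self, key: str) -> Any:
        if key not in self._resolved:
            self._resolved[key] = self._loaders[key]()
            self.resolved_keys.append(key)
        return self._resolved[key]


context = LazyContext({
    "orders": lambda: [{"id": 1}],
    "audit_log": lambda: ["entry"] * 10_000,   # expensive, and unused by this node
})
order_count = len(context.get("orders"))
context.get("orders")   # the second access reuses the resolved value
```

Only `orders` is ever loaded. The `audit_log` loader never runs because no node asked for it.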

Results from one node become context for later nodes. Store large results through MCP, not in state. Next node gets a reference and resolves when needed.

Parallel context resolution happens automatically. The context manager handles this. Nodes just declare what they need.

Step 4: Context Versioning and Consistency

Handle context updates gracefully.

```python
from typing import Dict, List

# AgentState comes from Step 1, MCPContextManager from Step 2.


class ContextVersionManager:
    """
    Manages context versioning for consistency and freshness trade-offs.
    """

    def __init__(self, context_manager: MCPContextManager):
        self.context_manager = context_manager

    async def ensure_consistency(
        self,
        state: AgentState,
        strategy: str = "snapshot"
    ) -> AgentState:
        """
        Ensure context consistency across workflow execution.

        Strategies:
        - snapshot: Use versions from start, ignore updates
        - latest: Always use latest versions
        - hybrid: Use snapshot for critical context, latest for auxiliary
        """
        refs = state.get("context_refs", [])

        if strategy == "snapshot":
            # Keep original versions, don't refresh
            return state

        elif strategy == "latest":
            # Refresh all context to latest versions
            updated_refs = []
            for ref in refs:
                updated_ref = await self.context_manager.refresh_if_stale(ref)
                updated_refs.append(updated_ref)

            return {**state, "context_refs": updated_refs}

        elif strategy == "hybrid":
            # Refresh only non-critical context
            critical_sources = {"database", "api"}
            updated_refs = []

            for ref in refs:
                if ref.source in critical_sources:
                    # Keep original version for consistency
                    updated_refs.append(ref)
                else:
                    # Use latest for auxiliary context
                    updated_ref = await self.context_manager.refresh_if_stale(ref)
                    updated_refs.append(updated_ref)

            return {**state, "context_refs": updated_refs}

        # Unknown strategy: leave state unchanged
        return state

    async def detect_context_drift(
        self,
        old_state: AgentState,
        new_state: AgentState
    ) -> List[Dict]:
        """
        Detect which context has changed between states.
        Useful for debugging and observability.
        Reports added and updated references; removed ones are not reported.
        """
        old_refs = {r.key: r for r in old_state.get("context_refs", [])}
        new_refs = {r.key: r for r in new_state.get("context_refs", [])}

        changes = []

        for key, new_ref in new_refs.items():
            old_ref = old_refs.get(key)

            if not old_ref:
                changes.append({
                    "key": key,
                    "change": "added",
                    "new_version": new_ref.version
                })
            elif old_ref.version != new_ref.version:
                changes.append({
                    "key": key,
                    "change": "updated",
                    "old_version": old_ref.version,
                    "new_version": new_ref.version
                })

        return changes
```

Production considerations:

Consistency strategy depends on your use case. Financial calculations need snapshot consistency. News summaries need latest data. Choose explicitly.
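
One way to make the choice explicit is to map each workflow to a strategy and fail loudly when none is configured. This is a sketch: the workflow names and the `strategy_for` helper are hypothetical, and the strategy values mirror the three strings used in `ensure_consistency`.

```python
from enum import Enum
from typing import Dict


class ConsistencyStrategy(str, Enum):
    SNAPSHOT = "snapshot"
    LATEST = "latest"
    HYBRID = "hybrid"


# Hypothetical workflow names, following the examples in the text.
WORKFLOW_STRATEGIES: Dict[str, ConsistencyStrategy] = {
    "financial_calculation": ConsistencyStrategy.SNAPSHOT,
    "news_summary": ConsistencyStrategy.LATEST,
}


def strategy_for(workflow: str) -> ConsistencyStrategy:
    """Fail loudly when a workflow has no explicit strategy configured."""
    if workflow not in WORKFLOW_STRATEGIES:
        raise ValueError(f"no consistency strategy configured for {workflow!r}")
    return WORKFLOW_STRATEGIES[workflow]


chosen = strategy_for("financial_calculation")
```

Pass `chosen.value` wherever a plain strategy string is expected. A missing entry raises an error, so no workflow silently falls back to a default.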

Version drift detection is critical for debugging. When a workflow behaves differently after resuming from checkpoint, check if context versions changed.

Timestamp-based TTLs force refreshes. Balance freshness against consistency. A 5-minute TTL works for most cases.

Pitfalls & Failure Modes

Storing Data in State Instead of References

The most common mistake. Teams put raw database results, API responses, or file contents in state.

Symptom: Checkpoint serialization takes seconds. State dumps are megabytes. Graph execution slows down as state grows.

Why it happens: It's the easiest thing to do. Fetching data and immediately storing it in state requires no extra architecture.

Detection: Monitor state size per node execution. Alert when state exceeds 100KB. Measure checkpoint latency.

Prevention: Strict rule: if data is >10KB, it goes in MCP with a reference in state. No exceptions.

MCP Server Calls Inside Nodes

Teams treat MCP like tools and call MCP servers directly from node logic.

Symptom: Nodes become slow. Parallel execution opportunities are missed. Retry logic gets duplicated across nodes.

Why it happens: Examples show MCP as tool-calling patterns. Teams copy this into nodes.

Detection: Profile node execution time. If >50% is spent in I/O, calls are in the wrong place.

Prevention: Route all context resolution through the MCP layer. Nodes ask the context manager for resolved context; they never call MCP servers directly.

Ignoring Context Versioning

Teams use MCP without tracking versions, then wonder why resumed workflows behave inconsistently.

Symptom: Workflow produces different results when resumed from checkpoint. Debugging is impossible because you can't tell if context changed.

Why it happens: Versioning seems like extra complexity. Teams skip it initially and never add it back.

Detection: Track how often resumed workflows fail or produce unexpected results. Compare context at start vs. resume point.

Prevention: Version tracking from day one. It's easier to ignore versions when you have them than to add versioning retroactively.

Over-Caching Context

Teams cache aggressively to improve performance, then serve stale data.

Symptom: Users report seeing old data. Workflows make decisions based on outdated information. Cache invalidation bugs proliferate.

Why it happens: Performance pressure drives aggressive caching without considering freshness requirements.

Detection: Monitor cache TTLs and hit rates. Track incidents caused by stale context. Watch for user reports of incorrect data.

Prevention: Explicit TTL policies per context type. Critical data gets short TTLs (30s). Stable data gets long TTLs (30m). Never cache indefinitely.
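
A TTL policy can be as simple as a lookup table with a finite default. The context type names below are hypothetical; the 30-second and 30-minute values follow the rule of thumb above, and the default matches the 300-second `cache_ttl` used in the context manager.

```python
from typing import Dict

# Assumed context types for illustration.
TTL_SECONDS: Dict[str, int] = {
    "account_balance": 30,        # critical: short TTL
    "inventory": 30,
    "product_catalog": 30 * 60,   # stable: long TTL
    "help_articles": 30 * 60,
}
DEFAULT_TTL_SECONDS = 300


def ttl_for(context_type: str) -> int:
    """Look up the TTL for a context type; unknown types get a finite default."""
    return TTL_SECONDS.get(context_type, DEFAULT_TTL_SECONDS)
```

Because every lookup returns a positive number, nothing can be cached indefinitely by accident.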

Not Handling Partial Context Failures

One MCP server fails and the entire workflow stops, even though partial context would have been sufficient.

Symptom: Workflows fail completely when non-critical context sources are unavailable. Error rates spike during partial outages.

Why it happens: Teams treat all context as equally critical and fail-fast on any error.

Detection: Analyze failure patterns. Check if workflows fail when only auxiliary context is unavailable.

Prevention: Classify context as critical vs. auxiliary. Critical failures stop the workflow. Auxiliary failures log warnings but continue with degraded context.
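
A sketch of that classification, assuming each source is exposed as an async callable. `resolve_with_policy` and `CriticalContextError` are hypothetical names for illustration.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict, Set

logger = logging.getLogger("context")


class CriticalContextError(RuntimeError):
    """Raised when a source the workflow cannot run without has failed."""


async def resolve_with_policy(
    fetchers: Dict[str, Callable[[], Awaitable[Any]]],
    critical: Set[str],
) -> Dict[str, Any]:
    names = list(fetchers)
    results = await asyncio.gather(
        *(fetchers[name]() for name in names), return_exceptions=True
    )
    context: Dict[str, Any] = {}
    for name, result in zip(names, results):
        if not isinstance(result, Exception):
            context[name] = result
        elif name in critical:
            # Critical source down: stop the workflow.
            raise CriticalContextError(f"critical source {name!r} failed") from result
        else:
            # Auxiliary source down: warn and continue with degraded context.
            logger.warning("auxiliary source %r failed: %s", name, result)
            context[name] = None
    return context


async def ok() -> dict:
    return {"rows": 1}


async def down() -> dict:
    raise TimeoutError("source timed out")


degraded = asyncio.run(
    resolve_with_policy({"database": ok, "news": down}, critical={"database"})
)
```

With `news` classified as auxiliary, its failure yields `None` and a warning. If `database` had failed instead, the call would raise `CriticalContextError`.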

Summary & Next Steps

Integrating MCP with LangGraph correctly requires treating them as complementary layers, not overlapping tools. LangGraph manages execution flow with small, reference-based state. MCP manages context lifecycle with versioning, caching, and parallel resolution. Keep these concerns separated and your agents will be faster, more maintainable, and easier to debug.

The key insights: state contains references, not data. Context resolution happens outside nodes through the MCP layer. Versions track content changes independently from execution state. Parallel resolution is default because MCP is designed for it.

Start with these implementations:

This week: Refactor one LangGraph workflow to use context references instead of storing data in state. Measure state size before and after. Target: 90% reduction.

Next sprint: Build an MCP context manager with basic caching and version tracking. Integrate with your refactored workflow. Measure cache hit rates and context resolution latency.

Within a month: Implement consistency strategies for your specific use cases. Add observability for context drift. Build dashboards showing version changes, cache performance, and resolution latency.

Test with long-running workflows. Checkpoint midway, modify context sources, resume the workflow. Verify behavior matches expectations. This is where version tracking proves its value.
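
The checkpoint, modify, resume test can be prototyped without any graph at all. The sketch below simulates the source with a dict and the checkpoint with a JSON string; `version_of` is a hypothetical content-hash helper.

```python
import hashlib
import json
from typing import Any, Dict


def version_of(data: Any) -> str:
    """Content hash used as the version, as in the versioning scheme above."""
    return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()[:16]


source: Dict[str, Any] = {"customer": {"tier": "silver"}}   # simulated data source

# 1. Run until the checkpoint: state records only key and version.
checkpoint = json.dumps(
    {"step": "process", "refs": {"customer": version_of(source["customer"])}}
)

# 2. The source changes while the workflow is paused.
source["customer"] = {"tier": "gold"}

# 3. Resume: compare checkpointed versions with current versions.
state = json.loads(checkpoint)
drifted = [
    key for key, version in state["refs"].items()
    if version_of(source[key]) != version
]
```

`drifted` lists every key whose content changed during the pause. From there, apply whichever consistency strategy the workflow calls for.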

The goal isn't perfect context management—it's separating concerns so each layer does what it's good at. LangGraph for control flow, MCP for context. Keep them separate and both become easier to reason about, debug, and scale.