
Asynchronous Processing and Message Queues in Agentic AI Systems


1. Introduction

Modern agentic AI systems behave less like monolithic LLM applications and more like distributed, autonomous workers making decisions, invoking tools, coordinating tasks, and reacting to events. This autonomy introduces unpredictable timing, variable workloads, and long-running operations—all of which traditional synchronous architectures struggle to handle.

Modern agentic AI system architecture showing 6 autonomous agents coordinated through a central hub

Figure 1: Modern Agentic AI Systems

Asynchronous processing and message queues address these problems directly. They allow agentic AI systems to scale, stay responsive, and coordinate multiple agents working in parallel. The sections below break down how.

2. Core Architectural Roles of Async & Queues

2.1 Handling Long-Running Agent Operations

Agentic AI workflows often include:

  • multiple LLM calls

  • tool invocation chains

  • web scraping

  • data extraction

  • reasoning loops

  • reflection cycles

These tasks can take anywhere from a few seconds to several minutes.

If executed synchronously:

  • user requests block

  • system threads get stuck

  • timeouts become common

  • overall throughput collapses

Async + Queues Fix This

The main thread:

  • accepts the request

  • places it in a queue

  • immediately responds with a task ID

Meanwhile, workers execute the long-running agent task independently.

Sequence diagram showing asynchronous agent workflow with user, API, queue, worker, and LLM components

Figure 2: Diagram — Long-running agent tasks using async workers
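
Before bringing in a real broker, the accept, enqueue, respond pattern described above can be sketched with only the Python standard library. This is a minimal illustration, not a production design: the in-memory `task_queue` and `results` dictionary are hypothetical stand-ins for a message broker and a result backend, and the `time.sleep` call stands in for the slow agent work.

```python
import queue
import threading
import time
import uuid

# Hypothetical in-memory stand-ins for a real broker and result store.
task_queue: "queue.Queue[tuple[str, str]]" = queue.Queue()
results: dict[str, dict] = {}
results_lock = threading.Lock()


def submit(query: str) -> str:
    """Accept a request, enqueue it, and return a task ID immediately."""
    task_id = str(uuid.uuid4())
    with results_lock:
        results[task_id] = {"state": "PENDING"}
    task_queue.put((task_id, query))
    return task_id


def worker() -> None:
    """Pull tasks off the queue and run the slow agent work."""
    while True:
        task_id, query = task_queue.get()
        with results_lock:
            results[task_id] = {"state": "PROCESSING"}
        time.sleep(0.1)  # placeholder for LLM calls, tools, scraping
        with results_lock:
            results[task_id] = {"state": "SUCCESS", "result": f"answer to {query!r}"}
        task_queue.task_done()


threading.Thread(target=worker, daemon=True).start()

task_id = submit("summarize the report")   # returns at once
print(task_id, results[task_id]["state"])  # typically PENDING or PROCESSING
task_queue.join()                          # wait for the worker (demo only)
print(results[task_id])
```

The caller only ever holds a task ID. Everything slow happens on the worker thread, which is the same division of labor a queue-backed deployment gives you across processes or machines.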

2.2 Basic Async Agent Task with Celery

Key Features Demonstrated:

  • ✅ Non-blocking task submission

  • ✅ Progress tracking with state updates

  • ✅ Automatic retry logic with exponential backoff

  • ✅ Timeout protection

  • ✅ RESTful API integration

  • ✅ Task result retrieval

```python
# tasks.py - Define async agent tasks
import time
from typing import Any, Dict

from celery import Celery
from openai import OpenAI

# Initialize Celery with Redis as broker
app = Celery(
    'agentic_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

# Configure task settings
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 hour max
    task_soft_time_limit=3300,  # Warning at 55 minutes
)


@app.task(bind=True, max_retries=3)
def execute_agent_workflow(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute a long-running agent workflow asynchronously.

    Args:
        query: User's query or task
        context: Additional context for the agent

    Returns:
        Dict containing agent's response and metadata
    """
    try:
        # Update task state to indicate progress
        self.update_state(
            state='PROCESSING',
            meta={'step': 'initializing', 'progress': 10}
        )

        # Initialize LLM client
        client = OpenAI()

        # Step 1: Initial reasoning
        self.update_state(
            state='PROCESSING',
            meta={'step': 'reasoning', 'progress': 25}
        )

        reasoning_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful research agent."},
                {"role": "user", "content": f"Analyze this query: {query}"}
            ],
            timeout=60
        )

        # Step 2: Tool invocation (simulated)
        self.update_state(
            state='PROCESSING',
            meta={'step': 'tool_execution', 'progress': 50}
        )

        # Simulate web scraping or API calls
        time.sleep(2)
        tool_results = {"data": "scraped content"}

        # Step 3: Final synthesis
        self.update_state(
            state='PROCESSING',
            meta={'step': 'synthesis', 'progress': 75}
        )

        final_response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Synthesize the findings."},
                {"role": "user", "content": f"Results: {tool_results}"}
            ],
            timeout=60
        )

        # Complete: Celery records SUCCESS and stores the return value itself,
        # so the state is not set by hand here
        return {
            'status': 'success',
            'result': final_response.choices[0].message.content,
            'metadata': {
                'reasoning': reasoning_response.choices[0].message.content,
                'tools_used': ['web_search', 'scraper']
            }
        }

    except Exception as exc:
        # Retry with exponential backoff (60s, 120s, 240s). Celery sets the
        # RETRY state, and FAILURE once max_retries is exhausted; writing
        # FAILURE manually with a plain dict can break result decoding.
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
```
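
The retry countdown in the task above doubles on each attempt. The arithmetic can be checked in isolation with a small helper. The function name `backoff_delay` and the `cap` parameter are illustrative additions that do not appear in the original task; only the `60 * 2 ** retries` formula comes from it.

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 3600) -> int:
    """Seconds to wait before the next attempt: base * 2^retries, capped."""
    return min(cap, base * (2 ** retries))


# With max_retries=3, retries takes the values 0, 1, 2 before giving up.
schedule = [backoff_delay(n) for n in range(3)]
print(schedule)  # [60, 120, 240]
```

A cap is worth considering when `max_retries` is raised, because the uncapped delay grows quickly (ten retries would otherwise wait over 17 hours).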
```python
# api.py - API endpoint integration
from celery.result import AsyncResult
from fastapi import FastAPI
from pydantic import BaseModel

# Celery app and task defined in tasks.py
from tasks import app, execute_agent_workflow

app_api = FastAPI()


class AgentRequest(BaseModel):
    query: str
    context: dict = {}


class TaskResponse(BaseModel):
    task_id: str
    status: str
    message: str


@app_api.post("/agent/execute", response_model=TaskResponse)
async def execute_agent(request: AgentRequest):
    """
    Submit agent task to queue and return immediately.
    """
    # Enqueue the task
    task = execute_agent_workflow.delay(
        query=request.query,
        context=request.context
    )

    return TaskResponse(
        task_id=task.id,
        status="queued",
        message=f"Task submitted. Check status at /agent/status/{task.id}"
    )


@app_api.get("/agent/status/{task_id}")
async def get_agent_status(task_id: str):
    """
    Check the status of a running agent task.
    """
    task_result = AsyncResult(task_id, app=app)

    if task_result.state == 'PENDING':
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'status': 'Task is waiting in queue...'
        }
    elif task_result.state in ('STARTED', 'RETRY'):
        # Built-in Celery states: picked up by a worker, or waiting to retry
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'status': 'Task is running or scheduled for retry'
        }
    elif task_result.state == 'PROCESSING':
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'progress': task_result.info.get('progress', 0),
            'current_step': task_result.info.get('step', 'unknown')
        }
    elif task_result.state == 'SUCCESS':
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'result': task_result.result
        }
    else:
        # Something went wrong
        response = {
            'task_id': task_id,
            'state': task_result.state,
            'error': str(task_result.info)
        }

    return response


@app_api.get("/agent/result/{task_id}")
async def get_agent_result(task_id: str):
    """
    Retrieve the final result of a completed agent task.
    """
    task_result = AsyncResult(task_id, app=app)

    if not task_result.ready():
        return {
            'task_id': task_id,
            'status': 'not_ready',
            'message': 'Task is still processing'
        }

    if task_result.failed():
        # get() would re-raise the task's exception, so report it instead
        return {
            'task_id': task_id,
            'status': 'failed',
            'error': str(task_result.result)
        }

    return {
        'task_id': task_id,
        'status': 'complete',
        'result': task_result.get()
    }
```

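A client of the status endpoint typically polls until the task reaches a terminal state. The sketch below shows one way to do that with a growing poll interval. It is a hedged illustration: `poll_until_done` and its parameters are hypothetical names, and the status source is injected as a callable so the demo can run against a fake instead of a live HTTP server. `SUCCESS`, `FAILURE`, and `REVOKED` are Celery's built-in terminal states.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 1.0,
    max_interval: float = 10.0,
    timeout: float = 300.0,
) -> Dict[str, Any]:
    """Call fetch_status() until the task reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status.get("state") in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        time.sleep(interval)
        interval = min(max_interval, interval * 2)  # back off between polls


# Demo with a fake status source instead of a real HTTP call.
_fake = iter([
    {"state": "PENDING"},
    {"state": "PROCESSING", "progress": 50},
    {"state": "SUCCESS", "result": "done"},
])
final = poll_until_done(lambda: next(_fake), interval=0.01)
print(final)
```

In a real client, `fetch_status` would wrap a GET request to `/agent/status/{task_id}` and return the parsed JSON body.
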
2.3 Managing Concurrent Multi-Agent Behavior

In agentic ecosystems, you often have many agents working at once:

  • Research agent

  • Scraper agent

  • Reviewer agent

  • Planner agent

  • Tool agent

Without queues, simultaneous operations could overwhelm:

  • LLM API rate limits

  • vector database

  • external APIs

  • CPU-bound local inference

Queues allow:

  • throttling

  • prioritization

  • buffering

  • safe parallel execution

Architecture diagram showing three agents (Reviewer, Scraper, Research) connected to a central queue that distributes work to three workers

Figure 3: Diagram — Multi-agent system coordinated via queues

Workers share the load instead of agents fighting for resources.
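
The throttling, prioritization, and buffering listed above can be illustrated with `asyncio` alone. In this sketch, which is an assumption-laden toy rather than a description of any particular broker, the queue buffers the jobs, the priority number orders them, and the fixed pool of two workers caps how many run at once. The agent names and priorities are made up for the demo.

```python
import asyncio
from typing import List


async def worker(jobs: asyncio.PriorityQueue, done: List[str]) -> None:
    """Take the highest-priority job, run it, and record completion."""
    while True:
        _priority, agent = await jobs.get()
        await asyncio.sleep(0.01)  # placeholder for a rate-limited call
        done.append(agent)
        jobs.task_done()


async def main() -> List[str]:
    jobs: asyncio.PriorityQueue = asyncio.PriorityQueue()
    done: List[str] = []

    # Lower number = higher priority. All jobs are buffered up front.
    for priority, agent in [(2, "scraper"), (1, "planner"),
                            (3, "reviewer"), (2, "research")]:
        jobs.put_nowait((priority, agent))

    # Two workers means at most two jobs are in flight at any time.
    workers = [asyncio.create_task(worker(jobs, done)) for _ in range(2)]
    await jobs.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return done


order = asyncio.run(main())
print(order)  # planner and research are served before scraper and reviewer
```

Raising the worker count increases parallelism, and lowering it throttles harder, without any change to the code that submits jobs.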

2.4 Multi-Agent Coordination with Dedicated Queues

Key Features Demonstrated:

  • ✅ Dedicated queues per agent type

  • ✅ Rate limiting for external API calls

  • ✅ Parallel execution with group()

  • ✅ Sequential workflows with chain()

  • ✅ Result aggregation with chord()

  • ✅ Automatic load balancing across workers

```python
# multi_agent_system.py
import logging
import time
from typing import Any, Dict, List

import requests
from celery import Celery, chain, chord, group

logger = logging.getLogger(__name__)

# chord() and chained groups need a result backend. These Redis URLs are an
# assumption that mirrors tasks.py; adjust them for your environment.
app = Celery(
    'multi_agent',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

# Configure multiple queues for different agent types.
# Routes match on task name; tasks defined in this module are named
# 'multi_agent_system.' plus the function name. The queue= option on each
# decorator below sets the same default explicitly.
app.conf.task_routes = {
    'multi_agent_system.research_agent': {'queue': 'research'},
    'multi_agent_system.scraper_agent': {'queue': 'scraper'},
    'multi_agent_system.reviewer_agent': {'queue': 'reviewer'},
    'multi_agent_system.planner_agent': {'queue': 'planner'},
    'multi_agent_system.tool_agent_api_call': {'queue': 'tools'},
}

# Configure rate limits per task type (Celery enforces them per worker
# instance, not globally). Annotations are keyed by exact task name or '*'.
app.conf.task_annotations = {
    'multi_agent_system.scraper_agent': {'rate_limit': '10/m'},  # 10 per minute
    'multi_agent_system.tool_agent_api_call': {'rate_limit': '30/m'},  # Respect API limits
}


# Research Agent
@app.task(queue='research', bind=True)
def research_agent(self, topic: str) -> Dict[str, Any]:
    """
    Research agent: Gathers information on a topic.
    """
    logger.info(f"Research agent processing: {topic}")

    try:
        # Simulate research (replace with actual LLM call)
        time.sleep(2)

        findings = {
            'topic': topic,
            'sources': ['source1.com', 'source2.com'],
            'summary': f'Research findings for {topic}'
        }

        return {
            'agent': 'research',
            'status': 'success',
            'data': findings
        }
    except Exception as e:
        logger.error(f"Research agent failed: {e}")
        raise


# Scraper Agent
@app.task(queue='scraper', bind=True, max_retries=5)
def scraper_agent(self, urls: List[str]) -> Dict[str, Any]:
    """
    Scraper agent: Extracts content from URLs.
    """
    logger.info(f"Scraper agent processing {len(urls)} URLs")

    try:
        scraped_data = []
        for url in urls:
            # Simulate scraping (replace with actual scraping logic)
            content = f"Content from {url}"
            scraped_data.append({'url': url, 'content': content})

        return {
            'agent': 'scraper',
            'status': 'success',
            'data': scraped_data
        }
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=30 * (2 ** self.request.retries))


# Reviewer Agent
@app.task(queue='reviewer', bind=True)
def reviewer_agent(self, content: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Reviewer agent: Validates and scores content quality.

    In the chained workflow below it receives the list of results
    produced by the parallel group.
    """
    logger.info("Reviewer agent processing content")

    try:
        # Simulate review (replace with actual LLM evaluation)
        quality_score = 0.85
        issues = []

        return {
            'agent': 'reviewer',
            'status': 'success',
            'data': {
                'quality_score': quality_score,
                'issues': issues,
                'approved': quality_score > 0.7
            }
        }
    except Exception as e:
        logger.error(f"Reviewer agent failed: {e}")
        raise


# Planner Agent
@app.task(queue='planner', bind=True)
def planner_agent(self, goal: str, available_agents: List[str]) -> Dict[str, Any]:
    """
    Planner agent: Creates execution plan for multi-agent workflow.
    """
    logger.info(f"Planner agent creating plan for: {goal}")

    try:
        # Create execution plan
        plan = {
            'goal': goal,
            'steps': [
                {'agent': 'research', 'action': 'gather_info'},
                {'agent': 'scraper', 'action': 'extract_data'},
                {'agent': 'reviewer', 'action': 'validate'},
            ]
        }

        return {
            'agent': 'planner',
            'status': 'success',
            'data': plan
        }
    except Exception as e:
        logger.error(f"Planner agent failed: {e}")
        raise


# Tool Agent
@app.task(queue='tools', bind=True, rate_limit='30/m')
def tool_agent_api_call(self, endpoint: str, params: Dict) -> Dict[str, Any]:
    """
    Tool agent: Makes external API calls with rate limiting.
    """
    logger.info(f"Tool agent calling: {endpoint}")

    try:
        # Simulate API call (replace with actual API client)
        response = requests.get(endpoint, params=params, timeout=10)

        return {
            'agent': 'tool',
            'status': 'success',
            'data': response.json()
        }
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)


# Orchestration: Coordinating Multiple Agents
@app.task
def orchestrate_multi_agent_workflow(query: str) -> Dict[str, Any]:
    """
    Orchestrate a complex workflow involving multiple agents.

    Execution pattern:
    1. Planner creates the plan
    2. Research and Scraper work in parallel
    3. Reviewer validates the combined results
    """
    logger.info(f"Orchestrating workflow for query: {query}")

    # Step 1: Create plan
    plan_task = planner_agent.s(
        goal=query,
        available_agents=['research', 'scraper', 'reviewer']
    )

    # Step 2: Execute research and scraping in parallel.
    # Immutable signatures (.si) keep the planner's return value from being
    # injected as an extra positional argument, which would clash with the
    # keyword arguments given here.
    parallel_tasks = group(
        research_agent.si(topic=query),
        scraper_agent.si(urls=['http://example.com/1', 'http://example.com/2'])
    )

    # Step 3: Review results after parallel execution completes
    review_task = reviewer_agent.s()

    # Chain the workflow: plan -> parallel execution -> review
    workflow = chain(
        plan_task,
        parallel_tasks,
        review_task
    )

    # Execute asynchronously
    result = workflow.apply_async()

    return {
        'workflow_id': result.id,
        'status': 'submitted',
        'message': 'Multi-agent workflow initiated'
    }


# Advanced: Chord pattern for aggregation
@app.task
def aggregate_agent_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Aggregate results from multiple agents.
    Called after all parallel tasks complete.
    """
    logger.info("Aggregating results from multiple agents")

    aggregated = {
        'total_agents': len(results),
        'successful': sum(1 for r in results if r.get('status') == 'success'),
        'combined_data': [r.get('data') for r in results],
        'timestamp': time.time()
    }

    return aggregated


@app.task
def complex_multi_agent_workflow(query: str) -> str:
    """
    Advanced workflow using chord pattern for parallel execution + aggregation.
    """
    # Create a chord: parallel tasks + callback
    workflow = chord(
        group(
            research_agent.s(topic=query),
            scraper_agent.s(urls=['http://example.com']),
            tool_agent_api_call.s(endpoint='http://api.example.com', params={})
        )
    )(aggregate_agent_results.s())

    return workflow.id
```

Starting Workers for Different Queues:

```bash
# Terminal 1: Research queue worker
celery -A multi_agent_system worker -Q research -n research_worker@%h -c 2

# Terminal 2: Scraper queue worker (more concurrency for I/O)
celery -A multi_agent_system worker -Q scraper -n scraper_worker@%h -c 5

# Terminal 3: Reviewer queue worker
celery -A multi_agent_system worker -Q reviewer -n reviewer_worker@%h -c 2

# Terminal 4: Planner queue worker
celery -A multi_agent_system worker -Q planner -n planner_worker@%h -c 1

# Terminal 5: Tool queue worker (with rate limiting)
celery -A multi_agent_system worker -Q tools -n tool_worker@%h -c 3

# Or start all queues with one command (development only)
celery -A multi_agent_system worker -Q research,scraper,reviewer,planner,tools -c 10
```

2.5 Decoupling Application Logic from Agent Execution

Decoupling is essential for:

  • responsiveness

  • fault isolation

  • easier maintenance

  • retry logic

  • observability

A synchronous model ties the lifespan of the user request to the agent's operation. An async/queue architecture breaks that dependency.

Benefits:

  • The system can acknowledge user requests instantly.

  • Agent execution happens independently.

  • Failures do not crash the main application.

  • The same job can be retried, resumed, or distributed.
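
Fault isolation and retry can be seen in a few lines of plain Python. This is a simplified sketch under stated assumptions: `drain`, `MAX_ATTEMPTS`, and the sample job names are hypothetical, and a single loop stands in for a pool of workers. A failing job is caught, re-queued with an incremented attempt count, and eventually marked failed, while the other jobs and the loop itself carry on.

```python
import queue
from typing import Callable, Dict

MAX_ATTEMPTS = 3  # hypothetical retry budget


def drain(jobs: queue.Queue, handler: Callable[[str], str]) -> Dict[str, str]:
    """Run every queued job; a failing job is re-queued, never fatal."""
    outcomes: Dict[str, str] = {}
    while not jobs.empty():
        job, attempt = jobs.get()
        try:
            outcomes[job] = handler(job)
        except Exception as exc:  # isolate the failure to this one job
            if attempt + 1 < MAX_ATTEMPTS:
                jobs.put((job, attempt + 1))  # retry later
            else:
                outcomes[job] = f"failed: {exc}"
    return outcomes


calls = {"flaky": 0}


def handler(job: str) -> str:
    """Demo handler: 'flaky' succeeds on its third try, 'broken' never does."""
    if job == "flaky":
        calls["flaky"] += 1
        if calls["flaky"] < 3:
            raise RuntimeError("temporary outage")
    if job == "broken":
        raise ValueError("bad input")
    return f"{job} ok"


jobs: queue.Queue = queue.Queue()
for name in ("steady", "flaky", "broken"):
    jobs.put((name, 0))

outcomes = drain(jobs, handler)
print(outcomes)
# {'steady': 'steady ok', 'flaky': 'flaky ok', 'broken': 'failed: bad input'}
```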

3. Practical Applications of Async & Queues in Agentic AI

3.1 Tool Execution Buffering

Agents make frequent tool calls:

  • DB queries

  • URL fetches

  • external API calls

  • scraping

  • long-running computations

Queues help:

  • enforce rate limits

  • batch similar requests

  • retry failures

  • distribute load across workers
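
Batching is the least obvious item in this list, so here is a small sketch of the idea. It assumes pending tool calls are available as `(tool, argument)` pairs; the function name `batch_by_tool`, the tool names, and the batch size are all illustrative. Calls to the same tool are grouped so a worker can serve several in one round trip.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def batch_by_tool(calls: Iterable[Tuple[str, str]],
                  batch_size: int = 3) -> List[Tuple[str, List[str]]]:
    """Group queued (tool, argument) pairs into per-tool batches."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for tool, arg in calls:
        grouped[tool].append(arg)

    batches: List[Tuple[str, List[str]]] = []
    for tool, args in grouped.items():
        # Split each tool's arguments into chunks of at most batch_size
        for i in range(0, len(args), batch_size):
            batches.append((tool, args[i:i + batch_size]))
    return batches


pending = [
    ("fetch_url", "a.example"), ("db_query", "q1"), ("fetch_url", "b.example"),
    ("fetch_url", "c.example"), ("fetch_url", "d.example"), ("db_query", "q2"),
]
batches = batch_by_tool(pending)
for tool, args in batches:
    print(tool, args)
# fetch_url ['a.example', 'b.example', 'c.example']
# fetch_url ['d.example']
# db_query ['q1', 'q2']
```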

3.2 Rate-Limited Tool Execution with Retry Logic

Key Features Demonstrated:

  • ✅ Rate limiting with Redis

  • ✅ Result caching to reduce redundant calls

  • ✅ Retry logic with exponential backoff

  • ✅ Batch processing for efficiency

  • ✅ Priority queues for critical tasks

  • ✅ Connection pooling and timeout handling

code
# tool_executor.py
import hashlib
import json
import logging
from functools import wraps
from typing import Any, Dict, List, Optional

import psycopg2
import requests
from bs4 import BeautifulSoup
from celery import Celery, Task, chord, group
from psycopg2.extras import RealDictCursor
from redis import Redis

logger = logging.getLogger(__name__)

app = Celery('tool_executor')

# Redis for caching and rate limiting
redis_client = Redis(host='localhost', port=6379, db=1, decode_responses=True)


class RateLimitExceeded(Exception):
    """Raised when a tool has used up its calls for the current window."""


def rate_limit(key_prefix: str, max_calls: int, time_window: int):
    """
    Decorator for rate limiting tool calls (fixed window).

    Args:
        key_prefix: Redis key prefix for this rate limit
        max_calls: Maximum number of calls allowed
        time_window: Time window in seconds
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create rate limit key
            rate_key = f"rate_limit:{key_prefix}"

            # INCR is atomic, so concurrent workers cannot both slip under the limit
            current_count = redis_client.incr(rate_key)
            if current_count == 1:
                # First call in this window: start the expiry clock once
                redis_client.expire(rate_key, time_window)

            if current_count > max_calls:
                wait_time = redis_client.ttl(rate_key)
                raise RateLimitExceeded(
                    f"Rate limit exceeded. Try again in {wait_time} seconds"
                )

            # Execute function
            return func(*args, **kwargs)
        return wrapper
    return decorator


def cache_result(ttl: int = 3600):
    """
    Decorator to cache tool results.

    Args:
        ttl: Time to live in seconds (default 1 hour)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build the key from the call arguments only: skip the bound task
            # instance (bind=True) and include keyword arguments
            call_args = args[1:] if args and isinstance(args[0], Task) else args
            raw_key = json.dumps([call_args, kwargs], sort_keys=True, default=str)
            digest = hashlib.md5(raw_key.encode()).hexdigest()
            cache_key = f"cache:{func.__name__}:{digest}"

            # Check cache
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                logger.info(f"Cache hit for {func.__name__}")
                return json.loads(cached_result)

            # Execute function
            result = func(*args, **kwargs)

            # Store in cache
            redis_client.setex(cache_key, ttl, json.dumps(result))
            logger.info(f"Cached result for {func.__name__}")

            return result
        return wrapper
    return decorator


# The cache sits outside the rate limiter so cache hits do not consume the quota
@app.task(bind=True, max_retries=3, default_retry_delay=60)
@cache_result(ttl=1800)  # Cache for 30 minutes
@rate_limit(key_prefix="external_api", max_calls=30, time_window=60)
def call_external_api(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """
    Call external API with rate limiting, caching, and retry logic.
    """
    logger.info(f"Calling external API: {endpoint}")

    try:
        response = requests.get(
            endpoint,
            params=params,
            timeout=30,
            headers={'User-Agent': 'AgenticAI/1.0'}
        )

        response.raise_for_status()

        return {
            'status': 'success',
            'data': response.json(),
            'cached': False
        }

    except requests.exceptions.RequestException as exc:
        logger.error(f"API call failed: {exc}")

        # Retry with exponential backoff: 1min, 2min, 4min
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown)


@app.task(bind=True, max_retries=5)
@rate_limit(key_prefix="web_scraping", max_calls=10, time_window=60)
def scrape_url(self, url: str) -> Dict[str, Any]:
    """
    Scrape URL with rate limiting and retry on failure.
    """
    logger.info(f"Scraping: {url}")

    try:
        response = requests.get(
            url,
            timeout=30,
            headers={
                'User-Agent': 'Mozilla/5.0 (compatible; AgenticBot/1.0)'
            }
        )

        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')

        # Extract title and main content
        title_tag = soup.find('title')
        title = title_tag.text if title_tag else 'No title'

        # Remove script and style elements
        for script in soup(['script', 'style']):
            script.decompose()

        text_content = soup.get_text(separator=' ', strip=True)

        return {
            'status': 'success',
            'url': url,
            'title': title,
            'content': text_content[:5000],  # Limit content size
            'length': len(text_content)
        }

    except Exception as exc:
        logger.error(f"Scraping failed for {url}: {exc}")

        # Exponential backoff: 1min, 2min, 4min, 8min, 16min
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown, max_retries=5)


@app.task(bind=True)
@rate_limit(key_prefix="database_query", max_calls=100, time_window=60)
def execute_database_query(self, query: str, params: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Execute database query with rate limiting.
    """
    logger.info("Executing database query")

    conn = None
    try:
        # Placeholder credentials, as in the rest of this example
        conn = psycopg2.connect(
            host='localhost',
            database='agent_db',
            user='agent_user',
            password='secure_password',
            connect_timeout=10
        )

        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params or None)
            # Plain dicts keep the result JSON-serializable
            results = [dict(row) for row in cursor.fetchall()]

        return {
            'status': 'success',
            'count': len(results),
            'data': results
        }

    except Exception as exc:
        logger.error(f"Database query failed: {exc}")
        raise self.retry(exc=exc, countdown=30)

    finally:
        # Close the connection on success and on failure
        if conn is not None:
            conn.close()


# Batch processing for efficiency
@app.task
def batch_api_calls(endpoints: List[str]) -> Dict[str, Any]:
    """
    Process multiple API calls efficiently using Celery groups.
    """
    # Create a group of parallel API call tasks
    job = group(
        call_external_api.s(endpoint=endpoint, params={})
        for endpoint in endpoints
    )

    # Dispatch all tasks; this returns immediately without waiting for results
    result = job.apply_async()

    return {
        'batch_id': result.id,
        'total_tasks': len(endpoints),
        'status': 'processing'
    }


@app.task
def batch_url_scraping(urls: List[str], callback_task: Optional[str] = None) -> str:
    """
    Scrape multiple URLs with automatic batching and rate limiting.
    """
    # Create scraping tasks
    scrape_tasks = group(scrape_url.s(url) for url in urls)

    if callback_task:
        # callback_task is the name of a registered task; the chord calls it
        # with the list of scrape results once every scrape has finished
        workflow = chord(scrape_tasks)(app.signature(callback_task))
    else:
        # Just execute in parallel
        workflow = scrape_tasks.apply_async()

    return workflow.id
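The counter-with-expiry idea behind the `rate_limit` decorator can be reasoned about without a Redis server. The following is a minimal in-memory sketch, assuming a fixed window that starts on the first call and resets once `window_seconds` have passed. `FixedWindowLimiter`, its methods, and the injectable `clock` are hypothetical names used for illustration; they are not part of the code above or of Redis.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory stand-in for a counter with an expiry (illustrative only)."""

    def __init__(self, max_calls: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.clock = clock
        # key -> (window start time, calls counted in that window)
        self._windows: Dict[str, Tuple[float, int]] = {}

    def allow(self, key: str) -> bool:
        """Count one call against `key`; return False once the window is full."""
        now = self.clock()
        start, count = self._windows.get(key, (now, 0))
        if now - start >= self.window_seconds:
            # The previous window has expired, so start a fresh one
            start, count = now, 0
        count += 1
        self._windows[key] = (start, count)
        return count <= self.max_calls

    def retry_after(self, key: str) -> float:
        """Seconds until the current window for `key` resets (0 if none)."""
        if key not in self._windows:
            return 0.0
        start, _ = self._windows[key]
        return max(0.0, self.window_seconds - (self.clock() - start))


# Usage with a fake clock so the window can be advanced deterministically
fake_now = [0.0]
limiter = FixedWindowLimiter(max_calls=2, window_seconds=60,
                             clock=lambda: fake_now[0])
results = [limiter.allow("external_api") for _ in range(3)]  # third is rejected
fake_now[0] = 61.0  # move past the end of the window
allowed_after_reset = limiter.allow("external_api")
```

Injecting the clock keeps the sketch testable. A shared store such as Redis is still needed in practice, because each worker process would otherwise keep its own counters.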

Configuration for Production:

code
# celeryconfig.py
from kombu import Queue, Exchange

# Broker settings
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'

# Task execution settings
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'UTC'
enable_utc = True

# Performance settings
worker_prefetch_multiplier = 4
worker_max_tasks_per_child = 1000  # Restart worker after 1000 tasks

# Queue configuration
task_default_queue = 'default'
task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)

# Route tasks by priority
task_routes = {
    'tool_executor.call_external_api': {'queue': 'high_priority'},
    'tool_executor.scrape_url': {'queue': 'low_priority'},
}

# Result expiration
result_expires = 3600  # Results expire after 1 hour

# Task retry settings
task_acks_late = True  # Acknowledge tasks after completion
task_reject_on_worker_lost = True  # Requeue if worker crashes

3.3 State Management & Checkpointing

Agentic workflows are multi-step:

  1. Think

  2. Search

  3. Analyze

  4. Act

  5. Reflect

  6. Continue

If step 4 fails, you don’t want to restart steps 1–3.

Queues + async let you:

  • save intermediate state

  • resume partial workflows

  • persist progress

  • recover from failures gracefully
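The resume behaviour in the list above can be sketched without a queue or a database. This is a minimal illustration, assuming an in-memory dictionary stands in for persistent storage and that a stage counts as done only after it returns. `run_with_checkpoints`, the `store` layout, and the three stage functions are hypothetical names chosen for the example.

```python
from typing import Any, Callable, Dict, List, Tuple

Stage = Tuple[str, Callable[[Dict[str, Any]], Any]]


def run_with_checkpoints(stages: List[Stage], store: Dict[str, Any]) -> Dict[str, Any]:
    """Run stages in order, skipping those already recorded in `store`."""
    state = store.setdefault("state", {})
    done = store.setdefault("done", [])
    for name, step in stages:
        if name in done:
            continue  # finished in an earlier attempt
        state[name] = step(state)  # may raise; earlier results stay in the store
        done.append(name)  # checkpoint only after the stage succeeds
    return state


calls = {"think": 0, "search": 0, "act": 0}


def think(state: Dict[str, Any]) -> str:
    calls["think"] += 1
    return "plan"


def search(state: Dict[str, Any]) -> List[str]:
    calls["search"] += 1
    return ["doc-1", "doc-2"]


def act(state: Dict[str, Any]) -> str:
    calls["act"] += 1
    if calls["act"] == 1:
        raise RuntimeError("tool unavailable")  # simulated one-off failure
    return f"acted on {len(state['search'])} documents"


stages = [("think", think), ("search", search), ("act", act)]
store: Dict[str, Any] = {}

try:
    run_with_checkpoints(stages, store)
except RuntimeError:
    pass  # the first attempt fails in "act"

final_state = run_with_checkpoints(stages, store)  # resumes at "act"
```

After the second call, `think` and `search` have each run once and only `act` has run twice, which is the property a checkpointed workflow is after.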

Flowchart showing multi-stage agent workflow with checkpoint decision point for resuming after failures

Figure 4: Checkpoint-enabled agent workflow

3.4 Checkpoint-Enabled Agent Workflow

Key Features Demonstrated:

  • ✅ Multi-stage workflow with checkpointing

  • ✅ Automatic resume from failure point

  • ✅ Persistent state in PostgreSQL

  • ✅ Workflow history tracking

  • ✅ Graceful failure handling

  • ✅ No redundant work on retry

code
# checkpoint_workflow.py
import logging
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

import psycopg2
from celery import Celery, Task
from openai import OpenAI
from psycopg2.extras import Json

logger = logging.getLogger(__name__)

app = Celery('checkpoint_workflow')


class WorkflowStage(Enum):
    """Workflow stages for checkpointing."""
    INITIALIZED = "initialized"
    REASONING = "reasoning"
    SEARCHING = "searching"
    ANALYZING = "analyzing"
    ACTING = "acting"
    REFLECTING = "reflecting"
    COMPLETED = "completed"
    FAILED = "failed"


# A checkpoint is written after a stage completes, so a resumed run
# continues with the stage that follows the last saved one.
NEXT_STAGE = {
    WorkflowStage.INITIALIZED: WorkflowStage.REASONING,
    WorkflowStage.REASONING: WorkflowStage.SEARCHING,
    WorkflowStage.SEARCHING: WorkflowStage.ANALYZING,
    WorkflowStage.ANALYZING: WorkflowStage.ACTING,
    WorkflowStage.ACTING: WorkflowStage.REFLECTING,
    WorkflowStage.REFLECTING: WorkflowStage.COMPLETED,
    WorkflowStage.COMPLETED: WorkflowStage.COMPLETED,
}


def utc_now() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


class CheckpointDB:
    """Database handler for workflow checkpoints."""

    def __init__(self):
        # Placeholder credentials, as in the rest of this example
        self.conn_params = {
            'host': 'localhost',
            'database': 'agent_workflows',
            'user': 'agent_user',
            'password': 'secure_password'
        }

    @contextmanager
    def _connect(self):
        """Open a connection, commit or roll back, and always close it."""
        conn = psycopg2.connect(**self.conn_params)
        try:
            # psycopg2's "with conn" ends the transaction but does not close
            with conn:
                yield conn
        finally:
            conn.close()

    def save_checkpoint(
        self,
        workflow_id: str,
        stage: WorkflowStage,
        state: Dict[str, Any],
        metadata: Optional[Dict] = None
    ) -> None:
        """Save workflow checkpoint to database."""
        try:
            with self._connect() as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                        INSERT INTO workflow_checkpoints
                        (workflow_id, stage, state, metadata, created_at)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (workflow_id, stage)
                        DO UPDATE SET
                            state = EXCLUDED.state,
                            metadata = EXCLUDED.metadata,
                            updated_at = CURRENT_TIMESTAMP
                    """, (
                        workflow_id,
                        stage.value,
                        Json(state),
                        Json(metadata or {}),
                        utc_now()
                    ))

            logger.info(f"Checkpoint saved: {workflow_id} at stage {stage.value}")

        except Exception as e:
            logger.error(f"Failed to save checkpoint: {e}")
            raise

    def load_checkpoint(
        self,
        workflow_id: str,
        stage: Optional[WorkflowStage] = None
    ) -> Optional[Dict[str, Any]]:
        """Load a specific checkpoint, or the latest resumable one."""
        try:
            with self._connect() as conn:
                with conn.cursor() as cursor:
                    if stage:
                        cursor.execute("""
                            SELECT stage, state, metadata, created_at
                            FROM workflow_checkpoints
                            WHERE workflow_id = %s AND stage = %s
                            ORDER BY created_at DESC
                            LIMIT 1
                        """, (workflow_id, stage.value))
                    else:
                        # Latest checkpoint of a completed stage. Failure
                        # records are skipped because they are not resumable,
                        # and upserted rows are ordered by their update time.
                        cursor.execute("""
                            SELECT stage, state, metadata, created_at
                            FROM workflow_checkpoints
                            WHERE workflow_id = %s AND stage <> %s
                            ORDER BY COALESCE(updated_at, created_at) DESC
                            LIMIT 1
                        """, (workflow_id, WorkflowStage.FAILED.value))

                    result = cursor.fetchone()

                    if result:
                        return {
                            'stage': result[0],
                            'state': result[1],
                            'metadata': result[2],
                            'created_at': result[3]
                        }
                    return None

        except Exception as e:
            logger.error(f"Failed to load checkpoint: {e}")
            return None

    def get_workflow_history(self, workflow_id: str) -> List[Dict[str, Any]]:
        """Get complete history of workflow checkpoints."""
        try:
            with self._connect() as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                        SELECT stage, state, metadata, created_at
                        FROM workflow_checkpoints
                        WHERE workflow_id = %s
                        ORDER BY created_at ASC
                    """, (workflow_id,))

                    results = cursor.fetchall()

                    return [
                        {
                            'stage': r[0],
                            'state': r[1],
                            'metadata': r[2],
                            'created_at': r[3]
                        }
                        for r in results
                    ]
        except Exception as e:
            logger.error(f"Failed to get workflow history: {e}")
            return []


checkpoint_db = CheckpointDB()


class CheckpointableTask(Task):
    """Base task class with checkpoint support."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure by saving checkpoint."""
        workflow_id = kwargs.get('workflow_id')
        if workflow_id:
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.FAILED,
                state={'error': str(exc)},
                metadata={'task_id': task_id, 'traceback': str(einfo)}
            )


@app.task(base=CheckpointableTask, bind=True, max_retries=3)
def execute_checkpointed_workflow(
    self,
    workflow_id: str,
    query: str,
    context: Dict[str, Any],
    resume_from: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute a multi-stage workflow with checkpoint support.

    If the workflow fails at any stage, it can resume from the last checkpoint.
    """
    logger.info(f"Starting workflow {workflow_id}")

    current_stage = WorkflowStage.INITIALIZED
    # Keep the inputs in the state so resume_workflow can recover them
    accumulated_state = {'query': query, 'context': context}

    # Check if we're resuming from a checkpoint
    checkpoint = checkpoint_db.load_checkpoint(workflow_id) if resume_from else None
    if checkpoint:
        last_completed = WorkflowStage(checkpoint['stage'])
        current_stage = NEXT_STAGE[last_completed]
        accumulated_state = checkpoint['state']
        accumulated_state.setdefault('query', query)
        accumulated_state.setdefault('context', context)
        logger.info(
            f"Resuming after stage {last_completed.value} at {current_stage.value}"
        )

    try:
        # Initial checkpoint: records the inputs before any work is done
        if current_stage == WorkflowStage.INITIALIZED:
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.INITIALIZED,
                state=accumulated_state,
                metadata={'started_at': utc_now().isoformat()}
            )

            current_stage = WorkflowStage.REASONING

        # Stage 1: Reasoning
        if current_stage == WorkflowStage.REASONING:
            logger.info("Stage 1: Reasoning")

            reasoning_result = perform_reasoning(query, context)
            accumulated_state['reasoning'] = reasoning_result

            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.REASONING,
                state=accumulated_state,
                metadata={'completed_at': utc_now().isoformat()}
            )

            current_stage = WorkflowStage.SEARCHING

        # Stage 2: Searching
        if current_stage == WorkflowStage.SEARCHING:
            logger.info("Stage 2: Searching")

            search_queries = accumulated_state['reasoning'].get('search_queries', [])
            search_results = perform_search(search_queries)
            accumulated_state['search_results'] = search_results

            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.SEARCHING,
                state=accumulated_state,
                metadata={'completed_at': utc_now().isoformat()}
            )

            current_stage = WorkflowStage.ANALYZING

        # Stage 3: Analyzing
        if current_stage == WorkflowStage.ANALYZING:
            logger.info("Stage 3: Analyzing")

            analysis = perform_analysis(
                accumulated_state['search_results'],
                accumulated_state['reasoning']
            )
            accumulated_state['analysis'] = analysis

            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.ANALYZING,
                state=accumulated_state,
                metadata={'completed_at': utc_now().isoformat()}
            )

            current_stage = WorkflowStage.ACTING

        # Stage 4: Acting
        if current_stage == WorkflowStage.ACTING:
            logger.info("Stage 4: Acting")

            action_result = perform_action(accumulated_state['analysis'])
            accumulated_state['action_result'] = action_result

            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.ACTING,
                state=accumulated_state,
                metadata={'completed_at': utc_now().isoformat()}
            )

            current_stage = WorkflowStage.REFLECTING

        # Stage 5: Reflecting
        if current_stage == WorkflowStage.REFLECTING:
            logger.info("Stage 5: Reflecting")

            reflection = perform_reflection(accumulated_state)
            accumulated_state['reflection'] = reflection

            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.REFLECTING,
                state=accumulated_state,
                metadata={'completed_at': utc_now().isoformat()}
            )

            current_stage = WorkflowStage.COMPLETED

        # Final checkpoint
        checkpoint_db.save_checkpoint(
            workflow_id=workflow_id,
            stage=WorkflowStage.COMPLETED,
            state=accumulated_state,
            metadata={
                'completed_at': utc_now().isoformat(),
                'success': True
            }
        )

        return {
            'workflow_id': workflow_id,
            'status': 'completed',
            'result': accumulated_state
        }

    except Exception as exc:
        logger.error(f"Workflow failed at stage {current_stage.value}: {exc}")

        # Record the failure under its own stage so the checkpoints of
        # completed stages stay intact
        try:
            checkpoint_db.save_checkpoint(
                workflow_id=workflow_id,
                stage=WorkflowStage.FAILED,
                state=accumulated_state,
                metadata={
                    'error': str(exc),
                    'failed_stage': current_stage.value,
                    'failed_at': utc_now().isoformat()
                }
            )
        except Exception:
            logger.exception("Could not record failure checkpoint")

        # Retry; the next attempt resumes after the last completed stage
        raise self.retry(
            exc=exc,
            countdown=120,  # Wait 2 minutes before retry
            kwargs={
                'workflow_id': workflow_id,
                'query': query,
                'context': context,
                'resume_from': current_stage.value
            }
        )


# Helper functions for each stage
def perform_reasoning(query: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 1: Initial reasoning and planning."""
    client = OpenAI()

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a reasoning agent. Plan the search strategy."},
            {"role": "user", "content": f"Query: {query}\nContext: {context}"}
        ],
        timeout=60
    )

    return {
        'reasoning': response.choices[0].message.content,
        'search_queries': ['query1', 'query2'],  # Extract from reasoning
        'approach': 'comprehensive'
    }


def perform_search(queries: List[str]) -> List[Dict[str, Any]]:
    """Stage 2: Execute search queries."""
    # Simulate search (replace with actual search implementation)
    time.sleep(1)

    return [
        {'query': q, 'results': [f'result for {q}']}
        for q in queries
    ]


def perform_analysis(search_results: List[Dict], reasoning: Dict) -> Dict[str, Any]:
    """Stage 3: Analyze search results."""
    client = OpenAI()

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Analyze the search results."},
            {"role": "user", "content": f"Results: {search_results}"}
        ],
        timeout=60
    )

    return {
        'analysis': response.choices[0].message.content,
        'confidence': 0.85,
        'key_findings': ['finding1', 'finding2']
    }


def perform_action(analysis: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 4: Take action based on analysis."""
    # Simulate action (API call, database update, etc.)
    time.sleep(1)

    return {
        'action_taken': 'generated_report',
        'status': 'success'
    }


def perform_reflection(state: Dict[str, Any]) -> Dict[str, Any]:
    """Stage 5: Reflect on the entire process."""
    return {
        'quality_assessment': 'high',
        'improvements': ['More sources needed'],
        'success_rate': 0.9
    }


# Task for resuming workflows (for example, triggered from an API endpoint)
@app.task
def resume_workflow(workflow_id: str) -> Dict[str, Any]:
    """Resume a failed or interrupted workflow from last checkpoint."""
    checkpoint = checkpoint_db.load_checkpoint(workflow_id)

    if not checkpoint:
        return {
            'status': 'error',
            'message': f'No checkpoint found for workflow {workflow_id}'
        }

    logger.info(f"Resuming workflow {workflow_id} from stage {checkpoint['stage']}")

    # Resume execution
    result = execute_checkpointed_workflow.delay(
        workflow_id=workflow_id,
        query=checkpoint['state'].get('query', ''),
        context=checkpoint['state'].get('context', {}),
        resume_from=checkpoint['stage']
    )

    return {
        'status': 'resumed',
        'task_id': result.id,
        'resumed_from_stage': checkpoint['stage']
    }
WorkflowStage.ANALYZING.value:            logger.info("Stage 3: Analyzing")                        analysis = perform_analysis(                accumulated_state['search_results'],                accumulated_state['reasoning']            )            accumulated_state['analysis'] = analysis                        checkpoint_db.save_checkpoint(                workflow_id=workflow_id,                stage=WorkflowStage.ANALYZING,                state=accumulated_state,                metadata={'completed_at': datetime.utcnow().isoformat()}            )                        current_stage = WorkflowStage.ACTING                # Stage 4: Acting        if current_stage.value == WorkflowStage.ACTING.value:            logger.info("Stage 4: Acting")                        action_result = perform_action(accumulated_state['analysis'])            accumulated_state['action_result'] = action_result                        checkpoint_db.save_checkpoint(                workflow_id=workflow_id,                stage=WorkflowStage.ACTING,                state=accumulated_state,                metadata={'completed_at': datetime.utcnow().isoformat()}            )                        current_stage = WorkflowStage.REFLECTING                # Stage 5: Reflecting        if current_stage.value == WorkflowStage.REFLECTING.value:            logger.info("Stage 5: Reflecting")                        reflection = perform_reflection(accumulated_state)            accumulated_state['reflection'] = reflection                        checkpoint_db.save_checkpoint(                workflow_id=workflow_id,                stage=WorkflowStage.REFLECTING,                state=accumulated_state,                metadata={'completed_at': datetime.utcnow().isoformat()}            )                        current_stage = WorkflowStage.COMPLETED                # Final checkpoint        checkpoint_db.save_checkpoint(            workflow_id=workflow_id,            stage=WorkflowStage.COMPLETED,            
state=accumulated_state,            metadata={                'completed_at': datetime.utcnow().isoformat(),                'success': True            }        )                return {            'workflow_id': workflow_id,            'status': 'completed',            'result': accumulated_state        }            except Exception as exc:        logger.error(f"Workflow failed at stage {current_stage.value}: {exc}")                # Save failure checkpoint        checkpoint_db.save_checkpoint(            workflow_id=workflow_id,            stage=current_stage,            state=accumulated_state,            metadata={                'error': str(exc),                'failed_at': datetime.utcnow().isoformat()            }        )                # Retry from current stage        raise self.retry(            exc=exc,            countdown=120,  # Wait 2 minutes before retry            kwargs={                'workflow_id': workflow_id,                'query': query,                'context': context,                'resume_from': current_stage.value            }        )# Helper functions for each stagedef perform_reasoning(query: str, context: Dict[str, Any]) -> Dict[str, Any]:    """Stage 1: Initial reasoning and planning."""    from openai import OpenAI        client = OpenAI()        response = client.chat.completions.create(        model="gpt-4",        messages=[            {"role": "system", "content": "You are a reasoning agent. 
Plan the search strategy."},            {"role": "user", "content": f"Query: {query}\nContext: {context}"}        ],        timeout=60    )        return {        'reasoning': response.choices[0].message.content,        'search_queries': ['query1', 'query2'],  # Extract from reasoning        'approach': 'comprehensive'    }def perform_search(queries: List[str]) -> List[Dict[str, Any]]:    """Stage 2: Execute search queries."""    # Simulate search (replace with actual search implementation)    import time    time.sleep(1)        return [        {'query': q, 'results': [f'result for {q}']}        for q in queries    ]def perform_analysis(search_results: List[Dict], reasoning: Dict) -> Dict[str, Any]:    """Stage 3: Analyze search results."""    from openai import OpenAI        client = OpenAI()        response = client.chat.completions.create(        model="gpt-4",        messages=[            {"role": "system", "content": "Analyze the search results."},            {"role": "user", "content": f"Results: {search_results}"}        ],        timeout=60    )        return {        'analysis': response.choices[0].message.content,        'confidence': 0.85,        'key_findings': ['finding1', 'finding2']    }def perform_action(analysis: Dict[str, Any]) -> Dict[str, Any]:    """Stage 4: Take action based on analysis."""    # Simulate action (API call, database update, etc.)    
import time    time.sleep(1)        return {        'action_taken': 'generated_report',        'status': 'success'    }def perform_reflection(state: Dict[str, Any]) -> Dict[str, Any]:    """Stage 5: Reflect on the entire process."""    return {        'quality_assessment': 'high',        'improvements': ['More sources needed'],        'success_rate': 0.9    }# API endpoint for resuming workflows@app.taskdef resume_workflow(workflow_id: str) -> Dict[str, Any]:    """Resume a failed or interrupted workflow from last checkpoint."""    checkpoint = checkpoint_db.load_checkpoint(workflow_id)        if not checkpoint:        return {            'status': 'error',            'message': f'No checkpoint found for workflow {workflow_id}'        }        logger.info(f"Resuming workflow {workflow_id} from stage {checkpoint['stage']}")        # Resume execution    result = execute_checkpointed_workflow.delay(        workflow_id=workflow_id,        query=checkpoint['state'].get('query', ''),        context=checkpoint['state'].get('context', {}),        resume_from=checkpoint['stage']    )        return {        'status': 'resumed',        'task_id': result.id,        'resumed_from_stage': checkpoint['stage']    }

Database Schema for Checkpoints:

-- Create checkpoints table
CREATE TABLE IF NOT EXISTS workflow_checkpoints (
    id SERIAL PRIMARY KEY,
    workflow_id VARCHAR(255) NOT NULL,
    stage VARCHAR(50) NOT NULL,
    state JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(workflow_id, stage)
);

-- Create indexes for performance
CREATE INDEX idx_workflow_id ON workflow_checkpoints(workflow_id);
CREATE INDEX idx_workflow_stage ON workflow_checkpoints(workflow_id, stage);
CREATE INDEX idx_created_at ON workflow_checkpoints(created_at DESC);

-- Create function to update updated_at automatically
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_workflow_checkpoints_updated_at
    BEFORE UPDATE ON workflow_checkpoints
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();
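The UNIQUE(workflow_id, stage) constraint is what lets a checkpoint be saved again for the same stage without creating a duplicate row. The sketch below illustrates that upsert behavior. It is only an approximation: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, a trimmed-down table, and a hypothetical workflow ID `wf-1`, so column types and details differ from the schema above.

```python
import json
import sqlite3

# In-memory SQLite database standing in for PostgreSQL (an assumption for this sketch)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE workflow_checkpoints (
        workflow_id TEXT NOT NULL,
        stage       TEXT NOT NULL,
        state       TEXT NOT NULL,
        UNIQUE (workflow_id, stage)
    )
""")


def save_checkpoint(workflow_id: str, stage: str, state: dict) -> None:
    """Insert a checkpoint, or overwrite the existing row for the same stage."""
    conn.execute(
        """
        INSERT INTO workflow_checkpoints (workflow_id, stage, state)
        VALUES (?, ?, ?)
        ON CONFLICT (workflow_id, stage)
        DO UPDATE SET state = excluded.state
        """,
        (workflow_id, stage, json.dumps(state)),
    )
    conn.commit()


# Saving the same stage twice updates the row instead of adding a second one
save_checkpoint("wf-1", "reasoning", {"attempt": 1})
save_checkpoint("wf-1", "reasoning", {"attempt": 2})
save_checkpoint("wf-1", "searching", {"hits": 3})

rows = conn.execute(
    "SELECT stage, state FROM workflow_checkpoints ORDER BY stage"
).fetchall()
for stage, state in rows:
    print(stage, json.loads(state))
```

A retried stage therefore leaves exactly one row per stage, holding the most recent state.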

4. Scaling & Load Distribution

Horizontal scaling is the backbone of robust agent systems.

With queues:

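  • Add more workers to process more tasks in parallel

  • Remove workers during quiet periods to lower costs

  • Workers pull tasks as they become free, so load spreads across them without a central scheduler

Scaling does not require changing the main app, because it only enqueues tasks and never addresses workers directly.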
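The pull-based model behind these points can be sketched with the standard library alone. This is a toy illustration, not a production setup: threads stand in for worker processes, `queue.Queue` stands in for the message broker, and the function name `run_workers` is made up for the example.

```python
import queue
import threading
import time
from collections import Counter
from typing import List


def run_workers(num_workers: int, tasks: List[int]) -> Counter:
    """Drain one shared queue with `num_workers` threads; return tasks handled per worker."""
    task_queue: "queue.Queue[int]" = queue.Queue()
    for task in tasks:
        task_queue.put(task)

    processed: Counter = Counter()
    lock = threading.Lock()

    def worker(name: str) -> None:
        while True:
            try:
                task_queue.get_nowait()  # pull the next task when free
            except queue.Empty:
                return  # queue drained, worker exits
            time.sleep(0.001)  # stand-in for real agent work
            with lock:
                processed[name] += 1

    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}",))
        for i in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return processed


# The producer side is identical whether 1 or 4 workers run
counts = run_workers(num_workers=4, tasks=list(range(100)))
print(dict(counts))
```

Only `num_workers` changes between runs. The code that fills the queue stays the same, which is the property that makes horizontal scaling cheap.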

5. Event-Driven Agent Architectures

5.1 Architecture

Many agent tasks are triggered by:

  • new data arriving

  • changes in the environment

  • user updates

  • periodic schedules (Celery Beat)

  • external webhooks

Message queues make this possible:

  • agents can subscribe to events

  • workflows run asynchronously

  • each agent wakes up only when relevant work arrives

Event-driven architecture diagram showing EventSource triggering three agents through a central Message Queue

Figure 5: Diagram—Event-driven agent pipeline
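The subscription model described above can be reduced to a few lines. The following is a minimal in-process sketch, assuming a hypothetical `InMemoryEventBus` class. A real deployment would put a broker between publisher and agents, and each handler would typically enqueue a task rather than run inline.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class InMemoryEventBus:
    """Toy event bus: maps event types to the handlers subscribed to them."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, payload: Dict[str, Any]) -> int:
        """Deliver the payload to matching handlers; return how many were called."""
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


bus = InMemoryEventBus()
handled: List[str] = []

# Each "agent" registers only for the event type it cares about
bus.subscribe("new_data_arrived", lambda event: handled.append(f"ingest:{event['source']}"))
bus.subscribe("user_updated", lambda event: handled.append(f"notify:{event['user_id']}"))

delivered = bus.publish("new_data_arrived", {"source": "s3_bucket"})
ignored = bus.publish("environment_changed", {"change_type": "rate_limit"})

print(handled, delivered, ignored)
```

Only the ingest handler runs for the first event, and the second event reaches no one because no agent subscribed to it.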

5.2 Event-Driven Agent System with Webhooks

Key Features Demonstrated:

  • ✅ Event-driven architecture with pub/sub

  • ✅ Webhook endpoints for external integrations

  • ✅ Periodic tasks with Celery Beat

  • ✅ Event routing to appropriate agents

  • ✅ Health monitoring and cleanup

  • ✅ Signature verification for webhooks

  • ✅ Redis-based event bus

# event_driven_agents.py
from celery import Celery
from celery.schedules import crontab
from typing import Dict, Any, List, Callable, Optional
import logging
import json
import uuid
from datetime import datetime
from redis import Redis
from fastapi import FastAPI, HTTPException, Request
import hmac
import hashlib

logger = logging.getLogger(__name__)

app = Celery('event_driven_agents')
api = FastAPI()

# Redis for event pub/sub
redis_client = Redis(host='localhost', port=6379, db=2, decode_responses=True)

# Configure periodic tasks
app.conf.beat_schedule = {
    'monitor-data-sources-every-hour': {
        'task': 'event_driven_agents.monitor_data_sources',
        'schedule': crontab(minute=0),  # Every hour
    },
    'cleanup-old-events-daily': {
        'task': 'event_driven_agents.cleanup_old_events',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    'health-check-every-5-minutes': {
        'task': 'event_driven_agents.health_check',
        'schedule': 300.0,  # Every 5 minutes
    },
}


class EventBus:
    """Event bus for publish/subscribe pattern."""

    def __init__(self):
        self.redis = redis_client
        self.subscribers: Dict[str, List[Callable]] = {}

    def publish(self, event_type: str, data: Dict[str, Any]) -> None:
        """Store an event in Redis and publish it on the event type's channel."""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.utcnow().isoformat(),
            # Random suffix avoids ID collisions between events in the same second
            'event_id': f"{event_type}_{uuid.uuid4().hex}"
        }

        # Store event in Redis
        event_key = f"events:{event_type}:{event['event_id']}"
        self.redis.setex(event_key, 86400, json.dumps(event))  # 24 hour TTL

        # Publish to channel
        self.redis.publish(f"channel:{event_type}", json.dumps(event))

        logger.info(f"Published event: {event_type}")

    def subscribe(self, event_type: str, handler: Callable) -> None:
        """Subscribe a handler to an event type."""
        # Handlers are only recorded here; publish() does not call them, so a
        # separate listener on the Redis channel is needed to dispatch events.
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        logger.info(f"Subscribed handler to {event_type}")


event_bus = EventBus()


# Event-triggered agents
@app.task(bind=True)
def agent_on_new_data(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent triggered when new data arrives.
    """
    logger.info("Agent 1: Processing new data event")

    try:
        data_source = event_data.get('source')
        data_content = event_data.get('content')

        # Process the new data
        processed_result = {
            'source': data_source,
            'processed_at': datetime.utcnow().isoformat(),
            'summary': f"Processed data from {data_source}",
            'status': 'success'
        }

        # Publish processed event for downstream agents
        event_bus.publish('data_processed', processed_result)

        return processed_result

    except Exception as e:
        logger.error(f"Agent 1 failed: {e}")
        raise


@app.task(bind=True)
def agent_on_environment_change(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent triggered when environment changes.
    """
    logger.info("Agent 2: Responding to environment change")

    try:
        change_type = event_data.get('change_type')
        impact = event_data.get('impact')

        # Adapt strategy based on change
        adaptation = {
            'change_detected': change_type,
            'adaptation_strategy': f"Adjusted for {change_type}",
            'timestamp': datetime.utcnow().isoformat()
        }

        # Notify other systems
        event_bus.publish('agent_adapted', adaptation)

        return adaptation

    except Exception as e:
        logger.error(f"Agent 2 failed: {e}")
        raise


@app.task(bind=True)
def agent_on_user_update(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Agent triggered when user provides updates.
    """
    logger.info("Agent 3: Processing user update")

    try:
        user_id = event_data.get('user_id')
        update_type = event_data.get('update_type')

        # Handle user update
        response = {
            'user_id': user_id,
            'acknowledgment': f"Processed {update_type} update",
            'next_action': 'user_notified',
            'timestamp': datetime.utcnow().isoformat()
        }

        return response

    except Exception as e:
        logger.error(f"Agent 3 failed: {e}")
        raise


# Event router
@app.task
def route_event(event_type: str, event_data: Dict[str, Any]) -> List[str]:
    """
    Route events to appropriate agent handlers.
    """
    logger.info(f"Routing event: {event_type}")

    event_handlers = {
        'new_data_arrived': agent_on_new_data,
        'environment_changed': agent_on_environment_change,
        'user_updated': agent_on_user_update,
    }

    handler = event_handlers.get(event_type)

    if handler:
        # Trigger the appropriate agent asynchronously
        result = handler.delay(event_data)
        return [result.id]
    else:
        logger.warning(f"No handler found for event: {event_type}")
        return []


# Webhook endpoint for external events
@api.post("/webhook/github")
async def github_webhook(request: Request):
    """
    Receive GitHub webhook events and trigger appropriate agents.
    """
    # Verify webhook signature
    signature = request.headers.get('X-Hub-Signature-256')
    if not verify_github_signature(await request.body(), signature):
        # Returning a (dict, 401) tuple would still send HTTP 200 in FastAPI
        raise HTTPException(status_code=401, detail='Invalid signature')

    payload = await request.json()
    event_type = request.headers.get('X-GitHub-Event')

    logger.info(f"Received GitHub webhook: {event_type}")

    # Transform webhook to internal event
    event_data = {
        'source': 'github',
        'event_type': event_type,
        'payload': payload,
        'received_at': datetime.utcnow().isoformat()
    }

    # Route to appropriate agent
    route_event.delay('new_data_arrived', event_data)

    return {'status': 'accepted'}


@api.post("/webhook/slack")
async def slack_webhook(request: Request):
    """
    Receive Slack events and trigger agents.
    """
    payload = await request.json()

    # Handle Slack URL verification
    if payload.get('type') == 'url_verification':
        return {'challenge': payload['challenge']}

    event = payload.get('event', {})
    event_type = event.get('type')

    logger.info(f"Received Slack event: {event_type}")

    # Transform to internal event
    event_data = {
        'source': 'slack',
        'event_type': event_type,
        'user': event.get('user'),
        'text': event.get('text'),
        'channel': event.get('channel')
    }

    # Trigger user update agent
    route_event.delay('user_updated', event_data)

    return {'status': 'ok'}


@api.post("/webhook/custom")
async def custom_webhook(request: Request):
    """
    Generic webhook endpoint for custom integrations.
    """
    payload = await request.json()

    event_type = payload.get('event_type', 'environment_changed')
    event_data = payload.get('data', {})

    logger.info(f"Received custom webhook: {event_type}")

    # Route to appropriate agent; delay() returns an AsyncResult, which is not
    # JSON-serializable, so only the ID of the routing task is returned
    result = route_event.delay(event_type, event_data)

    return {
        'status': 'accepted',
        'task_id': result.id
    }


# Periodic monitoring tasks
@app.task
def monitor_data_sources():
    """
    Periodically check data sources for changes.
    Runs every hour via Celery Beat.
    """
    logger.info("Monitoring data sources for changes")

    # Check various data sources
    data_sources = ['database', 'api', 's3_bucket']

    for source in data_sources:
        # Simulate checking for changes
        has_changes = check_data_source(source)

        if has_changes:
            event_bus.publish('new_data_arrived', {
                'source': source,
                'content': 'New data detected',
                'priority': 'high'
            })


@app.task
def cleanup_old_events():
    """
    Clean up old events from Redis.
    Runs daily at 2 AM.
    """
    logger.info("Cleaning up old events")

    # Get all event keys
    pattern = "events:*"
    cursor = 0
    deleted_count = 0

    while True:
        cursor, keys = redis_client.scan(
            cursor=cursor,
            match=pattern,
            count=100
        )

        for key in keys:
            # Keys written by EventBus.publish expire on their own after 24 hours;
            # only keys that were stored without a TTL need manual deletion
            ttl = redis_client.ttl(key)
            if ttl == -1:  # No expiration set
                redis_client.delete(key)
                deleted_count += 1

        if cursor == 0:
            break

    logger.info(f"Deleted {deleted_count} old events")


@app.task
def health_check():
    """
    Perform health check on all agents.
    Runs every 5 minutes.
    """
    logger.info("Performing health check")

    # Check Redis connection
    try:
        redis_client.ping()
        redis_status = 'healthy'
    except Exception:
        redis_status = 'unhealthy'

    if redis_status != 'healthy':
        # The event bus itself runs on Redis, so publishing would fail as well
        logger.error("Health check failed: Redis is unreachable")
        return

    # Publish health status
    event_bus.publish('health_check_completed', {
        'redis': redis_status,
        'timestamp': datetime.utcnow().isoformat()
    })


# Utility functions
def verify_github_signature(payload: bytes, signature: Optional[str]) -> bool:
    """Verify GitHub webhook signature."""
    secret = b'your_webhook_secret'

    if not signature:
        return False

    expected_signature = 'sha256=' + hmac.new(
        secret,
        payload,
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison
    return hmac.compare_digest(signature, expected_signature)


def check_data_source(source: str) -> bool:
    """Check if a data source has new data."""
    # Implement actual checking logic
    import random
    return random.choice([True, False])

Starting the Event-Driven System:

code
# Terminal 1: Start Celery worker for event processing
celery -A event_driven_agents worker --loglevel=info -Q default -c 4

# Terminal 2: Start Celery Beat for periodic tasks
celery -A event_driven_agents beat --loglevel=info

# Terminal 3: Start FastAPI webhook server
uvicorn event_driven_agents:api --host 0.0.0.0 --port 8000 --reload

# Terminal 4: Monitor with Flower
celery -A event_driven_agents flower --port=5555

Testing the Event System:

code
# test_events.py
import requests
import json

# Test custom webhook
response = requests.post(
    'http://localhost:8000/webhook/custom',
    json={
        'event_type': 'environment_changed',
        'data': {
            'change_type': 'api_rate_limit_increased',
            'impact': 'high',
            'details': 'Rate limit increased to 1000/hour'
        }
    }
)

print(f"Response: {response.json()}")

# Test direct event publishing
from event_driven_agents import event_bus

event_bus.publish('new_data_arrived', {
    'source': 'manual_trigger',
    'content': 'Test data',
    'priority': 'low'
})

6. Production Deployment with Docker

docker-compose.yml

code
# docker-compose.yml
version: '3.8'

services:
  # Redis - Message broker and cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  # PostgreSQL - State management
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: agent_workflows
      POSTGRES_USER: agent_user
      POSTGRES_PASSWORD: secure_password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init_db.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U agent_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Celery Worker - Agent execution
  celery_worker:
    build: .
    command: celery -A multi_agent_system worker --loglevel=info -c 4
    depends_on:
      - redis
      - postgres
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/0
      DATABASE_URL: postgresql://agent_user:secure_password@postgres:5432/agent_workflows
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    volumes:
      - ./:/app
    restart: unless-stopped

  # Celery Beat - Periodic tasks
  celery_beat:
    build: .
    command: celery -A event_driven_agents beat --loglevel=info
    depends_on:
      - redis
      - postgres
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/0
    volumes:
      - ./:/app
    restart: unless-stopped

  # Flower - Monitoring dashboard
  flower:
    build: .
    command: celery -A multi_agent_system flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis
      - celery_worker
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      CELERY_RESULT_BACKEND: redis://redis:6379/0
    restart: unless-stopped

  # FastAPI - Web server for webhooks
  api:
    build: .
    command: uvicorn event_driven_agents:api --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - postgres
    environment:
      CELERY_BROKER_URL: redis://redis:6379/0
      DATABASE_URL: postgresql://agent_user:secure_password@postgres:5432/agent_workflows
    volumes:
      - ./:/app
    restart: unless-stopped

volumes:
  redis_data:
  postgres_data:

Dockerfile

code
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Run as non-root user
RUN useradd -m -u 1000 celeryuser && chown -R celeryuser:celeryuser /app
USER celeryuser

CMD ["celery", "-A", "multi_agent_system", "worker", "--loglevel=info"]

requirements.txt

code
# requirements.txt
celery[redis]==5.3.4
redis==5.0.1
psycopg2-binary==2.9.9
openai==1.3.0
fastapi==0.104.1
uvicorn[standard]==0.24.0
flower==2.0.1
requests==2.31.0
beautifulsoup4==4.12.2
pydantic==2.5.0
python-multipart==0.0.6

Start the entire system:

code
# Build and start all services
docker-compose up --build -d

# View logs
docker-compose logs -f celery_worker

# Scale workers
docker-compose up -d --scale celery_worker=5

# Stop all services
docker-compose down

7. Frequently Asked Questions

7.1 What is asynchronous processing in agentic AI?

Asynchronous processing in agentic AI allows autonomous agents to execute tasks without blocking the main application thread. Instead of waiting for long-running operations like LLM calls, tool invocations, or web scraping to complete, the system places these tasks in a queue and immediately returns control to the user. Worker processes handle the actual execution independently, enabling the system to remain responsive while agents perform complex, time-consuming operations in the background.
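The accept, enqueue, and return-a-task-ID pattern can be sketched with nothing but the standard library. This is a minimal in-process illustration, not the Celery setup used elsewhere in this article: `submit_task`, `run_agent`, `start_worker`, and the `results` dictionary are hypothetical names, and a real deployment would replace the in-memory queue with a broker such as Redis.

```python
import queue
import threading
import uuid

task_queue: queue.Queue = queue.Queue()
results: dict[str, str] = {}  # task_id -> "PENDING", a result, or a failure note
results_lock = threading.Lock()


def submit_task(prompt: str) -> str:
    """Accept a request, enqueue it, and return a task ID without waiting."""
    task_id = str(uuid.uuid4())
    with results_lock:
        results[task_id] = "PENDING"
    task_queue.put((task_id, prompt))
    return task_id


def run_agent(prompt: str) -> str:
    """Hypothetical stand-in for a slow LLM call or tool chain."""
    return f"answer to: {prompt}"


def worker() -> None:
    """Pull tasks off the queue and run them independently of the caller."""
    while True:
        task_id, prompt = task_queue.get()
        try:
            outcome = run_agent(prompt)
        except Exception as exc:  # keep the worker alive if one task fails
            outcome = f"FAILED: {exc}"
        with results_lock:
            results[task_id] = outcome
        task_queue.task_done()


def start_worker() -> threading.Thread:
    """Start one background worker thread (daemon, so it never blocks exit)."""
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread
```

The caller gets its task ID back before any work has run, and can look the ID up in `results` later to check whether the task is still pending.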

7.2 Why do AI agents need message queues?

AI agents need message queues to handle three critical challenges: unpredictable timing (agent operations can take seconds to minutes), variable workloads (multiple agents may need resources simultaneously), and coordination complexity (agents must communicate without conflicts). Message queues act as buffers that throttle requests, prioritize tasks, enable retry logic, and distribute workload across multiple workers, preventing system overload and resource contention.
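As a rough illustration of the buffering and prioritization described above, the sketch below uses the standard library's `queue.PriorityQueue`. The `enqueue` and `drain` helpers, the priority values, and the `maxsize` of 100 are assumptions chosen for the example; production brokers expose their own priority and backpressure settings.

```python
import itertools
import queue

# Bounded buffer: once 100 tasks are waiting, put() blocks the producer,
# which is a simple form of backpressure.
buffer: queue.PriorityQueue = queue.PriorityQueue(maxsize=100)

# Monotonic counter used as a tie-breaker so that tasks with equal
# priority are served in the order they arrived.
_sequence = itertools.count()


def enqueue(task: str, priority: int) -> None:
    """Add a task; lower numbers are served first."""
    buffer.put((priority, next(_sequence), task))


def drain() -> list[str]:
    """Remove every waiting task and return them in service order."""
    order = []
    while not buffer.empty():
        _, _, task = buffer.get()
        order.append(task)
    return order
```

With this ordering, an interactive user request submitted at priority 0 is served before background jobs that were queued earlier at priority 5.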

7.3 What's the difference between synchronous and asynchronous agent execution?

In synchronous execution, the system waits for each agent operation to complete before proceeding, causing user requests to block, threads to get stuck, and timeouts to occur frequently. In asynchronous execution, the system immediately acknowledges requests, places tasks in a queue with a tracking ID, and allows worker processes to handle operations independently. This decoupling means failures don't crash the main application, tasks can be retried automatically, and the system scales by simply adding more workers.

7.4 Which message broker is best for agentic AI systems?

The choice depends on your requirements:

  • Redis - Best for simple, high-speed queuing with low latency; ideal for prototypes and moderate-scale systems

  • RabbitMQ - Excellent for complex routing, reliable delivery guarantees, and fine-grained control; suited for enterprise production systems

  • Apache Kafka - Optimal for event streaming, high-throughput scenarios, and when you need message replay capabilities

  • AWS SQS - Best for cloud-native applications requiring minimal infrastructure management

A common path is to start with Redis for simplicity and move to RabbitMQ or Kafka if routing, delivery-guarantee, or throughput requirements outgrow it.

7.5 How do queues enable multi-agent coordination?

Queues enable multi-agent coordination by providing a centralized task distribution mechanism. Instead of agents competing directly for resources like API rate limits, database connections, or external services, they submit work to specialized queues. Workers pull tasks at a controlled rate, preventing overwhelming downstream services. Different agent types (research, scraper, reviewer, planner) can have dedicated queues with different priorities, and the system automatically load-balances work across available workers.
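The two ideas in this answer, one queue per agent type and workers that pull at a controlled rate, can be sketched as follows. `AgentRouter`, `RateLimiter`, and `drain` are hypothetical names for illustration; in a Celery deployment the same effect would normally come from queue routing and rate-limit configuration rather than hand-written classes.

```python
import time
from collections import defaultdict, deque


class AgentRouter:
    """Keep one FIFO queue per agent type (type names are illustrative)."""

    def __init__(self):
        self.queues = defaultdict(deque)

    def submit(self, agent_type: str, task: str) -> None:
        self.queues[agent_type].append(task)

    def pull(self, agent_type: str):
        pending = self.queues[agent_type]
        return pending.popleft() if pending else None


class RateLimiter:
    """Allow at most one call per `min_interval` seconds."""

    def __init__(self, min_interval: float, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last = None  # time of the previous permitted call

    def wait(self) -> None:
        now = self._clock()
        if self._last is not None:
            remaining = self.min_interval - (now - self._last)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last = now


def drain(router: AgentRouter, agent_type: str, limiter: RateLimiter, handler):
    """Process every queued task for one agent type at the limiter's pace."""
    finished = []
    while (task := router.pull(agent_type)) is not None:
        limiter.wait()  # protects the downstream API from bursts
        finished.append(handler(task))
    return finished
```

Because each agent type has its own queue, a backlog of scraper tasks does not delay research tasks, and the limiter caps how fast any one worker hits a shared downstream service.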

7.6 What happens if an agent task fails in a queue-based system?

Queue-based systems provide robust failure handling through several mechanisms:

  1. Automatic retries - Failed tasks are re-queued, typically with exponential backoff between attempts (in Celery this is opt-in per task)

  2. Dead letter queues - Tasks failing repeatedly move to a separate queue for investigation

  3. State persistence - Intermediate results are checkpointed, so work doesn't need to restart from scratch

  4. Circuit breakers - Repeated failures can temporarily disable problematic agents

  5. Monitoring - Failed tasks generate alerts for investigation

This graceful degradation ensures one failing agent doesn't bring down the entire system.
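The first two mechanisms, retries with exponential backoff and a dead letter queue, can be sketched in a few lines. This is a simplified single-process illustration: `backoff_delay` and `process_with_retries` are hypothetical helpers, and the in-line `sleep` stands in for the delayed redelivery that a real broker would schedule.

```python
import time
from collections import deque


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Delay before retry number `attempt` (1-based): base * 2**(attempt - 1), capped."""
    return min(cap, base * 2 ** (attempt - 1))


def process_with_retries(tasks, handler, max_retries: int = 3, sleep=time.sleep):
    """Run each task; re-queue failures with backoff, then dead-letter them."""
    pending = deque((task, 0) for task in tasks)  # (task, failed attempts so far)
    completed, dead_letter = [], []

    while pending:
        task, attempts = pending.popleft()
        try:
            completed.append(handler(task))
        except Exception as exc:
            attempts += 1
            if attempts > max_retries:
                # Retries exhausted: park the task for manual investigation.
                dead_letter.append((task, repr(exc)))
            else:
                sleep(backoff_delay(attempts))
                pending.append((task, attempts))  # back of the queue

    return completed, dead_letter
```

A task that keeps failing is attempted `max_retries + 1` times in total and then lands in `dead_letter`, while the other tasks continue to be processed.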

7.7 How does async processing improve agent scalability?

Async processing enables horizontal scaling, which is usually the simplest way to add capacity. When demand increases, you add worker processes without modifying application code, and the queue distributes work across whichever workers are available. When demand decreases, you reduce the worker count to save costs. Synchronous architectures scale far less gracefully, because each concurrent request holds a thread or process that stays blocked for the duration of a long operation.
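A small experiment makes the point concrete: the task function below never changes, only the worker count does. This sketch uses a thread pool inside one process as a stand-in for separate worker processes or containers; `agent_task` and `run_batch` are hypothetical names, and the 50 ms sleep simulates time spent waiting on an LLM or tool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def agent_task(n: int) -> int:
    """Simulated I/O-bound agent step (for example, waiting on an API)."""
    time.sleep(0.05)
    return n * n


def run_batch(num_workers: int, num_tasks: int = 8):
    """Run the same batch with a given worker count; return results and elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(agent_task, range(num_tasks)))
    return results, time.perf_counter() - start
```

Calling `run_batch(1)` and then `run_batch(4)` produces identical results, with the second call finishing sooner because four tasks wait concurrently. Scaling a queue-backed system works the same way, except that the extra workers run on additional processes or machines.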

7.8 Can I use async processing for real-time agent interactions?

Yes, but with careful architecture. For truly real-time interactions (sub-second responses), use async processing for heavy operations while keeping lightweight responses synchronous. Implement streaming responses where the agent immediately returns a connection, then streams results as they become available. Use WebSockets or Server-Sent Events (SSE) to push updates to users. Reserve synchronous execution only for simple queries that complete in milliseconds, and use queues for everything else.
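The streaming approach can be sketched as an async generator that emits Server-Sent Events frames as each agent stage finishes. `agent_steps`, `format_sse`, `sse_stream`, and `collect` are hypothetical names, and the three stages are placeholders. In a FastAPI application, a generator like `sse_stream` could be passed to `StreamingResponse` with `media_type="text/event-stream"`, though that wiring is not shown here.

```python
import asyncio
import json
from typing import AsyncIterator


async def agent_steps(question: str) -> AsyncIterator[dict]:
    """Hypothetical agent that yields a partial result after each stage."""
    for stage in ("thinking", "searching", "answering"):
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        yield {"stage": stage, "question": question}


def format_sse(data: dict, event: str = "update") -> str:
    """Encode one SSE frame: an 'event:' line, a 'data:' line, then a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def sse_stream(question: str) -> AsyncIterator[str]:
    """Stream progress frames, then a final frame so the client can stop listening."""
    async for step in agent_steps(question):
        yield format_sse(step)
    yield format_sse({"status": "complete"}, event="done")


async def collect(question: str) -> list[str]:
    """Helper for trying the stream outside a web server."""
    return [frame async for frame in sse_stream(question)]
```

Running `asyncio.run(collect("example"))` returns four frames: one per stage plus the closing `done` event.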

7.9 What tools do I need to implement async agent processing?

A production-ready async agent system typically requires:

Task Queue Framework:

  • Celery (Python) - Most popular, mature ecosystem

  • RQ (Redis Queue) - Simpler alternative for smaller projects

  • Dramatiq - Newer alternative with opinionated defaults, such as automatic retries

Message Broker:

  • Redis - Fast, simple setup

  • RabbitMQ - Enterprise-grade reliability

  • AWS SQS - Cloud-native managed service

State Management:

  • PostgreSQL - Structured data and ACID guarantees

  • MongoDB - Flexible schema for agent states

  • Redis - Fast intermediate state storage

Monitoring:

  • Flower - Celery monitoring dashboard

  • Prometheus + Grafana - Metrics and alerting

  • CloudWatch - AWS-native monitoring

7.10 How do I handle long-running agent workflows with checkpoints?

Implement checkpointing by breaking workflows into discrete steps and persisting state after each step:

  1. Define stages - Break workflows into logical units (Think → Search → Analyze → Act → Reflect)

  2. Save intermediate state - Store results and context after each stage completion

  3. Use unique task IDs - Track workflow progress with persistent identifiers

  4. Implement resume logic - On failure, check last completed stage and continue from there

  5. Set timeouts per stage - Prevent individual steps from hanging indefinitely

  6. Store in durable storage - Use databases, not just in-memory caches

This approach means a failure at step 4 doesn't require restarting steps 1-3, saving time and API costs.
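A minimal sketch of stage-level checkpointing with resume logic is shown below, using SQLite from the standard library as the durable store. The table layout and the `open_store`, `completed_stages`, and `run_workflow` helpers are assumptions for illustration; per-stage timeouts are omitted to keep the example short.

```python
import json
import sqlite3

STAGES = ["think", "search", "analyze", "act", "reflect"]


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open the checkpoint store (use a file path for real durability)."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "task_id TEXT, stage TEXT, result TEXT, "
        "PRIMARY KEY (task_id, stage))"
    )
    return conn


def completed_stages(conn: sqlite3.Connection, task_id: str) -> dict:
    """Load every stage result already saved for this task."""
    rows = conn.execute(
        "SELECT stage, result FROM checkpoints WHERE task_id = ?", (task_id,)
    )
    return {stage: json.loads(result) for stage, result in rows}


def run_workflow(conn: sqlite3.Connection, task_id: str, handlers: dict) -> dict:
    """Run stages in order, skipping any stage that already has a checkpoint."""
    state = completed_stages(conn, task_id)
    for stage in STAGES:
        if stage in state:
            continue  # finished in an earlier run; do not repeat it
        result = handlers[stage](state)  # may raise; earlier checkpoints survive
        conn.execute(
            "INSERT INTO checkpoints VALUES (?, ?, ?)",
            (task_id, stage, json.dumps(result)),
        )
        conn.commit()  # persist before moving to the next stage
        state[stage] = result
    return state
```

If the `act` handler raises, calling `run_workflow` again with the same `task_id` skips `think`, `search`, and `analyze` and resumes at `act`, so the earlier LLM and tool calls are not paid for twice.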

8. Conclusion: Async + Queues = Agentic AI Superpower

Asynchronous processing and message queues are not optional in agentic systems—they are foundational.

They enable:

✔ Non-blocking agent tasks
✔ Multi-agent concurrency
✔ Reliable tool execution
✔ State persistence
✔ Event-driven autonomy
✔ Horizontal scaling
✔ Decoupled architecture

In short:

Without async and queues, autonomous AI would collapse under its own complexity. They make agentic systems resilient, scalable, and production-grade.