How to Build Multi-Agent Workflows with OpenAI Codex: Automating 8-Hour Tasks with Parallel Agent Orchestration

Ambitious, end-to-end automation projects often stall not because your model can’t write code, but because you don’t have a reliable way to coordinate multiple agents over hours of runtime, while staying on budget, recovering from transient failures, and proving progress. This tutorial walks through a practical, production-grade architecture for building “Codex-style” multi-agent workflows: a planner that decomposes a complex objective, specialist agents that work in parallel, a reviewer that protects quality, and an orchestrator that keeps everything moving. You will learn how to design the architecture, wire up parallel execution, implement token/credit budgeting, checkpoint long runs, monitor progress, and integrate with CI/CD pipelines, complete with Python code examples and command-line utilities you can drop into your own projects.

1. Introduction to Codex agent architecture

When people say “Codex agent,” they often mean a code-capable LLM paired with tools and guardrails, not just a single prompt. In a multi-agent architecture, we position the model inside a system that plans, executes, reviews, and monitors longer-running work. Think of it as a small, well-managed team: a planner breaks down the big goal into a DAG of tasks; multiple implementers (specialists) work in parallel; a reviewer or validator checks the outputs; and a coordinator handles orchestration, budgeting, retries, and checkpoints.

This architecture doesn’t require any specific model name. You can use a code-capable model in the OpenAI ecosystem (for example, a GPT-4-class model) to achieve Codex-like behavior. The architectural patterns below remain the same regardless of the exact model: planner, parallel workers, reviewer, and orchestrator.

Main roles and responsibilities

  • Planner (decomposer): Reads a high-level spec and produces a DAG of tasks—each with a definition, dependencies, acceptance criteria, and rough budget. Optionally assigns tasks to specialized agent types (e.g., “data_extraction,” “refactor,” “test_scaffold”).
  • Implementers (specialist agents): Execute the subtasks (writing code, running tools, reading docs, or transforming data). Each has a clear interface: task in, artifact(s) out, plus logs and usage.
  • Reviewer (quality gate): Checks outputs against acceptance criteria; can request fixes or escalate issues back to the planner.
  • Orchestrator (controller): Schedules tasks based on dependencies and budgets, handles concurrency limits, manages retries and backoff, persists checkpoints, and provides observability.

Component    | Inputs                 | Outputs                       | Key Concerns
-------------|------------------------|-------------------------------|------------------------------------------
Planner      | Goal spec, constraints | DAG: tasks + deps + budgets   | Granularity, ordering, budget allocation
Implementers | Task spec, resources   | Artifacts, logs               | Token usage, determinism, retryability
Reviewer     | Artifacts, criteria    | Pass/fail, fixes              | Quality, consistency
Orchestrator | DAG, budgets           | Schedule, metrics, checkpoints | Parallelism, error handling, observability
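
To make the “task in, artifact(s) out” contract concrete, here is a minimal sketch of the implementer and reviewer roles as typed interfaces. Every name in it (`TaskSpec`, `Artifact`, `Implementer`, `Reviewer`, `EchoImplementer`, `NonEmptyReviewer`, `run_task`) is hypothetical and exists only for illustration; the classes used by the rest of this tutorial are defined in later sections.

```python
from dataclasses import dataclass
from typing import Protocol, Tuple


@dataclass
class TaskSpec:
    id: str
    description: str
    acceptance: str  # human-readable acceptance criterion


@dataclass
class Artifact:
    task_id: str
    content: str


class Implementer(Protocol):
    def execute(self, task: TaskSpec) -> Artifact: ...


class Reviewer(Protocol):
    def review(self, task: TaskSpec, artifact: Artifact) -> bool: ...


class EchoImplementer:
    """Stand-in worker: echoes the description instead of calling a model."""

    def execute(self, task: TaskSpec) -> Artifact:
        return Artifact(task_id=task.id, content=f"draft for: {task.description}")


class NonEmptyReviewer:
    """Stand-in quality gate: passes any artifact with non-blank content."""

    def review(self, task: TaskSpec, artifact: Artifact) -> bool:
        return bool(artifact.content.strip())


def run_task(task: TaskSpec, worker: Implementer, gate: Reviewer) -> Tuple[Artifact, bool]:
    # One implement-then-review step; an orchestrator would loop over many of these.
    artifact = worker.execute(task)
    return artifact, gate.review(task, artifact)
```

Because the roles are structural interfaces, a stub like `EchoImplementer` can later be swapped for an LLM-backed worker without touching the code that calls `run_task`.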

The rest of this tutorial builds a skeleton system you can extend. We’ll start with parallel execution, then add decomposition, budgeting, checkpointing, and monitoring. By the end, you will have an end-to-end script that can run for hours while keeping your spend predictable and your results traceable.

2. Setting up parallel agent orchestration

Parallelism is what can turn an 8-hour serial task into a roughly 2-hour workflow: with four workers and few dependencies between tasks, wall-clock time approaches a quarter of the serial time, and less than that when the DAG forces some tasks to wait. The orchestrator’s job is to keep a pipeline of ready tasks flowing into worker agents while respecting dependency order, per-agent budgets, and concurrency limits. In Python, the simplest robust way to do this is with asyncio: a semaphore to constrain concurrency, asyncio.gather for batches, and a task scheduler that yields tasks whose dependencies are satisfied.

Design decisions

  • Async-first: Use asyncio to control concurrency. Avoid CPU-bound work in the event loop; offload it to threads or processes if needed.
  • Idempotency: Tag each API call with an idempotency key so retries don’t duplicate work.
  • Backpressure: Bound concurrency with a semaphore to prevent resource exhaustion or rate-limit breaches.
  • Dependency-aware scheduling: Use a DAG and only schedule tasks whose prerequisites are complete and successful.
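
The backpressure point is easy to check in isolation. The sketch below is not part of the tutorial’s orchestrator; `run_bounded` is a hypothetical helper that launches a number of fake jobs behind an `asyncio.Semaphore` and reports the highest number that were ever in flight at once.

```python
import asyncio


async def run_bounded(n_jobs: int, limit: int) -> int:
    """Run n_jobs fake jobs with at most `limit` in flight; return the peak observed."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with sem:  # waits here once `limit` jobs hold the semaphore
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for an API call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak
```

Calling `asyncio.run(run_bounded(n_jobs=10, limit=3))` should never report a peak above 3, no matter how many jobs are queued.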

Core orchestration scaffold

Below is a minimal coordinator that finds tasks whose dependencies are complete, dispatches them to agents asynchronously, records results, and respects a concurrency cap. We’ll stub the implementer with a fake agent here, then replace it with real LLM-backed logic in later sections.

# orchestrator.py
import asyncio
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import time
import uuid

@dataclass
class Task:
    id: str
    name: str
    description: str
    deps: List[str] = field(default_factory=list)
    budget_credits: int = 100  # internal credit abstraction
    agent_type: str = "general"
    status: str = "PENDING"
    result: Optional[str] = None
    error: Optional[str] = None
    attempts: int = 0

@dataclass
class RunConfig:
    max_concurrency: int = 4
    max_retries: int = 3
    retry_backoff: float = 2.0
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class Orchestrator:
    def __init__(self, tasks: List[Task], config: RunConfig) -> None:
        self.tasks: Dict[str, Task] = {t.id: t for t in tasks}
        self.config = config
        self.sem = asyncio.Semaphore(config.max_concurrency)
        self.start_time = time.time()

    def ready_tasks(self) -> List[Task]:
        ready = []
        for t in self.tasks.values():
            if t.status == "PENDING":
                if all(self.tasks[d].status == "DONE" for d in t.deps):
                    ready.append(t)
        return ready

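    async def run(self) -> None:
        while True:
            # Fail PENDING tasks whose dependency failed; otherwise they would
            # never become ready and this loop would never terminate.
            for t in self.tasks.values():
                if t.status == "PENDING" and any(
                        self.tasks[d].status == "FAILED" for d in t.deps):
                    t.status = "FAILED"
                    t.error = "upstream dependency failed"

            # If all tasks are terminal, break
            states = [t.status for t in self.tasks.values()]
            if all(s in {"DONE", "FAILED"} for s in states):
                break

            # Schedule ready tasks
            batch = self.ready_tasks()
            if not batch:
                await asyncio.sleep(0.2)
                continue

            await asyncio.gather(*(self._run_one(t) for t in batch))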

        duration = time.time() - self.start_time
        successes = sum(1 for t in self.tasks.values() if t.status == "DONE")
        failures = sum(1 for t in self.tasks.values() if t.status == "FAILED")
        print(f"Run {self.config.run_id} finished in {duration:.1f}s: "
              f"{successes} success, {failures} failed.")

    async def _run_one(self, task: Task) -> None:
        if task.status != "PENDING":
            return
        task.status = "RUNNING"
        await self.sem.acquire()
        try:
            for attempt in range(1, self.config.max_retries + 1):
                task.attempts = attempt
                try:
                    # placeholder implementation; we replace with LLM-backed agent later
                    task.result = await self._fake_agent(task)
                    task.status = "DONE"
                    break
                except Exception as e:
                    task.error = str(e)
                    if attempt >= self.config.max_retries:
                        task.status = "FAILED"
                        break
                    await asyncio.sleep(self.config.retry_backoff ** attempt)
        finally:
            self.sem.release()

    async def _fake_agent(self, task: Task) -> str:
        # Simulate a short unit of work; any task with "fail" in its name always raises
        await asyncio.sleep(0.2)
        if "fail" in task.name.lower():
            raise RuntimeError("simulated transient error")
        return f"completed {task.name}"

Run this with a few tasks and dependencies to verify the flow. In Section 6 we’ll replace the fake agent with a real LLM-backed worker that calls the OpenAI Responses API and tracks usage details to manage budgets.
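
One property of the batch approach is worth knowing: `asyncio.gather` returns only when every task in the batch has finished, so a newly unblocked task waits for the slowest member of the current batch. The sketch below shows one possible alternative that wakes up as soon as any task completes, using `asyncio.wait` with `FIRST_COMPLETED`. The names `run_dag` and `fake_work` are hypothetical, and retries, budgets, and failure handling are deliberately left out.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Set


async def fake_work(name: str) -> None:
    await asyncio.sleep(0.01)  # stand-in for an agent call


async def run_dag(deps: Dict[str, List[str]],
                  work: Callable[[str], Awaitable[None]],
                  max_concurrency: int = 4) -> List[str]:
    """Run every task in `deps` after its prerequisites; return completion order."""
    done: Set[str] = set()
    order: List[str] = []
    running: Dict[asyncio.Task, str] = {}
    pending = set(deps)

    while pending or running:
        # Start tasks whose dependencies are done, up to the concurrency cap.
        ready = sorted(t for t in pending if all(d in done for d in deps[t]))
        for name in ready[: max_concurrency - len(running)]:
            pending.discard(name)
            running[asyncio.create_task(work(name))] = name
        if not running:
            # Nothing is running and nothing can start: cycle or unknown dependency.
            raise ValueError(f"unsatisfiable dependencies: {sorted(pending)}")

        # Wake up when any single task finishes, not when the whole batch does.
        finished, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for fut in finished:
            name = running.pop(fut)
            fut.result()  # re-raise the task's exception, if any
            done.add(name)
            order.append(name)
    return order
```

For example, `asyncio.run(run_dag({"a": [], "b": [], "c": ["a"], "d": ["b", "c"]}, fake_work, max_concurrency=2))` starts `c` as soon as `a` finishes, even if `b` is still running.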

3. Breaking down complex tasks into subtasks

The planner transforms a single objective into an actionable DAG of subtasks with explicit dependencies and budgets. Good planning is the difference between a single, oversized prompt and a pipeline that can progress even when some tasks are blocked. We’ll prompt the model to return strict JSON and parse it into our Task objects. Later, we’ll enforce acceptance criteria and budget allocations.

Planner prompts and JSON schemas

A reliable way to get structured output is to provide a schema and ask the model to produce JSON that fits it. We’ll use the Responses API with a JSON schema response format to request a list of tasks. The model will name tasks, describe them, estimate effort, and propose dependencies. Then we’ll transform that result into our internal Task representation, assigning budgets based on estimated effort and a global budget.

# planner.py
from typing import List
from dataclasses import dataclass
from openai import OpenAI

from orchestrator import Task

client = OpenAI()

TASK_SCHEMA = {
    "name": "TaskPlan",
    "schema": {
        "type": "object",
        "properties": {
            "tasks": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "string"},
                        "name": {"type": "string"},
                        "description": {"type": "string"},
                        "deps": {"type": "array", "items": {"type": "string"}},
                        "agent_type": {"type": "string"},
                        "effort_estimate": {"type": "integer"}  # 1..5
                    },
                    "required": ["id", "name", "description", "deps", "agent_type", "effort_estimate"],
                    "additionalProperties": False
                }
            }
        },
        "required": ["tasks"],
        "additionalProperties": False
    }
}

def make_planner_prompt(goal: str, constraints: str) -> List[dict]:
    # Each message uses plain-string content. Typed content parts for Responses API
    # input use "input_text" rather than "text", so strings are the simpler choice.
    return [{
        "role": "system",
        "content": ("You are a senior project planner. Decompose goals into tasks. "
                    "Return strictly valid JSON matching the schema.")
    }, {
        "role": "user",
        "content": f"High-level goal:\n{goal}\n\nConstraints:\n{constraints}\n"
    }]

def plan_tasks(goal: str, constraints: str, total_budget_credits: int = 1000) -> List[Task]:
    # The Responses API takes structured-output settings under `text.format`;
    # the `response_format` parameter belongs to Chat Completions.
    resp = client.responses.create(
        model="gpt-4.1-mini",
        input=make_planner_prompt(goal, constraints),
        text={
            "format": {
                "type": "json_schema",
                "name": TASK_SCHEMA["name"],
                "schema": TASK_SCHEMA["schema"],
                "strict": True
            }
        },
        metadata={"component": "planner"}
    )

    # Extract JSON; output_text joins the text parts of the response output
    plan = resp.output_text
    import json
    data = json.loads(plan)

    tasks_raw = data["tasks"]
    total_effort = sum(max(1, int(t.get("effort_estimate", 1))) for t in tasks_raw) or 1

    tasks: List[Task] = []
    for t in tasks_raw:
        effort = max(1, int(t.get("effort_estimate", 1)))
        budget = max(50, int(total_budget_credits * (effort / total_effort)))
        tasks.append(Task(
            id=t["id"],
            name=t["name"],
            description=t["description"],
            deps=t.get("deps", []),
            agent_type=t.get("agent_type", "general"),
            budget_credits=budget
        ))
    return tasks

You can tune the schema to capture acceptance criteria, required artifacts, test strategies, or review gates per task. When planning, keep tasks small enough to finish under a few iterations of your retry policy. Larger tasks can block downstream progress and increase the cost of mistakes.
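
A model-generated plan can reference task ids that do not exist or contain a dependency cycle, and either problem would stall a dependency-aware scheduler. A minimal sketch of a pre-flight check follows, using the standard library’s `graphlib`; the function name `validate_plan` is hypothetical, and it assumes the plan has been reduced to a mapping from task id to its list of dependency ids.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def validate_plan(deps: Dict[str, List[str]]) -> List[str]:
    """Return task ids in a valid execution order, or raise ValueError."""
    for task_id, prereqs in deps.items():
        unknown = [d for d in prereqs if d not in deps]
        if unknown:
            raise ValueError(f"task {task_id!r} depends on unknown ids: {unknown}")
    try:
        # static_order() yields each task only after all of its prerequisites.
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception's args.
        raise ValueError(f"plan contains a dependency cycle: {exc.args[1]}") from exc
```

With the planner above, this could be called as `validate_plan({t.id: t.deps for t in tasks})` before the tasks are handed to the orchestrator.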

4. Token budget management with credit-based pricing

OpenAI APIs are priced per token, but it’s often useful to introduce an internal “credit” layer in your system. This lets you allocate budgets to agents and tasks independent of model choice, and enforce spend limits per project or CI run. The idea: convert token usage to credits via a configurable rate, and decrement per-task budgets as agents make calls. If a task runs out of credits, pause or escalate.

Designing the BudgetManager

  • Credits: An arbitrary unit that maps to token cost via a configurable factor, for example 1 credit per 1,000 tokens or 1 credit per $0.01 of spend. Use whatever makes sense in your org.
  • Per-task budgets: Planner sets initial budgets. Orchestrator enforces them, allowing limited overage or top-ups with approval.
  • Usage accounting: Track input/output tokens and convert to credits. Use the API response’s usage fields to get exact counts.
# budget.py
from dataclasses import dataclass

@dataclass
class Budget:
    total_credits: int
    used_credits: int = 0

    @property
    def remaining(self) -> int:
        return max(0, self.total_credits - self.used_credits)

class BudgetManager:
    def __init__(self, credit_per_1k_tokens: float = 1.0) -> None:
        # Example: 1 credit for each 1,000 tokens
        self.credit_per_1k_tokens = credit_per_1k_tokens

    def tokens_to_credits(self, input_tokens: int, output_tokens: int) -> int:
        # convert to credits; round up to keep safe
        total_tokens = max(0, input_tokens) + max(0, output_tokens)
        credits = int(((total_tokens / 1000.0) * self.credit_per_1k_tokens) + 0.9999)
        return max(1, credits) if total_tokens > 0 else 0

    def charge(self, budget: Budget, input_tokens: int, output_tokens: int) -> None:
        c = self.tokens_to_credits(input_tokens, output_tokens)
        if c > budget.remaining:
            raise RuntimeError(f"Budget exceeded: need {c}, have {budget.remaining}")
        budget.used_credits += c

We’ll integrate this into the implementer so that every LLM call checks against the task’s budget. If a task needs a top-up, the orchestrator can approve it or fail early with a clear reason. This keeps long runs predictable and auditable.
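
The top-up step is not implemented in this tutorial, so the following is only a sketch of one way it might work. It assumes a shared reserve pool and a per-request cap; `Reserve`, `request_top_up`, and `max_top_up` are hypothetical names, and `Budget` is repeated here so the example runs on its own.

```python
from dataclasses import dataclass


@dataclass
class Budget:
    total_credits: int
    used_credits: int = 0

    @property
    def remaining(self) -> int:
        return max(0, self.total_credits - self.used_credits)


@dataclass
class Reserve:
    """Hypothetical shared pool the orchestrator can draw top-ups from."""
    credits: int


def request_top_up(budget: Budget, needed: int, reserve: Reserve,
                   max_top_up: int = 100) -> bool:
    """Grow `budget` so that `needed` credits fit; return False if not approved."""
    shortfall = needed - budget.remaining
    if shortfall <= 0:
        return True  # already affordable, nothing to do
    if shortfall > max_top_up or shortfall > reserve.credits:
        return False  # caller should fail the task with a clear reason
    reserve.credits -= shortfall
    budget.total_credits += shortfall
    return True
```

Drawing top-ups from a finite reserve keeps the total spend of a run bounded even when individual tasks exceed their planned allocation.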

5. Error handling and checkpoints for long-running agents

A long-running workflow (hours or days) must survive transient API errors, network blips, process restarts, and partial failures. Checkpointing ensures you never repeat completed work. Combined with idempotency keys and retries with exponential backoff, you can deliver robust automations that confidently resume where they left off.

Key patterns

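  • Idempotency keys: Include a unique key per task-attempt when calling the API. Where the server honors idempotency keys, a resend of the same request returns the previous result instead of repeating the work; treat this as a safeguard rather than a guarantee.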
  • Exponential backoff with jitter: Retry predictable transient failures with increasing delays.
  • Checkpoint store: Persist task states (PENDING, RUNNING, DONE, FAILED), attempts, artifacts, and usage. A small SQLite DB is perfect for local runs and CI.
  • Heartbeats: Periodically write “I’m alive” updates so a supervisor or dashboard knows the run hasn’t frozen.
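
As an illustration of the backoff pattern, here is a sketch of the “full jitter” variant, in which each delay is drawn uniformly between zero and a capped exponential ceiling. The function name `backoff_delay` and its defaults (`base`, `cap`) are assumptions for this example, not values taken from the tutorial’s code.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter delay in seconds: uniform in [0, min(cap, base * 2**attempt)]."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    ceiling = min(cap, base * (2 ** attempt))
    source = rng if rng is not None else random
    return source.uniform(0.0, ceiling)
```

In a retry loop this would replace a fixed exponential sleep, for example `await asyncio.sleep(backoff_delay(attempt))`, so that parallel agents hitting the same rate limit do not all retry at the same instant.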
# checkpoints.py
import sqlite3
from dataclasses import dataclass
from typing import Optional, Dict, Any
import json
import time
from pathlib import Path

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS tasks (
  id TEXT PRIMARY KEY,
  name TEXT,
  status TEXT,
  attempts INTEGER,
  result TEXT,
  error TEXT,
  usage_json TEXT,
  updated_at REAL
);

CREATE TABLE IF NOT EXISTS runs (
  run_id TEXT PRIMARY KEY,
  started_at REAL,
  completed_at REAL,
  meta_json TEXT
);
"""

class CheckpointStore:
    def __init__(self, db_path: str = "checkpoints.sqlite") -> None:
        self.db_path = db_path
        Path(db_path).touch(exist_ok=True)
        self._init_db()

    def _init_db(self) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.executescript(SCHEMA_SQL)
            conn.commit()

    def upsert_task(self, task_id: str, name: str, status: str, attempts: int,
                    result: Optional[str], error: Optional[str],
                    usage: Optional[Dict[str, Any]]) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT INTO tasks (id, name, status, attempts, result, error, usage_json, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                  status=excluded.status,
                  attempts=excluded.attempts,
                  result=excluded.result,
                  error=excluded.error,
                  usage_json=excluded.usage_json,
                  updated_at=excluded.updated_at
                """,
                (task_id, name, status, attempts, result, error,
                 json.dumps(usage or {}), time.time())
            )
            conn.commit()

    def set_run_start(self, run_id: str, meta: Optional[Dict[str, Any]] = None) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT INTO runs (run_id, started_at, completed_at, meta_json)
                VALUES (?, ?, NULL, ?)
                ON CONFLICT(run_id) DO UPDATE SET
                  started_at=excluded.started_at,
                  meta_json=excluded.meta_json
                """,
                (run_id, time.time(), json.dumps(meta or {}))
            )
            conn.commit()

    def set_run_complete(self, run_id: str) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "UPDATE runs SET completed_at = ? WHERE run_id = ?",
                (time.time(), run_id)
            )
            conn.commit()

We will call this store inside the orchestrator at every status change. On restart, the orchestrator can load tasks from the DB and skip those that are already DONE. You can also use Redis or a cloud database for distributed runs.
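
The store above only writes; the resume path is left to the reader. A minimal sketch of it follows, assuming the `tasks` table defined above and task ids that stay stable across restarts (for instance because the plan itself is persisted rather than regenerated). `load_done_ids` and `apply_checkpoint` are hypothetical helper names.

```python
import sqlite3
from typing import Dict, Set


def load_done_ids(conn: sqlite3.Connection) -> Set[str]:
    """Return ids of tasks recorded as DONE in the `tasks` table."""
    rows = conn.execute("SELECT id FROM tasks WHERE status = 'DONE'").fetchall()
    return {row[0] for row in rows}


def apply_checkpoint(statuses: Dict[str, str], done_ids: Set[str]) -> Dict[str, str]:
    """Mark checkpointed tasks DONE; everything else (including RUNNING) restarts as PENDING."""
    return {tid: ("DONE" if tid in done_ids else "PENDING") for tid in statuses}
```

Tasks that were RUNNING when the process died are treated as not done, which is the safe default when a partially written result cannot be trusted.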

6. Python code examples for API calls and task decomposition

Now let’s wire up the implementer agent to call the OpenAI Responses API for code generation or transformation tasks, track usage to enforce budgets, and write checkpoints after each attempt. We’ll combine the orchestrator, planner, budget manager, and checkpoint store into an executable script.

Implementer agent

# agent.py
from typing import Dict, Any, Optional
from openai import OpenAI
from budget import Budget, BudgetManager

client = OpenAI()

class ImplementerAgent:
    def __init__(self, agent_type: str, budget: Budget, budget_manager: BudgetManager) -> None:
        self.agent_type = agent_type
        self.budget = budget
        self.budget_manager = budget_manager

    def _system_instructions(self) -> str:
        if self.agent_type == "refactor":
            return ("You are a precise code-refactoring assistant. Provide diffs or patch-style outputs, "
                    "preserve behavior, and include a summary of changes.")
        elif self.agent_type == "test_scaffold":
            return ("You write unit tests with clear arrange/act/assert phases. "
                    "Target high-value paths and edge cases.")
        else:
            return ("You are a pragmatic software assistant. Generate code and explanations with "
                    "clear, runnable examples and minimal dependencies.")

    def _idempotency_key(self, task_id: str, attempt: int) -> str:
        return f"task:{task_id}:agent:{self.agent_type}:attempt:{attempt}"

    def run(self, task_id: str, description: str, attempt: int) -> Dict[str, Any]:
        # Fail fast if the task has no credits left, before spending more tokens
        if self.budget.remaining <= 0:
            raise RuntimeError(f"Budget exhausted for task {task_id}")

        system_text = self._system_instructions()
        prompt_user = f"Task description:\n{description}\n\nProduce the required code and a short rationale."

        # Send an Idempotency-Key header so resends of the same attempt carry the same key.
        # Assumption: server-side deduplication is not guaranteed for this endpoint;
        # the header is passed through extra_headers, which create() exposes for custom headers.
        resp = client.responses.create(
            model="gpt-4.1-mini",
            input=[
                {"role": "system", "content": system_text},
                {"role": "user", "content": prompt_user}
            ],
            metadata={"component": "implementer", "task_id": task_id, "agent_type": self.agent_type},
            extra_headers={"Idempotency-Key": self._idempotency_key(task_id, attempt)}
        )

        # Extract output and usage safely
        try:
            text = resp.output_text
        except Exception:
            # Fallback parsing when SDK attributes differ
            text = resp.output[0].content[0].text

        usage = {}
        # SDKs typically provide usage fields; handle variations defensively
        if hasattr(resp, "usage") and resp.usage is not None:
            usage = {
                "input_tokens": getattr(resp.usage, "input_tokens", getattr(resp.usage, "prompt_tokens", 0)),
                "output_tokens": getattr(resp.usage, "output_tokens", getattr(resp.usage, "completion_tokens", 0)),
                "total_tokens": getattr(resp.usage, "total_tokens", None)
            }

            input_tokens = usage.get("input_tokens", 0) or 0
            output_tokens = usage.get("output_tokens", 0) or 0
            # Charge budget based on tokens
            self.budget_manager.charge(self.budget, input_tokens, output_tokens)

        return {
            "content": text,
            "usage": usage
        }

Integrating with the orchestrator

We’ll now integrate the implementer, planner, budget manager, and checkpoint store. This script creates a plan from a goal, spins up tasks with budgets, runs them in parallel, checkpoints progress, and prints a summary.

# run_workflow.py
import asyncio
from typing import Dict
from orchestrator import Orchestrator, Task, RunConfig
from planner import plan_tasks
from budget import Budget, BudgetManager
from checkpoints import CheckpointStore
from agent import ImplementerAgent

# Expect OPENAI_API_KEY in environment for the SDK

class LLMOrchestrator(Orchestrator):
    def __init__(self, tasks, config, checkpoint_store, budget_manager):
        super().__init__(tasks, config)
        self.checkpoints = checkpoint_store
        self.budget_manager = budget_manager
        self.task_budgets: Dict[str, Budget] = {t.id: Budget(total_credits=t.budget_credits) for t in tasks}

    async def run(self):
        self.checkpoints.set_run_start(self.config.run_id, {"max_concurrency": self.config.max_concurrency})
        await super().run()
        self.checkpoints.set_run_complete(self.config.run_id)

    async def _run_one(self, task: Task):
        if task.status != "PENDING":
            return
        task.status = "RUNNING"
        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, task.result, task.error, None)
        await self.sem.acquire()
        try:
            for attempt in range(1, self.config.max_retries + 1):
                task.attempts = attempt
                agent = ImplementerAgent(task.agent_type, self.task_budgets[task.id], self.budget_manager)
                try:
                    out = await asyncio.to_thread(agent.run, task.id, task.description, attempt)
                    task.result = out["content"]
                    usage = out.get("usage")
                    task.status = "DONE"
                    self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, task.result, None, usage)
                    break
                except Exception as e:
                    task.error = str(e)
                    self.checkpoints.upsert_task(task.id, task.name, "RUNNING", task.attempts, None, task.error, None)
                    if attempt >= self.config.max_retries:
                        task.status = "FAILED"
                        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, None, task.error, None)
                        break
                    await asyncio.sleep(self.config.retry_backoff ** attempt)
        finally:
            self.sem.release()

async def main():
    goal = """Create a small CLI tool that downloads a CSV from a URL, validates a schema, computes summary stats, and writes a Markdown report."""
    constraints = """Use Python 3.10+, standard libraries where possible, and write at least 3 unit tests. Provide a README."""

    # Plan tasks
    tasks = plan_tasks(goal, constraints, total_budget_credits=1200)

    # Orchestrate
    config = RunConfig(max_concurrency=3, max_retries=3, retry_backoff=2.0)
    checkpoints = CheckpointStore("checkpoints.sqlite")
    budget_manager = BudgetManager(credit_per_1k_tokens=1.2)

    orch = LLMOrchestrator(tasks, config, checkpoints, budget_manager)
    await orch.run()

if __name__ == "__main__":
    asyncio.run(main())

This is a runnable backbone. You can now plug in a reviewer agent to check acceptance criteria, extend the planner schema to include tests, and add more agent types for specialized tasks (e.g., data ingestion, code synthesis, test generation). Notice that all token usage flows through the budget manager, and all state transitions are checkpointed.

7. Monitoring agent progress

When parallel agents run for hours, you need clear progress reporting. Monitoring should answer: Which tasks are running, blocked, or failed? How much budget did we consume? How many retries? What artifacts are available? Minimal monitoring requires structured logs and a progress file; more advanced setups push metrics to an observability stack (Prometheus, OpenTelemetry, or a managed APM).

Progress events

We’ll add a simple event logger that writes JSON lines to a file and prints summaries to stdout. Downstream tooling (e.g., a web UI) can tail this file for near-real-time dashboards. You can also emit OpenTelemetry traces for spans per task.

# progress.py
import json
import time
from threading import Lock
from typing import Dict, Any

class ProgressLogger:
    def __init__(self, path: str = "progress.jsonl"):
        self.path = path
        self._lock = Lock()

    def emit(self, event: Dict[str, Any]):
        event["ts"] = time.time()
        with self._lock:
            with open(self.path, "a", encoding="utf-8") as f:
                f.write(json.dumps(event, ensure_ascii=False) + "\n")
        print(f"[progress] {event['type']}: {event.get('msg', '')}")

Integrate this into the orchestrator to emit events on task start, completion, failure, and budget updates.

# extend LLMOrchestrator in run_workflow.py
from progress import ProgressLogger

class LLMOrchestrator(Orchestrator):
    def __init__(self, tasks, config, checkpoint_store, budget_manager):
        super().__init__(tasks, config)
        self.checkpoints = checkpoint_store
        self.budget_manager = budget_manager
        self.task_budgets = {t.id: Budget(total_credits=t.budget_credits) for t in tasks}
        self.progress = ProgressLogger()

    async def run(self):
        self.checkpoints.set_run_start(self.config.run_id, {"max_concurrency": self.config.max_concurrency})
        self.progress.emit({"type": "run.start", "run_id": self.config.run_id, "msg": "Run started"})
        await super().run()
        self.checkpoints.set_run_complete(self.config.run_id)
        self.progress.emit({"type": "run.end", "run_id": self.config.run_id, "msg": "Run completed"})

    async def _run_one(self, task: Task):
        if task.status != "PENDING":
            return
        task.status = "RUNNING"
        self.progress.emit({"type": "task.start", "task_id": task.id, "name": task.name})
        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, task.result, task.error, None)
        await self.sem.acquire()
        try:
            for attempt in range(1, self.config.max_retries + 1):
                task.attempts = attempt
                agent = ImplementerAgent(task.agent_type, self.task_budgets[task.id], self.budget_manager)
                try:
                    out = await asyncio.to_thread(agent.run, task.id, task.description, attempt)
                    task.result = out["content"]
                    usage = out.get("usage")
                    task.status = "DONE"
                    self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, task.result, None, usage)
                    self.progress.emit({"type": "task.done", "task_id": task.id, "usage": usage, "remaining_credits": self.task_budgets[task.id].remaining})
                    break
                except Exception as e:
                    task.error = str(e)
                    self.checkpoints.upsert_task(task.id, task.name, "RUNNING", task.attempts, None, task.error, None)
                    self.progress.emit({"type": "task.retry", "task_id": task.id, "attempt": attempt, "msg": task.error})
                    if attempt >= self.config.max_retries:
                        task.status = "FAILED"
                        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, None, task.error, None)
                        self.progress.emit({"type": "task.failed", "task_id": task.id, "msg": task.error})
                        break
                    await asyncio.sleep(self.config.retry_backoff ** attempt)
        finally:
            self.sem.release()

Key metrics to track

Metric                    | Description
--------------------------|------------------------------------------------
Tasks completed/failed    | High-level success/failure ratio
Average attempts per task | Health of prompts/tools; high means instability
Credits used per task     | Budget adherence and overruns
Throughput (tasks/hour)   | Parallelism effectiveness; identify bottlenecks
Latency per task type     | Which agent types are slowest

Start simple: JSON lines and a few summary prints can take you very far. For deeper visibility, export span events to an APM and annotate prompts, usage, and results with task IDs and run IDs so you can trace cause and effect across the system.
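
As a starting point for those summary prints, the sketch below aggregates a JSON-lines progress file into a few of the metrics listed above. It assumes the event types emitted by the orchestrator in this section (`task.done`, `task.failed`, `task.retry`) and the `usage` fields `input_tokens` and `output_tokens`; the function name `summarize` is hypothetical.

```python
import json
from collections import Counter
from typing import Any, Dict, Iterable


def summarize(lines: Iterable[str]) -> Dict[str, Any]:
    """Aggregate JSON-lines progress events into a few headline metrics."""
    counts: Counter = Counter()
    input_tokens = 0
    output_tokens = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        event = json.loads(line)
        counts[event.get("type", "unknown")] += 1
        usage = event.get("usage") or {}
        input_tokens += usage.get("input_tokens") or 0
        output_tokens += usage.get("output_tokens") or 0
    done, failed = counts["task.done"], counts["task.failed"]
    finished = done + failed
    return {
        "done": done,
        "failed": failed,
        # task.retry is emitted for every failed attempt, including the last one
        "failed_attempts": counts["task.retry"],
        "success_rate": (done / finished) if finished else None,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
    }
```

A file can be passed directly, since iterating over an open text file yields its lines: `with open("progress.jsonl", encoding="utf-8") as f: print(summarize(f))`.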

8. Integration with CI/CD pipelines

Automation is most impactful when it’s safe and repeatable. Connecting your multi-agent workflow to CI/CD lets you run planners and implementers on branch pushes, open pull requests with proposed changes, and run reviewer agents to annotate diffs. Guardrails ensure non-deterministic steps don’t break your pipeline, and budgets prevent runaway costs.

Common CI patterns

  • Dry run vs live run: In PRs, run the planner and produce a plan and budget only. On main, allow implementers to apply code changes behind a flag.
  • Artifact storage: Upload task outputs, logs, and progress files as CI artifacts for auditability.
  • Budget gates: Fail the job if projected or actual credits exceed a threshold.
  • Prompt/test snapshots: Keep prompt templates and sample inputs under version control; test with fixtures for stability.
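
A budget gate can be a short script that turns recorded usage into an exit code. The sketch below assumes the `tasks` table and `usage_json` column from the checkpoint store in Section 5; `credits_used` and `gate` are hypothetical names. It converts the summed tokens to credits once, so its total can be slightly lower than the per-call rounding in `BudgetManager`, and it only sees usage that was actually written to the checkpoint.

```python
import json
import math
import sqlite3


def credits_used(conn: sqlite3.Connection, credit_per_1k_tokens: float = 1.0) -> int:
    """Sum token usage from the `tasks` table and convert it to credits (rounded up)."""
    total_tokens = 0
    for (usage_json,) in conn.execute("SELECT usage_json FROM tasks"):
        usage = json.loads(usage_json or "{}")
        total_tokens += (usage.get("input_tokens") or 0) + (usage.get("output_tokens") or 0)
    return math.ceil(total_tokens / 1000 * credit_per_1k_tokens)


def gate(conn: sqlite3.Connection, limit: int, credit_per_1k_tokens: float = 1.0) -> int:
    """Return a process exit code: 0 when within the limit, 1 when over it."""
    used = credits_used(conn, credit_per_1k_tokens)
    print(f"credits used: {used} / limit: {limit}")
    return 0 if used <= limit else 1
```

In a CI step, the result could be passed to `sys.exit`, for example `sys.exit(gate(sqlite3.connect("checkpoints.sqlite"), limit=600))`, so that an overrun fails the job.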

GitHub Actions example

The YAML below runs the planner as a dry run on every pull request against main and on every push to main, printing the task plan with its per-task budgets. The implement job runs the full workflow only on pushes to main (that is, after a merge) and uploads the checkpoint database and progress log as artifacts.

# .github/workflows/agents.yml
name: Agents

on:
  pull_request:
    branches: [ main ]
  push:
    branches: [ main ]

jobs:
  plan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install deps
        run: |
          python -m pip install --upgrade pip
          pip install openai tiktoken
      - name: Plan tasks (dry run)
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python - <<'PY'
          from planner import plan_tasks

          goal = "Refactor data loaders and add unit tests"
          constraints = "Python 3.11. Use pandas only if necessary."
          tasks = plan_tasks(goal, constraints, total_budget_credits=600)
          print("Planned", len(tasks), "tasks")
          for t in tasks:
              print(t)
          PY

  implement:
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Install deps
        run: |
          python -m pip install --upgrade pip
          pip install openai tiktoken
      - name: Run workflow
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python run_workflow.py
      - name: Upload artifacts
        if: always()  # keep checkpoints and logs even when the workflow step fails
        uses: actions/upload-artifact@v4
        with:
          name: agent-artifacts
          path: |
            checkpoints.sqlite
            progress.jsonl

In more advanced pipelines, you might task a reviewer agent with evaluating diffs and leaving comments on PRs. A simple reviewer can run in CI with a read-only token and a low credit budget, summarizing issues and test coverage suggestions without making changes.
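One way to keep a CI reviewer within a low credit budget is to review a diff one file at a time and cap how much text each call sees. The sketch below shows only that preprocessing step, under assumptions: the input is standard git unified diff text, and the helper name split_diff_by_file and the 4,000-character cap are hypothetical choices, not part of the workflow above.

```python
from typing import Dict, List


def split_diff_by_file(diff_text: str, max_chars: int = 4000) -> Dict[str, str]:
    """Split `git diff` output into per-file chunks, each truncated to max_chars."""
    chunks: Dict[str, List[str]] = {}
    current = None
    for line in diff_text.splitlines():
        if line.startswith("diff --git "):
            # Header looks like: diff --git a/path b/path
            # (paths that themselves contain " b/" are not handled here).
            current = line.split(" b/", 1)[-1]
            chunks[current] = []
        if current is not None:
            chunks[current].append(line)
    # Truncation bounds the prompt size, and therefore the spend, per file.
    return {path: "\n".join(lines)[:max_chars] for path, lines in chunks.items()}
```

Each chunk could then be passed as the artifact to a reviewer call, with the file path included in the task ID so comments can be traced back to the file they concern.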

9. Key takeaways

  • Architecture matters: A Codex-style system is more than a prompt—it’s planner, implementers, reviewer, and orchestrator working together. Clear roles allow parallelism and make long runs reliable.
  • Parallelism compresses time: Async orchestration with dependency-aware scheduling turns day-long tasks into multi-hour runs without losing control.
  • Budgeting keeps you honest: A credit layer enforces spend at the task level, using token usage from API responses. Allocate budgets during planning; enforce during execution.
  • Checkpoints and retries are non-negotiable: Idempotency keys, exponential backoff, and a durable checkpoint store give you resilience across failures and restarts.
  • Monitoring builds trust: Progress events and metrics help you see what’s happening and spot problems quickly. Start with JSONL; evolve to full tracing.
  • CI/CD closes the loop: Running agents in your pipeline enables continuous refactoring, documentation generation, and test scaffolding—safely and repeatably.

With these patterns and the code above, you have a foundation for serious multi-agent automation. Expand the planner schema as your needs grow, add specialized agents for your domain, and refine prompts using your telemetry. The same infrastructure supports everything from codegen and test writing to data extraction and report generation. As you iterate, revisit the planner’s granularity, tighten budgets, and add more asserts to the reviewer to keep quality high.

Appendix: End-to-end example with task decomposition and reviews

To round things out, here’s a more complete example showing a reviewer agent and an extended planner schema. The reviewer checks outputs against acceptance criteria and either passes the task or requests a fix. In practice, the reviewer can annotate code diffs, scan for insecure patterns, or run quick static analysis prior to merging changes.

# reviewer.py
from typing import Dict, Any
from openai import OpenAI

client = OpenAI()

class ReviewerAgent:
    def __init__(self, criteria: str, strict: bool = True) -> None:
        self.criteria = criteria
        self.strict = strict

    def review(self, task_id: str, artifact: str) -> Dict[str, Any]:
        prompt = f"""You are a code reviewer. Review the artifact below against the criteria.

Criteria:
{self.criteria}

Artifact:
{artifact}

Return a short verdict (PASS or FAIL) and concrete reasons. If FAIL, explain what to fix."""

        # Plain-string message content avoids content-part type mismatches
        # (the Responses API expects "input_text", not "text", for input parts).
        resp = client.responses.create(
            model="gpt-4.1-mini",
            input=[
                {"role": "system", "content": "You are a rigorous reviewer. Be concise and specific."},
                {"role": "user", "content": prompt},
            ],
            metadata={"component": "reviewer", "task_id": task_id},
        )

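        text = resp.output_text
        # Inspect only the head of the reply and treat anything ambiguous
        # (e.g. "FAIL: does not PASS lint") as FAIL so bad output is never approved.
        head = text[:100].upper()
        verdict = "PASS" if "PASS" in head and "FAIL" not in head else "FAIL"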
        return {"verdict": verdict, "text": text}

Now let’s sketch how we might thread the reviewer into the orchestrator, making certain tasks subject to approval. For simplicity, we’ll assume all tasks have the same criteria; in a real system, each task in the planner schema would carry its own acceptance criteria.

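# run_workflow_with_review.py (diff from run_workflow.py)
# Task, LLMOrchestrator, and ImplementerAgent are assumed to be in scope from run_workflow.py.
import asyncio

from reviewer import ReviewerAgent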

class LLMOrchestratorWithReview(LLMOrchestrator):
    def __init__(self, tasks, config, checkpoint_store, budget_manager, criteria: str):
        super().__init__(tasks, config, checkpoint_store, budget_manager)
        self.reviewer = ReviewerAgent(criteria=criteria)

    async def _run_one(self, task: Task):
        if task.status != "PENDING":
            return
        task.status = "RUNNING"
        self.progress.emit({"type": "task.start", "task_id": task.id, "name": task.name})
        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, task.result, task.error, None)
        await self.sem.acquire()
        try:
            for attempt in range(1, self.config.max_retries + 1):
                task.attempts = attempt
                agent = ImplementerAgent(task.agent_type, self.task_budgets[task.id], self.budget_manager)
                try:
                    out = await asyncio.to_thread(agent.run, task.id, task.description, attempt)
                    usage = out.get("usage")
                    review = await asyncio.to_thread(self.reviewer.review, task.id, out["content"])
                    if review["verdict"] == "PASS":
                        task.result = out["content"]
                        task.status = "DONE"
                        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, task.result, None, usage)
                        self.progress.emit({"type": "task.done", "task_id": task.id, "usage": usage, "review": "PASS"})
                        break
                    else:
                        self.progress.emit({"type": "task.review_fail", "task_id": task.id, "review": review["text"]})
                        raise RuntimeError("review failed")
                except Exception as e:
                    task.error = str(e)
                    self.checkpoints.upsert_task(task.id, task.name, "RUNNING", task.attempts, None, task.error, None)
                    self.progress.emit({"type": "task.retry", "task_id": task.id, "attempt": attempt, "msg": task.error})
                    if attempt >= self.config.max_retries:
                        task.status = "FAILED"
                        self.checkpoints.upsert_task(task.id, task.name, task.status, task.attempts, None, task.error, None)
                        self.progress.emit({"type": "task.failed", "task_id": task.id, "msg": task.error})
                        break
                    await asyncio.sleep(self.config.retry_backoff ** attempt)
        finally:
            self.sem.release()
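
The orchestrator above applies one set of criteria to every task. As a rough sketch of the per-task alternative mentioned earlier, a planned task could carry an optional list of acceptance criteria, with a shared default as fallback. The names PlannedTask, acceptance_criteria, criteria_text, and DEFAULT_CRITERIA are hypothetical and are not fields of the Task class used in this tutorial.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical fallback used when the planner supplies no criteria.
DEFAULT_CRITERIA = "Code runs, includes unit tests, and passes the linter."


@dataclass
class PlannedTask:
    id: str
    description: str
    acceptance_criteria: Optional[List[str]] = None


def criteria_text(task: PlannedTask, default: str = DEFAULT_CRITERIA) -> str:
    """Render a task's criteria as a bullet list for the reviewer prompt."""
    if not task.acceptance_criteria:
        return default
    return "\n".join(f"- {item}" for item in task.acceptance_criteria)
```

Inside the retry loop, the reviewer could then be built per task, for example ReviewerAgent(criteria=criteria_text(task)), instead of once in the constructor.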

In production, pair this with guardrails: a whitelist of files an agent is allowed to change, test runs before marking tasks as DONE, and a reviewer agent that rejects outputs lacking tests or violating linters. This turns your multi-agent system into a dependable collaborator rather than a risky black box.
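
The file whitelist guardrail above can be sketched as a check that runs before any agent-proposed change is applied. This is a minimal illustration, assuming changed paths are repository-relative POSIX strings and the allowlist is a set of glob patterns; the function name disallowed_paths and the example patterns are hypothetical.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import Iterable, List


def disallowed_paths(changed: Iterable[str], allowed_globs: Iterable[str]) -> List[str]:
    """Return the changed paths that fall outside the allowlist."""
    patterns = list(allowed_globs)
    rejected = []
    for path in changed:
        posix = PurePosixPath(path)
        # Reject absolute paths and any ".." segment outright.
        escapes = posix.is_absolute() or ".." in posix.parts
        # Note: fnmatch-style "*" also matches "/", so "src/*.py" covers subfolders.
        allowed = any(fnmatchcase(posix.as_posix(), pat) for pat in patterns)
        if escapes or not allowed:
            rejected.append(path)
    return rejected
```

An orchestrator could mark a task FAILED, or send it back for another attempt, whenever this function returns a non-empty list, so an agent never touches CI configuration or files outside its assigned area.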
