Autonomous Agents: Multi-Agent Orchestration

A multi-agent system where a planner decomposes goals, an executor invokes tools, and a reviewer validates outcomes, all with full causal traceability across every decision, tool call, and retry loop.

Important
Autonomous agents that invoke tools, make plans, and retry on failure create complex causal chains that are hard to reconstruct from logs alone. PyRapide traces every step: which plan step caused which tool call, which failure triggered which retry, and whether the reviewer actually approved the final outcome.
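To make "causal traceability" concrete, here is a minimal plain-Python sketch of the underlying idea: each event remembers which events caused it, so the chain behind any outcome can be walked backwards. This is not PyRapide's implementation or API; `Event`, `CausalLog`, `record`, and `ancestors` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from itertools import count


@dataclass(frozen=True)
class Event:
    """One recorded occurrence; `causes` holds the ids of its direct causes."""
    id: int
    name: str
    causes: tuple[int, ...] = ()


class CausalLog:
    """Hypothetical append-only log that keeps cause links between events."""

    def __init__(self) -> None:
        self._events: dict[int, Event] = {}
        self._ids = count(1)

    def record(self, name: str, *causes: Event) -> Event:
        """Store a new event and link it to the events that caused it."""
        event = Event(next(self._ids), name, tuple(c.id for c in causes))
        self._events[event.id] = event
        return event

    def ancestors(self, event: Event) -> list[Event]:
        """Return every event that transitively caused `event`, oldest first."""
        seen: set[int] = set()
        stack = list(event.causes)
        while stack:
            eid = stack.pop()
            if eid not in seen:
                seen.add(eid)
                stack.extend(self._events[eid].causes)
        return [self._events[eid] for eid in sorted(seen)]


log = CausalLog()
goal = log.record("goal_received")
plan = log.record("plan_generated", goal)
call = log.record("tool_invoked", plan)
fail = log.record("step_failed", call)
retry = log.record("retry_requested", fail)
unrelated = log.record("goal_received")  # a second goal, not part of the chain

chain = [e.name for e in log.ancestors(retry)]
print(chain)
```

A flat log would list all six events in time order. The cause links are what let the walk exclude the unrelated second goal.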

Architecture Overview

  • PlannerAgent receives high-level goals, generates step-by-step plans, and revises plans when reviews fail.
  • ExecutorAgent executes plan steps by invoking tools and reports success or failure for each step.
  • ReviewerAgent reviews execution outcomes, approves or rejects, and requests retries with guidance.

Interfaces

agent_interfaces.py python
from pyrapide import interface, action, module, when
from pyrapide import architecture, connect, Pattern, Engine
from pyrapide import must_match, never
import asyncio

# ── Interfaces ──────────────────────────────────────────

@interface
class PlannerAgent:
    """Decomposes high-level goals into executable steps."""
    @action
    async def goal_received(self, goal_id: str,
                            description: str) -> None: ...
    @action
    async def plan_generated(self, goal_id: str,
                             steps: list[dict]) -> None: ...
    @action
    async def plan_revised(self, goal_id: str,
                           reason: str,
                           new_steps: list[dict]) -> None: ...

@interface
class ExecutorAgent:
    """Executes individual plan steps using tools."""
    @action
    async def step_started(self, goal_id: str, step_id: str,
                           action: str, tool: str) -> None: ...
    @action
    async def tool_invoked(self, step_id: str, tool: str,
                           params: dict) -> None: ...
    @action
    async def tool_result(self, step_id: str, tool: str,
                          result: dict, success: bool) -> None: ...
    @action
    async def step_completed(self, goal_id: str, step_id: str,
                             outcome: str) -> None: ...
    @action
    async def step_failed(self, goal_id: str, step_id: str,
                          error: str) -> None: ...

@interface
class ReviewerAgent:
    """Reviews execution outcomes and decides next actions."""
    @action
    async def review_started(self, goal_id: str,
                             completed_steps: list[str]) -> None: ...
    @action
    async def review_passed(self, goal_id: str,
                            assessment: str) -> None: ...
    @action
    async def review_failed(self, goal_id: str,
                            issues: list[str]) -> None: ...
    @action
    async def goal_achieved(self, goal_id: str,
                            summary: str) -> None: ...
    @action
    async def retry_requested(self, goal_id: str,
                              failed_steps: list[str],
                              guidance: str) -> None: ...

Module Logic

The modules implement the feedback loop: the executor runs steps from the planner, the reviewer evaluates outcomes, and failures loop back for retries or plan revisions.

agent_modules.py python
# ── Modules ─────────────────────────────────────────────

@module(implements=ExecutorAgent)
class ToolExecutor:
    @when("PlannerAgent.plan_generated")
    async def execute_plan(self, event):
        goal_id = event.data["goal_id"]
        for step in event.data["steps"]:
            await self.step_started(
                goal_id=goal_id,
                step_id=step["id"],
                action=step["action"],
                tool=step["tool"]
            )
            await self.tool_invoked(
                step_id=step["id"],
                tool=step["tool"],
                params=step.get("params", {})
            )

    @when("ReviewerAgent.retry_requested")
    async def retry_steps(self, event):
        goal_id = event.data["goal_id"]
        for step_id in event.data["failed_steps"]:
            await self.step_started(
                goal_id=goal_id,
                step_id=f"{step_id}_retry",
                action="retry",
                tool="auto"
            )

@module(implements=ReviewerAgent)
class QualityReviewer:
    @when("ExecutorAgent.step_completed")
    async def check_step(self, event):
        # Accumulate completed steps, trigger review when all done
        pass

    @when("ExecutorAgent.step_failed")
    async def handle_failure(self, event):
        data = event.data
        await self.review_failed(
            goal_id=data["goal_id"],
            issues=[data["error"]]
        )
        await self.retry_requested(
            goal_id=data["goal_id"],
            failed_steps=[data["step_id"]],
            guidance="Retry with alternative approach"
        )

@module(implements=PlannerAgent)
class AdaptivePlanner:
    @when("ReviewerAgent.review_failed")
    async def revise_plan(self, event):
        await self.plan_revised(
            goal_id=event.data["goal_id"],
            reason=f"Review failed: {event.data['issues']}",
            new_steps=[{"id": "revised_1", "action": "alternative",
                        "tool": "fallback_tool"}]
        )
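
The feedback loop these modules implement can be condensed into a single coroutine to show the control flow on its own. The following is a rough sketch in plain `asyncio`, without PyRapide's event-driven decoupling; `run_goal`, `flaky_tool`, and the `max_retries` parameter are hypothetical names, and the trace entries only borrow the action names from the interfaces above.

```python
import asyncio


async def run_goal(steps, run_tool, max_retries=3):
    """Run each step in order, retrying a failed step up to `max_retries` times.

    Returns a list of (event_name, step_id) tuples in emission order.
    """
    trace = []
    for step in steps:
        attempts = 0
        while True:
            trace.append(("step_started", step))
            ok = await run_tool(step, attempts)
            if ok:
                trace.append(("step_completed", step))
                break
            trace.append(("step_failed", step))
            if attempts == max_retries:
                # Retries exhausted: hand the step back to the planner.
                trace.append(("plan_revised", step))
                break
            attempts += 1
            trace.append(("retry_requested", step))
    return trace


async def flaky_tool(step, attempt):
    """Hypothetical tool: the 'fetch' step fails on its first attempt only."""
    await asyncio.sleep(0)  # stand-in for real I/O
    return not (step == "fetch" and attempt == 0)


trace = asyncio.run(run_goal(["fetch", "summarize"], flaky_tool))
for name, step in trace:
    print(f"{name}: {step}")
```

In the module version the same loop is spread across three agents that react to each other's events, which is why causal links, rather than call-stack order, are needed to reconstruct it afterwards.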

Architecture and Constraints

The constraints enforce agent accountability and prevent runaway behavior:

  • goal_must_produce_plan: every goal must be decomposed into a plan.
  • step_must_resolve: every started step must complete or fail (no silent hangs).
  • tool_must_return: every tool invocation must produce a result.
  • failure_must_act: failed reviews must trigger a retry or plan revision.
  • no_achievement_without_review: goals cannot be marked achieved without reviewer approval.
  • max_retry_limit: prevents infinite retry loops (max 3 retries per goal).

agent_architecture.py python
# ── Architecture ────────────────────────────────────────

@architecture
class AgentOrchestration:
    planner: PlannerAgent
    executor: ExecutorAgent
    reviewer: ReviewerAgent

    def connections(self):
        return [
            # Planner sends plans to executor
            connect(Pattern.match("PlannerAgent.plan_generated"), "executor"),
            connect(Pattern.match("PlannerAgent.plan_revised"), "executor"),
            # Executor results go to reviewer
            connect(Pattern.match("ExecutorAgent.step_completed"), "reviewer"),
            connect(Pattern.match("ExecutorAgent.step_failed"), "reviewer"),
            # Reviewer feedback loops to planner and executor
            connect(Pattern.match("ReviewerAgent.review_failed"), "planner"),
            connect(Pattern.match("ReviewerAgent.retry_requested"), "executor"),
        ]

    def constraints(self):
        return [
            # Every goal must produce a plan
            must_match(
                trigger="PlannerAgent.goal_received",
                response="PlannerAgent.plan_generated",
                name="goal_must_produce_plan"
            ),

            # Every plan step must complete or fail (no silent hangs)
            must_match(
                trigger="ExecutorAgent.step_started",
                response=("ExecutorAgent.step_completed",
                          "ExecutorAgent.step_failed"),
                name="step_must_resolve"
            ),

            # Every tool invocation must produce a result
            must_match(
                trigger="ExecutorAgent.tool_invoked",
                response="ExecutorAgent.tool_result",
                name="tool_must_return"
            ),

            # A failed review must trigger either a retry or plan revision
            must_match(
                trigger="ReviewerAgent.review_failed",
                response=("ReviewerAgent.retry_requested",
                          "PlannerAgent.plan_revised"),
                name="failure_must_act"
            ),

            # Never achieve a goal without reviewer approval
            never(
                pattern=("PlannerAgent.plan_generated",
                         "ReviewerAgent.goal_achieved"),
                unless="ReviewerAgent.review_passed",
                name="no_achievement_without_review"
            ),

            # Limit retry loops: cap repeated retry requests per goal
            # (the condition below matches on goal_id, not step_id)
            never(
                pattern=("ReviewerAgent.retry_requested",
                         "ReviewerAgent.retry_requested",
                         "ReviewerAgent.retry_requested"),
                condition=lambda e1, e2, e3: (
                    e1.data["goal_id"] == e2.data["goal_id"]
                    == e3.data["goal_id"]
                ),
                name="max_retry_limit"
            ),
        ]
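
To illustrate what a trigger/response constraint such as step_must_resolve asks for, here is a simplified plain-Python check over a flat, ordered event list. It is only a sketch of the idea and makes no claim about how PyRapide evaluates `must_match`; the function `unmatched_triggers` and the dict-based event shape are assumptions made for the example, and it pairs events by a shared key rather than by causal links.

```python
def unmatched_triggers(events, trigger, responses, key):
    """Return trigger events with no later response sharing the same `key` value.

    `events` is an ordered list of dicts, each with a "name" field.
    """
    pending = {}
    for event in events:
        if event["name"] == trigger:
            pending[event[key]] = event
        elif event["name"] in responses:
            # A matching response resolves the pending trigger, if any.
            pending.pop(event[key], None)
    return list(pending.values())


events = [
    {"name": "step_started", "step_id": "s1"},
    {"name": "step_started", "step_id": "s2"},
    {"name": "step_completed", "step_id": "s1"},
    {"name": "step_started", "step_id": "s3"},
    {"name": "step_failed", "step_id": "s3"},
]

hung = unmatched_triggers(
    events,
    trigger="step_started",
    responses={"step_completed", "step_failed"},
    key="step_id",
)
print([e["step_id"] for e in hung])  # s2 started but never resolved
```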

Execution and Analysis

agent_analysis.py python
# ── Execute and Analyze Agent Behavior ──────────────────

from pyrapide import (
    root_causes, forward_slice, backward_slice,
    critical_path, bottleneck_events, causal_distance
)

async def main():
    engine = Engine()
    computation = await engine.run(AgentOrchestration)

    # Trace the full causal chain for a goal achievement
    achieved = [e for e in computation.events
                if e.name == "ReviewerAgent.goal_achieved"]

    for goal in achieved:
        chain = backward_slice(computation, goal)
        tools = [e for e in chain
                 if e.name == "ExecutorAgent.tool_invoked"]
        retries = [e for e in chain
                   if e.name == "ReviewerAgent.retry_requested"]
        print(f"Goal '{goal.data['summary']}':")
        print(f"  Total causal chain: {len(chain)} events")
        print(f"  Tool invocations: {len(tools)}")
        print(f"  Retries needed: {len(retries)}")

    # Find the critical path (longest causal chain)
    path = critical_path(computation)
    print(f"\nCritical path ({len(path)} events):")
    for i, event in enumerate(path):
        print(f"  {i+1}. {event.name}")

    # When a step fails, what is the blast radius?
    failures = [e for e in computation.events
                if e.name == "ExecutorAgent.step_failed"]
    for f in failures:
        impact = forward_slice(computation, f)
        print(f"\nStep failure '{f.data['error']}' caused "
              f"{len(impact)} downstream events")

    # Identify bottleneck tools
    bottlenecks = bottleneck_events(computation, threshold=0.3)
    print("\nBottleneck events:")
    for event, score in bottlenecks:
        print(f"  {event.name}: {score:.0%} of causal paths")

asyncio.run(main())
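
The "critical path (longest causal chain)" computed above is, conceptually, a longest-path search over an acyclic cause graph. The sketch below shows that idea in plain Python on a hand-written graph. It is not PyRapide's `critical_path` implementation, and `longest_chain` and the `causes` mapping are hypothetical names for illustration.

```python
from functools import lru_cache


def longest_chain(causes):
    """Return the longest cause-to-effect chain in an acyclic cause map.

    `causes` maps each event name to the list of events that directly caused it.
    """
    @lru_cache(maxsize=None)
    def chain_to(event):
        # Longest chain ending at `event`: extend the longest chain of any cause.
        best = ()
        for cause in causes.get(event, ()):
            candidate = chain_to(cause)
            if len(candidate) > len(best):
                best = candidate
        return best + (event,)

    return list(max((chain_to(e) for e in causes), key=len))


causes = {
    "goal_received": [],
    "plan_generated": ["goal_received"],
    "tool_invoked": ["plan_generated"],
    "step_failed": ["tool_invoked"],
    "retry_requested": ["step_failed"],
    "plan_revised": ["step_failed"],
}

path = longest_chain(causes)
for i, name in enumerate(path, start=1):
    print(f"{i}. {name}")
```

Memoizing `chain_to` keeps the search linear in the number of cause links. When two chains tie in length, as the retry and revision branches do here, the first one encountered is returned.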

Tip
The max_retry_limit constraint prevents agents from entering infinite loops. When violated, PyRapide identifies exactly which goal, which step, and which sequence of failures caused the loop, giving you the information needed to improve the agent's planning.
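
As a rough illustration of the kind of report described here, the following plain-Python sketch groups retry requests by goal and lists the goals that exceed a limit, together with the steps that were retried. It assumes a simple list of dict events with the same field names as `retry_requested` above; `retry_violations` and its `limit` parameter are hypothetical and not part of PyRapide.

```python
from collections import defaultdict


def retry_violations(events, limit=3):
    """Map each goal_id with more than `limit` retry requests to its retried steps."""
    retried = defaultdict(list)
    for event in events:
        if event["name"] == "retry_requested":
            retried[event["goal_id"]].append(event["failed_steps"])
    return {goal: steps for goal, steps in retried.items() if len(steps) > limit}


events = [
    {"name": "retry_requested", "goal_id": "g1", "failed_steps": ["s1"]},
    {"name": "retry_requested", "goal_id": "g2", "failed_steps": ["s7"]},
    {"name": "retry_requested", "goal_id": "g1", "failed_steps": ["s1_retry"]},
    {"name": "step_completed", "goal_id": "g2", "step_id": "s7_retry"},
    {"name": "retry_requested", "goal_id": "g1", "failed_steps": ["s1_retry_retry"]},
    {"name": "retry_requested", "goal_id": "g1", "failed_steps": ["s2"]},
]

violations = retry_violations(events, limit=3)
for goal, steps in violations.items():
    print(f"{goal}: {len(steps)} retries, steps {steps}")
```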

Note
This pattern works with any agent framework. Wrap your existing agent's tool calls with PyRapide's MCPEventAdapter or LLMEventAdapter to get causal tracing without changing your agent's logic.
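
The general idea of wrapping a tool call so that it emits events, without touching the tool's own logic, can be sketched with an ordinary decorator. This is a generic illustration only: it does not use or reproduce `MCPEventAdapter` or `LLMEventAdapter`, whose actual interfaces are not shown on this page, and `traced`, `sink`, and the `add` tool are hypothetical.

```python
import functools


def traced(tool_name, sink):
    """Decorator that appends tool_invoked / tool_result records to `sink`."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(**params):
            sink.append({"name": "tool_invoked", "tool": tool_name,
                         "params": params})
            try:
                result = fn(**params)
            except Exception as exc:
                # Record the failure, then let the caller handle it as before.
                sink.append({"name": "tool_result", "tool": tool_name,
                             "success": False, "result": {"error": str(exc)}})
                raise
            sink.append({"name": "tool_result", "tool": tool_name,
                         "success": True, "result": result})
            return result
        return wrapper
    return decorate


events = []


@traced("calculator", events)
def add(a, b):
    """Hypothetical tool: returns the sum of two numbers."""
    return {"sum": a + b}


add(a=2, b=3)
for event in events:
    print(event)
```

Because every invocation records a result even when the tool raises, a wrapper like this pairs each `tool_invoked` with a `tool_result`, which is the property the tool_must_return constraint checks for.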