🤖
Autonomous Agents: Multi-Agent Orchestration
A multi-agent system where a planner decomposes goals, an executor invokes tools, and a reviewer validates outcomes, all with full causal traceability across every decision, tool call, and retry loop.
★ Important
Autonomous agents that invoke tools, make plans, and retry on failure create complex causal chains that are very hard to reconstruct from logs alone. PyRapide traces every step: which plan step caused which tool call, which failure triggered which retry, and whether the reviewer actually approved the final outcome.
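To make the idea of a causal chain concrete, here is a minimal plain-Python sketch. It does not use PyRapide: the `Event` class and the `ancestors` helper are hypothetical names introduced only for illustration. Each event simply keeps references to the events that directly caused it, so a chain can be recovered by walking those links backwards.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # identity-based equality keeps events hashable
class Event:
    """Hypothetical event record: a name, a payload, and direct causes."""
    name: str
    data: dict = field(default_factory=dict)
    causes: tuple = ()


def ancestors(event):
    """Return every event that causally precedes `event` (depth-first order)."""
    seen, order, stack = set(), [], list(event.causes)
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        order.append(current)
        stack.extend(current.causes)
    return order


# A failed tool call that leads to a retry request.
plan = Event("PlannerAgent.plan_generated", {"goal_id": "g1"})
call = Event("ExecutorAgent.tool_invoked", {"step_id": "s1"}, (plan,))
fail = Event("ExecutorAgent.step_failed", {"step_id": "s1"}, (call,))
retry = Event("ReviewerAgent.retry_requested", {"goal_id": "g1"}, (fail,))

chain_names = [e.name for e in ancestors(retry)]
print(chain_names)
```

A flat log would list the same four events, but only the explicit `causes` links say that this particular retry was triggered by this particular failure.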
Architecture Overview
- PlannerAgent receives high-level goals, generates step-by-step plans, and revises plans when reviews fail.
- ExecutorAgent executes plan steps by invoking tools and reports success or failure for each step.
- ReviewerAgent reviews execution outcomes, approves or rejects, and requests retries with guidance.
Interfaces
agent_interfaces.py python
from pyrapide import interface, action, module, when
from pyrapide import architecture, connect, Pattern, Engine
from pyrapide import must_match, never
import asyncio

# ── Interfaces ──────────────────────────────────────────

@interface
class PlannerAgent:
    """Decomposes high-level goals into executable steps."""
    @action
    async def goal_received(self, goal_id: str,
                            description: str) -> None: ...
    @action
    async def plan_generated(self, goal_id: str,
                             steps: list[dict]) -> None: ...
    @action
    async def plan_revised(self, goal_id: str,
                           reason: str,
                           new_steps: list[dict]) -> None: ...

@interface
class ExecutorAgent:
    """Executes individual plan steps using tools."""
    @action
    async def step_started(self, goal_id: str, step_id: str,
                           action: str, tool: str) -> None: ...
    @action
    async def tool_invoked(self, step_id: str, tool: str,
                           params: dict) -> None: ...
    @action
    async def tool_result(self, step_id: str, tool: str,
                          result: dict, success: bool) -> None: ...
    @action
    async def step_completed(self, goal_id: str, step_id: str,
                             outcome: str) -> None: ...
    @action
    async def step_failed(self, goal_id: str, step_id: str,
                          error: str) -> None: ...

@interface
class ReviewerAgent:
    """Reviews execution outcomes and decides next actions."""
    @action
    async def review_started(self, goal_id: str,
                             completed_steps: list[str]) -> None: ...
    @action
    async def review_passed(self, goal_id: str,
                            assessment: str) -> None: ...
    @action
    async def review_failed(self, goal_id: str,
                            issues: list[str]) -> None: ...
    @action
    async def goal_achieved(self, goal_id: str,
                            summary: str) -> None: ...
    @action
    async def retry_requested(self, goal_id: str,
                              failed_steps: list[str],
                              guidance: str) -> None: ...
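The listings on this page call actions with keyword arguments and later read them back as `event.data[...]` under names such as `PlannerAgent.plan_generated`. The sketch below illustrates that convention in plain Python. It is not PyRapide's implementation: the `action` decorator here is a hypothetical stand-in and `EVENT_LOG` is an illustrative in-memory list.

```python
import asyncio
from functools import wraps

EVENT_LOG: list[dict] = []  # hypothetical in-memory log, for illustration only


def action(fn):
    """Hypothetical stand-in: record each call as an 'Interface.method' event."""
    @wraps(fn)
    async def wrapper(self, **kwargs):
        # __qualname__ is "PlannerAgent.plan_generated" for the method below.
        EVENT_LOG.append({"name": fn.__qualname__, "data": kwargs})
        return await fn(self, **kwargs)
    return wrapper


class PlannerAgent:
    @action
    async def plan_generated(self, goal_id: str,
                             steps: list[dict]) -> None: ...


asyncio.run(PlannerAgent().plan_generated(goal_id="g1", steps=[{"id": "s1"}]))
print(EVENT_LOG[0]["name"], EVENT_LOG[0]["data"]["goal_id"])
```

Under this assumption, the keyword arguments of an action become the event payload, which is why the handlers in the listings can index `event.data` by parameter name.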
Module Logic
The modules implement the feedback loop: the executor runs steps from the planner, the reviewer evaluates outcomes, and failures loop back for retries or plan revisions.
agent_modules.py python
# ── Modules ─────────────────────────────────────────────

@module(implements=ExecutorAgent)
class ToolExecutor:
    @when("PlannerAgent.plan_generated")
    async def execute_plan(self, event):
        goal_id = event.data["goal_id"]
        for step in event.data["steps"]:
            await self.step_started(
                goal_id=goal_id,
                step_id=step["id"],
                action=step["action"],
                tool=step["tool"]
            )
            await self.tool_invoked(
                step_id=step["id"],
                tool=step["tool"],
                params=step.get("params", {})
            )

    @when("ReviewerAgent.retry_requested")
    async def retry_steps(self, event):
        goal_id = event.data["goal_id"]
        for step_id in event.data["failed_steps"]:
            await self.step_started(
                goal_id=goal_id,
                step_id=f"{step_id}_retry",
                action="retry",
                tool="auto"
            )

@module(implements=ReviewerAgent)
class QualityReviewer:
    @when("ExecutorAgent.step_completed")
    async def check_step(self, event):
        # Accumulate completed steps, trigger review when all done
        pass

    @when("ExecutorAgent.step_failed")
    async def handle_failure(self, event):
        data = event.data
        await self.review_failed(
            goal_id=data["goal_id"],
            issues=[data["error"]]
        )
        await self.retry_requested(
            goal_id=data["goal_id"],
            failed_steps=[data["step_id"]],
            guidance="Retry with alternative approach"
        )

@module(implements=PlannerAgent)
class AdaptivePlanner:
    @when("ReviewerAgent.review_failed")
    async def revise_plan(self, event):
        await self.plan_revised(
            goal_id=event.data["goal_id"],
            reason=f"Review failed: {event.data['issues']}",
            new_steps=[{"id": "revised_1", "action": "alternative",
                        "tool": "fallback_tool"}]
        )
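The feedback loop described above can also be sketched as an ordinary loop, which makes the control flow easier to see. This is plain Python and not PyRapide code: `run_goal`, the `run_step` callback, and the `goal_abandoned` marker are hypothetical names, and the retry cap is an assumption added to show where a bound would sit.

```python
def run_goal(steps, run_step, max_retries=3):
    """Run steps in order; retry a failed step until the retry budget is spent."""
    trace = []
    retries = 0
    pending = list(steps)
    while pending:
        step = pending.pop(0)
        ok = run_step(step)
        trace.append(("step_completed" if ok else "step_failed", step))
        if ok:
            continue
        if retries == max_retries:
            # Budget exhausted: stop instead of looping forever.
            trace.append(("goal_abandoned", step))
            return trace
        retries += 1
        trace.append(("retry_requested", step))
        pending.insert(0, step)  # the same step is attempted again next
    trace.append(("goal_achieved", None))
    return trace


attempts = {}


def flaky(step):
    """Illustrative tool: 'fetch' fails twice, everything else succeeds."""
    attempts[step] = attempts.get(step, 0) + 1
    return step != "fetch" or attempts[step] >= 3


trace = run_goal(["fetch", "summarize"], flaky)
gave_up = run_goal(["broken"], lambda step: False, max_retries=3)
print(trace[-1], gave_up[-1])
```

The event-driven modules express the same loop as reactions to events rather than as a single function, which is what lets each retry be linked to the failure that caused it.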
Architecture and Constraints
The constraints enforce agent accountability and prevent runaway behavior:
- goal_must_produce_plan: every goal must be decomposed into a plan.
- step_must_resolve: every started step must complete or fail (no silent hangs).
- tool_must_return: every tool invocation must produce a result.
- failure_must_act: failed reviews must trigger a retry or plan revision.
- no_achievement_without_review: goals cannot be marked achieved without reviewer approval.
- max_retry_limit: prevents infinite retry loops by flagging a third retry request for the same goal.
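The trigger/response constraints in this list (such as step_must_resolve and tool_must_return) share one shape: every trigger event needs a matching response. The sketch below checks that shape over a flat event list in plain Python. It is an approximation, not PyRapide's checker: `unmatched_triggers` is a hypothetical helper, and it uses log order plus a shared key as a stand-in for real causal links.

```python
def unmatched_triggers(events, trigger, responses, key):
    """Return trigger events with no later response that shares `key`."""
    missing = []
    for i, ev in enumerate(events):
        if ev["name"] != trigger:
            continue
        answered = any(
            later["name"] in responses
            and later["data"].get(key) == ev["data"].get(key)
            for later in events[i + 1:]
        )
        if not answered:
            missing.append(ev)
    return missing


log = [
    {"name": "ExecutorAgent.step_started", "data": {"step_id": "s1"}},
    {"name": "ExecutorAgent.step_started", "data": {"step_id": "s2"}},
    {"name": "ExecutorAgent.step_completed", "data": {"step_id": "s1"}},
]

hung = unmatched_triggers(
    log,
    trigger="ExecutorAgent.step_started",
    responses={"ExecutorAgent.step_completed", "ExecutorAgent.step_failed"},
    key="step_id",
)
print([ev["data"]["step_id"] for ev in hung])
```

Here step `s2` was started but never completed or failed, which is the kind of silent hang step_must_resolve is meant to surface.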
agent_architecture.py python
# ── Architecture ────────────────────────────────────────

@architecture
class AgentOrchestration:
    planner: PlannerAgent
    executor: ExecutorAgent
    reviewer: ReviewerAgent

    def connections(self):
        return [
            # Planner sends plans to executor
            connect(Pattern.match("PlannerAgent.plan_generated"), "executor"),
            connect(Pattern.match("PlannerAgent.plan_revised"), "executor"),
            # Executor results go to reviewer
            connect(Pattern.match("ExecutorAgent.step_completed"), "reviewer"),
            connect(Pattern.match("ExecutorAgent.step_failed"), "reviewer"),
            # Reviewer feedback loops to planner and executor
            connect(Pattern.match("ReviewerAgent.review_failed"), "planner"),
            connect(Pattern.match("ReviewerAgent.retry_requested"), "executor"),
        ]

    def constraints(self):
        return [
            # Every goal must produce a plan
            must_match(
                trigger="PlannerAgent.goal_received",
                response="PlannerAgent.plan_generated",
                name="goal_must_produce_plan"
            ),

            # Every plan step must complete or fail (no silent hangs)
            must_match(
                trigger="ExecutorAgent.step_started",
                response=("ExecutorAgent.step_completed",
                          "ExecutorAgent.step_failed"),
                name="step_must_resolve"
            ),

            # Every tool invocation must produce a result
            must_match(
                trigger="ExecutorAgent.tool_invoked",
                response="ExecutorAgent.tool_result",
                name="tool_must_return"
            ),

            # A failed review must trigger either a retry or plan revision
            must_match(
                trigger="ReviewerAgent.review_failed",
                response=("ReviewerAgent.retry_requested",
                          "PlannerAgent.plan_revised"),
                name="failure_must_act"
            ),

            # Never achieve a goal without reviewer approval
            never(
                pattern=("PlannerAgent.plan_generated",
                         "ReviewerAgent.goal_achieved"),
                unless="ReviewerAgent.review_passed",
                name="no_achievement_without_review"
            ),

            # Limit retry loops: flag three retry requests
            # for the same goal (the condition compares goal_id)
            never(
                pattern=("ReviewerAgent.retry_requested",
                         "ReviewerAgent.retry_requested",
                         "ReviewerAgent.retry_requested"),
                condition=lambda e1, e2, e3: (
                    e1.data["goal_id"] == e2.data["goal_id"]
                    == e3.data["goal_id"]
                ),
                name="max_retry_limit"
            ),
        ]
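The two prohibitive constraints can be illustrated over a flat event list as well. The helpers below are hypothetical plain-Python approximations, not PyRapide's checker: they count retry requests per goal against an explicit `max_retries` parameter and look for achievements that were never preceded by an approval for the same goal.

```python
from collections import Counter


def goals_over_retry_limit(events, max_retries):
    """Map goal_id -> retry count for goals with more than `max_retries` requests."""
    counts = Counter(
        ev["data"]["goal_id"]
        for ev in events
        if ev["name"] == "ReviewerAgent.retry_requested"
    )
    return {goal: n for goal, n in counts.items() if n > max_retries}


def unreviewed_achievements(events):
    """Return goal_ids marked achieved before any review_passed for that goal."""
    approved, offenders = set(), []
    for ev in events:
        goal = ev["data"].get("goal_id")
        if ev["name"] == "ReviewerAgent.review_passed":
            approved.add(goal)
        elif ev["name"] == "ReviewerAgent.goal_achieved" and goal not in approved:
            offenders.append(goal)
    return offenders


def ev(name, goal_id):
    """Build an illustrative event dict."""
    return {"name": name, "data": {"goal_id": goal_id}}


log = (
    [ev("ReviewerAgent.retry_requested", "g1")] * 4
    + [ev("ReviewerAgent.goal_achieved", "g2")]
    + [ev("ReviewerAgent.review_passed", "g3"),
       ev("ReviewerAgent.goal_achieved", "g3")]
)

over_limit = goals_over_retry_limit(log, max_retries=3)
skipped_review = unreviewed_achievements(log)
print(over_limit, skipped_review)
```

Keying both checks on `goal_id` mirrors the condition in the listing, which compares goals rather than individual steps.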
Execution and Analysis
agent_analysis.py python
# ── Execute and Analyze Agent Behavior ──────────────────

async def main():
    engine = Engine()
    computation = await engine.run(AgentOrchestration)

    from pyrapide import (
        root_causes, forward_slice, backward_slice,
        critical_path, bottleneck_events, causal_distance
    )

    # Trace the full causal chain for a goal achievement
    achieved = [e for e in computation.events
                if e.name == "ReviewerAgent.goal_achieved"]

    for goal in achieved:
        chain = backward_slice(computation, goal)
        tools = [e for e in chain
                 if e.name == "ExecutorAgent.tool_invoked"]
        retries = [e for e in chain
                   if e.name == "ReviewerAgent.retry_requested"]
        print(f"Goal '{goal.data['summary']}':")
        print(f"  Total causal chain: {len(chain)} events")
        print(f"  Tool invocations: {len(tools)}")
        print(f"  Retries needed: {len(retries)}")

    # Find the critical path (longest causal chain)
    path = critical_path(computation)
    print(f"\nCritical path ({len(path)} events):")
    for i, event in enumerate(path):
        print(f"  {i+1}. {event.name}")

    # When a step fails, what is the blast radius?
    failures = [e for e in computation.events
                if e.name == "ExecutorAgent.step_failed"]
    for f in failures:
        impact = forward_slice(computation, f)
        print(f"\nStep failure '{f.data['error']}' caused "
              f"{len(impact)} downstream events")

    # Identify bottleneck events (those on at least 30% of causal paths)
    bottlenecks = bottleneck_events(computation, threshold=0.3)
    print("\nBottleneck events:")
    for event, score in bottlenecks:
        print(f"  {event.name}: {score:.0%} of causal paths")

if __name__ == "__main__":
    asyncio.run(main())
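For intuition about what a forward slice and a critical path compute, here is a small plain-Python sketch over a hand-written causal graph. It is not how PyRapide implements these queries: `downstream_of` and `longest_chain` are hypothetical helpers, and the graph is an illustrative example shaped like the failure path in the modules on this page.

```python
from functools import lru_cache

# Illustrative causal graph: event -> list of its direct causes.
causes = {
    "goal_received": [],
    "plan_generated": ["goal_received"],
    "tool_invoked": ["plan_generated"],
    "step_failed": ["tool_invoked"],
    "review_failed": ["step_failed"],
    "retry_requested": ["step_failed"],
    "plan_revised": ["review_failed"],
}


def downstream_of(causes, start):
    """Forward slice: every event reachable from `start` along cause->effect edges."""
    effects = {}
    for child, parents in causes.items():
        for parent in parents:
            effects.setdefault(parent, []).append(child)
    seen, stack = set(), [start]
    while stack:
        for nxt in effects.get(stack.pop(), []):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def longest_chain(causes):
    """Critical path: the longest causally linked chain (graph must be acyclic)."""
    @lru_cache(maxsize=None)
    def longest_to(node):
        best = max((longest_to(p) for p in causes[node]), key=len, default=())
        return best + (node,)
    return list(max((longest_to(n) for n in causes), key=len))


blast_radius = downstream_of(causes, "step_failed")
path = longest_chain(causes)
print(sorted(blast_radius))
print(" -> ".join(path))
```

In this example the failure reaches three later events, and the longest chain runs from the goal through the failure to the revised plan.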
💡 Tip
The max_retry_limit constraint prevents agents from entering infinite loops. When violated, PyRapide identifies exactly which goal, which step, and which sequence of failures caused the loop, giving you the information needed to improve the agent's planning.
ℹ Note
This pattern works with any agent framework. Wrap your existing agent's tool calls with PyRapide's MCPEventAdapter or LLMEventAdapter to get causal tracing without changing your agent's logic.
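The general idea of wrapping a tool call so that events are recorded around it can be sketched with a decorator. This does not show the MCPEventAdapter or LLMEventAdapter APIs, whose signatures are not given here: `traced_tool`, the `events` list, and the `search` tool are hypothetical and exist only to illustrate the wrapping pattern.

```python
import asyncio
from functools import wraps


def traced_tool(log, tool_name):
    """Hypothetical wrapper: record an invocation event and a result event."""
    def decorate(fn):
        @wraps(fn)
        async def wrapper(**params):
            log.append({"name": "tool_invoked", "tool": tool_name,
                        "params": params})
            try:
                result = await fn(**params)
            except Exception as exc:
                # A failing tool still produces a result event.
                log.append({"name": "tool_result", "tool": tool_name,
                            "result": {"error": str(exc)}, "success": False})
                raise
            log.append({"name": "tool_result", "tool": tool_name,
                        "result": result, "success": True})
            return result
        return wrapper
    return decorate


events = []


@traced_tool(events, "search")
async def search(query: str) -> dict:
    """Illustrative tool body; the agent's own logic stays unchanged."""
    return {"hits": [query.upper()]}


output = asyncio.run(search(query="pyrapide"))
print([e["name"] for e in events])
```

Because the wrapper emits a result event on both the success and the failure path, every invocation is paired with a result, which is the property the tool_must_return constraint checks.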