Skip to content
← All Examples
🏭

Manufacturing: Predictive Maintenance

A predictive maintenance system where causal tracing connects equipment sensor anomalies to product quality defects and maintenance scheduling, closing the loop between machine health and product quality.

Important
When a defective batch is found at inspection, the critical question is: which machine caused this, and what sensor reading predicted it? PyRapide's causal graph traces defects back through the exact sensor anomalies and machine states that produced them.

Architecture Overview

  • MachineSensor covers vibration, temperature, and pressure sensors on production equipment, with anomaly detection.
  • QualityInspector represents inspection stations that check batch quality and flag defects with producing machine IDs.
  • MaintenanceSystem schedules, tracks, and clears predictive and corrective maintenance.

Interfaces

mfg_interfaces.py python
1from pyrapide import interface, action, module, when
2from pyrapide import architecture, connect, Pattern, Engine
3from pyrapide import must_match, never
4import asyncio
5
6# ── Interfaces ──────────────────────────────────────────
7
@interface
class MachineSensor:
    """Sensor array mounted on a piece of production equipment.

    Declares the raw reading actions (vibration, temperature, pressure)
    plus the anomaly action. Every action identifies the emitting
    machine through ``machine_id``.
    """

    @action
    async def vibration_reading(
        self, machine_id: str, frequency_hz: float, amplitude: float
    ) -> None:
        """Vibration sample (frequency and amplitude) for ``machine_id``."""
        ...

    @action
    async def temperature_reading(
        self, machine_id: str, temp_c: float
    ) -> None:
        """Temperature sample for ``machine_id``."""
        ...

    @action
    async def pressure_reading(
        self, machine_id: str, pressure_psi: float
    ) -> None:
        """Pressure sample for ``machine_id``."""
        ...

    @action
    async def anomaly_detected(
        self, machine_id: str, sensor_type: str, deviation: float
    ) -> None:
        """Anomaly on one sensor of ``machine_id``.

        ``sensor_type`` names the sensor that deviated and ``deviation``
        is the size of that deviation.
        """
        ...
25
@interface
class QualityInspector:
    """Inspection station that checks the quality of product batches.

    Defects are reported together with the ID of the machine that
    produced the batch.
    """

    @action
    async def inspection_started(
        self, batch_id: str, station_id: str
    ) -> None:
        """Inspection of ``batch_id`` has begun at ``station_id``."""
        ...

    @action
    async def defect_found(
        self, batch_id: str, defect_type: str, severity: str, machine_id: str
    ) -> None:
        """A defect was found in ``batch_id``, attributed to ``machine_id``."""
        ...

    @action
    async def batch_passed(self, batch_id: str) -> None:
        """``batch_id`` passed inspection."""
        ...

    @action
    async def batch_rejected(self, batch_id: str, reason: str) -> None:
        """``batch_id`` was rejected for the given ``reason``."""
        ...
40
@interface
class MaintenanceSystem:
    """Scheduling and execution of machine maintenance.

    Covers the maintenance lifecycle for a machine: scheduled, started,
    completed, and finally cleared.
    """

    @action
    async def maintenance_scheduled(
        self, machine_id: str, type: str, priority: str, reason: str
    ) -> None:
        """Maintenance has been scheduled for ``machine_id``.

        ``type`` shadows the builtin but is a keyword callers pass by
        name, so it is kept as declared.
        """
        ...

    @action
    async def maintenance_started(
        self, machine_id: str, technician_id: str
    ) -> None:
        """``technician_id`` has started maintenance on ``machine_id``."""
        ...

    @action
    async def maintenance_completed(
        self, machine_id: str, actions_taken: list[str]
    ) -> None:
        """Maintenance on ``machine_id`` finished with ``actions_taken``."""
        ...

    @action
    async def machine_cleared(self, machine_id: str) -> None:
        """``machine_id`` has been cleared."""
        ...

Module Logic

mfg_modules.py python
1# ── Modules ─────────────────────────────────────────────
2
@module(implements=MaintenanceSystem)
class PredictiveMaintenance:
    """Turns sensor anomalies and critical defects into maintenance requests."""

    @when("MachineSensor.anomaly_detected")
    async def schedule_from_anomaly(self, event):
        """Schedule predictive maintenance for the machine in ``event``.

        A deviation above 3.0 is scheduled with "high" priority; any
        other deviation gets "medium".
        """
        payload = event.data
        machine = payload["machine_id"]
        deviation = payload["deviation"]

        if deviation > 3.0:
            urgency = "high"
        else:
            urgency = "medium"

        explanation = f"{payload['sensor_type']} deviation: {deviation:.1f} sigma"
        await self.maintenance_scheduled(
            machine_id=machine,
            type="predictive",
            priority=urgency,
            reason=explanation,
        )

    @when("QualityInspector.defect_found")
    async def schedule_from_defect(self, event):
        """Schedule urgent corrective maintenance for critical defects.

        Defects of any other severity are ignored by this handler.
        """
        payload = event.data
        if payload["severity"] != "critical":
            return

        await self.maintenance_scheduled(
            machine_id=payload["machine_id"],
            type="corrective",
            priority="urgent",
            reason=f"Critical defect: {payload['defect_type']}",
        )
25
@module(implements=QualityInspector)
class QualityControl:
    """Starts an inspection whenever a machine reports an anomaly."""

    @when("MachineSensor.anomaly_detected")
    async def flag_batch_for_inspection(self, event):
        """Inspect the current output of the machine that raised ``event``.

        The batch ID is derived from the machine ID, and the inspection
        is always assigned to "station_1".
        """
        machine = event.data["machine_id"]
        current_batch = f"batch_{machine}_current"
        await self.inspection_started(
            batch_id=current_batch,
            station_id="station_1",
        )

Architecture and Constraints

  • anomaly_must_schedule_maintenance: sensor anomalies must trigger maintenance scheduling.
  • no_clearance_without_maintenance: machines cannot be cleared without completed maintenance.
  • critical_defect_must_maintain: critical quality defects must trigger maintenance.
  • no_pass_with_anomaly: batches must not pass inspection if the machine has unresolved anomalies.
  • maintenance_must_complete: started maintenance must be completed.
mfg_architecture.py python
1# ── Architecture ────────────────────────────────────────
2
@architecture
class PredictiveMaintenanceSystem:
    """Wires sensors, quality inspection, and maintenance together."""

    sensors: MachineSensor
    quality: QualityInspector
    maintenance: MaintenanceSystem

    def connections(self):
        """Return the event routes between the three components.

        Each route pairs a source event name with the component that
        receives it; the list order matches the route table below.
        """
        routes = (
            # Sensor anomalies trigger quality checks and maintenance
            ("MachineSensor.anomaly_detected", "quality"),
            ("MachineSensor.anomaly_detected", "maintenance"),
            # Quality defects trigger maintenance
            ("QualityInspector.defect_found", "maintenance"),
            # Maintenance clearance feeds back to quality
            ("MaintenanceSystem.machine_cleared", "quality"),
        )
        return [
            connect(Pattern.match(source), target)
            for source, target in routes
        ]

    def constraints(self):
        """Return the five named constraints checked on this architecture."""
        # Every sensor anomaly must trigger a maintenance schedule
        anomaly_schedules = must_match(
            trigger="MachineSensor.anomaly_detected",
            response="MaintenanceSystem.maintenance_scheduled",
            name="anomaly_must_schedule_maintenance",
        )

        # A machine must never resume production (cleared)
        # without maintenance being completed first
        clearance_needs_completion = never(
            pattern=(
                "MaintenanceSystem.maintenance_scheduled",
                "MaintenanceSystem.machine_cleared",
            ),
            unless="MaintenanceSystem.maintenance_completed",
            name="no_clearance_without_maintenance",
        )

        # Every critical defect must cause urgent maintenance
        critical_defect_schedules = must_match(
            trigger="QualityInspector.defect_found",
            response="MaintenanceSystem.maintenance_scheduled",
            condition=lambda evt: evt.data["severity"] == "critical",
            name="critical_defect_must_maintain",
        )

        # A batch must never pass inspection if the producing
        # machine has an unresolved anomaly
        no_pass_while_anomalous = never(
            pattern=(
                "MachineSensor.anomaly_detected",
                "QualityInspector.batch_passed",
            ),
            unless="MaintenanceSystem.machine_cleared",
            name="no_pass_with_anomaly",
        )

        # Every maintenance action must complete
        started_must_finish = must_match(
            trigger="MaintenanceSystem.maintenance_started",
            response="MaintenanceSystem.maintenance_completed",
            name="maintenance_must_complete",
        )

        return [
            anomaly_schedules,
            clearance_needs_completion,
            critical_defect_schedules,
            no_pass_while_anomalous,
            started_must_finish,
        ]

Execution and Analysis

mfg_analysis.py python
1# ── Execute and Analyze ─────────────────────────────────
2
async def main():
    """Run the architecture and print a causal analysis of the result.

    Three reports are printed:

    1. For every ``QualityInspector.defect_found`` event, the
       ``MachineSensor.*`` root causes and their causal distance.
    2. The bottleneck events returned for ``threshold=0.3``, each with
       its score formatted as a percentage.
    3. When at least two defects exist, how many common ancestors the
       first two share.
    """
    # Only the analysis helpers used below are imported.
    from pyrapide import (
        root_causes,
        common_ancestors,
        bottleneck_events,
        causal_distance,
    )

    engine = Engine()
    computation = await engine.run(PredictiveMaintenanceSystem)

    # Link product defects back to machine sensor anomalies
    defects = [e for e in computation.events
               if e.name == "QualityInspector.defect_found"]

    for defect in defects:
        causes = root_causes(computation, defect)
        sensor_causes = [c for c in causes
                         if c.name.startswith("MachineSensor.")]
        print(f"Defect '{defect.data['defect_type']}' on batch "
              f"{defect.data['batch_id']}:")
        for sc in sensor_causes:
            dist = causal_distance(computation, sc, defect)
            print(f"  Caused by {sc.name} ({dist} steps)")
            print(f"    Machine: {sc.data['machine_id']}")
            # Raw readings carry no 'deviation' key, hence the fallback.
            print(f"    Deviation: {sc.data.get('deviation', 'N/A')}")

    # Identify machines that are bottlenecks for quality issues
    bottlenecks = bottleneck_events(computation, threshold=0.3)
    print("\nMaintenance bottlenecks:")
    for event, score in bottlenecks:
        print(f"  {event.name} ({event.data.get('machine_id', '')}): "
              f"{score:.0%}")

    # For two defects on different batches, find if they share
    # a common root cause (same machine, same sensor anomaly)
    if len(defects) >= 2:
        shared = common_ancestors(computation, defects[0], defects[1])
        if shared:
            print(f"\nDefects share {len(shared)} common ancestors")


# Guarded so that importing this file does not start the engine.
if __name__ == "__main__":
    asyncio.run(main())
💡 Tip
Use common_ancestors to discover whether two defects on different batches share a root cause. This reveals systemic machine issues that batch-by-batch inspection cannot detect.