Streaming and Real-Time Processing
Production systems generate events continuously from multiple concurrent sources. PyRapide's streaming module lets you ingest, causally link, and constrain events in real time, without waiting for a batch to complete.
StreamProcessor
The StreamProcessor is the central object for real-time event ingestion. You attach one or more event sources, define constraints and watch patterns, then call run() to begin processing.
from pyrapide import StreamProcessor, InMemoryEventSource
from pyrapide import must_match, never, watch
processor = StreamProcessor()
Event Sources
An event source is any async iterable that yields events. PyRapide ships with InMemoryEventSource for testing and prototyping, and adapters like MCPEventAdapter and LLMEventAdapter for production integrations.
from pyrapide import InMemoryEventSource
source_a = InMemoryEventSource("sensor-cluster-a")
source_b = InMemoryEventSource("sensor-cluster-b")
processor.add_source("sensors_a", source_a)
processor.add_source("sensors_b", source_b)
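Because a source only needs to be an async iterable, an async generator is enough to prototype one. The sketch below is self-contained and does not import PyRapide; `Event`, `ticker_source`, and `collect` are hypothetical names, and the event type a real StreamProcessor expects may differ.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator


@dataclass
class Event:
    """Hypothetical stand-in for an event; not PyRapide's own class."""
    name: str
    data: dict = field(default_factory=dict)


async def ticker_source(count: int, delay: float = 0.0) -> AsyncIterator[Event]:
    """Async generator that yields `count` synthetic sensor readings."""
    for i in range(count):
        await asyncio.sleep(delay)  # hand control back to the loop, as real I/O would
        yield Event("Sensor.reading", {"seq": i, "temperature": 90 + 5 * i})


async def collect(source: AsyncIterator[Event]) -> list:
    """Drain an async iterable into a list, the way a consumer would."""
    return [event async for event in source]


def demo() -> list:
    return asyncio.run(collect(ticker_source(3)))
```

Anything that supports `async for` in this way fits the description of an event source given above.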
Watch Patterns
Use watch to register callbacks that fire whenever a pattern is observed in the causal stream. Watch patterns operate on the causal graph, not on raw event order.
# React to any event matching a pattern
processor.watch(
    pattern="Sensor.reading",
    callback=lambda event: print(f"Got reading: {event}")
)
# Watch for causal sequences
# (log_alert_chain is assumed to be a function defined by your application)
processor.watch(
    pattern=("Sensor.reading", "Alerter.alert"),
    callback=lambda cause, effect: log_alert_chain(cause, effect)
)
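To see why matching on the causal graph differs from matching on arrival order, consider a toy model in which each event records the ids of its direct causes. This is an illustration only: `Event`, `causes`, and `causal_pairs` are hypothetical names, only direct edges are considered, and nothing here describes how PyRapide stores its graph.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Event:
    """Hypothetical event record with explicit causal parents."""
    id: str
    name: str
    causes: Tuple[str, ...] = ()


def causal_pairs(events, cause_name, effect_name):
    """Return (cause, effect) pairs joined by a direct causal edge."""
    by_id = {e.id: e for e in events}
    pairs = []
    for effect in events:
        if effect.name != effect_name:
            continue
        for parent_id in effect.causes:
            parent = by_id.get(parent_id)
            if parent is not None and parent.name == cause_name:
                pairs.append((parent, effect))
    return pairs


# Arrival order interleaves two readings, then an alert caused by r1 only.
stream = [
    Event("r1", "Sensor.reading"),
    Event("r2", "Sensor.reading"),
    Event("a1", "Alerter.alert", causes=("r1",)),
]

matches = causal_pairs(stream, "Sensor.reading", "Alerter.alert")
```

A matcher that paired each alert with the reading immediately before it in arrival order would pick r2 here, whereas the causal edge identifies r1.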
Enforcing Constraints
Constraints defined with must_match and never are evaluated continuously as events arrive. A violation fires immediately; you do not have to wait until the stream ends.
from pyrapide import must_match, never
# Every reading must eventually cause an alert
processor.enforce(must_match(
    trigger="Sensor.reading",
    response="Alerter.alert",
    name="reading_must_alert"
))
# Two alerts must never fire from the same reading
processor.enforce(never(
    pattern=("Alerter.alert", "Alerter.alert"),
    within_cause="Sensor.reading",
    name="no_double_alert"
))
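The idea of evaluating constraints as events arrive can be sketched with a small online checker. It flags a duplicate alert at the moment the second alert is seen, and it tracks readings that are still owed an alert. Everything below is a hypothetical toy (`OnlineChecker`, `on_reading`, `on_alert`, `unmet`), not PyRapide's implementation.

```python
from collections import Counter


class OnlineChecker:
    """Toy incremental checker for two rules, keyed by the causing reading id."""

    def __init__(self):
        self.pending = set()           # readings that have not yet caused an alert
        self.alert_counts = Counter()  # alerts seen per causing reading
        self.violations = []           # appended to as soon as a rule is broken

    def on_reading(self, reading_id):
        self.pending.add(reading_id)

    def on_alert(self, reading_id):
        self.pending.discard(reading_id)  # the "must alert" obligation is met
        self.alert_counts[reading_id] += 1
        if self.alert_counts[reading_id] == 2:
            # Reported immediately, without waiting for the stream to end.
            self.violations.append(("no_double_alert", reading_id))

    def unmet(self):
        """Readings still owed an alert at this point in the stream."""
        return sorted(self.pending)


checker = OnlineChecker()
checker.on_reading("r1")
checker.on_reading("r2")
checker.on_alert("r1")
checker.on_alert("r1")  # second alert from r1 records the violation here
```

In this toy, the duplicate-alert rule is refuted the moment the offending event appears, while a reading with no alert is only "unmet so far". How PyRapide decides when an unanswered trigger counts as a violation is not covered by this sketch.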
Sliding Window
For long-running or unbounded streams, use the window_size parameter to limit how many events the processor keeps in memory. Older events outside the window are evicted, but their causal links to retained events are preserved as summary edges.
# Keep the last 10,000 events in memory (call this from inside an async function)
await processor.run(window_size=10000)
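One way to picture eviction with summary edges is a bounded store that, when it drops an event, records which retained events that event caused. The sketch is a guess at the general shape only: `WindowedGraph`, `summary_edges`, and the oldest-first eviction policy are assumptions, not PyRapide internals.

```python
from collections import OrderedDict


class WindowedGraph:
    """Toy bounded event store that keeps summary edges for evicted causes."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.events = OrderedDict()  # event id -> tuple of cause ids, oldest first
        self.summary_edges = set()   # (evicted cause id, retained effect id)

    def add(self, event_id, causes=()):
        self.events[event_id] = tuple(causes)
        while len(self.events) > self.window_size:
            self._evict_oldest()

    def _evict_oldest(self):
        evicted_id, _ = self.events.popitem(last=False)
        for retained_id, causes in self.events.items():
            if evicted_id in causes:
                # Keep the link even though the cause itself is gone.
                self.summary_edges.add((evicted_id, retained_id))


graph = WindowedGraph(window_size=2)
graph.add("r1")
graph.add("a1", causes=["r1"])
graph.add("r2")  # exceeds the window, so r1 is evicted
```

After the third `add`, r1 is no longer stored, but the edge from r1 to a1 survives in `summary_edges`. The linear scan on each eviction is kept for clarity; a fuller version would index effects by cause and would also drop summary edges once their retained endpoint is evicted.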
Full Streaming Example
Here is a complete example that sets up a two-source streaming processor with constraints, watch patterns, and a sliding window.
from pyrapide import (
    StreamProcessor, InMemoryEventSource,
    must_match, never, watch
)
import asyncio

async def main():
    processor = StreamProcessor()

    # Create event sources
    sensors = InMemoryEventSource("sensors")
    alerts = InMemoryEventSource("alerts")

    processor.add_source("sensors", sensors)
    processor.add_source("alerts", alerts)

    # Watch for high-temperature readings
    processor.watch(
        pattern="Sensor.reading",
        callback=lambda e: handle_reading(e)
    )

    # Constraint: every critical reading must cause an alert
    processor.enforce(must_match(
        trigger="Sensor.critical_reading",
        response="Alerter.alert",
        name="critical_must_alert"
    ))

    # Constraint: never two alerts from the same root cause
    processor.enforce(never(
        pattern=("Alerter.alert", "Alerter.alert"),
        within_cause="Sensor.critical_reading",
        name="no_duplicate_alerts"
    ))

    # Run with a sliding window of 5000 events
    await processor.run(window_size=5000)

def handle_reading(event):
    if event.data.get("temperature", 0) > 100:
        print(f"High temp detected: {event}")

asyncio.run(main())
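With several sources attached, events from each can arrive interleaved. As a rough, PyRapide-free illustration of consuming multiple async iterables concurrently with asyncio, the sketch below runs one task per source and funnels items into a shared queue. `numbers` and `merge` are hypothetical helpers, and this is not a description of how StreamProcessor is implemented.

```python
import asyncio


async def numbers(label, count, delay):
    """Hypothetical source: yields (label, n) pairs with a pause between them."""
    for n in range(count):
        await asyncio.sleep(delay)
        yield (label, n)


async def merge(*sources):
    """Yield items from all sources as they become available."""
    queue = asyncio.Queue()
    done = object()  # sentinel pushed once per finished source

    async def pump(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(done)

    tasks = [asyncio.create_task(pump(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished


async def run_demo():
    sources = (numbers("a", 2, 0.01), numbers("b", 2, 0.015))
    return [item async for item in merge(*sources)]


merged = asyncio.run(run_demo())
```

The relative order of items from different sources depends on timing, which is why a consumer that cares about cause and effect cannot rely on arrival order alone.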
StreamProcessor is async-native. All sources are consumed concurrently via asyncio, and causal links are established as events arrive from any source.
Next Steps
- MCP Server Integration: attach MCP servers as streaming event sources
- LLM Integration: stream LLM request/response events
- Constraints: deep dive into must_match and never