Streaming and Real-Time Processing

Production systems generate events continuously from multiple concurrent sources. PyRapide's streaming module lets you ingest, causally link, and constrain events in real time, without waiting for a batch to complete.

💡 Tip
Streaming is where PyRapide's causal model truly shines. Traditional log aggregators merge streams by timestamp; PyRapide merges them by causality, so concurrent events from independent sources stay correctly unordered.
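To see why "correctly unordered" matters, here is a stdlib-only sketch of the happens-before relation using vector clocks. This is illustrative only; PyRapide's internal representation is not documented here.

```python
# Two events are concurrent when neither causally precedes the other.
# The vector-clock dicts below are illustrative, not PyRapide's event model.
def happens_before(a: dict, b: dict) -> bool:
    """True if clock a causally precedes clock b."""
    keys = set(a) | set(b)
    return all(a.get(k, 0) <= b.get(k, 0) for k in keys) and a != b

a = {"sensors_a": 2}                   # event from source A
b = {"sensors_b": 1}                   # independent event from source B
c = {"sensors_a": 2, "sensors_b": 1}   # event that observed both

# a and b are concurrent: a timestamp merge would force an arbitrary order
assert not happens_before(a, b) and not happens_before(b, a)
# c causally follows both
assert happens_before(a, c) and happens_before(b, c)
```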

StreamProcessor

The StreamProcessor is the central object for real-time event ingestion. You attach one or more event sources, define constraints and watch patterns, then call run() to begin processing.

stream_setup.py

```python
from pyrapide import StreamProcessor, InMemoryEventSource
from pyrapide import must_match, never, watch

processor = StreamProcessor()
```

Event Sources

An event source is any async iterable that yields events. PyRapide ships with InMemoryEventSource for testing and prototyping, and adapters like MCPEventAdapter and LLMEventAdapter for production integrations.

sources.py

```python
from pyrapide import InMemoryEventSource

source_a = InMemoryEventSource("sensor-cluster-a")
source_b = InMemoryEventSource("sensor-cluster-b")

processor.add_source("sensors_a", source_a)
processor.add_source("sensors_b", source_b)
```
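Since a source is just an async iterable, a custom one can be as small as an async generator. The sketch below is stdlib-only; the dict-shaped events are illustrative, not PyRapide's actual event class.

```python
import asyncio

# A custom event source: any async iterable that yields events.
# Event shape here is a placeholder dict, not PyRapide's event type.
async def polling_source(name, readings):
    for value in readings:
        await asyncio.sleep(0)  # stand-in for real I/O latency
        yield {"source": name, "type": "Sensor.reading", "value": value}

async def collect():
    return [event async for event in polling_source("sensors_c", [21.5, 22.0, 99.9])]

events = asyncio.run(collect())
```

Assuming `add_source` accepts any async iterable as described above, such a generator could be registered with `processor.add_source("sensors_c", polling_source("sensors_c", readings))`.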

Watch Patterns

Use watch to register callbacks that fire whenever a pattern is observed in the causal stream. Watch patterns operate on the causal graph, not on raw event order.

watch.py

```python
# React to any event matching a pattern
processor.watch(
    pattern="Sensor.reading",
    callback=lambda event: print(f"Got reading: {event}")
)

# Watch for causal sequences
processor.watch(
    pattern=("Sensor.reading", "Alerter.alert"),
    callback=lambda cause, effect: log_alert_chain(cause, effect)
)
```
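A minimal sketch of what a two-step causal watch does, using only the standard library: the callback fires when an arriving event's cause matches the first element of the pattern. PyRapide matches against the full causal graph; this simplified version checks only the direct cause.

```python
# Simplified causal-sequence watcher (illustrative, not PyRapide's engine):
# fires callback(cause, effect) when cause -> effect matches the pattern.
def causal_watch(pattern, callback):
    cause_type, effect_type = pattern
    def on_event(event, cause):
        if event["type"] == effect_type and cause and cause["type"] == cause_type:
            callback(cause, event)
    return on_event

hits = []
on_event = causal_watch(
    ("Sensor.reading", "Alerter.alert"),
    lambda c, e: hits.append((c["type"], e["type"])),
)

reading = {"type": "Sensor.reading"}
alert = {"type": "Alerter.alert"}
on_event(reading, None)    # no cause: no match
on_event(alert, reading)   # reading -> alert: callback fires
```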

Enforcing Constraints

Constraints defined with must_match and never are evaluated continuously as events arrive. A violation fires immediately; you do not have to wait until the stream ends.

constraints.py

```python
from pyrapide import must_match, never

# Every reading must eventually cause an alert
processor.enforce(must_match(
    trigger="Sensor.reading",
    response="Alerter.alert",
    name="reading_must_alert"
))

# Two alerts must never fire from the same reading
processor.enforce(never(
    pattern=("Alerter.alert", "Alerter.alert"),
    within_cause="Sensor.reading",
    name="no_double_alert"
))
```
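The "violation fires immediately" behavior can be sketched with a per-event check, in the spirit of the `no_double_alert` constraint above. This is a stdlib-only illustration, not PyRapide's constraint engine, and it tracks only a single cause id per effect rather than the full causal graph.

```python
# Sketch of continuous "never two effects per cause" evaluation:
# each arriving event is checked as soon as it is observed.
from collections import Counter

class NeverTwice:
    def __init__(self, effect_type):
        self.effect_type = effect_type
        self.per_cause = Counter()

    def observe(self, event):
        """Return a violation message immediately, or None."""
        if event["type"] == self.effect_type:
            self.per_cause[event["cause_id"]] += 1
            if self.per_cause[event["cause_id"]] > 1:
                return f"violation: duplicate {self.effect_type} for {event['cause_id']}"
        return None

check = NeverTwice("Alerter.alert")
stream = [
    {"type": "Sensor.reading", "cause_id": None},
    {"type": "Alerter.alert", "cause_id": "r1"},
    {"type": "Alerter.alert", "cause_id": "r1"},  # second alert: fires at once
]
results = [check.observe(e) for e in stream]
```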

Sliding Window

For long-running or unbounded streams, use the window_size parameter to limit how many events the processor keeps in memory. Older events outside the window are evicted, but their causal links to retained events are preserved as summary edges.

window.py

```python
# Keep the last 10,000 events in memory
await processor.run(window_size=10000)
```

ℹ️ Note
The sliding window evicts events in causal order, not timestamp order. An old event that still has unresolved downstream constraints will be retained until those constraints are satisfied or violated.
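The retention rule in the note can be sketched as follows: the oldest event is evicted only once it carries no unresolved obligations. This is a stdlib-only illustration of the policy, not PyRapide's actual eviction logic, and the `pending_constraints` flag is a stand-in for real constraint state.

```python
# Sketch: evict from the front of the window, but stop at the first event
# that still has unresolved downstream constraints.
from collections import deque

def evict(window: deque, max_size: int):
    evicted = []
    while len(window) > max_size:
        oldest = window[0]
        if oldest.get("pending_constraints"):
            break  # retained until its constraints resolve
        evicted.append(window.popleft())
    return evicted

window = deque([
    {"id": 1, "pending_constraints": False},
    {"id": 2, "pending_constraints": True},   # unresolved: blocks further eviction
    {"id": 3, "pending_constraints": False},
])
gone = evict(window, max_size=1)
```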

Full Streaming Example

Here is a complete example that sets up a two-source streaming processor with constraints, watch patterns, and a sliding window.

streaming_full.py

```python
from pyrapide import (
    StreamProcessor, InMemoryEventSource,
    must_match, never, watch
)
import asyncio

async def main():
    processor = StreamProcessor()

    # Create event sources
    sensors = InMemoryEventSource("sensors")
    alerts = InMemoryEventSource("alerts")

    processor.add_source("sensors", sensors)
    processor.add_source("alerts", alerts)

    # Watch for high-temperature readings
    processor.watch(
        pattern="Sensor.reading",
        callback=lambda e: handle_reading(e)
    )

    # Constraint: every critical reading must cause an alert
    processor.enforce(must_match(
        trigger="Sensor.critical_reading",
        response="Alerter.alert",
        name="critical_must_alert"
    ))

    # Constraint: never two alerts from the same root cause
    processor.enforce(never(
        pattern=("Alerter.alert", "Alerter.alert"),
        within_cause="Sensor.critical_reading",
        name="no_duplicate_alerts"
    ))

    # Run with a sliding window of 5000 events
    await processor.run(window_size=5000)

def handle_reading(event):
    if event.data.get("temperature", 0) > 100:
        print(f"High temp detected: {event}")

asyncio.run(main())
```

★ Important
The StreamProcessor is async-native. All sources are consumed concurrently via asyncio, and causal links are established as events arrive from any source.
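The concurrent-consumption pattern the note describes can be sketched with plain asyncio: one pump task per source, all funnelling into a shared queue. This is illustrative only and says nothing about PyRapide's internals.

```python
# Stdlib sketch: consume several async sources concurrently, merging their
# events into one queue as they arrive from any source.
import asyncio

async def gen(items):
    for item in items:
        await asyncio.sleep(0)  # yield control so sources interleave
        yield item

async def pump(name, source, queue):
    async for event in source:
        await queue.put((name, event))

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        pump("sensors_a", gen([1, 2]), queue),
        pump("sensors_b", gen([3, 4]), queue),
    )
    return [queue.get_nowait() for _ in range(queue.qsize())]

merged = asyncio.run(main())
```

No ordering between the two sources is asserted here: which source's event lands first depends on task scheduling, which is exactly the situation a causal model handles without inventing an order.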

Next Steps