🏦
Finance: Transaction Fraud Detection
A payment processing pipeline where causal tracing connects transaction intake, risk scoring, compliance checks, and approval decisions into a fully auditable event graph.
★ Important
Financial regulations require institutions to explain why a transaction was approved or declined. PyRapide's causal graph provides a machine-readable audit trail that connects every decision to its contributing factors.
Architecture Overview
- PaymentGateway receives incoming transactions and emits approval or decline decisions.
- RiskEngine evaluates fraud risk scores, velocity checks, and flags suspicious activity.
- ComplianceEngine performs AML and sanctions screening with hold/clear decisions.
Interfaces
fraud_interfaces.py python
1from pyrapide import interface, action, module, when
2from pyrapide import architecture, connect, Pattern, Engine
3from pyrapide import must_match, never
4import asyncio
5
6# ── Interfaces ──────────────────────────────────────────
7
8@interface
9class PaymentGateway:
10 """Processes incoming payment transactions."""
11 @action
12 async def transaction_received(self, tx_id: str, amount: float,
13 currency: str, merchant_id: str) -> None: ...
14 @action
15 async def transaction_approved(self, tx_id: str) -> None: ...
16 @action
17 async def transaction_declined(self, tx_id: str, reason: str) -> None: ...
18
19@interface
20class RiskEngine:
21 """Evaluates fraud risk for transactions."""
22 @action
23 async def risk_assessment(self, tx_id: str, risk_score: float,
24 factors: list[str]) -> None: ...
25 @action
26 async def velocity_check(self, tx_id: str, merchant_id: str,
27 count_1h: int) -> None: ...
28 @action
29 async def flag_suspicious(self, tx_id: str, reason: str) -> None: ...
30
31@interface
32class ComplianceEngine:
33 """Regulatory compliance checks."""
34 @action
35 async def aml_check(self, tx_id: str, status: str) -> None: ...
36 @action
37 async def sanctions_check(self, tx_id: str, status: str) -> None: ...
38 @action
39 async def compliance_hold(self, tx_id: str, reason: str) -> None: ...
40 @action
41 async def compliance_cleared(self, tx_id: str) -> None: ...
Module Logic
fraud_modules.py python
1# ── Modules ─────────────────────────────────────────────
2
3@module(implements=RiskEngine)
4class FraudDetector:
5 @when("PaymentGateway.transaction_received")
6 async def assess_risk(self, event):
7 tx = event.data
8 score = await self._calculate_risk(tx)
9 await self.risk_assessment(
10 tx_id=tx["tx_id"],
11 risk_score=score,
12 factors=self._risk_factors(tx)
13 )
14 if score > 0.8:
15 await self.flag_suspicious(
16 tx_id=tx["tx_id"],
17 reason=f"High risk score: {score:.2f}"
18 )
19
20 @when("PaymentGateway.transaction_received")
21 async def check_velocity(self, event):
22 tx = event.data
23 count = await self._count_recent(tx["merchant_id"])
24 await self.velocity_check(
25 tx_id=tx["tx_id"],
26 merchant_id=tx["merchant_id"],
27 count_1h=count
28 )
29
30@module(implements=ComplianceEngine)
31class RegulatoryChecker:
32 @when("RiskEngine.risk_assessment")
33 async def run_compliance(self, event):
34 tx_id = event.data["tx_id"]
35 await self.aml_check(tx_id=tx_id, status="passed")
36 await self.sanctions_check(tx_id=tx_id, status="passed")
37 await self.compliance_cleared(tx_id=tx_id)
Architecture and Constraints
The constraints encode regulatory and business rules:
- tx_must_be_assessed: every transaction must receive a risk score.
- flagged_must_clear_compliance: a flagged transaction must never be approved without compliance clearance.
- tx_must_resolve: every transaction must end in approval or decline (no silent drops).
- aml_required / sanctions_required: both compliance checks must complete.
fraud_architecture.py python
1# ── Architecture ────────────────────────────────────────
2
3@architecture
4class FraudDetectionSystem:
5 gateway: PaymentGateway
6 risk: RiskEngine
7 compliance: ComplianceEngine
8
9 def connections(self):
10 return [
11 # Payments flow to risk assessment
12 connect(Pattern.match("PaymentGateway.transaction_received"), "risk"),
13 # Risk results flow to compliance
14 connect(Pattern.match("RiskEngine.*"), "compliance"),
15 # Compliance results flow back to gateway for decision
16 connect(Pattern.match("ComplianceEngine.*"), "gateway"),
17 ]
18
19 def constraints(self):
20 return [
21 # Every transaction must receive a risk assessment
22 must_match(
23 trigger="PaymentGateway.transaction_received",
24 response="RiskEngine.risk_assessment",
25 name="tx_must_be_assessed"
26 ),
27
28 # A flagged transaction must never be approved
29 # without a compliance clearance in between
30 never(
31 pattern=("RiskEngine.flag_suspicious",
32 "PaymentGateway.transaction_approved"),
33 unless="ComplianceEngine.compliance_cleared",
34 name="flagged_must_clear_compliance"
35 ),
36
37 # Every transaction must receive either
38 # approval or decline (no silent drops)
39 must_match(
40 trigger="PaymentGateway.transaction_received",
41 response=("PaymentGateway.transaction_approved",
42 "PaymentGateway.transaction_declined"),
43 name="tx_must_resolve"
44 ),
45
46 # AML and sanctions checks must both complete
47 must_match(
48 trigger="RiskEngine.risk_assessment",
49 response="ComplianceEngine.aml_check",
50 name="aml_required"
51 ),
52 must_match(
53 trigger="RiskEngine.risk_assessment",
54 response="ComplianceEngine.sanctions_check",
55 name="sanctions_required"
56 ),
57 ]
Execution and Analysis
fraud_analysis.py python
1# ── Execute and Analyze ─────────────────────────────────
2
3async def main():
4 engine = Engine()
5 computation = await engine.run(FraudDetectionSystem)
6
7 from pyrapide import (
8 root_causes, forward_slice, critical_path,
9 backward_slice, bottleneck_events
10 )
11
12 # For every declined transaction, trace the root cause
13 declined = [e for e in computation.events
14 if e.name == "PaymentGateway.transaction_declined"]
15 for d in declined:
16 causes = root_causes(computation, d)
17 print(f"TX {d.data['tx_id']} declined because of:")
18 for c in causes:
19 print(f" {c.name}: {c.data}")
20
21 # Find bottleneck events in the payment pipeline
22 bottlenecks = bottleneck_events(computation, threshold=0.3)
23 print("\nPipeline bottlenecks:")
24 for event, score in bottlenecks:
25 print(f" {event.name}: {score:.0%} of paths")
26
27 # When fraud is detected, what is the blast radius?
28 flagged = [e for e in computation.events
29 if e.name == "RiskEngine.flag_suspicious"]
30 for f in flagged:
31 impact = forward_slice(computation, f)
32 print(f"\nFlag '{f.data['reason']}' affected {len(impact)} events")
33
34asyncio.run(main())
💡 Tip
Use
backward_slice on a declined transaction to generate the complete regulatory explanation: every risk factor, every compliance check, and every data point that contributed to the decision.