Skip to content
← All Examples
🏦

Finance: Transaction Fraud Detection

A payment processing pipeline where causal tracing connects transaction intake, risk scoring, compliance checks, and approval decisions into a fully auditable event graph.

Important
Financial regulations require institutions to explain why a transaction was approved or declined. PyRapide's causal graph provides a machine-readable audit trail that connects every decision to its contributing factors.

Architecture Overview

  • PaymentGateway receives incoming transactions and emits approval or decline decisions.
  • RiskEngine evaluates fraud risk scores, velocity checks, and flags suspicious activity.
  • ComplianceEngine performs AML and sanctions screening with hold/clear decisions.

Interfaces

fraud_interfaces.py python
1from pyrapide import interface, action, module, when
2from pyrapide import architecture, connect, Pattern, Engine
3from pyrapide import must_match, never
4import asyncio
5
6# ── Interfaces ──────────────────────────────────────────
7
8@interface
9class PaymentGateway:
10    """Processes incoming payment transactions."""
11    @action
12    async def transaction_received(self, tx_id: str, amount: float,
13                                    currency: str, merchant_id: str) -> None: ...
14    @action
15    async def transaction_approved(self, tx_id: str) -> None: ...
16    @action
17    async def transaction_declined(self, tx_id: str, reason: str) -> None: ...
18
19@interface
20class RiskEngine:
21    """Evaluates fraud risk for transactions."""
22    @action
23    async def risk_assessment(self, tx_id: str, risk_score: float,
24                               factors: list[str]) -> None: ...
25    @action
26    async def velocity_check(self, tx_id: str, merchant_id: str,
27                              count_1h: int) -> None: ...
28    @action
29    async def flag_suspicious(self, tx_id: str, reason: str) -> None: ...
30
31@interface
32class ComplianceEngine:
33    """Regulatory compliance checks."""
34    @action
35    async def aml_check(self, tx_id: str, status: str) -> None: ...
36    @action
37    async def sanctions_check(self, tx_id: str, status: str) -> None: ...
38    @action
39    async def compliance_hold(self, tx_id: str, reason: str) -> None: ...
40    @action
41    async def compliance_cleared(self, tx_id: str) -> None: ...

Module Logic

fraud_modules.py python
1# ── Modules ─────────────────────────────────────────────
2
3@module(implements=RiskEngine)
4class FraudDetector:
5    @when("PaymentGateway.transaction_received")
6    async def assess_risk(self, event):
7        tx = event.data
8        score = await self._calculate_risk(tx)
9        await self.risk_assessment(
10            tx_id=tx["tx_id"],
11            risk_score=score,
12            factors=self._risk_factors(tx)
13        )
14        if score > 0.8:
15            await self.flag_suspicious(
16                tx_id=tx["tx_id"],
17                reason=f"High risk score: {score:.2f}"
18            )
19
20    @when("PaymentGateway.transaction_received")
21    async def check_velocity(self, event):
22        tx = event.data
23        count = await self._count_recent(tx["merchant_id"])
24        await self.velocity_check(
25            tx_id=tx["tx_id"],
26            merchant_id=tx["merchant_id"],
27            count_1h=count
28        )
29
30@module(implements=ComplianceEngine)
31class RegulatoryChecker:
32    @when("RiskEngine.risk_assessment")
33    async def run_compliance(self, event):
34        tx_id = event.data["tx_id"]
35        await self.aml_check(tx_id=tx_id, status="passed")
36        await self.sanctions_check(tx_id=tx_id, status="passed")
37        await self.compliance_cleared(tx_id=tx_id)

Architecture and Constraints

The constraints encode regulatory and business rules:

  • tx_must_be_assessed: every transaction must receive a risk score.
  • flagged_must_clear_compliance: a flagged transaction must never be approved without compliance clearance.
  • tx_must_resolve: every transaction must end in approval or decline (no silent drops).
  • aml_required / sanctions_required: both compliance checks must complete.
fraud_architecture.py python
1# ── Architecture ────────────────────────────────────────
2
3@architecture
4class FraudDetectionSystem:
5    gateway: PaymentGateway
6    risk: RiskEngine
7    compliance: ComplianceEngine
8
9    def connections(self):
10        return [
11            # Payments flow to risk assessment
12            connect(Pattern.match("PaymentGateway.transaction_received"), "risk"),
13            # Risk results flow to compliance
14            connect(Pattern.match("RiskEngine.*"), "compliance"),
15            # Compliance results flow back to gateway for decision
16            connect(Pattern.match("ComplianceEngine.*"), "gateway"),
17        ]
18
19    def constraints(self):
20        return [
21            # Every transaction must receive a risk assessment
22            must_match(
23                trigger="PaymentGateway.transaction_received",
24                response="RiskEngine.risk_assessment",
25                name="tx_must_be_assessed"
26            ),
27
28            # A flagged transaction must never be approved
29            # without a compliance clearance in between
30            never(
31                pattern=("RiskEngine.flag_suspicious",
32                         "PaymentGateway.transaction_approved"),
33                unless="ComplianceEngine.compliance_cleared",
34                name="flagged_must_clear_compliance"
35            ),
36
37            # Every transaction must receive either
38            # approval or decline (no silent drops)
39            must_match(
40                trigger="PaymentGateway.transaction_received",
41                response=("PaymentGateway.transaction_approved",
42                          "PaymentGateway.transaction_declined"),
43                name="tx_must_resolve"
44            ),
45
46            # AML and sanctions checks must both complete
47            must_match(
48                trigger="RiskEngine.risk_assessment",
49                response="ComplianceEngine.aml_check",
50                name="aml_required"
51            ),
52            must_match(
53                trigger="RiskEngine.risk_assessment",
54                response="ComplianceEngine.sanctions_check",
55                name="sanctions_required"
56            ),
57        ]

Execution and Analysis

fraud_analysis.py python
1# ── Execute and Analyze ─────────────────────────────────
2
3async def main():
4    engine = Engine()
5    computation = await engine.run(FraudDetectionSystem)
6
7    from pyrapide import (
8        root_causes, forward_slice, critical_path,
9        backward_slice, bottleneck_events
10    )
11
12    # For every declined transaction, trace the root cause
13    declined = [e for e in computation.events
14                if e.name == "PaymentGateway.transaction_declined"]
15    for d in declined:
16        causes = root_causes(computation, d)
17        print(f"TX {d.data['tx_id']} declined because of:")
18        for c in causes:
19            print(f"  {c.name}: {c.data}")
20
21    # Find bottleneck events in the payment pipeline
22    bottlenecks = bottleneck_events(computation, threshold=0.3)
23    print("\nPipeline bottlenecks:")
24    for event, score in bottlenecks:
25        print(f"  {event.name}: {score:.0%} of paths")
26
27    # When fraud is detected, what is the blast radius?
28    flagged = [e for e in computation.events
29               if e.name == "RiskEngine.flag_suspicious"]
30    for f in flagged:
31        impact = forward_slice(computation, f)
32        print(f"\nFlag '{f.data['reason']}' affected {len(impact)} events")
33
34asyncio.run(main())
💡 Tip
Use backward_slice on a declined transaction to generate the complete regulatory explanation: every risk factor, every compliance check, and every data point that contributed to the decision.