Examples

This page provides practical examples of using Cohomological Risk Scoring.

Example 1: Fraud Ring Detection

Detecting a circular transaction pattern (money laundering loop):

import numpy as np
from cohomological_risk_scoring import PCRScorer

# Use a seeded generator so the example prints the same numbers on every run
rng = np.random.default_rng(42)

# Create a network with a fraud ring (5-cycle)
n_nodes = 20
features = rng.standard_normal((n_nodes, 4))

# Add normal transactions: keep each pair (i, j), i < j, with probability ~0.2
edges = [(i, j) for i in range(n_nodes)
         for j in range(i + 1, n_nodes)
         if rng.random() > 0.8]

# Add fraud ring: nodes 0->5->10->15->19->0
fraud_ring = [(0, 5), (5, 10), (10, 15), (15, 19), (19, 0)]
# Skip ring edges the random step already produced so no edge is listed twice
edges.extend([edge for edge in fraud_ring if edge not in edges])

# Every ring member appears exactly once as the source of a ring edge
ring_nodes = [src for src, _ in fraud_ring]

# Manipulate features to create inconsistency: double the feature vector
# of the first ring member (node 0)
features[ring_nodes[0]] *= 2

# Detect fraud
scorer = PCRScorer()
scorer.fit(features, edges)
risk_scores = scorer.compute_all_scores()

# Fraud ring members should have high scores
print("Risk scores for fraud ring:")
for node in ring_nodes:
    print(f"Node {node}: {risk_scores[node]:.3f}")

Example 2: Transaction Network Analysis

Analyzing a transaction network from CSV data:

import numpy as np
import pandas as pd
from cohomological_risk_scoring import PCRScorer

# Load transaction data (the code below reads the 'sender', 'receiver'
# and 'amount' columns)
df = pd.read_csv('transactions.csv')

# Assign a dense integer id to every entity seen as sender or receiver
entities = pd.concat([df['sender'], df['receiver']]).unique()
entity_to_id = {e: i for i, e in enumerate(entities)}

# One edge per transaction row; zip over the columns avoids the per-row
# Series construction that iterrows() performs
edges = [(entity_to_id[sender], entity_to_id[receiver])
         for sender, receiver in zip(df['sender'], df['receiver'])]

# Extract features per entity with one grouped pass per column instead of
# re-filtering the whole frame for every entity. reindex() puts the results
# in `entities` order and fills 0 for entities that never sent / received.
sent = (df.groupby('sender')['amount'].sum()
          .reindex(entities, fill_value=0).to_numpy())
received = (df.groupby('receiver')['amount'].sum()
              .reindex(entities, fill_value=0).to_numpy())

# Number of transactions that involve the entity on either side.
# A self-transfer shows up in both value_counts, so subtract it once.
n_as_sender = df['sender'].value_counts().reindex(entities, fill_value=0).to_numpy()
n_as_receiver = df['receiver'].value_counts().reindex(entities, fill_value=0).to_numpy()
self_transfers = df.loc[df['sender'] == df['receiver'], 'sender']
n_self = self_transfers.value_counts().reindex(entities, fill_value=0).to_numpy()
count = n_as_sender + n_as_receiver - n_self

# Columns: total sent, total received, transaction count, net flow
features = np.column_stack([sent, received, count, sent - received])

# Compute risk
scorer = PCRScorer()
scorer.fit(features, edges)
scores = scorer.compute_all_scores()
classes = scorer.get_risk_classes()

# Report high-risk entities
print("High-risk entities:")
for idx in classes['high']:
    print(f"{entities[idx]}: {scores[idx]:.3f}")

Example 3: Visualization and Reporting

import matplotlib.pyplot as plt
from cohomological_risk_scoring import PCRScorer

# Fit the model; `features` and `edges` are the ones built in the
# previous examples
scorer = PCRScorer()
scorer.fit(features, edges)

# Draw the persistence diagram, then write the current figure to disk
scorer.visualize_persistence()
plt.savefig('persistence_diagram.png')

# Produce the detailed report and show it
report = scorer.generate_report()
print(report)

# Pull the cohomology data out of the fitted scorer's sheaf
sheaf = scorer.sheaf
cohomology = sheaf.compute_cohomology()

h1_rank = cohomology['rank']
print(f"Dimension of H^1: {h1_rank}")

generators = cohomology['generators']
print(f"Number of risk classes: {len(generators)}")

Example 4: Advanced Configuration

Customizing the financial sheaf and filtration:

import networkx as nx
import numpy as np
from cohomological_risk_scoring import FinancialSheaf

# Create custom graph
G = nx.karate_club_graph()

# Compute the graph-wide measures once. Calling
# nx.betweenness_centrality(G) inside the per-node loop would redo the
# whole-graph computation for every node.
clustering = nx.clustering(G)
betweenness = nx.betweenness_centrality(G)

# Define custom vertex features: [degree, clustering, betweenness] per node
features = np.array([[G.degree(n), clustering[n], betweenness[n]]
                     for n in G.nodes()])

# Create sheaf with custom parameters
sheaf = FinancialSheaf(
    vertex_features=features,
    edges=list(G.edges()),
    max_filtration=2.0
)

# Build and analyze
sheaf.build_complex_from_graph()
sheaf.define_sheaf_data()
cohomology = sheaf.compute_persistent_cohomology()
scores = sheaf.compute_pcr_score()

print(f"PCR Scores: {scores}")

Example 5: Real-Time Monitoring

Streaming risk assessment for incoming transactions:

# Requires `import numpy as np` and
# `from cohomological_risk_scoring import PCRScorer`, as in the earlier examples.
class RiskMonitor:
    """Accumulate transactions and re-score the whole network after each one.

    Attributes are plain and public:
      scorer   -- the PCRScorer that is refit on every update
      features -- (n_nodes, 4) float array, or None before the first update;
                  column 0 is total sent, column 1 is total received
      edges    -- list of (sender, receiver) pairs seen so far
    """

    def __init__(self):
        self.scorer = PCRScorer()
        self.features = None
        self.edges = []

    def update(self, new_transaction):
        """Add new transaction and recompute risks.

        Args:
            new_transaction: a ``(sender, receiver, amount)`` triple, where
                sender and receiver are integer node ids used as row
                indices into ``self.features``.

        Returns:
            Whatever ``self.scorer.compute_all_scores()`` returns after
            refitting on the updated features and edges.
        """
        sender, receiver, amount = new_transaction
        self.edges.append((sender, receiver))

        # Update features (simplified).
        # The matrix must have a row for every node id seen so far. Sizing
        # it only on the first transaction would make any later, larger
        # node id fail with IndexError, so grow it whenever needed.
        rows_needed = max(sender, receiver) + 1
        if self.features is None:
            self.features = np.zeros((rows_needed, 4))
        elif rows_needed > self.features.shape[0]:
            padding = np.zeros((rows_needed - self.features.shape[0],
                                self.features.shape[1]))
            self.features = np.vstack([self.features, padding])

        self.features[sender, 0] += amount  # total sent
        self.features[receiver, 1] += amount  # total received

        # Refit (in production, use incremental updates)
        self.scorer.fit(self.features, self.edges)

        return self.scorer.compute_all_scores()

# Usage: feed every incoming transaction to the monitor and escalate
# when the largest score exceeds the threshold
monitor = RiskMonitor()
for transaction in incoming_stream:
    scores = monitor.update(transaction)
    peak_score = max(scores)
    # Alert if high risk detected
    if peak_score > threshold:
        alert_compliance_team(scores)