# Source code for scr_financial.network.coarse_graining

"""
Spectral coarse-graining for financial networks.

This module implements spectral coarse-graining techniques for simplifying
financial networks while preserving their essential structural and dynamic
properties.
"""

import logging

import numpy as np
import networkx as nx
import scipy.sparse as sp
import scipy.linalg as la
from scipy.linalg import expm
from typing import Dict, List, Optional, Tuple, Union
from sklearn.cluster import KMeans, SpectralClustering

from .builder import FinancialNetworkBuilder

logger = logging.getLogger(__name__)


class _AdjacencyAdapter:
    """Lightweight adapter mimicking FinancialNetworkBuilder for SCG from a raw adjacency matrix.

    Exposes the minimal spectral interface SpectralCoarseGraining relies on:
    ``G``, ``adjacency_matrix``, ``laplacian``, ``eigenvalues``,
    ``eigenvectors``, ``spectral_analysis`` and ``find_spectral_gap``.
    """

    def __init__(self, adj_matrix: np.ndarray, node_ids: List[str]) -> None:
        """Build graph, symmetrized adjacency and Laplacian from a dense matrix.

        Args:
            adj_matrix: Dense (n x n) adjacency matrix; strictly positive
                entries are treated as directed, weighted edges.
            node_ids: Node labels, one per row/column of ``adj_matrix``.
        """
        self.G = nx.DiGraph()
        self.G.add_nodes_from(node_ids)
        # One directed, weighted edge per strictly positive entry
        # (row-major order, matching a nested i/j loop).
        rows, cols = np.nonzero(adj_matrix > 0)
        for i, j in zip(rows, cols):
            self.G.add_edge(node_ids[i], node_ids[j], weight=adj_matrix[i, j])

        # Symmetrize so the Laplacian is real-symmetric (required by eigh).
        adj_sym = (adj_matrix + adj_matrix.T) / 2.0
        self.adjacency_matrix = sp.csr_matrix(adj_sym)
        degree = np.sum(adj_sym, axis=1)
        # Combinatorial (unnormalized) graph Laplacian: L = D - A.
        self.laplacian = sp.csr_matrix(np.diag(degree) - adj_sym)
        self.eigenvalues: Optional[np.ndarray] = None
        self.eigenvectors: Optional[np.ndarray] = None

    def spectral_analysis(self) -> Tuple[np.ndarray, np.ndarray]:
        """Eigendecompose the Laplacian; caches and returns (eigenvalues, eigenvectors)."""
        L_dense = np.asarray(self.laplacian.todense())
        # eigh returns eigenvalues in ascending order for symmetric matrices.
        self.eigenvalues, self.eigenvectors = la.eigh(L_dense)
        return self.eigenvalues, self.eigenvectors

    def find_spectral_gap(self) -> Tuple[int, float]:
        """Return (k, gap): index and size of the largest non-trivial eigenvalue gap.

        The first gap (between the trivial zero mode and lambda_1) is skipped.
        """
        if self.eigenvalues is None:
            self.spectral_analysis()
        gaps = np.diff(self.eigenvalues)
        # Cast numpy scalars to builtins to honour the Tuple[int, float] contract.
        k = int(np.argmax(gaps[1:]) + 1)
        return k, float(gaps[k])


class SpectralCoarseGraining:
    """
    Implements spectral coarse-graining for financial networks.

    Attributes:
        coarse_grained_laplacian: Coarse-grained Laplacian matrix.
        coarse_grained_adjacency: Coarse-grained adjacency matrix.
        rescaled_adjacency: Rescaled coarse-grained adjacency matrix.
        clusters: Cluster assignments for nodes.
    """
[docs] def __init__(self, network_builder: FinancialNetworkBuilder) -> None: """ Initialize the coarse-graining with a network builder. Args: network_builder: Network builder containing the network to coarse-grain. """ self.network_builder = network_builder self.coarse_grained_laplacian: Optional[np.ndarray] = None self.coarse_grained_adjacency: Optional[np.ndarray] = None self.rescaled_adjacency: Optional[np.ndarray] = None self.clusters: Optional[np.ndarray] = None self.cluster_mapping: Optional[Dict] = None self.coarse_grained_graph: Optional[nx.Graph] = None
[docs] @classmethod def from_adjacency(cls, adj_matrix: np.ndarray, node_ids: List[str]) -> "SpectralCoarseGraining": """Create an SCG instance directly from an adjacency matrix (no DataPreprocessor needed).""" adapter = _AdjacencyAdapter(adj_matrix, node_ids) instance = cls.__new__(cls) instance.network_builder = adapter instance.coarse_grained_laplacian = None instance.coarse_grained_adjacency = None instance.rescaled_adjacency = None instance.clusters = None instance.cluster_mapping = None instance.coarse_grained_graph = None return instance
[docs] def coarse_grain(self, k: Optional[int] = None) -> np.ndarray: """ Perform spectral coarse-graining using k eigenmodes. Args: k: Number of eigenmodes to use. If None, determined automatically using spectral gap. Returns: Coarse-grained Laplacian matrix as an ndarray. """ if ( self.network_builder.eigenvalues is None or self.network_builder.eigenvectors is None ): self.network_builder.spectral_analysis() eigenvalues = self.network_builder.eigenvalues eigenvectors = self.network_builder.eigenvectors if k is None: # Use spectral gap to determine k k, _ = self.network_builder.find_spectral_gap() # Construct coarse-grained Laplacian using first k eigenmodes. # Use np.zeros with the sparse matrix shape to avoid np.matrix. L_cg = np.zeros(self.network_builder.laplacian.shape) for alpha in range(k + 1): # Include eigenmode 0 to k lambda_alpha = eigenvalues[alpha] u_alpha = eigenvectors[:, alpha].reshape(-1, 1) L_cg += lambda_alpha * (u_alpha @ u_alpha.T) self.coarse_grained_laplacian = L_cg # Extract adjacency A₁ from the reconstructed Laplacian (Eq. 4) D = np.diag(np.diag(L_cg)) A1 = D - L_cg # Topology masking (Eq. 5): A₂ᵢⱼ = A₁ᵢⱼ if original Aᵢⱼ > 0, else 0 if hasattr(self.network_builder, 'adjacency_matrix') and self.network_builder.adjacency_matrix is not None: orig_A = np.asarray(self.network_builder.adjacency_matrix.todense()) mask = (orig_A > 0).astype(float) A1 *= mask self.coarse_grained_adjacency = A1 return self.coarse_grained_laplacian
[docs] def contract_vertices(self) -> np.ndarray: """Vertex contraction: merge nodes with negative off-diagonal in A₁. Nodes that are "similar" at the coarse scale (negative off-diagonal entries in the coarse-grained adjacency) are merged into super-nodes with summed edge weights. Returns: Contracted adjacency matrix (may be smaller than original). """ if self.coarse_grained_adjacency is None: raise ValueError("Call coarse_grain() first.") A = self.coarse_grained_adjacency.copy() n = A.shape[0] # Build union-find for nodes that share a negative off-diagonal entry parent = list(range(n)) def find(x): while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(a, b): ra, rb = find(a), find(b) if ra != rb: parent[rb] = ra for i in range(n): for j in range(i + 1, n): if A[i, j] < 0: union(i, j) # Build mapping: root → list of members groups: Dict[int, List[int]] = {} for i in range(n): r = find(i) groups.setdefault(r, []).append(i) group_list = sorted(groups.values(), key=lambda g: g[0]) m = len(group_list) if m == n: # No contraction needed self.contracted_adjacency = np.maximum(A, 0) self._contraction_map = {i: i for i in range(n)} return self.contracted_adjacency # Build contracted adjacency by summing edges between super-nodes A_contracted = np.zeros((m, m)) node_to_super = {} for si, members in enumerate(group_list): for node in members: node_to_super[node] = si for i in range(n): for j in range(n): if i == j: continue si, sj = node_to_super[i], node_to_super[j] if si != sj and A[i, j] > 0: A_contracted[si, sj] += A[i, j] self.contracted_adjacency = A_contracted self._contraction_map = node_to_super self._contraction_groups = group_list logger.info("Vertex contraction: %d%d super-nodes", n, m) return self.contracted_adjacency
[docs] def rescale(self, lambda_k: Optional[float] = None) -> np.ndarray: """ Rescale the coarse-grained adjacency matrix. Args: lambda_k: Eigenvalue to use for rescaling. If None, uses the k-th eigenvalue where k is from find_spectral_gap. Returns: Rescaled coarse-grained adjacency matrix. Raises: ValueError: If coarse-grained adjacency has not been computed. """ if self.coarse_grained_adjacency is None: raise ValueError( "Coarse-grained adjacency not computed. Call coarse_grain first." ) if lambda_k is None: k, _ = self.network_builder.find_spectral_gap() lambda_k = self.network_builder.eigenvalues[k] if lambda_k <= 0: raise ValueError(f"lambda_k must be positive, got {lambda_k}") # Use contracted adjacency if available, otherwise coarse-grained base = getattr(self, 'contracted_adjacency', None) if base is None: base = self.coarse_grained_adjacency # Rescale by 1/lambda_k to preserve diffusion dynamics (Eq. 6) self.rescaled_adjacency = (1 / lambda_k) * base return self.rescaled_adjacency
[docs] def identify_clusters(self, n_clusters: Optional[int] = None) -> np.ndarray: """ Identify clusters based on eigenvectors. Args: n_clusters: Number of clusters. If None, determined from spectral gap. Returns: Cluster assignments for nodes. """ if self.network_builder.eigenvectors is None: self.network_builder.spectral_analysis() if n_clusters is None: k, _ = self.network_builder.find_spectral_gap() n_clusters = k + 1 # Use k+1 clusters # Extract relevant eigenvectors (skip first — trivial) eigenvectors = self.network_builder.eigenvectors[ :, 1 : n_clusters + 1 ] # Spectral partition: normalise rows and use sign structure # For 2 clusters use Fiedler vector sign; for k>2 use KMeans on # the normalised spectral embedding (Ng, Jordan & Weiss 2002). if n_clusters == 2: fiedler = eigenvectors[:, 0] self.clusters = (fiedler >= 0).astype(int) else: row_norms = np.linalg.norm(eigenvectors, axis=1, keepdims=True) row_norms = np.maximum(row_norms, 1e-12) embedding = eigenvectors / row_norms kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) self.clusters = kmeans.fit_predict(embedding) # Create mapping from nodes to clusters nodes = list(self.network_builder.G.nodes()) self.cluster_mapping = {nodes[i]: self.clusters[i] for i in range(len(nodes))} return self.clusters
[docs] def create_coarse_grained_graph(self) -> nx.Graph: """ Create a coarse-grained graph based on identified clusters. Returns: Coarse-grained graph. """ if self.clusters is None: self.identify_clusters() # Create a new graph with clusters as nodes G_cg = nx.DiGraph() # Add cluster nodes n_clusters = len(set(self.clusters)) for i in range(n_clusters): G_cg.add_node(f"cluster_{i}") # Compute inter-cluster weights original_graph = self.network_builder.G nodes = list(original_graph.nodes()) for i in range(n_clusters): for j in range(n_clusters): if i != j: # Find nodes in each cluster nodes_i = [ nodes[k] for k in range(len(nodes)) if self.clusters[k] == i ] nodes_j = [ nodes[k] for k in range(len(nodes)) if self.clusters[k] == j ] # Compute total weight between clusters total_weight = 0 for source in nodes_i: for target in nodes_j: if original_graph.has_edge(source, target): total_weight += original_graph[source][target]["weight"] # Add edge if there's a connection if total_weight > 0: G_cg.add_edge( f"cluster_{i}", f"cluster_{j}", weight=total_weight ) # Compute aggregate node attributes for i in range(n_clusters): nodes_i = [ nodes[k] for k in range(len(nodes)) if self.clusters[k] == i ] # Guard: skip attribute aggregation if cluster is empty if not nodes_i: return G_cg # Aggregate node attributes for attr in original_graph.nodes[nodes_i[0]]: # Skip non-numeric attributes if not isinstance( original_graph.nodes[nodes_i[0]][attr], (int, float) ): continue # Compute average avg_value = np.mean( [ original_graph.nodes[node][attr] for node in nodes_i if attr in original_graph.nodes[node] ] ) G_cg.nodes[f"cluster_{i}"][attr] = avg_value # Store original nodes in this cluster G_cg.nodes[f"cluster_{i}"]["original_nodes"] = nodes_i G_cg.nodes[f"cluster_{i}"]["size"] = len(nodes_i) self.coarse_grained_graph = G_cg return G_cg
[docs] def map_node_to_cluster(self, node: str) -> str: """ Map an original node to its cluster. Args: node: Original node identifier. Returns: Cluster identifier. Raises: ValueError: If clusters have not been identified or node is not found. """ if self.cluster_mapping is None: raise ValueError("Clusters not identified. Call identify_clusters first.") if node not in self.cluster_mapping: raise ValueError(f"Node {node} not found in the network") return f"cluster_{self.cluster_mapping[node]}"
[docs] def compute_coarse_graining_error(self) -> float: """ Compute the error introduced by coarse-graining. Returns: Relative error in eigenvalues. Raises: ValueError: If coarse-grained Laplacian has not been computed. """ if self.coarse_grained_laplacian is None: raise ValueError( "Coarse-grained Laplacian not computed. Call coarse_grain first." ) # Get original eigenvalues orig_eigenvalues = self.network_builder.eigenvalues # Compute eigenvalues of coarse-grained Laplacian cg_eigenvalues, _ = la.eigh(self.coarse_grained_laplacian) # Determine k k, _ = self.network_builder.find_spectral_gap() # Compare first k+1 eigenvalues error = np.linalg.norm( orig_eigenvalues[: k + 1] - cg_eigenvalues[: k + 1] ) / np.linalg.norm(orig_eigenvalues[: k + 1]) return error
[docs] def compute_reconstruction_accuracy(self, time_steps: int = 15) -> Dict[str, List[float]]: """Measure how well the CG Laplacian reconstructs the original diffusion signal. For each time step t, computes: - correlation between p_orig(t) and p_cg(t) - RMSE - R² (coefficient of determination) Returns dict with keys: 'time', 'correlation', 'rmse', 'r2'. """ if self.coarse_grained_laplacian is None: raise ValueError("Call coarse_grain() first.") L_orig = np.asarray(self.network_builder.laplacian.todense()) L_cg = self.coarse_grained_laplacian n = L_orig.shape[0] p0 = np.ones(n) / n times, corrs, rmses, r2s = [], [], [], [] for t in range(1, time_steps + 1): pt_orig = expm(-t * L_orig) @ p0 pt_cg = expm(-t * L_cg) @ p0 corr = float(np.corrcoef(pt_orig, pt_cg)[0, 1]) if np.std(pt_orig) > 1e-15 else 1.0 rmse = float(np.sqrt(np.mean((pt_orig - pt_cg) ** 2))) ss_res = np.sum((pt_orig - pt_cg) ** 2) ss_tot = np.sum((pt_orig - np.mean(pt_orig)) ** 2) r2 = 1.0 - ss_res / ss_tot if ss_tot > 1e-30 else 1.0 times.append(t) corrs.append(corr) rmses.append(rmse) r2s.append(r2) return {"time": times, "correlation": corrs, "rmse": rmses, "r2": r2s}
[docs] def compare_diffusion_dynamics(self, time_steps: int = 10) -> List[float]: """ Compare diffusion dynamics between original and coarse-grained networks. Uses the matrix exponential (scipy.linalg.expm) to compute the exact diffusion operator e^{-tL}, avoiding element-wise exp. Args: time_steps: Number of time steps to simulate. Defaults to 10. Returns: List of relative errors at each time step. Raises: ValueError: If coarse-grained Laplacian has not been computed. """ if self.coarse_grained_laplacian is None: raise ValueError( "Coarse-grained Laplacian not computed. Call coarse_grain first." ) # Original Laplacian — convert from np.matrix to np.ndarray L_orig = np.asarray(self.network_builder.laplacian.todense()) # Coarse-grained Laplacian (already ndarray) L_cg = self.coarse_grained_laplacian # Initial state (uniform distribution) p0 = np.ones(L_orig.shape[0]) / L_orig.shape[0] # Simulate diffusion on both networks using matrix exponential errors = [] for t in range(1, time_steps + 1): # Original diffusion: e^{-t L_orig} p0 pt_orig = expm(-t * np.asarray(L_orig)) @ p0 # Coarse-grained diffusion: e^{-t L_cg} p0 pt_cg = expm(-t * np.asarray(L_cg)) @ p0 # Compute relative error error = np.linalg.norm(pt_orig - pt_cg) / np.linalg.norm(pt_orig) errors.append(error) return errors