"""
Network builder for financial network analysis.
This module provides the FinancialNetworkBuilder class for constructing
financial networks from preprocessed data.
"""
import logging
import networkx as nx
import numpy as np
import pandas as pd
import scipy.sparse as sp
import scipy.linalg as la
from typing import Dict, List, Optional, Tuple, Union
from ..data.preprocessor import DataPreprocessor
logger = logging.getLogger(__name__)
class FinancialNetworkBuilder:
    """
    Construct financial networks from preprocessed data.

    Parameters
    ----------
    preprocessor : DataPreprocessor
        Preprocessor containing the data to build the network from.

    Attributes
    ----------
    G : networkx.DiGraph or None
        Directed graph representing the financial network. ``None`` until
        ``construct_network`` has been called.
    adjacency_matrix : SciPy sparse array or None
        Weighted adjacency matrix of ``G`` as returned by
        ``networkx.to_scipy_sparse_array``.
    laplacian : SciPy sparse matrix or None
        Graph Laplacian set by ``compute_laplacian``.
    eigenvalues : numpy.ndarray or None
        Eigenvalues of the Laplacian set by ``spectral_analysis``.
    eigenvectors : numpy.ndarray or None
        Eigenvectors of the Laplacian set by ``spectral_analysis``.
    """

    def __init__(self, preprocessor: "DataPreprocessor"):
        """Initialize the network builder with preprocessed data."""
        self.preprocessor = preprocessor
        self.G = None
        self.adjacency_matrix = None
        self.laplacian = None
        self.eigenvalues = None
        self.eigenvectors = None

    def construct_network(
        self,
        time_point: Union[str, pd.Timestamp],
        edge_weight_type: str = 'interbank_exposures',
    ) -> "nx.Graph":
        """
        Construct the network for a specific time point.

        Parameters
        ----------
        time_point : str or pd.Timestamp
            Date for which to construct the network.
        edge_weight_type : str, optional
            Key into the preprocessor's ``edge_data`` selecting which edge
            table to use, by default 'interbank_exposures'.

        Returns
        -------
        networkx.Graph
            The constructed network (a ``networkx.DiGraph``), also stored
            on ``self.G``.

        Raises
        ------
        ValueError
            If ``edge_weight_type`` is not present in the edge data, or if
            the resulting graph has no nodes.
        """
        data = self.preprocessor.get_data_for_timepoint(time_point)

        if edge_weight_type not in data['edge_data']:
            raise ValueError(f"Edge weight type '{edge_weight_type}' not found in data")
        edge_data = data['edge_data'][edge_weight_type]

        # Edges are directed: source -> target, weighted by the 'weight' column.
        G = nx.DiGraph()
        for _, row in edge_data.iterrows():
            G.add_edge(row['source'], row['target'], weight=row['weight'])

        # Fail before doing any further work or touching instance state.
        if G.number_of_nodes() == 0:
            raise ValueError(
                "No nodes found in network for the given time point and edge weight type"
            )

        self._attach_node_attributes(G, data['node_data'])

        self.G = G
        self.adjacency_matrix = nx.to_scipy_sparse_array(G, weight='weight')

        # Anything derived from a previously constructed network is now stale;
        # clear it so later calls cannot silently mix old and new data.
        self.laplacian = None
        self.eigenvalues = None
        self.eigenvectors = None
        return G

    @staticmethod
    def _attach_node_attributes(G, node_data_by_category) -> None:
        """Copy per-bank attributes onto the nodes of ``G`` that already exist."""
        for category, node_data in node_data_by_category.items():
            if isinstance(node_data, pd.DataFrame):
                # Every column except the identifier and the date becomes a
                # node attribute named after the column.
                attribute_columns = [
                    col for col in node_data.columns
                    if col not in ('bank_id', 'date')
                ]
                for _, row in node_data.iterrows():
                    bank_id = row['bank_id']
                    if bank_id in G:
                        for col in attribute_columns:
                            G.nodes[bank_id][col] = row[col]
            else:
                # Series-like input (single time point): the category name
                # itself is used as the attribute name.
                for bank_id, value in node_data.items():
                    if bank_id in G:
                        G.nodes[bank_id][category] = value

    def compute_laplacian(self, normalized: bool = True) -> sp.csr_matrix:
        """
        Compute the graph Laplacian.

        Parameters
        ----------
        normalized : bool, optional
            Whether to compute the normalized Laplacian, by default True.

        Returns
        -------
        scipy.sparse.csr_matrix
            Graph Laplacian matrix, also stored on ``self.laplacian``.

        Raises
        ------
        ValueError
            If no network has been constructed yet.
        """
        if self.G is None:
            raise ValueError("Network not constructed. Call construct_network first.")

        # NOTE(review): construct_network builds a DiGraph. Depending on the
        # NetworkX version these helpers either reject directed graphs or
        # return a non-symmetric (out-degree based) Laplacian -- confirm which
        # behaviour is intended here.
        if normalized:
            self.laplacian = nx.normalized_laplacian_matrix(self.G, weight='weight')
        else:
            self.laplacian = nx.laplacian_matrix(self.G, weight='weight')

        # A previously computed spectrum no longer matches this Laplacian.
        self.eigenvalues = None
        self.eigenvectors = None
        return self.laplacian

    def spectral_analysis(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Perform spectral analysis of the Laplacian.

        Returns
        -------
        tuple
            Tuple containing eigenvalues and eigenvectors, also stored on
            ``self.eigenvalues`` and ``self.eigenvectors``.

        Raises
        ------
        ValueError
            If the Laplacian has not been computed yet.
        """
        if self.laplacian is None:
            raise ValueError("Laplacian not computed. Call compute_laplacian first.")

        # Dense eigendecomposition; only appropriate for small networks.
        dense_laplacian = self.laplacian.toarray()

        # eigh assumes a symmetric matrix and reads a single triangle of it,
        # so a non-symmetric Laplacian would be decomposed incorrectly
        # without any error. Make that visible instead of silent.
        if not np.allclose(dense_laplacian, dense_laplacian.T):
            logger.warning(
                "Laplacian is not symmetric; eigh only uses one triangle of the "
                "matrix, so the computed spectrum may not match the full matrix."
            )

        self.eigenvalues, self.eigenvectors = la.eigh(dense_laplacian)
        return self.eigenvalues, self.eigenvectors

    def find_spectral_gap(self) -> Tuple[int, float]:
        """
        Identify the spectral gap for coarse-graining.

        The gap at index ``k`` is ``eigenvalues[k + 1] - eigenvalues[k]``.
        The first gap is skipped whenever more than one gap exists.

        Returns
        -------
        tuple
            Tuple containing the index of the gap and the gap size.

        Raises
        ------
        ValueError
            If spectral analysis has not been performed, or if fewer than
            two eigenvalues are available.
        """
        if self.eigenvalues is None:
            raise ValueError("Spectral analysis not performed. Call spectral_analysis first.")

        gaps = np.diff(self.eigenvalues)
        if gaps.size == 0:
            raise ValueError(
                "At least two eigenvalues are required to compute a spectral gap."
            )
        if gaps.size == 1:
            # Only one gap exists, so there is nothing to skip.
            return 0, float(gaps[0])

        # Skip the first gap (the first eigenvalue is 0 for connected graphs).
        k = int(np.argmax(gaps[1:])) + 1
        return k, float(gaps[k])

    def get_node_attribute_matrix(self, attribute: str) -> np.ndarray:
        """
        Get the values of one node attribute for all nodes.

        Parameters
        ----------
        attribute : str
            Node attribute to extract.

        Returns
        -------
        numpy.ndarray
            Attribute values in the iteration order of ``self.G.nodes``;
            nodes lacking the attribute contribute 0.

        Raises
        ------
        ValueError
            If no network has been constructed yet.
        """
        if self.G is None:
            raise ValueError("Network not constructed. Call construct_network first.")
        return np.array(
            [attrs.get(attribute, 0) for _, attrs in self.G.nodes(data=True)]
        )

    def get_edge_weight_matrix(self) -> np.ndarray:
        """
        Get the dense matrix of edge weights.

        Returns
        -------
        numpy.ndarray
            Dense copy of ``self.adjacency_matrix``.

        Raises
        ------
        ValueError
            If no network has been constructed yet.
        """
        if self.adjacency_matrix is None:
            raise ValueError("Network not constructed. Call construct_network first.")
        # toarray() always yields an ndarray; todense() can yield np.matrix.
        return self.adjacency_matrix.toarray()

    def compute_centrality_measures(self) -> Dict[str, Dict[str, float]]:
        """
        Compute various centrality measures for the network.

        Returns
        -------
        dict
            Dictionary mapping centrality types ('degree', 'eigenvector',
            'betweenness', 'closeness', 'pagerank') to dictionaries of node
            centralities.

        Raises
        ------
        ValueError
            If no network has been constructed yet.
        """
        if self.G is None:
            raise ValueError("Network not constructed. Call construct_network first.")

        centrality_measures = {}
        centrality_measures['degree'] = nx.degree_centrality(self.G)

        # Power iteration may fail to converge; fall back to all zeros rather
        # than aborting the remaining measures.
        try:
            centrality_measures['eigenvector'] = nx.eigenvector_centrality(
                self.G, weight='weight'
            )
        except nx.PowerIterationFailedConvergence:
            logger.warning(
                "Eigenvector centrality did not converge for graph; returning zeros."
            )
            centrality_measures['eigenvector'] = {node: 0 for node in self.G.nodes()}

        # NOTE(review): betweenness and closeness treat 'weight' as a path
        # length, so larger weights mean "farther apart" -- confirm that this
        # is the intended reading of the edge weights.
        centrality_measures['betweenness'] = nx.betweenness_centrality(self.G, weight='weight')
        centrality_measures['closeness'] = nx.closeness_centrality(self.G, distance='weight')
        centrality_measures['pagerank'] = nx.pagerank(self.G, weight='weight')
        return centrality_measures