SciPy 图结构
SciPy 图结构与网络分析:量化金融应用
SciPy 没有类似 NetworkX 的图对象,但可以用 scipy.sparse 的稀疏矩阵(例如 scipy.sparse.csr_matrix)高效地表示和操作图结构,并通过 scipy.sparse.csgraph 模块提供最短路径、连通分量、最小生成树等图算法。结合 NetworkX 库,可以获得更完整的图算法支持,特别适用于金融网络分析、风险传染、资产相关性网络等场景。
1. 图表示方法与SciPy集成
邻接矩阵(Adjacency Matrix)
from scipy import sparse
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
def create_adjacency_matrix(graph):
    """Convert a NetworkX graph into a SciPy sparse adjacency matrix (CSR).

    The node list handed to NetworkX is ``range(N)`` with ``N`` the number of
    nodes, so the graph is expected to use the integer labels ``0 .. N - 1``.

    Args:
        graph: NetworkX graph to convert.

    Returns:
        The adjacency matrix in CSR format, with rows/columns in node order.
    """
    node_order = range(graph.number_of_nodes())
    return nx.to_scipy_sparse_array(graph, nodelist=node_order, format='csr')
# Example: a correlation-style network of financial assets.
np.random.seed(42)
n_assets = 100
# NetworkX draws from its own random source rather than NumPy's global state,
# so the seed is passed explicitly to make the generated graph reproducible.
G = nx.erdos_renyi_graph(n_assets, 0.05, seed=42)  # 5% connection probability
# Asset labels, stored on the nodes under the 'symbol' attribute.
assets = [f'ASSET_{i}' for i in range(n_assets)]
nx.set_node_attributes(G, dict(enumerate(assets)), 'symbol')
adj_csr = create_adjacency_matrix(G)
print(f"图结构: {n_assets}节点, {G.number_of_edges()}边")
print(f"邻接矩阵: {adj_csr.shape}, 非零元素: {adj_csr.nnz}")
# Each undirected edge contributes to the degree of both of its endpoints.
print(f"平均度: {2 * G.number_of_edges() / n_assets:.2f}")
边列表与COO格式
def edges_to_coo_matrix(edges, n_nodes):
    """Build the symmetric, unit-weight adjacency matrix of an undirected graph.

    Args:
        edges: Iterable of ``(u, v)`` node-index pairs. May be empty.
        n_nodes: Number of nodes; the result has shape ``(n_nodes, n_nodes)``.

    Returns:
        A CSR matrix with 1.0 at ``[u, v]`` and ``[v, u]`` for every edge.
        Repeated or reciprocal pairs (``(u, v)`` listed together with
        ``(v, u)``) still yield weight 1, and a self-loop ``(u, u)`` yields a
        single 1 on the diagonal.
    """
    edges = list(edges)
    if not edges:
        # zip(*[]) cannot be unpacked into rows/cols; return the empty graph.
        return sparse.csr_matrix((n_nodes, n_nodes))

    rows, cols = zip(*edges)
    data = np.ones(len(edges))
    coo = sparse.coo_matrix((data, (rows, cols)), shape=(n_nodes, n_nodes))

    # Adding the transpose symmetrises the matrix; the addition also sums any
    # duplicate entries, so every stored value is reset to the unit weight.
    symmetric = (coo + coo.T).tocsr()
    symmetric.data[:] = 1.0
    return symmetric
# Directed trading network; every record is (from, to, volume).
trades = [(0, 1, 1000), (1, 2, 500), (0, 2, 2000),
          (2, 0, 1500), (1, 0, 800)]
# Split the records into three parallel columns so that the traded volume,
# not the target node id, ends up as the edge weight.
sources, targets, weights = zip(*trades)
edges = list(zip(sources, targets))
trade_weights = np.array(weights, dtype=float)
trade_adj = sparse.coo_matrix((trade_weights, (sources, targets)),
                              shape=(n_assets, n_assets)).tocsr()
print("交易网络邻接矩阵:", trade_adj.sum())
2. 图谱分析算法
度分布与网络统计
def analyze_network_stats(adj_matrix):
    """Compute basic statistics of a network given as a sparse adjacency matrix.

    Args:
        adj_matrix: Square SciPy sparse adjacency matrix.

    Returns:
        A ``(stats, degrees)`` tuple. ``stats`` is a dict of floats with the
        keys ``avg_degree``, ``max_degree``, ``degree_std``,
        ``clustering_coeff``, ``assortativity``, ``density``,
        ``power_law_exponent``, ``power_law_r2`` and ``power_law_pvalue``;
        ``degrees`` is the array of row sums of ``adj_matrix``. The three
        power-law entries are NaN when fewer than three non-empty histogram
        bins are available for the fit.
    """
    # Imported under an alias so the module does not shadow the result dict.
    from scipy import stats as sp_stats

    n_nodes = adj_matrix.shape[0]
    degrees = np.asarray(adj_matrix.sum(axis=1)).ravel()

    # Build the NetworkX view once and reuse it for both graph metrics.
    graph = nx.from_scipy_sparse_array(adj_matrix)
    possible_links = n_nodes * (adj_matrix.shape[1] - 1)
    network_stats = {
        'avg_degree': degrees.mean(),
        'max_degree': degrees.max(),
        'degree_std': degrees.std(),
        'clustering_coeff': nx.average_clustering(graph),
        'assortativity': nx.degree_assortativity_coefficient(graph),
        'density': adj_matrix.nnz / possible_links if possible_links else 0.0,
    }

    # Power-law check: regress log10(bin count) on log10(bin centre) of the
    # degree histogram, so both regression inputs have the same length.
    exponent = r_squared = p_value = float('nan')
    positive_degrees = degrees[degrees > 0]
    if positive_degrees.size:
        counts, bin_edges = np.histogram(positive_degrees, bins=20)
        centres = (bin_edges[:-1] + bin_edges[1:]) / 2
        occupied = counts > 0  # log10(0) is undefined, so skip empty bins
        if occupied.sum() >= 3:
            fit = sp_stats.linregress(np.log10(centres[occupied]),
                                      np.log10(counts[occupied]))
            exponent = -fit.slope
            r_squared = fit.rvalue ** 2
            p_value = fit.pvalue

    network_stats.update({
        'power_law_exponent': exponent,
        'power_law_r2': r_squared,
        'power_law_pvalue': p_value,
    })
    return network_stats, degrees
# Analyse the asset network and report every statistic.
stats, degrees = analyze_network_stats(adj_csr)
print("网络统计:")
for metric_name in stats:
    print(f" {metric_name}: {stats[metric_name]:.4f}")

# Degree distribution: histogram on the left, rank-degree plot on the right.
fig, (hist_ax, rank_ax) = plt.subplots(1, 2, figsize=(12, 4))

hist_ax.hist(degrees, bins=30, alpha=0.7, log=True)
hist_ax.set_xlabel('度数')
hist_ax.set_ylabel('频数 (对数)')
hist_ax.set_title('度分布')

ranked_degrees = sorted(degrees, reverse=True)
rank_ax.loglog(ranked_degrees, 'o-')
rank_ax.set_xlabel('排名')
rank_ax.set_ylabel('度数')
rank_ax.set_title('Rank-Degree图')

fig.tight_layout()
plt.show()
中心性度量(风险识别)
from scipy.sparse.linalg import eigs, eigsh
def centrality_measures(adj_matrix, normalized=True):
    """Compute degree, eigenvector, Katz and closeness centrality.

    Args:
        adj_matrix: Square sparse adjacency matrix. The eigenvector and
            closeness measures treat it as symmetric (undirected).
        normalized: When true, degree centrality is divided by ``n - 1``.

    Returns:
        Dict with the keys ``'degree'``, ``'eigenvector'``, ``'katz'`` and
        ``'closeness'``, each mapping to a float array with one entry per node.
        ``'eigenvector'`` is scaled to sum to 1 and falls back to zeros if the
        iterative eigensolver fails. ``'closeness'`` is
        ``reachable / sum_of_distances`` scaled by ``reachable / (n - 1)``, so
        nodes in small components are not over-rated and isolated nodes get 0.
    """
    from scipy.sparse.csgraph import shortest_path
    from scipy.sparse.linalg import ArpackError, spsolve

    n = adj_matrix.shape[0]
    if n == 0:
        empty = np.zeros(0)
        return {'degree': empty, 'eigenvector': empty.copy(),
                'katz': empty.copy(), 'closeness': empty.copy()}

    # Work in floating point: integer adjacency matrices break both in-place
    # true division and the ARPACK eigensolver.
    adj = sparse.csr_matrix(adj_matrix, dtype=float)
    scale = max(n - 1, 1)

    # 1. Degree centrality
    degree_centrality = np.asarray(adj.sum(axis=1)).ravel()
    if normalized:
        degree_centrality = degree_centrality / scale

    # 2. Eigenvector centrality: eigenvector of the algebraically largest
    #    eigenvalue ('LA'; 'LR' is not a valid option for eigsh).
    try:
        if n < 3:
            # Too small for the iterative solver; use the dense one.
            _, vectors = np.linalg.eigh(adj.toarray())
            leading = vectors[:, -1]
        else:
            _, vectors = eigsh(adj, k=1, which='LA')
            leading = vectors[:, 0]
        eigenvector_centrality = np.abs(leading)
        total = eigenvector_centrality.sum()
        if total > 0:
            eigenvector_centrality = eigenvector_centrality / total
    except ArpackError:
        eigenvector_centrality = np.zeros(n)

    # 3. Katz-style centrality: column sums of (I - alpha * A)^-1, obtained by
    #    solving (I - alpha * A^T) x = 1 instead of forming the inverse.
    alpha = 0.1  # attenuation factor
    system = (sparse.eye(n, format='csc') - alpha * adj.T).tocsc()
    katz_centrality = np.asarray(spsolve(system, np.ones(n))).ravel()

    # 4. Closeness centrality from all-pairs shortest paths. Unreachable
    #    pairs have infinite distance and are left out of the sums.
    dist_matrix = shortest_path(adj, directed=False)
    reachable = np.isfinite(dist_matrix)
    np.fill_diagonal(reachable, False)
    reach_count = reachable.sum(axis=1)
    total_distance = np.where(reachable, dist_matrix, 0.0).sum(axis=1)
    closeness_centrality = np.zeros(n)
    connected = total_distance > 0
    closeness_centrality[connected] = (
        reach_count[connected] / total_distance[connected]
        * reach_count[connected] / scale
    )

    return {
        'degree': degree_centrality,
        'eigenvector': eigenvector_centrality,
        'katz': katz_centrality,
        'closeness': closeness_centrality
    }
# Rank assets by eigenvector centrality to find the systemically important ones.
centralities = centrality_measures(adj_csr)
eigenvector_ranking = np.argsort(centralities['eigenvector'])
systemic_assets = eigenvector_ranking[-10:]  # ten highest scores, ascending
print("系统重要性资产:", [assets[idx] for idx in systemic_assets])
3. 金融网络风险分析
风险传染模型
def contagion_risk_analysis(adj_matrix, initial_shocks, contagion_param=0.3):
    """Simulate the stochastic spread of defaults through a network.

    The original author describes this as a simplified De Marzo-Mayo model.
    In every round each healthy node defaults when
    ``exposure * contagion_param`` exceeds a fresh uniform random draw, where
    ``exposure`` is ``adj_matrix @ shocks``. The simulation stops after the
    first round without new defaults, or after 100 rounds.

    Args:
        adj_matrix: Adjacency matrix of the financial network.
        initial_shocks: Indices of the nodes that default initially.
        contagion_param: Multiplier applied to the exposure.

    Returns:
        ``(shocks, iteration)``: the 0/1 default indicator per node and the
        zero-based index of the last round that was executed.

    Draws from NumPy's global random state and prints a two-line summary.
    """
    node_count = adj_matrix.shape[0]
    round_limit = 100
    stop_threshold = 1e-6

    shocks = np.zeros(node_count)
    shocks[initial_shocks] = 1.0  # nodes in default from the start

    for iteration in range(round_limit):
        exposure = adj_matrix @ shocks
        draws = np.random.rand(node_count)
        still_healthy = 1 - shocks  # masks out nodes that already defaulted
        new_infections = (exposure * contagion_param > draws) * still_healthy
        shocks += new_infections
        if np.sum(new_infections) < stop_threshold:
            break

    print(f"传染轮次: {iteration}")
    print(f"总传染节点: {shocks.sum():.0f}/{node_count}")
    return shocks, iteration
# Simulate contagion between banks.
# Scale-free network; seeded explicitly because NetworkX does not draw from
# NumPy's global random state.
bank_network = nx.barabasi_albert_graph(50, 3, seed=42)
bank_adj = create_adjacency_matrix(bank_network)
# Five banks default initially.
initial_defaults = np.random.choice(50, 5, replace=False)
contagion_shocks, steps = contagion_risk_analysis(bank_adj, initial_defaults)
print(f"传染完成,影响{contagion_shocks.sum():.0f}家银行")
网络VaR计算
def network_var(adj_matrix, asset_returns, weights, alpha=0.05, contagion_param=0.3):
    """Compare a plain portfolio VaR with a network-adjusted VaR.

    Args:
        adj_matrix: ``(n_assets, n_assets)`` adjacency matrix.
        asset_returns: Array of shape ``(n_scenarios, n_assets)``.
        weights: Portfolio weights, length ``n_assets``.
        alpha: Tail probability; the VaR is the ``alpha * 100`` percentile.
        contagion_param: Multiplier applied to the network spill-over term.

    Returns:
        Dict with ``'standalone_var'``, ``'network_var'`` and
        ``'contagion_amplification'`` (``network_var / standalone_var - 1``,
        NaN when the standalone VaR is zero).
    """
    asset_returns = np.asarray(asset_returns, dtype=float)
    weights = np.asarray(weights, dtype=float)
    percentile = alpha * 100

    # 1. Standalone VaR of the weighted portfolio.
    portfolio_return = asset_returns @ weights
    standalone_var = np.percentile(portfolio_return, percentile)

    # 2. Network VaR. Rows are scenarios, so all of them are processed at
    #    once: each asset receives the weighted shocks of its neighbours.
    weighted_shocks = asset_returns * weights
    network_effect = np.asarray((adj_matrix @ weighted_shocks.T).T)
    scenario_returns = (asset_returns + contagion_param * network_effect) @ weights
    contagion_var = np.percentile(scenario_returns, percentile)

    if standalone_var != 0:
        amplification = contagion_var / standalone_var - 1
    else:
        amplification = float('nan')

    return {
        'standalone_var': standalone_var,
        'network_var': contagion_var,
        'contagion_amplification': amplification
    }
# Example data: independent normal returns with 2% volatility per asset.
n_scenarios = 10000
mean_returns = np.zeros(n_assets)
return_covariance = np.eye(n_assets) * 0.02**2
asset_returns = np.random.multivariate_normal(mean_returns,
                                              return_covariance,
                                              n_scenarios)
# Random long-only weights, rescaled to sum to one.
weights = np.random.rand(n_assets)
weights /= weights.sum()

var_results = network_var(adj_csr, asset_returns, weights)
amplification_pct = var_results['contagion_amplification'] * 100
print(f"独立VaR: {var_results['standalone_var']:.4f}")
print(f"网络VaR: {var_results['network_var']:.4f}")
print(f"传染放大: {amplification_pct:.1f}%")
4. 社区检测与模块化
谱聚类社区检测
from scipy.sparse.linalg import eigs
from sklearn.cluster import SpectralClustering
def spectral_community_detection(adj_matrix, n_communities=5):
    """Detect communities by spectral clustering of the normalised Laplacian.

    The nodes are embedded with the eigenvectors belonging to the
    ``n_communities`` smallest eigenvalues of ``D^-1/2 (D - A) D^-1/2``; the
    rows of that embedding are scaled to unit length and grouped with
    k-means (fixed seed, so results are repeatable).

    Args:
        adj_matrix: Square, symmetric sparse adjacency matrix.
        n_communities: Number of communities to extract.

    Returns:
        ``(communities, eigenvalues)``: an integer label per node and the
        eigenvalues used for the embedding, in ascending order.

    Raises:
        ValueError: If ``n_communities`` is not between 1 and the node count.
    """
    from scipy.cluster.vq import kmeans2
    from scipy.sparse.linalg import eigsh

    n = adj_matrix.shape[0]
    if not 1 <= n_communities <= n:
        raise ValueError(
            f"n_communities must be between 1 and {n}, got {n_communities}")

    adj = sparse.csr_matrix(adj_matrix, dtype=float)
    degrees = np.asarray(adj.sum(axis=1)).ravel()

    # Isolated nodes have degree 0; give them a zero scaling factor instead
    # of dividing by zero, which would fill the Laplacian with inf/NaN.
    inv_sqrt = np.zeros(n)
    has_links = degrees > 0
    inv_sqrt[has_links] = 1.0 / np.sqrt(degrees[has_links])
    scaling = sparse.diags(inv_sqrt)
    laplacian = sparse.diags(degrees) - adj
    norm_laplacian = (scaling @ laplacian @ scaling).tocsr()

    if n_communities >= n - 1:
        # The iterative solver needs k well below n; solve tiny problems densely.
        eigenvalues, eigenvectors = np.linalg.eigh(norm_laplacian.toarray())
        eigenvalues = eigenvalues[:n_communities]
        eigenvectors = eigenvectors[:, :n_communities]
    else:
        # The matrix is symmetric, so the symmetric solver applies and the
        # results are real; 'SA' selects the algebraically smallest values.
        eigenvalues, eigenvectors = eigsh(norm_laplacian, k=n_communities,
                                          which='SA', tol=1e-4)
        order = np.argsort(eigenvalues)
        eigenvalues = eigenvalues[order]
        eigenvectors = eigenvectors[:, order]

    row_norms = np.linalg.norm(eigenvectors, axis=1, keepdims=True)
    row_norms[row_norms == 0] = 1.0
    embedding = eigenvectors / row_norms

    _, communities = kmeans2(embedding, n_communities, minit='++', seed=42)
    return communities, eigenvalues
# Detect industry-like communities in the asset network.
communities, evals = spectral_community_detection(adj_csr, n_communities=6)
# Community statistics: count the members of every label.
unique_communities, community_sizes = np.unique(communities, return_counts=True)
print("社区检测结果:")
for comm, size in zip(unique_communities, community_sizes):
    print(f"社区 {comm}: {size}个资产")
模块度优化(基于层次聚类的近似,非 Louvain 算法)
def _modularity_score(adj, communities, resolution=1.0):
    """Return the Newman modularity of a partition of a dense symmetric graph."""
    total_weight = adj.sum()  # equals 2m for a symmetric adjacency array
    if total_weight == 0:
        return 0.0
    degrees = adj.sum(axis=1)
    score = 0.0
    for comm in np.unique(communities):
        members = communities == comm
        internal_share = adj[np.ix_(members, members)].sum() / total_weight
        degree_share = degrees[members].sum() / total_weight
        score += internal_share - resolution * degree_share ** 2
    return float(score)


def modularity_optimization(adj_matrix, resolution=1.0):
    """Pick the hierarchical-clustering cut with the highest modularity.

    A connectivity-constrained Ward tree is built over the rows of the
    adjacency matrix. Its merges are then replayed from singletons up to one
    cluster, and the partition with the largest modularity is kept.

    Args:
        adj_matrix: Square, symmetric sparse adjacency matrix.
        resolution: Weight of the expected-edge term of the modularity;
            values above 1 favour smaller communities.

    Returns:
        ``(labels, modularity)``: consecutive integer community labels per
        node and the modularity of that partition.
    """
    from sklearn.cluster import AgglomerativeClustering

    dense_adj = np.asarray(adj_matrix.toarray(), dtype=float)
    n = dense_adj.shape[0]
    labels = np.arange(n)
    if n < 2:
        # Nothing to merge; the clustering estimator needs two samples.
        return labels, _modularity_score(dense_adj, labels, resolution)

    # distance_threshold=0 forces the full merge tree to be computed.
    clustering = AgglomerativeClustering(n_clusters=None,
                                         distance_threshold=0,
                                         connectivity=adj_matrix > 0,
                                         linkage='ward')
    clustering.fit(dense_adj)

    best_labels = labels.copy()
    best_score = _modularity_score(dense_adj, labels, resolution)
    members = {node: [node] for node in range(n)}
    # In children_, ids below n are leaves and id n + i is the cluster that
    # was created by merge number i.
    for step, (left, right) in enumerate(clustering.children_):
        merged = members.pop(int(left)) + members.pop(int(right))
        members[n + step] = merged
        labels[merged] = n + step
        score = _modularity_score(dense_adj, labels, resolution)
        if score > best_score:
            best_score = score
            best_labels = labels.copy()

    # Relabel to 0 .. k-1 so the ids do not depend on the merge order.
    _, best_labels = np.unique(best_labels, return_inverse=True)
    return best_labels, best_score
# Partition the asset network and report the modularity of the result.
partition_result = modularity_optimization(adj_csr)
communities, modularity = partition_result
print(f"最优模块化得分: {modularity:.4f}")
5. 时变网络分析
滚动窗口网络
def rolling_network_analysis(returns, window=30, min_spanning_tree=True):
    """Build one correlation network per rolling window of returns.

    For every ``t`` in ``window .. T - 1`` the correlations of
    ``returns[t - window:t]`` are computed, correlations with absolute value
    below 0.1 are zeroed, and asset pairs with absolute correlation above
    0.5 are linked.

    Args:
        returns: Array of shape ``(T, N)``: T observations of N assets.
        window: Number of observations per window.
        min_spanning_tree: When true, each network is reduced to its minimum
            spanning tree (a spanning forest if it is disconnected), using
            the correlation distance ``sqrt(2 * (1 - |rho|))`` as edge length.

    Returns:
        List of dicts with the keys ``'time'`` (window end index ``t``),
        ``'adj_matrix'`` (sparse 0/1 adjacency), ``'correlation'`` (the
        thresholded correlation matrix) and ``'mst_edges'`` (number of edges
        in the spanning tree, or ``None`` when no tree was built).
    """
    T, N = returns.shape
    networks = []
    for t in range(window, T):
        # Correlations within the current window.
        window_returns = returns[t - window:t]
        corr_matrix = np.corrcoef(window_returns.T)
        corr_matrix[np.abs(corr_matrix) < 0.1] = 0  # drop negligible values

        # Link strongly correlated pairs.
        strength = np.abs(corr_matrix)
        adj_window = (strength > 0.5).astype(float)
        np.fill_diagonal(adj_window, 0)

        if min_spanning_tree:
            # Weight every edge by its correlation distance so the tree keeps
            # the strongest links; unit weights would make the choice arbitrary.
            distance = np.sqrt(2.0 * (1.0 - np.clip(strength, 0.0, 1.0)))
            rows, cols = np.nonzero(np.triu(adj_window, k=1))
            G_window = nx.Graph()
            G_window.add_nodes_from(range(N))
            G_window.add_weighted_edges_from(
                zip(rows.tolist(), cols.tolist(), distance[rows, cols].tolist()))
            mst = nx.minimum_spanning_tree(G_window)
            # weight=None stores 1 for every tree edge, matching the 0/1
            # adjacency returned by the non-tree branch.
            adj_mst = nx.to_scipy_sparse_array(mst, nodelist=range(N),
                                               weight=None, dtype=float,
                                               format='csr')
            mst_edges = mst.number_of_edges()
        else:
            adj_mst = sparse.csr_matrix(adj_window)
            mst_edges = None

        networks.append({
            'time': t,
            'adj_matrix': adj_mst,
            'correlation': corr_matrix,
            'mst_edges': mst_edges
        })
    return networks
# Example: evolution of a stock network over two years for 50 stocks.
n_days, n_stocks = 252*2, 50
returns = np.random.randn(n_days, n_stocks) * 0.02
networks = rolling_network_analysis(returns, window=60)

# Time series of the per-window edge count.
mst_edges = []
for snapshot in networks:
    mst_edges.append(snapshot['mst_edges'])

plt.figure(figsize=(12, 6))
plt.plot(mst_edges)
plt.title('最小生成树边数时间序列')
plt.ylabel('MST边数')
plt.xlabel('时间窗口')
plt.show()
6. 图神经网络数据准备
图拉普拉斯特征
def graph_laplacian_features(adj_matrix, degree_weighted=False, n_features=32):
    """Compute Laplacian eigenvector features, e.g. as input for a GNN.

    Args:
        adj_matrix: Square, symmetric sparse adjacency matrix.
        degree_weighted: When true, ``D`` is the diagonal degree matrix and
            the result is the normalised Laplacian ``D^-1/2 (D - A) D^-1/2``.
            When false, ``D`` is the identity and the matrix is ``I - A``.
        n_features: Number of eigenvectors to return (capped at the node count).

    Returns:
        Dict with ``'laplacian_matrix'`` (sparse CSR), ``'eigenvalues'`` (the
        algebraically smallest ones, ascending) and ``'features'`` (one
        column per eigenvalue). If the iterative solver does not converge,
        ``'features'`` holds the first columns of the identity matrix and
        ``'eigenvalues'`` is filled with NaN.
    """
    from scipy.sparse.linalg import ArpackNoConvergence

    adj = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adj.shape[0]

    # Diagonal of the degree matrix.
    if degree_weighted:
        degrees = np.asarray(adj.sum(axis=1)).ravel()
    else:
        degrees = np.ones(n)

    # D^-1/2 computed on the diagonal; zero-degree nodes get a zero factor
    # rather than a division by zero.
    inv_sqrt = np.zeros(n)
    positive = degrees > 0
    inv_sqrt[positive] = degrees[positive] ** -0.5
    scaling = sparse.diags(inv_sqrt)
    L = sparse.diags(degrees) - adj
    L_norm = (scaling @ L @ scaling).tocsr()

    # Eigenvectors of the smallest eigenvalues form the spectral embedding.
    k = min(n_features, n)
    if k >= n - 1:
        # Too many eigenpairs for the iterative solver; use the dense one.
        all_values, all_vectors = np.linalg.eigh(L_norm.toarray())
        eigenvalues = all_values[:k]
        laplacian_features = all_vectors[:, :k]
    else:
        try:
            eigenvalues, vectors = eigsh(L_norm, k=k, which='SA')
            order = np.argsort(eigenvalues)
            eigenvalues = eigenvalues[order]
            laplacian_features = vectors[:, order]
        except ArpackNoConvergence:
            eigenvalues = np.full(k, np.nan)
            laplacian_features = np.eye(n)[:, :k]

    return {
        'laplacian_matrix': L_norm,
        'eigenvalues': eigenvalues,
        'features': laplacian_features
    }
# Extract the spectral features of the asset network for a GNN.
features = graph_laplacian_features(adj_csr)
embedding_shape = features['features'].shape
print(f"图嵌入维度: {embedding_shape}")
7. 高级图算法实现
PageRank(资产重要性)
def custom_pagerank(adj_matrix, alpha=0.85, max_iter=100, tol=1e-6):
    """Compute PageRank by power iteration on a sparse matrix.

    Args:
        adj_matrix: Square adjacency matrix where ``adj_matrix[i, j]`` is the
            weight of the edge from node ``i`` to node ``j``.
        alpha: Damping factor: probability of following an edge instead of
            jumping to a uniformly chosen node.
        max_iter: Maximum number of iterations.
        tol: Stop once the L1 change between two iterations drops below this.

    Returns:
        Array of PageRank scores that sums to 1 (empty for an empty matrix).
    """
    adj = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adj.shape[0]
    if n == 0:
        return np.zeros(0)

    # Row-normalise so that every node spreads its rank over its out-edges.
    out_strength = np.asarray(adj.sum(axis=1)).ravel()
    dangling = out_strength == 0
    inv_out = np.zeros(n)
    inv_out[~dangling] = 1.0 / out_strength[~dangling]
    # Transposed once up front: the iteration needs incoming flows.
    transition_t = (sparse.diags(inv_out) @ adj).T.tocsr()

    pr = np.full(n, 1.0 / n)  # start from the uniform distribution
    for _ in range(max_iter):
        # Nodes without out-edges hand their rank to all nodes equally;
        # otherwise that probability mass would leak out of the system.
        dangling_mass = pr[dangling].sum()
        pr_new = alpha * (transition_t @ pr + dangling_mass / n) + (1 - alpha) / n
        converged = np.abs(pr_new - pr).sum() < tol
        pr = pr_new
        if converged:
            break
    return pr / pr.sum()
# PageRank of the assets in the trading network.
pagerank_scores = custom_pagerank(trade_adj)
score_order = np.argsort(pagerank_scores)
top_assets = score_order[-5:]  # five highest scores, ascending
print("PageRank Top资产:", [assets[idx] for idx in top_assets])
最短路径与套利机会
from scipy.sparse.csgraph import shortest_path, dijkstra
def arbitrage_opportunities(adj_matrix, prices, max_path_length=5):
    """Search for arbitrage opportunities via shortest paths on the graph.

    Args:
        adj_matrix: Sparse adjacency matrix of the trading network.
        prices: One price per node; indexed by node id below.
        max_path_length: Maximum number of predecessor steps followed per
            start node.

    Returns:
        List of dicts with the keys ``'path'``, ``'profit'`` and ``'start'``
        for every start node whose accumulated profit exceeds 0.01, sorted by
        profit in descending order.

    NOTE(review): the model uses per-node prices rather than per-edge
    exchange rates; confirm that this is the intended arbitrage definition.
    """
    # Convert to a distance matrix (negative log-price).
    log_prices = np.log(prices)
    # Element-wise product of the adjacency entries with the log-prices.
    # NOTE(review): presumably the price vector is broadcast along the
    # columns, i.e. entry (i, j) is scaled by log_prices[j] -- confirm.
    # NOTE(review): the negated values can be negative; SciPy's shortest_path
    # raises on negative cycles -- confirm how that case should be handled.
    distance_matrix = -adj_matrix.multiply(log_prices) # higher price, shorter distance
    # All-pairs shortest paths.
    distances, predecessors = shortest_path(distance_matrix,
                                            directed=True,
                                            return_predecessors=True)
    # Look for negative cycles (arbitrage).
    arbitrage_cycles = []
    for start in range(adj_matrix.shape[0]):
        # Rebuild the path by following predecessor links.
        path = []
        current = start
        total_profit = 0
        for _ in range(max_path_length):
            # NOTE(review): on the first pass current == start, so this reads
            # predecessors[start, start]. SciPy documents -9999 for entries
            # without a path, which appears to include the diagonal; if so
            # the walk stops immediately and `path` stays empty -- confirm.
            next_node = predecessors[start, current]
            if next_node == -9999: # no path
                break
            edge_profit = prices[current] / prices[next_node] - 1
            total_profit += edge_profit
            path.append((current, next_node))
            current = next_node
        if total_profit > 0.01: # 1% arbitrage opportunity
            arbitrage_cycles.append({
                'path': path,
                'profit': total_profit,
                'start': start
            })
    return sorted(arbitrage_cycles, key=lambda x: x['profit'], reverse=True)
# Simulate prices for the trading network and look for arbitrage.
prices = np.random.uniform(0.8, 1.2, n_assets)
arbitrages = arbitrage_opportunities(trade_adj, prices)
# The search result is bound to `arbitrages`; report its three best entries.
if arbitrages:
    print("发现套利机会:")
    for arb in arbitrages[:3]:
        print(f"路径: {arb['path']}, 利润: {arb['profit']:.2%}")
8. 网络可视化与分析
网络可视化
def visualize_financial_network(G, centrality_measure='degree', top_n=20):
    """Draw the most central nodes of a financial network.

    Args:
        G: NetworkX graph. Nodes may carry a ``'symbol'`` attribute used for
            the labels; nodes without one are labelled with their id.
        centrality_measure: ``'degree'`` or ``'pagerank'``.
        top_n: Number of highest-centrality nodes to include.

    Raises:
        ValueError: If ``centrality_measure`` is not a supported name.
    """
    # Centrality per node.
    if centrality_measure == 'degree':
        centrality = dict(G.degree())
    elif centrality_measure == 'pagerank':
        centrality = nx.pagerank(G)
    else:
        raise ValueError(
            "centrality_measure must be 'degree' or 'pagerank', "
            f"got {centrality_measure!r}")

    # Keep only the most important nodes.
    top_nodes = sorted(centrality, key=centrality.get, reverse=True)[:top_n]
    subgraph = G.subgraph(top_nodes).copy()

    plt.figure(figsize=(12, 10))
    pos = nx.spring_layout(subgraph, k=1, iterations=50)

    # Node size is proportional to centrality. Fall back to 1 when every
    # score is zero (or the graph is empty) to avoid dividing by zero.
    peak = max(centrality.values(), default=0) or 1
    node_sizes = [3000 * centrality[node] / peak for node in subgraph.nodes()]

    # Edge width reflects the edge weight (1 when no weight is stored).
    edges = subgraph.edges()
    weights = [G[u][v].get('weight', 1) for u, v in edges]

    labels = {node: str(G.nodes[node].get('symbol', node))[:4]
              for node in subgraph.nodes()}

    nx.draw_networkx_nodes(subgraph, pos, node_size=node_sizes,
                           node_color='lightblue', alpha=0.8)
    nx.draw_networkx_edges(subgraph, pos, width=[w * 2 for w in weights],
                           alpha=0.5, edge_color='gray')
    nx.draw_networkx_labels(subgraph, pos, labels, font_size=8)
    plt.title(f"金融网络 (Top {top_n}资产, {centrality_measure}中心性)")
    plt.axis('off')
    plt.tight_layout()
    plt.show()
# Visualise the asset network, selecting and sizing nodes by PageRank.
visualize_financial_network(G, centrality_measure='pagerank')
9. 性能优化与大规模网络
大规模网络处理
def large_scale_network_processing(n_nodes=10000, density=0.001, seed=None):
    """Generate a large sparse random network and analyse its densest part.

    Args:
        n_nodes: Number of nodes of the random graph.
        density: Probability that any given pair of nodes is connected.
        seed: Optional seed passed to the NetworkX graph generator.

    Returns:
        ``(adj_large, subgraph_stats)``: the CSR adjacency matrix of the whole
        graph and the value returned by ``analyze_network_stats`` for the
        subgraph induced by the 100 highest-degree nodes.
    """
    # fast_gnp_random_graph samples the same G(n, p) model as
    # erdos_renyi_graph but skips over absent edges, which matters when the
    # graph is large and sparse.
    G_large = nx.fast_gnp_random_graph(n_nodes, density, seed=seed)
    adj_large = nx.to_scipy_sparse_array(G_large, format='csr')
    print(f"大规模网络: {n_nodes}节点, {G_large.number_of_edges():,}边")

    # A CSR matrix stores its values plus two index arrays; count all three.
    footprint = (adj_large.data.nbytes
                 + adj_large.indices.nbytes
                 + adj_large.indptr.nbytes)
    print(f"内存占用: {footprint / 1e6:.1f} MB")

    # Degree is cheap to compute directly on the sparse matrix.
    degrees = np.array(adj_large.sum(axis=1)).flatten()
    top_nodes = np.argsort(degrees)[-100:]  # Top 100

    # Analyse the subgraph induced by the top-degree nodes.
    subgraph_adj = adj_large[top_nodes][:, top_nodes]
    subgraph_stats = analyze_network_stats(subgraph_adj)
    return adj_large, subgraph_stats
# Run the large-network example with its default size and density.
# NOTE: this rebinds the module-level name `stats`; the second value is
# whatever analyze_network_stats() returned for the top-degree subgraph.
large_adj, stats = large_scale_network_processing()
并行图计算
from multiprocessing import Pool
import dask.array as da
def parallel_pagerank(adj_matrix, n_workers=4, alpha=0.85, max_iter=100, tol=1e-6):
    """Compute PageRank with the matrix-vector product split into row blocks.

    PageRank cannot be computed independently per block, because every score
    depends on the whole graph. Instead the transition matrix is cut into row
    blocks, each block's contribution to the next iterate is computed by a
    thread pool, and the contributions are summed in every iteration.

    Args:
        adj_matrix: Square adjacency matrix where ``adj_matrix[i, j]`` is the
            weight of the edge from node ``i`` to node ``j``.
        n_workers: Requested number of row blocks / worker threads (capped at
            the number of nodes).
        alpha: Damping factor.
        max_iter: Maximum number of iterations.
        tol: L1 convergence threshold.

    Returns:
        Array of PageRank scores that sums to 1 (empty for an empty matrix).
    """
    from concurrent.futures import ThreadPoolExecutor

    adj = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adj.shape[0]
    if n == 0:
        return np.zeros(0)

    # Row-stochastic transition matrix; dangling rows stay all-zero and are
    # handled separately inside the iteration.
    out_strength = np.asarray(adj.sum(axis=1)).ravel()
    dangling = out_strength == 0
    inv_out = np.zeros(n)
    inv_out[~dangling] = 1.0 / out_strength[~dangling]
    transition = (sparse.diags(inv_out) @ adj).tocsr()

    # Evenly sized row blocks that cover every row, including the remainder.
    n_blocks = max(1, min(int(n_workers), n))
    bounds = np.linspace(0, n, n_blocks + 1).astype(int)
    blocks = [(slice(lo, hi), transition[lo:hi].T.tocsr())
              for lo, hi in zip(bounds[:-1], bounds[1:])]

    pr = np.full(n, 1.0 / n)
    with ThreadPoolExecutor(max_workers=n_blocks) as pool:
        for _ in range(max_iter):
            # Each block maps the rank of its own rows onto all target nodes.
            partial_flows = list(
                pool.map(lambda block: block[1] @ pr[block[0]], blocks))
            flow = np.sum(partial_flows, axis=0)
            dangling_mass = pr[dangling].sum()
            pr_new = alpha * (flow + dangling_mass / n) + (1 - alpha) / n
            converged = np.abs(pr_new - pr).sum() < tol
            pr = pr_new
            if converged:
                break
    return pr / pr.sum()
10. 金融网络最佳实践
网络构建策略
class FinancialNetworkAnalyzer:
"""金融网络分析器"""
def __init__(self, correlation_threshold=0.6, min_weight=0.1):
self.corr_threshold = correlation_threshold
self.min_weight = min_weight
def build_from_returns(self, returns):
"""从收益构建网络"""
corr_matrix = np.corrcoef(returns.T)
# 阈值化相关性网络
adj_matrix = (np.abs(corr_matrix) > self.corr_threshold).astype(float)
adj_matrix[np.abs(corr_matrix) < self.min_weight] = 0
np.fill_diagonal(adj_matrix, 0)
self.adj_matrix = sparse.csr_matrix(adj_matrix)
self.correlation_matrix = corr_matrix
return self
def systemic_risk_score(self):
"""系统性风险评分"""
centralities = centrality_measures(self.adj_matrix)
# 综合系统性指标
systemic_score = (centralities['degree'] +
centralities['eigenvector'] * 2 +
centralities['katz']) / 4
return np.argsort(systemic_score)[-10:] # Top 10系统性资产
def stress_test(self, shock_assets, shock_magnitude=0.5):
"""压力测试"""
contagion_shocks, steps = contagion_risk_analysis(
self.adj_matrix, shock_assets, contagion_param=shock_magnitude)
return {
'affected_assets': np.where(contagion_shocks > 0)[0],
'contagion_steps': steps,
'total_impact': contagion_shocks.sum()
}
# Usage example: build the network from the simulated returns.
analyzer = FinancialNetworkAnalyzer(correlation_threshold=0.7)
analyzer.build_from_returns(returns)

systemic_assets = analyzer.systemic_risk_score()
systemic_symbols = [assets[idx] for idx in systemic_assets]
print("系统性风险资产:", systemic_symbols)

# Stress test: shock the first three assets of the systemic list.
shocked_assets = systemic_assets[:3]
stress_results = analyzer.stress_test(shocked_assets)
affected_count = len(stress_results['affected_assets'])
print(f"压力测试影响: {affected_count}个资产")
SciPy 的图结构分析与稀疏矩阵技术相结合,为金融网络风险管理、资产配置优化和系统性风险识别提供了强大的工具。通过中心性分析、社区检测和传染模型,可以有效识别市场脆弱点和系统重要性机构。如需特定的金融网络算法,或希望与深度学习中的图神经网络集成,请告诉我具体需求!