SciPy 图结构

SciPy 图结构与网络分析:量化金融应用

虽然SciPy核心没有专门的图论模块,但通过 scipy.sparse(例如 scipy.sparse.csr_matrix)可以高效构建和操作图结构数据。结合NetworkX库,SciPy提供了强大的图算法支持,特别适用于金融网络分析、风险传染、资产相关性网络等场景。

1. 图表示方法与SciPy集成

邻接矩阵(Adjacency Matrix)
from scipy import sparse
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

def create_adjacency_matrix(graph):
    """Build a SciPy sparse adjacency matrix (CSR) from a NetworkX graph.

    Rows and columns follow the node order 0, 1, ..., n-1, so the graph's
    nodes are expected to be labelled with exactly those integers.
    """
    node_order = range(graph.number_of_nodes())
    return nx.to_scipy_sparse_array(graph, nodelist=node_order, format='csr')

# Example: a synthetic asset-correlation network.
np.random.seed(42)
n_assets = 100
# NetworkX's graph generators do not read NumPy's global random state, so
# the graph needs its own explicit seed to be reproducible.
G = nx.erdos_renyi_graph(n_assets, 0.05, seed=42)  # 5% connection probability

# Attach a ticker-like label to every node.
assets = [f'ASSET_{i}' for i in range(n_assets)]
nx.set_node_attributes(G, dict(enumerate(assets)), 'symbol')

adj_csr = create_adjacency_matrix(G)
print(f"图结构: {n_assets}节点, {G.number_of_edges()}边")
print(f"邻接矩阵: {adj_csr.shape}, 非零元素: {adj_csr.nnz}")
print(f"平均度: {2 * G.number_of_edges() / n_assets:.2f}")
边列表与COO格式
def edges_to_coo_matrix(edges, n_nodes):
    """Build a symmetric (undirected) CSR adjacency matrix from an edge list.

    Parameters
    ----------
    edges : iterable of (row, col) pairs
        Each undirected edge should be listed once; repeated pairs add up.
    n_nodes : int
        Number of nodes; the result has shape (n_nodes, n_nodes).

    Returns
    -------
    scipy.sparse.csr_matrix
        ``A + A.T - diag(A)`` where ``A`` holds a weight of 1 per listed
        edge, so self-loops are not double counted.
    """
    edges = list(edges)
    if not edges:
        # zip(*[]) cannot be unpacked into rows/cols; an empty edge list
        # simply means an all-zero adjacency matrix.
        return sparse.csr_matrix((n_nodes, n_nodes))

    rows, cols = zip(*edges)
    data = np.ones(len(edges))  # unit weight per edge

    one_sided = sparse.coo_matrix((data, (rows, cols)),
                                  shape=(n_nodes, n_nodes)).tocsr()
    # Mirror across the diagonal, then remove the doubled diagonal entries.
    mirrored = one_sided + one_sided.T - sparse.diags(one_sided.diagonal())
    return mirrored.tocsr()

# Directed trading network: each record is (from, to, volume).
trades = [(0, 1, 1000), (1, 2, 500), (0, 2, 2000),
          (2, 0, 1500), (1, 0, 800)]
# Split endpoints from volumes explicitly. Unzipping a list of (u, v) pairs
# would yield (all sources, all targets), not (edges, weights), and the
# traded volumes would never reach the matrix.
edges = [(u, v) for u, v, _ in trades]
weights = [volume for _, _, volume in trades]
sources, targets = zip(*edges)
trade_weights = np.array(weights, dtype=float)

trade_adj = sparse.coo_matrix((trade_weights, (sources, targets)),
                              shape=(n_assets, n_assets)).tocsr()
print("交易网络邻接矩阵:", trade_adj.sum())

2. 图谱分析算法

度分布与网络统计
def analyze_network_stats(adj_matrix):
    """Compute basic statistics of a network given its sparse adjacency.

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        Adjacency matrix; row sums are used as node degrees.

    Returns
    -------
    (dict, numpy.ndarray)
        The statistics dictionary and the degree sequence. The dictionary
        holds degree mean/max/std, average clustering, degree
        assortativity, density and a log-log power-law fit of the degree
        histogram (``power_law_exponent``, ``power_law_r2``,
        ``power_law_pvalue``; NaN when fewer than two histogram bins are
        occupied).
    """
    # Imported under an alias: the result dictionary below is called
    # ``stats`` and must not be overwritten by the module.
    from scipy import stats as sp_stats

    n = adj_matrix.shape[0]
    degrees = np.asarray(adj_matrix.sum(axis=1)).ravel()

    # Build the NetworkX view once and reuse it for both graph metrics.
    graph = nx.from_scipy_sparse_array(adj_matrix)
    possible_links = n * (n - 1)

    stats = {
        'avg_degree': degrees.mean(),
        'max_degree': degrees.max(),
        'degree_std': degrees.std(),
        'clustering_coeff': nx.average_clustering(graph),
        'assortativity': nx.degree_assortativity_coefficient(graph),
        'density': adj_matrix.nnz / possible_links if possible_links else 0.0,
    }

    # Power-law check: regress log10(frequency) on log10(degree) using the
    # occupied bins of the degree histogram, so both arrays have equal length.
    slope = r_value = p_value = float('nan')
    positive = degrees[degrees > 0]
    if positive.size:
        counts, bin_edges = np.histogram(positive, bins=20)
        centers = 0.5 * (bin_edges[:-1] + bin_edges[1:])
        occupied = counts > 0
        if occupied.sum() >= 2:
            fit = sp_stats.linregress(np.log10(centers[occupied]),
                                      np.log10(counts[occupied]))
            slope, r_value, p_value = fit.slope, fit.rvalue, fit.pvalue

    stats.update({
        'power_law_exponent': -slope,
        'power_law_r2': r_value**2,
        'power_law_pvalue': p_value
    })

    return stats, degrees

# Run the analysis on the asset network and report every metric.
stats, degrees = analyze_network_stats(adj_csr)
print("网络统计:")
for metric, metric_value in stats.items():
    print(f"  {metric}: {metric_value:.4f}")

# Degree distribution: histogram on the left, rank-degree curve on the right.
plt.figure(figsize=(12, 4))

plt.subplot(121)
plt.hist(degrees, bins=30, alpha=0.7, log=True)
plt.xlabel('度数')
plt.ylabel('频数 (对数)')
plt.title('度分布')

plt.subplot(122)
ranked_degrees = sorted(degrees, reverse=True)
plt.loglog(ranked_degrees, 'o-')
plt.xlabel('排名')
plt.ylabel('度数')
plt.title('Rank-Degree图')

plt.tight_layout()
plt.show()
中心性度量(风险识别)
from scipy.sparse.linalg import eigs, eigsh

def centrality_measures(adj_matrix, normalized=True):
    """Compute several centrality measures from a sparse adjacency matrix.

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        Adjacency matrix. The eigenvector and closeness computations treat
        it as symmetric (undirected).
    normalized : bool, default True
        Divide the degree centrality by ``n - 1``.

    Returns
    -------
    dict of numpy.ndarray
        ``'degree'``, ``'eigenvector'`` (sums to 1; all zeros if the
        eigen-solver fails), ``'katz'`` and ``'closeness'``
        (``(n - 1) / sum of shortest-path distances``; 0 for nodes that
        cannot reach every other node).
    """
    from scipy.sparse.csgraph import shortest_path
    from scipy.sparse.linalg import ArpackError, eigsh, spsolve

    # Work in floating point: ARPACK rejects integer matrices and the
    # normalisations below are true divisions.
    adjacency = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adjacency.shape[0]
    scale = max(n - 1, 1)

    # 1. Degree centrality
    degree_centrality = np.asarray(adjacency.sum(axis=1)).ravel()
    if normalized:
        degree_centrality = degree_centrality / scale

    # 2. Eigenvector centrality: leading eigenvector of the adjacency.
    #    eigsh only accepts LM/SM/LA/SA/BE; 'LA' is the largest algebraic.
    spectral_radius = None
    try:
        eigenvalues, eigenvectors = eigsh(adjacency, k=1, which='LA')
    except (ArpackError, ValueError, TypeError):
        # Best effort: keep the other measures usable if the solver fails
        # (for example when the matrix is too small for ARPACK).
        eigenvector_centrality = np.zeros(n)
    else:
        spectral_radius = float(eigenvalues[0])
        eigenvector_centrality = np.abs(eigenvectors[:, 0])
        total = eigenvector_centrality.sum()
        if total > 0:
            eigenvector_centrality = eigenvector_centrality / total

    # 3. Katz centrality: column sums of (I - alpha*A)^-1, obtained by
    #    solving one linear system instead of forming the inverse.
    alpha = 0.1  # attenuation factor
    if spectral_radius is not None and alpha * spectral_radius >= 1:
        # The Katz series only converges for alpha < 1 / lambda_max.
        alpha = 0.9 / spectral_radius
    system = (sparse.identity(n, format='csr') - alpha * adjacency).T.tocsc()
    katz_centrality = np.asarray(spsolve(system, np.ones(n))).ravel()

    # 4. Closeness centrality
    dist_matrix = shortest_path(adjacency, directed=False)
    total_distance = dist_matrix.sum(axis=1)
    closeness_centrality = np.zeros(n)
    reachable = np.isfinite(total_distance) & (total_distance > 0)
    closeness_centrality[reachable] = scale / total_distance[reachable]

    return {
        'degree': degree_centrality,
        'eigenvector': eigenvector_centrality,
        'katz': katz_centrality,
        'closeness': closeness_centrality
    }

# Rank assets by eigenvector centrality; the ten highest are treated as
# systemically important.
centralities = centrality_measures(adj_csr)
eigen_ranking = np.argsort(centralities['eigenvector'])
systemic_assets = eigen_ranking[-10:]
systemic_names = [assets[idx] for idx in systemic_assets]
print("系统重要性资产:", systemic_names)

3. 金融网络风险分析

风险传染模型
def contagion_risk_analysis(adj_matrix, initial_shocks, contagion_param=0.3,
                            max_iterations=100, seed=None):
    """Simulate a simple stochastic default cascade on a network.

    In every round each healthy node becomes infected when
    ``exposure * contagion_param`` exceeds a uniform random draw, where
    ``exposure`` is the adjacency-weighted count of infected neighbours.

    Parameters
    ----------
    adj_matrix : sparse matrix or ndarray, shape (n, n)
        Network adjacency matrix.
    initial_shocks : sequence of int
        Indices of the initially defaulted nodes.
    contagion_param : float, default 0.3
        Contagion strength per unit of exposure.
    max_iterations : int, default 100
        Upper bound on the number of contagion rounds.
    seed : int or None, default None
        When given, draws come from ``numpy.random.default_rng(seed)`` so
        the run is reproducible; otherwise NumPy's global state is used.

    Returns
    -------
    (numpy.ndarray, int)
        The 0/1 infection vector and the index of the last round executed.
    """
    n = adj_matrix.shape[0]
    shocks = np.zeros(n)
    shocks[initial_shocks] = 1.0  # initial defaults

    draw_uniform = (np.random.rand if seed is None
                    else np.random.default_rng(seed).random)

    tolerance = 1e-6
    iteration = 0  # defined even if no round is executed
    for iteration in range(max_iterations):
        # Exposure of every node to already-infected neighbours.
        neighbor_exposure = adj_matrix @ shocks
        # The (1 - shocks) factor keeps infected nodes from being recounted.
        new_infections = (neighbor_exposure * contagion_param >
                          draw_uniform(n)) * (1 - shocks)

        shocks += new_infections
        if np.sum(new_infections) < tolerance:
            break

    total_contagion = shocks.sum()
    print(f"传染轮次: {iteration}")
    print(f"总传染节点: {total_contagion:.0f}/{n}")

    return shocks, iteration

# Simulate interbank contagion on a scale-free (Barabasi-Albert) network.
n_banks = 50
# Seed the generator explicitly: NetworkX does not read NumPy's global
# random state, so np.random.seed alone leaves the graph unreproducible.
bank_network = nx.barabasi_albert_graph(n_banks, 3, seed=42)
bank_adj = create_adjacency_matrix(bank_network)

# Five banks default initially.
initial_defaults = np.random.choice(n_banks, 5, replace=False)
contagion_shocks, steps = contagion_risk_analysis(bank_adj, initial_defaults)
print(f"传染完成,影响{contagion_shocks.sum():.0f}家银行")
网络VaR计算
def network_var(adj_matrix, asset_returns, weights, alpha=0.05):
    """Compare a plain portfolio VaR with a network-amplified VaR.

    Parameters
    ----------
    adj_matrix : sparse matrix or ndarray, shape (n_assets, n_assets)
        Network adjacency used to propagate shocks between assets.
    asset_returns : ndarray, shape (n_scenarios, n_assets)
        One row of asset returns per scenario.
    weights : ndarray, shape (n_assets,)
        Portfolio weights.
    alpha : float, default 0.05
        Tail probability; the VaR is the ``alpha`` quantile of returns.

    Returns
    -------
    dict
        ``'standalone_var'``, ``'network_var'`` and
        ``'contagion_amplification'`` (``network / standalone - 1``; NaN
        when the standalone VaR is zero).
    """
    asset_returns = np.asarray(asset_returns)
    weights = np.asarray(weights)

    # 1. Standalone VaR
    portfolio_return = np.dot(asset_returns, weights)
    standalone_var = np.percentile(portfolio_return, alpha * 100)

    # 2. Network VaR: every scenario's weighted shock is propagated one
    #    step through the network and 30% of that spill-over is added back.
    #    All scenarios (rows) are processed at once.
    weighted_shocks = asset_returns * weights
    network_effect = (adj_matrix @ weighted_shocks.T).T
    network_shocks = (asset_returns + 0.3 * network_effect) @ weights

    contagion_var = np.percentile(network_shocks, alpha * 100)

    if standalone_var != 0:
        amplification = contagion_var / standalone_var - 1
    else:
        amplification = float('nan')

    return {
        'standalone_var': standalone_var,
        'network_var': contagion_var,
        'contagion_amplification': amplification
    }

# Sample data: independent normal returns with 2% volatility per asset and
# random long-only weights normalised to sum to one.
n_scenarios = 10000
zero_mean = np.zeros(n_assets)
iid_covariance = np.eye(n_assets) * 0.02**2
asset_returns = np.random.multivariate_normal(zero_mean, iid_covariance,
                                              n_scenarios)
weights = np.random.rand(n_assets)
weights = weights / weights.sum()

var_results = network_var(adj_csr, asset_returns, weights)
standalone = var_results['standalone_var']
networked = var_results['network_var']
amplification_pct = var_results['contagion_amplification'] * 100
print(f"独立VaR: {standalone:.4f}")
print(f"网络VaR: {networked:.4f}")
print(f"传染放大: {amplification_pct:.1f}%")

4. 社区检测与模块化

谱聚类社区检测
from scipy.sparse.linalg import eigs
from sklearn.cluster import SpectralClustering

def spectral_community_detection(adj_matrix, n_communities=5):
    """Detect communities by spectral clustering of the normalised Laplacian.

    The nodes are embedded with the eigenvectors belonging to the
    ``n_communities`` smallest eigenvalues of
    ``D^-1/2 (D - A) D^-1/2`` and the embedding is clustered with k-means.

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        Symmetric adjacency matrix.
    n_communities : int, default 5
        Number of communities; must satisfy ``1 <= n_communities <= n``.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Integer community label per node and the eigenvalues used
        (ascending).

    Raises
    ------
    ValueError
        If ``n_communities`` is outside ``[1, n]``.
    """
    from scipy.cluster.vq import kmeans2
    from scipy.sparse.linalg import eigsh

    adjacency = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adjacency.shape[0]
    if not 1 <= n_communities <= n:
        raise ValueError("n_communities must be between 1 and the number of nodes")

    # Normalised Laplacian. Isolated nodes get a zero scaling factor
    # instead of 1/sqrt(0), which would poison the matrix with inf/NaN.
    degrees = np.asarray(adjacency.sum(axis=1)).ravel()
    inv_sqrt = np.zeros(n)
    connected = degrees > 0
    inv_sqrt[connected] = 1.0 / np.sqrt(degrees[connected])
    D_inv_sqrt = sparse.diags(inv_sqrt)
    laplacian = sparse.diags(degrees) - adjacency
    norm_laplacian = (D_inv_sqrt @ laplacian @ D_inv_sqrt).tocsr()

    # Smallest eigenpairs. The matrix is symmetric, so a symmetric solver
    # returns real values; small problems (or k too close to n for ARPACK)
    # are solved densely, which is both exact and cheap at that size.
    if n <= 512 or n_communities >= n - 1:
        all_values, all_vectors = np.linalg.eigh(norm_laplacian.toarray())
        eigenvalues = all_values[:n_communities]
        eigenvectors = all_vectors[:, :n_communities]
    else:
        eigenvalues, eigenvectors = eigsh(norm_laplacian, k=n_communities,
                                          which='SA', tol=1e-4)
        order = np.argsort(eigenvalues)
        eigenvalues = eigenvalues[order]
        eigenvectors = eigenvectors[:, order]

    # Cluster the spectral embedding (one row per node) with k-means. A
    # 'precomputed' SpectralClustering needs an (n, n) affinity matrix, so
    # it cannot be fed this (n, k) embedding.
    _, communities = kmeans2(eigenvectors, n_communities, minit='++', seed=42)

    return communities, eigenvalues

# Detect sector-like communities in the asset network.
communities, evals = spectral_community_detection(adj_csr, n_communities=6)

# Community sizes: count how many assets carry each label.
unique_communities, community_sizes = np.unique(communities, return_counts=True)
print("社区检测结果:")
for comm, size in zip(unique_communities, community_sizes):
    print(f"社区 {comm}: {size}个资产")
模块化优化(Louvain算法近似)
def modularity_optimization(adj_matrix, resolution=1.0):
    """Partition a network by greedy modularity maximisation.

    Communities come from NetworkX's greedy modularity algorithm; the
    returned score is computed directly on the sparse matrix as
    ``Q = sum_c [ w_in(c) / 2m - resolution * (d(c) / 2m)^2 ]`` where
    ``w_in(c)`` is the total adjacency weight inside community ``c``
    (both directions), ``d(c)`` its total degree and ``2m`` the sum of
    all adjacency entries.

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        Symmetric adjacency matrix.
    resolution : float, default 1.0
        Resolution parameter; larger values favour smaller communities.

    Returns
    -------
    (numpy.ndarray, float)
        Community label per node and the modularity of that partition
        (0.0 for a network without edges).
    """
    from networkx.algorithms import community as nx_community

    adjacency = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adjacency.shape[0]

    def modularity_score(adj, communities):
        """Modularity of the partition ``communities`` on adjacency ``adj``."""
        two_m = adj.sum()
        if two_m == 0:
            return 0.0
        _, compact = np.unique(communities, return_inverse=True)
        n_groups = int(compact.max()) + 1
        # Node-by-community indicator matrix.
        membership = sparse.csr_matrix(
            (np.ones(adj.shape[0]), (np.arange(adj.shape[0]), compact)),
            shape=(adj.shape[0], n_groups))
        within = (membership.T @ adj @ membership).diagonal()
        strength = membership.T @ np.asarray(adj.sum(axis=1)).ravel()
        return float(np.sum(within / two_m
                            - resolution * (strength / two_m) ** 2))

    if adjacency.nnz == 0:
        # No edges: every node is its own community.
        return np.arange(n), 0.0

    graph = nx.from_scipy_sparse_array(adj_matrix)
    groups = nx_community.greedy_modularity_communities(
        graph, weight='weight', resolution=resolution)

    labels = np.empty(n, dtype=int)
    for label, members in enumerate(groups):
        labels[list(members)] = label

    return labels, modularity_score(adjacency, labels)

# Partition the asset network and report the modularity of the result.
communities, modularity = modularity_optimization(adj_csr)
modularity_report = f"最优模块化得分: {modularity:.4f}"
print(modularity_report)

5. 时变网络分析

滚动窗口网络
def rolling_network_analysis(returns, window=30, min_spanning_tree=True,
                             noise_floor=0.1, edge_threshold=0.5):
    """Build one correlation network per rolling time window.

    Parameters
    ----------
    returns : ndarray, shape (T, N)
        Return series, one row per period and one column per asset.
    window : int, default 30
        Number of periods per window; window ``t`` covers rows
        ``t - window`` to ``t - 1``.
    min_spanning_tree : bool, default True
        Sparsify each thresholded network to a minimum spanning tree
        (a spanning forest if the network is disconnected).
    noise_floor : float, default 0.1
        Correlations with a smaller absolute value are set to zero in the
        stored correlation matrix.
    edge_threshold : float, default 0.5
        Two assets are linked when their absolute correlation exceeds it.

    Returns
    -------
    list of dict
        One entry per window with keys ``'time'``, ``'adj_matrix'``
        (sparse), ``'correlation'`` and ``'mst_edges'`` (number of edges in
        the spanning tree, or ``None`` when ``min_spanning_tree`` is off).
    """
    T, _ = returns.shape
    networks = []

    for t in range(window, T):
        # Correlation inside the current window.
        window_returns = returns[t - window:t]
        corr_matrix = np.corrcoef(window_returns.T)
        corr_matrix[np.abs(corr_matrix) < noise_floor] = 0

        # Unweighted adjacency without self-loops.
        adj_window = (np.abs(corr_matrix) > edge_threshold).astype(float)
        np.fill_diagonal(adj_window, 0)

        if min_spanning_tree:
            mst = nx.minimum_spanning_tree(nx.from_numpy_array(adj_window))
            adj_mst = nx.to_scipy_sparse_array(mst)
            # Count the edges of the tree itself, not of the denser graph
            # it was extracted from.
            n_mst_edges = mst.number_of_edges()
        else:
            adj_mst = sparse.csr_matrix(adj_window)
            n_mst_edges = None

        networks.append({
            'time': t,
            'adj_matrix': adj_mst,
            'correlation': corr_matrix,
            'mst_edges': n_mst_edges
        })

    return networks

# Example: network evolution for 50 stocks over two trading years.
n_periods = 252 * 2
returns = np.random.randn(n_periods, 50) * 0.02
networks = rolling_network_analysis(returns, window=60)

# Time series of the per-window edge count.
mst_edges = []
for net in networks:
    mst_edges.append(net['mst_edges'])

plt.figure(figsize=(12, 6))
plt.plot(mst_edges)
plt.title('最小生成树边数时间序列')
plt.xlabel('时间窗口')
plt.ylabel('MST边数')
plt.show()

6. 图神经网络数据准备

图拉普拉斯特征
def graph_laplacian_features(adj_matrix, degree_weighted=False, n_components=32):
    """Compute Laplacian eigen-features of a graph (e.g. as GNN input).

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        Symmetric adjacency matrix.
    degree_weighted : bool, default False
        If True use the degree matrix ``D`` and return the symmetric
        normalised Laplacian ``D^-1/2 (D - A) D^-1/2``; if False ``D`` is
        the identity, so the matrix is simply ``I - A``.
    n_components : int, default 32
        Number of eigenpairs requested; capped at ``n``.

    Returns
    -------
    dict
        ``'laplacian_matrix'`` (sparse CSR), ``'eigenvalues'`` (the
        ``n_components`` eigenvalues of smallest magnitude) and
        ``'features'`` (matching eigenvectors, one row per node). If the
        iterative solver fails, the features fall back to identity columns
        and the eigenvalues are NaN.
    """
    from scipy.sparse.linalg import ArpackError, eigsh

    adjacency = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adjacency.shape[0]

    # Degree matrix and its inverse square root, built from the diagonal
    # directly. Zero degrees map to a zero scaling factor.
    if degree_weighted:
        degrees = np.asarray(adjacency.sum(axis=1)).ravel()
        inv_sqrt = np.zeros(n)
        connected = degrees > 0
        inv_sqrt[connected] = 1.0 / np.sqrt(degrees[connected])
        D = sparse.diags(degrees)
    else:
        D = sparse.identity(n, format='csr')
        inv_sqrt = np.ones(n)

    # Normalised Laplacian
    scaling = sparse.diags(inv_sqrt)
    L_norm = (scaling @ (D - adjacency) @ scaling).tocsr()

    # Eigenpairs of smallest magnitude (spectral embedding).
    k = min(n_components, n)
    if n <= 512 or k >= n - 1:
        # Small problem, or k too close to n for ARPACK: solve densely.
        all_values, all_vectors = np.linalg.eigh(L_norm.toarray())
        order = np.argsort(np.abs(all_values))[:k]
        eigenvalues = all_values[order]
        laplacian_features = all_vectors[:, order]
    else:
        try:
            eigenvalues, eigenvectors = eigsh(L_norm, k=k, which='SM')
            laplacian_features = eigenvectors.real
        except ArpackError:
            # Keep the pipeline usable with a trivial embedding.
            eigenvalues = np.full(k, np.nan)
            laplacian_features = np.eye(n)[:, :k]

    return {
        'laplacian_matrix': L_norm,
        'eigenvalues': eigenvalues,
        'features': laplacian_features
    }

# Extract Laplacian eigen-features for the asset network and report the
# shape of the resulting embedding.
features = graph_laplacian_features(adj_csr)
embedding_shape = features['features'].shape
print(f"图嵌入维度: {embedding_shape}")

7. 高级图算法实现

PageRank(资产重要性)
def custom_pagerank(adj_matrix, alpha=0.85, max_iter=100, tol=1e-6):
    """PageRank by power iteration on a sparse adjacency matrix.

    ``adj_matrix[i, j]`` is read as the weight of the link from node ``i``
    to node ``j``. Nodes without outgoing links (dangling nodes) spread
    their rank uniformly over all nodes.

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        Non-negative adjacency matrix.
    alpha : float, default 0.85
        Damping factor (probability of following a link).
    max_iter : int, default 100
        Maximum number of power iterations.
    tol : float, default 1e-6
        Convergence threshold on the L1 change between iterations.

    Returns
    -------
    numpy.ndarray
        PageRank scores summing to 1.

    Raises
    ------
    ValueError
        If the matrix is not square.
    """
    adjacency = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adjacency.shape[0]
    if adjacency.shape[1] != n:
        raise ValueError("adj_matrix must be square")
    if n == 0:
        return np.zeros(0)

    # Row-normalise by out-strength so each node splits its rank across its
    # outgoing links. The transpose then maps ranks from sources to targets.
    out_strength = np.asarray(adjacency.sum(axis=1)).ravel()
    dangling = out_strength == 0
    inv_out = np.zeros(n)
    inv_out[~dangling] = 1.0 / out_strength[~dangling]
    transition_t = (sparse.diags(inv_out) @ adjacency).T.tocsr()

    pr = np.full(n, 1.0 / n)  # uniform start
    for _ in range(max_iter):
        dangling_mass = pr[dangling].sum()
        # Teleportation adds the same constant to every node; it does not
        # depend on the node's own current rank.
        pr_new = (alpha * (transition_t @ pr + dangling_mass / n)
                  + (1.0 - alpha) / n)
        converged = np.abs(pr_new - pr).sum() < tol
        pr = pr_new
        if converged:
            break

    return pr / pr.sum()

# PageRank on the trading network; report the five highest-scoring assets.
pagerank_scores = custom_pagerank(trade_adj)
score_order = np.argsort(pagerank_scores)
top_assets = score_order[-5:]
top_asset_names = [assets[idx] for idx in top_assets]
print("PageRank Top资产:", top_asset_names)
最短路径与套利机会
from scipy.sparse.csgraph import shortest_path, dijkstra

def arbitrage_opportunities(adj_matrix, prices, max_path_length=5):
    """Search for profitable paths using all-pairs shortest paths.

    Builds a signed "distance" matrix from the adjacency entries and the
    log prices, runs ``shortest_path`` on it, then for every start node
    follows the predecessor table for at most ``max_path_length`` steps
    while accumulating ``prices[current] / prices[next_node] - 1``.

    Parameters: ``adj_matrix`` is a sparse adjacency matrix (``multiply``
    is called on it), ``prices`` is indexed by node, ``max_path_length``
    caps the number of steps per start node.

    Returns a list of dicts with keys ``'path'``, ``'profit'`` and
    ``'start'`` for starts whose accumulated profit exceeds 0.01, sorted by
    profit in descending order.

    NOTE(review): the walk begins at ``predecessors[start, start]``; SciPy
    documents that entry as the no-path sentinel -9999, so the loop
    appears to stop on its first step and no opportunity would ever be
    reported -- confirm and redesign before relying on this.
    NOTE(review): the distance matrix can hold negative weights;
    ``shortest_path`` may raise on negative cycles -- TODO confirm how
    that should be handled.
    """

    # Convert to a distance matrix (negative log price).
    log_prices = np.log(prices)
    distance_matrix = -adj_matrix.multiply(log_prices)  # element-wise product with log prices, negated

    # All-pairs shortest paths on the directed graph.
    distances, predecessors = shortest_path(distance_matrix, 
                                          directed=True, 
                                          return_predecessors=True)

    # Look for profitable walks (intended: negative cycles, i.e. arbitrage).
    arbitrage_cycles = []
    for start in range(adj_matrix.shape[0]):
        # Rebuild a path by following the predecessor table.
        path = []
        current = start
        total_profit = 0

        for _ in range(max_path_length):
            next_node = predecessors[start, current]
            if next_node == -9999:  # sentinel: no predecessor / no path
                break
            edge_profit = prices[current] / prices[next_node] - 1
            total_profit += edge_profit
            path.append((current, next_node))
            current = next_node

        if total_profit > 0.01:  # keep walks with more than 1% profit
            arbitrage_cycles.append({
                'path': path,
                'profit': total_profit,
                'start': start
            })

    return sorted(arbitrage_cycles, key=lambda x: x['profit'], reverse=True)

# Simulated arbitrage network with random prices per asset.
prices = np.random.uniform(0.8, 1.2, n_assets)
arbitrages = arbitrage_opportunities(trade_adj, prices)
# Report on the list that was actually returned; ``arbitrage_cycles`` only
# exists inside the function.
if arbitrages:
    print("发现套利机会:")
    for arb in arbitrages[:3]:
        print(f"路径: {arb['path']}, 利润: {arb['profit']:.2%}")

8. 网络可视化与分析

网络可视化
def visualize_financial_network(G, centrality_measure='degree', top_n=20):
    """Draw the sub-network of the ``top_n`` most central nodes.

    Parameters
    ----------
    G : networkx.Graph
        Network to draw. Node attribute ``'symbol'`` is used for labels
        (falling back to the node id) and edge attribute ``'weight'`` for
        line widths (default 1).
    centrality_measure : {'degree', 'pagerank'}, default 'degree'
        Measure used to rank nodes and scale node sizes.
    top_n : int, default 20
        Number of highest-ranked nodes to display.

    Raises
    ------
    ValueError
        If ``centrality_measure`` is not one of the supported names.
    """
    # Centrality used for ranking and sizing.
    if centrality_measure == 'degree':
        centrality = dict(G.degree())
    elif centrality_measure == 'pagerank':
        centrality = nx.pagerank(G)
    else:
        raise ValueError(
            f"unsupported centrality_measure: {centrality_measure!r} "
            "(expected 'degree' or 'pagerank')")

    # Keep only the most central nodes.
    top_nodes = sorted(centrality, key=centrality.get, reverse=True)[:top_n]
    subgraph = G.subgraph(top_nodes).copy()

    plt.figure(figsize=(12, 10))
    pos = nx.spring_layout(subgraph, k=1, iterations=50)

    # Node size proportional to centrality. A graph without edges has an
    # all-zero degree centrality, so guard the division.
    peak = max(centrality.values(), default=0) or 1
    node_sizes = [3000 * centrality[node] / peak
                  for node in subgraph.nodes()]

    # Edge width from the edge weight.
    edges = subgraph.edges()
    weights = [G[u][v].get('weight', 1) for u, v in edges]

    # Short labels; nodes without a 'symbol' attribute show their id.
    labels = {n: str(G.nodes[n].get('symbol', n))[:4]
              for n in subgraph.nodes()}

    nx.draw_networkx_nodes(subgraph, pos, node_size=node_sizes,
                           node_color='lightblue', alpha=0.8)
    nx.draw_networkx_edges(subgraph, pos, width=[w*2 for w in weights],
                           alpha=0.5, edge_color='gray')
    nx.draw_networkx_labels(subgraph, pos, labels, font_size=8)

    plt.title(f"金融网络 (Top {top_n}资产, {centrality_measure}中心性)")
    plt.axis('off')
    plt.tight_layout()
    plt.show()

# Visualise the asset network, ranking and sizing nodes by PageRank.
visualize_financial_network(G, centrality_measure='pagerank')

9. 性能优化与大规模网络

大规模网络处理
def large_scale_network_processing(n_nodes=10000, density=0.001, seed=None):
    """Generate a large sparse random network and analyse its hub sub-network.

    Parameters
    ----------
    n_nodes : int, default 10000
        Number of nodes.
    density : float, default 0.001
        Edge probability of the G(n, p) random graph.
    seed : int or None, default None
        Seed forwarded to the NetworkX generator for reproducibility.

    Returns
    -------
    (sparse CSR adjacency, dict)
        The full adjacency matrix and the statistics dictionary of the
        sub-network induced by the 100 highest-degree nodes.
    """
    # Same G(n, p) model as erdos_renyi_graph; NetworkX documents this
    # generator as the faster choice when p is small.
    G_large = nx.fast_gnp_random_graph(n_nodes, density, seed=seed)
    adj_large = nx.to_scipy_sparse_array(G_large, format='csr')

    print(f"大规模网络: {n_nodes}节点, {G_large.number_of_edges():,}边")
    # A CSR matrix stores three arrays; count all of them, not just values.
    sparse_bytes = (adj_large.data.nbytes + adj_large.indices.nbytes
                    + adj_large.indptr.nbytes)
    print(f"内存占用: {sparse_bytes / 1e6:.1f} MB")

    # Degree-based hub selection
    degrees = np.array(adj_large.sum(axis=1)).flatten()
    top_nodes = np.argsort(degrees)[-100:]  # Top 100

    # Sub-network analysis. analyze_network_stats returns
    # (stats, degree sequence); only the statistics are passed on.
    subgraph_adj = adj_large[top_nodes][:, top_nodes]
    subgraph_stats, _ = analyze_network_stats(subgraph_adj)

    return adj_large, subgraph_stats

# Run the large-scale demo with its default size; note that this rebinds
# the module-level name ``stats``.
large_adj, stats = large_scale_network_processing()
并行图计算
from multiprocessing import Pool
import dask.array as da

def parallel_pagerank(adj_matrix, n_workers=4, alpha=0.85, max_iter=100, tol=1e-6):
    """PageRank by power iteration with a block-parallel matrix product.

    The transposed transition matrix is cut into ``n_workers`` row blocks.
    In every iteration each block produces the new ranks of its own slice
    of nodes from the full current rank vector, and the slices are
    concatenated. PageRank cannot be computed independently on separate
    blocks of the adjacency matrix, because every node's rank depends on
    the ranks of all nodes linking to it.

    Parameters
    ----------
    adj_matrix : scipy sparse matrix/array, shape (n, n)
        ``adj_matrix[i, j]`` is the weight of the link from ``i`` to ``j``.
    n_workers : int, default 4
        Number of row blocks / worker threads (capped at ``n``).
    alpha, max_iter, tol
        Damping factor, iteration cap and L1 convergence threshold.

    Returns
    -------
    numpy.ndarray
        PageRank scores summing to 1.

    Raises
    ------
    ValueError
        If the matrix is not square.
    """
    from concurrent.futures import ThreadPoolExecutor

    adjacency = sparse.csr_matrix(adj_matrix, dtype=float)
    n = adjacency.shape[0]
    if adjacency.shape[1] != n:
        raise ValueError("adj_matrix must be square")
    if n == 0:
        return np.zeros(0)

    # Row-normalise by out-strength; dangling nodes are handled separately.
    out_strength = np.asarray(adjacency.sum(axis=1)).ravel()
    dangling = out_strength == 0
    inv_out = np.zeros(n)
    inv_out[~dangling] = 1.0 / out_strength[~dangling]
    transition_t = (sparse.diags(inv_out) @ adjacency).T.tocsr()

    # Contiguous row blocks that cover every node, remainder rows included.
    n_blocks = max(1, min(int(n_workers), n))
    bounds = np.linspace(0, n, n_blocks + 1).astype(int)
    blocks = [transition_t[lo:hi] for lo, hi in zip(bounds[:-1], bounds[1:])]

    pr = np.full(n, 1.0 / n)
    with ThreadPoolExecutor(max_workers=n_blocks) as pool:
        for _ in range(max_iter):
            # Bind the current vector as a default so every task of this
            # round multiplies against the same ranks.
            partial = pool.map(lambda block, vec=pr: block @ vec, blocks)
            propagated = np.concatenate(list(partial))
            dangling_mass = pr[dangling].sum()
            pr_new = (alpha * (propagated + dangling_mass / n)
                      + (1.0 - alpha) / n)
            converged = np.abs(pr_new - pr).sum() < tol
            pr = pr_new
            if converged:
                break

    return pr / pr.sum()

10. 金融网络最佳实践

网络构建策略
class FinancialNetworkAnalyzer:
    """金融网络分析器"""

    def __init__(self, correlation_threshold=0.6, min_weight=0.1):
        self.corr_threshold = correlation_threshold
        self.min_weight = min_weight

    def build_from_returns(self, returns):
        """从收益构建网络"""
        corr_matrix = np.corrcoef(returns.T)

        # 阈值化相关性网络
        adj_matrix = (np.abs(corr_matrix) > self.corr_threshold).astype(float)
        adj_matrix[np.abs(corr_matrix) < self.min_weight] = 0
        np.fill_diagonal(adj_matrix, 0)

        self.adj_matrix = sparse.csr_matrix(adj_matrix)
        self.correlation_matrix = corr_matrix
        return self

    def systemic_risk_score(self):
        """系统性风险评分"""
        centralities = centrality_measures(self.adj_matrix)

        # 综合系统性指标
        systemic_score = (centralities['degree'] + 
                         centralities['eigenvector'] * 2 + 
                         centralities['katz']) / 4

        return np.argsort(systemic_score)[-10:]  # Top 10系统性资产

    def stress_test(self, shock_assets, shock_magnitude=0.5):
        """压力测试"""
        contagion_shocks, steps = contagion_risk_analysis(
            self.adj_matrix, shock_assets, contagion_param=shock_magnitude)
        return {
            'affected_assets': np.where(contagion_shocks > 0)[0],
            'contagion_steps': steps,
            'total_impact': contagion_shocks.sum()
        }

# Usage example: build the network from the simulated returns and list the
# assets with the highest systemic-risk score.
analyzer = FinancialNetworkAnalyzer(correlation_threshold=0.7)
analyzer.build_from_returns(returns)
systemic_assets = analyzer.systemic_risk_score()
systemic_labels = [assets[idx] for idx in systemic_assets]
print("系统性风险资产:", systemic_labels)

# Stress test: shock the first three of those assets.
shocked_assets = systemic_assets[:3]
stress_results = analyzer.stress_test(shocked_assets)
n_affected = len(stress_results['affected_assets'])
print(f"压力测试影响: {n_affected}个资产")

SciPy图结构分析结合稀疏矩阵技术,为金融网络风险管理、资产配置优化和系统性风险识别提供了强大工具。通过中心性分析、社区检测和传染模型,可以有效识别市场脆弱点和系统重要性机构。如需特定的金融网络算法,或需要与深度学习图神经网络集成,请告诉我具体需求!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注