SciPy 稀疏矩阵

SciPy 稀疏矩阵全面指南:大规模量化金融应用

scipy.sparse 模块提供了高效的稀疏矩阵存储和计算功能,对于处理大规模金融数据(如高频交易tick数据、因子模型、风险计算)至关重要。稀疏矩阵可以将内存占用从O(n²)降低到O(nnz),其中nnz是非零元素个数

1. 稀疏矩阵存储格式对比

格式全称特点内存占用适用场景
CSRCompressed Sparse Row行压缩,快速矩阵-向量乘法投资组合计算、风险因子
CSCCompressed Sparse Column列压缩,快速列操作特征工程、切片查询
COOCoordinate坐标格式,快速构建数据导入、稀疏数据构造
DOKDictionary of Keys字典存储,支持动态更新稀疏矩阵初始化
LILList of Lists每行列表,支持动态修改稀疏矩阵构建阶段
BSRBlock Sparse Row块稀疏,快速块操作结构化稀疏数据

2. 基本创建与转换

from scipy import sparse
import numpy as np
import pandas as pd

# 方法1:从密集矩阵转换
dense_matrix = np.random.rand(1000, 1000)
dense_matrix[np.random.rand(1000, 1000) > 0.95] = 0  # 95%稀疏

csr_matrix = sparse.csr_matrix(dense_matrix)
csc_matrix = sparse.csc_matrix(dense_matrix)
coo_matrix = sparse.coo_matrix(dense_matrix)

print(f"密集矩阵: {dense_matrix.nbytes / 1e6:.1f} MB")
print(f"CSR矩阵: {csr_matrix.data.nbytes / 1e6:.1f} MB")
print(f"稀疏率: {100 * csr_matrix.nnz / (csr_matrix.shape[0] * csr_matrix.shape[1]):.1f}%")
print(f"压缩比: {dense_matrix.nbytes / csr_matrix.data.nbytes:.1f}x")

COO格式构造(金融数据导入)

# 从市场数据坐标构造(行:时间,列:资产)
rows = np.array([0, 0, 1, 2, 2, 3])  # 时间索引
cols = np.array([0, 2, 1, 0, 1, 2])  # 资产索引
data = np.array([1.0, 0.5, 0.8, 1.2, 0.9, 0.3])  # 权重/暴露

# COO矩阵
exposure_coo = sparse.coo_matrix((data, (rows, cols)), 
                                shape=(1000, 500))  # 1000天×500资产

# 转换为CSR(计算友好)
exposure_csr = exposure_coo.tocsr()
print(f"非零元素: {exposure_csr.nnz}")
print(f"每行平均非零: {exposure_csr.nnz / exposure_csr.shape[0]:.1f}")

从Pandas稀疏DataFrame转换

# 模拟因子暴露数据
dates = pd.date_range('2024-01-01', periods=252)
assets = [f'ASSET_{i}' for i in range(100)]
df_sparse = pd.DataFrame(index=dates, columns=assets, dtype=float)

# 随机生成稀疏因子暴露(95%为0)
for col in assets:
    non_zero_days = np.random.choice(len(dates), size=int(0.05 * len(dates)), replace=False)
    df_sparse.iloc[non_zero_days, assets.index(col)] = np.random.randn(len(non_zero_days))

# 转换为SciPy稀疏矩阵
sparse_matrix = sparse.coo_matrix(df_sparse.sparse.to_coo()).tocsr()
print(f"Pandas稀疏DF -> SciPy CSR: {sparse_matrix.shape}")

3. 金融应用:大规模协方差矩阵

因子模型协方差计算

def build_factor_covariance(factor_returns, factor_weights):
    """
    构建因子模型协方差矩阵(稀疏优化)

    factor_returns: (T, K) 因子收益矩阵,T时间,K因子
    factor_weights: (N, K) 资产因子暴露矩阵,N资产
    """
    T, K = factor_returns.shape
    N, _ = factor_weights.shape

    # 因子协方差(密集K×K)
    factor_cov = np.cov(factor_returns.T)

    # 构建稀疏的F'ΣF(N×N协方差)
    # 使用CSR格式的矩阵乘法
    weights_csr = sparse.csr_matrix(factor_weights)
    cov_result = weights_csr @ factor_cov @ weights_csr.T

    # 添加特异风险(对角稀疏矩阵)
    idiosyncratic_var = np.random.rand(N) * 0.01**2  # 特异方差
    diag_sparse = sparse.diags(idiosyncratic_var, 0, shape=(N, N), format='csr')

    total_cov = cov_result + diag_sparse

    return total_cov.tocsr()

# 示例:多因子模型
np.random.seed(42)
T, K, N = 252, 10, 1000  # 252天,10因子,1000资产
factor_returns = np.random.randn(T, K) * 0.02
factor_weights = sparse.random(N, K, density=0.1, format='csr').toarray()  # 90%稀疏暴露

large_cov = build_factor_covariance(factor_returns, factor_weights)
print(f"大规模协方差: {large_cov.shape}")
print(f"非零元素: {large_cov.nnz}")
print(f"内存节省: {large_cov.nnz / (N*N) * 100:.1f}% 非零")

风险计算(VaR)

def portfolio_var_sparse(weights, cov_matrix, alpha=0.05, n_simulations=10000):
    """使用稀疏协方差的组合VaR计算"""

    # Cholesky分解(稀疏)
    try:
        L = sparse.linalg.splu(cov_matrix).L  # LU分解
        chol_L = sparse.linalg.spilu(L, drop_tol=1e-6).L  # 不完整Cholesky
    except:
        # 回退到密集计算(小矩阵)
        return portfolio_var_dense(weights, cov_matrix, alpha, n_simulations)

    # 蒙特卡洛模拟(利用稀疏性)
    weights_csr = sparse.csr_matrix(weights.reshape(1, -1))
    n_assets = len(weights)

    # 生成标准正态随机变量
    Z = np.random.randn(n_simulations, n_assets)

    # 批量矩阵乘法(利用稀疏性)
    factors = Z @ chol_L.T.toarray()  # 转换为密集(计算阶段)
    portfolio_returns = (weights_csr @ factors.T).flatten()

    var = np.percentile(portfolio_returns, alpha * 100)
    cvar = portfolio_returns[portfolio_returns <= var].mean()

    return var, cvar

# 示例:大规模投资组合VaR
weights = np.random.rand(1000)
weights /= weights.sum()  # 标准化权重
var_95, cvar_95 = portfolio_var_sparse(weights, large_cov)
print(f"95% VaR: {var_95:.4f}, CVaR: {cvar_95:.4f}")

4. 稀疏线性代数运算

稀疏矩阵求解器

from scipy.sparse.linalg import spsolve, spilu, lgmres, gmres, bicgstab

def solve_portfolio_constraints(A_sparse, b):
    """
    解决稀疏约束系统:Aω = b
    如:权重约束、行业暴露约束
    """
    # 直接求解器(小到中等规模)
    try:
        x = spsolve(A_sparse, b)
        return x, 'direct'
    except:
        pass

    # 迭代求解器(大规模)
    solvers = [
        ('GMRES', lambda M, b: gmres(M, b, tol=1e-8)[0]),
        ('BiCGSTAB', lambda M, b: bicgstab(M, b, tol=1e-8)[0]),
        ('LGMRES', lambda M, b: lgmres(M, b, tol=1e-8)[0])
    ]

    for name, solver in solvers:
        try:
            x, info = solver(A_sparse, b)
            if info == 0:
                return x, name
        except:
            continue

    raise ValueError("所有求解器失败")

# 示例:稀疏约束系统
n_assets = 1000
A = sparse.random(n_assets, n_assets//10, density=0.1, format='csr')  # 稀疏约束矩阵
b = np.ones(n_assets//10)
solution, method = solve_portfolio_constraints(A, b)
print(f"求解成功,使用{method}方法")

特征值分解(大规模PCA)

from scipy.sparse.linalg import eigsh, lobpcg

def sparse_pca(cov_matrix, n_components=10):
    """大规模协方差矩阵的主成分分析"""

    # 计算前n个最大特征值/向量(Lanczos算法)
    eigenvalues, eigenvectors = eigsh(cov_matrix, k=n_components, 
                                    which='LM', maxiter=1000)

    # 解释方差比
    total_variance = eigenvalues.sum()
    explained_variance = eigenvalues / total_variance

    print(f"前{n_components}个特征值:")
    for i, (ev, var) in enumerate(zip(eigenvalues[::-1], explained_variance[::-1])):
        print(f"PC{i+1}: 特征值={ev:.4f}, 解释方差={var:.2%}")

    return eigenvalues, eigenvectors, explained_variance

# 应用到大规模协方差
eigenvalues, eigenvectors, explained = sparse_pca(large_cov, n_components=5)

# 重构(低秩近似)
n_components = 50
U, s, Vt = sparse.linalg.svds(large_cov, k=n_components)
low_rank_approx = U @ np.diag(s) @ Vt

print(f"低秩近似误差: {np.linalg.norm(large_cov.toarray() - low_rank_approx.toarray()):.2e}")

5. 高性能稀疏运算

矩阵-向量乘法优化

def benchmark_sparse_operations():
    """稀疏矩阵运算性能基准测试"""
    n = 10000
    density = 0.01  # 1%非零

    # 生成稀疏矩阵
    A_csr = sparse.random(n, n, density=density, format='csr')
    A_csc = A_csr.tocsc()
    x = np.random.randn(n)

    # CSR vs CSC性能对比
    %timeit A_csr @ x  # 矩阵-向量乘法(CSR更快)
    %timeit A_csc @ x

    # 稀疏矩阵乘法
    B_csr = sparse.random(n, n, density=0.005, format='csr')
    %timeit A_csr @ B_csr  # 稀疏×稀疏(需要小心内存峰值)

    # 转置操作
    %timeit A_csr.T
    %timeit A_csr.transpose()

    return A_csr

A = benchmark_sparse_operations()

批处理运算(向量化)

def batch_portfolio_returns(weights_batch, cov_matrix, returns_batch):
    """
    批量计算多投资组合收益(稀疏优化)

    weights_batch: (B, N) B个投资组合的权重
    returns_batch: (T, N) T天的资产收益
    """
    B, N = weights_batch.shape
    T, _ = returns_batch.shape

    # 转换为稀疏格式
    weights_sparse = sparse.csr_matrix(weights_batch)  # (B, N)
    returns_sparse = sparse.csr_matrix(returns_batch.T)  # (N, T)

    # 批量矩阵乘法:(B, N) × (N, T) = (B, T)
    portfolio_returns = weights_sparse @ returns_sparse

    return portfolio_returns.toarray()

# 示例:多策略回测
n_strategies, n_days, n_assets = 100, 252, 5000
weights_batch = np.random.rand(n_strategies, n_assets)
weights_batch /= weights_batch.sum(axis=1, keepdims=True)

returns_batch = np.random.randn(n_days, n_assets) * 0.01
strategy_returns = batch_portfolio_returns(weights_batch, None, returns_batch)

# 计算夏普比率
sharpe_ratios = (strategy_returns.mean(axis=1) / 
                strategy_returns.std(axis=1)) * np.sqrt(252)
print(f"最佳策略夏普比率: {sharpe_ratios.max():.3f}")

6. 稀疏矩阵在因子模型中的应用

多因子模型(Fama-French扩展)

class SparseFactorModel:
    """稀疏多因子模型"""

    def __init__(self, n_assets, n_factors):
        self.n_assets = n_assets
        self.n_factors = n_factors
        self.factor_exposures = None
        self.factor_returns = None

    def fit_exposures(self, asset_returns, max_iter=100):
        """估计稀疏因子暴露(L1正则化)"""
        T, N = asset_returns.shape

        # 构建稀疏回归问题
        from sklearn.linear_model import Lasso
        exposures = np.zeros((N, self.n_factors))

        for i in range(N):
            # 每个资产的因子暴露回归
            lasso = Lasso(alpha=0.01, max_iter=max_iter, selection='random')
            lasso.fit(self.factor_returns, asset_returns[:, i])
            exposures[i] = lasso.coef_

        # 转换为稀疏矩阵
        self.factor_exposures = sparse.csr_matrix(exposures)
        return self

    def predict_returns(self, factor_returns):
        """预测资产收益:β × f"""
        predicted = self.factor_exposures.T @ factor_returns  # (N, T)
        return predicted.T  # (T, N)

    def risk_decomposition(self):
        """风险分解:系统风险 + 特异风险"""
        # 系统风险协方差:F'Λ'F
        factor_cov = np.cov(self.factor_returns.T)
        systematic_cov = self.factor_exposures @ factor_cov @ self.factor_exposures.T

        # 特异风险(对角)
        idiosyncratic_var = np.random.rand(self.n_assets) * 0.01**2
        specific_cov = sparse.diags(idiosyncratic_var)

        total_cov = systematic_cov + specific_cov
        return total_cov.tocsr()

# 示例使用
model = SparseFactorModel(n_assets=2000, n_factors=20)
model.factor_returns = np.random.randn(252, 20) * 0.02
asset_returns = np.random.randn(252, 2000) * 0.01

model.fit_exposures(asset_returns)
risk_cov = model.risk_decomposition()
print(f"因子模型协方差: {risk_cov.shape}, 非零: {risk_cov.nnz}")

7. 稀疏矩阵I/O与持久化

高效存储与加载

def save_sparse_matrix(matrix, filename):
    """保存稀疏矩阵(CSR格式)"""
    from scipy.io import mmwrite
    import pickle

    # Matrix Market格式(标准)
    mmwrite(filename + '.mtx', matrix)

    # Pickle(包含元数据)
    with open(filename + '.pkl', 'wb') as f:
        pickle.dump({
            'data': matrix.data,
            'indices': matrix.indices,
            'indptr': matrix.indptr,
            'shape': matrix.shape,
            'format': matrix.format
        }, f)

    print(f"保存: {filename}")
    print(f"Matrix Market: {filename}.mtx")
    print(f"Pickle: {filename}.pkl")

def load_sparse_matrix(filename):
    """加载稀疏矩阵"""
    from scipy.io import mmread

    # Matrix Market加载
    matrix = mmread(filename + '.mtx').tocsr()
    return matrix

# HDF5存储(大规模数据)
import h5py

def save_sparse_hdf5(matrix, filename):
    """HDF5格式存储(支持超大规模)"""
    with h5py.File(filename, 'w') as f:
        f.create_dataset('data', data=matrix.data)
        f.create_dataset('indices', data=matrix.indices)
        f.create_dataset('indptr', data=matrix.indptr)
        f.attrs['shape'] = matrix.shape
        f.attrs['nnz'] = matrix.nnz

    print(f"HDF5文件大小: {os.path.getsize(filename) / 1e6:.1f} MB")

# 示例
save_sparse_matrix(large_cov, 'large_cov')
loaded_cov = load_sparse_matrix('large_cov')
assert np.allclose(large_cov.toarray(), loaded_cov.toarray())

8. 高级技巧与优化

预条件器加速迭代求解

from scipy.sparse.linalg import LinearOperator, aslinearoperator

def ilu_preconditioner(A, drop_tol=1e-4):
    """不完全LU预条件器"""
    ilu = spilu(A, drop_tol=drop_tol, fill_factor=10)

    def matvec(v):
        return ilu.solve(v)

    def rmatvec(v):
        return ilu.solve(v)  # 对称

    P = LinearOperator(A.shape, matvec=matvec, rmatvec=rmatvec)
    return P

# 使用预条件器的GMRES
A = sparse.random(5000, 5000, density=0.01, format='csr')
b = np.random.randn(5000)

P = ilu_preconditioner(A, drop_tol=1e-5)
x, info = gmres(A, b, M=P, tol=1e-8)
print(f"预条件GMRES收敛: {info == 0}")

稀疏矩阵切片与索引优化

def efficient_sparse_slicing(sparse_matrix):
    """高效的稀疏矩阵切片"""

    # CSR格式:行切片高效
    rows_subset = sparse_matrix[::10, :]  # 每10行取1行

    # CSC格式:列切片高效
    cols_subset = sparse_matrix[:, ::10].tocsc()

    # 高级索引(保持稀疏性)
    row_indices = np.random.choice(sparse_matrix.shape[0], 1000, replace=False)
    col_indices = np.random.choice(sparse_matrix.shape[1], 1000, replace=False)
    submatrix = sparse_matrix[row_indices[:, None], col_indices]

    return rows_subset, cols_subset, submatrix

# 行业子集提取
industry_mask = np.random.choice([0, 1], size=(n_assets,), p=[0.8, 0.2])
industry_assets = np.where(industry_mask)[0]
industry_cov = large_cov[industry_assets][:, industry_assets]

9. 内存管理与性能调优

内存监控

def monitor_sparse_memory(matrix):
    """稀疏矩阵内存分析"""
    print(f"形状: {matrix.shape}")
    print(f"非零元素: {matrix.nnz}")
    print(f"数据内存: {matrix.data.nbytes / 1e6:.1f} MB")
    print(f"索引内存: {(matrix.indices.nbytes + matrix.indptr.nbytes) / 1e6:.1f} MB")
    print(f"总内存: {(matrix.data.nbytes + matrix.indices.nbytes + matrix.indptr.nbytes) / 1e6:.1f} MB")
    print(f"密集等价: {matrix.shape[0] * matrix.shape[1] * 8 / 1e6:.1f} MB")
    print(f"节省率: {1 - matrix.nnz / (matrix.shape[0] * matrix.shape[1]):.1%}")

monitor_sparse_memory(large_cov)

垃圾回收与内存优化

import gc

def optimize_memory_usage():
    """稀疏矩阵内存优化"""
    # 强制垃圾回收
    gc.collect()

    # 转换到最优格式
    if sparse_matrix.format != 'csr':
        sparse_matrix = sparse_matrix.tocsr()

    # 消除零元素
    sparse_matrix.eliminate_zeros()

    # 压缩索引(如果适用)
    if hasattr(sparse_matrix, 'sum_duplicates'):
        sparse_matrix.sum_duplicates()

    return sparse_matrix

large_cov = optimize_memory_usage()

10. 与其他库集成

CuPy GPU加速(大规模)

# 需要:pip install cupy
import cupy as cp
from scipy.sparse import issparse

def gpu_sparse_multiply(A_scipy, x):
    """CPU稀疏矩阵 → GPU加速乘法"""
    if not issparse(A_scipy):
        raise ValueError("输入必须是SciPy稀疏矩阵")

    # 转换为CuPy稀疏
    A_cupy = cp.sparse.csr_matrix(A_scipy)
    x_gpu = cp.asarray(x)

    # GPU计算
    y_gpu = A_cupy @ x_gpu
    y_cpu = cp.asnumpy(y_gpu)

    return y_cpu

# 大规模风险计算
portfolio_value = gpu_sparse_multiply(large_cov, weights)

Dask分布式计算

import dask.array as da

def dask_sparse_integration(sparse_matrix, block_size=1000):
    """Dask分布式稀疏计算"""
    # 分块处理
    blocks = []
    n_rows, n_cols = sparse_matrix.shape

    for i in range(0, n_rows, block_size):
        row_block = sparse_matrix[i:i+block_size, :]
        blocks.append(da.from_array(row_block.toarray(), chunks='auto'))

    # Dask数组堆叠
    dask_array = da.vstack(blocks)

    # 分布式计算
    result = dask_array @ da.random.normal(size=(n_cols,), chunks=block_size)
    computed = result.compute()

    return computed

11. 最佳实践与注意事项

格式选择指南

def choose_optimal_format(operation):
    """根据操作选择最优稀疏格式"""
    format_recommendations = {
        'matrix_vector_multiply': 'csr',
        'vector_matrix_multiply': 'csc',
        'elementwise_operations': 'coo',
        'dynamic_updates': 'lil',
        'block_operations': 'bsr'
    }

    return format_recommendations.get(operation, 'csr')

# 示例:根据操作优化格式
A = sparse.random(10000, 10000, 0.01)
for op, fmt in [('matrix_vector', 'csr'), ('column_sum', 'csc')]:
    A_opt = A.asformat(fmt)
    print(f"{op}优化格式: {fmt}")

数值稳定性

def check_sparse_numerical_stability(matrix):
    """检查稀疏矩阵数值稳定性"""
    # 条件数估计
    cond_est = sparse.linalg.norm(matrix) * sparse.linalg.norm(sparse.linalg.inv(matrix))

    # 非零元素分布
    nnz_per_row = np.diff(matrix.indptr)
    sparsity_variation = nnz_per_row.std() / nnz_per_row.mean()

    warnings = []
    if cond_est > 1e10:
        warnings.append("警告:条件数过大,可能数值不稳定")
    if sparsity_variation > 2:
        warnings.append("警告:行稀疏度变化剧烈")

    for warning in warnings:
        print(warning)

    return {'condition_number': cond_est, 'sparsity_variation': sparsity_variation}

stability = check_sparse_numerical_stability(large_cov)

SciPy稀疏矩阵是处理金融大规模数据问题的核心工具,通过合理选择存储格式和算法,可以在保持精度的同时大幅降低内存和计算成本。需要特定稀疏算法实现或与深度学习框架的集成,请告诉我具体需求!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注