SciPy 稀疏矩阵
SciPy 稀疏矩阵全面指南:大规模量化金融应用
scipy.sparse 模块提供了高效的稀疏矩阵存储和计算功能,对于处理大规模金融数据(如高频交易tick数据、因子模型、风险计算)至关重要。稀疏矩阵可以将内存占用从O(n²)降低到O(nnz),其中nnz是非零元素个数。
1. 稀疏矩阵存储格式对比
| 格式 | 全称 | 特点 | 内存占用 | 适用场景 |
|---|---|---|---|---|
| CSR | Compressed Sparse Row | 行压缩,快速矩阵-向量乘法 | 低 | 投资组合计算、风险因子 |
| CSC | Compressed Sparse Column | 列压缩,快速列操作 | 低 | 特征工程、切片查询 |
| COO | Coordinate | 坐标格式,快速构建 | 中 | 数据导入、稀疏数据构造 |
| DOK | Dictionary of Keys | 字典存储,支持动态更新 | 高 | 稀疏矩阵初始化 |
| LIL | List of Lists | 每行列表,支持动态修改 | 中 | 稀疏矩阵构建阶段 |
| BSR | Block Sparse Row | 块稀疏,快速块操作 | 中 | 结构化稀疏数据 |
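不同格式的典型分工是"构建阶段用LIL/COO、计算阶段转CSR"。下面是一个最小示意(矩阵规模与数值均为演示假设):
from scipy import sparse
import numpy as np
# 构建阶段用LIL(支持逐元素赋值),计算阶段转CSR
lil = sparse.lil_matrix((1000, 1000))
lil[0, 10] = 1.5            # 动态写入不会触发昂贵的结构重建
lil[500, 2] = -0.3
csr = lil.tocsr()           # 转为CSR后矩阵-向量乘法更快
print(csr.nnz, (csr @ np.ones(1000))[:3])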
2. 基本创建与转换
from scipy import sparse
import numpy as np
import pandas as pd
# 方法1:从密集矩阵转换
dense_matrix = np.random.rand(1000, 1000)
dense_matrix[np.random.rand(1000, 1000) > 0.05] = 0  # 将约95%的元素置零(95%稀疏)
csr_matrix = sparse.csr_matrix(dense_matrix)
csc_matrix = sparse.csc_matrix(dense_matrix)
coo_matrix = sparse.coo_matrix(dense_matrix)
print(f"密集矩阵: {dense_matrix.nbytes / 1e6:.1f} MB")
print(f"CSR矩阵: {csr_matrix.data.nbytes / 1e6:.1f} MB")
print(f"稀疏率: {100 * csr_matrix.nnz / (csr_matrix.shape[0] * csr_matrix.shape[1]):.1f}%")
print(f"压缩比: {dense_matrix.nbytes / csr_matrix.data.nbytes:.1f}x")
COO格式构造(金融数据导入)
# 从市场数据坐标构造(行:时间,列:资产)
rows = np.array([0, 0, 1, 2, 2, 3]) # 时间索引
cols = np.array([0, 2, 1, 0, 1, 2]) # 资产索引
data = np.array([1.0, 0.5, 0.8, 1.2, 0.9, 0.3]) # 权重/暴露
# COO矩阵
exposure_coo = sparse.coo_matrix((data, (rows, cols)),
                                 shape=(1000, 500))  # 1000天×500资产
# 转换为CSR(计算友好)
exposure_csr = exposure_coo.tocsr()
print(f"非零元素: {exposure_csr.nnz}")
print(f"每行平均非零: {exposure_csr.nnz / exposure_csr.shape[0]:.1f}")
从Pandas稀疏DataFrame转换
# 模拟因子暴露数据
dates = pd.date_range('2024-01-01', periods=252)
assets = [f'ASSET_{i}' for i in range(100)]
df_sparse = pd.DataFrame(0.0, index=dates, columns=assets)
# 随机生成稀疏因子暴露(约95%为0)
for col in assets:
    non_zero_days = np.random.choice(len(dates), size=int(0.05 * len(dates)), replace=False)
    df_sparse.iloc[non_zero_days, assets.index(col)] = np.random.randn(len(non_zero_days))
# 先转为Pandas稀疏dtype,再导出为SciPy稀疏矩阵
df_sparse = df_sparse.astype(pd.SparseDtype("float64", 0.0))
sparse_matrix = df_sparse.sparse.to_coo().tocsr()
print(f"Pandas稀疏DF -> SciPy CSR: {sparse_matrix.shape}")
3. 金融应用:大规模协方差矩阵
因子模型协方差计算
def build_factor_covariance(factor_returns, factor_weights):
    """
    构建因子模型协方差矩阵(稀疏优化)
    factor_returns: (T, K) 因子收益矩阵,T时间,K因子
    factor_weights: (N, K) 资产因子暴露矩阵,N资产
    """
    T, K = factor_returns.shape
    N, _ = factor_weights.shape
    # 因子协方差(密集K×K)
    factor_cov = np.cov(factor_returns.T)
    # 系统性协方差 B Σ_F Bᵀ(N×N),全程使用稀疏矩阵乘法
    weights_csr = sparse.csr_matrix(factor_weights)
    cov_result = weights_csr @ sparse.csr_matrix(factor_cov) @ weights_csr.T
    # 添加特异风险(对角稀疏矩阵)
    idiosyncratic_var = np.random.rand(N) * 0.01**2  # 特异方差(示例用随机数)
    diag_sparse = sparse.diags(idiosyncratic_var, 0, shape=(N, N), format='csr')
    total_cov = cov_result + diag_sparse
    return total_cov.tocsr()
# 示例:多因子模型
np.random.seed(42)
T, K, N = 252, 10, 1000 # 252天,10因子,1000资产
factor_returns = np.random.randn(T, K) * 0.02
factor_weights = sparse.random(N, K, density=0.1, format='csr').toarray() # 90%稀疏暴露
large_cov = build_factor_covariance(factor_returns, factor_weights)
print(f"大规模协方差: {large_cov.shape}")
print(f"非零元素: {large_cov.nnz}")
print(f"内存节省: {large_cov.nnz / (N*N) * 100:.1f}% 非零")
风险计算(VaR)
def portfolio_var_sparse(weights, cov_matrix, alpha=0.05, n_simulations=10000):
    """基于稀疏协方差矩阵的组合VaR/CVaR蒙特卡洛计算"""
    # SciPy没有内置的稀疏Cholesky分解:中等规模可先转密集再分解,
    # 超大规模可考虑scikit-sparse(CHOLMOD)等第三方库。
    # 若矩阵非正定,np.linalg.cholesky会抛出LinAlgError,可先做对角正则化。
    chol_L = np.linalg.cholesky(cov_matrix.toarray())
    n_assets = len(weights)
    # 生成标准正态随机变量并引入相关性
    Z = np.random.randn(n_simulations, n_assets)
    asset_scenarios = Z @ chol_L.T                    # (n_simulations, n_assets)
    # 组合收益情景(权重可保持稀疏格式)
    weights_csr = sparse.csr_matrix(weights.reshape(1, -1))
    portfolio_returns = np.asarray(weights_csr @ asset_scenarios.T).ravel()
    var = np.percentile(portfolio_returns, alpha * 100)
    cvar = portfolio_returns[portfolio_returns <= var].mean()
    return var, cvar
# 示例:大规模投资组合VaR
weights = np.random.rand(1000)
weights /= weights.sum() # 标准化权重
var_95, cvar_95 = portfolio_var_sparse(weights, large_cov)
print(f"95% VaR: {var_95:.4f}, CVaR: {cvar_95:.4f}")
4. 稀疏线性代数运算
稀疏矩阵求解器
from scipy.sparse.linalg import spsolve, spilu, lgmres, gmres, bicgstab
def solve_portfolio_constraints(A_sparse, b):
    """
    求解稀疏约束系统 Aω = b
    如:权重约束、行业暴露约束(要求A为方阵)
    """
    # 直接求解器(小到中等规模)
    try:
        x = spsolve(A_sparse.tocsc(), b)
        return x, 'direct'
    except Exception:
        pass
    # 迭代求解器(大规模);收敛容差可按SciPy版本用rtol/tol参数指定
    solvers = [
        ('GMRES', lambda M, rhs: gmres(M, rhs)),
        ('BiCGSTAB', lambda M, rhs: bicgstab(M, rhs)),
        ('LGMRES', lambda M, rhs: lgmres(M, rhs)),
    ]
    for name, solver in solvers:
        try:
            x, info = solver(A_sparse, b)
            if info == 0:
                return x, name
        except Exception:
            continue
    raise ValueError("所有求解器失败")
# 示例:稀疏约束系统(方阵,加上单位阵保证非奇异)
n_constraints = 100
A = sparse.random(n_constraints, n_constraints, density=0.1, format='csr') + sparse.eye(n_constraints)
b = np.ones(n_constraints)
solution, method = solve_portfolio_constraints(A, b)
print(f"求解成功,使用{method}方法")
特征值分解(大规模PCA)
from scipy.sparse.linalg import eigsh, lobpcg
def sparse_pca(cov_matrix, n_components=10):
    """大规模协方差矩阵的主成分分析"""
    # 计算前n个最大特征值/向量(Lanczos算法)
    eigenvalues, eigenvectors = eigsh(cov_matrix, k=n_components,
                                      which='LM', maxiter=1000)
    # 解释方差比(用矩阵的迹作为总方差)
    total_variance = cov_matrix.diagonal().sum()
    explained_variance = eigenvalues / total_variance
    # eigsh按升序返回特征值,反转为降序输出
    print(f"前{n_components}个特征值:")
    for i, (ev, var) in enumerate(zip(eigenvalues[::-1], explained_variance[::-1])):
        print(f"PC{i+1}: 特征值={ev:.4f}, 解释方差={var:.2%}")
    return eigenvalues, eigenvectors, explained_variance
# 应用到大规模协方差
eigenvalues, eigenvectors, explained = sparse_pca(large_cov, n_components=5)
# 重构(低秩近似)
n_components = 50
U, s, Vt = sparse.linalg.svds(large_cov, k=n_components)
low_rank_approx = U @ np.diag(s) @ Vt      # 结果为密集ndarray
print(f"低秩近似误差: {np.linalg.norm(large_cov.toarray() - low_rank_approx):.2e}")
5. 高性能稀疏运算
矩阵-向量乘法优化
import timeit

def benchmark_sparse_operations():
    """稀疏矩阵运算性能基准测试"""
    n = 10000
    density = 0.01  # 1%非零
    # 生成稀疏矩阵
    A_csr = sparse.random(n, n, density=density, format='csr')
    A_csc = A_csr.tocsc()
    x = np.random.randn(n)
    # CSR vs CSC:矩阵-向量乘法(CSR通常更快)
    t_csr = timeit.timeit(lambda: A_csr @ x, number=20)
    t_csc = timeit.timeit(lambda: A_csc @ x, number=20)
    print(f"CSR @ x: {t_csr:.3f}s, CSC @ x: {t_csc:.3f}s")
    # 稀疏×稀疏乘法(注意结果矩阵带来的内存峰值)
    B_csr = sparse.random(n, n, density=0.005, format='csr')
    t_spmm = timeit.timeit(lambda: A_csr @ B_csr, number=3)
    print(f"稀疏×稀疏: {t_spmm:.3f}s")
    # CSR的转置返回共享数据的CSC视图,开销很小
    t_T = timeit.timeit(lambda: A_csr.T, number=100)
    print(f"转置: {t_T:.5f}s")
    return A_csr
A = benchmark_sparse_operations()
批处理运算(向量化)
def batch_portfolio_returns(weights_batch, returns_batch):
    """
    批量计算多投资组合收益(稀疏优化)
    weights_batch: (B, N) B个投资组合的权重
    returns_batch: (T, N) T天的资产收益
    """
    B, N = weights_batch.shape
    T, _ = returns_batch.shape
    # 转换为稀疏格式(若数据本身稠密,收益有限,此处仅作演示)
    weights_sparse = sparse.csr_matrix(weights_batch)      # (B, N)
    returns_sparse = sparse.csr_matrix(returns_batch.T)    # (N, T)
    # 批量矩阵乘法:(B, N) × (N, T) = (B, T)
    portfolio_returns = weights_sparse @ returns_sparse
    return portfolio_returns.toarray()
# 示例:多策略回测
n_strategies, n_days, n_assets = 100, 252, 5000
weights_batch = np.random.rand(n_strategies, n_assets)
weights_batch /= weights_batch.sum(axis=1, keepdims=True)
returns_batch = np.random.randn(n_days, n_assets) * 0.01
strategy_returns = batch_portfolio_returns(weights_batch, returns_batch)
# 计算夏普比率
sharpe_ratios = (strategy_returns.mean(axis=1) /
strategy_returns.std(axis=1)) * np.sqrt(252)
print(f"最佳策略夏普比率: {sharpe_ratios.max():.3f}")
6. 稀疏矩阵在因子模型中的应用
多因子模型(Fama-French扩展)
class SparseFactorModel:
    """稀疏多因子模型"""

    def __init__(self, n_assets, n_factors):
        self.n_assets = n_assets
        self.n_factors = n_factors
        self.factor_exposures = None
        self.factor_returns = None

    def fit_exposures(self, asset_returns, max_iter=100):
        """估计稀疏因子暴露(L1正则化)"""
        from sklearn.linear_model import Lasso
        T, N = asset_returns.shape
        exposures = np.zeros((N, self.n_factors))
        for i in range(N):
            # 每个资产对因子收益做Lasso回归,得到稀疏暴露
            lasso = Lasso(alpha=0.01, max_iter=max_iter, selection='random')
            lasso.fit(self.factor_returns, asset_returns[:, i])
            exposures[i] = lasso.coef_
        # 转换为稀疏矩阵
        self.factor_exposures = sparse.csr_matrix(exposures)
        return self

    def predict_returns(self, factor_returns):
        """预测资产收益:r ≈ B f,factor_returns形状为(T, K)"""
        predicted = self.factor_exposures @ factor_returns.T  # (N, T)
        return predicted.T  # (T, N)

    def risk_decomposition(self):
        """风险分解:系统性风险 + 特异风险"""
        # 系统性协方差:B Σ_F Bᵀ
        factor_cov = sparse.csr_matrix(np.cov(self.factor_returns.T))
        systematic_cov = self.factor_exposures @ factor_cov @ self.factor_exposures.T
        # 特异风险(对角,示例用随机方差)
        idiosyncratic_var = np.random.rand(self.n_assets) * 0.01**2
        specific_cov = sparse.diags(idiosyncratic_var)
        total_cov = systematic_cov + specific_cov
        return total_cov.tocsr()
# 示例使用
model = SparseFactorModel(n_assets=2000, n_factors=20)
model.factor_returns = np.random.randn(252, 20) * 0.02
asset_returns = np.random.randn(252, 2000) * 0.01
model.fit_exposures(asset_returns)
risk_cov = model.risk_decomposition()
print(f"因子模型协方差: {risk_cov.shape}, 非零: {risk_cov.nnz}")
7. 稀疏矩阵I/O与持久化
高效存储与加载
def save_sparse_matrix(matrix, filename):
"""保存稀疏矩阵(CSR格式)"""
from scipy.io import mmwrite
import pickle
# Matrix Market格式(标准)
mmwrite(filename + '.mtx', matrix)
# Pickle(包含元数据)
with open(filename + '.pkl', 'wb') as f:
pickle.dump({
'data': matrix.data,
'indices': matrix.indices,
'indptr': matrix.indptr,
'shape': matrix.shape,
'format': matrix.format
}, f)
print(f"保存: {filename}")
print(f"Matrix Market: {filename}.mtx")
print(f"Pickle: {filename}.pkl")
def load_sparse_matrix(filename):
"""加载稀疏矩阵"""
from scipy.io import mmread
# Matrix Market加载
matrix = mmread(filename + '.mtx').tocsr()
return matrix
# HDF5存储(大规模数据)
import h5py
import os
def save_sparse_hdf5(matrix, filename):
"""HDF5格式存储(支持超大规模)"""
with h5py.File(filename, 'w') as f:
f.create_dataset('data', data=matrix.data)
f.create_dataset('indices', data=matrix.indices)
f.create_dataset('indptr', data=matrix.indptr)
f.attrs['shape'] = matrix.shape
f.attrs['nnz'] = matrix.nnz
print(f"HDF5文件大小: {os.path.getsize(filename) / 1e6:.1f} MB")
# 示例
save_sparse_matrix(large_cov, 'large_cov')
loaded_cov = load_sparse_matrix('large_cov')
assert np.allclose(large_cov.toarray(), loaded_cov.toarray())
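SciPy还提供了原生的npz读写接口 save_npz / load_npz,对CSR/CSC/COO等格式更简单高效(最小示意):
# 原生二进制格式:压缩存储并保留稀疏格式信息
sparse.save_npz('large_cov.npz', large_cov)
loaded_npz = sparse.load_npz('large_cov.npz')
print(f"npz读回: {loaded_npz.shape}, 格式: {loaded_npz.format}")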
8. 高级技巧与优化
预条件器加速迭代求解
from scipy.sparse.linalg import LinearOperator, aslinearoperator
def ilu_preconditioner(A, drop_tol=1e-4):
    """不完全LU(ILU)预条件器"""
    ilu = spilu(A.tocsc(), drop_tol=drop_tol, fill_factor=10)
    def matvec(v):
        return ilu.solve(v)
    def rmatvec(v):
        return ilu.solve(v, trans='T')  # 转置系统的近似求解
    P = LinearOperator(A.shape, matvec=matvec, rmatvec=rmatvec)
    return P
# 使用预条件器的GMRES
A = sparse.random(5000, 5000, density=0.01, format='csr') + 50 * sparse.eye(5000)  # 加强对角,保证矩阵良态
b = np.random.randn(5000)
P = ilu_preconditioner(A, drop_tol=1e-5)
x, info = gmres(A, b, M=P)  # 收敛容差可按SciPy版本用rtol/tol指定
print(f"预条件GMRES收敛: {info == 0}")
稀疏矩阵切片与索引优化
def efficient_sparse_slicing(sparse_matrix):
"""高效的稀疏矩阵切片"""
# CSR格式:行切片高效
rows_subset = sparse_matrix[::10, :] # 每10行取1行
# CSC格式:列切片高效
cols_subset = sparse_matrix[:, ::10].tocsc()
# 高级索引(保持稀疏性)
row_indices = np.random.choice(sparse_matrix.shape[0], 1000, replace=False)
col_indices = np.random.choice(sparse_matrix.shape[1], 1000, replace=False)
submatrix = sparse_matrix[row_indices[:, None], col_indices]
return rows_subset, cols_subset, submatrix
# 行业子集提取
n_universe = large_cov.shape[0]
industry_mask = np.random.choice([0, 1], size=(n_universe,), p=[0.8, 0.2])
industry_assets = np.where(industry_mask)[0]
industry_cov = large_cov[industry_assets][:, industry_assets]
9. 内存管理与性能调优
内存监控
def monitor_sparse_memory(matrix):
"""稀疏矩阵内存分析"""
print(f"形状: {matrix.shape}")
print(f"非零元素: {matrix.nnz}")
print(f"数据内存: {matrix.data.nbytes / 1e6:.1f} MB")
print(f"索引内存: {(matrix.indices.nbytes + matrix.indptr.nbytes) / 1e6:.1f} MB")
print(f"总内存: {(matrix.data.nbytes + matrix.indices.nbytes + matrix.indptr.nbytes) / 1e6:.1f} MB")
print(f"密集等价: {matrix.shape[0] * matrix.shape[1] * 8 / 1e6:.1f} MB")
print(f"节省率: {1 - matrix.nnz / (matrix.shape[0] * matrix.shape[1]):.1%}")
monitor_sparse_memory(large_cov)
垃圾回收与内存优化
import gc
def optimize_memory_usage(sparse_matrix):
    """稀疏矩阵内存优化"""
    # 强制垃圾回收
    gc.collect()
    # 转换到最优格式
    if sparse_matrix.format != 'csr':
        sparse_matrix = sparse_matrix.tocsr()
    # 合并重复项并消除显式存储的零元素
    sparse_matrix.sum_duplicates()
    sparse_matrix.eliminate_zeros()
    return sparse_matrix
large_cov = optimize_memory_usage(large_cov)
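在精度允许时,还可以把数值从float64降为float32,使数据数组内存减半(最小示意,注意会损失精度):
# 数值降精度:float64 -> float32
large_cov_f32 = large_cov.astype(np.float32)
print(f"float64数据: {large_cov.data.nbytes / 1e6:.1f} MB, "
      f"float32数据: {large_cov_f32.data.nbytes / 1e6:.1f} MB")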
10. 与其他库集成
CuPy GPU加速(大规模)
# 需要:pip install cupy(按CUDA版本选择对应的包)
import cupy as cp
import cupyx.scipy.sparse as cpx_sparse
from scipy.sparse import issparse
def gpu_sparse_multiply(A_scipy, x):
    """CPU稀疏矩阵 → GPU加速乘法"""
    if not issparse(A_scipy):
        raise ValueError("输入必须是SciPy稀疏矩阵")
    # 转换为CuPy稀疏矩阵(数据拷贝到GPU)
    A_cupy = cpx_sparse.csr_matrix(A_scipy)
    x_gpu = cp.asarray(x)
    # GPU上计算,再拷回CPU
    y_gpu = A_cupy @ x_gpu
    return cp.asnumpy(y_gpu)
# 大规模风险计算:Σw(组合的边际风险向量)
marginal_risk = gpu_sparse_multiply(large_cov, weights)
Dask分布式计算
import dask.array as da
def dask_sparse_integration(sparse_matrix, block_size=1000):
"""Dask分布式稀疏计算"""
# 分块处理
blocks = []
n_rows, n_cols = sparse_matrix.shape
for i in range(0, n_rows, block_size):
row_block = sparse_matrix[i:i+block_size, :]
blocks.append(da.from_array(row_block.toarray(), chunks='auto'))
# Dask数组堆叠
dask_array = da.vstack(blocks)
# 分布式计算
result = dask_array @ da.random.normal(size=(n_cols,), chunks=block_size)
computed = result.compute()
return computed
11. 最佳实践与注意事项
格式选择指南
def choose_optimal_format(operation):
    """根据操作选择最优稀疏格式"""
    format_recommendations = {
        'matrix_vector_multiply': 'csr',
        'vector_matrix_multiply': 'csc',
        'fast_construction': 'coo',
        'dynamic_updates': 'lil',
        'block_operations': 'bsr'
    }
    return format_recommendations.get(operation, 'csr')
# 示例:根据操作优化格式
A = sparse.random(10000, 10000, 0.01)
for op, fmt in [('matrix_vector', 'csr'), ('column_sum', 'csc')]:
A_opt = A.asformat(fmt)
print(f"{op}优化格式: {fmt}")
数值稳定性
def check_sparse_numerical_stability(matrix):
    """检查稀疏矩阵数值稳定性"""
    # 条件数的1-范数估计(注意:inv会产生稠密因子,仅适合中等规模方阵)
    matrix = matrix.tocsc()
    cond_est = sparse.linalg.onenormest(matrix) * sparse.linalg.onenormest(sparse.linalg.inv(matrix))
    # 非零元素的按行分布(需CSR的indptr)
    nnz_per_row = np.diff(matrix.tocsr().indptr)
    sparsity_variation = nnz_per_row.std() / nnz_per_row.mean()
    warnings = []
    if cond_est > 1e10:
        warnings.append("警告:条件数过大,可能数值不稳定")
    if sparsity_variation > 2:
        warnings.append("警告:行稀疏度变化剧烈")
    for warning in warnings:
        print(warning)
    return {'condition_number': cond_est, 'sparsity_variation': sparsity_variation}
stability = check_sparse_numerical_stability(large_cov)
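若条件数过大,常用做法是在对角线上加入少量正则项(岭/收缩),改善条件数后再做分解或求逆(最小示意,eps为假设的正则化强度):
# 对角正则化:Σ + εI
eps = 1e-6
regularized_cov = large_cov + eps * sparse.eye(large_cov.shape[0])
print(f"正则化后非零: {regularized_cov.nnz}")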
SciPy稀疏矩阵是处理金融大规模数据问题的核心工具:通过合理选择存储格式与算法,可以在保持精度的同时大幅降低内存占用和计算成本。