SciPy Matlab 数组
SciPy MATLAB 数组互操作:量化金融模型集成
scipy.io
模块提供了MATLAB .mat
文件的读写功能,支持不同MATLAB版本格式(v4, v5, v6, v7, v7.3)。这在量化金融中用于与MATLAB量化工具箱、风险管理系统集成,或迁移遗留模型至关重要。
1. 基本MATLAB文件I/O
读取MATLAB文件
from scipy.io import loadmat, savemat
import numpy as np
import pandas as pd
import h5py # 用于v7.3格式
# 读取MATLAB文件(自动检测格式)
def load_matlab_data(filename):
"""加载MATLAB数据,支持版本检测"""
try:
# 优先尝试标准加载
data = loadmat(filename, struct_as_record=False, squeeze_me=True)
# 处理MATLAB结构体
variables = {}
for key, value in data.items():
if not key.startswith('__'): # 跳过元数据
variables[key] = value
print(f"加载变量 '{key}': {type(value)}")
if hasattr(value, 'shape'):
print(f" 形状: {value.shape}")
return variables
except Exception as e:
print(f"标准加载失败: {e}")
# 尝试HDF5格式(v7.3)
try:
with h5py.File(filename, 'r') as f:
variables = {}
def extract_hdf5(name, obj):
if isinstance(obj, h5py.Dataset):
variables[name] = np.array(obj)
f.visititems(extract_hdf5)
return variables
except:
raise ValueError(f"无法加载MATLAB文件: {filename}")
# 示例:加载量化模型参数
mat_data = load_matlab_data('quant_model.mat')
# 假设包含:cov_matrix, weights, returns
if 'cov_matrix' in mat_data:
cov_matrix = mat_data['cov_matrix']
print(f"协方差矩阵: {cov_matrix.shape}")
写入MATLAB文件
def save_to_matlab(variables, filename, version='5', do_compression=True):
"""保存数据到MATLAB格式"""
# 准备MATLAB兼容数据
mat_data = {}
for key, value in variables.items():
if isinstance(value, (np.ndarray, pd.DataFrame, pd.Series)):
# NumPy数组直接保存
if isinstance(value, (pd.DataFrame, pd.Series)):
mat_data[key] = value.values
else:
mat_data[key] = value
elif isinstance(value, dict):
# 递归处理字典(模拟MATLAB结构体)
struct = np.empty((1, 1), dtype=[(subkey, 'O') for subkey in value.keys()])
for subkey, subvalue in value.items():
if isinstance(subvalue, np.ndarray):
struct[subkey][0, 0] = subvalue
else:
struct[subkey][0, 0] = np.array(subvalue)
mat_data[key] = struct
else:
mat_data[key] = np.array(value)
# 保存(v7.3需要HDF5)
if version == '7.3':
savemat(filename, mat_data, format='5', do_compression=do_compression)
print(f"保存到HDF5格式: {filename}")
else:
savemat(filename, mat_data, format=version, do_compression=do_compression)
print(f"保存到MATLAB {version}格式: {filename}")
# 示例:保存投资组合优化结果
portfolio_results = {
'optimal_weights': np.random.rand(100),
'expected_returns': np.random.rand(100),
'covariance_matrix': np.random.rand(100, 100),
'risk_metrics': {
'VaR': 0.05,
'CVaR': 0.08,
'Sharpe': 1.2
}
}
save_to_matlab(portfolio_results, 'portfolio_results.mat')
2. 数据类型转换与兼容性
MATLAB与NumPy类型映射
def matlab_to_python_types(mat_data):
"""MATLAB数据类型到Python转换"""
type_mapping = {
'double': np.float64,
'single': np.float32,
'int32': np.int32,
'int64': np.int64,
'logical': np.bool_,
'char': str,
'cell': list,
'struct': dict
}
converted_data = {}
for key, value in mat_data.items():
if isinstance(value, np.ndarray):
# 处理MATLAB特有属性
if value.dtype.names is not None: # 结构体数组
struct_dict = {}
for field in value.dtype.names:
struct_dict[field] = value[field].squeeze()
converted_data[key] = struct_dict
elif value.dtype.kind == 'O': # 对象数组(cell)
converted_data[key] = [v.squeeze() if hasattr(v, 'squeeze') else v
for v in value.flat]
else:
# 标准数值数组
converted_data[key] = np.squeeze(value)
else:
converted_data[key] = value
return converted_data
# 示例:处理复杂MATLAB结构体
complex_data = loadmat('complex_model.mat', struct_as_record=True)
python_data = matlab_to_python_types(complex_data)
print("转换后的数据类型:")
for key, value in list(python_data.items())[:3]:
print(f" {key}: {type(value)}")
时间序列数据处理
def matlab_timeseries_to_pandas(mat_data, time_key='dates', data_key='returns'):
"""MATLAB时间序列转换为Pandas"""
if time_key in mat_data and data_key in mat_data:
# MATLAB日期通常是自1900-01-01的序列数
matlab_dates = mat_data[time_key].flatten()
# 转换为Python datetime
python_dates = pd.to_datetime(matlab_dates, origin='1899-12-30', unit='D')
returns = mat_data[data_key]
if returns.ndim > 1:
# 多资产时间序列
df = pd.DataFrame(returns.T, index=python_dates,
columns=[f'ASSET_{i}' for i in range(returns.shape[1])])
else:
df = pd.DataFrame({'returns': returns.flatten()}, index=python_dates)
return df.dropna()
raise KeyError(f"未找到时间序列键: {time_key}, {data_key}")
# 示例:转换MATLAB历史数据
timeseries_df = matlab_timeseries_to_pandas(mat_data, 'trade_dates', 'asset_returns')
print("时间序列数据:")
print(timeseries_df.head())
print(f"时间范围: {timeseries_df.index.min()} 到 {timeseries_df.index.max()}")
3. 量化金融模型迁移
Black-Scholes参数迁移
def migrate_bs_model(matlab_params):
"""迁移Black-Scholes模型参数"""
bs_params = {
'S': matlab_params['spot_price'], # 当前价格
'K': matlab_params['strike_price'], # 行权价
'T': matlab_params['time_to_maturity'], # 到期时间
'r': matlab_params['risk_free_rate'], # 无风险利率
'sigma': matlab_params['volatility'], # 波动率
'dividend_yield': matlab_params.get('dividend_yield', 0.0)
}
# 验证参数
for key, value in bs_params.items():
if np.any(np.isnan(value)) or np.any(np.isinf(value)):
print(f"警告: {key}包含无效值")
return bs_params
# 批量期权定价参数迁移
options_data = loadmat('options_portfolio.mat')
migrated_options = []
for i in range(options_data['option_struct'].shape[1]):
option_params = migrate_bs_model(options_data['option_struct'][0, i])
migrated_options.append(option_params)
print(f"迁移 {len(migrated_options)} 个期权定价参数")
投资组合优化结果转换
def convert_portfolio_optimization(matlab_results):
"""转换MATLAB投资组合优化结果"""
weights = matlab_results['optimal_weights'].flatten()
constraints = matlab_results['constraints']
# 风险度量
risk_measures = {
'expected_return': float(matlab_results['expected_return']),
'portfolio_variance': float(matlab_results['portfolio_variance']),
'VaR_95': float(matlab_results['VaR_95']),
'tracking_error': float(matlab_results['tracking_error'])
}
# 约束违反检查
constraint_violations = {}
for i, (cons_type, cons_value) in enumerate(zip(constraints['type'], constraints['value'])):
violation = abs(cons_value)
constraint_violations[f'constraint_{i}'] = {
'type': cons_type,
'violation': violation,
'satisfied': violation < 1e-6
}
return {
'weights': weights,
'risk_measures': risk_measures,
'constraints': constraint_violations,
'success': matlab_results.get('optimization_success', True)
}
# 处理批量优化结果
portfolio_data = loadmat('portfolio_optimizations.mat')
converted_portfolios = []
for i in range(portfolio_data['results'].shape[1]):
result = convert_portfolio_optimization(portfolio_data['results'][0, i])
converted_portfolios.append(result)
# 验证结果
success_rate = sum(1 for p in converted_portfolios if p['success']) / len(converted_portfolios)
print(f"优化成功率: {success_rate:.1%}")
4. 大规模数据处理
HDF5格式(MATLAB v7.3)支持
def handle_large_matlab_v73(filename):
"""处理大型MATLAB v7.3文件(HDF5)"""
variables = {}
with h5py.File(filename, 'r') as f:
def recursive_extract(name, obj):
if isinstance(obj, h5py.Dataset):
# 处理大数据集(分块读取)
if obj.size > 1e7: # >10M元素
print(f"大数组 '{name}': {obj.shape}, 分块处理")
# 分块读取策略
chunk_size = min(1000000, obj.shape[0])
data_chunks = []
for start in range(0, obj.shape[0], chunk_size):
end = min(start + chunk_size, obj.shape[0])
chunk = obj[start:end]
data_chunks.append(chunk[:])
variables[name] = np.concatenate(data_chunks)
else:
variables[name] = np.array(obj)
elif isinstance(obj, h5py.Group):
pass # 递归子组
f.visititems(recursive_extract)
return variables
# 处理大型历史数据
large_data = handle_large_matlab_v73('historical_data_v73.mat')
print(f"大型数据变量: {list(large_data.keys())}")
内存映射处理超大矩阵
def memory_mapped_matlab(filename, variable_name):
"""内存映射处理超大MATLAB矩阵"""
# 先加载元数据
metadata = loadmat(filename, struct_as_record=False)
if variable_name in metadata:
# 创建临时密集文件用于内存映射
temp_dense = metadata[variable_name]
temp_filename = 'temp_dense.npy'
np.save(temp_filename, temp_dense)
# 内存映射
mmap_array = np.lib.format.open_memmap(temp_filename, dtype=temp_dense.dtype,
mode='r', shape=temp_dense.shape)
return mmap_array
raise KeyError(f"变量 {variable_name} 未找到")
# 处理超大协方差矩阵
large_cov_mmap = memory_mapped_matlab('large_cov.mat', 'covariance_matrix')
print(f"内存映射协方差矩阵: {large_cov_mmap.shape}")
print(f"内存占用: {large_cov_mmap.nbytes / 1e9:.2f} GB (虚拟)")
# 计算主成分(无需全部加载)
from scipy.sparse.linalg import eigsh
eigenvalues, eigenvectors = eigsh(large_cov_mmap, k=10, which='LM')
print(f"前10个特征值: {eigenvalues[-10:]}")
5. 与Pandas集成
DataFrame ↔ MATLAB互转
def dataframe_to_matlab(df, filename, var_name='data'):
"""Pandas DataFrame转换为MATLAB"""
# 提取数值数据和索引
numeric_data = df.select_dtypes(include=[np.number]).values
row_names = df.index.astype(str).tolist()
col_names = df.columns.tolist()
matlab_struct = {
var_name: numeric_data,
f'{var_name}_row_names': np.array(row_names, dtype='U'),
f'{var_name}_col_names': np.array(col_names, dtype='U'),
f'{var_name}_description': df.describe().values
}
save_to_matlab(matlab_struct, filename)
print(f"DataFrame保存为MATLAB: {filename}")
def matlab_to_dataframe(mat_data, var_name):
"""MATLAB数据转换为DataFrame"""
if var_name in mat_data:
data = mat_data[var_name]
# 尝试重建索引和列名
row_names_key = f'{var_name}_row_names'
col_names_key = f'{var_name}_col_names'
if row_names_key in mat_data and col_names_key in mat_data:
index = pd.Index(mat_data[row_names_key])
columns = pd.Index(mat_data[col_names_key])
return pd.DataFrame(data, index=index, columns=columns)
else:
return pd.DataFrame(data)
raise KeyError(f"未找到DataFrame数据: {var_name}")
# 投资组合数据互转
portfolio_df = pd.DataFrame({
'weights': np.random.rand(100),
'returns': np.random.randn(100),
'volatility': np.random.rand(100) * 0.3
}, index=[f'ASSET_{i}' for i in range(100)])
dataframe_to_matlab(portfolio_df, 'portfolio_df.mat', 'portfolio')
converted_back = matlab_to_dataframe(loadmat('portfolio_df.mat'), 'portfolio')
print("DataFrame互转验证:")
print(f"原始形状: {portfolio_df.shape}, 转换后: {converted_back.shape}")
print(f"数据一致性: {np.allclose(portfolio_df.values, converted_back.values)}")
6. 版本兼容性与错误处理
MATLAB版本检测与转换
def detect_matlab_version(filename):
"""检测MATLAB文件版本"""
import struct
with open(filename, 'rb') as f:
header = f.read(116) # MATLAB文件头
if b'MATLAB 5.0 MAT-file' in header:
return 'v5-v6'
elif b'HDF5' in header[:8]:
return 'v7.3'
else:
# 尝试加载判断
try:
loadmat(filename, struct_as_record=False)
return 'v4-v6'
except:
return 'v7.3-hdf5'
def convert_matlab_version(input_file, output_file, target_version='5'):
"""转换MATLAB文件版本"""
version = detect_matlab_version(input_file)
print(f"检测版本: {version}, 目标: {target_version}")
if version == 'v7.3' and target_version in ['5', '4']:
# HDF5转标准格式
data = handle_large_matlab_v73(input_file)
save_to_matlab(data, output_file, version=target_version)
elif version in ['v5-v6', 'v4-v6'] and target_version == '7.3':
# 标准格式转HDF5
data = load_matlab_data(input_file)
save_to_matlab(data, output_file, version='7.3')
else:
print("版本兼容,无需转换")
import shutil
shutil.copy(input_file, output_file)
# 版本转换示例
convert_matlab_version('model_v73.mat', 'model_v5.mat', '5')
错误处理与数据验证
def robust_matlab_loader(filename, validate=True):
"""健壮的MATLAB加载器"""
try:
data = loadmat(filename, struct_as_record=False, squeeze_me=True)
if validate:
# 数据完整性检查
for key, value in data.items():
if not key.startswith('__'):
if isinstance(value, np.ndarray):
if np.any(np.isnan(value)):
print(f"警告: '{key}'包含NaN值")
if np.any(np.isinf(value)):
print(f"警告: '{key}'包含Inf值")
if value.ndim > 2:
print(f"警告: '{key}'高维数组: {value.shape}")
return data
except Exception as e:
print(f"加载错误: {e}")
print("尝试备用方法...")
# 备用HDF5加载
try:
return handle_large_matlab_v73(filename)
except Exception as e2:
raise RuntimeError(f"所有加载方法失败: {e2}")
# 批量处理MATLAB文件
mat_files = ['model1.mat', 'model2.mat', 'large_data.mat']
loaded_models = {}
for mat_file in mat_files:
try:
loaded_models[mat_file] = robust_matlab_loader(mat_file, validate=True)
print(f"✓ 成功加载: {mat_file}")
except Exception as e:
print(f"✗ 加载失败: {mat_file} - {e}")
7. 高级应用:模型集成
MATLAB优化结果验证
def validate_matlab_optimization(matlab_results, python_recompute=True):
"""验证MATLAB优化结果"""
# 提取MATLAB结果
matlab_weights = matlab_results['optimal_weights'].flatten()
matlab_objective = float(matlab_results['objective_value'])
if python_recompute:
# Python重算验证
returns = matlab_results['expected_returns']
cov_matrix = matlab_results['cov_matrix']
def python_objective(weights):
port_return = np.dot(weights, returns)
port_risk = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
return -(port_return - 0.03) / port_risk # 负夏普比率
python_value = python_objective(matlab_weights)
validation_error = abs(matlab_objective - python_value) / abs(matlab_objective)
return {
'matlab_weights': matlab_weights,
'matlab_objective': matlab_objective,
'python_objective': python_value,
'relative_error': validation_error,
'validation_passed': validation_error < 1e-6
}
else:
return {'matlab_weights': matlab_weights, 'matlab_objective': matlab_objective}
# 批量验证
optimization_results = loadmat('batch_optimizations.mat')['results']
validated_results = []
for i, result in enumerate(optimization_results[0]):
validation = validate_matlab_optimization(result)
validated_results.append(validation)
status = "✓" if validation['validation_passed'] else "✗"
print(f"{status} 优化 {i+1}: 误差 {validation['relative_error']:.2e}")
实时模型同步
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MatlabModelSync(FileSystemEventHandler):
"""MATLAB模型文件实时同步"""
def __init__(self, watch_dir, callback):
self.callback = callback
self.last_modified = {}
def on_modified(self, event):
if event.src_path.endswith('.mat'):
filename = event.src_path
mtime = os.path.getmtime(filename)
# 防重复触发
if filename not in self.last_modified or mtime > self.last_modified[filename]:
self.last_modified[filename] = mtime
print(f"检测到MATLAB文件更新: {os.path.basename(filename)}")
try:
data = robust_matlab_loader(filename)
self.callback(filename, data)
except Exception as e:
print(f"同步失败: {e}")
def start_sync(self):
observer = Observer()
observer.schedule(self, path=watch_dir, recursive=False)
observer.start()
print(f"开始监控目录: {watch_dir}")
return observer
def model_update_callback(filename, data):
"""模型更新回调"""
print(f"更新模型参数 from {filename}")
# 更新全局模型参数
global current_model_params
current_model_params.update(data)
print(f"参数更新完成,变量: {list(data.keys())}")
# 启动实时同步
sync_handler = MatlabModelSync('/path/to/matlab/models', model_update_callback)
observer = sync_handler.start_sync()
# 保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
8. 性能优化
批量处理与并行加载
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp
def parallel_load_matlab(mat_files, max_workers=None):
"""并行加载多个MATLAB文件"""
if max_workers is None:
max_workers = min(mp.cpu_count(), len(mat_files))
def load_single_file(filename):
try:
return filename, robust_matlab_loader(filename)
except Exception as e:
return filename, {'error': str(e)}
results = {}
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {executor.submit(load_single_file, f): f
for f in mat_files}
for future in as_completed(future_to_file):
filename, data = future.result()
results[filename] = data
status = "✓" if 'error' not in data else "✗"
print(f"{status} {os.path.basename(filename)}")
return results
# 批量处理大型模型库
model_files = glob.glob('models/*.mat')
loaded_models = parallel_load_matlab(model_files, max_workers=4)
压缩与存储优化
def optimize_matlab_storage(data_dict, filename, compression_level=9):
"""优化MATLAB文件存储"""
optimized_data = {}
for key, value in data_dict.items():
if isinstance(value, np.ndarray):
# 选择最优dtype
if value.dtype == np.float64 and np.all(np.abs(value) < 1e6):
optimized_data[key] = value.astype(np.float32)
elif np.issubdtype(value.dtype, np.integer) and value.max() < 2**31:
optimized_data[key] = value.astype(np.int32)
else:
optimized_data[key] = value
else:
optimized_data[key] = value
# 使用高压缩
savemat(filename, optimized_data, format='5',
do_compression=True, long_field_names=True)
# 验证压缩效果
original_size = sum(arr.nbytes for arr in data_dict.values() if isinstance(arr, np.ndarray))
compressed_size = os.path.getsize(filename)
compression_ratio = original_size / compressed_size
print(f"压缩比: {compression_ratio:.1f}x")
print(f"文件大小: {compressed_size / 1e6:.1f} MB")
return optimized_data
# 优化存储大型数据集
large_dataset = {'returns': np.random.randn(1000000, 100),
'factors': np.random.randn(1000000, 20)}
optimize_matlab_storage(large_dataset, 'compressed_data.mat')
9. 完整工作流示例
class MatlabQuantWorkflow:
"""MATLAB-Python量化工作流"""
def __init__(self, matlab_dir):
self.matlab_dir = matlab_dir
self.models = {}
def load_models(self):
"""批量加载MATLAB模型"""
mat_files = glob.glob(os.path.join(self.matlab_dir, '*.mat'))
self.models = parallel_load_matlab(mat_files)
# 分类模型
self.portfolio_models = {}
self.pricing_models = {}
for filename, data in self.models.items():
if 'portfolio' in filename.lower():
self.portfolio_models[filename] = data
elif 'option' in filename.lower() or 'pricing' in filename.lower():
self.pricing_models[filename] = data
def validate_and_migrate(self):
"""验证并迁移模型"""
migrated_models = {}
for name, data in self.portfolio_models.items():
try:
validation = validate_matlab_optimization(data)
if validation['validation_passed']:
migrated_models[name] = {
'weights': validation['matlab_weights'],
'risk_metrics': validation['risk_measures'],
'validated': True
}
print(f"✓ 验证通过: {name}")
else:
print(f"✗ 验证失败: {name}")
except Exception as e:
print(f"✗ 处理错误 {name}: {e}")
return migrated_models
def export_to_python_format(self, migrated_models, output_dir):
"""导出为Python原生格式"""
os.makedirs(output_dir, exist_ok=True)
for name, model in migrated_models.items():
# 保存为Pickle(快速)
pickle_filename = os.path.join(output_dir, f"{os.path.splitext(name)[0]}.pkl")
with open(pickle_filename, 'wb') as f:
pickle.dump(model, f)
# 保存为HDF5(兼容性)
h5_filename = os.path.join(output_dir, f"{os.path.splitext(name)[0]}.h5")
with h5py.File(h5_filename, 'w') as f:
for key, value in model.items():
if isinstance(value, np.ndarray):
f.create_dataset(key, data=value)
else:
f.attrs[key] = value
print(f"导出完成: {output_dir}")
def run_complete_workflow(self):
"""完整工作流"""
print("1. 加载MATLAB模型...")
self.load_models()
print("2. 验证与迁移...")
migrated = self.validate_and_migrate()
print("3. 导出Python格式...")
self.export_to_python_format(migrated, 'migrated_models')
return migrated
# 执行完整迁移
workflow = MatlabQuantWorkflow('/path/to/matlab/models')
migrated_models = workflow.run_complete_workflow()
SciPy的MATLAB数组支持为量化金融模型迁移、遗留系统集成和跨平台开发提供了关键桥梁。通过合理的数据类型转换、版本兼容性和性能优化,可以无缝地在Python生态中利用MATLAB开发的复杂金融模型。需要特定模型迁移方案或与MATLAB引擎的实时交互,请告诉我具体需求!