SciPy Matlab 数组

SciPy MATLAB 数组互操作:量化金融模型集成

scipy.io 模块提供了MATLAB .mat 文件的读写功能,支持不同MATLAB版本格式(v4, v5, v6, v7, v7.3)。这在量化金融中用于与MATLAB量化工具箱、风险管理系统集成,或迁移遗留模型至关重要。

1. 基本MATLAB文件I/O

读取MATLAB文件
from scipy.io import loadmat, savemat
import numpy as np
import pandas as pd
import h5py  # 用于v7.3格式

# 读取MATLAB文件(自动检测格式)
def load_matlab_data(filename):
    """加载MATLAB数据,支持版本检测"""
    try:
        # 优先尝试标准加载
        data = loadmat(filename, struct_as_record=False, squeeze_me=True)

        # 处理MATLAB结构体
        variables = {}
        for key, value in data.items():
            if not key.startswith('__'):  # 跳过元数据
                variables[key] = value
                print(f"加载变量 '{key}': {type(value)}")
                if hasattr(value, 'shape'):
                    print(f"  形状: {value.shape}")

        return variables

    except Exception as e:
        print(f"标准加载失败: {e}")
        # 尝试HDF5格式(v7.3)
        try:
            with h5py.File(filename, 'r') as f:
                variables = {}
                def extract_hdf5(name, obj):
                    if isinstance(obj, h5py.Dataset):
                        variables[name] = np.array(obj)
                f.visititems(extract_hdf5)
            return variables
        except:
            raise ValueError(f"无法加载MATLAB文件: {filename}")

# 示例:加载量化模型参数
mat_data = load_matlab_data('quant_model.mat')
# 假设包含:cov_matrix, weights, returns
if 'cov_matrix' in mat_data:
    cov_matrix = mat_data['cov_matrix']
    print(f"协方差矩阵: {cov_matrix.shape}")
写入MATLAB文件
def save_to_matlab(variables, filename, version='5', do_compression=True):
    """保存数据到MATLAB格式"""
    # 准备MATLAB兼容数据
    mat_data = {}

    for key, value in variables.items():
        if isinstance(value, (np.ndarray, pd.DataFrame, pd.Series)):
            # NumPy数组直接保存
            if isinstance(value, (pd.DataFrame, pd.Series)):
                mat_data[key] = value.values
            else:
                mat_data[key] = value
        elif isinstance(value, dict):
            # 递归处理字典(模拟MATLAB结构体)
            struct = np.empty((1, 1), dtype=[(subkey, 'O') for subkey in value.keys()])
            for subkey, subvalue in value.items():
                if isinstance(subvalue, np.ndarray):
                    struct[subkey][0, 0] = subvalue
                else:
                    struct[subkey][0, 0] = np.array(subvalue)
            mat_data[key] = struct
        else:
            mat_data[key] = np.array(value)

    # 保存(v7.3需要HDF5)
    if version == '7.3':
        savemat(filename, mat_data, format='5', do_compression=do_compression)
        print(f"保存到HDF5格式: {filename}")
    else:
        savemat(filename, mat_data, format=version, do_compression=do_compression)
        print(f"保存到MATLAB {version}格式: {filename}")

# 示例:保存投资组合优化结果
portfolio_results = {
    'optimal_weights': np.random.rand(100),
    'expected_returns': np.random.rand(100),
    'covariance_matrix': np.random.rand(100, 100),
    'risk_metrics': {
        'VaR': 0.05,
        'CVaR': 0.08,
        'Sharpe': 1.2
    }
}
save_to_matlab(portfolio_results, 'portfolio_results.mat')

2. 数据类型转换与兼容性

MATLAB与NumPy类型映射
def matlab_to_python_types(mat_data):
    """MATLAB数据类型到Python转换"""
    type_mapping = {
        'double': np.float64,
        'single': np.float32,
        'int32': np.int32,
        'int64': np.int64,
        'logical': np.bool_,
        'char': str,
        'cell': list,
        'struct': dict
    }

    converted_data = {}
    for key, value in mat_data.items():
        if isinstance(value, np.ndarray):
            # 处理MATLAB特有属性
            if value.dtype.names is not None:  # 结构体数组
                struct_dict = {}
                for field in value.dtype.names:
                    struct_dict[field] = value[field].squeeze()
                converted_data[key] = struct_dict
            elif value.dtype.kind == 'O':  # 对象数组(cell)
                converted_data[key] = [v.squeeze() if hasattr(v, 'squeeze') else v 
                                     for v in value.flat]
            else:
                # 标准数值数组
                converted_data[key] = np.squeeze(value)
        else:
            converted_data[key] = value

    return converted_data

# 示例:处理复杂MATLAB结构体
complex_data = loadmat('complex_model.mat', struct_as_record=True)
python_data = matlab_to_python_types(complex_data)
print("转换后的数据类型:")
for key, value in list(python_data.items())[:3]:
    print(f"  {key}: {type(value)}")
时间序列数据处理
def matlab_timeseries_to_pandas(mat_data, time_key='dates', data_key='returns'):
    """MATLAB时间序列转换为Pandas"""
    if time_key in mat_data and data_key in mat_data:
        # MATLAB日期通常是自1900-01-01的序列数
        matlab_dates = mat_data[time_key].flatten()

        # 转换为Python datetime
        python_dates = pd.to_datetime(matlab_dates, origin='1899-12-30', unit='D')

        returns = mat_data[data_key]
        if returns.ndim > 1:
            # 多资产时间序列
            df = pd.DataFrame(returns.T, index=python_dates, 
                            columns=[f'ASSET_{i}' for i in range(returns.shape[1])])
        else:
            df = pd.DataFrame({'returns': returns.flatten()}, index=python_dates)

        return df.dropna()

    raise KeyError(f"未找到时间序列键: {time_key}, {data_key}")

# 示例:转换MATLAB历史数据
timeseries_df = matlab_timeseries_to_pandas(mat_data, 'trade_dates', 'asset_returns')
print("时间序列数据:")
print(timeseries_df.head())
print(f"时间范围: {timeseries_df.index.min()} 到 {timeseries_df.index.max()}")

3. 量化金融模型迁移

Black-Scholes参数迁移
def migrate_bs_model(matlab_params):
    """迁移Black-Scholes模型参数"""
    bs_params = {
        'S': matlab_params['spot_price'],      # 当前价格
        'K': matlab_params['strike_price'],    # 行权价
        'T': matlab_params['time_to_maturity'], # 到期时间
        'r': matlab_params['risk_free_rate'],  # 无风险利率
        'sigma': matlab_params['volatility'],  # 波动率
        'dividend_yield': matlab_params.get('dividend_yield', 0.0)
    }

    # 验证参数
    for key, value in bs_params.items():
        if np.any(np.isnan(value)) or np.any(np.isinf(value)):
            print(f"警告: {key}包含无效值")

    return bs_params

# 批量期权定价参数迁移
options_data = loadmat('options_portfolio.mat')
migrated_options = []
for i in range(options_data['option_struct'].shape[1]):
    option_params = migrate_bs_model(options_data['option_struct'][0, i])
    migrated_options.append(option_params)

print(f"迁移 {len(migrated_options)} 个期权定价参数")
投资组合优化结果转换
def convert_portfolio_optimization(matlab_results):
    """转换MATLAB投资组合优化结果"""

    weights = matlab_results['optimal_weights'].flatten()
    constraints = matlab_results['constraints']

    # 风险度量
    risk_measures = {
        'expected_return': float(matlab_results['expected_return']),
        'portfolio_variance': float(matlab_results['portfolio_variance']),
        'VaR_95': float(matlab_results['VaR_95']),
        'tracking_error': float(matlab_results['tracking_error'])
    }

    # 约束违反检查
    constraint_violations = {}
    for i, (cons_type, cons_value) in enumerate(zip(constraints['type'], constraints['value'])):
        violation = abs(cons_value)
        constraint_violations[f'constraint_{i}'] = {
            'type': cons_type,
            'violation': violation,
            'satisfied': violation < 1e-6
        }

    return {
        'weights': weights,
        'risk_measures': risk_measures,
        'constraints': constraint_violations,
        'success': matlab_results.get('optimization_success', True)
    }

# 处理批量优化结果
portfolio_data = loadmat('portfolio_optimizations.mat')
converted_portfolios = []
for i in range(portfolio_data['results'].shape[1]):
    result = convert_portfolio_optimization(portfolio_data['results'][0, i])
    converted_portfolios.append(result)

# 验证结果
success_rate = sum(1 for p in converted_portfolios if p['success']) / len(converted_portfolios)
print(f"优化成功率: {success_rate:.1%}")

4. 大规模数据处理

HDF5格式(MATLAB v7.3)支持
def handle_large_matlab_v73(filename):
    """处理大型MATLAB v7.3文件(HDF5)"""
    variables = {}

    with h5py.File(filename, 'r') as f:
        def recursive_extract(name, obj):
            if isinstance(obj, h5py.Dataset):
                # 处理大数据集(分块读取)
                if obj.size > 1e7:  # >10M元素
                    print(f"大数组 '{name}': {obj.shape}, 分块处理")
                    # 分块读取策略
                    chunk_size = min(1000000, obj.shape[0])
                    data_chunks = []
                    for start in range(0, obj.shape[0], chunk_size):
                        end = min(start + chunk_size, obj.shape[0])
                        chunk = obj[start:end]
                        data_chunks.append(chunk[:])
                    variables[name] = np.concatenate(data_chunks)
                else:
                    variables[name] = np.array(obj)
            elif isinstance(obj, h5py.Group):
                pass  # 递归子组

        f.visititems(recursive_extract)

    return variables

# 处理大型历史数据
large_data = handle_large_matlab_v73('historical_data_v73.mat')
print(f"大型数据变量: {list(large_data.keys())}")
内存映射处理超大矩阵
def memory_mapped_matlab(filename, variable_name):
    """内存映射处理超大MATLAB矩阵"""
    # 先加载元数据
    metadata = loadmat(filename, struct_as_record=False)

    if variable_name in metadata:
        # 创建临时密集文件用于内存映射
        temp_dense = metadata[variable_name]
        temp_filename = 'temp_dense.npy'
        np.save(temp_filename, temp_dense)

        # 内存映射
        mmap_array = np.lib.format.open_memmap(temp_filename, dtype=temp_dense.dtype, 
                                             mode='r', shape=temp_dense.shape)

        return mmap_array

    raise KeyError(f"变量 {variable_name} 未找到")

# 处理超大协方差矩阵
large_cov_mmap = memory_mapped_matlab('large_cov.mat', 'covariance_matrix')
print(f"内存映射协方差矩阵: {large_cov_mmap.shape}")
print(f"内存占用: {large_cov_mmap.nbytes / 1e9:.2f} GB (虚拟)")

# 计算主成分(无需全部加载)
from scipy.sparse.linalg import eigsh
eigenvalues, eigenvectors = eigsh(large_cov_mmap, k=10, which='LM')
print(f"前10个特征值: {eigenvalues[-10:]}")

5. 与Pandas集成

DataFrame ↔ MATLAB互转
def dataframe_to_matlab(df, filename, var_name='data'):
    """Pandas DataFrame转换为MATLAB"""
    # 提取数值数据和索引
    numeric_data = df.select_dtypes(include=[np.number]).values
    row_names = df.index.astype(str).tolist()
    col_names = df.columns.tolist()

    matlab_struct = {
        var_name: numeric_data,
        f'{var_name}_row_names': np.array(row_names, dtype='U'),
        f'{var_name}_col_names': np.array(col_names, dtype='U'),
        f'{var_name}_description': df.describe().values
    }

    save_to_matlab(matlab_struct, filename)
    print(f"DataFrame保存为MATLAB: {filename}")

def matlab_to_dataframe(mat_data, var_name):
    """MATLAB数据转换为DataFrame"""
    if var_name in mat_data:
        data = mat_data[var_name]

        # 尝试重建索引和列名
        row_names_key = f'{var_name}_row_names'
        col_names_key = f'{var_name}_col_names'

        if row_names_key in mat_data and col_names_key in mat_data:
            index = pd.Index(mat_data[row_names_key])
            columns = pd.Index(mat_data[col_names_key])
            return pd.DataFrame(data, index=index, columns=columns)
        else:
            return pd.DataFrame(data)

    raise KeyError(f"未找到DataFrame数据: {var_name}")

# 投资组合数据互转
portfolio_df = pd.DataFrame({
    'weights': np.random.rand(100),
    'returns': np.random.randn(100),
    'volatility': np.random.rand(100) * 0.3
}, index=[f'ASSET_{i}' for i in range(100)])

dataframe_to_matlab(portfolio_df, 'portfolio_df.mat', 'portfolio')
converted_back = matlab_to_dataframe(loadmat('portfolio_df.mat'), 'portfolio')
print("DataFrame互转验证:")
print(f"原始形状: {portfolio_df.shape}, 转换后: {converted_back.shape}")
print(f"数据一致性: {np.allclose(portfolio_df.values, converted_back.values)}")

6. 版本兼容性与错误处理

MATLAB版本检测与转换
def detect_matlab_version(filename):
    """检测MATLAB文件版本"""
    import struct

    with open(filename, 'rb') as f:
        header = f.read(116)  # MATLAB文件头
        if b'MATLAB 5.0 MAT-file' in header:
            return 'v5-v6'
        elif b'HDF5' in header[:8]:
            return 'v7.3'
        else:
            # 尝试加载判断
            try:
                loadmat(filename, struct_as_record=False)
                return 'v4-v6'
            except:
                return 'v7.3-hdf5'

def convert_matlab_version(input_file, output_file, target_version='5'):
    """转换MATLAB文件版本"""
    version = detect_matlab_version(input_file)
    print(f"检测版本: {version}, 目标: {target_version}")

    if version == 'v7.3' and target_version in ['5', '4']:
        # HDF5转标准格式
        data = handle_large_matlab_v73(input_file)
        save_to_matlab(data, output_file, version=target_version)
    elif version in ['v5-v6', 'v4-v6'] and target_version == '7.3':
        # 标准格式转HDF5
        data = load_matlab_data(input_file)
        save_to_matlab(data, output_file, version='7.3')
    else:
        print("版本兼容,无需转换")
        import shutil
        shutil.copy(input_file, output_file)

# 版本转换示例
convert_matlab_version('model_v73.mat', 'model_v5.mat', '5')
错误处理与数据验证
def robust_matlab_loader(filename, validate=True):
    """健壮的MATLAB加载器"""
    try:
        data = loadmat(filename, struct_as_record=False, squeeze_me=True)

        if validate:
            # 数据完整性检查
            for key, value in data.items():
                if not key.startswith('__'):
                    if isinstance(value, np.ndarray):
                        if np.any(np.isnan(value)):
                            print(f"警告: '{key}'包含NaN值")
                        if np.any(np.isinf(value)):
                            print(f"警告: '{key}'包含Inf值")
                        if value.ndim > 2:
                            print(f"警告: '{key}'高维数组: {value.shape}")

        return data

    except Exception as e:
        print(f"加载错误: {e}")
        print("尝试备用方法...")

        # 备用HDF5加载
        try:
            return handle_large_matlab_v73(filename)
        except Exception as e2:
            raise RuntimeError(f"所有加载方法失败: {e2}")

# 批量处理MATLAB文件
mat_files = ['model1.mat', 'model2.mat', 'large_data.mat']
loaded_models = {}
for mat_file in mat_files:
    try:
        loaded_models[mat_file] = robust_matlab_loader(mat_file, validate=True)
        print(f"✓ 成功加载: {mat_file}")
    except Exception as e:
        print(f"✗ 加载失败: {mat_file} - {e}")

7. 高级应用:模型集成

MATLAB优化结果验证
def validate_matlab_optimization(matlab_results, python_recompute=True):
    """验证MATLAB优化结果"""

    # 提取MATLAB结果
    matlab_weights = matlab_results['optimal_weights'].flatten()
    matlab_objective = float(matlab_results['objective_value'])

    if python_recompute:
        # Python重算验证
        returns = matlab_results['expected_returns']
        cov_matrix = matlab_results['cov_matrix']

        def python_objective(weights):
            port_return = np.dot(weights, returns)
            port_risk = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
            return -(port_return - 0.03) / port_risk  # 负夏普比率

        python_value = python_objective(matlab_weights)
        validation_error = abs(matlab_objective - python_value) / abs(matlab_objective)

        return {
            'matlab_weights': matlab_weights,
            'matlab_objective': matlab_objective,
            'python_objective': python_value,
            'relative_error': validation_error,
            'validation_passed': validation_error < 1e-6
        }
    else:
        return {'matlab_weights': matlab_weights, 'matlab_objective': matlab_objective}

# 批量验证
optimization_results = loadmat('batch_optimizations.mat')['results']
validated_results = []
for i, result in enumerate(optimization_results[0]):
    validation = validate_matlab_optimization(result)
    validated_results.append(validation)
    status = "✓" if validation['validation_passed'] else "✗"
    print(f"{status} 优化 {i+1}: 误差 {validation['relative_error']:.2e}")
实时模型同步
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MatlabModelSync(FileSystemEventHandler):
    """MATLAB模型文件实时同步"""

    def __init__(self, watch_dir, callback):
        self.callback = callback
        self.last_modified = {}

    def on_modified(self, event):
        if event.src_path.endswith('.mat'):
            filename = event.src_path
            mtime = os.path.getmtime(filename)

            # 防重复触发
            if filename not in self.last_modified or mtime > self.last_modified[filename]:
                self.last_modified[filename] = mtime
                print(f"检测到MATLAB文件更新: {os.path.basename(filename)}")

                try:
                    data = robust_matlab_loader(filename)
                    self.callback(filename, data)
                except Exception as e:
                    print(f"同步失败: {e}")

    def start_sync(self):
        observer = Observer()
        observer.schedule(self, path=watch_dir, recursive=False)
        observer.start()
        print(f"开始监控目录: {watch_dir}")
        return observer

def model_update_callback(filename, data):
    """模型更新回调"""
    print(f"更新模型参数 from {filename}")
    # 更新全局模型参数
    global current_model_params
    current_model_params.update(data)
    print(f"参数更新完成,变量: {list(data.keys())}")

# 启动实时同步
sync_handler = MatlabModelSync('/path/to/matlab/models', model_update_callback)
observer = sync_handler.start_sync()

# 保持运行
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

8. 性能优化

批量处理与并行加载
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing as mp

def parallel_load_matlab(mat_files, max_workers=None):
    """并行加载多个MATLAB文件"""
    if max_workers is None:
        max_workers = min(mp.cpu_count(), len(mat_files))

    def load_single_file(filename):
        try:
            return filename, robust_matlab_loader(filename)
        except Exception as e:
            return filename, {'error': str(e)}

    results = {}
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {executor.submit(load_single_file, f): f 
                         for f in mat_files}

        for future in as_completed(future_to_file):
            filename, data = future.result()
            results[filename] = data
            status = "✓" if 'error' not in data else "✗"
            print(f"{status} {os.path.basename(filename)}")

    return results

# 批量处理大型模型库
model_files = glob.glob('models/*.mat')
loaded_models = parallel_load_matlab(model_files, max_workers=4)
压缩与存储优化
def optimize_matlab_storage(data_dict, filename, compression_level=9):
    """优化MATLAB文件存储"""

    optimized_data = {}
    for key, value in data_dict.items():
        if isinstance(value, np.ndarray):
            # 选择最优dtype
            if value.dtype == np.float64 and np.all(np.abs(value) < 1e6):
                optimized_data[key] = value.astype(np.float32)
            elif np.issubdtype(value.dtype, np.integer) and value.max() < 2**31:
                optimized_data[key] = value.astype(np.int32)
            else:
                optimized_data[key] = value
        else:
            optimized_data[key] = value

    # 使用高压缩
    savemat(filename, optimized_data, format='5', 
            do_compression=True, long_field_names=True)

    # 验证压缩效果
    original_size = sum(arr.nbytes for arr in data_dict.values() if isinstance(arr, np.ndarray))
    compressed_size = os.path.getsize(filename)
    compression_ratio = original_size / compressed_size

    print(f"压缩比: {compression_ratio:.1f}x")
    print(f"文件大小: {compressed_size / 1e6:.1f} MB")
    return optimized_data

# 优化存储大型数据集
large_dataset = {'returns': np.random.randn(1000000, 100), 
                'factors': np.random.randn(1000000, 20)}
optimize_matlab_storage(large_dataset, 'compressed_data.mat')

9. 完整工作流示例

class MatlabQuantWorkflow:
    """MATLAB-Python量化工作流"""

    def __init__(self, matlab_dir):
        self.matlab_dir = matlab_dir
        self.models = {}

    def load_models(self):
        """批量加载MATLAB模型"""
        mat_files = glob.glob(os.path.join(self.matlab_dir, '*.mat'))
        self.models = parallel_load_matlab(mat_files)

        # 分类模型
        self.portfolio_models = {}
        self.pricing_models = {}
        for filename, data in self.models.items():
            if 'portfolio' in filename.lower():
                self.portfolio_models[filename] = data
            elif 'option' in filename.lower() or 'pricing' in filename.lower():
                self.pricing_models[filename] = data

    def validate_and_migrate(self):
        """验证并迁移模型"""
        migrated_models = {}

        for name, data in self.portfolio_models.items():
            try:
                validation = validate_matlab_optimization(data)
                if validation['validation_passed']:
                    migrated_models[name] = {
                        'weights': validation['matlab_weights'],
                        'risk_metrics': validation['risk_measures'],
                        'validated': True
                    }
                    print(f"✓ 验证通过: {name}")
                else:
                    print(f"✗ 验证失败: {name}")
            except Exception as e:
                print(f"✗ 处理错误 {name}: {e}")

        return migrated_models

    def export_to_python_format(self, migrated_models, output_dir):
        """导出为Python原生格式"""
        os.makedirs(output_dir, exist_ok=True)

        for name, model in migrated_models.items():
            # 保存为Pickle(快速)
            pickle_filename = os.path.join(output_dir, f"{os.path.splitext(name)[0]}.pkl")
            with open(pickle_filename, 'wb') as f:
                pickle.dump(model, f)

            # 保存为HDF5(兼容性)
            h5_filename = os.path.join(output_dir, f"{os.path.splitext(name)[0]}.h5")
            with h5py.File(h5_filename, 'w') as f:
                for key, value in model.items():
                    if isinstance(value, np.ndarray):
                        f.create_dataset(key, data=value)
                    else:
                        f.attrs[key] = value

        print(f"导出完成: {output_dir}")

    def run_complete_workflow(self):
        """完整工作流"""
        print("1. 加载MATLAB模型...")
        self.load_models()

        print("2. 验证与迁移...")
        migrated = self.validate_and_migrate()

        print("3. 导出Python格式...")
        self.export_to_python_format(migrated, 'migrated_models')

        return migrated

# 执行完整迁移
workflow = MatlabQuantWorkflow('/path/to/matlab/models')
migrated_models = workflow.run_complete_workflow()

SciPy的MATLAB数组支持为量化金融模型迁移、遗留系统集成和跨平台开发提供了关键桥梁。通过合理的数据类型转换、版本兼容性和性能优化,可以无缝地在Python生态中利用MATLAB开发的复杂金融模型。需要特定模型迁移方案或与MATLAB引擎的实时交互,请告诉我具体需求!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注