SciPy 插值

SciPy 插值全面指南:量化金融时间序列与曲线拟合

scipy.interpolate 模块提供了丰富的插值算法,从一维时间序列插值到多维曲面拟合,支持量化金融中的收益率曲线构建、缺失数据填补、期权定价波动率表面等核心应用。

1. 一维插值算法对比

线性插值 vs 样条插值
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import interpolate
from scipy.interpolate import Akima1DInterpolator, UnivariateSpline, interp1d

# Simulate an irregularly sampled time series (i.e. a series with gaps).
np.random.seed(42)
t = np.sort(np.random.uniform(0, 10, 50))  # irregular sample times, sorted ascending
y = np.sin(t) + 0.1 * np.random.randn(50)  # sine wave plus Gaussian noise

# 1. Linear interpolation (simple and fast); extrapolates outside the data.
linear_interp = interp1d(t, y, kind='linear', bounds_error=False, fill_value='extrapolate')

# 2. Cubic spline interpolation (smooth, continuous derivatives); extrapolates.
cubic_interp = interp1d(t, y, kind='cubic', bounds_error=False, fill_value='extrapolate')

# 3. Akima interpolation (intended to limit spurious oscillations).
akima_interp = Akima1DInterpolator(t, y)

# 4. UnivariateSpline (smoothing spline with a tunable smoothness level).
spline = UnivariateSpline(t, y, s=0.1)  # s = smoothing factor

# Dense query grid. It spans the full [0, 10] interval, which is slightly wider
# than the sampled range of t, so the edges exercise out-of-range behaviour.
t_fine = np.linspace(0, 10, 1000)

# Evaluate every interpolant on the dense grid.
y_linear = linear_interp(t_fine)
y_cubic = cubic_interp(t_fine)
y_akima = akima_interp(t_fine)
y_spline = spline(t_fine)

# Plot all methods against the raw samples for visual comparison.
plt.figure(figsize=(12, 8))
plt.plot(t, y, 'ko', label='原始数据', markersize=4)
plt.plot(t_fine, y_linear, 'r-', label='线性插值', alpha=0.8)
plt.plot(t_fine, y_cubic, 'b-', label='三次样条', alpha=0.8)
plt.plot(t_fine, y_akima, 'g-', label='Akima插值', alpha=0.8)
plt.plot(t_fine, y_spline, 'm-', label='UnivariateSpline', alpha=0.8)
plt.legend()
plt.title('不同插值方法对比')
plt.xlabel('时间')
plt.ylabel('值')
plt.grid(True, alpha=0.3)
plt.show()
插值误差分析
def interpolation_error_analysis(true_func, t_sample, methods=('linear', 'cubic', 'akima')):
    """Measure how well several 1-D interpolators reproduce a known function.

    The function is sampled at ``t_sample``, each requested interpolator is
    built from those samples, and the result is compared with the true
    function on a 1000-point grid spanning ``[t_sample.min(), t_sample.max()]``.

    Args:
        true_func: Vectorised callable mapping an array of times to values.
        t_sample: 1-D NumPy array of sample locations.
        methods: Iterable of method names; supported values are ``'linear'``,
            ``'cubic'`` and ``'akima'``.

    Returns:
        Tuple ``(errors, t_fine, y_true_fine)`` where ``errors`` maps each
        method name to ``{'rmse': ..., 'max_error': ...}``.

    Raises:
        ValueError: If ``methods`` contains an unsupported name.
    """
    # Dispatch table instead of an if/elif chain: an unknown name now fails
    # loudly rather than silently reusing the previous loop's interpolator.
    builders = {
        'linear': lambda x, v: interp1d(x, v, kind='linear', bounds_error=False),
        'cubic': lambda x, v: interp1d(x, v, kind='cubic', bounds_error=False),
        'akima': Akima1DInterpolator,
    }

    y_true_sample = true_func(t_sample)
    t_fine = np.linspace(t_sample.min(), t_sample.max(), 1000)
    y_true_fine = true_func(t_fine)

    errors = {}
    for method in methods:
        try:
            build = builders[method]
        except KeyError:
            raise ValueError(
                f"unsupported interpolation method {method!r}; "
                f"expected one of {sorted(builders)}"
            ) from None

        residual = build(t_sample, y_true_sample)(t_fine) - y_true_fine
        errors[method] = {
            'rmse': np.sqrt(np.mean(residual**2)),
            'max_error': np.max(np.abs(residual)),
        }

    return errors, t_fine, y_true_fine

# Evaluate interpolation error at several sampling densities.
sample_densities = [10, 20, 50, 100]
# Reference signal: sum of two sinusoids with periods 1 and 0.5.
true_func = lambda t: np.sin(2 * np.pi * t) + 0.5 * np.sin(4 * np.pi * t)

plt.figure(figsize=(12, 8))
for i, n_samples in enumerate(sample_densities):
    # Uniform samples on [0, 10] at the current density.
    t_sample = np.linspace(0, 10, n_samples)
    errors, _, _ = interpolation_error_analysis(true_func, t_sample)

    # One panel per density; each panel shows one RMSE point per method.
    plt.subplot(2, 2, i+1)
    for method, err in errors.items():
        plt.scatter(n_samples, err['rmse'], label=method)
    plt.xscale('log')
    plt.yscale('log')
    plt.xlabel('采样点数')
    plt.ylabel('RMSE')
    plt.title(f'采样点数: {n_samples}')
    plt.legend()
    plt.grid(True)
plt.tight_layout()
plt.show()

2. 金融时间序列插值

收益率曲线插值(Nelson-Siegel模型)
from scipy.interpolate import interp1d, splev, splrep
from scipy.optimize import curve_fit

def nelson_siegel(t, beta0, beta1, beta2, tau):
    """Evaluate the Nelson-Siegel yield curve at maturity ``t``.

    The curve is ``beta0 + beta1 * L(x) + beta2 * (L(x) - exp(-x))`` with
    ``x = t / tau`` and ``L(x) = (1 - exp(-x)) / x``.

    Args:
        t: Maturity, scalar or array-like.
        beta0: Level coefficient.
        beta1: Coefficient of the ``L(x)`` term.
        beta2: Coefficient of the ``L(x) - exp(-x)`` term.
        tau: Decay scale dividing ``t``.

    Returns:
        Yield(s) with the same shape as ``t``.
    """
    x = np.asarray(t, dtype=float) / tau
    at_zero = x == 0
    # L(x) is 0/0 at x == 0; its limit there is 1. Substitute a dummy
    # denominator so the division never produces NaN or a runtime warning.
    safe_x = np.where(at_zero, 1.0, x)
    # expm1 keeps precision for very short maturities where 1 - exp(-x) cancels.
    loading = np.where(at_zero, 1.0, -np.expm1(-safe_x) / safe_x)
    return beta0 + beta1 * loading + beta2 * (loading - np.exp(-x))

def fit_yield_curve_interpolation(maturities, yields, method='spline'):
    """Fit a yield curve both parametrically and with an interpolant.

    A Nelson-Siegel curve is fitted with bounded least squares, and a second,
    non-parametric curve is built according to ``method``.

    Args:
        maturities: 1-D array of maturities.
        yields: 1-D array of yields, same length as ``maturities``.
        method: ``'spline'`` for a smoothing spline (``splrep``/``splev``),
            ``'linear'`` for linear ``interp1d``; any other value selects
            cubic ``interp1d``.

    Returns:
        Dict with keys ``'ns_func'`` (callable or ``None`` if the fit failed),
        ``'spline_func'`` (callable), ``'query_maturities'`` (100-point grid
        over the maturity range) and ``'params'`` (fitted Nelson-Siegel
        parameters or ``None``).
    """
    maturities = np.asarray(maturities, dtype=float)
    yields = np.asarray(yields, dtype=float)

    # Parametric fit (Nelson-Siegel). Only the failures curve_fit documents are
    # treated as "no parametric curve": RuntimeError when the optimiser does
    # not converge, ValueError for infeasible p0/bounds or bad input.
    # Programming errors are no longer swallowed.
    popt = None
    ns_func = None
    try:
        popt, _ = curve_fit(nelson_siegel, maturities, yields,
                            p0=[yields.mean(), -0.02, -0.02, 2.0],
                            bounds=([0, -1, -1, 0.1], [0.1, 0, 0, 10]))
    except (RuntimeError, ValueError):
        popt = None
    else:
        ns_func = lambda t: nelson_siegel(t, *popt)

    # Non-parametric curve.
    if method == 'spline':
        # NOTE(review): s bounds the sum of squared residuals in absolute
        # terms; for yields quoted as decimals 0.01 is very loose. Confirm
        # this level of smoothing is intended.
        tck = splrep(maturities, yields, s=0.01)
        spline_func = lambda t: splev(t, tck)
    elif method == 'linear':
        spline_func = interp1d(maturities, yields, kind='linear',
                               bounds_error=False, fill_value='extrapolate')
    else:
        spline_func = interp1d(maturities, yields, kind='cubic',
                               bounds_error=False, fill_value='extrapolate')

    # Evaluation grid covering exactly the quoted maturity range.
    query_maturities = np.linspace(maturities.min(), maturities.max(), 100)

    return {
        'ns_func': ns_func,
        'spline_func': spline_func,
        'query_maturities': query_maturities,
        'params': popt,
    }

# Example: interpolate a yield curve from a handful of market quotes.
maturities = np.array([0.25, 0.5, 1, 2, 5, 10, 20, 30])
yields = np.array([0.02, 0.025, 0.03, 0.035, 0.04, 0.045, 0.05, 0.048])

curve_results = fit_yield_curve_interpolation(maturities, yields, method='spline')

# Plot the quotes, the interpolant and (if the fit succeeded) Nelson-Siegel.
plt.figure(figsize=(10, 6))
plt.plot(maturities, yields, 'ro', markersize=8, label='市场报价')
plt.plot(curve_results['query_maturities'], 
         curve_results['spline_func'](curve_results['query_maturities']), 
         'b-', label='样条插值')
# 'ns_func' is None when the parametric fit failed, so guard the plot.
if curve_results['ns_func']:
    plt.plot(curve_results['query_maturities'], 
             curve_results['ns_func'](curve_results['query_maturities']), 
             'g--', label='Nelson-Siegel')
plt.xlabel('期限 (年)')
plt.ylabel('收益率')
plt.title('收益率曲线插值')
plt.legend()
plt.grid(True)
plt.show()
缺失交易数据填补
def fill_missing_trades(trade_times, trade_prices, query_times, method='spline'):
    """Estimate prices at ``query_times`` from irregularly timed trades.

    Trades are sorted by time and, for repeated timestamps, only the first
    occurrence (in input order) is kept before the interpolant is built.

    Args:
        trade_times: 1-D array-like of trade timestamps (numeric).
        trade_prices: 1-D array-like of prices, same length as ``trade_times``.
        query_times: Times at which prices are wanted.
        method: ``'spline'`` for a smoothing spline, ``'linear'`` for linear
            interpolation; any other value carries the previous trade price
            forward (step function).

    Returns:
        Tuple ``(filled_prices, t_unique, p_unique)``: the estimated prices
        and the de-duplicated, time-sorted trade data actually used.

    Raises:
        ValueError: If the inputs are empty or have different shapes.
    """
    trade_times = np.asarray(trade_times)
    trade_prices = np.asarray(trade_prices)
    if trade_times.shape != trade_prices.shape:
        raise ValueError("trade_times and trade_prices must have the same shape")
    if trade_times.size == 0:
        raise ValueError("at least one trade is required")

    # Stable sort so that "first occurrence wins" is deterministic for ties.
    order = np.argsort(trade_times, kind='stable')
    t_sorted = trade_times[order]
    p_sorted = trade_prices[order]

    # Drop repeated timestamps; interpolators need strictly increasing x.
    keep = np.r_[True, t_sorted[1:] != t_sorted[:-1]]
    t_unique = t_sorted[keep]
    p_unique = p_sorted[keep]

    if method == 'spline':
        # Smoothing spline, suited to a noisy price path.
        tck = splrep(t_unique, p_unique, s=len(p_unique) * 0.1)
        # ext=0 extrapolates beyond the first/last trade, matching the other
        # branches. The previous ext=1 returned 0 there, i.e. a zero price.
        filled_prices = splev(query_times, tck, ext=0)
    elif method == 'linear':
        interp = interp1d(t_unique, p_unique, kind='linear',
                          bounds_error=False, fill_value='extrapolate')
        filled_prices = interp(query_times)
    else:
        # Previous-value (forward-fill) interpolation, suited to sparse data.
        interp = interp1d(t_unique, p_unique, kind='previous',
                          bounds_error=False, fill_value='extrapolate')
        filled_prices = interp(query_times)

    return filled_prices, t_unique, p_unique

# Simulate sparse trade data: 200 trades at random times within a 252-day year.
np.random.seed(42)
trade_times = np.sort(np.random.uniform(0, 252, 200))  # trade times in trading days
trade_prices = 100 * np.cumprod(1 + np.random.randn(200) * 0.01)  # random-walk prices

# Regular daily grid on which prices are wanted.
query_times = np.arange(252)

# Fill the gaps using the default method of fill_missing_trades.
filled_prices, _, _ = fill_missing_trades(trade_times, trade_prices, query_times)

# Plot actual trades against the filled daily series.
plt.figure(figsize=(12, 6))
plt.plot(trade_times, trade_prices, 'ro', label='实际交易', markersize=4)
plt.plot(query_times, filled_prices, 'b-', label='插值填补', linewidth=1)
plt.xlabel('交易日')
plt.ylabel('价格')
plt.title('缺失交易数据填补')
plt.legend()
plt.grid(True)
plt.show()

3. 二维插值:波动率表面

双线性插值与双三次插值
from scipy.interpolate import RectBivariateSpline, griddata
import numpy as np

def volatility_surface_interpolation(strikes, maturities, implied_vols, 
                                   query_strikes, query_maturities):
    """Interpolate an implied-volatility surface given on a rectangular grid.

    Three interpolants are evaluated on the tensor grid
    ``query_maturities x query_strikes``: a bicubic smoothing spline
    (``RectBivariateSpline``) and linear / cubic ``griddata``.

    Args:
        strikes: 1-D strictly increasing strikes, length ``n_k``.
        maturities: 1-D strictly increasing maturities, length ``n_t``.
        implied_vols: 2-D array of shape ``(n_t, n_k)``; rows are maturities.
        query_strikes: 1-D strikes to evaluate at.
        query_maturities: 1-D maturities to evaluate at.

    Returns:
        Dict with ``'spline'``, ``'linear'`` and ``'cubic'`` surfaces, each of
        shape ``(len(query_maturities), len(query_strikes))``, plus the query
        axes under ``'strike_grid'`` and ``'maturity_grid'``.

    Raises:
        ValueError: If ``implied_vols`` is not shaped ``(n_t, n_k)``.
    """
    strikes = np.asarray(strikes, dtype=float)
    maturities = np.asarray(maturities, dtype=float)
    implied_vols = np.asarray(implied_vols, dtype=float)
    query_strikes = np.asarray(query_strikes, dtype=float)
    query_maturities = np.asarray(query_maturities, dtype=float)

    expected_shape = (maturities.size, strikes.size)
    if implied_vols.shape != expected_shape:
        raise ValueError(
            f"implied_vols has shape {implied_vols.shape}, expected "
            f"{expected_shape} (maturities x strikes)"
        )

    # Method 1: RectBivariateSpline on the regular grid (bicubic, smoothed).
    # NOTE(review): s=0.1 is an absolute bound on squared residuals and is
    # large relative to typical vol levels; confirm the smoothing is intended.
    spline = RectBivariateSpline(maturities, strikes, implied_vols, 
                                s=0.1, kx=3, ky=3)
    vol_surface_spline = spline(query_maturities, query_strikes)

    # Method 2: griddata on the flattened nodes. Default 'xy' indexing gives
    # grids of shape (n_t, n_k), the same layout as implied_vols, so ravel()
    # pairs every (strike, maturity) node with its own volatility.
    strike_grid, maturity_grid = np.meshgrid(strikes, maturities)
    points = np.column_stack([strike_grid.ravel(), maturity_grid.ravel()])
    values = implied_vols.ravel()

    # Broadcasting a row of strikes against a column of maturities yields the
    # full query grid without materialising it.
    query = (query_strikes[None, :], query_maturities[:, None])
    vol_surface_linear = griddata(points, values, query, method='linear')
    vol_surface_cubic = griddata(points, values, query, method='cubic')

    return {
        'spline': vol_surface_spline,
        'linear': vol_surface_linear,
        'cubic': vol_surface_cubic,
        'strike_grid': query_strikes,
        'maturity_grid': query_maturities
    }

# Build a synthetic implied-volatility surface on a strike x maturity grid.
strikes = np.array([80, 90, 100, 110, 120])
maturities = np.array([0.25, 0.5, 1.0, 2.0])
# Default 'xy' indexing: both grids have shape (len(maturities), len(strikes)).
strike_grid, maturity_grid = np.meshgrid(strikes, maturities)

# Synthetic vols: smile in strike, log term structure, plus Gaussian noise.
implied_vols = (0.2 + 0.1 * np.abs(strike_grid - 100) / 100 + 
                0.05 * np.log(1 + maturity_grid) + 
                0.02 * np.random.randn(*strike_grid.shape))

# Denser query grid (slightly wider than the quoted range).
query_strikes = np.linspace(75, 125, 50)
query_maturities = np.linspace(0.1, 2.5, 30)

vol_results = volatility_surface_interpolation(strikes, maturities, implied_vols,
                                              query_strikes, query_maturities)

# 3-D visualisation. The Axes3D import is kept so that older matplotlib
# versions register the '3d' projection.
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure(figsize=(15, 5))

ax1 = fig.add_subplot(131, projection='3d')
X, Y = np.meshgrid(query_strikes, query_maturities)
surf1 = ax1.plot_surface(X, Y, vol_results['spline'], cmap='viridis')
ax1.set_title('RectBivariateSpline')
ax1.set_xlabel('Strike')
ax1.set_ylabel('Maturity')
ax1.set_zlabel('Implied Vol')

ax2 = fig.add_subplot(132, projection='3d')
surf2 = ax2.plot_surface(X, Y, vol_results['linear'], cmap='viridis')
ax2.set_title('Linear GridData')
ax2.set_xlabel('Strike')
ax2.set_ylabel('Maturity')

ax3 = fig.add_subplot(133)
im = ax3.contourf(X, Y, vol_results['spline'], levels=20, cmap='RdYlBu_r')
# Mark the quoted nodes. The flattened grids give one (strike, maturity) pair
# per node; passing the raw 1-D axes fails because their lengths differ, and
# a third positional argument on 2-D axes is the marker size, not a z value.
ax3.scatter(strike_grid.ravel(), maturity_grid.ravel(),
            c='black', s=50, marker='o')
plt.colorbar(im, ax=ax3)
ax3.set_title('波动率微笑表面')
ax3.set_xlabel('Strike')
ax3.set_ylabel('Maturity')

plt.tight_layout()
plt.show()
径向基函数插值(RBF)
from scipy.interpolate import RBFInterpolator, Rbf

def rbf_volatility_interpolation(scattered_points, vols, query_grid):
    """Interpolate scattered volatility quotes with thin-plate-spline RBFs.

    Two implementations are evaluated side by side on the same query points.

    Args:
        scattered_points: Array of shape ``(n, d)`` with one quote per row.
        vols: Array of ``n`` volatilities.
        query_grid: Array of shape ``(m, d)`` with the points to evaluate.

    Returns:
        Tuple ``(vol_rbf, vol_rbf_traditional)``: values from
        ``RBFInterpolator`` (no smoothing) and from the legacy ``Rbf`` class
        (``smooth=0.1``).
    """
    # Legacy Rbf class: takes one positional array per coordinate, followed by
    # the values, and is evaluated the same way.
    legacy_args = [*scattered_points.T, vols]
    legacy_rbf = Rbf(*legacy_args, function='thin_plate', smooth=0.1)
    legacy_surface = legacy_rbf(*query_grid.T)

    # RBFInterpolator is the recommended API (added in SciPy 1.7; its
    # availability depends on the SciPy version, not the NumPy version).
    modern_rbf = RBFInterpolator(scattered_points, vols, kernel='thin_plate_spline')
    modern_surface = modern_rbf(query_grid)

    return modern_surface, legacy_surface

# Scattered (non-gridded) volatility quotes.
n_points = 50
scattered_strikes = np.random.uniform(80, 120, n_points)
scattered_maturities = np.random.uniform(0.1, 2.0, n_points)
# Synthetic vols: smile in strike, log term structure, plus Gaussian noise.
scattered_vols = 0.2 + 0.15 * np.abs(scattered_strikes - 100) / 100 + \
                0.05 * np.log(1 + scattered_maturities) + \
                0.01 * np.random.randn(n_points)

# One (strike, maturity) pair per row.
scattered_points = np.column_stack([scattered_strikes, scattered_maturities])

# Query grid. Note this rebinds query_strikes / query_maturities to 2-D
# meshgrid arrays of shape (30, 50).
query_strikes, query_maturities = np.meshgrid(np.linspace(75, 125, 50), 
                                             np.linspace(0.1, 2.5, 30))
query_grid = np.column_stack([query_strikes.ravel(), query_maturities.ravel()])

vol_rbf, vol_traditional = rbf_volatility_interpolation(scattered_points, 
                                                       scattered_vols, query_grid)

# Reshape the flat result back onto the query grid.
vol_rbf_grid = vol_rbf.reshape(query_maturities.shape)

plt.figure(figsize=(10, 8))
plt.contourf(query_strikes, query_maturities, vol_rbf_grid, levels=20, cmap='RdYlBu_r')
plt.colorbar(label='Implied Volatility')
# Overlay the input quotes, coloured by their own volatility.
plt.scatter(scattered_strikes, scattered_maturities, c=scattered_vols, 
           s=50, cmap='RdYlBu_r', edgecolors='black')
plt.title('RBF波动率表面插值')
plt.xlabel('Strike Price')
plt.ylabel('Time to Maturity')
plt.show()

4. 不规则数据插值

Delaunay三角剖分插值
from scipy.spatial import Delaunay
from scipy.interpolate import LinearNDInterpolator, griddata

def delaunay_interpolation(scattered_points, values, query_points):
    """Linearly interpolate scattered data over a Delaunay triangulation.

    Args:
        scattered_points: Array of shape ``(n, d)`` with the data sites.
        values: Array of ``n`` values at those sites.
        query_points: Array of shape ``(m, d)`` with the points to evaluate.

    Returns:
        Tuple ``(interpolated, grid_interp, tri)``: the result of a
        ``LinearNDInterpolator`` built on an explicit triangulation, the
        result of the equivalent ``griddata`` call (NaN outside the convex
        hull), and the ``Delaunay`` triangulation itself.
    """
    triangulation = Delaunay(scattered_points)

    # Convenience route: griddata builds its own triangulation internally.
    via_griddata = griddata(scattered_points, values, query_points,
                            method='linear', fill_value=np.nan)

    # Explicit route: reuse the triangulation computed above.
    piecewise_linear = LinearNDInterpolator(triangulation, values)
    via_triangulation = piecewise_linear(query_points)

    return via_triangulation, via_griddata, triangulation

# Simulate scattered 2-D data (e.g. an irregularly observed financial surface).
n_points = 100
points = np.random.rand(n_points, 2) * 10  # sites in the square [0, 10]^2
values = np.sin(points[:, 0]) * np.cos(points[:, 1]) + 0.1 * np.random.randn(n_points)

# Regular 50 x 50 query grid over the same square.
xi, yi = np.mgrid[0:10:50j, 0:10:50j]
query_points = np.column_stack([xi.ravel(), yi.ravel()])

interp_values, grid_values, triangulation = delaunay_interpolation(points, values, query_points)

# Visualise the triangulation and the interpolated field.
plt.figure(figsize=(12, 5))
plt.subplot(121)
# triplot needs the vertex coordinates plus the triangle index array. The
# simplices hold vertex indices, not coordinates, so they cannot be passed
# as x/y.
plt.triplot(points[:, 0], points[:, 1], triangulation.simplices, 'b-')
plt.scatter(points[:, 0], points[:, 1], c=values, s=50, cmap='viridis')
plt.title('Delaunay三角剖分')
plt.colorbar(label='原始值')

plt.subplot(122)
plt.contourf(xi, yi, interp_values.reshape(xi.shape), levels=20, cmap='viridis')
plt.colorbar(label='插值结果')
plt.scatter(points[:, 0], points[:, 1], c=values, s=30, edgecolors='black')
plt.title('三角剖分插值')
plt.tight_layout()
plt.show()
CloughTocher2DInterpolator(分片三次、C1 光滑插值)
from scipy.interpolate import CloughTocher2DInterpolator

def clough_tocher_interpolation(points, values, query_grid):
    """Evaluate a Clough-Tocher interpolant of scattered 2-D data.

    ``CloughTocher2DInterpolator`` is a piecewise-cubic, C1-smooth scheme on a
    triangulation of ``points`` (not a quadratic one), so it gives a smoother
    surface than piecewise-linear interpolation.

    Args:
        points: Array of shape ``(n, 2)`` with the data sites.
        values: Array of ``n`` values at those sites.
        query_grid: Array of shape ``(m, 2)`` with the points to evaluate.

    Returns:
        Array of ``m`` interpolated values.
    """
    return CloughTocher2DInterpolator(points, values)(query_grid)

# Apply Clough-Tocher interpolation to the scattered data defined above
# (reuses the module-level points / values / query_points / xi / yi).
ct_values = clough_tocher_interpolation(points, values, query_points)
ct_grid = ct_values.reshape(xi.shape)

plt.figure(figsize=(10, 6))
plt.contourf(xi, yi, ct_grid, levels=20, cmap='plasma')
plt.colorbar(label='Clough-Tocher插值')
plt.scatter(points[:, 0], points[:, 1], c=values, s=50, edgecolors='white')
plt.title('Clough-Tocher二次插值')
plt.xlabel('X')
plt.ylabel('Y')
plt.show()

5. 高阶插值与正则化

B样条插值(精确控制)
from scipy.interpolate import BSpline, splrep, splev

def bspline_interpolation(x, y, degree=3, periodic=False):
    """Build an interpolating B-spline through ``(x, y)``.

    Args:
        x: 1-D strictly increasing abscissae.
        y: 1-D ordinates, same length as ``x``.
        degree: Spline degree ``k`` passed to ``splrep``.
        periodic: If true, fit a periodic spline (``splrep(per=1)``) and make
            the returned ``BSpline`` extrapolate periodically.

    Returns:
        Tuple ``(bspline, tck)``: a ``BSpline`` object and the FITPACK
        ``(t, c, k)`` tuple it was built from (usable with ``splev``).
    """
    # splrep returns the single tuple (t, c, k). It is splprep, not splrep,
    # that returns (tck, u), so the result must not be unpacked into two names.
    tck = splrep(x, y, s=0, k=degree, per=int(bool(periodic)))  # s=0: exact interpolation
    knots, coeffs, order = tck

    bspline = BSpline(knots, coeffs, order,
                      extrapolate='periodic' if periodic else True)

    return bspline, tck

# Example: fit a noisy damped oscillation with an interpolating B-spline.
x = np.linspace(0, 10, 20)
y = np.exp(-x/5) * np.sin(2*x) + 0.05 * np.random.randn(20)

bspline, tck = bspline_interpolation(x, y, degree=3)

# Evaluate through both the BSpline object and the FITPACK tck tuple; the two
# curves are plotted on top of each other for comparison.
x_fine = np.linspace(0, 10, 1000)
y_bspline = bspline(x_fine)
y_splev = splev(x_fine, tck)

plt.figure(figsize=(10, 6))
plt.plot(x, y, 'ro', label='数据点')
plt.plot(x_fine, y_bspline, 'b-', label='B样条')
plt.plot(x_fine, y_splev, 'g--', label='splev', alpha=0.8)
plt.legend()
plt.title('B样条插值')
plt.grid(True)
plt.show()
正则化样条(平滑控制)
def regularized_spline_interpolation(x, y, smoothing_factor=None):
    """Fit a smoothing spline and plot how the smoothing factor changes it.

    Args:
        x: 1-D increasing abscissae.
        y: 1-D ordinates, same length as ``x``.
        smoothing_factor: ``s`` passed to ``UnivariateSpline``. When ``None``
            it defaults to ``len(x) * var(y) * 0.01``.

    Returns:
        The ``UnivariateSpline`` fitted with ``smoothing_factor``.

    Side effects:
        Opens a matplotlib figure comparing four smoothing levels and shows it.
    """
    if smoothing_factor is None:
        # Heuristic default: 1% of the total variance of the data.
        smoothing_factor = len(x) * (np.std(y) ** 2) * 0.01

    reg_spline = UnivariateSpline(x, y, s=smoothing_factor)

    # Evaluation grid derived from this call's own data. The function no
    # longer depends on a module-level x_fine that may be missing or may
    # cover a different range.
    x_dense = np.linspace(np.min(x), np.max(x), 1000)

    # Compare several smoothing levels, from exact interpolation (s=0) upward.
    smooth_levels = [0, len(x)*0.1, len(x)*1, len(x)*10]
    line_styles = ['-', '--', '-.', ':']

    plt.figure(figsize=(12, 8))
    plt.plot(x, y, 'ko', markersize=6, label='原始数据')

    for s, line_style in zip(smooth_levels, line_styles):
        candidate = UnivariateSpline(x, y, s=s)
        plt.plot(x_dense, candidate(x_dense), label=f's={s:.1f}',
                 linestyle=line_style)

    plt.legend()
    plt.title('正则化样条插值(不同平滑参数)')
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.grid(True)
    plt.show()

    return reg_spline

# Smooth the noisy module-level sample (x, y) with the default smoothing factor.
reg_spline = regularized_spline_interpolation(x, y)

6. 金融高级应用

利率期限结构动态插值
class DynamicYieldCurve:
    """Store dated yield curves and interpolate across maturity and date."""

    def __init__(self):
        # date -> {'maturities', 'yields', 'interpolator'}
        self.curves = {}
        # Kept for interface compatibility; not populated by this class.
        self.interpolators = {}

    def add_curve(self, date, maturities, yields):
        """Register the curve observed on ``date``.

        Args:
            date: Key for the curve; keys must be mutually comparable and
                support subtraction (e.g. timestamps or numbers).
            maturities: 1-D array of maturities.
            yields: 1-D array of yields, same length as ``maturities``.
        """
        interp = fit_yield_curve_interpolation(maturities, yields, method='spline')
        self.curves[date] = {
            'maturities': maturities,
            'yields': yields,
            'interpolator': interp['spline_func']
        }

    def interpolate_curve(self, date, target_maturity):
        """Return the yield for ``target_maturity`` on ``date``.

        A stored date uses its own curve. A date before the first or after
        the last stored curve uses the nearest stored curve. Any other date is
        a linear blend of the two neighbouring curves, weighted by elapsed
        time.

        Raises:
            ValueError: If no curve has been added yet.
        """
        if not self.curves:
            raise ValueError("no yield curves have been added")

        if date in self.curves:
            return self.curves[date]['interpolator'](target_maturity)

        dates = sorted(self.curves.keys())
        idx = np.searchsorted(dates, date)
        if idx == 0:
            return self.curves[dates[0]]['interpolator'](target_maturity)
        if idx == len(dates):
            return self.curves[dates[-1]]['interpolator'](target_maturity)

        # Blend the bracketing curves linearly in time.
        date1, date2 = dates[idx-1], dates[idx]
        y1 = self.curves[date1]['interpolator'](target_maturity)
        y2 = self.curves[date2]['interpolator'](target_maturity)
        weight = (date - date1) / (date2 - date1)
        return (1 - weight) * y1 + weight * y2

    def forward_curve(self, date, maturity_range):
        """Return forward rates between consecutive maturities.

        Treating the spot rates as continuously compounded zero rates, the
        forward rate over ``[T1, T2]`` is ``(r2*T2 - r1*T1) / (T2 - T1)``.
        The previous ``diff(r) / diff(T)`` was only the slope of the spot
        curve, e.g. 0 for a flat curve instead of the flat rate.

        Args:
            date: Date on which to read the spot curve.
            maturity_range: 1-D increasing array of maturities.

        Returns:
            Array of ``len(maturity_range) - 1`` forward rates.
        """
        maturity_range = np.asarray(maturity_range, dtype=float)
        spot_rates = np.array(
            [self.interpolate_curve(date, t) for t in maturity_range],
            dtype=float,
        )
        accrued = spot_rates * maturity_range  # r(T) * T, the log-discount exponent
        forward_rates = np.diff(accrued) / np.diff(maturity_range)
        return forward_rates

# Example: a sequence of dated yield curves, one per business day.
dates = pd.date_range('2024-01-01', periods=10, freq='B')
yield_curve = DynamicYieldCurve()

maturities_base = np.array([0.25, 0.5, 1, 2, 5, 10])
for i, date in enumerate(dates):
    # Simulated curve: flat level that oscillates over time, plus small noise.
    yields = 0.03 + 0.005 * np.sin(i * 0.5) + np.random.randn(len(maturities_base)) * 0.001
    yield_curve.add_curve(date, maturities_base, yields)

# Query the 2-year yield on every calendar day, including days with no curve.
query_date = pd.date_range('2024-01-01', '2024-01-20', freq='D')
two_year_yields = [yield_curve.interpolate_curve(d, 2.0) for d in query_date]

plt.figure(figsize=(12, 6))
plt.plot(query_date, two_year_yields)
plt.title('2年期收益率时间序列(插值)')
plt.ylabel('收益率')
plt.xticks(rotation=45)
plt.grid(True)
plt.show()
路径依赖期权定价插值
def barrier_option_pricing_interpolation(spot, strike, barrier, maturity, 
                                        vol_surface_func, time_steps=252,
                                        drift=np.log(1.05)):
    """Simulate one path of a down-and-out call using an interpolated vol surface.

    This is a single-path illustration of querying a volatility surface during
    a simulation, not a pricer. A real price would average many discounted
    payoffs (Monte Carlo) or use finite differences.

    Args:
        spot: Initial underlying price.
        strike: Option strike.
        barrier: Knock-out level; the option is void if the path is ever at
            or below it.
        maturity: Time to maturity in years.
        vol_surface_func: Callable ``(price_or_strike, time_to_maturity)``
            returning a volatility (scalar or size-1 array).
        time_steps: Number of simulation steps up to maturity.
        drift: Continuously compounded drift of the underlying.

    Returns:
        Tuple ``(payoff, current_vol)``: the payoff of the simulated path and
        the volatility read at ``(strike, maturity)``.

    Raises:
        ValueError: If ``time_steps`` is smaller than 1.
    """
    if time_steps < 1:
        raise ValueError("time_steps must be at least 1")

    # Volatility quoted for the option itself.
    current_vol = vol_surface_func(strike, maturity)

    dt = maturity / time_steps

    # time_steps increments need time_steps + 1 nodes, so the last node sits
    # exactly at maturity.
    paths = np.empty(time_steps + 1)
    paths[0] = spot

    for step in range(1, time_steps + 1):
        # Local vol at the start of the step: current simulated price and the
        # remaining time (always > 0, so the surface is never queried at
        # zero maturity).
        time_to_mat = maturity - (step - 1) * dt
        step_vol = float(np.squeeze(vol_surface_func(paths[step - 1], time_to_mat)))
        dW = np.random.randn() * np.sqrt(dt)
        paths[step] = paths[step - 1] * np.exp((drift - 0.5 * step_vol**2) * dt +
                                               step_vol * dW)

    # Barrier check over the whole path, including the starting point.
    knocked_out = bool(np.any(paths <= barrier))
    payoff = 0.0 if knocked_out else np.maximum(paths[-1] - strike, 0)

    return payoff, current_vol

# Batch pricing over a range of strikes.
# vol_results['spline'] is an array of sampled surface values, not a callable,
# so a callable surface is rebuilt from the quoted grid instead.
# NOTE(review): assumes the module-level maturities / strikes / implied_vols
# still hold the vol-surface inputs defined in the surface example — confirm.
vol_spline = RectBivariateSpline(maturities, strikes, implied_vols,
                                 s=0.1, kx=3, ky=3)
surface_strike_range = (float(np.min(strikes)), float(np.max(strikes)))
surface_maturity_range = (float(np.min(maturities)), float(np.max(maturities)))


def vol_surface_func(strike, time_to_maturity):
    """Return the spline volatility at one (strike, maturity) point.

    Inputs are clipped to the quoted grid so the simulation never reads the
    surface outside the region supported by data.
    """
    clipped_strike = np.clip(strike, *surface_strike_range)
    clipped_maturity = np.clip(time_to_maturity, *surface_maturity_range)
    return float(np.squeeze(vol_spline.ev(clipped_maturity, clipped_strike)))


strikes = np.linspace(90, 110, 10)
results = []
for K in strikes:
    payoff, vol = barrier_option_pricing_interpolation(100, K, 95, 1.0, 
                                                      vol_surface_func)
    results.append({'strike': K, 'payoff': payoff, 'vol': vol})

df_results = pd.DataFrame(results)
print(df_results)

7. 性能优化与大规模插值

向量化插值
def vectorized_interpolation(interp_func, query_points, batch_size=10000):
    """Evaluate ``interp_func`` over ``query_points`` in fixed-size batches.

    Batching bounds the size of the temporaries created by a single call to
    the interpolator.

    Args:
        interp_func: Vectorised callable returning one value per query point.
        query_points: 1-D sequence or array of query locations.
        batch_size: Maximum number of points passed to ``interp_func`` at once.

    Returns:
        1-D float array with one result per query point.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    n_queries = len(query_points)
    results = np.zeros(n_queries)

    for start in range(0, n_queries, batch_size):
        stop = start + batch_size  # slicing clamps the final, shorter batch
        results[start:stop] = interp_func(query_points[start:stop])

    return results

# Large-scale example: many assets, each traded on ~80% of the days.
n_assets = 1000
n_days = 252
n_trades = int(0.8 * n_days)
# Draw distinct trading days independently for every asset. One call with
# size=(n_assets, n_trades) and replace=False would request more distinct
# values than the 252 available and raise ValueError.
trade_days = np.array([
    np.random.choice(n_days, size=n_trades, replace=False)
    for _ in range(n_assets)
])
trade_returns = np.random.randn(n_assets, n_trades) * 0.01

# Build one interpolator per asset (interp1d needs its x values sorted here
# only for clarity; the returns are reordered to match).
interpolators = []
for i in range(n_assets):
    sorted_idx = np.argsort(trade_days[i])
    interp = interp1d(trade_days[i][sorted_idx], trade_returns[i][sorted_idx], 
                     kind='linear', bounds_error=False, fill_value=0)
    interpolators.append(interp)

# Fill the complete calendar for every asset. Days outside an asset's traded
# range get fill_value=0 rather than NaN.
full_returns = np.zeros((n_assets, n_days))
for i, interp in enumerate(interpolators):
    full_returns[i] = vectorized_interpolation(interp, np.arange(n_days))

print(f"完整回报矩阵: {full_returns.shape}")
# This figure is the share of NaN entries remaining after the fill.
print(f"缺失数据填补率: {np.isnan(full_returns).sum() / full_returns.size * 100:.1f}%")
内存高效插值
class MemoryEfficientInterpolator:
    """Wrap ``interp1d`` so that long query arrays are evaluated chunk by chunk."""

    def __init__(self, x, y, method='linear', chunk_size=10000):
        """Build the underlying extrapolating ``interp1d``.

        Args:
            x: 1-D sample locations.
            y: 1-D sample values.
            method: ``kind`` argument forwarded to ``interp1d``.
            chunk_size: Largest number of query points evaluated in one call.
        """
        self.interp = interp1d(x, y, kind=method,
                               bounds_error=False, fill_value='extrapolate')
        self.chunk_size = chunk_size

    def __call__(self, query_points):
        """Interpolate at ``query_points``, splitting long inputs into chunks."""
        total = len(query_points)
        step = self.chunk_size

        # Short inputs go straight through in a single evaluation.
        if total <= step:
            return self.interp(query_points)

        # Otherwise evaluate consecutive slices and stitch them back together.
        pieces = [self.interp(query_points[lo:lo + step])
                  for lo in range(0, total, step)]
        return np.concatenate(pieces)

# Usage example. The sample is defined here so its x and y lengths always
# match; the module-level t (50 points) and y (rebound to 20 points by the
# B-spline example) no longer agree when the file runs top to bottom.
demo_t = np.linspace(0, 10, 50)
demo_y = np.sin(demo_t)
large_interp = MemoryEfficientInterpolator(demo_t, demo_y, chunk_size=1000)
large_query = np.linspace(0, 10, 100000)
result = large_interp(large_query)

8. 插值质量评估

交叉验证与插值评估
def cross_validate_interpolation(x, y, method='cubic', n_folds=5):
    """Estimate interpolation error with shuffled k-fold cross-validation.

    For each fold an interpolant is built from the remaining points and scored
    (RMSE) on the held-out points that lie inside the training range. Held-out
    points outside that range would need extrapolation, so they are excluded
    instead of making ``interp1d`` raise.

    The split is seeded (42) and done with NumPy only; it is intended to match
    ``sklearn.model_selection.KFold(shuffle=True, random_state=42)``.

    Args:
        x: 1-D array of distinct sample locations.
        y: 1-D array of sample values, same length as ``x``.
        method: ``'linear'`` for linear interpolation; any other value
            (including ``'cubic'`` and ``'spline'``) uses cubic ``interp1d``.
        n_folds: Number of folds, between 2 and ``len(x)``.

    Returns:
        Tuple ``(mean_rmse, std_rmse)`` over the folds that could be scored,
        or ``(nan, nan)`` if none could.

    Raises:
        ValueError: On mismatched inputs or an invalid ``n_folds``.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if x.ndim != 1 or x.shape != y.shape:
        raise ValueError("x and y must be 1-D arrays of equal length")
    if not 2 <= n_folds <= x.size:
        raise ValueError("n_folds must be between 2 and len(x)")

    kind = 'linear' if method == 'linear' else 'cubic'

    shuffled = np.arange(x.size)
    np.random.RandomState(42).shuffle(shuffled)

    scores = []
    for test_idx in np.array_split(shuffled, n_folds):
        train_mask = np.ones(x.size, dtype=bool)
        train_mask[test_idx] = False
        x_train, y_train = x[train_mask], y[train_mask]
        x_test, y_test = x[test_idx], y[test_idx]

        # Score pure interpolation only.
        inside = (x_test >= x_train.min()) & (x_test <= x_train.max())
        if not inside.any():
            continue

        interp = interp1d(x_train, y_train, kind=kind)
        residual = interp(x_test[inside]) - y_test[inside]
        scores.append(np.sqrt(np.mean(residual**2)))

    if not scores:
        return float('nan'), float('nan')
    return np.mean(scores), np.std(scores)

# Compare methods by cross-validation. The sample is generated here so x and
# y are guaranteed to have the same length; the module-level t (50 points)
# and y (rebound to 20 points by the B-spline example) no longer match.
cv_t = np.sort(np.random.uniform(0, 10, 50))
cv_y = np.sin(cv_t) + 0.1 * np.random.randn(50)

# NOTE: cross_validate_interpolation treats every name other than 'linear'
# as cubic, so 'cubic' and 'spline' report the same score.
methods = ['linear', 'cubic', 'spline']
results = {}
for method in methods:
    mean_error, std_error = cross_validate_interpolation(cv_t, cv_y, method)
    results[method] = {'mean_rmse': mean_error, 'std_rmse': std_error}

print("插值方法交叉验证结果:")
for method, res in results.items():
    print(f"{method:8s}: RMSE = {res['mean_rmse']:.6f} ± {res['std_rmse']:.6f}")

9. 完整金融插值工作流

class FinancialInterpolator:
    """Fit per-asset time-series interpolators and resample them on a calendar."""

    # 'spline' is not a valid interp1d kind; treat it as a cubic spline.
    _KIND_ALIASES = {'spline': 'cubic'}

    def __init__(self):
        self.interpolators = {}  # asset_id -> interp1d over "days since origin"
        self.metadata = {}       # asset_id -> dict describing the fitted data

    @staticmethod
    def _days_since(dates, origin):
        """Return ``dates`` as float days elapsed since ``origin``."""
        elapsed = (pd.DatetimeIndex(dates) - origin) / pd.Timedelta(days=1)
        return np.asarray(elapsed, dtype=float)

    @classmethod
    def _build(cls, dates, values, origin, method):
        """Build an ``interp1d`` (NaN outside the data) on the numeric time axis."""
        kind = cls._KIND_ALIASES.get(method, method)
        return interp1d(cls._days_since(dates, origin), values,
                        kind=kind, bounds_error=False)

    def fit_time_series(self, dates, values, asset_id, method='spline'):
        """Fit and store an interpolator for one asset.

        Args:
            dates: Observation dates (anything ``pd.DatetimeIndex`` accepts).
            values: Observed values, same length as ``dates``.
            asset_id: Key under which the interpolator is stored.
            method: ``interp1d`` kind; ``'spline'`` is an alias for ``'cubic'``.

        Raises:
            ValueError: If ``dates`` and ``values`` differ in length.
        """
        dates = pd.DatetimeIndex(dates)
        values = np.asarray(values, dtype=float)
        if len(dates) != len(values):
            raise ValueError("dates and values must have the same length")

        # The time origin is stored so that queries use the same axis as the fit.
        origin = dates.min()
        self.interpolators[asset_id] = self._build(dates, values, origin, method)
        self.metadata[asset_id] = {
            'dates': dates,
            'method': method,
            'n_points': len(values),
            'origin': origin,
            'values': values,
        }

    def get_complete_series(self, start_date, end_date, freq='D', assets=None):
        """Resample the fitted assets on a regular calendar.

        Args:
            start_date: First date of the calendar.
            end_date: Last date of the calendar.
            freq: Pandas frequency string for the calendar.
            assets: Asset ids to include; defaults to every fitted asset.

        Returns:
            DataFrame indexed by date with one column per asset. Dates outside
            an asset's observed range are NaN.
        """
        if assets is None:
            assets = list(self.interpolators.keys())

        complete_dates = pd.date_range(start_date, end_date, freq=freq)

        results = {}
        for asset in assets:
            # Measure time from the asset's own fit origin, not from the start
            # of the requested calendar; otherwise the series is shifted.
            origin = self.metadata[asset]['origin']
            t_target = self._days_since(complete_dates, origin)
            results[asset] = pd.Series(self.interpolators[asset](t_target),
                                       index=complete_dates)

        return pd.DataFrame(results)

    def validate_interpolation(self, asset_id, test_ratio=0.2):
        """Hold-out check of interpolation quality for one asset.

        Every ``round(1 / test_ratio)``-th interior observation (in time
        order) is withheld, the interpolator is refitted on the rest with the
        asset's method, and the withheld values are predicted. The first and
        last observations are never withheld, so no extrapolation is needed.

        Args:
            asset_id: Asset to validate.
            test_ratio: Approximate fraction of points to withhold, in (0, 1).

        Returns:
            Dict with ``'rmse'`` over the withheld points and ``'n_valid'``,
            the number of withheld points. ``rmse`` is NaN and ``n_valid`` is
            0 when there are too few points to validate.

        Raises:
            ValueError: If ``test_ratio`` is not strictly between 0 and 1.
        """
        if not 0 < test_ratio < 1:
            raise ValueError("test_ratio must be strictly between 0 and 1")

        meta = self.metadata[asset_id]
        dates, values = meta['dates'], meta['values']
        unavailable = {'rmse': float('nan'), 'n_valid': 0}

        order = np.argsort(dates.values, kind='stable')
        stride = max(int(round(1.0 / test_ratio)), 2)
        test_idx = order[1:-1][::stride]
        if test_idx.size == 0:
            return unavailable

        train_mask = np.ones(len(values), dtype=bool)
        train_mask[test_idx] = False
        try:
            holdout = self._build(dates[train_mask], values[train_mask],
                                  meta['origin'], meta['method'])
        except ValueError:
            # Too few training points for the requested interpolation kind.
            return unavailable

        predicted = holdout(self._days_since(dates[test_idx], meta['origin']))
        rmse = float(np.sqrt(np.mean((predicted - values[test_idx]) ** 2)))

        return {'rmse': rmse, 'n_valid': int(test_idx.size)}

# Usage example
financial_interp = FinancialInterpolator()

# Simulate data for several assets.
assets = ['AAPL', 'GOOG', 'MSFT']
for asset in assets:
    dates = pd.date_range('2024-01-01', '2024-06-30', freq='B')
    # Keep each business day with probability 0.8, i.e. drop about 20%.
    mask = np.random.choice([True, False], len(dates), p=[0.8, 0.2])
    sparse_dates = dates[mask]
    sparse_returns = np.random.randn(len(sparse_dates)) * 0.01

    financial_interp.fit_time_series(sparse_dates, sparse_returns, asset)

# Resample every asset on a daily calendar for the whole of 2024.
complete_data = financial_interp.get_complete_series('2024-01-01', '2024-12-31', 
                                                   assets=assets)

# Report the validation RMSE per asset.
for asset in assets:
    validation = financial_interp.validate_interpolation(asset)
    print(f"{asset}: RMSE = {validation['rmse']:.6f}")

print("\n完整数据形状:", complete_data.shape)
print(complete_data.head())

SciPy 插值工具在量化金融中应用广泛:从收益率曲线构建、波动率表面插值到缺失数据填补,都提供了高效且精确的解决方案。通过合理选择插值方法和参数,可以在计算效率与拟合精度之间取得平衡。如需针对特定金融场景优化插值算法,或将其与机器学习模型集成,可在上述示例的基础上按具体需求扩展。

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注