SciPy 插值
SciPy 插值全面指南:量化金融时间序列与曲线拟合
`scipy.interpolate` 模块提供了丰富的插值算法,从一维时间序列插值到多维曲面拟合,支持量化金融中的收益率曲线构建、缺失数据填补、期权定价波动率表面等核心应用。
1. 一维插值算法对比
线性插值 vs 样条插值
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import interpolate
from scipy.interpolate import interp1d, UnivariateSpline, Akima1DInterpolator
# Simulate an irregularly sampled time series (i.e. data with gaps).
# The seed fixes the RNG stream, so the order of the random draws below matters.
np.random.seed(42)
t = np.sort(np.random.uniform(0, 10, 50)) # irregular sample times
y = np.sin(t) + 0.1 * np.random.randn(50) # noisy sine wave
# 1. Linear interpolation (simple, fast)
linear_interp = interp1d(t, y, kind='linear', bounds_error=False, fill_value='extrapolate')
# 2. Cubic spline interpolation (smooth, continuous derivatives)
cubic_interp = interp1d(t, y, kind='cubic', bounds_error=False, fill_value='extrapolate')
# 3. Akima interpolation (less prone to oscillation between samples)
# NOTE(review): no extrapolation is requested here, unlike the two interp1d
# objects above - queries outside [t.min(), t.max()] presumably yield NaN; confirm.
akima_interp = Akima1DInterpolator(t, y)
# 4. UnivariateSpline (smoothing spline with a tunable smoothing factor)
spline = UnivariateSpline(t, y, s=0.1) # s = smoothing factor
# Dense query grid
t_fine = np.linspace(0, 10, 1000)
# Evaluate every interpolant on the dense grid
y_linear = linear_interp(t_fine)
y_cubic = cubic_interp(t_fine)
y_akima = akima_interp(t_fine)
y_spline = spline(t_fine)
# Plot the raw samples against all four interpolants
plt.figure(figsize=(12, 8))
plt.plot(t, y, 'ko', label='原始数据', markersize=4)
plt.plot(t_fine, y_linear, 'r-', label='线性插值', alpha=0.8)
plt.plot(t_fine, y_cubic, 'b-', label='三次样条', alpha=0.8)
plt.plot(t_fine, y_akima, 'g-', label='Akima插值', alpha=0.8)
plt.plot(t_fine, y_spline, 'm-', label='UnivariateSpline', alpha=0.8)
plt.legend()
plt.title('不同插值方法对比')
plt.xlabel('时间')
plt.ylabel('值')
plt.grid(True, alpha=0.3)
plt.show()
插值误差分析
def interpolation_error_analysis(true_func, t_sample, methods=('linear', 'cubic', 'akima')):
    """Measure how well each interpolation method reconstructs a known function.

    The function is sampled at ``t_sample``, interpolated with every requested
    method, and compared with the true function on a dense 1000-point grid
    spanning ``[t_sample.min(), t_sample.max()]``.

    Args:
        true_func: Vectorised callable mapping an array of times to values.
        t_sample: 1-D array of sample locations.
        methods: Iterable of method names; each must be ``'linear'``,
            ``'cubic'`` or ``'akima'``.

    Returns:
        ``(errors, t_fine, y_true_fine)`` where ``errors[method]`` is a dict
        with keys ``'rmse'`` and ``'max_error'``.

    Raises:
        ValueError: If an unknown method name is requested.
    """
    t_sample = np.asarray(t_sample, dtype=float)
    y_true_sample = true_func(t_sample)
    t_fine = np.linspace(t_sample.min(), t_sample.max(), 1000)
    y_true_fine = true_func(t_fine)

    errors = {}
    for method in methods:
        if method in ('linear', 'cubic'):
            interp = interp1d(t_sample, y_true_sample, kind=method,
                              bounds_error=False)
        elif method == 'akima':
            interp = Akima1DInterpolator(t_sample, y_true_sample)
        else:
            # Without this branch an unknown name would silently reuse the
            # previous iteration's interpolator (or hit an unbound local).
            raise ValueError(f"unknown interpolation method: {method!r}")

        residual = interp(t_fine) - y_true_fine
        errors[method] = {
            'rmse': np.sqrt(np.mean(residual ** 2)),
            'max_error': np.max(np.abs(residual)),
        }
    return errors, t_fine, y_true_fine
# Compare the interpolation methods at several sampling densities.
sample_densities = [10, 20, 50, 100]
# Reference signal: sum of two sinusoids
true_func = lambda t: np.sin(2 * np.pi * t) + 0.5 * np.sin(4 * np.pi * t)
plt.figure(figsize=(12, 8))
for i, n_samples in enumerate(sample_densities):
    t_sample = np.linspace(0, 10, n_samples)
    errors, _, _ = interpolation_error_analysis(true_func, t_sample)
    # One panel per sampling density; each panel shows one RMSE marker per method
    plt.subplot(2, 2, i+1)
    for method, err in errors.items():
        plt.scatter(n_samples, err['rmse'], label=method)
    plt.xscale('log')
    plt.yscale('log')
    plt.xlabel('采样点数')
    plt.ylabel('RMSE')
    plt.title(f'采样点数: {n_samples}')
    plt.legend()
    plt.grid(True)
plt.tight_layout()
plt.show()
2. 金融时间序列插值
收益率曲线插值(Nelson-Siegel模型)
from scipy.interpolate import interp1d, splev, splrep
from scipy.optimize import curve_fit
def nelson_siegel(t, beta0, beta1, beta2, tau):
    """Evaluate the Nelson-Siegel yield curve at maturity ``t``.

    y(t) = beta0 + beta1 * L(x) + beta2 * (L(x) - exp(-x)),
    with x = t / tau and L(x) = (1 - exp(-x)) / x.

    Args:
        t: Maturity or array of maturities.
        beta0: Level coefficient.
        beta1: Slope coefficient.
        beta2: Curvature coefficient.
        tau: Decay scale (must be non-zero).

    Returns:
        Yield(s) with the same shape as ``t``.
    """
    x = np.asarray(t, dtype=float) / tau
    at_zero = x == 0
    # L(x) is 0/0 at x == 0; its limit there is 1. Divide by a dummy 1.0 at
    # those positions so no NaN or divide warning is produced.
    safe_x = np.where(at_zero, 1.0, x)
    # -expm1(-x) equals 1 - exp(-x) but stays accurate for tiny x.
    slope_loading = np.where(at_zero, 1.0, -np.expm1(-safe_x) / safe_x)
    curvature_loading = slope_loading - np.exp(-x)
    return beta0 + beta1 * slope_loading + beta2 * curvature_loading
def fit_yield_curve_interpolation(maturities, yields, method='spline'):
    """Fit a yield curve both parametrically and with an interpolant.

    Args:
        maturities: 1-D array of maturities.
        yields: 1-D array of yields, one per maturity.
        method: ``'spline'`` (smoothing spline via splrep), ``'linear'``
            (interp1d linear); any other value selects interp1d cubic.

    Returns:
        dict with keys
        ``'ns_func'`` (fitted Nelson-Siegel callable, or None if the fit failed),
        ``'spline_func'`` (callable interpolant),
        ``'query_maturities'`` (100 points spanning the input maturities),
        ``'params'`` (fitted Nelson-Siegel parameters, or None).
    """
    maturities = np.asarray(maturities, dtype=float)
    yields = np.asarray(yields, dtype=float)

    # Parametric fit (Nelson-Siegel). Only fitting failures are tolerated:
    # RuntimeError = the optimiser did not converge, ValueError = infeasible
    # start point or bad input. Anything else is a programming error and
    # must surface instead of being swallowed.
    popt = None
    ns_func = None
    try:
        popt, _ = curve_fit(nelson_siegel, maturities, yields,
                            p0=[yields.mean(), -0.02, -0.02, 2.0],
                            bounds=([0, -1, -1, 0.1], [0.1, 0, 0, 10]))
    except (RuntimeError, ValueError):
        popt = None
    else:
        ns_func = lambda t, _params=tuple(popt): nelson_siegel(t, *_params)

    # Interpolant
    if method == 'spline':
        # NOTE(review): s is an absolute bound on the sum of squared
        # residuals. For yields of a few percent, 0.01 is very loose and may
        # smooth far more than intended - confirm the scale.
        tck = splrep(maturities, yields, s=0.01)
        spline_func = lambda t: splev(t, tck)
    else:
        kind = 'linear' if method == 'linear' else 'cubic'
        spline_func = interp1d(maturities, yields, kind=kind,
                               bounds_error=False, fill_value='extrapolate')

    query_maturities = np.linspace(maturities.min(), maturities.max(), 100)
    return {
        'ns_func': ns_func,
        'spline_func': spline_func,
        'query_maturities': query_maturities,
        'params': popt,
    }
# Example: interpolate a yield curve from a handful of market quotes.
maturities = np.array([0.25, 0.5, 1, 2, 5, 10, 20, 30])
yields = np.array([0.02, 0.025, 0.03, 0.035, 0.04, 0.045, 0.05, 0.048])
curve_results = fit_yield_curve_interpolation(maturities, yields, method='spline')
# Plot the quotes, the spline interpolant and (if the fit succeeded) Nelson-Siegel
plt.figure(figsize=(10, 6))
plt.plot(maturities, yields, 'ro', markersize=8, label='市场报价')
plt.plot(curve_results['query_maturities'],
         curve_results['spline_func'](curve_results['query_maturities']),
         'b-', label='样条插值')
# 'ns_func' is None when the parametric fit failed
if curve_results['ns_func']:
    plt.plot(curve_results['query_maturities'],
             curve_results['ns_func'](curve_results['query_maturities']),
             'g--', label='Nelson-Siegel')
plt.xlabel('期限 (年)')
plt.ylabel('收益率')
plt.title('收益率曲线插值')
plt.legend()
plt.grid(True)
plt.show()
缺失交易数据填补
def fill_missing_trades(trade_times, trade_prices, query_times, method='spline'):
    """Fill in prices at ``query_times`` from irregularly timed trades.

    Args:
        trade_times: 1-D array of trade timestamps (any order, duplicates allowed).
        trade_prices: 1-D array of trade prices aligned with ``trade_times``.
        query_times: Times at which prices are wanted.
        method: ``'spline'`` (smoothing cubic spline), ``'linear'``; any other
            value uses previous-tick interpolation (last observed price).

    Returns:
        ``(filled_prices, t_unique, p_unique)`` - the filled prices plus the
        sorted, de-duplicated trade times and prices that were actually used.
    """
    trade_times = np.asarray(trade_times)
    trade_prices = np.asarray(trade_prices)

    # Stable sort so that, among trades sharing a timestamp, the one that
    # came first in the input is the one kept below.
    order = np.argsort(trade_times, kind='stable')
    t_sorted = trade_times[order]
    p_sorted = trade_prices[order]
    # np.unique on sorted data returns the index of each first occurrence.
    t_unique, first_idx = np.unique(t_sorted, return_index=True)
    p_unique = p_sorted[first_idx]

    if method == 'spline':
        tck = splrep(t_unique, p_unique, s=len(p_unique) * 0.1)
        # ext=0 extrapolates outside the traded range. (ext=1 would return 0
        # there, i.e. zero prices before the first and after the last trade.)
        filled_prices = splev(query_times, tck, ext=0)
    else:
        # No function-scope import here: importing interp1d inside one branch
        # would make the name local and break the other branch.
        kind = 'linear' if method == 'linear' else 'previous'
        interp = interp1d(t_unique, p_unique, kind=kind,
                          bounds_error=False, fill_value='extrapolate')
        filled_prices = interp(query_times)
    return filled_prices, t_unique, p_unique
# Simulate sparse trade data. The seed fixes the RNG stream, so the order
# of the two random draws below matters.
np.random.seed(42)
trade_times = np.sort(np.random.uniform(0, 252, 200)) # trade times, in trading days
trade_prices = 100 * np.cumprod(1 + np.random.randn(200) * 0.01) # random-walk prices
# Regular (daily) query grid
query_times = np.arange(252)
# Fill the gaps (default method)
filled_prices, _, _ = fill_missing_trades(trade_times, trade_prices, query_times)
# Plot actual trades against the filled series
plt.figure(figsize=(12, 6))
plt.plot(trade_times, trade_prices, 'ro', label='实际交易', markersize=4)
plt.plot(query_times, filled_prices, 'b-', label='插值填补', linewidth=1)
plt.xlabel('交易日')
plt.ylabel('价格')
plt.title('缺失交易数据填补')
plt.legend()
plt.grid(True)
plt.show()
3. 二维插值:波动率表面
双线性插值与双三次插值
from scipy.interpolate import RectBivariateSpline, griddata
import numpy as np
def volatility_surface_interpolation(strikes, maturities, implied_vols,
                                     query_strikes, query_maturities):
    """Interpolate an implied-volatility surface quoted on a regular grid.

    Args:
        strikes: 1-D strictly increasing array of quoted strikes.
        maturities: 1-D strictly increasing array of quoted maturities.
        implied_vols: 2-D array of shape ``(len(maturities), len(strikes))``.
        query_strikes: 1-D array of strikes to evaluate at.
        query_maturities: 1-D array of maturities to evaluate at.

    Returns:
        dict with ``'spline'``, ``'linear'`` and ``'cubic'`` surfaces, each of
        shape ``(len(query_maturities), len(query_strikes))``, plus the query
        axes under ``'strike_grid'`` and ``'maturity_grid'``. The griddata
        surfaces are NaN outside the convex hull of the quotes.

    Raises:
        ValueError: If ``implied_vols`` does not have the expected shape.
    """
    strikes = np.asarray(strikes, dtype=float)
    maturities = np.asarray(maturities, dtype=float)
    implied_vols = np.asarray(implied_vols, dtype=float)
    query_strikes = np.asarray(query_strikes, dtype=float)
    query_maturities = np.asarray(query_maturities, dtype=float)

    expected_shape = (len(maturities), len(strikes))
    if implied_vols.shape != expected_shape:
        raise ValueError(
            f"implied_vols has shape {implied_vols.shape}, expected {expected_shape}")

    # Method 1: RectBivariateSpline (regular grid, bicubic). x = maturity,
    # y = strike, so z must be (n_maturities, n_strikes).
    spline = RectBivariateSpline(maturities, strikes, implied_vols,
                                 s=0.1, kx=3, ky=3)
    vol_surface_spline = spline(query_maturities, query_strikes)

    # Method 2: griddata (scattered-data API). The default 'xy' meshgrid
    # yields (n_maturities, n_strikes) arrays, so ravel() walks the grid in
    # the same order as implied_vols.ravel(). With indexing='ij' the points
    # would be strike-major and paired with the wrong vols.
    strike_grid, maturity_grid = np.meshgrid(strikes, maturities)
    points = np.column_stack([strike_grid.ravel(), maturity_grid.ravel()])
    values = implied_vols.ravel()
    # Broadcasts to (n_query_maturities, n_query_strikes)
    query = (query_strikes[None, :], query_maturities[:, None])

    vol_surface_linear = griddata(points, values, query, method='linear')
    vol_surface_cubic = griddata(points, values, query, method='cubic')

    return {
        'spline': vol_surface_spline,
        'linear': vol_surface_linear,
        'cubic': vol_surface_cubic,
        'strike_grid': query_strikes,
        'maturity_grid': query_maturities,
    }
# Build a synthetic implied-volatility surface.
strikes = np.array([80, 90, 100, 110, 120])
maturities = np.array([0.25, 0.5, 1.0, 2.0])
# Default meshgrid indexing: both grids are (len(maturities), len(strikes))
strike_grid, maturity_grid = np.meshgrid(strikes, maturities)
# Synthetic implied vols: base level + smile in strike + term structure + noise
implied_vols = (0.2 + 0.1 * np.abs(strike_grid - 100) / 100 +
                0.05 * np.log(1 + maturity_grid) +
                0.02 * np.random.randn(*strike_grid.shape))
# Denser query grid; it extends slightly beyond the quoted strikes/maturities
query_strikes = np.linspace(75, 125, 50)
query_maturities = np.linspace(0.1, 2.5, 30)
vol_results = volatility_surface_interpolation(strikes, maturities, implied_vols,
                                               query_strikes, query_maturities)
# 3-D visualisation of the interpolated surfaces.
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure(figsize=(15, 5))
ax1 = fig.add_subplot(131, projection='3d')
X, Y = np.meshgrid(query_strikes, query_maturities)
surf1 = ax1.plot_surface(X, Y, vol_results['spline'], cmap='viridis')
ax1.set_title('RectBivariateSpline')
ax1.set_xlabel('Strike')
ax1.set_ylabel('Maturity')
ax1.set_zlabel('Implied Vol')
ax2 = fig.add_subplot(132, projection='3d')
surf2 = ax2.plot_surface(X, Y, vol_results['linear'], cmap='viridis')
ax2.set_title('Linear GridData')
ax2.set_xlabel('Strike')
ax2.set_ylabel('Maturity')
ax3 = fig.add_subplot(133)
im = ax3.contourf(X, Y, vol_results['spline'], levels=20, cmap='RdYlBu_r')
# Mark the quoted (strike, maturity) nodes. ax3 is a 2-D axes, so scatter
# takes x and y of equal length: expand the two quote axes into the full
# grid. Passing the vols as a third positional argument would collide with
# the marker-size parameter `s`.
quote_strikes, quote_maturities = np.meshgrid(strikes, maturities)
ax3.scatter(quote_strikes.ravel(), quote_maturities.ravel(),
            c='black', s=50, marker='o')
plt.colorbar(im, ax=ax3)
ax3.set_title('波动率微笑表面')
ax3.set_xlabel('Strike')
ax3.set_ylabel('Maturity')
plt.tight_layout()
plt.show()
径向基函数插值(RBF)
from scipy.interpolate import RBFInterpolator, Rbf
def rbf_volatility_interpolation(scattered_points, vols, query_grid):
    """Interpolate scattered volatility quotes with radial basis functions.

    Args:
        scattered_points: ``(n, d)`` array of quote coordinates.
        vols: ``(n,)`` array of volatilities at those coordinates.
        query_grid: ``(m, d)`` array of coordinates to evaluate at.

    Returns:
        ``(vol_rbf, vol_rbf_traditional)``: values from ``RBFInterpolator``
        (thin-plate spline kernel) and from the legacy ``Rbf`` class
        (thin-plate function, ``smooth=0.1``), each of length ``m``.
    """
    # Class-based API: takes the coordinate matrix as a whole
    modern = RBFInterpolator(scattered_points, vols, kernel='thin_plate_spline')
    modern_values = modern(query_grid)

    # Legacy API: takes one positional array per coordinate axis
    obs_axes = tuple(scattered_points.T)
    legacy = Rbf(*obs_axes, vols, function='thin_plate', smooth=0.1)
    query_axes = tuple(query_grid.T)
    legacy_values = legacy(*query_axes)

    return modern_values, legacy_values
# Scattered (irregular) volatility quotes.
n_points = 50
scattered_strikes = np.random.uniform(80, 120, n_points)
scattered_maturities = np.random.uniform(0.1, 2.0, n_points)
# Synthetic vols: base level + smile in strike + term structure + noise
scattered_vols = 0.2 + 0.15 * np.abs(scattered_strikes - 100) / 100 + \
    0.05 * np.log(1 + scattered_maturities) + \
    0.01 * np.random.randn(n_points)
scattered_points = np.column_stack([scattered_strikes, scattered_maturities])
# Query grid. NOTE: this rebinds query_strikes / query_maturities to 2-D meshgrid arrays.
query_strikes, query_maturities = np.meshgrid(np.linspace(75, 125, 50),
                                              np.linspace(0.1, 2.5, 30))
query_grid = np.column_stack([query_strikes.ravel(), query_maturities.ravel()])
vol_rbf, vol_traditional = rbf_volatility_interpolation(scattered_points,
                                                        scattered_vols, query_grid)
# Reshape the flat result back onto the query grid
vol_rbf_grid = vol_rbf.reshape(query_maturities.shape)
plt.figure(figsize=(10, 8))
plt.contourf(query_strikes, query_maturities, vol_rbf_grid, levels=20, cmap='RdYlBu_r')
plt.colorbar(label='Implied Volatility')
plt.scatter(scattered_strikes, scattered_maturities, c=scattered_vols,
            s=50, cmap='RdYlBu_r', edgecolors='black')
plt.title('RBF波动率表面插值')
plt.xlabel('Strike Price')
plt.ylabel('Time to Maturity')
plt.show()
4. 不规则数据插值
Delaunay三角剖分插值
from scipy.spatial import Delaunay
from scipy.interpolate import LinearNDInterpolator, griddata
def delaunay_interpolation(scattered_points, values, query_points):
    """Piecewise-linear interpolation over a Delaunay triangulation.

    Args:
        scattered_points: ``(n, d)`` array of data locations.
        values: ``(n,)`` array of data values.
        query_points: ``(m, d)`` array of locations to evaluate at.

    Returns:
        ``(interpolated, grid_interp, tri)``: the result from
        ``LinearNDInterpolator`` built on an explicit triangulation, the
        result of ``griddata(..., method='linear')`` on the same data, and the
        ``Delaunay`` object itself. Both results are NaN outside the convex hull.
    """
    tri = Delaunay(scattered_points)
    # Build the interpolant on the explicit triangulation and evaluate it at once
    from_triangulation = LinearNDInterpolator(tri, values)(query_points)
    # The same computation through the one-shot convenience wrapper
    from_griddata = griddata(scattered_points, values, query_points,
                             method='linear', fill_value=np.nan)
    return from_triangulation, from_griddata, tri
# Simulated scattered data in a 2-D space.
n_points = 100
points = np.random.rand(n_points, 2) * 10 # 2-D locations in [0, 10)^2
values = np.sin(points[:, 0]) * np.cos(points[:, 1]) + 0.1 * np.random.randn(n_points)
# Regular 50x50 query grid
xi, yi = np.mgrid[0:10:50j, 0:10:50j]
query_points = np.column_stack([xi.ravel(), yi.ravel()])
interp_values, grid_values, triangulation = delaunay_interpolation(points, values, query_points)
# Visualisation
plt.figure(figsize=(12, 5))
plt.subplot(121)
# triplot takes the vertex coordinates plus the (n_triangles, 3) index array.
# The simplices hold indices into `points`, not coordinates, so they must
# not be passed as x / y themselves.
plt.triplot(points[:, 0], points[:, 1], triangulation.simplices, 'b-')
plt.scatter(points[:, 0], points[:, 1], c=values, s=50, cmap='viridis')
plt.title('Delaunay三角剖分')
plt.colorbar(label='原始值')
plt.subplot(122)
plt.contourf(xi, yi, interp_values.reshape(xi.shape), levels=20, cmap='viridis')
plt.colorbar(label='插值结果')
plt.scatter(points[:, 0], points[:, 1], c=values, s=30, edgecolors='black')
plt.title('三角剖分插值')
plt.tight_layout()
plt.show()
CloughTocher2DInterpolator(分片三次、C1 连续插值)
from scipy.interpolate import CloughTocher2DInterpolator
def clough_tocher_interpolation(points, values, query_grid):
    """Smooth 2-D scattered-data interpolation with the Clough-Tocher scheme.

    Args:
        points: ``(n, 2)`` array of data locations.
        values: ``(n,)`` array of data values.
        query_grid: ``(m, 2)`` array of locations to evaluate at.

    Returns:
        ``(m,)`` array of interpolated values from
        ``CloughTocher2DInterpolator(points, values)``.
    """
    return CloughTocher2DInterpolator(points, values)(query_grid)
# Apply Clough-Tocher interpolation to the scattered data (points, values)
# and the query grid (query_points, xi, yi) defined in the previous example.
ct_values = clough_tocher_interpolation(points, values, query_points)
ct_grid = ct_values.reshape(xi.shape)
plt.figure(figsize=(10, 6))
plt.contourf(xi, yi, ct_grid, levels=20, cmap='plasma')
plt.colorbar(label='Clough-Tocher插值')
plt.scatter(points[:, 0], points[:, 1], c=values, s=50, edgecolors='white')
plt.title('Clough-Tocher二次插值')
plt.xlabel('X')
plt.ylabel('Y')
plt.show()
5. 高阶插值与正则化
B样条插值(精确控制)
from scipy.interpolate import BSpline, splrep, splev
def bspline_interpolation(x, y, degree=3, periodic=False):
    """Build an interpolating B-spline through ``(x, y)``.

    Args:
        x: 1-D strictly increasing array of abscissae.
        y: 1-D array of ordinates.
        degree: Spline degree ``k`` (1 <= k <= 5).
        periodic: If True, fit a periodic spline with period
            ``x[-1] - x[0]`` and let the returned BSpline extrapolate
            periodically.

    Returns:
        ``(bspline, tck)``: a ``BSpline`` object and the raw
        ``(knots, coefficients, degree)`` tuple usable with ``splev``.
    """
    # splrep returns the (t, c, k) tuple directly. It is splprep, not
    # splrep, that returns a (tck, u) pair.
    tck = splrep(x, y, s=0, k=degree, per=int(periodic))  # s=0 -> exact interpolation
    knots, coeffs, order = tck
    bspline = BSpline(knots, coeffs, order,
                      extrapolate='periodic' if periodic else True)
    return bspline, tck
# Example: fit a damped, noisy sine wave.
# NOTE: this rebinds the module-level names x, y and x_fine, which later
# examples read.
x = np.linspace(0, 10, 20)
y = np.exp(-x/5) * np.sin(2*x) + 0.05 * np.random.randn(20)
bspline, tck = bspline_interpolation(x, y, degree=3)
x_fine = np.linspace(0, 10, 1000)
# Evaluate the same spline two ways: through the BSpline object and through splev
y_bspline = bspline(x_fine)
y_splev = splev(x_fine, tck)
plt.figure(figsize=(10, 6))
plt.plot(x, y, 'ro', label='数据点')
plt.plot(x_fine, y_bspline, 'b-', label='B样条')
plt.plot(x_fine, y_splev, 'g--', label='splev', alpha=0.8)
plt.legend()
plt.title('B样条插值')
plt.grid(True)
plt.show()
正则化样条(平滑控制)
def regularized_spline_interpolation(x, y, smoothing_factor=None):
    """Fit a smoothing spline and plot the effect of several smoothing levels.

    Args:
        x: 1-D increasing array of abscissae.
        y: 1-D array of noisy ordinates.
        smoothing_factor: ``s`` passed to ``UnivariateSpline`` for the
            returned spline. Defaults to ``len(x) * std(y)**2 * 0.01``.

    Returns:
        The ``UnivariateSpline`` fitted with ``smoothing_factor``.

    Side effects:
        Opens a matplotlib figure comparing ``s`` in
        ``{0, 0.1n, n, 10n}`` (n = len(x)) and shows it.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if smoothing_factor is None:
        # Heuristic default: 1% of the total variance of the data
        smoothing_factor = len(x) * (np.std(y) ** 2) * 0.01
    reg_spline = UnivariateSpline(x, y, s=smoothing_factor)

    # Evaluate on a grid derived from the data itself rather than on a
    # module-level array, which may not exist or may cover another range.
    x_dense = np.linspace(x.min(), x.max(), 1000)
    smooth_levels = [0, len(x) * 0.1, len(x) * 1, len(x) * 10]
    line_styles = ['-', '--', '-.', ':']

    plt.figure(figsize=(12, 8))
    plt.plot(x, y, 'ko', markersize=6, label='原始数据')
    for s, style in zip(smooth_levels, line_styles):
        spline = UnivariateSpline(x, y, s=s)
        plt.plot(x_dense, spline(x_dense), label=f's={s:.1f}', linestyle=style)
    plt.legend()
    plt.title('正则化样条插值(不同平滑参数)')
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.grid(True)
    plt.show()
    return reg_spline
# Smooth interpolation of the noisy samples (x, y) from the B-spline example
reg_spline = regularized_spline_interpolation(x, y)
6. 金融高级应用
利率期限结构动态插值
class DynamicYieldCurve:
    """Yield curves indexed by date, with interpolation across maturity and time."""

    def __init__(self):
        # date -> {'maturities', 'yields', 'interpolator'}
        self.curves = {}
        # date -> callable(maturity); mirrors curves[date]['interpolator']
        self.interpolators = {}

    def add_curve(self, date, maturities, yields):
        """Fit and store the curve observed on ``date``.

        Args:
            date: Key for the curve; must be orderable and support
                subtraction (e.g. a timestamp or a number).
            maturities: 1-D array of maturities.
            yields: 1-D array of yields aligned with ``maturities``.
        """
        interp = fit_yield_curve_interpolation(maturities, yields, method='spline')
        self.curves[date] = {
            'maturities': maturities,
            'yields': yields,
            'interpolator': interp['spline_func']
        }
        self.interpolators[date] = interp['spline_func']

    def interpolate_curve(self, date, target_maturity):
        """Return the yield at ``target_maturity`` on ``date``.

        A stored date is evaluated directly. A date between two stored dates
        is linearly interpolated in time between the neighbouring curves. A
        date before the first (after the last) stored date uses the first
        (last) curve unchanged.
        """
        if date in self.curves:
            return self.curves[date]['interpolator'](target_maturity)

        dates = sorted(self.curves.keys())
        idx = np.searchsorted(dates, date)
        if idx == 0:
            return self.curves[dates[0]]['interpolator'](target_maturity)
        if idx == len(dates):
            return self.curves[dates[-1]]['interpolator'](target_maturity)

        # Linear blend of the two bracketing curves
        date1, date2 = dates[idx - 1], dates[idx]
        y1 = self.curves[date1]['interpolator'](target_maturity)
        y2 = self.curves[date2]['interpolator'](target_maturity)
        weight = (date - date1) / (date2 - date1)
        return (1 - weight) * y1 + weight * y2

    def forward_curve(self, date, maturity_range):
        """Return forward rates between consecutive maturities.

        Spot rates are treated as continuously compounded, so the forward
        rate over ``[T1, T2]`` is ``(r2*T2 - r1*T1) / (T2 - T1)``.

        Args:
            date: Curve date (interpolated in time if not stored).
            maturity_range: 1-D increasing array of maturities.

        Returns:
            Array of ``len(maturity_range) - 1`` forward rates.
        """
        maturity_range = np.asarray(maturity_range, dtype=float)
        spot_rates = np.array(
            [self.interpolate_curve(date, m) for m in maturity_range], dtype=float)
        # Difference the accumulated rate r*T, not r itself. diff(r)/diff(T)
        # is only the slope of the spot curve, and is 0 for a flat curve.
        return np.diff(spot_rates * maturity_range) / np.diff(maturity_range)
# Example: a yield curve that evolves over ten business days.
dates = pd.date_range('2024-01-01', periods=10, freq='B')
yield_curve = DynamicYieldCurve()
maturities_base = np.array([0.25, 0.5, 1, 2, 5, 10])
for i, date in enumerate(dates):
    # Simulated daily move: a slow sinusoidal level shift plus small noise
    yields = 0.03 + 0.005 * np.sin(i * 0.5) + np.random.randn(len(maturities_base)) * 0.001
    yield_curve.add_curve(date, maturities_base, yields)
# 2-year yield on every calendar day; days without a stored curve are
# handled by interpolate_curve
query_date = pd.date_range('2024-01-01', '2024-01-20', freq='D')
two_year_yields = [yield_curve.interpolate_curve(d, 2.0) for d in query_date]
plt.figure(figsize=(12, 6))
plt.plot(query_date, two_year_yields)
plt.title('2年期收益率时间序列(插值)')
plt.ylabel('收益率')
plt.xticks(rotation=45)
plt.grid(True)
plt.show()
路径依赖期权定价插值
def barrier_option_pricing_interpolation(spot, strike, barrier, maturity,
                                         vol_surface_func, time_steps=252,
                                         risk_free_rate=np.log(1.05)):
    """Simulate ONE path of a down-and-out call, reading vols from a surface.

    This is a demonstration of calling an interpolated volatility surface
    inside a simulation loop, not a pricer. A real price needs many paths
    (Monte Carlo) or a PDE solver, plus discounting.

    Args:
        spot: Initial underlying level.
        strike: Option strike.
        barrier: Knock-out level; the option dies if the path is ever <= it.
        maturity: Time to expiry in years.
        vol_surface_func: Callable ``(strike, maturity) -> vol``. It may
            return a float or a size-1 array.
        time_steps: Number of time steps in ``[0, maturity]``.
        risk_free_rate: Continuously compounded drift rate.

    Returns:
        ``(payoff, current_vol)``: the undiscounted payoff of the simulated
        path, and the surface vol at ``(strike, maturity)``.
    """
    def vol_at(level, time_to_maturity):
        # Surface objects often return (1, 1) arrays; collapse to a plain float
        return float(np.squeeze(vol_surface_func(level, time_to_maturity)))

    current_vol = vol_at(strike, maturity)
    dt = maturity / time_steps

    # time_steps steps need time_steps + 1 nodes. With only time_steps nodes
    # the path would stop one step short of expiry.
    paths = np.empty(time_steps + 1)
    paths[0] = spot
    for step in range(time_steps):
        time_to_mat = maturity - step * dt
        # NOTE(review): the vol is looked up at the initial spot, not at the
        # current path level - confirm that is the intended model.
        step_vol = vol_at(spot, time_to_mat)
        dW = np.random.randn() * np.sqrt(dt)
        paths[step + 1] = paths[step] * np.exp(
            (risk_free_rate - 0.5 * step_vol ** 2) * dt + step_vol * dW)

    knocked_out = bool(np.any(paths <= barrier))
    payoff = 0.0 if knocked_out else float(max(paths[-1] - strike, 0.0))
    return payoff, current_vol
# Batch run over a range of strikes.
# The pricer needs a callable vol(strike, maturity). vol_results['spline']
# is an already-evaluated ndarray and has no callable attached, so rebuild
# the spline from the quote grid of the volatility-surface example
# (maturities, strikes, implied_vols) BEFORE `strikes` is rebound below.
vol_spline = RectBivariateSpline(maturities, strikes, implied_vols,
                                 s=0.1, kx=3, ky=3)


def vol_surface_func(strike, maturity):
    """Implied vol at one (strike, maturity) point; the spline's x-axis is maturity."""
    return float(vol_spline.ev(maturity, strike))


strikes = np.linspace(90, 110, 10)
results = []
for K in strikes:
    payoff, vol = barrier_option_pricing_interpolation(100, K, 95, 1.0,
                                                       vol_surface_func)
    results.append({'strike': K, 'payoff': payoff, 'vol': vol})
df_results = pd.DataFrame(results)
print(df_results)
7. 性能优化与大规模插值
向量化插值
def vectorized_interpolation(interp_func, query_points):
    """Evaluate ``interp_func`` over ``query_points`` in batches of 10000.

    Args:
        interp_func: Callable mapping a batch of query points to one value
            per point.
        query_points: Sliceable sequence of query points.

    Returns:
        1-D float array with ``len(query_points)`` entries.
    """
    batch_size = 10000
    total = len(query_points)
    results = np.zeros(total)
    # Walk the batch start offsets; the final batch may be shorter
    for start in range(0, total, batch_size):
        stop = start + batch_size
        results[start:stop] = interp_func(query_points[start:stop])
    return results
# Large-scale time-series interpolation: many assets, each observed on a
# random 80% of the trading days.
n_assets = 1000
n_days = 252
n_obs = int(0.8 * n_days)
# Draw the observed days per asset. A single choice(..., replace=False)
# call cannot produce an (n_assets, n_obs) sample from only n_days distinct
# values - it raises ValueError.
trade_days = np.array([np.random.choice(n_days, size=n_obs, replace=False)
                       for _ in range(n_assets)])
trade_returns = np.random.randn(n_assets, n_obs) * 0.01
# Build one interpolator per asset; days outside the observed range get 0
interpolators = []
for i in range(n_assets):
    sorted_idx = np.argsort(trade_days[i])
    interp = interp1d(trade_days[i][sorted_idx], trade_returns[i][sorted_idx],
                      kind='linear', bounds_error=False, fill_value=0)
    interpolators.append(interp)
# Fill the complete calendar for every asset
full_returns = np.zeros((n_assets, n_days))
for i, interp in enumerate(interpolators):
    full_returns[i] = vectorized_interpolation(interp, np.arange(n_days))
print(f"完整回报矩阵: {full_returns.shape}")
# NOTE(review): this prints the share of NaNs left after filling, which is
# 0 with fill_value=0 - confirm that is what the label is meant to report.
print(f"缺失数据填补率: {np.isnan(full_returns).sum() / full_returns.size * 100:.1f}%")
内存高效插值
class MemoryEfficientInterpolator:
    """1-D interpolator that evaluates large query arrays chunk by chunk."""

    def __init__(self, x, y, method='linear', chunk_size=10000):
        """Build the underlying ``interp1d`` (extrapolating outside the data).

        Args:
            x: 1-D array of sample locations.
            y: 1-D array of sample values.
            method: ``kind`` passed to ``interp1d``.
            chunk_size: Maximum number of query points evaluated per call.
        """
        self.chunk_size = chunk_size
        self.interp = interp1d(x, y, kind=method,
                               bounds_error=False, fill_value='extrapolate')

    def __call__(self, query_points):
        """Interpolate at ``query_points``, splitting into chunks when needed."""
        total = len(query_points)
        step = self.chunk_size
        # Small queries go straight through in a single call
        if total <= step:
            return self.interp(query_points)
        pieces = [self.interp(query_points[lo:lo + step])
                  for lo in range(0, total, step)]
        return np.concatenate(pieces)
# Usage example. Use the (x, y) pair from the B-spline example: by this
# point `y` has been rebound to those 20 samples, so pairing it with the
# 50-point `t` from the first example would fail on the length mismatch.
large_interp = MemoryEfficientInterpolator(x, y, chunk_size=1000)
large_query = np.linspace(0, 10, 100000)
result = large_interp(large_query)
8. 插值质量评估
交叉验证与插值评估
def cross_validate_interpolation(x, y, method='cubic', n_folds=5):
    """Estimate interpolation error by K-fold cross-validation.

    Each fold is held out in turn, the interpolant is built from the
    remaining points, and the RMSE is measured on the held-out points that
    lie inside the training range. Interpolation is undefined outside that
    range, so those points are skipped rather than extrapolated.

    Args:
        x: 1-D array of sample locations (distinct values).
        y: 1-D array of sample values.
        method: ``'linear'`` for linear interpolation; any other value
            (e.g. ``'cubic'``, ``'spline'``) uses cubic interpolation.
        n_folds: Number of folds (>= 2).

    Returns:
        ``(mean_rmse, std_rmse)`` over the folds that had at least one
        in-range test point.

    Raises:
        ValueError: On mismatched inputs, ``n_folds < 2``, or when no fold
            has an in-range test point.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if x.ndim != 1 or x.shape != y.shape:
        raise ValueError("x and y must be 1-D arrays of equal length")
    if n_folds < 2:
        raise ValueError("n_folds must be at least 2")

    kind = 'linear' if method == 'linear' else 'cubic'
    # Fixed seed keeps the shuffled split reproducible between calls.
    shuffled = np.random.default_rng(42).permutation(len(x))

    scores = []
    for test_idx in np.array_split(shuffled, n_folds):
        train_mask = np.ones(len(x), dtype=bool)
        train_mask[test_idx] = False
        x_train, y_train = x[train_mask], y[train_mask]
        x_test, y_test = x[test_idx], y[test_idx]

        # Every endpoint lands in exactly one test fold; evaluating it would
        # be extrapolation, and interp1d raises on it by default.
        inside = (x_test >= x_train.min()) & (x_test <= x_train.max())
        if not inside.any():
            continue

        interp = interp1d(x_train, y_train, kind=kind)
        residual = interp(x_test[inside]) - y_test[inside]
        scores.append(np.sqrt(np.mean(residual ** 2)))

    if not scores:
        raise ValueError("no fold had test points inside the training range")
    return np.mean(scores), np.std(scores)
# Evaluate the methods by cross-validation. Use the (x, y) pair from the
# B-spline example: `y` has been rebound to those 20 samples, so it no
# longer matches the 50-point `t` from the first example.
methods = ['linear', 'cubic', 'spline']
results = {}
for method in methods:
    mean_error, std_error = cross_validate_interpolation(x, y, method)
    results[method] = {'mean_rmse': mean_error, 'std_rmse': std_error}
print("插值方法交叉验证结果:")
for method, res in results.items():
    print(f"{method:8s}: RMSE = {res['mean_rmse']:.6f} ± {res['std_rmse']:.6f}")
9. 完整金融插值工作流
class FinancialInterpolator:
    """Per-asset time-series interpolation workflow (fit, resample, validate)."""

    # interp1d has no kind called 'spline'; treat it as an alias for cubic
    _KIND_ALIASES = {'spline': 'cubic'}

    def __init__(self):
        # asset_id -> interp1d over "days since that asset's first observation"
        self.interpolators = {}
        # asset_id -> dict describing the fit (see fit_time_series)
        self.metadata = {}

    @staticmethod
    def _days_since(dates, origin):
        """Return ``dates`` as float days elapsed since ``origin``."""
        elapsed = pd.DatetimeIndex(dates) - origin
        return np.asarray(elapsed / pd.Timedelta(days=1), dtype=float)

    def fit_time_series(self, dates, values, asset_id, method='spline'):
        """Fit an interpolator for one asset.

        Args:
            dates: Observation dates (DatetimeIndex or convertible).
            values: Observed values aligned with ``dates``.
            asset_id: Key under which the fit is stored.
            method: An ``interp1d`` kind, or ``'spline'`` (alias for
                ``'cubic'``).

        Raises:
            ValueError: If ``dates`` and ``values`` differ in length.
        """
        dates = pd.DatetimeIndex(dates)
        values = np.asarray(values, dtype=float)
        if len(dates) != len(values):
            raise ValueError("dates and values must have the same length")

        kind = self._KIND_ALIASES.get(method, method)
        origin = dates.min()
        self.interpolators[asset_id] = interp1d(self._days_since(dates, origin),
                                                values, kind=kind,
                                                bounds_error=False)
        self.metadata[asset_id] = {
            'dates': dates,
            'method': method,
            'n_points': len(values),
            # Kept so later queries and validation share the fit's time axis
            'origin': origin,
            'kind': kind,
            'values': values,
        }

    def get_complete_series(self, start_date, end_date, freq='D', assets=None):
        """Evaluate the fitted assets on a regular date grid.

        Returns:
            DataFrame indexed by the date grid, one column per asset. Dates
            outside an asset's observed range are NaN.
        """
        if assets is None:
            assets = list(self.interpolators.keys())
        complete_dates = pd.date_range(start_date, end_date, freq=freq)

        results = {}
        for asset in assets:
            # Measure time from the asset's own fit origin. Measuring from
            # the query start would shift the series whenever the query does
            # not begin on the first observation date.
            t_target = self._days_since(complete_dates, self.metadata[asset]['origin'])
            results[asset] = pd.Series(self.interpolators[asset](t_target),
                                       index=complete_dates)
        return pd.DataFrame(results)

    def validate_interpolation(self, asset_id, test_ratio=0.2):
        """Hold-out validation of the fit for one asset.

        About ``test_ratio`` of the interior observations, evenly spread over
        the series, are withheld. The interpolator is rebuilt from the rest
        and the RMSE is measured at the withheld points. Endpoints are never
        withheld, so every test point is a true interpolation.

        Returns:
            ``{'rmse': float, 'n_valid': int}``; ``rmse`` is NaN when there
            are no interior points to withhold.
        """
        meta = self.metadata[asset_id]
        t_obs = self._days_since(meta['dates'], meta['origin'])
        values = meta['values']
        order = np.argsort(t_obs, kind='stable')
        t_obs, values = t_obs[order], values[order]

        n_obs = len(t_obs)
        interior = np.arange(1, n_obs - 1)
        if len(interior) == 0:
            return {'rmse': float('nan'), 'n_valid': 0}

        n_test = min(len(interior), max(1, int(round(test_ratio * n_obs))))
        picks = np.unique(
            np.linspace(0, len(interior) - 1, n_test).round().astype(int))
        test_idx = interior[picks]
        train_mask = np.ones(n_obs, dtype=bool)
        train_mask[test_idx] = False

        holdout = interp1d(t_obs[train_mask], values[train_mask],
                           kind=meta['kind'], bounds_error=False)
        residual = holdout(t_obs[test_idx]) - values[test_idx]
        rmse = float(np.sqrt(np.mean(residual ** 2)))
        return {'rmse': rmse, 'n_valid': int(len(test_idx))}
# Usage example
financial_interp = FinancialInterpolator()
# Simulated data for several assets
assets = ['AAPL', 'GOOG', 'MSFT']
for asset in assets:
    dates = pd.date_range('2024-01-01', '2024-06-30', freq='B')
    # Randomly drop about 20% of the business days
    mask = np.random.choice([True, False], len(dates), p=[0.8, 0.2])
    sparse_dates = dates[mask]
    sparse_returns = np.random.randn(len(sparse_dates)) * 0.01
    # Fitted with the class's default interpolation method
    financial_interp.fit_time_series(sparse_dates, sparse_returns, asset)
# Resample onto a complete daily calendar; the range runs to year end,
# i.e. beyond the simulated observations, which stop at 2024-06-30
complete_data = financial_interp.get_complete_series('2024-01-01', '2024-12-31',
                                                     assets=assets)
# Report the validation RMSE per asset
for asset in assets:
    validation = financial_interp.validate_interpolation(asset)
    print(f"{asset}: RMSE = {validation['rmse']:.6f}")
print("\n完整数据形状:", complete_data.shape)
print(complete_data.head())
SciPy插值工具在量化金融中应用广泛,从收益率曲线构建、波动率表面插值到缺失数据填补,都提供了高效且精确的解决方案。通过合理选择插值方法和参数,可以平衡计算效率与拟合精度。如需针对特定金融插值算法进行优化,或与机器学习模型集成,请告诉我具体需求!