SciPy 空间数据
SciPy 空间数据处理:量化金融地理空间分析
scipy.spatial
模块提供了强大的空间数据结构和算法,支持KD树、Voronoi图、Delaunay三角剖分等,是处理地理金融数据、高频交易位置分析、市场空间分布等场景的核心工具。
1. 核心空间数据结构
KDTree与cKDTree(最近邻搜索)
from scipy.spatial import KDTree, cKDTree
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
# 金融地理数据:交易员/分支机构位置
np.random.seed(42)
n_points = 1000
locations = np.random.uniform(-180, 180, (n_points, 2)) # 经纬度
locations[:, 0] = np.clip(locations[:, 0], -180, 180) # 经度
locations[:, 1] = np.clip(locations[:, 1], -90, 90) # 纬度
# 交易量/重要性权重
volumes = np.random.exponential(1000, n_points)
branches = pd.DataFrame({
'longitude': locations[:, 0],
'latitude': locations[:, 1],
'volume': volumes,
'branch_id': range(n_points)
})
print("分支机构地理分布:")
print(branches.head())
# 构建KD树(cKDTree更快,支持查询)
tree = cKDTree(locations)
# 或者标准KDTree
# tree = KDTree(locations)
# 查询每个点的最近K个邻居
distances, indices = tree.query(locations, k=6) # 自身+5个最近邻居
print(f"平均最近5邻距离: {distances[:, -1].mean():.4f} 度")
最近邻在金融中的应用:竞争分析
def spatial_competition_analysis(tree, locations, volumes, radius=1.0):
"""空间竞争分析:识别地理集中区域"""
# 范围查询:半径内竞争对手
indices_within_radius = tree.query_ball_point(locations, r=radius)
competition_scores = []
for i, neighbors in enumerate(indices_within_radius):
if len(neighbors) > 1: # 排除自身
local_volume = volumes[neighbors].sum()
local_market_share = volumes[i] / local_volume
competition_intensity = len(neighbors) - 1
competition_scores.append({
'branch_id': i,
'n_competitors': competition_intensity,
'market_share': local_market_share,
'total_local_volume': local_volume
})
return pd.DataFrame(competition_scores)
# 分析1度范围内的竞争
competition = spatial_competition_analysis(tree, locations, volumes, radius=1.0)
high_competition = competition[competition['n_competitors'] > 5]
print(f"高竞争区域分支: {len(high_competition)}")
print(high_competition.head())
距离计算与度量
from scipy.spatial.distance import cdist, pdist, squareform, haversine
def haversine_distance(lat_lon1, lat_lon2):
"""地理距离计算(球面距离)"""
# 转换为弧度
lat1, lon1 = np.radians(lat_lon1[:, 0]), np.radians(lat_lon1[:, 1])
lat2, lon2 = np.radians(lat_lon2[:, 0]), np.radians(lat_lon2[:, 1])
# Haversine公式
dlat, dlon = lat2 - lat1, lon2 - lon1
a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
c = 2 * np.arcsin(np.sqrt(a))
# 地球半径(千米)
R = 6371.0
return c * R
# 成对距离矩阵
geo_distances = haversine_distance(locations, locations)
print(f"最大地理距离: {geo_distances.max()/1000:.1f} 千米")
print(f"平均距离: {geo_distances.mean()/1000:.1f} 千米")
# 不同距离度量对比
dist_euclidean = cdist(locations, locations, metric='euclidean')
dist_haversine = cdist(locations, locations, metric='haversine') / 1000 # 千米
print(f"欧氏 vs 地理距离相关性: {np.corrcoef(dist_euclidean.flatten(), dist_haversine.flatten())[0,1]:.3f}")
2. Voronoi图与市场分割
Voronoi市场区域划分
from scipy.spatial import Voronoi, voronoi_plot_2d
import matplotlib.pyplot as plt
def voronoi_market_segmentation(locations, volumes):
"""Voronoi图市场分割"""
# 加权Voronoi(按交易量)
vor = Voronoi(locations)
# 计算每个Voronoi单元的市场份额
region_volumes = []
for region in vor.regions:
if not region or -1 in region: # 跳过无限区域
continue
# 找到对应点的交易量
region_points = [vor.point_region[i] for i in region if i < len(vor.point_region)]
total_volume = sum(volumes[point_idx] for point_idx in region_points)
region_volumes.append(total_volume)
return vor, region_volumes
# 生成Voronoi图
vor, region_vols = voronoi_market_segmentation(locations, volumes)
# 可视化
plt.figure(figsize=(12, 10))
voronoi_plot_2d(vor, ax=plt.gca(), show_vertices=False, line_colors='gray',
line_width=1, line_alpha=0.6)
# 标记分支机构(按交易量着色)
scatter = plt.scatter(locations[:, 0], locations[:, 1], c=volumes,
cmap='Reds', s=50, alpha=0.8, edgecolors='black')
plt.colorbar(scatter, label='交易量')
plt.title('Voronoi市场分割(按交易量着色)')
plt.xlabel('经度')
plt.ylabel('纬度')
plt.grid(True, alpha=0.3)
plt.show()
print(f"平均市场区域交易量: {np.mean(region_vols):.0f}")
Delaunay三角剖分(邻接关系)
from scipy.spatial import Delaunay
def delaunay_network(locations):
"""Delaunay三角剖分构建空间网络"""
# 三角剖分
tri = Delaunay(locations)
# 提取边(邻接关系)
edges = set()
for simplex in tri.simplices:
for i in range(3):
edge = tuple(sorted([simplex[i], simplex[(i+1)%3]]))
edges.add(edge)
edges_list = list(edges)
print(f"Delaunay网络: {len(edges_list)}条边")
# 计算边权重(距离)
edge_distances = []
for i, j in edges_list:
dist = haversine_distance(locations[[i]], locations[[j]])[0, 0]
edge_distances.append(dist)
return edges_list, np.array(edge_distances)
edges, distances = delaunay_network(locations)
# 构建空间邻接矩阵
n = len(locations)
row_indices, col_indices = zip(*edges)
adj_matrix = np.zeros((n, n))
adj_matrix[row_indices, col_indices] = 1
adj_matrix += adj_matrix.T # 无向图
sparse_adj = sparse.csr_matrix(adj_matrix)
print(f"空间网络稀疏度: {sparse_adj.nnz / (n*n) * 100:.2f}%")
3. 空间自相关与Moran’s I
空间统计分析
from scipy.spatial.distance import cdist
from scipy.stats import pearsonr
def spatial_autocorrelation(locations, values, method='queen'):
"""空间自相关分析(Moran's I)"""
# 构建空间权重矩阵
distances = cdist(locations, locations)
if method == 'queen':
# Queen邻接(共享边或顶点)
bandwidth = np.percentile(distances, 10) # 10%分位数
W = (distances < bandwidth).astype(float)
else:
# K最近邻
tree = cKDTree(locations)
_, indices = tree.query(locations, k=5)
W = np.zeros_like(distances)
for i, neighbors in enumerate(indices):
W[i, neighbors] = 1
# 标准化权重矩阵(行标准化)
W_row_sum = W.sum(axis=1, keepdims=True)
W_normalized = W / W_row_sum
W_sparse = sparse.csr_matrix(W_normalized)
# Moran's I计算
n = len(values)
values_centered = values - values.mean()
global_moran = (n * (values_centered @ W_sparse @ values_centered)) / \
(values_centered @ values_centered * W_sparse.sum())
# 局部Moran's I
local_moran = n * (values_centered * (W_sparse @ values_centered)) / \
(values_centered @ values_centered)
return {
'global_moran_I': global_moran,
'local_moran_I': local_moran,
'spatial_weights': W_sparse,
'p_value': None # 需要置换检验
}
# 空间自相关分析:交易量分布
spatial_stats = spatial_autocorrelation(locations, np.log1p(volumes))
print(f"全局Moran's I: {spatial_stats['global_moran_I']:.4f}")
print(f"空间聚类显著性: {'强' if spatial_stats['global_moran_I'] > 0.3 else '弱'}")
# 识别空间热点
hotspots = np.where(spatial_stats['local_moran_I'] > spatial_stats['local_moran_I'].quantile(0.9))[0]
print(f"空间热点区域: {len(hotspots)}个分支")
4. 空间插值与预测
Kriging(高斯过程插值)
from scipy.spatial.distance import pdist, squareform
from scipy.linalg import solve
def simple_kriging(locations_known, values_known, locations_predict, nugget=1e-6):
"""简单Kriging空间插值"""
def gaussian_covariance(dist, sill=1.0, range_param=1000):
"""高斯协方差函数"""
return sill * np.exp(-dist**2 / (2 * range_param**2))
# 已知点距离矩阵
dist_known = squareform(pdist(locations_known * 111, 'euclidean')) # 度转千米
cov_known = gaussian_covariance(dist_known)
np.fill_diagonal(cov_known, cov_known.max() * (1 - nugget)) # 块金效应
# 预测点与已知点距离
dist_predict = cdist(locations_predict * 111, locations_known * 111, 'euclidean')
cov_predict_known = gaussian_covariance(dist_predict)
# Kriging权重
weights = solve(cov_known, cov_predict_known.T, sym_pos=True)
# 预测值
predictions = weights.T @ values_known
# 预测方差
cov_predict = gaussian_covariance(squareform(pdist(locations_predict * 111)))
prediction_var = np.diag(cov_predict) - np.sum(weights * cov_predict_known, axis=1)
return predictions, np.sqrt(np.maximum(prediction_var, 0))
# 插值示例:缺失交易量预测
known_mask = np.random.choice([True, False], n_points, p=[0.8, 0.2])
known_locations = locations[known_mask]
known_volumes = volumes[known_mask]
# 预测所有点(包括已知点验证)
all_predictions, uncertainties = simple_kriging(known_locations, known_volumes,
locations, nugget=1e-5)
print(f"插值R²: {pearsonr(known_volumes, all_predictions[known_mask])[0]:.3f}")
print(f"平均不确定性: {uncertainties.mean():.1f}")
RBF插值(径向基函数)
from scipy.interpolate import RBFInterpolator
def rbf_spatial_interpolation(locations, values, query_points=None):
"""RBF空间插值"""
if query_points is None:
query_points = np.mgrid[-90:90:100j, -180:180:200j].reshape(2, -1).T
# 不同核函数对比
kernels = ['thin_plate_spline', 'gaussian', 'multiquadric']
interpolators = {}
for kernel in kernels:
rbf = RBFInterpolator(locations, values, kernel=kernel)
interpolated = rbf(query_points)
interpolators[kernel] = interpolated.reshape(100, 200)
return interpolators, query_points
# 生成插值网格
interpolators, grid_points = rbf_spatial_interpolation(locations, np.log1p(volumes))
# 可视化插值结果
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i, (kernel, interp) in enumerate(interpolators.items()):
im = axes[i].imshow(interp, extent=[-180, 180, -90, 90], origin='lower', cmap='Reds')
axes[i].scatter(locations[:, 0], locations[:, 1], c='black', s=1, alpha=0.5)
axes[i].set_title(f'RBF {kernel}')
plt.colorbar(im, ax=axes[i])
plt.tight_layout()
plt.show()
5. 空间聚类与热点检测
DBSCAN空间聚类
from sklearn.cluster import DBSCAN
from scipy.spatial.distance import cdist
def spatial_dbscan_clustering(locations, eps=50, min_samples=5): # eps: 千米
"""DBSCAN空间聚类(地理距离)"""
# 转换为千米距离
coords_km = locations * 111 # 近似转换(纬度)
# DBSCAN聚类
dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='euclidean')
clusters = dbscan.fit_predict(coords_km)
# 聚类统计
n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
cluster_sizes = np.bincount(clusters[clusters >= 0])
return clusters, n_clusters, cluster_sizes
# 检测交易热点
clusters, n_hotspots, sizes = spatial_dbscan_clustering(locations, eps=100, min_samples=3)
print(f"检测到 {n_hotspots} 个交易热点")
print(f"最大热点规模: {sizes.max()} 个分支")
# 可视化聚类
plt.figure(figsize=(10, 8))
scatter = plt.scatter(locations[:, 0], locations[:, 1], c=clusters,
cmap='tab20', s=volumes/10, alpha=0.7)
plt.colorbar(scatter, label='聚类标签')
plt.title('DBSCAN空间聚类(交易热点)')
plt.xlabel('经度')
plt.ylabel('纬度')
plt.show()
Getis-Ord Gi*热点统计
def getis_ord_hotspot(locations, values, bandwidth=100):
"""Getis-Ord Gi*热点分析"""
# 构建自适应带宽权重矩阵
tree = cKDTree(locations * 111)
indices = tree.query_ball_point(locations * 111, r=bandwidth)
W = np.zeros((len(locations), len(locations)))
for i, neighbors in enumerate(indices):
W[i, neighbors] = 1
W[i, i] = 1 # 包含自身
# 行标准化
W = W / W.sum(axis=1, keepdims=True)
# Gi*统计量
values_centered = values - values.mean()
local_sums = W @ values_centered
global_sum = values.sum()
n = len(values)
# 方差计算
W_ones = W @ np.ones(n)
B = W - np.outer(W_ones, np.ones(n)) / n
variance = (global_sum * np.trace(B @ B.T) -
np.sum(values_centered * (B @ values_centered))) / n**2
gi_star = local_sums / np.sqrt(variance)
p_values = 1 - stats.norm.cdf(gi_star)
return gi_star, p_values
# 热点检测
gi_star, pvals = getis_ord_hotspot(locations, volumes, bandwidth=200)
significant_hotspots = np.where((gi_star > 1.96) & (pvals < 0.05))[0]
print(f"显著热点: {len(significant_hotspots)} 个")
6. 金融空间应用案例
高频交易场地优化
def hft_location_optimization(existing_locations, volumes, candidate_sites,
latency_factor=0.1):
"""高频交易场地选址优化"""
# 计算与现有场地的地理延迟
dist_to_existing = haversine_distance(candidate_sites, existing_locations) / 1000 # 千米
latency_penalty = dist_to_existing.mean(axis=1) * latency_factor
# 空间覆盖分析
tree = cKDTree(existing_locations * 111)
coverage_radius = 500 # 千米
coverage_scores = []
for site in candidate_sites:
# 覆盖区域内的交易量
covered_points = tree.query_ball_point(site * 111, coverage_radius)
if covered_points:
covered_volume = volumes[covered_points].sum()
else:
covered_volume = 0
coverage_scores.append(covered_volume)
# 综合评分
total_scores = np.array(coverage_scores) - latency_penalty * 1e6
optimal_site = np.argmax(total_scores)
return {
'optimal_site': candidate_sites[optimal_site],
'coverage_score': coverage_scores[optimal_site],
'latency_penalty': latency_penalty[optimal_site],
'total_score': total_scores[optimal_site]
}
# 新场地候选
candidate_cities = np.array([
[-74.0060, 40.7128], # 纽约
[-118.2437, 34.0522], # 洛杉矶
[-87.6298, 41.8781], # 芝加哥
[-122.4194, 37.7749] # 旧金山
])
hft_optimal = hft_location_optimization(locations, volumes, candidate_cities)
print("最优HFT场地:", hft_optimal)
资产地理多元化
def geographic_diversification_score(locations, weights):
"""地理多元化评分"""
# 构建地理协方差
geo_dist = haversine_distance(locations, locations) / 1000 # 千米
geo_corr = 1 / (1 + geo_dist / 1000) # 距离越远相关性越低
# 地理加权投资组合风险
geo_risk = np.sqrt(weights @ geo_corr @ weights)
# 最大地理分散(均匀分布全球)
max_geo_diversity = np.sqrt(1 / len(weights))
diversity_score = max_geo_diversity / geo_risk
return diversity_score
# 投资组合地理多元化
portfolio_weights = np.random.dirichlet(np.ones(100))
geo_diversity = geographic_diversification_score(locations[:100], portfolio_weights)
print(f"地理多元化评分: {geo_diversity:.3f}")
print(f"解释: {'良好' if geo_diversity > 0.8 else '需改进'}")
7. 时空分析
时空聚类(ST-DBSCAN)
def st_dbscan(locations, timestamps, volumes, eps_spatial=50, eps_temporal=30):
"""时空DBSCAN聚类"""
# 标准化时间维度
time_normalized = (timestamps - timestamps.min()) / (timestamps.max() - timestamps.min())
# 组合时空坐标
spatio_temporal_coords = np.c_[locations * 111, time_normalized * 1000] # 统一尺度
# ST-DBSCAN
st_dbscan = DBSCAN(eps=np.array([eps_spatial, eps_spatial, eps_temporal]),
min_samples=3, metric='euclidean')
st_clusters = st_dbscan.fit_predict(spatio_temporal_coords)
return st_clusters
# 时空交易模式聚类
timestamps = pd.date_range('2024-01-01', periods=n_points, freq='D').values
st_clusters = st_dbscan(locations, timestamps, volumes)
print(f"时空聚类数量: {len(set(st_clusters)) - (1 if -1 in st_clusters else 0)}")
8. 性能优化与大规模处理
空间索引优化
def efficient_spatial_query(tree, query_points, max_distance=100):
"""高效批量空间查询"""
# 批量范围查询
all_neighbors = tree.query_ball_point(query_points * 111, r=max_distance)
# 并行处理(可选)
from concurrent.futures import ProcessPoolExecutor
def process_neighbors(neighbors_batch):
results = []
for neighbors in neighbors_batch:
if neighbors:
local_volume = volumes[neighbors].sum()
results.append(len(neighbors))
else:
results.append(0)
return results
# 分批处理大规模查询
batch_size = 1000
neighbor_counts = []
for i in range(0, len(all_neighbors), batch_size):
batch = all_neighbors[i:i+batch_size]
batch_results = process_neighbors(batch)
neighbor_counts.extend(batch_results)
return neighbor_counts
# 大规模空间查询
large_query_points = np.random.uniform(-180, 180, (10000, 2))
large_query_points[:, 1] = np.clip(large_query_points[:, 1], -90, 90)
neighbor_stats = efficient_spatial_query(tree, large_query_points)
print(f"平均邻居数: {np.mean(neighbor_stats):.1f}")
9. 空间数据可视化
import cartopy.crs as ccrs
import cartopy.feature as cfeature
def geospatial_visualization(locations, volumes, clusters=None):
"""地理空间可视化(使用Cartopy)"""
fig = plt.figure(figsize=(15, 10))
ax = plt.axes(projection=ccrs.PlateCarree())
ax.add_feature(cfeature.COASTLINE)
ax.add_feature(cfeature.BORDERS, linestyle=':')
ax.add_feature(cfeature.OCEAN, color='lightblue')
ax.add_feature(cfeature.LAND, color='lightgray')
# 散点图
scatter = ax.scatter(locations[:, 0], locations[:, 1],
c=volumes, s=volumes/10, cmap='Reds',
transform=ccrs.PlateCarree(), alpha=0.7,
edgecolors='black', linewidth=0.5)
if clusters is not None:
# 聚类边界
for cluster_id in np.unique(clusters[clusters >= 0]):
cluster_points = locations[clusters == cluster_id]
if len(cluster_points) > 2:
# 凸包
from scipy.spatial import ConvexHull
try:
hull = ConvexHull(cluster_points)
hull_points = cluster_points[hull.vertices]
ax.plot(hull_points[:, 0], hull_points[:, 1],
transform=ccrs.PlateCarree(),
linewidth=2, alpha=0.6)
except:
pass
ax.set_global()
plt.colorbar(scatter, ax=ax, shrink=0.6, label='交易量')
ax.set_title('全球交易网络地理分布')
plt.show()
geospatial_visualization(locations, volumes, clusters)
10. 完整金融空间分析工作流
class FinancialSpatialAnalyzer:
"""金融空间数据分析器"""
def __init__(self, locations, attributes):
self.locations = np.array(locations)
self.attributes = pd.DataFrame(attributes)
self.tree = cKDTree(self.locations)
self.voronoi = Voronoi(self.locations)
def spatial_autocorrelation(self, attr_name):
"""空间自相关分析"""
values = self.attributes[attr_name].values
return spatial_autocorrelation(self.locations, values)
def identify_hotspots(self, attr_name, method='dbscan', **kwargs):
"""热点识别"""
values = self.attributes[attr_name].values
if method == 'dbscan':
clusters, _, _ = spatial_dbscan_clustering(self.locations, **kwargs)
hotspots = np.unique(clusters[clusters >= 0])
return hotspots
def optimal_location(self, candidate_sites, weights=None):
"""场地选址优化"""
if weights is None:
weights = np.ones(len(self.locations))
return hft_location_optimization(self.locations, weights, candidate_sites)
def geographic_diversification(self, portfolio_weights):
"""地理多元化评估"""
return geographic_diversification_score(self.locations, portfolio_weights)
def visualize(self, attr_name, clusters=None):
"""空间可视化"""
values = self.attributes[attr_name].values
geospatial_visualization(self.locations, values, clusters)
# 使用示例
spatial_analyzer = FinancialSpatialAnalyzer(
locations,
{'volume': volumes, 'revenue': volumes * np.random.uniform(0.8, 1.2, n_points)}
)
# 分析工作流
autocorr = spatial_analyzer.spatial_autocorrelation('volume')
hotspots = spatial_analyzer.identify_hotspots('volume', eps=100)
diversity_score = spatial_analyzer.geographic_diversification(np.random.dirichlet(np.ones(50)))
optimal_site = spatial_analyzer.optimal_location(candidate_cities)
print(f"空间自相关: {autocorr['global_moran_I']:.3f}")
print(f"热点数量: {len(hotspots)}")
print(f"多元化评分: {diversity_score:.3f}")
print("最优选址:", optimal_site['optimal_site'])
SciPy空间数据工具在金融地理分析中应用广泛,从分支机构选址、风险空间集中度评估,到高频交易延迟优化和市场热点识别,都能提供高效的计算支持。结合GIS数据和实时位置信息,可以构建完整的空间金融分析系统。需要特定空间算法优化或与GIS平台的集成,请告诉我具体需求!