SciPy 空间数据

SciPy 空间数据处理:量化金融地理空间分析

scipy.spatial 模块提供了强大的空间数据结构和算法,支持KD树、Voronoi图、Delaunay三角剖分等,是处理地理金融数据、高频交易位置分析、市场空间分布等场景的核心工具。

1. 核心空间数据结构

KDTree与cKDTree(最近邻搜索)
from scipy.spatial import KDTree, cKDTree
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# 金融地理数据:交易员/分支机构位置
np.random.seed(42)
n_points = 1000
locations = np.random.uniform(-180, 180, (n_points, 2))  # 经纬度
locations[:, 0] = np.clip(locations[:, 0], -180, 180)  # 经度
locations[:, 1] = np.clip(locations[:, 1], -90, 90)    # 纬度

# 交易量/重要性权重
volumes = np.random.exponential(1000, n_points)
branches = pd.DataFrame({
    'longitude': locations[:, 0],
    'latitude': locations[:, 1],
    'volume': volumes,
    'branch_id': range(n_points)
})

print("分支机构地理分布:")
print(branches.head())

# 构建KD树(cKDTree更快,支持查询)
tree = cKDTree(locations)
# 或者标准KDTree
# tree = KDTree(locations)

# 查询每个点的最近K个邻居
distances, indices = tree.query(locations, k=6)  # 自身+5个最近邻居
print(f"平均最近5邻距离: {distances[:, -1].mean():.4f} 度")

最近邻在金融中的应用:竞争分析

def spatial_competition_analysis(tree, locations, volumes, radius=1.0):
    """空间竞争分析:识别地理集中区域"""

    # 范围查询:半径内竞争对手
    indices_within_radius = tree.query_ball_point(locations, r=radius)

    competition_scores = []
    for i, neighbors in enumerate(indices_within_radius):
        if len(neighbors) > 1:  # 排除自身
            local_volume = volumes[neighbors].sum()
            local_market_share = volumes[i] / local_volume
            competition_intensity = len(neighbors) - 1

            competition_scores.append({
                'branch_id': i,
                'n_competitors': competition_intensity,
                'market_share': local_market_share,
                'total_local_volume': local_volume
            })

    return pd.DataFrame(competition_scores)

# 分析1度范围内的竞争
competition = spatial_competition_analysis(tree, locations, volumes, radius=1.0)
high_competition = competition[competition['n_competitors'] > 5]
print(f"高竞争区域分支: {len(high_competition)}")
print(high_competition.head())
距离计算与度量
from scipy.spatial.distance import cdist, pdist, squareform, haversine

def haversine_distance(lat_lon1, lat_lon2):
    """地理距离计算(球面距离)"""
    # 转换为弧度
    lat1, lon1 = np.radians(lat_lon1[:, 0]), np.radians(lat_lon1[:, 1])
    lat2, lon2 = np.radians(lat_lon2[:, 0]), np.radians(lat_lon2[:, 1])

    # Haversine公式
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a))

    # 地球半径(千米)
    R = 6371.0
    return c * R

# 成对距离矩阵
geo_distances = haversine_distance(locations, locations)
print(f"最大地理距离: {geo_distances.max()/1000:.1f} 千米")
print(f"平均距离: {geo_distances.mean()/1000:.1f} 千米")

# 不同距离度量对比
dist_euclidean = cdist(locations, locations, metric='euclidean')
dist_haversine = cdist(locations, locations, metric='haversine') / 1000  # 千米
print(f"欧氏 vs 地理距离相关性: {np.corrcoef(dist_euclidean.flatten(), dist_haversine.flatten())[0,1]:.3f}")

2. Voronoi图与市场分割

Voronoi市场区域划分
from scipy.spatial import Voronoi, voronoi_plot_2d
import matplotlib.pyplot as plt

def voronoi_market_segmentation(locations, volumes):
    """Voronoi图市场分割"""

    # 加权Voronoi(按交易量)
    vor = Voronoi(locations)

    # 计算每个Voronoi单元的市场份额
    region_volumes = []
    for region in vor.regions:
        if not region or -1 in region:  # 跳过无限区域
            continue
        # 找到对应点的交易量
        region_points = [vor.point_region[i] for i in region if i < len(vor.point_region)]
        total_volume = sum(volumes[point_idx] for point_idx in region_points)
        region_volumes.append(total_volume)

    return vor, region_volumes

# 生成Voronoi图
vor, region_vols = voronoi_market_segmentation(locations, volumes)

# 可视化
plt.figure(figsize=(12, 10))
voronoi_plot_2d(vor, ax=plt.gca(), show_vertices=False, line_colors='gray', 
                line_width=1, line_alpha=0.6)

# 标记分支机构(按交易量着色)
scatter = plt.scatter(locations[:, 0], locations[:, 1], c=volumes, 
                     cmap='Reds', s=50, alpha=0.8, edgecolors='black')
plt.colorbar(scatter, label='交易量')
plt.title('Voronoi市场分割(按交易量着色)')
plt.xlabel('经度')
plt.ylabel('纬度')
plt.grid(True, alpha=0.3)
plt.show()

print(f"平均市场区域交易量: {np.mean(region_vols):.0f}")

Delaunay三角剖分(邻接关系)

from scipy.spatial import Delaunay

def delaunay_network(locations):
    """Delaunay三角剖分构建空间网络"""

    # 三角剖分
    tri = Delaunay(locations)

    # 提取边(邻接关系)
    edges = set()
    for simplex in tri.simplices:
        for i in range(3):
            edge = tuple(sorted([simplex[i], simplex[(i+1)%3]]))
            edges.add(edge)

    edges_list = list(edges)
    print(f"Delaunay网络: {len(edges_list)}条边")

    # 计算边权重(距离)
    edge_distances = []
    for i, j in edges_list:
        dist = haversine_distance(locations[[i]], locations[[j]])[0, 0]
        edge_distances.append(dist)

    return edges_list, np.array(edge_distances)

edges, distances = delaunay_network(locations)

# 构建空间邻接矩阵
n = len(locations)
row_indices, col_indices = zip(*edges)
adj_matrix = np.zeros((n, n))
adj_matrix[row_indices, col_indices] = 1
adj_matrix += adj_matrix.T  # 无向图
sparse_adj = sparse.csr_matrix(adj_matrix)
print(f"空间网络稀疏度: {sparse_adj.nnz / (n*n) * 100:.2f}%")

3. 空间自相关与Moran’s I

空间统计分析
from scipy.spatial.distance import cdist
from scipy.stats import pearsonr

def spatial_autocorrelation(locations, values, method='queen'):
    """空间自相关分析(Moran's I)"""

    # 构建空间权重矩阵
    distances = cdist(locations, locations)
    if method == 'queen':
        # Queen邻接(共享边或顶点)
        bandwidth = np.percentile(distances, 10)  # 10%分位数
        W = (distances < bandwidth).astype(float)
    else:
        # K最近邻
        tree = cKDTree(locations)
        _, indices = tree.query(locations, k=5)
        W = np.zeros_like(distances)
        for i, neighbors in enumerate(indices):
            W[i, neighbors] = 1

    # 标准化权重矩阵(行标准化)
    W_row_sum = W.sum(axis=1, keepdims=True)
    W_normalized = W / W_row_sum
    W_sparse = sparse.csr_matrix(W_normalized)

    # Moran's I计算
    n = len(values)
    values_centered = values - values.mean()
    global_moran = (n * (values_centered @ W_sparse @ values_centered)) / \
                   (values_centered @ values_centered * W_sparse.sum())

    # 局部Moran's I
    local_moran = n * (values_centered * (W_sparse @ values_centered)) / \
                  (values_centered @ values_centered)

    return {
        'global_moran_I': global_moran,
        'local_moran_I': local_moran,
        'spatial_weights': W_sparse,
        'p_value': None  # 需要置换检验
    }

# 空间自相关分析:交易量分布
spatial_stats = spatial_autocorrelation(locations, np.log1p(volumes))
print(f"全局Moran's I: {spatial_stats['global_moran_I']:.4f}")
print(f"空间聚类显著性: {'强' if spatial_stats['global_moran_I'] > 0.3 else '弱'}")

# 识别空间热点
hotspots = np.where(spatial_stats['local_moran_I'] > spatial_stats['local_moran_I'].quantile(0.9))[0]
print(f"空间热点区域: {len(hotspots)}个分支")

4. 空间插值与预测

Kriging(高斯过程插值)
from scipy.spatial.distance import pdist, squareform
from scipy.linalg import solve

def simple_kriging(locations_known, values_known, locations_predict, nugget=1e-6):
    """简单Kriging空间插值"""

    def gaussian_covariance(dist, sill=1.0, range_param=1000):
        """高斯协方差函数"""
        return sill * np.exp(-dist**2 / (2 * range_param**2))

    # 已知点距离矩阵
    dist_known = squareform(pdist(locations_known * 111, 'euclidean'))  # 度转千米
    cov_known = gaussian_covariance(dist_known)
    np.fill_diagonal(cov_known, cov_known.max() * (1 - nugget))  # 块金效应

    # 预测点与已知点距离
    dist_predict = cdist(locations_predict * 111, locations_known * 111, 'euclidean')
    cov_predict_known = gaussian_covariance(dist_predict)

    # Kriging权重
    weights = solve(cov_known, cov_predict_known.T, sym_pos=True)

    # 预测值
    predictions = weights.T @ values_known

    # 预测方差
    cov_predict = gaussian_covariance(squareform(pdist(locations_predict * 111)))
    prediction_var = np.diag(cov_predict) - np.sum(weights * cov_predict_known, axis=1)

    return predictions, np.sqrt(np.maximum(prediction_var, 0))

# 插值示例:缺失交易量预测
known_mask = np.random.choice([True, False], n_points, p=[0.8, 0.2])
known_locations = locations[known_mask]
known_volumes = volumes[known_mask]

# 预测所有点(包括已知点验证)
all_predictions, uncertainties = simple_kriging(known_locations, known_volumes, 
                                               locations, nugget=1e-5)

print(f"插值R²: {pearsonr(known_volumes, all_predictions[known_mask])[0]:.3f}")
print(f"平均不确定性: {uncertainties.mean():.1f}")
RBF插值(径向基函数)
from scipy.interpolate import RBFInterpolator

def rbf_spatial_interpolation(locations, values, query_points=None):
    """RBF空间插值"""

    if query_points is None:
        query_points = np.mgrid[-90:90:100j, -180:180:200j].reshape(2, -1).T

    # 不同核函数对比
    kernels = ['thin_plate_spline', 'gaussian', 'multiquadric']
    interpolators = {}

    for kernel in kernels:
        rbf = RBFInterpolator(locations, values, kernel=kernel)
        interpolated = rbf(query_points)
        interpolators[kernel] = interpolated.reshape(100, 200)

    return interpolators, query_points

# 生成插值网格
interpolators, grid_points = rbf_spatial_interpolation(locations, np.log1p(volumes))

# 可视化插值结果
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for i, (kernel, interp) in enumerate(interpolators.items()):
    im = axes[i].imshow(interp, extent=[-180, 180, -90, 90], origin='lower', cmap='Reds')
    axes[i].scatter(locations[:, 0], locations[:, 1], c='black', s=1, alpha=0.5)
    axes[i].set_title(f'RBF {kernel}')
    plt.colorbar(im, ax=axes[i])
plt.tight_layout()
plt.show()

5. 空间聚类与热点检测

DBSCAN空间聚类
from sklearn.cluster import DBSCAN
from scipy.spatial.distance import cdist

def spatial_dbscan_clustering(locations, eps=50, min_samples=5):  # eps: 千米
    """DBSCAN空间聚类(地理距离)"""

    # 转换为千米距离
    coords_km = locations * 111  # 近似转换(纬度)

    # DBSCAN聚类
    dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='euclidean')
    clusters = dbscan.fit_predict(coords_km)

    # 聚类统计
    n_clusters = len(set(clusters)) - (1 if -1 in clusters else 0)
    cluster_sizes = np.bincount(clusters[clusters >= 0])

    return clusters, n_clusters, cluster_sizes

# 检测交易热点
clusters, n_hotspots, sizes = spatial_dbscan_clustering(locations, eps=100, min_samples=3)
print(f"检测到 {n_hotspots} 个交易热点")
print(f"最大热点规模: {sizes.max()} 个分支")

# 可视化聚类
plt.figure(figsize=(10, 8))
scatter = plt.scatter(locations[:, 0], locations[:, 1], c=clusters, 
                     cmap='tab20', s=volumes/10, alpha=0.7)
plt.colorbar(scatter, label='聚类标签')
plt.title('DBSCAN空间聚类(交易热点)')
plt.xlabel('经度')
plt.ylabel('纬度')
plt.show()
Getis-Ord Gi*热点统计
def getis_ord_hotspot(locations, values, bandwidth=100):
    """Getis-Ord Gi*热点分析"""

    # 构建自适应带宽权重矩阵
    tree = cKDTree(locations * 111)
    indices = tree.query_ball_point(locations * 111, r=bandwidth)

    W = np.zeros((len(locations), len(locations)))
    for i, neighbors in enumerate(indices):
        W[i, neighbors] = 1
        W[i, i] = 1  # 包含自身

    # 行标准化
    W = W / W.sum(axis=1, keepdims=True)

    # Gi*统计量
    values_centered = values - values.mean()
    local_sums = W @ values_centered
    global_sum = values.sum()
    n = len(values)

    # 方差计算
    W_ones = W @ np.ones(n)
    B = W - np.outer(W_ones, np.ones(n)) / n
    variance = (global_sum * np.trace(B @ B.T) - 
                np.sum(values_centered * (B @ values_centered))) / n**2

    gi_star = local_sums / np.sqrt(variance)
    p_values = 1 - stats.norm.cdf(gi_star)

    return gi_star, p_values

# 热点检测
gi_star, pvals = getis_ord_hotspot(locations, volumes, bandwidth=200)
significant_hotspots = np.where((gi_star > 1.96) & (pvals < 0.05))[0]
print(f"显著热点: {len(significant_hotspots)} 个")

6. 金融空间应用案例

高频交易场地优化
def hft_location_optimization(existing_locations, volumes, candidate_sites, 
                             latency_factor=0.1):
    """高频交易场地选址优化"""

    # 计算与现有场地的地理延迟
    dist_to_existing = haversine_distance(candidate_sites, existing_locations) / 1000  # 千米
    latency_penalty = dist_to_existing.mean(axis=1) * latency_factor

    # 空间覆盖分析
    tree = cKDTree(existing_locations * 111)
    coverage_radius = 500  # 千米
    coverage_scores = []

    for site in candidate_sites:
        # 覆盖区域内的交易量
        covered_points = tree.query_ball_point(site * 111, coverage_radius)
        if covered_points:
            covered_volume = volumes[covered_points].sum()
        else:
            covered_volume = 0

        coverage_scores.append(covered_volume)

    # 综合评分
    total_scores = np.array(coverage_scores) - latency_penalty * 1e6
    optimal_site = np.argmax(total_scores)

    return {
        'optimal_site': candidate_sites[optimal_site],
        'coverage_score': coverage_scores[optimal_site],
        'latency_penalty': latency_penalty[optimal_site],
        'total_score': total_scores[optimal_site]
    }

# 新场地候选
candidate_cities = np.array([
    [-74.0060, 40.7128],  # 纽约
    [-118.2437, 34.0522], # 洛杉矶
    [-87.6298, 41.8781],  # 芝加哥
    [-122.4194, 37.7749]  # 旧金山
])

hft_optimal = hft_location_optimization(locations, volumes, candidate_cities)
print("最优HFT场地:", hft_optimal)
资产地理多元化
def geographic_diversification_score(locations, weights):
    """地理多元化评分"""

    # 构建地理协方差
    geo_dist = haversine_distance(locations, locations) / 1000  # 千米
    geo_corr = 1 / (1 + geo_dist / 1000)  # 距离越远相关性越低

    # 地理加权投资组合风险
    geo_risk = np.sqrt(weights @ geo_corr @ weights)

    # 最大地理分散(均匀分布全球)
    max_geo_diversity = np.sqrt(1 / len(weights))

    diversity_score = max_geo_diversity / geo_risk
    return diversity_score

# 投资组合地理多元化
portfolio_weights = np.random.dirichlet(np.ones(100))
geo_diversity = geographic_diversification_score(locations[:100], portfolio_weights)
print(f"地理多元化评分: {geo_diversity:.3f}")
print(f"解释: {'良好' if geo_diversity > 0.8 else '需改进'}")

7. 时空分析

时空聚类(ST-DBSCAN)
def st_dbscan(locations, timestamps, volumes, eps_spatial=50, eps_temporal=30):
    """时空DBSCAN聚类"""

    # 标准化时间维度
    time_normalized = (timestamps - timestamps.min()) / (timestamps.max() - timestamps.min())

    # 组合时空坐标
    spatio_temporal_coords = np.c_[locations * 111, time_normalized * 1000]  # 统一尺度

    # ST-DBSCAN
    st_dbscan = DBSCAN(eps=np.array([eps_spatial, eps_spatial, eps_temporal]), 
                      min_samples=3, metric='euclidean')
    st_clusters = st_dbscan.fit_predict(spatio_temporal_coords)

    return st_clusters

# 时空交易模式聚类
timestamps = pd.date_range('2024-01-01', periods=n_points, freq='D').values
st_clusters = st_dbscan(locations, timestamps, volumes)
print(f"时空聚类数量: {len(set(st_clusters)) - (1 if -1 in st_clusters else 0)}")

8. 性能优化与大规模处理

空间索引优化
def efficient_spatial_query(tree, query_points, max_distance=100):
    """高效批量空间查询"""

    # 批量范围查询
    all_neighbors = tree.query_ball_point(query_points * 111, r=max_distance)

    # 并行处理(可选)
    from concurrent.futures import ProcessPoolExecutor

    def process_neighbors(neighbors_batch):
        results = []
        for neighbors in neighbors_batch:
            if neighbors:
                local_volume = volumes[neighbors].sum()
                results.append(len(neighbors))
            else:
                results.append(0)
        return results

    # 分批处理大规模查询
    batch_size = 1000
    neighbor_counts = []
    for i in range(0, len(all_neighbors), batch_size):
        batch = all_neighbors[i:i+batch_size]
        batch_results = process_neighbors(batch)
        neighbor_counts.extend(batch_results)

    return neighbor_counts

# 大规模空间查询
large_query_points = np.random.uniform(-180, 180, (10000, 2))
large_query_points[:, 1] = np.clip(large_query_points[:, 1], -90, 90)
neighbor_stats = efficient_spatial_query(tree, large_query_points)
print(f"平均邻居数: {np.mean(neighbor_stats):.1f}")

9. 空间数据可视化

import cartopy.crs as ccrs
import cartopy.feature as cfeature

def geospatial_visualization(locations, volumes, clusters=None):
    """地理空间可视化(使用Cartopy)"""

    fig = plt.figure(figsize=(15, 10))
    ax = plt.axes(projection=ccrs.PlateCarree())
    ax.add_feature(cfeature.COASTLINE)
    ax.add_feature(cfeature.BORDERS, linestyle=':')
    ax.add_feature(cfeature.OCEAN, color='lightblue')
    ax.add_feature(cfeature.LAND, color='lightgray')

    # 散点图
    scatter = ax.scatter(locations[:, 0], locations[:, 1], 
                        c=volumes, s=volumes/10, cmap='Reds', 
                        transform=ccrs.PlateCarree(), alpha=0.7,
                        edgecolors='black', linewidth=0.5)

    if clusters is not None:
        # 聚类边界
        for cluster_id in np.unique(clusters[clusters >= 0]):
            cluster_points = locations[clusters == cluster_id]
            if len(cluster_points) > 2:
                # 凸包
                from scipy.spatial import ConvexHull
                try:
                    hull = ConvexHull(cluster_points)
                    hull_points = cluster_points[hull.vertices]
                    ax.plot(hull_points[:, 0], hull_points[:, 1], 
                           transform=ccrs.PlateCarree(), 
                           linewidth=2, alpha=0.6)
                except:
                    pass

    ax.set_global()
    plt.colorbar(scatter, ax=ax, shrink=0.6, label='交易量')
    ax.set_title('全球交易网络地理分布')
    plt.show()

geospatial_visualization(locations, volumes, clusters)

10. 完整金融空间分析工作流

class FinancialSpatialAnalyzer:
    """金融空间数据分析器"""

    def __init__(self, locations, attributes):
        self.locations = np.array(locations)
        self.attributes = pd.DataFrame(attributes)
        self.tree = cKDTree(self.locations)
        self.voronoi = Voronoi(self.locations)

    def spatial_autocorrelation(self, attr_name):
        """空间自相关分析"""
        values = self.attributes[attr_name].values
        return spatial_autocorrelation(self.locations, values)

    def identify_hotspots(self, attr_name, method='dbscan', **kwargs):
        """热点识别"""
        values = self.attributes[attr_name].values
        if method == 'dbscan':
            clusters, _, _ = spatial_dbscan_clustering(self.locations, **kwargs)
            hotspots = np.unique(clusters[clusters >= 0])
        return hotspots

    def optimal_location(self, candidate_sites, weights=None):
        """场地选址优化"""
        if weights is None:
            weights = np.ones(len(self.locations))
        return hft_location_optimization(self.locations, weights, candidate_sites)

    def geographic_diversification(self, portfolio_weights):
        """地理多元化评估"""
        return geographic_diversification_score(self.locations, portfolio_weights)

    def visualize(self, attr_name, clusters=None):
        """空间可视化"""
        values = self.attributes[attr_name].values
        geospatial_visualization(self.locations, values, clusters)

# 使用示例
spatial_analyzer = FinancialSpatialAnalyzer(
    locations, 
    {'volume': volumes, 'revenue': volumes * np.random.uniform(0.8, 1.2, n_points)}
)

# 分析工作流
autocorr = spatial_analyzer.spatial_autocorrelation('volume')
hotspots = spatial_analyzer.identify_hotspots('volume', eps=100)
diversity_score = spatial_analyzer.geographic_diversification(np.random.dirichlet(np.ones(50)))
optimal_site = spatial_analyzer.optimal_location(candidate_cities)

print(f"空间自相关: {autocorr['global_moran_I']:.3f}")
print(f"热点数量: {len(hotspots)}")
print(f"多元化评分: {diversity_score:.3f}")
print("最优选址:", optimal_site['optimal_site'])

SciPy空间数据工具在金融地理分析中应用广泛,从分支机构选址、风险空间集中度评估,到高频交易延迟优化和市场热点识别,都能提供高效的计算支持。结合GIS数据和实时位置信息,可以构建完整的空间金融分析系统。需要特定空间算法优化或与GIS平台的集成,请告诉我具体需求!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注