Advanced Pandas Features
1. MultiIndex (Hierarchical Indexing)
1.1 Creating a MultiIndex
import pandas as pd
import numpy as np

# Build a MultiIndex from arrays
arrays = [
    ['A', 'A', 'A', 'B', 'B', 'B'],
    [1, 2, 3, 1, 2, 3]
]
tuples = list(zip(*arrays))
multi_index = pd.MultiIndex.from_tuples(tuples, names=['first', 'second'])

# Create a DataFrame with the MultiIndex
df_multi = pd.DataFrame(np.random.randn(6, 3),
                        index=multi_index,
                        columns=['X', 'Y', 'Z'])
print("MultiIndex DataFrame:")
print(df_multi)

# Build a MultiIndex from the Cartesian product of iterables
index = pd.MultiIndex.from_product([['A', 'B'], [1, 2]])
df_product = pd.DataFrame(np.arange(4).reshape(2, 2),
                          index=index,
                          columns=['data1', 'data2'])
print("\nMultiIndex from product:")
print(df_product)
1.2 MultiIndex Operations
# Selection
print("Select by the first level:")
print(df_multi.loc['A'])
print("\nSelect a specific combination:")
print(df_multi.loc[('A', 1)])

# Cross-section on a single level
print("\nRows where the second level == 2:")
print(df_multi.xs(2, level='second'))

# Slicing
print("\nSlice of level 'A':")
print(df_multi.loc['A', :])

# Swap index levels
df_swapped = df_multi.swaplevel('first', 'second')
print("\nSwapped levels:")
print(df_swapped)

# Sort by index
df_sorted = df_multi.sort_index(level=0)
print("\nSorted by the first level:")
print(df_sorted)
1.3 Aggregation with a MultiIndex
# Group by the first index level
grouped = df_multi.groupby(level=0).mean()
print("Mean by first level:")
print(grouped)

# Aggregate over both levels with several statistics
# (each (first, second) pair is unique here, so 'std' is NaN for single-row groups)
multi_agg = df_multi.groupby(level=['first', 'second']).agg(['mean', 'std'])
print("\nMulti-level aggregation:")
print(multi_agg)

# Flatten the resulting MultiIndex columns
multi_agg.columns = ['_'.join(col).strip() for col in multi_agg.columns]
print("\nFlattened column names:")
print(multi_agg.head())
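Grouped or multi-level results are often easier to read after pivoting one index level into columns. A minimal sketch on the df_multi frame from above, using the standard unstack call (stack reverses it):
# Pivot the 'second' index level into columns; .stack() would reverse this
wide = df_multi['X'].unstack(level='second')
print("\n'X' values with 'second' as columns:")
print(wide)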
2. Window Functions
2.1 Rolling Windows
# Create a time series
dates = pd.date_range('2023-01-01', periods=100, freq='D')
ts = pd.Series(np.random.randn(100).cumsum(), index=dates)

# Fixed-size rolling window
rolling_mean = ts.rolling(window=7, min_periods=1).mean()
rolling_std = ts.rolling(window=7).std()

# Plot
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 6))
plt.plot(ts.index, ts, label='raw data', alpha=0.7)
plt.plot(rolling_mean.index, rolling_mean, label='7-day rolling mean', linewidth=2)
plt.fill_between(rolling_std.index,
                 rolling_mean - rolling_std,
                 rolling_mean + rolling_std,
                 alpha=0.2, label='±1 std')
plt.title('Rolling window analysis')
plt.legend()
plt.show()
2.2 Expanding Windows
# Expanding window (from the start up to the current point)
expanding_mean = ts.expanding(min_periods=5).mean()
expanding_std = ts.expanding().std()

plt.figure(figsize=(12, 6))
plt.plot(ts.index, ts, label='raw data', alpha=0.7)
plt.plot(expanding_mean.index, expanding_mean, label='expanding mean', linewidth=2)
plt.title('Expanding window analysis')
plt.legend()
plt.show()
2.3 Exponentially Weighted Windows
# Exponentially weighted moving average (EWMA)
ewma_short = ts.ewm(span=7).mean()     # short-term
ewma_long = ts.ewm(span=30).mean()     # long-term
ewma_alpha = ts.ewm(alpha=0.1).mean()  # fixed smoothing factor

plt.figure(figsize=(12, 6))
plt.plot(ts.index, ts, label='raw data', alpha=0.7)
plt.plot(ewma_short.index, ewma_short, label='EWMA(span=7)', linewidth=2)
plt.plot(ewma_long.index, ewma_long, label='EWMA(span=30)', linewidth=2)
plt.title('Exponentially weighted moving average')
plt.legend()
plt.show()
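For reference, span and alpha describe the same smoothing factor: pandas maps them via alpha = 2 / (span + 1), so span=7 corresponds to alpha = 0.25. A quick sanity check:
# ewm(span=7) and ewm(alpha=2/(7+1)) produce the same smoothed series
alpha_from_span = 2 / (7 + 1)   # 0.25
print(np.allclose(ts.ewm(span=7).mean(), ts.ewm(alpha=alpha_from_span).mean()))  # True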
2.4 Custom Window Functions
# Custom rolling statistic
def custom_window(series, window):
    """Rolling range (max - min) over a window"""
    return series.rolling(window).apply(
        lambda x: np.max(x) - np.min(x), raw=True
    )

range_7 = custom_window(ts, 7)
print("7-day rolling range:")
print(range_7.head(10))

# Per-group rolling window
df_window = pd.DataFrame({
    'category': np.random.choice(['A', 'B'], 100),
    'value': np.random.randn(100)
}, index=dates)
group_rolling = df_window.groupby('category')['value'].rolling(window=5).mean()
print("\nGroup-wise rolling mean:")
print(group_rolling.head(10))
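The group-wise result carries a (category, date) MultiIndex. To attach it back to df_window as an ordinary column, drop the group level first so the dates align again; a minimal sketch:
# Drop the 'category' level so the index matches df_window's DatetimeIndex
df_window['rolling_mean_5'] = group_rolling.droplevel('category')
print(df_window.head())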
3. Resampling
3.1 Time Series Resampling
# Minute-level data (1440 minutes = one day)
freq_data = pd.DataFrame({
    'value': np.random.randn(1440),
}, index=pd.date_range('2023-01-01', periods=1440, freq='T'))

# Downsample (high frequency -> low frequency) to hourly means
hourly = freq_data.resample('H').mean()
# Upsample (low frequency -> high frequency) back to minutes with forward fill
minute_up = hourly.resample('T').ffill()

# Downsample to daily with several aggregations
daily = freq_data.resample('D').agg({
    'value': ['mean', 'std', 'sum', lambda x: x.quantile(0.95)]
})
# Rename the aggregated columns
daily.columns = ['daily_mean', 'daily_std', 'daily_sum', 'daily_95th']
print("Daily aggregation:")
print(daily.head())
3.2 Business-Calendar Resampling
# Business-day and weekly resampling
business_days = freq_data.resample('B').mean()   # business days
week_ends = freq_data.resample('W-FRI').last()   # weeks ending on Friday
# Quarterly and yearly
quarterly = freq_data.resample('Q').agg(['mean', 'sum'])
yearly = freq_data.resample('Y').agg(['mean', 'sum', 'count'])
# Custom frequency
custom_freq = freq_data.resample('2W').mean()    # every two weeks
3.3 Resampling Strategies
# Several aggregation strategies per hour
# (nested dict renaming was removed in pandas 1.0, so aggregate with a list and rename)
resample_strategies = freq_data['value'].resample('H').agg(
    ['mean', 'median', 'std', lambda x: x.max() - x.min(), 'count']
).round(3)
resample_strategies.columns = ['mean', 'median', 'volatility', 'range', 'count']
print("Hourly resampling strategies:")
print(resample_strategies.head())
4. Categorical Data
4.1 Creating and Inspecting Categoricals
# Create ordered categorical data
df_cat = pd.DataFrame({
    'category': pd.Categorical(['A', 'B', 'A', 'C', 'B', 'A'],
                                categories=['A', 'B', 'C'], ordered=True),
    'value': [10, 20, 15, 25, 30, 12]
})
print("Categorical data:")
print(df_cat)
print("\nCategory metadata:")
print(df_cat['category'].cat.categories)
print("Ordered:", df_cat['category'].cat.ordered)

# Sorting respects the declared category order
df_sorted = df_cat.sort_values('category')
print("\nSorted by category:")
print(df_sorted)
4.2 Category Operations
# Add categories
df_cat['category'] = df_cat['category'].cat.add_categories('D')
print("After adding category D:")
print(df_cat['category'].cat.categories)

# Drop categories that carry no observations
df_cat['category'] = df_cat['category'].cat.remove_unused_categories()
print("\nAfter removing unused categories:")
print(df_cat['category'].cat.categories)

# Rename categories
df_cat['category'] = df_cat['category'].cat.rename_categories({
    'A': 'Excellent', 'B': 'Good', 'C': 'Poor'
})
print("\nRenamed categories:")
print(df_cat)
4.3 Encoding Categorical Data
# Ordinal codes
df_encoded = df_cat.copy()
df_encoded['category_code'] = df_encoded['category'].cat.codes
print("Ordinal codes:")
print(df_encoded)

# One-hot encoding
df_onehot = pd.get_dummies(df_cat, columns=['category'], prefix='cat')
print("\nOne-hot encoding:")
print(df_onehot)

# Target encoding: replace each category with the mean of the target
def target_encode(df, cat_col, target_col):
    """Target (mean) encoding"""
    mean_target = df.groupby(cat_col)[target_col].mean()
    return df[cat_col].map(mean_target)

df_cat['target_encoded'] = target_encode(df_cat, 'category', 'value')
print("\nTarget encoding:")
print(df_cat)
5. Custom Business Functions
5.1 Vectorized Custom Functions
# Vectorized computation with NumPy/pandas operations
def vectorized_operation(df):
    """Vectorized column-wise calculations"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df['total'] = df[numeric_cols].sum(axis=1)
    df['mean'] = df[numeric_cols].mean(axis=1)
    df['zscore'] = (df['total'] - df['total'].mean()) / df['total'].std()
    return df

# Example
sales_df = pd.DataFrame({
    'Q1': np.random.randint(100, 200, 10),
    'Q2': np.random.randint(100, 200, 10),
    'Q3': np.random.randint(100, 200, 10),
    'Q4': np.random.randint(100, 200, 10)
})
result = vectorized_operation(sales_df)
print("Vectorized results:")
print(result.head())
5.2 Custom Aggregation Functions
def custom_agg_functions():
    """A reusable collection of custom aggregation functions"""
    def iqr(series):
        """Interquartile range"""
        return series.quantile(0.75) - series.quantile(0.25)

    def outlier_count(series, multiplier=1.5):
        """Number of IQR outliers"""
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        lower = Q1 - multiplier * IQR
        upper = Q3 + multiplier * IQR
        return ((series < lower) | (series > upper)).sum()

    def mode_freq(series):
        """Frequency of the mode"""
        mode_val = series.mode()
        if len(mode_val) > 0:
            return (series == mode_val[0]).sum()
        return 0

    agg_dict = {
        'iqr': iqr,
        'outliers': outlier_count,
        'mode_frequency': mode_freq,
        'cv': lambda x: x.std() / x.mean() if x.mean() != 0 else np.nan  # coefficient of variation
    }
    return agg_dict

# Apply the custom aggregations to every column
# (DataFrame.agg treats dict keys as column names, so pass the functions as a list and relabel the rows)
agg_funcs = custom_agg_functions()
custom_agg = sales_df.agg(list(agg_funcs.values()))
custom_agg.index = list(agg_funcs.keys())
print("Custom aggregation:")
print(custom_agg)
5.3 Pipeline-Style Processing
from functools import reduce

class DataPipeline:
    """A simple data-processing pipeline"""
    def __init__(self, df):
        self.df = df.copy()
        self.steps = []

    def add_step(self, func, *args, **kwargs):
        """Add a processing step with fixed arguments"""
        def step_wrapper(df):
            return func(df, *args, **kwargs)
        step_wrapper.__name__ = func.__name__   # keep the original name for logging
        self.steps.append(step_wrapper)
        return self

    def add_custom(self, func):
        """Add an arbitrary callable as a step"""
        self.steps.append(func)
        return self

    def execute(self):
        """Run all steps in order"""
        return reduce(lambda df, step: step(df), self.steps, self.df)

    def log_steps(self):
        """Print the registered steps"""
        step_names = [step.__name__ for step in self.steps]
        print(f"Steps to execute: {', '.join(step_names)}")
        return self

# Example steps
def clean_missing(df):
    """Fill missing values with column means"""
    return df.fillna(df.mean(numeric_only=True))

def normalize(df):
    """Standardize numeric columns"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df[numeric_cols] = (df[numeric_cols] - df[numeric_cols].mean()) / df[numeric_cols].std()
    return df

def add_features(df):
    """Add derived features"""
    df['total'] = df.sum(axis=1)
    df['quarter_avg'] = df.filter(like='Q').mean(axis=1)
    return df

# Build and run the pipeline
pipeline = (DataPipeline(sales_df)
            .add_step(clean_missing)
            .add_step(normalize)
            .add_step(add_features)
            .log_steps())
processed_df = pipeline.execute()
print("\nPipeline output:")
print(processed_df.head())
6. Memory Optimization and Performance Tuning
6.1 Optimizing Data Types
def optimize_dtypes(df):
    """Downcast and convert dtypes to reduce memory usage"""
    optimized = df.copy()
    # Downcast integers
    for col in optimized.select_dtypes(include=['int64']).columns:
        optimized[col] = pd.to_numeric(optimized[col], downcast='integer')
    # Downcast floats
    for col in optimized.select_dtypes(include=['float64']).columns:
        optimized[col] = pd.to_numeric(optimized[col], downcast='float')
    # Object columns: try datetimes, then booleans, then low-cardinality categories
    for col in optimized.select_dtypes(include=['object']).columns:
        parsed = pd.to_datetime(optimized[col], errors='coerce')
        if parsed.notna().mean() > 0.9:          # mostly parseable as dates
            optimized[col] = parsed
            continue
        unique_vals = set(optimized[col].dropna().unique())
        if unique_vals and unique_vals <= {True, False, 0, 1, 'True', 'False'}:
            # map to real booleans; NaN stays NaN
            optimized[col] = optimized[col].map(lambda v: v in (True, 1, 'True'),
                                                na_action='ignore')
            continue
        if optimized[col].nunique() / len(optimized) < 0.5:   # few distinct values
            optimized[col] = optimized[col].astype('category')
    return optimized
# Memory comparison
original_memory = sales_df.memory_usage(deep=True).sum()
optimized_df = optimize_dtypes(sales_df)
optimized_memory = optimized_df.memory_usage(deep=True).sum()
print(f"Original memory: {original_memory / 1024**2:.2f} MB")
print(f"Optimized memory: {optimized_memory / 1024**2:.2f} MB")
print(f"Saved: {(original_memory - optimized_memory) / original_memory * 100:.1f}%")
6.2 Index Optimization
def optimize_index(df):
    """Pick a high-selectivity column as the index and keep it sorted"""
    result = df.copy()
    # Use a highly selective column as the index
    cardinality = result.nunique() / len(result)
    high_card_cols = cardinality[cardinality > 0.1].index.tolist()
    if high_card_cols:
        best_index_col = high_card_cols[0]
        result = result.set_index(best_index_col)
        print(f"Set as index: {best_index_col}")
    # Sorted MultiIndexes allow fast label-based lookups
    if len(result.index.names) > 1:
        result = result.sort_index()
    return result

# Process a large file in chunks
def process_large_file(file_path, chunk_size=10000, func=None):
    """Read a CSV in chunks, optionally transforming each chunk"""
    chunks = []
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        processed = func(chunk) if func else chunk
        chunks.append(processed)
    return pd.concat(chunks, ignore_index=True)
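A usage sketch that combines the two helpers above; large_sales.csv is a hypothetical file path, so the call is left commented out:
# Hypothetical example: downcast dtypes chunk by chunk while loading a large CSV
# big_df = process_large_file('large_sales.csv', chunk_size=50_000, func=optimize_dtypes)
# print(big_df.info(memory_usage='deep'))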
6.3 Parallel Processing
# Using modin (install with: pip install modin[ray])
# import ray
# ray.init()
# import modin.pandas as pd

# Using Dask (install with: pip install dask)
import dask.dataframe as dd

def dask_processing(df_path):
    """Out-of-core processing with Dask"""
    # Build a Dask DataFrame from a file
    ddf = dd.read_csv(df_path)
    # Lazy computation
    mean_result = ddf.mean()
    result = mean_result.compute()   # triggers the actual computation
    # Repartitioning
    ddf_repartitioned = ddf.repartition(npartitions=4)
    return result

# Parallel apply with joblib
from joblib import Parallel, delayed

def parallel_apply(df, func, n_jobs=-1):
    """Apply a function to each 'category' group in parallel"""
    results = Parallel(n_jobs=n_jobs)(
        delayed(func)(group) for name, group in df.groupby('category')
    )
    return pd.concat(results)
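A usage sketch of parallel_apply with the df_window frame from section 2.4; the per-group z-score transform is only an illustrative stand-in for heavier per-group work:
# Per-group z-score, computed group by group in parallel
def zscore_group(group):
    group = group.copy()
    group['value_z'] = (group['value'] - group['value'].mean()) / group['value'].std()
    return group

scored = parallel_apply(df_window, zscore_group)
print(scored.head())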
7. Extensions and Integrations
7.1 SQL Integration
from sqlalchemy import create_engine

# Create a database connection
engine = create_engine('sqlite:///example.db')

# Write a DataFrame to SQL (illustrative sales records)
sales_records = pd.DataFrame({
    'region': np.random.choice(['North', 'South', 'East', 'West'], 100),
    'sales': np.random.randint(100, 1000, 100),
    'date': pd.date_range('2023-01-01', periods=100, freq='D').astype(str)
})
sales_records.to_sql('sales_data', engine, if_exists='replace', index=False)

# Read from SQL
query = "SELECT * FROM sales_data WHERE date > '2023-01-01'"
df_sql = pd.read_sql_query(query, engine)

# SQL-style aggregation
result = pd.read_sql_query("""
    SELECT region, AVG(sales) AS avg_sales, COUNT(*) AS count
    FROM sales_data
    GROUP BY region
    ORDER BY avg_sales DESC
""", engine)
print("SQL query result:")
print(result)
7.2 Scikit-learn Integration
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline

# Feature engineering
def create_features(df):
    """Basic feature engineering"""
    df = df.copy()
    # Numeric features
    numeric_features = df.select_dtypes(include=[np.number]).columns
    # One-hot encode categorical features
    df = pd.get_dummies(df, columns=df.select_dtypes(include=['object', 'category']).columns)
    # Simple heuristic: keep the most strongly inter-correlated numeric features
    high_corr = df[numeric_features].corr().abs().unstack().sort_values(ascending=False)
    top_features = high_corr[high_corr < 1].head(10).index.get_level_values(0).unique()
    return df[top_features.tolist() + ['target']]   # assumes a 'target' column exists

# Machine-learning pipeline
def ml_pipeline(df, target_col):
    """End-to-end ML pipeline"""
    # Prepare the data
    X = df.drop(columns=[target_col])
    y = df[target_col]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Pipeline: scaling + random forest
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('rf', RandomForestRegressor(n_estimators=100, random_state=42))
    ])
    pipeline.fit(X_train, y_train)
    # Evaluate
    train_score = pipeline.score(X_train, y_train)
    test_score = pipeline.score(X_test, y_test)
    return {
        'pipeline': pipeline,
        'train_score': train_score,
        'test_score': test_score,
        'feature_importance': pipeline.named_steps['rf'].feature_importances_
    }

# Example (requires a DataFrame with a target column)
# ml_results = ml_pipeline(feature_df, 'target')
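To make this runnable without external data, here is a toy run on the quarterly sales_df from section 5.1, with a synthetic target invented purely for illustration (only 10 rows, so the scores mean little):
# Build a tiny supervised dataset: predict annual total from quarterly figures
demo_df = sales_df[['Q1', 'Q2', 'Q3', 'Q4']].copy()
demo_df['target'] = demo_df.sum(axis=1) + np.random.randn(len(demo_df)) * 10
ml_results = ml_pipeline(demo_df, 'target')
print(f"train R^2: {ml_results['train_score']:.3f}, test R^2: {ml_results['test_score']:.3f}")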
7.3 JSON and API Integration
import requests

def fetch_api_data(url, params=None):
    """Fetch data from an API and return it as a DataFrame"""
    response = requests.get(url, params=params)
    response.raise_for_status()
    # Convert JSON to pandas
    data = response.json()
    if isinstance(data, list):
        df = pd.DataFrame(data)
    elif isinstance(data, dict) and 'results' in data:
        df = pd.DataFrame(data['results'])
    else:
        df = pd.DataFrame([data])
    return df

# Flatten nested JSON (pandas also ships pd.json_normalize for this)
def flatten_json(df):
    """Flatten one level of nested dictionaries into prefixed columns"""
    def flatten_dict(d):
        flattened = {}
        for key, value in d.items():
            if isinstance(value, dict):
                for k, v in value.items():
                    flattened[f"{key}_{k}"] = v
            else:
                flattened[key] = value
        return flattened

    if df.empty:
        return df
    flattened_data = [flatten_dict(row) for row in df.to_dict('records')]
    return pd.DataFrame(flattened_data)

# Example (public API)
# url = "https://api.publicapis.org/entries"
# api_df = fetch_api_data(url)
# flattened_df = flatten_json(api_df)
8. Advanced Time Series Processing
8.1 Periodicity and Seasonality
def advanced_time_features(df, date_col):
    """Derive calendar, cyclical, lag, and rolling features"""
    df = df.copy()
    df[date_col] = pd.to_datetime(df[date_col])
    # Calendar features
    df['year'] = df[date_col].dt.year
    df['month'] = df[date_col].dt.month
    df['quarter'] = df[date_col].dt.quarter
    df['day'] = df[date_col].dt.day
    df['dayofweek'] = df[date_col].dt.dayofweek
    df['hour'] = df[date_col].dt.hour
    df['minute'] = df[date_col].dt.minute
    # Cyclical features (sine/cosine encoding)
    df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
    df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
    df['dayofweek_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['dayofweek_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    # Lag features (assumes a 'value' column)
    for lag in [1, 7, 30]:
        df[f'lag_{lag}'] = df['value'].shift(lag)
    # Rolling statistics
    for window in [7, 30]:
        df[f'rolling_mean_{window}'] = df['value'].rolling(window).mean()
        df[f'rolling_std_{window}'] = df['value'].rolling(window).std()
    # Local trend: slope of a 30-point rolling linear fit
    df['trend'] = df['value'].rolling(30).apply(lambda x: np.polyfit(range(len(x)), x, 1)[0])
    return df

# Usage
# ts_df = advanced_time_features(df, 'date')
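A self-contained sketch that exercises advanced_time_features on synthetic daily data; the 'date' and 'value' column names match the assumptions the function makes:
# Synthetic daily series with a weekly cycle plus noise
demo_ts = pd.DataFrame({'date': pd.date_range('2023-01-01', periods=120, freq='D')})
demo_ts['value'] = np.sin(2 * np.pi * demo_ts.index / 7) + np.random.randn(120) * 0.3
ts_df = advanced_time_features(demo_ts, 'date')
print(ts_df[['date', 'value', 'dayofweek_sin', 'lag_7', 'rolling_mean_7', 'trend']].tail())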
8.2 Anomaly Detection
def detect_anomalies(df, value_col, window=30, threshold=2.5):
    """Flag anomalies with rolling z-scores, rolling IQR, and Isolation Forest"""
    df = df.copy()
    # Rolling statistics
    rolling_mean = df[value_col].rolling(window=window, center=True).mean()
    rolling_std = df[value_col].rolling(window=window, center=True).std()
    # Z-score
    df['z_score'] = (df[value_col] - rolling_mean) / rolling_std
    # Flag anomalies by z-score
    df['is_anomaly_z'] = np.abs(df['z_score']) > threshold
    # IQR method
    Q1 = df[value_col].rolling(window=window).quantile(0.25)
    Q3 = df[value_col].rolling(window=window).quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    df['is_anomaly_iqr'] = ~df[value_col].between(lower_bound, upper_bound)
    # Isolation Forest (requires scikit-learn)
    from sklearn.ensemble import IsolationForest
    iso_forest = IsolationForest(contamination=0.1, random_state=42)
    df['is_anomaly_iso'] = iso_forest.fit_predict(df[[value_col]]) == -1
    return df

# Visualize the anomalies
# anomalies = detect_anomalies(ts_df, 'value')
# plt.figure(figsize=(12, 6))
# plt.plot(anomalies.index, anomalies['value'], label='data')
# anomalies[anomalies['is_anomaly_z']]['value'].plot(style='ro', label='anomalies')
# plt.legend()
# plt.show()
9. Advanced Real-World Use Cases
9.1 Streaming Data Processing
from collections import deque

class StreamingProcessor:
    """Processor for a stream of incoming data points"""
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.data_window = deque(maxlen=window_size)
        self.metrics = {}

    def process(self, new_data):
        """Ingest one data point and refresh the metrics"""
        self.data_window.append(new_data)
        df_window = pd.DataFrame(list(self.data_window))
        # Live metrics over the current window
        self.metrics = {
            'mean': df_window['value'].mean(),
            'std': df_window['value'].std(),
            'count': len(df_window),
            'last_update': pd.Timestamp.now()
        }
        # Simple anomaly check against the last 10 points
        if len(df_window) >= 10:
            recent = df_window['value'].tail(10)
            z_score = (new_data['value'] - recent.mean()) / recent.std()
            self.metrics['is_anomaly'] = abs(z_score) > 2
        return self.metrics

    def get_trend(self):
        """Slope of a linear fit over the current window"""
        if len(self.data_window) < 2:
            return None
        df = pd.DataFrame(list(self.data_window))
        x = np.arange(len(df))
        slope = np.polyfit(x, df['value'], 1)[0]
        return slope

# Simulate a data stream
processor = StreamingProcessor()
for i in range(50):
    new_point = {'value': np.random.randn() + (i % 10) * 0.1}  # noise plus a repeating ramp
    metrics = processor.process(new_point)
    if i % 10 == 0 and i > 0:   # skip i == 0, where the trend is still undefined
        print(f"step {i}: mean={metrics['mean']:.2f}, trend={processor.get_trend():.3f}")
9.2 A/B Test Analysis
def ab_test_analysis(df):
    """Statistical analysis of an A/B test"""
    # Expected columns: user_id, group ('A'/'B'), metric, timestamp
    # Basic statistics per group
    group_stats = df.groupby('group')['metric'].agg([
        'count', 'mean', 'std', 'sem'   # sem = standard error of the mean
    ]).round(4)
    # t-test
    from scipy import stats
    group_a = df[df['group'] == 'A']['metric']
    group_b = df[df['group'] == 'B']['metric']
    t_stat, p_value = stats.ttest_ind(group_a, group_b)
    # Effect size: Cohen's d with the variance-pooled standard deviation
    pooled_std = np.sqrt((group_a.var() + group_b.var()) / 2)
    cohens_d = (group_a.mean() - group_b.mean()) / pooled_std
    # Confidence interval for the difference in means
    from statsmodels.stats.weightstats import CompareMeans, DescrStatsW
    cm = CompareMeans(DescrStatsW(group_a), DescrStatsW(group_b))
    ci_low, ci_high = cm.tconfint_diff(usevar='unequal')
    results = {
        'group_stats': group_stats,
        't_statistic': t_stat,
        'p_value': p_value,
        'cohen_d': cohens_d,
        'confidence_interval': (ci_low, ci_high),
        'significant': p_value < 0.05
    }
    return results
# Visualize the A/B test
def plot_ab_test(df):
    """A/B test visualization"""
    import seaborn as sns
    # Box plot
    plt.figure(figsize=(12, 8))
    sns.boxplot(data=df, x='group', y='metric')
    plt.title('A/B test box plot')
    # Density plot and mean comparison
    fig, ax = plt.subplots(1, 2, figsize=(15, 6))
    for group in ['A', 'B']:
        data = df[df['group'] == group]['metric']
        sns.kdeplot(data, ax=ax[0], label=f'Group {group}')
    ax[0].set_title('Density comparison')
    ax[0].legend()
    # Mean comparison
    means = df.groupby('group')['metric'].mean()
    ax[1].bar(means.index, means.values, alpha=0.7)
    ax[1].set_title('Group means')
    ax[1].set_ylabel('mean')
    plt.tight_layout()
    plt.show()
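A quick end-to-end run on simulated data, where group B is given a small synthetic lift, shows how the two helpers fit together:
# Simulated experiment: group B has a +0.2 lift on the metric
np.random.seed(42)
ab_df = pd.DataFrame({
    'user_id': np.arange(2000),
    'group': np.repeat(['A', 'B'], 1000),
    'metric': np.concatenate([np.random.normal(1.0, 0.5, 1000),
                              np.random.normal(1.2, 0.5, 1000)]),
    'timestamp': pd.Timestamp('2023-01-01')
})
ab_results = ab_test_analysis(ab_df)
print(ab_results['group_stats'])
print(f"p-value: {ab_results['p_value']:.4f}, Cohen's d: {ab_results['cohen_d']:.2f}")
plot_ab_test(ab_df)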
Pandas' advanced features greatly extend what the library can do in complex data-processing scenarios. MultiIndexes, window functions, and categorical data handling provide powerful ways to organize and analyze data; memory and performance optimizations keep large datasets tractable; and integration with databases and machine-learning libraries completes the data-science workflow. Combined with concrete business requirements, these capabilities cover advanced tasks ranging from streaming data to full A/B test analysis.