Pandas Performance Optimization
A Detailed Guide to Pandas Performance Optimization
1. Data Type Optimization
1.1 Automatic dtype downcasting
import pandas as pd
import numpy as np
import time
import psutil
import os
def memory_usage(df):
    """Return the DataFrame's memory footprint in MB."""
    return df.memory_usage(deep=True).sum() / 1024**2  # MB
def optimize_dtypes(df):
    """Downcast numeric columns and convert low-cardinality strings to category."""
    optimized = df.copy()
    # 1. Integer downcasting
    for col in optimized.select_dtypes(include=['int64']).columns:
        min_val = optimized[col].min()
        max_val = optimized[col].max()
        # Try int8
        if min_val >= np.iinfo(np.int8).min and max_val <= np.iinfo(np.int8).max:
            optimized[col] = optimized[col].astype('int8')
        # Try int16
        elif min_val >= np.iinfo(np.int16).min and max_val <= np.iinfo(np.int16).max:
            optimized[col] = optimized[col].astype('int16')
        # Try int32
        elif min_val >= np.iinfo(np.int32).min and max_val <= np.iinfo(np.int32).max:
            optimized[col] = optimized[col].astype('int32')
    # 2. Float downcasting
    for col in optimized.select_dtypes(include=['float64']).columns:
        # Columns holding only whole numbers can be downcast to an integer dtype
        if optimized[col].apply(float.is_integer).all():
            optimized[col] = pd.to_numeric(optimized[col], downcast='integer')
        else:
            # Fall back to float32 (roughly 7 significant digits of precision)
            optimized[col] = optimized[col].astype('float32')
    # 3. String -> category
    for col in optimized.select_dtypes(include=['object']).columns:
        n_unique = optimized[col].nunique()
        n_rows = len(optimized)
        # Keep high-cardinality strings as object; convert low-cardinality ones to category
        if n_unique / n_rows < 0.5 and n_unique < 1000:
            optimized[col] = optimized[col].astype('category')
    # 4. Boolean conversion for remaining object columns
    for col in optimized.select_dtypes(include=['object']).columns:
        unique_vals = set(optimized[col].dropna().unique())
        if unique_vals <= {True, False} and not optimized[col].isna().any():
            optimized[col] = optimized[col].astype(bool)
    return optimized
# Before/after comparison
np.random.seed(42)
n_rows = 1000000
df_original = pd.DataFrame({
    'int_col': np.random.randint(-1000, 1000, n_rows),
    'float_col': np.random.randn(n_rows),
    'category_col': np.random.choice(['A', 'B', 'C'], n_rows),
    'bool_col': np.random.choice([True, False], n_rows),
    'string_col': np.random.choice(['x', 'y', 'z', 'w'], n_rows)
})
print("Original memory:", memory_usage(df_original), "MB")
df_optimized = optimize_dtypes(df_original)
print("Optimized memory:", memory_usage(df_optimized), "MB")
print("Saved:", round((1 - memory_usage(df_optimized)/memory_usage(df_original)) * 100, 1), "%")
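For many workflows, the manual np.iinfo range checks above can be replaced by pandas' built-in downcasting in pd.to_numeric. A minimal sketch of that simpler alternative (the helper name downcast_builtin is illustrative):
def downcast_builtin(df):
    """Downcast with pd.to_numeric instead of manual dtype range checks."""
    out = df.copy()
    for col in out.select_dtypes(include=['int64']).columns:
        out[col] = pd.to_numeric(out[col], downcast='integer')
    for col in out.select_dtypes(include=['float64']).columns:
        out[col] = pd.to_numeric(out[col], downcast='float')
    return out

# Should give comparable savings on the numeric columns:
# df_builtin = downcast_builtin(df_original)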
1.2 Categorical data optimization techniques
def advanced_category_optimization(df, high_cardinality_threshold=0.1):
    """More aggressive categorical optimization."""
    optimized = df.copy()
    for col in df.select_dtypes(include=['object', 'string']).columns:
        n_unique = df[col].nunique()
        n_rows = len(df)
        fillna_strategy = df[col].mode()[0] if not df[col].mode().empty else 'Unknown'
        # High cardinality: hash-encode to save memory (lossy, and without a fixed
        # PYTHONHASHSEED not reproducible across interpreter runs)
        if n_unique / n_rows > high_cardinality_threshold:
            optimized[f'{col}_hash'] = (df[col].astype('string')
                                        .apply(lambda x: hash(str(x)) % 10000)
                                        .astype('int16'))
            optimized = optimized.drop(columns=[col])
        else:
            # Low cardinality: use the category dtype (labels are kept; compact
            # integer codes are stored internally)
            optimized[col] = df[col].fillna(fillna_strategy).astype('category')
    return optimized
# Example
df_cat = pd.DataFrame({
    'low_card': np.random.choice(['A', 'B', 'C'], n_rows),
    'high_card': [f'user_{i}' for i in range(n_rows)]
})
df_cat_opt = advanced_category_optimization(df_cat)
print("Memory after categorical optimization:", memory_usage(df_cat_opt), "MB")
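To see where the saving comes from, it can help to compare a single column before and after conversion; a quick check along these lines (exact numbers will vary with the data):
# Per-column memory: object strings vs. category codes plus a small label table
obj_mb = df_cat['low_card'].memory_usage(deep=True) / 1024**2
cat_mb = df_cat['low_card'].astype('category').memory_usage(deep=True) / 1024**2
print(f"low_card as object: {obj_mb:.1f} MB, as category: {cat_mb:.1f} MB")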
2. Index Optimization
2.1 Choosing an efficient index
def optimize_index_selection(df, query_patterns):
    """
    Pick an index column based on query patterns.
    query_patterns: list of column names used in frequent filters
    """
    # Cardinality ratio (selectivity) per column
    cardinality = df.nunique() / len(df)
    # Prefer high-selectivity columns as the index
    candidate_cols = [col for col in query_patterns if col in df.columns]
    high_selectivity_cols = cardinality[candidate_cols].sort_values(ascending=False)
    if len(high_selectivity_cols) > 0:
        best_index_col = high_selectivity_cols.index[0]
        df_indexed = df.set_index(best_index_col)
        print(f"Suggested index column: {best_index_col} (selectivity: {high_selectivity_cols.iloc[0]:.3f})")
        return df_indexed
    return df
# Example: frequent lookups by category and string value
# (df_original has no date/region columns, so we use columns that exist)
patterns = ['category_col', 'string_col']
df_indexed = optimize_index_selection(df_original, patterns)
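A rough sanity check of the payoff, using the category column of the test frame above (timings depend heavily on data size and cardinality):
df_by_cat = df_original.set_index('category_col').sort_index()

start = time.time()
_ = df_original[df_original['category_col'] == 'A']   # boolean-mask scan
t_mask = time.time() - start

start = time.time()
_ = df_by_cat.loc['A']                                # lookup on a sorted index
t_index = time.time() - start
print(f"mask: {t_mask:.4f}s, index lookup: {t_index:.4f}s")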
2.2 MultiIndex optimization
def create_composite_index(df, index_cols, sort=True):
    """Build a composite (multi-level) index."""
    df_indexed = df.set_index(index_cols)
    if sort:
        # A sorted index is what enables fast label-based lookups and slicing
        df_indexed = df_indexed.sort_index()
    return df_indexed
# Example: composite index on the category and integer columns
df_composite = create_composite_index(df_original, ['category_col', 'int_col'])
print("Composite-index lookup timing:")
%timeit df_composite.loc['A']  # lookup on the first index level
3. Vectorized Operations vs. apply
3.1 Performance comparison
# Build test data
n = 1000000
test_df = pd.DataFrame({
    'a': np.random.randn(n),
    'b': np.random.randn(n),
    'category': np.random.choice(['X', 'Y'], n)
})
# Method 1: groupby + apply (slow)
def slow_method(df):
    start = time.time()
    result = df.groupby('category').apply(lambda g: g['a'] * g['b'].mean())
    return time.time() - start
# Method 2: vectorized equivalent
def fast_method(df):
    start = time.time()
    cat_means = df.groupby('category')['b'].mean()
    result = df['a'] * df['category'].map(cat_means)
    return time.time() - start
# Run the comparison
print("apply version:", slow_method(test_df), "s")
print("vectorized version:", fast_method(test_df), "s")
3.2 Efficient alternatives
def vectorized_transform(df, group_col, value_col, transform_func):
    """Vectorized per-group transform."""
    group_values = df.groupby(group_col)[value_col].transform(transform_func)
    return group_values
# Example: within-group standardization
def group_standardize(df):
    """Within-group standardization (vectorized via transform)."""
    return df.groupby('category')[['a', 'b']].transform(
        lambda x: (x - x.mean()) / x.std()
    )
# Run it
normalized = group_standardize(test_df)
print("Vectorized within-group standardization done")
3.3 Numba acceleration
from numba import jit
import numpy as np

@jit(nopython=True)
def numba_multiply(a, b):
    """Element-wise multiply compiled by Numba."""
    result = np.zeros_like(a)
    for i in range(len(a)):
        result[i] = a[i] * b[i]
    return result

def numba_optimized(df):
    """Run the Numba-compiled kernel on the underlying NumPy arrays."""
    a_array = df['a'].values
    b_array = df['b'].values
    return pd.Series(numba_multiply(a_array, b_array), index=df.index)

# Note: the first call pays a one-off JIT compilation cost
result_numba = numba_optimized(test_df)
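To separate the compilation cost from the steady-state speed, time a second call after the warm-up above. For an operation this simple the plain NumPy expression is usually just as fast; Numba pays off for loops that cannot be written as a single array expression. A minimal timing sketch:
start = time.time()
_ = numba_optimized(test_df)                    # already compiled: pure execution time
print(f"Numba (warm): {time.time() - start:.4f}s")

start = time.time()
_ = test_df['a'].values * test_df['b'].values   # NumPy baseline for comparison
print(f"NumPy vectorized: {time.time() - start:.4f}s")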
4. Chunked Processing of Large Files
4.1 Chunked reading of large CSVs
def process_large_csv_chunks(file_path, chunk_size=100000, func=None, keep_cols=None):
    """Process a large CSV in chunks that fit in memory."""
    chunks = []
    for chunk_num, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size)):
        print(f"Processing chunk {chunk_num + 1}")
        if func:
            processed_chunk = func(chunk)
        else:
            processed_chunk = chunk
        # Memory optimization: keep only the columns we need
        if keep_cols is not None:
            processed_chunk = processed_chunk[keep_cols]
        chunks.append(processed_chunk)
    return pd.concat(chunks, ignore_index=True)

# Example per-chunk preprocessing
def preprocess_chunk(chunk):
    """Preprocess one chunk."""
    # Coerce object columns that should be numeric (non-numeric values become NaN,
    # so only do this for columns known to hold numbers stored as text)
    object_cols = chunk.select_dtypes(include=['object']).columns
    for col in object_cols:
        chunk[col] = pd.to_numeric(chunk[col], errors='coerce')
    # Filter rows
    chunk = chunk[chunk['value'] > 0]
    return chunk

# Usage
# large_df = process_large_csv_chunks('large_file.csv', func=preprocess_chunk)
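When even the concatenated result would not fit in memory, aggregate per chunk instead of collecting the chunks. A sketch for a global mean (the 'value' column name is only an assumption for illustration):
def chunked_mean(file_path, col='value', chunk_size=100000):
    """Compute a column mean over a CSV too large to load at once."""
    total, count = 0.0, 0
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, usecols=[col]):
        total += chunk[col].sum()
        count += chunk[col].count()   # count() ignores NaN
    return total / count if count else float('nan')

# mean_value = chunked_mean('large_file.csv')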
4.2 Memory mapping
def memory_mapped_read(file_path):
    """Read a large CSV with memory mapping enabled."""
    # read_csv's memory_map=True maps the file into memory instead of streaming it
    df = pd.read_csv(file_path, memory_map=True, low_memory=False)
    return df

# numpy.memmap is the equivalent for raw binary arrays (not CSV text); dtype and
# shape must match how the file was written (illustrative path):
# mmap_array = np.memmap('data.bin', dtype='float32', mode='r', shape=(1000000,))

# HDF5 format (good for large tables)
def hdf5_optimized(df, key='data'):
    """Store and partially read a DataFrame via HDF5."""
    # Write; format='table' enables partial reads and where-clauses
    df.to_hdf('data.h5', key=key, mode='w', format='table', complevel=9, complib='zlib')
    # Read back only the first 10,000 rows
    chunk = pd.read_hdf('data.h5', key=key, start=0, stop=10000)
    return chunk
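Often the cheapest win is to avoid loading unneeded data in the first place: read_csv can restrict columns and fix dtypes at parse time. A sketch (file and column names are illustrative):
# Only load the columns you need, with compact dtypes chosen while parsing
slim_df = pd.read_csv(
    'large_file.csv',
    usecols=['date', 'category', 'value'],
    dtype={'category': 'category', 'value': 'float32'},
    parse_dates=['date'],
)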
5. Parallel Processing
5.1 Dask integration
import dask.dataframe as dd

def dask_optimization(df_path, npartitions=None):
    """Parallel processing with Dask."""
    # Build a Dask DataFrame straight from the file(s)
    ddf = dd.read_csv(df_path)
    # Keep the automatic partitioning or repartition explicitly
    if npartitions:
        ddf = ddf.repartition(npartitions=npartitions)
    # Operations are lazy until compute() is called
    mean_result = ddf.mean()
    result = mean_result.compute()  # triggers the computation
    # A chained groupby-aggregate
    processed = (ddf.groupby('category')
                 .agg({'value': 'mean'})
                 .compute())
    return processed

# Partition-size tuning
def optimize_dask_partitions(ddf, target_size='100MB'):
    """Repartition so each partition is roughly the target size."""
    return ddf.repartition(partition_size=target_size)
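Dask can also wrap a DataFrame that is already in memory, which is handy for parallelizing a single expensive groupby without changing the rest of the pipeline; a minimal sketch using the test frame from section 3:
# Wrap the in-memory frame in 8 partitions and aggregate in parallel
ddf_mem = dd.from_pandas(test_df, npartitions=8)
cat_means = ddf_mem.groupby('category')['a'].mean().compute()
print(cat_means)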
5.2 Multiprocessing
from multiprocessing import Pool, cpu_count
from functools import partial
import swifter  # optional: pip install swifter (accelerates apply automatically)

def _aggregate_group(group, agg_dict):
    """Aggregate one group; runs inside a worker process."""
    return group.agg(agg_dict)

def parallel_groupby(df, group_col, agg_dict, n_jobs=None):
    """Groupby-aggregate with the groups spread across worker processes."""
    if n_jobs is None:
        n_jobs = cpu_count()
    # One sub-frame per group. Note: for cheap aggregations the cost of pickling
    # data to the workers usually outweighs the gain -- profile before adopting this.
    keys, groups = zip(*df.groupby(group_col))
    with Pool(n_jobs) as pool:
        results = pool.map(partial(_aggregate_group, agg_dict=agg_dict), groups)
    return pd.concat(results, keys=keys)

# swifter alternative for element-wise work, e.g.:
# df['value'].swifter.apply(np.sqrt)

# Usage
# result = parallel_groupby(df, 'category', {'value': 'mean'})
5.3 Ray and Modin
# Modin (a drop-in Pandas replacement)
# pip install modin[ray]
import modin.pandas as pd_modin
import ray

ray.init()  # optional; Modin starts Ray automatically if it is not running
# Modin exposes the same API as pandas and parallelizes it under the hood
df_modin = pd_modin.read_csv('large_file.csv')
result = df_modin.groupby('category').mean()  # no .compute() -- Modin is eager
6. Vectorized String Operations
6.1 Efficient string handling
def vectorized_string_ops(df):
    """Vectorized string operations via the .str accessor."""
    # Avoid row-wise apply; the .str accessor works on the whole column at once
    df['length'] = df['text'].str.len()
    df['uppercase'] = df['text'].str.upper()
    df['contains_a'] = df['text'].str.contains('a', regex=False)
    # Vectorized regular expressions (only if an email column is present)
    if 'email' in df.columns:
        df['email_domain'] = df['email'].str.extract(r'@([\w.-]+)', expand=False)
    # Batch cleanup
    df['clean_text'] = (df['text']
                        .str.replace(r'[^\w\s]', '', regex=True)
                        .str.replace(r'\s+', ' ', regex=True)
                        .str.strip())
    return df

# Row-wise baseline for comparison
def slow_string_apply(df):
    """Row-wise string apply (slow)."""
    df['length_slow'] = df['text'].apply(len)
    return df

# Timing
n = 100000
test_str_df = pd.DataFrame({'text': [f'test_string_{i}' for i in range(n)]})
%timeit vectorized_string_ops(test_str_df)
%timeit slow_string_apply(test_str_df)
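If pyarrow is installed, pandas (1.3+) can also store strings in an Arrow-backed dtype, which typically cuts memory versus object columns and can speed up many .str operations; a small sketch:
# Convert the text column to the Arrow-backed string dtype (requires pyarrow)
arrow_text = test_str_df['text'].astype('string[pyarrow]')
print("object:", test_str_df['text'].memory_usage(deep=True) / 1024**2, "MB")
print("string[pyarrow]:", arrow_text.memory_usage(deep=True) / 1024**2, "MB")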
7. Caching and Intermediate Results
7.1 Result caching
from functools import lru_cache  # lru_cache suits pure functions; the class below uses an explicit dict so the cache can be persisted
import joblib

class CachedDataFrame:
    """DataFrame operations with a persistent result cache."""
    def __init__(self, df):
        self.df = df
        self.cache = {}

    def cached_groupby(self, group_col, agg_func):
        """Cache groupby results keyed by (column, aggregation)."""
        key = f"{group_col}_{agg_func}"
        if key not in self.cache:
            self.cache[key] = self.df.groupby(group_col).agg(agg_func)
        return self.cache[key]

    def save_cache(self, filepath):
        """Persist the cache to disk."""
        joblib.dump(self.cache, filepath)

    @classmethod
    def load_cache(cls, df, cache_file):
        """Rebuild an instance with a cache loaded from disk."""
        cache = joblib.load(cache_file)
        instance = cls(df)
        instance.cache = cache
        return instance

# Usage (numeric columns only so 'mean' is well-defined)
cached_df = CachedDataFrame(df_original[['category_col', 'int_col', 'float_col']])
result1 = cached_df.cached_groupby('category_col', 'mean')
result2 = cached_df.cached_groupby('category_col', 'mean')  # served from the cache
7.2 Incremental computation
class IncrementalProcessor:
    """Incremental (streaming) statistics processor."""
    def __init__(self):
        self.summary_stats = {}
        self.last_n = 0

    def update(self, new_data):
        """Fold a new batch into the running statistics."""
        n_new = len(new_data)
        for col in new_data.select_dtypes(include=[np.number]).columns:
            if col not in self.summary_stats:
                self.summary_stats[col] = {
                    'count': 0, 'sum': 0, 'sum_sq': 0, 'min': np.inf, 'max': -np.inf
                }
            stats = self.summary_stats[col]
            stats['count'] += n_new
            stats['sum'] += new_data[col].sum()
            stats['sum_sq'] += (new_data[col]**2).sum()
            stats['min'] = min(stats['min'], new_data[col].min())
            stats['max'] = max(stats['max'], new_data[col].max())
        self.last_n = n_new

    def get_stats(self):
        """Return the accumulated statistics."""
        stats = {}
        for col, s in self.summary_stats.items():
            n = s['count']
            mean = s['sum'] / n
            # Population std via E[x^2] - E[x]^2; clip tiny negative rounding errors
            std = np.sqrt(max(s['sum_sq'] / n - mean**2, 0))
            stats[col] = {'mean': mean, 'std': std, 'min': s['min'], 'max': s['max']}
        return stats

# Example: streaming batches
processor = IncrementalProcessor()
for i in range(10):
    batch = pd.DataFrame({
        'value': np.random.randn(1000),
        'category': np.random.choice(['A', 'B'], 1000)
    })
    processor.update(batch)
    print(f"Batch {i}: mean {processor.get_stats()['value']['mean']:.3f}")
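A quick way to validate the incremental formulas is to compare them against pandas on the same data (accumulated here purely for checking). Note the ddof=0 so pandas computes the same population standard deviation:
# Cross-check: accumulate a test-sized stream and compare
check = pd.concat([pd.DataFrame({'value': np.random.randn(1000)}) for _ in range(10)])
p = IncrementalProcessor()
p.update(check)
inc = p.get_stats()['value']
print("incremental mean/std:", round(inc['mean'], 4), round(inc['std'], 4))
print("pandas mean/std:     ", round(check['value'].mean(), 4), round(check['value'].std(ddof=0), 4))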
8. Storage Format Optimization
8.1 Efficient file formats
def benchmark_file_formats(df, output_dir='format_bench'):
    """Benchmark write time, read time and file size per format."""
    import os
    os.makedirs(output_dir, exist_ok=True)
    ext = {'csv': 'csv', 'parquet': 'parquet', 'feather': 'feather',
           'pickle': 'pkl', 'hdf5': 'h5'}
    formats = {
        'csv': lambda: df.to_csv(f'{output_dir}/data.csv', index=False),
        'parquet': lambda: df.to_parquet(f'{output_dir}/data.parquet',
                                         compression='snappy', index=False),
        'feather': lambda: df.to_feather(f'{output_dir}/data.feather'),
        'pickle': lambda: df.to_pickle(f'{output_dir}/data.pkl'),
        'hdf5': lambda: df.to_hdf(f'{output_dir}/data.h5', key='data',
                                  complevel=9, complib='zlib')
    }
    results = {}
    for name, write_func in formats.items():
        start = time.time()
        write_func()
        write_time = time.time() - start
        # Read time
        start = time.time()
        if name == 'csv':
            read_df = pd.read_csv(f'{output_dir}/data.csv')
        elif name == 'parquet':
            read_df = pd.read_parquet(f'{output_dir}/data.parquet')
        elif name == 'feather':
            read_df = pd.read_feather(f'{output_dir}/data.feather')
        elif name == 'pickle':
            read_df = pd.read_pickle(f'{output_dir}/data.pkl')
        elif name == 'hdf5':
            read_df = pd.read_hdf(f'{output_dir}/data.h5', key='data')
        read_time = time.time() - start
        file_size = os.path.getsize(f'{output_dir}/data.{ext[name]}')
        results[name] = {
            'write_time': write_time,
            'read_time': read_time,
            'file_size_mb': file_size / 1024**2
        }
    return pd.DataFrame(results).T

# Run it
# format_results = benchmark_file_formats(df_optimized)
# print(format_results)
8.2 Partitioned Parquet
def partitioned_parquet(df, partition_cols, output_dir):
    """Write a hive-partitioned Parquet dataset.

    Uses pandas/pyarrow's built-in partition_cols support instead of building
    the partition directories by hand.
    """
    df.to_parquet(output_dir, engine='pyarrow',
                  partition_cols=partition_cols, index=False)
    # Produces a layout like output_dir/<col1>=<value>/<col2>=<value>/part-0.parquet

def read_partitioned_data(output_dir, partition_filter):
    """Read back only the partitions matching the filter.

    partition_filter: list of (column, op, value) tuples, e.g. [('region', '=', 'EU')]
    """
    return pd.read_parquet(output_dir, engine='pyarrow', filters=partition_filter)

# Example
# partitioned_parquet(df, ['region', 'category'], 'partitioned_data')
# subset = read_partitioned_data('partitioned_data', [('region', '=', 'EU')])
9. Query Optimization
9.1 Efficient filtering
def optimized_filtering(df):
    """Efficient filtering patterns."""
    # 1. Boolean indexing is usually faster than query() on small/medium frames
    mask = (df['value'] > 100) & (df['category'] == 'A')
    result1 = df[mask]
    # 2. Avoid chained indexing
    # Slow (and triggers SettingWithCopyWarning): df['category'][df['value'] > 100] = 'High'
    # Fast:
    df.loc[df['value'] > 100, 'category'] = 'High'
    # 3. Use isin instead of many OR-ed comparisons
    values = [1, 2, 3, 4, 5]
    # Slow: df[(df['id'] == 1) | (df['id'] == 2) | ...]
    # Fast:
    result2 = df[df['id'].isin(values)]
    # 4. Precompute complex conditions once
    complex_mask = df['text'].str.contains('pattern', na=False) & (df['value'] > df['value'].quantile(0.9))
    result3 = df[complex_mask]
    return result1, result2, result3
9.2 Optimizing query() usage
def smart_query(df):
    """query() used efficiently."""
    # Reference local variables with @
    threshold = 100
    result1 = df.query('value > @threshold & category == "A"')
    # Keep heavy computation out of the query string: precompute the column
    df['value_log'] = np.log1p(df['value'])
    log_threshold = np.log1p(threshold)
    result2 = df.query('value_log > @log_threshold')
    return result1, result2
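query() and its sibling eval() can hand arithmetic off to the numexpr engine, which mainly pays off on large frames because it avoids building large temporary arrays; a minimal sketch of precomputing a derived column that way, reusing the test frame from section 3:
# Evaluate an arithmetic expression with eval(), then filter on the result
test_df = test_df.eval('ab_ratio = a / (b + 10)')
high = test_df.query('ab_ratio > 0')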
9.3 Index-accelerated queries
def benchmark_indexing(df):
    """Benchmark filtering with and without an index (expects 'date' and 'category' columns)."""
    # Without an index
    start = time.time()
    result_no_idx = df[(df['date'] >= '2023-01-01') & (df['category'] == 'A')]
    time_no_idx = time.time() - start
    # With a sorted MultiIndex (sorting is required for label slicing)
    df_indexed = df.set_index(['date', 'category']).sort_index()
    start = time.time()
    result_idx = df_indexed.loc[pd.IndexSlice['2023-01-01':, 'A'], :]
    time_idx = time.time() - start
    print(f"Without index: {time_no_idx:.4f}s")
    print(f"With index: {time_idx:.4f}s")
    print(f"Speedup: {time_no_idx / time_idx:.1f}x")
    return time_no_idx / time_idx
10. A Complete Optimization Pipeline
10.1 An automated optimization pipeline
class PandasOptimizer:
    """End-to-end Pandas optimization pipeline."""
    def __init__(self, df):
        self.original_df = df
        self.optimized_df = None
        self.optimization_report = {}

    def run_full_optimization(self, target_memory_gb=1.0):
        """Run the full optimization pass."""
        print("Starting optimization...")
        # 1. Data type optimization
        start_memory = memory_usage(self.original_df)
        self.optimized_df = optimize_dtypes(self.original_df)
        dtype_saving = (1 - memory_usage(self.optimized_df) / start_memory) * 100
        self.optimization_report['dtype_optimization'] = {
            'before_mb': start_memory,
            'after_mb': memory_usage(self.optimized_df),
            'saving_percent': dtype_saving
        }
        # 2. Index optimization
        if 'date' in self.optimized_df.columns:
            self.optimized_df['date'] = pd.to_datetime(self.optimized_df['date'])
            self.optimized_df = self.optimized_df.set_index('date')
        # 3. Categorical optimization
        self.optimized_df = advanced_category_optimization(self.optimized_df)
        # 4. Precompute commonly used aggregates
        if 'category' in self.optimized_df.columns:
            self.category_stats = self.optimized_df.groupby('category').mean(numeric_only=True)
        # 5. Check against the memory target
        final_memory = memory_usage(self.optimized_df)
        if final_memory > target_memory_gb * 1024:
            print(f"Warning: memory usage of {final_memory:.1f}MB exceeds the target")
        self.optimization_report['final_memory'] = final_memory
        print(f"Optimization finished. Memory usage: {final_memory:.1f}MB")
        return self.optimized_df
    def generate_report(self):
        """Produce the optimization report."""
        report = self.optimization_report.copy()
        report['speedup_factors'] = {
            'vectorized_operations': '~10-100x (typical range)',
            'proper_indexing': '~5-50x (typical range)',
            'dtype_optimization': f"{report['dtype_optimization']['saving_percent']:.1f}% memory"
        }
        return report

    def export_optimized(self, format='parquet', path='optimized_data'):
        """Export the optimized DataFrame."""
        if format == 'parquet':
            self.optimized_df.to_parquet(f'{path}.parquet',
                                         compression='snappy',
                                         index=True)
        elif format == 'feather':
            self.optimized_df.reset_index().to_feather(f'{path}.feather')

# Usage example
optimizer = PandasOptimizer(df_original)
optimized_data = optimizer.run_full_optimization()
report = optimizer.generate_report()
print("Optimization report:", report)
optimizer.export_optimized()
10.2 Monitoring and diagnostics
def performance_monitor(func):
    """Decorator that reports runtime and memory usage."""
    import tracemalloc
    from functools import wraps
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        start_time = time.time()
        start_memory = psutil.Process(os.getpid()).memory_info().rss / 1024**2
        result = func(*args, **kwargs)
        end_time = time.time()
        end_memory = psutil.Process(os.getpid()).memory_info().rss / 1024**2
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        print(f"{func.__name__}:")
        print(f"  time: {end_time - start_time:.4f}s")
        print(f"  RSS delta: {end_memory - start_memory:.2f}MB, traced peak: {peak / 1024**2:.2f}MB")
        return result
    return wrapper
@performance_monitor
def optimized_pipeline(df):
    """Example of an optimized pipeline."""
    # Vectorized feature construction
    df['feature1'] = df['a'] * df['b']
    df['feature2'] = np.log1p(np.abs(df['feature1']))
    # Efficient grouping
    group_stats = df.groupby('category', sort=False)['feature1'].agg(['mean', 'std'])
    # Merge the aggregates back
    result = df.merge(group_stats, left_on='category', right_index=True)
    return result

# Run it on the frame that actually has 'a', 'b' and 'category' columns
result = optimized_pipeline(test_df)
11. Best-Practice Summary
11.1 Core principles
# 1. Prefer vectorized operations
assert np.allclose(df['a'] * df['b'], df.apply(lambda row: row['a'] * row['b'], axis=1))
# 2. Pick appropriately sized dtypes
df_int8 = df['int_col'].astype('int8')  # instead of int64
# 3. Use an index to accelerate lookups
df.set_index(['date', 'category'], inplace=True)
# 4. Process large files in chunks
for chunk in pd.read_csv('large.csv', chunksize=100000):
    process_chunk(chunk)
# 5. Cache results that are reused frequently
@lru_cache(maxsize=100)
def expensive_calculation(key):
    return heavy_computation(key)  # heavy_computation is a placeholder
# 6. Prefer Parquet/Feather over CSV
df.to_parquet('data.parquet')  # faster and smaller
11.2 Production-readiness checklist
def production_readiness_check(df):
    """Checks to run before shipping to production."""
    issues = []
    # 1. Data type check
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:
            issues.append(f"Column {col}: consider the category dtype")
    # 2. Memory usage check
    memory_mb = memory_usage(df)
    if memory_mb > 1000:  # 1 GB threshold
        issues.append(f"High memory usage: {memory_mb:.1f}MB")
    # 3. Index check
    if not isinstance(df.index, pd.DatetimeIndex) and 'date' in df.columns:
        issues.append("Consider setting the date column as the index")
    # 4. Missing-value check
    missing_rate = df.isnull().mean()
    high_missing = missing_rate[missing_rate > 0.5]
    if len(high_missing) > 0:
        issues.append(f"Columns with high missing rates: {high_missing.index.tolist()}")
    return issues

# Run the check
issues = production_readiness_check(df_optimized)
print("Production issues:", issues if issues else "none")
With systematic data type optimization, vectorized operations, careful index design, chunked processing, and efficient storage formats, Pandas workloads can often be sped up by one to two orders of magnitude, depending on the data and the workload. The key is to understand the underlying data structures and computation patterns, choose the optimization strategy that fits, and keep monitoring and tuning in production.