Pandas DataFrame API: A Complete Reference
1. Creating DataFrames
1.1 Basic Creation Methods
import pandas as pd
import numpy as np
from datetime import datetime
# From a dict (most common)
data = {
    'A': [1, 2, 3, 4],
    'B': [10, 20, 30, 40],
    'C': ['a', 'b', 'c', 'd']
}
df1 = pd.DataFrame(data)
# With an explicit index
df2 = pd.DataFrame(data, index=['row1', 'row2', 'row3', 'row4'])
# From a list of lists
df3 = pd.DataFrame([[1, 'a'], [2, 'b'], [3, 'c']],
                   columns=['num', 'char'])
# From Series
s1 = pd.Series([1, 2, 3], name='A')
s2 = pd.Series([10, 20, 30], name='B')
df4 = pd.DataFrame({'A': s1, 'B': s2})
# From a NumPy array
arr = np.random.randn(4, 3)
df5 = pd.DataFrame(arr, columns=['X', 'Y', 'Z'])
# An empty DataFrame
df_empty = pd.DataFrame()
print("Example DataFrame:")
print(df1)
1.2 Advanced Creation
# Explicit dtypes
df_typed = pd.DataFrame({
    'int_col': pd.Series([1, 2, 3], dtype='int32'),
    'float_col': pd.Series([1.1, 2.2, 3.3], dtype='float32'),
    'cat_col': pd.Series(['A', 'B', 'A'], dtype='category')
})
# Time series via pd.date_range
dates = pd.date_range('2023-01-01', periods=5, freq='D')
df_time = pd.DataFrame({
    'date': dates,
    'value': np.random.randn(5)
}, index=dates)
# MultiIndex columns
arrays = [['A', 'A', 'B'], ['foo', 'bar', 'baz']]
tuples = list(zip(*arrays))
multi_cols = pd.MultiIndex.from_tuples(tuples, names=['first', 'second'])
df_multi = pd.DataFrame(np.random.randn(4, 3), columns=multi_cols)
# From a list of records
records = [{'A': 1, 'B': 10}, {'A': 2, 'B': 20}]
df_records = pd.DataFrame(records)
print("MultiIndex columns example:")
print(df_multi)
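A supplementary sketch (not in the constructors above): pd.DataFrame.from_dict gives explicit control over orientation; row_data here is made-up example data.
# orient='index' treats each dict key as a row label instead of a column name
row_data = {'row1': [1, 10], 'row2': [2, 20]}
df_from_dict = pd.DataFrame.from_dict(row_data, orient='index', columns=['A', 'B'])
print(df_from_dict)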
2. Attributes and Basic Information
2.1 Basic Attributes
df = pd.DataFrame(np.random.randn(5, 3), columns=['A', 'B', 'C'])
print("Basic attributes:")
print("Shape:", df.shape)
print("Number of columns:", df.shape[1])
print("Number of rows:", df.shape[0])
print("Columns:", df.columns)
print("Index:", df.index)
print("Dtypes:", df.dtypes)
print("Values:", df.values)
print("Transposed shape:", df.T.shape)
print("Is empty:", df.empty)
print("ndim:", df.ndim)
2.2 Memory and Info
# Memory usage
print("Memory usage:")
print("Per column:", df.memory_usage(deep=True))
print("Total:", df.memory_usage(deep=True).sum())
print("Including index:", df.memory_usage(index=True, deep=True))
# Detailed info; info() prints directly and returns None, so don't wrap it in print()
print("\nDataFrame info:")
df.info(memory_usage='deep')
# Descriptive statistics
print("\nDescriptive statistics:")
print(df.describe())
# Missing-value counts
print("\nMissing values:")
print(df.isnull().sum())
# Verbose info
print("\nVerbose info:")
df.info(verbose=True, memory_usage='deep')
3. Indexing and Selection
3.1 Column Selection
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})
# Single column (returns a Series)
print("Column A:", df['A'])
# Multiple columns
print("Multiple columns:", df[['A', 'C']])
# Adding a new column
df['D'] = df['A'] + df['B']
# Dropping a column
df_drop = df.drop('D', axis=1)
# Checking whether a column exists
print("Column A exists:", 'A' in df.columns)
3.2 Row Selection (loc/iloc)
# loc (label-based; note that label slices include both endpoints)
print("loc single row:", df.loc[0])
print("loc multiple rows:", df.loc[[0, 2]])
print("loc slice:", df.loc[0:2])
print("loc condition:", df.loc[df['A'] > 1])
# iloc (position-based; slices exclude the end position)
print("iloc single row:", df.iloc[0])
print("iloc multiple rows:", df.iloc[[0, 2]])
print("iloc slice:", df.iloc[0:2])
# Mixed row/column selection
print("loc with columns:", df.loc[0, ['A', 'B']])
print("iloc with columns:", df.iloc[0, [0, 1]])
# Boolean indexing
mask = df['A'] > 1
print("Boolean indexing:", df[mask])
3.3 at/iat (Fast Scalar Access)
# Fast single-element access
df.at[0, 'A'] = 100
df.iat[1, 0] = 200
print("After fast-access updates:")
print(df)
# Lookups
print("at lookup:", df.at[0, 'A'])
print("iat lookup:", df.iat[0, 0])
3.4 MultiIndex Operations
# Build a DataFrame with a MultiIndex
index = pd.MultiIndex.from_product([['A', 'B'], [1, 2]],
                                   names=['level1', 'level2'])
df_multi_idx = pd.DataFrame(np.random.randn(4, 2),
                            index=index,
                            columns=['X', 'Y'])
print("MultiIndex DataFrame:")
print(df_multi_idx)
# Multi-level selection
print("First level A:", df_multi_idx.loc['A'])
print("Exact combination:", df_multi_idx.loc[('A', 1)])
print("Cross-section:", df_multi_idx.xs(1, level='level2'))
4. Data Manipulation
4.1 Adding and Removing
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
# Add columns
df['C'] = df['A'] * 2
df.insert(1, 'new_col', [10, 20, 30])  # insert at a specific position
# Add a row (columns missing from new_row become NaN)
new_row = pd.DataFrame({'A': [4], 'B': [7], 'C': [8]})
df = pd.concat([df, new_row], ignore_index=True)
# Remove
df_dropped = df.drop(0)  # drop a row
df_cols_dropped = df.drop(['C'], axis=1)  # drop a column
# pop (removes a column and returns it)
col_popped = df.pop('A')
print("Popped column:", col_popped)
4.2 Assignment and Modification
# Recreate df, since column 'A' was popped above
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
# Direct assignment
df.loc[0, 'A'] = 999
df.iloc[1, 1] = 888
# Bulk assignment
df.loc[df['A'] > 2, 'B'] = 0
# update (modifies in place)
other_df = pd.DataFrame({'A': [100, 200, 300]}, index=df.index)
df.update(other_df)
# Conditional assignment (np.where)
df['new'] = np.where(df['A'] > 1, 'High', 'Low')
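Series.where and Series.mask offer a pandas-native alternative to np.where; a small sketch on the df above (the 'capped'/'flagged' column names are illustrative):
# where keeps values satisfying the condition and replaces the rest
df['capped'] = df['A'].where(df['A'] <= 200, other=200)
# mask is the inverse: it replaces values where the condition is True
df['flagged'] = df['A'].mask(df['A'] > 200, other=-1)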
4.3 Sorting
df = pd.DataFrame({
    'A': [3, 1, 2],
    'B': [2, 3, 1],
    'C': ['c', 'a', 'b']
}, index=[2, 0, 1])
# Sort by a column's values
df_sorted_val = df.sort_values('A')
# Sort by index
df_sorted_idx = df.sort_index()
# Sort by multiple columns
df_multi_sort = df.sort_values(['A', 'B'])
# Sort columns by name
df_col_sort = df.sort_index(axis=1)
# Stable sort
df_stable = df.sort_values('A', kind='stable')
print("Sorted result:")
print(df_sorted_val)
5. Handling Missing Values
5.1 Detection and Counting
df = pd.DataFrame({
    'A': [1, np.nan, 3],
    'B': [4, 5, np.nan],
    'C': [np.nan, np.nan, 9]
})
print("Missing-value detection:")
print("Total missing:", df.isnull().sum().sum())
print("Missing per column:", df.isnull().sum())
print("Missing ratio:", df.isnull().mean())
print("Non-missing counts:", df.notnull().sum())
5.2 Filling Methods
# Constant fill
df_filled = df.fillna(0)
# Forward/backward fill; fillna(method=...) is deprecated, use ffill()/bfill()
df_ffill = df.ffill()
df_bfill = df.bfill()
# Per-column fill values
df_col_fill = df.fillna({'A': df['A'].mean(), 'B': 0})
# Interpolation
df_interp = df.interpolate(method='linear')
# Combined fill
df_combined = df.ffill().bfill()
print("Filled result:")
print(df_filled)
5.3 Dropping Missing Values
# Drop rows containing NaN
df_dropna = df.dropna()
# Drop by threshold
df_thresh = df.dropna(thresh=2)  # keep rows with at least 2 non-NaN values
# Drop columns
df_drop_col = df.dropna(axis=1, how='all')  # drop columns that are entirely NaN
# Only consider specific columns
df_subset = df.dropna(subset=['A', 'B'])
6. Combining and Joining
6.1 Concatenation (concat)
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'C': [7, 8]})
# Row-wise concatenation
df_concat_row = pd.concat([df1, df2], axis=0, ignore_index=True)
# Column-wise concatenation
df_concat_col = pd.concat([df1, df2], axis=1)
# Inner join (keep only shared columns)
df_inner = pd.concat([df1, df2], axis=0, join='inner')
# Outer join (the default; keep all columns)
df_outer = pd.concat([df1, df2], axis=0, join='outer')
print("Concatenation result:")
print(df_concat_row)
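The keys= argument is worth knowing alongside the joins above: it labels each input frame, producing a hierarchical row index; a minimal sketch:
# Each source frame becomes one level of the resulting MultiIndex
df_keyed = pd.concat([df1, df2], keys=['first', 'second'])
print(df_keyed)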
6.2 Merging (merge)
# Sample data
left = pd.DataFrame({'key': ['K0', 'K1', 'K2'], 'A': [1, 2, 3]})
right = pd.DataFrame({'key': ['K0', 'K1', 'K3'], 'B': [4, 5, 6]})
# Inner join
merged_inner = pd.merge(left, right, on='key', how='inner')
# Left join
merged_left = pd.merge(left, right, on='key', how='left')
# Outer join
merged_outer = pd.merge(left, right, on='key', how='outer')
# Merge on multiple keys (pass a list of shared columns)
df1 = pd.DataFrame({'A': ['foo', 'bar'], 'B': [1, 2], 'val1': [10, 20]})
df2 = pd.DataFrame({'A': ['foo', 'baz'], 'B': [1, 3], 'val2': [30, 40]})
merged_multi = pd.merge(df1, df2, on=['A', 'B'])
# Merge on indexes (overlapping 'key' columns get _x/_y suffixes)
merged_index = pd.merge(left, right, left_index=True, right_index=True)
print("Merge result:")
print(merged_inner)
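Two merge options that often help in practice are indicator= (which records each row's provenance in a _merge column) and suffixes=; a sketch using left/right from above:
merged_flagged = pd.merge(left, right, on='key', how='outer',
                          indicator=True, suffixes=('_left', '_right'))
print(merged_flagged[['key', '_merge']])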
6.3 Joining (join)
# Index-based join
df1 = pd.DataFrame({'A': [1, 2]}, index=['a', 'b'])
df2 = pd.DataFrame({'B': [3, 4]}, index=['a', 'c'])
# Left join by default
joined = df1.join(df2)
# Inner join
joined_inner = df1.join(df2, how='inner')
# Join several DataFrames at once
df3 = pd.DataFrame({'C': [5, 6]}, index=['b', 'c'])
result = df1.join([df2, df3], how='outer')
7. Grouping (groupby)
7.1 Basic Grouping
df = pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'A'],
    'value': [1, 2, 3, 4, 5],
    'category': ['X', 'Y', 'X', 'Y', 'X']
})
# Group by a single column
grouped = df.groupby('group')
print("Group names:", grouped.groups.keys())
print("Group A:", grouped.get_group('A'))
# Iterate over groups
for name, group in grouped:
    print(f"{name}: {len(group)} rows")
# Aggregation
print("Group statistics:")
print(grouped['value'].agg(['mean', 'sum', 'count']))
7.2 Multi-Level Grouping
# Group by multiple columns
multi_group = df.groupby(['group', 'category'])['value'].mean()
print("Multi-level grouping:")
print(multi_group)
# Custom aggregation: return a Series so each key becomes a labeled result
def custom_agg(x):
    return pd.Series({'min': x.min(), 'max': x.max(), 'range': x.max() - x.min()})
result = df.groupby('group')['value'].apply(custom_agg)
print("Custom aggregation:")
print(result)
7.3 Transform and Filter
# transform (preserves the original shape)
df['mean_group'] = df.groupby('group')['value'].transform('mean')
# Per-group rank; apply is more flexible but slower, and its result is
# indexed differently, so calling rank on the GroupBy directly is safer
df['rank'] = df.groupby('group')['value'].rank(pct=True)
# filter (keep or drop whole groups)
filtered = df.groupby('group').filter(lambda x: len(x) > 1)
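Named aggregation (pandas >= 0.25) gives tidy output column names without a post-rename; a sketch on the grouping frame above:
named = df.groupby('group').agg(
    value_mean=('value', 'mean'),
    value_max=('value', 'max'),
    n_rows=('value', 'size'),
)
print(named)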
8. Reshaping and Pivoting
8.1 Pivot (pivot)
df_pivot_data = pd.DataFrame({
    'date': ['2023-01', '2023-01', '2023-02', '2023-02'],
    'category': ['A', 'B', 'A', 'B'],
    'value': [1, 2, 3, 4]
})
# Simple pivot
pivot_simple = df_pivot_data.pivot(index='date', columns='category', values='value')
# Pivot all remaining columns (omit values=)
pivot_multi = df_pivot_data.pivot(index='date', columns='category')
8.2 Pivot Table (pivot_table)
# Pivot table with aggregation
pivot_table = pd.pivot_table(df_pivot_data,
                             values='value',
                             index='date',
                             columns='category',
                             aggfunc='sum',
                             fill_value=0,
                             margins=True)  # add row/column totals
print("Pivot table:")
print(pivot_table)
8.3 Melting (melt)
# Wide format -> long format
wide_df = pd.DataFrame({
    'date': ['2023-01', '2023-02'],
    'A': [1, 3],
    'B': [2, 4]
})
long_df = pd.melt(wide_df, id_vars=['date'],
                  value_vars=['A', 'B'],
                  var_name='category',
                  value_name='value')
print("Melted result:")
print(long_df)
8.4 Stacking and Unstacking (stack/unstack)
# Build MultiIndex columns
arrays = [['A', 'A', 'B'], ['one', 'two', 'one']]
multi_cols = pd.MultiIndex.from_arrays(arrays)
df_multi = pd.DataFrame(np.random.randn(4, 3), columns=multi_cols)
# stack (columns -> rows)
stacked = df_multi.stack()
print("Stacked:")
print(stacked)
# unstack (rows -> columns)
unstacked = stacked.unstack()
9. Statistics and Aggregation
9.1 Descriptive Statistics
df = pd.DataFrame(np.random.randn(5, 3), columns=['A', 'B', 'C'])
print("Basic statistics:")
print(df.mean())
print(df.std())
print(df.describe())
# Quantiles
print("Quantiles:", df.quantile([0.25, 0.5, 0.75]))
# Correlation
print("Correlation matrix:", df.corr())
print("Covariance matrix:", df.cov())
9.2 Aggregation Functions
# Basic aggregation
print("Aggregation:")
print(df.agg(['sum', 'mean', 'std']))
# Column-specific aggregation
agg_dict = {'A': ['sum', 'mean'], 'B': 'std', 'C': 'count'}
print("Custom aggregation:")
print(df.agg(agg_dict))
# Row-wise aggregation with a custom function
def custom_agg(df):
    return df.mean() * 2
print("Row-wise custom aggregation:")
print(df.agg(custom_agg, axis=1))
9.3 Rolling and Expanding Statistics
# Rolling window
rolling = df.rolling(window=3)
print("Rolling statistics:")
print(rolling.mean())
print(rolling.agg(['min', 'max', 'mean']))
# Expanding window
expanding = df.expanding()
print("Expanding statistics:")
print(expanding.mean())
# Exponentially weighted
ewm = df.ewm(span=3)
print("Exponentially weighted:")
print(ewm.mean())
10. String and Regex Operations
10.1 The str Accessor
df_str = pd.DataFrame({
    'text': ['Apple pie', 'Banana split', 'Cherry tart'],
    'code': ['A001', 'B002', 'C003']
})
# Basic string operations
df_str['length'] = df_str['text'].str.len()
df_str['upper'] = df_str['text'].str.upper()
df_str['contains_a'] = df_str['text'].str.contains('a')
# Regular expressions (expand=False returns a Series instead of a DataFrame)
df_str['first_letter'] = df_str['text'].str.extract(r'(\w)', expand=False)
df_str['code_num'] = df_str['code'].str.extract(r'(\d+)', expand=False).astype(int)
# Splitting
df_str[['fruit', 'dessert']] = df_str['text'].str.split(' ', expand=True)
print("String operation results:")
print(df_str)
10.2 Batch String Processing
# Reusable regex patterns
patterns = {
    'email': r'[\w\.-]+@[\w\.-]+',
    'phone': r'\d{3}-\d{3}-\d{4}'
}
# Find all matches (the sample texts contain no emails, so these are empty lists)
df_str['emails'] = df_str['text'].str.findall(patterns['email'])
# Replacement
df_str['clean_text'] = df_str['text'].str.replace(r'[^\w\s]', '', regex=True)
# Normalizing whitespace
df_str['normalized'] = df_str['text'].str.replace(r'\s+', ' ', regex=True).str.strip()
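The patterns dict only pays off on text that actually contains contact data; a sketch with made-up contacts (the sample texts above have none):
contacts = pd.Series(['reach me at foo@example.com', 'call 555-123-4567 today'])
# extract needs a capturing group, so wrap the pattern in parentheses
print(contacts.str.extract(f"({patterns['email']})", expand=False))
print(contacts.str.findall(patterns['phone']))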
11. Time Series Operations
11.1 Creation and Indexing
# Datetime index
dates = pd.date_range('2023-01-01', periods=10, freq='D')
df_time = pd.DataFrame({
    'value': np.random.randn(10),
    'category': np.random.choice(['A', 'B'], 10)
}, index=dates)
# Ensure a DatetimeIndex (already the case here; useful after reading from disk)
df_time.index = pd.to_datetime(df_time.index)
df_time.index.name = 'date'
print("Time series:")
print(df_time.head())
11.2 Datetime Attributes
# Extract datetime components
df_time['year'] = df_time.index.year
df_time['month'] = df_time.index.month
df_time['day'] = df_time.index.day
df_time['weekday'] = df_time.index.day_name()
# Time slicing (use .loc for partial-string indexing; df_time['2023-01'] would look up a column)
jan_data = df_time.loc['2023-01']
first_week = df_time.loc['2023-01-01':'2023-01-07']
# Conditional filtering
weekends = df_time[df_time.index.dayofweek.isin([5, 6])]
11.3 Resampling and Frequency Conversion
# Resampling ('M' is deprecated in pandas >= 2.2; use 'ME' for month-end there)
daily_to_monthly = df_time['value'].resample('M').mean()
daily_to_weekly = df_time.resample('W').agg({
    'value': 'mean',
    'category': lambda x: x.mode()[0] if len(x) > 0 else np.nan
})
# Downsampling (aggregation; restrict to numeric columns for mean/std)
downsample = df_time[['value']].resample('W').agg(['mean', 'std'])
# Upsampling (forward fill; '6H' is spelled '6h' on pandas >= 2.2)
upsample = df_time.resample('6H').ffill()
# Custom resampling
def custom_resample(x):
    return x.ewm(span=3).mean().iloc[-1]
weekly_custom = df_time['value'].resample('W').apply(custom_resample)
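asfreq is the lightweight cousin of resample: it only reindexes to the target frequency without grouping; a sketch on df_time (lowercase 'h' is the pandas >= 2.2 spelling; older versions use 'H'):
# New 12-hourly timestamps appear as NaN rows unless a fill method is given
as_freq = df_time.asfreq('12h')
as_freq_filled = df_time.asfreq('12h', method='ffill')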
12. Input/Output
12.1 CSV
# Reading ('data.csv' is a placeholder path)
df_csv = pd.read_csv('data.csv',
                     index_col=0,
                     parse_dates=True,
                     dtype={'id': 'int32'})
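When chunksize is passed, read_csv returns an iterator of DataFrames rather than a single frame, so it must be consumed in a loop; a sketch (the path and the 'value' column are placeholders):
# Filter each chunk, then stitch the survivors back together
chunks = pd.read_csv('data.csv', chunksize=1000)
filtered = pd.concat(chunk[chunk['value'] > 0] for chunk in chunks)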
# Writing
df.to_csv('output.csv',
          index=True,
          compression='gzip',
          date_format='%Y-%m-%d')
# Writing in chunks (splitting positions rather than the frame itself avoids
# np.array_split's reliance on the deprecated DataFrame.swapaxes)
for i, idx in enumerate(np.array_split(np.arange(len(df)), 10)):
    df.iloc[idx].to_csv(f'chunk_{i}.csv', index=False)
12.2 Excel
# Writing multiple sheets
with pd.ExcelWriter('output.xlsx', engine='openpyxl') as writer:
    df.to_excel(writer, sheet_name='Data', index=False)
    df.describe().to_excel(writer, sheet_name='Stats')
# Reading all sheets
excel_file = pd.ExcelFile('data.xlsx')
sheets = {name: pd.read_excel(excel_file, sheet_name=name) for name in excel_file.sheet_names}
12.3 Efficient Formats (Parquet)
# Parquet (recommended for large data; partition_cols writes a directory tree)
df.to_parquet('data.parquet',
              compression='snappy',
              index=True,
              partition_cols=['category'])
# Reading partitioned data with filters and column selection
df_part = pd.read_parquet('data.parquet',
                          filters=[('category', '=', 'A')],
                          columns=['value', 'date'])
13. Advanced Features
13.1 Window Functions
# Sample frame for the window examples (the earlier df lacks these columns)
df = pd.DataFrame({
    'value': np.random.randn(20),
    'category': np.random.choice(['A', 'B'], 20)
})
# Rolling window
df['rolling_mean'] = df['value'].rolling(window=3).mean()
df['rolling_std'] = df['value'].rolling(3, min_periods=1).std()
# Custom window function
df['rolling_range'] = df['value'].rolling(3).apply(
    lambda x: x.max() - x.min(), raw=True
)
# Expanding window
df['expanding_mean'] = df['value'].expanding().mean()
# Per-group rolling window
df['group_rolling'] = df.groupby('category')['value'].rolling(3).mean().reset_index(0, drop=True)
13.2 Vectorized Operations vs. apply
# Sample columns for the examples below
df['A'] = np.random.randn(len(df))
df['B'] = np.random.randn(len(df))
# Vectorized operations (preferred)
df['A_squared'] = df['A'] ** 2
df['A_plus_B'] = df['A'] + df['B']
# apply (use sparingly; it loops in Python and is slow)
df['complex'] = df.apply(lambda row: row['A'] * row['B'] if row['A'] > 0 else 0, axis=1)
# Row-wise apply over selected columns
df['mean_A_B'] = df[['A', 'B']].apply(np.mean, axis=1)
# NumPy vectorization as an apply replacement
df['vectorized'] = np.where(df['A'] > 0, df['A'] * df['B'], 0)
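For more than two branches, np.select generalizes np.where; a sketch on the A column above (the bucket labels are illustrative):
# Conditions are evaluated in order; the first match wins
conditions = [df['A'] > 1, df['A'] > 0]
choices = ['high', 'positive']
df['bucket'] = np.select(conditions, choices, default='non-positive')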
13.3 性能优化
def optimize_dataframe(df):
"""DataFrame性能优化"""
optimized = df.copy()
# 1. 数据类型优化
for col in optimized.select_dtypes(include=['int64']).columns:
col_min = optimized[col].min()
col_max = optimized[col].max()
if col_min >= np.iinfo(np.int8).min and col_max <= np.iinfo(np.int8).max:
optimized[col] = optimized[col].astype('int8')
# 2. 分类优化
for col in optimized.select_dtypes(include=['object']).columns:
if optimized[col].nunique() / len(optimized) < 0.5:
optimized[col] = optimized[col].astype('category')
# 3. 浮点优化
for col in optimized.select_dtypes(include=['float64']).columns:
optimized[col] = pd.to_numeric(optimized[col], downcast='float')
return optimized
# 内存对比
original_memory = df.memory_usage(deep=True).sum()
optimized_df = optimize_dataframe(df)
optimized_memory = optimized_df.memory_usage(deep=True).sum()
print(f"内存节省: {(1 - optimized_memory/original_memory)*100:.1f}%")
14. Visualization and Plotting
14.1 Basic Plots
import matplotlib.pyplot as plt
# Line plot (using the time series from section 11)
df_time['value'].plot(figsize=(10, 6))
# Scatter plot
df.plot.scatter(x='A', y='B', c='value', colormap='viridis')
# Histogram
df['A'].hist(bins=20)
# Box plot
df.boxplot(column='value', by='category')
# Correlation heatmap
import seaborn as sns
plt.figure(figsize=(8, 6))
sns.heatmap(df.corr(numeric_only=True), annot=True, cmap='coolwarm')
plt.title('Correlation Heatmap')
plt.show()
14.2 Advanced Visualization
# Scatter matrix
pd.plotting.scatter_matrix(df[['A', 'B', 'value']], figsize=(10, 10))
# Time series decomposition (requires statsmodels and at least two full
# periods of data; extend df_time beyond 10 rows before using period=7)
from statsmodels.tsa.seasonal import seasonal_decompose
decomp = seasonal_decompose(df_time['value'], model='additive', period=7)
decomp.plot()
plt.show()
# Custom subplot grid
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
for i, col in enumerate(df.select_dtypes('number').columns[:4]):
    df[col].plot(ax=axes[i // 2, i % 2], title=f'{col} series')
plt.tight_layout()
plt.show()
15. Best Practices and Error Handling
15.1 Performance Best Practices
# 1. Use vectorized operations
df['result'] = df['A'] * df['B']  # instead of apply
# 2. Read only the columns you need
df_subset = pd.read_csv('large.csv', usecols=['important_col1', 'important_col2'])
# 3. Use categorical dtypes for low-cardinality strings
df['category'] = df['category'].astype('category')
# 4. Index optimization (assumes a 'date' column; sorted indexes enable fast slicing)
df.set_index('date', inplace=True)
df.sort_index(inplace=True)
# 5. Process large files in chunks (process_chunk is a user-supplied function)
def process_chunks(filepath):
    return pd.concat([process_chunk(chunk) for chunk in
                      pd.read_csv(filepath, chunksize=10000)], ignore_index=True)
15.2 错误处理
def safe_dataframe_operation(df, operation):
"""安全的DataFrame操作"""
try:
if operation == 'groupby':
return df.groupby('key').mean()
elif operation == 'merge':
return pd.merge(df, other_df, on='key', how='left')
except KeyError as e:
print(f"列不存在: {e}")
return None
except ValueError as e:
print(f"值错误: {e}")
return None
except Exception as e:
print(f"未知错误: {e}")
return None
# 使用
result = safe_dataframe_operation(df, 'groupby')
15.3 生产环境检查
def validate_dataframe(df, required_cols=None, check_memory=True):
"""DataFrame验证"""
issues = []
# 形状检查
if len(df) == 0:
issues.append("DataFrame为空")
# 列检查
if required_cols:
missing_cols = set(required_cols) - set(df.columns)
if missing_cols:
issues.append(f"缺少列: {missing_cols}")
# 缺失值检查
high_missing = df.isnull().mean()[df.isnull().mean() > 0.5]
if len(high_missing) > 0:
issues.append(f"高缺失率列: {high_missing.index.tolist()}")
# 内存检查
if check_memory:
memory_mb = df.memory_usage(deep=True).sum() / 1024**2
if memory_mb > 1000: # 1GB阈值
issues.append(f"高内存使用: {memory_mb:.1f}MB")
return issues if issues else ["验证通过"]
# 使用
issues = validate_dataframe(df, required_cols=['A', 'B'])
print("验证结果:", issues)
The Pandas DataFrame is the core structure for two-dimensional data processing, with powerful indexing, merging, grouping, statistics, and visualization capabilities. Mastering vectorized operations, dtype optimization, and memory management is the key to performance. In production, careful error handling, data validation, and performance monitoring are essential.