|

Pandas DataFrame API 手册

Pandas DataFrame API 完整手册

1. DataFrame 创建

1.1 基本创建方法

import pandas as pd
import numpy as np
from datetime import datetime

# 从字典创建(最常用)
data = {
    'A': [1, 2, 3, 4],
    'B': [10, 20, 30, 40],
    'C': ['a', 'b', 'c', 'd']
}
df1 = pd.DataFrame(data)

# 指定索引
df2 = pd.DataFrame(data, index=['row1', 'row2', 'row3', 'row4'])

# 从列表创建
df3 = pd.DataFrame([[1, 'a'], [2, 'b'], [3, 'c']], 
                   columns=['num', 'char'])

# 从Series创建
s1 = pd.Series([1, 2, 3], name='A')
s2 = pd.Series([10, 20, 30], name='B')
df4 = pd.DataFrame({'A': s1, 'B': s2})

# 从NumPy数组
arr = np.random.randn(4, 3)
df5 = pd.DataFrame(arr, columns=['X', 'Y', 'Z'])

# 空DataFrame
df_empty = pd.DataFrame()

print("DataFrame示例:")
print(df1)

1.2 高级创建

# 指定数据类型
df_typed = pd.DataFrame({
    'int_col': pd.Series([1, 2, 3], dtype='int32'),
    'float_col': pd.Series([1.1, 2.2, 3.3], dtype='float32'),
    'cat_col': pd.Series(['A', 'B', 'A'], dtype='category')
})

# 使用pd.date_range创建时间序列
dates = pd.date_range('2023-01-01', periods=5, freq='D')
df_time = pd.DataFrame({
    'date': dates,
    'value': np.random.randn(5)
}, index=dates)

# 多级列索引
arrays = [['A', 'A', 'B'], ['foo', 'bar', 'baz']]
tuples = list(zip(*arrays))
multi_cols = pd.MultiIndex.from_tuples(tuples, names=['first', 'second'])
df_multi = pd.DataFrame(np.random.randn(4, 3), columns=multi_cols)

# 从记录列表创建
records = [{'A': 1, 'B': 10}, {'A': 2, 'B': 20}]
df_records = pd.DataFrame(records)

print("多级列示例:")
print(df_multi)

2. 属性和基本信息

2.1 基本属性

df = pd.DataFrame(np.random.randn(5, 3), columns=['A', 'B', 'C'])

print("基本属性:")
print("形状:", df.shape)
print("列数:", df.shape[1])
print("行数:", df.shape[0])
print("列名:", df.columns)
print("索引:", df.index)
print("数据类型:", df.dtypes)
print("值:", df.values)
print("转置:", df.T.shape)
print("是否为空:", df.empty)
print("NDim:", df.ndim)

2.2 内存和信息

# 内存使用
print("内存使用:")
print("单列:", df.memory_usage(deep=True))
print("总计:", df.memory_usage(deep=True).sum())
print("索引内存:", df.memory_usage(index=True, deep=True))

# 详细信息
print("\nDataFrame信息:")
print(df.info(memory_usage='deep'))

# 描述性统计
print("\n描述性统计:")
print(df.describe())

# 缺失值统计
print("\n缺失值:")
print(df.isnull().sum())

# 精确统计
print("\n精确信息:")
print(df.info(verbose=True, memory_usage='deep'))

3. 索引和选择

3.1 列选择

df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})

# 单个列(返回Series)
print("列A:", df['A'])

# 多个列
print("多列:", df[['A', 'C']])

# 新列添加
df['D'] = df['A'] + df['B']

# 删除列
df_drop = df.drop('D', axis=1)

# 列存在检查
print("列A存在:", 'A' in df.columns)

3.2 行选择(loc/iloc)

# loc(标签索引)
print("loc单行:", df.loc[0])
print("loc多行:", df.loc[[0, 2]])
print("loc切片:", df.loc[0:2])
print("loc条件:", df.loc[df['A'] > 1])

# iloc(位置索引)
print("iloc单行:", df.iloc[0])
print("iloc多行:", df.iloc[[0, 2]])
print("iloc切片:", df.iloc[0:2])

# 混合选择
print("loc指定列:", df.loc[0, ['A', 'B']])
print("iloc指定列:", df.iloc[0, [0, 1]])

# 布尔索引
mask = df['A'] > 1
print("布尔索引:", df[mask])

3.3 at/iat(单元素快速访问)

# 快速单元素访问
df.at[0, 'A'] = 100
df.iat[1, 0] = 200

print("快速访问修改:")
print(df)

# 查询
print("at查询:", df.at[0, 'A'])
print("iat查询:", df.iat[0, 0])

3.4 多级索引操作

# 创建多级索引DataFrame
index = pd.MultiIndex.from_product([['A', 'B'], [1, 2]], 
                                  names=['level1', 'level2'])
df_multi_idx = pd.DataFrame(np.random.randn(4, 2), 
                           index=index, 
                           columns=['X', 'Y'])

print("多级索引:")
print(df_multi_idx)

# 多级选择
print("第一级A:", df_multi_idx.loc['A'])
print("具体组合:", df_multi_idx.loc[('A', 1)])
print("部分选择:", df_multi_idx.xs(1, level='level2'))

4. 数据操作

4.1 添加和删除

df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})

# 添加列
df['C'] = df['A'] * 2
df.insert(1, 'new_col', [10, 20, 30])  # 指定位置插入

# 添加行
new_row = pd.DataFrame({'A': [4], 'B': [7], 'C': [8]})
df = pd.concat([df, new_row], ignore_index=True)

# 删除
df_dropped = df.drop(0)  # 删除行
df_cols_dropped = df.drop(['C'], axis=1)  # 删除列

# pop方法(返回删除的列)
col_popped = df.pop('A')
print("弹出列:", col_popped)

4.2 赋值和修改

# 直接赋值
df.loc[0, 'A'] = 999
df.iloc[1, 1] = 888

# 批量赋值
df.loc[df['A'] > 2, 'B'] = 0

# 使用update(原地更新)
other_df = pd.DataFrame({'A': [100, 200, 300]}, index=df.index)
df.update(other_df)

# 条件赋值(np.where)
df['new'] = np.where(df['A'] > 1, 'High', 'Low')

4.3 排序

df = pd.DataFrame({
    'A': [3, 1, 2],
    'B': [2, 3, 1],
    'C': ['c', 'a', 'b']
}, index=[2, 0, 1])

# 按列排序
df_sorted_val = df.sort_values('A')

# 按索引排序
df_sorted_idx = df.sort_index()

# 多列排序
df_multi_sort = df.sort_values(['A', 'B'])

# 按列名排序
df_col_sort = df.sort_index(axis=1)

# 稳定排序
df_stable = df.sort_values('A', kind='stable')

print("排序结果:")
print(df_sorted_val)

5. 缺失值处理

5.1 检测和统计

df = pd.DataFrame({
    'A': [1, np.nan, 3],
    'B': [4, 5, np.nan],
    'C': [np.nan, np.nan, 9]
})

print("缺失值检测:")
print("总缺失:", df.isnull().sum().sum())
print("每列缺失:", df.isnull().sum())
print("缺失比例:", df.isnull().mean())
print("非缺失计数:", df.notnull().sum())

5.2 填充方法

# 简单填充
df_filled = df.fillna(0)

# 前向/后向填充
df_ffill = df.fillna(method='ffill')
df_bfill = df.fillna(method='bfill')

# 按列填充
df_col_fill = df.fillna({'A': df['A'].mean(), 'B': 0})

# 插值
df_interp = df.interpolate(method='linear')

# 组合填充
df_combined = df.fillna(method='ffill').fillna(method='bfill')

print("填充结果:")
print(df_filled)

5.3 删除缺失值

# 删除含NaN的行
df_dropna = df.dropna()

# 按阈值删除
df_thresh = df.dropna(thresh=2)  # 至少2个非NaN值

# 按列删除
df_drop_col = df.dropna(axis=1, how='all')  # 删除全为NaN的列

# 特定列删除
df_subset = df.dropna(subset=['A', 'B'])

6. 合并和连接

6.1 连接(concat)

df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'C': [7, 8]})

# 行连接
df_concat_row = pd.concat([df1, df2], axis=0, ignore_index=True)

# 列连接
df_concat_col = pd.concat([df1, df2], axis=1)

# 内部连接(只保留共同列)
df_inner = pd.concat([df1, df2], axis=0, join='inner')

# 外部连接
df_outer = pd.concat([df1, df2], axis=0, join='outer')

print("连接结果:")
print(df_concat_row)

6.2 合并(merge)

# 示例数据
left = pd.DataFrame({'key': ['K0', 'K1', 'K2'], 'A': [1, 2, 3]})
right = pd.DataFrame({'key': ['K0', 'K1', 'K3'], 'B': [4, 5, 6]})

# 内连接
merged_inner = pd.merge(left, right, on='key', how='inner')

# 左连接
merged_left = pd.merge(left, right, on='key', how='left')

# 外连接
merged_outer = pd.merge(left, right, on='key', how='outer')

# 多键合并
df1 = pd.DataFrame({'A': ['foo', 'bar'], 'B': [1, 2]})
df2 = pd.DataFrame({'A': ['foo', 'baz'], 'C': [3, 4]})
merged_multi = pd.merge(df1, df2, on='A')

# 索引合并
merged_index = pd.merge(left, right, left_index=True, right_index=True)

print("合并结果:")
print(merged_inner)

6.3 连接(join)

# 基于索引连接
df1 = pd.DataFrame({'A': [1, 2]}, index=['a', 'b'])
df2 = pd.DataFrame({'B': [3, 4]}, index=['a', 'c'])

# 默认左连接
joined = df1.join(df2)

# 内连接
joined_inner = df1.join(df2, how='inner')

# 多DataFrame连接
result = df1.join([df2, df3], how='outer')

7. 分组操作(groupby)

7.1 基本分组

df = pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'A'],
    'value': [1, 2, 3, 4, 5],
    'category': ['X', 'Y', 'X', 'Y', 'X']
})

# 按单列分组
grouped = df.groupby('group')
print("组名:", grouped.groups.keys())
print("A组:", grouped.get_group('A'))

# 迭代分组
for name, group in grouped:
    print(f"{name}: {len(group)} 行")

# 聚合
print("组统计:")
print(grouped['value'].agg(['mean', 'sum', 'count']))

7.2 多级分组

# 多列分组
multi_group = df.groupby(['group', 'category'])['value'].mean()
print("多级分组:")
print(multi_group)

# 自定义聚合
def custom_agg(x):
    return {'min': x.min(), 'max': x.max(), 'range': x.max() - x.min()}

result = df.groupby('group').agg(custom_agg)
print("自定义聚合:")
print(result)

7.3 转换和过滤

# transform(保持形状)
df['mean_group'] = df.groupby('group')['value'].transform('mean')

# apply(灵活但慢)
def custom_func(group):
    return group['value'].rank(pct=True)

df['rank'] = df.groupby('group').apply(custom_func)

# filter(筛选组)
filtered = df.groupby('group').filter(lambda x: len(x) > 1)

8. 重塑和透视

8.1 透视表(pivot)

df_pivot_data = pd.DataFrame({
    'date': ['2023-01', '2023-01', '2023-02', '2023-02'],
    'category': ['A', 'B', 'A', 'B'],
    'value': [1, 2, 3, 4]
})

# 简单透视
pivot_simple = df_pivot_data.pivot(index='date', columns='category', values='value')

# 多值透视
pivot_multi = df_pivot_data.pivot(index='date', columns='category')

8.2 透视表(pivot_table)

# 带聚合的透视表
pivot_table = pd.pivot_table(df_pivot_data, 
                           values='value', 
                           index='date', 
                           columns='category', 
                           aggfunc='sum', 
                           fill_value=0,
                           margins=True)  # 添加总计

print("透视表:")
print(pivot_table)

8.3 熔融(melt)

# 宽格式→长格式
wide_df = pd.DataFrame({
    'date': ['2023-01', '2023-02'],
    'A': [1, 3],
    'B': [2, 4]
})

long_df = pd.melt(wide_df, id_vars=['date'], 
                  value_vars=['A', 'B'],
                  var_name='category', 
                  value_name='value')

print("熔融结果:")
print(long_df)

8.4 堆叠和展开(stack/unstack)

# 创建多级列
arrays = [['A', 'A', 'B'], ['one', 'two', 'one']]
multi_cols = pd.MultiIndex.from_arrays(arrays)
df_multi = pd.DataFrame(np.random.randn(4, 3), columns=multi_cols)

# stack(列→行)
stacked = df_multi.stack()
print("堆叠:")
print(stacked)

# unstack(行→列)
unstacked = stacked.unstack()

9. 统计和聚合

9.1 描述性统计

df = pd.DataFrame(np.random.randn(5, 3), columns=['A', 'B', 'C'])

print("基本统计:")
print(df.mean())
print(df.std())
print(df.describe())

# 分位数
print("分位数:", df.quantile([0.25, 0.5, 0.75]))

# 相关性
print("相关系数:", df.corr())
print("协方差:", df.cov())

9.2 聚合函数

# 基本聚合
print("聚合:")
print(df.agg(['sum', 'mean', 'std']))

# 列特定聚合
agg_dict = {'A': ['sum', 'mean'], 'B': 'std', 'C': 'count'}
print("自定义聚合:")
print(df.agg(agg_dict))

# 命名聚合
def custom_agg(df):
    return df.mean() * 2

print("命名聚合:")
print(df.agg(custom_agg, axis=1))

9.3 滚动和扩展统计

# 滚动窗口
rolling = df.rolling(window=3)
print("滚动统计:")
print(rolling.mean())
print(rolling.agg(['min', 'max', 'mean']))

# 扩展窗口
expanding = df.expanding()
print("扩展统计:")
print(expanding.mean())

# 指数加权
ewm = df.ewm(span=3)
print("指数加权:")
print(ewm.mean())

10. 字符串和正则操作

10.1 str访问器

df_str = pd.DataFrame({
    'text': ['Apple pie', 'Banana split', 'Cherry tart'],
    'code': ['A001', 'B002', 'C003']
})

# 基本字符串操作
df_str['length'] = df_str['text'].str.len()
df_str['upper'] = df_str['text'].str.upper()
df_str['contains_a'] = df_str['text'].str.contains('a')

# 正则表达式
df_str['first_letter'] = df_str['text'].str.extract(r'(\w)')
df_str['code_num'] = df_str['code'].str.extract(r'(\d+)').astype(int)

# 分割
df_str[['fruit', 'dessert']] = df_str['text'].str.split(' ', expand=True)

print("字符串操作结果:")
print(df_str)

10.2 批量字符串处理

# 高级正则
patterns = {
    'email': r'[\w\.-]+@[\w\.-]+',
    'phone': r'\d{3}-\d{3}-\d{4}'
}

# 查找所有匹配
df_str['emails'] = df_str['text'].str.findall(r'[\w\.-]+@[\w\.-]+')

# 替换
df_str['clean_text'] = df_str['text'].str.replace(r'[^\w\s]', '', regex=True)

# 条件替换
df_str['normalized'] = df_str['text'].str.replace(r'\s+', ' ', regex=True).str.strip()

11. 时间序列操作

11.1 创建和索引

# 时间索引
dates = pd.date_range('2023-01-01', periods=10, freq='D')
df_time = pd.DataFrame({
    'value': np.random.randn(10),
    'category': np.random.choice(['A', 'B'], 10)
}, index=dates)

# 设置时间索引
df_time.index = pd.to_datetime(df_time.index)
df_time.index.name = 'date'

print("时间序列:")
print(df_time.head())

11.2 时间属性

# 提取时间组件
df_time['year'] = df_time.index.year
df_time['month'] = df_time.index.month
df_time['day'] = df_time.index.day
df_time['weekday'] = df_time.index.day_name()

# 时间切片
jan_data = df_time['2023-01']
first_week = df_time['2023-01-01':'2023-01-07']

# 条件筛选
weekends = df_time[df_time.index.dayofweek.isin([5, 6])]

11.3 重采样和频率转换

# 重采样
daily_to_monthly = df_time['value'].resample('M').mean()
daily_to_weekly = df_time.resample('W').agg({
    'value': 'mean',
    'category': lambda x: x.mode()[0] if len(x) > 0 else np.nan
})

# 降采样(聚合)
downsample = df_time.resample('W').agg(['mean', 'std'])

# 上采样(插值)
upsample = df_time.resample('6H').ffill()  # 前向填充

# 自定义重采样
def custom_resample(x):
    return x.ewm(span=3).mean().iloc[-1]

weekly_custom = df_time['value'].resample('W').apply(custom_resample)

12. 输入输出操作

12.1 CSV操作

# 读取
df_csv = pd.read_csv('data.csv', 
                    index_col=0, 
                    parse_dates=True,
                    dtype={'id': 'int32'},
                    chunksize=1000)

# 写入
df.to_csv('output.csv', 
          index=True,
          compression='gzip',
          date_format='%Y-%m-%d')

# 分块写入
for i, chunk in enumerate(np.array_split(df, 10)):
    chunk.to_csv(f'chunk_{i}.csv', index=False)

12.2 Excel操作

# 多工作表写入
with pd.ExcelWriter('output.xlsx', engine='openpyxl') as writer:
    df.to_excel(writer, sheet_name='Data', index=False)
    df.describe().to_excel(writer, sheet_name='Stats')

# 读取多工作表
excel_file = pd.ExcelFile('data.xlsx')
sheets = {name: pd.read_excel(excel_file, name) for name in excel_file.sheet_names}

12.3 高效格式(Parquet)

# Parquet(推荐大数据)
df.to_parquet('data.parquet', 
              compression='snappy',
              index=True,
              partition_cols=['category'])

# 读取分区数据
df_part = pd.read_parquet('data.parquet', 
                         filters=[('category', '=', 'A')],
                         columns=['value', 'date'])

13. 高级功能

13.1 窗口函数

# 滚动窗口
df['rolling_mean'] = df['value'].rolling(window=3).mean()
df['rolling_std'] = df['value'].rolling(3, min_periods=1).std()

# 自定义窗口函数
df['rolling_range'] = df['value'].rolling(3).apply(
    lambda x: x.max() - x.min(), raw=True
)

# 条件窗口
df['expanding_mean'] = df['value'].expanding().mean()

# 分组窗口
df['group_rolling'] = df.groupby('category')['value'].rolling(3).mean().reset_index(0, drop=True)

13.2 向量化操作和apply

# 向量化操作(推荐)
df['A_squared'] = df['A'] ** 2
df['A_plus_B'] = df['A'] + df['B']

# apply(谨慎使用,性能差)
df['complex'] = df.apply(lambda row: row['A'] * row['B'] if row['A'] > 0 else 0, axis=1)

# 按列apply
df['mean_A_B'] = df[['A', 'B']].apply(np.mean, axis=1)

# 使用numpy向量化替代apply
df['vectorized'] = np.where(df['A'] > 0, df['A'] * df['B'], 0)

13.3 性能优化

def optimize_dataframe(df):
    """DataFrame性能优化"""
    optimized = df.copy()

    # 1. 数据类型优化
    for col in optimized.select_dtypes(include=['int64']).columns:
        col_min = optimized[col].min()
        col_max = optimized[col].max()
        if col_min >= np.iinfo(np.int8).min and col_max <= np.iinfo(np.int8).max:
            optimized[col] = optimized[col].astype('int8')

    # 2. 分类优化
    for col in optimized.select_dtypes(include=['object']).columns:
        if optimized[col].nunique() / len(optimized) < 0.5:
            optimized[col] = optimized[col].astype('category')

    # 3. 浮点优化
    for col in optimized.select_dtypes(include=['float64']).columns:
        optimized[col] = pd.to_numeric(optimized[col], downcast='float')

    return optimized

# 内存对比
original_memory = df.memory_usage(deep=True).sum()
optimized_df = optimize_dataframe(df)
optimized_memory = optimized_df.memory_usage(deep=True).sum()
print(f"内存节省: {(1 - optimized_memory/original_memory)*100:.1f}%")

14. 可视化和绘图

14.1 基本绘图

import matplotlib.pyplot as plt

# 线图
df.plot(x='date', y='value', figsize=(10, 6))

# 散点图
df.plot.scatter(x='A', y='B', c='C', colormap='viridis')

# 直方图
df['A'].hist(bins=20)

# 箱线图
df.boxplot(column='value', by='category')

# 相关性热力图
import seaborn as sns
plt.figure(figsize=(8, 6))
sns.heatmap(df.corr(), annot=True, cmap='coolwarm')
plt.title('相关性热力图')
plt.show()

14.2 高级可视化

# 配对图
pd.plotting.scatter_matrix(df[['A', 'B', 'C']], figsize=(10, 10))

# 时间序列分解(需要statsmodels)
from statsmodels.tsa.seasonal import seasonal_decompose
decomp = seasonal_decompose(df_time['value'], model='additive', period=7)
decomp.plot()
plt.show()

# 自定义绘图
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
for i, col in enumerate(df.columns):
    df[col].plot(ax=axes[i//2, i%2], title=f'{col} 时间序列')
plt.tight_layout()
plt.show()

15. 最佳实践和错误处理

15.1 性能最佳实践

# 1. 使用向量化操作
df['result'] = df['A'] * df['B']  # 而不是apply

# 2. 选择性列读取
df_subset = pd.read_csv('large.csv', usecols=['important_col1', 'important_col2'])

# 3. 使用categorical类型
df['category'] = df['category'].astype('category')

# 4. 索引优化
df.set_index('date', inplace=True)  # 时间索引
df.sort_index(inplace=True)

# 5. 分块处理
def process_chunks(filepath):
    return pd.concat([process_chunk(chunk) for chunk in 
                     pd.read_csv(filepath, chunksize=10000)], ignore_index=True)

15.2 错误处理

def safe_dataframe_operation(df, operation):
    """安全的DataFrame操作"""
    try:
        if operation == 'groupby':
            return df.groupby('key').mean()
        elif operation == 'merge':
            return pd.merge(df, other_df, on='key', how='left')
    except KeyError as e:
        print(f"列不存在: {e}")
        return None
    except ValueError as e:
        print(f"值错误: {e}")
        return None
    except Exception as e:
        print(f"未知错误: {e}")
        return None

# 使用
result = safe_dataframe_operation(df, 'groupby')

15.3 生产环境检查

def validate_dataframe(df, required_cols=None, check_memory=True):
    """DataFrame验证"""
    issues = []

    # 形状检查
    if len(df) == 0:
        issues.append("DataFrame为空")

    # 列检查
    if required_cols:
        missing_cols = set(required_cols) - set(df.columns)
        if missing_cols:
            issues.append(f"缺少列: {missing_cols}")

    # 缺失值检查
    high_missing = df.isnull().mean()[df.isnull().mean() > 0.5]
    if len(high_missing) > 0:
        issues.append(f"高缺失率列: {high_missing.index.tolist()}")

    # 内存检查
    if check_memory:
        memory_mb = df.memory_usage(deep=True).sum() / 1024**2
        if memory_mb > 1000:  # 1GB阈值
            issues.append(f"高内存使用: {memory_mb:.1f}MB")

    return issues if issues else ["验证通过"]

# 使用
issues = validate_dataframe(df, required_cols=['A', 'B'])
print("验证结果:", issues)

Pandas DataFrame 是二维数据处理的核心结构,提供了强大的索引、合并、分组、统计和可视化功能。掌握向量化操作、数据类型优化和内存管理是提升性能的关键。在生产环境中,注重错误处理、数据验证和性能监控至关重要。

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注