Pandas Data Sorting and Aggregation
1. Data Sorting
1.1 Basic Sorting Functions
1.1.1 Sorting by Column Values
import pandas as pd
import numpy as np
# Create sample data
np.random.seed(42)
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva'],
    'age': [25, 30, 35, 28, 32],
    'salary': [50000, 60000, 55000, 70000, 65000],
    'department': ['IT', 'HR', 'IT', 'Finance', 'HR'],
    'performance': ['Good', 'Excellent', 'Average', 'Excellent', 'Good']
})
print("Original data:")
print(df)
# Sort by a single column, ascending
df_sorted_age = df.sort_values('age')
print("\nSorted by age (ascending):")
print(df_sorted_age)
# Sort by a single column, descending
df_sorted_salary = df.sort_values('salary', ascending=False)
print("\nSorted by salary (descending):")
print(df_sorted_salary)
# Sort by multiple columns
df_multi_sort = df.sort_values(['department', 'salary'], ascending=[True, False])
print("\nSorted by department (ascending), then salary (descending):")
print(df_multi_sort)
# Mixed sort directions (some columns ascending, some descending)
df_mixed = df.sort_values(['age', 'salary'], ascending=[True, False])
print("\nAge ascending, salary descending:")
print(df_mixed)
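sort_values also takes a few parameters that are easy to overlook, such as na_position and ignore_index (the latter requires pandas >= 1.0). The snippet below is a small illustrative sketch; the missing value is injected only for demonstration.
# Sketch: handling missing values and renumbering the result while sorting
df_with_nan = df.copy()
df_with_nan.loc[1, 'age'] = np.nan  # inject a NaN purely for demonstration
# na_position places NaN rows first or last; ignore_index renumbers the result 0..n-1
print(df_with_nan.sort_values('age', na_position='first', ignore_index=True))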
1.1.2 Sorting by Index
# Create data with a shuffled index
df乱序 = df.iloc[[3, 1, 4, 0, 2]].copy()
df乱序.index = [10, 5, 15, 0, 20]
print("Data with shuffled index:")
print(df乱序)
# Sort by index
df_index_sorted = df乱序.sort_index(ascending=True)
print("\nSorted by index (ascending):")
print(df_index_sorted)
# Sort by index, descending
df_index_desc = df乱序.sort_index(ascending=False)
print("\nSorted by index (descending):")
print(df_index_desc)
# Sort columns by column label
df_col_sorted = df.sort_index(axis=1)
print("\nColumns sorted by name:")
print(df_col_sorted)
1.1.3 Stable Sorts and Custom Sort Keys
# Stable sort (preserves the relative order of tied rows)
df_with_time = df.copy()
df_with_time['process_time'] = pd.to_datetime(['2023-01-01 10:00',
                                               '2023-01-01 09:30',
                                               '2023-01-01 11:00',
                                               '2023-01-01 09:45',
                                               '2023-01-01 10:15'])
# Two-pass idiom: sort by the secondary key first, then do a *stable* sort on the
# primary key so that rows with equal salaries keep their process_time order
df_stable = (df_with_time.sort_values('process_time')
                         .sort_values('salary', ascending=False, kind='stable'))
print("Stable sort result:")
print(df_stable[['name', 'salary', 'process_time']])
# Custom sort key
def custom_performance_sort(x):
    """Custom performance order: Excellent > Good > Average"""
    order = {'Excellent': 3, 'Good': 2, 'Average': 1}
    return x.map(order)
df_custom = df.sort_values('performance', key=custom_performance_sort, ascending=False)
print("\nCustom performance sort:")
print(df_custom)
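An alternative to a key function is to declare the column as an ordered categorical, so that a plain sort_values (and ordinary comparisons) respect the business ordering. A minimal sketch, using a copy so the original df is left untouched:
# Sketch: encode the ranking as an ordered Categorical instead of a key function
df_cat = df.copy()
df_cat['performance'] = pd.Categorical(df_cat['performance'],
                                       categories=['Average', 'Good', 'Excellent'],
                                       ordered=True)
# An ordinary sort now follows the declared category order
print(df_cat.sort_values('performance', ascending=False))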
1.2 Advanced Sorting Techniques
1.2.1 Multi-Level Sorting and In-Group Ranking
# Create a more complex dataset
sales_df = pd.DataFrame({
    'region': np.random.choice(['North', 'South', 'East', 'West'], 20),
    'product': np.random.choice(['A', 'B', 'C'], 20),
    'sales': np.random.randint(100, 1000, 20),
    'quarter': np.random.choice(['Q1', 'Q2', 'Q3', 'Q4'], 20)
})
# Multi-level sort: region -> quarter -> sales (descending)
sales_sorted = sales_df.sort_values(['region', 'quarter', 'sales'],
                                    ascending=[True, True, False])
print("Multi-level sales sort:")
print(sales_sorted.head(10))
# Ranking within groups
sales_df['sales_rank'] = sales_df.groupby(['region', 'product'])['sales'].rank(ascending=False)
print("\nRank within each region/product group:")
print(sales_df[sales_df['sales_rank'] <= 2])  # top 2 in each group
1.2.2 Conditional Sorting
# Conditional sort: descending by sales, restricted to specific regions
def conditional_sort(df, condition_col, sort_col, condition_values):
    """Filter rows by a condition, then sort the result"""
    mask = df[condition_col].isin(condition_values)
    result = df[mask].sort_values(sort_col, ascending=False)
    return result
high_sales_north = conditional_sort(sales_df, 'region', 'sales', ['North'])
print("High sales in the North region:")
print(high_sales_north.head())
# Combine query with sorting
top_per_region = (sales_df
                  .query("sales > 500")
                  .sort_values(['region', 'sales'], ascending=[True, False])
                  .groupby('region')
                  .head(2))
print("\nTop 2 sales per region (sales > 500):")
print(top_per_region)
2. Data Aggregation
2.1 Basic Aggregation Functions
2.1.1 Single-Column Aggregation
print("Single-column aggregation:")
print(f"Mean salary: {df['salary'].mean():.2f}")
print(f"Max salary: {df['salary'].max()}")
print(f"Min salary: {df['salary'].min()}")
print(f"Total salary: {df['salary'].sum()}")
print(f"Salary std dev: {df['salary'].std():.2f}")
print(f"Non-null count: {df['salary'].count()}")
print(f"Number of unique values: {df['salary'].nunique()}")
2.1.2 Multi-Column Aggregation
# DataFrame-level aggregation
print("Multi-column aggregation:")
agg_stats = df[['age', 'salary']].agg(['min', 'max', 'mean', 'std'])
print(agg_stats)
# Different functions for different columns
custom_agg = df.agg({
    'age': ['min', 'max', 'mean'],
    'salary': ['sum', 'mean', 'std'],
    'department': 'count'
})
print("\nCustom aggregation:")
print(custom_agg)
2.1.3 Named Aggregation Results
# Named aggregation: each keyword becomes a row label in the result
# (columns that don't apply show NaN); the same syntax shines with groupby
named_agg = df.agg(
    min_age=('age', 'min'),
    max_age=('age', 'max'),
    avg_salary=('salary', 'mean'),
    total_salary=('salary', 'sum'),
    employee_count=('name', 'count')
)
print("Named aggregation result:")
print(named_agg)
2.2 Grouped Aggregation (GroupBy)
2.2.1 Basic Grouped Aggregation
# Group by department
dept_group = df.groupby('department')
print("Aggregation by department:")
dept_stats = dept_group.agg({
    'age': ['mean', 'count'],
    'salary': ['mean', 'sum', 'std']
})
print(dept_stats)
# Flatten the column names
dept_stats.columns = ['_'.join(col).strip() for col in dept_stats.columns]
print("\nFlattened column names:")
print(dept_stats)
2.2.2 Multi-Level Grouping
# Group by several columns
multi_group = df.groupby(['department', 'performance'])
multi_stats = multi_group.agg({
    'age': 'mean',
    'salary': 'mean'
}).round(2)
print("Multi-level grouped aggregation:")
print(multi_stats)
# Reset the index to inspect the result
print("\nWith the index reset:")
print(multi_stats.reset_index())
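Because the grouped result carries a MultiIndex, unstack() can pivot one of the grouping levels into columns, which is often easier to read; a short sketch:
# Pivot the 'performance' level into columns; missing combinations become NaN
print(multi_stats['salary'].unstack('performance'))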
2.2.3 Custom Aggregation Functions
def salary_range(series):
    """Salary range (max - min)"""
    return series.max() - series.min()
def top_performer_ratio(perf_series):
    """Share of 'Excellent' ratings within the group.
    Receives the group's 'performance' column as a Series."""
    return (perf_series == 'Excellent').mean() if len(perf_series) > 0 else 0
custom_group_agg = df.groupby('department').agg({
    'salary': ['mean', 'median', salary_range],
    'performance': top_performer_ratio,
    'age': lambda x: x.quantile(0.75) - x.quantile(0.25)  # IQR
})
print("Custom aggregation functions:")
print(custom_group_agg)
2.3 Advanced Aggregation Techniques
2.3.1 Filtering Groups and apply
# Use filter to keep only certain groups
def large_department(group):
    """Keep only departments with at least 2 employees"""
    return len(group) >= 2
filtered_groups = df.groupby('department').filter(large_department)
print("Employees in large departments:")
print(filtered_groups)
# Use apply for more complex per-group summaries
def department_summary(group):
    """Detailed per-department summary"""
    return pd.Series({
        'avg_age': group['age'].mean(),
        'avg_salary': group['salary'].mean(),
        'employee_count': len(group),
        'senior_count': (group['age'] > 30).sum(),
        'high_earners': (group['salary'] > group['salary'].median()).sum()
    })
dept_summary = df.groupby('department').apply(department_summary)
print("\nDetailed department summary:")
print(dept_summary)
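When the goal is to attach a group-level statistic back onto every row rather than collapse the groups, transform() is usually the right tool. A minimal sketch, working on a copy so the running example stays unchanged:
# transform broadcasts the group mean back onto every row (same length as the input)
df_t = df.copy()
df_t['dept_avg_salary'] = df_t.groupby('department')['salary'].transform('mean')
# Each row can then be compared against its own group's average
print(df_t[['name', 'department', 'salary', 'dept_avg_salary']])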
2.3.2 Quantile and Rank Aggregation
# Quantile aggregation (restrict to the numeric columns)
quantile_agg = df.groupby('department')[['age', 'salary']].quantile([0.25, 0.5, 0.75])
print("Quantile aggregation:")
print(quantile_agg['salary'])
# Ranking within groups
df['salary_rank'] = df.groupby('department')['salary'].rank(ascending=False)
df['salary_percentile'] = df.groupby('department')['salary'].rank(pct=True)
print("\nEmployee data with ranks:")
print(df[['name', 'department', 'salary', 'salary_rank', 'salary_percentile']].round(2))
2.3.3 Conditional Aggregation
# Aggregate by a derived category
def performance_category(perf):
    """Map performance to a coarse category"""
    if perf == 'Excellent':
        return 'High'
    elif perf == 'Good':
        return 'Medium'
    else:
        return 'Low'
df['perf_cat'] = df['performance'].apply(performance_category)
perf_agg = df.groupby('perf_cat').agg({
    'salary': ['count', 'mean', 'sum'],
    'age': 'mean'
})
perf_agg.columns = ['_'.join(col).strip() for col in perf_agg.columns]
print("Aggregation by performance category:")
print(perf_agg)
# Cross-tabulated aggregation
crosstab = pd.crosstab(df['department'], df['performance'],
                       values=df['salary'], aggfunc='mean')
print("\nDepartment x performance mean-salary crosstab:")
print(crosstab)
3. Pivot Tables and Crosstabs
3.1 Pivot Table
# Create sales data for pivoting
sales_data = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=12, freq='M'),  # month-end; newer pandas prefers 'ME'
    'region': np.random.choice(['North', 'South', 'East', 'West'], 12),
    'product': np.random.choice(['A', 'B', 'C'], 12),
    'sales': np.random.randint(1000, 10000, 12),
    'profit': np.random.randint(100, 1000, 12)
})
# Basic pivot table
pivot_basic = sales_data.pivot_table(values='sales',
                                     index='region',
                                     columns='product',
                                     aggfunc='sum',
                                     fill_value=0)
print("Product sales pivot table:")
print(pivot_basic)
# Pivot table with several aggregation functions
pivot_multi = sales_data.pivot_table(values='sales',
                                     index=['region'],
                                     columns=['product'],
                                     aggfunc=['sum', 'mean'],
                                     fill_value=0,
                                     margins=True)  # add row/column totals
print("\nMulti-function pivot table:")
print(pivot_multi)
3.2 Advanced Pivot Operations
# Different aggregation functions per value column
# (the aggfunc dict is keyed by the *values* columns, not by the pivoted categories)
pivot_diff_agg = sales_data.pivot_table(values=['sales', 'profit'],
                                        index='region',
                                        columns='product',
                                        aggfunc={'sales': 'sum', 'profit': 'mean'},
                                        fill_value=0)
print("Different aggregation functions per column:")
print(pivot_diff_agg)
# Multiple value columns with the same aggregation
pivot_values = sales_data.pivot_table(values=['sales', 'profit'],
                                      index='region',
                                      columns='product',
                                      aggfunc='sum',
                                      fill_value=0)
print("\nMulti-value pivot table:")
print(pivot_values)
# Pivot grouped by time
sales_data['quarter'] = sales_data['date'].dt.quarter
quarter_pivot = sales_data.pivot_table(values='sales',
                                       index='region',
                                       columns='quarter',
                                       aggfunc='sum',
                                       fill_value=0)
print("\nQuarterly sales pivot:")
print(quarter_pivot)
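Instead of materializing helper columns like quarter, pd.Grouper can bin the datetime column directly inside the pivot. A small sketch (the 'Q' alias may emit a deprecation warning on very recent pandas, where 'QE' is preferred):
# Quarter-end bins taken straight from the 'date' column, no helper column needed
quarter_pivot2 = sales_data.pivot_table(values='sales',
                                        index='region',
                                        columns=pd.Grouper(key='date', freq='Q'),
                                        aggfunc='sum',
                                        fill_value=0)
print(quarter_pivot2)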
3.3 Crosstab
# Frequency crosstab
crosstab_freq = pd.crosstab(sales_data['region'], sales_data['product'])
print("Region x product frequency table:")
print(crosstab_freq)
# Value-aggregating crosstab
crosstab_weighted = pd.crosstab(sales_data['region'],
                                sales_data['product'],
                                values=sales_data['sales'],
                                aggfunc='sum')
print("\nRegion x product sales totals:")
print(crosstab_weighted)
# Normalized crosstab
crosstab_norm = pd.crosstab(sales_data['region'],
                            sales_data['product'],
                            values=sales_data['sales'],
                            aggfunc='sum',
                            normalize='index')  # normalize across each row
print("\nSales share within each region (row-normalized):")
print(crosstab_norm.round(3))
4. Combining Aggregation and Sorting
4.1 Sorting After Grouping
# Rank departments by mean salary
dept_salary = df.groupby('department')['salary'].mean().sort_values(ascending=False)
print("Departments ranked by mean salary:")
print(dept_salary)
# Sort a more detailed grouped result
dept_detail = df.groupby('department').agg({
    'salary': ['mean', 'count'],
    'age': 'mean'
}).round(2)
# Multi-column sort on the MultiIndex columns: mean salary descending, then headcount ascending
dept_sorted = dept_detail.sort_values([('salary', 'mean'), ('salary', 'count')],
                                      ascending=[False, True])
print("\nDepartment statistics (sorted by mean salary):")
print(dept_sorted)
4.2 Top-N Aggregation
def top_n_by_group(df, group_col, value_col, n=3):
    """Return the top n values of value_col within each group"""
    return (df.groupby(group_col)[value_col]
              .nlargest(n)
              .reset_index(level=1, drop=True)  # drop the original row index, keep the group key
              .reset_index())
# Top 2 sales per region
top_sales = top_n_by_group(sales_data, 'region', 'sales', 2)
print("Top 2 sales per region:")
print(top_sales.sort_values(['region', 'sales'], ascending=[True, False]))
# Using rank to get the top entries
sales_ranked = sales_data.copy()
sales_ranked['sales_rank'] = sales_ranked.groupby('region')['sales'].rank(ascending=False)
top_per_region = sales_ranked[sales_ranked['sales_rank'] <= 2]
print("\nTop 2 via rank:")
print(top_per_region.sort_values(['region', 'sales'], ascending=[True, False]))
4.3 Rolling Window Aggregation
# Time-series aggregation example
sales_data['month'] = sales_data['date'].dt.month
monthly_sales = sales_data.groupby(['region', sales_data['date'].dt.to_period('M')])['sales'].sum()
print("Monthly sales per region:")
print(monthly_sales.head())
# Rolling 3-period aggregation within each group
def rolling_group_agg(df, group_col, value_col, window=3):
    """Rolling aggregation within each group (assumes rows are already in time order)"""
    result = df.groupby(group_col)[value_col].rolling(window=window, min_periods=1).sum()
    return result
rolling_sales = rolling_group_agg(sales_data.sort_values('date'), 'region', 'sales', window=3)
print("Rolling 3-period sales totals:")
print(rolling_sales.groupby('region').last())  # latest rolling value per group
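For calendar-based windows (as opposed to a fixed number of rows), combining groupby with resample on a DatetimeIndex is usually more natural. A minimal sketch ('M' means month-end and is spelled 'ME' on pandas >= 2.2):
# Calendar-month totals per region, driven by the actual dates rather than row counts
monthly_by_region = (sales_data.set_index('date')
                               .groupby('region')['sales']
                               .resample('M')
                               .sum())
print(monthly_by_region.head())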
5. Performance Optimization and Best Practices
5.1 Sorting Performance
# Sorting a large dataset
large_df = pd.DataFrame(np.random.randint(0, 1000, size=(1000000, 5)),
                        columns=['A', 'B', 'C', 'D', 'E'])
# 1. Select only the columns you need before sorting
subset_cols = large_df[['A', 'B']].sort_values('A')
# 2. Use the category dtype to speed up grouping/sorting on low-cardinality keys
large_df['category'] = pd.Categorical(large_df['A'] % 10)
sorted_cat = large_df.sort_values(['category', 'B'])
# 3. Parallel / out-of-core sorting for data that does not fit comfortably in memory:
#    libraries such as modin or dask expose a pandas-like API (separate install required),
#    e.g. `import modin.pandas as pd`; see the sketch below.
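A hedged sketch of the dask route (dask must be installed separately; the calls shown, dask.dataframe.from_pandas, sort_values, groupby/mean and .compute(), follow dask's documented pandas-like interface, but exact behavior depends on your dask version):
# Out-of-core style processing with dask: operations build a lazy task graph
# and only run when a result is materialized
import dask.dataframe as dd

ddf = dd.from_pandas(large_df, npartitions=8)     # split into 8 partitions
top_sorted = ddf.sort_values('A').head(10)        # head() triggers computation
grouped = ddf.groupby('category')['B'].mean().compute()
print(top_sorted)
print(grouped.head())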
5.2 Aggregation Performance
# 1. Drop to raw numpy when a hot path really needs it
def fast_mean(series):
    return series.values.mean()
# 2. Prefer built-in, vectorized aggregations over apply
def optimized_groupby(df):
    """Grouped aggregation using named aggregation (vectorized built-ins)"""
    result = df.groupby('department').agg(
        avg_salary=('salary', 'mean'),
        count=('salary', 'count'),
        std_salary=('salary', 'std')
    )
    return result
# 3. Precompute intermediate results you will reuse
precomputed = df.groupby('department').size()  # group sizes, computed cheaply
salary_stats = df.groupby('department')['salary'].agg(['mean', 'std'])
5.3 Memory Optimization
def optimize_sort_agg_pipeline(df, group_col, value_col):
    """Optimize a sort/aggregate pipeline for memory and speed"""
    df = df.copy()
    # 1. Downcast integer columns
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    # 2. Convert low-cardinality object columns to category
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() < len(df) * 0.1:
            df[col] = df[col].astype('category')
    # 3. Efficient grouped aggregation (sort=False skips sorting the group keys)
    result = (df.groupby(group_col, sort=False)
                .agg({value_col: ['mean', 'sum']})
                .sort_values((value_col, 'mean'), ascending=False))
    return result
# Usage
optimized_result = optimize_sort_agg_pipeline(df, 'department', 'salary')
print(optimized_result)
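To verify that dtype changes actually help, memory_usage(deep=True) reports per-column memory before and after; a quick sketch on a copy of the sales data:
# Compare memory footprints before and after converting to smaller dtypes
before = sales_data.memory_usage(deep=True).sum()
compact = sales_data.copy()
compact['region'] = compact['region'].astype('category')
compact['product'] = compact['product'].astype('category')
compact['sales'] = pd.to_numeric(compact['sales'], downcast='integer')
after = compact.memory_usage(deep=True).sum()
print(f"Memory before: {before} bytes, after: {after} bytes")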
6. Practical Case Studies
6.1 Sales Performance Analysis
# A complete sales-analysis pipeline
def sales_performance_analysis(sales_df):
    """Sales performance analysis"""
    sales_df = sales_df.copy()
    # 1. Preprocessing
    sales_df['date'] = pd.to_datetime(sales_df['date'])
    sales_df['month'] = sales_df['date'].dt.month
    sales_df['quarter'] = sales_df['date'].dt.quarter
    # 2. Rank deals within each region and month
    sales_df['sales_rank'] = sales_df.groupby(['region', 'month'])['sales'].rank(ascending=False)
    # 3. Monthly performance
    monthly_perf = sales_df.groupby(['region', 'month']).agg({
        'sales': ['sum', 'mean', 'count'],
        'profit': 'sum'
    })
    monthly_perf.columns = ['total_sales', 'avg_sales', 'deal_count', 'total_profit']
    monthly_perf = monthly_perf.sort_values(['region', 'month'])
    # 4. Top deals
    top_performers = (sales_df[sales_df['sales_rank'] <= 3]
                      .groupby(['region', 'month'])
                      .agg({'sales': 'sum', 'profit': 'sum'})
                      .sort_values('sales', ascending=False))
    # 5. Quarterly trend
    quarterly_trend = sales_df.groupby(['region', 'quarter'])['sales'].sum().unstack()
    return {
        'monthly_performance': monthly_perf,
        'top_performers': top_performers,
        'quarterly_trend': quarterly_trend
    }
# Example usage
analysis_results = sales_performance_analysis(sales_data)
print("Monthly performance:")
print(analysis_results['monthly_performance'].head())
6.2 Customer Segmentation Analysis
# Customer RFM analysis (recency, frequency, monetary value)
customer_data = pd.DataFrame({
    'customer_id': np.random.randint(1000, 2000, 1000),
    'order_date': pd.date_range('2022-01-01', periods=1000, freq='D'),
    'order_amount': np.random.exponential(100, 1000),
    'region': np.random.choice(['North', 'South'], 1000)
})
customer_data['recency'] = (pd.Timestamp.now() - customer_data['order_date']).dt.days
rfm = customer_data.groupby('customer_id').agg({
    'recency': 'min',        # days since the most recent order
    'order_date': 'count',   # purchase frequency
    'order_amount': 'sum'    # total spend
}).rename(columns={'order_date': 'frequency'})
# RFM quintile scores
rfm['R_score'] = pd.qcut(rfm['recency'], 5, labels=[5, 4, 3, 2, 1])  # lower recency is better
rfm['F_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5])
rfm['M_score'] = pd.qcut(rfm['order_amount'], 5, labels=[1, 2, 3, 4, 5])
# Combined RFM score
rfm['RFM_score'] = rfm['R_score'].astype(int) * 100 + rfm['F_score'].astype(int) * 10 + rfm['M_score'].astype(int)
# Sort and segment by RFM score
rfm_sorted = rfm.sort_values('RFM_score', ascending=False)
high_value_customers = rfm_sorted.head(100)
print("Top customers by RFM:")
print(high_value_customers.head())
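To turn scores into named segments you can bucket the combined score with pd.cut; the boundaries below are purely illustrative assumptions, not part of any standard RFM scheme:
# Hypothetical segment boundaries over the 111-555 RFM score range
rfm['segment'] = pd.cut(rfm['RFM_score'],
                        bins=[0, 233, 388, 555],
                        labels=['At risk', 'Regular', 'Champion'])
print(rfm['segment'].value_counts())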
6.3 Dynamic Report Generation
class DataAggregator:
    """Manages grouped aggregation and sorting"""
    def __init__(self, df):
        self.df = df.copy()
    def group_and_sort(self, group_cols, agg_dict, sort_col, ascending=True, top_n=None):
        """Group, aggregate, then sort"""
        # Grouped aggregation
        grouped = self.df.groupby(group_cols).agg(agg_dict)
        # Flatten MultiIndex column names
        if isinstance(grouped.columns, pd.MultiIndex):
            grouped.columns = ['_'.join(col).strip() for col in grouped.columns]
        # Sort
        sorted_result = grouped.sort_values(sort_col, ascending=ascending)
        # Top N
        if top_n:
            sorted_result = sorted_result.head(top_n)
        return sorted_result.reset_index()
    def rolling_analysis(self, group_col, value_col, window, min_periods=1):
        """Rolling analysis within groups"""
        return (self.df.groupby(group_col)[value_col]
                .rolling(window=window, min_periods=min_periods)
                .agg(['mean', 'sum'])
                .droplevel(0)
                .sort_index())
    def rank_analysis(self, group_cols, value_col):
        """Rank analysis within groups"""
        self.df['rank'] = self.df.groupby(group_cols)[value_col].rank(ascending=False)
        return self.df
# Example usage: column names are flattened before sorting, so sort_col uses the
# flattened label 'sales_sum' rather than the ('sales', 'sum') tuple
aggregator = DataAggregator(sales_data)
result = aggregator.group_and_sort(
    group_cols=['region'],
    agg_dict={'sales': ['sum', 'mean'], 'profit': 'sum'},
    sort_col='sales_sum',
    ascending=False,
    top_n=3
)
print("Aggregated and sorted result:")
print(result)
7. Visualizing Aggregated Results
import matplotlib.pyplot as plt
import seaborn as sns
# Bar chart of an aggregated result
dept_salary = df.groupby('department')['salary'].mean().sort_values(ascending=False)
plt.figure(figsize=(10, 6))
dept_salary.plot(kind='bar')
plt.title('Average Salary by Department')
plt.ylabel('Average salary')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# Heatmap of a pivot table
pivot_sales = sales_data.pivot_table(values='sales', index='region', columns='product',
                                     aggfunc='sum', fill_value=0)
plt.figure(figsize=(8, 6))
sns.heatmap(pivot_sales, annot=True, cmap='YlOrRd', fmt='.0f')  # '.0f' tolerates float cells
plt.title('Region-Product Sales Heatmap')
plt.show()
# Stacked bar chart
quarterly = sales_data.groupby(['quarter', 'region'])['sales'].sum().unstack()
quarterly.plot(kind='bar', stacked=True, figsize=(10, 6))
plt.title('Quarterly Sales by Region')
plt.ylabel('Sales')
plt.legend(title='Region')
plt.tight_layout()
plt.show()
Sorting and aggregation are core capabilities of data analysis with Pandas, and mastering them lets you handle complex analytical requirements efficiently. The key is to understand the business logic, choose appropriate aggregation functions and sorting strategies, and combine the results with visualization to surface patterns in the data.