Pandas 数据清洗详解
1. 数据质量评估
1.1 初步检查
import pandas as pd
import numpy as np
import missingno as msno # 可视化缺失值,需要安装
# 基本信息检查
def initial_assessment(df):
    """初始数据质量评估"""
    print("=== 数据形状 ===")
    print(f"行数: {len(df)}, 列数: {len(df.columns)}")
    print("\n=== 数据预览 ===")
    print(df.head())
    print("\n=== 数据信息 ===")
    df.info()  # info() 自己负责打印,无需再包一层 print
    print("\n=== 描述性统计 ===")
    print(df.describe(include='all'))
    print("\n=== 缺失值统计 ===")
    missing = df.isnull().sum()
    missing_percent = (missing / len(df)) * 100
    missing_df = pd.DataFrame({
        '缺失值': missing,
        '缺失率%': missing_percent.round(2)
    })
    print(missing_df[missing_df['缺失值'] > 0])
    print("\n=== 重复值 ===")
    duplicates = df.duplicated().sum()
    print(f"重复行数: {duplicates}")
    return missing_df, duplicates
# 使用
df = pd.read_csv('data.csv')
missing_info, dup_count = initial_assessment(df)
1.2 缺失值可视化
# 安装: pip install missingno
import missingno as msno
# 缺失值矩阵图
msno.matrix(df)
msno.bar(df) # 缺失值条形图
msno.heatmap(df) # 缺失值相关性热图
# 树状图显示缺失模式
msno.dendrogram(df)
2. 缺失值处理
2.1 检测和统计
# 多种检测方法
print("空值检测:")
print(df.isnull().sum()) # 每列空值数
print(df.isna().sum()) # 等价于 isnull()
print(df.isnull().any(axis=1).sum()) # 含空值的行数
# 缺失率计算
def missing_rate(df):
    total_cells = df.size
    missing_cells = df.isnull().sum().sum()
    return missing_cells / total_cells * 100
print(f"总缺失率: {missing_rate(df):.2f}%")
# 按行统计
df['missing_count'] = df.isnull().sum(axis=1)
print("每行缺失值统计:")
print(df['missing_count'].value_counts().sort_index())
2.2 缺失值填充策略
# 数值列填充(以下三种方式按需选用其一)
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].mean())    # 均值
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].median())  # 中位数
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].mode()[0]) # 众数
# 前向/后向填充(时间序列常用)
df['price'] = df['price'].ffill()  # 前向填充
df['price'] = df['price'].bfill()  # 后向填充
# 基于其他列填充
df['age'] = df['age'].fillna(df.groupby('city')['age'].transform('median'))
# 字符串列填充
df['category'] = df['category'].fillna('Unknown')
df['name'] = df['name'].fillna('N/A')
# 多列条件填充
df['sales'] = df['sales'].fillna(
    df['region'].map({'North': 1000, 'South': 800, 'East': 1200})
)
# 插值法
df['temperature'] = df['temperature'].interpolate(method='linear')  # 线性插值
df['temperature'] = df['temperature'].interpolate(method='time')    # 时间插值(需要 DatetimeIndex)
2.3 删除策略
# 删除含空值的行/列
df_clean_rows = df.dropna() # 删除所有含空值的行
df_clean_cols = df.dropna(axis=1, thresh=int(0.8 * len(df)))  # 保留空值率低于20%的列(thresh 必须为整数)
# 条件删除
df_subset = df.dropna(subset=['name', 'age']) # 必须有name和age的行
df_no_high_missing = df.dropna(thresh=int(len(df.columns) * 0.9))  # 保留至少90%列非空的行
# 基于缺失率删除
def drop_high_missing(df, threshold=0.5):
    """删除缺失率超过阈值的列"""
    missing_ratios = df.isnull().mean()
    cols_to_drop = missing_ratios[missing_ratios > threshold].index
    print(f"删除列: {list(cols_to_drop)}")
    return df.drop(columns=cols_to_drop)
df_filtered = drop_high_missing(df, threshold=0.7)
3. 重复值处理
3.1 检测重复
# 全局重复
duplicates = df.duplicated().sum()
print(f"重复行数: {duplicates}")
# 按子集检测
subset_dups = df.duplicated(subset=['name', 'email']).sum()
print(f"按name和email重复: {subset_dups}")
# 查看重复行
duplicate_rows = df[df.duplicated(keep=False)] # keep=False显示所有重复
print("重复数据预览:")
print(duplicate_rows.head())
# 重复统计
dup_counts = df.duplicated(keep=False).value_counts()  # 重复行 / 非重复行 各有多少
3.2 删除重复
# 删除重复行
df_unique = df.drop_duplicates() # 删除所有重复
df_keep_first = df.drop_duplicates(keep='first') # 保留第一条
df_keep_last = df.drop_duplicates(keep='last') # 保留最后一条
# 按子集删除
df_no_email_dup = df.drop_duplicates(subset=['email'])
# 强制删除(忽略索引)
df_clean = df.reset_index(drop=True).drop_duplicates().set_index('id')
# 智能去重(保留最新数据)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_sorted = df.sort_values('timestamp')
df_dedup = df_sorted.drop_duplicates(subset=['user_id'], keep='last')
4. 数据类型转换
4.1 类型检测和转换
# 当前类型检查
print("数据类型:")
print(df.dtypes)
print("\n对象列内容类型:")
for col in df.select_dtypes(include=['object']).columns:
    print(f"{col}: {df[col].apply(type).value_counts()}")
# 字符串转数值
df['price'] = pd.to_numeric(df['price'], errors='coerce') # 无效值变NaN
df['quantity'] = pd.to_numeric(df['quantity'], downcast='integer')
# 日期转换
df['date'] = pd.to_datetime(df['date'], errors='coerce')  # infer_datetime_format 参数已弃用,默认即自动推断格式
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
# 分类类型(节省内存)
low_cardinality_cols = df.select_dtypes(include=['object']).columns
for col in low_cardinality_cols:
    if df[col].nunique() / len(df) < 0.05:  # 唯一值少于5%
        df[col] = df[col].astype('category')
# 布尔类型
df['is_active'] = df['status'].isin(['active', 'Active']).astype(bool)
4.2 类型优化
def optimize_dtypes(df):
    """自动优化数据类型"""
    optimized_df = df.copy()
    # 整数优化
    for col in optimized_df.select_dtypes(include=['int64']).columns:
        optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='integer')
    # 浮点数优化
    for col in optimized_df.select_dtypes(include=['float64']).columns:
        optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
    # 日期识别:仅当大部分值能解析为日期时才替换,避免把普通文本列整列变成 NaT
    for col in optimized_df.select_dtypes(include=['object']).columns:
        parsed = pd.to_datetime(optimized_df[col], errors='coerce')
        if parsed.notna().mean() > 0.9:
            optimized_df[col] = parsed
    # 对象转分类
    for col in optimized_df.select_dtypes(include=['object']).columns:
        if optimized_df[col].nunique() / len(optimized_df) < 0.5:
            optimized_df[col] = optimized_df[col].astype('category')
    return optimized_df
df_optimized = optimize_dtypes(df)
print(f"内存节省: {(df.memory_usage(deep=True).sum() - df_optimized.memory_usage(deep=True).sum()) / 1024**2:.2f} MB")
5. 异常值处理
5.1 异常值检测
# IQR 方法
def detect_outliers_iqr(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return outliers, lower_bound, upper_bound
outliers, lb, ub = detect_outliers_iqr(df, 'price')
print(f"价格异常值: {len(outliers)} 条")
print(f"范围: ({lb:.2f}, {ub:.2f})")
# Z-score 方法
from scipy import stats
price = df['price'].dropna()
z_scores = np.abs(stats.zscore(price))
outliers_z = df.loc[price.index[z_scores > 3]]  # 3σ 准则,注意对齐原索引
print(f"Z-score 异常值: {len(outliers_z)} 条")
# 业务规则
def business_rule_outliers(df):
    """基于业务规则检测异常"""
    rules = {
        'price_negative': (df['price'] < 0),
        'quantity_zero': (df['quantity'] <= 0),
        'price_too_high': (df['price'] > 10000),
        'age_invalid': (df['age'] < 0) | (df['age'] > 120)
    }
    outliers = pd.DataFrame(index=df.index)
    for rule_name, condition in rules.items():
        outliers[rule_name] = condition
        print(f"{rule_name}: {condition.sum()} 条")
    return outliers.any(axis=1), outliers
business_outliers, details = business_rule_outliers(df)
5.2 异常值处理
# 替换为边界值
def cap_outliers(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - 1.5 * IQR
    upper = Q3 + 1.5 * IQR
    df[column] = df[column].clip(lower=lower, upper=upper)
    return df
df_capped = cap_outliers(df.copy(), 'price')
# 替换为均值/中位数
outlier_mask = (df['price'] < lb) | (df['price'] > ub)
df.loc[outlier_mask, 'price'] = df.loc[~outlier_mask, 'price'].median()
# 删除异常值
df_clean = df[(df['price'] >= lb) & (df['price'] <= ub)]
# 标记异常值
df['is_outlier'] = ((df['price'] < lb) | (df['price'] > ub))
6. 字符串清洗
6.1 文本标准化
# 基础清洗
df['name'] = df['name'].str.strip() # 去除首尾空格
df['email'] = df['email'].str.lower().str.strip() # 转小写并去空格
df['phone'] = df['phone'].str.replace(r'[^\d+]', '', regex=True) # 提取数字
# 标准化格式
df['city'] = df['city'].str.title() # 首字母大写
df['address'] = df['address'].str.replace(r'\s+', ' ', regex=True) # 统一空格
# 正则替换
df['product_code'] = df['product_code'].str.replace(r'^PROD-', '', regex=True)
df['description'] = df['description'].str.replace(r'<[^>]*>', '', regex=True) # 移除HTML标签
# 提取信息
df['domain'] = df['email'].str.extract(r'@([\w.-]+)')
df['phone_area'] = df['phone'].str[:3]
# 缺失值处理
df['name'] = df['name'].replace('', np.nan).fillna('Unknown')
6.2 文本验证
import re
def validate_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, str(email)))

def validate_phone(phone):
    pattern = r'^\+?1?\d{10,15}$'
    return bool(re.match(pattern, str(phone)))
# 批量验证
df['email_valid'] = df['email'].apply(validate_email)
df['phone_valid'] = df['phone'].apply(validate_phone)
invalid_emails = df[~df['email_valid']]
print(f"无效邮箱: {len(invalid_emails)} 条")
7. 数据一致性检查
7.1 枚举值标准化
# 统一分类值
def standardize_categories(df, column, mapping):
    """标准化分类值"""
    df[column] = df[column].str.lower().str.strip()
    df[column] = df[column].map(mapping).fillna(df[column])
    return df
# 示例映射
status_mapping = {
    'active': 'Active',
    'act': 'Active',
    'a': 'Active',
    'inactive': 'Inactive',
    'inact': 'Inactive',
    'i': 'Inactive'
}
df = standardize_categories(df, 'status', status_mapping)
# 智能映射(模糊匹配),安装: pip install fuzzywuzzy python-Levenshtein
from fuzzywuzzy import fuzz, process

def smart_mapping(series, valid_values, threshold=80):
    """模糊匹配映射"""
    def map_value(value):
        if pd.isna(value):
            return value
        match = process.extractOne(str(value), valid_values, scorer=fuzz.token_sort_ratio)
        return match[0] if match[1] >= threshold else value
    return series.apply(map_value)
valid_statuses = ['Active', 'Inactive', 'Pending']
df['status_clean'] = smart_mapping(df['status'], valid_statuses)
7.2 跨列一致性
# 年龄和出生日期一致性
def validate_age_birthdate(df):
    current_year = pd.Timestamp.now().year
    df['birth_year_est'] = current_year - df['age']
    df['age_diff'] = df['birth_year_est'] - df['birth_date'].dt.year
    inconsistent = df[abs(df['age_diff']) > 1]
    print(f"年龄与出生日期不一致: {len(inconsistent)} 条")
    return inconsistent
# 业务规则验证
def validate_business_rules(df):
    issues = []
    # 价格不能为负
    if (df['price'] < 0).any():
        issues.append('负价格')
    # 数量与总价关系
    df['calc_total'] = df['quantity'] * df['price']
    price_mismatch = abs(df['total'] - df['calc_total']) > 0.01
    if price_mismatch.any():
        issues.append('总价计算错误')
    # 地区代码验证
    valid_regions = ['BJ', 'SH', 'GZ', 'SZ']
    invalid_regions = ~df['region_code'].isin(valid_regions)
    if invalid_regions.any():
        issues.append(f'无效地区代码: {df[invalid_regions]["region_code"].unique()}')
    return issues
issues = validate_business_rules(df)
for issue in issues:
    print(f"问题: {issue}")
8. 数据标准化和规范化
8.1 地址标准化
def standardize_address(df):
    """地址标准化"""
    # 去除多余空格
    df['address'] = df['address'].str.replace(r'\s+', ' ', regex=True).str.strip()
    # 标准化缩写(可选的句点一并替换掉)
    address_mapping = {
        r'\bSt\b\.?': 'Street',
        r'\bAve\b\.?': 'Avenue',
        r'\bRd\b\.?': 'Road',
        r'\bBlvd\b\.?': 'Boulevard',
        r'\bDr\b\.?': 'Drive'
    }
    for pattern, replacement in address_mapping.items():
        df['address'] = df['address'].str.replace(pattern, replacement, regex=True)
    # 提取邮编(使用非捕获组,保证 extract 只返回一列)
    df['zip_code'] = df['address'].str.extract(r'(\d{5}(?:-\d{4})?)')
    return df
df = standardize_address(df)
8.2 货币格式统一
def standardize_currency(df, column):
    """统一货币格式"""
    # 移除货币符号和千分位逗号
    df[column] = df[column].str.replace(r'[$€£¥]', '', regex=True)
    df[column] = df[column].str.replace(',', '', regex=False)
    # 转换为数值
    df[column] = pd.to_numeric(df[column], errors='coerce')
    # 统一为美元(如果需要汇率转换)
    # df[column] = df[column] * exchange_rate
    return df
df = standardize_currency(df, 'price')
9. 完整数据清洗管道
9.1 自动化清洗流程
from scipy import stats  # handle_outliers 的 zscore 方法需要

class DataCleaningPipeline:
    """数据清洗管道"""

    def __init__(self, df):
        self.original_df = df.copy()
        self.df = df.copy()
        self.report = {
            'initial_shape': df.shape,
            'issues_found': {},
            'actions_taken': []
        }

    def handle_missing(self, strategy='mean', threshold=0.5):
        """处理缺失值"""
        missing_ratios = self.df.isnull().mean()
        cols_to_drop = missing_ratios[missing_ratios > threshold].index
        if len(cols_to_drop) > 0:
            self.df.drop(columns=cols_to_drop, inplace=True)
            self.report['issues_found']['high_missing_cols'] = list(cols_to_drop)
            self.report['actions_taken'].append(f"删除了 {len(cols_to_drop)} 个高缺失率列")
        # 数值列填充
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if strategy == 'median':
                fill_value = self.df[col].median()
            else:  # 默认使用均值
                fill_value = self.df[col].mean()
            self.df[col] = self.df[col].fillna(fill_value)
        # 分类列填充
        cat_cols = self.df.select_dtypes(include=['object', 'category']).columns
        for col in cat_cols:
            # category 列需要先把 'Unknown' 加入类别,否则 fillna 会报错
            if isinstance(self.df[col].dtype, pd.CategoricalDtype) and 'Unknown' not in self.df[col].cat.categories:
                self.df[col] = self.df[col].cat.add_categories(['Unknown'])
            self.df[col] = self.df[col].fillna('Unknown')
        self.report['actions_taken'].append(f"缺失值填充策略: {strategy}")
        return self

    def remove_duplicates(self, subset=None):
        """删除重复值"""
        initial_len = len(self.df)
        self.df.drop_duplicates(subset=subset, inplace=True)
        removed = initial_len - len(self.df)
        if removed > 0:
            self.report['issues_found']['duplicates'] = removed
            self.report['actions_taken'].append(f"删除了 {removed} 个重复行")
        return self

    def fix_data_types(self):
        """修复数据类型"""
        # 日期列:按列名识别
        for col in self.df.columns:
            if 'date' in col.lower() or 'time' in col.lower():
                self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
        # 数值列:仅当大部分值可解析时才替换,避免把文本列整列变成 NaN
        for col in self.df.select_dtypes(include=['object']).columns:
            converted = pd.to_numeric(self.df[col], errors='coerce')
            if converted.notna().mean() > 0.9:
                self.df[col] = converted
        # 分类优化
        for col in self.df.select_dtypes(include=['object']).columns:
            if self.df[col].nunique() / len(self.df) < 0.1:
                self.df[col] = self.df[col].astype('category')
        self.report['actions_taken'].append("数据类型优化完成")
        return self

    def handle_outliers(self, method='cap', sigma=3):
        """处理异常值"""
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if method == 'cap':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - 1.5 * IQR
                upper = Q3 + 1.5 * IQR
                initial_outliers = ((self.df[col] < lower) | (self.df[col] > upper)).sum()
                self.df[col] = self.df[col].clip(lower=lower, upper=upper)
                self.report['issues_found'][f'{col}_outliers'] = int(initial_outliers)
            elif method == 'zscore':
                values = self.df[col].dropna()
                z_scores = np.abs(stats.zscore(values))
                outlier_idx = values.index[z_scores > sigma]
                self.df.loc[outlier_idx, col] = np.nan  # 或用均值填充
        return self

    def clean_text(self):
        """文本清洗"""
        text_cols = self.df.select_dtypes(include=['object']).columns
        for col in text_cols:
            # 去除空格
            self.df[col] = self.df[col].astype(str).str.strip()
            # 转小写(可选)
            # self.df[col] = self.df[col].str.lower()
            # 替换常见无效值
            self.df[col] = self.df[col].replace(['', 'nan', 'null', 'N/A'], np.nan)
        return self

    def validate_business_rules(self, rules):
        """业务规则验证"""
        issues = []
        for rule_name, rule_func in rules.items():
            invalid_mask = rule_func(self.df)
            invalid_count = int(invalid_mask.sum())
            if invalid_count > 0:
                issues.append({
                    'rule': rule_name,
                    'invalid_count': invalid_count,
                    'invalid_indices': self.df[invalid_mask].index.tolist()
                })
        self.report['issues_found']['business_rules'] = issues
        return self

    def execute(self):
        """执行完整清洗流程"""
        # 按顺序执行清洗步骤
        self.handle_missing()
        self.remove_duplicates()
        self.fix_data_types()
        self.clean_text()
        self.handle_outliers()
        # 业务规则(需要用户定义,仅对存在的列生效)
        business_rules = {}
        if 'price' in self.df.columns:
            business_rules['negative_price'] = lambda df: df['price'] < 0
        if 'quantity' in self.df.columns:
            business_rules['zero_quantity'] = lambda df: df['quantity'] <= 0
        self.validate_business_rules(business_rules)
        self.report['final_shape'] = self.df.shape
        return self.df, self.report

    def save_report(self, filename):
        """保存清洗报告"""
        import json
        report = {
            'initial_shape': self.report['initial_shape'],
            'final_shape': self.report['final_shape'],
            'actions_taken': self.report['actions_taken'],
            'issues_found': self.report['issues_found']
        }
        with open(filename, 'w', encoding='utf-8') as f:
            # default=str 兜底处理 numpy 整数、索引等无法直接序列化的对象
            json.dump(report, f, ensure_ascii=False, indent=2, default=str)
# 使用示例
pipeline = DataCleaningPipeline(df)
clean_df, report = pipeline.execute()
pipeline.save_report('cleaning_report.json')
print("清洗完成!")
print(f"数据形状: {clean_df.shape}")
print("处理报告已保存到 cleaning_report.json")
9.2 质量检查函数
def data_quality_report(df, original_df=None):
    """生成数据质量报告"""
    report = {}
    # 基本信息
    report['shape'] = df.shape
    report['memory_usage'] = df.memory_usage(deep=True).sum() / 1024**2
    # 缺失值
    report['missing_values'] = df.isnull().sum().to_dict()
    report['missing_rate'] = (df.isnull().sum() / len(df) * 100).round(2).to_dict()
    # 唯一值
    report['unique_counts'] = df.nunique().to_dict()
    # 数据类型
    report['dtypes'] = df.dtypes.astype(str).to_dict()
    # 异常值(简单统计)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    outliers_info = {}
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        outlier_count = ((df[col] < (Q1 - 1.5*IQR)) | (df[col] > (Q3 + 1.5*IQR))).sum()
        outliers_info[col] = outlier_count
    report['outliers'] = outliers_info
    # 与原始数据对比(如果提供)
    if original_df is not None:
        report['rows_removed'] = len(original_df) - len(df)
        report['cols_removed'] = len(original_df.columns) - len(df.columns)
    return report
# 生成报告
quality_report = data_quality_report(clean_df, df)
print("数据质量报告:")
for key, value in quality_report.items():
    if isinstance(value, dict) and len(value) > 0:
        print(f"\n{key}:")
        for k, v in value.items():
            # 数值为 0 的项跳过;字符串值(如 dtype)原样打印
            if isinstance(v, str) or (isinstance(v, (int, float, np.integer, np.floating)) and v > 0):
                print(f"  {k}: {v}")
10. 最佳实践和注意事项
10.1 清洗原则
- 记录所有操作:维护详细的清洗日志(做法见列表后的示例)
- 不破坏原始数据:始终保留原始数据副本
- 业务规则优先:清洗策略要符合业务逻辑
- 渐进式清洗:分步骤执行,便于调试
- 自动化测试:建立数据质量检查
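例如,下面是体现"保留原始副本、记录操作日志、自动化质量检查"这几条原则的一个最小示意,其中 data.csv 与 price 列仅为假设的示例数据:
import pandas as pd

raw_df = pd.read_csv('data.csv')   # 原始数据只读,不直接修改
df = raw_df.copy()                 # 所有清洗都在副本上进行
cleaning_log = []                  # 记录每一步操作

before = len(df)
df = df.drop_duplicates()
cleaning_log.append(f"drop_duplicates: 删除 {before - len(df)} 行")

before = df['price'].isnull().sum()
df['price'] = df['price'].fillna(df['price'].median())
cleaning_log.append(f"fillna(price, median): 填充 {before} 个缺失值")

# 简单的质量断言,充当自动化检查
assert df['price'].isnull().sum() == 0, "price 仍有缺失值"
assert (df['price'] >= 0).all(), "price 存在负值"

for entry in cleaning_log:
    print(entry)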
10.2 性能优化
# 向量化操作而非循环
# 差:for index, row in df.iterrows(): ...
# 好:df['new_col'] = df['col1'] + df['col2']
# 使用适当的数据类型
df['category'] = df['category'].astype('category') # 节省内存
df['id'] = df['id'].astype('int32') # 更小的整数类型
# 分块处理大文件
def clean_large_file(file_path, chunk_size=10000):
    chunks = []
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # 清洗每个块(clean_chunk 为用户自定义函数,见下方示例)
        chunk_clean = clean_chunk(chunk)
        chunks.append(chunk_clean)
    return pd.concat(chunks, ignore_index=True)
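其中 clean_chunk 需要自行定义。下面给出一个假设性的最小实现示意,实际清洗步骤应结合前文方法按业务需求组合:
def clean_chunk(chunk):
    """对单个数据块做基础清洗(示意)"""
    chunk = chunk.drop_duplicates()
    # 数值列用中位数填充缺失值
    for col in chunk.select_dtypes(include=['number']).columns:
        chunk[col] = chunk[col].fillna(chunk[col].median())
    # 文本列去除首尾空格
    for col in chunk.select_dtypes(include=['object']).columns:
        chunk[col] = chunk[col].str.strip()
    return chunk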
10.3 版本控制和可重复性
# 使用配置文件
CLEANING_CONFIG = {
    'missing_threshold': 0.5,
    'outlier_sigma': 3,
    'filling_strategy': 'median',
    'remove_duplicates': True,
    'standardize_text': True
}
# 种子设置(随机操作)
np.random.seed(42)
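把配置接入前文 DataCleaningPipeline 的一种示意用法(参数名以上文管道方法的签名为准,此处仅演示思路):
pipeline = DataCleaningPipeline(df)
pipeline.handle_missing(
    strategy=CLEANING_CONFIG['filling_strategy'],
    threshold=CLEANING_CONFIG['missing_threshold']
)
if CLEANING_CONFIG['remove_duplicates']:
    pipeline.remove_duplicates()
pipeline.fix_data_types()
if CLEANING_CONFIG['standardize_text']:
    pipeline.clean_text()
pipeline.handle_outliers(method='cap', sigma=CLEANING_CONFIG['outlier_sigma'])
clean_df = pipeline.df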
数据清洗是数据分析的基础,良好的清洗习惯可以显著提高数据质量和分析结果的可靠性。通过自动化管道和详细的报告机制,可以高效处理复杂的数据清洗任务。