

Pandas Data Cleaning in Detail

1. Data Quality Assessment

1.1 Initial Inspection

import pandas as pd
import numpy as np
import missingno as msno  # missing-value visualization; requires installation

# Basic information check
def initial_assessment(df):
    """Initial data quality assessment"""
    print("=== Shape ===")
    print(f"Rows: {len(df)}, Columns: {len(df.columns)}")

    print("\n=== Preview ===")
    print(df.head())

    print("\n=== Info ===")
    df.info()  # info() prints directly and returns None

    print("\n=== Descriptive statistics ===")
    print(df.describe(include='all'))

    print("\n=== Missing values ===")
    missing = df.isnull().sum()
    missing_percent = (missing / len(df)) * 100
    missing_df = pd.DataFrame({
        'missing_count': missing,
        'missing_rate_%': missing_percent.round(2)
    })
    print(missing_df[missing_df['missing_count'] > 0])

    print("\n=== Duplicates ===")
    duplicates = df.duplicated().sum()
    print(f"Duplicate rows: {duplicates}")

    return missing_df, duplicates

# Usage
df = pd.read_csv('data.csv')
missing_info, dup_count = initial_assessment(df)

1.2 Missing-Value Visualization

# Install: pip install missingno
import missingno as msno

# Missing-value matrix
msno.matrix(df)
msno.bar(df)      # missing-value bar chart
msno.heatmap(df)  # missingness correlation heatmap

# Dendrogram of missingness patterns
msno.dendrogram(df)

2. Handling Missing Values

2.1 Detection and Statistics

# Several detection methods
print("Null checks:")
print(df.isnull().sum())              # nulls per column
print(df.isna().sum())                # equivalent to isnull()
print(df.isnull().any(axis=1).sum())  # number of rows containing nulls

# Overall missing rate
def missing_rate(df):
    total_cells = df.size
    missing_cells = df.isnull().sum().sum()
    return missing_cells / total_cells * 100

print(f"Overall missing rate: {missing_rate(df):.2f}%")

# Per-row statistics
df['missing_count'] = df.isnull().sum(axis=1)
print("Missing values per row:")
print(df['missing_count'].value_counts().sort_index())

2.2 Missing-Value Filling Strategies

# Numeric columns -- pick one strategy; assignment avoids chained fillna(inplace=True) warnings
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].mean())      # mean
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].median())    # median
df['numeric_col'] = df['numeric_col'].fillna(df['numeric_col'].mode()[0])   # mode

# Forward/backward fill (common for time series); fillna(method=...) is deprecated
df['price'] = df['price'].ffill()    # forward fill
df['price'] = df['price'].bfill()    # backward fill

# Fill based on another column
df['age'] = df['age'].fillna(df.groupby('city')['age'].transform('median'))

# String columns
df['category'] = df['category'].fillna('Unknown')
df['name'] = df['name'].fillna('N/A')

# Conditional fill driven by another column
df['sales'] = df['sales'].fillna(
    df['region'].map({'North': 1000, 'South': 800, 'East': 1200})
)

# Interpolation
df['temperature'] = df['temperature'].interpolate(method='linear')  # linear interpolation
df['temperature'] = df['temperature'].interpolate(method='time')    # time-based; requires a DatetimeIndex
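Time-based interpolation only works on a DatetimeIndex. A minimal sketch, assuming the frame has a 'date' column to promote to the index:

# Assumes a 'date' column; with a DatetimeIndex, method='time' weights gaps by elapsed time
ts = df.set_index(pd.to_datetime(df['date'])).sort_index()
ts['temperature'] = ts['temperature'].interpolate(method='time')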

2.3 Dropping Strategies

# Drop rows/columns containing nulls
df_clean_rows = df.dropna()                                   # drop every row with any null
df_clean_cols = df.dropna(axis=1, thresh=int(0.8 * len(df)))  # keep columns with <20% missing (thresh must be an int)

# Conditional drops
df_subset = df.dropna(subset=['name', 'age'])                      # rows must have name and age
df_no_high_missing = df.dropna(thresh=int(len(df.columns) * 0.9))  # keep rows that are at least 90% non-null

# Drop columns by missing rate
def drop_high_missing(df, threshold=0.5):
    """Drop columns whose missing rate exceeds the threshold"""
    missing_ratios = df.isnull().mean()
    cols_to_drop = missing_ratios[missing_ratios > threshold].index
    print(f"Dropping columns: {list(cols_to_drop)}")
    return df.drop(columns=cols_to_drop)

df_filtered = drop_high_missing(df, threshold=0.7)

3. Handling Duplicates

3.1 Detecting Duplicates

# Full-row duplicates
duplicates = df.duplicated().sum()
print(f"Duplicate rows: {duplicates}")

# Duplicates on a subset of columns
subset_dups = df.duplicated(subset=['name', 'email']).sum()
print(f"Duplicates on name and email: {subset_dups}")

# Inspect duplicate rows
duplicate_rows = df[df.duplicated(keep=False)]  # keep=False marks every member of a duplicate group
print("Duplicate preview:")
print(duplicate_rows.head())

# Count duplicate vs. unique rows
dup_counts = df.duplicated(keep=False).value_counts()

3.2 Removing Duplicates

# Drop duplicate rows
df_unique = df.drop_duplicates()                     # drop duplicates (keeps first by default)
df_keep_first = df.drop_duplicates(keep='first')     # keep the first occurrence
df_keep_last = df.drop_duplicates(keep='last')       # keep the last occurrence

# Drop on a subset of columns
df_no_email_dup = df.drop_duplicates(subset=['email'])

# Deduplicate ignoring the index, then restore an 'id' index
df_clean = df.reset_index(drop=True).drop_duplicates().set_index('id')

# Smarter deduplication (keep the most recent record)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_sorted = df.sort_values('timestamp')
df_dedup = df_sorted.drop_duplicates(subset=['user_id'], keep='last')

4. Data Type Conversion

4.1 Type Detection and Conversion

# Check current types
print("Dtypes:")
print(df.dtypes)
print("\nPython types inside object columns:")
for col in df.select_dtypes(include=['object']).columns:
    print(f"{col}: {df[col].apply(type).value_counts()}")

# Strings to numbers
df['price'] = pd.to_numeric(df['price'], errors='coerce')  # invalid values become NaN
df['quantity'] = pd.to_numeric(df['quantity'], downcast='integer')

# Dates (infer_datetime_format is deprecated; format inference is now the default)
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month

# Categorical dtype (saves memory)
low_cardinality_cols = df.select_dtypes(include=['object']).columns
for col in low_cardinality_cols:
    if df[col].nunique() / len(df) < 0.05:  # fewer than 5% unique values
        df[col] = df[col].astype('category')

# Boolean (isin already returns a boolean Series)
df['is_active'] = df['status'].isin(['active', 'Active'])

4.2 Type Optimization

def optimize_dtypes(df):
    """Automatically optimize dtypes"""
    optimized_df = df.copy()

    # Downcast integers
    for col in optimized_df.select_dtypes(include=['int64']).columns:
        optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='integer')

    # Downcast floats
    for col in optimized_df.select_dtypes(include=['float64']).columns:
        optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')

    # Object to category
    for col in optimized_df.select_dtypes(include=['object']).columns:
        if optimized_df[col].nunique() / len(optimized_df) < 0.5:
            optimized_df[col] = optimized_df[col].astype('category')

    # Date detection: only keep the conversion when most non-null values parse,
    # otherwise errors='coerce' would wipe ordinary text columns to NaT
    for col in optimized_df.select_dtypes(include=['object']).columns:
        parsed = pd.to_datetime(optimized_df[col], errors='coerce')
        if parsed.notna().sum() >= 0.9 * optimized_df[col].notna().sum():
            optimized_df[col] = parsed

    return optimized_df

df_optimized = optimize_dtypes(df)
print(f"Memory saved: {(df.memory_usage(deep=True).sum() - df_optimized.memory_usage(deep=True).sum()) / 1024**2:.2f} MB")

5. Outlier Handling

5.1 Outlier Detection

# IQR method
def detect_outliers_iqr(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return outliers, lower_bound, upper_bound

outliers, lb, ub = detect_outliers_iqr(df, 'price')
print(f"Price outliers: {len(outliers)} rows")
print(f"Bounds: ({lb:.2f}, {ub:.2f})")

# Z-score method (3-sigma rule); keep the non-null index so row labels stay aligned
from scipy import stats
price_nonnull = df['price'].dropna()
z_scores = np.abs(stats.zscore(price_nonnull))
outliers_z = df.loc[price_nonnull.index[z_scores > 3]]
print(f"Z-score outliers: {len(outliers_z)} rows")

# Business rules
def business_rule_outliers(df):
    """Detect anomalies based on business rules"""
    rules = {
        'price_negative': (df['price'] < 0),
        'quantity_zero': (df['quantity'] <= 0),
        'price_too_high': (df['price'] > 10000),
        'age_invalid': (df['age'] < 0) | (df['age'] > 120)
    }

    outliers = pd.DataFrame(index=df.index)
    for rule_name, condition in rules.items():
        outliers[rule_name] = condition
        print(f"{rule_name}: {condition.sum()} rows")

    return outliers.any(axis=1), outliers

business_outliers, details = business_rule_outliers(df)

5.2 Outlier Treatment

# Cap at the IQR bounds
def cap_outliers(df, column):
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower = Q1 - 1.5 * IQR
    upper = Q3 + 1.5 * IQR
    df[column] = df[column].clip(lower=lower, upper=upper)
    return df

df_capped = cap_outliers(df.copy(), 'price')

# Replace with the median of the non-outliers (lb/ub come from the IQR detection above)
outlier_mask = (df['price'] < lb) | (df['price'] > ub)
df.loc[outlier_mask, 'price'] = df.loc[~outlier_mask, 'price'].median()

# Drop outliers
df_clean = df[(df['price'] >= lb) & (df['price'] <= ub)]

# Flag outliers
df['is_outlier'] = (df['price'] < lb) | (df['price'] > ub)

6. String Cleaning

6.1 Text Normalization

# Basic cleanup
df['name'] = df['name'].str.strip()                    # strip leading/trailing whitespace
df['email'] = df['email'].str.lower().str.strip()      # lowercase and strip
df['phone'] = df['phone'].str.replace(r'[^\d+]', '', regex=True)  # keep only digits and '+'

# Normalize formatting
df['city'] = df['city'].str.title()                    # capitalize each word
df['address'] = df['address'].str.replace(r'\s+', ' ', regex=True)  # collapse whitespace

# Regex replacements
df['product_code'] = df['product_code'].str.replace(r'^PROD-', '', regex=True)
df['description'] = df['description'].str.replace(r'<[^>]*>', '', regex=True)  # strip HTML tags

# Extract information
df['domain'] = df['email'].str.extract(r'@([\w.-]+)')
df['phone_area'] = df['phone'].str[:3]

# Treat empty strings as missing
df['name'] = df['name'].replace('', np.nan).fillna('Unknown')

6.2 Text Validation

import re

def validate_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, str(email)))

def validate_phone(phone):
    pattern = r'^\+?1?\d{10,15}$'
    return bool(re.match(pattern, str(phone)))

# Validate in bulk
df['email_valid'] = df['email'].apply(validate_email)
df['phone_valid'] = df['phone'].apply(validate_phone)

invalid_emails = df[~df['email_valid']]
print(f"Invalid emails: {len(invalid_emails)} rows")

7. Data Consistency Checks

7.1 Standardizing Enumerated Values

# Unify category values
def standardize_categories(df, column, mapping):
    """Standardize categorical values"""
    df[column] = df[column].str.lower().str.strip()
    df[column] = df[column].map(mapping).fillna(df[column])
    return df

# Example mapping
status_mapping = {
    'active': 'Active',
    'act': 'Active',
    'a': 'Active',
    'inactive': 'Inactive',
    'inact': 'Inactive',
    'i': 'Inactive'
}

df = standardize_categories(df, 'status', status_mapping)

# Smart mapping (fuzzy matching); install: pip install fuzzywuzzy
from fuzzywuzzy import fuzz, process

def smart_mapping(series, valid_values, threshold=80):
    """Map values via fuzzy matching"""
    def map_value(value):
        if pd.isna(value):
            return value
        match = process.extractOne(str(value), valid_values, scorer=fuzz.token_sort_ratio)
        return match[0] if match[1] >= threshold else value

    return series.apply(map_value)

valid_statuses = ['Active', 'Inactive', 'Pending']
df['status_clean'] = smart_mapping(df['status'], valid_statuses)

7.2 Cross-Column Consistency

# Consistency between age and birth date
def validate_age_birthdate(df):
    current_year = pd.Timestamp.now().year
    df['birth_year_est'] = current_year - df['age']
    df['age_diff'] = df['birth_year_est'] - df['birth_date'].dt.year
    inconsistent = df[abs(df['age_diff']) > 1]
    print(f"Age inconsistent with birth date: {len(inconsistent)} rows")
    return inconsistent

# Business-rule validation
def validate_business_rules(df):
    issues = []

    # Prices must not be negative
    if (df['price'] < 0).any():
        issues.append('negative prices')

    # Quantity x price must match the recorded total
    df['calc_total'] = df['quantity'] * df['price']
    price_mismatch = abs(df['total'] - df['calc_total']) > 0.01
    if price_mismatch.any():
        issues.append('total does not match quantity x price')

    # Region code validation
    valid_regions = ['BJ', 'SH', 'GZ', 'SZ']
    invalid_regions = ~df['region_code'].isin(valid_regions)
    if invalid_regions.any():
        issues.append(f'invalid region codes: {df.loc[invalid_regions, "region_code"].unique()}')

    return issues

issues = validate_business_rules(df)
for issue in issues:
    print(f"Issue: {issue}")

8. Data Standardization and Normalization

8.1 Address Standardization

def standardize_address(df):
    """Standardize addresses"""
    # Collapse extra whitespace
    df['address'] = df['address'].str.replace(r'\s+', ' ', regex=True).str.strip()

    # Expand common abbreviations
    address_mapping = {
        r'\bSt\.?\b': 'Street',
        r'\bAve\.?\b': 'Avenue',
        r'\bRd\.?\b': 'Road',
        r'\bBlvd\.?\b': 'Boulevard',
        r'\bDr\.?\b': 'Drive'
    }

    for pattern, replacement in address_mapping.items():
        df['address'] = df['address'].str.replace(pattern, replacement, regex=True)

    # Extract ZIP code (non-capturing inner group so extract returns a single column)
    df['zip_code'] = df['address'].str.extract(r'(\d{5}(?:-\d{4})?)')

    return df

df = standardize_address(df)

8.2 Unifying Currency Formats

def standardize_currency(df, column):
    """Unify currency formats"""
    # Strip currency symbols and thousands separators
    df[column] = df[column].str.replace(r'[$€£¥]', '', regex=True)
    df[column] = df[column].str.replace(',', '', regex=False)

    # Convert to numeric
    df[column] = pd.to_numeric(df[column], errors='coerce')

    # Convert everything to USD if an exchange-rate conversion is needed
    # df[column] = df[column] * exchange_rate

    return df

df = standardize_currency(df, 'price')
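The commented-out line above only hints at exchange-rate conversion. A minimal sketch, assuming a hypothetical 'currency' column and a hard-coded rate table (real code would load current rates):

# Hypothetical: convert per-row amounts to USD using a 'currency' column.
# The column name and rates below are illustrative assumptions, not real data.
usd_rates = {'USD': 1.0, 'EUR': 1.08, 'GBP': 1.27, 'CNY': 0.14}

df['price_usd'] = df['price'] * df['currency'].map(usd_rates)
# Rows with an unknown currency become NaN and can be handled by the missing-value steps above.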

9. A Complete Data-Cleaning Pipeline

9.1 Automated Cleaning Workflow

class DataCleaningPipeline:
    """数据清洗管道"""

    def __init__(self, df):
        self.original_df = df.copy()
        self.df = df.copy()
        self.report = {
            'initial_shape': df.shape,
            'issues_found': {},
            'actions_taken': []
        }

    def handle_missing(self, strategy='mean', threshold=0.5):
        """Handle missing values"""
        missing_ratios = self.df.isnull().mean()
        cols_to_drop = missing_ratios[missing_ratios > threshold].index

        if len(cols_to_drop) > 0:
            self.df.drop(columns=cols_to_drop, inplace=True)
            self.report['issues_found']['high_missing_cols'] = list(cols_to_drop)
            self.report['actions_taken'].append(f"Dropped {len(cols_to_drop)} columns with a high missing rate")

        # Fill numeric columns
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if strategy == 'median':
                fill_value = self.df[col].median()
            else:  # default to the mean for any other strategy value
                fill_value = self.df[col].mean()
            self.df[col] = self.df[col].fillna(fill_value)

        # Fill categorical/text columns
        cat_cols = self.df.select_dtypes(include=['object', 'category']).columns
        for col in cat_cols:
            if isinstance(self.df[col].dtype, pd.CategoricalDtype) and 'Unknown' not in self.df[col].cat.categories:
                self.df[col] = self.df[col].cat.add_categories(['Unknown'])
            self.df[col] = self.df[col].fillna('Unknown')

        self.report['actions_taken'].append(f"Missing-value fill strategy: {strategy}")
        return self

    def remove_duplicates(self, subset=None):
        """Remove duplicate rows"""
        initial_len = len(self.df)
        self.df.drop_duplicates(subset=subset, inplace=True)
        final_len = len(self.df)
        removed = initial_len - final_len

        if removed > 0:
            self.report['issues_found']['duplicates'] = removed
            self.report['actions_taken'].append(f"Removed {removed} duplicate rows")

        return self

    def fix_data_types(self):
        """Fix data types"""
        # Date-like columns (identified by name)
        for col in self.df.columns:
            if 'date' in col.lower() or 'time' in col.lower():
                self.df[col] = pd.to_datetime(self.df[col], errors='coerce')

        # Numeric columns: only keep the conversion if most non-null values parse,
        # otherwise errors='coerce' would blank out genuine text columns
        for col in self.df.select_dtypes(include=['object']).columns:
            converted = pd.to_numeric(self.df[col], errors='coerce')
            if converted.notna().sum() >= 0.9 * self.df[col].notna().sum():
                self.df[col] = converted

        # Categorical optimization
        for col in self.df.select_dtypes(include=['object']).columns:
            if self.df[col].nunique() / len(self.df) < 0.1:
                self.df[col] = self.df[col].astype('category')

        self.report['actions_taken'].append("Data type optimization complete")
        return self

    def handle_outliers(self, method='cap', sigma=3):
        """Handle outliers"""
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            if method == 'cap':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - 1.5 * IQR
                upper = Q3 + 1.5 * IQR
                initial_outliers = ((self.df[col] < lower) | (self.df[col] > upper)).sum()
                self.df[col] = self.df[col].clip(lower=lower, upper=upper)
                self.report['issues_found'][f'{col}_outliers'] = int(initial_outliers)
            elif method == 'zscore':
                non_null = self.df[col].dropna()
                z_scores = np.abs(stats.zscore(non_null))
                outlier_idx = non_null.index[z_scores > sigma]  # keep index alignment with the frame
                self.df.loc[outlier_idx, col] = np.nan  # or fill with the mean

        return self

    def clean_text(self):
        """Clean text columns"""
        text_cols = self.df.select_dtypes(include=['object']).columns

        for col in text_cols:
            # Strip whitespace
            self.df[col] = self.df[col].astype(str).str.strip()
            # Lowercase (optional)
            # self.df[col] = self.df[col].str.lower()
            # Replace common placeholder values with NaN
            self.df[col] = self.df[col].replace(['', 'nan', 'null', 'N/A'], np.nan)

        return self

    def validate_business_rules(self, rules):
        """Validate business rules"""
        issues = []
        for rule_name, rule_func in rules.items():
            invalid_mask = rule_func(self.df)
            invalid_count = int(invalid_mask.sum())  # plain int so the report stays JSON-serializable
            if invalid_count > 0:
                issues.append({
                    'rule': rule_name,
                    'invalid_count': invalid_count,
                    'invalid_indices': self.df[invalid_mask].index.tolist()
                })

        self.report['issues_found']['business_rules'] = issues
        return self

    def execute(self):
        """Run the full cleaning workflow"""
        # Run the cleaning steps in order
        self.handle_missing()
        self.remove_duplicates()
        self.fix_data_types()
        self.clean_text()
        self.handle_outliers()

        # Business rules (user-defined; adjust to the columns in your data)
        business_rules = {
            'negative_price': lambda df: df['price'] < 0,
            'zero_quantity': lambda df: df['quantity'] <= 0
        }
        self.validate_business_rules(business_rules)

        self.report['final_shape'] = self.df.shape
        return self.df, self.report

    def save_report(self, filename):
        """保存清洗报告"""
        import json
        report = {
            'initial_shape': self.report['initial_shape'],
            'final_shape': self.report['final_shape'],
            'actions_taken': self.report['actions_taken'],
            'issues_found': self.report['issues_found']
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)

# Usage example
pipeline = DataCleaningPipeline(df)
clean_df, report = pipeline.execute()
pipeline.save_report('cleaning_report.json')

print("Cleaning complete!")
print(f"Data shape: {clean_df.shape}")
print("Report saved to cleaning_report.json")

9.2 Quality-Check Function

def data_quality_report(df, original_df=None):
    """Generate a data quality report"""
    report = {}

    # Basic info
    report['shape'] = df.shape
    report['memory_usage'] = df.memory_usage(deep=True).sum() / 1024**2

    # Missing values
    report['missing_values'] = df.isnull().sum().to_dict()
    report['missing_rate'] = (df.isnull().sum() / len(df) * 100).round(2).to_dict()

    # Unique values
    report['unique_counts'] = df.nunique().to_dict()

    # Dtypes
    report['dtypes'] = df.dtypes.astype(str).to_dict()

    # Outliers (simple IQR count)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    outliers_info = {}
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        outlier_count = ((df[col] < (Q1 - 1.5*IQR)) | (df[col] > (Q3 + 1.5*IQR))).sum()
        outliers_info[col] = int(outlier_count)

    report['outliers'] = outliers_info

    # Comparison with the original data (if provided)
    if original_df is not None:
        report['rows_removed'] = len(original_df) - len(df)
        report['cols_removed'] = len(original_df.columns) - len(df.columns)

    return report

# Generate the report
quality_report = data_quality_report(clean_df, df)
print("Data quality report:")
for key, value in quality_report.items():
    if isinstance(value, dict) and len(value) > 0:
        print(f"\n{key}:")
        for k, v in value.items():
            # dtype entries are strings, so only apply the > 0 filter to numeric values
            if not isinstance(v, (int, float)) or v > 0:
                print(f"  {k}: {v}")

10. Best Practices and Caveats

10.1 Cleaning Principles

  1. Record every operation: maintain a detailed cleaning log
  2. Never destroy the original data: always keep a copy of the raw data
  3. Business rules come first: cleaning strategies must match business logic
  4. Clean progressively: run steps one at a time so they are easy to debug
  5. Automate the tests: build data quality checks into the workflow (see the sketch below)
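A minimal sketch of principles 1 and 5, using only plain logging and assert-style checks; the column names ('price', 'user_id', 'email') are placeholders to adapt to your own data:

import logging
import pandas as pd

logging.basicConfig(level=logging.INFO, filename='cleaning.log')

def log_step(name, df_before, df_after):
    """Principle 1: record what every cleaning step did to the data."""
    logging.info("%s: %s -> %s rows, %s -> %s cols",
                 name, len(df_before), len(df_after),
                 df_before.shape[1], df_after.shape[1])

def quality_checks(df):
    """Principle 5: fail fast when basic expectations are violated (placeholder columns)."""
    assert df['price'].ge(0).all(), "negative prices found"
    assert not df.duplicated(subset=['user_id']).any(), "duplicate user_id values"
    assert df['email'].notna().all(), "missing emails"

raw = pd.read_csv('data.csv')          # keep the untouched original (principle 2)
cleaned = raw.drop_duplicates()        # one progressive step (principle 4)
log_step('drop_duplicates', raw, cleaned)
quality_checks(cleaned)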

10.2 Performance Optimization

# Prefer vectorized operations over loops
# Slow: for index, row in df.iterrows(): ...
# Fast: df['new_col'] = df['col1'] + df['col2']

# Use appropriate dtypes
df['category'] = df['category'].astype('category')  # saves memory
df['id'] = df['id'].astype('int32')                 # smaller integer type

# Process large files in chunks
def clean_large_file(file_path, chunk_size=10000):
    chunks = []
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # Clean each chunk; clean_chunk is a user-defined function (see the sketch below)
        chunk_clean = clean_chunk(chunk)
        chunks.append(chunk_clean)
    return pd.concat(chunks, ignore_index=True)
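clean_chunk is not defined above; a minimal sketch of what it might look like, reusing per-chunk steps from earlier sections (the column names are assumptions):

def clean_chunk(chunk):
    """Hypothetical per-chunk cleaning: dedupe, fix types, fill gaps."""
    chunk = chunk.drop_duplicates()
    chunk['price'] = pd.to_numeric(chunk['price'], errors='coerce')   # assumed column
    chunk['price'] = chunk['price'].fillna(chunk['price'].median())
    chunk['name'] = chunk['name'].astype(str).str.strip()             # assumed column
    return chunk

Note that deduplicating chunk by chunk only removes duplicates within each chunk; cross-chunk duplicates still need a final drop_duplicates on the concatenated result.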

10.3 Version Control and Reproducibility

# Use a configuration file
CLEANING_CONFIG = {
    'missing_threshold': 0.5,
    'outlier_sigma': 3,
    'filling_strategy': 'median',
    'remove_duplicates': True,
    'standardize_text': True
}

# Set a seed for any random operations
np.random.seed(42)
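A small sketch of how such a config could drive the pipeline from section 9, so that a run is reproducible from the config alone (the method names match the DataCleaningPipeline defined above):

pipeline = DataCleaningPipeline(df)
pipeline.handle_missing(strategy=CLEANING_CONFIG['filling_strategy'],
                        threshold=CLEANING_CONFIG['missing_threshold'])
if CLEANING_CONFIG['remove_duplicates']:
    pipeline.remove_duplicates()
if CLEANING_CONFIG['standardize_text']:
    pipeline.clean_text()
pipeline.handle_outliers(method='zscore', sigma=CLEANING_CONFIG['outlier_sigma'])
clean_df = pipeline.df

Because every threshold and strategy comes from CLEANING_CONFIG, re-running the script with the same config file yields the same cleaning decisions.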

Data cleaning is the foundation of data analysis; good cleaning habits significantly improve data quality and the reliability of analysis results. With an automated pipeline and detailed reporting, even complex cleaning tasks can be handled efficiently.
