|

Pandas CSV 文件

Pandas CSV 文件操作详解

1. CSV 文件读取

1.1 基本读取

import pandas as pd

# 读取 CSV 文件
df = pd.read_csv('data.csv')
print(df.head())

# 指定路径
df = pd.read_csv('./data/sales.csv')
df = pd.read_csv('https://example.com/data.csv')  # URL

1.2 常用参数

# 核心参数
df = pd.read_csv('data.csv',
                sep=',',           # 分隔符,默认逗号
                header=0,          # 表头行,默认第0行
                names=['col1', 'col2'],  # 自定义列名
                index_col=0,       # 设置索引列
                usecols=['A', 'B'], # 只读取指定列
                nrows=100,         # 读取前N行
                skiprows=[0, 2, 3], # 跳过指定行
                skip_blank_lines=True, # 跳过空行
                na_values=['NULL', 'N/A'], # 自定义空值标识
                encoding='utf-8',  # 编码
                dtype={'col1': 'int32', 'col2': 'category'}, # 数据类型
                parse_dates=['date_col'], # 解析日期列
                date_parser=lambda x: pd.to_datetime(x, format='%Y-%m-%d') # 自定义日期解析
                )

1.3 处理大文件

# 分块读取(内存优化)
chunk_iter = pd.read_csv('large_file.csv', chunksize=10000)
processed_chunks = []

for chunk in chunk_iter:
    # 处理每个块
    processed_chunk = chunk[chunk['value'] > 100]  # 示例过滤
    processed_chunks.append(processed_chunk)

df = pd.concat(processed_chunks, ignore_index=True)

1.4 分隔符处理

# 不同分隔符
df_tab = pd.read_csv('data.tsv', sep='\t')           # 制表符
df_semicolon = pd.read_csv('data.csv', sep=';')      # 分号
df_pipe = pd.read_csv('data.txt', sep='|')           # 竖线
df_space = pd.read_csv('data.txt', sep='\s+')        # 空格(正则)

# 自动检测分隔符
df_auto = pd.read_csv('data.csv', sep=None, engine='python')

2. CSV 文件写入

2.1 基本写入

# 写入 CSV
df.to_csv('output.csv', index=False)  # 不写索引

# 指定参数
df.to_csv('output.csv',
         index=False,           # 不写行索引
         columns=['A', 'B'],    # 只写指定列
         sep=';',               # 分隔符
         encoding='utf-8',      # 编码
         na_rep='NULL',         # 空值替换
         date_format='%Y-%m-%d', # 日期格式
         float_format='%.2f',   # 浮点数格式
         decimal=',',           # 小数点
         quoting=csv.QUOTE_ALL  # 引用方式
         )

2.2 追加写入

# 追加模式(需要手动处理)
df1 = pd.DataFrame({'A': [1, 2]})
df2 = pd.DataFrame({'A': [3, 4]})

df1.to_csv('output.csv', index=False)
df2.to_csv('output.csv', mode='a', header=False, index=False)  # 追加,不写表头

2.3 分块写入

# 大文件分块写入
chunk_iter = pd.read_csv('large_input.csv', chunksize=10000)

with open('large_output.csv', 'w') as f:
    first_chunk = True
    for chunk in chunk_iter:
        chunk.to_csv(f, 
                    index=False, 
                    header=first_chunk,  # 第一个块写表头
                    mode='a' if not first_chunk else 'w')
        first_chunk = False

3. 高级读取技巧

3.1 压缩文件

# 自动检测压缩格式
df_gz = pd.read_csv('data.csv.gz')
df_bz2 = pd.read_csv('data.csv.bz2')
df_zip = pd.read_csv('data.csv.zip')

# 明确指定
df = pd.read_csv('data.csv.gz', compression='gzip')

3.2 多表头和复杂结构

# 多级表头
df_multi_header = pd.read_csv('data.csv', header=[0, 1])  # 前两行作为表头

# 无表头,自定义列名
df_no_header = pd.read_csv('data.csv', header=None, 
                          names=['col1', 'col2', 'col3'])

# 跳过注释行
df_comments = pd.read_csv('data.csv', comment='#')

3.3 数据类型优化

# 预先指定数据类型
dtypes = {
    'id': 'int32',
    'price': 'float32',
    'category': 'category',
    'date': 'datetime64[ns]'
}

df = pd.read_csv('data.csv', dtype=dtypes, parse_dates=['date'])

# 延迟类型推断(低内存模式)
df_low_memory = pd.read_csv('data.csv', low_memory=True)

3.4 错误处理

# 容错读取
try:
    df = pd.read_csv('data.csv', on_bad_lines='skip')  # 跳过错误行
except pd.errors.ParserError as e:
    print(f"解析错误: {e}")
    df = pd.read_csv('data.csv', on_bad_lines='warn')  # 警告但继续

# 验证数据
df = pd.read_csv('data.csv', validate_header=True)  # 检查表头

4. 实际应用示例

4.1 销售数据处理

# 读取销售数据
sales_df = pd.read_csv('sales.csv',
                      parse_dates=['order_date', 'ship_date'],
                      dtype={'customer_id': 'category', 
                            'product_id': 'category'})

print("销售数据预览:")
print(sales_df.head())
print("\n数据类型:")
print(sales_df.dtypes)
print("\n缺失值统计:")
print(sales_df.isnull().sum())

4.2 大文件分块处理

def process_large_csv(file_path, chunk_size=10000):
    """分块处理大 CSV 文件"""
    total_rows = 0
    total_revenue = 0

    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # 数据清洗
        chunk = chunk.dropna(subset=['price', 'quantity'])
        chunk['revenue'] = chunk['price'] * chunk['quantity']

        # 统计
        total_rows += len(chunk)
        total_revenue += chunk['revenue'].sum()

        print(f"处理了 {total_rows} 行,总收入: {total_revenue:,.2f}")

    return total_rows, total_revenue

# 使用
rows, revenue = process_large_csv('large_sales.csv')

4.3 数据验证和清洗

def clean_csv_data(df):
    """CSV 数据清洗函数"""
    # 1. 检查并处理缺失值
    print("缺失值统计:")
    print(df.isnull().sum())

    # 2. 数据类型转换
    numeric_cols = ['price', 'quantity']
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # 3. 删除无效数据
    df = df.dropna(subset=numeric_cols)
    df = df[df['price'] > 0]  # 价格必须大于0

    # 4. 字符串清洗
    if 'customer_name' in df.columns:
        df['customer_name'] = df['customer_name'].str.strip().str.title()

    # 5. 重复值处理
    initial_len = len(df)
    df = df.drop_duplicates()
    print(f"删除重复行: {initial_len - len(df)} 行")

    return df

# 应用清洗
df_clean = clean_csv_data(pd.read_csv('raw_data.csv'))
df_clean.to_csv('cleaned_data.csv', index=False)

5. 性能优化

5.1 内存优化

# 读取时优化数据类型
def optimize_dtypes(df):
    """优化数据类型以减少内存使用"""
    for col in df.columns:
        if df[col].dtype == 'object':
            # 尝试转换为分类
            if df[col].nunique() / len(df) < 0.5:
                df[col] = df[col].astype('category')
        elif 'int' in str(df[col].dtype):
            # 转换为更小整数类型
            df[col] = pd.to_numeric(df[col], downcast='integer')
        elif 'float' in str(df[col].dtype):
            df[col] = pd.to_numeric(df[col], downcast='float')
    return df

# 使用
df = pd.read_csv('data.csv')
df_optimized = optimize_dtypes(df)
print(f"内存节省: {df.memory_usage(deep=True).sum() - df_optimized.memory_usage(deep=True).sum()} bytes")

5.2 并行处理

# 使用 Dask 处理超大文件(需要安装 dask)
import dask.dataframe as dd

# Dask DataFrame
ddf = dd.read_csv('large_file.csv')
result = ddf.groupby('category')['value'].sum().compute()  # 延迟计算

5.3 快速读取

# 使用 pyarrow 引擎(更快)
df_arrow = pd.read_csv('data.csv', engine='pyarrow')

# 或者使用 Polars(Rust 实现,更快)
# import polars as pl
# df_polars = pl.read_csv('data.csv')

6. 错误处理和调试

6.1 常见问题

# 编码问题
try:
    df = pd.read_csv('data.csv', encoding='utf-8')
except UnicodeDecodeError:
    df = pd.read_csv('data.csv', encoding='gbk')  # 或 'latin1'
    # 或者自动检测
    import chardet
    with open('data.csv', 'rb') as f:
        encoding = chardet.detect(f.read(10000))['encoding']
    df = pd.read_csv('data.csv', encoding=encoding)

# 分隔符问题
df = pd.read_csv('data.csv', sep=None, engine='python')  # 自动检测

6.2 数据质量检查

def validate_csv(df):
    """CSV 数据验证"""
    issues = []

    # 检查空值
    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        issues.append(f"发现 {null_counts.sum()} 个空值")

    # 检查数据类型
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].str.len().max() > 1000:
            issues.append(f"列 '{col}' 可能包含异常长字符串")

    # 检查数值列异常
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if df[col].std() == 0:
            issues.append(f"列 '{col}' 数值恒定")
        if df[col].min() < 0 and col in ['price', 'quantity']:
            issues.append(f"列 '{col}' 包含负值")

    return issues

# 使用
issues = validate_csv(df)
for issue in issues:
    print(f"⚠️  {issue}")

7. 与其他格式的互转

7.1 CSV 与 Excel

# CSV 转 Excel
df.to_excel('output.xlsx', index=False, engine='openpyxl')

# Excel 转 CSV
df_excel = pd.read_excel('input.xlsx')
df_excel.to_csv('output.csv', index=False)

7.2 CSV 与 JSON

# CSV 转 JSON
df.to_json('output.json', orient='records', indent=2)

# JSON 转 CSV
df_json = pd.read_json('input.json')
df_json.to_csv('output.csv', index=False)

7.3 CSV 与数据库

from sqlalchemy import create_engine

# 写入数据库
engine = create_engine('sqlite:///data.db')
df.to_sql('table_name', engine, if_exists='replace', index=False)

# 从数据库读取
df_db = pd.read_sql('SELECT * FROM table_name', engine)
df_db.to_csv('output.csv', index=False)

8. 最佳实践

8.1 文件操作规范

def robust_csv_read(file_path, **kwargs):
    """健壮的 CSV 读取函数"""
    default_kwargs = {
        'encoding': 'utf-8',
        'low_memory': False,
        'on_bad_lines': 'warn'
    }
    default_kwargs.update(kwargs)

    try:
        df = pd.read_csv(file_path, **default_kwargs)

        # 自动数据清洗
        print(f"读取文件: {file_path}")
        print(f"形状: {df.shape}")
        print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

        return df
    except Exception as e:
        print(f"读取失败: {e}")
        # 尝试备用编码
        return pd.read_csv(file_path, encoding='gbk', **default_kwargs)

# 使用
df = robust_csv_read('sales_data.csv', parse_dates=['date'])

8.2 批量处理

import glob
import os

def process_csv_files(pattern):
    """批量处理 CSV 文件"""
    all_dfs = []

    for file_path in glob.glob(pattern):
        print(f"处理: {file_path}")
        df = pd.read_csv(file_path)

        # 添加文件来源列
        df['source_file'] = os.path.basename(file_path)
        all_dfs.append(df)

    # 合并所有文件
    combined_df = pd.concat(all_dfs, ignore_index=True)
    return combined_df

# 处理所有 CSV 文件
df_all = process_csv_files('data/*.csv')
df_all.to_csv('combined_data.csv', index=False)

8.3 配置管理

# config.py
CSV_CONFIG = {
    'encoding': 'utf-8',
    'sep': ',',
    'date_cols': ['order_date', 'ship_date'],
    'category_cols': ['customer_type', 'product_category'],
    'na_values': ['NULL', 'N/A', '', 'null']
}

# 使用配置
def read_configured_csv(file_path):
    config = CSV_CONFIG
    return pd.read_csv(file_path,
                      encoding=config['encoding'],
                      na_values=config['na_values'],
                      parse_dates=config['date_cols'],
                      dtype={col: 'category' for col in config['category_cols']})

df = read_configured_csv('sales.csv')

9. 常见问题解决

9.1 内存不足

# 解决方案1:分块处理
def memory_efficient_read(file_path, func, chunk_size=10000):
    results = []
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        processed = func(chunk)
        if processed is not None:
            results.append(processed)
    return pd.concat(results, ignore_index=True)

# 解决方案2:选择性读取
df = pd.read_csv('large.csv', usecols=['important_col1', 'important_col2'])

9.2 编码问题

import chardet

def detect_encoding(file_path):
    with open(file_path, 'rb') as f:
        raw_data = f.read(100000)  # 读取前100KB
        result = chardet.detect(raw_data)
        return result['encoding']

encoding = detect_encoding('data.csv')
df = pd.read_csv('data.csv', encoding=encoding)

9.3 数据不一致

# 检查列一致性(多文件合并前)
def check_column_consistency(file_paths):
    first_cols = pd.read_csv(file_paths[0], nrows=0).columns
    for path in file_paths[1:]:
        cols = pd.read_csv(path, nrows=0).columns
        if not cols.equals(first_cols):
            print(f"列不一致: {path}")
            print(f"期望: {first_cols.tolist()}")
            print(f"实际: {cols.tolist()}")

check_column_consistency(['file1.csv', 'file2.csv'])

CSV 文件操作是 Pandas 的基础功能,掌握各种读取参数、错误处理和性能优化技巧,可以高效处理各种复杂的数据文件。通过实际项目练习,可以快速提升数据导入导出能力。

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注