Pandas CSV 文件
Pandas CSV 文件操作详解
1. CSV 文件读取
1.1 基本读取
import pandas as pd
# 读取 CSV 文件
df = pd.read_csv('data.csv')
print(df.head())
# 指定路径
df = pd.read_csv('./data/sales.csv')
df = pd.read_csv('https://example.com/data.csv') # URL
1.2 常用参数
# 核心参数
df = pd.read_csv('data.csv',
sep=',', # 分隔符,默认逗号
header=0, # 表头行,默认第0行
names=['col1', 'col2'], # 自定义列名
index_col=0, # 设置索引列
usecols=['A', 'B'], # 只读取指定列
nrows=100, # 读取前N行
skiprows=[0, 2, 3], # 跳过指定行
skip_blank_lines=True, # 跳过空行
na_values=['NULL', 'N/A'], # 自定义空值标识
encoding='utf-8', # 编码
dtype={'col1': 'int32', 'col2': 'category'}, # 数据类型
parse_dates=['date_col'], # 解析日期列
date_parser=lambda x: pd.to_datetime(x, format='%Y-%m-%d') # 自定义日期解析
)
1.3 处理大文件
# 分块读取(内存优化)
chunk_iter = pd.read_csv('large_file.csv', chunksize=10000)
processed_chunks = []
for chunk in chunk_iter:
# 处理每个块
processed_chunk = chunk[chunk['value'] > 100] # 示例过滤
processed_chunks.append(processed_chunk)
df = pd.concat(processed_chunks, ignore_index=True)
1.4 分隔符处理
# 不同分隔符
df_tab = pd.read_csv('data.tsv', sep='\t') # 制表符
df_semicolon = pd.read_csv('data.csv', sep=';') # 分号
df_pipe = pd.read_csv('data.txt', sep='|') # 竖线
df_space = pd.read_csv('data.txt', sep='\s+') # 空格(正则)
# 自动检测分隔符
df_auto = pd.read_csv('data.csv', sep=None, engine='python')
2. CSV 文件写入
2.1 基本写入
# 写入 CSV
df.to_csv('output.csv', index=False) # 不写索引
# 指定参数
df.to_csv('output.csv',
index=False, # 不写行索引
columns=['A', 'B'], # 只写指定列
sep=';', # 分隔符
encoding='utf-8', # 编码
na_rep='NULL', # 空值替换
date_format='%Y-%m-%d', # 日期格式
float_format='%.2f', # 浮点数格式
decimal=',', # 小数点
quoting=csv.QUOTE_ALL # 引用方式
)
2.2 追加写入
# 追加模式(需要手动处理)
df1 = pd.DataFrame({'A': [1, 2]})
df2 = pd.DataFrame({'A': [3, 4]})
df1.to_csv('output.csv', index=False)
df2.to_csv('output.csv', mode='a', header=False, index=False) # 追加,不写表头
2.3 分块写入
# 大文件分块写入
chunk_iter = pd.read_csv('large_input.csv', chunksize=10000)
with open('large_output.csv', 'w') as f:
first_chunk = True
for chunk in chunk_iter:
chunk.to_csv(f,
index=False,
header=first_chunk, # 第一个块写表头
mode='a' if not first_chunk else 'w')
first_chunk = False
3. 高级读取技巧
3.1 压缩文件
# 自动检测压缩格式
df_gz = pd.read_csv('data.csv.gz')
df_bz2 = pd.read_csv('data.csv.bz2')
df_zip = pd.read_csv('data.csv.zip')
# 明确指定
df = pd.read_csv('data.csv.gz', compression='gzip')
3.2 多表头和复杂结构
# 多级表头
df_multi_header = pd.read_csv('data.csv', header=[0, 1]) # 前两行作为表头
# 无表头,自定义列名
df_no_header = pd.read_csv('data.csv', header=None,
names=['col1', 'col2', 'col3'])
# 跳过注释行
df_comments = pd.read_csv('data.csv', comment='#')
3.3 数据类型优化
# 预先指定数据类型
dtypes = {
'id': 'int32',
'price': 'float32',
'category': 'category',
'date': 'datetime64[ns]'
}
df = pd.read_csv('data.csv', dtype=dtypes, parse_dates=['date'])
# 延迟类型推断(低内存模式)
df_low_memory = pd.read_csv('data.csv', low_memory=True)
3.4 错误处理
# 容错读取
try:
df = pd.read_csv('data.csv', on_bad_lines='skip') # 跳过错误行
except pd.errors.ParserError as e:
print(f"解析错误: {e}")
df = pd.read_csv('data.csv', on_bad_lines='warn') # 警告但继续
# 验证数据
df = pd.read_csv('data.csv', validate_header=True) # 检查表头
4. 实际应用示例
4.1 销售数据处理
# 读取销售数据
sales_df = pd.read_csv('sales.csv',
parse_dates=['order_date', 'ship_date'],
dtype={'customer_id': 'category',
'product_id': 'category'})
print("销售数据预览:")
print(sales_df.head())
print("\n数据类型:")
print(sales_df.dtypes)
print("\n缺失值统计:")
print(sales_df.isnull().sum())
4.2 大文件分块处理
def process_large_csv(file_path, chunk_size=10000):
"""分块处理大 CSV 文件"""
total_rows = 0
total_revenue = 0
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 数据清洗
chunk = chunk.dropna(subset=['price', 'quantity'])
chunk['revenue'] = chunk['price'] * chunk['quantity']
# 统计
total_rows += len(chunk)
total_revenue += chunk['revenue'].sum()
print(f"处理了 {total_rows} 行,总收入: {total_revenue:,.2f}")
return total_rows, total_revenue
# 使用
rows, revenue = process_large_csv('large_sales.csv')
4.3 数据验证和清洗
def clean_csv_data(df):
"""CSV 数据清洗函数"""
# 1. 检查并处理缺失值
print("缺失值统计:")
print(df.isnull().sum())
# 2. 数据类型转换
numeric_cols = ['price', 'quantity']
for col in numeric_cols:
df[col] = pd.to_numeric(df[col], errors='coerce')
# 3. 删除无效数据
df = df.dropna(subset=numeric_cols)
df = df[df['price'] > 0] # 价格必须大于0
# 4. 字符串清洗
if 'customer_name' in df.columns:
df['customer_name'] = df['customer_name'].str.strip().str.title()
# 5. 重复值处理
initial_len = len(df)
df = df.drop_duplicates()
print(f"删除重复行: {initial_len - len(df)} 行")
return df
# 应用清洗
df_clean = clean_csv_data(pd.read_csv('raw_data.csv'))
df_clean.to_csv('cleaned_data.csv', index=False)
5. 性能优化
5.1 内存优化
# 读取时优化数据类型
def optimize_dtypes(df):
"""优化数据类型以减少内存使用"""
for col in df.columns:
if df[col].dtype == 'object':
# 尝试转换为分类
if df[col].nunique() / len(df) < 0.5:
df[col] = df[col].astype('category')
elif 'int' in str(df[col].dtype):
# 转换为更小整数类型
df[col] = pd.to_numeric(df[col], downcast='integer')
elif 'float' in str(df[col].dtype):
df[col] = pd.to_numeric(df[col], downcast='float')
return df
# 使用
df = pd.read_csv('data.csv')
df_optimized = optimize_dtypes(df)
print(f"内存节省: {df.memory_usage(deep=True).sum() - df_optimized.memory_usage(deep=True).sum()} bytes")
5.2 并行处理
# 使用 Dask 处理超大文件(需要安装 dask)
import dask.dataframe as dd
# Dask DataFrame
ddf = dd.read_csv('large_file.csv')
result = ddf.groupby('category')['value'].sum().compute() # 延迟计算
5.3 快速读取
# 使用 pyarrow 引擎(更快)
df_arrow = pd.read_csv('data.csv', engine='pyarrow')
# 或者使用 Polars(Rust 实现,更快)
# import polars as pl
# df_polars = pl.read_csv('data.csv')
6. 错误处理和调试
6.1 常见问题
# 编码问题
try:
df = pd.read_csv('data.csv', encoding='utf-8')
except UnicodeDecodeError:
df = pd.read_csv('data.csv', encoding='gbk') # 或 'latin1'
# 或者自动检测
import chardet
with open('data.csv', 'rb') as f:
encoding = chardet.detect(f.read(10000))['encoding']
df = pd.read_csv('data.csv', encoding=encoding)
# 分隔符问题
df = pd.read_csv('data.csv', sep=None, engine='python') # 自动检测
6.2 数据质量检查
def validate_csv(df):
"""CSV 数据验证"""
issues = []
# 检查空值
null_counts = df.isnull().sum()
if null_counts.sum() > 0:
issues.append(f"发现 {null_counts.sum()} 个空值")
# 检查数据类型
for col in df.select_dtypes(include=['object']).columns:
if df[col].str.len().max() > 1000:
issues.append(f"列 '{col}' 可能包含异常长字符串")
# 检查数值列异常
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if df[col].std() == 0:
issues.append(f"列 '{col}' 数值恒定")
if df[col].min() < 0 and col in ['price', 'quantity']:
issues.append(f"列 '{col}' 包含负值")
return issues
# 使用
issues = validate_csv(df)
for issue in issues:
print(f"⚠️ {issue}")
7. 与其他格式的互转
7.1 CSV 与 Excel
# CSV 转 Excel
df.to_excel('output.xlsx', index=False, engine='openpyxl')
# Excel 转 CSV
df_excel = pd.read_excel('input.xlsx')
df_excel.to_csv('output.csv', index=False)
7.2 CSV 与 JSON
# CSV 转 JSON
df.to_json('output.json', orient='records', indent=2)
# JSON 转 CSV
df_json = pd.read_json('input.json')
df_json.to_csv('output.csv', index=False)
7.3 CSV 与数据库
from sqlalchemy import create_engine
# 写入数据库
engine = create_engine('sqlite:///data.db')
df.to_sql('table_name', engine, if_exists='replace', index=False)
# 从数据库读取
df_db = pd.read_sql('SELECT * FROM table_name', engine)
df_db.to_csv('output.csv', index=False)
8. 最佳实践
8.1 文件操作规范
def robust_csv_read(file_path, **kwargs):
"""健壮的 CSV 读取函数"""
default_kwargs = {
'encoding': 'utf-8',
'low_memory': False,
'on_bad_lines': 'warn'
}
default_kwargs.update(kwargs)
try:
df = pd.read_csv(file_path, **default_kwargs)
# 自动数据清洗
print(f"读取文件: {file_path}")
print(f"形状: {df.shape}")
print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
return df
except Exception as e:
print(f"读取失败: {e}")
# 尝试备用编码
return pd.read_csv(file_path, encoding='gbk', **default_kwargs)
# 使用
df = robust_csv_read('sales_data.csv', parse_dates=['date'])
8.2 批量处理
import glob
import os
def process_csv_files(pattern):
"""批量处理 CSV 文件"""
all_dfs = []
for file_path in glob.glob(pattern):
print(f"处理: {file_path}")
df = pd.read_csv(file_path)
# 添加文件来源列
df['source_file'] = os.path.basename(file_path)
all_dfs.append(df)
# 合并所有文件
combined_df = pd.concat(all_dfs, ignore_index=True)
return combined_df
# 处理所有 CSV 文件
df_all = process_csv_files('data/*.csv')
df_all.to_csv('combined_data.csv', index=False)
8.3 配置管理
# config.py
CSV_CONFIG = {
'encoding': 'utf-8',
'sep': ',',
'date_cols': ['order_date', 'ship_date'],
'category_cols': ['customer_type', 'product_category'],
'na_values': ['NULL', 'N/A', '', 'null']
}
# 使用配置
def read_configured_csv(file_path):
config = CSV_CONFIG
return pd.read_csv(file_path,
encoding=config['encoding'],
na_values=config['na_values'],
parse_dates=config['date_cols'],
dtype={col: 'category' for col in config['category_cols']})
df = read_configured_csv('sales.csv')
9. 常见问题解决
9.1 内存不足
# 解决方案1:分块处理
def memory_efficient_read(file_path, func, chunk_size=10000):
results = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
processed = func(chunk)
if processed is not None:
results.append(processed)
return pd.concat(results, ignore_index=True)
# 解决方案2:选择性读取
df = pd.read_csv('large.csv', usecols=['important_col1', 'important_col2'])
9.2 编码问题
import chardet
def detect_encoding(file_path):
with open(file_path, 'rb') as f:
raw_data = f.read(100000) # 读取前100KB
result = chardet.detect(raw_data)
return result['encoding']
encoding = detect_encoding('data.csv')
df = pd.read_csv('data.csv', encoding=encoding)
9.3 数据不一致
# 检查列一致性(多文件合并前)
def check_column_consistency(file_paths):
first_cols = pd.read_csv(file_paths[0], nrows=0).columns
for path in file_paths[1:]:
cols = pd.read_csv(path, nrows=0).columns
if not cols.equals(first_cols):
print(f"列不一致: {path}")
print(f"期望: {first_cols.tolist()}")
print(f"实际: {cols.tolist()}")
check_column_consistency(['file1.csv', 'file2.csv'])
CSV 文件操作是 Pandas 的基础功能,掌握各种读取参数、错误处理和性能优化技巧,可以高效处理各种复杂的数据文件。通过实际项目练习,可以快速提升数据导入导出能力。