Pandas Input/Output (输入输出) API 手册
Pandas Input/Output API 完整手册
1. CSV 文件操作
1.1 读取 CSV
import pandas as pd
# 基本读取
df = pd.read_csv('data.csv')
# 指定参数
df = pd.read_csv(
'data.csv',
sep=',', # 分隔符
header=0, # 表头行
names=['col1', 'col2'], # 自定义列名
index_col=0, # 索引列
usecols=['col1', 'col2'], # 指定列
nrows=1000, # 读取行数
skiprows=[1, 2, 5], # 跳过行
na_values=['NA', '-1'], # 空值标识
dtype={'col1': 'int32'}, # 数据类型
parse_dates=['date_col'], # 日期解析
date_parser=lambda x: pd.to_datetime(x, format='%Y-%m-%d'),
encoding='utf-8', # 编码
compression='gzip', # 压缩格式
engine='c', # 解析引擎
low_memory=True # 低内存模式
)
# 分块读取大文件
for chunk in pd.read_csv('large.csv', chunksize=10000):
process_chunk(chunk)
# 从URL读取
url_df = pd.read_csv('https://example.com/data.csv')
1.2 写入 CSV
# 基本写入
df.to_csv('output.csv', index=True)
# 高级参数
df.to_csv(
'output.csv',
sep=';', # 分隔符
index=False, # 不写索引
columns=['col1', 'col2'], # 指定列
header=True, # 写表头
na_rep='NULL', # 空值表示
float_format='%.2f', # 浮点数格式
date_format='%Y-%m-%d', # 日期格式
encoding='utf-8', # 编码
compression='gzip', # 压缩
quoting=csv.QUOTE_ALL, # 引号策略
line_terminator='\n', # 行结束符
chunksize=10000, # 分块写入
mode='w' # 写入模式
)
# 追加写入
df.to_csv('output.csv', mode='a', header=False, index=False)
2. Excel 文件操作
2.1 读取 Excel
# 安装依赖:pip install openpyxl xlrd
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')
# 多工作表
excel_file = pd.ExcelFile('data.xlsx')
sheets = excel_file.sheet_names
df1 = pd.read_excel(excel_file, sheet_name='Sheet1')
df2 = pd.read_excel(excel_file, sheet_name='Sheet2')
# 读取多个工作表
all_sheets = {sheet: pd.read_excel(excel_file, sheet_name=sheet)
for sheet in sheets}
# 高级参数
df = pd.read_excel(
'data.xlsx',
sheet_name=0, # 工作表索引或名称
header=0,
names=['col1', 'col2'],
index_col=0,
usecols='A:C', # 列范围(A1:C10)
nrows=100,
skiprows=2,
na_values=['N/A', 'NULL'],
dtype={'price': 'float'},
parse_dates=['date'],
date_parser=pd.to_datetime,
engine='openpyxl', # 或 'xlrd'
converters={'custom': lambda x: custom_parse(x)}
)
# 读取特定区域
df = pd.read_excel('data.xlsx', sheet_name='Sheet1',
skiprows=1, nrows=10, usecols='B:D')
2.2 写入 Excel
# 基本写入
df.to_excel('output.xlsx', sheet_name='Sheet1', index=False)
# 多工作表写入
with pd.ExcelWriter('multi_sheet.xlsx', engine='openpyxl') as writer:
df1.to_excel(writer, sheet_name='Sales', index=False)
df2.to_excel(writer, sheet_name='Customers', index=False)
df3.to_excel(writer, sheet_name='Products', index=False)
# 高级参数
with pd.ExcelWriter('output.xlsx', engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='Data',
startrow=1, # 起始行
startcol=1, # 起始列
index=False,
header=True,
na_rep='',
float_format='%.2f',
columns=['col1', 'col2'],
merge_cells=False,
encoding='utf-8',
freeze_panes=(1, 1)) # 冻结窗格
# 格式化(需要 openpyxl)
from openpyxl.styles import Font, PatternFill
workbook = writer.book
worksheet = writer.sheets['Data']
for cell in worksheet['1']:
cell.font = Font(bold=True)
3. JSON 文件操作
3.1 读取 JSON
# 简单JSON
df = pd.read_json('data.json')
# 不同格式
df = pd.read_json(
'data.json',
orient='records', # 记录格式
lines=True, # JSON Lines
encoding='utf-8',
dtype=True,
convert_axes=True,
precise_float=True
)
# 嵌套JSON
nested_df = pd.read_json('nested.json', orient='split')
# JSON Lines(每行一个JSON对象)
df_lines = pd.read_json('data.jsonl', lines=True)
# 从字符串读取
json_str = '{"col1": [1,2,3], "col2": ["a","b","c"]}'
df_str = pd.read_json(json_str, orient='split')
3.2 写入 JSON
# 基本写入
df.to_json('output.json', orient='records')
# 不同格式
df.to_json(
'output.json',
orient='split', # {'columns': [...], 'data': [...]}
date_format='iso', # 或 'epoch'
double_precision=17,
force_ascii=True, # 转义非ASCII
date_unit='ms', # 时间戳单位
default_handler=lambda x: None, # 自定义序列化
lines=False, # JSON Lines
compression='gzip',
index=True
)
# JSON Lines
df.to_json('output.jsonl', orient='records', lines=True)
# 压缩
df.to_json('output.json.gz', compression='gzip')
4. SQL 数据库操作
4.1 连接和配置
from sqlalchemy import create_engine
import sqlite3
from psycopg2 import connect
import pymysql
# SQLite
engine_sqlite = create_engine('sqlite:///database.db')
conn_sqlite = sqlite3.connect('database.db')
# PostgreSQL
engine_pg = create_engine('postgresql://user:password@localhost/dbname')
conn_pg = connect("dbname=dbname user=user password=password")
# MySQL
engine_mysql = create_engine('mysql+pymysql://user:password@localhost/dbname')
# 通用连接字符串
engines = {
'sqlite': 'sqlite:///data.db',
'mysql': 'mysql+pymysql://user:pass@localhost/db',
'postgresql': 'postgresql://user:pass@localhost/db',
'oracle': 'oracle://user:pass@host:port/service'
}
4.2 读取数据库
# SQL查询
query = "SELECT * FROM sales WHERE date > '2023-01-01'"
df = pd.read_sql_query(query, engine)
# 从表读取
df = pd.read_sql_table('sales', engine, schema='public')
# 高级参数
df = pd.read_sql_query(
query,
engine,
index_col='id',
coerce_float=True,
parse_dates=['date'],
columns=['col1', 'col2'],
chunksize=1000,
dtype={'price': 'float64'}
)
# 分块读取
for chunk in pd.read_sql_query(query, engine, chunksize=1000):
process_chunk(chunk)
4.3 写入数据库
# 基本写入
df.to_sql('table_name', engine, if_exists='replace', index=False)
# 高级参数
df.to_sql(
'sales',
engine,
if_exists='append', # 'fail', 'replace', 'append'
index=False,
index_label='id',
schema='public',
chunksize=1000,
dtype={
'price': sqlalchemy.Float(precision=2),
'quantity': sqlalchemy.Integer
},
method='multi', # 批量插入方法
if_table_exists='append'
)
# 分块写入
df.to_sql('large_table', engine, if_exists='append',
index=False, chunksize=10000, method='multi')
5. Parquet 和其他列式存储
5.1 Parquet 操作
# 安装:pip install pyarrow 或 fastparquet
import pyarrow.parquet as pq
import pyarrow as pa
# 写入Parquet
df.to_parquet('data.parquet',
engine='pyarrow', # 或 'fastparquet'
compression='snappy', # 'gzip', 'brotli', 'none'
index=True,
partition_cols=['category', 'region'], # 分区
coerce_timestamps='ms')
# 读取Parquet
df = pd.read_parquet('data.parquet',
engine='pyarrow',
columns=['col1', 'col2'],
filters=[('category', '=', 'A')], # 谓词下推
index='id')
# 分区读取
partitioned_df = pd.read_parquet('partitioned_data/',
engine='pyarrow')
# Parquet文件元数据
parquet_file = pq.ParquetFile('data.parquet')
print("Schema:", parquet_file.schema)
print("行数:", parquet_file.metadata.num_rows)
5.2 Feather 格式
# 安装:pip install pyarrow
# 快速读写(列式,无压缩)
df.to_feather('data.feather')
df_fast = pd.read_feather('data.feather')
6. HDF5 文件操作
6.1 基本操作
# 安装:pip install tables
df.to_hdf('data.h5', key='df', mode='w')
# 读取
df = pd.read_hdf('data.h5', key='df')
# 多键存储
with pd.HDFStore('data.h5') as store:
store['df1'] = df1
store['df2'] = df2
store['metadata'] = metadata
# 读取
df1 = store['df1']
keys = store.keys()
6.2 高级参数
df.to_hdf(
'data.h5',
key='sales',
mode='a', # 'w', 'a'
format='table', # 'fixed', 'table'
append=True,
data_columns=['category'],
complevel=9, # 压缩级别 0-9
complib='zlib', # 'zlib', 'lzo', 'bzip2'
fletcher32=True, # 校验和
min_itemsize=100
)
# 查询固定格式
df_query = pd.read_hdf('data.h5', 'sales',
where='category="A"')
7. Pickle 序列化
7.1 基本使用
# 写入
df.to_pickle('data.pkl')
# 读取
df = pd.read_pickle('data.pkl')
# 压缩
df.to_pickle('data.pkl.gz', compression='gzip')
7.2 安全注意
# 安全加载(避免恶意pickle)
import pickle
with open('data.pkl', 'rb') as f:
# 使用pandas内置方法更安全
df = pd.read_pickle('data.pkl')
# 自定义协议
df.to_pickle('data.pkl', protocol=4) # Python 3.4+
8. HTML 和网页数据
8.1 读取 HTML 表格
# 安装:pip install lxml beautifulsoup4 html5lib
dfs = pd.read_html('https://example.com/table.html')
# 指定表格
df = pd.read_html('page.html', match='Sales Table')[0]
# 参数
dfs = pd.read_html(
io, # 文件路径、URL、文件对象
match='keyword', # 正则匹配
attrs={'id': 'table1'}, # CSS选择器
flavor=['lxml', 'html5lib'], # 解析器
header=0,
index_col=0,
skiprows=[1, 2],
parse_dates=True,
converters={'col': lambda x: custom_parse(x)}
)
8.2 写入 HTML
df.to_html('table.html',
index=False,
classes='table table-striped',
border=0,
table_id='sales-table',
justify='center',
bold_rows=True,
escape=False, # 不转义HTML
na_rep='',
formatters={'price': '${:,.2f}'.format},
sparsify=True)
9. 剪贴板操作
# 从剪贴板读取
df_clip = pd.read_clipboard()
# 写入剪贴板
df.to_clipboard(index=False)
# Excel格式剪贴板
df.to_clipboard(excel=True)
10. 分块处理和流式 IO
10.1 大文件分块处理
def process_large_file_chunks(filepath, chunksize=10000, func=None):
"""分块处理大文件"""
total_processed = 0
for chunk_num, chunk in enumerate(pd.read_csv(filepath, chunksize=chunksize)):
print(f"处理块 {chunk_num + 1}")
if func:
processed_chunk = func(chunk)
else:
processed_chunk = chunk
# 累积或处理
if total_processed == 0:
result = processed_chunk
else:
result = pd.concat([result, processed_chunk], ignore_index=True)
total_processed += len(chunk)
print(f"已处理 {total_processed} 行")
# 内存控制:定期清理
if total_processed % 1000000 == 0:
result = result.tail(100000) # 只保留最近数据
return result
# 使用
# result = process_large_file_chunks('large.csv', func=clean_chunk)
10.2 压缩文件处理
# 支持的压缩格式
compressions = ['gzip', 'bz2', 'zip', 'xz', 'zstd', 'infer']
# 自动检测
df = pd.read_csv('data.csv.gz') # 自动识别gzip
df = pd.read_csv('data.zip') # 读取zip内第一个文件
# 显式指定
df = pd.read_csv('data.txt.gz', compression='gzip')
df.to_csv('output.txt.gz', compression='gzip')
11. 高级 IO 功能
11.1 自定义格式器
def custom_to_csv(df, path, **kwargs):
"""自定义CSV写入"""
# 预处理
df_clean = df.fillna('')
# 自定义列顺序
cols = ['important_col'] + [c for c in df_clean if c != 'important_col']
df_clean = df_clean[cols]
# 写入
df_clean.to_csv(path, **kwargs)
# 使用自定义日期格式
df['custom_date'] = df['date'].dt.strftime('%Y%m%d')
11.2 内存优化 IO
def memory_efficient_io(df, output_format='parquet'):
"""内存优化文件写入"""
if output_format == 'parquet':
# Parquet列式存储,压缩好
df.to_parquet('data.parquet', compression='snappy', index=False)
elif output_format == 'csv':
# 分块写入CSV
df.to_csv('data.csv', chunksize=10000, index=False)
elif output_format == 'hdf5':
# HDF5分块存储
df.to_hdf('data.h5', key='data', format='table',
data_columns=True, complevel=9)
# 读取时内存优化
def read_with_optimization(filepath, format='infer'):
"""优化读取"""
if format == 'parquet':
return pd.read_parquet(filepath, engine='pyarrow')
elif format == 'csv':
# 只读取需要的列和行
return pd.read_csv(filepath, usecols=['col1', 'col2'],
nrows=1000000, dtype_backend='pyarrow')
11.3 批量操作
def batch_database_operations(dataframes, table_name, engine):
"""批量数据库操作"""
with engine.connect() as conn:
# 事务控制
trans = conn.begin()
try:
for i, df in enumerate(dataframes):
df.to_sql(table_name, conn, if_exists='append' if i > 0 else 'replace',
index=False, method='multi')
trans.commit()
except Exception as e:
trans.rollback()
raise e
# 批量文件处理
def batch_file_export(dfs_dict, base_path):
"""批量导出文件"""
for name, df in dfs_dict.items():
filename = f"{base_path}_{name}.parquet"
df.to_parquet(filename, index=False)
print(f"导出 {filename}")
12. 错误处理和最佳实践
12.1 异常处理
import logging
from contextlib import contextmanager
@contextmanager
def safe_io_context(filepath, mode='r'):
"""安全的IO上下文"""
try:
if mode == 'r':
with open(filepath, 'r') as f:
yield f
else:
with open(filepath, 'w') as f:
yield f
except FileNotFoundError:
logging.error(f"文件未找到: {filepath}")
raise
except PermissionError:
logging.error(f"权限错误: {filepath}")
raise
except Exception as e:
logging.error(f"IO错误: {e}")
raise
def robust_read_csv(filepath, **kwargs):
"""健壮的CSV读取"""
try:
return pd.read_csv(filepath, **kwargs)
except pd.errors.EmptyDataError:
print("文件为空")
return pd.DataFrame()
except pd.errors.ParserError as e:
print(f"解析错误: {e}")
# 尝试不同编码
encodings = ['utf-8', 'latin1', 'cp1252']
for enc in encodings[1:]:
try:
return pd.read_csv(filepath, encoding=enc, **kwargs)
except:
continue
raise
12.2 性能监控
import time
from functools import wraps
def time_io(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start_time
print(f"{func.__name__} 耗时: {elapsed:.2f}秒")
return result
return wrapper
@time_io
def benchmark_read_csv(filepath):
return pd.read_csv(filepath)
@time_io
def benchmark_read_parquet(filepath):
return pd.read_parquet(filepath)
12.3 文件完整性检查
import hashlib
def verify_file_integrity(filepath, expected_hash=None):
"""验证文件完整性"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
actual_hash = hash_md5.hexdigest()
if expected_hash and actual_hash != expected_hash:
raise ValueError(f"文件完整性检查失败: {actual_hash} != {expected_hash}")
return actual_hash
# 使用
file_hash = verify_file_integrity('data.parquet')
print(f"文件MD5: {file_hash}")
13. 格式选择指南
格式 | 读取速度 | 写入速度 | 压缩比 | 适用场景 |
---|---|---|---|---|
CSV | 中等 | 慢 | 差 | 通用,兼容性好 |
Parquet | 快 | 快 | 好 | 大数据,列式查询 |
Feather | 最快 | 最快 | 无 | 临时存储,跨语言 |
HDF5 | 中等 | 中等 | 好 | 层次数据,查询 |
Excel | 慢 | 慢 | 无 | 报表,Excel兼容 |
JSON | 慢 | 慢 | 差 | Web API,文档 |
14. 生产环境最佳实践
- 优先使用列式格式(Parquet/Feather)处理大数据
- 分块处理超大文件,避免内存溢出
- 数据类型优化在IO前后进行内存优化
- 连接池管理数据库连接,使用context manager
- 错误恢复实现重试机制和断点续传
- 监控告警记录IO性能和错误日志
- 版本控制使用时间戳或版本号命名文件
- 权限管理确保文件系统权限正确
Pandas IO API 提供了丰富的文件格式支持和灵活的参数配置。通过合理选择格式和参数,可以显著提升数据处理效率和稳定性。