|

Pandas Input/Output (输入输出) API 手册

Pandas Input/Output API 完整手册

1. CSV 文件操作

1.1 读取 CSV

import pandas as pd

# 基本读取
df = pd.read_csv('data.csv')

# 指定参数
df = pd.read_csv(
    'data.csv',
    sep=',',                    # 分隔符
    header=0,                   # 表头行
    names=['col1', 'col2'],     # 自定义列名
    index_col=0,                # 索引列
    usecols=['col1', 'col2'],   # 指定列
    nrows=1000,                 # 读取行数
    skiprows=[1, 2, 5],         # 跳过行
    na_values=['NA', '-1'],     # 空值标识
    dtype={'col1': 'int32'},    # 数据类型
    parse_dates=['date_col'],   # 日期解析
    date_parser=lambda x: pd.to_datetime(x, format='%Y-%m-%d'),
    encoding='utf-8',           # 编码
    compression='gzip',         # 压缩格式
    engine='c',                 # 解析引擎
    low_memory=True             # 低内存模式
)

# 分块读取大文件
for chunk in pd.read_csv('large.csv', chunksize=10000):
    process_chunk(chunk)

# 从URL读取
url_df = pd.read_csv('https://example.com/data.csv')

1.2 写入 CSV

# 基本写入
df.to_csv('output.csv', index=True)

# 高级参数
df.to_csv(
    'output.csv',
    sep=';',                    # 分隔符
    index=False,                # 不写索引
    columns=['col1', 'col2'],   # 指定列
    header=True,                # 写表头
    na_rep='NULL',              # 空值表示
    float_format='%.2f',        # 浮点数格式
    date_format='%Y-%m-%d',     # 日期格式
    encoding='utf-8',           # 编码
    compression='gzip',         # 压缩
    quoting=csv.QUOTE_ALL,      # 引号策略
    line_terminator='\n',       # 行结束符
    chunksize=10000,            # 分块写入
    mode='w'                    # 写入模式
)

# 追加写入
df.to_csv('output.csv', mode='a', header=False, index=False)

2. Excel 文件操作

2.1 读取 Excel

# 安装依赖:pip install openpyxl xlrd
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')

# 多工作表
excel_file = pd.ExcelFile('data.xlsx')
sheets = excel_file.sheet_names
df1 = pd.read_excel(excel_file, sheet_name='Sheet1')
df2 = pd.read_excel(excel_file, sheet_name='Sheet2')

# 读取多个工作表
all_sheets = {sheet: pd.read_excel(excel_file, sheet_name=sheet) 
              for sheet in sheets}

# 高级参数
df = pd.read_excel(
    'data.xlsx',
    sheet_name=0,               # 工作表索引或名称
    header=0,
    names=['col1', 'col2'],
    index_col=0,
    usecols='A:C',              # 列范围(A1:C10)
    nrows=100,
    skiprows=2,
    na_values=['N/A', 'NULL'],
    dtype={'price': 'float'},
    parse_dates=['date'],
    date_parser=pd.to_datetime,
    engine='openpyxl',          # 或 'xlrd'
    converters={'custom': lambda x: custom_parse(x)}
)

# 读取特定区域
df = pd.read_excel('data.xlsx', sheet_name='Sheet1', 
                   skiprows=1, nrows=10, usecols='B:D')

2.2 写入 Excel

# 基本写入
df.to_excel('output.xlsx', sheet_name='Sheet1', index=False)

# 多工作表写入
with pd.ExcelWriter('multi_sheet.xlsx', engine='openpyxl') as writer:
    df1.to_excel(writer, sheet_name='Sales', index=False)
    df2.to_excel(writer, sheet_name='Customers', index=False)
    df3.to_excel(writer, sheet_name='Products', index=False)

# 高级参数
with pd.ExcelWriter('output.xlsx', engine='openpyxl') as writer:
    df.to_excel(writer, sheet_name='Data',
                startrow=1,             # 起始行
                startcol=1,             # 起始列
                index=False,
                header=True,
                na_rep='',
                float_format='%.2f',
                columns=['col1', 'col2'],
                merge_cells=False,
                encoding='utf-8',
                freeze_panes=(1, 1))    # 冻结窗格

# 格式化(需要 openpyxl)
from openpyxl.styles import Font, PatternFill
workbook = writer.book
worksheet = writer.sheets['Data']
for cell in worksheet['1']:
    cell.font = Font(bold=True)

3. JSON 文件操作

3.1 读取 JSON

# 简单JSON
df = pd.read_json('data.json')

# 不同格式
df = pd.read_json(
    'data.json',
    orient='records',           # 记录格式
    lines=True,                 # JSON Lines
    encoding='utf-8',
    dtype=True,
    convert_axes=True,
    precise_float=True
)

# 嵌套JSON
nested_df = pd.read_json('nested.json', orient='split')

# JSON Lines(每行一个JSON对象)
df_lines = pd.read_json('data.jsonl', lines=True)

# 从字符串读取
json_str = '{"col1": [1,2,3], "col2": ["a","b","c"]}'
df_str = pd.read_json(json_str, orient='split')

3.2 写入 JSON

# 基本写入
df.to_json('output.json', orient='records')

# 不同格式
df.to_json(
    'output.json',
    orient='split',             # {'columns': [...], 'data': [...]}
    date_format='iso',          # 或 'epoch'
    double_precision=17,
    force_ascii=True,           # 转义非ASCII
    date_unit='ms',             # 时间戳单位
    default_handler=lambda x: None,  # 自定义序列化
    lines=False,                # JSON Lines
    compression='gzip',
    index=True
)

# JSON Lines
df.to_json('output.jsonl', orient='records', lines=True)

# 压缩
df.to_json('output.json.gz', compression='gzip')

4. SQL 数据库操作

4.1 连接和配置

from sqlalchemy import create_engine
import sqlite3
from psycopg2 import connect
import pymysql

# SQLite
engine_sqlite = create_engine('sqlite:///database.db')
conn_sqlite = sqlite3.connect('database.db')

# PostgreSQL
engine_pg = create_engine('postgresql://user:password@localhost/dbname')
conn_pg = connect("dbname=dbname user=user password=password")

# MySQL
engine_mysql = create_engine('mysql+pymysql://user:password@localhost/dbname')

# 通用连接字符串
engines = {
    'sqlite': 'sqlite:///data.db',
    'mysql': 'mysql+pymysql://user:pass@localhost/db',
    'postgresql': 'postgresql://user:pass@localhost/db',
    'oracle': 'oracle://user:pass@host:port/service'
}

4.2 读取数据库

# SQL查询
query = "SELECT * FROM sales WHERE date > '2023-01-01'"
df = pd.read_sql_query(query, engine)

# 从表读取
df = pd.read_sql_table('sales', engine, schema='public')

# 高级参数
df = pd.read_sql_query(
    query,
    engine,
    index_col='id',
    coerce_float=True,
    parse_dates=['date'],
    columns=['col1', 'col2'],
    chunksize=1000,
    dtype={'price': 'float64'}
)

# 分块读取
for chunk in pd.read_sql_query(query, engine, chunksize=1000):
    process_chunk(chunk)

4.3 写入数据库

# 基本写入
df.to_sql('table_name', engine, if_exists='replace', index=False)

# 高级参数
df.to_sql(
    'sales',
    engine,
    if_exists='append',         # 'fail', 'replace', 'append'
    index=False,
    index_label='id',
    schema='public',
    chunksize=1000,
    dtype={
        'price': sqlalchemy.Float(precision=2),
        'quantity': sqlalchemy.Integer
    },
    method='multi',             # 批量插入方法
    if_table_exists='append'
)

# 分块写入
df.to_sql('large_table', engine, if_exists='append', 
          index=False, chunksize=10000, method='multi')

5. Parquet 和其他列式存储

5.1 Parquet 操作

# 安装:pip install pyarrow 或 fastparquet
import pyarrow.parquet as pq
import pyarrow as pa

# 写入Parquet
df.to_parquet('data.parquet', 
              engine='pyarrow',       # 或 'fastparquet'
              compression='snappy',   # 'gzip', 'brotli', 'none'
              index=True,
              partition_cols=['category', 'region'],  # 分区
              coerce_timestamps='ms')

# 读取Parquet
df = pd.read_parquet('data.parquet',
                     engine='pyarrow',
                     columns=['col1', 'col2'],
                     filters=[('category', '=', 'A')],  # 谓词下推
                     index='id')

# 分区读取
partitioned_df = pd.read_parquet('partitioned_data/',
                                engine='pyarrow')

# Parquet文件元数据
parquet_file = pq.ParquetFile('data.parquet')
print("Schema:", parquet_file.schema)
print("行数:", parquet_file.metadata.num_rows)

5.2 Feather 格式

# 安装:pip install pyarrow
# 快速读写(列式,无压缩)
df.to_feather('data.feather')
df_fast = pd.read_feather('data.feather')

6. HDF5 文件操作

6.1 基本操作

# 安装:pip install tables
df.to_hdf('data.h5', key='df', mode='w')

# 读取
df = pd.read_hdf('data.h5', key='df')

# 多键存储
with pd.HDFStore('data.h5') as store:
    store['df1'] = df1
    store['df2'] = df2
    store['metadata'] = metadata

    # 读取
    df1 = store['df1']
    keys = store.keys()

6.2 高级参数

df.to_hdf(
    'data.h5',
    key='sales',
    mode='a',                   # 'w', 'a'
    format='table',             # 'fixed', 'table'
    append=True,
    data_columns=['category'],
    complevel=9,                # 压缩级别 0-9
    complib='zlib',             # 'zlib', 'lzo', 'bzip2'
    fletcher32=True,            # 校验和
    min_itemsize=100
)

# 查询固定格式
df_query = pd.read_hdf('data.h5', 'sales', 
                       where='category="A"')

7. Pickle 序列化

7.1 基本使用

# 写入
df.to_pickle('data.pkl')

# 读取
df = pd.read_pickle('data.pkl')

# 压缩
df.to_pickle('data.pkl.gz', compression='gzip')

7.2 安全注意

# 安全加载(避免恶意pickle)
import pickle
with open('data.pkl', 'rb') as f:
    # 使用pandas内置方法更安全
    df = pd.read_pickle('data.pkl')

# 自定义协议
df.to_pickle('data.pkl', protocol=4)  # Python 3.4+

8. HTML 和网页数据

8.1 读取 HTML 表格

# 安装:pip install lxml beautifulsoup4 html5lib
dfs = pd.read_html('https://example.com/table.html')

# 指定表格
df = pd.read_html('page.html', match='Sales Table')[0]

# 参数
dfs = pd.read_html(
    io,                         # 文件路径、URL、文件对象
    match='keyword',            # 正则匹配
    attrs={'id': 'table1'},     # CSS选择器
    flavor=['lxml', 'html5lib'], # 解析器
    header=0,
    index_col=0,
    skiprows=[1, 2],
    parse_dates=True,
    converters={'col': lambda x: custom_parse(x)}
)

8.2 写入 HTML

df.to_html('table.html',
           index=False,
           classes='table table-striped',
           border=0,
           table_id='sales-table',
           justify='center',
           bold_rows=True,
           escape=False,              # 不转义HTML
           na_rep='',
           formatters={'price': '${:,.2f}'.format},
           sparsify=True)

9. 剪贴板操作

# 从剪贴板读取
df_clip = pd.read_clipboard()

# 写入剪贴板
df.to_clipboard(index=False)

# Excel格式剪贴板
df.to_clipboard(excel=True)

10. 分块处理和流式 IO

10.1 大文件分块处理

def process_large_file_chunks(filepath, chunksize=10000, func=None):
    """分块处理大文件"""
    total_processed = 0

    for chunk_num, chunk in enumerate(pd.read_csv(filepath, chunksize=chunksize)):
        print(f"处理块 {chunk_num + 1}")

        if func:
            processed_chunk = func(chunk)
        else:
            processed_chunk = chunk

        # 累积或处理
        if total_processed == 0:
            result = processed_chunk
        else:
            result = pd.concat([result, processed_chunk], ignore_index=True)

        total_processed += len(chunk)
        print(f"已处理 {total_processed} 行")

        # 内存控制:定期清理
        if total_processed % 1000000 == 0:
            result = result.tail(100000)  # 只保留最近数据

    return result

# 使用
# result = process_large_file_chunks('large.csv', func=clean_chunk)

10.2 压缩文件处理

# 支持的压缩格式
compressions = ['gzip', 'bz2', 'zip', 'xz', 'zstd', 'infer']

# 自动检测
df = pd.read_csv('data.csv.gz')  # 自动识别gzip
df = pd.read_csv('data.zip')     # 读取zip内第一个文件

# 显式指定
df = pd.read_csv('data.txt.gz', compression='gzip')
df.to_csv('output.txt.gz', compression='gzip')

11. 高级 IO 功能

11.1 自定义格式器

def custom_to_csv(df, path, **kwargs):
    """自定义CSV写入"""
    # 预处理
    df_clean = df.fillna('')

    # 自定义列顺序
    cols = ['important_col'] + [c for c in df_clean if c != 'important_col']
    df_clean = df_clean[cols]

    # 写入
    df_clean.to_csv(path, **kwargs)

# 使用自定义日期格式
df['custom_date'] = df['date'].dt.strftime('%Y%m%d')

11.2 内存优化 IO

def memory_efficient_io(df, output_format='parquet'):
    """内存优化文件写入"""
    if output_format == 'parquet':
        # Parquet列式存储,压缩好
        df.to_parquet('data.parquet', compression='snappy', index=False)
    elif output_format == 'csv':
        # 分块写入CSV
        df.to_csv('data.csv', chunksize=10000, index=False)
    elif output_format == 'hdf5':
        # HDF5分块存储
        df.to_hdf('data.h5', key='data', format='table', 
                 data_columns=True, complevel=9)

# 读取时内存优化
def read_with_optimization(filepath, format='infer'):
    """优化读取"""
    if format == 'parquet':
        return pd.read_parquet(filepath, engine='pyarrow')
    elif format == 'csv':
        # 只读取需要的列和行
        return pd.read_csv(filepath, usecols=['col1', 'col2'], 
                          nrows=1000000, dtype_backend='pyarrow')

11.3 批量操作

def batch_database_operations(dataframes, table_name, engine):
    """批量数据库操作"""
    with engine.connect() as conn:
        # 事务控制
        trans = conn.begin()
        try:
            for i, df in enumerate(dataframes):
                df.to_sql(table_name, conn, if_exists='append' if i > 0 else 'replace',
                         index=False, method='multi')
            trans.commit()
        except Exception as e:
            trans.rollback()
            raise e

# 批量文件处理
def batch_file_export(dfs_dict, base_path):
    """批量导出文件"""
    for name, df in dfs_dict.items():
        filename = f"{base_path}_{name}.parquet"
        df.to_parquet(filename, index=False)
        print(f"导出 {filename}")

12. 错误处理和最佳实践

12.1 异常处理

import logging
from contextlib import contextmanager

@contextmanager
def safe_io_context(filepath, mode='r'):
    """安全的IO上下文"""
    try:
        if mode == 'r':
            with open(filepath, 'r') as f:
                yield f
        else:
            with open(filepath, 'w') as f:
                yield f
    except FileNotFoundError:
        logging.error(f"文件未找到: {filepath}")
        raise
    except PermissionError:
        logging.error(f"权限错误: {filepath}")
        raise
    except Exception as e:
        logging.error(f"IO错误: {e}")
        raise

def robust_read_csv(filepath, **kwargs):
    """健壮的CSV读取"""
    try:
        return pd.read_csv(filepath, **kwargs)
    except pd.errors.EmptyDataError:
        print("文件为空")
        return pd.DataFrame()
    except pd.errors.ParserError as e:
        print(f"解析错误: {e}")
        # 尝试不同编码
        encodings = ['utf-8', 'latin1', 'cp1252']
        for enc in encodings[1:]:
            try:
                return pd.read_csv(filepath, encoding=enc, **kwargs)
            except:
                continue
        raise

12.2 性能监控

import time
from functools import wraps

def time_io(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start_time
        print(f"{func.__name__} 耗时: {elapsed:.2f}秒")
        return result
    return wrapper

@time_io
def benchmark_read_csv(filepath):
    return pd.read_csv(filepath)

@time_io
def benchmark_read_parquet(filepath):
    return pd.read_parquet(filepath)

12.3 文件完整性检查

import hashlib

def verify_file_integrity(filepath, expected_hash=None):
    """验证文件完整性"""
    hash_md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    actual_hash = hash_md5.hexdigest()

    if expected_hash and actual_hash != expected_hash:
        raise ValueError(f"文件完整性检查失败: {actual_hash} != {expected_hash}")

    return actual_hash

# 使用
file_hash = verify_file_integrity('data.parquet')
print(f"文件MD5: {file_hash}")

13. 格式选择指南

格式读取速度写入速度压缩比适用场景
CSV中等通用,兼容性好
Parquet大数据,列式查询
Feather最快最快临时存储,跨语言
HDF5中等中等层次数据,查询
Excel报表,Excel兼容
JSONWeb API,文档

14. 生产环境最佳实践

  1. 优先使用列式格式(Parquet/Feather)处理大数据
  2. 分块处理超大文件,避免内存溢出
  3. 数据类型优化在IO前后进行内存优化
  4. 连接池管理数据库连接,使用context manager
  5. 错误恢复实现重试机制和断点续传
  6. 监控告警记录IO性能和错误日志
  7. 版本控制使用时间戳或版本号命名文件
  8. 权限管理确保文件系统权限正确

Pandas IO API 提供了丰富的文件格式支持和灵活的参数配置。通过合理选择格式和参数,可以显著提升数据处理效率和稳定性。

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注