|

Pandas Excel 文件操作详解

1. 安装依赖

1.1 必需库

# 读取 .xlsx 文件
pip install openpyxl

# 读取 .xls 文件(旧格式)
pip install xlrd

# 写入 Excel(推荐)
pip install openpyxl

# 高级功能(图表、格式)
pip install xlsxwriter

# 快速读取(可选)
pip install pyxlsb  # .xlsb 文件
pip install odfpy   # .ods 文件

1.2 验证安装

import pandas as pd

# 检查引擎
print(pd.ExcelFile('test.xlsx').engine)
print(pd.__version__)

# 测试读取
try:
    df = pd.read_excel('test.xlsx')
    print("Excel 支持正常")
except ImportError:
    print("需要安装 openpyxl 或 xlrd")

2. 读取 Excel 文件

2.1 基本读取

# 读取第一个工作表
df = pd.read_excel('data.xlsx')

# 指定工作表
df_sheet1 = pd.read_excel('data.xlsx', sheet_name='Sheet1')
df_sheet2 = pd.read_excel('data.xlsx', sheet_name='销售数据')

# 工作表索引
df_index = pd.read_excel('data.xlsx', sheet_name=0)  # 第一个表

2.2 常用参数

df = pd.read_excel('data.xlsx',
                  sheet_name='数据',        # 工作表名或索引
                  header=0,                # 表头行
                  names=['col1', 'col2'],  # 自定义列名
                  index_col=0,             # 索引列
                  usecols='A:C',           # 列范围(A1:C10)
                  nrows=100,               # 读取前N行
                  skiprows=2,              # 跳过前N行
                  skipfooter=5,            # 跳过最后N行
                  na_values=['N/A', 'NULL'], # 空值标识
                  dtype={'price': 'float'}, # 数据类型
                  parse_dates=['日期'],    # 日期列
                  date_parser=lambda x: pd.to_datetime(x, format='%Y/%m/%d'),
                  engine='openpyxl',       # 引擎
                  convert_float=True       # 转换浮点数
                  )

2.3 读取多个工作表

# 读取所有工作表到字典
excel_file = pd.ExcelFile('data.xlsx')
sheet_names = excel_file.sheet_names
print(f"工作表: {sheet_names}")

all_sheets = {}
for sheet in sheet_names:
    all_sheets[sheet] = pd.read_excel(excel_file, sheet_name=sheet)

# 或者一次性读取
all_sheets = pd.read_excel('data.xlsx', sheet_name=None)  # 返回字典

# 合并所有工作表
combined_df = pd.concat(all_sheets.values(), 
                       keys=all_sheets.keys(),
                       names=['工作表', '索引'])

2.4 复杂结构处理

# 读取特定范围
df_range = pd.read_excel('data.xlsx', 
                        sheet_name='数据',
                        usecols='A1:D20',     # 范围
                        skiprows=1,           # 跳过标题行
                        nrows=15)             # 读取15行

# 多级表头
df_multi_header = pd.read_excel('data.xlsx', 
                               header=[0, 1])  # 前两行作为多级表头

# 无表头
df_no_header = pd.read_excel('data.xlsx', 
                            header=None,
                            names=['产品', '销量', '价格', '日期'])

3. 写入 Excel 文件

3.1 基本写入

# 写入单个工作表
df.to_excel('output.xlsx', index=False)

# 指定工作表名
df.to_excel('output.xlsx', sheet_name='销售数据', index=False)

# 指定引擎
df.to_excel('output.xlsx', engine='openpyxl', index=False)

3.2 写入多个工作表

# 方法1:使用 ExcelWriter
with pd.ExcelWriter('multi_sheets.xlsx', engine='openpyxl') as writer:
    df1.to_excel(writer, sheet_name='销售', index=False)
    df2.to_excel(writer, sheet_name='客户', index=False)
    df3.to_excel(writer, sheet_name='产品', index=False)

# 方法2:字典方式
data_dict = {'销售': df1, '客户': df2, '产品': df3}
with pd.ExcelWriter('multi_sheets.xlsx') as writer:
    for sheet_name, df in data_dict.items():
        df.to_excel(writer, sheet_name=sheet_name, index=False)

# 方法3:直接指定
pd.ExcelWriter('output.xlsx').close()  # 旧版本

3.3 高级写入选项

with pd.ExcelWriter('advanced.xlsx', engine='xlsxwriter') as writer:
    # 基础写入
    df.to_excel(writer, sheet_name='数据', index=False)

    # 获取 workbook 和 worksheet 对象
    workbook = writer.book
    worksheet = writer.sheets['数据']

    # 设置列宽
    worksheet.set_column('A:D', 15)

    # 格式化表头
    header_format = workbook.add_format({
        'bold': True,
        'text_wrap': True,
        'valign': 'top',
        'fg_color': '#D7E4BC',
        'border': 1
    })

    # 应用格式(需要先写入数据)
    for col_num, value in enumerate(df.columns.values):
        worksheet.write(0, col_num, value, header_format)

4. 格式化和样式(openpyxl)

4.1 单元格样式

from openpyxl.styles import PatternFill, Font, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows

# 创建带样式的 Excel
def write_styled_excel(df, filename):
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='数据', index=False, header=True)

        workbook = writer.book
        worksheet = writer.sheets['数据']

        # 表头样式
        header_font = Font(bold=True, color='FFFFFF')
        header_fill = PatternFill(start_color='366092', end_color='366092', fill_type='solid')

        for cell in worksheet[1]:  # 表头行
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal='center')

        # 数据格式化
        for row in worksheet.iter_rows(min_row=2, values_only=False):
            for cell in row:
                cell.alignment = Alignment(horizontal='center')

        # 条件格式
        from openpyxl.formatting.rule import ColorScaleRule
        worksheet.conditional_formatting.add('C2:C100',
            ColorScaleRule(start_type='min', start_color='FF0000',
                          end_type='max', end_color='00FF00'))

        # 自动调整列宽
        for column in worksheet.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                try:
                    if len(str(cell.value)) > max_length:
                        max_length = len(str(cell.value))
                except:
                    pass
            adjusted_width = min(max_length + 2, 50)
            worksheet.column_dimensions[column_letter].width = adjusted_width

# 使用
write_styled_excel(df, 'styled_output.xlsx')

4.2 使用 xlsxwriter 格式化

import xlsxwriter

def write_with_xlsxwriter(df, filename):
    workbook = xlsxwriter.Workbook(filename)
    worksheet = workbook.add_worksheet('数据')

    # 格式定义
    header_format = workbook.add_format({
        'bold': True,
        'text_wrap': True,
        'valign': 'top',
        'fg_color': '#D7E4BC',
        'border': 1
    })

    # 写入表头
    for col_num, value in enumerate(df.columns.values):
        worksheet.write(0, col_num, value, header_format)

    # 写入数据
    for row_num, row_data in enumerate(df.values, 1):
        for col_num, value in enumerate(row_data):
            worksheet.write(row_num, col_num, value)

    # 设置列宽
    worksheet.set_column('A:D', 15)

    # 条件格式
    worksheet.conditional_format('C2:C100', {
        'type': '3_color_scale',
        'min_color': '#FF0000',
        'mid_color': '#FFFF00',
        'max_color': '#00FF00'
    })

    # 图表
    chart = workbook.add_chart({'type': 'column'})
    chart.add_series({
        'name': '销量',
        'categories': ['数据', 1, 0, len(df)-1, 0],
        'values': ['数据', 1, 2, len(df)-1, 2],
    })
    worksheet.insert_chart('F2', chart)

    workbook.close()

# 使用
write_with_xlsxwriter(df, 'chart_output.xlsx')

5. 实际应用示例

5.1 销售报表生成

def generate_sales_report(sales_df, filename):
    """生成销售分析报表"""
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        # 原始数据
        sales_df.to_excel(writer, sheet_name='原始数据', index=False)

        # 汇总统计
        summary = sales_df.groupby('产品').agg({
            '销量': 'sum',
            '收入': 'sum',
            '单价': 'mean'
        }).round(2)
        summary.to_excel(writer, sheet_name='产品汇总')

        # 月度分析
        sales_df['月份'] = pd.to_datetime(sales_df['日期']).dt.to_period('M')
        monthly = sales_df.groupby(['月份', '产品'])['收入'].sum().unstack()
        monthly.to_excel(writer, sheet_name='月度分析')

        # 排名
        top_products = sales_df.groupby('产品')['收入'].sum().sort_values(ascending=False)
        top_df = pd.DataFrame({'收入': top_products}).reset_index()
        top_df.to_excel(writer, sheet_name='产品排名', index=False)

        # 透视表
        pivot = sales_df.pivot_table(values='收入', 
                                   index='产品', 
                                   columns='地区', 
                                   aggfunc='sum', 
                                   fill_value=0)
        pivot.to_excel(writer, sheet_name='地区分析')

# 示例数据
sales_data = pd.DataFrame({
    '日期': pd.date_range('2023-01-01', periods=100, freq='D'),
    '产品': np.random.choice(['A', 'B', 'C'], 100),
    '地区': np.random.choice(['华北', '华东', '华南'], 100),
    '销量': np.random.randint(10, 100, 100),
    '单价': np.random.uniform(10, 50, 100)
})
sales_data['收入'] = sales_data['销量'] * sales_data['单价']

generate_sales_report(sales_data, '销售报表.xlsx')

5.2 多源数据整合

def consolidate_excel_data(folder_path):
    """整合文件夹中所有 Excel 文件"""
    import glob
    import os

    all_files = glob.glob(os.path.join(folder_path, '*.xlsx'))
    combined_data = []

    for file_path in all_files:
        print(f"处理: {os.path.basename(file_path)}")

        # 读取所有工作表
        excel_file = pd.ExcelFile(file_path)
        for sheet_name in excel_file.sheet_names:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            df['源文件'] = os.path.basename(file_path)
            df['工作表'] = sheet_name
            combined_data.append(df)

    # 合并所有数据
    final_df = pd.concat(combined_data, ignore_index=True)

    # 数据清洗
    final_df = final_df.dropna(subset=['关键列'])  # 根据实际情况调整

    return final_df

# 使用
consolidated = consolidate_excel_data('./excel_files/')
consolidated.to_excel('整合数据.xlsx', index=False)

5.3 数据验证和质量报告

def create_data_quality_report(df, filename):
    """创建数据质量报告"""
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        # 基本信息
        info = pd.DataFrame({
            '指标': ['总行数', '总列数', '缺失值总数', '重复行数', '内存使用(MB)'],
            '值': [len(df), len(df.columns), df.isnull().sum().sum(),
                  df.duplicated().sum(), df.memory_usage(deep=True).sum() / 1024**2]
        })
        info.to_excel(writer, sheet_name='基本信息', index=False)

        # 缺失值明细
        missing_detail = pd.DataFrame({
            '列名': df.columns,
            '缺失值': df.isnull().sum(),
            '缺失率%': (df.isnull().sum() / len(df) * 100).round(2)
        })
        missing_detail.to_excel(writer, sheet_name='缺失值明细', index=False)

        # 数据类型
        dtype_info = pd.DataFrame({
            '列名': df.columns,
            '数据类型': df.dtypes,
            '唯一值数量': df.nunique()
        })
        dtype_info.to_excel(writer, sheet_name='数据类型', index=False)

        # 数值统计
        numeric_stats = df.describe()
        numeric_stats.to_excel(writer, sheet_name='数值统计')

        # 异常值检测
        outliers = {}
        for col in df.select_dtypes(include=[np.number]).columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            outliers[col] = ((df[col] < (Q1 - 1.5 * IQR)) | 
                           (df[col] > (Q3 + 1.5 * IQR))).sum()

        outlier_df = pd.DataFrame(list(outliers.items()), 
                                 columns=['列名', '异常值数量'])
        outlier_df.to_excel(writer, sheet_name='异常值', index=False)

# 使用
create_data_quality_report(df, '数据质量报告.xlsx')

6. 性能优化

6.1 大文件处理

# 分块读取(Excel 不直接支持,需预处理)
def process_large_excel(file_path, sheet_name, chunk_size=10000):
    """处理大 Excel 文件"""
    # 先读取所有行数
    xl_file = pd.ExcelFile(file_path)
    total_rows = len(pd.read_excel(xl_file, sheet_name=sheet_name, nrows=0))

    results = []
    for start_row in range(0, total_rows, chunk_size):
        df_chunk = pd.read_excel(file_path, 
                               sheet_name=sheet_name,
                               skiprows=start_row,
                               nrows=chunk_size)
        # 处理 chunk
        processed = df_chunk[df_chunk['value'] > 100]  # 示例
        results.append(processed)

    return pd.concat(results, ignore_index=True)

# 使用 Dask 处理超大文件
import dask.dataframe as dd
# ddf = dd.read_excel('large.xlsx')  # Dask 暂不支持直接读取 Excel

6.2 内存优化

def optimize_excel_read(file_path, sheet_name):
    """优化 Excel 读取性能"""
    # 只读取需要的列
    df = pd.read_excel(file_path, sheet_name=sheet_name,
                      usecols=['col1', 'col2', 'date'])  # 指定列

    # 优化数据类型
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() < len(df) * 0.1:  # 低基数转换为 category
            df[col] = df[col].astype('category')

    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], downcast='integer' if df[col].dtype.kind == 'i' else 'float')

    return df

# 写入时优化
with pd.ExcelWriter('optimized.xlsx', engine='openpyxl') as writer:
    # 转换 category 为 object(Excel 不支持 category)
    df_ready = df.copy()
    for col in df_ready.select_dtypes(['category']).columns:
        df_ready[col] = df_ready[col].astype(str)
    df_ready.to_excel(writer, index=False)

7. 与 CSV 的对比和互转

7.1 Excel vs CSV

# Excel 优势:格式、公式、多工作表、图表
# CSV 优势:轻量、兼容性好、处理速度快

# Excel 转 CSV
df = pd.read_excel('input.xlsx', sheet_name='数据')
df.to_csv('output.csv', index=False, encoding='utf-8')

# CSV 转 Excel
df_csv = pd.read_csv('input.csv')
with pd.ExcelWriter('output.xlsx') as writer:
    df_csv.to_excel(writer, sheet_name='数据', index=False)

7.2 批量转换

def batch_convert_excel_to_csv(excel_folder, csv_folder):
    """批量将 Excel 转换为 CSV"""
    import os
    import shutil

    os.makedirs(csv_folder, exist_ok=True)

    for excel_file in os.listdir(excel_folder):
        if excel_file.endswith('.xlsx'):
            input_path = os.path.join(excel_folder, excel_file)
            base_name = os.path.splitext(excel_file)[0]

            # 读取所有工作表
            xl_file = pd.ExcelFile(input_path)
            for sheet_name in xl_file.sheet_names:
                df = pd.read_excel(xl_file, sheet_name=sheet_name)
                output_path = os.path.join(csv_folder, f"{base_name}_{sheet_name}.csv")
                df.to_csv(output_path, index=False, encoding='utf-8')
                print(f"转换: {sheet_name} -> {output_path}")

8. 错误处理和调试

8.1 常见问题解决

def robust_excel_read(file_path, sheet_name=None, **kwargs):
    """健壮的 Excel 读取"""
    engines = ['openpyxl', 'xlrd', 'calamine']  # 尝试不同引擎

    for engine in engines:
        try:
            kwargs['engine'] = engine
            df = pd.read_excel(file_path, sheet_name=sheet_name, **kwargs)
            print(f"成功使用引擎: {engine}")
            return df
        except ImportError:
            print(f"引擎 {engine} 未安装")
            continue
        except Exception as e:
            print(f"引擎 {engine} 失败: {e}")
            continue

    raise Exception("所有引擎都失败了")

# 编码和格式问题
try:
    df = pd.read_excel('data.xlsx', encoding='utf-8')
except UnicodeDecodeError:
    # Excel 通常不需要 encoding 参数,但某些情况下可能需要
    pass

8.2 文件验证

def validate_excel_file(file_path):
    """验证 Excel 文件"""
    try:
        xl_file = pd.ExcelFile(file_path)
        print(f"文件: {file_path}")
        print(f"工作表数量: {len(xl_file.sheet_names)}")
        print(f"工作表: {xl_file.sheet_names}")

        for sheet in xl_file.sheet_names:
            # 检查每张表的行数和列数
            df_sample = pd.read_excel(xl_file, sheet_name=sheet, nrows=5)
            print(f"  {sheet}: {len(df_sample.columns)} 列")

        return True
    except Exception as e:
        print(f"文件验证失败: {e}")
        return False

# 使用
validate_excel_file('data.xlsx')

9. 高级功能

9.1 公式和计算

# 在 DataFrame 中预计算,然后写入
df['总价'] = df['数量'] * df['单价']
df['税费'] = df['总价'] * 0.08
df['净收入'] = df['总价'] - df['税费']

# Excel 中写入公式(xlsxwriter)
with pd.ExcelWriter('formulas.xlsx', engine='xlsxwriter') as writer:
    df[['数量', '单价']].to_excel(writer, sheet_name='计算', startrow=0, index=False)
    worksheet = writer.sheets['计算']

    # 在 Excel 中添加公式
    for row in range(1, len(df) + 1):
        worksheet.write_formula(row, 2, f'=B{row+1}*C{row+1}')  # 总价
        worksheet.write_formula(row, 3, f'=D{row+1}*0.08')      # 税费

9.2 图表和可视化

def create_chart_excel(df, filename):
    """创建带图表的 Excel"""
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='数据', index=False)

        workbook = writer.book
        worksheet = writer.sheets['数据']

        # 创建柱状图
        chart = workbook.add_chart({'type': 'column'})
        chart.add_series({
            'name': '销量',
            'categories': ['数据', 1, 0, len(df)-1, 0],  # X轴
            'values': ['数据', 1, 1, len(df)-1, 1],     # Y轴(假设第2列是销量)
            'fill': {'color': '#4472C4'}
        })

        # 创建折线图
        line_chart = workbook.add_chart({'type': 'line'})
        line_chart.add_series({
            'name': '趋势',
            'categories': ['数据', 1, 0, len(df)-1, 0],
            'values': ['数据', 1, 2, len(df)-1, 2],  # 假设第3列
        })

        # 插入图表
        worksheet.insert_chart('G2', chart)
        worksheet.insert_chart('G18', line_chart)

        # 设置图表格式
        chart.set_title({'name': '销售数据分析'})
        chart.set_x_axis({'name': '产品'})
        chart.set_y_axis({'name': '销量'})

9.3 保护和加密

from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows

def create_protected_excel(df, filename, password):
    """创建受保护的 Excel"""
    wb = Workbook()
    ws = wb.active

    # 写入数据
    for r in dataframe_to_rows(df, index=False, header=True):
        ws.append(r)

    # 保护工作表
    ws.protection.sheet = True
    ws.protection.password = password

    # 保护特定单元格
    for row in ws.iter_rows():
        for cell in row:
            if cell.row == 1:  # 保护表头
                cell.protection = ws.protection

    # 保护工作簿
    wb.security.workbookPassword = password
    wb.security.lockStructure = True

    wb.save(filename)

# 使用(注意:openpyxl 的保护功能有限)
create_protected_excel(df, 'protected.xlsx', 'mypassword')

10. 最佳实践

10.1 文件操作规范

class ExcelManager:
    """Excel 文件管理器"""
    def __init__(self, file_path):
        self.file_path = file_path
        self.excel_file = pd.ExcelFile(file_path)
        self.sheets = self.excel_file.sheet_names

    def read_sheet(self, sheet_name, **kwargs):
        """安全读取工作表"""
        if sheet_name not in self.sheets:
            raise ValueError(f"工作表 {sheet_name} 不存在")
        return pd.read_excel(self.excel_file, sheet_name, **kwargs)

    def read_all(self, **kwargs):
        """读取所有工作表"""
        return {sheet: self.read_sheet(sheet, **kwargs) 
                for sheet in self.sheets}

    def close(self):
        self.excel_file.close()

# 使用
manager = ExcelManager('data.xlsx')
df = manager.read_sheet('销售数据')
manager.close()

10.2 错误恢复

import logging
import traceback

def safe_excel_operation(func):
    """Excel 操作错误处理装饰器"""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except FileNotFoundError:
            logging.error(f"文件未找到: {kwargs.get('file_path', 'unknown')}")
            return None
        except PermissionError:
            logging.error("文件权限错误,请检查文件是否被占用")
            return None
        except pd.errors.EmptyDataError:
            logging.warning("Excel 文件为空")
            return pd.DataFrame()
        except Exception as e:
            logging.error(f"未知错误: {e}")
            logging.error(traceback.format_exc())
            return None
    return wrapper

@safe_excel_operation
def read_sales_data(file_path, sheet_name='销售'):
    return pd.read_excel(file_path, sheet_name=sheet_name)

# 配置日志
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(levelname)s - %(message)s')

Excel 文件操作比 CSV 复杂但功能强大,适合需要格式化、图表和多工作表的场景。掌握不同引擎、格式化和错误处理技巧,可以高效处理各种 Excel 数据需求。

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注