
Pandas JSON File Operations in Depth

1. JSON Basics

JSON (JavaScript Object Notation) is a lightweight data-interchange format:

  • Structure: nested dictionaries (objects) and lists (arrays)
  • Data types: string, number, boolean, null, array, object
  • Strengths: cross-language, human-readable, the standard format for APIs
  • Pandas support: reads and writes JSON directly, in several orient layouts (see the minimal sketch below)
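
A minimal round-trip sketch (sample data assumed):

import pandas as pd

df = pd.DataFrame({"name": ["张三", "李四"], "age": [25, 30]})
json_str = df.to_json(orient='records', force_ascii=False)
print(json_str)  # [{"name":"张三","age":25},{"name":"李四","age":30}]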

2. Installation and Dependencies

# Pandas ships with JSON support; no extra install needed
pip install pandas

# Optional: faster parsers for manual json.loads-style work (see section 8.1).
# Note that pd.read_json bundles its own parser and is not affected by these.
pip install orjson  # fast JSON parser
pip install ujson   # another fast JSON library

# Verify
import pandas as pd
import json
print(pd.__version__)
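
Newer pandas (2.0+) can also delegate JSON Lines parsing to a pyarrow-backed engine, which is often faster on large files. A sketch, assuming pyarrow is installed and data.jsonl exists:

# engine='pyarrow' requires pandas >= 2.0 plus pyarrow, and lines=True
df = pd.read_json('data.jsonl', lines=True, engine='pyarrow')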

3. JSON Structures and Their Pandas Counterparts

3.1 Common JSON Layouts

# 1. List of records (most common)
records_json = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"}
]

# 2. Column-oriented
columns_json = {
    "name": ["张三", "李四"],
    "age": [25, 30],
    "city": ["北京", "上海"]
}

# 3. Nested objects
nested_json = {
    "data": {
        "employees": [
            {"id": 1, "details": {"name": "张三", "age": 25}},
            {"id": 2, "details": {"name": "李四", "age": 30}}
        ]
    }
}

# 4. Split layout (index / columns / data)
split_json = {
    "index": [0, 1],
    "columns": ["name", "age", "city"],
    "data": [
        ["张三", 25, "北京"],
        ["李四", 30, "上海"]
    ]
}

4. Reading JSON Files

4.1 Basic Reading

# From a file
df = pd.read_json('data.json')

# From a string (pandas 2.1+ expects literal JSON to be wrapped in StringIO)
from io import StringIO

json_str = '''
[
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"}
]
'''
df = pd.read_json(StringIO(json_str))

# From a URL
df = pd.read_json('https://api.example.com/data.json')

4.2 The orient Parameter in Detail

# Each orient value maps to a different JSON structure

# 1. 'split' - separate index, columns, and data
split_data = {
    'index': [0, 1],
    'columns': ['name', 'age', 'city'],
    'data': [['张三', 25, '北京'], ['李四', 30, '上海']]
}
df_split = pd.read_json(StringIO(json.dumps(split_data)), orient='split')

# 2. 'records' - list of records
records_data = [
    {'name': '张三', 'age': 25, 'city': '北京'},
    {'name': '李四', 'age': 30, 'city': '上海'}
]
df_records = pd.read_json(StringIO(json.dumps(records_data)), orient='records')

# 3. 'index' - row-oriented, keyed by index
index_data = {
    0: {'name': '张三', 'age': 25, 'city': '北京'},
    1: {'name': '李四', 'age': 30, 'city': '上海'}
}
df_index = pd.read_json(StringIO(json.dumps(index_data)), orient='index')

# 4. 'columns' - column-oriented (the default for read_json)
columns_data = {
    'name': ['张三', '李四'],
    'age': [25, 30],
    'city': ['北京', '上海']
}
df_columns = pd.read_json(StringIO(json.dumps(columns_data)), orient='columns')

# 5. 'values' - bare data array
values_data = [['张三', 25, '北京'], ['李四', 30, '上海']]
df_values = pd.read_json(StringIO(json.dumps(values_data)), orient='values')

4.3 Advanced Reading Parameters

# Note: nrows and chunksize both require lines=True, and with chunksize set
# read_json returns a JsonReader iterator rather than a DataFrame
reader = pd.read_json('data.json',
                      orient='records',        # JSON layout
                      lines=True,              # one JSON object per line (JSONL)
                      encoding='utf-8',        # encoding
                      dtype={'age': 'int32'},  # column dtypes
                      convert_axes=True,       # convert the axes to proper dtypes
                      precise_float=True,      # higher-precision float parsing
                      nrows=1000,              # cap the number of rows
                      chunksize=1000           # read in chunks
                      )

# Chunked reading of large files
for chunk in pd.read_json('large.json', chunksize=1000, lines=True):
    # Process each chunk
    print(f"Processing {len(chunk)} rows")
    processed = chunk[chunk['age'] > 20]
    # ... processing logic

5. Writing JSON Files

5.1 Basic Writing

# Write to a file
df.to_json('output.json')

# Choose the orient
df.to_json('output_records.json', orient='records', indent=2)

# Compressed output (format inferred from the extension)
df.to_json('output.json.gz', compression='gzip')
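
Compression is inferred from the extension on both read and write, so the gzip file round-trips with no extra arguments; a quick check:

# compression='infer' is the default, so the .gz extension is enough
df_back = pd.read_json('output.json.gz')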

5.2 The orient Parameter on Output

# The available output layouts
df.to_json('split.json', orient='split')      # separate index, columns, data
df.to_json('records.json', orient='records')  # list of records
df.to_json('index.json', orient='index')      # row-oriented
df.to_json('columns.json', orient='columns')  # column-oriented
df.to_json('values.json', orient='values')    # bare data

# Pretty-printed output (note: to_json uses force_ascii, not json's ensure_ascii)
df.to_json('pretty.json', orient='records', indent=4, force_ascii=False)

# Date format
df.to_json('dates.json', date_format='iso', orient='records')
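
By default to_json serializes datetimes as epoch milliseconds; date_format='iso' switches to ISO 8601 strings. A quick comparison on a small sample frame:

dates_df = pd.DataFrame({'ts': pd.to_datetime(['2024-01-01', '2024-06-15'])})
print(dates_df.to_json(orient='records'))                     # [{"ts":1704067200000},...]
print(dates_df.to_json(orient='records', date_format='iso'))  # [{"ts":"2024-01-01T00:00:00.000"},...]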

5.3 JSON Lines Format (JSONL)

# One JSON object per line; well suited to streaming
df.to_json('data.jsonl', orient='records', lines=True)

# Read JSONL back
df = pd.read_json('data.jsonl', lines=True, orient='records')
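
For incremental logging, newer pandas (2.1+) can append to an existing JSONL file. A sketch; mode='a' is only valid together with orient='records' and lines=True:

# Append new records to the end of data.jsonl (pandas >= 2.1)
df.to_json('data.jsonl', orient='records', lines=True, mode='a')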

6. Handling Nested JSON

6.1 Flattening Nested Structures with json_normalize

# Nested JSON data
nested_data = {
    "employees": [
        {
            "id": 1,
            "name": "张三",
            "department": {
                "name": "技术部",
                "manager": "李经理"
            },
            "skills": ["Python", "Pandas", "SQL"]
        },
        {
            "id": 2,
            "name": "李四",
            "department": {
                "name": "销售部",
                "manager": "王经理"
            },
            "skills": ["Excel", "沟通"]
        }
    ]
}

# Flattening nested structures
from pandas import json_normalize

# Basic flattening
df_flat = json_normalize(nested_data['employees'])
print("Basic flattening:")
print(df_flat)

# Specify flattening paths
df_custom = json_normalize(
    nested_data['employees'],
    record_path=['skills'],           # explode this array into rows
    meta=['id', ['department', 'name'], ['department', 'manager']]  # metadata paths
)
print("\nSkills flattened:")
print(df_custom)

# Control the flattening depth and key separator
# (the original meta_prefix call is a no-op without meta; sep and max_level
# are the knobs that actually shape multi-level flattening)
df_dept = json_normalize(nested_data['employees'], sep='.', max_level=1)

6.2 Handling Complex Nesting

def flatten_nested_json(data, separator='_'):
    """Recursively flatten nested dicts (and lists of dicts) into flat records."""
    def _flatten(d, parent_key='', sep='_'):
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(_flatten(v, new_key, sep=sep).items())
            elif isinstance(v, list):
                for i, item in enumerate(v):
                    if isinstance(item, dict):
                        items.extend(_flatten(item, f"{new_key}{sep}{i}", sep=sep).items())
                    else:
                        items.append((f"{new_key}{sep}{i}", item))
            else:
                items.append((new_key, v))
        return dict(items)

    # Pass the caller's separator through (it was silently ignored before)
    if isinstance(data, list):
        flattened = [_flatten(item, sep=separator) for item in data]
    else:
        flattened = [_flatten(data, sep=separator)]

    return pd.DataFrame(flattened)

# Usage
complex_data = nested_data['employees']
df_flattened = flatten_nested_json(complex_data)
print(df_flattened)

7. Practical Examples

7.1 Processing API Data

import requests

def fetch_api_to_dataframe(api_url, params=None):
    """Fetch JSON from an API and convert it to a DataFrame"""
    try:
        response = requests.get(api_url, params=params, timeout=10)
        response.raise_for_status()

        data = response.json()

        # Handle the common response shapes
        if isinstance(data, list):
            df = pd.json_normalize(data)
        elif isinstance(data, dict) and 'data' in data:
            df = pd.json_normalize(data['data'])
        elif isinstance(data, dict) and 'results' in data:
            df = pd.json_normalize(data['results'])
        else:
            df = pd.DataFrame([data])

        return df
    except Exception as e:
        print(f"API request failed: {e}")
        return None

# Example: fetch a GitHub user profile
url = "https://api.github.com/users/octocat"
df_github = fetch_api_to_dataframe(url)
print(df_github)

# Paginated APIs
def fetch_paginated_api(base_url, params=None, max_pages=10):
    """Walk a paginated API and concatenate the pages"""
    all_data = []
    page = 1

    while page <= max_pages:
        params = params or {}
        params['page'] = page
        params['per_page'] = 100

        df_page = fetch_api_to_dataframe(base_url, params)
        if df_page is None or df_page.empty:
            break

        all_data.append(df_page)
        page += 1

    # Guard against zero pages; pd.concat raises on an empty list
    if not all_data:
        return pd.DataFrame()
    return pd.concat(all_data, ignore_index=True)

# Usage
# df_all = fetch_paginated_api('https://api.github.com/users')

7.2 Log File Processing (JSONL)

def process_jsonl_logs(log_file):
    """Process a JSONL log file in chunks"""
    chunks = []

    for chunk in pd.read_json(log_file, lines=True, chunksize=10000):
        # Clean the data
        chunk['timestamp'] = pd.to_datetime(chunk['timestamp'])
        chunk = chunk.dropna(subset=['level', 'message'])

        # Extract structured fields
        if 'data' in chunk.columns:
            # Expand the nested column into its own columns
            nested_data = chunk['data'].apply(pd.Series)
            chunk = pd.concat([chunk.drop('data', axis=1), nested_data], axis=1)

        # Filter and convert
        chunk['level'] = chunk['level'].astype('category')
        chunks.append(chunk)

    return pd.concat(chunks, ignore_index=True)

# Usage
logs_df = process_jsonl_logs('app_logs.jsonl')

# Analyze the logs (assumes the logs carry a 'service' field)
print("Error counts by service:")
print(logs_df[logs_df['level'] == 'ERROR'].groupby('service')['message'].count())
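
With timestamp already parsed to datetime, time-window aggregation follows naturally. A sketch, assuming the column names used in process_jsonl_logs above:

# Hourly error counts
hourly_errors = (logs_df[logs_df['level'] == 'ERROR']
                 .set_index('timestamp')
                 .resample('1h')
                 .size())
print(hourly_errors.head())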

7.3 Nested Configuration Handling

def load_config_json(config_file):
    """Load and validate a JSON configuration file"""
    with open(config_file, 'r', encoding='utf-8') as f:
        config = json.load(f)

    # Convert to a DataFrame for validation
    settings_df = pd.json_normalize(config['settings'])

    # Validate
    required_cols = ['name', 'enabled', 'value']
    missing_cols = [col for col in required_cols if col not in settings_df.columns]
    if missing_cols:
        raise ValueError(f"Missing required settings: {missing_cols}")

    # Type checks and coercion
    settings_df['enabled'] = settings_df['enabled'].astype(bool)
    settings_df['value'] = pd.to_numeric(settings_df['value'], errors='coerce')

    return config, settings_df

# Usage
config, settings = load_config_json('config.json')
print("Config settings:")
print(settings)

8. Performance Optimization

8.1 Fast JSON Engines

# Use orjson (must be installed separately)
import orjson

def fast_json_to_df(json_str, orient='records'):
    """Parse quickly with orjson, then hand off to pandas"""
    data = orjson.loads(json_str)
    return pd.json_normalize(data) if isinstance(data, list) else pd.DataFrame([data])

# Benchmark (large_json_string is a placeholder for your own payload)
import time
start = time.time()
df_orjson = fast_json_to_df(large_json_string)
print(f"orjson time: {time.time() - start:.3f}s")

8.2 Chunked Processing of Large Files

def process_large_jsonl(file_path, batch_size=10000):
    """Process a large JSONL file efficiently, skipping bad lines"""
    results = []

    with open(file_path, 'r', encoding='utf-8') as f:
        batch = []
        for line_num, line in enumerate(f, 1):
            try:
                record = json.loads(line.strip())
                batch.append(record)

                if len(batch) >= batch_size:
                    # Process this batch
                    df_batch = pd.json_normalize(batch)
                    results.append(df_batch)
                    batch = []
                    print(f"Progress: {line_num:,} lines")

            except json.JSONDecodeError as e:
                print(f"JSON parse error on line {line_num}: {e}")
                continue

    # Process the remaining records
    if batch:
        df_last = pd.json_normalize(batch)
        results.append(df_last)

    # Merge the results (guard against an empty file)
    final_df = pd.concat(results, ignore_index=True) if results else pd.DataFrame()
    return final_df

# Usage
large_df = process_large_jsonl('large_logs.jsonl')

8.3 Memory Optimization

import numpy as np

def optimize_json_df(df):
    """Optimize a DataFrame loaded from JSON; returns a copy"""
    df = df.copy()  # keep the original intact for the before/after comparison

    # Parse likely date columns first, while they are still object dtype;
    # keep the result only if most values actually parse
    for col in df.select_dtypes(include=['object']).columns:
        parsed = pd.to_datetime(df[col], errors='coerce')
        if parsed.notna().mean() > 0.9:
            df[col] = parsed

    # Convert low-cardinality string columns to category
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:
            df[col] = df[col].astype('category')

    # Downcast numeric columns
    for col in df.select_dtypes(include=[np.number]).columns:
        if df[col].dtype.kind == 'i':
            df[col] = pd.to_numeric(df[col], downcast='integer')
        elif df[col].dtype.kind == 'f':
            df[col] = pd.to_numeric(df[col], downcast='float')

    return df

# Usage
df_raw = pd.read_json('data.json')
df_optimized = optimize_json_df(df_raw)
print(f"Memory saved: {(df_raw.memory_usage(deep=True).sum() - df_optimized.memory_usage(deep=True).sum()) / 1024**2:.2f} MB")

9. Converting To and From Other Formats

9.1 JSON and CSV/Excel

# JSON -> CSV
df = pd.read_json('input.json', orient='records')
df.to_csv('output.csv', index=False)

# CSV -> JSON
df_csv = pd.read_csv('input.csv')
df_csv.to_json('output.json', orient='records', indent=2)

# JSON -> Excel (Excel I/O requires openpyxl)
df.to_excel('output.xlsx', index=False)

# Excel -> JSON
df_excel = pd.read_excel('input.xlsx')
df_excel.to_json('output.json', orient='records')
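
Beyond CSV and Excel, a columnar format such as Parquet is often a better target for analytics workloads. A sketch, assuming pyarrow (or fastparquet) is installed:

# JSON -> Parquet
df.to_parquet('output.parquet', index=False)

# Parquet -> JSON
df_parquet = pd.read_parquet('output.parquet')
df_parquet.to_json('output.json', orient='records')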

9.2 Database Round-Trips

from sqlalchemy import create_engine

# JSON -> database
df = pd.read_json('data.json')
engine = create_engine('sqlite:///data.db')
df.to_sql('json_table', engine, if_exists='replace', index=False)

# Database -> JSON
df_db = pd.read_sql('SELECT * FROM json_table', engine)
df_db.to_json('output.json', orient='records')

10. Error Handling and Validation

10.1 JSON Validation

import jsonschema
from jsonschema import validate, ValidationError

# JSON Schema definition
schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "integer", "minimum": 0},
        "city": {"type": "string"}
    },
    "required": ["name", "age"]
}

def validate_json_data(json_data, schema):
    """Validate JSON data against a schema"""
    try:
        validate(instance=json_data, schema=schema)
        print("JSON data passed validation")
        return True
    except ValidationError as e:
        print(f"JSON validation failed: {e.message}")
        return False

# Usage (single record)
record = {"name": "张三", "age": 25, "city": "北京"}
validate_json_data(record, schema)

# Validate a DataFrame row by row
# (round-trip through row.to_json() so numpy scalars become plain Python
#  types, which jsonschema's type checks expect)
df = pd.read_json('data.json')
for _, row in df.iterrows():
    validate_json_data(json.loads(row.to_json()), schema)
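
Row-by-row jsonschema validation gets slow on large frames. A vectorized sketch that checks the same constraints directly on the DataFrame (column names taken from the schema above):

def validate_df(df):
    """Vectorized checks mirroring the schema: required columns, age >= 0."""
    errors = []
    for col in ['name', 'age']:
        if col not in df.columns:
            errors.append(f"missing required column: {col}")
    if 'age' in df.columns:
        bad_age = df['age'].isna() | (df['age'] < 0)
        if bad_age.any():
            errors.append(f"{int(bad_age.sum())} rows with invalid age")
    return errors

print(validate_df(df))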

10.2 Robust Reading

def robust_json_read(file_path, **kwargs):
    """Read JSON with fallbacks for malformed input"""
    try:
        # Try the standard reader first
        return pd.read_json(file_path, **kwargs)
    except ValueError as e:
        print(f"JSON format error: {e}")

    # Fall back to line-by-line parsing (JSONL)
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = [json.loads(line.strip()) for line in f if line.strip()]
        return pd.json_normalize(lines)
    except (json.JSONDecodeError, UnicodeDecodeError):
        pass

    # Last resort: parse the whole file as a single JSON document
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    try:
        data = json.loads(content)
        return pd.json_normalize(data)
    except json.JSONDecodeError as e:
        raise ValueError("Unable to parse the JSON file") from e

# Usage
df = robust_json_read('potentially_corrupt.json', lines=True)

11. Advanced: Real-Time Data Streams

11.1 WebSocket Stream Processing

import asyncio
import websockets
import json
from collections import deque
import pandas as pd

class RealTimeJsonProcessor:
    def __init__(self, max_buffer=10000):
        self.buffer = deque(maxlen=max_buffer)
        self.count = 0  # messages received so far
        self.df = pd.DataFrame()

    async def process_stream(self, websocket_url):
        """Consume a WebSocket JSON stream"""
        async with websockets.connect(websocket_url) as websocket:
            while True:
                try:
                    message = await websocket.recv()
                    data = json.loads(message)

                    # Add to the buffer
                    self.buffer.append(data)
                    self.count += 1

                    # Periodically fold the buffer into the DataFrame
                    # (count messages rather than checking len(self.buffer),
                    # which stops growing once the deque is full)
                    if self.count % 100 == 0:
                        new_df = pd.json_normalize(list(self.buffer)[-100:])
                        self.df = pd.concat([self.df, new_df], ignore_index=True)
                        print(f"Processed {len(self.df)} records")

                        # Analysis
                        if 'price' in new_df.columns:
                            print(f"Latest price: {new_df['price'].iloc[-1]}")

                except Exception as e:
                    print(f"Stream processing error: {e}")
                    break

    def get_summary(self):
        """Summary statistics"""
        return {
            'total_records': len(self.df),
            'columns': list(self.df.columns),
            'memory_usage': self.df.memory_usage(deep=True).sum() / 1024**2
        }

# Usage (example)
# processor = RealTimeJsonProcessor()
# asyncio.run(processor.process_stream('wss://api.example.com/stream'))

11.2 Kafka Integration

from kafka import KafkaConsumer
import json

def process_kafka_json(topic, bootstrap_servers):
    """Consume JSON messages from Kafka in batches"""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='latest'
    )

    records = []
    for message in consumer:
        data = message.value

        # Convert to a DataFrame
        df_new = pd.json_normalize([data])
        records.append(df_new)

        # Process in batches
        if len(records) >= 100:
            batch_df = pd.concat(records, ignore_index=True)
            # Business logic
            process_batch(batch_df)
            records = []

    consumer.close()

def process_batch(df):
    """Batch processing hook"""
    # Example: report the most frequent event type
    if not df.empty and 'event_type' in df.columns:
        print(f"Most frequent event: {df['event_type'].value_counts().index[0]}")

12. Best Practices

12.1 Data Pipeline Design

class JsonDataPipeline:
    """JSON data-processing pipeline"""

    def __init__(self):
        self.validators = []
        self.transformers = []
        self.writers = []

    def add_validator(self, validator_func):
        """Register a validator"""
        self.validators.append(validator_func)
        return self

    def add_transformer(self, transformer_func):
        """Register a transformer"""
        self.transformers.append(transformer_func)
        return self

    def add_writer(self, writer_func):
        """Register a writer"""
        self.writers.append(writer_func)
        return self

    def process(self, json_data):
        """Run the full pipeline"""
        data = json_data

        # Validation stage
        for validator in self.validators:
            if not validator(data):
                raise ValueError("Data validation failed")

        # Transformation stage
        df = pd.json_normalize(data)
        for transformer in self.transformers:
            df = transformer(df)

        # Write stage
        for writer in self.writers:
            writer(df)

        return df

# Example usage
pipeline = JsonDataPipeline()

# Define the pipeline stages
def validate_structure(data):
    return isinstance(data, list) and all(isinstance(d, dict) for d in data)

def clean_data(df):
    df = df.dropna()
    if 'timestamp' in df.columns:
        df = df.assign(timestamp=pd.to_datetime(df['timestamp']))
    return df

def save_to_db(df):
    # database persistence logic goes here
    print(f"Saving {len(df)} records to the database")

pipeline.add_validator(validate_structure)\
        .add_transformer(clean_data)\
        .add_writer(save_to_db)

# Process some sample records
json_data = [{'id': 1, 'timestamp': '2024-01-01'}, {'id': 2, 'timestamp': '2024-01-02'}]
result = pipeline.process(json_data)

12.2 Configuration Management

# The contents of config.json, shown here as a Python dict
JSON_CONFIG = {
    "read": {
        "orient": "records",
        "lines": False,
        "encoding": "utf-8",
        "dtype": {"id": "int32", "timestamp": "datetime64[ns]"}
    },
    "write": {
        "orient": "records",
        "indent": 2,
        "date_format": "iso",
        "force_ascii": False
    },
    "processing": {
        "chunk_size": 10000,
        "max_memory_mb": 100,
        "category_threshold": 0.5
    }
}

def create_json_pipeline(config_path="config.json"):
    """基于配置创建处理管道"""
    with open(config_path, 'r') as f:
        config = json.load(f)

    def read_json(file_path):
        return pd.read_json(file_path, **config['read'])

    def write_json(df, file_path):
        df.to_json(file_path, **config['write'])

    return read_json, write_json

# Usage
read_func, write_func = create_json_pipeline()
df = read_func('input.json')
# ... processing ...
write_func(df, 'output.json')

JSON handling is a core skill in modern data work, especially for API, log, and configuration data. With a solid grasp of the orient layouts, nested-data flattening, and the performance techniques above, you can deal with most JSON scenarios efficiently.
