Pandas JSON File Operations in Detail
1. JSON Basics
JSON (JavaScript Object Notation) is a lightweight data-interchange format:
- Structure: nested dictionaries (objects) and lists (arrays)
- Data types: string, number, boolean, null, array, object
- Strengths: cross-language compatibility, human readability, and wide use as an API format
- Pandas support: reads and writes JSON directly, in several orient layouts (a minimal round-trip sketch follows this list)
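A quick sketch of that round trip (the file name and sample data are illustrative only):
# Minimal round trip: Python objects -> DataFrame -> JSON file -> DataFrame
import pandas as pd
df = pd.DataFrame([
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"},
])
df.to_json('people.json', orient='records', force_ascii=False)  # write as a list of records
df_back = pd.read_json('people.json')                           # read it back
print(df_back)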
2. Installation and Dependencies
# Pandas has built-in JSON support; no extra installation is needed
pip install pandas
# Optional: faster JSON parsers (useful for manual parsing, as in section 8.1; read_json does not use them itself)
pip install orjson  # a faster JSON parser
pip install ujson   # another very fast JSON library
# Verify the installation
import pandas as pd
import json
print(pd.__version__)
3. JSON Data Structures and Their Pandas Counterparts
3.1 Common JSON Formats
# 1. List of records (the most common)
records_json = [
{"name": "张三", "age": 25, "city": "北京"},
{"name": "李四", "age": 30, "city": "上海"}
]
# 2. Column-oriented
columns_json = {
"name": ["张三", "李四"],
"age": [25, 30],
"city": ["北京", "上海"]
}
# 3. Nested objects
nested_json = {
"data": {
"employees": [
{"id": 1, "details": {"name": "张三", "age": 25}},
{"id": 2, "details": {"name": "李四", "age": 30}}
]
}
}
# 4. Split format (index / columns / data)
split_json = {
"index": [0, 1],
"columns": ["name", "age", "city"],
"data": [
["张三", 25, "北京"],
["李四", 30, "上海"]
]
}
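Each of these shapes maps onto a DataFrame; a short sketch, assuming the variables defined above are in scope:
# Turning the structures above into DataFrames
df_records = pd.DataFrame(records_json)                           # list of records
df_columns = pd.DataFrame(columns_json)                           # column-oriented dict
df_nested = pd.json_normalize(nested_json['data']['employees'])   # flatten the nested objects
df_split = pd.DataFrame(split_json['data'],
                        index=split_json['index'],
                        columns=split_json['columns'])            # split format, assembled manually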
4. Reading JSON Files
4.1 Basic Reading
# Read from a file
df = pd.read_json('data.json')
# Read from a string (passing literal JSON strings to read_json is deprecated in recent pandas, so wrap them in StringIO)
from io import StringIO
json_str = '''
[
{"name": "张三", "age": 25, "city": "北京"},
{"name": "李四", "age": 30, "city": "上海"}
]
'''
df = pd.read_json(StringIO(json_str))
# Read directly from a URL
df = pd.read_json('https://api.example.com/data.json')
4.2 The orient Parameter in Detail
# Different orient values correspond to different JSON structures
# 1. 'split' - index, columns and data stored separately
split_data = {
'index': [0, 1],
'columns': ['name', 'age', 'city'],
'data': [['张三', 25, '北京'], ['李四', 30, '上海']]
}
df_split = pd.read_json(StringIO(json.dumps(split_data)), orient='split')
# 2. 'records' - list of records
records_data = [
{'name': '张三', 'age': 25, 'city': '北京'},
{'name': '李四', 'age': 30, 'city': '上海'}
]
df_records = pd.read_json(StringIO(json.dumps(records_data)), orient='records')
# 3. 'index' - row-oriented
index_data = {
0: {'name': '张三', 'age': 25, 'city': '北京'},
1: {'name': '李四', 'age': 30, 'city': '上海'}
}
df_index = pd.read_json(StringIO(json.dumps(index_data)), orient='index')
# 4. 'columns' - column-oriented (the default orient for a DataFrame)
columns_data = {
'name': ['张三', '李四'],
'age': [25, 30],
'city': ['北京', '上海']
}
df_columns = pd.read_json(StringIO(json.dumps(columns_data)), orient='columns')
# 5. 'values' - a plain array of values
values_data = [['张三', 25, '北京'], ['李四', 30, '上海']]
df_values = pd.read_json(StringIO(json.dumps(values_data)), orient='values')
4.3 Advanced Reading Parameters
df = pd.read_json('data.json',
                  orient='records',        # JSON layout
                  lines=True,              # one JSON object per line (JSONL)
                  encoding='utf-8',        # file encoding
                  dtype={'age': 'int32'},  # column dtypes
                  convert_axes=True,       # convert the axes to the proper dtypes
                  precise_float=True,      # higher-precision float parsing
                  nrows=1000)              # limit rows (only valid with lines=True)
# Note: chunksize (also only valid with lines=True) makes read_json return an
# iterator of DataFrames instead of a single DataFrame, as shown below.
# Read a large file in chunks
for chunk in pd.read_json('large.json', chunksize=1000, lines=True):
    # Process each chunk
    print(f"Processing {len(chunk)} rows")
    processed = chunk[chunk['age'] > 20]
    # ... further processing logic
5. Writing JSON Files
5.1 Basic Writing
# Write to a file
df.to_json('output.json')
# Specify the orient
df.to_json('output_records.json', orient='records', indent=2)
# Compressed output
df.to_json('output.json.gz', compression='gzip')
5.2 The orient Parameter for Writing
# Different output formats
df.to_json('split.json', orient='split')      # index, columns and data stored separately
df.to_json('records.json', orient='records')  # list of records
df.to_json('index.json', orient='index')      # row-oriented
df.to_json('columns.json', orient='columns')  # column-oriented (default)
df.to_json('values.json', orient='values')    # values only
# Pretty-printed output (note: to_json uses force_ascii, not json.dumps's ensure_ascii)
df.to_json('pretty.json', orient='records', indent=4, force_ascii=False)
# Date format
df.to_json('dates.json', date_format='iso', orient='records')
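By default to_json serializes datetimes as epoch milliseconds (a default that recent pandas versions deprecate); date_format='iso' produces ISO 8601 strings instead. A small round-trip sketch (the file name is illustrative):
# Date round trip: write ISO strings, parse them back into datetime64
df_dates = pd.DataFrame({'event': ['a', 'b'],
                         'ts': pd.to_datetime(['2024-01-01', '2024-01-02'])})
df_dates.to_json('dates_demo.json', orient='records', date_format='iso')
df_back = pd.read_json('dates_demo.json', orient='records', convert_dates=['ts'])
print(df_back.dtypes)  # 'ts' comes back as datetime64[ns]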
5.3 JSON Lines Format (JSONL)
# One JSON object per line; well suited to streaming and appending (see the sketch after this block)
df.to_json('data.jsonl', orient='records', lines=True)
# Read JSONL
df = pd.read_json('data.jsonl', lines=True, orient='records')
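Because each line is an independent JSON document, new records can be appended without rewriting the file. A minimal sketch using only the standard library (the pandas mode='a' variant assumes pandas >= 2.0):
# Append records to an existing JSONL file line by line
import json
new_records = [{"name": "王五", "age": 28, "city": "广州"}]
with open('data.jsonl', 'a', encoding='utf-8') as f:
    for rec in new_records:
        f.write(json.dumps(rec, ensure_ascii=False) + '\n')
# With pandas >= 2.0, the same append can be done directly:
# pd.DataFrame(new_records).to_json('data.jsonl', orient='records', lines=True, mode='a', force_ascii=False)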
6. Handling Nested JSON
6.1 Flattening Nested Structures with json_normalize
# Nested JSON data
nested_data = {
"employees": [
{
"id": 1,
"name": "张三",
"department": {
"name": "技术部",
"manager": "李经理"
},
"skills": ["Python", "Pandas", "SQL"]
},
{
"id": 2,
"name": "李四",
"department": {
"name": "销售部",
"manager": "王经理"
},
"skills": ["Excel", "沟通"]
}
]
}
# Flatten the nested structure
from pandas import json_normalize
# Basic flattening: nested dicts become dotted column names (department.name, ...)
df_flat = json_normalize(nested_data['employees'])
print("Basic flattening:")
print(df_flat)
# Specify record and meta paths
df_custom = json_normalize(
    nested_data['employees'],
    record_path=['skills'],  # explode the skills array (one row per skill)
    meta=['id', ['department', 'name'], ['department', 'manager']]  # metadata carried along with each record
)
print("\nSkills flattened:")
print(df_custom)
# Customize the separator used for flattened column names
df_dept = json_normalize(nested_data['employees'], sep='_')
6.2 Handling Complex Nesting
def flatten_nested_json(data, separator='_'):
    """Recursively flatten nested dicts (and lists of dicts) into flat records."""
    def _flatten(d, parent_key='', sep='_'):
        items = []
        for k, v in d.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(_flatten(v, new_key, sep=sep).items())
            elif isinstance(v, list):
                for i, item in enumerate(v):
                    if isinstance(item, dict):
                        items.extend(_flatten(item, f"{new_key}{sep}{i}", sep=sep).items())
                    else:
                        items.append((f"{new_key}{sep}{i}", item))
            else:
                items.append((new_key, v))
        return dict(items)
    flattened = []
    if isinstance(data, list):
        for item in data:
            flattened.append(_flatten(item, sep=separator))
    else:
        flattened = [_flatten(data, sep=separator)]
    return pd.DataFrame(flattened)
# Usage
complex_data = nested_data['employees']
df_flattened = flatten_nested_json(complex_data)
print(df_flattened)
7. Practical Examples
7.1 Processing API Data
import requests
def fetch_api_to_dataframe(api_url, params=None):
"""从 API 获取 JSON 并转换为 DataFrame"""
try:
response = requests.get(api_url, params=params)
response.raise_for_status()
data = response.json()
        # Handle different response structures
if isinstance(data, list):
df = pd.json_normalize(data)
elif isinstance(data, dict) and 'data' in data:
df = pd.json_normalize(data['data'])
elif isinstance(data, dict) and 'results' in data:
df = pd.json_normalize(data['results'])
else:
df = pd.DataFrame([data])
return df
except Exception as e:
print(f"API 请求失败: {e}")
return None
# Example: fetch GitHub user info
url = "https://api.github.com/users/octocat"
df_github = fetch_api_to_dataframe(url)
print(df_github)
# Handling paginated APIs
def fetch_paginated_api(base_url, params=None, max_pages=10):
"""处理分页 API"""
all_data = []
page = 1
while page <= max_pages:
params = params or {}
params['page'] = page
params['per_page'] = 100
df_page = fetch_api_to_dataframe(base_url, params)
if df_page is None or df_page.empty:
break
all_data.append(df_page)
page += 1
return pd.concat(all_data, ignore_index=True)
# Usage
# df_all = fetch_paginated_api('https://api.github.com/users')
7.2 Log File Processing (JSONL)
def process_jsonl_logs(log_file):
    """Process a JSONL log file in chunks."""
    chunks = []
    for chunk in pd.read_json(log_file, lines=True, chunksize=10000):
        # Data cleaning
        chunk['timestamp'] = pd.to_datetime(chunk['timestamp'])
        chunk = chunk.dropna(subset=['level', 'message'])
        # Extract structured information
        if 'data' in chunk.columns:
            # Expand the nested 'data' column into separate columns
            nested_data = chunk['data'].apply(pd.Series)
            chunk = pd.concat([chunk.drop('data', axis=1), nested_data], axis=1)
        # Filtering and conversion
        chunk['level'] = chunk['level'].astype('category')
        chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True)
# Usage
logs_df = process_jsonl_logs('app_logs.jsonl')
# Analyze the logs
print("Error log counts by service:")
print(logs_df[logs_df['level'] == 'ERROR'].groupby('service')['message'].count())
7.3 Nested Configuration Handling
def load_config_json(config_file):
    """Load and validate a configuration JSON file."""
    with open(config_file, 'r', encoding='utf-8') as f:
        config = json.load(f)
    # Convert to a DataFrame for validation
    settings_df = pd.json_normalize(config['settings'])
    # Data validation
    required_cols = ['name', 'enabled', 'value']
    missing_cols = [col for col in required_cols if col not in settings_df.columns]
    if missing_cols:
        raise ValueError(f"Missing required configuration fields: {missing_cols}")
    # Type checks and defaults
    settings_df['enabled'] = settings_df['enabled'].astype(bool)
    settings_df['value'] = pd.to_numeric(settings_df['value'], errors='coerce')
    return config, settings_df
# Usage
config, settings = load_config_json('config.json')
print("Configuration settings:")
print(settings)
8. Performance Optimization
8.1 Faster JSON Parsing
# Use orjson (requires installation)
import orjson
def fast_json_to_df(json_str):
    """Parse JSON quickly with orjson, then normalize into a DataFrame."""
    data = orjson.loads(json_str)
    return pd.json_normalize(data) if isinstance(data, list) else pd.DataFrame([data])
# Benchmark (large_json_string is assumed to be a large JSON string loaded elsewhere)
import time
start = time.time()
df_orjson = fast_json_to_df(large_json_string)
print(f"orjson time: {time.time() - start:.3f}s")
8.2 Chunked Processing of Large Files
def process_large_jsonl(file_path, batch_size=10000):
    """Process a large JSONL file efficiently in batches."""
    results = []
    with open(file_path, 'r', encoding='utf-8') as f:
        batch = []
        for line_num, line in enumerate(f, 1):
            try:
                record = json.loads(line.strip())
                batch.append(record)
                if len(batch) >= batch_size:
                    # Process the batch
                    df_batch = pd.json_normalize(batch)
                    results.append(df_batch)
                    batch = []
                    print(f"Progress: {line_num:,} lines")
            except json.JSONDecodeError as e:
                print(f"JSON decode error on line {line_num}: {e}")
                continue
    # Process any remaining records
    if batch:
        df_last = pd.json_normalize(batch)
        results.append(df_last)
    # Combine the results
    final_df = pd.concat(results, ignore_index=True)
    return final_df
# Usage
large_df = process_large_jsonl('large_logs.jsonl')
8.3 Memory Optimization
import numpy as np

def optimize_json_df(df):
    """Reduce the memory footprint of a DataFrame loaded from JSON."""
    df = df.copy()  # work on a copy so the original can be compared afterwards
    # Convert low-cardinality string columns to category
    for col in df.select_dtypes(include=['object']).columns:
        if df[col].nunique() / len(df) < 0.5:
            df[col] = df[col].astype('category')
    # Downcast numeric types
    for col in df.select_dtypes(include=[np.number]).columns:
        if df[col].dtype.kind in 'i':
            df[col] = pd.to_numeric(df[col], downcast='integer')
        elif df[col].dtype.kind in 'f':
            df[col] = pd.to_numeric(df[col], downcast='float')
    # Date parsing: only attempt columns whose names suggest dates,
    # otherwise errors='coerce' would turn ordinary text columns into NaT
    date_cols = [c for c in df.select_dtypes(include=['object']).columns
                 if 'date' in c.lower() or 'time' in c.lower()]
    for col in date_cols:
        try:
            df[col] = pd.to_datetime(df[col], errors='coerce')
        except (ValueError, TypeError):
            pass
    return df
# Usage
df_raw = pd.read_json('data.json')
df_optimized = optimize_json_df(df_raw)
print(f"Memory saved: {(df_raw.memory_usage(deep=True).sum() - df_optimized.memory_usage(deep=True).sum()) / 1024**2:.2f} MB")
9. Converting To and From Other Formats
9.1 JSON and CSV/Excel
# JSON -> CSV
df = pd.read_json('input.json', orient='records')
df.to_csv('output.csv', index=False)
# CSV -> JSON
df_csv = pd.read_csv('input.csv')
df_csv.to_json('output.json', orient='records', indent=2)
# JSON -> Excel
df.to_excel('output.xlsx', index=False)
# Excel -> JSON
df_excel = pd.read_excel('input.xlsx')
df_excel.to_json('output.json', orient='records')
9.2 Databases
from sqlalchemy import create_engine
# JSON -> database
df = pd.read_json('data.json')
engine = create_engine('sqlite:///data.db')
df.to_sql('json_table', engine, if_exists='replace', index=False)
# Database -> JSON
df_db = pd.read_sql('SELECT * FROM json_table', engine)
df_db.to_json('output.json', orient='records')
10. Error Handling and Validation
10.1 JSON Validation
import jsonschema
from jsonschema import validate, ValidationError
# JSON Schema definition
schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer", "minimum": 0},
"city": {"type": "string"}
},
"required": ["name", "age"]
}
def validate_json_data(json_data, schema):
    """Validate JSON data against a schema."""
    try:
        validate(instance=json_data, schema=schema)
        print("JSON data passed validation")
        return True
    except ValidationError as e:
        print(f"JSON validation failed: {e.message}")
        return False
# Usage (single record)
record = {"name": "张三", "age": 25, "city": "北京"}
validate_json_data(record, schema)
# Validating a DataFrame row by row
df = pd.read_json('data.json')
for _, row in df.iterrows():
    # round-trip through JSON so numpy scalar types become native Python types
    validate_json_data(json.loads(row.to_json()), schema)
10.2 Robust Reading
def robust_json_read(file_path, **kwargs):
    """Read JSON with fallbacks for malformed input."""
    try:
        # Try the standard reader first
        return pd.read_json(file_path, **kwargs)
    except ValueError as e:
        print(f"JSON format error: {e}")
    # Fall back to line-by-line parsing (JSONL)
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = [json.loads(line.strip()) for line in f if line.strip()]
        return pd.json_normalize(lines)
    except (ValueError, json.JSONDecodeError):
        pass
    # Finally, try parsing the raw text as a whole
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    try:
        data = json.loads(content)
        return pd.json_normalize(data)
    except json.JSONDecodeError:
        raise ValueError("Unable to parse the JSON file")
# Usage
df = robust_json_read('potentially_corrupt.json', lines=True)
11. Advanced Usage: Real-Time Data Streams
11.1 WebSocket Data Processing
import asyncio
import websockets
import json
from collections import deque
import pandas as pd
class RealTimeJsonProcessor:
    def __init__(self, max_buffer=10000):
        self.buffer = deque(maxlen=max_buffer)
        self.df = pd.DataFrame()
        self.count = 0  # total number of messages received
    async def process_stream(self, websocket_url):
        """Consume a WebSocket JSON stream."""
        async with websockets.connect(websocket_url) as websocket:
            while True:
                try:
                    message = await websocket.recv()
                    data = json.loads(message)
                    # Add to the buffer
                    self.buffer.append(data)
                    self.count += 1
                    # Every 100 messages, convert the newest records to a DataFrame
                    if self.count % 100 == 0:
                        new_df = pd.json_normalize(list(self.buffer)[-100:])
                        self.df = pd.concat([self.df, new_df], ignore_index=True)
                        print(f"Processed {len(self.df)} records")
                        # Data analysis
                        if 'price' in new_df.columns:
                            print(f"Latest price: {new_df['price'].iloc[-1]}")
                except Exception as e:
                    print(f"Stream processing error: {e}")
                    break
    def get_summary(self):
        """Return summary statistics."""
        return {
            'total_records': len(self.df),
            'columns': list(self.df.columns),
            'memory_usage': self.df.memory_usage(deep=True).sum() / 1024**2
        }
# Usage (example)
# processor = RealTimeJsonProcessor()
# asyncio.run(processor.process_stream('wss://api.example.com/stream'))
11.2 Kafka Integration
from kafka import KafkaConsumer
import json
def process_kafka_json(topic, bootstrap_servers):
    """Consume JSON messages from Kafka and process them in batches."""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='latest'
    )
    records = []
    for message in consumer:
        data = message.value
        # Convert to a DataFrame
        df_new = pd.json_normalize([data])
        records.append(df_new)
        # Batch processing
        if len(records) >= 100:
            batch_df = pd.concat(records, ignore_index=True)
            # Apply business logic
            process_batch(batch_df)
            records = []
    consumer.close()
def process_batch(df):
    """Batch processing function."""
    # Example: real-time statistics
    if not df.empty and 'event_type' in df.columns:
        print(f"Most frequent event in batch: {df['event_type'].value_counts().idxmax()}")
12. Best Practices
12.1 Data Pipeline Design
class JsonDataPipeline:
    """JSON data processing pipeline."""
    def __init__(self):
        self.validators = []
        self.transformers = []
        self.writers = []
    def add_validator(self, validator_func):
        """Add a validator."""
        self.validators.append(validator_func)
        return self
    def add_transformer(self, transformer_func):
        """Add a transformer."""
        self.transformers.append(transformer_func)
        return self
    def add_writer(self, writer_func):
        """Add a writer."""
        self.writers.append(writer_func)
        return self
    def process(self, json_data):
        """Run the full processing flow."""
        data = json_data
        # Validation stage
        for validator in self.validators:
            if not validator(data):
                raise ValueError("Data validation failed")
        # Transformation stage
        df = pd.json_normalize(data)
        for transformer in self.transformers:
            df = transformer(df)
        # Writing stage
        for writer in self.writers:
            writer(df)
        return df
# Usage example
pipeline = JsonDataPipeline()
# Define processing functions
def validate_structure(data):
    return isinstance(data, list) and all(isinstance(d, dict) for d in data)
def clean_data(df):
    return df.dropna().assign(timestamp=pd.to_datetime(df.get('timestamp', None)))
def save_to_db(df):
    # Database persistence logic goes here
    print(f"Saving {len(df)} records to the database")
pipeline.add_validator(validate_structure)\
    .add_transformer(clean_data)\
    .add_writer(save_to_db)
# Process the data (json_data here is an example payload, e.g. a list of dicts)
result = pipeline.process(json_data)
12.2 Configuration Management
# Example configuration (the same content would be stored as config.json on disk)
JSON_CONFIG = {
    "read": {
        "orient": "records",
        "lines": False,
        "encoding": "utf-8",
        "dtype": {"id": "int32"},
        "convert_dates": ["timestamp"]
    },
    "write": {
        "orient": "records",
        "indent": 2,
        "date_format": "iso",
        "force_ascii": False
    },
    "processing": {
        "chunk_size": 10000,
        "max_memory_mb": 100,
        "category_threshold": 0.5
    }
}
def create_json_pipeline(config_path="config.json"):
    """Create read/write helpers based on a configuration file."""
    with open(config_path, 'r') as f:
        config = json.load(f)
    def read_json(file_path):
        return pd.read_json(file_path, **config['read'])
    def write_json(df, file_path):
        df.to_json(file_path, **config['write'])
    return read_json, write_json
# Usage
read_func, write_func = create_json_pipeline()
df = read_func('input.json')
# ... processing ...
write_func(df, 'output.json')
JSON handling is a core skill in modern data processing, especially for API responses, logs, and configuration data. Mastering the different orient formats, nested-data handling, and the performance techniques above lets you deal with a wide range of JSON scenarios efficiently.