MongoDB 聚合(Aggregation)全面指南
MongoDB 的 聚合框架(Aggregation Framework) 是一个强大的数据处理工具,类似于 SQL 中的 GROUP BY、JOIN、HAVING,但更灵活,支持复杂的数据转换、统计、分析等操作。
一、聚合管道(Aggregation Pipeline)核心概念
聚合操作由一系列 阶段(stage) 组成,每个阶段对输入文档进行转换后输出给下一个阶段。
db.collection.aggregate([
{ $stage1: { ... } },
{ $stage2: { ... } },
...
])
管道是顺序执行的,每个阶段的输出是下一个阶段的输入。
二、常用聚合阶段(Stages)
| 阶段 | 说明 | 示例 |
|---|---|---|
$match | 过滤文档(类似 find) | { status: "active" } |
$project | 选择、重命名、计算字段 | { name: 1, total: { $multiply: ["$price", "$qty"] } } |
$sort | 排序 | { createdAt: -1 } |
$limit / $skip | 分页 | $limit: 10 |
$group | 分组统计(核心) | 按 userId 求和 |
$unwind | 拆分数组(一变多) | { path: "$tags" } |
$lookup | 关联查询(类似 JOIN) | 跨集合关联 |
$addFields | 添加新字段 | { fullName: { $concat: ["$first", " ", "$last"] } } |
$set | 同 $addFields(别名) | |
$unset | 删除字段 | { password: "" } |
$count | 计数 | "total" |
$facet | 多子管道并行处理 | 分页 + 统计 |
$bucket / $bucketAuto | 分桶统计 | 按年龄段分组 |
$sortByCount | 按频次排序 | 统计标签出现次数 |
三、核心示例:从简单到复杂
1. 基本过滤 + 投影
db.orders.aggregate([
{ $match: { status: "completed", total: { $gte: 100 } } },
{ $project: { _id: 0, customerId: 1, total: 1 } }
])
2. 按用户统计订单总额($group)
db.orders.aggregate([
{ $match: { status: "completed" } },
{
$group: {
_id: "$customerId",
totalAmount: { $sum: "$total" },
orderCount: { $sum: 1 },
avgAmount: { $avg: "$total" },
items: { $push: "$item" } // 收集订单项
}
},
{ $sort: { totalAmount: -1 } }
])
输出示例:
{
"_id": 123,
"totalAmount": 850,
"orderCount": 3,
"avgAmount": 283.33,
"items": ["book", "pen", "notebook"]
}
3. 数组展开($unwind)
// 文档结构
{
user: "Alice",
tags: ["mongodb", "nosql", "database"]
}
db.users.aggregate([
{ $unwind: "$tags" },
{
$group: {
_id: "$tags",
count: { $sum: 1 }
}
},
{ $sort: { count: -1 } }
])
4. 关联查询($lookup)—— 类似 SQL JOIN
db.orders.aggregate([
{
$lookup: {
from: "users", // 目标集合
localField: "customerId", // 本集合字段
foreignField: "_id", // 目标集合字段
as: "customer" // 输出数组字段名
}
},
{ $unwind: "$customer" }, // 展开为对象(可选)
{
$project: {
orderId: "$_id",
customerName: "$customer.name",
total: 1
}
}
])
5. 分页 + 统计($facet)
db.products.aggregate([
{ $match: { category: "electronics" } },
{
$facet: {
data: [
{ $skip: 20 },
{ $limit: 10 }
],
metadata: [
{ $count: "total" },
{
$addFields: {
page: 3,
pageSize: 10
}
}
]
}
}
])
输出:
{
"data": [/* 10 条记录 */],
"metadata": [{ "total": 156, "page": 3, "pageSize": 10 }]
}
6. 日期分组统计(按天/月)
db.events.aggregate([
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" }
},
count: { $sum: 1 }
}
},
{ $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } }
])
四、高级技巧
1. 累加器(Accumulators)在 $group 中
| 操作符 | 说明 |
|---|---|
$sum | 求和 |
$avg | 平均 |
$max / $min | 最大/最小 |
$push | 推入数组 |
$addToSet | 去重推入 |
$first / $last | 结合 $sort 使用 |
{ $group: { _id: "$city", temps: { $push: "$temp" } } }
2. 条件投影($cond, $switch)
{
$project: {
statusLabel: {
$switch: {
branches: [
{ case: { $eq: ["$status", "completed"] }, then: "已完成" },
{ case: { $eq: ["$status", "pending"] }, then: "待处理" }
],
default: "未知"
}
}
}
}
3. 动态字段名(MongoDB 4.4+)
{
$set: {
$mergeObjects: [
"$$ROOT",
{ dynamicField: { $concat: ["prefix_", "$type"] } }
]
}
}
五、性能优化建议
| 建议 | 说明 |
|---|---|
$match 和 $sort 尽量靠前 | 尽早过滤和排序,减少后续处理文档数 |
| 使用索引 | $match 中的字段应有索引 |
避免在 $project 中做复杂计算 | 提前在 $addFields 中计算 |
$lookup + $unwind 慎用 | 可能产生大量中间文档 |
使用 allowDiskUse: true | 大数据量时允许写入磁盘 |
| 分析执行计划 | .explain("executionStats") |
db.orders.aggregate([...]).explain("executionStats")
六、聚合 vs Map-Reduce vs $group
| 方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Aggregation Pipeline | 高性能、声明式、支持索引 | 内存限制(默认 100MB) | 99% 的统计需求 |
| Map-Reduce | 可使用 JavaScript,灵活 | 慢、不支持索引 | 复杂逻辑、遗留系统 |
$group(单阶段) | 简单 | 功能有限 | 简单统计 |
推荐:优先使用 Aggregation Pipeline
七、实战完整案例:电商销售仪表盘
db.orders.aggregate([
// 1. 过滤有效订单
{ $match: { status: "completed", createdAt: { $gte: ISODate("2025-01-01") } } },
// 2. 关联用户信息
{
$lookup: {
from: "users",
localField: "customerId",
foreignField: "_id",
as: "customer"
}
},
{ $unwind: "$customer" },
// 3. 多维度统计
{
$facet: {
// 总体指标
overview: [
{
$group: {
_id: null,
totalRevenue: { $sum: "$total" },
totalOrders: { $sum: 1 },
avgOrderValue: { $avg: "$total" }
}
}
],
// 按月趋势
monthlyTrend: [
{
$group: {
_id: { $dateToString: { format: "%Y-%m", date: "$createdAt" } },
revenue: { $sum: "$total" },
orders: { $sum: 1 }
}
},
{ $sort: { "_id": 1 } }
],
// Top 5 用户
topCustomers: [
{
$group: {
_id: "$customerId",
name: { $first: "$customer.name" },
spent: { $sum: "$total" }
}
},
{ $sort: { spent: -1 } },
{ $limit: 5 }
]
}
}
])
八、学习资源
- 官方文档:Aggregation Pipeline
- 聚合操作符参考:Operators
- MongoDB University:免费课程 M121: The MongoDB Aggregation Framework
你想要实现哪种业务场景?
例如:
- 用户行为分析?
- 实时排行榜?
- 多集合关联报表?
欢迎贴出你的 集合结构 + 目标输出,我可以帮你写出 高效、可运行 的聚合管道!