前言
MongoDB 的 Aggregation Pipeline 大概是我覺得最被低估的功能之一。很多團隊用 MongoDB,卻只會 find() 和 updateOne(),遇到複雜的查詢需求就把資料撈回應用層用程式碼處理。這不但效率差,還浪費了 MongoDB 引擎的運算能力。
我曾經接手一個專案,有支 API 的回應時間超過 3 秒,原因就是在 Node.js 裡做了一堆本來可以在資料庫端完成的聚合運算。改用 Aggregation Pipeline 後,回應時間降到 200ms 以下。
這篇文章我會從 Pipeline 的基本概念開始,介紹常用的 stage,再用實際案例展示如何寫出高效的聚合查詢。
Pipeline 基本概念
Aggregation Pipeline 就像一條工廠的生產線:資料從一端進入,經過一連串的「加工站」(stage),每個 stage 對資料做一次轉換,最後輸出結果。
文件集合 → [$match] → [$group] → [$sort] → [$limit] → 結果
每個 stage 的輸出就是下一個 stage 的輸入。這個設計讓你可以把複雜的查詢拆成一個個簡單的步驟。
準備測試資料
我們用一個電商訂單的場景來練習:
// 連線到 MongoDB
// mongosh mongodb://localhost:27017/ecommerce
// 建立測試資料
db.orders.insertMany([
{
orderId: "ORD-001",
customer: { name: "Alice", city: "Taipei", tier: "gold" },
items: [
{ product: "Laptop", category: "electronics", price: 35000, qty: 1 },
{ product: "Mouse", category: "electronics", price: 800, qty: 2 }
],
status: "completed",
createdAt: ISODate("2024-01-15T10:30:00Z")
},
{
orderId: "ORD-002",
customer: { name: "Bob", city: "Tokyo", tier: "silver" },
items: [
{ product: "T-Shirt", category: "clothing", price: 500, qty: 3 },
{ product: "Jeans", category: "clothing", price: 1200, qty: 1 }
],
status: "completed",
createdAt: ISODate("2024-01-16T14:20:00Z")
},
{
orderId: "ORD-003",
customer: { name: "Carol", city: "Taipei", tier: "gold" },
items: [
{ product: "Book", category: "books", price: 350, qty: 5 },
{ product: "Laptop", category: "electronics", price: 35000, qty: 1 }
],
status: "pending",
createdAt: ISODate("2024-01-17T09:15:00Z")
},
{
orderId: "ORD-004",
customer: { name: "Alice", city: "Taipei", tier: "gold" },
items: [
{ product: "Headphones", category: "electronics", price: 2500, qty: 1 },
],
status: "completed",
createdAt: ISODate("2024-02-01T11:00:00Z")
},
{
orderId: "ORD-005",
customer: { name: "Dave", city: "Osaka", tier: "bronze" },
items: [
{ product: "Keyboard", category: "electronics", price: 3000, qty: 1 },
{ product: "Monitor", category: "electronics", price: 12000, qty: 2 }
],
status: "completed",
createdAt: ISODate("2024-02-05T16:45:00Z")
}
]);
常用 Stage 詳解
$match — 過濾文件
$match 就像 SQL 的 WHERE,放在 pipeline 開頭可以大幅減少後續 stage 要處理的資料量。
// 只看已完成的訂單
db.orders.aggregate([
{ $match: { status: "completed" } }
])
// 多條件過濾
db.orders.aggregate([
{
$match: {
status: "completed",
"customer.city": "Taipei",
createdAt: { $gte: ISODate("2024-01-01"), $lt: ISODate("2024-02-01") }
}
}
])
$group — 分組聚合
$group 是最強大的 stage,類似 SQL 的 GROUP BY。
// 每個城市的訂單數和總金額
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $unwind: "$items" },
{
$group: {
_id: "$customer.city",
orderCount: { $sum: 1 },
totalRevenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
avgOrderValue: { $avg: { $multiply: ["$items.price", "$items.qty"] } },
customers: { $addToSet: "$customer.name" }
}
},
{ $sort: { totalRevenue: -1 } }
])
常用的累加器(Accumulator):
| 累加器 | 功能 | 範例 |
|——–|——|——|
| $sum | 加總 | { $sum: "$price" } |
| $avg | 平均 | { $avg: "$price" } |
| $min / $max | 最小/最大值 | { $max: "$price" } |
| $first / $last | 第一/最後一筆 | { $first: "$name" } |
| $push | 收集成陣列 | { $push: "$name" } |
| $addToSet | 收集成唯一陣列 | { $addToSet: "$city" } |
| $count | 計數 | { $count: {} } |
$unwind — 展開陣列
$unwind 把陣列欄位展開成多筆文件,是處理內嵌陣列的關鍵。
// 展開 items 陣列
db.orders.aggregate([
{ $unwind: "$items" },
// 原本 1 筆含 2 個 items 的訂單,會變成 2 筆文件
{ $project: { orderId: 1, product: "$items.product", price: "$items.price" } }
])
// 搭配 preserveNullAndEmptyArrays 保留沒有 items 的文件
db.orders.aggregate([
{ $unwind: { path: "$items", preserveNullAndEmptyArrays: true } }
])
$lookup — 跨集合查詢(類似 JOIN)
// 假設有一個 products 集合
db.products.insertMany([
{ name: "Laptop", brand: "ASUS", warranty: 24 },
{ name: "Mouse", brand: "Logitech", warranty: 12 },
{ name: "Keyboard", brand: "Keychron", warranty: 12 }
]);
// 在訂單中 JOIN 產品資訊
db.orders.aggregate([
{ $unwind: "$items" },
{
$lookup: {
from: "products",
localField: "items.product",
foreignField: "name",
as: "productInfo"
}
},
{ $unwind: { path: "$productInfo", preserveNullAndEmptyArrays: true } },
{
$project: {
orderId: 1,
product: "$items.product",
price: "$items.price",
brand: "$productInfo.brand",
warranty: "$productInfo.warranty"
}
}
])
$project 和 $addFields — 欄位轉換
// $project:選擇和重塑欄位
db.orders.aggregate([
{
$project: {
orderId: 1,
customerName: "$customer.name",
city: "$customer.city",
itemCount: { $size: "$items" },
orderMonth: { $month: "$createdAt" },
// 計算訂單總額
totalAmount: {
$reduce: {
input: "$items",
initialValue: 0,
in: { $add: ["$$value", { $multiply: ["$$this.price", "$$this.qty"] }] }
}
}
}
}
])
// $addFields:新增欄位(不影響原有欄位)
db.orders.aggregate([
{
$addFields: {
totalAmount: {
$reduce: {
input: "$items",
initialValue: 0,
in: { $add: ["$$value", { $multiply: ["$$this.price", "$$this.qty"] }] }
}
}
}
},
{ $match: { totalAmount: { $gte: 10000 } } }
])
$facet — 多維度聚合
$facet 讓你在一次查詢中同時執行多條 pipeline,非常適合做 dashboard 的資料。
db.orders.aggregate([
{ $match: { status: "completed" } },
{
$facet: {
// 維度一:按城市統計
byCity: [
{ $group: { _id: "$customer.city", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
],
// 維度二:按月份統計
byMonth: [
{
$group: {
_id: { $month: "$createdAt" },
count: { $sum: 1 }
}
},
{ $sort: { _id: 1 } }
],
// 維度三:按客戶等級統計
byTier: [
{ $group: { _id: "$customer.tier", count: { $sum: 1 } } }
],
// 總計
total: [
{ $count: "count" }
]
}
}
])
實戰案例
案例一:暢銷商品排行榜
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $unwind: "$items" },
{
$group: {
_id: "$items.product",
category: { $first: "$items.category" },
totalSold: { $sum: "$items.qty" },
totalRevenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
avgPrice: { $avg: "$items.price" },
orderCount: { $sum: 1 }
}
},
{ $sort: { totalRevenue: -1 } },
{ $limit: 10 },
{
$project: {
_id: 0,
product: "$_id",
category: 1,
totalSold: 1,
totalRevenue: 1,
avgPrice: { $round: ["$avgPrice", 0] }
}
}
])
案例二:RFM 客戶分析
RFM(Recency, Frequency, Monetary)是一個經典的客戶分析框架:
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $unwind: "$items" },
{
$group: {
_id: "$customer.name",
lastOrderDate: { $max: "$createdAt" },
orderFrequency: { $addToSet: "$orderId" },
totalSpent: { $sum: { $multiply: ["$items.price", "$items.qty"] } }
}
},
{
$addFields: {
recencyDays: {
$dateDiff: {
startDate: "$lastOrderDate",
endDate: ISODate("2024-03-01"),
unit: "day"
}
},
frequency: { $size: "$orderFrequency" }
}
},
{
$project: {
_id: 0,
customer: "$_id",
recencyDays: 1,
frequency: 1,
totalSpent: 1,
segment: {
$switch: {
branches: [
{
case: {
$and: [
{ $lte: ["$recencyDays", 30] },
{ $gte: ["$frequency", 2] }
]
},
then: "VIP"
},
{
case: { $lte: ["$recencyDays", 30] },
then: "Active"
},
{
case: { $gte: ["$recencyDays", 60] },
then: "At Risk"
}
],
default: "Regular"
}
}
}
},
{ $sort: { totalSpent: -1 } }
])
案例三:每月營收趨勢
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $unwind: "$items" },
{
$group: {
_id: {
year: { $year: "$createdAt" },
month: { $month: "$createdAt" }
},
revenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
orderCount: { $addToSet: "$orderId" },
uniqueCustomers: { $addToSet: "$customer.name" }
}
},
{
$project: {
_id: 0,
period: {
$concat: [
{ $toString: "$_id.year" }, "-",
{ $cond: [{ $lt: ["$_id.month", 10] }, "0", ""] },
{ $toString: "$_id.month" }
]
},
revenue: 1,
orderCount: { $size: "$orderCount" },
uniqueCustomers: { $size: "$uniqueCustomers" },
avgRevenuePerOrder: {
$round: [{ $divide: ["$revenue", { $size: "$orderCount" }] }, 0]
}
}
},
{ $sort: { period: 1 } }
])
效能最佳化
原則一:$match 和 $sort 放最前面
// 好:先過濾再處理
db.orders.aggregate([
{ $match: { status: "completed", createdAt: { $gte: ISODate("2024-01-01") } } },
{ $unwind: "$items" },
{ $group: { / ... / } }
])
// 差:先展開全部再過濾
db.orders.aggregate([
{ $unwind: "$items" },
{ $match: { status: "completed" } }, // 太晚了!
{ $group: { / ... / } }
])
原則二:善用索引
$match 和 $sort 放在 pipeline 開頭時,可以利用索引:
// 建立複合索引
db.orders.createIndex({ status: 1, createdAt: -1 })
// 用 explain 檢查是否有使用索引
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } }
]).explain("executionStats")
原則三:$project 提早減少欄位
// 在 $unwind 之前先去掉不需要的欄位
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $project: { items: 1, "customer.city": 1 } }, // 只保留需要的
{ $unwind: "$items" },
{ $group: { / ... / } }
])
原則四:考慮 allowDiskUse
大資料集可能超過 100MB 的記憶體限制:
db.orders.aggregate([
// 複雜的 pipeline...
], { allowDiskUse: true })
用 Python (pymongo) 執行 Pipeline
from pymongo import MongoClient
from datetime import datetime
client = MongoClient("mongodb://localhost:27017/")
db = client["ecommerce"]
pipeline = [
{"$match": {"status": "completed"}},
{"$unwind": "$items"},
{
"$group": {
"_id": "$items.category",
"totalRevenue": {
"$sum": {"$multiply": ["$items.price", "$items.qty"]}
},
"itemCount": {"$sum": "$items.qty"}
}
},
{"$sort": {"totalRevenue": -1}},
{
"$project": {
"_id": 0,
"category": "$_id",
"totalRevenue": 1,
"itemCount": 1
}
}
]
results = db.orders.aggregate(pipeline)
for doc in results:
print(f"{doc['category']}: ${doc['totalRevenue']:,.0f} "
f"({doc['itemCount']} items)")
小結
Aggregation Pipeline 是 MongoDB 最強大的查詢功能,學會它可以讓你把很多本來在應用層做的運算下推到資料庫端,不但效能更好,程式碼也更簡潔。
我的建議是:先在 MongoDB Compass 或 mongosh 裡互動式地測試你的 pipeline,一個 stage 一個 stage 地加上去,看每個步驟的輸出是否符合預期。等到確認結果正確,再搬到程式碼裡。
延伸閱讀
- MongoDB Aggregation Pipeline 官方文件
- MongoDB Compass — 視覺化 Pipeline 建構工具
- MongoDB Atlas Charts — 直接從 Pipeline 產生圖表
- 進階主題:
$merge(寫入結果到另一個集合)、$unionWith(合併多個集合)