前言

MongoDB 的 Aggregation Pipeline 大概是我覺得最被低估的功能之一。很多團隊用 MongoDB,卻只會 find()updateOne(),遇到複雜的查詢需求就把資料撈回應用層用程式碼處理。這不但效率差,還浪費了 MongoDB 引擎的運算能力。

我曾經接手一個專案,有支 API 的回應時間超過 3 秒,原因就是在 Node.js 裡做了一堆本來可以在資料庫端完成的聚合運算。改用 Aggregation Pipeline 後,回應時間降到 200ms 以下。

這篇文章我會從 Pipeline 的基本概念開始,介紹常用的 stage,再用實際案例展示如何寫出高效的聚合查詢。


Pipeline 基本概念

Aggregation Pipeline 就像一條工廠的生產線:資料從一端進入,經過一連串的「加工站」(stage),每個 stage 對資料做一次轉換,最後輸出結果。

文件集合 → [$match] → [$group] → [$sort] → [$limit] → 結果

每個 stage 的輸出就是下一個 stage 的輸入。這個設計讓你可以把複雜的查詢拆成一個個簡單的步驟。

準備測試資料

我們用一個電商訂單的場景來練習:

// 連線到 MongoDB
// mongosh mongodb://localhost:27017/ecommerce

// 建立測試資料 db.orders.insertMany([ { orderId: "ORD-001", customer: { name: "Alice", city: "Taipei", tier: "gold" }, items: [ { product: "Laptop", category: "electronics", price: 35000, qty: 1 }, { product: "Mouse", category: "electronics", price: 800, qty: 2 } ], status: "completed", createdAt: ISODate("2024-01-15T10:30:00Z") }, { orderId: "ORD-002", customer: { name: "Bob", city: "Tokyo", tier: "silver" }, items: [ { product: "T-Shirt", category: "clothing", price: 500, qty: 3 }, { product: "Jeans", category: "clothing", price: 1200, qty: 1 } ], status: "completed", createdAt: ISODate("2024-01-16T14:20:00Z") }, { orderId: "ORD-003", customer: { name: "Carol", city: "Taipei", tier: "gold" }, items: [ { product: "Book", category: "books", price: 350, qty: 5 }, { product: "Laptop", category: "electronics", price: 35000, qty: 1 } ], status: "pending", createdAt: ISODate("2024-01-17T09:15:00Z") }, { orderId: "ORD-004", customer: { name: "Alice", city: "Taipei", tier: "gold" }, items: [ { product: "Headphones", category: "electronics", price: 2500, qty: 1 }, ], status: "completed", createdAt: ISODate("2024-02-01T11:00:00Z") }, { orderId: "ORD-005", customer: { name: "Dave", city: "Osaka", tier: "bronze" }, items: [ { product: "Keyboard", category: "electronics", price: 3000, qty: 1 }, { product: "Monitor", category: "electronics", price: 12000, qty: 2 } ], status: "completed", createdAt: ISODate("2024-02-05T16:45:00Z") } ]);


常用 Stage 詳解

$match — 過濾文件

$match 就像 SQL 的 WHERE,放在 pipeline 開頭可以大幅減少後續 stage 要處理的資料量。

// 只看已完成的訂單
db.orders.aggregate([
  { $match: { status: "completed" } }
])

// 多條件過濾 db.orders.aggregate([ { $match: { status: "completed", "customer.city": "Taipei", createdAt: { $gte: ISODate("2024-01-01"), $lt: ISODate("2024-02-01") } } } ])

$group — 分組聚合

$group 是最強大的 stage,類似 SQL 的 GROUP BY

// 每個城市的訂單數和總金額
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $unwind: "$items" },
  {
    $group: {
      _id: "$customer.city",
      orderCount: { $sum: 1 },
      totalRevenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
      avgOrderValue: { $avg: { $multiply: ["$items.price", "$items.qty"] } },
      customers: { $addToSet: "$customer.name" }
    }
  },
  { $sort: { totalRevenue: -1 } }
])

常用的累加器(Accumulator):

| 累加器 | 功能 | 範例 |
|——–|——|——|
| $sum | 加總 | { $sum: "$price" } |
| $avg | 平均 | { $avg: "$price" } |
| $min / $max | 最小/最大值 | { $max: "$price" } |
| $first / $last | 第一/最後一筆 | { $first: "$name" } |
| $push | 收集成陣列 | { $push: "$name" } |
| $addToSet | 收集成唯一陣列 | { $addToSet: "$city" } |
| $count | 計數 | { $count: {} } |

$unwind — 展開陣列

$unwind 把陣列欄位展開成多筆文件,是處理內嵌陣列的關鍵。

// 展開 items 陣列
db.orders.aggregate([
  { $unwind: "$items" },
  // 原本 1 筆含 2 個 items 的訂單,會變成 2 筆文件
  { $project: { orderId: 1, product: "$items.product", price: "$items.price" } }
])

// 搭配 preserveNullAndEmptyArrays 保留沒有 items 的文件 db.orders.aggregate([ { $unwind: { path: "$items", preserveNullAndEmptyArrays: true } } ])

$lookup — 跨集合查詢(類似 JOIN)

// 假設有一個 products 集合
db.products.insertMany([
  { name: "Laptop", brand: "ASUS", warranty: 24 },
  { name: "Mouse", brand: "Logitech", warranty: 12 },
  { name: "Keyboard", brand: "Keychron", warranty: 12 }
]);

// 在訂單中 JOIN 產品資訊 db.orders.aggregate([ { $unwind: "$items" }, { $lookup: { from: "products", localField: "items.product", foreignField: "name", as: "productInfo" } }, { $unwind: { path: "$productInfo", preserveNullAndEmptyArrays: true } }, { $project: { orderId: 1, product: "$items.product", price: "$items.price", brand: "$productInfo.brand", warranty: "$productInfo.warranty" } } ])

$project 和 $addFields — 欄位轉換

// $project:選擇和重塑欄位
db.orders.aggregate([
  {
    $project: {
      orderId: 1,
      customerName: "$customer.name",
      city: "$customer.city",
      itemCount: { $size: "$items" },
      orderMonth: { $month: "$createdAt" },
      // 計算訂單總額
      totalAmount: {
        $reduce: {
          input: "$items",
          initialValue: 0,
          in: { $add: ["$$value", { $multiply: ["$$this.price", "$$this.qty"] }] }
        }
      }
    }
  }
])

// $addFields:新增欄位(不影響原有欄位) db.orders.aggregate([ { $addFields: { totalAmount: { $reduce: { input: "$items", initialValue: 0, in: { $add: ["$$value", { $multiply: ["$$this.price", "$$this.qty"] }] } } } } }, { $match: { totalAmount: { $gte: 10000 } } } ])

$facet — 多維度聚合

$facet 讓你在一次查詢中同時執行多條 pipeline,非常適合做 dashboard 的資料。

db.orders.aggregate([
  { $match: { status: "completed" } },
  {
    $facet: {
      // 維度一:按城市統計
      byCity: [
        { $group: { _id: "$customer.city", count: { $sum: 1 } } },
        { $sort: { count: -1 } }
      ],
      // 維度二:按月份統計
      byMonth: [
        {
          $group: {
            _id: { $month: "$createdAt" },
            count: { $sum: 1 }
          }
        },
        { $sort: { _id: 1 } }
      ],
      // 維度三:按客戶等級統計
      byTier: [
        { $group: { _id: "$customer.tier", count: { $sum: 1 } } }
      ],
      // 總計
      total: [
        { $count: "count" }
      ]
    }
  }
])

實戰案例

案例一:暢銷商品排行榜

db.orders.aggregate([
  { $match: { status: "completed" } },
  { $unwind: "$items" },
  {
    $group: {
      _id: "$items.product",
      category: { $first: "$items.category" },
      totalSold: { $sum: "$items.qty" },
      totalRevenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
      avgPrice: { $avg: "$items.price" },
      orderCount: { $sum: 1 }
    }
  },
  { $sort: { totalRevenue: -1 } },
  { $limit: 10 },
  {
    $project: {
      _id: 0,
      product: "$_id",
      category: 1,
      totalSold: 1,
      totalRevenue: 1,
      avgPrice: { $round: ["$avgPrice", 0] }
    }
  }
])

案例二:RFM 客戶分析

RFM(Recency, Frequency, Monetary)是一個經典的客戶分析框架:

db.orders.aggregate([
  { $match: { status: "completed" } },
  { $unwind: "$items" },
  {
    $group: {
      _id: "$customer.name",
      lastOrderDate: { $max: "$createdAt" },
      orderFrequency: { $addToSet: "$orderId" },
      totalSpent: { $sum: { $multiply: ["$items.price", "$items.qty"] } }
    }
  },
  {
    $addFields: {
      recencyDays: {
        $dateDiff: {
          startDate: "$lastOrderDate",
          endDate: ISODate("2024-03-01"),
          unit: "day"
        }
      },
      frequency: { $size: "$orderFrequency" }
    }
  },
  {
    $project: {
      _id: 0,
      customer: "$_id",
      recencyDays: 1,
      frequency: 1,
      totalSpent: 1,
      segment: {
        $switch: {
          branches: [
            {
              case: {
                $and: [
                  { $lte: ["$recencyDays", 30] },
                  { $gte: ["$frequency", 2] }
                ]
              },
              then: "VIP"
            },
            {
              case: { $lte: ["$recencyDays", 30] },
              then: "Active"
            },
            {
              case: { $gte: ["$recencyDays", 60] },
              then: "At Risk"
            }
          ],
          default: "Regular"
        }
      }
    }
  },
  { $sort: { totalSpent: -1 } }
])

案例三:每月營收趨勢

db.orders.aggregate([
  { $match: { status: "completed" } },
  { $unwind: "$items" },
  {
    $group: {
      _id: {
        year: { $year: "$createdAt" },
        month: { $month: "$createdAt" }
      },
      revenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
      orderCount: { $addToSet: "$orderId" },
      uniqueCustomers: { $addToSet: "$customer.name" }
    }
  },
  {
    $project: {
      _id: 0,
      period: {
        $concat: [
          { $toString: "$_id.year" }, "-",
          { $cond: [{ $lt: ["$_id.month", 10] }, "0", ""] },
          { $toString: "$_id.month" }
        ]
      },
      revenue: 1,
      orderCount: { $size: "$orderCount" },
      uniqueCustomers: { $size: "$uniqueCustomers" },
      avgRevenuePerOrder: {
        $round: [{ $divide: ["$revenue", { $size: "$orderCount" }] }, 0]
      }
    }
  },
  { $sort: { period: 1 } }
])

效能最佳化

原則一:$match 和 $sort 放最前面

// 好:先過濾再處理
db.orders.aggregate([
  { $match: { status: "completed", createdAt: { $gte: ISODate("2024-01-01") } } },
  { $unwind: "$items" },
  { $group: { / ... / } }
])

// 差:先展開全部再過濾 db.orders.aggregate([ { $unwind: "$items" }, { $match: { status: "completed" } }, // 太晚了! { $group: { / ... / } } ])

原則二:善用索引

$match$sort 放在 pipeline 開頭時,可以利用索引:

// 建立複合索引
db.orders.createIndex({ status: 1, createdAt: -1 })

// 用 explain 檢查是否有使用索引 db.orders.aggregate([ { $match: { status: "completed" } }, { $sort: { createdAt: -1 } } ]).explain("executionStats")

原則三:$project 提早減少欄位

// 在 $unwind 之前先去掉不需要的欄位
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $project: { items: 1, "customer.city": 1 } },  // 只保留需要的
  { $unwind: "$items" },
  { $group: { / ... / } }
])

原則四:考慮 allowDiskUse

大資料集可能超過 100MB 的記憶體限制:

db.orders.aggregate([
  // 複雜的 pipeline...
], { allowDiskUse: true })

用 Python (pymongo) 執行 Pipeline

from pymongo import MongoClient
from datetime import datetime

client = MongoClient("mongodb://localhost:27017/") db = client["ecommerce"]

pipeline = [ {"$match": {"status": "completed"}}, {"$unwind": "$items"}, { "$group": { "_id": "$items.category", "totalRevenue": { "$sum": {"$multiply": ["$items.price", "$items.qty"]} }, "itemCount": {"$sum": "$items.qty"} } }, {"$sort": {"totalRevenue": -1}}, { "$project": { "_id": 0, "category": "$_id", "totalRevenue": 1, "itemCount": 1 } } ]

results = db.orders.aggregate(pipeline) for doc in results: print(f"{doc['category']}: ${doc['totalRevenue']:,.0f} " f"({doc['itemCount']} items)")


小結

Aggregation Pipeline 是 MongoDB 最強大的查詢功能,學會它可以讓你把很多本來在應用層做的運算下推到資料庫端,不但效能更好,程式碼也更簡潔。

我的建議是:先在 MongoDB Compass 或 mongosh 裡互動式地測試你的 pipeline,一個 stage 一個 stage 地加上去,看每個步驟的輸出是否符合預期。等到確認結果正確,再搬到程式碼裡。

延伸閱讀