Introduction

In a microservice architecture, different services frequently need to share data. The most intuitive approach is to read another service's database directly, but that breaks service independence. Another common approach is dual writes (writing to two databases at once), which easily leads to data inconsistency.

Is there a way to propagate database changes to other systems automatically and in real time, the moment they happen? That is exactly the problem CDC (Change Data Capture) solves.

In this article I'll introduce the concept of CDC and build a complete real-time data synchronization setup with Debezium + Kafka. If you're dealing with data synchronization, search index updates, or an event-driven architecture, CDC may be your lifesaver.


What is CDC?

CDC (Change Data Capture) is, as the name suggests, about capturing data changes. Concretely, it watches every INSERT, UPDATE, and DELETE against a database and streams those change events to downstream systems in real time.

CDC Approaches

| Approach | Description | Pros | Cons |
|------|------|------|------|
| Polling | Periodically query for changes | Simple | High latency, extra load on the DB |
| Triggers | DB triggers record each change | Real-time | Slows down writes |
| Log-based | Read the DB's transaction log | Real-time, non-invasive | More complex to implement |

Debezium uses the log-based approach: it reads the database's WAL (PostgreSQL) or binlog (MySQL), adding virtually no overhead to the source database's write path.
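For contrast, the polling approach from the table above can be sketched in a few lines. This is a simulation only: it uses an in-memory SQLite table and a hypothetical `updated_at` high-water mark, and it illustrates the two weaknesses listed above: changes are only visible on the next poll, and deletes are invisible (a deleted row simply stops appearing).

```python
import sqlite3

# Polling-based CDC: repeatedly query for rows changed since the last check.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, updated_at INTEGER)")
conn.execute("INSERT INTO users (name, updated_at) VALUES ('Alice', 100)")

def poll_changes(conn, last_seen):
    """Return rows modified since last_seen, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT id, name, updated_at FROM users WHERE updated_at > ?", (last_seen,)
    ).fetchall()
    new_mark = max([r[2] for r in rows], default=last_seen)
    return rows, new_mark

changes, last_seen = poll_changes(conn, 0)
print(changes)        # [(1, 'Alice', 100)]

# A second poll sees nothing until another write bumps updated_at --
# and a DELETE would never show up here at all.
changes, last_seen = poll_changes(conn, last_seen)
print(changes)        # []
```

Log-based CDC avoids both problems because every committed change, including deletes, is recorded in the transaction log.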

Common CDC Use Cases

  • Search index sync: database change → automatically update Elasticsearch
  • Cache invalidation: data update → automatically evict the Redis cache entry
  • Data warehouse sync: OLTP → OLAP in real time
  • Event-driven architecture: data change → trigger downstream business logic
  • Cross-service data sync: replicating data between microservices
  • Audit logging: a complete record of every data change

Debezium Architecture

Debezium is an open-source CDC platform built on Kafka Connect. Its architecture looks like this:

[PostgreSQL] --WAL--> [Debezium Connector] ---> [Kafka] ---> [Consumer]
[MySQL]    --binlog-> [Debezium Connector] ---> [Kafka] ---> [Consumer]
[MongoDB]  --oplog--> [Debezium Connector] ---> [Kafka] ---> [Consumer]

Core components:

  • Source Connector: reads changes from the database (provided by Debezium)
  • Kafka: the message queue that buffers change events
  • Sink Connector: writes changes to the target system (Elasticsearch, another DB, etc.)
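The events flowing through this pipeline are not bare rows: each Debezium change event is an envelope carrying the row state both before and after the change, plus source metadata. A minimal sketch of unwrapping one in a consumer (the envelope fields are real Debezium fields; the values here are illustrative):

```python
# Simplified Debezium change-event envelope for an UPDATE (illustrative values).
event = {
    "before": {"id": 2, "name": "Bob", "status": "active"},
    "after":  {"id": 2, "name": "Bob", "status": "inactive"},
    "source": {"connector": "postgresql", "table": "users", "lsn": 24023128},
    "op": "u",            # c = create, u = update, d = delete, r = snapshot read
    "ts_ms": 1708000000000,
}

def unwrap(event):
    """Return the new row state, or the old state for deletes."""
    if event["op"] == "d":
        return event["before"]
    return event["after"]

row = unwrap(event)
print(row["status"])      # inactive
```

Debezium ships an `ExtractNewRecordState` transform that does this flattening inside Kafka Connect, so consumers receive just the row; the connector config later in this article enables it.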

Full Walkthrough: PostgreSQL CDC

Environment Setup

# docker-compose.yml
version: '3.8'
services:
  # PostgreSQL (source database)
  postgres:
    image: postgres:16
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: cdc_user
      POSTGRES_PASSWORD: cdc_pass
      POSTGRES_DB: myapp
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"           # required for CDC
      - "-c"
      - "max_replication_slots=4"
      - "-c"
      - "max_wal_senders=4"
    volumes:
      - pg_data:/var/lib/postgresql/data

  # Zookeeper (Kafka dependency)
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  # Kafka Connect (Debezium runs here)
  connect:
    image: debezium/connect:2.5
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

  # Kafka UI (handy for inspecting topics)
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

volumes:
  pg_data:

# Start all services
docker compose up -d

# Wait for everything to come up
sleep 30

# Check that Kafka Connect is ready
curl -s http://localhost:8083/ | python3 -m json.tool

Preparing the Database

-- Connect to PostgreSQL
-- psql -h localhost -U cdc_user -d myapp

-- Create the business tables
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(200) UNIQUE NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    product VARCHAR(200) NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Keep updated_at current via a trigger
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

CREATE TRIGGER orders_updated_at
    BEFORE UPDATE ON orders
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

-- Insert seed data
INSERT INTO users (name, email) VALUES
    ('Alice', 'alice@example.com'),
    ('Bob', 'bob@example.com'),
    ('Carol', 'carol@example.com');

INSERT INTO orders (user_id, product, amount) VALUES
    (1, 'Laptop', 35000.00),
    (2, 'Keyboard', 3000.00),
    (1, 'Mouse', 800.00);

Configuring the Debezium Connector

# Register the PostgreSQL connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "myapp-postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "cdc_user",
        "database.password": "cdc_pass",
        "database.dbname": "myapp",
        "topic.prefix": "myapp",
        "table.include.list": "public.users,public.orders",
        "plugin.name": "pgoutput",
        "publication.name": "dbz_publication",
        "slot.name": "debezium_slot",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "snapshot.mode": "initial",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": false,
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'

# Check connector status
curl -s http://localhost:8083/connectors/myapp-postgres-connector/status | python3 -m json.tool
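These REST calls can also be scripted. A small helper using only the Python standard library might look like the sketch below; the shape of the status payload matches what the Kafka Connect REST API returns, but the values shown are illustrative:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"

def summarize_status(status: dict) -> str:
    """Condense the Connect status JSON into a one-line summary."""
    connector_state = status["connector"]["state"]
    task_states = [t["state"] for t in status.get("tasks", [])]
    return f"connector={connector_state} tasks={task_states}"

def fetch_status(name: str) -> dict:
    """GET /connectors/{name}/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{name}/status") as resp:
        return json.load(resp)

# Example payload as returned by the API (values illustrative):
status = {
    "name": "myapp-postgres-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
}
print(summarize_status(status))   # connector=RUNNING tasks=['RUNNING']
```

With the stack running, `summarize_status(fetch_status("myapp-postgres-connector"))` gives a quick health check suitable for cron jobs or alerting scripts.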

Verifying CDC Events

# Watch the events with the Kafka console consumer
docker exec -it <kafka-container> kafka-console-consumer \
    --bootstrap-server localhost:29092 \
    --topic myapp.public.users \
    --from-beginning

Now make some changes in the database:

-- INSERT
INSERT INTO users (name, email) VALUES ('Dave', 'dave@example.com');

-- UPDATE
UPDATE users SET status = 'inactive' WHERE name = 'Bob';

-- DELETE
DELETE FROM users WHERE name = 'Carol';

You'll see the corresponding events in the Kafka consumer, in a format like this:

{
  "id": 4,
  "name": "Dave",
  "email": "dave@example.com",
  "status": "active",
  "created_at": "2024-02-15T10:30:00.000000Z",
  "updated_at": "2024-02-15T10:30:00.000000Z",
  "__op": "c",
  "__deleted": "false"
}

The __op field indicates the operation type: c = create, u = update, d = delete (and r for rows read during the initial snapshot).


Consuming CDC Events with Python

from kafka import KafkaConsumer
import json


def handle_user_change(op, data):
    """Handle changes to user records."""
    if op == 'c':
        # New user -> send a welcome email, sync to Elasticsearch
        print(f"  -> New user: {data['name']}, syncing to search index...")
        # elasticsearch.index(index='users', body=data)
    elif op == 'u':
        # User updated -> refresh the search index, invalidate the cache
        print(f"  -> User updated: {data['name']}, invalidating cache...")
        # redis.delete(f"user:{data['id']}")
    elif op == 'd':
        # User deleted -> remove from the search index
        print(f"  -> User deleted: id={data.get('id')}")


def handle_order_change(op, data):
    """Handle changes to order records."""
    if op == 'c':
        print(f"  -> New order: {data['product']} (${data['amount']})")
        # Notify the warehouse, update inventory...
    elif op == 'u' and data.get('status') == 'completed':
        print(f"  -> Order completed: {data['id']}")
        # Kick off the shipping flow...


consumer = KafkaConsumer(
    'myapp.public.users',
    'myapp.public.orders',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='cdc-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)

print("Listening for CDC events...")
for message in consumer:
    topic = message.topic
    value = message.value

    if value is None:
        print(f"[{topic}] Tombstone event (deletion marker)")
        continue

    op = value.get('__op', 'unknown')
    op_names = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE', 'r': 'SNAPSHOT'}
    op_name = op_names.get(op, op)

    print(f"[{topic}] {op_name}: {json.dumps(value, indent=2, default=str)}")

    # Dispatch on the table name, i.e. the last segment of the topic
    table = topic.split('.')[-1]
    if table == 'users':
        handle_user_change(op, value)
    elif table == 'orders':
        handle_order_change(op, value)


A Real Application: Syncing to Elasticsearch

A common CDC application is streaming database changes into a search engine in real time:

from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
import json

es = Elasticsearch("http://localhost:9200")
consumer = KafkaConsumer(
    'myapp.public.users',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='es-sync-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)

# Make sure the Elasticsearch index exists
if not es.indices.exists(index="users"):
    es.indices.create(index="users", body={
        "mappings": {
            "properties": {
                "name": {"type": "text"},
                "email": {"type": "keyword"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    })

print("Syncing users to Elasticsearch...")
for message in consumer:
    value = message.value
    if value is None:
        continue

    op = value.get('__op', '')
    deleted = value.get('__deleted', 'false') == 'true'
    user_id = value.get('id')

    if deleted or op == 'd':
        # Deletion
        try:
            es.delete(index="users", id=user_id)
            print(f"Deleted user {user_id} from ES")
        except Exception:
            pass
    else:
        # Insert or update
        doc = {
            "name": value.get("name"),
            "email": value.get("email"),
            "status": value.get("status"),
            "created_at": value.get("created_at")
        }
        es.index(index="users", id=user_id, body=doc)
        print(f"Indexed user {user_id}: {doc['name']}")


Advanced Topics

Handling Schema Changes

Table structures evolve over time; Debezium can detect schema changes and propagate them:

# Add schema change tracking to the connector config
curl -X PUT http://localhost:8083/connectors/myapp-postgres-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "cdc_user",
    "database.password": "cdc_pass",
    "database.dbname": "myapp",
    "topic.prefix": "myapp",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true,
    "include.schema.changes": true
  }'

Handling Large Historical Datasets (Snapshots)

On first startup, Debezium takes a complete snapshot of the existing data:

{
    "snapshot.mode": "initial",
    "snapshot.fetch.size": 10000,
    "snapshot.max.threads": 4
}

Options for snapshot.mode:

  • initial: take a full snapshot on first connection, then stream only incremental changes
  • never: skip the snapshot and stream only incremental changes (for data that is already in sync)
  • always: take a full snapshot on every restart

Monitoring Debezium

# View connector status
curl -s http://localhost:8083/connectors/myapp-postgres-connector/status

# Inspect task details (state, assigned worker)
curl -s http://localhost:8083/connectors/myapp-postgres-connector/status | \
    python3 -c "import sys,json; d=json.load(sys.stdin); print(json.dumps(d['tasks'], indent=2))"

# Pause the connector
curl -X PUT http://localhost:8083/connectors/myapp-postgres-connector/pause

# Resume the connector
curl -X PUT http://localhost:8083/connectors/myapp-postgres-connector/resume

# Restart the connector
curl -X POST http://localhost:8083/connectors/myapp-postgres-connector/restart

# Delete the connector
curl -X DELETE http://localhost:8083/connectors/myapp-postgres-connector

Error Handling Strategy

{
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.deadletterqueue.topic.name": "myapp.dlq",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true
}

With this configuration, events that fail processing are routed to a Dead Letter Queue (DLQ) instead of halting the entire connector.
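A small consumer on the DLQ topic can then surface why records failed. With context headers enabled as above, Kafka Connect attaches headers prefixed `__connect.errors.` to each dead-lettered record; decoding them might look like this sketch (the commented-out consumer wiring assumes the same kafka-python setup used earlier):

```python
def decode_error_headers(headers):
    """Extract Connect's __connect.errors.* context headers into a dict."""
    return {
        key: value.decode("utf-8", errors="replace")
        for key, value in headers
        if key.startswith("__connect.errors.")
    }

# Headers arrive as (name, bytes) pairs on each Kafka message:
headers = [
    ("__connect.errors.topic", b"myapp.public.users"),
    ("__connect.errors.exception.message", b"Deserialization error"),
    ("some.other.header", b"ignored"),
]
print(decode_error_headers(headers))
# {'__connect.errors.topic': 'myapp.public.users',
#  '__connect.errors.exception.message': 'Deserialization error'}

# Wiring it to the DLQ topic (assumes kafka-python and the stack above):
# consumer = KafkaConsumer('myapp.dlq', bootstrap_servers=['localhost:9092'])
# for msg in consumer:
#     print(decode_error_headers(msg.headers), msg.value)
```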


CDC Options Beyond Debezium

| Solution | Supported databases | Requires Kafka | Notes |
|------|------|------|------|
| Debezium | PG, MySQL, MongoDB, SQL Server | Yes | Most mature, most complete feature set |
| Maxwell | MySQL | Yes | Lightweight, MySQL only |
| pg_logical | PostgreSQL | No | PG-native, simple |
| DynamoDB Streams | DynamoDB | No | AWS-native |
| MongoDB Change Streams | MongoDB | No | MongoDB-native |

If all you need is simple PostgreSQL-to-PostgreSQL CDC and you'd rather not run Kafka, native logical replication will do:

-- PostgreSQL native logical replication
CREATE PUBLICATION my_pub FOR TABLE users, orders;

-- Create the subscription on the other PostgreSQL instance
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=source_host dbname=myapp user=cdc_user password=cdc_pass'
    PUBLICATION my_pub;


Wrapping Up

CDC is an often-overlooked but remarkably powerful pattern in data architecture. It solves one core problem: how to get data changes in real time without intruding on the source system.

Debezium + Kafka is the most mature CDC stack available today. The initial setup takes some effort (you are running Kafka, Zookeeper, and Connect, after all), but once it's up, the reliability and flexibility are well worth it.

My recommendations:

  1. Syncing only between PostgreSQL instances → use native logical replication; it's the simplest
  2. Syncing to heterogeneous systems (ES, Redis, a data warehouse) → Debezium + Kafka
  3. Your organization already runs Kafka → Debezium is the natural choice
  4. Start with one or two tables → don't capture every table at once; expand gradually

Further Reading