Introduction
In a microservices architecture, different services frequently need to share data. The most obvious approach is to read another service's database directly, but that breaks service independence. Another common approach is dual writes (writing to two databases at once), which easily leads to inconsistent data.
Is there a way to propagate changes to other systems automatically and in real time whenever the database changes? That is exactly the problem CDC (Change Data Capture) solves.
In this post I'll walk through the concept of CDC and build a complete real-time data synchronization setup with Debezium + Kafka. If you are dealing with data synchronization, search index updates, or event-driven architecture, CDC may be your lifesaver.
What Is CDC?
CDC (Change Data Capture) is, as the name suggests, about capturing changes to data. Concretely, it watches every INSERT, UPDATE, and DELETE against the database and streams those change events to downstream systems in real time.
CDC Approaches
| Approach | How it works | Pros | Cons |
|------|------|------|------|
| Polling | Periodically query for changes | Simple | High latency, extra load on the DB |
| Triggers | DB triggers record each change | Real-time | Hurts write performance |
| Log-based | Read the DB's transaction log | Real-time, non-intrusive | More complex to implement |
Debezium takes the log-based approach: it reads the database's WAL (PostgreSQL) or binlog (MySQL), without impacting the performance of the source database.
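To make the trade-offs in the table concrete, here is roughly what the polling approach looks like in practice: a minimal sketch, assuming the table has an updated_at column and psycopg2 is installed (connection parameters are placeholders, not part of the original setup).
import time
import psycopg2

# Naive polling-based CDC: re-query for rows changed since the last check.
# The drawbacks are built in: the sleep interval bounds your latency,
# every poll is extra load on the DB, and DELETEs are invisible because
# the row is simply gone.
conn = psycopg2.connect(host="localhost", dbname="myapp",
                        user="cdc_user", password="cdc_pass")

last_seen = '1970-01-01'
while True:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT id, name, email, updated_at FROM users "
            "WHERE updated_at > %s ORDER BY updated_at",
            (last_seen,),
        )
        for row in cur.fetchall():
            print("changed:", row)
            last_seen = row[3]  # ties on updated_at can still be missed
    time.sleep(5)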
Common CDC Use Cases
- Search index sync: database change → automatic Elasticsearch update
- Cache invalidation: data update → automatically evict the Redis cache
- Data warehouse sync: OLTP → OLAP in real time
- Event-driven architecture: data change → trigger downstream business logic
- Cross-service data sync: replicating data between microservices
- Audit logging: a complete record of every data change
Debezium Architecture
Debezium is an open-source CDC platform built on Kafka Connect. Its architecture looks like this:
[PostgreSQL] --WAL-----> [Debezium Connector] ---> [Kafka] ---> [Consumer]
[MySQL]      --binlog--> [Debezium Connector] ---> [Kafka] ---> [Consumer]
[MongoDB]    --oplog---> [Debezium Connector] ---> [Kafka] ---> [Consumer]
Core components:
- Source Connector: reads changes from the database (provided by Debezium)
- Kafka: message queue that buffers the change events
- Sink Connector: writes changes to target systems (Elasticsearch, another DB, etc.)
Full Implementation: PostgreSQL CDC
Environment Setup
# docker-compose.yml
version: '3.8'

services:
  # PostgreSQL (source database)
  postgres:
    image: postgres:16
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: cdc_user
      POSTGRES_PASSWORD: cdc_pass
      POSTGRES_DB: myapp
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"   # required for CDC
      - "-c"
      - "max_replication_slots=4"
      - "-c"
      - "max_wal_senders=4"
    volumes:
      - pg_data:/var/lib/postgresql/data

  # Zookeeper (Kafka dependency)
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  # Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  # Kafka Connect (this is where Debezium runs)
  connect:
    image: debezium/connect:2.5
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

  # Kafka UI (handy for observing topics)
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

volumes:
  pg_data:
# Start all services
docker compose up -d

# Wait for everything to come up
sleep 30

# Check that Kafka Connect is ready
curl -s http://localhost:8083/ | python3 -m json.tool
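A blind sleep 30 works but is fragile; here is a small sketch that instead polls the Connect REST endpoint (the same one used above) until it answers, using only the standard library:
import time
import urllib.request

CONNECT_URL = "http://localhost:8083/"

def wait_for_connect(timeout=120):
    """Poll the Kafka Connect REST API until it responds."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(CONNECT_URL, timeout=5) as resp:
                if resp.status == 200:
                    print("Kafka Connect is ready")
                    return
        except OSError:
            pass  # connection refused while the container is still starting
        time.sleep(3)
    raise TimeoutError("Kafka Connect did not become ready in time")

wait_for_connect()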
Preparing the Database
-- Connect to PostgreSQL:
-- psql -h localhost -U cdc_user -d myapp

-- Create the business tables
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(200) UNIQUE NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    product VARCHAR(200) NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Trigger to keep updated_at current
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

CREATE TRIGGER orders_updated_at
    BEFORE UPDATE ON orders
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

-- Seed some initial data
INSERT INTO users (name, email) VALUES
    ('Alice', 'alice@example.com'),
    ('Bob', 'bob@example.com'),
    ('Carol', 'carol@example.com');

INSERT INTO orders (user_id, product, amount) VALUES
    (1, 'Laptop', 35000.00),
    (2, 'Keyboard', 3000.00),
    (1, 'Mouse', 800.00);
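One detail worth handling before registering the connector: with PostgreSQL's default REPLICA IDENTITY (primary key only), the WAL records for UPDATE and DELETE carry just the key of the old row. If your consumers need the full previous row image, you can switch the tables to REPLICA IDENTITY FULL. A sketch using psycopg2, reusing the connection parameters above:
import psycopg2

# REPLICA IDENTITY FULL makes the WAL carry the entire old row for
# UPDATE/DELETE events (at the cost of larger WAL records). The default
# includes only the primary key.
conn = psycopg2.connect(host="localhost", dbname="myapp",
                        user="cdc_user", password="cdc_pass")
conn.autocommit = True
with conn.cursor() as cur:
    for table in ("users", "orders"):
        cur.execute(f"ALTER TABLE {table} REPLICA IDENTITY FULL")
print("REPLICA IDENTITY FULL set on users, orders")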
Configuring the Debezium Connector
# Register the PostgreSQL connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "myapp-postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "cdc_user",
      "database.password": "cdc_pass",
      "database.dbname": "myapp",
      "topic.prefix": "myapp",
      "table.include.list": "public.users,public.orders",
      "plugin.name": "pgoutput",
      "publication.name": "dbz_publication",
      "slot.name": "debezium_slot",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "snapshot.mode": "initial",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite"
    }
  }'

# Check the connector status
curl -s http://localhost:8083/connectors/myapp-postgres-connector/status | python3 -m json.tool
Verifying CDC Events
# Watch the topic with the Kafka console consumer
docker exec -it <kafka-container> kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic myapp.public.users \
  --from-beginning
Now make some changes in the database:
-- INSERT
INSERT INTO users (name, email) VALUES ('Dave', 'dave@example.com');
-- UPDATE
UPDATE users SET status = 'inactive' WHERE name = 'Bob';
-- DELETE
DELETE FROM users WHERE name = 'Carol';
You will see the corresponding events in the Kafka consumer, in a format like this:
{
  "id": 4,
  "name": "Dave",
  "email": "dave@example.com",
  "status": "active",
  "created_at": "2024-02-15T10:30:00.000000Z",
  "updated_at": "2024-02-15T10:30:00.000000Z",
  "__op": "c",
  "__deleted": "false"
}
The __op field indicates the operation type: c = create, u = update, d = delete.
Consuming CDC Events with Python
from kafka import KafkaConsumer
import json

def handle_user_change(op, data):
    """Handle changes to user rows."""
    if op == 'c':
        # New user: send a welcome email, sync to Elasticsearch
        print(f"  -> New user: {data['name']}, syncing to search index...")
        # elasticsearch.index(index='users', body=data)
    elif op == 'u':
        # User updated: refresh the search index, invalidate the cache
        print(f"  -> User updated: {data['name']}, invalidating cache...")
        # redis.delete(f"user:{data['id']}")
    elif op == 'd':
        # User deleted: remove from the search index
        print(f"  -> User deleted: id={data.get('id')}")

def handle_order_change(op, data):
    """Handle changes to order rows."""
    if op == 'c':
        print(f"  -> New order: {data['product']} (${data['amount']})")
        # Notify the warehouse, update inventory...
    elif op == 'u' and data.get('status') == 'completed':
        print(f"  -> Order completed: {data['id']}")
        # Kick off the shipping flow...

consumer = KafkaConsumer(
    'myapp.public.users',
    'myapp.public.orders',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='cdc-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)

print("Listening for CDC events...")
for message in consumer:
    topic = message.topic
    value = message.value
    if value is None:
        print(f"[{topic}] Tombstone event (deletion marker)")
        continue

    op = value.get('__op', 'unknown')
    op_names = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE', 'r': 'SNAPSHOT'}
    op_name = op_names.get(op, op)
    print(f"[{topic}] {op_name}: {json.dumps(value, indent=2, default=str)}")

    # Dispatch to different logic depending on the source table
    table = topic.split('.')[-1]
    if table == 'users':
        handle_user_change(op, value)
    elif table == 'orders':
        handle_order_change(op, value)
Practical Application: Syncing to Elasticsearch
A common CDC application is streaming database changes into a search engine in real time:
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch, NotFoundError
import json

es = Elasticsearch("http://localhost:9200")

consumer = KafkaConsumer(
    'myapp.public.users',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='es-sync-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)

# Make sure the Elasticsearch index exists
if not es.indices.exists(index="users"):
    es.indices.create(index="users", body={
        "mappings": {
            "properties": {
                "name": {"type": "text"},
                "email": {"type": "keyword"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    })

print("Syncing users to Elasticsearch...")
for message in consumer:
    value = message.value
    if value is None:
        continue

    op = value.get('__op', '')
    deleted = value.get('__deleted', 'false') == 'true'
    user_id = value.get('id')

    if deleted or op == 'd':
        # Deletion: remove the document (ignore it if it is already gone)
        try:
            es.delete(index="users", id=user_id)
            print(f"Deleted user {user_id} from ES")
        except NotFoundError:
            pass
    else:
        # Insert or update: upsert the document, keyed by primary key
        doc = {
            "name": value.get("name"),
            "email": value.get("email"),
            "status": value.get("status"),
            "created_at": value.get("created_at")
        }
        es.index(index="users", id=user_id, body=doc)
        print(f"Indexed user {user_id}: {doc['name']}")
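If the topic has a large backlog (say, right after the initial snapshot), indexing one document per event is slow. Here is a hedged sketch of batching with the Python client's bulk helper; buffer events however you like (every N messages or every T seconds) and flush them in a single request:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch("http://localhost:9200")

def flush_batch(events):
    """Turn a list of unwrapped CDC events into one bulk request."""
    actions = []
    for value in events:
        if value.get('__deleted') == 'true' or value.get('__op') == 'd':
            actions.append({"_op_type": "delete",
                            "_index": "users", "_id": value["id"]})
        else:
            actions.append({"_op_type": "index",
                            "_index": "users", "_id": value["id"],
                            "name": value.get("name"),
                            "email": value.get("email"),
                            "status": value.get("status"),
                            "created_at": value.get("created_at")})
    # raise_on_error=False: deleting an already-missing document should
    # not abort the rest of the batch
    bulk(es, actions, raise_on_error=False)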
Advanced Topics
Handling Schema Changes
Table structures evolve over time, and Debezium can detect and propagate schema changes:
# Update the connector config so change events carry schema metadata
curl -X PUT http://localhost:8083/connectors/myapp-postgres-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "cdc_user",
    "database.password": "cdc_pass",
    "database.dbname": "myapp",
    "topic.prefix": "myapp",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "include.schema.changes": "true"
  }'
Note that PUT .../config replaces the entire configuration, so include every property you still need (slot.name, publication.name, the unwrap transform, and so on), not just the new ones.
Handling Large Existing Datasets (Snapshots)
The first time Debezium starts, it takes a complete snapshot of the existing data:
{
  "snapshot.mode": "initial",
  "snapshot.fetch.size": 10000,
  "snapshot.max.threads": 4
}
Options for snapshot.mode:
- initial: take a full snapshot on the first connection, then stream only incremental changes
- never: skip the snapshot and stream only incremental changes (for targets that already hold the data)
- always: take a full snapshot on every restart
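Newer Debezium versions (1.6+) also support ad-hoc incremental snapshots triggered through a signal table, which lets you re-snapshot individual tables without restarting the connector. A sketch, assuming the connector config includes "signal.data.collection": "public.debezium_signal" and that table exists; check your Debezium version's documentation for the exact mechanics:
import json
import psycopg2

# Assumed signal table, following the shape in the Debezium docs:
#   CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY,
#                                 type VARCHAR(32) NOT NULL,
#                                 data VARCHAR(2048));
conn = psycopg2.connect(host="localhost", dbname="myapp",
                        user="cdc_user", password="cdc_pass")
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute(
        "INSERT INTO debezium_signal (id, type, data) VALUES (%s, %s, %s)",
        ("adhoc-1", "execute-snapshot",
         json.dumps({"data-collections": ["public.users"]})),
    )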
Monitoring Debezium
# Check connector status
curl -s http://localhost:8083/connectors/myapp-postgres-connector/status

# Inspect the connector's task states (the status endpoint reports
# state, not replication lag; use Kafka consumer-group or JMX metrics for lag)
curl -s http://localhost:8083/connectors/myapp-postgres-connector/status | \
  python3 -c "import sys,json; d=json.load(sys.stdin); print(json.dumps(d['tasks'], indent=2))"

# Pause the connector
curl -X PUT http://localhost:8083/connectors/myapp-postgres-connector/pause

# Resume the connector
curl -X PUT http://localhost:8083/connectors/myapp-postgres-connector/resume

# Restart the connector
curl -X POST http://localhost:8083/connectors/myapp-postgres-connector/restart

# Delete the connector
curl -X DELETE http://localhost:8083/connectors/myapp-postgres-connector
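The status endpoint is easy to script against. A minimal watchdog sketch: a connector can report RUNNING while one of its tasks has FAILED, so check each task individually and (optionally) restart failed ones via the per-task restart endpoint. Connector name and intervals are just the values used above:
import json
import time
import urllib.request

BASE = "http://localhost:8083/connectors/myapp-postgres-connector"

while True:
    with urllib.request.urlopen(f"{BASE}/status", timeout=5) as resp:
        status = json.load(resp)
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            print(f"task {task['id']} failed: {task.get('trace', '')[:200]}")
            # Restart just the failed task (POST with an empty body)
            req = urllib.request.Request(
                f"{BASE}/tasks/{task['id']}/restart", data=b"", method="POST")
            urllib.request.urlopen(req, timeout=5)
    time.sleep(30)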
Error Handling Strategy
{
  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "myapp.dlq",
  "errors.deadletterqueue.topic.replication.factor": 1,
  "errors.deadletterqueue.context.headers.enable": true
}
With this configuration, events that fail processing are routed to a Dead Letter Queue (DLQ) instead of halting the whole connector. One caveat: in Kafka Connect the DLQ options only apply to sink connectors; for a source connector like Debezium, errors.tolerance and error logging still work, but failed records cannot be routed to a DLQ topic.
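With errors.deadletterqueue.context.headers.enable turned on, each record a sink connector routes to the DLQ carries headers describing where and why it failed (header names per KIP-298; treat the exact set as version-dependent). A small inspector sketch:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'myapp.dlq',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='dlq-inspector',
)

for message in consumer:
    print(f"DLQ record at partition {message.partition}, offset {message.offset}")
    # Kafka Connect puts the failure context into record headers, e.g.
    # __connect.errors.topic and __connect.errors.exception.message
    for key, value in message.headers:
        if key.startswith('__connect.errors'):
            print(f"  {key} = {value.decode('utf-8', errors='replace')}")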
CDC Options Beyond Debezium
| Option | Supported databases | Needs Kafka | Notes |
|------|-------------|---------------|------|
| Debezium | PG, MySQL, MongoDB, SQL Server | Yes | Most mature, most complete feature set |
| Maxwell | MySQL | Yes | Lightweight, MySQL only |
| Logical replication (native) | PostgreSQL | No | Built into PG, simple |
| DynamoDB Streams | DynamoDB | No | AWS native |
| MongoDB Change Streams | MongoDB | No | MongoDB native |
If all you need is simple PostgreSQL-to-PostgreSQL CDC and you don't want to run Kafka, native logical replication is enough:
-- Native PostgreSQL logical replication
CREATE PUBLICATION my_pub FOR TABLE users, orders;

-- On the other PG instance, create the subscription
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=source_host dbname=myapp user=cdc_user password=cdc_pass'
    PUBLICATION my_pub;
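One operational caveat that applies to Debezium and native logical replication alike: an inactive replication slot prevents PostgreSQL from recycling WAL, so a consumer that stays down can slowly fill the source database's disk. A quick check, sketched with psycopg2 (connection parameters as before):
import psycopg2

# An inactive slot pins WAL on the source; watch retained_wal grow if a
# subscriber or Debezium connector stays offline for long.
conn = psycopg2.connect(host="localhost", dbname="myapp",
                        user="cdc_user", password="cdc_pass")
with conn.cursor() as cur:
    cur.execute("""
        SELECT slot_name, active,
               pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),
                                              restart_lsn)) AS retained_wal
        FROM pg_replication_slots
    """)
    for slot_name, active, retained in cur.fetchall():
        print(f"{slot_name}: active={active}, retained WAL={retained}")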
Conclusion
CDC is a pattern that is often overlooked in data architecture yet extremely powerful. It solves one core problem: how to get data changes in real time without intruding on the source system.
Debezium + Kafka is currently the most mature CDC stack. The initial setup takes some effort (you are running Kafka, Zookeeper, and Connect, after all), but once it is up, the reliability and flexibility are well worth it.
My recommendations:
- Only syncing PostgreSQL to PostgreSQL → use native logical replication; it is the simplest option
- Syncing into heterogeneous systems (ES, Redis, a data warehouse) → Debezium + Kafka
- Your organization already runs Kafka → Debezium is the most natural choice
- Start with one or two tables → don't capture every table at once; expand gradually
Further Reading
- Debezium official documentation
- Kafka Connect official documentation
- PostgreSQL Logical Replication
- The Log: What every software engineer should know about real-time data's unifying abstraction (Jay Kreps)
- Designing Data-Intensive Applications (Martin Kleppmann), Chapter 11