前言
資料庫 schema migration 是後端工程師的日常,但也是最容易出事的環節之一。在小型專案或開發環境裡,跑一個 ALTER TABLE 可能幾秒就搞定。但在生產環境——尤其是有千萬級資料的表——一個不小心就是服務中斷、資料遺失,甚至是凌晨三點被叫起來的 incident。
我經歷過幾次因為 migration 導致的線上事故,每次都學到一些血淚教訓。這篇文章把這些經驗整理成系統化的最佳實踐,涵蓋 migration 工具選擇、向前相容的 schema 設計、雙寫策略、以及回滾方案。
核心觀念只有一個:永遠假設 migration 會失敗,並為此做好準備。
Schema Migration 工具
常見工具比較
工具 語言 特色
─────────────────────────────────────────────
Alembic Python SQLAlchemy 生態系,自動偵測 model 差異
Flyway Java 版本化 SQL 檔案,支援多種 DB
Liquibase Java XML/YAML/JSON 定義 changeset
golang-migrate Go 輕量級,pure SQL
Knex.js Node.js JavaScript schema builder
Django ORM Python Django 內建,自動生成 migration
Alembic 實戰設定
# 安裝
pip install alembic sqlalchemy psycopg2-binary
# 初始化
alembic init alembic
# alembic/env.py(關鍵設定)
from sqlalchemy import engine_from_config
from app.models import Base # 你的 SQLAlchemy Base
# 設定 target_metadata,讓 Alembic 能自動偵測 model 變更
target_metadata = Base.metadata
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
# 重要:比較型別差異
compare_type=True,
# 重要:比較伺服器預設值
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
# 自動生成 migration(根據 model 變更)
alembic revision --autogenerate -m "add_user_email_column"
# 查看生成的 migration 檔案,確認內容正確後才執行
# 手動執行 migration
alembic upgrade head
# 回滾一步
alembic downgrade -1
# 查看目前版本
alembic current
# 查看所有 migration 歷史
alembic history --verbose
Migration 檔案範例
"""add user email column
Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2024-06-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'a1b2c3d4e5f6'
down_revision = '9z8y7x6w5v4u'
def upgrade():
# 1. 先加欄位,允許 NULL(不鎖表)
op.add_column('users', sa.Column('email', sa.String(255), nullable=True))
# 2. 建立索引(CONCURRENTLY 避免鎖表)
op.execute(
"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email ON users (email)"
)
def downgrade():
op.drop_index('idx_users_email', table_name='users')
op.drop_column('users', 'email')
向前相容設計
向前相容(forward compatibility)的意思是:新的 schema 必須能跟舊版本的應用程式碼相容。這是零停機部署的關鍵。
原則:Expand and Contract 模式
傳統做法(危險):
部署新 code → 跑 migration → 禱告
問題:migration 期間新舊 code 可能同時運行
Expand and Contract(安全):
Phase 1: Expand → 擴展 schema(新增欄位/表,不刪除)
Phase 2: Migrate → 部署新 code(同時支援新舊 schema)
Phase 3: Contract → 清理舊 schema(刪除不需要的欄位/表)
每個 phase 之間可以有數天甚至數週的間隔
範例:重新命名欄位
-- 假設要把 users.name 改名為 users.full_name
-- 錯誤做法(會導致停機):
ALTER TABLE users RENAME COLUMN name TO full_name;
-- 舊 code 還在查 name,直接爆炸
-- 正確做法:三步走
-- Phase 1: Expand(加新欄位)
ALTER TABLE users ADD COLUMN full_name VARCHAR(100);
-- 用觸發器同步舊欄位到新欄位
CREATE OR REPLACE FUNCTION sync_user_name() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
IF NEW.full_name IS NULL AND NEW.name IS NOT NULL THEN
NEW.full_name := NEW.name;
ELSIF NEW.name IS NULL AND NEW.full_name IS NOT NULL THEN
NEW.name := NEW.full_name;
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_user_name
BEFORE INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION sync_user_name();
-- 回填既有資料
UPDATE users SET full_name = name WHERE full_name IS NULL;
-- 大表要分批更新!見下面的範例
# Phase 2: Migrate(新 code 使用新欄位)
# 應用程式碼改為讀寫 full_name,但仍然相容 name
class User:
@property
def display_name(self):
return self.full_name or self.name # 向後相容
-- Phase 3: Contract(等所有服務都更新後,清理舊欄位)
DROP TRIGGER trg_sync_user_name ON users;
DROP FUNCTION sync_user_name;
ALTER TABLE users DROP COLUMN name;
大表資料回填
# 大表更新千萬不要一次 UPDATE 全部,會鎖表很久
# 用批次更新
import time
import psycopg2
def backfill_in_batches(conn_string, batch_size=5000, sleep_seconds=0.1):
"""分批更新大表資料,避免長時間鎖表。"""
conn = psycopg2.connect(conn_string)
conn.autocommit = True # 每批獨立 commit
total_updated = 0
while True:
with conn.cursor() as cur:
cur.execute("""
UPDATE users
SET full_name = name
WHERE id IN (
SELECT id FROM users
WHERE full_name IS NULL AND name IS NOT NULL
LIMIT %s
FOR UPDATE SKIP LOCKED -- 跳過被鎖定的 row
)
""", (batch_size,))
updated = cur.rowcount
total_updated += updated
print(f"已更新 {total_updated} 筆(本批 {updated} 筆)")
if updated == 0:
break
time.sleep(sleep_seconds) # 讓出 CPU/IO 給正常查詢
conn.close()
print(f"回填完成,共更新 {total_updated} 筆")
雙寫策略
當需要遷移到新的表結構或新的資料庫時,雙寫(dual write)是保證資料一致性的關鍵策略。
情境:拆分巨大的單體表
原始結構:
orders 表(包含訂單 + 付款 + 物流 資訊,50 個欄位)
目標結構:
orders 表(訂單基本資訊)
payments 表(付款資訊)
shipments 表(物流資訊)
# 雙寫實作(應用層)
class OrderService:
def __init__(self, db, use_new_schema: bool = False):
self.db = db
self.use_new_schema = use_new_schema
def create_order(self, order_data):
"""建立訂單,同時寫入新舊表。"""
with self.db.transaction() as tx:
# 永遠寫入舊表(保底)
order_id = tx.execute("""
INSERT INTO orders (customer_id, amount, payment_method,
payment_status, shipping_address, tracking_no)
VALUES (%(customer_id)s, %(amount)s, %(payment_method)s,
%(payment_status)s, %(shipping_address)s, %(tracking_no)s)
RETURNING id
""", order_data)
# 同時寫入新表
tx.execute("""
INSERT INTO orders_v2 (id, customer_id, amount)
VALUES (%(id)s, %(customer_id)s, %(amount)s)
""", {"id": order_id, **order_data})
tx.execute("""
INSERT INTO payments (order_id, method, status)
VALUES (%(order_id)s, %(method)s, %(status)s)
""", {
"order_id": order_id,
"method": order_data["payment_method"],
"status": order_data["payment_status"],
})
tx.execute("""
INSERT INTO shipments (order_id, address, tracking_no)
VALUES (%(order_id)s, %(address)s, %(tracking_no)s)
""", {
"order_id": order_id,
"address": order_data["shipping_address"],
"tracking_no": order_data["tracking_no"],
})
return order_id
def get_order(self, order_id):
"""讀取訂單,根據 feature flag 決定讀新表或舊表。"""
if self.use_new_schema:
return self.db.query("""
SELECT o.*, p.method, p.status as payment_status,
s.address, s.tracking_no
FROM orders_v2 o
LEFT JOIN payments p ON p.order_id = o.id
LEFT JOIN shipments s ON s.order_id = o.id
WHERE o.id = %s
""", order_id)
else:
return self.db.query(
"SELECT * FROM orders WHERE id = %s", order_id
)
資料一致性驗證
# 定期比較新舊表的資料是否一致
def verify_consistency(db, sample_size=1000):
"""抽樣驗證新舊表資料一致性。"""
# 隨機取樣
order_ids = db.query("""
SELECT id FROM orders
ORDER BY RANDOM()
LIMIT %s
""", sample_size)
mismatches = []
for row in order_ids:
oid = row["id"]
old_data = db.query("SELECT * FROM orders WHERE id = %s", oid)
new_order = db.query("SELECT * FROM orders_v2 WHERE id = %s", oid)
new_payment = db.query("SELECT * FROM payments WHERE order_id = %s", oid)
if old_data["amount"] != new_order["amount"]:
mismatches.append({
"order_id": oid, "field": "amount",
"old": old_data["amount"], "new": new_order["amount"],
})
if old_data["payment_status"] != new_payment["status"]:
mismatches.append({
"order_id": oid, "field": "payment_status",
"old": old_data["payment_status"], "new": new_payment["status"],
})
match_rate = (sample_size - len(mismatches)) / sample_size * 100
print(f"一致性: {match_rate:.2f}% ({len(mismatches)} 筆不一致)")
if mismatches:
for m in mismatches[:10]:
print(f" Order {m['order_id']}: {m['field']} "
f"old={m['old']} new={m['new']}")
return mismatches
回滾方案
每個 Migration 都要有對應的 Downgrade
# alembic migration 範例:加入 NOT NULL 約束
def upgrade():
# Step 1: 設定預設值
op.execute("UPDATE orders SET status = 'unknown' WHERE status IS NULL")
# Step 2: 加 NOT NULL 約束
op.alter_column('orders', 'status',
existing_type=sa.String(20),
nullable=False,
server_default='pending')
def downgrade():
# 回滾:移除 NOT NULL 約束
op.alter_column('orders', 'status',
existing_type=sa.String(20),
nullable=True,
server_default=None)
自動化回滾腳本
#!/bin/bash
# rollback_migration.sh — 一鍵回滾 migration
set -euo pipefail
ALEMBIC_CONFIG="alembic.ini"
CURRENT=$(alembic -c "$ALEMBIC_CONFIG" current 2>/dev/null | head -1 | awk '{print $1}')
echo "目前版本: $CURRENT"
echo "準備回滾到前一個版本..."
# 備份當前狀態
pg_dump -h localhost -U app_user -d myapp --schema-only > "schema_backup_${CURRENT}.sql"
echo "Schema 備份完成: schema_backup_${CURRENT}.sql"
# 執行回滾
alembic -c "$ALEMBIC_CONFIG" downgrade -1
NEW_VERSION=$(alembic -c "$ALEMBIC_CONFIG" current 2>/dev/null | head -1 | awk '{print $1}')
echo "回滾完成!新版本: $NEW_VERSION"
# 驗證
echo "驗證中..."
python -c "
from app.db import get_engine
from sqlalchemy import inspect
engine = get_engine()
inspector = inspect(engine)
tables = inspector.get_table_names()
print(f'Tables: {len(tables)}')
for t in sorted(tables):
cols = [c[\"name\"] for c in inspector.get_columns(t)]
print(f' {t}: {len(cols)} columns')
"
危險操作的安全做法
加 NOT NULL 約束
-- 危險!如果表很大,這會鎖表很久
ALTER TABLE orders ALTER COLUMN status SET NOT NULL;
-- 安全做法(PostgreSQL 12+):
-- 先加 CHECK 約束 NOT VALID(不驗證既有資料,不鎖表)
ALTER TABLE orders ADD CONSTRAINT orders_status_not_null
CHECK (status IS NOT NULL) NOT VALID;
-- 然後驗證約束(會掃描全表但不鎖 DML)
ALTER TABLE orders VALIDATE CONSTRAINT orders_status_not_null;
-- 最後才設 NOT NULL(PostgreSQL 會直接參考 CHECK 約束,瞬間完成)
ALTER TABLE orders ALTER COLUMN status SET NOT NULL;
-- 清理 CHECK 約束(已經不需要了)
ALTER TABLE orders DROP CONSTRAINT orders_status_not_null;
刪除欄位
-- 不要直接刪!先確認沒有 code 在用
-- Step 1:先把欄位標記為 deprecated(加註解)
COMMENT ON COLUMN orders.legacy_field IS 'DEPRECATED: 請使用 new_field,預計 2024-08-01 刪除';
-- Step 2:在 code 中停止讀寫這個欄位(部署上線,觀察一段時間)
-- Step 3:確認沒有查詢在讀這個欄位
SELECT * FROM pg_stat_user_tables WHERE relname = 'orders';
-- 搭配 pg_stat_statements 確認沒有查詢引用 legacy_field
-- Step 4:最後才刪除
ALTER TABLE orders DROP COLUMN legacy_field;
修改欄位型別
-- 危險!大表改型別會重寫整張表
ALTER TABLE orders ALTER COLUMN amount TYPE NUMERIC(12,2);
-- 安全做法:用新欄位 + 雙寫
-- Step 1: 加新欄位
ALTER TABLE orders ADD COLUMN amount_v2 NUMERIC(12,2);
-- Step 2: 觸發器同步
CREATE OR REPLACE FUNCTION sync_amount() RETURNS TRIGGER AS $$
BEGIN
NEW.amount_v2 := NEW.amount::NUMERIC(12,2);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_sync_amount
BEFORE INSERT OR UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION sync_amount();
-- Step 3: 回填
-- (用前面的 backfill_in_batches 腳本)
-- Step 4: 切換 code 讀新欄位
-- Step 5: 刪除舊欄位和觸發器
Migration 檢查清單
在每次 migration 上線前,我會跑這份清單:
# migration_checklist.py
"""
Migration 上線前檢查清單
"""
CHECKLIST = [
"[ ] migration 有對應的 downgrade",
"[ ] 在本地測試過 upgrade + downgrade",
"[ ] 在 staging 環境測試過",
"[ ] 估算過大表操作的執行時間",
"[ ] 確認不會長時間鎖表(用 CONCURRENTLY 或分批操作)",
"[ ] 新 schema 與舊版本 code 相容",
"[ ] 有資料備份(或確認可以從 replica 恢復)",
"[ ] 準備好回滾方案和指令",
"[ ] 通知了相關團隊(SRE、其他依賴此表的服務)",
"[ ] 選擇了低流量時段執行",
]
for item in CHECKLIST:
print(item)
小結
資料庫 migration 的核心心得:
- 永遠向前相容:新 schema 必須和舊 code 共存
- 小步快跑:一次 migration 只做一件事,不要打包
- 大表操作要分批:任何涉及全表掃描/鎖定的操作都要拆開
- 先 expand 後 contract:先加新的,確認沒問題再刪舊的
- 每個 migration 都要能回滾:沒有 downgrade 的 migration 不能上線
- 在 staging 先跑一次:用接近生產環境的資料量測試
延伸閱讀建議:
- Alembic 官方文件
- Strong Migrations (Rails) — Rails 的安全 migration 檢查器,概念通用
- PostgreSQL Lock Monitoring — 監控鎖等待
- gh-ost — GitHub 開發的 MySQL 線上 schema migration 工具