前言

資料庫 schema migration 是後端工程師的日常,但也是最容易出事的環節之一。在小型專案或開發環境裡,跑一個 ALTER TABLE 可能幾秒就搞定。但在生產環境——尤其是有千萬級資料的表——一個不小心就是服務中斷、資料遺失,甚至是凌晨三點被叫起來的 incident。

我經歷過幾次因為 migration 導致的線上事故,每次都學到一些血淚教訓。這篇文章把這些經驗整理成系統化的最佳實踐,涵蓋 migration 工具選擇、向前相容的 schema 設計、雙寫策略、以及回滾方案。

核心觀念只有一個:永遠假設 migration 會失敗,並為此做好準備

Schema Migration 工具

常見工具比較

工具            語言     特色
─────────────────────────────────────────────
Alembic         Python   SQLAlchemy 生態系,自動偵測 model 差異
Flyway          Java     版本化 SQL 檔案,支援多種 DB
Liquibase       Java     XML/YAML/JSON 定義 changeset
golang-migrate  Go       輕量級,pure SQL
Knex.js         Node.js  JavaScript schema builder
Django ORM      Python   Django 內建,自動生成 migration

Alembic 實戰設定

# 安裝
pip install alembic sqlalchemy psycopg2-binary

# 初始化 alembic init alembic

# alembic/env.py(關鍵設定)
from sqlalchemy import engine_from_config
from app.models import Base  # 你的 SQLAlchemy Base

# 設定 target_metadata,讓 Alembic 能自動偵測 model 變更 target_metadata = Base.metadata

def run_migrations_online(): connectable = engine_from_config( config.get_section(config.config_ini_section), prefix="sqlalchemy.", ) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata, # 重要:比較型別差異 compare_type=True, # 重要:比較伺服器預設值 compare_server_default=True, ) with context.begin_transaction(): context.run_migrations()

# 自動生成 migration(根據 model 變更)
alembic revision --autogenerate -m "add_user_email_column"

# 查看生成的 migration 檔案,確認內容正確後才執行 # 手動執行 migration alembic upgrade head

# 回滾一步 alembic downgrade -1

# 查看目前版本 alembic current

# 查看所有 migration 歷史 alembic history --verbose

Migration 檔案範例

"""add user email column

Revision ID: a1b2c3d4e5f6 Revises: 9z8y7x6w5v4u Create Date: 2024-06-15 10:30:00.000000 """ from alembic import op import sqlalchemy as sa

revision = 'a1b2c3d4e5f6' down_revision = '9z8y7x6w5v4u'

def upgrade(): # 1. 先加欄位,允許 NULL(不鎖表) op.add_column('users', sa.Column('email', sa.String(255), nullable=True))

# 2. 建立索引(CONCURRENTLY 避免鎖表) op.execute( "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email ON users (email)" )

def downgrade(): op.drop_index('idx_users_email', table_name='users') op.drop_column('users', 'email')

向前相容設計

向前相容(forward compatibility)的意思是:新的 schema 必須能跟舊版本的應用程式碼相容。這是零停機部署的關鍵。

原則:Expand and Contract 模式

傳統做法(危險):
  部署新 code → 跑 migration → 禱告
  問題:migration 期間新舊 code 可能同時運行

Expand and Contract(安全): Phase 1: Expand → 擴展 schema(新增欄位/表,不刪除) Phase 2: Migrate → 部署新 code(同時支援新舊 schema) Phase 3: Contract → 清理舊 schema(刪除不需要的欄位/表)

每個 phase 之間可以有數天甚至數週的間隔

範例:重新命名欄位

-- 假設要把 users.name 改名為 users.full_name
-- 錯誤做法(會導致停機):
ALTER TABLE users RENAME COLUMN name TO full_name;
-- 舊 code 還在查 name,直接爆炸

-- 正確做法:三步走

-- Phase 1: Expand(加新欄位) ALTER TABLE users ADD COLUMN full_name VARCHAR(100);

-- 用觸發器同步舊欄位到新欄位 CREATE OR REPLACE FUNCTION sync_user_name() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN IF NEW.full_name IS NULL AND NEW.name IS NOT NULL THEN NEW.full_name := NEW.name; ELSIF NEW.name IS NULL AND NEW.full_name IS NOT NULL THEN NEW.name := NEW.full_name; END IF; END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;

CREATE TRIGGER trg_sync_user_name BEFORE INSERT OR UPDATE ON users FOR EACH ROW EXECUTE FUNCTION sync_user_name();

-- 回填既有資料 UPDATE users SET full_name = name WHERE full_name IS NULL; -- 大表要分批更新!見下面的範例

# Phase 2: Migrate(新 code 使用新欄位)
# 應用程式碼改為讀寫 full_name,但仍然相容 name

class User: @property def display_name(self): return self.full_name or self.name # 向後相容

-- Phase 3: Contract(等所有服務都更新後,清理舊欄位)
DROP TRIGGER trg_sync_user_name ON users;
DROP FUNCTION sync_user_name;
ALTER TABLE users DROP COLUMN name;

大表資料回填

# 大表更新千萬不要一次 UPDATE 全部,會鎖表很久
# 用批次更新

import time import psycopg2

def backfill_in_batches(conn_string, batch_size=5000, sleep_seconds=0.1): """分批更新大表資料,避免長時間鎖表。""" conn = psycopg2.connect(conn_string) conn.autocommit = True # 每批獨立 commit

total_updated = 0 while True: with conn.cursor() as cur: cur.execute(""" UPDATE users SET full_name = name WHERE id IN ( SELECT id FROM users WHERE full_name IS NULL AND name IS NOT NULL LIMIT %s FOR UPDATE SKIP LOCKED -- 跳過被鎖定的 row ) """, (batch_size,))

updated = cur.rowcount total_updated += updated print(f"已更新 {total_updated} 筆(本批 {updated} 筆)")

if updated == 0: break

time.sleep(sleep_seconds) # 讓出 CPU/IO 給正常查詢

conn.close() print(f"回填完成,共更新 {total_updated} 筆")

雙寫策略

當需要遷移到新的表結構或新的資料庫時,雙寫(dual write)是保證資料一致性的關鍵策略。

情境:拆分巨大的單體表

原始結構:
  orders 表(包含訂單 + 付款 + 物流 資訊,50 個欄位)

目標結構: orders 表(訂單基本資訊) payments 表(付款資訊) shipments 表(物流資訊)

# 雙寫實作(應用層)

class OrderService: def __init__(self, db, use_new_schema: bool = False): self.db = db self.use_new_schema = use_new_schema

def create_order(self, order_data): """建立訂單,同時寫入新舊表。""" with self.db.transaction() as tx: # 永遠寫入舊表(保底) order_id = tx.execute(""" INSERT INTO orders (customer_id, amount, payment_method, payment_status, shipping_address, tracking_no) VALUES (%(customer_id)s, %(amount)s, %(payment_method)s, %(payment_status)s, %(shipping_address)s, %(tracking_no)s) RETURNING id """, order_data)

# 同時寫入新表 tx.execute(""" INSERT INTO orders_v2 (id, customer_id, amount) VALUES (%(id)s, %(customer_id)s, %(amount)s) """, {"id": order_id, **order_data})

tx.execute(""" INSERT INTO payments (order_id, method, status) VALUES (%(order_id)s, %(method)s, %(status)s) """, { "order_id": order_id, "method": order_data["payment_method"], "status": order_data["payment_status"], })

tx.execute(""" INSERT INTO shipments (order_id, address, tracking_no) VALUES (%(order_id)s, %(address)s, %(tracking_no)s) """, { "order_id": order_id, "address": order_data["shipping_address"], "tracking_no": order_data["tracking_no"], })

return order_id

def get_order(self, order_id): """讀取訂單,根據 feature flag 決定讀新表或舊表。""" if self.use_new_schema: return self.db.query(""" SELECT o.*, p.method, p.status as payment_status, s.address, s.tracking_no FROM orders_v2 o LEFT JOIN payments p ON p.order_id = o.id LEFT JOIN shipments s ON s.order_id = o.id WHERE o.id = %s """, order_id) else: return self.db.query( "SELECT * FROM orders WHERE id = %s", order_id )

資料一致性驗證

# 定期比較新舊表的資料是否一致

def verify_consistency(db, sample_size=1000): """抽樣驗證新舊表資料一致性。""" # 隨機取樣 order_ids = db.query(""" SELECT id FROM orders ORDER BY RANDOM() LIMIT %s """, sample_size)

mismatches = [] for row in order_ids: oid = row["id"]

old_data = db.query("SELECT * FROM orders WHERE id = %s", oid) new_order = db.query("SELECT * FROM orders_v2 WHERE id = %s", oid) new_payment = db.query("SELECT * FROM payments WHERE order_id = %s", oid)

if old_data["amount"] != new_order["amount"]: mismatches.append({ "order_id": oid, "field": "amount", "old": old_data["amount"], "new": new_order["amount"], })

if old_data["payment_status"] != new_payment["status"]: mismatches.append({ "order_id": oid, "field": "payment_status", "old": old_data["payment_status"], "new": new_payment["status"], })

match_rate = (sample_size - len(mismatches)) / sample_size * 100 print(f"一致性: {match_rate:.2f}% ({len(mismatches)} 筆不一致)")

if mismatches: for m in mismatches[:10]: print(f" Order {m['order_id']}: {m['field']} " f"old={m['old']} new={m['new']}")

return mismatches

回滾方案

每個 Migration 都要有對應的 Downgrade

# alembic migration 範例:加入 NOT NULL 約束

def upgrade(): # Step 1: 設定預設值 op.execute("UPDATE orders SET status = 'unknown' WHERE status IS NULL") # Step 2: 加 NOT NULL 約束 op.alter_column('orders', 'status', existing_type=sa.String(20), nullable=False, server_default='pending')

def downgrade(): # 回滾:移除 NOT NULL 約束 op.alter_column('orders', 'status', existing_type=sa.String(20), nullable=True, server_default=None)

自動化回滾腳本

#!/bin/bash
# rollback_migration.sh — 一鍵回滾 migration

set -euo pipefail

ALEMBIC_CONFIG="alembic.ini" CURRENT=$(alembic -c "$ALEMBIC_CONFIG" current 2>/dev/null | head -1 | awk '{print $1}')

echo "目前版本: $CURRENT" echo "準備回滾到前一個版本..."

# 備份當前狀態 pg_dump -h localhost -U app_user -d myapp --schema-only > "schema_backup_${CURRENT}.sql" echo "Schema 備份完成: schema_backup_${CURRENT}.sql"

# 執行回滾 alembic -c "$ALEMBIC_CONFIG" downgrade -1

NEW_VERSION=$(alembic -c "$ALEMBIC_CONFIG" current 2>/dev/null | head -1 | awk '{print $1}') echo "回滾完成!新版本: $NEW_VERSION"

# 驗證 echo "驗證中..." python -c " from app.db import get_engine from sqlalchemy import inspect engine = get_engine() inspector = inspect(engine) tables = inspector.get_table_names() print(f'Tables: {len(tables)}') for t in sorted(tables): cols = [c[\"name\"] for c in inspector.get_columns(t)] print(f' {t}: {len(cols)} columns') "

危險操作的安全做法

加 NOT NULL 約束

-- 危險!如果表很大,這會鎖表很久
ALTER TABLE orders ALTER COLUMN status SET NOT NULL;

-- 安全做法(PostgreSQL 12+): -- 先加 CHECK 約束 NOT VALID(不驗證既有資料,不鎖表) ALTER TABLE orders ADD CONSTRAINT orders_status_not_null CHECK (status IS NOT NULL) NOT VALID;

-- 然後驗證約束(會掃描全表但不鎖 DML) ALTER TABLE orders VALIDATE CONSTRAINT orders_status_not_null;

-- 最後才設 NOT NULL(PostgreSQL 會直接參考 CHECK 約束,瞬間完成) ALTER TABLE orders ALTER COLUMN status SET NOT NULL;

-- 清理 CHECK 約束(已經不需要了) ALTER TABLE orders DROP CONSTRAINT orders_status_not_null;

刪除欄位

-- 不要直接刪!先確認沒有 code 在用

-- Step 1:先把欄位標記為 deprecated(加註解) COMMENT ON COLUMN orders.legacy_field IS 'DEPRECATED: 請使用 new_field,預計 2024-08-01 刪除';

-- Step 2:在 code 中停止讀寫這個欄位(部署上線,觀察一段時間)

-- Step 3:確認沒有查詢在讀這個欄位 SELECT * FROM pg_stat_user_tables WHERE relname = 'orders'; -- 搭配 pg_stat_statements 確認沒有查詢引用 legacy_field

-- Step 4:最後才刪除 ALTER TABLE orders DROP COLUMN legacy_field;

修改欄位型別

-- 危險!大表改型別會重寫整張表
ALTER TABLE orders ALTER COLUMN amount TYPE NUMERIC(12,2);

-- 安全做法:用新欄位 + 雙寫 -- Step 1: 加新欄位 ALTER TABLE orders ADD COLUMN amount_v2 NUMERIC(12,2);

-- Step 2: 觸發器同步 CREATE OR REPLACE FUNCTION sync_amount() RETURNS TRIGGER AS $$ BEGIN NEW.amount_v2 := NEW.amount::NUMERIC(12,2); RETURN NEW; END; $$ LANGUAGE plpgsql;

CREATE TRIGGER trg_sync_amount BEFORE INSERT OR UPDATE ON orders FOR EACH ROW EXECUTE FUNCTION sync_amount();

-- Step 3: 回填 -- (用前面的 backfill_in_batches 腳本)

-- Step 4: 切換 code 讀新欄位 -- Step 5: 刪除舊欄位和觸發器

Migration 檢查清單

在每次 migration 上線前,我會跑這份清單:

# migration_checklist.py
"""
Migration 上線前檢查清單
"""

CHECKLIST = [ "[ ] migration 有對應的 downgrade", "[ ] 在本地測試過 upgrade + downgrade", "[ ] 在 staging 環境測試過", "[ ] 估算過大表操作的執行時間", "[ ] 確認不會長時間鎖表(用 CONCURRENTLY 或分批操作)", "[ ] 新 schema 與舊版本 code 相容", "[ ] 有資料備份(或確認可以從 replica 恢復)", "[ ] 準備好回滾方案和指令", "[ ] 通知了相關團隊(SRE、其他依賴此表的服務)", "[ ] 選擇了低流量時段執行", ]

for item in CHECKLIST: print(item)

小結

資料庫 migration 的核心心得:

  1. 永遠向前相容:新 schema 必須和舊 code 共存
  2. 小步快跑:一次 migration 只做一件事,不要打包
  3. 大表操作要分批:任何涉及全表掃描/鎖定的操作都要拆開
  4. 先 expand 後 contract:先加新的,確認沒問題再刪舊的
  5. 每個 migration 都要能回滾:沒有 downgrade 的 migration 不能上線
  6. 在 staging 先跑一次:用接近生產環境的資料量測試

延伸閱讀建議: