前言
提到 Redis,大部分人的第一反應是「快取」。沒錯,Redis 作為快取確實是它最經典的用途——把熱門資料放到記憶體裡,讓讀取速度從毫秒降到微秒。但如果你只把 Redis 當快取用,那真的太可惜了。
Redis 是一個多功能的資料結構伺服器。除了 String、Hash、List、Set、Sorted Set 這些基本資料結構之外,它還提供了 Pub/Sub(發布訂閱)、Stream(事件流)、Lua Script(原子操作腳本)等進階功能。這些功能在實際的後端架構中非常實用,而且往往比你另外引入一個 Kafka 或 RabbitMQ 更輕量。
這篇文章會深入介紹 Redis 的這三個進階功能,每個都有完整的實戰範例。我在自己的專案中大量使用這些功能,用 Pub/Sub 做即時通知、用 Stream 做事件溯源、用 Lua Script 做分散式限流,效果都很好。
Redis 資料結構複習
在進入進階功能之前,先快速複習幾個常用但容易被忽略的資料結構用法:
# Sorted Set — 非常強大的排行榜/排程系統
# 新增分數(用 Unix timestamp 作為 score,可以當延遲隊列用)
redis-cli ZADD delayed_tasks 1719849600 "task:send_email:123"
redis-cli ZADD delayed_tasks 1719853200 "task:send_sms:456"
# 取得到期的任務(score <= 當前時間)
redis-cli ZRANGEBYSCORE delayed_tasks -inf 1719849600
# 取得排行榜前 10 名
redis-cli ZREVRANGE leaderboard 0 9 WITHSCORES
# HyperLogLog — 用極少記憶體做 UV(獨立訪客)統計
import redis
r = redis.Redis()
# 每個頁面瀏覽記錄 user_id
r.pfadd("uv:2024-06-15", "user:123", "user:456", "user:789")
r.pfadd("uv:2024-06-15", "user:123") # 重複不會多算
# 取得獨立訪客數(近似值,誤差約 0.81%)
uv_count = r.pfcount("uv:2024-06-15")
print(f"UV: {uv_count}") # 3
# 合併多天的 UV
r.pfmerge("uv:2024-06-week", "uv:2024-06-15", "uv:2024-06-16")
weekly_uv = r.pfcount("uv:2024-06-week")
# 每個 key 只佔 12KB 記憶體,不管有多少 unique elements!
Pub/Sub 即時通訊
基本概念
Pub/Sub(Publish/Subscribe)是一種訊息傳遞模式:發布者(Publisher)把訊息發到頻道(Channel),所有訂閱該頻道的訂閱者(Subscriber)都會收到訊息。
Publisher A ─→ Channel: "notifications" ─→ Subscriber 1
Publisher B ─↗ ─→ Subscriber 2
─→ Subscriber 3
基本用法
# Terminal 1: 訂閱者
redis-cli SUBSCRIBE notifications
# 等待訊息...
# Terminal 2: 發布者
redis-cli PUBLISH notifications '{"type":"new_order","order_id":123}'
# (integer) 1 ← 表示有 1 個訂閱者收到了
# Terminal 1 會顯示:
# 1) "message"
# 2) "notifications"
# 3) "{\"type\":\"new_order\",\"order_id\":123}"
# 模式訂閱(支援萬用字元)
redis-cli PSUBSCRIBE "events:*"
# 會收到 events:order、events:user、events:payment 等所有頻道的訊息
實戰:即時通知系統
import redis
import json
import threading
import time
class NotificationService:
"""基於 Redis Pub/Sub 的即時通知系統。"""
def __init__(self, redis_url="redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.pubsub = self.redis.pubsub()
self._handlers = {}
def subscribe(self, channel: str, handler):
"""訂閱頻道並註冊處理函式。"""
self._handlers[channel] = handler
self.pubsub.subscribe(**{channel: self._message_handler})
def _message_handler(self, message):
"""內部訊息路由。"""
if message["type"] != "message":
return
channel = message["channel"]
if isinstance(channel, bytes):
channel = channel.decode()
data = json.loads(message["data"])
handler = self._handlers.get(channel)
if handler:
handler(data)
def publish(self, channel: str, data: dict):
"""發布訊息到頻道。"""
self.redis.publish(channel, json.dumps(data, ensure_ascii=False))
def start_listening(self):
"""在背景線程中開始監聽。"""
thread = threading.Thread(target=self._listen_loop, daemon=True)
thread.start()
return thread
def _listen_loop(self):
"""監聽迴圈。"""
for message in self.pubsub.listen():
pass # subscribe() 中註冊的 callback 會自動被呼叫
# === 使用範例 ===
# 處理新訂單通知
def on_new_order(data):
print(f"新訂單!ID: {data['order_id']}, 金額: {data['amount']}")
# 發送 push notification、更新 dashboard 等
# 處理庫存警告
def on_low_stock(data):
print(f"庫存警告!產品: {data['product_id']}, 剩餘: {data['quantity']}")
# 設定訂閱
service = NotificationService()
service.subscribe("orders:new", on_new_order)
service.subscribe("inventory:low_stock", on_low_stock)
service.start_listening()
# 在其他服務中發布訊息
service.publish("orders:new", {"order_id": 123, "amount": 1500})
service.publish("inventory:low_stock", {"product_id": "SKU-001", "quantity": 3})
Pub/Sub 的限制
Pub/Sub 的重要限制:
- 訊息不持久化
- 如果沒有訂閱者在線,訊息就丟了
- 訂閱者斷線重連期間的訊息也會丟失
- 沒有 ACK 機制
- 無法確認訂閱者是否成功處理了訊息
- 沒有重試機制
- 不支援消費者群組
- 多個訂閱者會收到重複的訊息(廣播模式)
- 無法做負載均衡
如果需要可靠的訊息傳遞 → 用 Redis Stream
如果需要持久化和重放 → 用 Redis Stream 或 Kafka
Redis Stream 事件流
基本概念
Redis Stream 是 Redis 5.0 引入的資料結構,類似一個 append-only log。它解決了 Pub/Sub 的所有限制:訊息持久化、消費者群組、ACK 機制。
Producer → Stream (有序的訊息列表) → Consumer Group A → Consumer 1
→ Consumer 2
→ Consumer Group B → Consumer 3
基本操作
# 寫入訊息到 Stream
redis-cli XADD events '*' type order_created order_id 123 amount 1500
# 回傳訊息 ID:"1719849600000-0"(毫秒時間戳-序號)
redis-cli XADD events '*' type payment_received order_id 123 method credit_card
redis-cli XADD events '*' type order_shipped order_id 123 tracking AB123
# 讀取最新的 5 筆訊息
redis-cli XREVRANGE events + - COUNT 5
# 讀取特定時間範圍的訊息
redis-cli XRANGE events 1719849600000 1719853200000
# 查看 Stream 資訊
redis-cli XINFO STREAM events
# length: 3
# first-entry: ...
# last-entry: ...
# 建立消費者群組
redis-cli XGROUP CREATE events order-processors $ MKSTREAM
# $ 表示只處理新訊息
# 0 表示從頭開始處理
# 消費者讀取訊息
redis-cli XREADGROUP GROUP order-processors consumer-1 COUNT 1 BLOCK 5000 STREAMS events >
# > 表示讀取尚未分配給此消費者的新訊息
# ACK 確認訊息已處理
redis-cli XACK events order-processors 1719849600000-0
實戰:事件驅動架構
import redis
import json
import time
import signal
import sys
from typing import Callable, Dict
class EventStream:
"""基於 Redis Stream 的事件驅動系統。"""
def __init__(self, redis_url="redis://localhost:6379"):
self.redis = redis.from_url(redis_url, decode_responses=True)
self._running = False
def publish(self, stream: str, event_type: str, data: dict):
"""發布事件到 Stream。"""
entry = {"type": event_type, "data": json.dumps(data, ensure_ascii=False)}
msg_id = self.redis.xadd(stream, entry)
return msg_id
def create_consumer_group(self, stream: str, group: str, start_id="0"):
"""建立消費者群組。"""
try:
self.redis.xgroup_create(stream, group, start_id, mkstream=True)
print(f"消費者群組 '{group}' 已建立")
except redis.exceptions.ResponseError as e:
if "BUSYGROUP" in str(e):
print(f"消費者群組 '{group}' 已存在")
else:
raise
def consume(
self,
stream: str,
group: str,
consumer: str,
handlers: Dict[str, Callable],
batch_size: int = 10,
block_ms: int = 5000,
):
"""消費並處理事件。"""
self._running = True
# 優雅關閉
def signal_handler(sig, frame):
print(f"\n收到 {sig} 信號,準備關閉...")
self._running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print(f"消費者 '{consumer}' 開始監聽 '{stream}'...")
# 先處理 pending 的訊息(之前領取但未 ACK 的)
self._process_pending(stream, group, consumer, handlers)
# 然後處理新訊息
while self._running:
try:
results = self.redis.xreadgroup(
groupname=group,
consumername=consumer,
streams={stream: ">"},
count=batch_size,
block=block_ms,
)
if not results:
continue
for stream_name, messages in results:
for msg_id, fields in messages:
self._handle_message(
stream_name, group, consumer, msg_id, fields, handlers
)
except redis.exceptions.ConnectionError:
print("Redis 連線中斷,5 秒後重試...")
time.sleep(5)
print(f"消費者 '{consumer}' 已關閉")
def _handle_message(self, stream, group, consumer, msg_id, fields, handlers):
"""處理單一訊息。"""
event_type = fields.get("type", "unknown")
handler = handlers.get(event_type)
if handler is None:
print(f"未知事件類型: {event_type},跳過")
self.redis.xack(stream, group, msg_id)
return
try:
data = json.loads(fields.get("data", "{}"))
handler(data)
# 處理成功,ACK
self.redis.xack(stream, group, msg_id)
except Exception as e:
print(f"處理事件 {msg_id} 失敗: {e}")
# 不 ACK,訊息會留在 pending 列表中等待重試
def _process_pending(self, stream, group, consumer, handlers):
"""處理 pending 的訊息(之前失敗未 ACK 的)。"""
while True:
pending = self.redis.xreadgroup(
groupname=group,
consumername=consumer,
streams={stream: "0"}, # 0 表示讀取 pending 的訊息
count=10,
)
if not pending or not pending[0][1]:
break
for stream_name, messages in pending:
for msg_id, fields in messages:
if not fields: # 已經被其他消費者處理
continue
print(f"重新處理 pending 訊息: {msg_id}")
self._handle_message(
stream_name, group, consumer, msg_id, fields, handlers
)
def get_stream_info(self, stream: str) -> dict:
"""取得 Stream 的統計資訊。"""
info = self.redis.xinfo_stream(stream)
groups = self.redis.xinfo_groups(stream)
return {
"length": info["length"],
"first_entry": info.get("first-entry"),
"last_entry": info.get("last-entry"),
"groups": [
{
"name": g["name"],
"consumers": g["consumers"],
"pending": g["pending"],
"last_delivered_id": g["last-delivered-id"],
}
for g in groups
],
}
# === 使用範例 ===
stream = EventStream()
# 設定消費者群組
stream.create_consumer_group("events", "order-service")
stream.create_consumer_group("events", "notification-service")
# 定義事件處理器
def handle_order_created(data):
print(f"[訂單服務] 新訂單: {data['order_id']}, 金額: {data['amount']}")
# 建立出貨單、扣庫存等
def handle_payment_received(data):
print(f"[訂單服務] 收到付款: 訂單 {data['order_id']}")
# 更新訂單狀態
def handle_order_notification(data):
print(f"[通知服務] 發送訂單確認信給客戶")
# 寄 email、push notification
# 發布事件
stream.publish("events", "order_created", {"order_id": 1001, "amount": 2500})
stream.publish("events", "payment_received", {"order_id": 1001, "method": "credit_card"})
# 啟動消費者(通常在不同的 process/container 中)
# 訂單服務的消費者
stream.consume("events", "order-service", "worker-1", {
"order_created": handle_order_created,
"payment_received": handle_payment_received,
})
Stream 的清理策略
# Stream 會無限增長,需要定期清理
# 方法一:限制 Stream 長度(MAXLEN)
redis-cli XADD events MAXLEN ~ 100000 '*' type test data hello
# ~ 表示近似截斷(效能更好),實際保留可能略多於 100000
# 方法二:按時間清理(MINID)
# 刪除 ID 小於指定值的訊息
redis-cli XTRIM events MINID ~ 1719849600000-0
# 方法三:程式化清理
def cleanup_stream(r, stream, max_age_hours=24):
"""清理超過指定時間的 Stream 訊息。"""
cutoff_ms = int((time.time() - max_age_hours 3600) 1000)
cutoff_id = f"{cutoff_ms}-0"
# 先檢查要清理多少
info = r.xinfo_stream(stream)
before_len = info["length"]
# 執行清理
r.xtrim(stream, minid=cutoff_id)
after_len = r.xlen(stream)
print(f"清理完成: {before_len} → {after_len}(刪除 {before_len - after_len} 筆)")
Lua Script 原子操作
為什麼需要 Lua Script
Redis 的單一命令是原子的,但多個命令的組合不是。當你需要「讀取 → 判斷 → 寫入」這種 read-modify-write 操作時,就需要 Lua Script 來保證原子性。
# 不安全的做法(race condition):
balance = r.get("balance:user:123") # 讀取:1000
if int(balance) >= 100:
r.decrby("balance:user:123", 100) # 扣款
# 問題:兩個請求同時讀到 1000,各扣 100,結果只扣了一次
# 安全的做法:用 Lua Script
基本用法
import redis
r = redis.Redis()
# 定義 Lua Script
# KEYS[] 是 Redis key 參數
# ARGV[] 是其他參數
# 在 Redis 伺服器端原子執行
# 範例:安全扣款
DEDUCT_SCRIPT = """
local balance = tonumber(redis.call('GET', KEYS[1]))
if balance == nil then
return -1 -- 帳戶不存在
end
if balance < tonumber(ARGV[1]) then
return -2 -- 餘額不足
end
redis.call('DECRBY', KEYS[1], ARGV[1])
return balance - tonumber(ARGV[1])
"""
# 註冊腳本(只傳輸一次,之後用 SHA 呼叫)
deduct = r.register_script(DEDUCT_SCRIPT)
# 設定初始餘額
r.set("balance:user:123", 1000)
# 安全扣款
result = deduct(keys=["balance:user:123"], args=[100])
print(f"扣款後餘額: {result}") # 900
result = deduct(keys=["balance:user:123"], args=[2000])
print(f"餘額不足: {result}") # -2
實戰:分散式限流器(Sliding Window)
# 滑動視窗限流器 — 比固定視窗更精確
RATE_LIMITER_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1]) -- 當前時間(毫秒)
local window = tonumber(ARGV[2]) -- 時間視窗(毫秒)
local limit = tonumber(ARGV[3]) -- 最大請求數
-- 清除過期的記錄
redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)
-- 計算當前視窗內的請求數
local count = redis.call('ZCARD', key)
if count < limit then
-- 允許請求,記錄時間戳
redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1 -- 允許
else
return 0 -- 拒絕
end
"""
class SlidingWindowRateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
self._script = self.redis.register_script(RATE_LIMITER_SCRIPT)
def is_allowed(self, key: str, limit: int, window_seconds: int) -> bool:
"""檢查請求是否允許。"""
now_ms = int(time.time() * 1000)
window_ms = window_seconds * 1000
result = self._script(
keys=[f"ratelimit:{key}"],
args=[now_ms, window_ms, limit],
)
return result == 1
# 使用範例
limiter = SlidingWindowRateLimiter(r)
# API 限流:每個用戶每分鐘最多 60 次
user_id = "user:123"
for i in range(65):
allowed = limiter.is_allowed(f"api:{user_id}", limit=60, window_seconds=60)
if not allowed:
print(f"第 {i+1} 次請求被拒絕(限流)")
break
實戰:分散式鎖
# 分散式鎖的正確實作(帶自動過期和安全釋放)
ACQUIRE_LOCK_SCRIPT = """
local key = KEYS[1]
local token = ARGV[1]
local ttl = tonumber(ARGV[2])
if redis.call('SET', key, token, 'NX', 'PX', ttl) then
return 1
else
return 0
end
"""
RELEASE_LOCK_SCRIPT = """
local key = KEYS[1]
local token = ARGV[1]
-- 只有鎖的持有者才能釋放(比較 token)
if redis.call('GET', key) == token then
redis.call('DEL', key)
return 1
else
return 0 -- 鎖已經被別人取得或已過期
end
"""
EXTEND_LOCK_SCRIPT = """
local key = KEYS[1]
local token = ARGV[1]
local ttl = tonumber(ARGV[2])
if redis.call('GET', key) == token then
redis.call('PEXPIRE', key, ttl)
return 1
else
return 0
end
"""
import uuid
import time
class DistributedLock:
def __init__(self, redis_client, name: str, ttl_ms: int = 10000):
self.redis = redis_client
self.name = f"lock:{name}"
self.ttl_ms = ttl_ms
self.token = str(uuid.uuid4())
self._acquire_script = self.redis.register_script(ACQUIRE_LOCK_SCRIPT)
self._release_script = self.redis.register_script(RELEASE_LOCK_SCRIPT)
self._extend_script = self.redis.register_script(EXTEND_LOCK_SCRIPT)
def acquire(self, timeout_seconds: float = 5.0) -> bool:
"""嘗試取得鎖。"""
deadline = time.monotonic() + timeout_seconds
while time.monotonic() < deadline:
result = self._acquire_script(
keys=[self.name], args=[self.token, self.ttl_ms]
)
if result == 1:
return True
time.sleep(0.05) # 50ms 重試間隔
return False
def release(self) -> bool:
"""釋放鎖。"""
result = self._release_script(keys=[self.name], args=[self.token])
return result == 1
def extend(self, additional_ms: int = None) -> bool:
"""延長鎖的 TTL。"""
ttl = additional_ms or self.ttl_ms
result = self._extend_script(keys=[self.name], args=[self.token, ttl])
return result == 1
def __enter__(self):
if not self.acquire():
raise TimeoutError(f"無法取得鎖: {self.name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用範例
# 確保同一時間只有一個 worker 處理特定的訂單
with DistributedLock(r, "process-order:1001", ttl_ms=30000):
print("取得鎖,開始處理訂單...")
# 處理訂單邏輯
time.sleep(1)
print("訂單處理完成")
# 自動釋放鎖
Lua Script 的限制與注意事項
"""
Lua Script 的注意事項:
- 執行時間限制
- 預設 lua-time-limit = 5000 ms
- 超時後 Redis 不會中斷腳本,但會拒絕其他命令
- 可以用 SCRIPT KILL 強制終止(如果沒有寫入操作)
- 不能存取外部資源
- 不能做 HTTP 請求、檔案 IO
- 只能呼叫 redis.call() 或 redis.pcall()
- 確定性要求
- 腳本必須是確定性的(相同輸入 → 相同輸出)
- 避免使用 TIME、RANDOMKEY 等非確定性命令
- math.random 需要先 math.randomseed
- Cluster 模式的限制
- 所有 KEYS 必須在同一個 hash slot
- 用 hash tag 確保相關的 key 在同一個 slot
- 例如: {user:123}:balance 和 {user:123}:orders
"""
小結
Redis 的進階功能各有適用場景:
| 功能 | 適用場景 | 優點 | 限制 |
|——|———|——|——|
| Pub/Sub | 即時通知、廣播 | 極低延遲、簡單 | 訊息不持久化 |
| Stream | 事件流、任務隊列 | 持久化、消費者群組、ACK | 記憶體佔用較高 |
| Lua Script | 原子操作、限流、分散式鎖 | 原子性保證 | 不能太複雜 |
實務建議:
- 簡單的即時通知用 Pub/Sub:不需要持久化的場景,例如 WebSocket 廣播
- 需要可靠性的事件處理用 Stream:可以取代輕量級的 Kafka 使用
- 需要原子性的多步操作用 Lua Script:限流、扣款、搶購等場景
- Stream 記得設 MAXLEN:不要讓 Stream 無限增長吃光記憶體
- Lua Script 保持簡短:複雜邏輯放在應用層,Lua 只做最核心的原子操作
延伸閱讀建議:
- Redis Streams 官方教程
- Redis Lua Scripting
- Redlock 分散式鎖算法 — Martin Kleppmann 對此有著名的批評文章也值得一讀
- Redis 設計與實現 — 黃健宏的經典中文書