前言

提到 Redis,大部分人的第一反應是「快取」。沒錯,Redis 作為快取確實是它最經典的用途——把熱門資料放到記憶體裡,讓讀取速度從毫秒降到微秒。但如果你只把 Redis 當快取用,那真的太可惜了。

Redis 是一個多功能的資料結構伺服器。除了 String、Hash、List、Set、Sorted Set 這些基本資料結構之外,它還提供了 Pub/Sub(發布訂閱)、Stream(事件流)、Lua Script(原子操作腳本)等進階功能。這些功能在實際的後端架構中非常實用,而且往往比你另外引入一個 Kafka 或 RabbitMQ 更輕量。

這篇文章會深入介紹 Redis 的這三個進階功能,每個都有完整的實戰範例。我在自己的專案中大量使用這些功能,用 Pub/Sub 做即時通知、用 Stream 做事件溯源、用 Lua Script 做分散式限流,效果都很好。

Redis 資料結構複習

在進入進階功能之前,先快速複習幾個常用但容易被忽略的資料結構用法:

# Sorted Set — 非常強大的排行榜/排程系統

# 新增分數(用 Unix timestamp 作為 score,可以當延遲隊列用) redis-cli ZADD delayed_tasks 1719849600 "task:send_email:123" redis-cli ZADD delayed_tasks 1719853200 "task:send_sms:456"

# 取得到期的任務(score <= 當前時間) redis-cli ZRANGEBYSCORE delayed_tasks -inf 1719849600

# 取得排行榜前 10 名 redis-cli ZREVRANGE leaderboard 0 9 WITHSCORES

# HyperLogLog — 用極少記憶體做 UV(獨立訪客)統計
import redis

r = redis.Redis()

# 每個頁面瀏覽記錄 user_id r.pfadd("uv:2024-06-15", "user:123", "user:456", "user:789") r.pfadd("uv:2024-06-15", "user:123") # 重複不會多算

# 取得獨立訪客數(近似值,誤差約 0.81%) uv_count = r.pfcount("uv:2024-06-15") print(f"UV: {uv_count}") # 3

# 合併多天的 UV r.pfmerge("uv:2024-06-week", "uv:2024-06-15", "uv:2024-06-16") weekly_uv = r.pfcount("uv:2024-06-week")

# 每個 key 只佔 12KB 記憶體,不管有多少 unique elements!

Pub/Sub 即時通訊

基本概念

Pub/Sub(Publish/Subscribe)是一種訊息傳遞模式:發布者(Publisher)把訊息發到頻道(Channel),所有訂閱該頻道的訂閱者(Subscriber)都會收到訊息。

Publisher A ─→ Channel: "notifications" ─→ Subscriber 1
Publisher B ─↗                           ─→ Subscriber 2
                                         ─→ Subscriber 3

基本用法

# Terminal 1: 訂閱者
redis-cli SUBSCRIBE notifications
# 等待訊息...

# Terminal 2: 發布者 redis-cli PUBLISH notifications '{"type":"new_order","order_id":123}' # (integer) 1 ← 表示有 1 個訂閱者收到了

# Terminal 1 會顯示: # 1) "message" # 2) "notifications" # 3) "{\"type\":\"new_order\",\"order_id\":123}"

# 模式訂閱(支援萬用字元) redis-cli PSUBSCRIBE "events:*" # 會收到 events:order、events:user、events:payment 等所有頻道的訊息

實戰:即時通知系統

import redis
import json
import threading
import time

class NotificationService: """基於 Redis Pub/Sub 的即時通知系統。"""

def __init__(self, redis_url="redis://localhost:6379"): self.redis = redis.from_url(redis_url) self.pubsub = self.redis.pubsub() self._handlers = {}

def subscribe(self, channel: str, handler): """訂閱頻道並註冊處理函式。""" self._handlers[channel] = handler self.pubsub.subscribe(**{channel: self._message_handler})

def _message_handler(self, message): """內部訊息路由。""" if message["type"] != "message": return channel = message["channel"] if isinstance(channel, bytes): channel = channel.decode() data = json.loads(message["data"]) handler = self._handlers.get(channel) if handler: handler(data)

def publish(self, channel: str, data: dict): """發布訊息到頻道。""" self.redis.publish(channel, json.dumps(data, ensure_ascii=False))

def start_listening(self): """在背景線程中開始監聽。""" thread = threading.Thread(target=self._listen_loop, daemon=True) thread.start() return thread

def _listen_loop(self): """監聽迴圈。""" for message in self.pubsub.listen(): pass # subscribe() 中註冊的 callback 會自動被呼叫

# === 使用範例 ===

# 處理新訂單通知 def on_new_order(data): print(f"新訂單!ID: {data['order_id']}, 金額: {data['amount']}") # 發送 push notification、更新 dashboard 等

# 處理庫存警告 def on_low_stock(data): print(f"庫存警告!產品: {data['product_id']}, 剩餘: {data['quantity']}")

# 設定訂閱 service = NotificationService() service.subscribe("orders:new", on_new_order) service.subscribe("inventory:low_stock", on_low_stock) service.start_listening()

# 在其他服務中發布訊息 service.publish("orders:new", {"order_id": 123, "amount": 1500}) service.publish("inventory:low_stock", {"product_id": "SKU-001", "quantity": 3})

Pub/Sub 的限制

Pub/Sub 的重要限制:

  1. 訊息不持久化
- 如果沒有訂閱者在線,訊息就丟了 - 訂閱者斷線重連期間的訊息也會丟失
  1. 沒有 ACK 機制
- 無法確認訂閱者是否成功處理了訊息 - 沒有重試機制
  1. 不支援消費者群組
- 多個訂閱者會收到重複的訊息(廣播模式) - 無法做負載均衡

如果需要可靠的訊息傳遞 → 用 Redis Stream 如果需要持久化和重放 → 用 Redis Stream 或 Kafka

Redis Stream 事件流

基本概念

Redis Stream 是 Redis 5.0 引入的資料結構,類似一個 append-only log。它解決了 Pub/Sub 的所有限制:訊息持久化、消費者群組、ACK 機制。

Producer → Stream (有序的訊息列表) → Consumer Group A → Consumer 1
                                                     → Consumer 2
                                  → Consumer Group B → Consumer 3

基本操作

# 寫入訊息到 Stream
redis-cli XADD events '*' type order_created order_id 123 amount 1500
# 回傳訊息 ID:"1719849600000-0"(毫秒時間戳-序號)

redis-cli XADD events '*' type payment_received order_id 123 method credit_card redis-cli XADD events '*' type order_shipped order_id 123 tracking AB123

# 讀取最新的 5 筆訊息 redis-cli XREVRANGE events + - COUNT 5

# 讀取特定時間範圍的訊息 redis-cli XRANGE events 1719849600000 1719853200000

# 查看 Stream 資訊 redis-cli XINFO STREAM events # length: 3 # first-entry: ... # last-entry: ...

# 建立消費者群組 redis-cli XGROUP CREATE events order-processors $ MKSTREAM # $ 表示只處理新訊息 # 0 表示從頭開始處理

# 消費者讀取訊息 redis-cli XREADGROUP GROUP order-processors consumer-1 COUNT 1 BLOCK 5000 STREAMS events > # > 表示讀取尚未分配給此消費者的新訊息

# ACK 確認訊息已處理 redis-cli XACK events order-processors 1719849600000-0

實戰:事件驅動架構

import redis
import json
import time
import signal
import sys
from typing import Callable, Dict

class EventStream: """基於 Redis Stream 的事件驅動系統。"""

def __init__(self, redis_url="redis://localhost:6379"): self.redis = redis.from_url(redis_url, decode_responses=True) self._running = False

def publish(self, stream: str, event_type: str, data: dict): """發布事件到 Stream。""" entry = {"type": event_type, "data": json.dumps(data, ensure_ascii=False)} msg_id = self.redis.xadd(stream, entry) return msg_id

def create_consumer_group(self, stream: str, group: str, start_id="0"): """建立消費者群組。""" try: self.redis.xgroup_create(stream, group, start_id, mkstream=True) print(f"消費者群組 '{group}' 已建立") except redis.exceptions.ResponseError as e: if "BUSYGROUP" in str(e): print(f"消費者群組 '{group}' 已存在") else: raise

def consume( self, stream: str, group: str, consumer: str, handlers: Dict[str, Callable], batch_size: int = 10, block_ms: int = 5000, ): """消費並處理事件。""" self._running = True

# 優雅關閉 def signal_handler(sig, frame): print(f"\n收到 {sig} 信號,準備關閉...") self._running = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler)

print(f"消費者 '{consumer}' 開始監聽 '{stream}'...")

# 先處理 pending 的訊息(之前領取但未 ACK 的) self._process_pending(stream, group, consumer, handlers)

# 然後處理新訊息 while self._running: try: results = self.redis.xreadgroup( groupname=group, consumername=consumer, streams={stream: ">"}, count=batch_size, block=block_ms, )

if not results: continue

for stream_name, messages in results: for msg_id, fields in messages: self._handle_message( stream_name, group, consumer, msg_id, fields, handlers )

except redis.exceptions.ConnectionError: print("Redis 連線中斷,5 秒後重試...") time.sleep(5)

print(f"消費者 '{consumer}' 已關閉")

def _handle_message(self, stream, group, consumer, msg_id, fields, handlers): """處理單一訊息。""" event_type = fields.get("type", "unknown") handler = handlers.get(event_type)

if handler is None: print(f"未知事件類型: {event_type},跳過") self.redis.xack(stream, group, msg_id) return

try: data = json.loads(fields.get("data", "{}")) handler(data) # 處理成功,ACK self.redis.xack(stream, group, msg_id) except Exception as e: print(f"處理事件 {msg_id} 失敗: {e}") # 不 ACK,訊息會留在 pending 列表中等待重試

def _process_pending(self, stream, group, consumer, handlers): """處理 pending 的訊息(之前失敗未 ACK 的)。""" while True: pending = self.redis.xreadgroup( groupname=group, consumername=consumer, streams={stream: "0"}, # 0 表示讀取 pending 的訊息 count=10, ) if not pending or not pending[0][1]: break

for stream_name, messages in pending: for msg_id, fields in messages: if not fields: # 已經被其他消費者處理 continue print(f"重新處理 pending 訊息: {msg_id}") self._handle_message( stream_name, group, consumer, msg_id, fields, handlers )

def get_stream_info(self, stream: str) -> dict: """取得 Stream 的統計資訊。""" info = self.redis.xinfo_stream(stream) groups = self.redis.xinfo_groups(stream) return { "length": info["length"], "first_entry": info.get("first-entry"), "last_entry": info.get("last-entry"), "groups": [ { "name": g["name"], "consumers": g["consumers"], "pending": g["pending"], "last_delivered_id": g["last-delivered-id"], } for g in groups ], }

# === 使用範例 ===

stream = EventStream()

# 設定消費者群組 stream.create_consumer_group("events", "order-service") stream.create_consumer_group("events", "notification-service")

# 定義事件處理器 def handle_order_created(data): print(f"[訂單服務] 新訂單: {data['order_id']}, 金額: {data['amount']}") # 建立出貨單、扣庫存等

def handle_payment_received(data): print(f"[訂單服務] 收到付款: 訂單 {data['order_id']}") # 更新訂單狀態

def handle_order_notification(data): print(f"[通知服務] 發送訂單確認信給客戶") # 寄 email、push notification

# 發布事件 stream.publish("events", "order_created", {"order_id": 1001, "amount": 2500}) stream.publish("events", "payment_received", {"order_id": 1001, "method": "credit_card"})

# 啟動消費者(通常在不同的 process/container 中) # 訂單服務的消費者 stream.consume("events", "order-service", "worker-1", { "order_created": handle_order_created, "payment_received": handle_payment_received, })

Stream 的清理策略

# Stream 會無限增長,需要定期清理

# 方法一:限制 Stream 長度(MAXLEN) redis-cli XADD events MAXLEN ~ 100000 '*' type test data hello # ~ 表示近似截斷(效能更好),實際保留可能略多於 100000

# 方法二:按時間清理(MINID) # 刪除 ID 小於指定值的訊息 redis-cli XTRIM events MINID ~ 1719849600000-0

# 方法三:程式化清理

def cleanup_stream(r, stream, max_age_hours=24):
    """清理超過指定時間的 Stream 訊息。"""
    cutoff_ms = int((time.time() - max_age_hours  3600)  1000)
    cutoff_id = f"{cutoff_ms}-0"

# 先檢查要清理多少 info = r.xinfo_stream(stream) before_len = info["length"]

# 執行清理 r.xtrim(stream, minid=cutoff_id)

after_len = r.xlen(stream) print(f"清理完成: {before_len} → {after_len}(刪除 {before_len - after_len} 筆)")

Lua Script 原子操作

為什麼需要 Lua Script

Redis 的單一命令是原子的,但多個命令的組合不是。當你需要「讀取 → 判斷 → 寫入」這種 read-modify-write 操作時,就需要 Lua Script 來保證原子性。

# 不安全的做法(race condition):
balance = r.get("balance:user:123")  # 讀取:1000
if int(balance) >= 100:
    r.decrby("balance:user:123", 100)  # 扣款
# 問題:兩個請求同時讀到 1000,各扣 100,結果只扣了一次

# 安全的做法:用 Lua Script

基本用法

import redis

r = redis.Redis()

# 定義 Lua Script # KEYS[] 是 Redis key 參數 # ARGV[] 是其他參數 # 在 Redis 伺服器端原子執行

# 範例:安全扣款 DEDUCT_SCRIPT = """ local balance = tonumber(redis.call('GET', KEYS[1])) if balance == nil then return -1 -- 帳戶不存在 end if balance < tonumber(ARGV[1]) then return -2 -- 餘額不足 end redis.call('DECRBY', KEYS[1], ARGV[1]) return balance - tonumber(ARGV[1]) """

# 註冊腳本(只傳輸一次,之後用 SHA 呼叫) deduct = r.register_script(DEDUCT_SCRIPT)

# 設定初始餘額 r.set("balance:user:123", 1000)

# 安全扣款 result = deduct(keys=["balance:user:123"], args=[100]) print(f"扣款後餘額: {result}") # 900

result = deduct(keys=["balance:user:123"], args=[2000]) print(f"餘額不足: {result}") # -2

實戰:分散式限流器(Sliding Window)

# 滑動視窗限流器 — 比固定視窗更精確
RATE_LIMITER_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])           -- 當前時間(毫秒)
local window = tonumber(ARGV[2])         -- 時間視窗(毫秒)
local limit = tonumber(ARGV[3])          -- 最大請求數

-- 清除過期的記錄 redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)

-- 計算當前視窗內的請求數 local count = redis.call('ZCARD', key)

if count < limit then -- 允許請求,記錄時間戳 redis.call('ZADD', key, now, now .. ':' .. math.random(1000000)) redis.call('EXPIRE', key, math.ceil(window / 1000)) return 1 -- 允許 else return 0 -- 拒絕 end """

class SlidingWindowRateLimiter: def __init__(self, redis_client): self.redis = redis_client self._script = self.redis.register_script(RATE_LIMITER_SCRIPT)

def is_allowed(self, key: str, limit: int, window_seconds: int) -> bool: """檢查請求是否允許。""" now_ms = int(time.time() * 1000) window_ms = window_seconds * 1000 result = self._script( keys=[f"ratelimit:{key}"], args=[now_ms, window_ms, limit], ) return result == 1

# 使用範例 limiter = SlidingWindowRateLimiter(r)

# API 限流:每個用戶每分鐘最多 60 次 user_id = "user:123" for i in range(65): allowed = limiter.is_allowed(f"api:{user_id}", limit=60, window_seconds=60) if not allowed: print(f"第 {i+1} 次請求被拒絕(限流)") break

實戰:分散式鎖

# 分散式鎖的正確實作(帶自動過期和安全釋放)
ACQUIRE_LOCK_SCRIPT = """
local key = KEYS[1]
local token = ARGV[1]
local ttl = tonumber(ARGV[2])

if redis.call('SET', key, token, 'NX', 'PX', ttl) then return 1 else return 0 end """

RELEASE_LOCK_SCRIPT = """ local key = KEYS[1] local token = ARGV[1]

-- 只有鎖的持有者才能釋放(比較 token) if redis.call('GET', key) == token then redis.call('DEL', key) return 1 else return 0 -- 鎖已經被別人取得或已過期 end """

EXTEND_LOCK_SCRIPT = """ local key = KEYS[1] local token = ARGV[1] local ttl = tonumber(ARGV[2])

if redis.call('GET', key) == token then redis.call('PEXPIRE', key, ttl) return 1 else return 0 end """

import uuid import time

class DistributedLock: def __init__(self, redis_client, name: str, ttl_ms: int = 10000): self.redis = redis_client self.name = f"lock:{name}" self.ttl_ms = ttl_ms self.token = str(uuid.uuid4()) self._acquire_script = self.redis.register_script(ACQUIRE_LOCK_SCRIPT) self._release_script = self.redis.register_script(RELEASE_LOCK_SCRIPT) self._extend_script = self.redis.register_script(EXTEND_LOCK_SCRIPT)

def acquire(self, timeout_seconds: float = 5.0) -> bool: """嘗試取得鎖。""" deadline = time.monotonic() + timeout_seconds while time.monotonic() < deadline: result = self._acquire_script( keys=[self.name], args=[self.token, self.ttl_ms] ) if result == 1: return True time.sleep(0.05) # 50ms 重試間隔 return False

def release(self) -> bool: """釋放鎖。""" result = self._release_script(keys=[self.name], args=[self.token]) return result == 1

def extend(self, additional_ms: int = None) -> bool: """延長鎖的 TTL。""" ttl = additional_ms or self.ttl_ms result = self._extend_script(keys=[self.name], args=[self.token, ttl]) return result == 1

def __enter__(self): if not self.acquire(): raise TimeoutError(f"無法取得鎖: {self.name}") return self

def __exit__(self, exc_type, exc_val, exc_tb): self.release()

# 使用範例 # 確保同一時間只有一個 worker 處理特定的訂單 with DistributedLock(r, "process-order:1001", ttl_ms=30000): print("取得鎖,開始處理訂單...") # 處理訂單邏輯 time.sleep(1) print("訂單處理完成") # 自動釋放鎖

Lua Script 的限制與注意事項

"""
Lua Script 的注意事項:

  1. 執行時間限制
- 預設 lua-time-limit = 5000 ms - 超時後 Redis 不會中斷腳本,但會拒絕其他命令 - 可以用 SCRIPT KILL 強制終止(如果沒有寫入操作)
  1. 不能存取外部資源
- 不能做 HTTP 請求、檔案 IO - 只能呼叫 redis.call() 或 redis.pcall()
  1. 確定性要求
- 腳本必須是確定性的(相同輸入 → 相同輸出) - 避免使用 TIME、RANDOMKEY 等非確定性命令 - math.random 需要先 math.randomseed
  1. Cluster 模式的限制
- 所有 KEYS 必須在同一個 hash slot - 用 hash tag 確保相關的 key 在同一個 slot - 例如: {user:123}:balance 和 {user:123}:orders """

小結

Redis 的進階功能各有適用場景:

| 功能 | 適用場景 | 優點 | 限制 |
|——|———|——|——|
| Pub/Sub | 即時通知、廣播 | 極低延遲、簡單 | 訊息不持久化 |
| Stream | 事件流、任務隊列 | 持久化、消費者群組、ACK | 記憶體佔用較高 |
| Lua Script | 原子操作、限流、分散式鎖 | 原子性保證 | 不能太複雜 |

實務建議:

  1. 簡單的即時通知用 Pub/Sub:不需要持久化的場景,例如 WebSocket 廣播
  2. 需要可靠性的事件處理用 Stream:可以取代輕量級的 Kafka 使用
  3. 需要原子性的多步操作用 Lua Script:限流、扣款、搶購等場景
  4. Stream 記得設 MAXLEN:不要讓 Stream 無限增長吃光記憶體
  5. Lua Script 保持簡短:複雜邏輯放在應用層,Lua 只做最核心的原子操作

延伸閱讀建議: