Introduction

Have you ever had this experience: you know you wrote something down in some notebook, some document, some Slack message, but you just can't find it? Search engines match keywords, but what you remember is the concept, not the keyword.

RAG (Retrieval-Augmented Generation) exists to solve exactly this problem. It has the LLM first find the relevant content in your document collection, then answer your question based on that content. In other words, you can talk to your notes.

This article walks you through building a complete RAG system from scratch. No LangChain, no LlamaIndex: we assemble it from the most basic components, so that you truly understand what each step is doing.

RAG Architecture Overview

┌──────────────────────────────────────────────────────┐
│                RAG system architecture               │
│                                                      │
│  [Document store]                                    │
│     │                                                │
│     ▼                                                │
│  [Chunking] ──→ [Embeddings] ──→ [Vector database]   │
│                                        │             │
│                                        │ search      │
│  [User question] ──→ [Query embedding] ┘             │
│                                        │             │
│                 [Top-K relevant chunks]▼             │
│                                        │             │
│  [Prompt assembly] ←───────────────────┘             │
│     │                                                │
│     ▼                                                │
│  [LLM generates the answer]                          │
│     │                                                │
│     ▼                                                │
│  [Reply to the user]                                 │
└──────────────────────────────────────────────────────┘

The four core steps:

  1. Chunking: split long documents into pieces of an appropriate size
  2. Embedding generation: turn each chunk into a vector
  3. Vector search (Retrieval): find the chunks most relevant to the question
  4. Answer generation: stuff the relevant chunks into the prompt and let the LLM answer
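Before drilling into each step, here is the whole loop end to end with toy stand-ins: a word-count vector in place of a real embedding model, and prompt assembly in place of an actual LLM call. Every name and document here is illustrative, not part of the system built in the rest of the article.

```python
# Toy RAG pipeline: bag-of-words "embeddings" stand in for a real model,
# and the final step builds the prompt instead of calling an LLM.
import re
import numpy as np

DOCS = [
    "Docker packages an app and its dependencies into a container.",
    "Kubernetes schedules containers across a cluster of machines.",
    "A virtual machine emulates full hardware and runs its own OS.",
]

def tokenize(text: str) -> list[str]:
    return re.findall(r"\w+", text.lower())

vocab = sorted({w for d in DOCS for w in tokenize(d)})

def embed(text: str) -> np.ndarray:
    """Toy 'embedding': L2-normalized bag-of-words counts."""
    words = tokenize(text)
    v = np.array([words.count(w) for w in vocab], dtype=np.float32)
    n = np.linalg.norm(v)
    return v / n if n > 0 else v

doc_vecs = np.stack([embed(d) for d in DOCS])   # steps 1-2: chunk & embed
question = "what does docker do with a container"
scores = doc_vecs @ embed(question)             # step 3: cosine via dot product
top = int(np.argmax(scores))

prompt = f"Answer using this context:\n{DOCS[top]}\n\nQuestion: {question}"  # step 4
print(prompt)
```

Swap the toy `embed` for a real embedding model and pipe `prompt` into an LLM and you have the system this article builds.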

Step 1: Document Chunking

This step looks simple, but it affects RAG quality more than any other. Chunks that are too large make retrieval imprecise; chunks that are too small lose their surrounding context.
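The overlap idea in isolation: a fixed-size window slides over the text, and each chunk re-reads the tail of the previous one, so a sentence cut at a boundary still appears whole in one of the two chunks. A minimal sketch (the full chunker below additionally respects headers and sentence boundaries):

```python
def sliding_chunks(text: str, size: int = 10, overlap: int = 4) -> list[str]:
    """Fixed-size windows; each window re-reads the last `overlap` chars of the previous one."""
    step = size - overlap  # must stay positive, or the window never advances
    return [text[i:i + size] for i in range(0, len(text), step)]

print(sliding_chunks("abcdefghijklmnop"))
# → ['abcdefghij', 'ghijklmnop', 'mnop']
```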

import re
from dataclasses import dataclass
from pathlib import Path

@dataclass
class Chunk:
    content: str
    source: str      # source file
    chunk_id: int    # chunk number
    metadata: dict   # extra info (title, date, etc.)

class DocumentChunker:
    def __init__(self, chunk_size=500, chunk_overlap=100):
        self.chunk_size = chunk_size        # target characters per chunk
        self.chunk_overlap = chunk_overlap  # overlapping characters between chunks

    def load_markdown(self, file_path: str) -> str:
        """Read a Markdown file."""
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()

    def split_by_headers(self, text: str) -> list[str]:
        """Split on headers first, to keep sections semantically intact."""
        # use ## and ### as split points
        sections = re.split(r'\n(?=#{2,3}\s)', text)
        return [s.strip() for s in sections if s.strip()]

    def split_by_size(self, text: str) -> list[str]:
        """Split over-long sections by character count, keeping an overlap."""
        if len(text) <= self.chunk_size:
            return [text]

        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size

            # try to cut at a sentence boundary, to avoid truncating mid-sentence
            if end < len(text):
                # look backwards for the nearest full stop or newline
                for sep in ['。\n', '.\n', '\n\n', '。', '.', '\n']:
                    last_sep = text[start:end].rfind(sep)
                    if last_sep > self.chunk_size * 0.5:  # keep at least half a chunk
                        end = start + last_sep + len(sep)
                        break

            chunks.append(text[start:end].strip())
            if end >= len(text):
                break                             # done; don't re-read the tail
            start = end - self.chunk_overlap      # step back to create the overlap

        return chunks

    def chunk_document(self, file_path: str) -> list[Chunk]:
        """The full document-chunking pipeline."""
        text = self.load_markdown(file_path)
        file_name = Path(file_path).name

        # extract the top-level title as metadata
        title_match = re.search(r'^#\s+(.+)$', text, re.MULTILINE)
        title = title_match.group(1) if title_match else file_name

        # split by headers first
        sections = self.split_by_headers(text)

        # then split by size
        all_chunks = []
        chunk_id = 0
        for section in sections:
            for sub in self.split_by_size(section):
                all_chunks.append(Chunk(
                    content=sub,
                    source=file_name,
                    chunk_id=chunk_id,
                    metadata={"title": title, "file_path": file_path},
                ))
                chunk_id += 1

        return all_chunks

# usage
chunker = DocumentChunker(chunk_size=500, chunk_overlap=100)
chunks = chunker.chunk_document("my_notes/docker_guide.md")
print(f"Split into {len(chunks)} chunks")
for chunk in chunks[:3]:
    print(f"  [{chunk.chunk_id}] {chunk.content[:80]}...")

Step 2: Embedding Generation

An embedding turns text into a vector (a list of floating-point numbers). Texts with similar meanings end up with vectors that are close together.
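"Close" here means cosine similarity, cos(u, v) = u·v / (‖u‖‖v‖). Once vectors are L2-normalized, the denominator is 1 and cosine similarity reduces to a plain dot product, which is why the code in this section normalizes its embeddings. A quick check on toy 2-D vectors:

```python
import numpy as np

u = np.array([3.0, 4.0])
v = np.array([6.0, 8.0])   # same direction as u
w = np.array([-4.0, 3.0])  # perpendicular to u

def cosine(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

print(round(cosine(u, v), 6))  # 1.0 → same direction, maximally similar
print(round(cosine(u, w), 6))  # 0.0 → orthogonal, unrelated

# after L2 normalization, a bare dot product gives the same number
un, vn = u / np.linalg.norm(u), v / np.linalg.norm(v)
print(round(float(np.dot(un, vn)), 6))  # 1.0
```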

import numpy as np

class EmbeddingGenerator:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Choosing an embedding model:
        - all-MiniLM-L6-v2: small and fast, 384 dims, good for getting started
        - bge-large-zh-v1.5: Chinese-focused, 1024 dims, high quality
        - text-embedding-3-small: OpenAI API, 1536 dims
        """
        self.model_name = model_name
        self._model = None

    @property
    def model(self):
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.model_name)
        return self._model

    def embed(self, texts: list[str]) -> np.ndarray:
        """Generate embeddings in batches."""
        embeddings = self.model.encode(
            texts,
            show_progress_bar=True,
            normalize_embeddings=True,  # L2-normalize so cosine similarity is a plain dot product
            batch_size=32,
        )
        return np.array(embeddings, dtype=np.float32)

    def embed_single(self, text: str) -> np.ndarray:
        """Generate a single embedding."""
        return self.embed([text])[0]

# using Ollama's embedding API (local, no external service required)
class OllamaEmbedding:
    def __init__(self, model: str = "nomic-embed-text",
                 base_url: str = "http://localhost:11434"):
        self.model = model
        self.base_url = base_url

    def embed(self, texts: list[str]) -> np.ndarray:
        import requests
        embeddings = []
        for text in texts:
            response = requests.post(
                f"{self.base_url}/api/embeddings",
                json={"model": self.model, "prompt": text},
            )
            embeddings.append(response.json()["embedding"])
        return np.array(embeddings, dtype=np.float32)

    def embed_single(self, text: str) -> np.ndarray:
        return self.embed([text])[0]

# usage
embedder = EmbeddingGenerator("all-MiniLM-L6-v2")
texts = ["Docker containerization", "Kubernetes cluster management", "Nice weather today"]
vectors = embedder.embed(texts)
print(f"Vector shape: {vectors.shape}")  # (3, 384)

Step 3: Vector Search

The simple version: NumPy

Start with the simplest thing possible, no database required:

import json
import numpy as np

class SimpleVectorStore:
    """A minimal vector store built on NumPy."""

    def __init__(self, dimension: int):
        self.dimension = dimension
        self.vectors = np.empty((0, dimension), dtype=np.float32)
        self.documents = []  # the corresponding document chunks

    def add(self, chunks: list, embeddings: np.ndarray):
        """Store embeddings together with their chunks (kept as plain dicts)."""
        self.vectors = np.vstack([self.vectors, embeddings])
        self.documents.extend(
            {"content": c.content, "source": c.source,
             "chunk_id": c.chunk_id, "metadata": c.metadata}
            for c in chunks
        )

    def search(self, query_vector: np.ndarray, top_k: int = 5) -> list[dict]:
        """Find the most relevant documents by cosine similarity."""
        if len(self.vectors) == 0:
            return []

        # cosine similarity (the vectors are L2-normalized, so a dot product suffices)
        similarities = np.dot(self.vectors, query_vector)

        # take the top-K
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for idx in top_indices:
            results.append({
                "document": self.documents[idx],
                "score": float(similarities[idx]),
            })
        return results

    def save(self, path: str):
        """Persist to disk."""
        np.save(f"{path}_vectors.npy", self.vectors)
        with open(f"{path}_docs.json", "w", encoding="utf-8") as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)

    def load(self, path: str):
        """Load from disk."""
        self.vectors = np.load(f"{path}_vectors.npy")
        with open(f"{path}_docs.json", "r", encoding="utf-8") as f:
            self.documents = json.load(f)
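Stripped of bookkeeping, `search` is just a matrix-vector product plus an argsort. In isolation, with four hypothetical 2-D stored vectors:

```python
import numpy as np

# four stored, L2-normalized vectors (one per row) and a normalized query
vectors = np.array([
    [1.0, 0.0],
    [0.0, 1.0],
    [0.8, 0.6],
    [0.6, 0.8],
], dtype=np.float32)
query = np.array([1.0, 0.0], dtype=np.float32)

similarities = vectors @ query                    # cosine similarity per row
top_indices = np.argsort(similarities)[::-1][:2]  # indices of the 2 best rows
print(top_indices, similarities[top_indices])     # → [0 2] [1.  0.8]
```

`np.argsort` sorts ascending, so `[::-1]` flips it to descending before slicing off the top-K.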

The advanced version: PostgreSQL + pgvector

import json
import numpy as np
import psycopg2
from pgvector.psycopg2 import register_vector

class PgVectorStore:
    """A vector store backed by PostgreSQL's pgvector extension."""

    def __init__(self, connection_string: str, dimension: int = 384):
        self.conn = psycopg2.connect(connection_string)
        self.dimension = dimension
        self._create_table()          # creates the extension, so it must run first
        register_vector(self.conn)    # then the vector type can be registered

    def _create_table(self):
        with self.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS documents (
                    id SERIAL PRIMARY KEY,
                    content TEXT NOT NULL,
                    source VARCHAR(255),
                    chunk_id INTEGER,
                    metadata JSONB,
                    embedding vector({self.dimension})
                )
            """)
            # IVFFlat index to speed up search
            cur.execute("""
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100)
            """)
        self.conn.commit()

    def add(self, chunks: list, embeddings: np.ndarray):
        with self.conn.cursor() as cur:
            for chunk, embedding in zip(chunks, embeddings):
                cur.execute(
                    """INSERT INTO documents
                       (content, source, chunk_id, metadata, embedding)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (
                        chunk.content,
                        chunk.source,
                        chunk.chunk_id,
                        json.dumps(chunk.metadata, ensure_ascii=False),
                        embedding.tolist(),
                    ),
                )
        self.conn.commit()

    def search(self, query_embedding: np.ndarray, top_k: int = 5) -> list[dict]:
        with self.conn.cursor() as cur:
            # <=> is pgvector's cosine-distance operator
            cur.execute(
                """SELECT content, source, chunk_id, metadata,
                          1 - (embedding <=> %s) AS similarity
                   FROM documents
                   ORDER BY embedding <=> %s
                   LIMIT %s""",
                (query_embedding.tolist(), query_embedding.tolist(), top_k),
            )
            results = []
            for row in cur.fetchall():
                results.append({
                    "content": row[0],
                    "source": row[1],
                    "chunk_id": row[2],
                    "metadata": row[3],
                    "score": float(row[4]),
                })
            return results

Step 4: Answer Generation with the LLM

import requests

class RAGSystem:
    """The complete RAG system."""

    def __init__(self, embedder, vector_store, llm_model="llama3.1"):
        self.embedder = embedder
        self.vector_store = vector_store
        self.llm_model = llm_model

    def index_documents(self, file_paths: list[str]):
        """Index a set of documents."""
        chunker = DocumentChunker(chunk_size=500, chunk_overlap=100)

        for file_path in file_paths:
            print(f"Indexing: {file_path}")
            chunks = chunker.chunk_document(file_path)

            # generate embeddings
            texts = [chunk.content for chunk in chunks]
            embeddings = self.embedder.embed(texts)

            # store them in the vector database
            self.vector_store.add(chunks, embeddings)

        print(f"Indexing done, {len(file_paths)} documents in total")

    def query(self, question: str, top_k: int = 5) -> str:
        """Answer a question."""
        # 1. turn the question into an embedding
        query_embedding = self.embedder.embed_single(question)

        # 2. retrieve the relevant chunks
        results = self.vector_store.search(query_embedding, top_k=top_k)

        # 3. assemble the prompt
        context = self._build_context(results)
        prompt = self._build_prompt(question, context)

        # 4. have the LLM generate the answer
        return self._generate(prompt)

    def _build_context(self, results: list[dict]) -> str:
        """Assemble the context from the search results."""
        context_parts = []
        for i, r in enumerate(results):
            # SimpleVectorStore nests the chunk under "document"; PgVectorStore returns it flat
            doc = r.get("document", r)
            content = doc.get("content", "")
            source = doc.get("source", "unknown")
            score = r.get("score", 0)
            context_parts.append(
                f"[Chunk {i+1}] (source: {source}, relevance: {score:.3f})\n{content}"
            )
        return "\n\n---\n\n".join(context_parts)

    def _build_prompt(self, question: str, context: str) -> str:
        """Assemble the prompt."""
        return f"""Answer the question using the reference material below.
If the references do not contain the relevant information, say so explicitly.
Cite your sources when answering.

## References

{context}

## Question

{question}

## Answer

"""

    def _generate(self, prompt: str) -> str:
        """Call the LLM to generate the answer."""
        response = requests.post(
            "http://localhost:11434/api/generate",
            json={
                "model": self.llm_model,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": 0.3,  # low temperature for RAG, to curb hallucination
                    "num_predict": 1024,
                },
            },
        )
        return response.json()["response"]

# full usage example
if __name__ == "__main__":
    # initialize the components
    embedder = EmbeddingGenerator("all-MiniLM-L6-v2")
    store = SimpleVectorStore(dimension=384)

    # build the RAG system
    rag = RAGSystem(embedder, store)

    # index documents
    rag.index_documents([
        "notes/docker_guide.md",
        "notes/kubernetes_notes.md",
        "notes/python_best_practices.md",
    ])

    # query
    answer = rag.query("What is the difference between Docker and a virtual machine?")
    print(answer)

Quality Optimization Tips

1. Hybrid search (keyword + semantic)

def hybrid_search(self, question: str, top_k: int = 5, alpha: float = 0.7):
    """Hybrid search: combine vector search with keyword search."""
    # vector search
    vector_results = self.vector_store.search(
        self.embedder.embed_single(question), top_k=top_k * 2
    )

    # keyword search (a simple version can use BM25)
    keyword_results = self.keyword_search(question, top_k=top_k * 2)

    # merge the scores (an alpha-weighted variant of Reciprocal Rank Fusion, k = 60)
    combined = {}
    for rank, r in enumerate(vector_results):
        doc_id = r["chunk_id"]
        combined[doc_id] = combined.get(doc_id, 0) + alpha / (rank + 60)

    for rank, r in enumerate(keyword_results):
        doc_id = r["chunk_id"]
        combined[doc_id] = combined.get(doc_id, 0) + (1 - alpha) / (rank + 60)

    # sort by fused score
    sorted_ids = sorted(combined, key=combined.get, reverse=True)[:top_k]
    return [self.get_document(doc_id) for doc_id in sorted_ids]
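The `keyword_search` helper above is referenced but never defined. A self-contained sketch of BM25 scoring (the standard k1/b formulation; the function name and corpus are illustrative, and it returns document indices rather than chunk records) could look like:

```python
import math
from collections import Counter

def bm25_search(query: str, docs: list[str], top_k: int = 5,
                k1: float = 1.5, b: float = 0.75) -> list[int]:
    """Score docs against the query with BM25; return indices of the top_k docs."""
    tokenized = [d.lower().split() for d in docs]
    avgdl = sum(len(t) for t in tokenized) / len(tokenized)  # average doc length
    N = len(docs)

    # document frequency of each term
    df = Counter()
    for toks in tokenized:
        df.update(set(toks))

    scores = []
    for toks in tokenized:
        tf = Counter(toks)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            # rare terms get a higher idf; length-normalize the term frequency
            idf = math.log(1 + (N - df[term] + 0.5) / (df[term] + 0.5))
            denom = tf[term] + k1 * (1 - b + b * len(toks) / avgdl)
            score += idf * tf[term] * (k1 + 1) / denom
        scores.append(score)

    return sorted(range(N), key=lambda i: scores[i], reverse=True)[:top_k]

docs = [
    "docker containers share the host kernel",
    "kubernetes orchestrates docker containers",
    "python packaging best practices",
]
print(bm25_search("docker kernel", docs, top_k=2))  # → [0, 1]
```

For production use, a maintained implementation (e.g. the rank_bm25 package, or PostgreSQL full-text search if you are already on pgvector) is the safer choice.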

2. Re-ranking

def rerank(self, question: str, candidates: list[dict], top_k: int = 3):
    """Re-rank the candidates with a Cross-Encoder."""
    from sentence_transformers import CrossEncoder
    reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    pairs = [(question, c["content"]) for c in candidates]
    scores = reranker.predict(pairs)

    for i, score in enumerate(scores):
        candidates[i]["rerank_score"] = float(score)

    candidates.sort(key=lambda x: x["rerank_score"], reverse=True)
    return candidates[:top_k]

Wrapping Up

The core idea of a RAG system is simple: chunk the documents, turn them into vectors, search, and stuff the results into a prompt. But the devil is in the details: the chunking granularity, the choice of embedding model, and the prompt design all have a large impact on final answer quality.

My advice:

  1. Get the simplest version running first (NumPy vector search + Ollama)
  2. Study the failure cases (which questions get bad answers? why?)
  3. Optimize for those specifically (chunks too large? embedding model too weak? need re-ranking?)
  4. Don't over-engineer (in many cases simple RAG is good enough)

Further reading:

  • Vector database comparisons: pgvector vs Milvus vs Qdrant vs Chroma
  • Embedding model rankings: the MTEB Leaderboard
  • Advanced RAG techniques: HyDE, Multi-Query, FLARE
  • The RAG implementations in LangChain / LlamaIndex (if you prefer a framework)