前言
你有沒有過這種經驗:你知道你在某個筆記本、某篇文件、某個 Slack 訊息裡寫過某個東西,但就是找不到?搜尋引擎靠關鍵字比對,但你記得的是「那個概念」,不是「那個關鍵字」。
RAG(Retrieval-Augmented Generation)就是來解決這個問題的。它讓 LLM 先去你的文件庫裡「找到相關內容」,再基於這些內容回答你的問題。換句話說,你可以跟你的筆記「對話」。
這篇文章會帶你從零實作一個完整的 RAG 系統。不用 LangChain、不用 LlamaIndex — 我們用最基礎的元件組裝,這樣你才能真正理解每一步在幹嘛。
RAG 架構總覽
┌─────────────────────────────────────────────────────┐
│ RAG 系統架構 │
│ │
│ [文件庫] │
│ │ │
│ ▼ │
│ [文件切割] ──→ [Embedding 生成] ──→ [向量資料庫] │
│ │ │
│ │ 搜尋 │
│ [使用者問題] ──→ [Query Embedding] ─────┘ │
│ │ │
│ [Top-K 相關片段] ▼ │
│ │ │
│ [Prompt 組裝] ←─────────────────────────┘ │
│ │ │
│ ▼ │
│ [LLM 生成回答] │
│ │ │
│ ▼ │
│ [回覆使用者] │
└─────────────────────────────────────────────────────┘
四個核心步驟:
- 文件切割(Chunking):把長文件切成適當大小的片段
- Embedding 生成:把每個片段轉成向量
- 向量搜尋(Retrieval):找到跟問題最相關的片段
- LLM 整合回答(Generation):把相關片段塞進 prompt,讓 LLM 回答
Step 1:文件切割
這一步看似簡單,但其實是 RAG 系統中最影響品質的環節:切太大,搜尋不精確;切太小,則丟失上下文。
import re
from dataclasses import dataclass
from pathlib import Path
@dataclass
class Chunk:
    """One piece of a source document, ready to be embedded and indexed."""
    content: str   # the chunk's text
    source: str    # source file name
    chunk_id: int  # sequential chunk number within the document
    metadata: dict # extra info (title, date, etc.)
class DocumentChunker:
def __init__(self, chunk_size=500, chunk_overlap=100):
self.chunk_size = chunk_size # 每個片段的目標字數
self.chunk_overlap = chunk_overlap # 片段之間的重疊字數
def load_markdown(self, file_path: str) -> str:
"""讀取 Markdown 檔案"""
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
def split_by_headers(self, text: str) -> list[str]:
"""先按照標題切割,保持語意完整性"""
# 用 ## 和 ### 作為切割點
sections = re.split(r'\n(?=#{2,3}\s)', text)
return [s.strip() for s in sections if s.strip()]
def split_by_size(self, text: str) -> list[str]:
"""把太長的段落按字數切割,保留重疊"""
if len(text) <= self.chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + self.chunk_size
# 嘗試在句號處切割,避免截斷句子
if end < len(text):
# 往前找最近的句號或換行
for sep in ['。\n', '.\n', '\n\n', '。', '.', '\n']:
last_sep = text[start:end].rfind(sep)
if last_sep > self.chunk_size * 0.5: # 至少切到一半
end = start + last_sep + len(sep)
break
chunks.append(text[start:end].strip())
start = end - self.chunk_overlap # 重疊部分
return chunks
def chunk_document(self, file_path: str) -> list[Chunk]:
"""完整的文件切割流程"""
text = self.load_markdown(file_path)
file_name = Path(file_path).name
# 提取標題作為 metadata
title_match = re.search(r'^#\s+(.+)$', text, re.MULTILINE)
title = title_match.group(1) if title_match else file_name
# 先按標題切
sections = self.split_by_headers(text)
# 再按大小切
all_chunks = []
chunk_id = 0
for section in sections:
sub_chunks = self.split_by_size(section)
for sub in sub_chunks:
all_chunks.append(Chunk(
content=sub,
source=file_name,
chunk_id=chunk_id,
metadata={"title": title, "file_path": file_path}
))
chunk_id += 1
return all_chunks
# Example usage
doc_chunker = DocumentChunker(chunk_size=500, chunk_overlap=100)
pieces = doc_chunker.chunk_document("my_notes/docker_guide.md")
print(f"切割成 {len(pieces)} 個片段")
for piece in pieces[:3]:
    print(f"  [{piece.chunk_id}] {piece.content[:80]}...")
Step 2:Embedding 生成
Embedding 是把文字轉成向量(一堆浮點數)的過程。語意相近的文字,向量會很接近。
import numpy as np
from typing import Optional
class EmbeddingGenerator:
    """Sentence embeddings via a lazily loaded SentenceTransformer.

    Model options:
      - all-MiniLM-L6-v2: small and fast, 384 dims, good starting point
      - bge-large-zh-v1.5: Chinese-specific, 1024 dims, high quality
      - text-embedding-3-small: OpenAI API, 1536 dims
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model_name = model_name
        self._model = None  # populated on first access of `model`

    @property
    def model(self):
        """Load the SentenceTransformer on first use and cache it."""
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer(self.model_name)
        return self._model

    def embed(self, texts: list[str]) -> np.ndarray:
        """Embed a batch of texts; returns float32 vectors."""
        raw = self.model.encode(
            texts,
            show_progress_bar=True,
            normalize_embeddings=True,  # L2-normalized: dot product == cosine similarity
            batch_size=32,
        )
        return np.array(raw, dtype=np.float32)

    def embed_single(self, text: str) -> np.ndarray:
        """Embed a single text and return its vector."""
        return self.embed([text])[0]
# Ollama embedding API (local, no external service needed)
class OllamaEmbedding:
    """Generate embeddings through a local Ollama server's HTTP API."""

    def __init__(self, model: str = "nomic-embed-text", base_url: str = "http://localhost:11434"):
        self.model = model
        self.base_url = base_url

    def embed(self, texts: list[str]) -> np.ndarray:
        """Embed each text with one API call; returns a float32 (n, dim) array.

        Raises:
            requests.HTTPError: if the server responds with an error status.
        """
        import requests
        embeddings = []
        for text in texts:
            response = requests.post(
                f"{self.base_url}/api/embeddings",
                json={"model": self.model, "prompt": text},
                timeout=60,  # don't hang forever if the server is stuck
            )
            # Fail loudly on HTTP errors instead of an opaque KeyError below.
            response.raise_for_status()
            embeddings.append(response.json()["embedding"])
        return np.array(embeddings, dtype=np.float32)

    def embed_single(self, text: str) -> np.ndarray:
        """Embed a single text."""
        return self.embed([text])[0]
# Example usage
embedder = EmbeddingGenerator("all-MiniLM-L6-v2")
sample_texts = ["Docker 容器化技術", "Kubernetes 叢集管理", "今天天氣真好"]
sample_vectors = embedder.embed(sample_texts)
print(f"向量維度: {sample_vectors.shape}")  # (3, 384)
Step 3:向量搜尋
簡易版:用 NumPy
先從最簡單的開始,不需要任何資料庫:
import json
import numpy as np
from pathlib import Path
class SimpleVectorStore:
    """A minimal in-memory vector store backed by NumPy arrays."""

    def __init__(self, dimension: int):
        self.dimension = dimension
        # (n, dimension) matrix of stored vectors
        self.vectors = np.empty((0, dimension), dtype=np.float32)
        # parallel list of document payloads, one per vector row
        self.documents = []

    def add(self, vectors: np.ndarray, documents: list[dict]):
        """Append a batch of vectors and their matching document payloads."""
        self.vectors = np.vstack([self.vectors, vectors])
        self.documents.extend(documents)

    def search(self, query_vector: np.ndarray, top_k: int = 5) -> list[dict]:
        """Return the top_k most similar documents by cosine similarity."""
        if len(self.vectors) == 0:
            return []
        # Vectors are L2-normalized upstream, so the dot product IS the
        # cosine similarity.
        scores = self.vectors @ query_vector
        ranked = np.argsort(scores)[::-1][:top_k]
        return [
            {"document": self.documents[i], "score": float(scores[i])}
            for i in ranked
        ]

    def save(self, path: str):
        """Persist vectors and documents to `<path>_vectors.npy` / `<path>_docs.json`."""
        np.save(f"{path}_vectors.npy", self.vectors)
        with open(f"{path}_docs.json", "w", encoding="utf-8") as f:
            json.dump(self.documents, f, ensure_ascii=False, indent=2)

    def load(self, path: str):
        """Load previously saved vectors and documents."""
        self.vectors = np.load(f"{path}_vectors.npy")
        with open(f"{path}_docs.json", "r", encoding="utf-8") as f:
            self.documents = json.load(f)
進階版:用 PostgreSQL + pgvector
import psycopg2
from pgvector.psycopg2 import register_vector
class PgVectorStore:
    """Vector store backed by PostgreSQL with the pgvector extension.

    Creates its table and ANN index on construction; rows carry the chunk
    text plus source/chunk_id/metadata, and a `vector(dimension)` column.
    NOTE(review): `dimension` must match the embedding model's output size
    (384 for all-MiniLM-L6-v2) — confirm against the embedder in use.
    """

    def __init__(self, connection_string: str, dimension: int = 384):
        self.conn = psycopg2.connect(connection_string)
        self.dimension = dimension
        register_vector(self.conn)  # teach psycopg2 to adapt the `vector` type
        self._create_table()

    def _create_table(self):
        """Create the extension, documents table, and IVFFlat index if missing."""
        with self.conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS documents (
                    id SERIAL PRIMARY KEY,
                    content TEXT NOT NULL,
                    source VARCHAR(255),
                    chunk_id INTEGER,
                    metadata JSONB,
                    embedding vector({self.dimension})
                )
            """)
            # IVFFlat index to speed up approximate nearest-neighbor search
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents
                USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 100)
            """)
        self.conn.commit()

    def add(self, chunks: list, embeddings: np.ndarray):
        """Insert chunks and their embedding vectors, one row per chunk."""
        with self.conn.cursor() as cur:
            for chunk, embedding in zip(chunks, embeddings):
                cur.execute(
                    """INSERT INTO documents
                       (content, source, chunk_id, metadata, embedding)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (
                        chunk.content,
                        chunk.source,
                        chunk.chunk_id,
                        json.dumps(chunk.metadata, ensure_ascii=False),
                        embedding.tolist(),
                    )
                )
        self.conn.commit()

    def search(self, query_embedding: np.ndarray, top_k: int = 5) -> list[dict]:
        """Return the top_k nearest rows by cosine distance.

        `<=>` is pgvector's cosine-distance operator, so similarity is
        computed as `1 - distance`.
        """
        with self.conn.cursor() as cur:
            cur.execute(
                """SELECT content, source, chunk_id, metadata,
                          1 - (embedding <=> %s) AS similarity
                   FROM documents
                   ORDER BY embedding <=> %s
                   LIMIT %s""",
                (query_embedding.tolist(), query_embedding.tolist(), top_k)
            )
            results = []
            for row in cur.fetchall():
                results.append({
                    "content": row[0],
                    "source": row[1],
                    "chunk_id": row[2],
                    "metadata": row[3],
                    "score": float(row[4]),
                })
            return results
Step 4:LLM 整合回答
import requests
class RAGSystem:
    """End-to-end RAG pipeline: index documents, retrieve, and answer."""

    def __init__(self, embedder, vector_store, llm_model="llama3.1"):
        self.embedder = embedder          # turns text into vectors
        self.vector_store = vector_store  # stores and searches those vectors
        self.llm_model = llm_model        # Ollama model used for generation

    def index_documents(self, file_paths: list[str]):
        """Chunk, embed, and store every file in `file_paths`."""
        chunker = DocumentChunker(chunk_size=500, chunk_overlap=100)
        for file_path in file_paths:
            print(f"正在索引: {file_path}")
            chunks = chunker.chunk_document(file_path)
            embeddings = self.embedder.embed([c.content for c in chunks])
            self.vector_store.add(chunks, embeddings)
        print(f"索引完成,共 {len(file_paths)} 個文件")

    def query(self, question: str, top_k: int = 5) -> str:
        """Answer `question` grounded in the top_k retrieved chunks."""
        # 1) embed the question  2) retrieve  3) assemble prompt  4) generate
        question_vec = self.embedder.embed_single(question)
        hits = self.vector_store.search(question_vec, top_k=top_k)
        prompt = self._build_prompt(question, self._build_context(hits))
        return self._generate(prompt)

    def _build_context(self, results: list[dict]) -> str:
        """Format retrieved hits into a numbered, source-attributed context block."""
        parts = []
        for idx, hit in enumerate(results):
            # Accept both SimpleVectorStore hits ({"document": ...}) and flat dicts.
            doc = hit.get("document", hit)
            if isinstance(doc, dict):
                content = doc.get("content", doc)
                source = doc.get("source", "unknown")
            else:
                content = str(doc)
                source = "unknown"
            score = hit.get("score", 0)
            parts.append(
                f"[片段 {idx+1}] (來源: {source}, 相關度: {score:.3f})\n{content}"
            )
        return "\n\n---\n\n".join(parts)

    def _build_prompt(self, question: str, context: str) -> str:
        """Assemble the grounded-answer prompt sent to the LLM."""
        return f"""根據以下參考資料回答問題。如果參考資料中沒有相關資訊,請明確告知。
回答時請引用資料來源。用繁體中文回答。
參考資料
{context}
問題
{question}
回答
"""

    def _generate(self, prompt: str) -> str:
        """Call the local Ollama generate API and return the answer text."""
        payload = {
            "model": self.llm_model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.3,  # low temperature reduces hallucination in RAG
                "num_predict": 1024,
            },
        }
        response = requests.post("http://localhost:11434/api/generate", json=payload)
        return response.json()["response"]
# Complete usage example
if __name__ == "__main__":
    # Wire up the components
    embedder = EmbeddingGenerator("all-MiniLM-L6-v2")
    store = SimpleVectorStore(dimension=384)
    rag = RAGSystem(embedder, store)

    # Index the note files
    note_files = [
        "notes/docker_guide.md",
        "notes/kubernetes_notes.md",
        "notes/python_best_practices.md",
    ]
    rag.index_documents(note_files)

    # Ask a question
    answer = rag.query("Docker 和虛擬機有什麼差別?")
    print(answer)
品質優化技巧
1. Hybrid Search(關鍵字 + 語意)
def hybrid_search(self, question: str, top_k: int = 5, alpha: float = 0.7):
    """Hybrid retrieval: fuse vector search and keyword search rankings.

    Uses Reciprocal Rank Fusion: each result list contributes
    weight / (rank + 60) per document; fused scores decide the final order.
    `alpha` weights the vector side, `1 - alpha` the keyword side.
    """
    # Over-fetch from both retrievers so fusion has candidates to merge.
    query_vec = self.embedder.embed_single(question)
    vector_hits = self.vector_store.search(query_vec, top_k=top_k * 2)
    keyword_hits = self.keyword_search(question, top_k=top_k * 2)

    # Reciprocal Rank Fusion of the two ranked lists.
    fused = {}
    for rank, hit in enumerate(vector_hits):
        cid = hit["chunk_id"]
        fused[cid] = fused.get(cid, 0) + alpha / (rank + 60)
    for rank, hit in enumerate(keyword_hits):
        cid = hit["chunk_id"]
        fused[cid] = fused.get(cid, 0) + (1 - alpha) / (rank + 60)

    best = sorted(fused, key=fused.get, reverse=True)[:top_k]
    return [self.get_document(cid) for cid in best]
2. Re-ranking
def rerank(self, question: str, candidates: list[dict], top_k: int = 3):
    """Re-rank candidates with a Cross-Encoder and return the best top_k.

    Note: mutates `candidates` in place (adds "rerank_score" and sorts).
    """
    from sentence_transformers import CrossEncoder

    scorer = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
    query_doc_pairs = [(question, cand["content"]) for cand in candidates]
    for cand, score in zip(candidates, scorer.predict(query_doc_pairs)):
        cand["rerank_score"] = float(score)
    candidates.sort(key=lambda c: c["rerank_score"], reverse=True)
    return candidates[:top_k]
小結
RAG 系統的核心概念其實很簡單:把文件切片、轉向量、搜尋、塞進 prompt。但魔鬼藏在細節裡 — 切片的粒度、embedding 模型的選擇、prompt 的設計,這些都會大幅影響最終的回答品質。
我的建議是:
- 先用最簡單的方案跑起來(NumPy 向量搜尋 + Ollama)
- 觀察失敗的 case(什麼問題答不好?為什麼?)
- 針對性優化(切片太大?embedding 不夠好?需要 re-ranking?)
- 不要過度工程化(很多情況下 simple RAG 就夠用了)
延伸閱讀:
- 向量資料庫比較:pgvector vs Milvus vs Qdrant vs Chroma
- Embedding 模型排行榜:MTEB Leaderboard
- RAG 進階技術:HyDE、Multi-Query、FLARE
- LangChain / LlamaIndex 的 RAG 實作(如果你偏好用框架)