# doc_alive/rag/rag.py
# pip install openai faiss-cpu tiktoken numpy
from __future__ import annotations
import os, time, math
from typing import List, Dict, Any
from dataclasses import dataclass
import numpy as np
import faiss
import tiktoken
from openai import OpenAI
import re
# ========= Basic Utilities =========
def l2_normalize(mat: np.ndarray) -> np.ndarray:
"""Row-wise L2 normalize for cosine similarity via inner product."""
norm = np.linalg.norm(mat, axis=1, keepdims=True) + 1e-12
return mat / norm
def batch(iterable, n=128):
"""Yield lists of size n from an iterable (last one may be shorter)."""
buf = []
for x in iterable:
buf.append(x)
if len(buf) >= n:
yield buf
buf = []
if buf:
yield buf
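# For example, batch(range(5), n=2) yields [0, 1], then [2, 3], then [4].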
# ========= OpenAI Embeddings RAG =========
@dataclass
class Chunk:
"""A single chunk of the document, with token offsets for traceability."""
id: int
text: str
start_token: int
end_token: int
class OpenAIEmbedRAG:
"""
Retrieval module using OpenAI Embeddings + FAISS (IP over L2-normalized vectors = cosine).
Design notes:
- Single-pass tokenization for the whole document (no repeated encode/decode).
- Chunk.text is ALWAYS a string (never None) to avoid downstream NoneType errors.
- Graceful degradation: empty input => no index; search() returns [].
- Optional MMR re-ranking (diversity) via mmr_search().
"""
def __init__(self,
model: str = "text-embedding-3-small",
chunk_size_tokens: int = 800,
overlap_tokens: int = 100,
batch_size: int = 256,
                 openai_key: str | None = None):
self.client = OpenAI(api_key=openai_key)
self.model = model
self.batch_size = batch_size
self.enc = tiktoken.get_encoding("cl100k_base") # Tokenizer for embedding-3 models
self.chunk_size = max(1, int(chunk_size_tokens))
self.overlap = max(0, int(overlap_tokens))
if self.overlap >= self.chunk_size:
# Ensure forward progress: overlap must be smaller than chunk size
self.overlap = max(0, self.chunk_size // 4)
self._doc_token_ids: List[int] | None = None
self.chunks: List[Chunk] = []
self.index: faiss.IndexFlatIP | None = None
self._emb_dim: int | None = None
self._emb_matrix: np.ndarray | None = None # store chunk embeddings for MMR / analysis
# ---- Text cleaning ----
def _clean_text(self, text: str) -> str:
"""
Light normalization:
- Collapse consecutive whitespace to a single space.
- Remove non-printable control chars (keep \n and \t).
- Trim leading/trailing spaces.
"""
text = re.sub(r"\s+", " ", text or "")
text = "".join(ch for ch in text if ch.isprintable() or ch in "\n\t")
return text.strip()
# ---- Tokenization helpers ----
def _tokenize(self, text: str) -> List[int]:
return self.enc.encode(text)
def _detokenize(self, ids: List[int]) -> str:
return self.enc.decode(ids)
# ---- Chunking (by tokens) ----
    # Dynamic (content-aware) chunking is possible, but we use fixed-size token windows to keep embedding cost predictable.
def chunk_text(self, text: str) -> List[Chunk]:
"""
Tokenize once and create overlapping windows of token ids.
Each Chunk stores its decoded text and token offsets.
"""
self._doc_token_ids = self._tokenize(text)
total = len(self._doc_token_ids)
chunks: List[Chunk] = []
if total == 0:
return chunks
print(f"[RAG] Total tokens: {total}. Chunk size: {self.chunk_size}, overlap: {self.overlap}")
stride = self.chunk_size - self.overlap
i, cid = 0, 0
while i < total:
j = min(i + self.chunk_size, total)
ids_slice = self._doc_token_ids[i:j]
txt = self._detokenize(ids_slice)
chunks.append(Chunk(id=cid, text=txt, start_token=i, end_token=j))
cid += 1
if j == total:
break
i += stride # always moves forward
return chunks
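    # With the defaults (chunk_size=800, overlap=100) the stride is 700, so the token
    # windows cover [0, 800), [700, 1500), [1400, 2200), ... and consecutive chunks
    # share 100 tokens of overlapping context.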
# ---- OpenAI Embeddings (batched) ----
def _embed_texts(self, texts: List[str], max_retries=3) -> np.ndarray:
"""
Call OpenAI Embeddings with encoding_format='float'.
Returns a float32 matrix with rows aligned to input order.
"""
for attempt in range(max_retries):
try:
resp = self.client.embeddings.create(
model=self.model,
input=texts,
encoding_format="float",
)
vecs = [None] * len(resp.data)
for item in resp.data:
vecs[item.index] = np.array(item.embedding, dtype=np.float32)
arr = np.vstack(vecs)
if self._emb_dim is None:
self._emb_dim = arr.shape[1]
return arr
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # simple exponential backoff before retrying
                time.sleep(0.8 * (2 ** attempt))
# ---- Build FAISS index ----
def build(self, text: str):
"""
Clean -> chunk -> embed -> build an IP index on normalized vectors.
Graceful if text is empty: index remains None and chunks empty.
"""
text = self._clean_text(text)
self.chunks = self.chunk_text(text)
if not self.chunks:
self.index = None
self._emb_matrix = None
return
all_vecs = []
# Embed chunk texts in batches
for chunk_batch in batch([c.text for c in self.chunks], n=self.batch_size):
arr = self._embed_texts(chunk_batch)
all_vecs.append(arr)
mat = np.vstack(all_vecs).astype(np.float32)
mat = l2_normalize(mat)
self._emb_matrix = mat # keep for MMR / diagnostics
self.index = faiss.IndexFlatIP(mat.shape[1])
self.index.add(mat)
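    # After build(), self.index.ntotal == len(self.chunks) and row i of
    # self._emb_matrix is the normalized embedding of the chunk with id i.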
# ---- Plain vector search ----
def search(self, query: str, topk: int = 6) -> List[Dict[str, Any]]:
"""
Return top-k chunks by cosine similarity (via IP on normalized vectors).
If the index hasn't been built or the doc is empty, returns [].
"""
if not self.index or not self.chunks:
return []
q = self._clean_text(query)
if not q:
return []
qv = self._embed_texts([q])
qv = l2_normalize(qv)
D, I = self.index.search(qv.astype(np.float32), max(1, int(topk)))
results = []
for rank, idx in enumerate(I[0]):
if idx == -1:
continue
ch = self.chunks[int(idx)]
results.append({
"id": ch.id,
"score": float(D[0][rank]),
"text": ch.text,
"start_token": ch.start_token,
"end_token": ch.end_token
})
return results
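    # Each hit is a plain dict, e.g. (values illustrative):
    # {"id": 3, "score": 0.41, "text": "...", "start_token": 2100, "end_token": 2900}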
# ---- Optional: MMR search (diversified) ----
def mmr_search(self, query: str, topk: int = 6, fetch_k: int | None = None, lambda_mult: float = 0.5) -> List[Dict[str, Any]]:
"""
Maximal Marginal Relevance.
- fetch_k: number of initial candidates to consider (defaults to 4*topk).
- lambda_mult in [0,1]: 1 emphasizes relevance; 0 emphasizes diversity.
"""
if self._emb_matrix is None or not self.chunks:
return []
q = self._clean_text(query)
if not q:
return []
qv = l2_normalize(self._embed_texts([q]))[0] # (d,)
# Precompute query-to-chunk relevance
rel = self._emb_matrix @ qv # (N,)
N = len(self.chunks)
k = max(1, int(topk))
        m = min(N, int(fetch_k) if fetch_k else 4 * k)
# Get top-m by relevance
cand_idx = np.argpartition(-rel, m-1)[:m]
cand_idx = cand_idx[np.argsort(-rel[cand_idx])] # sort by relevance
selected: List[int] = []
selected_set = set()
for _ in range(min(k, m)):
if not selected:
best = int(cand_idx[0])
selected.append(best)
selected_set.add(best)
continue
# Diversity term: max similarity to items already selected
S = self._emb_matrix[selected] # (s, d)
            # compute the max cosine similarity to the selected set for each candidate:
            # cand_vecs @ S.T gives an (m, s) matrix; take the max over the selected axis
cand_vecs = self._emb_matrix[cand_idx] # (m, d)
sims = cand_vecs @ S.T # (m, s)
max_sims = sims.max(axis=1) # (m,)
# MMR objective
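            # score(d) = lambda_mult * sim(query, d) - (1 - lambda_mult) * max_{s in selected} sim(d, s)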
scores = lambda_mult * rel[cand_idx] - (1 - lambda_mult) * max_sims
# pick best candidate not yet selected
order = np.argsort(-scores)
for j in order:
idx_j = int(cand_idx[j])
if idx_j not in selected_set:
selected.append(idx_j)
selected_set.add(idx_j)
break
# Format results in the same structure as search()
out = []
for idx in selected:
ch = self.chunks[idx]
out.append({
"id": ch.id,
"score": float(rel[idx]),
"text": ch.text,
"start_token": ch.start_token,
"end_token": ch.end_token
})
return out
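
# Minimal usage sketch (not part of the library): assumes the OPENAI_API_KEY
# environment variable is set and the OpenAI Embeddings API is reachable; the
# sample text and queries below are illustrative only.
if __name__ == "__main__":
    sample_doc = (
        "Retrieval-augmented generation (RAG) splits a document into chunks, "
        "embeds them, and retrieves the most relevant chunks for a query so a "
        "language model can answer with grounded context."
    )
    rag = OpenAIEmbedRAG(openai_key=os.environ.get("OPENAI_API_KEY"))
    rag.build(sample_doc)
    for hit in rag.search("What does RAG retrieve?", topk=3):
        print(f"[search {hit['id']}] score={hit['score']:.3f} "
              f"tokens=({hit['start_token']}, {hit['end_token']})")
    # mmr_search() returns the same structure but trades some relevance for diversity
    for hit in rag.mmr_search("What does RAG retrieve?", topk=3, lambda_mult=0.7):
        print(f"[mmr {hit['id']}] score={hit['score']:.3f}")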