# Author: Shubham (10000)
# Version 2.0 - storage file & requirements for sentence changes
# Build: bf11009
import os
import uuid
import pickle
import threading
import logging
import numpy as np
from typing import List, Dict, Optional
from pypdf import PdfReader
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
# NOTE(review): calling basicConfig at import time mutates the process-wide
# logging configuration; libraries normally leave this to the application
# entry point -- confirm this module is only used as a script/app root.
logging.basicConfig(level=logging.INFO)
class VectorIndex:
    """Persistent chunk-level vector index backed by HF Inference API embeddings.

    Documents (PDF/TXT) are split into overlapping character chunks, embedded
    remotely via the Hugging Face Inference API (trying multiple request
    shapes, with an optional local sentence-transformers fallback), and
    served through cosine-similarity search.

    State is persisted under ``storage_dir`` as a pickle of chunk metadata
    (``doc_store.pkl``) plus a ``.npy`` matrix (``embeddings.npy``) with one
    row per chunk; row ``i`` corresponds to ``doc_store[i]``.

    Usage:
        vi = VectorIndex(storage_dir="/tmp/vector_data",
                         hf_token_env_value=HF_HUB_TOKEN,
                         use_local_fallback=False)
    """

    # Placeholder embedding width used before any real vectors exist
    # (the default model, all-MiniLM-L6-v2, emits 384-dim vectors).
    _DEFAULT_DIM = 384

    def __init__(
        self,
        storage_dir: str = "/tmp/vector_data",
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        chunk_size: int = 1000,
        chunk_overlap: int = 100,
        hf_token_env_value: Optional[str] = None,
        use_local_fallback: bool = False,
    ):
        """
        Args:
            storage_dir: Directory for persisted index files (created if absent).
            embedding_model: HF model id used for embeddings.
            chunk_size: Maximum characters per chunk.
            chunk_overlap: Characters shared between consecutive chunks.
            hf_token_env_value: Optional HF API token, sent as a Bearer header.
            use_local_fallback: If True, fall back to a local
                sentence-transformers model when all API attempts fail.
        """
        self.storage_dir = storage_dir
        os.makedirs(self.storage_dir, exist_ok=True)
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.hf_token = hf_token_env_value
        self.use_local_fallback = use_local_fallback
        self.meta_path = os.path.join(self.storage_dir, "doc_store.pkl")
        self.emb_path = os.path.join(self.storage_dir, "embeddings.npy")
        # One lock guards all mutations of doc_store / embeddings.
        self.lock = threading.Lock()
        self.doc_store: List[Dict] = []
        self.embeddings: Optional[np.ndarray] = None
        self._load_persistent()

    # ---------------- persistence ---------------- #
    def _load_persistent(self):
        """Load chunk metadata and embeddings from disk; reset both on failure.

        NOTE(review): ``pickle.load`` executes arbitrary code from the file,
        so ``storage_dir`` must be a trusted location.
        """
        try:
            if os.path.exists(self.meta_path):
                with open(self.meta_path, "rb") as f:
                    self.doc_store = pickle.load(f)
            if os.path.exists(self.emb_path):
                self.embeddings = np.load(self.emb_path)
            if self.embeddings is None:
                # Empty 2-D placeholder so vstack/search code never sees None.
                self.embeddings = np.zeros((0, self._DEFAULT_DIM), dtype=np.float32)
            logger.info(f"Loaded store: {len(self.doc_store)} chunks")
        except Exception as e:
            logger.warning(f"Failed to load persisted store: {e}")
            self.doc_store = []
            self.embeddings = np.zeros((0, self._DEFAULT_DIM), dtype=np.float32)

    def _persist(self):
        """Write doc_store (pickle) and embeddings (.npy) to storage_dir.

        Best-effort: failures are logged, not raised, so a broken disk does
        not take down an otherwise-working in-memory index.
        """
        try:
            with open(self.meta_path, "wb") as f:
                pickle.dump(self.doc_store, f)
            if self.embeddings is not None:
                np.save(self.emb_path, self.embeddings)
            logger.info(f"Persisted {len(self.doc_store)} chunks")
        except Exception as e:
            logger.error(f"Failed to persist: {e}")

    # ---------------- file reading & splitting ---------------- #
    def _read_pdf(self, file_path: str) -> str:
        """Extract text from every page of a PDF, newline-joined.

        Pages with no extractable text contribute an empty string.

        Raises:
            RuntimeError: if pypdf cannot open/parse the file.
        """
        try:
            reader = PdfReader(file_path)
            pages = []
            for page in reader.pages:
                pages.append(page.extract_text() or "")
            return "\n".join(pages)
        except Exception as e:
            raise RuntimeError(f"Failed to read PDF: {e}")

    def _read_txt(self, file_path: str, encoding: str = "utf-8") -> str:
        """Read a text file, silently dropping undecodable bytes.

        Raises:
            RuntimeError: if the file cannot be opened/read.
        """
        try:
            with open(file_path, "r", encoding=encoding, errors="ignore") as f:
                return f.read()
        except Exception as e:
            raise RuntimeError(f"Failed to read TXT: {e}")

    def _split_text(self, text: str) -> List[str]:
        """Split text into chunks of <= chunk_size characters.

        Consecutive chunks overlap by chunk_overlap characters; the guard on
        the step ensures forward progress (no infinite loop) even when
        chunk_overlap >= chunk_size.
        """
        text = text.replace("\r\n", "\n")
        if not text:
            return []
        chunks = []
        start = 0
        L = len(text)
        while start < L:
            end = start + self.chunk_size
            chunk = text[start:end]
            chunks.append(chunk)
            # Step back by the overlap, but never stall at the same start.
            start = end - self.chunk_overlap if (end - self.chunk_overlap) > start else end
        return chunks

    # ---------------- embeddings via HF Inference API ---------------- #
    def _call_hf(self, url: str, headers: dict, payload) -> Dict:
        """POST ``payload`` to the HF Inference endpoint.

        Returns a dict with "status" (HTTP code), "body" (parsed JSON, or the
        raw text when the body is not JSON) and "raw" (the Response object).
        """
        import requests
        resp = requests.post(url, headers=headers, json=payload, timeout=90)
        # Attempt to parse body; fall back to raw text for non-JSON errors.
        body = None
        try:
            body = resp.json()
        except Exception:
            body = resp.text
        return {"status": resp.status_code, "body": body, "raw": resp}

    def _parse_embedding_response(self, data, expected_len: int) -> List[List[float]]:
        """Parse known embedding shapes from an HF response body.

        Handles: a dict wrapping the vectors under a common key, a flat list
        of per-input vectors, and per-input token matrices (mean-pooled).

        Args:
            data: Decoded response body.
            expected_len: Number of input texts (one vector expected each).

        Returns:
            List of float vectors, one per input where possible.

        Raises:
            ValueError: on an unrecognised response format.
        """
        # If the model returned a dict containing embeddings under some key,
        # unwrap the first matching candidate.
        if isinstance(data, dict):
            for key in ("embeddings", "embedding", "vectors", "array"):
                if key in data:
                    data = data[key]
                    break
        if isinstance(data, list):
            # Case: list of per-input float vectors -> return directly when
            # the count matches the batch size.
            if all(isinstance(item, list) and item and all(isinstance(x, (int, float)) for x in item) for item in data):
                if len(data) == expected_len:
                    return [list(map(float, v)) for v in data]
            # Fallback: coerce one vector per item. Items that are token
            # matrices (list of lists) are mean-pooled over tokens.
            out = []
            for item in data:
                if isinstance(item, list) and item and all(isinstance(x, (int, float)) for x in item):
                    out.append([float(x) for x in item])
                elif isinstance(item, list) and item and isinstance(item[0], list):
                    arr = np.asarray(item, dtype=np.float32)
                    if arr.ndim == 2:
                        out.append(arr.mean(axis=0).tolist())
                    else:
                        out.append(arr.flatten().tolist())
                else:
                    # unknown item shape
                    raise ValueError("Unexpected embedding item format")
            if len(out) == expected_len:
                return out
            # A single pooled vector for a multi-input batch: broadcast it.
            if len(out) == 1 and expected_len > 1:
                return [out[0] for _ in range(expected_len)]
            return out
        raise ValueError("Unexpected embedding response format")

    def _get_embeddings_api(self, texts: List[str]) -> List[List[float]]:
        """
        Robust embedding retrieval that attempts multiple request formats to handle different hosted pipeline types.
        Tries:
        1) batch inputs: {"inputs": texts}
        2) per-text calls: {"inputs": single_text} for each text
        3) similarity-style: {"inputs": {"sentences": texts}} or {"inputs": {"sentence": texts}}
        If all fail and use_local_fallback=True, tries local sentence-transformers.
        Surfaces HF response body in raised errors for debugging.

        Raises:
            RuntimeError: when every strategy (and any fallback) fails.
        """
        import requests  # local import for runtime environments
        model_path = self.embedding_model
        url = f"https://api-inference.huggingface.co/models/{model_path}"
        headers = {"Content-Type": "application/json"}
        if self.hf_token:
            headers["Authorization"] = f"Bearer {self.hf_token}"
        attempts = []
        # Attempt A: batch inputs (most common)
        try:
            payload = {"inputs": texts}
            res = self._call_hf(url, headers, payload)
            attempts.append(("batch", res))
            if res["status"] < 400:
                try:
                    return self._parse_embedding_response(res["body"], len(texts))
                except Exception as e:
                    # parsing failed; proceed to next attempt
                    logger.info(f"Batch parse failed: {e}")
        except Exception as e:
            logger.info(f"Batch request failed: {e}")
        # Attempt B: single-item calls (some models only accept single input)
        try:
            per_vecs = []
            ok = True
            for t in texts:
                payload = {"inputs": t}
                res = self._call_hf(url, headers, payload)
                attempts.append(("single", res))
                if res["status"] >= 400:
                    ok = False
                    break
                try:
                    parsed = self._parse_embedding_response(res["body"], 1)
                    per_vecs.extend(parsed)
                except Exception as e:
                    logger.info(f"Single parse failed for input: {e}")
                    ok = False
                    break
            if ok and len(per_vecs) == len(texts):
                return per_vecs
        except Exception as e:
            logger.info(f"Single-item requests failed: {e}")
        # Attempt C: similarity-style payloads
        try:
            for key in ("sentences", "sentence", "texts"):
                payload = {"inputs": {key: texts}}
                res = self._call_hf(url, headers, payload)
                attempts.append((f"key:{key}", res))
                if res["status"] < 400:
                    try:
                        return self._parse_embedding_response(res["body"], len(texts))
                    except Exception as e:
                        logger.info(f"Parse after key {key} failed: {e}")
        except Exception as e:
            logger.info(f"Similarity-key attempts failed: {e}")
        # If reached here all HF attempts failed.
        # Build an informative error showing the attempts and last HF body.
        last_body = None
        last_status = None
        if attempts:
            last_status = attempts[-1][1]["status"]
            last_body = attempts[-1][1]["body"]
        # Log all attempts for debugging
        logger.error("HF embedding attempts failed. Attempts summary:")
        for name, res in attempts:
            logger.error(f"Attempt '{name}': status={res['status']}, body={res['body']}")
        # Optional local fallback
        if self.use_local_fallback:
            try:
                from sentence_transformers import SentenceTransformer
            except Exception as imp_err:
                raise RuntimeError(
                    f"Embedding API failed (HF attempts). Last status={last_status}, body={last_body}. "
                    f"Local fallback requested but sentence-transformers not installed: {imp_err}"
                )
            try:
                local_model_name = model_path.split("sentence-transformers/")[-1]
                model = SentenceTransformer(local_model_name)
                emb = model.encode(texts, convert_to_numpy=True)
                return emb.tolist()
            except Exception as local_e:
                raise RuntimeError(
                    f"Embedding API failed (HF attempts). Last status={last_status}, body={last_body}. "
                    f"Local fallback also failed: {local_e}"
                )
        # No fallback: raise with HF details
        raise RuntimeError(
            f"Embedding API failed after multiple request formats. Last status={last_status}, body={last_body}. "
            "If you see 403, check HF_HUB_TOKEN and model access. Consider enabling local fallback with sentence-transformers."
        )

    # ---------------- index operations ---------------- #
    def add_file(self, file_path: str, source: str = "user-upload", metadata: Optional[dict] = None) -> int:
        """Read, chunk, embed and index a PDF/TXT file.

        Args:
            file_path: Path to a .pdf, .txt or .text file.
            source: Label stored alongside every chunk.
            metadata: Optional dict attached to every chunk.

        Returns:
            Number of chunks added.

        Raises:
            ValueError: unsupported extension, empty document, or no chunks.
            RuntimeError: read failure, embedding failure, or dim mismatch.
        """
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        if ext == ".pdf":
            text = self._read_pdf(file_path)
        elif ext in (".txt", ".text"):
            text = self._read_txt(file_path)
        else:
            raise ValueError("Unsupported file type. Only PDF and TXT are supported.")
        if not text.strip():
            raise ValueError("Document is empty or unreadable.")
        chunks = self._split_text(text)
        if not chunks:
            raise ValueError("No chunks produced from document.")
        added = 0
        batch_size = 16
        with self.lock:
            for i in range(0, len(chunks), batch_size):
                batch = chunks[i : i + batch_size]
                vecs = self._get_embeddings_api(batch)
                vecs = np.asarray(vecs, dtype=np.float32)
                if self.embeddings is None or self.embeddings.size == 0:
                    # First real vectors define the index dimension.
                    self.embeddings = vecs
                else:
                    if vecs.shape[1] != self.embeddings.shape[1]:
                        raise RuntimeError(
                            f"Embedding dimension mismatch (existing: {self.embeddings.shape[1]}, new: {vecs.shape[1]})"
                        )
                    self.embeddings = np.vstack([self.embeddings, vecs])
                for j, chunk in enumerate(batch):
                    # vector_idx == current store length == matching row
                    # in self.embeddings (rows and entries append in lockstep).
                    self.doc_store.append({
                        "chunk_id": str(uuid.uuid4()),
                        "content": chunk,
                        "source": source,
                        "metadata": metadata or {},
                        "vector_idx": len(self.doc_store),
                    })
                added += len(batch)
            self._persist()
        logger.info(f"Added {added} chunks from {os.path.basename(file_path)}")
        return added

    def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Row-wise cosine similarity matrix between a (m,d) and b (n,d).

        The 1e-12 epsilon avoids division by zero for all-zero rows.
        """
        a_norm = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-12)
        b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-12)
        return np.dot(a_norm, b_norm.T)

    def search(self, query: str, top_k: int = 3) -> List[Dict]:
        """Return up to top_k chunks most similar to ``query``.

        Each result dict carries "content", "metadata", "source" and a float
        cosine "score". Blank queries and an empty index return [].
        """
        if not query or not query.strip():
            return []
        # Quick emptiness check under the lock, then embed OUTSIDE the lock:
        # the embedding call is a network request (90s timeout) and must not
        # block concurrent add_file/clear for its whole duration.
        with self.lock:
            if self.embeddings is None or self.embeddings.shape[0] == 0:
                return []
        q_vecs = self._get_embeddings_api([query])
        q_arr = np.asarray(q_vecs, dtype=np.float32)
        with self.lock:
            # Re-check: the index may have been cleared while unlocked.
            if self.embeddings is None or self.embeddings.shape[0] == 0:
                return []
            sims = self._cosine_sim(q_arr, self.embeddings)[0]
            top_k = min(top_k, len(sims))
            idxs = np.argsort(-sims)[:top_k]
            results = []
            for idx in idxs:
                entry = self.doc_store[idx]
                results.append({
                    "content": entry["content"],
                    "metadata": entry.get("metadata", {}),
                    "source": entry.get("source"),
                    "score": float(sims[idx]),
                })
            return results

    def list_documents(self) -> List[Dict]:
        """Return a shallow copy of all stored chunk entries."""
        return list(self.doc_store)

    def clear(self):
        """Drop all chunks and embeddings, and persist the empty state."""
        with self.lock:
            self.doc_store = []
            self.embeddings = np.zeros((0, self._DEFAULT_DIM), dtype=np.float32)
            self._persist()

    def count(self) -> int:
        """Return the number of indexed chunks."""
        return len(self.doc_store)