| import os |
| import uuid |
| import pickle |
| import threading |
| import logging |
| import numpy as np |
| from typing import List, Dict, Optional |
| from pypdf import PdfReader |
|
|
# Module-level logger for this module.
logger = logging.getLogger(__name__)
# NOTE(review): calling basicConfig at import time configures the root logger
# as a side effect. That is fine for an application entry module, but if this
# file is imported as a library, consumers may want to configure logging
# themselves — confirm intent before removing.
logging.basicConfig(level=logging.INFO)
|
|
|
|
class VectorIndex:
    """
    File-backed vector store over text documents.

    Text is split into overlapping character chunks, embedded through the
    Hugging Face Inference API (several request shapes are attempted because
    hosted pipelines disagree about them), and queried by cosine similarity.

    Persistence:
        Chunk metadata is pickled to ``doc_store.pkl`` and the embedding
        matrix is saved to ``embeddings.npy`` inside ``storage_dir``.

    Usage:
        vi = VectorIndex(storage_dir="/tmp/vector_data",
                         hf_token_env_value=HF_HUB_TOKEN,
                         use_local_fallback=False)
    """

    # Width of the zero-row placeholder matrix. 384 matches
    # sentence-transformers/all-MiniLM-L6-v2; the real width is taken from
    # the first embeddings actually stored.
    _PLACEHOLDER_DIM = 384

    # Same logger object as a module-level ``logging.getLogger(__name__)``
    # (getLogger is idempotent by name); kept on the class so the class is
    # self-contained.
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        storage_dir: str = "/tmp/vector_data",
        embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2",
        chunk_size: int = 1000,
        chunk_overlap: int = 100,
        hf_token_env_value: Optional[str] = None,
        use_local_fallback: bool = False,
    ):
        """
        Args:
            storage_dir: Directory holding the pickled metadata and the
                ``.npy`` embedding matrix (created if missing).
            embedding_model: HF model id used for the inference endpoint.
            chunk_size: Characters per chunk; must be positive (a value of 0
                previously made ``_split_text`` loop forever).
            chunk_overlap: Characters shared between consecutive chunks;
                must be non-negative.
            hf_token_env_value: Optional HF API token, sent as Bearer auth.
            use_local_fallback: Try a local sentence-transformers model if
                every API attempt fails.

        Raises:
            ValueError: If chunk_size <= 0 or chunk_overlap < 0.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")

        self.storage_dir = storage_dir
        os.makedirs(self.storage_dir, exist_ok=True)

        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.hf_token = hf_token_env_value
        self.use_local_fallback = use_local_fallback

        self.meta_path = os.path.join(self.storage_dir, "doc_store.pkl")
        self.emb_path = os.path.join(self.storage_dir, "embeddings.npy")

        # Guards doc_store + embeddings; network calls are kept OUTSIDE it.
        self.lock = threading.Lock()
        self.doc_store: List[Dict] = []
        self.embeddings: Optional[np.ndarray] = None

        self._load_persistent()

    def _empty_matrix(self) -> np.ndarray:
        """Zero-row float32 placeholder, replaced once real vectors arrive."""
        return np.zeros((0, self._PLACEHOLDER_DIM), dtype=np.float32)

    def _load_persistent(self):
        """
        Load doc_store and embeddings from disk, resetting to empty on any
        problem — including a row-count mismatch between the two files,
        which would otherwise silently misalign search results.
        """
        try:
            if os.path.exists(self.meta_path):
                # NOTE(security): pickle.load executes arbitrary code if the
                # file is attacker-controlled; storage_dir must be trusted.
                with open(self.meta_path, "rb") as f:
                    self.doc_store = pickle.load(f)
            if os.path.exists(self.emb_path):
                self.embeddings = np.load(self.emb_path)
            if self.embeddings is None:
                self.embeddings = self._empty_matrix()
            if len(self.doc_store) != self.embeddings.shape[0]:
                raise ValueError(
                    f"store/embedding mismatch: {len(self.doc_store)} chunks "
                    f"vs {self.embeddings.shape[0]} vectors"
                )
            self._log.info(f"Loaded store: {len(self.doc_store)} chunks")
        except Exception as e:
            self._log.warning(f"Failed to load persisted store: {e}")
            self.doc_store = []
            self.embeddings = self._empty_matrix()

    def _persist(self):
        """
        Write doc_store (pickle) and embeddings (.npy) to storage_dir.

        Errors are logged rather than raised: persistence is best-effort and
        the in-memory store stays authoritative.
        """
        try:
            with open(self.meta_path, "wb") as f:
                pickle.dump(self.doc_store, f)
            if self.embeddings is not None:
                np.save(self.emb_path, self.embeddings)
            self._log.info(f"Persisted {len(self.doc_store)} chunks")
        except Exception as e:
            self._log.error(f"Failed to persist: {e}")

    def _read_pdf(self, file_path: str) -> str:
        """
        Extract text from every page of a PDF, pages joined by newlines.

        Raises:
            RuntimeError: Wrapping any pypdf failure.
        """
        try:
            reader = PdfReader(file_path)
            # extract_text() may return None for image-only pages.
            return "\n".join(page.extract_text() or "" for page in reader.pages)
        except Exception as e:
            raise RuntimeError(f"Failed to read PDF: {e}")

    def _read_txt(self, file_path: str, encoding: str = "utf-8") -> str:
        """
        Read a text file, silently dropping undecodable bytes.

        Raises:
            RuntimeError: Wrapping any I/O failure.
        """
        try:
            with open(file_path, "r", encoding=encoding, errors="ignore") as f:
                return f.read()
        except Exception as e:
            raise RuntimeError(f"Failed to read TXT: {e}")

    def _split_text(self, text: str) -> List[str]:
        """
        Split *text* into chunks of ``chunk_size`` characters overlapping by
        ``chunk_overlap``. The step is clamped so the scan always advances,
        even when chunk_overlap >= chunk_size (chunks then simply abut).
        """
        text = text.replace("\r\n", "\n")
        if not text:
            return []
        chunks: List[str] = []
        start = 0
        n = len(text)
        while start < n:
            end = start + self.chunk_size
            chunks.append(text[start:end])
            # Back up by the overlap unless that would stall the scan.
            next_start = end - self.chunk_overlap
            start = next_start if next_start > start else end
        return chunks

    def _call_hf(self, url: str, headers: dict, payload) -> Dict:
        """
        POST *payload* as JSON to the HF inference endpoint.

        Returns:
            {"status": HTTP status code,
             "body": parsed JSON, or raw text if the body is not JSON,
             "raw": the requests.Response object}
        """
        import requests  # lazy: keeps the dependency off the import path

        resp = requests.post(url, headers=headers, json=payload, timeout=90)
        try:
            body = resp.json()
        except Exception:
            body = resp.text
        return {"status": resp.status_code, "body": body, "raw": resp}

    def _parse_embedding_response(self, data, expected_len: int) -> List[List[float]]:
        """
        Normalize known HF embedding response shapes into exactly
        ``expected_len`` float vectors.

        Accepted shapes:
          * dict wrappers: {"embeddings"/"embedding"/"vectors"/"array": ...}
          * a flat numeric list (one vector; only valid when expected_len==1)
          * a list of numeric lists (one vector per input)
          * a list of 2-D token matrices (mean-pooled per input)

        Raises:
            ValueError: Unknown format, or vector count != expected_len.
                (The previous behavior of duplicating a lone vector to pad
                the batch is removed — it silently produced identical
                embeddings for different texts; raising lets the caller fall
                through to per-text requests instead.)
        """
        if isinstance(data, dict):
            # Unwrap the first recognized container key.
            for key in ("embeddings", "embedding", "vectors", "array"):
                if key in data:
                    data = data[key]
                    break

        if not isinstance(data, list) or not data:
            raise ValueError("Unexpected embedding response format")

        # Single flat vector, e.g. the response to a one-text request.
        if all(isinstance(x, (int, float)) for x in data):
            if expected_len == 1:
                return [[float(x) for x in data]]
            raise ValueError(f"Got a single vector but expected {expected_len}")

        vectors: List[List[float]] = []
        for item in data:
            if (
                isinstance(item, list)
                and item
                and all(isinstance(x, (int, float)) for x in item)
            ):
                vectors.append([float(x) for x in item])
            elif isinstance(item, list) and item and isinstance(item[0], list):
                # Token-level output: mean-pool the 2-D matrix per input.
                arr = np.asarray(item, dtype=np.float32)
                if arr.ndim == 2:
                    vectors.append(arr.mean(axis=0).tolist())
                else:
                    vectors.append(arr.flatten().tolist())
            else:
                raise ValueError("Unexpected embedding item format")

        if len(vectors) != expected_len:
            raise ValueError(
                f"Expected {expected_len} vectors, got {len(vectors)}"
            )
        return vectors

    def _get_embeddings_api(self, texts: List[str]) -> List[List[float]]:
        """
        Embed *texts* via the HF Inference API, attempting several request
        shapes because hosted pipeline types differ:

          1. batch:     {"inputs": [t1, t2, ...]}
          2. per-text:  {"inputs": t} for each text
          3. wrapped:   {"inputs": {"sentences"/"sentence"/"texts": [...]}}

        If every attempt fails and ``use_local_fallback`` is set, a local
        sentence-transformers model is tried. All HF response bodies are
        logged and the last one is surfaced in the raised error.

        Raises:
            RuntimeError: All strategies (and any local fallback) failed.
        """
        url = f"https://api-inference.huggingface.co/models/{self.embedding_model}"
        headers = {"Content-Type": "application/json"}
        if self.hf_token:
            headers["Authorization"] = f"Bearer {self.hf_token}"

        attempts = []

        # Strategy 1: one batched request.
        try:
            res = self._call_hf(url, headers, {"inputs": texts})
            attempts.append(("batch", res))
            if res["status"] < 400:
                try:
                    return self._parse_embedding_response(res["body"], len(texts))
                except Exception as e:
                    self._log.info(f"Batch parse failed: {e}")
        except Exception as e:
            self._log.info(f"Batch request failed: {e}")

        # Strategy 2: one request per text.
        try:
            per_vecs: List[List[float]] = []
            ok = True
            for t in texts:
                res = self._call_hf(url, headers, {"inputs": t})
                attempts.append(("single", res))
                if res["status"] >= 400:
                    ok = False
                    break
                try:
                    per_vecs.extend(self._parse_embedding_response(res["body"], 1))
                except Exception as e:
                    self._log.info(f"Single parse failed for input: {e}")
                    ok = False
                    break
            if ok and len(per_vecs) == len(texts):
                return per_vecs
        except Exception as e:
            self._log.info(f"Single-item requests failed: {e}")

        # Strategy 3: similarity-pipeline style wrapped payloads.
        try:
            for key in ("sentences", "sentence", "texts"):
                res = self._call_hf(url, headers, {"inputs": {key: texts}})
                attempts.append((f"key:{key}", res))
                if res["status"] < 400:
                    try:
                        return self._parse_embedding_response(res["body"], len(texts))
                    except Exception as e:
                        self._log.info(f"Parse after key {key} failed: {e}")
        except Exception as e:
            self._log.info(f"Similarity-key attempts failed: {e}")

        # Everything failed: summarize attempts for debugging.
        last_status = attempts[-1][1]["status"] if attempts else None
        last_body = attempts[-1][1]["body"] if attempts else None

        self._log.error("HF embedding attempts failed. Attempts summary:")
        for name, res in attempts:
            self._log.error(f"Attempt '{name}': status={res['status']}, body={res['body']}")

        if self.use_local_fallback:
            try:
                from sentence_transformers import SentenceTransformer
            except Exception as imp_err:
                raise RuntimeError(
                    f"Embedding API failed (HF attempts). Last status={last_status}, body={last_body}. "
                    f"Local fallback requested but sentence-transformers not installed: {imp_err}"
                )
            try:
                # SentenceTransformer accepts the bare model name for the
                # sentence-transformers namespace.
                local_model_name = self.embedding_model.split("sentence-transformers/")[-1]
                model = SentenceTransformer(local_model_name)
                emb = model.encode(texts, convert_to_numpy=True)
                return emb.tolist()
            except Exception as local_e:
                raise RuntimeError(
                    f"Embedding API failed (HF attempts). Last status={last_status}, body={last_body}. "
                    f"Local fallback also failed: {local_e}"
                )

        raise RuntimeError(
            f"Embedding API failed after multiple request formats. Last status={last_status}, body={last_body}. "
            "If you see 403, check HF_HUB_TOKEN and model access. Consider enabling local fallback with sentence-transformers."
        )

    def add_file(self, file_path: str, source: str = "user-upload", metadata: Optional[dict] = None) -> int:
        """
        Ingest a PDF or TXT file: read, chunk, embed, append, persist.

        All embeddings are computed *before* the lock is taken and before any
        shared state is touched, so a mid-way API failure cannot leave the
        in-memory store half-updated and out of sync with disk, and network
        I/O never blocks concurrent readers.

        Args:
            file_path: Path to a .pdf, .txt, or .text file.
            source: Free-form provenance label stored with each chunk.
            metadata: Optional dict stored with every chunk (shared ref).

        Returns:
            Number of chunks added.

        Raises:
            ValueError: Unsupported extension, empty document, or no chunks.
            RuntimeError: Read failure, embedding failure, or embedding
                dimension mismatch with the existing matrix.
        """
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()
        if ext == ".pdf":
            text = self._read_pdf(file_path)
        elif ext in (".txt", ".text"):
            text = self._read_txt(file_path)
        else:
            raise ValueError("Unsupported file type. Only PDF and TXT are supported.")

        if not text.strip():
            raise ValueError("Document is empty or unreadable.")

        chunks = self._split_text(text)
        if not chunks:
            raise ValueError("No chunks produced from document.")

        # Phase 1: embed everything (network I/O, no lock held).
        batch_size = 16
        vec_batches = []
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]
            vec_batches.append(
                np.asarray(self._get_embeddings_api(batch), dtype=np.float32)
            )
        new_vecs = np.vstack(vec_batches)

        # Phase 2: atomic append + persist under the lock.
        with self.lock:
            if self.embeddings is None or self.embeddings.size == 0:
                self.embeddings = new_vecs
            else:
                if new_vecs.shape[1] != self.embeddings.shape[1]:
                    raise RuntimeError(
                        f"Embedding dimension mismatch (existing: {self.embeddings.shape[1]}, new: {new_vecs.shape[1]})"
                    )
                self.embeddings = np.vstack([self.embeddings, new_vecs])
            for chunk in chunks:
                self.doc_store.append({
                    "chunk_id": str(uuid.uuid4()),
                    "content": chunk,
                    "source": source,
                    "metadata": metadata or {},
                    # Row index in the embedding matrix; doc_store and the
                    # matrix grow in lockstep.
                    "vector_idx": len(self.doc_store),
                })
            self._persist()

        self._log.info(f"Added {len(chunks)} chunks from {os.path.basename(file_path)}")
        return len(chunks)

    def _cosine_sim(self, a: np.ndarray, b: np.ndarray) -> np.ndarray:
        """Pairwise cosine similarity between row vectors of *a* and *b*.

        The small epsilon guards against division by zero for null vectors.
        """
        a_norm = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-12)
        b_norm = b / (np.linalg.norm(b, axis=1, keepdims=True) + 1e-12)
        return np.dot(a_norm, b_norm.T)

    def search(self, query: str, top_k: int = 3) -> List[Dict]:
        """
        Return up to *top_k* chunks most cosine-similar to *query*.

        The query embedding is computed OUTSIDE the lock (it is a network
        call, and holding the lock across it would block writers); similarity
        is evaluated against a snapshot taken under the lock. The snapshot is
        safe because add_file replaces the matrix rather than mutating it.

        Returns:
            List of {"content", "metadata", "source", "score"} dicts,
            highest score first; [] for a blank query or an empty index.
        """
        if not query or not query.strip():
            return []
        with self.lock:
            emb = self.embeddings
            if emb is None or emb.shape[0] == 0:
                return []
            entries = self.doc_store[: emb.shape[0]]

        q_arr = np.asarray(self._get_embeddings_api([query]), dtype=np.float32)
        sims = self._cosine_sim(q_arr, emb)[0]
        k = min(top_k, len(sims))
        order = np.argsort(-sims)[:k]
        results = []
        for idx in order:
            entry = entries[int(idx)]
            results.append({
                "content": entry["content"],
                "metadata": entry.get("metadata", {}),
                "source": entry.get("source"),
                "score": float(sims[idx]),
            })
        return results

    def list_documents(self) -> List[Dict]:
        """Shallow copy of all chunk records (taken under the lock)."""
        with self.lock:
            return list(self.doc_store)

    def clear(self):
        """Drop all chunks and embeddings, persisting the empty state."""
        with self.lock:
            self.doc_store = []
            self.embeddings = self._empty_matrix()
            self._persist()

    def count(self) -> int:
        """Number of stored chunks."""
        with self.lock:
            return len(self.doc_store)