"""
VECTOR STORE SERVICE MODULE
===========================
This service builds and queries the FAISS vector index used for context retrieval.
Learning data (database/learning_data/*.txt) and past chats (database/chats_data/*.json)
are loaded at startup, split into chunks, embedded with HuggingFace, and stored in FAISS.
When the user asks a question we embed it and retrieve the k most similar chunks; only
those chunks are sent to the LLM, so token usage is bounded.
LIFECYCLE:
- create_vector_store(): Load all .txt and .json, chunk, embed, build FAISS, save to disk.
Called once at startup. Restart the server after adding new .txt files so they are included.
- get_retriever(k): Return a retriever that fetches k nearest chunks for a query string.
- save_vector_store(): Write the current FAISS index to database/vector_store/ (called after create).
Embeddings run locally (sentence-transformers); no extra API key. Groq and Realtime services
call get_retriever() for every request to get context.
"""
import json
import logging
from pathlib import Path
from typing import List, Optional
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from config import (
LEARNING_DATA_DIR,
CHATS_DATA_DIR,
VECTOR_STORE_DIR,
EMBEDDING_MODEL,
CHUNK_SIZE,
CHUNK_OVERLAP,
)
logger = logging.getLogger("J.A.R.V.I.S")
# =========================================================
# VECTOR STORE SERVICE CLASS
# =========================================================
class VectorStoreService:
    """
    Builds a FAISS index from learning_data .txt files and chats_data .json files,
    and provides a retriever to fetch the k most relevant chunks for a query.
    """
    def __init__(self):
        """Create the embedding model (local) and text splitter; vector_store is set in create_vector_store()."""
        # Embeddings run locally (no API key); used to convert text into vectors for similarity search.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )
        # Built by create_vector_store(); stays None until then.
        self.vector_store: Optional[FAISS] = None
        # Retrievers cached per k so repeated get_retriever(k) calls reuse one object.
        self._retriever_cache: dict = {}
    # ----------------------------------------------------------------------
    # LOAD DOCUMENTS FROM DISK
    # ----------------------------------------------------------------------
    def load_learning_data(self) -> List[Document]:
        """Read all .txt files in database/learning_data/ and return one Document per file (content + source name)."""
        documents = []
        # sorted() for deterministic index order; consistent with load_chat_history().
        for file_path in sorted(LEARNING_DATA_DIR.glob("*.txt")):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read().strip()
                if content:  # skip empty files silently
                    documents.append(Document(page_content=content, metadata={"source": str(file_path.name)}))
                    logger.info("[VECTOR] Loaded learning data: %s (%d chars)", file_path.name, len(content))
            except Exception as e:
                # Best-effort: one unreadable file must not abort the whole index build.
                logger.warning("Could not load learning data file %s: %s", file_path, e)
        logger.info("[VECTOR] Total learning data files loaded: %d", len(documents))
        return documents
    def load_chat_history(self) -> List[Document]:
        """Load all .json files in database/chats_data/; turn each into one Document (User:/Assistant: lines)."""
        documents = []
        for file_path in sorted(CHATS_DATA_DIR.glob("*.json")):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    chat_data = json.load(f)
                messages = chat_data.get("messages", [])
                # Format as "User: ..." / "Assistant: ..." so the retriever can match past conversations.
                chat_content = "\n".join(
                    f"User: {msg.get('content', '')}" if msg.get('role') == 'user'
                    else f"Assistant: {msg.get('content', '')}"
                    for msg in messages
                )
                if chat_content.strip():
                    documents.append(Document(page_content=chat_content, metadata={"source": f"chat_{file_path.stem}"}))
                    logger.info("[VECTOR] Loaded chat history: %s (%d messages)", file_path.name, len(messages))
            except Exception as e:
                # Best-effort: a corrupt chat file is skipped, not fatal.
                logger.warning("Could not load chat history file %s: %s", file_path, e)
        logger.info("[VECTOR] Total chat history files loaded: %d", len(documents))
        return documents
    # -------------------------------------------------------
    # BUILD AND SAVE FAISS INDEX
    # -------------------------------------------------------
    def create_vector_store(self) -> FAISS:
        """
        Load learning_data + chats_data, embed, build FAISS index, save to disk.
        Called once at startup. If there are no documents we create a tiny placeholder index.
        """
        learning_docs = self.load_learning_data()
        chat_docs = self.load_chat_history()
        all_documents = learning_docs + chat_docs
        logger.info("[VECTOR] Total documents to index: %d (learning: %d, chat: %d)",
                    len(all_documents), len(learning_docs), len(chat_docs))
        if not all_documents:
            # Placeholder so get_retriever() never fails; return this single chunk for any query.
            self.vector_store = FAISS.from_texts(["No data available yet."], self.embeddings)
            logger.info("[VECTOR] No documents found, created placeholder index")
        else:
            chunks = self.text_splitter.split_documents(all_documents)
            logger.info("[VECTOR] Split into %d chunks (chunk_size=%d, overlap=%d)",
                        len(chunks), CHUNK_SIZE, CHUNK_OVERLAP)
            self.vector_store = FAISS.from_documents(chunks, self.embeddings)
            logger.info("[VECTOR] FAISS index built successfully with %d vectors", len(chunks))
        # Cached retrievers point at the old index; invalidate after every rebuild.
        self._retriever_cache.clear()
        self.save_vector_store()
        return self.vector_store
    def save_vector_store(self):
        """Write the current FAISS index to database/vector_store/. On error we only log."""
        if self.vector_store:
            try:
                self.vector_store.save_local(str(VECTOR_STORE_DIR))
            except Exception as e:
                # Non-fatal: the in-memory index still works; only persistence failed.
                logger.error("failed to save vector store to disk: %s", e)
    # ---------------------------------------------------------------------------
    # RETRIEVER FOR CONTEXT
    # ---------------------------------------------------------------------------
    def get_retriever(self, k: int = 10):
        """Return a retriever that returns the k most similar chunks for a query string."""
        if not self.vector_store:
            raise RuntimeError("Vector store not initialized. This should not happen.")
        if k not in self._retriever_cache:
            self._retriever_cache[k] = self.vector_store.as_retriever(search_kwargs={"k": k})
        return self._retriever_cache[k]