File size: 7,423 Bytes
521f25e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
VECTOR STORE SERVICE MODULE
===========================

This service builds and queries the FAISS vector index used for context retrieval.
Learning data (database/learning_data/*.txt) and past chats (database/chats_data/*.json)
are loaded at startup, split into chunks, embedded with HuggingFace, and stored in FAISS.
When the user asks a question we embed it and retrieve the k most similar chunks; only
those chunks are sent to the LLM, so token usage is bounded.

LIFECYCLE:
  - create_vector_store(): Load all .txt and .json, chunk, embed, build FAISS, save to disk.
    Called once at startup. Restart the server after adding new .txt files so they are included.
  - get_retriever(k): Return a retriever that fetches k nearest chunks for a query string. 
  - save_vector_store(): Write the current FAISS index to database/vector_store/ (called after create).

Embeddings run locally (sentence-transformers); no extra API key. Groq and Realtime services
call get_retriever() for every request to get context.  
"""

import json
import logging
from pathlib import Path
from typing import List, Optional

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings 
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

from config import (
    LEARNING_DATA_DIR,
    CHATS_DATA_DIR,
    VECTOR_STORE_DIR,
    EMBEDDING_MODEL,
    CHUNK_SIZE,
    CHUNK_OVERLAP,
)


logger = logging.getLogger("J.A.R.V.I.S")


# =========================================================
# VECTOR STORE SERVICE CLASS
# =========================================================

class VectorStoreService:
    """
    Builds a FAISS index from learning_data .txt files and chats_data .json files,
    and provides a retriever to fetch the k most relevant chunks for a query.

    Lifecycle: construct once, call create_vector_store() at startup, then call
    get_retriever(k) per request. Retrievers are cached per k value.
    """

    def __init__(self):
        """Create the embedding model (local) and text splitter; vector_store is set in create_vector_store()."""
        # Embeddings run locally (no API key); used to convert text into vectors for similarity search.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
        )

        # Splitter settings come from config so chunking is tunable without code changes.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
        )

        # Built lazily by create_vector_store(); None until then.
        self.vector_store: Optional[FAISS] = None
        # Maps k -> retriever so as_retriever() is not re-created on every request.
        self._retriever_cache: dict = {}

    # ----------------------------------------------------------------------
    # LOAD DOCUMENTS FROM DISK
    # ----------------------------------------------------------------------

    def load_learning_data(self) -> List[Document]:
        """Read all .txt files in database/learning_data/ and return one Document per file (content + source name)."""
        documents = []
        # sorted() for deterministic ordering, matching load_chat_history().
        for file_path in sorted(LEARNING_DATA_DIR.glob("*.txt")):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read().strip()
                # Skip empty files so the index never contains blank documents.
                if content:
                    documents.append(Document(page_content=content, metadata={"source": str(file_path.name)}))
                    logger.info("[VECTOR] Loaded learning data: %s (%d chars)", file_path.name, len(content))
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole load.
                logger.warning("Could not load learning data file %s: %s", file_path, e)
        logger.info("[VECTOR] Total learning data files loaded: %d", len(documents))
        return documents

    def load_chat_history(self) -> List[Document]:
        """Load all .json files in database/chats_data/; turn each into one Document (User:/Assistant: lines)."""
        documents = []
        for file_path in sorted(CHATS_DATA_DIR.glob("*.json")):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    chat_data = json.load(f)

                messages = chat_data.get("messages", [])
                # Format as "User: ..." / "Assistant: ..." so the retriever can match past conversations.
                # NOTE: any role other than 'user' is rendered as Assistant (same as original behavior).
                chat_content = "\n".join([
                    f"User: {msg.get('content', '')}" if msg.get('role') == 'user'
                    else f"Assistant: {msg.get('content', '')}"
                    for msg in messages
                ])
                if chat_content.strip():
                    documents.append(Document(page_content=chat_content, metadata={"source": f"chat_{file_path.stem}"}))
                    logger.info("[VECTOR] Loaded chat history: %s (%d messages)", file_path.name, len(messages))
            except Exception as e:
                # Best effort: a corrupt chat file is logged and skipped.
                logger.warning("Could not load chat history file %s: %s", file_path, e)
        logger.info("[VECTOR] Total chat history files loaded: %d", len(documents))
        return documents

    # -------------------------------------------------------
    # BUILD AND SAVE FAISS INDEX
    # -------------------------------------------------------

    def create_vector_store(self) -> FAISS:
        """
        Load learning_data + chats_data, embed, build FAISS index, save to disk.

        Called once at startup. If there are no documents we create a tiny
        placeholder index so get_retriever() never fails.

        Returns:
            The freshly built FAISS index (also stored on self.vector_store).
        """
        learning_docs = self.load_learning_data()
        chat_docs = self.load_chat_history()
        all_documents = learning_docs + chat_docs
        logger.info("[VECTOR] Total documents to index: %d (learning: %d, chat: %d)",
                    len(all_documents), len(learning_docs), len(chat_docs))

        if not all_documents:
            # Placeholder so get_retriever() never fails; returns this single chunk for any query.
            self.vector_store = FAISS.from_texts(["No data available yet."], self.embeddings)
            logger.info("[VECTOR] No documents found, created placeholder index")
        else:
            chunks = self.text_splitter.split_documents(all_documents)
            logger.info("[VECTOR] Split into %d chunks (chunk_size=%d, overlap=%d)",
                        len(chunks), CHUNK_SIZE, CHUNK_OVERLAP)

            self.vector_store = FAISS.from_documents(chunks, self.embeddings)
            logger.info("[VECTOR] FAISS index built successfully with %d vectors", len(chunks))

        # Retrievers hold a reference to the old index; invalidate them after a rebuild.
        self._retriever_cache.clear()
        self.save_vector_store()
        return self.vector_store

    def save_vector_store(self) -> None:
        """Write the current FAISS index to database/vector_store/. On error we only log (best effort)."""
        if self.vector_store:
            try:
                self.vector_store.save_local(str(VECTOR_STORE_DIR))
            except Exception as e:
                logger.error("Failed to save vector store to disk: %s", e)

    # ---------------------------------------------------------------------------
    # RETRIEVER FOR CONTEXT
    # ---------------------------------------------------------------------------

    def get_retriever(self, k: int = 10):
        """
        Return a retriever that fetches the k most similar chunks for a query string.

        Raises:
            RuntimeError: if create_vector_store() has not been called yet.
        """
        if not self.vector_store:
            raise RuntimeError("Vector store not initialized. This should not happen.")
        if k not in self._retriever_cache:
            self._retriever_cache[k] = self.vector_store.as_retriever(search_kwargs={"k": k})
        return self._retriever_cache[k]