File size: 6,804 Bytes
3506c42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import os
import json
import faiss
import numpy as np
from shared_utilities import chunk_text, validate_chunk_sizes, generate_embeddings_batch
import asyncio
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# ---------- Config ----------
TOP_K = 5  # Default number of nearest chunks returned by a search.

# Prefer the mounted persistent volume; otherwise fall back to a local dir.
PERSIST_DIR = "/persistent/faiss_store"
if not os.path.exists("/persistent"):  # fallback if running locally
    PERSIST_DIR = "./faiss_store"
os.makedirs(PERSIST_DIR, exist_ok=True)

# Default on-disk locations for the FAISS index and its chunk metadata.
INDEX_PATH = os.path.join(PERSIST_DIR, "store.index")
META_PATH = os.path.join(PERSIST_DIR, "store.json")

# OpenAI text-embedding-3-small dimension (used for FAISS)
OPENAI_EMBEDDING_DIM = 1536


def _normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """Normalize embeddings for cosine similarity (inner product in FAISS)."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return (embeddings / np.maximum(norms, 1e-9)).astype("float32")


# ---------- FAISS Vector Store ----------
class FaissStore:
    """Thin wrapper pairing a FAISS inner-product index with parallel metadata.

    Vectors are expected to be L2-normalized by the caller so that inner
    product scores correspond to cosine similarity.
    """

    def __init__(self, dim):
        # Exact (flat) inner-product index over `dim`-dimensional vectors.
        self.index = faiss.IndexFlatIP(dim)
        self.metadatas = []  # metadatas[i] describes the i-th stored vector

    def add(self, vectors: np.ndarray, metadatas):
        """Append vectors and their metadata, preserving insertion order."""
        self.index.add(vectors)
        self.metadatas.extend(metadatas)

    def search(self, q_vec: np.ndarray, k=TOP_K):
        """Return up to k (score, metadata) pairs for the query vector.

        Returns an empty list when the index holds no vectors.
        """
        if not self.index.ntotal:
            return []
        scores, ids = self.index.search(q_vec, k)
        # FAISS pads with id -1 when fewer than k vectors are stored.
        return [
            (float(score), self.metadatas[idx])
            for score, idx in zip(scores[0], ids[0])
            if idx >= 0
        ]

    def save(self, index_path, meta_path):
        """Persist the index and its metadata side by side on disk."""
        faiss.write_index(self.index, index_path)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self.metadatas, f, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, index_path, meta_path):
        """Rebuild a store from files previously written by save()."""
        index = faiss.read_index(index_path)
        with open(meta_path, "r", encoding="utf-8") as f:
            metadatas = json.load(f)
        store = cls(index.d)
        store.index = index
        store.metadatas = metadatas
        return store

# ---------- Global store ----------
faiss_stores = {}  # One FaissStore per indexing method, keyed by suffix.
VECTOR_DIM = None  # Set lazily once the first embedding batch arrives.

def get_index_paths(method_suffix=""):
    """Return (index_path, meta_path) for the given method suffix.

    An empty suffix maps to the default store files; otherwise the suffix
    is embedded in the filenames (e.g. "store_hybrid.index").
    """
    if not method_suffix:
        return INDEX_PATH, META_PATH
    return (
        os.path.join(PERSIST_DIR, f"store{method_suffix}.index"),
        os.path.join(PERSIST_DIR, f"store{method_suffix}.json"),
    )


# ---------- RAG Builder ----------
async def create_rag_from_text_selfhosted(extracted_text: str, source_info: str, progress_callback=None, method_suffix=""):
    """Chunk text, embed it with OpenAI, and index it in a persistent FAISS store.

    Args:
        extracted_text: Raw document text to index.
        source_info: Source label attached to every chunk's metadata.
        progress_callback: Optional async callable awaited with status strings.
        method_suffix: Optional suffix selecting a per-method store
            (e.g. "_hybrid"); empty string uses the default store.

    Returns:
        A dict with "status", "message", and "vector_index" keys.
    """
    global faiss_stores, VECTOR_DIM

    # Resolve the on-disk files for this method's store.
    index_path, meta_path = get_index_paths(method_suffix)

    if progress_callback:
        await progress_callback("📄 Chunking text into segments...")

    chunks = chunk_text(extracted_text)
    chunks = validate_chunk_sizes(chunks, max_tokens=8000)

    # Guard: an empty chunk list would otherwise crash below on
    # embeddings.shape[1] (np.array([]) has no second dimension).
    if not chunks:
        logger.warning("No chunks produced from input text; nothing to index.")
        return {
            "status": "error",
            "message": "❌ No text chunks could be produced from the input.",
            "vector_index": f"faiss{method_suffix}",
        }

    if progress_callback:
        await progress_callback(f"🔢 Creating {len(chunks)} embeddings (OpenAI)...")

    # Embed with OpenAI text-embedding-3-small, then L2-normalize so the
    # inner-product index performs cosine-similarity search.
    raw_embeddings = await generate_embeddings_batch(chunks, progress_callback)
    embeddings = np.array(raw_embeddings, dtype=np.float32)
    embeddings = _normalize_embeddings(embeddings)

    VECTOR_DIM = embeddings.shape[1]

    # Create or reuse the store for this method; new chunks append to it.
    if method_suffix not in faiss_stores:
        faiss_stores[method_suffix] = FaissStore(VECTOR_DIM)
    faiss_store = faiss_stores[method_suffix]

    metas = [{"id": i, "text": c, "source": source_info} for i, c in enumerate(chunks)]
    faiss_store.add(embeddings, metas)

    # Persist so the index survives restarts.
    faiss_store.save(index_path, meta_path)

    if progress_callback:
        await progress_callback(f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    # Log unconditionally — previously this only happened when a progress
    # callback was supplied, which hid successful indexing from the logs.
    logger.info(f"Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    return {
        "status": "success",
        "message": f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}. Ready for queries.",
        "vector_index": f"faiss{method_suffix}"
    }

# ---------- RAG Search (Self-hosted) ----------
async def search_rag_documents_selfhosted(query: str, top_k: int = 5, method_suffix: str = "") -> list:
    """Search the locally stored FAISS index for chunks relevant to a query.

    Args:
        query: Search query text.
        top_k: Number of results to return.
        method_suffix: Optional suffix selecting the per-method index
            (e.g. "_hybrid", "_regular"); empty string uses the default.

    Returns:
        A list of dicts with "score", "content", "source", "chunk_index",
        and "title" keys, best match first. Empty list when no index exists
        or an error occurs (failures intentionally degrade to no results).
    """
    try:
        global faiss_stores, VECTOR_DIM

        index_path, meta_path = get_index_paths(method_suffix)

        # Lazily load the persisted index on first use (or when the cached
        # in-memory store is empty).
        if method_suffix not in faiss_stores or faiss_stores[method_suffix].index.ntotal == 0:
            if os.path.exists(index_path) and os.path.exists(meta_path):
                faiss_stores[method_suffix] = FaissStore.load(index_path, meta_path)
                VECTOR_DIM = faiss_stores[method_suffix].index.d
                logger.info(f"Loaded FAISS store with {faiss_stores[method_suffix].index.ntotal} vectors.")
            else:
                logger.warning(f"No FAISS store found for method '{method_suffix}'. Please create RAG index first.")
                return []

        faiss_store = faiss_stores[method_suffix]

        # Embed the query and normalize so inner product == cosine similarity.
        raw_query = await generate_embeddings_batch([query])
        q_vec = np.array([raw_query[0]], dtype=np.float32)
        q_vec = _normalize_embeddings(q_vec)  # shape: (1, dim)

        # Search for top_k similar chunks.
        results = faiss_store.search(q_vec, k=top_k)

        # Flatten (score, metadata) pairs into the caller-facing dict shape.
        return [
            {
                "score": float(score),
                "content": meta.get("text", ""),
                "source": meta.get("source", "selfhosted"),
                "chunk_index": meta.get("id", None),
                "title": meta.get("title", None),
            }
            for score, meta in results
        ]

    except Exception:
        # Boundary handler: log with full traceback (logger.error with just
        # the message discarded it), then degrade gracefully.
        logger.exception("Error searching self-hosted RAG documents")
        return []