File size: 5,081 Bytes
402298d
 
 
 
 
 
 
 
 
 
 
 
8fb6d95
402298d
 
 
 
 
 
 
 
 
 
 
 
 
 
8fb6d95
 
 
 
402298d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Faiss vector store service (replaces Qdrant)"""
from typing import List, Dict, Any, Optional
from app.config import settings
from app.utils.logger import setup_logger

import os
import json
import faiss
import numpy as np

logger = setup_logger(__name__)


def _normalize(vectors: np.ndarray) -> np.ndarray:
    """L2-normalize vectors so inner product equals cosine similarity."""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-12
    return vectors / norms

class VectorStoreService:
    """
    Manages a Faiss index + sidecar payload store.
    - Index: Faiss IndexFlatIP (cosine via normalization)
    - Payloads: JSON list aligned to vector IDs
    - Persistence: saves/loads index + payloads from disk
    """

    def __init__(self):

        print("FAISS_INDEX_PATH:", settings.FAISS_INDEX_PATH)
        print("Working directory:", os.getcwd())

        self.index: Optional[faiss.Index] = None
        self.dimension: Optional[int] = None
        self.payloads: List[Dict[str, Any]] = []
        self.index_path = settings.FAISS_INDEX_PATH
        self.payloads_path = settings.FAISS_PAYLOADS_PATH

        self._load_if_exists()

    # ---------- Persistence ----------

    def _load_if_exists(self):
        """Load index + payloads if the files exist."""
        if os.path.exists(self.index_path) and os.path.exists(self.payloads_path):
            try:
                self.index = faiss.read_index(self.index_path)
                self.dimension = self.index.d  # type: ignore[attr-defined]
                with open(self.payloads_path, "r", encoding="utf-8") as f:
                    self.payloads = json.load(f)
                logger.info(
                    f"Loaded Faiss index ({self.dimension}d) with {self.index.ntotal} vectors"  # type: ignore
                )
            except Exception as e:
                logger.error(f"Failed to load Faiss store; starting fresh. Error: {e}")
                self.index = None
                self.payloads = []
                self.dimension = None

    def _save(self):
        """Persist index + payloads to disk."""
        if self.index is not None:
            faiss.write_index(self.index, self.index_path)
        with open(self.payloads_path, "w", encoding="utf-8") as f:
            json.dump(self.payloads, f, ensure_ascii=False)

    # ---------- Collection lifecycle ----------

    def create_collection(self, vector_size: int):
        """
        (Re)create a fresh Faiss index (cosine via normalized vectors).
        WARNING: This clears existing data.
        """
        self.dimension = vector_size
        self.index = faiss.IndexFlatIP(vector_size)  # inner product
        self.payloads = []
        self._save()
        logger.info(f"Created Faiss collection: dim={vector_size}")

    # ---------- Upsert/Search ----------

    def upsert_vectors(
        self,
        vectors: List[List[float]],
        payloads: List[Dict[str, Any]]
    ) -> int:
        """Insert vectors with metadata (IDs are implicit by order)."""
        if self.index is None:
            raise RuntimeError("Faiss index is not initialized. Call create_collection first.")

        arr = np.array(vectors, dtype="float32")
        arr = _normalize(arr)
        self.index.add(arr)  # type: ignore
        self.payloads.extend(payloads)

        self._save()
        logger.info(f"Upserted {len(vectors)} vectors into Faiss")
        return len(vectors)

    def search(
        self,
        query_vector: List[float],
        limit: int = 5,
        score_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """Search similar vectors via inner product (cosine)."""
        if self.index is None or self.index.ntotal == 0:  # type: ignore
            return []

        q = np.array([query_vector], dtype="float32")
        q = _normalize(q)
        scores, indices = self.index.search(q, limit)  # type: ignore
        scores = scores[0].tolist()
        indices = indices[0].tolist()

        results: List[Dict[str, Any]] = []
        for score, idx in zip(scores, indices):
            if idx == -1:
                continue
            if score < score_threshold:
                continue
            payload = self.payloads[idx] if 0 <= idx < len(self.payloads) else {}
            results.append({
                "id": idx,
                "score": float(score),
                "payload": payload
            })
        return results

    # ---------- Introspection/Access ----------

    def get_collection_info(self) -> Dict[str, Any]:
        count = int(self.index.ntotal) if self.index is not None else 0  # type: ignore
        return {
            "vectors_count": count,
            "status": "ready" if count >= 0 else "uninitialized"
        }

    def get_all_payloads(self) -> List[Dict[str, Any]]:
        """Return all payloads (used by metrics)."""
        return list(self.payloads)

    def get_payloads_sample(self, limit: int = 100) -> List[Dict[str, Any]]:
        return self.payloads[:limit]

# Global instance
vector_store = VectorStoreService()