File size: 6,188 Bytes
94ceda1
043de71
 
 
 
 
 
 
 
 
 
 
 
94ceda1
b947b1a
043de71
b947b1a
043de71
 
94ceda1
b947b1a
 
 
 
043de71
 
 
 
 
 
 
 
 
 
 
 
b947b1a
 
043de71
 
 
 
 
 
 
b947b1a
043de71
 
 
b947b1a
 
 
043de71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b947b1a
043de71
 
 
 
b947b1a
043de71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b947b1a
043de71
b947b1a
043de71
b947b1a
043de71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b947b1a
 
043de71
b947b1a
 
043de71
b947b1a
 
 
 
 
043de71
b947b1a
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import gradio as gr
import os, pickle, time
import numpy as np

# try FAISS, else fallback to numpy search
# FAISS gives O(log n)-ish vector search; the numpy fallback re-encodes and
# brute-forces cosine similarity in search() instead.
try:
    import faiss
    FAISS_AVAILABLE = True
except Exception as e:
    print("FAISS import failed:", e)
    FAISS_AVAILABLE = False

from sentence_transformers import SentenceTransformer

# Embedding model is loaded eagerly at import time; EMB_DIM is used to size
# any newly created FAISS index.
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
print("Loading embedding model:", MODEL_NAME)
model = SentenceTransformer(MODEL_NAME)
EMB_DIM = model.get_sentence_embedding_dimension()
print("Model loaded. Embedding dim:", EMB_DIM)

# On-disk persistence: FAISS index file plus a pickled, parallel list of the
# raw snippet strings (row i of the index corresponds to texts[i]).
INDEX_FILE = "vector_store.index"
TEXT_FILE = "texts.pkl"

def load_index():
    """Load the persisted index and snippet list; create empty ones if absent.

    Returns:
        tuple: ``(index, texts)`` — ``index`` is a ``faiss.IndexFlatIP``
        (or ``None`` when FAISS is unavailable) and ``texts`` is the list of
        stored snippet strings parallel to the index rows.
    """
    store_on_disk = os.path.exists(INDEX_FILE) and os.path.exists(TEXT_FILE)
    if FAISS_AVAILABLE and store_on_disk:
        try:
            index = faiss.read_index(INDEX_FILE)
            with open(TEXT_FILE, "rb") as f:
                texts = pickle.load(f)
        except Exception as e:
            # Corrupt/incompatible files: fall through and start fresh.
            print("Failed to load FAISS index:", e)
        else:
            print("Loaded FAISS index and texts.")
            return index, texts

    if not FAISS_AVAILABLE and os.path.exists(TEXT_FILE):
        # Fallback mode persists only the raw texts; search() re-embeds them.
        with open(TEXT_FILE, "rb") as f:
            texts = pickle.load(f)
        print("Loaded texts (no FAISS).")
        return None, texts

    # Nothing usable on disk: start with an empty store.
    if not FAISS_AVAILABLE:
        print("FAISS not available; using numpy fallback.")
        return None, []
    fresh = faiss.IndexFlatIP(EMB_DIM)
    print("Created new FAISS IndexFlatIP.")
    return fresh, []

def save_index(index, texts):
    """Persist the FAISS index (when present) and the snippet list to disk.

    Args:
        index: faiss index or ``None`` (numpy-fallback mode).
        texts: list of snippet strings to pickle alongside the index.

    Failures are logged, never raised — saving is best-effort.
    """
    if FAISS_AVAILABLE and index is not None:
        try:
            faiss.write_index(index, INDEX_FILE)
        except Exception as err:
            print("Error saving FAISS index:", err)
        else:
            print("FAISS index saved.")
    try:
        with open(TEXT_FILE, "wb") as fh:
            pickle.dump(texts, fh)
    except Exception as err:
        print("Error saving texts:", err)
    else:
        print("Texts saved.")

def embed_texts(docs):
    """Encode *docs* into L2-normalized float32 embeddings.

    Args:
        docs: a string or a sequence of strings to embed.

    Returns:
        np.ndarray: 2D C-contiguous float32 array of shape (n_docs, dim),
        as required by ``faiss.Index.add`` / ``.search``.

    The model may return a list, a 1D vector (single input), or an object
    array; all cases are normalized in a single pass. (The original ran the
    same reshape/cast up to three times on the same array.)
    """
    emb = model.encode(docs, normalize_embeddings=True)
    if isinstance(emb, np.ndarray) and emb.dtype == object:
        # Ragged/object array: stack rows into a proper 2D float32 matrix.
        emb = np.vstack(
            [np.asarray(row, dtype=np.float32).reshape(1, -1) for row in emb]
        )
    # One conversion: float32, C-contiguous (ascontiguousarray guarantees it,
    # which the old code only claimed in a comment).
    emb = np.ascontiguousarray(emb, dtype=np.float32)
    if emb.ndim == 1:
        emb = emb.reshape(1, -1)
    return emb

def add_text(docs_raw):
    """Split raw textarea input into snippets and append them to memory.

    Args:
        docs_raw: multi-line string from the UI; one snippet per line.

    Returns:
        str: human-readable status message for the UI (never raises).
    """
    docs = [d.strip() for d in docs_raw.split("\n") if d.strip()]
    if not docs:
        return "⚠️ No text detected. Paste lines or paragraphs (one per line)."
    index, texts = load_index()
    try:
        if FAISS_AVAILABLE and index is not None:
            embeddings = embed_texts(docs)
            print("Embeddings shape:", embeddings.shape, "dtype:", embeddings.dtype)
            # Empty index built with a different dimension (e.g. the model
            # changed between runs): recreate it safely before adding.
            if index.ntotal == 0 and getattr(index, "d", EMB_DIM) != EMB_DIM:
                index = faiss.IndexFlatIP(EMB_DIM)
            index.add(embeddings)
            texts.extend(docs)
            save_index(index, texts)
            return f"✅ Added {len(docs)} snippet(s) to memory (FAISS)."
        # Numpy fallback stores only the raw texts — search() re-embeds on
        # demand — so embedding here (as the original did) was wasted work.
        texts.extend(docs)
        save_index(None, texts)
        return f"✅ Added {len(docs)} snippet(s) to memory (numpy fallback)."
    except Exception as e:
        print("Error in add_text:", e)
        return f"❌ Error adding text: {e}"

def search(query, top_k=5):
    """Return the ``top_k`` stored snippets most similar to *query*.

    Args:
        query: free-text search string.
        top_k: maximum number of results to return.

    Returns:
        str: newline-separated result bullets, or a status/error message.
    """
    index, texts = load_index()
    if texts is None or len(texts) == 0:
        return "🪣 Memory empty — add some snippets first."
    try:
        q_emb = embed_texts([query])
        print("Query emb shape:", q_emb.shape)
        if FAISS_AVAILABLE and index is not None:
            scores, ids = index.search(q_emb, top_k)
            results = []
            for i, s in zip(ids[0], scores[0]):
                # BUGFIX: FAISS pads ids with -1 when fewer than top_k vectors
                # exist; the old `i < len(texts)` check let -1 through and
                # returned texts[-1] as a bogus extra hit.
                if 0 <= i < len(texts):
                    results.append(f"• {texts[i]}  (score {round(float(s),3)})")
            return "\n\n".join(results)
        # Numpy fallback: embeddings are L2-normalized, so dot == cosine.
        # Reuse embed_texts() instead of duplicating encode/convert logic.
        all_embs = embed_texts(texts)
        sims = all_embs @ q_emb[0]  # shape (n,)
        top_idxs = np.argsort(-sims)[:top_k]
        return "\n\n".join(
            f"• {texts[idx]}  (score {round(float(sims[idx]),3)})"
            for idx in top_idxs
        )
    except Exception as e:
        print("Error in search:", e)
        return f"❌ Search error: {e}"

# --- Gradio UI ---------------------------------------------------------------
# Two tabs: "Add" appends newline-separated snippets to the memory store;
# "Search" runs a similarity query over everything stored so far.
with gr.Blocks(title="LifeSync Lite") as demo:
    gr.Markdown("## 🧠 LifeSync Lite — Promptless Search\nPaste notes (one per line) under Add. Then ask a query in Search.")

    with gr.Tab("Add"):
        docs_box = gr.Textbox(lines=6, placeholder="Paste lines or paragraphs (one per line).")
        add_btn = gr.Button("Add to Memory")
        add_out = gr.Textbox(label="Status")
        # add_text returns a status string shown in the Status box.
        add_btn.click(add_text, inputs=[docs_box], outputs=[add_out])

    with gr.Tab("Search"):
        query_box = gr.Textbox(lines=2, placeholder="Ask something like: 'passport' or 'AI idea'")
        search_btn = gr.Button("Search Memory")
        search_out = gr.Textbox(label="Results")
        # search uses its default top_k=5; only the query is wired from the UI.
        search_btn.click(search, inputs=[query_box], outputs=[search_out])

# Blocking call: starts the web server (module must be run as a script).
demo.launch()