# Hugging Face Space page residue ("Spaces: Sleeping") from the copy/paste —
# converted to a comment so the file remains valid Python.
# --- Imports and module-level setup (side effect: downloads/loads the embedding model) ---
import gradio as gr
import os, pickle, time
import numpy as np

# Try FAISS for fast similarity search; fall back to a pure-numpy search if unavailable.
try:
    import faiss
    FAISS_AVAILABLE = True
except Exception as e:
    print("FAISS import failed:", e)
    FAISS_AVAILABLE = False

from sentence_transformers import SentenceTransformer

# Small general-purpose sentence-embedding model.
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
print("Loading embedding model:", MODEL_NAME)
model = SentenceTransformer(MODEL_NAME)
# Embedding dimensionality, used to size the FAISS index.
EMB_DIM = model.get_sentence_embedding_dimension()
print("Model loaded. Embedding dim:", EMB_DIM)

# On-disk persistence: the vector index and its parallel list of source texts.
INDEX_FILE = "vector_store.index"
TEXT_FILE = "texts.pkl"
def load_index():
    """Load the persisted index and text list; create fresh empty ones if absent.

    Returns a ``(index, texts)`` pair where ``index`` is a FAISS index or
    ``None`` (numpy fallback) and ``texts`` is the parallel list of snippets.
    """
    have_texts = os.path.exists(TEXT_FILE)

    # Preferred path: FAISS index plus its parallel text list, both on disk.
    if FAISS_AVAILABLE and os.path.exists(INDEX_FILE) and have_texts:
        try:
            loaded = faiss.read_index(INDEX_FILE)
            with open(TEXT_FILE, "rb") as fh:
                stored = pickle.load(fh)
            print("Loaded FAISS index and texts.")
            return loaded, stored
        except Exception as err:
            # Fall through and rebuild from scratch on any load failure.
            print("Failed to load FAISS index:", err)

    # Numpy fallback: only the text list is persisted; embeddings are recomputed.
    if not FAISS_AVAILABLE and have_texts:
        with open(TEXT_FILE, "rb") as fh:
            stored = pickle.load(fh)
        print("Loaded texts (no FAISS).")
        return None, stored

    # Nothing usable on disk -- start with an empty store.
    if FAISS_AVAILABLE:
        fresh = faiss.IndexFlatIP(EMB_DIM)
        print("Created new FAISS IndexFlatIP.")
        return fresh, []
    print("FAISS not available; using numpy fallback.")
    return None, []
def save_index(index, texts):
    """Persist the FAISS index (when one exists) and the text list to disk.

    Failures are logged, never raised: persistence is best-effort.
    """
    if FAISS_AVAILABLE and index is not None:
        try:
            faiss.write_index(index, INDEX_FILE)
            print("FAISS index saved.")
        except Exception as err:
            print("Error saving FAISS index:", err)
    # The text list is always written, with or without FAISS.
    try:
        with open(TEXT_FILE, "wb") as fh:
            pickle.dump(texts, fh)
        print("Texts saved.")
    except Exception as err:
        print("Error saving texts:", err)
def embed_texts(docs):
    """Embed *docs* and return a contiguous float32 2-D array of embeddings.

    Robust to ``model.encode`` returning a single 1-D vector, a list of
    lists, or an object-dtype array of per-item vectors. Embeddings are
    L2-normalized by the model (``normalize_embeddings=True``), so dot
    product equals cosine similarity downstream.
    """
    emb = model.encode(docs, normalize_embeddings=True)
    # Object-dtype (ragged) arrays must be stacked row by row before casting;
    # the original checked ndim first, leaving 1-D object arrays unhandled.
    if isinstance(emb, np.ndarray) and emb.dtype == object:
        emb = np.vstack([np.asarray(e, dtype=np.float32).reshape(1, -1) for e in emb])
    # One conversion covers lists, lists-of-lists and numeric ndarrays,
    # replacing the original's three redundant conversion/reshape passes.
    emb = np.ascontiguousarray(emb, dtype=np.float32)
    if emb.ndim == 1:
        # A lone vector comes back 1-D; callers expect shape (1, dim).
        emb = emb.reshape(1, -1)
    return emb
def add_text(docs_raw):
    """Split raw input into non-empty lines and store them in memory.

    Returns a human-readable status string for the UI.
    """
    docs = [line.strip() for line in docs_raw.split("\n") if line.strip()]
    if not docs:
        return "⚠️ No text detected. Paste lines or paragraphs (one per line)."
    index, texts = load_index()
    try:
        embeddings = embed_texts(docs)
        print("Embeddings shape:", embeddings.shape, "dtype:", embeddings.dtype)
        if not (FAISS_AVAILABLE and index is not None):
            # Numpy fallback: persist raw texts only; embeddings are
            # recomputed at query time.
            texts.extend(docs)
            save_index(None, texts)
            return f"✅ Added {len(docs)} snippet(s) to memory (numpy fallback)."
        # FAISS path: an *empty* index whose dimension disagrees with the
        # current model is safely replaced before adding anything.
        if index.ntotal == 0:
            try:
                idx_dim = index.d if hasattr(index, 'd') else EMB_DIM
            except Exception:
                idx_dim = EMB_DIM
            if idx_dim != EMB_DIM:
                index = faiss.IndexFlatIP(EMB_DIM)
        index.add(embeddings)
        texts.extend(docs)
        save_index(index, texts)
        return f"✅ Added {len(docs)} snippet(s) to memory (FAISS)."
    except Exception as err:
        print("Error in add_text:", err)
        return f"❌ Error adding text: {err}"
def search(query, top_k=5):
    """Return up to *top_k* stored snippets most similar to *query*.

    Results are formatted one per paragraph with their similarity score.
    Returns an explanatory message when memory is empty or on error.
    """
    index, texts = load_index()
    if texts is None or len(texts) == 0:
        return "🪣 Memory empty — add some snippets first."
    try:
        q_emb = embed_texts([query])
        print("Query emb shape:", q_emb.shape)
        if FAISS_AVAILABLE and index is not None:
            scores, ids = index.search(q_emb, top_k)
            results = []
            for i, s in zip(ids[0], scores[0]):
                # BUG FIX: FAISS pads with id -1 when the index holds fewer
                # than top_k vectors; the original `i < len(texts)` let -1
                # through and wrongly returned texts[-1] with a junk score.
                if 0 <= i < len(texts):
                    results.append(f"• {texts[i]} (score {round(float(s),3)})")
            return "\n\n".join(results)
        # Numpy fallback: embeddings are normalized, so dot product == cosine.
        # Reuse embed_texts for consistent dtype/shape handling.
        all_embs = embed_texts(texts)
        sims = all_embs @ q_emb[0]  # shape (n,)
        top_idxs = np.argsort(-sims)[:top_k]
        return "\n\n".join(
            f"• {texts[idx]} (score {round(float(sims[idx]),3)})" for idx in top_idxs
        )
    except Exception as e:
        print("Error in search:", e)
        return f"❌ Search error: {e}"
# --- Gradio UI: two tabs ("Add", "Search") sharing the same on-disk memory ---
with gr.Blocks(title="LifeSync Lite") as demo:
    gr.Markdown("## 🧠 LifeSync Lite — Promptless Search\nPaste notes (one per line) under Add. Then ask a query in Search.")
    with gr.Tab("Add"):
        # Multi-line input; each non-empty line becomes one stored snippet.
        docs_box = gr.Textbox(lines=6, placeholder="Paste lines or paragraphs (one per line).")
        add_btn = gr.Button("Add to Memory")
        add_out = gr.Textbox(label="Status")
        add_btn.click(add_text, inputs=[docs_box], outputs=[add_out])
    with gr.Tab("Search"):
        query_box = gr.Textbox(lines=2, placeholder="Ask something like: 'passport' or 'AI idea'")
        search_btn = gr.Button("Search Memory")
        search_out = gr.Textbox(label="Results")
        search_btn.click(search, inputs=[query_box], outputs=[search_out])
# Start the app (blocking call).
demo.launch()