"""Streamlit app: login-gated product search with 15 retrieval strategies.

The script authenticates a user against a JSON credential map (``USERS``),
loads a product CSV, builds TF-IDF / BM25 / sentence-embedding / FAISS
indexes once per server process, and lets the user query them through a
single ``search_engine`` entry point. Every login, logout and search is
appended to ``LOG_FILE``; users whose role is ``"admin"`` can view and
download that log from the sidebar.
"""
import hmac
import json
import logging
import os
import re

import faiss
import nltk
import numpy as np
import pandas as pd
import streamlit as st
from nltk.corpus import wordnet
from rank_bm25 import BM25Okapi
from rapidfuzz import fuzz
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer

# ==============================
# INITIAL SETUP
# ==============================
LOG_FILE = "user_logs.csv"
DATA_PATH = "src/products_10k.csv"
TEXT_COLUMNS = ("product_name", "category", "brand", "description")

logger = logging.getLogger(__name__)

# Boolean operators are upper-case whole words only, so that product terms
# such as "BRAND" or "ORANGE" are never mistaken for AND / OR.
_OR_SPLIT = re.compile(r"\bOR\b")
_AND_SPLIT = re.compile(r"\bAND\b")

# Page configuration is issued before any other Streamlit command so that it
# also applies to the login screen.
st.set_page_config(page_title="Multi Search Engine", layout="wide")


@st.cache_resource(show_spinner=False)
def _ensure_wordnet():
    """Fetch the WordNet corpus once per server process, not on every rerun."""
    return nltk.download("wordnet", quiet=True)


_ensure_wordnet()


# ==============================
# LOGGING FUNCTION
# ==============================
def log_activity(user, action, query, search_type):
    """Append one activity row to ``LOG_FILE`` (best effort).

    Args:
        user: Username the event is attributed to.
        action: Short event label, e.g. ``"Login Success"`` or ``"Search"``.
        query: Search text, or ``"-"`` when not applicable.
        search_type: Search mode name, or ``"-"`` when not applicable.

    Failures are reported through the ``logging`` module and never raised:
    audit logging must not take the app down.
    """
    entry = pd.DataFrame(
        [
            {
                "User": user,
                "Action": action,
                "Query": query,
                "Search Type": search_type,
                "Time": str(pd.Timestamp.now()),
            }
        ]
    )
    try:
        # Append instead of read-modify-write; emit the header only when the
        # file is new or empty.
        needs_header = (
            not os.path.exists(LOG_FILE) or os.path.getsize(LOG_FILE) == 0
        )
        entry.to_csv(LOG_FILE, mode="a", header=needs_header, index=False)
    except Exception:  # deliberate best-effort boundary: log, don't crash
        logger.exception("Could not write activity log entry to %s", LOG_FILE)


# ==============================
# AUTHENTICATION
# ==============================
def _load_users():
    """Return the credential map parsed from ``USERS``, or stop the script.

    ``USERS`` is read from the environment first and from Streamlit secrets
    second. On a missing or malformed value an error is shown and the script
    run is halted via ``st.stop()``.
    """
    users_json = os.environ.get("USERS")
    if not users_json:
        try:
            users_json = st.secrets.get("USERS")
        except Exception:
            # NOTE(review): presumably raised when no secrets file exists;
            # treated the same as "not configured".
            users_json = None

    if not users_json or str(users_json).strip() == "":
        st.error("⚠️ USERS not configured in Hugging Face secrets!")
        st.stop()

    try:
        users = json.loads(users_json)
    except (TypeError, ValueError):
        users = None

    if not isinstance(users, dict):
        # The raw value is intentionally NOT echoed: it contains every
        # password and this page is shown to unauthenticated visitors.
        st.error("❌ Invalid USERS JSON format!")
        st.stop()
    return users


def _credentials_match(users, username, password):
    """Return True when ``password`` equals the stored one for ``username``."""
    record = users.get(username)
    if not isinstance(record, dict):
        return False
    expected = record.get("password")
    if not isinstance(expected, str):
        return False
    # NOTE(security): passwords are stored in plain text in USERS; the
    # constant-time comparison only avoids leaking them through timing.
    return hmac.compare_digest(
        expected.encode("utf-8"), password.encode("utf-8")
    )


def login():
    """Render the login form and populate the session on success.

    On a successful login the keys ``authenticated``, ``user``, ``role`` and
    ``login_time`` are written to ``st.session_state`` and the script is
    rerun. Both successful and failed attempts are logged.
    """
    st.title("🔐 Advanced Multi Search")

    users = _load_users()

    username = st.text_input("Username")
    password = st.text_input("Password", type="password")

    if not st.button("Login"):
        return

    if _credentials_match(users, username, password):
        st.session_state["authenticated"] = True
        st.session_state["user"] = username
        # Least privilege when the record carries no role.
        st.session_state["role"] = users[username].get("role", "user")
        st.session_state["login_time"] = pd.Timestamp.now()
        log_activity(username, "Login Success", "-", "-")
        st.rerun()
    else:
        log_activity(username, "Login Failed", "-", "-")
        st.error("❌ Invalid credentials")


# Session control
if "authenticated" not in st.session_state:
    st.session_state["authenticated"] = False

if not st.session_state["authenticated"]:
    login()
    st.stop()

# ==============================
# UI
# ==============================
st.title("🔍 Advanced Multi-Search Product Engine")

st.sidebar.success(f"👤 {st.session_state['user']}")
st.sidebar.info(f"Role: {st.session_state['role']}")

if st.sidebar.button("🚪 Logout"):
    log_activity(st.session_state["user"], "Logout", "-", "-")
    st.session_state.clear()
    st.rerun()


# ==============================
# LOAD MODEL
# ==============================
@st.cache_resource
def load_model():
    """Load the sentence-embedding model once per server process (CPU)."""
    return SentenceTransformer('all-MiniLM-L6-v2', device='cpu')


model = load_model()


# ==============================
# LOAD DATA
# ==============================
@st.cache_data
def load_data(path=DATA_PATH):
    """Read the product CSV and add a ``combined`` free-text column.

    Args:
        path: CSV location; defaults to ``DATA_PATH``.

    Returns:
        The DataFrame with an extra ``combined`` column: the space-joined
        ``product_name``, ``category``, ``brand`` and ``description`` values,
        with missing values treated as empty strings.

    Raises:
        KeyError: if one of the four text columns is absent.
    """
    frame = pd.read_csv(path)
    first, *rest = TEXT_COLUMNS
    # astype(str) keeps the concatenation valid for non-string columns.
    combined = frame[first].fillna("").astype(str)
    for column in rest:
        combined = combined + " " + frame[column].fillna("").astype(str)
    frame["combined"] = combined
    return frame


# The existence check lives outside the cached loader so that a "missing
# file" outcome is never cached.
if not os.path.exists(DATA_PATH):
    st.error("Dataset not found!")
    st.stop()

df = load_data()

if df.empty:
    # The vectorizers below cannot be fitted on an empty corpus.
    st.error("Dataset is empty!")
    st.stop()

# ==============================
# DATA PREVIEW
# ==============================
st.subheader("📄 Data Preview")
rows = st.selectbox("Rows to view", [10, 20, 50, 100])
st.dataframe(df.head(rows))

products = df["combined"].tolist()


# ==============================
# PREPROCESS
# ==============================
@st.cache_resource
def preprocess(products):
    """Build every search index for ``products`` (cached per process).

    Args:
        products: List of product text strings.

    Returns:
        A 5-tuple ``(tfidf, tfidf_matrix, embeddings, index, bm25)``: the
        fitted ``TfidfVectorizer``, its document-term matrix, the
        L2-normalised sentence embeddings, a FAISS inner-product index over
        those embeddings, and a ``BM25Okapi`` model over lower-cased,
        whitespace-split tokens.
    """
    tfidf = TfidfVectorizer()
    tfidf_matrix = tfidf.fit_transform(products)

    # FAISS operates on contiguous float32 arrays and normalises in place;
    # after normalisation, inner product equals cosine similarity.
    embeddings = np.ascontiguousarray(
        model.encode(products, show_progress_bar=False), dtype=np.float32
    )
    faiss.normalize_L2(embeddings)

    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)

    bm25 = BM25Okapi([p.lower().split() for p in products])

    return tfidf, tfidf_matrix, embeddings, index, bm25


tfidf, tf_matrix, embs, faiss_index, bm25 = preprocess(products)


# ==============================
# SYNONYMS
# ==============================
def get_synonyms(word):
    """Return the WordNet lemma names of ``word`` as a sorted list.

    Underscores in multi-word lemmas are replaced by spaces so the result can
    be appended to a text query. Returns an empty list when the WordNet
    corpus is not available.
    """
    try:
        synsets = wordnet.synsets(word)
    except LookupError:
        # Corpus missing (e.g. the download failed while offline).
        return []
    names = {
        lemma.name().replace("_", " ")
        for synset in synsets
        for lemma in synset.lemmas()
    }
    return sorted(names)


# ==============================
# SEARCH ENGINE (15 TYPES)
# ==============================
def _match_all(predicate):
    """Return ``(index, 1)`` for each product whose lower-cased text passes."""
    return [
        (i, 1) for i, text in enumerate(products) if predicate(text.lower())
    ]


def _boolean_matches(query):
    """Evaluate an ``AND`` / ``OR`` substring query; AND binds tighter."""
    groups = []
    for alternative in _OR_SPLIT.split(query):
        terms = [t.strip().lower() for t in _AND_SPLIT.split(alternative)]
        terms = [t for t in terms if t]  # an empty term would match anything
        if terms:
            groups.append(terms)
    if not groups:
        return []
    return _match_all(
        lambda text: any(all(t in text for t in group) for group in groups)
    )


def _tfidf_scores(query):
    """Return the TF-IDF dot-product score of ``query`` for every product."""
    return (tf_matrix @ tfidf.transform([query]).T).toarray().ravel()


def _bm25_scores(query):
    """Return the BM25 score of ``query`` for every product."""
    return np.asarray(bm25.get_scores(query.lower().split()), dtype=float)


def _encode_query(query):
    """Return the L2-normalised float32 embedding of ``query``, shape (1, d)."""
    vector = np.ascontiguousarray(model.encode([query]), dtype=np.float32)
    faiss.normalize_L2(vector)
    return vector


def _semantic_scores(query):
    """Return the embedding inner product of ``query`` with every product."""
    return (embs @ _encode_query(query).T).ravel()


def _min_max(scores):
    """Rescale ``scores`` to [0, 1]; all zeros when every score is equal."""
    scores = np.asarray(scores, dtype=float)
    if scores.size == 0:
        return scores
    low = scores.min()
    span = scores.max() - low
    if span <= 0:
        return np.zeros_like(scores)
    return (scores - low) / span


def search_engine(query, mode, top_k):
    """Score the product list against ``query`` using strategy ``mode``.

    Args:
        query: User query text.
        mode: One of the names listed in ``search_types``.
        top_k: Number of neighbours requested; only the ``"FAISS"`` mode uses
            it directly. Other modes return every candidate and leave the
            truncation to the caller.

    Returns:
        A list of ``(product_index, score)`` tuples in no particular order.
        Match-only modes use a score of 1. Unknown modes yield ``[]``.

    Raises:
        re.error: in ``"Regex"`` mode when ``query`` is not a valid pattern.
    """
    needle = query.lower()

    if mode == "Keyword":
        return _match_all(lambda text: needle in text)

    if mode == "Regex":
        # NOTE(security): the pattern is user-supplied; a pathological
        # expression can be slow. Access is limited to logged-in users.
        pattern = re.compile(query, re.IGNORECASE)
        return [(i, 1) for i, p in enumerate(products) if pattern.search(p)]

    if mode == "Boolean":
        return _boolean_matches(query)

    if mode == "Fuzzy":
        # Lower-cased on both sides, like every other lexical mode.
        return [(i, fuzz.ratio(needle, p.lower())) for i, p in enumerate(products)]

    if mode == "N-Gram":
        return _match_all(
            lambda text: any(needle in word for word in text.split())
        )

    if mode == "Prefix":
        return _match_all(
            lambda text: any(word.startswith(needle) for word in text.split())
        )

    if mode == "Suffix":
        return _match_all(
            lambda text: any(word.endswith(needle) for word in text.split())
        )

    if mode == "TF-IDF":
        return list(enumerate(_tfidf_scores(query)))

    if mode == "BM25":
        return list(enumerate(_bm25_scores(query)))

    if mode == "Semantic":
        return list(enumerate(_semantic_scores(query)))

    if mode == "FAISS":
        k = min(int(top_k), faiss_index.ntotal)
        if k <= 0:
            return []
        distances, indices = faiss_index.search(_encode_query(query), k)
        # FAISS pads with -1 when it has fewer than k hits.
        return [
            (int(i), float(d))
            for d, i in zip(distances[0], indices[0])
            if i != -1
        ]

    if mode == "Hybrid":
        return list(enumerate(_tfidf_scores(query) + _semantic_scores(query)))

    if mode == "Query Expansion":
        expanded = query.split()
        for word in query.split():
            expanded += get_synonyms(word)
        return search_engine(" ".join(expanded), "TF-IDF", top_k)

    if mode == "Weighted Hybrid":
        # TF-IDF and semantic scores are similarities on a bounded scale;
        # BM25 is unbounded, so it is rescaled to [0, 1] before weighting,
        # otherwise it would swamp the other two regardless of its weight.
        blended = (
            0.4 * _tfidf_scores(query)
            + 0.4 * _semantic_scores(query)
            + 0.2 * _min_max(_bm25_scores(query))
        )
        return list(enumerate(blended))

    if mode == "Ensemble":
        # Min-max scaling (instead of dividing by the max) keeps the ranking
        # intact even when every semantic score is negative.
        combined = (
            _min_max(_tfidf_scores(query))
            + _min_max(_semantic_scores(query))
            + _min_max(_bm25_scores(query))
        )
        return list(enumerate(combined))

    return []


# ==============================
# UI SEARCH
# ==============================
search_types = [
    "Keyword", "Regex", "Boolean", "Fuzzy", "N-Gram", "Prefix", "Suffix",
    "TF-IDF", "BM25", "Semantic", "FAISS", "Hybrid",
    "Query Expansion", "Weighted Hybrid", "Ensemble",
]

search_type = st.selectbox("🔎 Search Type", search_types)
query = st.text_input("Enter query")
top_k = st.slider("Top Results", 5, 50, 10)

if st.button("Search"):
    if not query.strip():
        st.warning("Enter query")
    else:
        log_activity(st.session_state["user"], "Search", query, search_type)

        try:
            results = search_engine(query, search_type, top_k)
        except re.error as exc:
            st.error(f"Invalid regular expression: {exc}")
            results = None

        if results is not None:
            # Stable sort: ties keep dataset order.
            ranked = sorted(results, key=lambda pair: pair[1], reverse=True)
            ranked = [(i, s) for i, s in ranked[:top_k] if i != -1]

            if ranked:
                out = df.iloc[[i for i, _ in ranked]].copy()
                out["Score"] = [round(float(s), 4) for _, s in ranked]
                st.dataframe(
                    out.drop(columns=["combined"]), use_container_width=True
                )
            else:
                st.info("No results found")

# ==============================
# ADMIN LOG VIEW
# ==============================
if st.session_state["role"] == "admin":
    st.sidebar.subheader("📊 Activity Logs")

    if os.path.exists(LOG_FILE):
        try:
            log_df = pd.read_csv(LOG_FILE)
        except (pd.errors.EmptyDataError, pd.errors.ParserError):
            # Still offer the raw download below so the file can be inspected.
            st.sidebar.warning("Log file could not be parsed")
        else:
            st.sidebar.dataframe(log_df.tail(10))

        with open(LOG_FILE, "rb") as f:
            st.sidebar.download_button("⬇ Download Logs", f, file_name="logs.csv")
    else:
        st.sidebar.write("No logs yet")