Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import re | |
| import json | |
| import faiss | |
| import nltk | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sentence_transformers import SentenceTransformer | |
| from rank_bm25 import BM25Okapi | |
| from rapidfuzz import fuzz | |
| from nltk.corpus import wordnet | |
| # ============================== | |
| # INITIAL SETUP | |
| # ============================== | |
# Fetch the WordNet corpus only when it is not already installed. Streamlit
# re-executes this script on every interaction, and an unconditional
# nltk.download() call contacts the download server each time.
try:
    nltk.data.find("corpora/wordnet")
except LookupError:
    nltk.download('wordnet', quiet=True)

# CSV file that log_activity() writes to and the admin sidebar reads back.
LOG_FILE = "user_logs.csv"
| # ============================== | |
| # LOGGING FUNCTION | |
| # ============================== | |
def log_activity(user, action, query, search_type, log_file=None):
    """Append one activity record to the CSV activity log.

    Logging is best-effort: a failure to write is reported through the
    ``logging`` module and never propagates to the caller.

    Args:
        user: Name of the user performing the action.
        action: Short action label, e.g. "Login Success" or "Search".
        query: Search query text, or "-" when not applicable.
        search_type: Selected search mode, or "-" when not applicable.
        log_file: Optional path of the CSV file to append to. Defaults to
            the module-level ``LOG_FILE``.

    Returns:
        None.
    """
    import logging

    path = LOG_FILE if log_file is None else log_file
    entry = pd.DataFrame([{
        "User": user,
        "Action": action,
        "Query": query,
        "Search Type": search_type,
        "Time": str(pd.Timestamp.now()),
    }])
    try:
        # Append instead of read-concat-rewrite: the cost no longer grows with
        # the log size, and an existing but empty file (which pd.read_csv
        # rejects) simply receives the header row first.
        needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
        entry.to_csv(path, mode="a", header=needs_header, index=False)
    except Exception:
        # Deliberately broad: activity logging must never break the app, but
        # the failure is recorded instead of being silently discarded.
        logging.getLogger(__name__).warning(
            "Could not write activity log to %s", path, exc_info=True
        )
| # ============================== | |
| # AUTHENTICATION | |
| # ============================== | |
def login():
    """Render the login form and authenticate against the USERS setting.

    USERS is read from the ``USERS`` environment variable, falling back to
    the Streamlit secret of the same name. It must be a JSON object mapping
    each username to an object with ``"password"`` and ``"role"`` entries.

    On a successful login the ``authenticated``, ``user``, ``role`` and
    ``login_time`` session-state keys are set, the attempt is logged and the
    script is rerun. A failed attempt is logged and an error is shown. When
    USERS is missing or malformed an error is shown and the script stops.
    """
    import hmac

    st.title("๐ Advanced Multi Search")
    users_json = os.environ.get("USERS") or st.secrets.get("USERS")
    if not users_json or str(users_json).strip() == "":
        st.error("โ ๏ธ USERS not configured in Hugging Face secrets!")
        st.stop()
    try:
        users = json.loads(users_json)
    except (TypeError, ValueError):
        # Never echo the raw value back: this page is shown to unauthenticated
        # visitors and the setting contains every user's password.
        st.error("โ Invalid USERS JSON format!")
        st.stop()
    if not isinstance(users, dict):
        st.error("โ Invalid USERS JSON format!")
        st.stop()

    username = st.text_input("Username")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        record = users.get(username)
        stored = record.get("password") if isinstance(record, dict) else None
        # NOTE(review): passwords are kept in plain text in USERS; consider
        # storing salted hashes instead.
        # compare_digest keeps the comparison time independent of how many
        # leading characters of the guess are correct.
        valid = isinstance(stored, str) and hmac.compare_digest(
            stored.encode("utf-8"), str(password).encode("utf-8")
        )
        if valid:
            st.session_state["authenticated"] = True
            st.session_state["user"] = username
            # A record without a role no longer crashes the login; such a
            # user simply gets no role.
            st.session_state["role"] = record.get("role")
            st.session_state["login_time"] = pd.Timestamp.now()
            log_activity(username, "Login Success", "-", "-")
            st.rerun()
        else:
            log_activity(username, "Login Failed", "-", "-")
            st.error("โ Invalid credentials")
| # Session control | |
# Authentication gate: until a login has succeeded, show only the login form
# and halt the script before any of the search UI below is built.
if not st.session_state.get("authenticated", False):
    st.session_state["authenticated"] = False
    login()
    st.stop()
| # ============================== | |
| # UI | |
| # ============================== | |
# Page chrome: wide layout, main title, and a sidebar showing who is logged in.
st.set_page_config(page_title="Multi Search Engine", layout="wide")
st.title("๐ Advanced Multi-Search Product Engine")

sidebar = st.sidebar
current_user = st.session_state["user"]
sidebar.success(f"๐ค {current_user}")
current_role = st.session_state["role"]
sidebar.info(f"Role: {current_role}")

# Logging out records the event, wipes the whole session and restarts the
# script, which lands the visitor back on the login form.
logout_clicked = sidebar.button("๐ช Logout")
if logout_clicked:
    log_activity(st.session_state["user"], "Logout", "-", "-")
    st.session_state.clear()
    st.rerun()
| # ============================== | |
| # LOAD MODEL | |
| # ============================== | |
@st.cache_resource
def load_model():
    """Return the sentence-embedding model used for semantic search.

    The model is created once and reused: Streamlit re-executes this script
    on every interaction, and without caching the model would be loaded
    again each time.

    Returns:
        A ``SentenceTransformer`` ('all-MiniLM-L6-v2') running on the CPU.
    """
    return SentenceTransformer('all-MiniLM-L6-v2', device='cpu')


model = load_model()
| # ============================== | |
| # LOAD DATA | |
| # ============================== | |
@st.cache_data
def _read_products(path):
    """Read the product CSV at *path* and add the searchable ``combined`` column."""
    frame = pd.read_csv(path)
    # Fill gaps and coerce to text so a numeric or partly empty column cannot
    # break the string concatenation.
    parts = [
        frame[column].fillna("").astype(str)
        for column in ("product_name", "category", "brand", "description")
    ]
    frame["combined"] = parts[0].str.cat(parts[1:], sep=" ")
    return frame


def load_data():
    """Load the product catalog.

    The file is parsed only once per path; later script reruns reuse the
    cached result instead of reading the CSV again.

    Returns:
        A DataFrame with an extra ``combined`` column (product name,
        category, brand and description joined by single spaces), or
        ``None`` after showing an error when the dataset file is missing.
    """
    path = "src/products_10k.csv"
    # The existence check stays outside the cached helper so that a missing
    # file is re-checked on every run rather than remembered.
    if not os.path.exists(path):
        st.error("Dataset not found!")
        return None
    return _read_products(path)


df = load_data()
if df is None:
    st.stop()
| # ============================== | |
| # DATA PREVIEW | |
| # ============================== | |
# Show the first few catalog rows; the row count is chosen by the user.
st.subheader("๐ Data Preview")
preview_sizes = [10, 20, 50, 100]
rows = st.selectbox("Rows to view", preview_sizes)
preview = df.head(rows)
st.dataframe(preview)

# Plain list of the combined text of every product, in DataFrame row order.
combined_column = df["combined"]
products = combined_column.tolist()
| # ============================== | |
| # PREPROCESS | |
| # ============================== | |
@st.cache_resource
def preprocess(products):
    """Build every search index for the given product texts.

    The result is cached per distinct ``products`` list: Streamlit re-executes
    this script on every interaction, and without caching the whole catalog
    would be re-vectorized and re-embedded each time.

    Args:
        products: List of product text strings.

    Returns:
        A tuple ``(tfidf, tfidf_matrix, embeddings, index, bm25)``: the fitted
        ``TfidfVectorizer``, its document-term matrix, the L2-normalized
        sentence embeddings, a FAISS inner-product index over those
        embeddings, and a ``BM25Okapi`` model over lowercased,
        whitespace-split tokens.
    """
    tfidf = TfidfVectorizer()
    tfidf_matrix = tfidf.fit_transform(products)

    # FAISS operates on contiguous float32 arrays; normalize_L2 works in place.
    embeddings = np.ascontiguousarray(
        model.encode(products, show_progress_bar=False), dtype=np.float32
    )
    faiss.normalize_L2(embeddings)
    # On unit-length vectors the inner product equals cosine similarity.
    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)

    bm25 = BM25Okapi([text.lower().split() for text in products])
    return tfidf, tfidf_matrix, embeddings, index, bm25


tfidf, tf_matrix, embs, faiss_index, bm25 = preprocess(products)
| # ============================== | |
| # SYNONYMS | |
| # ============================== | |
def get_synonyms(word):
    """Return the WordNet lemma names found for *word*.

    Lemma names are collected from every synset of *word*. WordNet joins
    multi-word lemmas with underscores (e.g. "ice_cream"); these are turned
    into spaces so the words can match ordinary text tokens.

    Args:
        word: The word to look up.

    Returns:
        A sorted list of unique lemma names; empty when WordNet has no
        synsets for *word*.
    """
    names = {
        lemma.name().replace("_", " ")
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
    # Sorted so the expanded query is the same on every run.
    return sorted(names)
| # ============================== | |
| # SEARCH ENGINE (15 TYPES) | |
| # ============================== | |
# Boolean operators are recognized only as standalone uppercase words, so
# text that merely contains the letters (BRAND, ORANGE, ANDROID) is not split.
_BOOL_OR = re.compile(r"(?:^|\s+)OR(?:\s+|$)")
_BOOL_AND = re.compile(r"(?:^|\s+)AND(?:\s+|$)")


def _matches(predicate):
    """Return ``(index, 1)`` for each product whose lowercased text passes *predicate*."""
    return [(i, 1) for i, text in enumerate(products) if predicate(text.lower())]


def _word_matches(query, test):
    """Return ``(index, 1)`` for products having a word w with ``test(w, query.lower())``."""
    needle = query.lower()
    return _matches(lambda text: any(test(word, needle) for word in text.split()))


def _keyword(query, top_k):
    """Case-insensitive substring match over the whole product text."""
    needle = query.lower()
    return _matches(lambda text: needle in text)


def _regex(query, top_k):
    """Case-insensitive regex search; an invalid pattern yields no results."""
    # NOTE(review): the pattern comes straight from the user, so a
    # pathological expression can still be slow to evaluate.
    try:
        pattern = re.compile(query, re.IGNORECASE)
    except re.error:
        return []
    return [(i, 1) for i, text in enumerate(products) if pattern.search(text)]


def _boolean(query, top_k):
    """Match ``a AND b OR c``: any OR-alternative whose AND-terms all occur."""
    groups = []
    for alternative in _BOOL_OR.split(query):
        terms = [term.strip().lower() for term in _BOOL_AND.split(alternative)]
        # Drop empty terms: an empty string is a substring of everything.
        terms = [term for term in terms if term]
        if terms:
            groups.append(terms)
    if not groups:
        return []
    return _matches(
        lambda text: any(all(term in text for term in group) for group in groups)
    )


def _fuzzy(query, top_k):
    """Score every product by fuzzy ratio, case-insensitively, best first."""
    needle = query.lower()
    scored = [(i, fuzz.ratio(needle, text.lower())) for i, text in enumerate(products)]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


def _encode_query(query):
    """Return the L2-normalized float32 embedding of *query*, shape (1, dim)."""
    vector = np.ascontiguousarray(model.encode([query]), dtype=np.float32)
    faiss.normalize_L2(vector)
    return vector


def _tfidf_scores(query):
    """Return the TF-IDF dot-product score of *query* against every product."""
    return (tf_matrix @ tfidf.transform([query]).T).toarray().ravel()


def _bm25_scores(query):
    """Return the BM25 score of the lowercased, whitespace-split *query* per product."""
    return np.asarray(bm25.get_scores(query.lower().split()))


def _semantic_scores(query):
    """Return the embedding dot-product score of *query* against every product."""
    return np.dot(embs, _encode_query(query).T).ravel()


def _ranked(scores):
    """Pair each score with its product index."""
    return list(enumerate(scores))


def _peak_scaled(scores):
    """Divide *scores* by their maximum (plus a small epsilon)."""
    if scores.size == 0:
        return scores
    return scores / (np.max(scores) + 1e-6)


def _faiss(query, top_k):
    """Return the *top_k* nearest products from the FAISS index."""
    k = min(top_k, faiss_index.ntotal)
    if k <= 0:
        return []
    distances, ids = faiss_index.search(_encode_query(query), k)
    # FAISS pads missing neighbours with id -1; drop those here.
    return [(int(i), float(d)) for i, d in zip(ids[0], distances[0]) if i != -1]


def _query_expansion(query, top_k):
    """TF-IDF search over the query words plus the synonyms of each word."""
    words = query.split()
    expanded = list(words)
    for word in words:
        expanded += get_synonyms(word)
    return _ranked(_tfidf_scores(" ".join(expanded)))


def _weighted_hybrid(query, top_k):
    """0.4 * TF-IDF + 0.4 * semantic + 0.2 * BM25, per product."""
    combined = (
        0.4 * _tfidf_scores(query)
        + 0.4 * _semantic_scores(query)
        + 0.2 * _bm25_scores(query)
    )
    return _ranked(combined)


def _ensemble(query, top_k):
    """Sum of the max-scaled TF-IDF, semantic and BM25 scores, per product."""
    combined = (
        _peak_scaled(_tfidf_scores(query))
        + _peak_scaled(_semantic_scores(query))
        + _peak_scaled(_bm25_scores(query))
    )
    return _ranked(combined)


# Search mode name -> handler taking (query, top_k).
_SEARCH_MODES = {
    "Keyword": _keyword,
    "Regex": _regex,
    "Boolean": _boolean,
    "Fuzzy": _fuzzy,
    "N-Gram": lambda query, top_k: _word_matches(query, lambda w, n: n in w),
    "Prefix": lambda query, top_k: _word_matches(query, str.startswith),
    "Suffix": lambda query, top_k: _word_matches(query, str.endswith),
    "TF-IDF": lambda query, top_k: _ranked(_tfidf_scores(query)),
    "BM25": lambda query, top_k: _ranked(_bm25_scores(query)),
    "Semantic": lambda query, top_k: _ranked(_semantic_scores(query)),
    "FAISS": _faiss,
    "Hybrid": lambda query, top_k: _ranked(
        _tfidf_scores(query) + _semantic_scores(query)
    ),
    "Query Expansion": _query_expansion,
    "Weighted Hybrid": _weighted_hybrid,
    "Ensemble": _ensemble,
}


def search_engine(query, mode, top_k):
    """Run *query* against the product texts using the chosen search mode.

    Args:
        query: The user's query string.
        mode: One of the names in ``_SEARCH_MODES`` ("Keyword", "Regex",
            "Boolean", "Fuzzy", "N-Gram", "Prefix", "Suffix", "TF-IDF",
            "BM25", "Semantic", "FAISS", "Hybrid", "Query Expansion",
            "Weighted Hybrid", "Ensemble").
        top_k: Number of neighbours requested from the index; only the
            "FAISS" mode uses it.

    Returns:
        A list of ``(product_index, score)`` pairs. Matching modes (Keyword,
        Regex, Boolean, N-Gram, Prefix, Suffix) return only the matching
        products, each with score 1. Scoring modes return one pair per
        product, except "FAISS", which returns at most *top_k* pairs. Only
        "Fuzzy" returns its pairs already sorted. An unknown mode, or an
        invalid regular expression in "Regex" mode, returns an empty list.
    """
    handler = _SEARCH_MODES.get(mode)
    if handler is None:
        return []
    return handler(query, top_k)
| # ============================== | |
| # UI SEARCH | |
| # ============================== | |
# Names of every mode search_engine() understands, in display order.
search_types = [
    "Keyword",
    "Regex",
    "Boolean",
    "Fuzzy",
    "N-Gram",
    "Prefix",
    "Suffix",
    "TF-IDF",
    "BM25",
    "Semantic",
    "FAISS",
    "Hybrid",
    "Query Expansion",
    "Weighted Hybrid",
    "Ensemble",
]
search_type = st.selectbox("๐ Search Type", search_types)
query = st.text_input("Enter query")
top_k = st.slider("Top Results", 5, 50, 10)

if st.button("Search"):
    if query:
        # Rank best-first and keep only the requested number of hits.
        ranked = sorted(
            search_engine(query, search_type, top_k),
            key=lambda pair: pair[1],
            reverse=True,
        )
        results = ranked[:top_k]
        log_activity(st.session_state["user"], "Search", query, search_type)

        # Index -1 marks a missing hit; skip those and round the scores.
        kept = [(i, round(s, 4)) for i, s in results if i != -1]
        if kept:
            idx = [i for i, _ in kept]
            scores = [s for _, s in kept]
            out = df.iloc[idx].copy()
            out["Score"] = scores
            st.dataframe(out.drop(columns=["combined"]), use_container_width=True)
        else:
            st.info("No results found")
    else:
        st.warning("Enter query")
| # ============================== | |
| # ADMIN LOG VIEW | |
| # ============================== | |
# Admin-only sidebar panel: the last ten log rows plus a download of the
# full log file.
if st.session_state["role"] == "admin":
    admin_panel = st.sidebar
    admin_panel.subheader("๐ Activity Logs")
    if not os.path.exists(LOG_FILE):
        admin_panel.write("No logs yet")
    else:
        log_df = pd.read_csv(LOG_FILE)
        recent_rows = log_df.tail(10)
        admin_panel.dataframe(recent_rows)
        with open(LOG_FILE, "rb") as f:
            admin_panel.download_button("โฌ Download Logs", f, file_name="logs.csv")