# Hugging Face file-viewer header (page residue, not part of the program):
# pradeep4321 - "Update src/app.py" - commit f9fb326 (verified) - 9.73 kB
import streamlit as st
import pandas as pd
import numpy as np
import os
import re
import json
import faiss
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
from rapidfuzz import fuzz
from nltk.corpus import wordnet
# ==============================
# INITIAL SETUP
# ==============================
# Fetch the WordNet corpus that get_synonyms() reads through nltk.corpus.wordnet;
# quiet=True suppresses the download output.
nltk.download('wordnet', quiet=True)
# Path (relative to the working directory) of the CSV activity log that
# log_activity() writes and the admin sidebar reads.
LOG_FILE = "user_logs.csv"
# ==============================
# LOGGING FUNCTION
# ==============================
def log_activity(user, action, query, search_type):
    """Append one activity record to the CSV log at ``LOG_FILE``.

    The record is appended instead of re-reading and rewriting the whole
    file, so the cost of a call does not grow with the log, an empty or
    unreadable existing file no longer blocks logging, and overlapping
    sessions are far less likely to overwrite each other's entries.

    Logging stays best-effort: a failure to write is reported through the
    ``logging`` module and never propagates to the caller.

    Args:
        user: Name of the user performing the action.
        action: Short action label, e.g. ``"Search"`` or ``"Login Failed"``.
        query: Search query text, or ``"-"`` when not applicable.
        search_type: Selected search mode, or ``"-"`` when not applicable.
    """
    import logging  # local import keeps this block self-contained

    log_entry = {
        "User": user,
        "Action": action,
        "Query": query,
        "Search Type": search_type,
        "Time": str(pd.Timestamp.now()),
    }
    try:
        # Emit the header row only when starting a new (or empty) file.
        write_header = (
            not os.path.exists(LOG_FILE) or os.path.getsize(LOG_FILE) == 0
        )
        pd.DataFrame([log_entry]).to_csv(
            LOG_FILE, mode="a", header=write_header, index=False
        )
    except (OSError, ValueError) as exc:
        # Best-effort: never let audit logging break the app, but do not
        # hide the failure either.
        logging.getLogger(__name__).warning(
            "Could not write activity log %s: %s", LOG_FILE, exc
        )
# ==============================
# AUTHENTICATION
# ==============================
def login():
    """Render the login form and authenticate against the ``USERS`` secret.

    ``USERS`` is read from the environment first and from ``st.secrets``
    second. It must be a JSON object mapping each username to an object
    with a ``"password"`` and (optionally) a ``"role"`` entry.

    On success the session keys ``authenticated``, ``user``, ``role`` and
    ``login_time`` are set and the script is rerun. Every attempt is
    recorded with ``log_activity``. When the secret is missing or malformed
    an error is shown and the script run is stopped.

    Security notes:
        * The raw secret is never echoed to the page. This form is shown to
          unauthenticated visitors, and the payload contains every password.
        * Passwords are compared with ``hmac.compare_digest`` so the check
          does not leak timing information.
    """
    import hmac  # local import keeps this block self-contained

    st.title("๐Ÿ” Advanced Multi Search")
    users_json = os.environ.get("USERS") or st.secrets.get("USERS")

    # Guard: secret missing or blank.
    if not users_json or str(users_json).strip() == "":
        st.error("โš ๏ธ USERS not configured in Hugging Face secrets!")
        st.stop()

    # Guard: the secret must parse as a JSON object of user records.
    try:
        users = json.loads(users_json)
    except (TypeError, ValueError):
        users = None
    if not isinstance(users, dict):
        st.error("โŒ Invalid USERS JSON format!")
        st.stop()

    username = st.text_input("Username")
    password = st.text_input("Password", type="password")

    if st.button("Login"):
        record = users.get(username)
        stored = record.get("password") if isinstance(record, dict) else None
        # Only string passwords can match, exactly as with the previous
        # ``==`` comparison against the text-input value.
        valid = isinstance(stored, str) and hmac.compare_digest(
            stored.encode("utf-8"), password.encode("utf-8")
        )
        if valid:
            st.session_state["authenticated"] = True
            st.session_state["user"] = username
            # A record without a role gets the least-privileged default
            # instead of raising KeyError.
            st.session_state["role"] = record.get("role", "user")
            st.session_state["login_time"] = pd.Timestamp.now()
            log_activity(username, "Login Success", "-", "-")
            st.rerun()
        else:
            log_activity(username, "Login Failed", "-", "-")
            st.error("โŒ Invalid credentials")
# Page configuration is applied before anything else is rendered so that it
# is the first Streamlit command on every path, including the login view
# (previously the login page was drawn without the page title / wide layout).
st.set_page_config(page_title="Multi Search Engine", layout="wide")

# Session control: unauthenticated sessions only ever see the login form.
if "authenticated" not in st.session_state:
    st.session_state["authenticated"] = False
if not st.session_state["authenticated"]:
    login()
    st.stop()

# ==============================
# UI
# ==============================
st.title("๐Ÿ” Advanced Multi-Search Product Engine")
st.sidebar.success(f"๐Ÿ‘ค {st.session_state['user']}")
st.sidebar.info(f"Role: {st.session_state['role']}")

# Logout: record the event, drop all session state, and rerun so the login
# form is shown again.
if st.sidebar.button("๐Ÿšช Logout"):
    log_activity(st.session_state["user"], "Logout", "-", "-")
    st.session_state.clear()
    st.rerun()
# ==============================
# LOAD MODEL
# ==============================
@st.cache_resource
def load_model():
    """Create the 'all-MiniLM-L6-v2' sentence-embedding model on the CPU.

    Wrapped in ``st.cache_resource`` so the model object is reused.
    """
    encoder = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')
    return encoder


model = load_model()
# ==============================
# LOAD DATA
# ==============================
@st.cache_data
def load_data():
    """Read the product CSV and add a ``combined`` free-text column.

    ``combined`` is the product name, category, brand and description
    joined with single spaces, with missing values treated as empty text.

    Returns:
        The loaded DataFrame, or ``None`` (after showing an error) when the
        dataset file does not exist.
    """
    path = "src/products_10k.csv"
    if os.path.exists(path):
        frame = pd.read_csv(path)
        name = frame["product_name"].fillna("")
        category = frame["category"].fillna("")
        brand = frame["brand"].fillna("")
        description = frame["description"].fillna("")
        frame["combined"] = name + " " + category + " " + brand + " " + description
        return frame

    st.error("Dataset not found!")
    return None


df = load_data()
# Nothing below can work without the dataset, so end this script run here.
if df is None:
    st.stop()
# ==============================
# DATA PREVIEW
# ==============================
# Show the first N rows of the loaded dataset, with N picked by the user.
st.subheader("๐Ÿ“„ Data Preview")
rows = st.selectbox("Rows to view", [10, 20, 50, 100])
st.dataframe(df.head(rows))
# Per-product "combined" text as a plain list; preprocess() builds the search
# indices from it and search_engine() scans it directly.
products = df["combined"].tolist()
# ==============================
# PREPROCESS
# ==============================
@st.cache_resource
def preprocess(products):
    """Build every search index used by ``search_engine``.

    Args:
        products: List of product text strings.

    Returns:
        A 5-tuple ``(tfidf, tfidf_matrix, embeddings, index, bm25)``: the
        fitted TF-IDF vectorizer, its document-term matrix, the
        L2-normalized sentence embeddings, a FAISS inner-product index
        holding those embeddings, and a BM25 model over lower-cased,
        whitespace-split tokens.
    """
    # Lexical index: TF-IDF.
    vectorizer = TfidfVectorizer()
    doc_term = vectorizer.fit_transform(products)

    # Dense index: embeddings are normalized in place, so inner product
    # on them equals cosine similarity.
    vectors = model.encode(products, show_progress_bar=False)
    faiss.normalize_L2(vectors)
    dimension = vectors.shape[1]
    ip_index = faiss.IndexFlatIP(dimension)
    ip_index.add(np.array(vectors))

    # Lexical index: BM25 over simple whitespace tokens.
    tokenized = [text.lower().split() for text in products]
    okapi = BM25Okapi(tokenized)

    return vectorizer, doc_term, vectors, ip_index, okapi


tfidf, tf_matrix, embs, faiss_index, bm25 = preprocess(products)
# ==============================
# SYNONYMS
# ==============================
def get_synonyms(word):
    """Return the WordNet synonyms of *word* as a sorted list of strings.

    Every lemma of every synset of *word* is collected (this can include
    *word* itself). WordNet joins multi-word lemmas with underscores
    (``"ice_cream"``); these are turned into spaces so that each part is a
    normal token when the result is used for query expansion. The list is
    sorted so the result is deterministic between runs.

    Args:
        word: A single query term.

    Returns:
        Sorted list of unique synonym strings; empty when WordNet has no
        synset for *word*.
    """
    synonyms = {
        lemma.name().replace("_", " ")
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
    return sorted(synonyms)
# ==============================
# SEARCH ENGINE (15 TYPES)
# ==============================
def _split_on_operator(query, operator):
    """Split *query* on a whole-word, upper-case boolean *operator*.

    Returns ``(terms, found)``: the stripped, lower-cased, non-blank operands
    and whether the operator occurred at all. Matching on word boundaries
    keeps words such as "BRAND" or "ORANGE" from being cut in half.
    """
    parts = re.split(rf"\b{operator}\b", query)
    terms = [part.strip().lower() for part in parts if part.strip()]
    return terms, len(parts) > 1


def _boolean_search(query):
    """Return ``(index, 1)`` for every product satisfying an AND / OR query."""
    terms, has_and = _split_on_operator(query, "AND")
    combine = all
    if not has_and:
        # With OR, or with a single operand and no operator, one hit is enough.
        terms, _ = _split_on_operator(query, "OR")
        combine = any
    if not terms:
        return []
    lowered = (text.lower() for text in products)
    return [
        (i, 1)
        for i, text in enumerate(lowered)
        if combine(term in text for term in terms)
    ]


def search_engine(query, mode, top_k):
    """Run *query* against the product texts using the strategy named *mode*.

    Args:
        query: Raw user query string.
        mode: One of "Keyword", "Regex", "Boolean", "Fuzzy", "N-Gram",
            "Prefix", "Suffix", "TF-IDF", "BM25", "Semantic", "FAISS",
            "Hybrid", "Query Expansion", "Weighted Hybrid", "Ensemble".
        top_k: Number of neighbours requested from the FAISS index; the
            other modes return unsliced results and leave ranking and
            truncation to the caller.

    Returns:
        A list of ``(product_index, score)`` pairs. Filter-style modes
        (Keyword, Regex, Boolean, N-Gram, Prefix, Suffix) return only the
        matching products, each with score 1. Scoring modes other than
        FAISS return one pair per product. An unknown mode, an invalid
        regular expression, or a Boolean query with no operands yields
        ``[]``.

    Notes:
        * Boolean operators are upper-case whole words (``AND`` / ``OR``).
          When ``AND`` is present the query is split on ``AND`` only. A
          query with no operator is treated as a single required term.
        * An invalid regex returns no results instead of raising.
    """
    needle = query.lower()  # hoisted: reused by every case-insensitive mode

    if mode == "Keyword":
        return [(i, 1) for i, p in enumerate(products) if needle in p.lower()]

    if mode == "Regex":
        try:
            pattern = re.compile(query, re.IGNORECASE)
        except re.error:
            # User-supplied pattern is not a valid regular expression.
            return []
        return [(i, 1) for i, p in enumerate(products) if pattern.search(p)]

    if mode == "Boolean":
        return _boolean_search(query)

    if mode == "Fuzzy":
        return sorted(
            [(i, fuzz.ratio(query, p)) for i, p in enumerate(products)],
            key=lambda pair: pair[1],
            reverse=True,
        )

    if mode == "N-Gram":
        return [(i, 1) for i, p in enumerate(products)
                if any(needle in w for w in p.lower().split())]

    if mode == "Prefix":
        return [(i, 1) for i, p in enumerate(products)
                if any(w.startswith(needle) for w in p.lower().split())]

    if mode == "Suffix":
        return [(i, 1) for i, p in enumerate(products)
                if any(w.endswith(needle) for w in p.lower().split())]

    if mode == "TF-IDF":
        scores = (tf_matrix @ tfidf.transform([query]).T).toarray().flatten()
        return list(enumerate(scores))

    if mode == "BM25":
        return list(enumerate(bm25.get_scores(needle.split())))

    if mode == "Semantic":
        q_emb = model.encode([query])
        faiss.normalize_L2(q_emb)
        scores = np.dot(embs, q_emb.T).flatten()
        return list(enumerate(scores))

    if mode == "FAISS":
        q_emb = model.encode([query])
        faiss.normalize_L2(q_emb)
        distances, indices = faiss_index.search(np.array(q_emb), top_k)
        # FAISS pads with index -1 when fewer than top_k vectors exist.
        return [(int(i), float(d))
                for d, i in zip(distances[0], indices[0]) if i != -1]

    if mode == "Hybrid":
        # Both lists hold one pair per product, in product order.
        tfidf_pairs = search_engine(query, "TF-IDF", top_k)
        sem_pairs = search_engine(query, "Semantic", top_k)
        return [(i, t + s) for (i, t), (_, s) in zip(tfidf_pairs, sem_pairs)]

    if mode == "Query Expansion":
        terms = query.split()
        expanded = list(terms)
        for term in terms:
            expanded.extend(get_synonyms(term))
        # Multi-word WordNet lemmas use "_" as separator; split them into
        # ordinary words so they can match TF-IDF tokens.
        return search_engine(" ".join(expanded).replace("_", " "), "TF-IDF", top_k)

    if mode == "Weighted Hybrid":
        tfidf_pairs = search_engine(query, "TF-IDF", top_k)
        sem_pairs = search_engine(query, "Semantic", top_k)
        bm25_pairs = search_engine(query, "BM25", top_k)
        return [(i, 0.4 * t + 0.4 * s + 0.2 * b)
                for (i, t), (_, s), (_, b)
                in zip(tfidf_pairs, sem_pairs, bm25_pairs)]

    if mode == "Ensemble":
        tfidf_s = np.array([s for _, s in search_engine(query, "TF-IDF", top_k)])
        sem_s = np.array([s for _, s in search_engine(query, "Semantic", top_k)])
        bm25_s = np.array([s for _, s in search_engine(query, "BM25", top_k)])
        if tfidf_s.size == 0:
            # No products: np.max would raise on an empty array.
            return []
        # Scale each signal by its maximum (epsilon avoids division by zero)
        # so the three contribute on a comparable range.
        combined = (
            tfidf_s / (np.max(tfidf_s) + 1e-6) +
            sem_s / (np.max(sem_s) + 1e-6) +
            bm25_s / (np.max(bm25_s) + 1e-6)
        )
        return list(enumerate(combined))

    return []
# ==============================
# UI SEARCH
# ==============================
# Search modes offered in the selector, in display order.
search_types = [
    "Keyword", "Regex", "Boolean", "Fuzzy", "N-Gram", "Prefix", "Suffix",
    "TF-IDF", "BM25", "Semantic", "FAISS", "Hybrid",
    "Query Expansion", "Weighted Hybrid", "Ensemble",
]
search_type = st.selectbox("๐Ÿ”Ž Search Type", search_types)
query = st.text_input("Enter query")
top_k = st.slider("Top Results", 5, 50, 10)

if st.button("Search"):
    if query:
        # Rank by score (highest first) and keep the requested number.
        results = search_engine(query, search_type, top_k)
        results.sort(key=lambda pair: pair[1], reverse=True)
        results = results[:top_k]
        log_activity(st.session_state["user"], "Search", query, search_type)

        # Index -1 marks an empty result slot; skip those pairs.
        kept = [(i, s) for i, s in results if i != -1]
        idx = [i for i, _ in kept]
        scores = [round(s, 4) for _, s in kept]

        if not idx:
            st.info("No results found")
        else:
            out = df.iloc[idx].copy()
            out["Score"] = scores
            # "combined" is an internal helper column, not shown to the user.
            st.dataframe(out.drop(columns=["combined"]), use_container_width=True)
    else:
        st.warning("Enter query")
# ==============================
# ADMIN LOG VIEW
# ==============================
# Admin-only sidebar panel: the last ten log rows plus a download of the
# full log file.
if st.session_state["role"] == "admin":
    st.sidebar.subheader("๐Ÿ“Š Activity Logs")
    if not os.path.exists(LOG_FILE):
        st.sidebar.write("No logs yet")
    else:
        recent_entries = pd.read_csv(LOG_FILE).tail(10)
        st.sidebar.dataframe(recent_entries)
        with open(LOG_FILE, "rb") as log_file:
            st.sidebar.download_button("โฌ‡ Download Logs", log_file, file_name="logs.csv")