import re from datetime import datetime import streamlit as st import os from pymongo import MongoClient, ASCENDING from pymongo.errors import OperationFailure from dotenv import load_dotenv from .password_utils import hash_password, verify_password, verify_legacy_password load_dotenv() EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") def _users_col(): uri = os.getenv("MONGO_URI") dbname = os.getenv("MONGO_DB", "Gen_Ai") coll = os.getenv("USERS_COLLECTION", "users") if not uri: raise RuntimeError("MONGO_URI is not set") client = MongoClient(uri) col = client[dbname][coll] _ensure_email_unique_index(col) return col def _ensure_email_unique_index(col): try: unique_ok, to_drop = False, None for idx in col.list_indexes(): keys = list(idx.get("key", {}).items()) if keys == [("email", 1)]: if idx.get("unique", False): unique_ok = True else: to_drop = idx["name"] break if to_drop: col.drop_index(to_drop) if not unique_ok: col.create_index([("email", ASCENDING)], unique=True) except OperationFailure: pass def login_or_create(email: str, password: str): if not email or not password: return False, "Enter both email and password." email_norm = email.strip().lower() if not EMAIL_RE.match(email_norm): return False, "Enter a valid email address." col = _users_col() user = col.find_one({"email": email_norm}) if user: pwd = user.get("password") if isinstance(pwd, str): if not verify_password(password, pwd): return False, "Invalid credentials." col.update_one({"_id": user["_id"]}, {"$set": {"last_login": datetime.utcnow()}}) st.session_state["uid"] = email_norm return True, email_norm if isinstance(pwd, dict): if verify_legacy_password(password, pwd.get("salt"), pwd.get("hash")): new_hash = hash_password(password) col.update_one( {"_id": user["_id"]}, {"$set": {"password": new_hash, "last_login": datetime.utcnow()}} ) st.session_state["uid"] = email_norm return True, email_norm return False, "Invalid credentials." else: st.warning("Invalid Credentials") return False, "Invalid Credentials" def logout(): st.session_state.pop("uid", None) st.session_state.pop("login_ts", None) def login_gate() -> str: if st.session_state.get("uid"): return st.session_state["uid"] st.markdown("
", unsafe_allow_html=True) c1, c2, c3 = st.columns([1, 1, 1]) with c2: st.markdown("### Log in") with st.form("login_form", clear_on_submit=False): email = st.text_input("Email", key="login_email", placeholder="you@example.com") password = st.text_input("Password", type="password", key="login_pass") submit = st.form_submit_button("Continue",width='stretch') if submit: ok, msg = login_or_create(email, password) if ok: st.session_state["uid"] = msg st.session_state["login_ts"] = datetime.utcnow().isoformat() st.rerun() else: st.error(msg) st.stop()