Spaces:
Running
Running
| import re | |
| from datetime import datetime | |
| import streamlit as st | |
| import os | |
| from pymongo import MongoClient, ASCENDING | |
| from pymongo.errors import OperationFailure | |
| from dotenv import load_dotenv | |
| from .password_utils import hash_password, verify_password, verify_legacy_password | |
| load_dotenv() | |
| EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") | |
| def _users_col(): | |
| uri = os.getenv("MONGO_URI") | |
| dbname = os.getenv("MONGO_DB", "Gen_Ai") | |
| coll = os.getenv("USERS_COLLECTION", "users") | |
| if not uri: | |
| raise RuntimeError("MONGO_URI is not set") | |
| client = MongoClient(uri) | |
| col = client[dbname][coll] | |
| _ensure_email_unique_index(col) | |
| return col | |
| def _ensure_email_unique_index(col): | |
| try: | |
| unique_ok, to_drop = False, None | |
| for idx in col.list_indexes(): | |
| keys = list(idx.get("key", {}).items()) | |
| if keys == [("email", 1)]: | |
| if idx.get("unique", False): unique_ok = True | |
| else: to_drop = idx["name"] | |
| break | |
| if to_drop: col.drop_index(to_drop) | |
| if not unique_ok: col.create_index([("email", ASCENDING)], unique=True) | |
| except OperationFailure: | |
| pass | |
| def login_or_create(email: str, password: str): | |
| if not email or not password: | |
| return False, "Enter both email and password." | |
| email_norm = email.strip().lower() | |
| if not EMAIL_RE.match(email_norm): | |
| return False, "Enter a valid email address." | |
| col = _users_col() | |
| user = col.find_one({"email": email_norm}) | |
| if user: | |
| pwd = user.get("password") | |
| if isinstance(pwd, str): | |
| if not verify_password(password, pwd): | |
| return False, "Invalid credentials." | |
| col.update_one({"_id": user["_id"]}, {"$set": {"last_login": datetime.utcnow()}}) | |
| st.session_state["uid"] = email_norm | |
| return True, email_norm | |
| if isinstance(pwd, dict): | |
| if verify_legacy_password(password, pwd.get("salt"), pwd.get("hash")): | |
| new_hash = hash_password(password) | |
| col.update_one( | |
| {"_id": user["_id"]}, | |
| {"$set": {"password": new_hash, "last_login": datetime.utcnow()}} | |
| ) | |
| st.session_state["uid"] = email_norm | |
| return True, email_norm | |
| return False, "Invalid credentials." | |
| else: | |
| st.warning("Invalid Credentials") | |
| return False, "Invalid Credentials" | |
| def logout(): | |
| st.session_state.pop("uid", None) | |
| st.session_state.pop("login_ts", None) | |
| def login_gate() -> str: | |
| if st.session_state.get("uid"): | |
| return st.session_state["uid"] | |
| st.markdown("<div style='height: 6vh'></div>", unsafe_allow_html=True) | |
| c1, c2, c3 = st.columns([1, 1, 1]) | |
| with c2: | |
| st.markdown("### Log in") | |
| with st.form("login_form", clear_on_submit=False): | |
| email = st.text_input("Email", key="login_email", placeholder="you@example.com") | |
| password = st.text_input("Password", type="password", key="login_pass") | |
| submit = st.form_submit_button("Continue",width='stretch') | |
| if submit: | |
| ok, msg = login_or_create(email, password) | |
| if ok: | |
| st.session_state["uid"] = msg | |
| st.session_state["login_ts"] = datetime.utcnow().isoformat() | |
| st.rerun() | |
| else: | |
| st.error(msg) | |
| st.stop() | |