File size: 3,447 Bytes
9bc1376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
974593f
 
 
9bc1376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
974593f
40e96bd
9bc1376
 
 
 
 
 
 
 
 
 
 
 
974593f
 
 
 
 
 
4869d0c
974593f
 
 
 
 
 
 
 
9bc1376
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import re
from datetime import datetime

import streamlit as st
import os
from pymongo import MongoClient, ASCENDING
from pymongo.errors import OperationFailure
from dotenv import load_dotenv

from .password_utils import hash_password, verify_password, verify_legacy_password

load_dotenv()
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def _users_col():

    uri = os.getenv("MONGO_URI")
    dbname = os.getenv("MONGO_DB", "Gen_Ai")
    coll = os.getenv("USERS_COLLECTION", "users")
    if not uri:
        raise RuntimeError("MONGO_URI is not set")
    client = MongoClient(uri)
    col = client[dbname][coll]
    _ensure_email_unique_index(col)
    return col

def _ensure_email_unique_index(col):
    try:
        unique_ok, to_drop = False, None
        for idx in col.list_indexes():
            keys = list(idx.get("key", {}).items())
            if keys == [("email", 1)]:
                if idx.get("unique", False): unique_ok = True
                else: to_drop = idx["name"]
                break
        if to_drop: col.drop_index(to_drop)
        if not unique_ok: col.create_index([("email", ASCENDING)], unique=True)
    except OperationFailure:
        pass

def login_or_create(email: str, password: str):
    if not email or not password:
        return False, "Enter both email and password."
    email_norm = email.strip().lower()
    if not EMAIL_RE.match(email_norm):
        return False, "Enter a valid email address."

    col = _users_col()
    user = col.find_one({"email": email_norm})

    if user:
        pwd = user.get("password")
        if isinstance(pwd, str):
            if not verify_password(password, pwd):
                return False, "Invalid credentials."
            col.update_one({"_id": user["_id"]}, {"$set": {"last_login": datetime.utcnow()}})
            st.session_state["uid"] = email_norm
            return True, email_norm

        if isinstance(pwd, dict):
            if verify_legacy_password(password, pwd.get("salt"), pwd.get("hash")):
                new_hash = hash_password(password)
                col.update_one(
                    {"_id": user["_id"]},
                    {"$set": {"password": new_hash, "last_login": datetime.utcnow()}}
                )
                st.session_state["uid"] = email_norm
                return True, email_norm
            return False, "Invalid credentials."
    else:
        st.warning("Invalid Credentials")
        return False, "Invalid Credentials"



def logout():
    st.session_state.pop("uid", None)
    st.session_state.pop("login_ts", None)

def login_gate() -> str:
    if st.session_state.get("uid"):
        return st.session_state["uid"]

    st.markdown("<div style='height: 6vh'></div>", unsafe_allow_html=True)
    c1, c2, c3 = st.columns([1, 1, 1])
    with c2:
        st.markdown("### Log in")
        with st.form("login_form", clear_on_submit=False):
            email = st.text_input("Email", key="login_email", placeholder="you@example.com")
            password = st.text_input("Password", type="password", key="login_pass")
            submit = st.form_submit_button("Continue",width='stretch')
        if submit:
            ok, msg = login_or_create(email, password)
            if ok:
                st.session_state["uid"] = msg
                st.session_state["login_ts"] = datetime.utcnow().isoformat()
                st.rerun()
            else:
                st.error(msg)
    st.stop()