File size: 3,669 Bytes
3b10d4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7210db0
3b10d4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7210db0
3b10d4b
 
 
 
 
 
 
 
 
 
 
 
7210db0
3b10d4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7210db0
3b10d4b
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import re
from datetime import datetime

import streamlit as st
import os
from pymongo import MongoClient, ASCENDING
from pymongo.errors import OperationFailure
from dotenv import load_dotenv

from .password_utils import hash_password, verify_password, verify_legacy_password

load_dotenv()
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def _users_col():
    uri = os.getenv("MONGO_URI")
    dbname = os.getenv("MONGO_DB", "CreativeImageGeneration")
    coll = os.getenv("USERS_COLLECTION", "users")
    if not uri:
        raise RuntimeError("MONGO_URI is not set")

    client = MongoClient(uri)
    col = client[dbname][coll]
    _ensure_email_unique_index(col)
    return col


def _ensure_email_unique_index(col):
    try:
        unique_ok, to_drop = False, None
        for idx in col.list_indexes():
            keys = list(idx.get("key", {}).items())
            if keys == [("email", 1)]:
                if idx.get("unique", False):
                    unique_ok = True
                else:
                    to_drop = idx["name"]
                break

        if to_drop:
            col.drop_index(to_drop)
        if not unique_ok:
            col.create_index([("email", ASCENDING)], unique=True)

    except OperationFailure:
        pass


def login_or_create(email: str, password: str):
 
    if not email or not password:
        return False, "Enter both email and password."

    email_norm = email.strip().lower()
    if not EMAIL_RE.match(email_norm):
        return False, "Enter a valid email address."

    col = _users_col()
    user = col.find_one({"email": email_norm})

    if user:
        pwd = user.get("password")

        if isinstance(pwd, str):
            if not verify_password(password, pwd):
                return False, "Invalid credentials."
            col.update_one(
                {"_id": user["_id"]},
                {"$set": {"last_login": datetime.utcnow()}}
            )
            return True, email_norm
        
        if isinstance(pwd, dict):
            if verify_legacy_password(password, pwd.get("salt"), pwd.get("hash")):
                new_hash = hash_password(password)
                col.update_one(
                    {"_id": user["_id"]},
                    {"$set": {"password": new_hash, "last_login": datetime.utcnow()}}
                )
                return True, email_norm
            return False, "Invalid credentials."

        return False, "Password format invalid in DB."

    
    return False, "Invalid Credentials"


def logout():
    st.session_state.pop("uid", None)
    st.session_state.pop("login_ts", None)


def login_gate() -> str:
    """
    Centered 3-column login UI (same as you want).
    No top spacer, no width arg.
    """
    if st.session_state.get("uid"):
        return st.session_state["uid"]

    st.markdown(
        """
        <style>
        .block-container { padding-top: 10vh; }
        </style>
        """,
        unsafe_allow_html=True
    )

    c1, c2, c3 = st.columns([1, 1, 1])
    with c2:
        st.markdown("### Log in")

        with st.form("login_form", clear_on_submit=False):
            email = st.text_input("Email", key="login_email", placeholder="you@example.com")
            password = st.text_input("Password", type="password", key="login_pass")
            submit = st.form_submit_button("Continue")  

        if submit:
            ok, msg = login_or_create(email, password)
            if ok:
                st.session_state["uid"] = msg
                st.session_state["login_ts"] = datetime.utcnow().isoformat()
                st.rerun()
            else:
                st.error(msg)

    st.stop()