File size: 3,669 Bytes
3b10d4b 7210db0 3b10d4b 7210db0 3b10d4b 7210db0 3b10d4b 7210db0 3b10d4b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | import re
from datetime import datetime
import streamlit as st
import os
from pymongo import MongoClient, ASCENDING
from pymongo.errors import OperationFailure
from dotenv import load_dotenv
from .password_utils import hash_password, verify_password, verify_legacy_password
# Load variables from a local .env file (if one exists) into the process
# environment, so the os.getenv() lookups in this module can see them.
load_dotenv()
# Loose shape check for an email address: one or more characters that are
# neither "@" nor whitespace, then "@", then more such characters, a literal
# ".", and more such characters, anchored at both ends.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _users_col():
uri = os.getenv("MONGO_URI")
dbname = os.getenv("MONGO_DB", "CreativeImageGeneration")
coll = os.getenv("USERS_COLLECTION", "users")
if not uri:
raise RuntimeError("MONGO_URI is not set")
client = MongoClient(uri)
col = client[dbname][coll]
_ensure_email_unique_index(col)
return col
def _ensure_email_unique_index(col):
try:
unique_ok, to_drop = False, None
for idx in col.list_indexes():
keys = list(idx.get("key", {}).items())
if keys == [("email", 1)]:
if idx.get("unique", False):
unique_ok = True
else:
to_drop = idx["name"]
break
if to_drop:
col.drop_index(to_drop)
if not unique_ok:
col.create_index([("email", ASCENDING)], unique=True)
except OperationFailure:
pass
def login_or_create(email: str, password: str):
    """Authenticate a user by email and password.

    Despite the name, this function never creates an account: an email with
    no matching document is rejected.

    The email is stripped and lower-cased before validation and lookup. The
    stored ``password`` field is handled according to its type:

    * ``str``: checked with ``verify_password``.
    * ``dict``: checked with ``verify_legacy_password`` using its ``salt``
      and ``hash`` entries; on success the stored value is replaced with
      ``hash_password(password)``.
    * anything else: reported as an invalid stored format.

    A successful login also updates the document's ``last_login`` field.

    Args:
        email: Email address as typed by the user.
        password: Plain-text password as typed by the user.

    Returns:
        ``(True, normalized_email)`` on success, otherwise
        ``(False, message)`` where ``message`` is suitable for display.

    Raises:
        RuntimeError: Propagated from ``_users_col`` when ``MONGO_URI`` is
            not configured.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    if not email or not password:
        return False, "Enter both email and password."

    email_norm = email.strip().lower()
    if not EMAIL_RE.match(email_norm):
        return False, "Enter a valid email address."

    col = _users_col()
    user = col.find_one({"email": email_norm})
    if not user:
        # Same wording as a wrong password, so the response does not reveal
        # whether an account exists for this email.
        return False, "Invalid credentials."

    pwd = user.get("password")
    if isinstance(pwd, str):
        if not verify_password(password, pwd):
            return False, "Invalid credentials."
        updates = {}
    elif isinstance(pwd, dict):
        if not verify_legacy_password(password, pwd.get("salt"), pwd.get("hash")):
            return False, "Invalid credentials."
        # Legacy record verified: replace it with a hash in the current format.
        updates = {"password": hash_password(password)}
    else:
        return False, "Password format invalid in DB."

    # Timezone-aware "now" in UTC; replaces the deprecated datetime.utcnow().
    updates["last_login"] = datetime.now(timezone.utc)
    col.update_one({"_id": user["_id"]}, {"$set": updates})
    return True, email_norm
def logout():
    """Drop the login markers (``uid`` and ``login_ts``) from the session.

    Keys that are not present are ignored.
    """
    for session_key in ("uid", "login_ts"):
        st.session_state.pop(session_key, None)
def login_gate() -> str:
    """Show the login form unless a user is already signed in.

    If ``st.session_state["uid"]`` holds a truthy value, it is returned
    immediately. Otherwise a login form is drawn in the middle of three
    equal-width columns. When the form is submitted, the credentials are
    passed to ``login_or_create``:

    * on success, ``uid`` (the value returned by ``login_or_create``) and
      ``login_ts`` (current UTC time as an ISO-8601 string without offset)
      are stored in the session and ``st.rerun()`` is called;
    * on failure, the returned message is shown with ``st.error``.

    Whenever no user is signed in, the function ends with ``st.stop()``
    rather than returning a value.

    Returns:
        The ``uid`` stored in the session state.
    """
    # Local import: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    uid = st.session_state.get("uid")
    if uid:
        return uid

    # Push the page content down so the form sits lower on the screen.
    page_css = (
        "\n"
        "<style>\n"
        ".block-container { padding-top: 10vh; }\n"
        "</style>\n"
    )
    st.markdown(page_css, unsafe_allow_html=True)

    # Only the middle column is used; the outer two act as margins.
    _, middle, _ = st.columns([1, 1, 1])
    with middle:
        st.markdown("### Log in")
        with st.form("login_form", clear_on_submit=False):
            email = st.text_input("Email", key="login_email", placeholder="you@example.com")
            password = st.text_input("Password", type="password", key="login_pass")
            submit = st.form_submit_button("Continue")

        if submit:
            ok, msg = login_or_create(email, password)
            if ok:
                st.session_state["uid"] = msg
                # Replaces the deprecated datetime.utcnow(); tzinfo is
                # stripped so the stored string keeps its offset-free format.
                st.session_state["login_ts"] = (
                    datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
                )
                st.rerun()
            else:
                st.error(msg)

    st.stop()
|