AdGenesis-App / authen /authentication.py
userIdc2024's picture
Update authen/authentication.py
4869d0c verified
import re
from datetime import datetime
import streamlit as st
import os
from pymongo import MongoClient, ASCENDING
from pymongo.errors import OperationFailure
from dotenv import load_dotenv
from .password_utils import hash_password, verify_password, verify_legacy_password
load_dotenv()
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def _users_col():
uri = os.getenv("MONGO_URI")
dbname = os.getenv("MONGO_DB", "Gen_Ai")
coll = os.getenv("USERS_COLLECTION", "users")
if not uri:
raise RuntimeError("MONGO_URI is not set")
client = MongoClient(uri)
col = client[dbname][coll]
_ensure_email_unique_index(col)
return col
def _ensure_email_unique_index(col):
try:
unique_ok, to_drop = False, None
for idx in col.list_indexes():
keys = list(idx.get("key", {}).items())
if keys == [("email", 1)]:
if idx.get("unique", False): unique_ok = True
else: to_drop = idx["name"]
break
if to_drop: col.drop_index(to_drop)
if not unique_ok: col.create_index([("email", ASCENDING)], unique=True)
except OperationFailure:
pass
def login_or_create(email: str, password: str):
if not email or not password:
return False, "Enter both email and password."
email_norm = email.strip().lower()
if not EMAIL_RE.match(email_norm):
return False, "Enter a valid email address."
col = _users_col()
user = col.find_one({"email": email_norm})
if user:
pwd = user.get("password")
if isinstance(pwd, str):
if not verify_password(password, pwd):
return False, "Invalid credentials."
col.update_one({"_id": user["_id"]}, {"$set": {"last_login": datetime.utcnow()}})
st.session_state["uid"] = email_norm
return True, email_norm
if isinstance(pwd, dict):
if verify_legacy_password(password, pwd.get("salt"), pwd.get("hash")):
new_hash = hash_password(password)
col.update_one(
{"_id": user["_id"]},
{"$set": {"password": new_hash, "last_login": datetime.utcnow()}}
)
st.session_state["uid"] = email_norm
return True, email_norm
return False, "Invalid credentials."
else:
st.warning("Invalid Credentials")
return False, "Invalid Credentials"
def logout():
st.session_state.pop("uid", None)
st.session_state.pop("login_ts", None)
def login_gate() -> str:
if st.session_state.get("uid"):
return st.session_state["uid"]
st.markdown("<div style='height: 6vh'></div>", unsafe_allow_html=True)
c1, c2, c3 = st.columns([1, 1, 1])
with c2:
st.markdown("### Log in")
with st.form("login_form", clear_on_submit=False):
email = st.text_input("Email", key="login_email", placeholder="you@example.com")
password = st.text_input("Password", type="password", key="login_pass")
submit = st.form_submit_button("Continue",width='stretch')
if submit:
ok, msg = login_or_create(email, password)
if ok:
st.session_state["uid"] = msg
st.session_state["login_ts"] = datetime.utcnow().isoformat()
st.rerun()
else:
st.error(msg)
st.stop()