Remove login gates, make app fully open-source
Browse filesStrip MUCHE member login, invitation codes, OTP/SMTP, password hashing,
and fuzzy name matching. Everyone can now use the app freely with just
a display name. Admin dashboard remains protected behind PREFERO_ADMIN_PASSWORD.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- app/auth.py +20 -356
- app/pages/8_๐_Admin.py +1 -6
- app/utils.py +9 -14
- scripts/test_new_modules.py +21 -228
app/auth.py
CHANGED
|
@@ -1,132 +1,15 @@
|
|
| 1 |
"""Authentication module for ๅๆไพ Prefero.
|
| 2 |
|
| 3 |
-
|
| 4 |
-
|
| 5 |
-
2. Invitation code (instant, checked against env var)
|
| 6 |
-
|
| 7 |
-
MUCHE team members are stored as a whitelist.
|
| 8 |
-
First login: match name + set password to claim account.
|
| 9 |
-
Subsequent logins: match name + enter password.
|
| 10 |
-
Fuzzy matching allows minor name variations (e.g. "John" matches "Jonathan").
|
| 11 |
-
|
| 12 |
-
Toggle with PREFERO_AUTH_ENABLED env var ("true" to enable).
|
| 13 |
"""
|
| 14 |
|
| 15 |
from __future__ import annotations
|
| 16 |
|
| 17 |
-
import hashlib
|
| 18 |
-
import os
|
| 19 |
import re
|
| 20 |
-
from difflib import SequenceMatcher
|
| 21 |
|
| 22 |
import streamlit as st
|
| 23 |
|
| 24 |
-
# ---------------------------------------------------------------------------
|
| 25 |
-
# Configuration helpers
|
| 26 |
-
# ---------------------------------------------------------------------------
|
| 27 |
-
|
| 28 |
-
def _auth_enabled() -> bool:
|
| 29 |
-
return os.environ.get("PREFERO_AUTH_ENABLED", "").lower() == "true"
|
| 30 |
-
|
| 31 |
-
|
| 32 |
-
def _get_invitation_codes() -> list[str]:
|
| 33 |
-
raw = os.environ.get("PREFERO_INVITATION_CODES", "")
|
| 34 |
-
if not raw.strip():
|
| 35 |
-
return []
|
| 36 |
-
return [c.strip().upper() for c in raw.split(",") if c.strip()]
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
# ---------------------------------------------------------------------------
|
| 40 |
-
# MUCHE team whitelist
|
| 41 |
-
# ---------------------------------------------------------------------------
|
| 42 |
-
|
| 43 |
-
MUCHE_MEMBERS: list[str] = [
|
| 44 |
-
# Director
|
| 45 |
-
"Henry Cutler",
|
| 46 |
-
# Academics
|
| 47 |
-
"Mona Aghdaee",
|
| 48 |
-
"Anam Bilgrami",
|
| 49 |
-
"Jonas Fooken",
|
| 50 |
-
"Yuanyuan Gu",
|
| 51 |
-
"Martin Hoyle",
|
| 52 |
-
"Varinder Jeet",
|
| 53 |
-
"Ali Furkan Kalay",
|
| 54 |
-
"Alicia Norman",
|
| 55 |
-
"Bonny Parkinson",
|
| 56 |
-
"Smriti Raichand",
|
| 57 |
-
"Rezwanul Rana",
|
| 58 |
-
"Rajan Sharma",
|
| 59 |
-
"Shari Stathis",
|
| 60 |
-
"Dandan Yu",
|
| 61 |
-
# Operations
|
| 62 |
-
"Ashley Soytemiz",
|
| 63 |
-
# Honorary & Visiting
|
| 64 |
-
"Marco Bertoni",
|
| 65 |
-
"David Cullen",
|
| 66 |
-
"Nils Gutacker",
|
| 67 |
-
"Rowena Jacobs",
|
| 68 |
-
"Elizabeth-Ann Schroeder",
|
| 69 |
-
"Kompal Sinha",
|
| 70 |
-
"Neil Soderlund",
|
| 71 |
-
"Apostolos Tsiachristas",
|
| 72 |
-
# Graduate research students
|
| 73 |
-
"Meimei Chen",
|
| 74 |
-
"Han Cheng",
|
| 75 |
-
"Kadek Darmawan",
|
| 76 |
-
"Shan Jiang",
|
| 77 |
-
"Maggie Lee",
|
| 78 |
-
"Noura Saba",
|
| 79 |
-
"Saranjit Singh",
|
| 80 |
-
"Zachery Tirrell",
|
| 81 |
-
"Hengzhe Zhao",
|
| 82 |
-
]
|
| 83 |
-
|
| 84 |
-
|
| 85 |
-
def _normalize(name: str) -> str:
|
| 86 |
-
"""Lowercase, strip, collapse whitespace, remove hyphens."""
|
| 87 |
-
return re.sub(r"[\s\-]+", " ", name.strip().lower())
|
| 88 |
-
|
| 89 |
-
|
| 90 |
-
def _fuzzy_match(input_name: str, threshold: float = 0.75) -> str | None:
|
| 91 |
-
"""Find the best fuzzy match from MUCHE_MEMBERS.
|
| 92 |
-
|
| 93 |
-
Returns the canonical name if similarity >= threshold, else None.
|
| 94 |
-
Tries full name match first, then first-name-only match.
|
| 95 |
-
"""
|
| 96 |
-
norm_input = _normalize(input_name)
|
| 97 |
-
if not norm_input:
|
| 98 |
-
return None
|
| 99 |
-
|
| 100 |
-
best_name: str | None = None
|
| 101 |
-
best_score: float = 0.0
|
| 102 |
-
|
| 103 |
-
for member in MUCHE_MEMBERS:
|
| 104 |
-
norm_member = _normalize(member)
|
| 105 |
-
score = SequenceMatcher(None, norm_input, norm_member).ratio()
|
| 106 |
-
if score > best_score:
|
| 107 |
-
best_score = score
|
| 108 |
-
best_name = member
|
| 109 |
-
|
| 110 |
-
first_name = norm_member.split()[0]
|
| 111 |
-
score_first = SequenceMatcher(None, norm_input, first_name).ratio()
|
| 112 |
-
if score_first > best_score:
|
| 113 |
-
best_score = score_first
|
| 114 |
-
best_name = member
|
| 115 |
-
|
| 116 |
-
if best_score >= threshold:
|
| 117 |
-
return best_name
|
| 118 |
-
return None
|
| 119 |
-
|
| 120 |
-
|
| 121 |
-
def _hash_password(password: str) -> str:
|
| 122 |
-
"""Simple SHA-256 hash for password storage."""
|
| 123 |
-
return hashlib.sha256(password.encode()).hexdigest()
|
| 124 |
-
|
| 125 |
-
|
| 126 |
-
# ---------------------------------------------------------------------------
|
| 127 |
-
# Auth gate UI
|
| 128 |
-
# ---------------------------------------------------------------------------
|
| 129 |
-
|
| 130 |
_SLOWBRO_IMG = (
|
| 131 |
"https://raw.githubusercontent.com/PokeAPI/sprites/master/sprites"
|
| 132 |
"/pokemon/other/official-artwork/80.png"
|
|
@@ -134,16 +17,19 @@ _SLOWBRO_IMG = (
|
|
| 134 |
|
| 135 |
|
| 136 |
def auth_gate() -> bool:
|
| 137 |
-
"""
|
| 138 |
-
|
| 139 |
-
|
| 140 |
|
| 141 |
-
|
|
|
|
|
|
|
|
|
|
| 142 |
return True
|
| 143 |
|
| 144 |
try:
|
| 145 |
st.set_page_config(
|
| 146 |
-
page_title="ๅๆไพ Prefero -
|
| 147 |
page_icon="๐งฎ",
|
| 148 |
layout="centered",
|
| 149 |
)
|
|
@@ -153,189 +39,6 @@ def auth_gate() -> bool:
|
|
| 153 |
from utils import language_banner
|
| 154 |
language_banner()
|
| 155 |
|
| 156 |
-
st.markdown(
|
| 157 |
-
"<div style='text-align:center;'>"
|
| 158 |
-
f"<img src='{_SLOWBRO_IMG}' width='120' />"
|
| 159 |
-
"</div>",
|
| 160 |
-
unsafe_allow_html=True,
|
| 161 |
-
)
|
| 162 |
-
st.markdown(
|
| 163 |
-
"<h2 style='text-align:center; margin-bottom:0;'>ๅๆไพ Prefero</h2>",
|
| 164 |
-
unsafe_allow_html=True,
|
| 165 |
-
)
|
| 166 |
-
st.markdown(
|
| 167 |
-
"<p style='text-align:center; color:gray;'>Sign In</p>",
|
| 168 |
-
unsafe_allow_html=True,
|
| 169 |
-
)
|
| 170 |
-
|
| 171 |
-
# โโ Concurrent login conflict resolution โโโโโโโโโโโโโโโโโโโโโ
|
| 172 |
-
_conflict_user = st.session_state.get("_auth_conflict_user")
|
| 173 |
-
if _conflict_user:
|
| 174 |
-
_conflict_id = st.session_state.get("_auth_conflict_id", 0)
|
| 175 |
-
_conflict_source = st.session_state.get("_auth_conflict_source", "")
|
| 176 |
-
|
| 177 |
-
st.warning(
|
| 178 |
-
f"**{_conflict_user}** is currently logged in from another session. "
|
| 179 |
-
"Do you want to kick them off and log in here?"
|
| 180 |
-
)
|
| 181 |
-
|
| 182 |
-
_kick_col, _cancel_col = st.columns(2)
|
| 183 |
-
with _kick_col:
|
| 184 |
-
if st.button("Yes, log in here", type="primary", key="_auth_kick_btn", use_container_width=True):
|
| 185 |
-
from session_queue import force_evict_username, register_username
|
| 186 |
-
force_evict_username(_conflict_user)
|
| 187 |
-
st.session_state["authenticated"] = True
|
| 188 |
-
st.session_state["username"] = _conflict_user
|
| 189 |
-
st.session_state["user_id"] = _conflict_id
|
| 190 |
-
register_username(_conflict_user)
|
| 191 |
-
from community_db import log_activity
|
| 192 |
-
log_activity(_conflict_user, "login", f"{_conflict_source} (kicked previous session)")
|
| 193 |
-
# Clean up conflict state
|
| 194 |
-
st.session_state.pop("_auth_conflict_user", None)
|
| 195 |
-
st.session_state.pop("_auth_conflict_id", None)
|
| 196 |
-
st.session_state.pop("_auth_conflict_source", None)
|
| 197 |
-
st.rerun()
|
| 198 |
-
with _cancel_col:
|
| 199 |
-
if st.button("Cancel", key="_auth_cancel_kick_btn", use_container_width=True):
|
| 200 |
-
st.session_state.pop("_auth_conflict_user", None)
|
| 201 |
-
st.session_state.pop("_auth_conflict_id", None)
|
| 202 |
-
st.session_state.pop("_auth_conflict_source", None)
|
| 203 |
-
st.rerun()
|
| 204 |
-
|
| 205 |
-
return False
|
| 206 |
-
|
| 207 |
-
tab_member, tab_invite = st.tabs(["MUCHE Member", "Invitation Code"])
|
| 208 |
-
|
| 209 |
-
# โโ Tab 1: MUCHE Member (name + password) โโโโโโโโโโโโโโโโโโโโโ
|
| 210 |
-
with tab_member:
|
| 211 |
-
st.caption("Enter your name as it appears on the MUCHE website.")
|
| 212 |
-
|
| 213 |
-
name_input = st.text_input(
|
| 214 |
-
"Your name",
|
| 215 |
-
key="_auth_member_name",
|
| 216 |
-
placeholder="e.g. Jordan McAllister",
|
| 217 |
-
)
|
| 218 |
-
|
| 219 |
-
matched_name = None
|
| 220 |
-
if name_input and name_input.strip():
|
| 221 |
-
matched_name = _fuzzy_match(name_input)
|
| 222 |
-
|
| 223 |
-
if matched_name:
|
| 224 |
-
st.success(f"Matched: **{matched_name}**")
|
| 225 |
-
|
| 226 |
-
from community_db import init_db, get_user_by_name
|
| 227 |
-
init_db()
|
| 228 |
-
existing = get_user_by_name(matched_name)
|
| 229 |
-
|
| 230 |
-
if existing and existing.email:
|
| 231 |
-
# Returning user โ enter password
|
| 232 |
-
pw_input = st.text_input(
|
| 233 |
-
"Password",
|
| 234 |
-
key="_auth_member_pw",
|
| 235 |
-
type="password",
|
| 236 |
-
)
|
| 237 |
-
if st.button("Sign In", key="_auth_member_login_btn", use_container_width=True):
|
| 238 |
-
if not pw_input:
|
| 239 |
-
st.error("Please enter your password.")
|
| 240 |
-
elif _hash_password(pw_input) == existing.email:
|
| 241 |
-
from session_queue import is_username_active, register_username
|
| 242 |
-
if is_username_active(existing.username):
|
| 243 |
-
st.session_state["_auth_conflict_user"] = existing.username
|
| 244 |
-
st.session_state["_auth_conflict_id"] = existing.id
|
| 245 |
-
st.session_state["_auth_conflict_source"] = "MUCHE member"
|
| 246 |
-
st.rerun()
|
| 247 |
-
else:
|
| 248 |
-
st.session_state["authenticated"] = True
|
| 249 |
-
st.session_state["username"] = existing.username
|
| 250 |
-
st.session_state["user_id"] = existing.id
|
| 251 |
-
register_username(existing.username)
|
| 252 |
-
from community_db import log_activity
|
| 253 |
-
log_activity(existing.username, "login", "MUCHE member")
|
| 254 |
-
st.success(f"Welcome back, {existing.username}!")
|
| 255 |
-
st.rerun()
|
| 256 |
-
else:
|
| 257 |
-
st.error("Incorrect password.")
|
| 258 |
-
else:
|
| 259 |
-
# First time โ set password to claim account
|
| 260 |
-
st.info("First time? Set a password to claim your account.")
|
| 261 |
-
pw1 = st.text_input(
|
| 262 |
-
"Choose a password",
|
| 263 |
-
key="_auth_member_pw1",
|
| 264 |
-
type="password",
|
| 265 |
-
)
|
| 266 |
-
pw2 = st.text_input(
|
| 267 |
-
"Confirm password",
|
| 268 |
-
key="_auth_member_pw2",
|
| 269 |
-
type="password",
|
| 270 |
-
)
|
| 271 |
-
if st.button("Create Account", key="_auth_member_create_btn", use_container_width=True):
|
| 272 |
-
if not pw1 or not pw2:
|
| 273 |
-
st.error("Please fill in both password fields.")
|
| 274 |
-
elif len(pw1) < 4:
|
| 275 |
-
st.error("Password must be at least 4 characters.")
|
| 276 |
-
elif pw1 != pw2:
|
| 277 |
-
st.error("Passwords do not match.")
|
| 278 |
-
else:
|
| 279 |
-
try:
|
| 280 |
-
from community_db import create_user, log_activity
|
| 281 |
-
from session_queue import register_username
|
| 282 |
-
user = create_user(matched_name, email=_hash_password(pw1))
|
| 283 |
-
st.session_state["authenticated"] = True
|
| 284 |
-
st.session_state["username"] = user.username
|
| 285 |
-
st.session_state["user_id"] = user.id
|
| 286 |
-
register_username(user.username)
|
| 287 |
-
log_activity(user.username, "register", "MUCHE member")
|
| 288 |
-
st.success(f"Account created! Welcome, {user.username}!")
|
| 289 |
-
st.rerun()
|
| 290 |
-
except ValueError as exc:
|
| 291 |
-
st.error(str(exc))
|
| 292 |
-
elif name_input and name_input.strip():
|
| 293 |
-
st.warning(
|
| 294 |
-
"Name not found in the MUCHE team list. "
|
| 295 |
-
"Please check your spelling, or use an invitation code."
|
| 296 |
-
)
|
| 297 |
-
|
| 298 |
-
# โโ Tab 2: Invitation Code โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 299 |
-
with tab_invite:
|
| 300 |
-
code_input = st.text_input(
|
| 301 |
-
"Enter your invitation code",
|
| 302 |
-
key="_auth_invite_input",
|
| 303 |
-
type="password",
|
| 304 |
-
)
|
| 305 |
-
if st.button("Enter", key="_auth_invite_btn", use_container_width=True):
|
| 306 |
-
valid_codes = _get_invitation_codes()
|
| 307 |
-
if not valid_codes:
|
| 308 |
-
st.error("No invitation codes are configured.")
|
| 309 |
-
elif code_input.strip().upper() in valid_codes:
|
| 310 |
-
st.session_state["authenticated"] = True
|
| 311 |
-
st.session_state["auth_email"] = ""
|
| 312 |
-
st.success("Access granted!")
|
| 313 |
-
st.rerun()
|
| 314 |
-
else:
|
| 315 |
-
st.error("Invalid invitation code.")
|
| 316 |
-
|
| 317 |
-
return False
|
| 318 |
-
|
| 319 |
-
|
| 320 |
-
# ---------------------------------------------------------------------------
|
| 321 |
-
# Username selection gate (shown after auth, before main app)
|
| 322 |
-
# ---------------------------------------------------------------------------
|
| 323 |
-
|
| 324 |
-
def username_gate() -> bool:
|
| 325 |
-
"""Prompt the user to choose a display name if they haven't yet.
|
| 326 |
-
|
| 327 |
-
For MUCHE members, username is already set during auth_gate.
|
| 328 |
-
Only invitation code users see this screen.
|
| 329 |
-
"""
|
| 330 |
-
if not _auth_enabled():
|
| 331 |
-
if not st.session_state.get("username"):
|
| 332 |
-
st.session_state["username"] = "Guest"
|
| 333 |
-
st.session_state["user_id"] = 0
|
| 334 |
-
return True
|
| 335 |
-
|
| 336 |
-
if st.session_state.get("username"):
|
| 337 |
-
return True
|
| 338 |
-
|
| 339 |
st.markdown(
|
| 340 |
"<div style='text-align:center;'>"
|
| 341 |
f"<img src='{_SLOWBRO_IMG}' width='100' />"
|
|
@@ -343,44 +46,12 @@ def username_gate() -> bool:
|
|
| 343 |
unsafe_allow_html=True,
|
| 344 |
)
|
| 345 |
st.markdown(
|
| 346 |
-
"<h3 style='text-align:center;'>
|
|
|
|
|
|
|
| 347 |
unsafe_allow_html=True,
|
| 348 |
)
|
| 349 |
|
| 350 |
-
# โโ Concurrent login conflict resolution โโโโโโโโโโโโโโโโโโโโโ
|
| 351 |
-
_conflict_user = st.session_state.get("_auth_conflict_user")
|
| 352 |
-
if _conflict_user:
|
| 353 |
-
_conflict_id = st.session_state.get("_auth_conflict_id", 0)
|
| 354 |
-
_conflict_source = st.session_state.get("_auth_conflict_source", "")
|
| 355 |
-
|
| 356 |
-
st.warning(
|
| 357 |
-
f"**{_conflict_user}** is currently logged in from another session. "
|
| 358 |
-
"Do you want to kick them off and log in here?"
|
| 359 |
-
)
|
| 360 |
-
|
| 361 |
-
_kick_col, _cancel_col = st.columns(2)
|
| 362 |
-
with _kick_col:
|
| 363 |
-
if st.button("Yes, log in here", type="primary", key="_ugate_kick_btn", use_container_width=True):
|
| 364 |
-
from session_queue import force_evict_username, register_username
|
| 365 |
-
force_evict_username(_conflict_user)
|
| 366 |
-
st.session_state["username"] = _conflict_user
|
| 367 |
-
st.session_state["user_id"] = _conflict_id
|
| 368 |
-
register_username(_conflict_user)
|
| 369 |
-
from community_db import log_activity
|
| 370 |
-
log_activity(_conflict_user, "login", f"{_conflict_source} (kicked previous session)")
|
| 371 |
-
st.session_state.pop("_auth_conflict_user", None)
|
| 372 |
-
st.session_state.pop("_auth_conflict_id", None)
|
| 373 |
-
st.session_state.pop("_auth_conflict_source", None)
|
| 374 |
-
st.rerun()
|
| 375 |
-
with _cancel_col:
|
| 376 |
-
if st.button("Cancel", key="_ugate_cancel_kick_btn", use_container_width=True):
|
| 377 |
-
st.session_state.pop("_auth_conflict_user", None)
|
| 378 |
-
st.session_state.pop("_auth_conflict_id", None)
|
| 379 |
-
st.session_state.pop("_auth_conflict_source", None)
|
| 380 |
-
st.rerun()
|
| 381 |
-
|
| 382 |
-
return False
|
| 383 |
-
|
| 384 |
name_input = st.text_input(
|
| 385 |
"Username",
|
| 386 |
key="_auth_username_input",
|
|
@@ -399,29 +70,22 @@ def username_gate() -> bool:
|
|
| 399 |
else:
|
| 400 |
try:
|
| 401 |
from community_db import init_db, create_user, get_user_by_name, log_activity
|
| 402 |
-
from session_queue import
|
| 403 |
|
| 404 |
init_db()
|
| 405 |
existing = get_user_by_name(name)
|
| 406 |
if existing:
|
| 407 |
-
|
| 408 |
-
|
| 409 |
-
|
| 410 |
-
|
| 411 |
-
|
| 412 |
-
else:
|
| 413 |
-
st.session_state["username"] = existing.username
|
| 414 |
-
st.session_state["user_id"] = existing.id
|
| 415 |
-
register_username(existing.username)
|
| 416 |
-
log_activity(existing.username, "login", "Returning user")
|
| 417 |
-
st.success(f"Welcome back, {existing.username}!")
|
| 418 |
-
st.rerun()
|
| 419 |
else:
|
| 420 |
user = create_user(name, email="")
|
| 421 |
st.session_state["username"] = user.username
|
| 422 |
st.session_state["user_id"] = user.id
|
| 423 |
register_username(user.username)
|
| 424 |
-
|
| 425 |
st.rerun()
|
| 426 |
except Exception as exc:
|
| 427 |
st.error(f"Error: {exc}")
|
|
|
|
| 1 |
"""Authentication module for ๅๆไพ Prefero.
|
| 2 |
|
| 3 |
+
Open access โ no login required. Users pick a display name for community features.
|
| 4 |
+
Admin access is controlled separately via PREFERO_ADMIN_PASSWORD env var.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
"""
|
| 6 |
|
| 7 |
from __future__ import annotations
|
| 8 |
|
|
|
|
|
|
|
| 9 |
import re
|
|
|
|
| 10 |
|
| 11 |
import streamlit as st
|
| 12 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 13 |
_SLOWBRO_IMG = (
|
| 14 |
"https://raw.githubusercontent.com/PokeAPI/sprites/master/sprites"
|
| 15 |
"/pokemon/other/official-artwork/80.png"
|
|
|
|
| 17 |
|
| 18 |
|
| 19 |
def auth_gate() -> bool:
|
| 20 |
+
"""Always returns True โ open access, no login required."""
|
| 21 |
+
st.session_state["authenticated"] = True
|
| 22 |
+
return True
|
| 23 |
|
| 24 |
+
|
| 25 |
+
def username_gate() -> bool:
|
| 26 |
+
"""Prompt the user to choose a display name if they haven't yet."""
|
| 27 |
+
if st.session_state.get("username"):
|
| 28 |
return True
|
| 29 |
|
| 30 |
try:
|
| 31 |
st.set_page_config(
|
| 32 |
+
page_title="ๅๆไพ Prefero - Welcome",
|
| 33 |
page_icon="๐งฎ",
|
| 34 |
layout="centered",
|
| 35 |
)
|
|
|
|
| 39 |
from utils import language_banner
|
| 40 |
language_banner()
|
| 41 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 42 |
st.markdown(
|
| 43 |
"<div style='text-align:center;'>"
|
| 44 |
f"<img src='{_SLOWBRO_IMG}' width='100' />"
|
|
|
|
| 46 |
unsafe_allow_html=True,
|
| 47 |
)
|
| 48 |
st.markdown(
|
| 49 |
+
"<h3 style='text-align:center;'>Welcome to ๅๆไพ Prefero</h3>"
|
| 50 |
+
"<p style='text-align:center; color:gray;'>"
|
| 51 |
+
"Choose a display name to get started</p>",
|
| 52 |
unsafe_allow_html=True,
|
| 53 |
)
|
| 54 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 55 |
name_input = st.text_input(
|
| 56 |
"Username",
|
| 57 |
key="_auth_username_input",
|
|
|
|
| 70 |
else:
|
| 71 |
try:
|
| 72 |
from community_db import init_db, create_user, get_user_by_name, log_activity
|
| 73 |
+
from session_queue import register_username
|
| 74 |
|
| 75 |
init_db()
|
| 76 |
existing = get_user_by_name(name)
|
| 77 |
if existing:
|
| 78 |
+
st.session_state["username"] = existing.username
|
| 79 |
+
st.session_state["user_id"] = existing.id
|
| 80 |
+
register_username(existing.username)
|
| 81 |
+
log_activity(existing.username, "login", "Returning user")
|
| 82 |
+
st.rerun()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 83 |
else:
|
| 84 |
user = create_user(name, email="")
|
| 85 |
st.session_state["username"] = user.username
|
| 86 |
st.session_state["user_id"] = user.id
|
| 87 |
register_username(user.username)
|
| 88 |
+
log_activity(user.username, "register", "New user")
|
| 89 |
st.rerun()
|
| 90 |
except Exception as exc:
|
| 91 |
st.error(f"Error: {exc}")
|
app/pages/8_๐_Admin.py
CHANGED
|
@@ -427,12 +427,8 @@ with c3:
|
|
| 427 |
|
| 428 |
st.header("Space Configuration")
|
| 429 |
|
| 430 |
-
auth_enabled = os.environ.get("PREFERO_AUTH_ENABLED", "false")
|
| 431 |
queue_enabled = os.environ.get("PREFERO_QUEUE_ENABLED", "false")
|
| 432 |
|
| 433 |
-
invite_codes_raw = os.environ.get("PREFERO_INVITATION_CODES", "")
|
| 434 |
-
invite_count = len([c for c in invite_codes_raw.split(",") if c.strip()]) if invite_codes_raw.strip() else 0
|
| 435 |
-
|
| 436 |
admin_users_display = os.environ.get("PREFERO_ADMIN_USERS", "(not set)")
|
| 437 |
|
| 438 |
smtp_configured = bool(os.environ.get("SMTP_USER")) and bool(os.environ.get("SMTP_PASS"))
|
|
@@ -440,10 +436,9 @@ smtp_configured = bool(os.environ.get("SMTP_USER")) and bool(os.environ.get("SMT
|
|
| 440 |
cfg_col1, cfg_col2 = st.columns(2)
|
| 441 |
|
| 442 |
with cfg_col1:
|
| 443 |
-
st.metric("
|
| 444 |
st.metric("Queue enabled", queue_enabled)
|
| 445 |
st.metric("SMTP configured", "Yes" if smtp_configured else "No")
|
| 446 |
|
| 447 |
with cfg_col2:
|
| 448 |
-
st.metric("Invitation codes", str(invite_count))
|
| 449 |
st.metric("Admin users", admin_users_display)
|
|
|
|
| 427 |
|
| 428 |
st.header("Space Configuration")
|
| 429 |
|
|
|
|
| 430 |
queue_enabled = os.environ.get("PREFERO_QUEUE_ENABLED", "false")
|
| 431 |
|
|
|
|
|
|
|
|
|
|
| 432 |
admin_users_display = os.environ.get("PREFERO_ADMIN_USERS", "(not set)")
|
| 433 |
|
| 434 |
smtp_configured = bool(os.environ.get("SMTP_USER")) and bool(os.environ.get("SMTP_PASS"))
|
|
|
|
| 436 |
cfg_col1, cfg_col2 = st.columns(2)
|
| 437 |
|
| 438 |
with cfg_col1:
|
| 439 |
+
st.metric("Access mode", "Open (no login)")
|
| 440 |
st.metric("Queue enabled", queue_enabled)
|
| 441 |
st.metric("SMTP configured", "Yes" if smtp_configured else "No")
|
| 442 |
|
| 443 |
with cfg_col2:
|
|
|
|
| 444 |
st.metric("Admin users", admin_users_display)
|
app/utils.py
CHANGED
|
@@ -35,12 +35,7 @@ _SESSION_DEFAULTS: dict[str, object] = {
|
|
| 35 |
|
| 36 |
|
| 37 |
def require_auth() -> None:
|
| 38 |
-
"""
|
| 39 |
-
|
| 40 |
-
When auth is disabled via PREFERO_AUTH_ENABLED, this is a no-op.
|
| 41 |
-
Otherwise it shows the auth gate and stops execution until the
|
| 42 |
-
user successfully authenticates.
|
| 43 |
-
"""
|
| 44 |
if not auth_gate():
|
| 45 |
st.stop()
|
| 46 |
|
|
@@ -56,7 +51,7 @@ def require_queue_slot() -> None:
|
|
| 56 |
|
| 57 |
|
| 58 |
def _check_session_timeout() -> None:
|
| 59 |
-
"""If the session was evicted due to inactivity, clear
|
| 60 |
|
| 61 |
Skips the check when estimation is actively running (the user is waiting
|
| 62 |
for Slowbro to finish, not actually idle).
|
|
@@ -69,22 +64,22 @@ def _check_session_timeout() -> None:
|
|
| 69 |
if st.session_state.get("_estimation_running"):
|
| 70 |
return
|
| 71 |
|
| 72 |
-
if not st.session_state.get("
|
| 73 |
-
return #
|
| 74 |
|
| 75 |
# Only check if user was previously admitted to the queue.
|
| 76 |
-
# Without this,
|
| 77 |
-
#
|
| 78 |
if not st.session_state.get("_queue_admitted"):
|
| 79 |
return
|
| 80 |
|
| 81 |
if not is_session_active():
|
| 82 |
-
# Session was evicted โ clear
|
| 83 |
-
for key in ("
|
| 84 |
st.session_state[key] = _SESSION_DEFAULTS.get(key, "")
|
| 85 |
st.session_state.pop("_queue_session_id", None)
|
| 86 |
st.session_state.pop("_queue_admitted", None)
|
| 87 |
-
st.warning("Your session expired due to inactivity. Please
|
| 88 |
st.stop()
|
| 89 |
|
| 90 |
|
|
|
|
| 35 |
|
| 36 |
|
| 37 |
def require_auth() -> None:
|
| 38 |
+
"""Run the auth gate. Currently a no-op (open access)."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 39 |
if not auth_gate():
|
| 40 |
st.stop()
|
| 41 |
|
|
|
|
| 51 |
|
| 52 |
|
| 53 |
def _check_session_timeout() -> None:
|
| 54 |
+
"""If the session was evicted due to inactivity, clear state and redirect.
|
| 55 |
|
| 56 |
Skips the check when estimation is actively running (the user is waiting
|
| 57 |
for Slowbro to finish, not actually idle).
|
|
|
|
| 64 |
if st.session_state.get("_estimation_running"):
|
| 65 |
return
|
| 66 |
|
| 67 |
+
if not st.session_state.get("username"):
|
| 68 |
+
return # no username set โ nothing to expire
|
| 69 |
|
| 70 |
# Only check if user was previously admitted to the queue.
|
| 71 |
+
# Without this, fresh users who haven't entered the queue yet
|
| 72 |
+
# would be incorrectly treated as evicted.
|
| 73 |
if not st.session_state.get("_queue_admitted"):
|
| 74 |
return
|
| 75 |
|
| 76 |
if not is_session_active():
|
| 77 |
+
# Session was evicted โ clear state so user re-enters
|
| 78 |
+
for key in ("username", "user_id"):
|
| 79 |
st.session_state[key] = _SESSION_DEFAULTS.get(key, "")
|
| 80 |
st.session_state.pop("_queue_session_id", None)
|
| 81 |
st.session_state.pop("_queue_admitted", None)
|
| 82 |
+
st.warning("Your session expired due to inactivity. Please choose a username to continue.")
|
| 83 |
st.stop()
|
| 84 |
|
| 85 |
|
scripts/test_new_modules.py
CHANGED
|
@@ -2,7 +2,7 @@
|
|
| 2 |
"""Comprehensive tests for newly created/modified modules in ๅๆไพ Prefero.
|
| 3 |
|
| 4 |
Tested modules:
|
| 5 |
-
1. app/auth.py โ Authentication (
|
| 6 |
2. app/session_queue.py โ Concurrent user queue
|
| 7 |
3. app/waiting_facts.py โ Cultural facts list
|
| 8 |
4. app/pages/1_Data.py โ _generate_template_excel()
|
|
@@ -57,242 +57,33 @@ def _reset_session_state():
|
|
| 57 |
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 58 |
|
| 59 |
class TestAuth(unittest.TestCase):
|
| 60 |
-
"""Tests for the authentication module."""
|
| 61 |
|
| 62 |
def setUp(self):
|
| 63 |
_reset_session_state()
|
| 64 |
-
# Clean env vars that auth.py reads
|
| 65 |
-
for var in ("PREFERO_AUTH_ENABLED", "PREFERO_INVITATION_CODES",
|
| 66 |
-
"SMTP_USER", "SMTP_PASS", "SMTP_HOST", "SMTP_PORT"):
|
| 67 |
-
os.environ.pop(var, None)
|
| 68 |
-
|
| 69 |
-
def tearDown(self):
|
| 70 |
-
for var in ("PREFERO_AUTH_ENABLED", "PREFERO_INVITATION_CODES",
|
| 71 |
-
"SMTP_USER", "SMTP_PASS", "SMTP_HOST", "SMTP_PORT"):
|
| 72 |
-
os.environ.pop(var, None)
|
| 73 |
-
|
| 74 |
-
# โโ _generate_otp โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 75 |
-
|
| 76 |
-
def test_generate_otp_returns_6_digit_string(self):
|
| 77 |
-
from auth import _generate_otp
|
| 78 |
-
otp = _generate_otp()
|
| 79 |
-
self.assertIsInstance(otp, str)
|
| 80 |
-
self.assertEqual(len(otp), 6)
|
| 81 |
-
self.assertTrue(otp.isdigit(), f"OTP '{otp}' is not all digits")
|
| 82 |
-
|
| 83 |
-
def test_generate_otp_multiple_calls_vary(self):
|
| 84 |
-
"""Multiple calls should (very likely) produce different OTPs."""
|
| 85 |
-
from auth import _generate_otp
|
| 86 |
-
otps = {_generate_otp() for _ in range(50)}
|
| 87 |
-
# With 10^6 possibilities and 50 draws, collisions are extremely rare
|
| 88 |
-
self.assertGreater(len(otps), 1, "50 OTP calls all returned the same value")
|
| 89 |
-
|
| 90 |
-
# โโ _get_invitation_codes โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 91 |
-
|
| 92 |
-
def test_invitation_codes_empty_when_unset(self):
|
| 93 |
-
from auth import _get_invitation_codes
|
| 94 |
-
self.assertEqual(_get_invitation_codes(), [])
|
| 95 |
-
|
| 96 |
-
def test_invitation_codes_empty_string(self):
|
| 97 |
-
from auth import _get_invitation_codes
|
| 98 |
-
os.environ["PREFERO_INVITATION_CODES"] = ""
|
| 99 |
-
self.assertEqual(_get_invitation_codes(), [])
|
| 100 |
-
|
| 101 |
-
def test_invitation_codes_whitespace_only(self):
|
| 102 |
-
from auth import _get_invitation_codes
|
| 103 |
-
os.environ["PREFERO_INVITATION_CODES"] = " "
|
| 104 |
-
self.assertEqual(_get_invitation_codes(), [])
|
| 105 |
-
|
| 106 |
-
def test_invitation_codes_single(self):
|
| 107 |
-
from auth import _get_invitation_codes
|
| 108 |
-
os.environ["PREFERO_INVITATION_CODES"] = "abc123"
|
| 109 |
-
self.assertEqual(_get_invitation_codes(), ["ABC123"])
|
| 110 |
-
|
| 111 |
-
def test_invitation_codes_multiple_with_spaces(self):
|
| 112 |
-
from auth import _get_invitation_codes
|
| 113 |
-
os.environ["PREFERO_INVITATION_CODES"] = " code1 , code2 , CODE3 "
|
| 114 |
-
self.assertEqual(_get_invitation_codes(), ["CODE1", "CODE2", "CODE3"])
|
| 115 |
-
|
| 116 |
-
def test_invitation_codes_case_insensitive_upper(self):
|
| 117 |
-
"""Codes are stored upper-cased for case-insensitive matching."""
|
| 118 |
-
from auth import _get_invitation_codes
|
| 119 |
-
os.environ["PREFERO_INVITATION_CODES"] = "HelloWorld"
|
| 120 |
-
codes = _get_invitation_codes()
|
| 121 |
-
self.assertIn("HELLOWORLD", codes)
|
| 122 |
-
|
| 123 |
-
def test_invitation_codes_skip_empty_segments(self):
|
| 124 |
-
from auth import _get_invitation_codes
|
| 125 |
-
os.environ["PREFERO_INVITATION_CODES"] = "a,,b, ,c"
|
| 126 |
-
codes = _get_invitation_codes()
|
| 127 |
-
self.assertEqual(codes, ["A", "B", "C"])
|
| 128 |
-
|
| 129 |
-
# โโ _auth_enabled โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 130 |
-
|
| 131 |
-
def test_auth_enabled_false_when_unset(self):
|
| 132 |
-
from auth import _auth_enabled
|
| 133 |
-
self.assertFalse(_auth_enabled())
|
| 134 |
-
|
| 135 |
-
def test_auth_enabled_false_when_empty(self):
|
| 136 |
-
from auth import _auth_enabled
|
| 137 |
-
os.environ["PREFERO_AUTH_ENABLED"] = ""
|
| 138 |
-
self.assertFalse(_auth_enabled())
|
| 139 |
-
|
| 140 |
-
def test_auth_enabled_false_when_random_string(self):
|
| 141 |
-
from auth import _auth_enabled
|
| 142 |
-
os.environ["PREFERO_AUTH_ENABLED"] = "yes"
|
| 143 |
-
self.assertFalse(_auth_enabled())
|
| 144 |
-
|
| 145 |
-
def test_auth_enabled_true(self):
|
| 146 |
-
from auth import _auth_enabled
|
| 147 |
-
os.environ["PREFERO_AUTH_ENABLED"] = "true"
|
| 148 |
-
self.assertTrue(_auth_enabled())
|
| 149 |
-
|
| 150 |
-
def test_auth_enabled_true_case_insensitive(self):
|
| 151 |
-
from auth import _auth_enabled
|
| 152 |
-
os.environ["PREFERO_AUTH_ENABLED"] = "TRUE"
|
| 153 |
-
self.assertTrue(_auth_enabled())
|
| 154 |
-
|
| 155 |
-
def test_auth_enabled_true_mixed_case(self):
|
| 156 |
-
from auth import _auth_enabled
|
| 157 |
-
os.environ["PREFERO_AUTH_ENABLED"] = "True"
|
| 158 |
-
self.assertTrue(_auth_enabled())
|
| 159 |
-
|
| 160 |
-
# โโ _smtp_configured โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 161 |
-
|
| 162 |
-
def test_smtp_not_configured_when_unset(self):
|
| 163 |
-
from auth import _smtp_configured
|
| 164 |
-
self.assertFalse(_smtp_configured())
|
| 165 |
-
|
| 166 |
-
def test_smtp_not_configured_with_only_user(self):
|
| 167 |
-
from auth import _smtp_configured
|
| 168 |
-
os.environ["SMTP_USER"] = "user@example.com"
|
| 169 |
-
self.assertFalse(_smtp_configured())
|
| 170 |
-
|
| 171 |
-
def test_smtp_not_configured_with_only_pass(self):
|
| 172 |
-
from auth import _smtp_configured
|
| 173 |
-
os.environ["SMTP_PASS"] = "secret"
|
| 174 |
-
self.assertFalse(_smtp_configured())
|
| 175 |
-
|
| 176 |
-
def test_smtp_configured_with_both(self):
|
| 177 |
-
from auth import _smtp_configured
|
| 178 |
-
os.environ["SMTP_USER"] = "user@example.com"
|
| 179 |
-
os.environ["SMTP_PASS"] = "secret"
|
| 180 |
-
self.assertTrue(_smtp_configured())
|
| 181 |
-
|
| 182 |
-
def test_smtp_not_configured_with_empty_user(self):
|
| 183 |
-
from auth import _smtp_configured
|
| 184 |
-
os.environ["SMTP_USER"] = ""
|
| 185 |
-
os.environ["SMTP_PASS"] = "secret"
|
| 186 |
-
self.assertFalse(_smtp_configured())
|
| 187 |
-
|
| 188 |
-
# โโ _check_rate_limit & _record_otp_attempt โโโโโโโโโโโโโโโโโโโ
|
| 189 |
-
|
| 190 |
-
def test_rate_limit_allows_first_attempt(self):
|
| 191 |
-
from auth import _check_rate_limit
|
| 192 |
-
self.assertTrue(_check_rate_limit("test@example.com"))
|
| 193 |
-
|
| 194 |
-
def test_rate_limit_allows_up_to_three(self):
|
| 195 |
-
from auth import _check_rate_limit, _record_otp_attempt
|
| 196 |
-
email = "ratelimit@example.com"
|
| 197 |
-
_record_otp_attempt(email)
|
| 198 |
-
self.assertTrue(_check_rate_limit(email))
|
| 199 |
-
_record_otp_attempt(email)
|
| 200 |
-
self.assertTrue(_check_rate_limit(email))
|
| 201 |
-
_record_otp_attempt(email)
|
| 202 |
-
# Now at 3 attempts โ should be blocked
|
| 203 |
-
self.assertFalse(_check_rate_limit(email))
|
| 204 |
-
|
| 205 |
-
def test_rate_limit_different_emails_independent(self):
|
| 206 |
-
from auth import _check_rate_limit, _record_otp_attempt
|
| 207 |
-
for _ in range(3):
|
| 208 |
-
_record_otp_attempt("a@example.com")
|
| 209 |
-
self.assertFalse(_check_rate_limit("a@example.com"))
|
| 210 |
-
# Different email should still be allowed
|
| 211 |
-
self.assertTrue(_check_rate_limit("b@example.com"))
|
| 212 |
-
|
| 213 |
-
def test_rate_limit_old_attempts_expire(self):
|
| 214 |
-
"""Attempts older than 10 minutes should not count."""
|
| 215 |
-
from auth import _check_rate_limit, _record_otp_attempt
|
| 216 |
-
email = "expire@example.com"
|
| 217 |
-
# Manually set old timestamps
|
| 218 |
-
old_time = time.time() - 11 * 60 # 11 minutes ago
|
| 219 |
-
_mock_session_state["_otp_attempts"] = {email: [old_time, old_time, old_time]}
|
| 220 |
-
self.assertTrue(_check_rate_limit(email))
|
| 221 |
-
|
| 222 |
-
def test_record_otp_creates_key_if_missing(self):
|
| 223 |
-
from auth import _record_otp_attempt
|
| 224 |
-
self.assertNotIn("_otp_attempts", _mock_session_state)
|
| 225 |
-
_record_otp_attempt("new@example.com")
|
| 226 |
-
self.assertIn("_otp_attempts", _mock_session_state)
|
| 227 |
-
self.assertEqual(len(_mock_session_state["_otp_attempts"]["new@example.com"]), 1)
|
| 228 |
-
|
| 229 |
-
# โโ _send_otp_email (mocked) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 230 |
-
|
| 231 |
-
def test_send_otp_email_constructs_message(self):
|
| 232 |
-
from auth import _send_otp_email
|
| 233 |
-
import base64
|
| 234 |
-
os.environ["SMTP_USER"] = "sender@example.com"
|
| 235 |
-
os.environ["SMTP_PASS"] = "pass123"
|
| 236 |
-
os.environ["SMTP_HOST"] = "smtp.example.com"
|
| 237 |
-
os.environ["SMTP_PORT"] = "587"
|
| 238 |
-
|
| 239 |
-
with patch("auth.smtplib.SMTP") as mock_smtp_class:
|
| 240 |
-
mock_server = MagicMock()
|
| 241 |
-
mock_smtp_class.return_value.__enter__ = MagicMock(return_value=mock_server)
|
| 242 |
-
mock_smtp_class.return_value.__exit__ = MagicMock(return_value=False)
|
| 243 |
-
|
| 244 |
-
_send_otp_email("recipient@example.com", "123456")
|
| 245 |
-
|
| 246 |
-
# Verify SMTP was initialized with correct host/port
|
| 247 |
-
mock_smtp_class.assert_called_once_with("smtp.example.com", 587, timeout=15)
|
| 248 |
-
# Verify the sendmail call happened
|
| 249 |
-
mock_server.starttls.assert_called_once()
|
| 250 |
-
mock_server.login.assert_called_once_with("sender@example.com", "pass123")
|
| 251 |
-
mock_server.sendmail.assert_called_once()
|
| 252 |
-
call_args = mock_server.sendmail.call_args
|
| 253 |
-
self.assertEqual(call_args[0][0], "sender@example.com")
|
| 254 |
-
self.assertEqual(call_args[0][1], ["recipient@example.com"])
|
| 255 |
-
# Verify the MIME headers
|
| 256 |
-
msg_string = call_args[0][2]
|
| 257 |
-
self.assertIn("Subject: Your Prefero Login Code", msg_string)
|
| 258 |
-
self.assertIn("From: sender@example.com", msg_string)
|
| 259 |
-
self.assertIn("To: recipient@example.com", msg_string)
|
| 260 |
-
# The body is base64-encoded (MIMEText with utf-8 charset)
|
| 261 |
-
# Decode and verify the OTP code is in the body
|
| 262 |
-
from email import message_from_string
|
| 263 |
-
parsed = message_from_string(msg_string)
|
| 264 |
-
body = parsed.get_payload(decode=True).decode("utf-8")
|
| 265 |
-
self.assertIn("123456", body)
|
| 266 |
-
self.assertIn("Prefero", body)
|
| 267 |
-
self.assertIn("verification code", body)
|
| 268 |
-
|
| 269 |
-
def test_send_otp_email_default_host_and_port(self):
|
| 270 |
-
from auth import _send_otp_email
|
| 271 |
-
os.environ["SMTP_USER"] = "u@example.com"
|
| 272 |
-
os.environ["SMTP_PASS"] = "p"
|
| 273 |
-
# Don't set SMTP_HOST or SMTP_PORT โ should use defaults
|
| 274 |
-
|
| 275 |
-
with patch("auth.smtplib.SMTP") as mock_smtp_class:
|
| 276 |
-
mock_server = MagicMock()
|
| 277 |
-
mock_smtp_class.return_value.__enter__ = MagicMock(return_value=mock_server)
|
| 278 |
-
mock_smtp_class.return_value.__exit__ = MagicMock(return_value=False)
|
| 279 |
-
|
| 280 |
-
_send_otp_email("r@example.com", "999999")
|
| 281 |
-
|
| 282 |
-
mock_smtp_class.assert_called_once_with("smtp.gmail.com", 587, timeout=15)
|
| 283 |
|
| 284 |
# โโ auth_gate โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 285 |
|
| 286 |
-
def
|
| 287 |
from auth import auth_gate
|
| 288 |
-
# PREFERO_AUTH_ENABLED not set โ disabled
|
| 289 |
self.assertTrue(auth_gate())
|
| 290 |
|
| 291 |
-
def
|
| 292 |
from auth import auth_gate
|
| 293 |
-
|
| 294 |
-
_mock_session_state
|
| 295 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 296 |
|
| 297 |
|
| 298 |
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
|
@@ -913,7 +704,6 @@ class TestUtils(unittest.TestCase):
|
|
| 913 |
|
| 914 |
def setUp(self):
|
| 915 |
_reset_session_state()
|
| 916 |
-
os.environ.pop("PREFERO_AUTH_ENABLED", None)
|
| 917 |
os.environ.pop("PREFERO_QUEUE_ENABLED", None)
|
| 918 |
|
| 919 |
# โโ Imports โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
|
@@ -997,6 +787,9 @@ class TestUtils(unittest.TestCase):
|
|
| 997 |
for key, default_val in _SESSION_DEFAULTS.items():
|
| 998 |
self.assertIn(key, _mock_session_state,
|
| 999 |
f"Key '{key}' not found in session state after init")
|
|
|
|
|
|
|
|
|
|
| 1000 |
self.assertEqual(_mock_session_state[key], default_val)
|
| 1001 |
|
| 1002 |
def test_init_session_state_does_not_overwrite_existing(self):
|
|
|
|
| 2 |
"""Comprehensive tests for newly created/modified modules in ๅๆไพ Prefero.
|
| 3 |
|
| 4 |
Tested modules:
|
| 5 |
+
1. app/auth.py โ Authentication (open access, username gate)
|
| 6 |
2. app/session_queue.py โ Concurrent user queue
|
| 7 |
3. app/waiting_facts.py โ Cultural facts list
|
| 8 |
4. app/pages/1_Data.py โ _generate_template_excel()
|
|
|
|
| 57 |
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 58 |
|
| 59 |
class TestAuth(unittest.TestCase):
|
| 60 |
+
"""Tests for the authentication module (open access)."""
|
| 61 |
|
| 62 |
def setUp(self):
|
| 63 |
_reset_session_state()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 64 |
|
| 65 |
# โโ auth_gate โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 66 |
|
| 67 |
+
def test_auth_gate_always_returns_true(self):
|
| 68 |
from auth import auth_gate
|
|
|
|
| 69 |
self.assertTrue(auth_gate())
|
| 70 |
|
| 71 |
+
def test_auth_gate_sets_authenticated_flag(self):
|
| 72 |
from auth import auth_gate
|
| 73 |
+
auth_gate()
|
| 74 |
+
self.assertTrue(_mock_session_state.get("authenticated"))
|
| 75 |
+
|
| 76 |
+
# โโ username_gate โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
| 77 |
+
|
| 78 |
+
def test_username_gate_returns_true_when_username_set(self):
|
| 79 |
+
from auth import username_gate
|
| 80 |
+
_mock_session_state["username"] = "TestUser"
|
| 81 |
+
self.assertTrue(username_gate())
|
| 82 |
+
|
| 83 |
+
def test_username_gate_returns_false_when_no_username(self):
|
| 84 |
+
from auth import username_gate
|
| 85 |
+
result = username_gate()
|
| 86 |
+
self.assertFalse(result)
|
| 87 |
|
| 88 |
|
| 89 |
# โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
|
|
|
| 704 |
|
| 705 |
def setUp(self):
|
| 706 |
_reset_session_state()
|
|
|
|
| 707 |
os.environ.pop("PREFERO_QUEUE_ENABLED", None)
|
| 708 |
|
| 709 |
# โโ Imports โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
|
|
|
|
| 787 |
for key, default_val in _SESSION_DEFAULTS.items():
|
| 788 |
self.assertIn(key, _mock_session_state,
|
| 789 |
f"Key '{key}' not found in session state after init")
|
| 790 |
+
# auth_gate() sets authenticated=True during init, so skip that check
|
| 791 |
+
if key == "authenticated":
|
| 792 |
+
continue
|
| 793 |
self.assertEqual(_mock_session_state[key], default_val)
|
| 794 |
|
| 795 |
def test_init_session_state_does_not_overwrite_existing(self):
|