from pathlib import Path # Clean BOM for path in Path("app").rglob("*.py"): text = path.read_text(encoding="utf-8-sig") text = text.replace("\ufeff", "") path.write_text(text, encoding="utf-8") print("BOM cleanup completed.") # ===================================================== # 1. requirements.txt update # ===================================================== req_path = Path("requirements.txt") if req_path.exists(): req = req_path.read_text(encoding="utf-8-sig") else: req = "" extras = [ "authlib>=1.3.0", "itsdangerous>=2.1.2" ] for item in extras: if item.split(">=")[0] not in req: req += "\n" + item req_path.write_text(req.strip() + "\n", encoding="utf-8") print("requirements.txt updated.") # ===================================================== # 2. Session-aware auth service # ===================================================== Path("app/product/auth_service.py").write_text(r''' import os from typing import Dict, Any, Optional from fastapi import Request, HTTPException from app.product.product_db import upsert_user DEFAULT_ADMIN_EMAILS = { "2006yugb@gmail.com" } def get_admin_emails(): raw = os.getenv("ADMIN_EMAILS", "") emails = { email.strip().lower() for email in raw.split(",") if email.strip() } return emails | DEFAULT_ADMIN_EMAILS def normalize_email(email: Optional[str]) -> str: return str(email or "").strip().lower() def make_user_id(email: str) -> str: return "user_" + email.replace("@", "_").replace(".", "_") def infer_role(email: str) -> str: if normalize_email(email) in get_admin_emails(): return "admin" return "user" def get_session_user(request: Request): try: return request.session.get("user") except Exception: return None def get_current_user_from_request(request: Request) -> Dict[str, Any]: """ Preferred: - Session cookie from /login or Google OAuth Temporary dev fallback: - X-User-Email header, controlled by ALLOW_HEADER_AUTH """ session_user = get_session_user(request) if session_user and session_user.get("email"): email = normalize_email(session_user.get("email")) role = infer_role(email) user_id = session_user.get("user_id") or make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=session_user.get("name") or email.split("@")[0], role=role, auth_provider=session_user.get("auth_provider", "session") ) user["authenticated"] = True return user allow_header_auth = os.getenv("ALLOW_HEADER_AUTH", "true").strip().lower() in { "1", "true", "yes", "y" } if allow_header_auth: email = normalize_email(request.headers.get("x-user-email")) name = request.headers.get("x-user-name") if email: role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider="header_dev" ) user["authenticated"] = True return user return { "authenticated": False, "user_id": None, "email": None, "name": "Guest", "role": "guest", "auth_provider": "none" } def require_authenticated_user(request: Request) -> Dict[str, Any]: user = get_current_user_from_request(request) if not user.get("authenticated"): raise HTTPException( status_code=401, detail="Authentication required. Please login first." ) return user def require_admin_user(request: Request) -> Dict[str, Any]: user = require_authenticated_user(request) if user.get("role") != "admin": raise HTTPException( status_code=403, detail="Admin access required." ) return user def dev_login_user(email: str, name: Optional[str] = None) -> Dict[str, Any]: email = normalize_email(email) if not email: raise HTTPException(status_code=400, detail="email is required") role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider="dev_login" ) user["authenticated"] = True user["dev_header_hint"] = { "X-User-Email": email, "X-User-Name": name or email.split("@")[0] } return user ''', encoding="utf-8") # ===================================================== # 3. OAuth service # ===================================================== Path("app/product/oauth_service.py").write_text(r''' import os from typing import Optional, Dict, Any from fastapi import Request, HTTPException from starlette.responses import RedirectResponse from app.product.auth_service import infer_role, make_user_id, normalize_email from app.product.product_db import upsert_user try: from authlib.integrations.starlette_client import OAuth AUTHLIB_AVAILABLE = True AUTHLIB_ERROR = None except Exception as exc: OAuth = None AUTHLIB_AVAILABLE = False AUTHLIB_ERROR = str(exc) def get_google_client_id() -> str: return os.getenv("GOOGLE_CLIENT_ID", "").strip() def get_google_client_secret() -> str: return os.getenv("GOOGLE_CLIENT_SECRET", "").strip() def is_google_oauth_configured() -> bool: return bool(AUTHLIB_AVAILABLE and get_google_client_id() and get_google_client_secret()) def get_oauth_status() -> Dict[str, Any]: return { "authlib_available": AUTHLIB_AVAILABLE, "authlib_error": AUTHLIB_ERROR, "google_oauth_configured": is_google_oauth_configured(), "required_env_vars": [ "GOOGLE_CLIENT_ID", "GOOGLE_CLIENT_SECRET", "SESSION_SECRET_KEY", "ADMIN_EMAILS" ], "admin_email_default": "2006yugb@gmail.com" } def build_oauth_client(): if not AUTHLIB_AVAILABLE: raise HTTPException( status_code=500, detail=f"Authlib is not installed or failed to import: {AUTHLIB_ERROR}" ) if not get_google_client_id() or not get_google_client_secret(): raise HTTPException( status_code=400, detail="Google OAuth is not configured. Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET." ) oauth = OAuth() oauth.register( name="google", client_id=get_google_client_id(), client_secret=get_google_client_secret(), server_metadata_url="https://accounts.google.com/.well-known/openid-configuration", client_kwargs={ "scope": "openid email profile" } ) return oauth def set_user_session( request: Request, email: str, name: Optional[str] = None, avatar_url: Optional[str] = None, auth_provider: str = "session" ) -> Dict[str, Any]: email = normalize_email(email) if not email: raise HTTPException(status_code=400, detail="Email is required for login.") role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider=auth_provider, avatar_url=avatar_url ) session_user = { "authenticated": True, "user_id": user_id, "email": email, "name": name or email.split("@")[0], "role": role, "avatar_url": avatar_url, "auth_provider": auth_provider } request.session["user"] = session_user return session_user async def start_google_login(request: Request): oauth = build_oauth_client() redirect_uri = request.url_for("auth_google_callback") return await oauth.google.authorize_redirect(request, redirect_uri) async def finish_google_login(request: Request): oauth = build_oauth_client() try: token = await oauth.google.authorize_access_token(request) except Exception as exc: raise HTTPException( status_code=400, detail=f"Google OAuth callback failed: {exc}" ) userinfo = token.get("userinfo") if not userinfo: try: userinfo = await oauth.google.parse_id_token(request, token) except Exception: userinfo = {} email = normalize_email(userinfo.get("email")) name = userinfo.get("name") or userinfo.get("given_name") avatar_url = userinfo.get("picture") if not email: raise HTTPException( status_code=400, detail="Google login succeeded, but email was not returned." ) set_user_session( request=request, email=email, name=name, avatar_url=avatar_url, auth_provider="google" ) return RedirectResponse(url="/app", status_code=302) def dev_session_login(request: Request, email: str, name: Optional[str] = None): set_user_session( request=request, email=email, name=name, avatar_url=None, auth_provider="dev_session" ) return RedirectResponse(url="/app", status_code=302) def clear_session(request: Request): try: request.session.clear() except Exception: pass return RedirectResponse(url="/login", status_code=302) def get_session_payload(request: Request): try: user = request.session.get("user") except Exception: user = None if not user: return { "authenticated": False, "user": None } return { "authenticated": True, "user": user } ''', encoding="utf-8") # ===================================================== # 4. Login UI # ===================================================== Path("app/product/login_ui.py").write_text(r''' def get_login_html() -> str: return """
If Google OAuth is not configured yet, use dev login for local testing.