from pathlib import Path # Clean BOM for path in Path("app").rglob("*.py"): text = path.read_text(encoding="utf-8-sig") text = text.replace("\ufeff", "") path.write_text(text, encoding="utf-8") print("BOM cleanup completed.") # ===================================================== # 1. requirements.txt update # ===================================================== req_path = Path("requirements.txt") if req_path.exists(): req = req_path.read_text(encoding="utf-8-sig") else: req = "" extras = [ "authlib>=1.3.0", "itsdangerous>=2.1.2" ] for item in extras: if item.split(">=")[0] not in req: req += "\n" + item req_path.write_text(req.strip() + "\n", encoding="utf-8") print("requirements.txt updated.") # ===================================================== # 2. Session-aware auth service # ===================================================== Path("app/product/auth_service.py").write_text(r''' import os from typing import Dict, Any, Optional from fastapi import Request, HTTPException from app.product.product_db import upsert_user DEFAULT_ADMIN_EMAILS = { "2006yugb@gmail.com" } def get_admin_emails(): raw = os.getenv("ADMIN_EMAILS", "") emails = { email.strip().lower() for email in raw.split(",") if email.strip() } return emails | DEFAULT_ADMIN_EMAILS def normalize_email(email: Optional[str]) -> str: return str(email or "").strip().lower() def make_user_id(email: str) -> str: return "user_" + email.replace("@", "_").replace(".", "_") def infer_role(email: str) -> str: if normalize_email(email) in get_admin_emails(): return "admin" return "user" def get_session_user(request: Request): try: return request.session.get("user") except Exception: return None def get_current_user_from_request(request: Request) -> Dict[str, Any]: """ Preferred: - Session cookie from /login or Google OAuth Temporary dev fallback: - X-User-Email header, controlled by ALLOW_HEADER_AUTH """ session_user = get_session_user(request) if session_user and session_user.get("email"): email = normalize_email(session_user.get("email")) role = infer_role(email) user_id = session_user.get("user_id") or make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=session_user.get("name") or email.split("@")[0], role=role, auth_provider=session_user.get("auth_provider", "session") ) user["authenticated"] = True return user allow_header_auth = os.getenv("ALLOW_HEADER_AUTH", "true").strip().lower() in { "1", "true", "yes", "y" } if allow_header_auth: email = normalize_email(request.headers.get("x-user-email")) name = request.headers.get("x-user-name") if email: role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider="header_dev" ) user["authenticated"] = True return user return { "authenticated": False, "user_id": None, "email": None, "name": "Guest", "role": "guest", "auth_provider": "none" } def require_authenticated_user(request: Request) -> Dict[str, Any]: user = get_current_user_from_request(request) if not user.get("authenticated"): raise HTTPException( status_code=401, detail="Authentication required. Please login first." ) return user def require_admin_user(request: Request) -> Dict[str, Any]: user = require_authenticated_user(request) if user.get("role") != "admin": raise HTTPException( status_code=403, detail="Admin access required." ) return user def dev_login_user(email: str, name: Optional[str] = None) -> Dict[str, Any]: email = normalize_email(email) if not email: raise HTTPException(status_code=400, detail="email is required") role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider="dev_login" ) user["authenticated"] = True user["dev_header_hint"] = { "X-User-Email": email, "X-User-Name": name or email.split("@")[0] } return user ''', encoding="utf-8") # ===================================================== # 3. OAuth service # ===================================================== Path("app/product/oauth_service.py").write_text(r''' import os from typing import Optional, Dict, Any from fastapi import Request, HTTPException from starlette.responses import RedirectResponse from app.product.auth_service import infer_role, make_user_id, normalize_email from app.product.product_db import upsert_user try: from authlib.integrations.starlette_client import OAuth AUTHLIB_AVAILABLE = True AUTHLIB_ERROR = None except Exception as exc: OAuth = None AUTHLIB_AVAILABLE = False AUTHLIB_ERROR = str(exc) def get_google_client_id() -> str: return os.getenv("GOOGLE_CLIENT_ID", "").strip() def get_google_client_secret() -> str: return os.getenv("GOOGLE_CLIENT_SECRET", "").strip() def is_google_oauth_configured() -> bool: return bool(AUTHLIB_AVAILABLE and get_google_client_id() and get_google_client_secret()) def get_oauth_status() -> Dict[str, Any]: return { "authlib_available": AUTHLIB_AVAILABLE, "authlib_error": AUTHLIB_ERROR, "google_oauth_configured": is_google_oauth_configured(), "required_env_vars": [ "GOOGLE_CLIENT_ID", "GOOGLE_CLIENT_SECRET", "SESSION_SECRET_KEY", "ADMIN_EMAILS" ], "admin_email_default": "2006yugb@gmail.com" } def build_oauth_client(): if not AUTHLIB_AVAILABLE: raise HTTPException( status_code=500, detail=f"Authlib is not installed or failed to import: {AUTHLIB_ERROR}" ) if not get_google_client_id() or not get_google_client_secret(): raise HTTPException( status_code=400, detail="Google OAuth is not configured. Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET." ) oauth = OAuth() oauth.register( name="google", client_id=get_google_client_id(), client_secret=get_google_client_secret(), server_metadata_url="https://accounts.google.com/.well-known/openid-configuration", client_kwargs={ "scope": "openid email profile" } ) return oauth def set_user_session( request: Request, email: str, name: Optional[str] = None, avatar_url: Optional[str] = None, auth_provider: str = "session" ) -> Dict[str, Any]: email = normalize_email(email) if not email: raise HTTPException(status_code=400, detail="Email is required for login.") role = infer_role(email) user_id = make_user_id(email) user = upsert_user( user_id=user_id, email=email, name=name or email.split("@")[0], role=role, auth_provider=auth_provider, avatar_url=avatar_url ) session_user = { "authenticated": True, "user_id": user_id, "email": email, "name": name or email.split("@")[0], "role": role, "avatar_url": avatar_url, "auth_provider": auth_provider } request.session["user"] = session_user return session_user async def start_google_login(request: Request): oauth = build_oauth_client() redirect_uri = request.url_for("auth_google_callback") return await oauth.google.authorize_redirect(request, redirect_uri) async def finish_google_login(request: Request): oauth = build_oauth_client() try: token = await oauth.google.authorize_access_token(request) except Exception as exc: raise HTTPException( status_code=400, detail=f"Google OAuth callback failed: {exc}" ) userinfo = token.get("userinfo") if not userinfo: try: userinfo = await oauth.google.parse_id_token(request, token) except Exception: userinfo = {} email = normalize_email(userinfo.get("email")) name = userinfo.get("name") or userinfo.get("given_name") avatar_url = userinfo.get("picture") if not email: raise HTTPException( status_code=400, detail="Google login succeeded, but email was not returned." ) set_user_session( request=request, email=email, name=name, avatar_url=avatar_url, auth_provider="google" ) return RedirectResponse(url="/app", status_code=302) def dev_session_login(request: Request, email: str, name: Optional[str] = None): set_user_session( request=request, email=email, name=name, avatar_url=None, auth_provider="dev_session" ) return RedirectResponse(url="/app", status_code=302) def clear_session(request: Request): try: request.session.clear() except Exception: pass return RedirectResponse(url="/login", status_code=302) def get_session_payload(request: Request): try: user = request.session.get("user") except Exception: user = None if not user: return { "authenticated": False, "user": None } return { "authenticated": True, "user": user } ''', encoding="utf-8") # ===================================================== # 4. Login UI # ===================================================== Path("app/product/login_ui.py").write_text(r''' def get_login_html() -> str: return """ Login - GraphResearcher
GraphResearcher
Login to upload documents, chat with sources, compare documents, and access your workspace.
Continue with Google

If Google OAuth is not configured yet, use dev login for local testing.

Checking OAuth status...
""" ''', encoding="utf-8") # ===================================================== # 5. Patch main.py # ===================================================== main_path = Path("app/main.py") main_text = main_path.read_text(encoding="utf-8-sig") main_text = main_text.replace("\ufeff", "") imports = ''' import os from fastapi import Request, Query from fastapi.responses import HTMLResponse from starlette.middleware.sessions import SessionMiddleware from app.product.login_ui import get_login_html from app.product.oauth_service import ( get_oauth_status, start_google_login, finish_google_login, dev_session_login, clear_session, get_session_payload ) ''' if "from app.product.oauth_service import" not in main_text: main_text = imports + "\n" + main_text # Insert SessionMiddleware after app = FastAPI(...) block if "app.add_middleware(SessionMiddleware" not in main_text: lines = main_text.splitlines() insert_index = None for i, line in enumerate(lines): if "app = FastAPI(" in line: balance = 0 for j in range(i, len(lines)): balance += lines[j].count("(") balance -= lines[j].count(")") if balance <= 0 and j >= i: insert_index = j + 1 break break middleware_lines = [ "", "app.add_middleware(", " SessionMiddleware,", " secret_key=os.getenv('SESSION_SECRET_KEY', 'dev-change-this-session-secret'),", " same_site='lax',", " https_only=False", ")", "" ] if insert_index is not None: lines[insert_index:insert_index] = middleware_lines main_text = "\n".join(lines) + "\n" print("Inserted SessionMiddleware.") else: print("WARNING: Could not find app = FastAPI(...) to insert SessionMiddleware.") if "# OAuth login endpoints" not in main_text: main_text += ''' # OAuth login endpoints @app.get("/login", response_class=HTMLResponse) def login_page(): return get_login_html() @app.get("/auth/oauth-status") def auth_oauth_status(): return get_oauth_status() @app.get("/auth/session") def auth_session(request: Request): return get_session_payload(request) @app.get("/auth/google/login") async def auth_google_login(request: Request): return await start_google_login(request) @app.get("/auth/google/callback") async def auth_google_callback(request: Request): return await finish_google_login(request) @app.get("/auth/dev-session") def auth_dev_session( request: Request, email: str = Query(..., min_length=3), name: str = Query(None) ): return dev_session_login( request=request, email=email, name=name ) @app.get("/auth/logout") def auth_logout(request: Request): return clear_session(request) ''' main_path.write_text(main_text, encoding="utf-8") print("Phase 32 Google OAuth + session login foundation added.")