GraphResearcher / scripts /phase32_google_oauth_session_login.py
yugbirla's picture
Add Google OAuth and session login foundation
57243aa
Raw
History Blame Contribute Delete
16.7 kB
from pathlib import Path
# Clean BOM
for path in Path("app").rglob("*.py"):
text = path.read_text(encoding="utf-8-sig")
text = text.replace("\ufeff", "")
path.write_text(text, encoding="utf-8")
print("BOM cleanup completed.")
# =====================================================
# 1. requirements.txt update
# =====================================================
req_path = Path("requirements.txt")
if req_path.exists():
req = req_path.read_text(encoding="utf-8-sig")
else:
req = ""
extras = [
"authlib>=1.3.0",
"itsdangerous>=2.1.2"
]
for item in extras:
if item.split(">=")[0] not in req:
req += "\n" + item
req_path.write_text(req.strip() + "\n", encoding="utf-8")
print("requirements.txt updated.")
# =====================================================
# 2. Session-aware auth service
# =====================================================
Path("app/product/auth_service.py").write_text(r'''
import os
from typing import Dict, Any, Optional
from fastapi import Request, HTTPException
from app.product.product_db import upsert_user
DEFAULT_ADMIN_EMAILS = {
"2006yugb@gmail.com"
}
def get_admin_emails():
raw = os.getenv("ADMIN_EMAILS", "")
emails = {
email.strip().lower()
for email in raw.split(",")
if email.strip()
}
return emails | DEFAULT_ADMIN_EMAILS
def normalize_email(email: Optional[str]) -> str:
return str(email or "").strip().lower()
def make_user_id(email: str) -> str:
return "user_" + email.replace("@", "_").replace(".", "_")
def infer_role(email: str) -> str:
if normalize_email(email) in get_admin_emails():
return "admin"
return "user"
def get_session_user(request: Request):
try:
return request.session.get("user")
except Exception:
return None
def get_current_user_from_request(request: Request) -> Dict[str, Any]:
"""
Preferred:
- Session cookie from /login or Google OAuth
Temporary dev fallback:
- X-User-Email header, controlled by ALLOW_HEADER_AUTH
"""
session_user = get_session_user(request)
if session_user and session_user.get("email"):
email = normalize_email(session_user.get("email"))
role = infer_role(email)
user_id = session_user.get("user_id") or make_user_id(email)
user = upsert_user(
user_id=user_id,
email=email,
name=session_user.get("name") or email.split("@")[0],
role=role,
auth_provider=session_user.get("auth_provider", "session")
)
user["authenticated"] = True
return user
allow_header_auth = os.getenv("ALLOW_HEADER_AUTH", "true").strip().lower() in {
"1", "true", "yes", "y"
}
if allow_header_auth:
email = normalize_email(request.headers.get("x-user-email"))
name = request.headers.get("x-user-name")
if email:
role = infer_role(email)
user_id = make_user_id(email)
user = upsert_user(
user_id=user_id,
email=email,
name=name or email.split("@")[0],
role=role,
auth_provider="header_dev"
)
user["authenticated"] = True
return user
return {
"authenticated": False,
"user_id": None,
"email": None,
"name": "Guest",
"role": "guest",
"auth_provider": "none"
}
def require_authenticated_user(request: Request) -> Dict[str, Any]:
user = get_current_user_from_request(request)
if not user.get("authenticated"):
raise HTTPException(
status_code=401,
detail="Authentication required. Please login first."
)
return user
def require_admin_user(request: Request) -> Dict[str, Any]:
user = require_authenticated_user(request)
if user.get("role") != "admin":
raise HTTPException(
status_code=403,
detail="Admin access required."
)
return user
def dev_login_user(email: str, name: Optional[str] = None) -> Dict[str, Any]:
email = normalize_email(email)
if not email:
raise HTTPException(status_code=400, detail="email is required")
role = infer_role(email)
user_id = make_user_id(email)
user = upsert_user(
user_id=user_id,
email=email,
name=name or email.split("@")[0],
role=role,
auth_provider="dev_login"
)
user["authenticated"] = True
user["dev_header_hint"] = {
"X-User-Email": email,
"X-User-Name": name or email.split("@")[0]
}
return user
''', encoding="utf-8")
# =====================================================
# 3. OAuth service
# =====================================================
Path("app/product/oauth_service.py").write_text(r'''
import os
from typing import Optional, Dict, Any
from fastapi import Request, HTTPException
from starlette.responses import RedirectResponse
from app.product.auth_service import infer_role, make_user_id, normalize_email
from app.product.product_db import upsert_user
try:
from authlib.integrations.starlette_client import OAuth
AUTHLIB_AVAILABLE = True
AUTHLIB_ERROR = None
except Exception as exc:
OAuth = None
AUTHLIB_AVAILABLE = False
AUTHLIB_ERROR = str(exc)
def get_google_client_id() -> str:
return os.getenv("GOOGLE_CLIENT_ID", "").strip()
def get_google_client_secret() -> str:
return os.getenv("GOOGLE_CLIENT_SECRET", "").strip()
def is_google_oauth_configured() -> bool:
return bool(AUTHLIB_AVAILABLE and get_google_client_id() and get_google_client_secret())
def get_oauth_status() -> Dict[str, Any]:
return {
"authlib_available": AUTHLIB_AVAILABLE,
"authlib_error": AUTHLIB_ERROR,
"google_oauth_configured": is_google_oauth_configured(),
"required_env_vars": [
"GOOGLE_CLIENT_ID",
"GOOGLE_CLIENT_SECRET",
"SESSION_SECRET_KEY",
"ADMIN_EMAILS"
],
"admin_email_default": "2006yugb@gmail.com"
}
def build_oauth_client():
if not AUTHLIB_AVAILABLE:
raise HTTPException(
status_code=500,
detail=f"Authlib is not installed or failed to import: {AUTHLIB_ERROR}"
)
if not get_google_client_id() or not get_google_client_secret():
raise HTTPException(
status_code=400,
detail="Google OAuth is not configured. Set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET."
)
oauth = OAuth()
oauth.register(
name="google",
client_id=get_google_client_id(),
client_secret=get_google_client_secret(),
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_kwargs={
"scope": "openid email profile"
}
)
return oauth
def set_user_session(
request: Request,
email: str,
name: Optional[str] = None,
avatar_url: Optional[str] = None,
auth_provider: str = "session"
) -> Dict[str, Any]:
email = normalize_email(email)
if not email:
raise HTTPException(status_code=400, detail="Email is required for login.")
role = infer_role(email)
user_id = make_user_id(email)
user = upsert_user(
user_id=user_id,
email=email,
name=name or email.split("@")[0],
role=role,
auth_provider=auth_provider,
avatar_url=avatar_url
)
session_user = {
"authenticated": True,
"user_id": user_id,
"email": email,
"name": name or email.split("@")[0],
"role": role,
"avatar_url": avatar_url,
"auth_provider": auth_provider
}
request.session["user"] = session_user
return session_user
async def start_google_login(request: Request):
oauth = build_oauth_client()
redirect_uri = request.url_for("auth_google_callback")
return await oauth.google.authorize_redirect(request, redirect_uri)
async def finish_google_login(request: Request):
oauth = build_oauth_client()
try:
token = await oauth.google.authorize_access_token(request)
except Exception as exc:
raise HTTPException(
status_code=400,
detail=f"Google OAuth callback failed: {exc}"
)
userinfo = token.get("userinfo")
if not userinfo:
try:
userinfo = await oauth.google.parse_id_token(request, token)
except Exception:
userinfo = {}
email = normalize_email(userinfo.get("email"))
name = userinfo.get("name") or userinfo.get("given_name")
avatar_url = userinfo.get("picture")
if not email:
raise HTTPException(
status_code=400,
detail="Google login succeeded, but email was not returned."
)
set_user_session(
request=request,
email=email,
name=name,
avatar_url=avatar_url,
auth_provider="google"
)
return RedirectResponse(url="/app", status_code=302)
def dev_session_login(request: Request, email: str, name: Optional[str] = None):
set_user_session(
request=request,
email=email,
name=name,
avatar_url=None,
auth_provider="dev_session"
)
return RedirectResponse(url="/app", status_code=302)
def clear_session(request: Request):
try:
request.session.clear()
except Exception:
pass
return RedirectResponse(url="/login", status_code=302)
def get_session_payload(request: Request):
try:
user = request.session.get("user")
except Exception:
user = None
if not user:
return {
"authenticated": False,
"user": None
}
return {
"authenticated": True,
"user": user
}
''', encoding="utf-8")
# =====================================================
# 4. Login UI
# =====================================================
Path("app/product/login_ui.py").write_text(r'''
def get_login_html() -> str:
return """
<!DOCTYPE html>
<html>
<head>
<title>Login - GraphResearcher</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
* { box-sizing: border-box; }
body {
margin: 0;
font-family: Inter, Arial, sans-serif;
background: linear-gradient(135deg, #0f172a, #1d4ed8);
color: #0f172a;
min-height: 100vh;
display: grid;
place-items: center;
padding: 24px;
}
.card {
width: min(460px, 96vw);
background: white;
border-radius: 24px;
padding: 30px;
box-shadow: 0 30px 80px rgba(0,0,0,0.32);
}
.brand {
font-size: 28px;
font-weight: 900;
margin-bottom: 6px;
letter-spacing: -0.8px;
}
.sub {
color: #64748b;
line-height: 1.6;
margin-bottom: 24px;
}
.btn {
display: block;
width: 100%;
text-align: center;
text-decoration: none;
border: none;
border-radius: 12px;
padding: 13px;
font-weight: 900;
margin-bottom: 12px;
cursor: pointer;
}
.google {
background: #2563eb;
color: white;
}
.dark {
background: #0f172a;
color: white;
}
input {
width: 100%;
border: 1px solid #cbd5e1;
border-radius: 11px;
padding: 12px;
margin-bottom: 10px;
}
.small {
color: #64748b;
font-size: 13px;
line-height: 1.5;
}
.status {
background: #f1f5f9;
border-radius: 12px;
padding: 12px;
font-size: 13px;
color: #334155;
margin-top: 12px;
white-space: pre-wrap;
}
</style>
</head>
<body>
<div class="card">
<div class="brand">GraphResearcher</div>
<div class="sub">
Login to upload documents, chat with sources, compare documents, and access your workspace.
</div>
<a class="btn google" href="/auth/google/login">Continue with Google</a>
<p class="small">
If Google OAuth is not configured yet, use dev login for local testing.
</p>
<input id="email" value="2006yugb@gmail.com" placeholder="email">
<input id="name" value="Admin" placeholder="name">
<button class="btn dark" onclick="devLogin()">Dev Login</button>
<div id="status" class="status">Checking OAuth status...</div>
</div>
<script>
async function checkStatus() {
try {
const res = await fetch("/auth/oauth-status");
const data = await res.json();
document.getElementById("status").textContent =
"Google OAuth configured: " + data.google_oauth_configured +
"\\nAdmin default: " + data.admin_email_default;
} catch (error) {
document.getElementById("status").textContent = "Could not load OAuth status.";
}
}
function devLogin() {
const email = encodeURIComponent(document.getElementById("email").value.trim());
const name = encodeURIComponent(document.getElementById("name").value.trim());
window.location.href = `/auth/dev-session?email=${email}&name=${name}`;
}
checkStatus();
</script>
</body>
</html>
"""
''', encoding="utf-8")
# =====================================================
# 5. Patch main.py
# =====================================================
main_path = Path("app/main.py")
main_text = main_path.read_text(encoding="utf-8-sig")
main_text = main_text.replace("\ufeff", "")
imports = '''
import os
from fastapi import Request, Query
from fastapi.responses import HTMLResponse
from starlette.middleware.sessions import SessionMiddleware
from app.product.login_ui import get_login_html
from app.product.oauth_service import (
get_oauth_status,
start_google_login,
finish_google_login,
dev_session_login,
clear_session,
get_session_payload
)
'''
if "from app.product.oauth_service import" not in main_text:
main_text = imports + "\n" + main_text
# Insert SessionMiddleware after app = FastAPI(...) block
if "app.add_middleware(SessionMiddleware" not in main_text:
lines = main_text.splitlines()
insert_index = None
for i, line in enumerate(lines):
if "app = FastAPI(" in line:
balance = 0
for j in range(i, len(lines)):
balance += lines[j].count("(")
balance -= lines[j].count(")")
if balance <= 0 and j >= i:
insert_index = j + 1
break
break
middleware_lines = [
"",
"app.add_middleware(",
" SessionMiddleware,",
" secret_key=os.getenv('SESSION_SECRET_KEY', 'dev-change-this-session-secret'),",
" same_site='lax',",
" https_only=False",
")",
""
]
if insert_index is not None:
lines[insert_index:insert_index] = middleware_lines
main_text = "\n".join(lines) + "\n"
print("Inserted SessionMiddleware.")
else:
print("WARNING: Could not find app = FastAPI(...) to insert SessionMiddleware.")
if "# OAuth login endpoints" not in main_text:
main_text += '''
# OAuth login endpoints
@app.get("/login", response_class=HTMLResponse)
def login_page():
return get_login_html()
@app.get("/auth/oauth-status")
def auth_oauth_status():
return get_oauth_status()
@app.get("/auth/session")
def auth_session(request: Request):
return get_session_payload(request)
@app.get("/auth/google/login")
async def auth_google_login(request: Request):
return await start_google_login(request)
@app.get("/auth/google/callback")
async def auth_google_callback(request: Request):
return await finish_google_login(request)
@app.get("/auth/dev-session")
def auth_dev_session(
request: Request,
email: str = Query(..., min_length=3),
name: str = Query(None)
):
return dev_session_login(
request=request,
email=email,
name=name
)
@app.get("/auth/logout")
def auth_logout(request: Request):
return clear_session(request)
'''
main_path.write_text(main_text, encoding="utf-8")
print("Phase 32 Google OAuth + session login foundation added.")