Spaces:
Running
Running
| import os | |
| import base64 | |
| import secrets | |
| import httpx | |
| import json | |
| import hmac | |
| import hashlib | |
| import time | |
| from datetime import datetime | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import RedirectResponse, Response | |
| from urllib.parse import quote, parse_qs | |
| app = FastAPI() | |
| LT_INTERNAL = "http://127.0.0.1:5000" | |
| CLIENT_ID = os.environ.get("OAUTH_CLIENT_ID") | |
| CLIENT_SECRET = os.environ.get("OAUTH_CLIENT_SECRET") | |
| SPACE_HOST = os.environ.get("SPACE_HOST", "localhost") | |
| REDIRECT_URI = f"https://{SPACE_HOST}/auth/callback" | |
| SECRET_KEY = os.environ.get("SESSION_SECRET", CLIENT_SECRET or "fallback-secret").encode() | |
| SECURE_COOKIE = not SPACE_HOST.startswith("localhost") | |
| def _sign(data: str) -> str: | |
| return hmac.new(SECRET_KEY, data.encode(), hashlib.sha256).hexdigest()[:16] | |
| def create_session_token() -> str: | |
| payload = {"auth": True, "iat": int(time.time())} | |
| payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip("=") | |
| return f"{payload_b64}.{_sign(payload_b64)}" | |
| def verify_session_token(token: str) -> bool: | |
| if not token or "." not in token: | |
| return False | |
| payload_b64, sig = token.split(".", 1) | |
| if not hmac.compare_digest(sig, _sign(payload_b64)): | |
| return False | |
| try: | |
| pad = 4 - len(payload_b64) % 4 | |
| payload_b64 += "=" * (pad if pad != 4 else 0) | |
| payload = json.loads(base64.urlsafe_b64decode(payload_b64).decode()) | |
| return payload.get("auth") == True | |
| except Exception: | |
| return False | |
| async def auth_login(): | |
| if not CLIENT_ID: | |
| raise HTTPException(status_code=500, detail="OAuth not configured") | |
| state = secrets.token_urlsafe(32) | |
| scope = quote("openid profile") | |
| url = ( | |
| f"https://huggingface.co/oauth/authorize" | |
| f"?client_id={CLIENT_ID}" | |
| f"&redirect_uri={quote(REDIRECT_URI)}" | |
| f"&response_type=code" | |
| f"&scope={scope}" | |
| f"&state={state}" | |
| ) | |
| resp = RedirectResponse(url) | |
| # Use SameSite=None for OAuth state cookie (cross-site redirect) | |
| resp.set_cookie( | |
| "oauth_state", state, | |
| max_age=600, httponly=True, | |
| samesite="none", | |
| secure=True, | |
| path="/" | |
| ) | |
| return resp | |
| async def auth_callback(code: str, state: str, request: Request): | |
| cookie_state = request.cookies.get("oauth_state", "") | |
| # Try state verification, but fall through if cookie is missing | |
| # (HF Spaces proxy may strip cookies on cross-site redirects) | |
| if cookie_state and cookie_state != state: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Invalid state. cookie={cookie_state[:20]}... query={state[:20]}..." | |
| ) | |
| basic = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode() | |
| async with httpx.AsyncClient() as client: | |
| r = await client.post( | |
| "https://huggingface.co/oauth/token", | |
| data={ | |
| "grant_type": "authorization_code", | |
| "code": code, | |
| "redirect_uri": REDIRECT_URI, | |
| }, | |
| headers={"Authorization": f"Basic {basic}"}, | |
| timeout=30.0, | |
| ) | |
| if r.status_code != 200: | |
| raise HTTPException(status_code=400, detail=f"HF OAuth failed: {r.text}") | |
| session_token = create_session_token() | |
| resp = RedirectResponse("/") | |
| resp.delete_cookie("oauth_state", path="/", samesite="none", secure=True) | |
| resp.set_cookie( | |
| "lt_session", session_token, | |
| httponly=True, samesite="lax", | |
| secure=SECURE_COOKIE, path="/", | |
| max_age=2592000 | |
| ) | |
| return resp | |
| async def auth_logout(): | |
| resp = RedirectResponse("/") | |
| resp.delete_cookie("lt_session", path="/") | |
| return resp | |
| def get_target_lang(body: bytes, content_type: str) -> str: | |
| if not body: | |
| return "" | |
| if "application/json" in content_type: | |
| try: | |
| data = json.loads(body) | |
| return data.get("target", "") or data.get("t", "") | |
| except Exception: | |
| return "" | |
| elif "application/x-www-form-urlencoded" in content_type: | |
| try: | |
| form = parse_qs(body.decode("utf-8")) | |
| return form.get("target", [""])[0] or form.get("t", [""])[0] | |
| except Exception: | |
| return "" | |
| return "" | |
| async def forward(request: Request, path: str, body: bytes): | |
| async with httpx.AsyncClient() as client: | |
| url = f"{LT_INTERNAL}/{path}" | |
| if request.query_params: | |
| url = f"{url}?{request.query_params}" | |
| headers = { | |
| k: v for k, v in request.headers.items() | |
| if k.lower() not in ("host", "content-length") | |
| } | |
| try: | |
| lt = await client.request( | |
| method=request.method, | |
| url=url, | |
| headers=headers, | |
| content=body, | |
| timeout=60.0, | |
| follow_redirects=True, | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=502, detail=f"LibreTranslate unreachable: {e}") | |
| excluded = {"content-encoding", "transfer-encoding", "content-length"} | |
| return Response( | |
| content=lt.content, | |
| status_code=lt.status_code, | |
| headers={k: v for k, v in lt.headers.items() if k.lower() not in excluded}, | |
| ) | |
| async def proxy(request: Request, path: str): | |
| body = await request.body() | |
| content_type = request.headers.get("content-type", "") | |
| if path == "suggest" and request.method == "POST": | |
| session_token = request.cookies.get("lt_session", "") | |
| if not verify_session_token(session_token): | |
| login_url = f"https://{SPACE_HOST}/auth/login" | |
| raise HTTPException( | |
| status_code=401, | |
| detail=f"Please log in with your HuggingFace account to submit suggestions: {login_url}" | |
| ) | |
| target = get_target_lang(body, content_type) | |
| if target and target != "kab": | |
| raise HTTPException( | |
| status_code=403, | |
| detail="Suggestions are only accepted for the Kabyle (kab) language." | |
| ) | |
| response = await forward(request, path, body) | |
| if request.method == "GET" and response.status_code == 200: | |
| resp_content_type = response.headers.get("content-type", "") | |
| if resp_content_type and "text/html" in resp_content_type: | |
| try: | |
| content = response.body.decode("utf-8", errors="replace") | |
| if "</body>" in content: | |
| session_token = request.cookies.get("lt_session", "") | |
| if not verify_session_token(session_token): | |
| login_btn = ( | |
| '<div style="position:fixed;top:10px;right:10px;z-index:9999;">' | |
| f'<a href="https://{SPACE_HOST}/auth/login" ' | |
| 'style="background:#ffcc4d;color:#000;padding:8px 16px;' | |
| 'border-radius:4px;text-decoration:none;font-weight:bold;' | |
| 'font-family:sans-serif;font-size:14px;">' | |
| '🔐 Login with HuggingFace to Suggest</a></div>' | |
| ) | |
| content = content.replace("</body>", f"{login_btn}</body>") | |
| response = Response( | |
| content=content.encode("utf-8"), | |
| status_code=response.status_code, | |
| headers={k: v for k, v in response.headers.items() | |
| if k.lower() not in ("content-length", "content-encoding")} | |
| ) | |
| except Exception: | |
| pass | |
| return response |