import asyncio
import hashlib
import hmac
import json
import logging
import secrets
from urllib.parse import urlencode

import httpx
from fastapi import Request, HTTPException
from starlette.responses import RedirectResponse

from cbh.api.security.db_requests import verify_code_obj
from cbh.api.security.dto import GoogleCallbackError, VerificationCodeType
from cbh.api.security.models import VerificationCodeModel
from cbh.core.config import settings

logger = logging.getLogger(__name__)


def _google_redirect_uri() -> str:
    """
    Return the callback URL sent to Google as ``redirect_uri``.

    The same value is used when building the login URL and when exchanging
    the authorization code, so it is defined in exactly one place.
    """
    return f"{settings.Issuer}/api/security/google/callback"


def _secret_key_bytes() -> bytes:
    """
    Return the application secret key encoded as UTF-8 bytes.
    """
    return str(settings.SECRET_KEY).encode("utf-8")


def _sign_state_nonce(nonce: str) -> str:
    """
    Return the hex HMAC-SHA256 of ``nonce`` keyed with the application secret.
    """
    return hmac.new(
        _secret_key_bytes(), nonce.encode("utf-8"), hashlib.sha256
    ).hexdigest()


def _is_trusted_state(raw_state: str | None) -> bool:
    """
    Check whether an OAuth ``state`` value was issued by this application.

    Two shapes are accepted:

    * ``{"nonce": ..., "signature": ...}`` - the form produced by
      ``form_google_login_url``; the signature must match the HMAC of the
      nonce.
    * ``{"secret": ...}`` - the legacy form that carried the secret key
      itself. It is still accepted so that logins started before this change
      (or by any other issuer of the old format) keep working.
      NOTE(review): drop this branch once nothing emits the legacy format.

    Args:
        raw_state: The raw ``state`` query parameter, or ``None`` if absent.

    Returns:
        ``True`` if the state is well-formed and authentic, else ``False``.
        Malformed input never raises.
    """
    if not raw_state:
        return False

    try:
        state = json.loads(raw_state)
    except (TypeError, ValueError):
        return False
    if not isinstance(state, dict):
        return False

    nonce = state.get("nonce")
    signature = state.get("signature")
    if isinstance(nonce, str) and isinstance(signature, str):
        # Compare as bytes: compare_digest rejects non-ASCII str input.
        return hmac.compare_digest(
            signature.encode("utf-8"),
            _sign_state_nonce(nonce).encode("utf-8"),
        )

    legacy_secret = state.get("secret")
    if isinstance(legacy_secret, str):
        return hmac.compare_digest(
            legacy_secret.encode("utf-8"), _secret_key_bytes()
        )

    return False


def send_error_redirect(
    error: GoogleCallbackError, code: VerificationCodeModel | None = None
) -> RedirectResponse:
    """
    Build a redirect to the frontend that carries an error indicator.

    Args:
        error: The error whose ``value`` is placed in the ``error`` query
            parameter.
        code: Optional verification code. When truthy, the redirect targets
            the signup page and includes the code's ``id``; otherwise it
            targets the login callback page.

    Returns:
        A ``RedirectResponse`` pointing at the chosen frontend URL.
    """
    if code:
        redirect_url = f"{settings.Audience}/signup?code={code.id}&error={error.value}"
    else:
        redirect_url = f"{settings.Audience}/login/callback?error={error.value}"
    return RedirectResponse(redirect_url)


def form_google_login_url() -> str:
    """
    Form the Google login (authorization) URL.

    The ``state`` parameter carries a random nonce together with an
    HMAC-SHA256 signature of that nonce. The application secret key itself is
    never placed in the URL, which is visible to the browser, intermediaries
    and Google.

    NOTE(review): the signed state proves the value was issued by this
    application but is not bound to a particular browser session.

    Returns:
        The absolute URL the user should be redirected to.
    """
    nonce = secrets.token_urlsafe(32)
    params = {
        "client_id": settings.GOOGLE_CLIENT_ID,
        "redirect_uri": _google_redirect_uri(),
        "response_type": "code",
        "scope": "openid email profile",
        "access_type": "offline",
        "state": json.dumps(
            {"nonce": nonce, "signature": _sign_state_nonce(nonce)}
        ),
    }
    return f"https://accounts.google.com/o/oauth2/auth?{urlencode(params)}"


async def get_google_access_token(code: str) -> str:
    """
    Exchange an authorization code for a Google access token.

    Args:
        code: The authorization code received on the callback.

    Returns:
        The ``access_token`` field of Google's token response.

    Raises:
        httpx.HTTPStatusError: If the token endpoint answers with an error
            status.
        KeyError: If the response body has no ``access_token`` field.
    """
    params = {
        "client_id": settings.GOOGLE_CLIENT_ID,
        "client_secret": settings.GOOGLE_CLIENT_SECRET,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": _google_redirect_uri(),
    }
    async with httpx.AsyncClient() as client:
        response = await client.post("https://oauth2.googleapis.com/token", data=params)
        response.raise_for_status()
        return response.json()["access_token"]


async def get_google_user_info(access_token: str) -> dict:
    """
    Fetch the Google user info for the given access token.

    Args:
        access_token: Token sent as a ``Bearer`` authorization header.

    Returns:
        The decoded JSON body of the userinfo response.

    Raises:
        httpx.HTTPStatusError: If the userinfo endpoint answers with an error
            status.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://www.googleapis.com/oauth2/v1/userinfo", headers=headers
        )
        response.raise_for_status()
        return response.json()


async def form_google_user_info(
    request: Request,
) -> dict:
    """
    Resolve the Google user info for an OAuth callback request.

    The ``state`` query parameter is validated first; only then is the
    ``code`` query parameter exchanged for an access token and the user info
    fetched.

    Args:
        request: The incoming callback request.

    Returns:
        The user info dictionary returned by Google.

    Raises:
        HTTPException: With status 403 if ``state`` is missing, malformed or
            not authentic.
        httpx.HTTPStatusError: If either Google endpoint answers with an
            error status.
    """
    # A missing or malformed state is rejected like a wrong one instead of
    # surfacing as an unhandled TypeError / JSONDecodeError / AttributeError.
    if not _is_trusted_state(request.query_params.get("state")):
        raise HTTPException(status_code=403, detail="Permission denied")

    code = request.query_params.get("code")
    access_token = await get_google_access_token(code)
    user_info = await get_google_user_info(access_token)
    return user_info


async def _download_google_picture(user_info: dict) -> bytes | None:
    """
    Download the profile picture referenced by the Google user info.

    This is best-effort: any failure (missing ``picture`` key, network error,
    error status, timeout) is logged and reported as ``None``.

    Args:
        user_info: User info dictionary; its ``picture`` entry is used as the
            download URL.

    Returns:
        The raw response body, or ``None`` if the download failed.
    """
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(15)) as client:
            response = await client.get(user_info["picture"])
            response.raise_for_status()
            return response.content
    except Exception:
        # Deliberately broad: a missing avatar must never break the login
        # flow, but the reason should still be visible in the logs.
        logger.warning("Failed to download Google profile picture", exc_info=True)
        return None


async def extract_and_upload_google_picture(user_info: dict) -> str | None:
    """
    Download the Google profile picture and upload it to S3.

    Args:
        user_info: User info dictionary; ``picture`` is the download URL and
            ``email`` is used to name the uploaded file.

    Returns:
        Whatever ``settings.S3_CLIENT.upload_file`` returns for the upload,
        or ``None`` if no picture could be downloaded.
    """
    picture = await _download_google_picture(user_info)
    if not picture:
        return None
    # NOTE(review): upload_file is called without await; if it performs
    # blocking I/O it will stall the event loop - confirm against the client.
    return settings.S3_CLIENT.upload_file(
        picture, f"{user_info['email']}.png", "pictures"
    )