| import asyncio |
| import json |
| from urllib.parse import urlencode |
|
|
| import httpx |
| from fastapi import Request, HTTPException |
| from starlette.responses import RedirectResponse |
|
|
| from cbh.api.security.db_requests import verify_code_obj |
| from cbh.api.security.dto import GoogleCallbackError, VerificationCodeType |
| from cbh.api.security.models import VerificationCodeModel |
| from cbh.core.config import settings |
|
|
|
|
| def send_error_redirect( |
| error: GoogleCallbackError, code: VerificationCodeModel | None = None |
| ) -> RedirectResponse: |
| """ |
| Send an error redirect. |
| """ |
| if code: |
| redirect_url = f"{settings.Audience}/signup?code={code.id}&error={error.value}" |
| else: |
| redirect_url = f"{settings.Audience}/login/callback?error={error.value}" |
| return RedirectResponse(redirect_url) |
|
|
|
|
| def form_google_login_url(): |
| """ |
| Form the Google login URL with the given parameters. |
| """ |
| params = { |
| "client_id": settings.GOOGLE_CLIENT_ID, |
| "redirect_uri": f"{settings.Issuer}/api/security/google/callback", |
| "response_type": "code", |
| "scope": "openid email profile", |
| "access_type": "offline", |
| "state": json.dumps({"secret": settings.SECRET_KEY}), |
| } |
| return f"https://accounts.google.com/o/oauth2/auth?{urlencode(params)}" |
|
|
|
|
| async def get_google_access_token(code: str) -> str: |
| """ |
| Get the Google access token from the given code. |
| """ |
| params = { |
| "client_id": settings.GOOGLE_CLIENT_ID, |
| "client_secret": settings.GOOGLE_CLIENT_SECRET, |
| "code": code, |
| "grant_type": "authorization_code", |
| "redirect_uri": f"{settings.Issuer}/api/security/google/callback", |
| } |
| async with httpx.AsyncClient() as client: |
| response = await client.post("https://oauth2.googleapis.com/token", data=params) |
| response.raise_for_status() |
| return response.json()["access_token"] |
|
|
|
|
| async def get_google_user_info(access_token: str) -> dict: |
| """ |
| Get the Google user info from the given access token. |
| """ |
| headers = {"Authorization": f"Bearer {access_token}"} |
| async with httpx.AsyncClient() as client: |
| response = await client.get( |
| "https://www.googleapis.com/oauth2/v1/userinfo", headers=headers |
| ) |
| response.raise_for_status() |
| return response.json() |
|
|
|
|
| async def form_google_user_info( |
| request: Request, |
| ) -> dict: |
| """ |
| Form the Google user info from the given request. |
| """ |
| code = request.query_params.get("code") |
| state = json.loads(request.query_params.get("state")) |
| if state.get("secret") != settings.SECRET_KEY: |
| raise HTTPException(status_code=403, detail="Permission denied") |
| access_token = await get_google_access_token(code) |
| user_info = await get_google_user_info(access_token) |
| return user_info |
|
|
|
|
| async def _download_google_picture(user_info: dict) -> bytes | None: |
| """ |
| Download the profile picture from the Google user info. |
| """ |
| try: |
| async with httpx.AsyncClient(timeout=httpx.Timeout(15)) as client: |
| response = await client.get(user_info["picture"]) |
| response.raise_for_status() |
| return response.content |
| except Exception: |
| return None |
|
|
|
|
| async def extract_and_upload_google_picture(user_info: dict) -> str | None: |
| """ |
| Download and upload the Google picture to S3. |
| """ |
| picture = await _download_google_picture(user_info) |
| if picture: |
| return settings.S3_CLIENT.upload_file( |
| picture, f"{user_info['email']}.png", "pictures" |
| ) |
| return None |
|
|