import json
from urllib.parse import urlencode

import httpx
from fastapi import Request, HTTPException
from starlette.responses import RedirectResponse

from cbh.api.security.db_requests import verify_code_obj
from cbh.api.security.dto import GoogleCallbackError, VerificationCodeType
from cbh.api.security.models import VerificationCodeModel
from cbh.core.config import settings


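def send_error_redirect(
    error: GoogleCallbackError, code: VerificationCodeModel | None = None
) -> RedirectResponse:
    """
    Build a redirect to the frontend that reports a Google callback error.

    With a verification code the user is sent back to the signup page for
    that code; otherwise they are sent to the login callback page.
    """
    # urlencode escapes the values so they cannot break the query string.
    if code:
        query = urlencode({"code": code.id, "error": error.value})
        redirect_url = f"{settings.Audience}/signup?{query}"
    else:
        query = urlencode({"error": error.value})
        redirect_url = f"{settings.Audience}/login/callback?{query}"
    return RedirectResponse(redirect_url)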


def form_google_login_url() -> str:
    """
    Build the Google OAuth 2.0 authorization URL that starts the login flow.
    """
    params = {
        "client_id": settings.GOOGLE_CLIENT_ID,
        "redirect_uri": f"{settings.Issuer}/api/security/google/callback",
        "response_type": "code",
        "scope": "openid email profile",
        "access_type": "offline",
        # NOTE: `state` round-trips through the browser, so this exposes
        # SECRET_KEY in the URL. A signed random nonce would be safer.
        "state": json.dumps({"secret": settings.SECRET_KEY}),
    }
    return f"https://accounts.google.com/o/oauth2/auth?{urlencode(params)}"
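

# Illustrative sketch only; nothing in this module calls it. It shows one way
# the `state` value could be signed instead of carrying SECRET_KEY itself:
# a random nonce plus its HMAC-SHA256 under the key. The helper names are
# hypothetical. The sketch does not bind the nonce to a browser session or
# expire it, which a complete CSRF defence would also need.
import hashlib
import hmac
import secrets


def _sketch_sign_state(key: str) -> str:
    """Hypothetical helper: return '<nonce>.<hex HMAC-SHA256 of the nonce>'."""
    nonce = secrets.token_urlsafe(16)
    mac = hmac.new(key.encode(), nonce.encode(), hashlib.sha256).hexdigest()
    return f"{nonce}.{mac}"


def _sketch_verify_state(state: str, key: str) -> bool:
    """Hypothetical helper: check that `state` was signed with `key`."""
    nonce, sep, mac = state.rpartition(".")
    if not sep:
        return False
    expected = hmac.new(key.encode(), nonce.encode(), hashlib.sha256).hexdigest()
    # Compare as bytes in constant time; bytes also tolerate non-ASCII input.
    return hmac.compare_digest(mac.encode(), expected.encode())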


async def get_google_access_token(code: str) -> str:
    """
    Exchange the authorization code from the callback for a Google access token.
    """
    params = {
        "client_id": settings.GOOGLE_CLIENT_ID,
        "client_secret": settings.GOOGLE_CLIENT_SECRET,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": f"{settings.Issuer}/api/security/google/callback",
    }
    async with httpx.AsyncClient() as client:
        response = await client.post("https://oauth2.googleapis.com/token", data=params)
        response.raise_for_status()
        return response.json()["access_token"]


async def get_google_user_info(access_token: str) -> dict:
    """
    Fetch the Google profile of the user that owns the access token.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://www.googleapis.com/oauth2/v1/userinfo", headers=headers
        )
        response.raise_for_status()
        return response.json()


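async def form_google_user_info(
    request: Request,
) -> dict:
    """
    Validate the Google callback request and return the user's Google profile.

    Raises HTTPException 400 if `code` is missing and 403 if `state` is
    missing, malformed, or does not carry the expected secret.
    """
    code = request.query_params.get("code")
    if not code:
        raise HTTPException(status_code=400, detail="Missing authorization code")
    try:
        # A missing `state` becomes "" here, which json.loads rejects.
        state = json.loads(request.query_params.get("state") or "")
    except json.JSONDecodeError:
        state = None
    if not isinstance(state, dict) or state.get("secret") != settings.SECRET_KEY:
        raise HTTPException(status_code=403, detail="Permission denied")
    access_token = await get_google_access_token(code)
    return await get_google_user_info(access_token)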


async def _download_google_picture(user_info: dict) -> bytes | None:
    """
    Download the profile picture referenced by `user_info["picture"]`.

    Returns None if the key is missing or the download fails.
    """
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(15)) as client:
            response = await client.get(user_info["picture"])
            response.raise_for_status()
            return response.content
    except Exception:
        # Best effort: a missing or unreachable picture must not block login.
        return None


async def extract_and_upload_google_picture(user_info: dict) -> str | None:
    """
    Download the Google profile picture and upload it to S3.

    Returns the result of the upload, or None if no picture could be downloaded.
    """
    picture = await _download_google_picture(user_info)
    if picture:
        return settings.S3_CLIENT.upload_file(
            picture, f"{user_info['email']}.png", "pictures"
        )
    return None