staffily / cbh /api /security /services /utils.py
brestok's picture
init
fa152ae
import asyncio
import json
from urllib.parse import urlencode
import httpx
from fastapi import Request, HTTPException
from starlette.responses import RedirectResponse
from cbh.api.security.db_requests import verify_code_obj
from cbh.api.security.dto import GoogleCallbackError, VerificationCodeType
from cbh.api.security.models import VerificationCodeModel
from cbh.core.config import settings
def send_error_redirect(
error: GoogleCallbackError, code: VerificationCodeModel | None = None
) -> RedirectResponse:
"""
Send an error redirect.
"""
if code:
redirect_url = f"{settings.Audience}/signup?code={code.id}&error={error.value}"
else:
redirect_url = f"{settings.Audience}/login/callback?error={error.value}"
return RedirectResponse(redirect_url)
def form_google_login_url():
"""
Form the Google login URL with the given parameters.
"""
params = {
"client_id": settings.GOOGLE_CLIENT_ID,
"redirect_uri": f"{settings.Issuer}/api/security/google/callback",
"response_type": "code",
"scope": "openid email profile",
"access_type": "offline",
"state": json.dumps({"secret": settings.SECRET_KEY}),
}
return f"https://accounts.google.com/o/oauth2/auth?{urlencode(params)}"
async def get_google_access_token(code: str) -> str:
"""
Get the Google access token from the given code.
"""
params = {
"client_id": settings.GOOGLE_CLIENT_ID,
"client_secret": settings.GOOGLE_CLIENT_SECRET,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": f"{settings.Issuer}/api/security/google/callback",
}
async with httpx.AsyncClient() as client:
response = await client.post("https://oauth2.googleapis.com/token", data=params)
response.raise_for_status()
return response.json()["access_token"]
async def get_google_user_info(access_token: str) -> dict:
"""
Get the Google user info from the given access token.
"""
headers = {"Authorization": f"Bearer {access_token}"}
async with httpx.AsyncClient() as client:
response = await client.get(
"https://www.googleapis.com/oauth2/v1/userinfo", headers=headers
)
response.raise_for_status()
return response.json()
async def form_google_user_info(
request: Request,
) -> dict:
"""
Form the Google user info from the given request.
"""
code = request.query_params.get("code")
state = json.loads(request.query_params.get("state"))
if state.get("secret") != settings.SECRET_KEY:
raise HTTPException(status_code=403, detail="Permission denied")
access_token = await get_google_access_token(code)
user_info = await get_google_user_info(access_token)
return user_info
async def _download_google_picture(user_info: dict) -> bytes | None:
"""
Download the profile picture from the Google user info.
"""
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(15)) as client:
response = await client.get(user_info["picture"])
response.raise_for_status()
return response.content
except Exception:
return None
async def extract_and_upload_google_picture(user_info: dict) -> str | None:
"""
Download and upload the Google picture to S3.
"""
picture = await _download_google_picture(user_info)
if picture:
return settings.S3_CLIENT.upload_file(
picture, f"{user_info['email']}.png", "pictures"
)
return None