Spaces:
Running
Running
| """Universal OAuth provider routes β authorize, callback, connections CRUD.""" | |
| import logging | |
| from typing import List, Optional | |
| from uuid import UUID | |
| from fastapi import APIRouter, Depends, Query, Request | |
| from fastapi.responses import RedirectResponse | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlmodel import select | |
| from app.api import deps | |
| from app.core.config import settings | |
| from app.core.db import get_db | |
| from app.models.models import OAuthConnection, OAuthConnectionStatus, User, Workspace, WorkspaceRole | |
| from app.schemas.envelope import ResponseEnvelope, wrap_data, wrap_error | |
| from app.schemas.oauth import OAuthAuthorizeResponse, OAuthConnectionRead | |
| from app.services import oauth_service | |
| from app.services.audit_service import audit_event | |
| from app.services.oauth_providers import get_adapter | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| def _redirect_uri(provider_code: str, request: Request) -> str: | |
| base = settings.APP_BASE_URL or str(request.base_url).rstrip("/") | |
| return f"{base}{settings.API_V1_STR}/oauth/{provider_code}/callback" | |
| # βββ Authorize ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def authorize( | |
| provider_code: str, | |
| request: Request, | |
| redirect_after: Optional[str] = Query(default=None), | |
| db: AsyncSession = Depends(get_db), | |
| current_user: User = Depends(deps.get_current_user), | |
| workspace: Optional[Workspace] = Depends(deps.get_active_workspace), | |
| ): | |
| """ | |
| Generate the provider authorization URL. The client should redirect the browser to it. | |
| Requires X-Workspace-ID header. | |
| RBAC: workspace Owner or Member (not Viewer). | |
| """ | |
| # RBAC: Viewers cannot connect integrations | |
| membership = await _get_membership(db, current_user.id, workspace.id) | |
| if membership.role == WorkspaceRole.VIEWER: | |
| return wrap_error("Viewers cannot connect OAuth integrations") | |
| try: | |
| provider = await oauth_service.get_provider(db, provider_code) | |
| client_id, _ = oauth_service._get_credentials(provider_code) | |
| except Exception as e: | |
| return wrap_error(str(e)) | |
| state = await oauth_service.create_oauth_state( | |
| db, | |
| provider_code=provider_code, | |
| workspace_id=workspace.id, | |
| user_id=current_user.id, | |
| redirect_after=redirect_after, | |
| ) | |
| await db.commit() | |
| adapter = get_adapter(provider_code) | |
| redirect_uri = _redirect_uri(provider_code, request) | |
| auth_url = oauth_service.build_authorization_url( | |
| provider=provider, | |
| adapter=adapter, | |
| state_token=state.state_token, | |
| redirect_uri=redirect_uri, | |
| client_id=client_id, | |
| ) | |
| await audit_event( | |
| db, | |
| action="oauth.authorize.started", | |
| entity_type="oauth_provider", | |
| entity_id=provider_code, | |
| actor_user_id=current_user.id, | |
| outcome="success", | |
| workspace_id=workspace.id, | |
| request=request, | |
| metadata={"provider_code": provider_code}, | |
| ) | |
| await db.commit() | |
| return wrap_data({"authorization_url": auth_url}) | |
| # βββ Callback βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def callback( | |
| provider_code: str, | |
| request: Request, | |
| code: Optional[str] = Query(default=None), | |
| state: Optional[str] = Query(default=None), | |
| error: Optional[str] = Query(default=None), | |
| db: AsyncSession = Depends(get_db), | |
| ): | |
| """ | |
| Provider redirects here after user authorization. | |
| No auth header β this is a browser redirect from the OAuth provider. | |
| On success: redirect to {FRONTEND_URL}/integrations/callback?status=success&provider={code} | |
| On failure: redirect to {FRONTEND_URL}/integrations/callback?status=error&provider={code}&reason={msg} | |
| """ | |
| frontend_base = settings.FRONTEND_URL | |
| def _error_redirect(reason: str) -> RedirectResponse: | |
| return RedirectResponse( | |
| f"{frontend_base}/integrations/callback?status=error&provider={provider_code}&reason={reason}" | |
| ) | |
| if error or not code or not state: | |
| return _error_redirect(error or "missing_params") | |
| # Validate state | |
| try: | |
| oauth_state = await oauth_service.validate_and_consume_state(db, state) | |
| except Exception: | |
| await db.rollback() | |
| return _error_redirect("invalid_state") | |
| # Exchange code for token | |
| try: | |
| provider = await oauth_service.get_provider(db, provider_code) | |
| adapter = get_adapter(provider_code) | |
| client_id, client_secret = oauth_service._get_credentials(provider_code) | |
| redirect_uri = _redirect_uri(provider_code, request) | |
| token_data = await oauth_service.exchange_code_for_token( | |
| provider=provider, | |
| adapter=adapter, | |
| code=code, | |
| redirect_uri=redirect_uri, | |
| client_id=client_id, | |
| client_secret=client_secret, | |
| ) | |
| except Exception as e: | |
| logger.exception(f"OAuth token exchange failed for {provider_code}: {e}") | |
| try: | |
| await audit_event( | |
| db, | |
| action="oauth.callback.failed", | |
| entity_type="oauth_provider", | |
| entity_id=provider_code, | |
| actor_type="service", | |
| outcome="failure", | |
| workspace_id=oauth_state.workspace_id, | |
| request=request, | |
| error_message=str(e), | |
| metadata={"provider_code": provider_code}, | |
| ) | |
| await db.commit() | |
| except Exception: | |
| pass | |
| return _error_redirect("token_exchange_failed") | |
| # Save connection | |
| try: | |
| conn = await oauth_service.save_connection( | |
| db=db, | |
| provider_code=provider_code, | |
| workspace_id=oauth_state.workspace_id, | |
| user_id=oauth_state.user_id, | |
| token_data=token_data, | |
| adapter=adapter, | |
| ) | |
| await audit_event( | |
| db, | |
| action="oauth.callback.success", | |
| entity_type="oauth_connection", | |
| entity_id=str(conn.id), | |
| actor_user_id=oauth_state.user_id, | |
| actor_type="user", | |
| outcome="success", | |
| workspace_id=oauth_state.workspace_id, | |
| request=request, | |
| metadata={ | |
| "provider_code": provider_code, | |
| "external_account_id": conn.external_account_id, | |
| "external_account_name": conn.external_account_name, | |
| }, | |
| ) | |
| await db.commit() | |
| except Exception as e: | |
| logger.exception(f"OAuth save_connection failed for {provider_code}: {e}") | |
| await db.rollback() | |
| return _error_redirect("save_failed") | |
| redirect_after = oauth_state.redirect_after or "/integrations" | |
| return RedirectResponse( | |
| f"{frontend_base}/integrations/callback?status=success&provider={provider_code}" | |
| f"&redirect_after={redirect_after}" | |
| ) | |
| # βββ List Connections ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def list_connections( | |
| db: AsyncSession = Depends(get_db), | |
| current_user: User = Depends(deps.get_current_user), | |
| workspace: Workspace = Depends(deps.get_active_workspace), | |
| ): | |
| result = await db.execute( | |
| select(OAuthConnection).where( | |
| OAuthConnection.workspace_id == workspace.id, | |
| OAuthConnection.status != OAuthConnectionStatus.REVOKED, | |
| ) | |
| ) | |
| conns = result.scalars().all() | |
| return wrap_data([OAuthConnectionRead.model_validate(c) for c in conns]) | |
| # βββ Disconnect ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def disconnect( | |
| connection_id: UUID, | |
| db: AsyncSession = Depends(get_db), | |
| current_user: User = Depends(deps.get_current_user), | |
| workspace: Workspace = Depends(deps.get_active_workspace), | |
| ): | |
| # RBAC: owner or member only | |
| membership = await _get_membership(db, current_user.id, workspace.id) | |
| if membership.role == WorkspaceRole.VIEWER: | |
| return wrap_error("Viewers cannot disconnect OAuth integrations") | |
| # Confirm connection belongs to this workspace | |
| result = await db.execute( | |
| select(OAuthConnection).where( | |
| OAuthConnection.id == connection_id, | |
| OAuthConnection.workspace_id == workspace.id, | |
| ) | |
| ) | |
| conn = result.scalars().first() | |
| if not conn: | |
| return wrap_error("OAuth connection not found") | |
| conn = await oauth_service.revoke_connection(db, connection_id) | |
| await db.commit() | |
| return wrap_data(OAuthConnectionRead.model_validate(conn)) | |
| # βββ Manual Refresh ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def refresh_connection( | |
| connection_id: UUID, | |
| db: AsyncSession = Depends(get_db), | |
| current_user: User = Depends(deps.get_current_user), | |
| workspace: Workspace = Depends(deps.get_active_workspace), | |
| ): | |
| # RBAC: owner only | |
| membership = await _get_membership(db, current_user.id, workspace.id) | |
| if membership.role != WorkspaceRole.OWNER: | |
| return wrap_error("Only workspace owners can refresh OAuth tokens") | |
| # Confirm connection belongs to this workspace | |
| result = await db.execute( | |
| select(OAuthConnection).where( | |
| OAuthConnection.id == connection_id, | |
| OAuthConnection.workspace_id == workspace.id, | |
| ) | |
| ) | |
| if not result.scalars().first(): | |
| return wrap_error("OAuth connection not found") | |
| try: | |
| conn = await oauth_service.refresh_access_token(db, connection_id) | |
| await db.commit() | |
| return wrap_data(OAuthConnectionRead.model_validate(conn)) | |
| except Exception as e: | |
| return wrap_error(str(e)) | |
| # βββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def _get_membership(db: AsyncSession, user_id: UUID, workspace_id: UUID): | |
| from app.models.models import WorkspaceMember | |
| from fastapi import HTTPException | |
| result = await db.execute( | |
| select(WorkspaceMember).where( | |
| WorkspaceMember.user_id == user_id, | |
| WorkspaceMember.workspace_id == workspace_id, | |
| ) | |
| ) | |
| membership = result.scalars().first() | |
| if not membership: | |
| raise HTTPException(status_code=403, detail="User is not a member of this workspace") | |
| return membership | |