Spaces:
Running
Running
| """ | |
| DocuScan AI — Cloud Storage Integration Router | |
| OAuth-based connections for Google Drive, OneDrive, Dropbox + credential-based S3. | |
| Popup OAuth flow to work inside HF Spaces iframes. | |
| """ | |
| import os | |
| import io | |
| import uuid | |
| import json | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from fastapi import APIRouter, Request, HTTPException, Depends | |
| from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse | |
| from jose import jwt, JWTError | |
| from sqlalchemy.orm import Session | |
| from cryptography.fernet import Fernet | |
| from database import get_db, SessionLocal | |
| from models import User, Organization, Document, CloudConnection, CloudImport, EmailImport | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router for this module; its prefix places every route under /api/cloud.
router = APIRouter(prefix="/api/cloud", tags=["cloud-storage"])
# ─── Config ───────────────────────────────────────────────────────────────────
# Key and algorithm used to sign and verify the JWTs handled in this module
# (bearer tokens and the OAuth ``state`` parameter).
# NOTE(review): the signing key is a hard-coded demo value — confirm it is
# replaced (and kept in sync with whatever issues the tokens) before production.
SECRET_KEY = "docuscan-demo-secret-key-change-in-production"
ALGORITHM = "HS256"
UPLOAD_DIR = Path("/data/uploads")
# Token encryption
# Fernet key read from the environment; an empty string means "not configured".
CLOUD_TOKEN_ENCRYPTION_KEY = os.environ.get("CLOUD_TOKEN_ENCRYPTION_KEY", "")
# Cached Fernet instance, created lazily by get_fernet().
_fernet = None
def get_fernet():
    """Return the shared Fernet cipher, constructing it on first use.

    When ``CLOUD_TOKEN_ENCRYPTION_KEY`` is empty a fresh key is generated and
    a warning is logged, because anything encrypted with that key cannot be
    decrypted after the process restarts.

    Returns:
        The module-wide ``Fernet`` instance.
    """
    global _fernet
    if _fernet is not None:
        return _fernet

    key_material = CLOUD_TOKEN_ENCRYPTION_KEY
    if not key_material:
        key_material = Fernet.generate_key().decode()
        logger.warning("CLOUD_TOKEN_ENCRYPTION_KEY not set — using ephemeral key (tokens won't survive restart)")

    # Fernet is handed bytes here; string keys are encoded first.
    if isinstance(key_material, str):
        key_material = key_material.encode()
    _fernet = Fernet(key_material)
    return _fernet
def encrypt_token(token: str) -> str:
    """Encrypt *token* with the shared Fernet cipher.

    Args:
        token: Plain-text secret; a falsy value is passed through as "".

    Returns:
        The Fernet ciphertext decoded to ``str``, or "" for a falsy input.
    """
    if token:
        cipher = get_fernet()
        ciphertext = cipher.encrypt(token.encode())
        return ciphertext.decode()
    return ""
def decrypt_token(encrypted: str) -> str:
    """Decrypt a value produced by ``encrypt_token``.

    Decryption is best-effort: any failure yields "" so callers can treat the
    credential as missing. The failure is logged (exception type only, never
    the token) so that key mismatches are diagnosable instead of silent.

    Args:
        encrypted: Fernet ciphertext as ``str``; a falsy value returns "".

    Returns:
        The decrypted plain text, or "" when the input is empty or cannot be
        decrypted.
    """
    if not encrypted:
        return ""
    try:
        return get_fernet().decrypt(encrypted.encode()).decode()
    except Exception as exc:
        # Deliberately broad: callers rely on "" rather than an exception.
        logger.warning("Could not decrypt stored cloud token (%s)", type(exc).__name__)
        return ""
# OAuth client credentials, read once from the environment at import time.
# Every value falls back to "" when unset, except the Microsoft tenant,
# which falls back to "common".
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET", "")

MICROSOFT_CLIENT_ID = os.getenv("MICROSOFT_CLIENT_ID", "")
MICROSOFT_CLIENT_SECRET = os.getenv("MICROSOFT_CLIENT_SECRET", "")
MICROSOFT_TENANT_ID = os.getenv("MICROSOFT_TENANT_ID", "common")

DROPBOX_APP_KEY = os.getenv("DROPBOX_APP_KEY", "")
DROPBOX_APP_SECRET = os.getenv("DROPBOX_APP_SECRET", "")
| # ─── Auth Helper ────────────────────────────────────────────────────────────── | |
def verify_token(request: Request, db: Session = Depends(get_db)):
    """Resolve the active user identified by the request's Bearer JWT.

    Args:
        request: Incoming request carrying the ``Authorization`` header.
        db: Database session used for the user lookup.

    Returns:
        The active ``User`` whose email equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer token,
            the JWT fails to decode, or no active user matches.
    """
    header_value = request.headers.get("Authorization", "")
    if not header_value.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Not authenticated")

    raw_jwt = header_value.split(" ")[1]
    try:
        claims = jwt.decode(raw_jwt, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    account = db.query(User).filter_by(email=claims.get("sub"), is_active=True).first()
    if not account:
        raise HTTPException(status_code=401, detail="User not found")
    return account
def get_base_url(request: Request) -> str:
    """Derive ``scheme://host`` from the request headers.

    The scheme comes from ``x-forwarded-proto`` (defaulting to "https") and
    the host from the ``host`` header (defaulting to ""), so the result
    follows whichever domain the request arrived on.
    """
    headers = request.headers
    proto = headers.get("x-forwarded-proto", "https")
    authority = headers.get("host", "")
    return "{}://{}".format(proto, authority)
def create_oauth_state(user_id: int, provider: str, nonce: str = None) -> str:
    """Build the signed JWT used as the OAuth ``state`` parameter.

    Args:
        user_id: ID of the user starting the OAuth flow.
        provider: Provider key stored in the state (e.g. "google_drive").
        nonce: Optional nonce; a random hex string is generated when falsy.

    Returns:
        A JWT signed with ``SECRET_KEY`` that expires 10 minutes from now.
    """
    claims = {}
    claims["user_id"] = user_id
    claims["provider"] = provider
    claims["nonce"] = nonce if nonce else uuid.uuid4().hex
    claims["exp"] = datetime.utcnow() + timedelta(minutes=10)
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_oauth_state(state: str) -> dict:
    """Decode the OAuth ``state`` JWT and return its claims.

    Raises:
        HTTPException: 400 when the JWT cannot be decoded (bad signature,
            malformed, or expired).
    """
    try:
        claims = jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state")
    return claims
def popup_close_html(success: bool, provider: str, error: str = "") -> HTMLResponse:
    """Render the page that notifies the opener window and closes the popup.

    ``provider`` and ``error`` can originate from query parameters, so they
    are never interpolated raw: the postMessage payload is JSON-encoded (with
    ``<``, ``>`` and ``&`` escaped so it cannot terminate the script element)
    and the visible text is HTML-escaped.

    Args:
        success: True for a completed connection, False for a failure.
        provider: Provider key reported to the opener.
        error: Human-readable error text; only shown when ``success`` is False.

    Returns:
        An ``HTMLResponse`` whose script posts
        ``{type, provider, error}`` to ``window.opener`` and closes the window.
    """
    import html
    import json

    msg_type = "cloud-auth-complete" if success else "cloud-auth-error"
    payload = {"type": msg_type, "provider": provider, "error": error}
    # json.dumps produces a valid JS object literal (ASCII-only by default);
    # the extra replacements keep "</script>" or "<!--" from breaking out.
    payload_js = (
        json.dumps(payload)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )
    if success:
        status_text = "Connected successfully!"
    else:
        status_text = f"Error: {html.escape(str(error))}"
    # NOTE(review): the message is posted with target origin '*'; it carries
    # no secrets, but restricting the origin would be stricter.
    return HTMLResponse(f"""<!DOCTYPE html><html><body>
<script>
if (window.opener) {{
window.opener.postMessage({payload_js}, '*');
}}
window.close();
</script>
<p>{status_text} You can close this window.</p>
</body></html>""")
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── CONNECTION MANAGEMENT ───────────────────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def list_connections(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the caller's cloud connections that have not been revoked.

    Non-super-admin users are additionally limited to connections in their
    own organization. Results are ordered newest first.

    Returns:
        ``{"connections": [...]}`` with each connection serialized through
        ``to_dict()``.
    """
    criteria = [
        CloudConnection.user_id == user.id,
        CloudConnection.status != "revoked",
    ]
    if user.role != "super_admin":
        criteria.append(CloudConnection.org_id == user.org_id)

    rows = (
        db.query(CloudConnection)
        .filter(*criteria)
        .order_by(CloudConnection.created_at.desc())
        .all()
    )
    return {"connections": [row.to_dict() for row in rows]}
async def revoke_connection(connection_id: str, user: User = Depends(verify_token),
                            db: Session = Depends(get_db)):
    """Mark a cloud connection as revoked without deleting the row.

    Args:
        connection_id: Public identifier of the connection.
        user: Authenticated caller.
        db: Database session.

    Returns:
        A confirmation message with the revoked ``connection_id``.

    Raises:
        HTTPException: 404 when the connection does not exist; 403 when the
            caller neither owns it nor is a super admin.
    """
    target = db.query(CloudConnection).filter_by(connection_id=connection_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Connection not found")

    owns_connection = target.user_id == user.id
    is_super_admin = user.role == "super_admin"
    if not (owns_connection or is_super_admin):
        raise HTTPException(status_code=403, detail="Not authorized")

    target.status = "revoked"
    db.commit()
    return {"message": "Connection revoked", "connection_id": connection_id}
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── GOOGLE DRIVE OAuth ──────────────────────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def google_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect the OAuth popup to Google's consent screen.

    The popup cannot easily send a Bearer header, so the caller's JWT is read
    from the ``token`` query parameter instead.

    Args:
        request: Incoming request; supplies the ``token`` query parameter and
            the headers used to derive the callback URL.
        db: Database session used to look up the active user.

    Returns:
        A 401 HTML page when the token is missing or invalid or the user is
        not found; a popup-closing error page when Google OAuth is not
        configured; otherwise a redirect to Google's authorization endpoint.
    """
    from urllib.parse import urlencode

    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<p>Missing token</p>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return HTMLResponse("<p>Invalid token</p>", status_code=401)
    user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
    if not user:
        return HTMLResponse("<p>User not found</p>", status_code=401)

    if not GOOGLE_CLIENT_ID:
        return popup_close_html(False, "google_drive", "Google OAuth not configured")

    redirect_uri = f"{get_base_url(request)}/api/cloud/google/callback"
    state = create_oauth_state(user.id, "google_drive")
    # urlencode percent-encodes every value, so the redirect URI, scope and
    # state cannot corrupt the query string.
    query = urlencode({
        "client_id": GOOGLE_CLIENT_ID,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": "https://www.googleapis.com/auth/drive.readonly",
        "access_type": "offline",
        "prompt": "consent",
        "state": state,
    })
    return RedirectResponse(f"https://accounts.google.com/o/oauth2/v2/auth?{query}")
async def google_callback(request: Request, db: Session = Depends(get_db)):
    """Handle the Google OAuth callback: exchange the code and store tokens.

    Args:
        request: Callback request carrying ``code``, ``state`` and optionally
            ``error`` query parameters.
        db: Database session.

    Returns:
        A popup-closing HTML page reporting success or the failure reason.

    Raises:
        HTTPException: 400 (from ``verify_oauth_state``) when the state JWT is
            invalid or expired.
    """
    provider = "google_drive"
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, provider, error)

    state_data = verify_oauth_state(state)
    # A state minted for another provider must not be accepted here.
    if state_data.get("provider") != provider:
        return popup_close_html(False, provider, "Invalid OAuth state")
    if not code:
        return popup_close_html(False, provider, "Missing authorization code")

    user = db.query(User).filter_by(id=state_data.get("user_id"), is_active=True).first()
    if not user:
        return popup_close_html(False, provider, "User not found")

    # The redirect URI must match the one sent in the authorization request.
    redirect_uri = f"{get_base_url(request)}/api/cloud/google/callback"

    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://oauth2.googleapis.com/token", data={
            "code": code,
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code",
        })
        if resp.status_code != 200:
            # Keep the provider's response in the server log only; echoing it
            # into the popup would expose raw upstream content to the page.
            logger.warning("Google token exchange failed (%s): %s", resp.status_code, resp.text)
            return popup_close_html(False, provider, "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 3600)

        # Look up the Google account email for the display name (best-effort).
        userinfo = await client.get("https://www.googleapis.com/oauth2/v2/userinfo",
                                    headers={"Authorization": f"Bearer {access_token}"})
    google_email = ""
    if userinfo.status_code == 200:
        google_email = userinfo.json().get("email") or ""

    # Always inserts a new connection row; existing rows are not updated.
    conn = CloudConnection(
        connection_id=f"cc_{uuid.uuid4().hex[:8]}",
        user_id=user.id,
        org_id=user.org_id,
        provider=provider,
        display_name=f"Google Drive ({google_email})" if google_email else "Google Drive",
        access_token_encrypted=encrypt_token(access_token),
        refresh_token_encrypted=encrypt_token(refresh_token),
        token_expires_at=datetime.utcnow() + timedelta(seconds=expires_in),
        provider_user_email=google_email,
        status="active",
    )
    db.add(conn)
    db.commit()
    return popup_close_html(True, provider)
async def refresh_google_token(conn: CloudConnection, db: Session) -> str:
    """Return a usable Google access token, refreshing it when near expiry.

    The stored token is reused while it remains valid for more than two
    minutes. Otherwise the refresh token is exchanged for a new access token,
    which is encrypted and persisted on ``conn``.

    Args:
        conn: Connection whose tokens are read and updated.
        db: Session used to commit status and token changes.

    Returns:
        The plain-text access token.

    Raises:
        HTTPException: 401 when no refresh token is stored or Google rejects
            the refresh; the connection is marked "expired" in both cases.
    """
    expiry = conn.token_expires_at
    if expiry and expiry > datetime.utcnow() + timedelta(minutes=2):
        return decrypt_token(conn.access_token_encrypted)

    stored_refresh = decrypt_token(conn.refresh_token_encrypted)
    if not stored_refresh:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")

    import httpx
    form = {
        "client_id": GOOGLE_CLIENT_ID,
        "client_secret": GOOGLE_CLIENT_SECRET,
        "refresh_token": stored_refresh,
        "grant_type": "refresh_token",
    }
    async with httpx.AsyncClient() as http:
        reply = await http.post("https://oauth2.googleapis.com/token", data=form)

    if reply.status_code != 200:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")

    granted = reply.json()
    new_access_token = granted["access_token"]
    conn.access_token_encrypted = encrypt_token(new_access_token)
    lifetime = timedelta(seconds=granted.get("expires_in", 3600))
    conn.token_expires_at = datetime.utcnow() + lifetime
    conn.status = "active"
    db.commit()
    return new_access_token
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── MICROSOFT OneDrive/SharePoint OAuth ─────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def onedrive_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect the OAuth popup to the Microsoft Entra ID authorize endpoint.

    The caller's JWT is read from the ``token`` query parameter because the
    popup cannot easily send a Bearer header.

    Args:
        request: Incoming request; supplies the ``token`` query parameter and
            the headers used to derive the callback URL.
        db: Database session used to look up the active user.

    Returns:
        A 401 HTML page when the token is missing or invalid or the user is
        not found; a popup-closing error page when Microsoft OAuth is not
        configured; otherwise a redirect to the Microsoft authorize endpoint.
    """
    from urllib.parse import quote, urlencode

    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<p>Missing token</p>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return HTMLResponse("<p>Invalid token</p>", status_code=401)
    user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
    if not user:
        return HTMLResponse("<p>User not found</p>", status_code=401)

    if not MICROSOFT_CLIENT_ID:
        return popup_close_html(False, "onedrive", "Microsoft OAuth not configured")

    redirect_uri = f"{get_base_url(request)}/api/cloud/onedrive/callback"
    state = create_oauth_state(user.id, "onedrive")
    scope = (
        "https://graph.microsoft.com/Files.Read.All "
        "https://graph.microsoft.com/Sites.Read.All "
        "offline_access"
    )
    # The scope list is space-separated; quote_via=quote encodes the spaces
    # as %20 so the redirect URL contains no raw whitespace.
    query = urlencode(
        {
            "client_id": MICROSOFT_CLIENT_ID,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": scope,
            "state": state,
        },
        quote_via=quote,
    )
    return RedirectResponse(
        f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/authorize?{query}"
    )
async def onedrive_callback(request: Request, db: Session = Depends(get_db)):
    """Handle the Microsoft OAuth callback: exchange the code and store tokens.

    Args:
        request: Callback request carrying ``code``, ``state`` and optionally
            ``error`` / ``error_description`` query parameters.
        db: Database session.

    Returns:
        A popup-closing HTML page reporting success or the failure reason.

    Raises:
        HTTPException: 400 (from ``verify_oauth_state``) when the state JWT is
            invalid or expired.
    """
    provider = "onedrive"
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, provider, request.query_params.get("error_description", error))

    state_data = verify_oauth_state(state)
    # A state minted for another provider must not be accepted here.
    if state_data.get("provider") != provider:
        return popup_close_html(False, provider, "Invalid OAuth state")
    if not code:
        return popup_close_html(False, provider, "Missing authorization code")

    user = db.query(User).filter_by(id=state_data.get("user_id"), is_active=True).first()
    if not user:
        return popup_close_html(False, provider, "User not found")

    # The redirect URI must match the one sent in the authorization request.
    redirect_uri = f"{get_base_url(request)}/api/cloud/onedrive/callback"

    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/token",
            data={
                "code": code,
                "client_id": MICROSOFT_CLIENT_ID,
                "client_secret": MICROSOFT_CLIENT_SECRET,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
            },
        )
        if resp.status_code != 200:
            return popup_close_html(False, provider, "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 3600)

        # Fetch the profile for a friendlier display name (best-effort).
        profile = await client.get("https://graph.microsoft.com/v1.0/me",
                                   headers={"Authorization": f"Bearer {access_token}"})

    ms_email = ""
    ms_name = "OneDrive"
    if profile.status_code == 200:
        pdata = profile.json()
        # "mail" can be present but null; ``or`` makes the fallback to
        # userPrincipalName apply in that case as well as when it is absent.
        ms_email = pdata.get("mail") or pdata.get("userPrincipalName") or ""
        ms_name = pdata.get("displayName") or "OneDrive"

    # Always inserts a new connection row; existing rows are not updated.
    conn = CloudConnection(
        connection_id=f"cc_{uuid.uuid4().hex[:8]}",
        user_id=user.id,
        org_id=user.org_id,
        provider=provider,
        display_name=f"OneDrive ({ms_email})" if ms_email else f"OneDrive ({ms_name})",
        access_token_encrypted=encrypt_token(access_token),
        refresh_token_encrypted=encrypt_token(refresh_token),
        token_expires_at=datetime.utcnow() + timedelta(seconds=expires_in),
        provider_user_email=ms_email,
        status="active",
    )
    db.add(conn)
    db.commit()
    return popup_close_html(True, provider)
async def refresh_onedrive_token(conn: CloudConnection, db: Session) -> str:
    """Return a usable Microsoft access token, refreshing it when near expiry.

    The stored token is reused while it remains valid for more than two
    minutes. On refresh, a newly issued refresh token (if any) replaces the
    stored one.

    Args:
        conn: Connection whose tokens are read and updated.
        db: Session used to commit status and token changes.

    Returns:
        The plain-text access token.

    Raises:
        HTTPException: 401 when no refresh token is stored or Microsoft
            rejects the refresh; the connection is marked "expired".
    """
    expiry = conn.token_expires_at
    if expiry and expiry > datetime.utcnow() + timedelta(minutes=2):
        return decrypt_token(conn.access_token_encrypted)

    stored_refresh = decrypt_token(conn.refresh_token_encrypted)
    if not stored_refresh:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")

    import httpx
    token_endpoint = f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/token"
    form = {
        "client_id": MICROSOFT_CLIENT_ID,
        "client_secret": MICROSOFT_CLIENT_SECRET,
        "refresh_token": stored_refresh,
        "grant_type": "refresh_token",
    }
    async with httpx.AsyncClient() as http:
        reply = await http.post(token_endpoint, data=form)

    if reply.status_code != 200:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")

    granted = reply.json()
    new_access_token = granted["access_token"]
    conn.access_token_encrypted = encrypt_token(new_access_token)
    rotated_refresh = granted.get("refresh_token")
    if rotated_refresh:
        conn.refresh_token_encrypted = encrypt_token(rotated_refresh)
    lifetime = timedelta(seconds=granted.get("expires_in", 3600))
    conn.token_expires_at = datetime.utcnow() + lifetime
    conn.status = "active"
    db.commit()
    return new_access_token
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── DROPBOX OAuth ───────────────────────────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def dropbox_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect the OAuth popup to the Dropbox authorization page.

    The caller's JWT is read from the ``token`` query parameter because the
    popup cannot easily send a Bearer header.

    Args:
        request: Incoming request; supplies the ``token`` query parameter and
            the headers used to derive the callback URL.
        db: Database session used to look up the active user.

    Returns:
        A 401 HTML page when the token is missing or invalid or the user is
        not found; a popup-closing error page when Dropbox OAuth is not
        configured; otherwise a redirect to the Dropbox authorize endpoint.
    """
    from urllib.parse import urlencode

    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<p>Missing token</p>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return HTMLResponse("<p>Invalid token</p>", status_code=401)
    user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
    if not user:
        return HTMLResponse("<p>User not found</p>", status_code=401)

    if not DROPBOX_APP_KEY:
        return popup_close_html(False, "dropbox", "Dropbox OAuth not configured")

    redirect_uri = f"{get_base_url(request)}/api/cloud/dropbox/callback"
    state = create_oauth_state(user.id, "dropbox")
    # urlencode percent-encodes every value so none can corrupt the query.
    query = urlencode({
        "client_id": DROPBOX_APP_KEY,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "token_access_type": "offline",
        "state": state,
    })
    return RedirectResponse(f"https://www.dropbox.com/oauth2/authorize?{query}")
async def dropbox_callback(request: Request, db: Session = Depends(get_db)):
    """Handle the Dropbox OAuth callback: exchange the code and store tokens.

    Args:
        request: Callback request carrying ``code``, ``state`` and optionally
            ``error`` query parameters.
        db: Database session.

    Returns:
        A popup-closing HTML page reporting success or the failure reason.

    Raises:
        HTTPException: 400 (from ``verify_oauth_state``) when the state JWT is
            invalid or expired.
    """
    provider = "dropbox"
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, provider, error)

    state_data = verify_oauth_state(state)
    # A state minted for another provider must not be accepted here.
    if state_data.get("provider") != provider:
        return popup_close_html(False, provider, "Invalid OAuth state")
    if not code:
        return popup_close_html(False, provider, "Missing authorization code")

    user = db.query(User).filter_by(id=state_data.get("user_id"), is_active=True).first()
    if not user:
        return popup_close_html(False, provider, "User not found")

    # The redirect URI must match the one sent in the authorization request.
    redirect_uri = f"{get_base_url(request)}/api/cloud/dropbox/callback"

    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://api.dropboxapi.com/oauth2/token", data={
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": redirect_uri,
            "client_id": DROPBOX_APP_KEY,
            "client_secret": DROPBOX_APP_SECRET,
        })
        if resp.status_code != 200:
            return popup_close_html(False, provider, "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 14400)

        # Fetch account info for the display name (best-effort). The request
        # body is the literal JSON "null", as in the original call.
        acct = await client.post("https://api.dropboxapi.com/2/users/get_current_account",
                                 headers={"Authorization": f"Bearer {access_token}"},
                                 content=b"null")

    dbx_email = ""
    dbx_name = "Dropbox"
    if acct.status_code == 200:
        adata = acct.json()
        dbx_email = adata.get("email") or ""
        dbx_name = (adata.get("name") or {}).get("display_name") or "Dropbox"

    # Always inserts a new connection row; existing rows are not updated.
    conn = CloudConnection(
        connection_id=f"cc_{uuid.uuid4().hex[:8]}",
        user_id=user.id,
        org_id=user.org_id,
        provider=provider,
        display_name=f"Dropbox ({dbx_email})" if dbx_email else f"Dropbox ({dbx_name})",
        access_token_encrypted=encrypt_token(access_token),
        refresh_token_encrypted=encrypt_token(refresh_token),
        token_expires_at=datetime.utcnow() + timedelta(seconds=expires_in),
        provider_user_email=dbx_email,
        status="active",
    )
    db.add(conn)
    db.commit()
    return popup_close_html(True, provider)
async def refresh_dropbox_token(conn: CloudConnection, db: Session) -> str:
    """Return a usable Dropbox access token, refreshing it when near expiry.

    The stored token is reused while it remains valid for more than two
    minutes; otherwise the refresh token is exchanged for a new access token.

    Args:
        conn: Connection whose tokens are read and updated.
        db: Session used to commit status and token changes.

    Returns:
        The plain-text access token.

    Raises:
        HTTPException: 401 when no refresh token is stored or Dropbox rejects
            the refresh; the connection is marked "expired".
    """
    expiry = conn.token_expires_at
    if expiry and expiry > datetime.utcnow() + timedelta(minutes=2):
        return decrypt_token(conn.access_token_encrypted)

    stored_refresh = decrypt_token(conn.refresh_token_encrypted)
    if not stored_refresh:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")

    import httpx
    form = {
        "refresh_token": stored_refresh,
        "grant_type": "refresh_token",
        "client_id": DROPBOX_APP_KEY,
        "client_secret": DROPBOX_APP_SECRET,
    }
    async with httpx.AsyncClient() as http:
        reply = await http.post("https://api.dropboxapi.com/oauth2/token", data=form)

    if reply.status_code != 200:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")

    granted = reply.json()
    new_access_token = granted["access_token"]
    conn.access_token_encrypted = encrypt_token(new_access_token)
    lifetime = timedelta(seconds=granted.get("expires_in", 14400))
    conn.token_expires_at = datetime.utcnow() + lifetime
    conn.status = "active"
    db.commit()
    return new_access_token
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── AWS S3 (Credential-based, no OAuth) ────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def s3_connect(request: Request, user: User = Depends(verify_token),
                     db: Session = Depends(get_db)):
    """Create an S3 connection from access-key credentials.

    The JSON body must provide ``access_key``, ``secret_key`` and ``bucket``;
    ``region`` is optional and defaults to "us-east-1". Credentials are
    checked with a ``head_bucket`` call before anything is stored.

    Args:
        request: Request whose JSON body holds the credentials.
        user: Authenticated caller who will own the connection.
        db: Database session.

    Returns:
        ``{"connection_id": ..., "message": ...}`` for the new connection.

    Raises:
        HTTPException: 400 when the body is not a JSON object, a required
            field is missing, or the bucket check fails.
    """
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    def _text(field: str) -> str:
        # Non-string values (null, numbers, ...) are treated as missing
        # instead of crashing on .strip().
        value = body.get(field)
        return value.strip() if isinstance(value, str) else ""

    access_key = _text("access_key")
    secret_key = _text("secret_key")
    bucket = _text("bucket")
    region = _text("region") or "us-east-1"
    if not all([access_key, secret_key, bucket]):
        raise HTTPException(status_code=400, detail="access_key, secret_key, and bucket are required")

    def _check_bucket() -> None:
        import boto3
        s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                          region_name=region)
        s3.head_bucket(Bucket=bucket)

    # boto3 is synchronous; run the check in the default executor so the
    # event loop is not blocked while waiting on AWS.
    try:
        await asyncio.get_running_loop().run_in_executor(None, _check_bucket)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"S3 validation failed: {str(e)}") from e

    conn_id = f"cc_{uuid.uuid4().hex[:8]}"
    # The generic token columns are reused here: access key in the
    # access-token column, secret key in the refresh-token column.
    conn = CloudConnection(
        connection_id=conn_id,
        user_id=user.id,
        org_id=user.org_id,
        provider="s3",
        display_name=f"S3: {bucket}",
        access_token_encrypted=encrypt_token(access_key),
        refresh_token_encrypted=encrypt_token(secret_key),
        provider_metadata={"bucket": bucket, "region": region},
        status="active",
    )
    db.add(conn)
    db.commit()
    return {"connection_id": conn_id, "message": f"Connected to S3 bucket: {bucket}"}
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── FILE BROWSING (Provider-agnostic) ──────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
# File extensions (lower-case, with leading dot) that may be imported.
IMPORTABLE_EXTENSIONS = {".pdf", ".csv", ".xlsx", ".xls", ".docx", ".txt", ".doc"}


def is_importable(filename: str) -> bool:
    """Tell whether *filename* ends in an importable extension.

    Only the final suffix is considered and the comparison ignores case.
    """
    extension = Path(filename).suffix
    return extension.lower() in IMPORTABLE_EXTENSIONS
async def browse_files(connection_id: str, path: str = "/", request: Request = None,
                       user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """List files and folders at *path* within one of the caller's connections.

    Looks up the active connection, checks ownership (super admins may browse
    any connection), stamps ``last_used_at`` and delegates to the
    provider-specific browser.

    Raises:
        HTTPException: 404 when no active connection matches, 403 when the
            caller may not use it, 400 for an unknown provider.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found or inactive")

    may_access = conn.user_id == user.id or user.role == "super_admin"
    if not may_access:
        raise HTTPException(status_code=403, detail="Not authorized")

    conn.last_used_at = datetime.utcnow()
    db.commit()

    provider = conn.provider
    # S3 uses stored static credentials, so its browser takes no db session.
    if provider == "s3":
        return await browse_s3(conn, path)

    token_backed_browsers = {
        "google_drive": browse_google_drive,
        "onedrive": browse_onedrive,
        "dropbox": browse_dropbox,
    }
    browser = token_backed_browsers.get(provider)
    if browser is None:
        raise HTTPException(status_code=400, detail=f"Unsupported provider: {conn.provider}")
    return await browser(conn, path, db)
async def browse_google_drive(conn: CloudConnection, path: str, db: Session):
    """List the children of a Google Drive folder.

    Args:
        conn: Google Drive connection supplying the access token.
        path: "/" for the Drive root, otherwise a folder ID.
        db: Session passed to the token refresh.

    Returns:
        ``{"path": path, "items": [...]}`` where each item carries ``id``,
        ``name``, ``type``, ``size``, ``modified`` and ``importable``. All
        result pages are followed.

    Raises:
        HTTPException: 502 when the Drive API responds with a non-200 status;
            401 (from the token refresh) when the connection must be
            re-authorized.
    """
    access_token = await refresh_google_token(conn, db)
    import httpx

    folder_id = path if path != "/" else "root"
    # Escape backslashes and single quotes so the caller-supplied ID cannot
    # terminate the quoted literal and alter the Drive search query.
    safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
    query = f"'{safe_folder_id}' in parents and trashed = false"

    items = []
    page_token = None
    async with httpx.AsyncClient() as client:
        while True:
            params = {
                "q": query,
                "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
                "pageSize": 100,
                "orderBy": "folder,name",
            }
            if page_token:
                params["pageToken"] = page_token
            resp = await client.get("https://www.googleapis.com/drive/v3/files",
                                    headers={"Authorization": f"Bearer {access_token}"},
                                    params=params)
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list Google Drive files")
            data = resp.json()
            for f in data.get("files", []):
                is_folder = f["mimeType"] == "application/vnd.google-apps.folder"
                items.append({
                    "id": f["id"],
                    "name": f["name"],
                    "type": "folder" if is_folder else "file",
                    # Size may be absent from an entry; default to 0.
                    "size": int(f.get("size") or 0),
                    "modified": f.get("modifiedTime", ""),
                    "importable": not is_folder and is_importable(f["name"]),
                })
            page_token = data.get("nextPageToken")
            if not page_token:
                break
    return {"path": path, "items": items}
async def browse_onedrive(conn: CloudConnection, path: str, db: Session):
    """List the children of a OneDrive/SharePoint folder.

    Args:
        conn: OneDrive connection supplying the access token.
        path: "/" or "root" for the drive root, otherwise a drive item ID.
        db: Session passed to the token refresh.

    Returns:
        ``{"path": path, "items": [...]}``. A single request with
        ``$top=200`` is made; further pages are not followed.

    Raises:
        HTTPException: 502 when Microsoft Graph responds with a non-200
            status; 401 (from the token refresh) when the connection must be
            re-authorized.
    """
    access_token = await refresh_onedrive_token(conn, db)
    import httpx
    from urllib.parse import quote

    if path in ("/", "root"):
        url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
    else:
        # Percent-encode the caller-supplied ID so characters such as "/",
        # "?" or "#" cannot change which Graph resource is requested.
        item_id = quote(path, safe="")
        url = f"https://graph.microsoft.com/v1.0/me/drive/items/{item_id}/children"

    async with httpx.AsyncClient() as client:
        resp = await client.get(url, headers={"Authorization": f"Bearer {access_token}"},
                                params={"$top": 200, "$orderby": "name"})
    if resp.status_code != 200:
        raise HTTPException(status_code=502, detail="Failed to list OneDrive files")

    items = []
    for f in resp.json().get("value", []):
        # Folders are recognized by the presence of a "folder" key.
        is_folder = "folder" in f
        items.append({
            "id": f["id"],
            "name": f["name"],
            "type": "folder" if is_folder else "file",
            "size": f.get("size", 0),
            "modified": f.get("lastModifiedDateTime", ""),
            "importable": not is_folder and is_importable(f["name"]),
        })
    return {"path": path, "items": items}
async def browse_s3(conn: CloudConnection, path: str):
    """List one "directory" level of an S3 bucket.

    Args:
        conn: S3 connection; the access key and secret key are read from the
            encrypted token columns, bucket and region from
            ``provider_metadata``.
        path: Key prefix to list; leading slashes are ignored and "/" means
            the bucket root.

    Returns:
        ``{"path": path, "items": [...]}`` with common prefixes reported as
        folders and objects as files. A single ``list_objects_v2`` call with
        ``MaxKeys=200`` is made; truncated listings are not continued.

    Raises:
        HTTPException: 502 when the S3 listing fails.
    """
    access_key = decrypt_token(conn.access_token_encrypted)
    secret_key = decrypt_token(conn.refresh_token_encrypted)
    # Tolerate a connection row without metadata instead of crashing.
    meta = conn.provider_metadata or {}
    bucket = meta.get("bucket", "")
    region = meta.get("region", "us-east-1")

    # Normalize to "" (root) or "some/prefix/"; after lstrip the prefix can
    # never be "/" itself, so no further special case is needed.
    prefix = path.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    def _list_page():
        import boto3
        s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                          region_name=region)
        return s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/", MaxKeys=200)

    # boto3 is synchronous; run it in the default executor so the event loop
    # is not blocked while waiting on AWS.
    try:
        resp = await asyncio.get_running_loop().run_in_executor(None, _list_page)
    except Exception as exc:
        logger.warning("S3 listing failed for bucket %s (%s)", bucket, type(exc).__name__)
        raise HTTPException(status_code=502, detail="Failed to list S3 files") from exc

    items = []
    # Folders (common prefixes)
    for cp in resp.get("CommonPrefixes", []):
        items.append({
            "id": cp["Prefix"],
            "name": cp["Prefix"].rstrip("/").split("/")[-1],
            "type": "folder",
            "size": 0,
            "modified": "",
            "importable": False,
        })
    # Files
    for obj in resp.get("Contents", []):
        key = obj["Key"]
        if key == prefix:
            # Skip the placeholder object that represents the folder itself.
            continue
        name = key.split("/")[-1]
        last_modified = obj.get("LastModified", "")
        if hasattr(last_modified, "isoformat"):
            modified = last_modified.isoformat()
        else:
            modified = str(last_modified)
        items.append({
            "id": key,
            "name": name,
            "type": "file",
            "size": obj.get("Size", 0),
            "modified": modified,
            "importable": is_importable(name),
        })
    return {"path": path, "items": items}
async def browse_dropbox(conn: CloudConnection, path: str, db: Session):
    """List the entries of a Dropbox folder.

    Args:
        conn: Dropbox connection supplying the access token.
        path: "/" for the account root (sent to Dropbox as ""), otherwise the
            value is passed through as the folder path.
        db: Session passed to the token refresh.

    Returns:
        ``{"path": path, "items": [...]}``; each item also includes
        ``path_lower``. A single ``list_folder`` call with ``limit=200`` is
        made; continuation cursors are not followed.

    Raises:
        HTTPException: 502 when Dropbox responds with a non-200 status.
    """
    bearer = await refresh_dropbox_token(conn, db)
    import httpx

    folder_path = path if path != "/" else ""
    request_headers = {"Authorization": f"Bearer {bearer}", "Content-Type": "application/json"}
    request_body = {"path": folder_path, "limit": 200, "include_non_downloadable_files": False}

    async with httpx.AsyncClient() as http:
        reply = await http.post(
            "https://api.dropboxapi.com/2/files/list_folder",
            headers=request_headers,
            json=request_body,
        )
    if reply.status_code != 200:
        raise HTTPException(status_code=502, detail="Failed to list Dropbox files")

    listing = []
    for entry in reply.json().get("entries", []):
        folder = entry[".tag"] == "folder"
        kind = "folder" if folder else "file"
        listing.append({
            # Fall back to the lower-cased path when an entry has no id.
            "id": entry.get("id", entry.get("path_lower", "")),
            "name": entry["name"],
            "type": kind,
            "size": entry.get("size", 0),
            "modified": entry.get("server_modified", ""),
            "importable": not folder and is_importable(entry["name"]),
            "path_lower": entry.get("path_lower", ""),
        })
    return {"path": path, "items": listing}
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── SEARCH ──────────────────────────────────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def search_files(connection_id: str, q: str = "", user: User = Depends(verify_token),
                       db: Session = Depends(get_db)):
    """Search files by name across a cloud connection.

    Args:
        connection_id: Identifier of an active ``CloudConnection`` row.
        q: Search text supplied by the caller. An empty value returns no items.
        user: Authenticated caller; must own the connection or be ``super_admin``.
        db: Database session.

    Returns:
        ``{"items": [...]}``; each item carries ``id``, ``name``, ``type``
        (``"folder"`` or ``"file"``), ``size``, ``modified`` and ``importable``.
        A non-200 answer from Google Drive, OneDrive or Dropbox produces an
        empty list rather than an error.

    Raises:
        HTTPException: 404 when no active connection matches, 403 when the
            caller may not use it.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")
    if not q:
        return {"items": []}

    import httpx

    if conn.provider == "google_drive":
        access_token = await refresh_google_token(conn, db)
        # q is user input: backslashes and single quotes must be escaped or
        # they terminate the string literal inside the Drive query expression.
        escaped = q.replace("\\", "\\\\").replace("'", "\\'")
        query = f"name contains '{escaped}' and trashed = false"
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                "https://www.googleapis.com/drive/v3/files",
                headers={"Authorization": f"Bearer {access_token}"},
                params={"q": query, "fields": "files(id,name,mimeType,size,modifiedTime)", "pageSize": 50},
            )
        if resp.status_code != 200:
            return {"items": []}
        items = []
        for f in resp.json().get("files", []):
            is_folder = f["mimeType"] == "application/vnd.google-apps.folder"
            items.append({
                "id": f["id"], "name": f["name"],
                "type": "folder" if is_folder else "file",
                "size": int(f.get("size", 0)), "modified": f.get("modifiedTime", ""),
                "importable": not is_folder and is_importable(f["name"]),
            })
        return {"items": items}

    elif conn.provider == "onedrive":
        from urllib.parse import quote

        access_token = await refresh_onedrive_token(conn, db)
        # The search text sits inside a quoted literal in the URL *path*:
        # double embedded single quotes, then percent-encode everything so
        # characters such as "/", "?" and "#" cannot reshape the request URL.
        encoded = quote(q.replace("'", "''"), safe="")
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"https://graph.microsoft.com/v1.0/me/drive/root/search(q='{encoded}')",
                headers={"Authorization": f"Bearer {access_token}"},
                params={"$top": 50},
            )
        if resp.status_code != 200:
            return {"items": []}
        items = []
        for f in resp.json().get("value", []):
            is_folder = "folder" in f
            items.append({
                "id": f["id"], "name": f["name"],
                "type": "folder" if is_folder else "file",
                "size": f.get("size", 0), "modified": f.get("lastModifiedDateTime", ""),
                "importable": not is_folder and is_importable(f["name"]),
            })
        return {"items": items}

    elif conn.provider == "dropbox":
        access_token = await refresh_dropbox_token(conn, db)
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://api.dropboxapi.com/2/files/search_v2",
                headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
                json={"query": q, "options": {"max_results": 50}},
            )
        if resp.status_code != 200:
            return {"items": []}
        items = []
        for match in resp.json().get("matches", []):
            entry = match.get("metadata", {}).get("metadata", {})
            is_folder = entry.get(".tag") == "folder"
            items.append({
                "id": entry.get("id", ""), "name": entry.get("name", ""),
                "type": "folder" if is_folder else "file",
                "size": entry.get("size", 0), "modified": entry.get("server_modified", ""),
                "importable": not is_folder and is_importable(entry.get("name", "")),
            })
        return {"items": items}

    elif conn.provider == "s3":
        # S3 doesn't have native search — list keys and filter by name here.
        # Only the first listing page (up to 500 keys) is inspected.
        access_key = decrypt_token(conn.access_token_encrypted)
        secret_key = decrypt_token(conn.refresh_token_encrypted)
        meta = conn.provider_metadata or {}
        bucket = meta.get("bucket", "")
        region = meta.get("region", "us-east-1")
        import boto3

        def _list_first_page():
            s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                              region_name=region)
            return s3.list_objects_v2(Bucket=bucket, MaxKeys=500)

        # boto3 is synchronous; run it in the default executor so the event
        # loop keeps serving other requests while the listing is in flight.
        resp = await asyncio.get_running_loop().run_in_executor(None, _list_first_page)
        needle = q.lower()
        items = []
        for obj in resp.get("Contents", []):
            name = obj["Key"].split("/")[-1]
            if needle not in name.lower():
                continue
            modified = obj.get("LastModified", "")
            items.append({
                "id": obj["Key"], "name": name, "type": "file",
                "size": obj.get("Size", 0),
                "modified": modified.isoformat() if hasattr(modified, "isoformat") else "",
                "importable": is_importable(name),
            })
        return {"items": items[:50]}

    return {"items": []}
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── IMPORT ──────────────────────────────────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
# Strong references to in-flight cloud-import tasks. The event loop only keeps
# weak references to tasks, so a task nobody else holds can be garbage
# collected before it finishes.
_CLOUD_IMPORT_TASKS = set()


async def import_files(request: Request, user: User = Depends(verify_token),
                       db: Session = Depends(get_db)):
    """Import files from cloud storage into DocuScan.

    Expects a JSON object with ``connection_id`` and a non-empty ``files``
    list of objects (``cloud_file_id`` or ``id``, ``path``, ``name``, ``size``).
    One ``CloudImport`` row is created per file with status ``"downloading"``
    and a background ``download_and_process`` task is scheduled for each.

    Returns:
        ``{"message": ..., "import_ids": [...]}``.

    Raises:
        HTTPException: 400 for a malformed body, 404 when no active connection
            matches, 403 when the caller may not use it.
    """
    try:
        body = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError: report bad input instead of a 500.
        raise HTTPException(status_code=400, detail="connection_id and files are required")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="connection_id and files are required")

    connection_id = body.get("connection_id", "")
    files = body.get("files", [])
    if not connection_id or not files:
        raise HTTPException(status_code=400, detail="connection_id and files are required")
    if not isinstance(files, list) or not all(isinstance(f, dict) for f in files):
        raise HTTPException(status_code=400, detail="files must be a list of objects")

    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")

    import_ids = []
    for f in files:
        import_id = f"ci_{uuid.uuid4().hex[:8]}"
        db.add(CloudImport(
            import_id=import_id,
            connection_id=connection_id,
            provider=conn.provider,
            cloud_file_id=f.get("cloud_file_id", f.get("id", "")),
            cloud_file_path=f.get("path", ""),
            cloud_file_name=f.get("name", "unknown"),
            cloud_file_size=f.get("size", 0),
            status="downloading",
            user_email=user.email,
            org_id=user.org_id,
        ))
        import_ids.append(import_id)
    db.commit()

    # Kick off async downloads, keeping each task referenced until it is done.
    for import_id in import_ids:
        task = asyncio.create_task(download_and_process(import_id, connection_id, user.email, user.org_id))
        _CLOUD_IMPORT_TASKS.add(task)
        task.add_done_callback(_CLOUD_IMPORT_TASKS.discard)

    return {"message": f"Importing {len(files)} file(s)", "import_ids": import_ids}
async def download_and_process(import_id: str, connection_id: str, user_email: str, org_id: int):
    """Download one file from its cloud provider and run it through the DocuScan pipeline.

    Runs as a background task with its own database session. Progress and
    failures are recorded on the ``CloudImport`` row (``status`` / ``error``);
    nothing is returned and exceptions from the pipeline are logged, not raised.

    Args:
        import_id: Identifier of the ``CloudImport`` row to process.
        connection_id: Identifier of the ``CloudConnection`` to download through.
        user_email: Stored as ``uploaded_by`` on the new ``Document``.
        org_id: Organization the new ``Document`` belongs to.
    """
    db = SessionLocal()
    try:
        ci = db.query(CloudImport).filter_by(import_id=import_id).first()
        conn = db.query(CloudConnection).filter_by(connection_id=connection_id).first()
        if not ci or not conn:
            return

        # The name originates from the client request; keep only its final
        # component so it cannot add directory parts to the upload path.
        raw_name = ci.cloud_file_name or "unknown"
        filename = os.path.basename(raw_name.replace("\\", "/")) or "unknown"

        # Download file content
        file_bytes = None
        try:
            if conn.provider == "google_drive":
                file_bytes = await download_google_drive_file(conn, ci.cloud_file_id, db)
            elif conn.provider == "onedrive":
                file_bytes = await download_onedrive_file(conn, ci.cloud_file_id, db)
            elif conn.provider == "s3":
                file_bytes = await download_s3_file(conn, ci.cloud_file_id)
            elif conn.provider == "dropbox":
                file_bytes = await download_dropbox_file(conn, ci.cloud_file_path or ci.cloud_file_id, db)
        except Exception as e:
            ci.status = "error"
            ci.error = str(e)
            db.commit()
            return

        # Also reached for an unrecognised provider, where nothing was fetched.
        if not file_bytes:
            ci.status = "error"
            ci.error = "Download returned empty content"
            db.commit()
            return

        # Save to disk
        doc_id = f"doc_{uuid.uuid4().hex[:8]}"
        UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
        file_path = UPLOAD_DIR / f"{doc_id}_{filename}"
        with open(file_path, "wb") as f:
            f.write(file_bytes)

        # Create Document row (same as upload endpoint)
        doc = Document(
            doc_id=doc_id,
            filename=filename,
            file_path=str(file_path),
            file_size=len(file_bytes),
            uploaded_by=user_email,
            org_id=org_id,
            status="processing",
            conversion_status={"text": "pending", "tables": "pending", "images": "pending"},
        )
        db.add(doc)
        ci.doc_id = doc_id
        ci.status = "processing"
        db.commit()

        # Reuse existing process_document pipeline
        from backend import process_document
        await process_document(doc_id, file_path, filename, user_email, org_id)

        # NOTE(review): process_document presumably updates the Document row
        # through its own session — drop cached state so the re-query below
        # reads current values. Confirm against backend.process_document.
        db.expire_all()

        # Mirror the document's final status (and error, if any) onto the import.
        ci_refresh = db.query(CloudImport).filter_by(import_id=import_id).first()
        doc_refresh = db.query(Document).filter_by(doc_id=doc_id).first()
        if ci_refresh and doc_refresh:
            ci_refresh.status = doc_refresh.status
            if doc_refresh.error:
                ci_refresh.error = doc_refresh.error
            db.commit()
    except Exception as e:
        logger.exception(f"Import {import_id} failed: {e}")
        # A failed flush/commit leaves the session unusable until rolled back.
        db.rollback()
        ci = db.query(CloudImport).filter_by(import_id=import_id).first()
        if ci:
            ci.status = "error"
            ci.error = str(e)
            db.commit()
    finally:
        db.close()
| # ─── Download helpers ────────────────────────────────────────────────────────── | |
async def download_google_drive_file(conn: CloudConnection, file_id: str, db: Session) -> bytes:
    """Return the raw bytes of a Google Drive file.

    Obtains an access token through ``refresh_google_token`` and requests the
    file with ``alt=media``, following redirects.

    Raises:
        Exception: when the response status is not 200 (status code included).
    """
    token = await refresh_google_token(conn, db)
    import httpx

    download_url = f"https://www.googleapis.com/drive/v3/files/{file_id}?alt=media"
    auth_headers = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient() as http:
        response = await http.get(download_url, headers=auth_headers, follow_redirects=True)

    if response.status_code == 200:
        return response.content
    raise Exception(f"Google Drive download failed ({response.status_code})")
async def download_onedrive_file(conn: CloudConnection, item_id: str, db: Session) -> bytes:
    """Return the raw bytes of a OneDrive item.

    Obtains an access token through ``refresh_onedrive_token`` and requests the
    item's ``/content`` endpoint on Microsoft Graph, following redirects.

    Raises:
        Exception: when the response status is not 200 (status code included).
    """
    token = await refresh_onedrive_token(conn, db)
    import httpx

    content_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{item_id}/content"
    auth_headers = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient() as http:
        response = await http.get(content_url, headers=auth_headers, follow_redirects=True)

    if response.status_code == 200:
        return response.content
    raise Exception(f"OneDrive download failed ({response.status_code})")
async def download_s3_file(conn: CloudConnection, key: str) -> bytes:
    """Return the raw bytes of an S3 object.

    The connection stores the AWS access key in ``access_token_encrypted`` and
    the secret key in ``refresh_token_encrypted``; bucket and region come from
    ``provider_metadata`` (region defaults to ``us-east-1``).

    Args:
        conn: S3 connection holding the encrypted credentials and metadata.
        key: Object key to fetch.

    Returns:
        The object's full content.
    """
    # Read everything needed from the ORM object up front so the worker
    # thread below never touches it.
    access_key = decrypt_token(conn.access_token_encrypted)
    secret_key = decrypt_token(conn.refresh_token_encrypted)
    meta = conn.provider_metadata or {}
    bucket = meta.get("bucket", "")
    region = meta.get("region", "us-east-1")
    import boto3

    def _fetch() -> bytes:
        s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                          region_name=region)
        obj = s3.get_object(Bucket=bucket, Key=key)
        return obj["Body"].read()

    # boto3 is synchronous; running it in the default executor keeps the
    # event loop responsive while the object downloads.
    return await asyncio.get_running_loop().run_in_executor(None, _fetch)
async def download_dropbox_file(conn: CloudConnection, path: str, db: Session) -> bytes:
    """Return the raw bytes of a Dropbox file.

    Obtains an access token through ``refresh_dropbox_token`` and POSTs to the
    Dropbox content endpoint, passing ``path`` JSON-encoded in the
    ``Dropbox-API-Arg`` header.

    Raises:
        Exception: when the response status is not 200 (status code included).
    """
    token = await refresh_dropbox_token(conn, db)
    import httpx

    request_headers = {
        "Authorization": f"Bearer {token}",
        "Dropbox-API-Arg": json.dumps({"path": path}),
    }
    async with httpx.AsyncClient() as http:
        response = await http.post(
            "https://content.dropboxapi.com/2/files/download",
            headers=request_headers,
        )

    if response.status_code == 200:
        return response.content
    raise Exception(f"Dropbox download failed ({response.status_code})")
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── IMPORT HISTORY ──────────────────────────────────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
async def list_imports(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the caller's 50 most recent cloud imports, newest first.

    Rows are matched on the caller's email; unless the caller is a
    ``super_admin`` they must also belong to the caller's organization.
    """
    criteria = {"user_email": user.email}
    if user.role != "super_admin":
        criteria["org_id"] = user.org_id

    recent = (
        db.query(CloudImport)
        .filter_by(**criteria)
        .order_by(CloudImport.created_at.desc())
        .limit(50)
        .all()
    )
    return {"imports": [row.to_dict() for row in recent]}
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # ─── EMAIL FORWARDING (Mailgun Inbound Routes) ──────────────────────────────── | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
| # | |
| # Multi-tenant routing via To address: | |
| # {org-slug}@docuscan.5gvector.com → routes to that org | |
| # statements@docuscan.5gvector.com → falls back to default org | |
| # | |
| # Mailgun POSTs multipart form data with: | |
| # sender, from, recipient, To, subject, body-plain, body-html, | |
| # attachment-1, attachment-2, ... (file uploads, no count field) | |
| # timestamp, token, signature (for HMAC verification) | |
| # ═══════════════════════════════════════════════════════════════════════════════ | |
# Domain that inbound mail addresses are built on ({org-slug}@<domain>).
EMAIL_DOMAIN = "docuscan.5gvector.com"
# HMAC key for Mailgun webhook signatures; empty disables verification.
MAILGUN_WEBHOOK_SIGNING_KEY = os.environ.get("MAILGUN_WEBHOOK_SIGNING_KEY", "")


def verify_mailgun_signature(timestamp: str, token: str, signature: str) -> bool:
    """Verify a Mailgun webhook signature using HMAC-SHA256.

    The expected signature is the hex HMAC-SHA256 of ``timestamp + token``
    keyed with ``MAILGUN_WEBHOOK_SIGNING_KEY``.

    Args:
        timestamp: ``timestamp`` field from the webhook payload.
        token: ``token`` field from the webhook payload.
        signature: ``signature`` field from the webhook payload.

    Returns:
        True when the signature matches, or when no signing key is configured
        (verification is skipped). False for a mismatch or for malformed
        input (non-string or unencodable values).
    """
    if not MAILGUN_WEBHOOK_SIGNING_KEY:
        return True  # Skip verification if no key configured

    import hashlib
    import hmac

    # Form fields are attacker-controlled: reject anything that is not text
    # instead of letting the comparison raise.
    if not all(isinstance(part, str) for part in (timestamp, token, signature)):
        return False
    try:
        expected = hmac.new(
            MAILGUN_WEBHOOK_SIGNING_KEY.encode(),
            f"{timestamp}{token}".encode(),
            hashlib.sha256,
        ).hexdigest()
        # Compare bytes: compare_digest raises TypeError on non-ASCII str.
        return hmac.compare_digest(expected.encode(), signature.encode())
    except UnicodeEncodeError:
        return False
def parse_org_from_to_address(to_address: str, db: Session):
    """Resolve the organization an inbound email is addressed to.

    The local part of the address (the text before ``@``, lower-cased) is
    matched against the slug of an active organization. Both a bare address
    (``slug@domain``) and the display form (``Org Name <slug@domain>``) are
    accepted.

    Args:
        to_address: Recipient header value.
        db: Database session.

    Returns:
        The matching active ``Organization``; otherwise the default
        organization (lowest ``id``), or None when no organizations exist.
    """
    import re

    to_address = to_address or ""
    # Extract email from "Name <email>" format
    match = re.search(r'<([^>]+)>', to_address)
    email = match.group(1) if match else to_address.strip()
    local_part = email.split("@")[0].lower().strip()

    # Try to match org slug (skipped when there is nothing to match on).
    if local_part:
        org = db.query(Organization).filter(
            Organization.slug == local_part,
            Organization.is_active == True,  # noqa: E712 — SQL expression, not a Python comparison
        ).first()
        if org:
            return org

    # Fallback: "statements" or any unmatched → default org. Ordering by id
    # makes the choice of default deterministic across queries and backends.
    return db.query(Organization).order_by(Organization.id).first()
# Strong references to in-flight attachment-processing tasks. The event loop
# only keeps weak references to tasks, so a task nobody else holds can be
# garbage collected before it finishes.
_EMAIL_IMPORT_TASKS = set()


async def email_inbound(request: Request):
    """
    Mailgun Inbound Routes webhook — receives forwarded emails with attachments.
    No auth required (Mailgun POSTs directly). Secured by optional HMAC signature.

    Each importable attachment is written to ``UPLOAD_DIR``, recorded as a
    ``Document`` plus an ``EmailImport`` row, and handed to
    ``process_email_attachment`` as a background task. An email without any
    attachment fields is logged as an ``EmailImport`` with status ``"error"``.

    Returns:
        A ``JSONResponse``. Status is 200 in every case, including handled
        failures, except for an invalid signature, which returns 406.
    """
    db = SessionLocal()
    try:
        form = await request.form()

        # Optional: verify Mailgun webhook signature
        mg_timestamp = form.get("timestamp", "")
        mg_token = form.get("token", "")
        mg_signature = form.get("signature", "")
        if MAILGUN_WEBHOOK_SIGNING_KEY and not verify_mailgun_signature(mg_timestamp, mg_token, mg_signature):
            logger.warning("Email inbound: invalid Mailgun signature")
            # Mailgun documents 406 as "rejected, do not retry".
            return JSONResponse({"error": "unauthorized"}, status_code=406)

        # Mailgun fields: "sender" is clean email, "from" includes display name
        from_email = form.get("sender", form.get("from", "unknown@unknown.com"))
        to_email = form.get("recipient", form.get("To", form.get("to", "")))
        subject = form.get("subject", "(no subject)")

        # Extract clean from email
        import re
        from_match = re.search(r'<([^>]+)>', from_email)
        from_clean = from_match.group(1) if from_match else from_email.strip()

        # Route to org based on To address
        org = parse_org_from_to_address(to_email, db)
        if not org:
            logger.error(f"Email inbound: no org found for to={to_email}")
            return JSONResponse({"error": "no matching organization"}, status_code=200)

        # Find a user in this org to attribute the import to.
        # NOTE(review): the intent is "prefer admin, then any active user", but
        # this is a descending *string* sort on role — confirm it really puts
        # admin roles first for the role values in use.
        attributed_user = (
            db.query(User)
            .filter(User.org_id == org.id, User.is_active == True)  # noqa: E712
            .order_by(User.role.desc())
            .first()
        )
        attributed_email = attributed_user.email if attributed_user else from_clean

        # Mailgun uses "attachment-1", "attachment-2", ...; the SendGrid-style
        # "attachment1", ... share the same prefix. Only file-like values count.
        attachment_keys = [
            key for key in form
            if key.startswith("attachment") and hasattr(form[key], "read")
        ]

        processed = 0
        for key in attachment_keys:
            attachment = form[key]
            # The filename is untrusted and may be missing: fall back to a
            # placeholder and keep only the final path component so it cannot
            # add directory parts to the upload path.
            raw_name = getattr(attachment, "filename", None) or f"{key}.bin"
            filename = os.path.basename(raw_name.replace("\\", "/")) or f"{key}.bin"

            # Only process importable file types
            if not is_importable(filename):
                logger.info(f"Email inbound: skipping non-importable attachment {filename}")
                continue

            # Read file content
            file_bytes = await attachment.read()
            if not file_bytes:
                continue

            import_id = f"ei_{uuid.uuid4().hex[:8]}"
            doc_id = f"doc_{uuid.uuid4().hex[:8]}"

            # Save to disk
            UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
            file_path = UPLOAD_DIR / f"{doc_id}_{filename}"
            with open(file_path, "wb") as f:
                f.write(file_bytes)
            file_size = len(file_bytes)

            # Create Document row
            db.add(Document(
                doc_id=doc_id,
                filename=filename,
                file_path=str(file_path),
                file_size=file_size,
                uploaded_by=attributed_email,
                org_id=org.id,
                status="processing",
                conversion_status={"text": "pending", "tables": "pending", "images": "pending"},
            ))

            # Create EmailImport row
            db.add(EmailImport(
                import_id=import_id,
                org_id=org.id,
                from_email=from_clean,
                to_email=to_email,
                subject=subject,
                attachment_name=filename,
                attachment_size=file_size,
                doc_id=doc_id,
                status="processing",
                matched_user_email=attributed_email,
            ))
            db.commit()

            # Kick off async processing, keeping the task referenced until done.
            task = asyncio.create_task(
                process_email_attachment(import_id, doc_id, str(file_path), filename, attributed_email, org.id)
            )
            _EMAIL_IMPORT_TASKS.add(task)
            task.add_done_callback(_EMAIL_IMPORT_TASKS.discard)
            processed += 1

        if not attachment_keys:
            # No attachments — still log the email
            db.add(EmailImport(
                import_id=f"ei_{uuid.uuid4().hex[:8]}",
                org_id=org.id,
                from_email=from_clean,
                to_email=to_email,
                subject=subject,
                attachment_name="(no attachments)",
                attachment_size=0,
                status="error",
                error="Email had no attachments to process",
                matched_user_email=attributed_email,
            ))
            db.commit()

        logger.info(f"Email inbound: from={from_clean} to={to_email} org={org.slug} attachments={processed}")

        # Mailgun expects 200 OK (non-200 triggers retries)
        return JSONResponse({"message": f"Processed {processed} attachment(s)", "org": org.slug})
    except Exception as e:
        logger.exception(f"Email inbound error: {e}")
        return JSONResponse({"error": str(e)}, status_code=200)  # Always 200 to prevent retries
    finally:
        db.close()
async def process_email_attachment(import_id: str, doc_id: str, file_path: str,
                                   filename: str, user_email: str, org_id: int):
    """Process an email attachment through the DocuScan pipeline.

    Runs as a background task with its own database session. After
    ``process_document`` finishes, the document's status (and error, if any)
    is copied onto the ``EmailImport`` row. A pipeline exception is logged and
    recorded on the import as status ``"error"``; it is not re-raised.

    Args:
        import_id: Identifier of the ``EmailImport`` row to update.
        doc_id: Identifier of the ``Document`` row created for the attachment.
        file_path: Location of the saved attachment on disk.
        filename: Attachment file name.
        user_email: User the document is attributed to.
        org_id: Organization the document belongs to.
    """
    db = SessionLocal()
    try:
        from backend import process_document
        await process_document(doc_id, file_path, filename, user_email, org_id)

        # Update email import status
        ei = db.query(EmailImport).filter_by(import_id=import_id).first()
        doc = db.query(Document).filter_by(doc_id=doc_id).first()
        if ei and doc:
            ei.status = doc.status
            if doc.error:
                ei.error = doc.error
            db.commit()
    except Exception as e:
        logger.exception(f"Email attachment processing failed: {e}")
        # A failed flush/commit leaves the session unusable until rolled back.
        db.rollback()
        ei = db.query(EmailImport).filter_by(import_id=import_id).first()
        if ei:
            ei.status = "error"
            ei.error = str(e)
            db.commit()
    finally:
        db.close()
async def list_email_imports(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the 50 most recent email imports, newest first.

    A ``super_admin`` sees imports from every organization; any other caller
    sees only those of their own organization.
    """
    query = db.query(EmailImport)
    sees_all_orgs = user.role == "super_admin"
    if not sees_all_orgs:
        query = query.filter_by(org_id=user.org_id)

    newest_first = query.order_by(EmailImport.created_at.desc())
    rows = newest_first.limit(50).all()
    return {"imports": [row.to_dict() for row in rows]}
async def get_email_address(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the inbound email address for the caller's organization.

    The address is ``{org.slug}@{EMAIL_DOMAIN}``. When the caller's
    organization cannot be found, the generic ``statements@`` address is
    returned with ``org_slug`` set to ``"default"``.
    """
    org = db.query(Organization).filter_by(id=user.org_id).first()
    if org:
        return {"address": f"{org.slug}@{EMAIL_DOMAIN}", "org_slug": org.slug}
    return {"address": f"statements@{EMAIL_DOMAIN}", "org_slug": "default"}