"""
DocuScan AI — Cloud Storage Integration Router

OAuth-based connections for Google Drive, OneDrive, Dropbox + credential-based S3.
Popup OAuth flow to work inside HF Spaces iframes.
"""
import os
import io
import uuid
import json
import html
import asyncio
import logging
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import quote, urlencode

from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from cryptography.fernet import Fernet, InvalidToken

from database import get_db, SessionLocal
from models import User, Organization, Document, CloudConnection, CloudImport, EmailImport

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/cloud", tags=["cloud-storage"])

# ─── Config ───────────────────────────────────────────────────────────────────
# NOTE(review): verify_token decodes session JWTs with this key, so it must match the key
# that issues them elsewhere in the app. It is hard-coded; move it to an environment
# variable (in both places) before production.
SECRET_KEY = "docuscan-demo-secret-key-change-in-production"
ALGORITHM = "HS256"
UPLOAD_DIR = Path("/data/uploads")

# Token encryption
CLOUD_TOKEN_ENCRYPTION_KEY = os.environ.get("CLOUD_TOKEN_ENCRYPTION_KEY", "")
_fernet = None


def get_fernet() -> Fernet:
    """Return the lazily created Fernet cipher used for stored cloud credentials.

    When ``CLOUD_TOKEN_ENCRYPTION_KEY`` is unset, a fresh key is generated and a warning
    is logged. Tokens encrypted with that key cannot be decrypted after a restart.
    """
    global _fernet
    if _fernet is None:
        key = CLOUD_TOKEN_ENCRYPTION_KEY
        if not key:
            key = Fernet.generate_key().decode()
            logger.warning("CLOUD_TOKEN_ENCRYPTION_KEY not set — using ephemeral key (tokens won't survive restart)")
        _fernet = Fernet(key.encode() if isinstance(key, str) else key)
    return _fernet


def encrypt_token(token: str) -> str:
    """Encrypt ``token`` for storage. Empty input maps to ``""``."""
    if not token:
        return ""
    return get_fernet().encrypt(token.encode()).decode()


def decrypt_token(encrypted: str) -> str:
    """Decrypt a value produced by :func:`encrypt_token`.

    Returns ``""`` for empty input and for ciphertext that cannot be authenticated with
    the current key. Callers can therefore treat "undecryptable" like "no token stored".
    Other errors (e.g. a malformed configured key) propagate instead of being hidden.
    """
    if not encrypted:
        return ""
    try:
        return get_fernet().decrypt(encrypted.encode()).decode()
    except InvalidToken:
        # Typically a key change, e.g. the ephemeral fallback key of an earlier process.
        logger.warning("Stored cloud token could not be decrypted with the current key")
        return ""


# OAuth credentials from env
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_OAUTH_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_OAUTH_CLIENT_SECRET", "")
MICROSOFT_CLIENT_ID = os.environ.get("MICROSOFT_CLIENT_ID", "")
MICROSOFT_CLIENT_SECRET = os.environ.get("MICROSOFT_CLIENT_SECRET", "")
MICROSOFT_TENANT_ID = os.environ.get("MICROSOFT_TENANT_ID", "common")
DROPBOX_APP_KEY = os.environ.get("DROPBOX_APP_KEY", "")
DROPBOX_APP_SECRET = os.environ.get("DROPBOX_APP_SECRET", "")


# ─── Auth Helper ──────────────────────────────────────────────────────────────

def verify_token(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency: resolve the active ``User`` from an ``Authorization: Bearer`` JWT.

    Raises:
        HTTPException(401): the header is missing or malformed, the token is invalid, or
            no active user has the token's ``sub`` e-mail.
    """
    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        payload = jwt.decode(auth.split(" ")[1], SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user


def get_base_url(request: Request) -> str:
    """Build base URL from request host for dual-domain support."""
    host = request.headers.get("host", "")
    # Behind chained proxies X-Forwarded-Proto may be a list such as "https,http"; the
    # first entry is the scheme the client actually used.
    scheme = request.headers.get("x-forwarded-proto", "https").split(",")[0].strip() or "https"
    return f"{scheme}://{host}"


def create_oauth_state(user_id: int, provider: str, nonce: str = None) -> str:
    """Create signed JWT state parameter for OAuth CSRF protection (valid for 10 minutes)."""
    payload = {
        "user_id": user_id,
        "provider": provider,
        "nonce": nonce or uuid.uuid4().hex,
        "exp": datetime.utcnow() + timedelta(minutes=10),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def verify_oauth_state(state: str, provider: str = None) -> dict:
    """Verify and decode an OAuth state JWT created by :func:`create_oauth_state`.

    Args:
        state: The ``state`` query parameter returned by the provider.
        provider: When given, the state must have been minted for this provider.

    Raises:
        HTTPException(400): bad signature, expired, not a state token, or wrong provider.
    """
    try:
        payload = jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state") from None
    # Session tokens are signed with the same key, so require the state-specific claims
    # before trusting user_id.
    if "user_id" not in payload or (provider is not None and payload.get("provider") != provider):
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state")
    return payload


def popup_close_html(success: bool, provider: str, error: str = "") -> HTMLResponse:
    """Render HTML that posts a message to the opener window and closes the popup.

    ``error`` often comes straight from the OAuth redirect's query string, so it is
    attacker-controllable. It is HTML-escaped for the visible text and JSON-encoded, with
    ``<``, ``>`` and ``&`` neutralised, for the inline script.

    NOTE(review): the message shape ``{type, provider, error}`` should match the
    frontend listener; the two ``type`` values are the ones this router always used.
    """
    msg_type = "cloud-auth-complete" if success else "cloud-auth-error"
    status_text = "Connected successfully!" if success else f"Error: {error}"
    message = json.dumps({"type": msg_type, "provider": provider, "error": "" if success else str(error)})
    # Prevent "</script>" or "<!--" inside the JSON from breaking out of the script element.
    message = message.replace("<", "\\u003c").replace(">", "\\u003e").replace("&", "\\u0026")
    return HTMLResponse(f"""<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>DocuScan</title></head>
<body>
<p>{html.escape(status_text)} You can close this window.</p>
<script>
  try {{
    if (window.opener) {{
      // The payload carries no credentials, so any opener origin may receive it.
      window.opener.postMessage({message}, "*");
    }}
  }} finally {{
    window.close();
  }}
</script>
</body>
</html>
""")


# ═══════════════════════════════════════════════════════════════════════════════
# ─── CONNECTION MANAGEMENT ─────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

@router.get("/connections")
async def list_connections(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """List the caller's non-revoked cloud connections, newest first.

    Non-super-admins only see connections belonging to their own organisation.
    """
    q = db.query(CloudConnection).filter(
        CloudConnection.user_id == user.id,
        CloudConnection.status != "revoked",
    )
    if user.role != "super_admin":
        q = q.filter(CloudConnection.org_id == user.org_id)
    conns = q.order_by(CloudConnection.created_at.desc()).all()
    return {"connections": [c.to_dict() for c in conns]}


@router.delete("/connections/{connection_id}")
async def revoke_connection(connection_id: str, user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Soft-revoke a cloud connection (owner or super_admin only).

    Raises:
        HTTPException(404): unknown connection id.
        HTTPException(403): caller is neither the owner nor a super_admin.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id).first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")
    conn.status = "revoked"
    db.commit()
    return {"message": "Connection revoked", "connection_id": connection_id}


# ═══════════════════════════════════════════════════════════════════════════════
# ─── GOOGLE DRIVE OAuth ────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

@router.get("/google/auth-start")
async def google_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect popup to Google OAuth consent screen.

    The popup is opened by navigation and cannot send a Bearer header, so the session JWT
    arrives as the ``token`` query parameter.
    """
    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<html><body><p>Missing token</p></body></html>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return HTMLResponse("<html><body><p>Invalid token</p></body></html>", status_code=401)
    user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
    if not user:
        return HTMLResponse("<html><body><p>User not found</p></body></html>", status_code=401)

    if not GOOGLE_CLIENT_ID:
        return popup_close_html(False, "google_drive", "Google OAuth not configured")

    params = {
        "client_id": GOOGLE_CLIENT_ID,
        "redirect_uri": f"{get_base_url(request)}/api/cloud/google/callback",
        "response_type": "code",
        "scope": "https://www.googleapis.com/auth/drive.readonly",
        "access_type": "offline",
        "prompt": "consent",
        "state": create_oauth_state(user.id, "google_drive"),
    }
    # redirect_uri and scope contain reserved characters, so percent-encode every value.
    return RedirectResponse("https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(params, quote_via=quote))


@router.get("/google/callback")
async def google_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Google OAuth callback — store tokens, close popup.

    Every outcome is reported through :func:`popup_close_html`, so the opener is always
    notified. Reconnecting an already connected Google account (same user and e-mail)
    updates that connection instead of creating a duplicate.
    """
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, "google_drive", error)

    try:
        state_data = verify_oauth_state(state, provider="google_drive")
    except HTTPException as exc:
        return popup_close_html(False, "google_drive", exc.detail)
    user = db.query(User).filter_by(id=state_data["user_id"], is_active=True).first()
    if not user:
        return popup_close_html(False, "google_drive", "User not found")
    if not code:
        return popup_close_html(False, "google_drive", "Missing authorization code")

    # Must be byte-identical to the redirect_uri sent from google_auth_start.
    redirect_uri = f"{get_base_url(request)}/api/cloud/google/callback"

    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://oauth2.googleapis.com/token", data={
            "code": code,
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code",
        })
        if resp.status_code != 200:
            # Keep the provider's error body in the server log rather than the popup.
            logger.warning("Google token exchange failed (HTTP %s): %s", resp.status_code, resp.text)
            return popup_close_html(False, "google_drive", "Token exchange failed")

        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 3600)

        # Only the drive.readonly scope is requested, which does not cover the OAuth2
        # userinfo endpoint. Read the account address from Drive's "about" resource instead.
        about = await client.get(
            "https://www.googleapis.com/drive/v3/about",
            params={"fields": "user"},
            headers={"Authorization": f"Bearer {access_token}"},
        )
    google_email = ""
    if about.status_code == 200:
        google_email = (about.json().get("user") or {}).get("emailAddress", "") or ""

    # Create or update connection
    conn = None
    if google_email:
        conn = (
            db.query(CloudConnection)
            .filter(
                CloudConnection.user_id == user.id,
                CloudConnection.provider == "google_drive",
                CloudConnection.provider_user_email == google_email,
                CloudConnection.status != "revoked",
            )
            .order_by(CloudConnection.created_at.desc())
            .first()
        )
    if conn is None:
        conn = CloudConnection(
            connection_id=f"cc_{uuid.uuid4().hex[:8]}",
            user_id=user.id,
            org_id=user.org_id,
            provider="google_drive",
        )
        db.add(conn)
    conn.display_name = f"Google Drive ({google_email})" if google_email else "Google Drive"
    conn.access_token_encrypted = encrypt_token(access_token)
    # Never replace a stored refresh token with an empty one.
    if refresh_token or not conn.refresh_token_encrypted:
        conn.refresh_token_encrypted = encrypt_token(refresh_token)
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
    conn.provider_user_email = google_email
    conn.status = "active"
    db.commit()
    return popup_close_html(True, "google_drive")


async def refresh_google_token(conn: CloudConnection, db: Session) -> str:
    """Return a valid Google access token for ``conn``, refreshing it when needed.

    The stored token is reused while it decrypts and has more than two minutes left.
    Otherwise the refresh token is exchanged.

    Raises:
        HTTPException(401): no usable refresh token, or Google rejected it (400/401). The
            connection is marked ``expired`` so the user is asked to reconnect.
        HTTPException(502): any other token-endpoint failure. The connection keeps its
            status so a later retry can succeed.
    """
    access_token = decrypt_token(conn.access_token_encrypted)
    if access_token and conn.token_expires_at and conn.token_expires_at > datetime.utcnow() + timedelta(minutes=2):
        return access_token

    refresh_token = decrypt_token(conn.refresh_token_encrypted)
    if not refresh_token:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")

    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://oauth2.googleapis.com/token", data={
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
        })
    if resp.status_code in (400, 401):
        # Revoked or expired grant: retrying will not help.
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")
    if resp.status_code != 200:
        logger.warning("Google token refresh for %s failed (HTTP %s)", conn.connection_id, resp.status_code)
        raise HTTPException(status_code=502, detail="Token refresh failed — provider unavailable, try again")

    tokens = resp.json()
    conn.access_token_encrypted = encrypt_token(tokens["access_token"])
    if tokens.get("refresh_token"):
        conn.refresh_token_encrypted = encrypt_token(tokens["refresh_token"])
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 3600))
    conn.status = "active"
    db.commit()
    return tokens["access_token"]
# ═══════════════════════════════════════════════════════════════════════════════
# ─── MICROSOFT OneDrive/SharePoint OAuth ───────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

# Helpers shared by the OneDrive and Dropbox OAuth flows below.


def _popup_session_user(request: Request, db: Session):
    """Resolve the DocuScan user for an OAuth popup from its ``token`` query parameter.

    The popup is opened by navigation and cannot carry an Authorization header, so the
    session JWT is passed in the URL instead.

    Returns:
        ``(user, None)`` on success, otherwise ``(None, response)``. ``response`` is a
        401 HTML page to return as-is.
    """
    token = request.query_params.get("token", "")
    if not token:
        return None, HTMLResponse("<html><body><p>Missing token</p></body></html>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None, HTMLResponse("<html><body><p>Invalid token</p></body></html>", status_code=401)
    user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
    if not user:
        return None, HTMLResponse("<html><body><p>User not found</p></body></html>", status_code=401)
    return user, None


def _oauth_callback_user(request: Request, db: Session, provider: str):
    """Validate the ``state`` of an OAuth redirect and load the user who started the flow.

    Returns:
        ``(user, None)`` on success, otherwise ``(None, response)``. ``response`` is a
        popup page that reports the failure to the opener.
    """
    try:
        state_data = verify_oauth_state(request.query_params.get("state", ""))
    except HTTPException as exc:
        return None, popup_close_html(False, provider, exc.detail)
    # Re-check the claims here so this flow does not depend on what verify_oauth_state
    # validates: the state must belong to this provider and carry a user id.
    if state_data.get("provider") != provider or "user_id" not in state_data:
        return None, popup_close_html(False, provider, "Invalid or expired OAuth state")
    user = db.query(User).filter_by(id=state_data["user_id"], is_active=True).first()
    if not user:
        return None, popup_close_html(False, provider, "User not found")
    return user, None


def _oauth_authorize_url(base: str, params: dict) -> str:
    """Return ``base`` with ``params`` appended as a percent-encoded query string."""
    from urllib.parse import quote, urlencode
    return f"{base}?{urlencode(params, quote_via=quote)}"


def _save_oauth_connection(db: Session, user, provider: str, display_name: str, email: str,
                           access_token: str, refresh_token: str, expires_in) -> CloudConnection:
    """Create a connection for ``user`` or update the matching existing one, then commit.

    A non-revoked connection with the same user, provider and (non-empty) provider e-mail
    is reused, so reconnecting an account does not create duplicates. A missing
    ``refresh_token`` never overwrites a stored one.
    """
    conn = None
    if email:
        conn = (
            db.query(CloudConnection)
            .filter(
                CloudConnection.user_id == user.id,
                CloudConnection.provider == provider,
                CloudConnection.provider_user_email == email,
                CloudConnection.status != "revoked",
            )
            .order_by(CloudConnection.created_at.desc())
            .first()
        )
    if conn is None:
        conn = CloudConnection(
            connection_id=f"cc_{uuid.uuid4().hex[:8]}",
            user_id=user.id,
            org_id=user.org_id,
            provider=provider,
        )
        db.add(conn)
    conn.display_name = display_name
    conn.access_token_encrypted = encrypt_token(access_token)
    if refresh_token or not conn.refresh_token_encrypted:
        conn.refresh_token_encrypted = encrypt_token(refresh_token)
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=expires_in)
    conn.provider_user_email = email
    conn.status = "active"
    db.commit()
    return conn


async def _refresh_oauth_token(conn: CloudConnection, db: Session, token_url: str,
                               client_id: str, client_secret: str, default_expires_in: int) -> str:
    """Return a valid access token for ``conn``, refreshing it at ``token_url`` when needed.

    The stored token is reused while it decrypts and has more than two minutes left.

    Raises:
        HTTPException(401): no usable refresh token, or the provider rejected it
            (400/401). The connection is marked ``expired``.
        HTTPException(502): any other token-endpoint failure. The status is left
            untouched so a retry can succeed.
    """
    access_token = decrypt_token(conn.access_token_encrypted)
    if access_token and conn.token_expires_at and conn.token_expires_at > datetime.utcnow() + timedelta(minutes=2):
        return access_token

    refresh_token = decrypt_token(conn.refresh_token_encrypted)
    if not refresh_token:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")

    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post(token_url, data={
            "client_id": client_id,
            "client_secret": client_secret,
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
        })
    if resp.status_code in (400, 401):
        # Revoked or expired grant: retrying will not help.
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")
    if resp.status_code != 200:
        logger.warning("Token refresh for %s failed (HTTP %s)", conn.connection_id, resp.status_code)
        raise HTTPException(status_code=502, detail="Token refresh failed — provider unavailable, try again")

    tokens = resp.json()
    conn.access_token_encrypted = encrypt_token(tokens["access_token"])
    # Store a rotated refresh token whenever the provider returns one.
    if tokens.get("refresh_token"):
        conn.refresh_token_encrypted = encrypt_token(tokens["refresh_token"])
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", default_expires_in))
    conn.status = "active"
    db.commit()
    return tokens["access_token"]


@router.get("/onedrive/auth-start")
async def onedrive_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect popup to Microsoft Entra ID OAuth."""
    user, failure = _popup_session_user(request, db)
    if failure is not None:
        return failure
    if not MICROSOFT_CLIENT_ID:
        return popup_close_html(False, "onedrive", "Microsoft OAuth not configured")

    params = {
        "client_id": MICROSOFT_CLIENT_ID,
        "redirect_uri": f"{get_base_url(request)}/api/cloud/onedrive/callback",
        "response_type": "code",
        "scope": "https://graph.microsoft.com/Files.Read.All https://graph.microsoft.com/Sites.Read.All offline_access",
        "state": create_oauth_state(user.id, "onedrive"),
    }
    return RedirectResponse(_oauth_authorize_url(
        f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/authorize", params))


@router.get("/onedrive/callback")
async def onedrive_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Microsoft OAuth callback: exchange the code, store tokens, close the popup."""
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, "onedrive", request.query_params.get("error_description", error))
    user, failure = _oauth_callback_user(request, db, "onedrive")
    if failure is not None:
        return failure
    code = request.query_params.get("code", "")
    if not code:
        return popup_close_html(False, "onedrive", "Missing authorization code")

    redirect_uri = f"{get_base_url(request)}/api/cloud/onedrive/callback"
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/token",
            data={
                "code": code,
                "client_id": MICROSOFT_CLIENT_ID,
                "client_secret": MICROSOFT_CLIENT_SECRET,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
            },
        )
        if resp.status_code != 200:
            logger.warning("OneDrive token exchange failed (HTTP %s): %s", resp.status_code, resp.text)
            return popup_close_html(False, "onedrive", "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        profile = await client.get("https://graph.microsoft.com/v1.0/me",
                                   headers={"Authorization": f"Bearer {access_token}"})

    ms_email = ""
    ms_name = "OneDrive"
    if profile.status_code == 200:
        pdata = profile.json()
        # "mail" is frequently present but null, so fall back on falsy values, not only
        # on a missing key.
        ms_email = pdata.get("mail") or pdata.get("userPrincipalName") or ""
        ms_name = pdata.get("displayName") or "OneDrive"

    _save_oauth_connection(
        db, user, "onedrive",
        display_name=f"OneDrive ({ms_email})" if ms_email else f"OneDrive ({ms_name})",
        email=ms_email,
        access_token=access_token,
        refresh_token=tokens.get("refresh_token", ""),
        expires_in=tokens.get("expires_in", 3600),
    )
    return popup_close_html(True, "onedrive")


async def refresh_onedrive_token(conn: CloudConnection, db: Session) -> str:
    """Refresh Microsoft access token if expired; return a valid access token."""
    return await _refresh_oauth_token(
        conn, db,
        f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/token",
        MICROSOFT_CLIENT_ID, MICROSOFT_CLIENT_SECRET, 3600,
    )


# ═══════════════════════════════════════════════════════════════════════════════
# ─── DROPBOX OAuth ─────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

@router.get("/dropbox/auth-start")
async def dropbox_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect popup to Dropbox OAuth."""
    user, failure = _popup_session_user(request, db)
    if failure is not None:
        return failure
    if not DROPBOX_APP_KEY:
        return popup_close_html(False, "dropbox", "Dropbox OAuth not configured")

    params = {
        "client_id": DROPBOX_APP_KEY,
        "redirect_uri": f"{get_base_url(request)}/api/cloud/dropbox/callback",
        "response_type": "code",
        "token_access_type": "offline",
        "state": create_oauth_state(user.id, "dropbox"),
    }
    return RedirectResponse(_oauth_authorize_url("https://www.dropbox.com/oauth2/authorize", params))


@router.get("/dropbox/callback")
async def dropbox_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Dropbox OAuth callback: exchange the code, store tokens, close the popup."""
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, "dropbox", error)
    user, failure = _oauth_callback_user(request, db, "dropbox")
    if failure is not None:
        return failure
    code = request.query_params.get("code", "")
    if not code:
        return popup_close_html(False, "dropbox", "Missing authorization code")

    redirect_uri = f"{get_base_url(request)}/api/cloud/dropbox/callback"
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://api.dropboxapi.com/oauth2/token", data={
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": redirect_uri,
            "client_id": DROPBOX_APP_KEY,
            "client_secret": DROPBOX_APP_SECRET,
        })
        if resp.status_code != 200:
            logger.warning("Dropbox token exchange failed (HTTP %s): %s", resp.status_code, resp.text)
            return popup_close_html(False, "dropbox", "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        # No-argument RPC endpoint: send a JSON "null" body with a JSON content type.
        acct = await client.post(
            "https://api.dropboxapi.com/2/users/get_current_account",
            headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
            content=b"null",
        )

    dbx_email = ""
    dbx_name = "Dropbox"
    if acct.status_code == 200:
        adata = acct.json()
        dbx_email = adata.get("email", "")
        dbx_name = adata.get("name", {}).get("display_name", "Dropbox")

    _save_oauth_connection(
        db, user, "dropbox",
        display_name=f"Dropbox ({dbx_email})" if dbx_email else f"Dropbox ({dbx_name})",
        email=dbx_email,
        access_token=access_token,
        refresh_token=tokens.get("refresh_token", ""),
        expires_in=tokens.get("expires_in", 14400),
    )
    return popup_close_html(True, "dropbox")


async def refresh_dropbox_token(conn: CloudConnection, db: Session) -> str:
    """Refresh Dropbox access token if expired; return a valid access token."""
    return await _refresh_oauth_token(
        conn, db, "https://api.dropboxapi.com/oauth2/token",
        DROPBOX_APP_KEY, DROPBOX_APP_SECRET, 14400,
    )


# ═══════════════════════════════════════════════════════════════════════════════
# ─── AWS S3 (Credential-based, no OAuth) ──────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

async def _run_blocking(func, *args, **kwargs):
    """Run a synchronous call (e.g. boto3) in the default executor so the event loop keeps serving."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))


def _s3_settings(conn: CloudConnection):
    """Return ``(client_kwargs, bucket)`` for an S3 connection.

    Everything is read from ``conn`` here, on the event-loop thread, so the ORM object is
    never touched from an executor thread. S3 connections store the access key in
    ``access_token_encrypted`` and the secret key in ``refresh_token_encrypted``.
    """
    meta = conn.provider_metadata or {}
    client_kwargs = {
        "aws_access_key_id": decrypt_token(conn.access_token_encrypted),
        "aws_secret_access_key": decrypt_token(conn.refresh_token_encrypted),
        "region_name": meta.get("region", "us-east-1"),
    }
    return client_kwargs, meta.get("bucket", "")


def _s3_client(client_kwargs: dict):
    """Create a boto3 S3 client (blocking; call via :func:`_run_blocking`)."""
    import boto3
    return boto3.client("s3", **client_kwargs)


def _s3_modified(obj: dict) -> str:
    """Format an S3 object's ``LastModified`` as ISO-8601 (``""`` when absent)."""
    value = obj.get("LastModified", "")
    return value.isoformat() if hasattr(value, "isoformat") else str(value)


@router.post("/s3/connect")
async def s3_connect(request: Request, user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Create S3 connection with access key credentials.

    The credentials are validated with ``head_bucket`` before anything is stored.

    Raises:
        HTTPException(400): missing fields, or the bucket check failed.
    """
    body = await request.json()
    if not isinstance(body, dict):
        body = {}
    # "or" guards against explicit JSON nulls, which have no .strip().
    access_key = str(body.get("access_key") or "").strip()
    secret_key = str(body.get("secret_key") or "").strip()
    bucket = str(body.get("bucket") or "").strip()
    region = str(body.get("region") or "").strip() or "us-east-1"

    if not all([access_key, secret_key, bucket]):
        raise HTTPException(status_code=400, detail="access_key, secret_key, and bucket are required")

    client_kwargs = {"aws_access_key_id": access_key, "aws_secret_access_key": secret_key, "region_name": region}
    try:
        await _run_blocking(lambda: _s3_client(client_kwargs).head_bucket(Bucket=bucket))
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"S3 validation failed: {str(e)}")

    conn_id = f"cc_{uuid.uuid4().hex[:8]}"
    conn = CloudConnection(
        connection_id=conn_id,
        user_id=user.id,
        org_id=user.org_id,
        provider="s3",
        display_name=f"S3: {bucket}",
        access_token_encrypted=encrypt_token(access_key),
        refresh_token_encrypted=encrypt_token(secret_key),
        provider_metadata={"bucket": bucket, "region": region},
        status="active",
    )
    db.add(conn)
    db.commit()
    return {"connection_id": conn_id, "message": f"Connected to S3 bucket: {bucket}"}


# ═══════════════════════════════════════════════════════════════════════════════
# ─── FILE BROWSING (Provider-agnostic) ────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

IMPORTABLE_EXTENSIONS = {".pdf", ".csv", ".xlsx", ".xls", ".docx", ".txt", ".doc"}


def is_importable(filename: str) -> bool:
    """Return True when ``filename`` has an extension DocuScan can import (case-insensitive)."""
    return Path(filename).suffix.lower() in IMPORTABLE_EXTENSIONS


def _drive_query_literal(value: str) -> str:
    """Quote ``value`` as a Google Drive query string literal (escapes ``\\`` and ``'``)."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def _google_item(f: dict) -> dict:
    """Convert a Drive ``files`` resource into the common item shape."""
    is_folder = f["mimeType"] == "application/vnd.google-apps.folder"
    return {
        "id": f["id"],
        "name": f["name"],
        "type": "folder" if is_folder else "file",
        "size": int(f.get("size", 0)),
        "modified": f.get("modifiedTime", ""),
        "importable": not is_folder and is_importable(f["name"]),
    }


def _onedrive_item(f: dict) -> dict:
    """Convert a Graph ``driveItem`` into the common item shape."""
    is_folder = "folder" in f
    return {
        "id": f["id"],
        "name": f["name"],
        "type": "folder" if is_folder else "file",
        "size": f.get("size", 0),
        "modified": f.get("lastModifiedDateTime", ""),
        "importable": not is_folder and is_importable(f["name"]),
    }


def _dropbox_item(entry: dict) -> dict:
    """Convert Dropbox file/folder metadata into the common item shape (plus ``path_lower``)."""
    is_folder = entry.get(".tag") == "folder"
    name = entry.get("name", "")
    return {
        "id": entry.get("id", entry.get("path_lower", "")),
        "name": name,
        "type": "folder" if is_folder else "file",
        "size": entry.get("size", 0),
        "modified": entry.get("server_modified", ""),
        "importable": not is_folder and is_importable(name),
        "path_lower": entry.get("path_lower", ""),
    }


@router.get("/{connection_id}/browse")
async def browse_files(connection_id: str, path: str = "/", request: Request = None,
                       user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Browse files and folders in a cloud connection.

    ``path`` is provider-specific: a Drive folder id, a OneDrive item id, an S3 prefix,
    or a Dropbox path. ``"/"`` means the root for every provider.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found or inactive")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")

    conn.last_used_at = datetime.utcnow()
    db.commit()

    if conn.provider == "google_drive":
        return await browse_google_drive(conn, path, db)
    elif conn.provider == "onedrive":
        return await browse_onedrive(conn, path, db)
    elif conn.provider == "s3":
        return await browse_s3(conn, path)
    elif conn.provider == "dropbox":
        return await browse_dropbox(conn, path, db)
    else:
        raise HTTPException(status_code=400, detail=f"Unsupported provider: {conn.provider}")


async def browse_google_drive(conn: CloudConnection, path: str, db: Session):
    """List the children of a Drive folder (``path`` = folder id, ``"/"`` = root), all pages."""
    access_token = await refresh_google_token(conn, db)
    import httpx

    folder_id = path if path != "/" else "root"
    query = f"{_drive_query_literal(folder_id)} in parents and trashed = false"

    items = []
    page_token = None
    async with httpx.AsyncClient() as client:
        while True:
            params = {
                "q": query,
                "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
                "pageSize": 100,
                "orderBy": "folder,name",
            }
            if page_token:
                params["pageToken"] = page_token
            resp = await client.get("https://www.googleapis.com/drive/v3/files",
                                    headers={"Authorization": f"Bearer {access_token}"}, params=params)
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list Google Drive files")
            data = resp.json()
            items.extend(_google_item(f) for f in data.get("files", []))
            page_token = data.get("nextPageToken")
            if not page_token:
                break

    return {"path": path, "items": items}


async def browse_onedrive(conn: CloudConnection, path: str, db: Session):
    """List the children of a OneDrive item (``"/"``/``"root"`` = drive root), following all pages."""
    access_token = await refresh_onedrive_token(conn, db)
    import httpx

    if path == "/" or path == "root":
        url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
    else:
        url = f"https://graph.microsoft.com/v1.0/me/drive/items/{path}/children"

    items = []
    params = {"$top": 200, "$orderby": "name"}
    async with httpx.AsyncClient() as client:
        while url:
            resp = await client.get(url, headers={"Authorization": f"Bearer {access_token}"}, params=params)
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list OneDrive files")
            data = resp.json()
            items.extend(_onedrive_item(f) for f in data.get("value", []))
            # @odata.nextLink is a complete URL that already carries the query options.
            url = data.get("@odata.nextLink")
            params = None

    return {"path": path, "items": items}


async def browse_s3(conn: CloudConnection, path: str):
    """List one "directory" level of the bucket under prefix ``path`` (folders first), all pages."""
    client_kwargs, bucket = _s3_settings(conn)
    prefix = path.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    def list_level():
        paginator = _s3_client(client_kwargs).get_paginator("list_objects_v2")
        folders, files = [], []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/",
                                       PaginationConfig={"PageSize": 200}):
            folders.extend(page.get("CommonPrefixes", []))
            files.extend(page.get("Contents", []))
        return folders, files

    folders, files = await _run_blocking(list_level)

    items = [{
        "id": cp["Prefix"],
        "name": cp["Prefix"].rstrip("/").split("/")[-1],
        "type": "folder",
        "size": 0,
        "modified": "",
        "importable": False,
    } for cp in folders]

    for obj in files:
        key = obj["Key"]
        if key == prefix:
            continue  # the zero-byte "folder marker" object for this prefix
        name = key.split("/")[-1]
        items.append({
            "id": key,
            "name": name,
            "type": "file",
            "size": obj.get("Size", 0),
            "modified": _s3_modified(obj),
            "importable": is_importable(name),
        })

    return {"path": path, "items": items}


async def browse_dropbox(conn: CloudConnection, path: str, db: Session):
    """List a Dropbox folder (``"/"`` = root), following ``has_more`` cursors."""
    access_token = await refresh_dropbox_token(conn, db)
    import httpx

    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
    dbx_path = "" if path == "/" else path
    items = []
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.dropboxapi.com/2/files/list_folder",
            headers=headers,
            json={"path": dbx_path, "limit": 200, "include_non_downloadable_files": False},
        )
        while True:
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list Dropbox files")
            data = resp.json()
            items.extend(_dropbox_item(entry) for entry in data.get("entries", []))
            if not data.get("has_more"):
                break
            resp = await client.post(
                "https://api.dropboxapi.com/2/files/list_folder/continue",
                headers=headers,
                json={"cursor": data["cursor"]},
            )

    return {"path": path, "items": items}


# ═══════════════════════════════════════════════════════════════════════════════
# ─── SEARCH ────────────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

@router.get("/{connection_id}/search")
async def search_files(connection_id: str, q: str = "", user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Search files by name in a cloud connection (at most 50 results).

    Provider errors yield an empty result list rather than an HTTP error.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")
    if not q:
        return {"items": []}

    import httpx

    if conn.provider == "google_drive":
        access_token = await refresh_google_token(conn, db)
        # Escape quotes so names like "O'Brien" do not produce an invalid query.
        query = f"name contains {_drive_query_literal(q)} and trashed = false"
        async with httpx.AsyncClient() as client:
            resp = await client.get("https://www.googleapis.com/drive/v3/files",
                                    headers={"Authorization": f"Bearer {access_token}"},
                                    params={"q": query, "fields": "files(id,name,mimeType,size,modifiedTime)", "pageSize": 50})
        if resp.status_code != 200:
            return {"items": []}
        return {"items": [_google_item(f) for f in resp.json().get("files", [])]}

    if conn.provider == "onedrive":
        from urllib.parse import quote
        access_token = await refresh_onedrive_token(conn, db)
        # OData string literal: double single quotes, then percent-encode for the URL path.
        term = quote(q.replace("'", "''"), safe="")
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"https://graph.microsoft.com/v1.0/me/drive/root/search(q='{term}')",
                                    headers={"Authorization": f"Bearer {access_token}"},
                                    params={"$top": 50})
        if resp.status_code != 200:
            return {"items": []}
        return {"items": [_onedrive_item(f) for f in resp.json().get("value", [])]}

    if conn.provider == "dropbox":
        access_token = await refresh_dropbox_token(conn, db)
        async with httpx.AsyncClient() as client:
            resp = await client.post("https://api.dropboxapi.com/2/files/search_v2",
                                     headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
                                     json={"query": q, "options": {"max_results": 50}})
        if resp.status_code != 200:
            return {"items": []}
        return {"items": [
            _dropbox_item(match.get("metadata", {}).get("metadata", {}))
            for match in resp.json().get("matches", [])
        ]}

    if conn.provider == "s3":
        # S3 has no native search: list up to 500 keys and filter by file name.
        client_kwargs, bucket = _s3_settings(conn)
        resp = await _run_blocking(lambda: _s3_client(client_kwargs).list_objects_v2(Bucket=bucket, MaxKeys=500))
        needle = q.lower()
        items = []
        for obj in resp.get("Contents", []):
            name = obj["Key"].split("/")[-1]
            if needle in name.lower():
                items.append({
                    "id": obj["Key"],
                    "name": name,
                    "type": "file",
                    "size": obj.get("Size", 0),
                    "modified": _s3_modified(obj),
                    "importable": is_importable(name),
                })
        return {"items": items[:50]}

    return {"items": []}


# ═══════════════════════════════════════════════════════════════════════════════
# ─── IMPORT ────────────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

# asyncio keeps only weak references to tasks; hold strong ones until each import
# finishes so an in-flight download cannot be garbage-collected.
_background_tasks = set()


@router.post("/import")
async def import_files(request: Request, user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Import files from cloud storage into DocuScan.

    One ``CloudImport`` row per file is created with status ``downloading``. A background
    task then downloads and processes each file.
    """
    body = await request.json()
    if not isinstance(body, dict):
        body = {}
    connection_id = body.get("connection_id", "")
    files = body.get("files", [])

    if not connection_id or not files or not isinstance(files, list) or not all(isinstance(f, dict) for f in files):
        raise HTTPException(status_code=400, detail="connection_id and files are required")

    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")

    import_ids = []
    for f in files:
        import_id = f"ci_{uuid.uuid4().hex[:8]}"
        db.add(CloudImport(
            import_id=import_id,
            connection_id=connection_id,
            provider=conn.provider,
            cloud_file_id=f.get("cloud_file_id", f.get("id", "")),
            cloud_file_path=f.get("path", ""),
            cloud_file_name=f.get("name", "unknown"),
            cloud_file_size=f.get("size", 0),
            status="downloading",
            user_email=user.email,
            org_id=user.org_id,
        ))
        import_ids.append(import_id)
    db.commit()

    for import_id in import_ids:
        task = asyncio.create_task(download_and_process(import_id, connection_id, user.email, user.org_id))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

    return {"message": f"Importing {len(files)} file(s)", "import_ids": import_ids}


def _safe_upload_name(name) -> str:
    """Reduce a client- or provider-supplied file name to a bare file name.

    Directory components are dropped so the name cannot escape ``UPLOAD_DIR``.
    Names that reduce to empty, ``.`` or ``..`` become ``"unknown"``.
    """
    base = os.path.basename(str(name or "").replace("\\", "/").replace("\x00", "")).strip()
    return base if base not in ("", ".", "..") else "unknown"


async def _fetch_cloud_file(conn: CloudConnection, ci: CloudImport, db: Session):
    """Download the file referenced by ``ci``; returns None for an unknown provider."""
    if conn.provider == "google_drive":
        return await download_google_drive_file(conn, ci.cloud_file_id, db)
    if conn.provider == "onedrive":
        return await download_onedrive_file(conn, ci.cloud_file_id, db)
    if conn.provider == "s3":
        return await download_s3_file(conn, ci.cloud_file_id)
    if conn.provider == "dropbox":
        return await download_dropbox_file(conn, ci.cloud_file_path or ci.cloud_file_id, db)
    return None


async def download_and_process(import_id: str, connection_id: str, user_email: str, org_id: int):
    """Download one queued cloud file and run it through the DocuScan pipeline.

    Uses its own DB session. Progress is recorded on the ``CloudImport`` row:
    ``processing`` once the Document exists, then the document's final status. Failures
    are recorded as ``error`` with a message.
    """
    db = SessionLocal()
    try:
        ci = db.query(CloudImport).filter_by(import_id=import_id).first()
        conn = db.query(CloudConnection).filter_by(connection_id=connection_id).first()
        if not ci or not conn:
            return

        try:
            file_bytes = await _fetch_cloud_file(conn, ci, db)
        except Exception as e:
            db.rollback()  # a failed flush/commit would otherwise block the update below
            ci.status = "error"
            ci.error = str(e)
            db.commit()
            return

        if not file_bytes:
            ci.status = "error"
            ci.error = "Download returned empty content"
            db.commit()
            return

        # cloud_file_name comes from the request body, so never use it as a path directly.
        filename = _safe_upload_name(ci.cloud_file_name)
        doc_id = f"doc_{uuid.uuid4().hex[:8]}"
        UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
        file_path = UPLOAD_DIR / f"{doc_id}_{filename}"
        file_path.write_bytes(file_bytes)

        # Create Document row (same as upload endpoint)
        doc = Document(
            doc_id=doc_id,
            filename=filename,
            file_path=str(file_path),
            file_size=len(file_bytes),
            uploaded_by=user_email,
            org_id=org_id,
            status="processing",
            conversion_status={"text": "pending", "tables": "pending", "images": "pending"},
        )
        db.add(doc)
        ci.doc_id = doc_id
        ci.status = "processing"
        db.commit()

        # Reuse existing process_document pipeline
        from backend import process_document
        await process_document(doc_id, file_path, filename, user_email, org_id)

        # process_document may write these rows through another session; drop cached
        # state so the re-read below sees the database values.
        db.expire_all()
        ci_refresh = db.query(CloudImport).filter_by(import_id=import_id).first()
        doc_refresh = db.query(Document).filter_by(doc_id=doc_id).first()
        if ci_refresh and doc_refresh:
            ci_refresh.status = "ready" if doc_refresh.status == "ready" else doc_refresh.status
            if doc_refresh.error:
                ci_refresh.error = doc_refresh.error
            db.commit()

    except Exception as e:
        logger.exception("Import %s failed: %s", import_id, e)
        db.rollback()
        ci = db.query(CloudImport).filter_by(import_id=import_id).first()
        if ci:
            ci.status = "error"
            ci.error = str(e)
            db.commit()
    finally:
        db.close()


# ─── Download helpers ──────────────────────────────────────────────────────────

async def download_google_drive_file(conn: CloudConnection, file_id: str, db: Session) -> bytes:
    """Download a Drive file's content (``alt=media``). Raises Exception on a non-200 response."""
    access_token = await refresh_google_token(conn, db)
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://www.googleapis.com/drive/v3/files/{file_id}?alt=media",
            headers={"Authorization": f"Bearer {access_token}"},
            follow_redirects=True,
        )
    if resp.status_code != 200:
        raise Exception(f"Google Drive download failed ({resp.status_code})")
    return resp.content


async def download_onedrive_file(conn: CloudConnection, item_id: str, db: Session) -> bytes:
    """Download a OneDrive item's content, following the redirect to the download URL."""
    access_token = await refresh_onedrive_token(conn, db)
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://graph.microsoft.com/v1.0/me/drive/items/{item_id}/content",
            headers={"Authorization": f"Bearer {access_token}"},
            follow_redirects=True,
        )
    if resp.status_code != 200:
        raise Exception(f"OneDrive download failed ({resp.status_code})")
    return resp.content


async def download_s3_file(conn: CloudConnection, key: str) -> bytes:
    """Download an S3 object's bytes in a worker thread (boto3 is blocking)."""
    client_kwargs, bucket = _s3_settings(conn)

    def fetch():
        return _s3_client(client_kwargs).get_object(Bucket=bucket, Key=key)["Body"].read()

    return await _run_blocking(fetch)


async def download_dropbox_file(conn: CloudConnection, path: str, db: Session) -> bytes:
    """Download a Dropbox file by path or id. Raises Exception on a non-200 response."""
    access_token = await refresh_dropbox_token(conn, db)
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://content.dropboxapi.com/2/files/download",
            headers={
                "Authorization": f"Bearer {access_token}",
                # json.dumps escapes non-ASCII by default, as HTTP header values require.
                "Dropbox-API-Arg": json.dumps({"path": path}),
            },
        )
    if resp.status_code != 200:
        raise Exception(f"Dropbox download failed ({resp.status_code})")
    return resp.content


# ═══════════════════════════════════════════════════════════════════════════════
# ─── IMPORT HISTORY ────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════

@router.get("/imports")
async def list_imports(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """List the caller's 50 most recent cloud imports (own organisation unless super_admin)."""
    q = db.query(CloudImport).filter_by(user_email=user.email)
    if user.role != "super_admin":
        q = q.filter_by(org_id=user.org_id)
    imports = q.order_by(CloudImport.created_at.desc()).limit(50).all()
    return {"imports": [ci.to_dict() for ci in imports]}
# ═══════════════════════════════════════════════════════════════════════════════
# ─── EMAIL FORWARDING (Mailgun Inbound Routes) ────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
#
# Multi-tenant routing via the To address:
#   {org-slug}@docuscan.5gvector.com  → routes to that org
#   statements@docuscan.5gvector.com  → falls back to the default org
#
# Mailgun POSTs multipart form data with:
#   sender, from, recipient, To, subject, body-plain, body-html,
#   attachment-1, attachment-2, ... (file uploads, no count field)
#   timestamp, token, signature (for HMAC verification)
# ═══════════════════════════════════════════════════════════════════════════════

EMAIL_DOMAIN = "docuscan.5gvector.com"
MAILGUN_WEBHOOK_SIGNING_KEY = os.environ.get("MAILGUN_WEBHOOK_SIGNING_KEY", "")

# Strong references to in-flight attachment-processing tasks. The event loop keeps
# only weak references, so an unreferenced task may be garbage-collected mid-run.
_email_tasks: set = set()


def _spawn_email_task(coro) -> None:
    """Schedule ``coro`` as a background task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _email_tasks.add(task)
    task.add_done_callback(_email_tasks.discard)


def verify_mailgun_signature(timestamp: str, token: str, signature: str) -> bool:
    """Check a Mailgun webhook signature: HMAC-SHA256(key, timestamp + token) as hex.

    Returns True when no signing key is configured.
    NOTE(review): that means the webhook fails open when the key is unset. Also, no
    timestamp-freshness or token-reuse check is done, so replays are not rejected.
    """
    if not MAILGUN_WEBHOOK_SIGNING_KEY:
        return True  # Skip verification if no key configured
    if not isinstance(signature, str):
        return False
    import hmac, hashlib
    expected = hmac.new(
        MAILGUN_WEBHOOK_SIGNING_KEY.encode(),
        f"{timestamp}{token}".encode(),
        hashlib.sha256,
    ).hexdigest()
    # Compare bytes: compare_digest raises TypeError on non-ASCII str input.
    return hmac.compare_digest(expected.encode(), signature.encode())


def parse_org_from_to_address(to_address: str, db: Session):
    """Resolve the target org from a recipient such as 'acme@docuscan.5gvector.com'
    or 'Org Name <acme@docuscan.5gvector.com>'.

    The local part is matched against active ``Organization.slug``. Unmatched addresses
    (e.g. ``statements@``) fall back to the organization with the lowest id, or None if
    there are no organizations.
    """
    import re
    # Extract email from "Name <email>" format
    match = re.search(r'<([^>]+)>', to_address)
    email = match.group(1) if match else to_address.strip()
    local_part = email.split("@")[0].lower().strip()

    # Try to match org slug
    org = db.query(Organization).filter(
        Organization.slug == local_part,
        Organization.is_active == True,
    ).first()
    if org:
        return org

    # Fallback: "statements" or any unmatched → default org (deterministic: lowest id)
    return db.query(Organization).order_by(Organization.id).first()


@router.post("/email/inbound")
async def email_inbound(request: Request):
    """
    Mailgun Inbound Routes webhook — receives forwarded emails with attachments.
    No auth required (Mailgun POSTs directly). Secured by optional HMAC signature.

    Each importable, non-empty attachment is written to ``UPLOAD_DIR`` and recorded as
    a ``Document`` plus an ``EmailImport`` row, then processed in a background task.
    An email that yields nothing is logged as an ``EmailImport`` with status "error".
    A bad signature gets 406. Everything else returns 200, because Mailgun retries
    on other non-200 responses.
    """
    db = SessionLocal()
    try:
        form = await request.form()

        # Optional: verify Mailgun webhook signature
        mg_timestamp = form.get("timestamp", "")
        mg_token = form.get("token", "")
        mg_signature = form.get("signature", "")
        if MAILGUN_WEBHOOK_SIGNING_KEY and not verify_mailgun_signature(mg_timestamp, mg_token, mg_signature):
            logger.warning("Email inbound: invalid Mailgun signature")
            return JSONResponse({"error": "unauthorized"}, status_code=406)

        # Mailgun fields: "sender" is clean email, "from" includes display name
        from_email = form.get("sender", form.get("from", "unknown@unknown.com"))
        to_email = form.get("recipient", form.get("To", form.get("to", "")))
        subject = form.get("subject", "(no subject)")

        # Extract clean from email
        import re
        from_match = re.search(r'<([^>]+)>', from_email)
        from_clean = from_match.group(1) if from_match else from_email.strip()

        # Route to org based on To address
        org = parse_org_from_to_address(to_email, db)
        if not org:
            logger.error(f"Email inbound: no org found for to={to_email}")
            return JSONResponse({"error": "no matching organization"}, status_code=200)

        # Attribute the import to an active admin-type user if there is one, otherwise to
        # the lowest-id active user. (Ordering by role text does not reliably put admins first.)
        active_users = db.query(User).filter(User.org_id == org.id, User.is_active == True)
        attributed_user = (
            active_users.filter(User.role.like("%admin%")).order_by(User.id).first()
            or active_users.order_by(User.id).first()
        )
        attributed_email = attributed_user.email if attributed_user else from_clean

        # Mailgun names uploads "attachment-1", "attachment-2", ...; SendGrid-style
        # "attachment1" also matches. Only file uploads (objects with .read) count.
        attachment_keys = [
            key for key in form
            if hasattr(form[key], "read") and key.startswith("attachment")
        ]

        processed = 0
        for key in attachment_keys:
            attachment = form[key]
            # The filename comes from the sender: may be None or contain path parts.
            raw_name = getattr(attachment, "filename", None) or f"{key}.bin"
            filename = Path(str(raw_name).replace("\x00", "")).name or f"{key}.bin"

            # Only process importable file types
            if not is_importable(filename):
                logger.info(f"Email inbound: skipping non-importable attachment {filename}")
                continue

            file_bytes = await attachment.read()
            if not file_bytes:
                continue

            import_id = f"ei_{uuid.uuid4().hex[:8]}"
            doc_id = f"doc_{uuid.uuid4().hex[:8]}"

            # Save to disk
            file_path = UPLOAD_DIR / f"{doc_id}_{filename}"
            file_path.write_bytes(file_bytes)
            file_size = len(file_bytes)

            db.add(Document(
                doc_id=doc_id,
                filename=filename,
                file_path=str(file_path),
                file_size=file_size,
                uploaded_by=attributed_email,
                org_id=org.id,
                status="processing",
                conversion_status={"text": "pending", "tables": "pending", "images": "pending"},
            ))
            db.add(EmailImport(
                import_id=import_id,
                org_id=org.id,
                from_email=from_clean,
                to_email=to_email,
                subject=subject,
                attachment_name=filename,
                attachment_size=file_size,
                doc_id=doc_id,
                status="processing",
                matched_user_email=attributed_email,
            ))
            db.commit()

            # Kick off async processing
            _spawn_email_task(
                process_email_attachment(import_id, doc_id, str(file_path), filename, attributed_email, org.id)
            )
            processed += 1

        if processed == 0:
            # Nothing imported — still log the email so the org can see why.
            if attachment_keys:
                attachment_name = "(no importable attachments)"
                error = "Email had no importable, non-empty attachments"
            else:
                attachment_name = "(no attachments)"
                error = "Email had no attachments to process"
            db.add(EmailImport(
                import_id=f"ei_{uuid.uuid4().hex[:8]}",
                org_id=org.id,
                from_email=from_clean,
                to_email=to_email,
                subject=subject,
                attachment_name=attachment_name,
                attachment_size=0,
                status="error",
                error=error,
                matched_user_email=attributed_email,
            ))
            db.commit()

        logger.info(f"Email inbound: from={from_clean} to={to_email} org={org.slug} attachments={processed}")

        # Mailgun expects 200 OK (non-200 triggers retries)
        return JSONResponse({"message": f"Processed {processed} attachment(s)", "org": org.slug})

    except Exception as e:
        logger.exception(f"Email inbound error: {e}")
        return JSONResponse({"error": str(e)}, status_code=200)  # Always 200 to prevent retries
    finally:
        db.close()


async def process_email_attachment(import_id: str, doc_id: str, file_path: str, filename: str, user_email: str, org_id: int):
    """Run a saved email attachment through ``process_document`` and record the outcome.

    Uses its own ``SessionLocal`` session. The ``EmailImport`` row takes the resulting
    Document's status (and error, if any), or "error" if processing raises.
    """
    db = SessionLocal()
    try:
        from backend import process_document
        await process_document(doc_id, file_path, filename, user_email, org_id)

        # process_document may have written through another session: drop cached state.
        db.expire_all()
        ei = db.query(EmailImport).filter_by(import_id=import_id).first()
        doc = db.query(Document).filter_by(doc_id=doc_id).first()
        if ei and doc:
            ei.status = doc.status
            if doc.error:
                ei.error = doc.error
            db.commit()
    except Exception as e:
        logger.exception(f"Email attachment processing failed: {e}")
        try:
            # Reset a possibly failed session before recording the error.
            db.rollback()
            ei = db.query(EmailImport).filter_by(import_id=import_id).first()
            if ei:
                ei.status = "error"
                ei.error = str(e)
                db.commit()
        except Exception:
            logger.exception("Could not record failure for email import %s", import_id)
    finally:
        db.close()


@router.get("/email/imports")
async def list_email_imports(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """List the 50 most recent email imports (caller's org only unless super_admin)."""
    q = db.query(EmailImport)
    if user.role != "super_admin":
        q = q.filter_by(org_id=user.org_id)
    imports = q.order_by(EmailImport.created_at.desc()).limit(50).all()
    return {"imports": [ei.to_dict() for ei in imports]}


@router.get("/email/address")
async def get_email_address(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the inbound address for the caller's org, or the shared fallback address."""
    org = db.query(Organization).filter_by(id=user.org_id).first()
    if not org:
        return {"address": f"statements@{EMAIL_DOMAIN}", "org_slug": "default"}
    return {"address": f"{org.slug}@{EMAIL_DOMAIN}", "org_slug": org.slug}