docuscan-ai / cloud_storage.py
pramodmisra's picture
Switch email inbound from SendGrid to Mailgun format
9d5a46c
"""
DocuScan AI — Cloud Storage Integration Router
OAuth-based connections for Google Drive, OneDrive and Dropbox, plus credential-based AWS S3.
Uses a popup-window OAuth flow so sign-in works inside Hugging Face Spaces iframes.
"""
import os
import io
import uuid
import json
import asyncio
import logging
from datetime import datetime, timedelta
from pathlib import Path
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from cryptography.fernet import Fernet
from database import get_db, SessionLocal
from models import User, Organization, Document, CloudConnection, CloudImport, EmailImport
# Module logger, plus the router on which every endpoint below is registered
# (all paths are served under /api/cloud).
logger = logging.getLogger(name=__name__)
router = APIRouter(tags=["cloud-storage"], prefix="/api/cloud")
# ─── Config ───────────────────────────────────────────────────────────────────
# JWT verification settings for user tokens and OAuth state tokens.
# NOTE(review): the signing key is hard-coded. It presumably has to match the
# key used wherever user tokens are issued, so move both to the environment
# together.
ALGORITHM = "HS256"
SECRET_KEY = "docuscan-demo-secret-key-change-in-production"
UPLOAD_DIR = Path("/data") / "uploads"
# Token encryption: Fernet key for provider tokens stored in the database.
# When empty, get_fernet() generates an ephemeral key instead.
CLOUD_TOKEN_ENCRYPTION_KEY = os.environ.get("CLOUD_TOKEN_ENCRYPTION_KEY", "")
_fernet = None  # Fernet instance cached lazily by get_fernet()
def get_fernet():
    """Return the process-wide Fernet cipher, building it on first use.

    Uses CLOUD_TOKEN_ENCRYPTION_KEY when it is set. Otherwise it generates a
    throwaway key and logs a warning; in that case tokens encrypted now will
    not survive a restart.
    """
    global _fernet
    if _fernet is not None:
        return _fernet
    configured = CLOUD_TOKEN_ENCRYPTION_KEY
    if configured:
        raw_key = configured
    else:
        raw_key = Fernet.generate_key().decode()
        logger.warning("CLOUD_TOKEN_ENCRYPTION_KEY not set — using ephemeral key (tokens won't survive restart)")
    # Fernet wants bytes; convert a str key before constructing it.
    _fernet = Fernet(raw_key.encode() if isinstance(raw_key, str) else raw_key)
    return _fernet
def encrypt_token(token: str) -> str:
    """Encrypt *token* with the shared Fernet cipher for storage.

    Falsy input (empty string or None) is stored as "" without encryption.
    """
    return get_fernet().encrypt(token.encode()).decode() if token else ""
def decrypt_token(encrypted: str) -> str:
    """Decrypt a value produced by encrypt_token().

    Returns "" for empty input. If decryption fails (for example the
    encryption key changed, or the stored value is corrupt), it also returns
    "" so callers treat the credential as missing. The failure is now logged
    instead of being silently swallowed.
    """
    if not encrypted:
        return ""
    try:
        return get_fernet().decrypt(encrypted.encode()).decode()
    except Exception as exc:
        # Best-effort by design: callers read "" as "reconnect needed".
        # Log only the exception type, never ciphertext or plaintext.
        logger.warning("Could not decrypt stored cloud token (%s); treating it as missing",
                       type(exc).__name__)
        return ""
# OAuth client credentials, read once from the environment at import time.
# An empty client ID / app key makes that provider's auth-start endpoint
# report it as "not configured". The Microsoft tenant defaults to "common".
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET", "")
MICROSOFT_CLIENT_ID = os.getenv("MICROSOFT_CLIENT_ID", "")
MICROSOFT_CLIENT_SECRET = os.getenv("MICROSOFT_CLIENT_SECRET", "")
MICROSOFT_TENANT_ID = os.getenv("MICROSOFT_TENANT_ID", "common")
DROPBOX_APP_KEY = os.getenv("DROPBOX_APP_KEY", "")
DROPBOX_APP_SECRET = os.getenv("DROPBOX_APP_SECRET", "")
# ─── Auth Helper ──────────────────────────────────────────────────────────────
def verify_token(request: Request, db: Session = Depends(get_db)):
    """FastAPI dependency that resolves the active User from a Bearer JWT.

    Raises:
        HTTPException(401): the Authorization header is missing or is not a
            Bearer token, the token fails verification, or no active user
            has the token's ``sub`` email.
    """
    header = request.headers.get("Authorization", "")
    if not header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Not authenticated")
    raw_token = header.split(" ")[1]
    try:
        claims = jwt.decode(raw_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    account = db.query(User).filter_by(email=claims.get("sub"), is_active=True).first()
    if not account:
        raise HTTPException(status_code=401, detail="User not found")
    return account
def get_base_url(request: Request) -> str:
    """Build the external base URL (``scheme://host``) for this request.

    OAuth redirect URIs are derived from it, so one deployment can serve more
    than one domain. The scheme comes from ``X-Forwarded-Proto`` (set by the
    fronting proxy) and defaults to https.

    Fixes:
    - A proxy chain may send a comma-separated list such as "https, http";
      only the first (client-facing) value is used now.
    - An empty header now falls back to https instead of producing "://".
    - A missing Host header now uses the request URL's host instead of
      returning "https://".
    """
    forwarded_proto = request.headers.get("x-forwarded-proto", "")
    scheme = forwarded_proto.split(",")[0].strip() or "https"
    host = request.headers.get("host", "").strip() or request.url.netloc
    return f"{scheme}://{host}"
def create_oauth_state(user_id: int, provider: str, nonce: str = None) -> str:
    """Mint the OAuth ``state`` parameter as a short-lived signed JWT.

    The token carries the initiating user's id, the provider name, and a
    nonce (a random hex value unless one is supplied). It expires 10 minutes
    after creation.
    """
    expires_at = datetime.utcnow() + timedelta(minutes=10)
    claims = dict(
        user_id=user_id,
        provider=provider,
        nonce=nonce if nonce else uuid.uuid4().hex,
        exp=expires_at,
    )
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_oauth_state(state: str) -> dict:
    """Decode and return the claims of a state token from create_oauth_state().

    Raises:
        HTTPException(400): the token cannot be decoded or verified with
            SECRET_KEY (bad signature, expired, malformed).
    """
    try:
        claims = jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state")
    return claims
def popup_close_html(success: bool, provider: str, error: str = "") -> HTMLResponse:
    """Render the OAuth popup's final page, which notifies the opener and closes.

    The page calls ``window.opener.postMessage`` with ``{type, provider,
    error}``, where ``type`` is "cloud-auth-complete" on success and
    "cloud-auth-error" otherwise. It then calls ``window.close()``. A short
    status paragraph stays visible if the window cannot close.

    ``error`` can come straight from callback query parameters
    (``?error=...``), so it is attacker-controllable. Every interpolated value
    is therefore escaped for its context:
    - inside <script>, as a JSON string literal, with ``<``, ``>`` and ``&``
      also escaped so no value can close the script element;
    - in the visible text, with HTML escaping.
    Previously the raw values were spliced in, which allowed reflected XSS.
    """
    import html

    def _js_string(value) -> str:
        # json.dumps emits a valid JS string literal; its default
        # ensure_ascii=True also escapes U+2028/U+2029. The replacements keep
        # "</script>" and other markup from appearing literally.
        return (
            json.dumps(value)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )

    error_text = "" if error is None else str(error)
    msg_type = "cloud-auth-complete" if success else "cloud-auth-error"
    type_js = _js_string(msg_type)
    provider_js = _js_string(str(provider))
    error_js = _js_string(error_text)
    status_text = "Connected successfully!" if success else f"Error: {html.escape(error_text)}"
    # NOTE(review): targetOrigin '*' delivers this message to any opener. It
    # carries no tokens, but restricting it to the app origin would be tighter.
    return HTMLResponse(f"""<!DOCTYPE html><html><body>
<script>
if (window.opener) {{
window.opener.postMessage({{type: {type_js}, provider: {provider_js}, error: {error_js}}}, '*');
}}
window.close();
</script>
<p>{status_text} You can close this window.</p>
</body></html>""")
# ═══════════════════════════════════════════════════════════════════════════════
# ─── CONNECTION MANAGEMENT ─────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/connections")
async def list_connections(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """List the caller's non-revoked cloud connections, newest first.

    Users other than super_admin also only see connections in their own org.
    """
    conditions = [
        CloudConnection.user_id == user.id,
        CloudConnection.status != "revoked",
    ]
    if user.role != "super_admin":
        conditions.append(CloudConnection.org_id == user.org_id)
    rows = (
        db.query(CloudConnection)
        .filter(*conditions)
        .order_by(CloudConnection.created_at.desc())
        .all()
    )
    return {"connections": [row.to_dict() for row in rows]}
@router.delete("/connections/{connection_id}")
async def revoke_connection(connection_id: str, user: User = Depends(verify_token),
                            db: Session = Depends(get_db)):
    """Soft-revoke a connection by setting its status to "revoked".

    The row itself, including its encrypted tokens, is kept. Only the owner
    or a super_admin may revoke.

    Raises:
        HTTPException(404): no connection with that id.
        HTTPException(403): the caller is neither the owner nor a super_admin.
    """
    target = db.query(CloudConnection).filter_by(connection_id=connection_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Connection not found")
    is_owner = target.user_id == user.id
    if not (is_owner or user.role == "super_admin"):
        raise HTTPException(status_code=403, detail="Not authorized")
    target.status = "revoked"
    db.commit()
    return {"message": "Connection revoked", "connection_id": connection_id}
# ═══════════════════════════════════════════════════════════════════════════════
# ─── GOOGLE DRIVE OAuth ────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/google/auth-start")
async def google_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect the popup window to Google's OAuth consent screen.

    A popup cannot easily send an Authorization header, so the app JWT comes
    in the ``token`` query parameter. On success the browser is redirected to
    Google with a signed ``state`` bound to the user.

    The authorization query string is now built with ``urlencode`` so every
    value (redirect URI, scope, state) is percent-encoded instead of
    concatenated raw.
    """
    from urllib.parse import quote, urlencode

    # Get user from query param (popup can't send Bearer header easily)
    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<p>Missing token</p>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
        if not user:
            return HTMLResponse("<p>User not found</p>", status_code=401)
    except JWTError:
        return HTMLResponse("<p>Invalid token</p>", status_code=401)
    if not GOOGLE_CLIENT_ID:
        return popup_close_html(False, "google_drive", "Google OAuth not configured")

    redirect_uri = f"{get_base_url(request)}/api/cloud/google/callback"
    query = urlencode(
        {
            "client_id": GOOGLE_CLIENT_ID,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": "https://www.googleapis.com/auth/drive.readonly",
            # Request offline access (a refresh token) and force the consent
            # prompt, so reconnects also get a refresh token.
            "access_type": "offline",
            "prompt": "consent",
            "state": create_oauth_state(user.id, "google_drive"),
        },
        quote_via=quote,
    )
    return RedirectResponse(f"https://accounts.google.com/o/oauth2/v2/auth?{query}")
@router.get("/google/callback")
async def google_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Google's OAuth redirect: exchange the code, store tokens, close the popup.

    Every outcome is reported through popup_close_html, so the opener is
    always notified. This now includes two cases that previously escaped:
    - an invalid or expired ``state``, which used to raise a bare 400 JSON
      error inside the popup;
    - a state minted for a different provider.
    On success a new "active" CloudConnection row is stored with encrypted
    tokens.
    """
    provider = "google_drive"
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, provider, error)
    try:
        state_data = verify_oauth_state(state)
    except HTTPException as exc:
        return popup_close_html(False, provider, str(exc.detail))
    # Reject tokens from other flows: another provider's state, or any JWT
    # signed with the same key that lacks these claims.
    if state_data.get("provider") != provider:
        return popup_close_html(False, provider, "Invalid or expired OAuth state")
    user = db.query(User).filter_by(id=state_data.get("user_id"), is_active=True).first()
    if not user:
        return popup_close_html(False, provider, "User not found")
    if not code:
        return popup_close_html(False, provider, "Missing authorization code")

    # Must match the redirect_uri sent by google_auth_start.
    redirect_uri = f"{get_base_url(request)}/api/cloud/google/callback"
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://oauth2.googleapis.com/token", data={
            "code": code,
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "redirect_uri": redirect_uri,
            "grant_type": "authorization_code",
        })
        if resp.status_code != 200:
            # Keep Google's error body in the server log, not in the popup.
            logger.warning("Google token exchange failed (HTTP %s): %s", resp.status_code, resp.text)
            return popup_close_html(False, provider, "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 3600)

        # Fetch the account email for the display name. The v2 userinfo
        # endpoint typically requires an email/profile scope, and this flow
        # requests only drive.readonly. If it yields nothing, fall back to
        # Drive's "about" resource.
        auth_header = {"Authorization": f"Bearer {access_token}"}
        google_email = ""
        userinfo = await client.get("https://www.googleapis.com/oauth2/v2/userinfo",
                                    headers=auth_header)
        if userinfo.status_code == 200:
            google_email = userinfo.json().get("email") or ""
        if not google_email:
            about = await client.get("https://www.googleapis.com/drive/v3/about",
                                     headers=auth_header,
                                     params={"fields": "user(emailAddress)"})
            if about.status_code == 200:
                google_email = (about.json().get("user") or {}).get("emailAddress") or ""

    # Persist a new connection; reconnecting creates an additional row.
    conn = CloudConnection(
        connection_id=f"cc_{uuid.uuid4().hex[:8]}",
        user_id=user.id,
        org_id=user.org_id,
        provider=provider,
        display_name=f"Google Drive ({google_email})" if google_email else "Google Drive",
        access_token_encrypted=encrypt_token(access_token),
        refresh_token_encrypted=encrypt_token(refresh_token),
        token_expires_at=datetime.utcnow() + timedelta(seconds=expires_in),
        provider_user_email=google_email,
        status="active",
    )
    db.add(conn)
    db.commit()
    return popup_close_html(True, provider)
async def refresh_google_token(conn: CloudConnection, db: Session) -> str:
    """Return a usable Google access token for *conn*, refreshing it if needed.

    The stored token is reused only if it has more than 2 minutes left AND
    decrypts to a non-empty value. Otherwise the refresh token is exchanged
    for a new access token, and the connection row is updated and committed.

    Raises:
        HTTPException(401): there is no usable refresh token, or Google
            rejected the refresh. The connection is marked "expired" first.
    """
    if conn.token_expires_at and conn.token_expires_at > datetime.utcnow() + timedelta(minutes=2):
        cached = decrypt_token(conn.access_token_encrypted)
        # An undecryptable token (e.g. the key was rotated) falls through to a
        # refresh instead of being sent to Google as an empty bearer token.
        if cached:
            return cached
    refresh_token = decrypt_token(conn.refresh_token_encrypted)
    if not refresh_token:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://oauth2.googleapis.com/token", data={
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
        })
    if resp.status_code != 200:
        logger.warning("Google token refresh failed for %s (HTTP %s)",
                       conn.connection_id, resp.status_code)
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")
    tokens = resp.json()
    access_token = tokens["access_token"]
    conn.access_token_encrypted = encrypt_token(access_token)
    # If the response includes a refresh token, store it (same behaviour as
    # refresh_onedrive_token).
    if tokens.get("refresh_token"):
        conn.refresh_token_encrypted = encrypt_token(tokens["refresh_token"])
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 3600))
    conn.status = "active"
    db.commit()
    return access_token
# ═══════════════════════════════════════════════════════════════════════════════
# ─── MICROSOFT OneDrive/SharePoint OAuth ───────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/onedrive/auth-start")
async def onedrive_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect the popup window to the Microsoft Entra ID authorize endpoint.

    The app JWT comes in the ``token`` query parameter because the popup
    cannot easily send an Authorization header.

    The query string is now built with ``urlencode`` (``quote_via=quote``),
    so the space-separated scope list, redirect URI and state are properly
    percent-encoded.
    """
    from urllib.parse import quote, urlencode

    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<p>Missing token</p>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
        if not user:
            return HTMLResponse("<p>User not found</p>", status_code=401)
    except JWTError:
        return HTMLResponse("<p>Invalid token</p>", status_code=401)
    if not MICROSOFT_CLIENT_ID:
        return popup_close_html(False, "onedrive", "Microsoft OAuth not configured")

    redirect_uri = f"{get_base_url(request)}/api/cloud/onedrive/callback"
    query = urlencode(
        {
            "client_id": MICROSOFT_CLIENT_ID,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            # offline_access is what makes Microsoft issue a refresh token.
            "scope": "https://graph.microsoft.com/Files.Read.All https://graph.microsoft.com/Sites.Read.All offline_access",
            "state": create_oauth_state(user.id, "onedrive"),
        },
        quote_via=quote,
    )
    return RedirectResponse(
        f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/authorize?{query}"
    )
@router.get("/onedrive/callback")
async def onedrive_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Microsoft's OAuth redirect: exchange the code, store tokens, close the popup.

    Every outcome is reported through popup_close_html. This now includes
    invalid or expired state, state minted for another provider, and a
    missing code.

    Profile fields from Graph ``/me`` can be JSON null (e.g. ``mail``), so
    null values now fall through to the next candidate instead of being
    stored as None.
    """
    provider = "onedrive"
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, provider, request.query_params.get("error_description", error))
    try:
        state_data = verify_oauth_state(state)
    except HTTPException as exc:
        return popup_close_html(False, provider, str(exc.detail))
    # Reject state tokens minted for a different flow.
    if state_data.get("provider") != provider:
        return popup_close_html(False, provider, "Invalid or expired OAuth state")
    user = db.query(User).filter_by(id=state_data.get("user_id"), is_active=True).first()
    if not user:
        return popup_close_html(False, provider, "User not found")
    if not code:
        return popup_close_html(False, provider, "Missing authorization code")

    # Must match the redirect_uri sent by onedrive_auth_start.
    redirect_uri = f"{get_base_url(request)}/api/cloud/onedrive/callback"
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/token",
            data={
                "code": code,
                "client_id": MICROSOFT_CLIENT_ID,
                "client_secret": MICROSOFT_CLIENT_SECRET,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
            },
        )
        if resp.status_code != 200:
            logger.warning("Microsoft token exchange failed (HTTP %s): %s", resp.status_code, resp.text)
            return popup_close_html(False, provider, "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 3600)
        # Get user profile
        profile = await client.get("https://graph.microsoft.com/v1.0/me",
                                   headers={"Authorization": f"Bearer {access_token}"})

    ms_email = ""
    ms_name = "OneDrive"
    if profile.status_code == 200:
        pdata = profile.json()
        # `or` (not a .get default) so a null "mail" falls back to the UPN.
        ms_email = pdata.get("mail") or pdata.get("userPrincipalName") or ""
        ms_name = pdata.get("displayName") or "OneDrive"

    conn = CloudConnection(
        connection_id=f"cc_{uuid.uuid4().hex[:8]}",
        user_id=user.id,
        org_id=user.org_id,
        provider=provider,
        display_name=f"OneDrive ({ms_email})" if ms_email else f"OneDrive ({ms_name})",
        access_token_encrypted=encrypt_token(access_token),
        refresh_token_encrypted=encrypt_token(refresh_token),
        token_expires_at=datetime.utcnow() + timedelta(seconds=expires_in),
        provider_user_email=ms_email,
        status="active",
    )
    db.add(conn)
    db.commit()
    return popup_close_html(True, provider)
async def refresh_onedrive_token(conn: CloudConnection, db: Session) -> str:
    """Return a usable Microsoft Graph access token for *conn*, refreshing if needed.

    The stored token is reused only if it has more than 2 minutes left AND
    decrypts to a non-empty value. Otherwise the refresh token is exchanged
    at the tenant's v2.0 token endpoint. A refresh token in the response
    replaces the stored one.

    Raises:
        HTTPException(401): there is no usable refresh token, or the refresh
            request failed. The connection is marked "expired" first.
    """
    if conn.token_expires_at and conn.token_expires_at > datetime.utcnow() + timedelta(minutes=2):
        cached = decrypt_token(conn.access_token_encrypted)
        # An undecryptable token falls through to a refresh instead of being
        # returned as an empty bearer token.
        if cached:
            return cached
    refresh_token = decrypt_token(conn.refresh_token_encrypted)
    if not refresh_token:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"https://login.microsoftonline.com/{MICROSOFT_TENANT_ID}/oauth2/v2.0/token",
            data={
                "client_id": MICROSOFT_CLIENT_ID,
                "client_secret": MICROSOFT_CLIENT_SECRET,
                "refresh_token": refresh_token,
                "grant_type": "refresh_token",
            },
        )
    if resp.status_code != 200:
        logger.warning("Microsoft token refresh failed for %s (HTTP %s)",
                       conn.connection_id, resp.status_code)
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")
    tokens = resp.json()
    access_token = tokens["access_token"]
    conn.access_token_encrypted = encrypt_token(access_token)
    if tokens.get("refresh_token"):
        conn.refresh_token_encrypted = encrypt_token(tokens["refresh_token"])
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 3600))
    conn.status = "active"
    db.commit()
    return access_token
# ═══════════════════════════════════════════════════════════════════════════════
# ─── DROPBOX OAuth ─────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/dropbox/auth-start")
async def dropbox_auth_start(request: Request, db: Session = Depends(get_db)):
    """Redirect the popup window to Dropbox's OAuth authorize page.

    The app JWT comes in the ``token`` query parameter because the popup
    cannot easily send an Authorization header.

    The query string is now built with ``urlencode`` so the redirect URI and
    state are percent-encoded instead of concatenated raw.
    """
    from urllib.parse import quote, urlencode

    token = request.query_params.get("token", "")
    if not token:
        return HTMLResponse("<p>Missing token</p>", status_code=401)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user = db.query(User).filter_by(email=payload.get("sub"), is_active=True).first()
        if not user:
            return HTMLResponse("<p>User not found</p>", status_code=401)
    except JWTError:
        return HTMLResponse("<p>Invalid token</p>", status_code=401)
    if not DROPBOX_APP_KEY:
        return popup_close_html(False, "dropbox", "Dropbox OAuth not configured")

    redirect_uri = f"{get_base_url(request)}/api/cloud/dropbox/callback"
    query = urlencode(
        {
            "client_id": DROPBOX_APP_KEY,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            # "offline" asks Dropbox for a refresh token alongside the access token.
            "token_access_type": "offline",
            "state": create_oauth_state(user.id, "dropbox"),
        },
        quote_via=quote,
    )
    return RedirectResponse(f"https://www.dropbox.com/oauth2/authorize?{query}")
@router.get("/dropbox/callback")
async def dropbox_callback(request: Request, db: Session = Depends(get_db)):
    """Handle Dropbox's OAuth redirect: exchange the code, store tokens, close the popup.

    Every outcome is reported through popup_close_html. This now includes
    invalid or expired state, state minted for another provider, and a
    missing code.

    The ``users/get_current_account`` RPC takes no arguments and is sent the
    JSON body ``null``. It now also declares ``Content-Type:
    application/json``, which Dropbox expects whenever a body is present.
    """
    provider = "dropbox"
    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")
    error = request.query_params.get("error", "")
    if error:
        return popup_close_html(False, provider, error)
    try:
        state_data = verify_oauth_state(state)
    except HTTPException as exc:
        return popup_close_html(False, provider, str(exc.detail))
    # Reject state tokens minted for a different flow.
    if state_data.get("provider") != provider:
        return popup_close_html(False, provider, "Invalid or expired OAuth state")
    user = db.query(User).filter_by(id=state_data.get("user_id"), is_active=True).first()
    if not user:
        return popup_close_html(False, provider, "User not found")
    if not code:
        return popup_close_html(False, provider, "Missing authorization code")

    # Must match the redirect_uri sent by dropbox_auth_start.
    redirect_uri = f"{get_base_url(request)}/api/cloud/dropbox/callback"
    import httpx
    dbx_email = ""
    dbx_name = "Dropbox"
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://api.dropboxapi.com/oauth2/token", data={
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": redirect_uri,
            "client_id": DROPBOX_APP_KEY,
            "client_secret": DROPBOX_APP_SECRET,
        })
        if resp.status_code != 200:
            logger.warning("Dropbox token exchange failed (HTTP %s): %s", resp.status_code, resp.text)
            return popup_close_html(False, provider, "Token exchange failed")
        tokens = resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        expires_in = tokens.get("expires_in", 14400)
        # Get account info
        acct = await client.post(
            "https://api.dropboxapi.com/2/users/get_current_account",
            headers={"Authorization": f"Bearer {access_token}",
                     "Content-Type": "application/json"},
            content=b"null",
        )
    if acct.status_code == 200:
        adata = acct.json()
        dbx_email = adata.get("email") or ""
        dbx_name = (adata.get("name") or {}).get("display_name") or "Dropbox"

    conn = CloudConnection(
        connection_id=f"cc_{uuid.uuid4().hex[:8]}",
        user_id=user.id,
        org_id=user.org_id,
        provider=provider,
        display_name=f"Dropbox ({dbx_email})" if dbx_email else f"Dropbox ({dbx_name})",
        access_token_encrypted=encrypt_token(access_token),
        refresh_token_encrypted=encrypt_token(refresh_token),
        token_expires_at=datetime.utcnow() + timedelta(seconds=expires_in),
        provider_user_email=dbx_email,
        status="active",
    )
    db.add(conn)
    db.commit()
    return popup_close_html(True, provider)
async def refresh_dropbox_token(conn: CloudConnection, db: Session) -> str:
    """Return a usable Dropbox access token for *conn*, refreshing it if needed.

    The stored token is reused only if it has more than 2 minutes left AND
    decrypts to a non-empty value. Otherwise the refresh token is exchanged
    at Dropbox's token endpoint, and the connection row is updated and
    committed.

    Raises:
        HTTPException(401): there is no usable refresh token, or the refresh
            request failed. The connection is marked "expired" first.
    """
    if conn.token_expires_at and conn.token_expires_at > datetime.utcnow() + timedelta(minutes=2):
        cached = decrypt_token(conn.access_token_encrypted)
        # An undecryptable token falls through to a refresh instead of being
        # returned as an empty bearer token.
        if cached:
            return cached
    refresh_token = decrypt_token(conn.refresh_token_encrypted)
    if not refresh_token:
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="No refresh token — please reconnect")
    import httpx
    async with httpx.AsyncClient() as client:
        resp = await client.post("https://api.dropboxapi.com/oauth2/token", data={
            "refresh_token": refresh_token,
            "grant_type": "refresh_token",
            "client_id": DROPBOX_APP_KEY,
            "client_secret": DROPBOX_APP_SECRET,
        })
    if resp.status_code != 200:
        logger.warning("Dropbox token refresh failed for %s (HTTP %s)",
                       conn.connection_id, resp.status_code)
        conn.status = "expired"
        db.commit()
        raise HTTPException(status_code=401, detail="Token refresh failed — please reconnect")
    tokens = resp.json()
    access_token = tokens["access_token"]
    conn.access_token_encrypted = encrypt_token(access_token)
    # If the response includes a refresh token, store it (same behaviour as
    # refresh_onedrive_token).
    if tokens.get("refresh_token"):
        conn.refresh_token_encrypted = encrypt_token(tokens["refresh_token"])
    conn.token_expires_at = datetime.utcnow() + timedelta(seconds=tokens.get("expires_in", 14400))
    conn.status = "active"
    db.commit()
    return access_token
# ═══════════════════════════════════════════════════════════════════════════════
# ─── AWS S3 (Credential-based, no OAuth) ──────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.post("/s3/connect")
async def s3_connect(request: Request, user: User = Depends(verify_token),
                     db: Session = Depends(get_db)):
    """Create an S3 connection from access-key credentials.

    JSON body:
        access_key, secret_key, bucket: required.
        region: optional, defaults to "us-east-1".

    The credentials are checked with ``head_bucket`` and then stored
    encrypted: the access key in the access-token column, the secret key in
    the refresh-token column. Bucket and region go in ``provider_metadata``.

    Fixes:
    - The blocking boto3 call now runs in a worker thread.
    - Null or non-string fields no longer crash ``.strip()``.
    - A non-JSON body now returns 400 instead of 500.
    - A blank region falls back to the default.

    Raises:
        HTTPException(400): the body is not a JSON object, a required field
            is empty, or the bucket check fails.
    """
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    def _text(name: str) -> str:
        # Normalise null / non-string values so .strip() cannot crash.
        value = body.get(name)
        return "" if value is None else str(value).strip()

    access_key = _text("access_key")
    secret_key = _text("secret_key")
    bucket = _text("bucket")
    region = _text("region") or "us-east-1"
    if not all([access_key, secret_key, bucket]):
        raise HTTPException(status_code=400, detail="access_key, secret_key, and bucket are required")

    def _check_bucket() -> None:
        import boto3
        s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                          region_name=region)
        s3.head_bucket(Bucket=bucket)

    # Validate credentials against the bucket. boto3 is synchronous, so run
    # it in a worker thread rather than stalling the event loop.
    try:
        await asyncio.to_thread(_check_bucket)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"S3 validation failed: {str(e)}")

    conn_id = f"cc_{uuid.uuid4().hex[:8]}"
    conn = CloudConnection(
        connection_id=conn_id,
        user_id=user.id,
        org_id=user.org_id,
        provider="s3",
        display_name=f"S3: {bucket}",
        access_token_encrypted=encrypt_token(access_key),
        refresh_token_encrypted=encrypt_token(secret_key),
        provider_metadata={"bucket": bucket, "region": region},
        status="active",
    )
    db.add(conn)
    db.commit()
    return {"connection_id": conn_id, "message": f"Connected to S3 bucket: {bucket}"}
# ═══════════════════════════════════════════════════════════════════════════════
# ─── FILE BROWSING (Provider-agnostic) ────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
# Lower-case file extensions (leading dot included) accepted by the import flow.
IMPORTABLE_EXTENSIONS = {".csv", ".doc", ".docx", ".pdf", ".txt", ".xls", ".xlsx"}
def is_importable(filename: str) -> bool:
    """Return True when the last extension of *filename* is importable (case-insensitive)."""
    extension = Path(filename).suffix
    return extension.lower() in IMPORTABLE_EXTENSIONS
@router.get("/{connection_id}/browse")
async def browse_files(connection_id: str, path: str = "/", request: Request = None,
                       user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """List the files and folders at *path* in one of the caller's active connections.

    Updates ``last_used_at`` and then dispatches to the provider-specific
    browse helper. The OAuth providers also receive *db* so their tokens can
    be refreshed; S3 does not.

    Raises:
        HTTPException(404): the connection is missing or not active.
        HTTPException(403): the caller is neither the owner nor a super_admin.
        HTTPException(400): the provider is not supported.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found or inactive")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")
    conn.last_used_at = datetime.utcnow()
    db.commit()

    if conn.provider == "s3":
        return await browse_s3(conn, path)
    oauth_browsers = {
        "google_drive": browse_google_drive,
        "onedrive": browse_onedrive,
        "dropbox": browse_dropbox,
    }
    browser = oauth_browsers.get(conn.provider)
    if browser is None:
        raise HTTPException(status_code=400, detail=f"Unsupported provider: {conn.provider}")
    return await browser(conn, path, db)
async def browse_google_drive(conn: CloudConnection, path: str, db: Session):
    """List one Google Drive folder across all result pages.

    ``path`` is a Drive folder id, with "/" meaning "root". Results are
    ordered folders first, then by name. Trashed files are excluded.

    The folder id is embedded in a Drive query-language string literal, so
    backslashes and single quotes are now escaped; previously they could
    break or alter the query.

    Raises:
        HTTPException(502): Drive returned a non-200 response for any page.
    """
    access_token = await refresh_google_token(conn, db)
    import httpx
    folder_id = path if path != "/" else "root"
    escaped_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
    query = f"'{escaped_id}' in parents and trashed = false"
    items = []
    page_token = None
    async with httpx.AsyncClient() as client:
        while True:
            params = {
                "q": query,
                "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
                "pageSize": 100,
                "orderBy": "folder,name",
            }
            if page_token:
                params["pageToken"] = page_token
            resp = await client.get("https://www.googleapis.com/drive/v3/files",
                                    headers={"Authorization": f"Bearer {access_token}"},
                                    params=params)
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list Google Drive files")
            data = resp.json()
            for f in data.get("files", []):
                is_folder = f["mimeType"] == "application/vnd.google-apps.folder"
                items.append({
                    "id": f["id"],
                    "name": f["name"],
                    "type": "folder" if is_folder else "file",
                    "size": int(f.get("size", 0)),
                    "modified": f.get("modifiedTime", ""),
                    "importable": not is_folder and is_importable(f["name"]),
                })
            page_token = data.get("nextPageToken")
            if not page_token:
                break
    return {"path": path, "items": items}
async def browse_onedrive(conn: CloudConnection, path: str, db: Session):
    """List the children of a OneDrive/SharePoint folder, following every result page.

    ``path`` is a driveItem id; "/" or "root" means the drive root.

    Fixes:
    - Pages are now followed via ``@odata.nextLink``, consistent with the
      Google Drive browser; previously only the first 200 items were returned.
    - The item id is now percent-encoded before it is placed in the URL path.

    Raises:
        HTTPException(502): Graph returned a non-200 response for any page.
    """
    access_token = await refresh_onedrive_token(conn, db)
    import httpx
    from urllib.parse import quote

    if path == "/" or path == "root":
        url = "https://graph.microsoft.com/v1.0/me/drive/root/children"
    else:
        # Keep "!" (used in personal OneDrive ids) but encode "/", "?", "#", etc.
        url = f"https://graph.microsoft.com/v1.0/me/drive/items/{quote(path, safe='!')}/children"
    headers = {"Authorization": f"Bearer {access_token}"}
    params = {"$top": 200, "$orderby": "name"}
    items = []
    async with httpx.AsyncClient() as client:
        next_url = url
        while next_url:
            resp = await client.get(next_url, headers=headers, params=params)
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list OneDrive files")
            data = resp.json()
            for f in data.get("value", []):
                is_folder = "folder" in f
                items.append({
                    "id": f["id"],
                    "name": f["name"],
                    "type": "folder" if is_folder else "file",
                    "size": f.get("size", 0),
                    "modified": f.get("lastModifiedDateTime", ""),
                    "importable": not is_folder and is_importable(f["name"]),
                })
            # nextLink is an absolute URL that already carries the query
            # string, so later pages get no extra params.
            next_url = data.get("@odata.nextLink")
            params = None
    return {"path": path, "items": items}
async def browse_s3(conn: CloudConnection, path: str):
    """List one "folder" level of the connection's S3 bucket.

    How the listing works:
    - ``path`` is a key prefix. Leading slashes are stripped and a trailing
      "/" is appended, and "/" is used as the delimiter.
    - Common prefixes become folder entries and objects become file entries.
      The placeholder object whose key equals the prefix is skipped.
    - Only the first 200 keys are requested (MaxKeys=200); no continuation
      token is followed.

    Fixes:
    - The blocking boto3 call now runs in a worker thread.
    - A missing ``provider_metadata`` no longer crashes.
    - S3 failures now surface as 502, like the other browse helpers.
    - The unreachable ``prefix == "/"`` branch was removed.

    Raises:
        HTTPException(502): the S3 listing failed.
    """
    access_key = decrypt_token(conn.access_token_encrypted)
    secret_key = decrypt_token(conn.refresh_token_encrypted)
    meta = conn.provider_metadata or {}
    bucket = meta.get("bucket", "")
    region = meta.get("region", "us-east-1")

    # After lstrip("/") the prefix can never equal "/", so "" means bucket root.
    prefix = path.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    def _list_level():
        import boto3
        s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                          region_name=region)
        return s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/", MaxKeys=200)

    try:
        resp = await asyncio.to_thread(_list_level)
    except Exception:
        logger.exception("S3 listing failed for connection %s", conn.connection_id)
        raise HTTPException(status_code=502, detail="Failed to list S3 objects")

    def _iso(value) -> str:
        # boto3 returns datetimes; fall back to str() for anything else.
        return value.isoformat() if hasattr(value, "isoformat") else str(value)

    items = []
    # Folders (common prefixes)
    for cp in resp.get("CommonPrefixes", []):
        items.append({
            "id": cp["Prefix"],
            "name": cp["Prefix"].rstrip("/").split("/")[-1],
            "type": "folder",
            "size": 0,
            "modified": "",
            "importable": False,
        })
    # Files
    for obj in resp.get("Contents", []):
        key = obj["Key"]
        if key == prefix:
            continue
        name = key.split("/")[-1]
        items.append({
            "id": key,
            "name": name,
            "type": "file",
            "size": obj.get("Size", 0),
            "modified": _iso(obj.get("LastModified", "")),
            "importable": is_importable(name),
        })
    return {"path": path, "items": items}
async def browse_dropbox(conn: CloudConnection, path: str, db: Session):
    """List a Dropbox folder, following ``list_folder/continue`` until complete.

    ``path`` is passed through as-is, except "/", which becomes "" (the
    root). Non-downloadable files are excluded. Previously only the first
    ``list_folder`` page was returned, even when ``has_more`` was true.

    Raises:
        HTTPException(502): Dropbox returned a non-200 response for any page.
    """
    access_token = await refresh_dropbox_token(conn, db)
    import httpx
    dbx_path = "" if path == "/" else path
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}
    items = []
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.dropboxapi.com/2/files/list_folder",
            headers=headers,
            json={"path": dbx_path, "limit": 200, "include_non_downloadable_files": False},
        )
        while True:
            if resp.status_code != 200:
                raise HTTPException(status_code=502, detail="Failed to list Dropbox files")
            data = resp.json()
            for entry in data.get("entries", []):
                is_folder = entry[".tag"] == "folder"
                items.append({
                    "id": entry.get("id", entry.get("path_lower", "")),
                    "name": entry["name"],
                    "type": "folder" if is_folder else "file",
                    "size": entry.get("size", 0),
                    "modified": entry.get("server_modified", ""),
                    "importable": not is_folder and is_importable(entry["name"]),
                    "path_lower": entry.get("path_lower", ""),
                })
            cursor = data.get("cursor")
            if not data.get("has_more") or not cursor:
                break
            resp = await client.post(
                "https://api.dropboxapi.com/2/files/list_folder/continue",
                headers=headers,
                json={"cursor": cursor},
            )
    return {"path": path, "items": items}
# ═══════════════════════════════════════════════════════════════════════════════
# ─── SEARCH ────────────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/{connection_id}/search")
async def search_files(connection_id: str, q: str = "", user: User = Depends(verify_token),
                       db: Session = Depends(get_db)):
    """Search a cloud connection for items whose name matches *q*.

    Dispatches to a per-provider helper. Each helper returns at most 50 items
    in the same shape as the browse endpoints. The result is ``{"items": []}``
    when *q* is empty, the provider is unknown, or the Google/OneDrive/Dropbox
    API returns a non-200 status. Token-refresh failures propagate as
    HTTPException(401) from the refresh helpers.

    Raises:
        HTTPException(404): the connection is missing or not active.
        HTTPException(403): the caller is neither the owner nor a super_admin.
    """
    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")
    if not q:
        return {"items": []}
    if conn.provider == "google_drive":
        return await _search_google_drive(conn, q, db)
    if conn.provider == "onedrive":
        return await _search_onedrive(conn, q, db)
    if conn.provider == "dropbox":
        return await _search_dropbox(conn, q, db)
    if conn.provider == "s3":
        return await _search_s3(conn, q)
    return {"items": []}


async def _search_google_drive(conn: CloudConnection, q: str, db: Session) -> dict:
    """Name-contains search over non-trashed Google Drive files (up to 50)."""
    import httpx
    access_token = await refresh_google_token(conn, db)
    # Escape backslashes and quotes so q stays inside the Drive string literal.
    term = q.replace("\\", "\\\\").replace("'", "\\'")
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://www.googleapis.com/drive/v3/files",
            headers={"Authorization": f"Bearer {access_token}"},
            params={"q": f"name contains '{term}' and trashed = false",
                    "fields": "files(id,name,mimeType,size,modifiedTime)", "pageSize": 50},
        )
    if resp.status_code != 200:
        return {"items": []}
    items = []
    for f in resp.json().get("files", []):
        is_folder = f["mimeType"] == "application/vnd.google-apps.folder"
        items.append({
            "id": f["id"], "name": f["name"],
            "type": "folder" if is_folder else "file",
            "size": int(f.get("size", 0)), "modified": f.get("modifiedTime", ""),
            "importable": not is_folder and is_importable(f["name"]),
        })
    return {"items": items}


async def _search_onedrive(conn: CloudConnection, q: str, db: Session) -> dict:
    """Search the signed-in user's OneDrive via Graph ``search(q=...)`` (up to 50)."""
    import httpx
    from urllib.parse import quote
    access_token = await refresh_onedrive_token(conn, db)
    # OData string literals escape ' as ''. Percent-encode the result so
    # characters such as ?, # or / cannot break the URL path.
    term = quote(q.replace("'", "''"), safe="")
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://graph.microsoft.com/v1.0/me/drive/root/search(q='{term}')",
            headers={"Authorization": f"Bearer {access_token}"},
            params={"$top": 50},
        )
    if resp.status_code != 200:
        return {"items": []}
    items = []
    for f in resp.json().get("value", []):
        is_folder = "folder" in f
        items.append({
            "id": f["id"], "name": f["name"],
            "type": "folder" if is_folder else "file",
            "size": f.get("size", 0), "modified": f.get("lastModifiedDateTime", ""),
            "importable": not is_folder and is_importable(f["name"]),
        })
    return {"items": items}


async def _search_dropbox(conn: CloudConnection, q: str, db: Session) -> dict:
    """Search Dropbox with ``files/search_v2`` (up to 50 matches)."""
    import httpx
    access_token = await refresh_dropbox_token(conn, db)
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.dropboxapi.com/2/files/search_v2",
            headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
            json={"query": q, "options": {"max_results": 50}},
        )
    if resp.status_code != 200:
        return {"items": []}
    items = []
    for match in resp.json().get("matches", []):
        entry = match.get("metadata", {}).get("metadata", {})
        is_folder = entry.get(".tag") == "folder"
        items.append({
            "id": entry.get("id", ""), "name": entry.get("name", ""),
            "type": "folder" if is_folder else "file",
            "size": entry.get("size", 0), "modified": entry.get("server_modified", ""),
            "importable": not is_folder and is_importable(entry.get("name", "")),
        })
    return {"items": items}


async def _search_s3(conn: CloudConnection, q: str) -> dict:
    """Case-insensitive substring match on base names of the bucket's first 500 keys (up to 50)."""
    # S3 has no native search, so list keys and filter locally.
    access_key = decrypt_token(conn.access_token_encrypted)
    secret_key = decrypt_token(conn.refresh_token_encrypted)
    meta = conn.provider_metadata or {}

    def _list_keys():
        import boto3
        s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                          region_name=meta.get("region", "us-east-1"))
        return s3.list_objects_v2(Bucket=meta.get("bucket", ""), MaxKeys=500)

    # boto3 is blocking; keep it off the event loop.
    resp = await asyncio.to_thread(_list_keys)
    needle = q.lower()
    items = []
    for obj in resp.get("Contents", []):
        name = obj["Key"].split("/")[-1]
        if needle in name.lower():
            modified = obj.get("LastModified", "")
            items.append({
                "id": obj["Key"], "name": name, "type": "file",
                "size": obj.get("Size", 0),
                "modified": modified.isoformat() if hasattr(modified, "isoformat") else "",
                "importable": is_importable(name),
            })
    return {"items": items[:50]}
# ═══════════════════════════════════════════════════════════════════════════════
# ─── IMPORT ────────────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
# Strong references to in-flight import tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected before
# it finishes. Each task removes itself from this set when it completes.
_background_import_tasks = set()


@router.post("/import")
async def import_files(request: Request, user: User = Depends(verify_token),
                       db: Session = Depends(get_db)):
    """Queue cloud files for import into DocuScan.

    JSON body:
        connection_id: the connection to import from.
        files: a list of objects with ``cloud_file_id`` (or ``id``), ``path``,
            ``name`` and ``size``.

    One CloudImport row per file is committed with status "downloading".
    Then a background ``download_and_process`` task is started for each row.

    Fixes:
    - The background tasks are now kept referenced until they finish.
    - A malformed body returns 400 instead of 500.

    Returns:
        ``{"message": ..., "import_ids": [...]}``

    Raises:
        HTTPException(400): the body is malformed or required fields are missing.
        HTTPException(404): the connection is missing or not active.
        HTTPException(403): the caller is neither the owner nor a super_admin.
    """
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    connection_id = body.get("connection_id", "")
    files = body.get("files", [])
    if not connection_id or not files:
        raise HTTPException(status_code=400, detail="connection_id and files are required")
    if not isinstance(files, list) or not all(isinstance(f, dict) for f in files):
        raise HTTPException(status_code=400, detail="files must be a list of objects")

    conn = db.query(CloudConnection).filter_by(connection_id=connection_id, status="active").first()
    if not conn:
        raise HTTPException(status_code=404, detail="Connection not found")
    if conn.user_id != user.id and user.role != "super_admin":
        raise HTTPException(status_code=403, detail="Not authorized")

    import_ids = []
    for f in files:
        import_id = f"ci_{uuid.uuid4().hex[:8]}"
        db.add(CloudImport(
            import_id=import_id,
            connection_id=connection_id,
            provider=conn.provider,
            cloud_file_id=f.get("cloud_file_id", f.get("id", "")),
            cloud_file_path=f.get("path", ""),
            cloud_file_name=f.get("name", "unknown"),
            cloud_file_size=f.get("size", 0),
            status="downloading",
            user_email=user.email,
            org_id=user.org_id,
        ))
        import_ids.append(import_id)
    db.commit()

    # Kick off async downloads; keep each task referenced until it finishes.
    for import_id in import_ids:
        task = asyncio.create_task(
            download_and_process(import_id, connection_id, user.email, user.org_id)
        )
        _background_import_tasks.add(task)
        task.add_done_callback(_background_import_tasks.discard)
    return {"message": f"Importing {len(files)} file(s)", "import_ids": import_ids}
async def download_and_process(import_id: str, connection_id: str, user_email: str, org_id: int):
    """Download one cloud file and run it through the DocuScan processing pipeline.

    Opens its own database session and:
      1. loads the ``CloudImport`` row and its ``CloudConnection`` (returns silently
         if either is missing);
      2. downloads the bytes with the provider-specific helper. A download error,
         an unsupported provider, or empty content marks the import ``"error"``;
      3. writes the bytes under ``UPLOAD_DIR``, creates a ``Document`` row
         (``status="processing"``) and links it to the import;
      4. awaits ``backend.process_document`` and copies the document's final
         status/error onto the import row.
    Any other exception is logged and recorded on the import as ``"error"``.
    """
    db = SessionLocal()
    try:
        ci = db.query(CloudImport).filter_by(import_id=import_id).first()
        conn = db.query(CloudConnection).filter_by(connection_id=connection_id).first()
        if not ci or not conn:
            return

        filename = ci.cloud_file_name or "unknown"

        # Download file content
        file_bytes = None
        try:
            if conn.provider == "google_drive":
                file_bytes = await download_google_drive_file(conn, ci.cloud_file_id, db)
            elif conn.provider == "onedrive":
                file_bytes = await download_onedrive_file(conn, ci.cloud_file_id, db)
            elif conn.provider == "s3":
                file_bytes = await download_s3_file(conn, ci.cloud_file_id)
            elif conn.provider == "dropbox":
                # Prefer the stored path; fall back to the file id when no path was sent.
                file_bytes = await download_dropbox_file(conn, ci.cloud_file_path or ci.cloud_file_id, db)
            else:
                raise ValueError(f"Unsupported cloud provider: {conn.provider}")
        except Exception as e:
            ci.status = "error"
            ci.error = str(e)
            db.commit()
            return

        if not file_bytes:
            ci.status = "error"
            ci.error = "Download returned empty content"
            db.commit()
            return

        # Save to disk. Cloud names can contain path separators (e.g. Google Drive
        # allows "/" in names), which would point into non-existent subdirectories;
        # flatten them so the on-disk name stays a single component of UPLOAD_DIR.
        doc_id = f"doc_{uuid.uuid4().hex[:8]}"
        safe_name = filename.replace("/", "_").replace("\\", "_").replace("\x00", "")
        file_path = UPLOAD_DIR / f"{doc_id}_{safe_name}"
        file_path.write_bytes(file_bytes)

        # Create Document row (same as upload endpoint); the display name stays unmodified.
        doc = Document(
            doc_id=doc_id,
            filename=filename,
            file_path=str(file_path),
            file_size=len(file_bytes),
            uploaded_by=user_email,
            org_id=org_id,
            status="processing",
            conversion_status={"text": "pending", "tables": "pending", "images": "pending"},
        )
        db.add(doc)
        ci.doc_id = doc_id
        ci.status = "processing"
        db.commit()

        # Reuse existing process_document pipeline (imported lazily, presumably to
        # avoid a circular import with backend — TODO confirm).
        from backend import process_document
        await process_document(doc_id, file_path, filename, user_email, org_id)

        # Mirror the document's final status/error onto the import row.
        ci_refresh = db.query(CloudImport).filter_by(import_id=import_id).first()
        doc_refresh = db.query(Document).filter_by(doc_id=doc_id).first()
        if ci_refresh and doc_refresh:
            ci_refresh.status = "ready" if doc_refresh.status == "ready" else doc_refresh.status
            if doc_refresh.error:
                ci_refresh.error = doc_refresh.error
            db.commit()
    except Exception as e:
        logger.exception(f"Import {import_id} failed: {e}")
        try:
            # The failure may have left the session mid-transaction; reset it so
            # the status update below can be issued.
            db.rollback()
            ci = db.query(CloudImport).filter_by(import_id=import_id).first()
            if ci:
                ci.status = "error"
                ci.error = str(e)
                db.commit()
        except Exception:
            logger.exception(f"Import {import_id}: could not record failure status")
    finally:
        db.close()
# ─── Download helpers ──────────────────────────────────────────────────────────
async def download_google_drive_file(conn: CloudConnection, file_id: str, db: Session) -> bytes:
    """Return the raw bytes of a Google Drive file via the Drive v3 ``alt=media`` endpoint.

    A fresh access token is obtained from ``refresh_google_token`` first; redirects
    are followed. Raises ``Exception`` carrying the HTTP status for any non-200 reply.
    """
    token = await refresh_google_token(conn, db)
    import httpx

    media_url = f"https://www.googleapis.com/drive/v3/files/{file_id}?alt=media"
    auth = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient() as http:
        response = await http.get(media_url, headers=auth, follow_redirects=True)
        if response.status_code == 200:
            return response.content
        raise Exception(f"Google Drive download failed ({response.status_code})")
async def download_onedrive_file(conn: CloudConnection, item_id: str, db: Session) -> bytes:
    """Return the raw bytes of a OneDrive item from Microsoft Graph's ``/content`` endpoint.

    The token is refreshed with ``refresh_onedrive_token`` before the request and
    redirects are followed. Raises ``Exception`` with the HTTP status on a non-200 reply.
    """
    token = await refresh_onedrive_token(conn, db)
    import httpx

    content_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{item_id}/content"
    auth = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient() as http:
        response = await http.get(content_url, headers=auth, follow_redirects=True)
        if response.status_code == 200:
            return response.content
        raise Exception(f"OneDrive download failed ({response.status_code})")
async def download_s3_file(conn: CloudConnection, key: str) -> bytes:
    """Return the bytes of S3 object ``key`` from the bucket configured on ``conn``.

    The connection stores the AWS access key in ``access_token_encrypted`` and the
    secret key in ``refresh_token_encrypted``; ``provider_metadata`` supplies
    ``bucket`` and ``region`` (default ``"us-east-1"``).

    boto3 is synchronous, so the client creation, ``get_object`` call and body read
    run in the default executor instead of blocking the event loop. A private
    boto3 ``Session`` is used per call because the default session is not safe to
    share across threads.

    Raises:
        ValueError: the connection has no bucket configured.
        botocore errors from ``get_object`` propagate unchanged.
    """
    access_key = decrypt_token(conn.access_token_encrypted)
    secret_key = decrypt_token(conn.refresh_token_encrypted)
    meta = conn.provider_metadata or {}
    bucket = meta.get("bucket", "")
    if not bucket:
        raise ValueError("S3 connection has no bucket configured")
    region = meta.get("region", "us-east-1")
    import boto3

    def _fetch() -> bytes:
        """Blocking download executed on a worker thread."""
        session = boto3.session.Session()
        s3 = session.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key,
                            region_name=region)
        body = s3.get_object(Bucket=bucket, Key=key)["Body"]
        try:
            return body.read()
        finally:
            body.close()

    return await asyncio.get_running_loop().run_in_executor(None, _fetch)
async def download_dropbox_file(conn: CloudConnection, path: str, db: Session) -> bytes:
    """Return the bytes of a Dropbox file via the ``/2/files/download`` content endpoint.

    ``path`` is sent JSON-encoded in the ``Dropbox-API-Arg`` header (the caller in
    this module may pass the file id when no path is stored). The token is refreshed
    with ``refresh_dropbox_token`` first. Raises ``Exception`` with the HTTP status
    on a non-200 reply.
    """
    token = await refresh_dropbox_token(conn, db)
    import httpx

    download_url = "https://content.dropboxapi.com/2/files/download"
    request_headers = {"Authorization": f"Bearer {token}"}
    request_headers["Dropbox-API-Arg"] = json.dumps({"path": path})
    async with httpx.AsyncClient() as http:
        response = await http.post(download_url, headers=request_headers)
        if response.status_code == 200:
            return response.content
        raise Exception(f"Dropbox download failed ({response.status_code})")
# ═══════════════════════════════════════════════════════════════════════════════
# ─── IMPORT HISTORY ────────────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
@router.get("/imports")
async def list_imports(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the 50 newest cloud imports started by the calling user.

    Rows are always filtered by the caller's email; non-super-admins are further
    restricted to their own organization.
    """
    own_imports = db.query(CloudImport).filter_by(user_email=user.email)
    scoped = own_imports if user.role == "super_admin" else own_imports.filter_by(org_id=user.org_id)
    newest = scoped.order_by(CloudImport.created_at.desc()).limit(50).all()
    return {"imports": [record.to_dict() for record in newest]}
# ═══════════════════════════════════════════════════════════════════════════════
# ─── EMAIL FORWARDING (Mailgun Inbound Routes) ────────────────────────────────
# ═══════════════════════════════════════════════════════════════════════════════
#
# Multi-tenant routing via To address:
# {org-slug}@docuscan.5gvector.com → routes to that org
# statements@docuscan.5gvector.com → falls back to default org
#
# Mailgun POSTs multipart form data with:
# sender, from, recipient, To, subject, body-plain, body-html,
#   attachment-1, attachment-2, ... (file uploads; discovered by key prefix, not via a count field)
# timestamp, token, signature (for HMAC verification)
# ═══════════════════════════════════════════════════════════════════════════════
# Inbound mail domain; the local part of the recipient selects the org slug.
EMAIL_DOMAIN = "docuscan.5gvector.com"
# HMAC key for Mailgun webhook signatures; an empty value disables verification.
MAILGUN_WEBHOOK_SIGNING_KEY = os.getenv("MAILGUN_WEBHOOK_SIGNING_KEY", "")
def verify_mailgun_signature(timestamp: str, token: str, signature: str, signing_key=None) -> bool:
    """Verify a Mailgun webhook signature: hex HMAC-SHA256 of ``timestamp + token``.

    Args:
        timestamp: The ``timestamp`` form field sent by Mailgun.
        token: The ``token`` form field sent by Mailgun.
        signature: The ``signature`` form field (hex digest) to check.
        signing_key: Key to verify with; ``None`` (default) uses
            ``MAILGUN_WEBHOOK_SIGNING_KEY``.

    Returns:
        True if the signature matches, or if no signing key is configured
        (verification is skipped in that case). False otherwise, including for
        malformed signatures (non-string or non-ASCII) instead of raising.

    Note: this checks authenticity only; it does not reject stale timestamps or
    reused tokens, so a captured request could be replayed.
    """
    import hmac, hashlib

    key = MAILGUN_WEBHOOK_SIGNING_KEY if signing_key is None else signing_key
    if not key:
        return True  # Skip verification if no key configured
    if not isinstance(signature, str):
        return False
    expected = hmac.new(
        key.encode(),
        f"{timestamp}{token}".encode(),
        hashlib.sha256,
    ).hexdigest()
    # Compare bytes: compare_digest raises TypeError for non-ASCII str arguments.
    return hmac.compare_digest(expected.encode(), signature.encode("utf-8"))
def parse_org_from_to_address(to_address: str, db: Session):
    """Resolve which organization an inbound email is routed to.

    Accepts a bare address (``slug@domain``) or display-name form
    (``Org Name <slug@domain>``). The lower-cased local part is matched against the
    ``slug`` of an active ``Organization``. When nothing matches (e.g. the generic
    ``statements@`` address), the organization with the lowest id is returned as the
    default. Returns ``None`` only when no organization exists.
    """
    import re

    to_address = to_address or ""
    # Extract email from "Name <email>" format
    match = re.search(r'<([^>]+)>', to_address)
    email = match.group(1) if match else to_address.strip()
    local_part = email.split("@")[0].lower().strip()

    org = db.query(Organization).filter(
        Organization.slug == local_part,
        Organization.is_active == True,  # noqa: E712 - SQL expression, not a Python truth test
    ).first()
    if org:
        return org

    # Fallback: "statements" or any unmatched address -> default org. Order
    # explicitly: an unordered first() may return any row, making routing
    # nondeterministic.
    return db.query(Organization).order_by(Organization.id).first()
# Strong references to in-flight email attachment tasks: asyncio keeps only weak
# references to tasks, so an unreferenced task may be garbage-collected before it
# finishes. Each task discards itself from this set when done.
_email_attachment_tasks = set()


def _email_attachment_keys(form) -> list:
    """Return form keys whose values are uploaded files named ``attachment*``."""
    # Mailgun: attachment-1, attachment-2, ...; SendGrid compat: attachment1, ...
    return [key for key in form if key.startswith("attachment") and hasattr(form[key], "read")]


def _email_attachment_filename(attachment, key: str) -> str:
    """Return the attachment's filename, or ``"<key>.bin"`` when it has none."""
    # getattr's default only covers a missing attribute; UploadFile.filename may be None/"".
    return getattr(attachment, "filename", None) or f"{key}.bin"


def _email_attributed_user(db: Session, org):
    """Pick the active org user to attribute imports to: admin-like role first, else any."""
    candidates = (
        db.query(User)
        .filter(User.org_id == org.id, User.is_active == True)  # noqa: E712
        .order_by(User.id)
        .all()
    )
    # Only "super_admin" is visible in this module; treat any role containing
    # "admin" as an admin (assumption — TODO confirm against the role list).
    admins = [u for u in candidates if "admin" in str(u.role or "")]
    return next(iter(admins or candidates), None)


@router.post("/email/inbound")
async def email_inbound(request: Request):
    """
    Mailgun Inbound Routes webhook — receives forwarded emails with attachments.

    No auth required (Mailgun POSTs directly). When ``MAILGUN_WEBHOOK_SIGNING_KEY``
    is set, requests with an invalid HMAC signature are rejected with 406.
    The recipient address selects the organization. Each importable
    ``attachment*`` file is saved under ``UPLOAD_DIR``, recorded as a ``Document``
    plus an ``EmailImport`` row, and processed in a background task. An email
    that yields no processed attachment is still recorded as an ``EmailImport``
    with status ``"error"``. All other outcomes, including internal errors,
    answer 200 so Mailgun does not retry.
    """
    import re

    db = SessionLocal()
    try:
        form = await request.form()

        # Optional: verify Mailgun webhook signature
        mg_timestamp = form.get("timestamp", "")
        mg_token = form.get("token", "")
        mg_signature = form.get("signature", "")
        if MAILGUN_WEBHOOK_SIGNING_KEY and not verify_mailgun_signature(mg_timestamp, mg_token, mg_signature):
            logger.warning("Email inbound: invalid Mailgun signature")
            return JSONResponse({"error": "unauthorized"}, status_code=406)

        # Mailgun fields: "sender" is clean email, "from" includes display name
        from_email = form.get("sender", form.get("from", "unknown@unknown.com"))
        to_email = form.get("recipient", form.get("To", form.get("to", "")))
        subject = form.get("subject", "(no subject)")

        from_match = re.search(r'<([^>]+)>', from_email)
        from_clean = from_match.group(1) if from_match else from_email.strip()

        org = parse_org_from_to_address(to_email, db)
        if not org:
            logger.error(f"Email inbound: no org found for to={to_email}")
            return JSONResponse({"error": "no matching organization"}, status_code=200)

        attributed_user = _email_attributed_user(db, org)
        attributed_email = attributed_user.email if attributed_user else from_clean

        processed = 0
        skipped = []
        attachment_keys = _email_attachment_keys(form)
        for key in attachment_keys:
            attachment = form[key]
            filename = _email_attachment_filename(attachment, key)

            # Only process importable file types
            if not is_importable(filename):
                logger.info(f"Email inbound: skipping non-importable attachment {filename}")
                skipped.append(filename)
                continue

            file_bytes = await attachment.read()
            if not file_bytes:
                skipped.append(filename)
                continue

            import_id = f"ei_{uuid.uuid4().hex[:8]}"
            doc_id = f"doc_{uuid.uuid4().hex[:8]}"

            # The filename comes from an untrusted sender: flatten path separators
            # (and drop NULs) so it stays a single component inside UPLOAD_DIR.
            safe_name = filename.replace("/", "_").replace("\\", "_").replace("\x00", "")
            file_path = UPLOAD_DIR / f"{doc_id}_{safe_name}"
            file_path.write_bytes(file_bytes)
            file_size = len(file_bytes)

            db.add(Document(
                doc_id=doc_id,
                filename=filename,
                file_path=str(file_path),
                file_size=file_size,
                uploaded_by=attributed_email,
                org_id=org.id,
                status="processing",
                conversion_status={"text": "pending", "tables": "pending", "images": "pending"},
            ))
            db.add(EmailImport(
                import_id=import_id,
                org_id=org.id,
                from_email=from_clean,
                to_email=to_email,
                subject=subject,
                attachment_name=filename,
                attachment_size=file_size,
                doc_id=doc_id,
                status="processing",
                matched_user_email=attributed_email,
            ))
            db.commit()

            # Kick off async processing, keeping a reference until it completes.
            task = asyncio.create_task(
                process_email_attachment(import_id, doc_id, str(file_path), filename, attributed_email, org.id)
            )
            _email_attachment_tasks.add(task)
            task.add_done_callback(_email_attachment_tasks.discard)
            processed += 1

        if processed == 0:
            # Nothing was imported — still record the email so it is visible in history.
            if attachment_keys:
                attachment_name = "(no importable attachments)"
                error = "Email had no importable attachments"
                logger.info(f"Email inbound: all attachments skipped: {', '.join(skipped)}")
            else:
                attachment_name = "(no attachments)"
                error = "Email had no attachments to process"
            db.add(EmailImport(
                import_id=f"ei_{uuid.uuid4().hex[:8]}",
                org_id=org.id,
                from_email=from_clean,
                to_email=to_email,
                subject=subject,
                attachment_name=attachment_name,
                attachment_size=0,
                status="error",
                error=error,
                matched_user_email=attributed_email,
            ))
            db.commit()

        logger.info(f"Email inbound: from={from_clean} to={to_email} org={org.slug} attachments={processed}")
        # Mailgun expects 200 OK (non-200 triggers retries)
        return JSONResponse({"message": f"Processed {processed} attachment(s)", "org": org.slug})
    except Exception as e:
        logger.exception(f"Email inbound error: {e}")
        return JSONResponse({"error": str(e)}, status_code=200)  # Always 200 to prevent retries
    finally:
        db.close()
async def process_email_attachment(import_id: str, doc_id: str, file_path: str,
                                   filename: str, user_email: str, org_id: int):
    """Process an email attachment through the DocuScan pipeline and record the result.

    Awaits ``backend.process_document`` for the saved file, then copies the
    ``Document``'s final status (and error, if any) onto the ``EmailImport`` row.
    If the ``Document`` row cannot be found afterwards, the import is marked
    ``"error"`` rather than left ``"processing"``. Any exception is logged and
    recorded on the import as ``"error"``. Opens its own database session.
    """
    db = SessionLocal()
    try:
        # Imported lazily, presumably to avoid a circular import with backend — TODO confirm.
        from backend import process_document
        await process_document(doc_id, file_path, filename, user_email, org_id)

        ei = db.query(EmailImport).filter_by(import_id=import_id).first()
        doc = db.query(Document).filter_by(doc_id=doc_id).first()
        if ei and doc:
            ei.status = "ready" if doc.status == "ready" else doc.status
            if doc.error:
                ei.error = doc.error
            db.commit()
        elif ei:
            ei.status = "error"
            ei.error = "Document record not found after processing"
            db.commit()
    except Exception as e:
        logger.exception(f"Email attachment processing failed: {e}")
        try:
            # Reset a possibly-failed transaction before writing the error status.
            db.rollback()
            ei = db.query(EmailImport).filter_by(import_id=import_id).first()
            if ei:
                ei.status = "error"
                ei.error = str(e)
                db.commit()
        except Exception:
            logger.exception(f"Email import {import_id}: could not record failure status")
    finally:
        db.close()
@router.get("/email/imports")
async def list_email_imports(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the 50 newest email imports visible to the caller.

    Super admins see imports from every organization; everyone else only sees
    imports belonging to their own organization.
    """
    scope = db.query(EmailImport)
    sees_everything = user.role == "super_admin"
    if not sees_everything:
        scope = scope.filter_by(org_id=user.org_id)
    newest_first = scope.order_by(EmailImport.created_at.desc()).limit(50)
    return {"imports": [row.to_dict() for row in newest_first.all()]}
@router.get("/email/address")
async def get_email_address(user: User = Depends(verify_token), db: Session = Depends(get_db)):
    """Return the forwarding address that routes email to the caller's organization.

    When the caller's organization row is not found, the generic ``statements@``
    address is returned with ``org_slug`` set to ``"default"``.
    """
    org = db.query(Organization).filter_by(id=user.org_id).first()
    local_part, slug_label = ("statements", "default") if not org else (org.slug, org.slug)
    return {"address": f"{local_part}@{EMAIL_DOMAIN}", "org_slug": slug_label}