""" Upload queue: when Supabase Storage is ≥95% full, uploads are queued. The queue is stored in the `upload_queue` table. A background sweep runs on every /upload and /health request to drain the queue when space frees up (docs expire and are cleaned up by the expiry sweeper in app.py). """ from __future__ import annotations from datetime import datetime, UTC from supabase import create_client import os import uuid as _uuid # ── configurable limits ────────────────────────────────────────────────────── # Default: 950 MB = ~95% of Supabase free 1 GB storage tier QUOTA_BYTES: int = int(os.environ.get("STORAGE_QUOTA_BYTES", str(950 * 1024 * 1024))) # Average seconds to fully process one document (parse+embed) AVG_PROCESS_SECS: int = int(os.environ.get("AVG_PROCESS_SECS", "120")) def _client(): return create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"]) # ── storage accounting ─────────────────────────────────────────────────────── def get_used_storage_bytes() -> int: """Sum file_size_bytes of all non-expired, non-failed documents.""" now = datetime.now(UTC).isoformat() result = ( _client() .table("documents") .select("file_size_bytes") .gt("expires_at", now) .neq("status", "error") .execute() ) return sum((row.get("file_size_bytes") or 0) for row in (result.data or [])) def is_storage_near_full(new_file_bytes: int = 0) -> bool: return (get_used_storage_bytes() + new_file_bytes) >= QUOTA_BYTES # ── queue management ───────────────────────────────────────────────────────── def enqueue_upload( user_id: str, filename: str, file_size_bytes: int, file_content: bytes, content_type: str, ) -> dict: """ Store the raw file in Supabase Storage under a temp 'queue/' prefix, insert a row into upload_queue, and return queue position + ETA. 
""" client = _client() queue_id = str(_uuid.uuid4()) storage_path = f"queue/{user_id}/{queue_id}/{filename}" client.storage.from_("documents").upload( path=storage_path, file=file_content, file_options={"content-type": content_type or "application/octet-stream"}, ) # Count items already waiting so we can give an accurate position waiting: int = ( client.table("upload_queue") .select("id", count="exact") .eq("status", "waiting") .execute() .count or 0 ) position = waiting + 1 client.table("upload_queue").insert({ "id": queue_id, "user_id": user_id, "filename": filename, "file_size_bytes": file_size_bytes, "content_type": content_type or "application/octet-stream", "storage_path": storage_path, "status": "waiting", }).execute() est_mins = round((position * AVG_PROCESS_SECS) / 60, 1) return { "queued": True, "queue_id": queue_id, "position": position, "estimated_wait_minutes": est_mins, } def get_queue_status(user_id: str, queue_id: str) -> dict: """Return current queue position, ETA, or final doc_id if done.""" client = _client() try: row = ( client.table("upload_queue") .select("*") .eq("id", queue_id) .eq("user_id", user_id) .single() .execute() ) except Exception: return {"status": "not_found"} if not row.data: return {"status": "not_found"} data = row.data if data["status"] == "done": return {"status": "done", "doc_id": data.get("doc_id")} if data["status"] == "failed": return {"status": "failed", "error": data.get("error", "Unknown error")} if data["status"] == "processing": return {"status": "processing", "position": 0, "estimated_wait_minutes": 1} # Count items queued before this one position: int = ( client.table("upload_queue") .select("id", count="exact") .eq("status", "waiting") .lt("queued_at", data["queued_at"]) .execute() .count or 0 ) + 1 est_mins = round((position * AVG_PROCESS_SECS) / 60, 1) return { "status": "waiting", "position": position, "estimated_wait_minutes": est_mins, } # ── background queue drainer 
async def try_process_next_queued() -> bool:
    """
    Process the oldest waiting queue item if storage has freed up.

    Steps:
    1. Bail if storage is near full.
    2. Atomically claim the oldest ``waiting`` upload_queue row by flipping it
       to ``processing``.
    3. Download its staged file and copy it to ``{user_id}/{doc_id}/{filename}``.
    4. Insert a ``documents`` row.
    5. Parse → chunk → embed → store the chunks.
    6. Mark the document ``ready`` and the queue row ``done``, then delete the
       staged copy.

    Returns True if an item was processed (caller can chain calls). Returns
    False if storage is still near full, the queue is empty, another worker
    claimed the item first, or processing failed. On failure the queue row
    is marked ``failed`` with the error message.

    Designed to run as a FastAPI BackgroundTask.
    NOTE(review): none of the calls below is awaited (they are all
    synchronous), so while this coroutine runs it blocks the event loop.
    """
    # Lazy imports to avoid circular dependencies
    from ingestion.parser import parse_file
    from ingestion.chunker import smart_chunk
    from retrieval.embedder import embed_chunks
    from retrieval.vectorstore import store_chunks
    from persistence.tier import get_user_tier, get_expiry

    client = _client()

    # Check storage; bail early if still full
    if is_storage_near_full():
        return False

    # Grab oldest waiting item
    result = (
        client.table("upload_queue")
        .select("*")
        .eq("status", "waiting")
        .order("queued_at")
        .limit(1)
        .execute()
    )
    if not result.data:
        return False

    item = result.data[0]
    queue_id = item["id"]

    # Claim the item with a conditional update. Only a row that is still
    # "waiting" gets flipped, so if two requests race for the same item,
    # only one gets the row back in the response; the other backs off.
    claimed = (
        client.table("upload_queue")
        .update({"status": "processing"})
        .eq("id", queue_id)
        .eq("status", "waiting")
        .execute()
    )
    if not claimed.data:
        return False

    bucket = client.storage.from_("documents")
    doc_id: str | None = None
    perm_path: str | None = None
    perm_uploaded = False
    doc_row_created = False

    try:
        # Retrieve from temp storage
        file_bytes: bytes = bucket.download(item["storage_path"])
        user_id = item["user_id"]
        filename = item["filename"]
        # The column is present even when NULL, so fall back via `or`.
        file_size = item.get("file_size_bytes") or len(file_bytes)

        tier = get_user_tier(user_id)
        expires = get_expiry(tier)

        doc_id = str(_uuid.uuid4())
        perm_path = f"{user_id}/{doc_id}/{filename}"

        # Copy to the permanent storage path
        bucket.upload(
            path=perm_path,
            file=file_bytes,
            file_options={"content-type": item["content_type"]},
        )
        perm_uploaded = True

        # Create document metadata row
        client.table("documents").insert({
            "id": doc_id,
            "user_id": user_id,
            "filename": filename,
            "status": "processing",
            "tier_at_upload": str(tier),
            "expires_at": expires.isoformat(),
            "file_size_bytes": file_size,
        }).execute()
        doc_row_created = True

        # Parse → chunk → embed → store
        text = parse_file(file_bytes, filename)
        chunks = smart_chunk(text)
        embeds = embed_chunks(chunks)
        store_chunks(doc_id, user_id, chunks, embeds, expires)

        client.table("documents").update({
            "status": "ready",
            "chunk_count": len(chunks),
        }).eq("id", doc_id).execute()

        # Mark queue item done
        client.table("upload_queue").update({
            "status": "done",
            "doc_id": doc_id,
            "processed_at": datetime.now(UTC).isoformat(),
        }).eq("id", queue_id).execute()
    except Exception as exc:
        # Undo partial work, best effort. A document left in "processing"
        # would keep counting toward the quota in get_used_storage_bytes, and
        # the permanent copy would occupy Storage for nothing. Marking the row
        # "error" (rather than deleting it) lets sweep_expired_documents still
        # remove any chunks stored before the failure once it expires.
        if doc_row_created:
            try:
                client.table("documents").update({"status": "error"}).eq("id", doc_id).execute()
            except Exception:
                pass
        if perm_uploaded:
            try:
                bucket.remove([perm_path])
            except Exception:
                pass
        # NOTE(review): the staged queue/ file is not removed on failure, and
        # nothing in this module deletes it later, so it keeps using Storage.
        client.table("upload_queue").update({
            "status": "failed",
            "error": str(exc),
        }).eq("id", queue_id).execute()
        return False

    # Remove temp file to free storage (best effort: the item is already done)
    try:
        bucket.remove([item["storage_path"]])
    except Exception:
        pass
    return True


# ── expiry sweeper ───────────────────────────────────────────────────────────
def sweep_expired_documents() -> int:
    """
    Delete expired documents from Storage + their DB rows.

    For every ``documents`` row whose ``expires_at`` is in the past, remove
    the stored object at ``{user_id}/{id}/{filename}``, its ``chunks`` rows,
    and the document row. Each step is best effort: a failure is ignored and
    the sweep moves on to the next document.

    Returns the number of document rows actually deleted; a document whose
    row delete fails is not counted. Call from background tasks so the
    queue can drain.

    NOTE(review): the select is not paged, so one call handles at most one
    API page of expired rows (1,000 by default on Supabase); later calls pick
    up the rest.
    """
    client = _client()
    now = datetime.now(UTC).isoformat()
    expired = (
        client.table("documents")
        .select("id,user_id,filename")
        .lt("expires_at", now)
        .execute()
        .data
        or []
    )

    deleted = 0
    for doc in expired:
        doc_id = doc["id"]

        # Best-effort storage delete
        try:
            path = f"{doc['user_id']}/{doc_id}/{doc['filename']}"
            client.storage.from_("documents").remove([path])
        except Exception:
            pass

        # Remove chunks
        try:
            client.table("chunks").delete().eq("doc_id", doc_id).execute()
        except Exception:
            pass

        # Remove doc row; only count documents whose row is really gone
        try:
            client.table("documents").delete().eq("id", doc_id).execute()
        except Exception:
            continue
        deleted += 1

    return deleted