Spaces:
Running
Running
imtrt004
feat: storage capacity queue β defer uploads when 95% full, sweep expired docs, drain queue
"""
Upload queue: when Supabase Storage is ≥95% full, uploads are queued.
The queue is stored in the `upload_queue` table. A background sweep runs
on every /upload and /health request to drain the queue when space frees up
(docs expire and are cleaned up by the expiry sweeper in app.py).
"""
| from __future__ import annotations | |
| from datetime import datetime, UTC | |
| from supabase import create_client | |
| import os | |
| import uuid as _uuid | |
# ── configurable limits ──────────────────────────────────────────────────────
# Default: 950 MB = ~95% of Supabase free 1 GB storage tier
QUOTA_BYTES: int = int(os.environ.get("STORAGE_QUOTA_BYTES", str(950 * 1024 * 1024)))
# Average seconds to fully process one document (parse+embed); used for queue ETAs
AVG_PROCESS_SECS: int = int(os.environ.get("AVG_PROCESS_SECS", "120"))
def _client():
    """Build a fresh Supabase client from SUPABASE_URL / SUPABASE_KEY env vars."""
    url = os.environ["SUPABASE_URL"]
    key = os.environ["SUPABASE_KEY"]
    return create_client(url, key)
# ── storage accounting ───────────────────────────────────────────────────────
def get_used_storage_bytes() -> int:
    """
    Return the total bytes currently occupying the Storage bucket.

    Two sources count toward usage:
      * ``file_size_bytes`` of all non-expired, non-failed rows in ``documents``;
      * ``file_size_bytes`` of queue items still parked under the temp
        ``queue/`` prefix (status waiting/processing). Those temp objects live
        in the same bucket (see enqueue_upload), so the original accounting —
        which ignored them — let real usage exceed QUOTA_BYTES.
    """
    client = _client()
    now = datetime.now(UTC).isoformat()
    docs = (
        client.table("documents")
        .select("file_size_bytes")
        .gt("expires_at", now)
        .neq("status", "error")
        .execute()
    )
    used = sum((row.get("file_size_bytes") or 0) for row in (docs.data or []))
    # Temp files are removed only when an item reaches "done", so anything
    # waiting or processing still consumes bucket space.
    queued = (
        client.table("upload_queue")
        .select("file_size_bytes")
        .in_("status", ["waiting", "processing"])
        .execute()
    )
    used += sum((row.get("file_size_bytes") or 0) for row in (queued.data or []))
    return used
def is_storage_near_full(new_file_bytes: int = 0) -> bool:
    """True if current usage plus *new_file_bytes* reaches or exceeds QUOTA_BYTES."""
    projected = get_used_storage_bytes() + new_file_bytes
    return projected >= QUOTA_BYTES
# ── queue management ─────────────────────────────────────────────────────────
def enqueue_upload(
    user_id: str,
    filename: str,
    file_size_bytes: int,
    file_content: bytes,
    content_type: str,
) -> dict:
    """
    Park an upload until storage frees up.

    Stores the raw file in Supabase Storage under a temp 'queue/' prefix,
    inserts a row into ``upload_queue``, and returns queue position + ETA.

    Args:
        user_id: owner of the upload.
        filename: client-supplied name; only its basename goes into the
            object path (untrusted input must not inject path segments).
        file_size_bytes: size of ``file_content`` in bytes.
        file_content: raw file bytes to park.
        content_type: MIME type; falls back to application/octet-stream.

    Returns:
        dict with ``queued``, ``queue_id``, ``position``,
        ``estimated_wait_minutes``.
    """
    client = _client()
    queue_id = str(_uuid.uuid4())
    mime = content_type or "application/octet-stream"
    # Bug fix: the temp path previously ended in a literal placeholder rather
    # than the file name. Use the basename so "../" cannot escape the prefix.
    safe_name = os.path.basename(filename)
    storage_path = f"queue/{user_id}/{queue_id}/{safe_name}"
    client.storage.from_("documents").upload(
        path=storage_path,
        file=file_content,
        file_options={"content-type": mime},
    )
    # Count items already waiting so we can give an accurate position.
    waiting: int = (
        client.table("upload_queue")
        .select("id", count="exact")
        .eq("status", "waiting")
        .execute()
        .count
        or 0
    )
    position = waiting + 1
    client.table("upload_queue").insert({
        "id": queue_id,
        "user_id": user_id,
        "filename": filename,
        "file_size_bytes": file_size_bytes,
        "content_type": mime,
        "storage_path": storage_path,
        "status": "waiting",
    }).execute()
    est_mins = round((position * AVG_PROCESS_SECS) / 60, 1)
    return {
        "queued": True,
        "queue_id": queue_id,
        "position": position,
        "estimated_wait_minutes": est_mins,
    }
def get_queue_status(user_id: str, queue_id: str) -> dict:
    """Return the item's queue position + ETA, or the final doc_id once done."""
    client = _client()
    try:
        resp = (
            client.table("upload_queue")
            .select("*")
            .eq("id", queue_id)
            .eq("user_id", user_id)
            .single()
            .execute()
        )
    except Exception:
        # .single() raises when no matching row exists.
        return {"status": "not_found"}
    record = resp.data
    if not record:
        return {"status": "not_found"}
    state = record["status"]
    if state == "done":
        return {"status": "done", "doc_id": record.get("doc_id")}
    if state == "failed":
        return {"status": "failed", "error": record.get("error", "Unknown error")}
    if state == "processing":
        return {"status": "processing", "position": 0, "estimated_wait_minutes": 1}
    # Still waiting: position = 1 + items queued strictly before this one.
    ahead = (
        client.table("upload_queue")
        .select("id", count="exact")
        .eq("status", "waiting")
        .lt("queued_at", record["queued_at"])
        .execute()
        .count
        or 0
    )
    position = ahead + 1
    return {
        "status": "waiting",
        "position": position,
        "estimated_wait_minutes": round((position * AVG_PROCESS_SECS) / 60, 1),
    }
# ── background queue drainer ─────────────────────────────────────────────────
async def try_process_next_queued() -> bool:
    """
    Process the oldest waiting queue item if storage has freed up.

    Returns True if an item was fully processed (caller can chain calls to
    drain further); False when storage is still full, the queue is empty,
    another request claimed the item first, or processing failed.

    Designed to run as a FastAPI BackgroundTask.
    """
    # Lazy imports to avoid circular dependencies.
    from ingestion.parser import parse_file
    from ingestion.chunker import smart_chunk
    from retrieval.embedder import embed_chunks
    from retrieval.vectorstore import store_chunks
    from persistence.tier import get_user_tier, get_expiry

    client = _client()
    # Bail early while storage is still at/over quota.
    if is_storage_near_full():
        return False
    # Grab the oldest waiting item.
    result = (
        client.table("upload_queue")
        .select("*")
        .eq("status", "waiting")
        .order("queued_at")
        .limit(1)
        .execute()
    )
    if not result.data:
        return False
    item = result.data[0]
    queue_id = item["id"]
    # Claim with a conditional update that only flips waiting → processing.
    # The original unconditional update let two simultaneous requests both
    # process the same item; here the loser matches zero rows and backs off.
    claimed = (
        client.table("upload_queue")
        .update({"status": "processing"})
        .eq("id", queue_id)
        .eq("status", "waiting")
        .execute()
    )
    if not claimed.data:
        return False
    try:
        # Retrieve the parked file from temp storage.
        file_bytes: bytes = client.storage.from_("documents").download(item["storage_path"])
        user_id = item["user_id"]
        filename = item["filename"]
        file_size = item.get("file_size_bytes", len(file_bytes))
        tier = get_user_tier(user_id)
        expires = get_expiry(tier)
        doc_id = str(_uuid.uuid4())
        # Bug fix: the permanent path must end in the real file name —
        # sweep_expired_documents() deletes "{user_id}/{doc_id}/{filename}",
        # so objects stored under any other name would never be cleaned up.
        perm_path = f"{user_id}/{doc_id}/{filename}"
        # Move to the permanent storage path.
        client.storage.from_("documents").upload(
            path=perm_path,
            file=file_bytes,
            file_options={"content-type": item["content_type"]},
        )
        # Create the document metadata row.
        client.table("documents").insert({
            "id": doc_id,
            "user_id": user_id,
            "filename": filename,
            "status": "processing",
            "tier_at_upload": str(tier),
            "expires_at": expires.isoformat(),
            "file_size_bytes": file_size,
        }).execute()
        # Parse → chunk → embed → store.
        text = parse_file(file_bytes, filename)
        chunks = smart_chunk(text)
        embeds = embed_chunks(chunks)
        store_chunks(doc_id, user_id, chunks, embeds, expires)
        client.table("documents").update({
            "status": "ready",
            "chunk_count": len(chunks),
        }).eq("id", doc_id).execute()
        # Mark the queue item done.
        client.table("upload_queue").update({
            "status": "done",
            "doc_id": doc_id,
            "processed_at": datetime.now(UTC).isoformat(),
        }).eq("id", queue_id).execute()
        # Remove the temp file to free storage (best-effort).
        try:
            client.storage.from_("documents").remove([item["storage_path"]])
        except Exception:
            pass
        return True
    except Exception as exc:
        # NOTE(review): the temp object under queue/ is kept on failure so the
        # upload isn't lost, but failed items are never retried — consider a
        # sweeper for stale failed temp files.
        client.table("upload_queue").update({
            "status": "failed",
            "error": str(exc),
        }).eq("id", queue_id).execute()
        return False
# ── expiry sweeper ───────────────────────────────────────────────────────────
def sweep_expired_documents() -> int:
    """
    Delete expired documents from Storage plus their DB rows.

    Returns the number of document rows actually deleted. Call from
    background tasks so the upload queue can drain.
    """
    client = _client()
    now = datetime.now(UTC).isoformat()
    expired = (
        client.table("documents")
        .select("id,user_id,filename")
        .lt("expires_at", now)
        .execute()
        .data
        or []
    )
    deleted = 0
    for doc in expired:
        # Best-effort storage delete; the object may already be gone.
        try:
            path = f"{doc['user_id']}/{doc['id']}/{doc['filename']}"
            client.storage.from_("documents").remove([path])
        except Exception:
            pass
        # Best-effort chunk cleanup.
        try:
            client.table("chunks").delete().eq("doc_id", doc["id"]).execute()
        except Exception:
            pass
        # Bug fix: only count a document once its row is really gone — the
        # original incremented for every candidate, even when the row delete
        # failed, overstating the return value.
        try:
            client.table("documents").delete().eq("id", doc["id"]).execute()
        except Exception:
            continue
        deleted += 1
    return deleted