# imtrt004
# feat: storage capacity queue — defer uploads when 95% full, sweep expired docs, drain queue
# c19ed49
"""
Upload queue: when Supabase Storage is ≥95% full, uploads are queued.
The queue is stored in the `upload_queue` table. A background sweep runs
on every /upload and /health request to drain the queue when space frees up
(docs expire and are cleaned up by the expiry sweeper in app.py).
"""
from __future__ import annotations
from datetime import datetime, UTC
from supabase import create_client
import os
import uuid as _uuid
# ── configurable limits ──────────────────────────────────────────────────────
# Storage quota (bytes) above which new uploads are queued instead of stored.
# Default: 950 MB = ~95% of Supabase free 1 GB storage tier
QUOTA_BYTES: int = int(os.environ.get("STORAGE_QUOTA_BYTES", str(950 * 1024 * 1024)))
# Average seconds to fully process one document (parse+embed); used only to
# compute the rough ETA shown to users waiting in the queue.
AVG_PROCESS_SECS: int = int(os.environ.get("AVG_PROCESS_SECS", "120"))
def _client():
    """Build a fresh Supabase client from SUPABASE_URL / SUPABASE_KEY env vars."""
    url = os.environ["SUPABASE_URL"]
    key = os.environ["SUPABASE_KEY"]
    return create_client(url, key)
# ── storage accounting ───────────────────────────────────────────────────────
def get_used_storage_bytes() -> int:
    """Sum file_size_bytes of all non-expired, non-failed documents."""
    cutoff = datetime.now(UTC).isoformat()
    query = (
        _client()
        .table("documents")
        .select("file_size_bytes")
        .gt("expires_at", cutoff)
        .neq("status", "error")
    )
    rows = query.execute().data or []
    total = 0
    for row in rows:
        # file_size_bytes may be NULL for legacy rows; count those as 0.
        total += row.get("file_size_bytes") or 0
    return total
def is_storage_near_full(new_file_bytes: int = 0) -> bool:
    """True when current usage plus *new_file_bytes* would reach the quota."""
    projected = get_used_storage_bytes() + new_file_bytes
    return projected >= QUOTA_BYTES
# ── queue management ─────────────────────────────────────────────────────────
def enqueue_upload(
    user_id: str,
    filename: str,
    file_size_bytes: int,
    file_content: bytes,
    content_type: str,
) -> dict:
    """
    Queue an upload while storage is full.

    Stores the raw file in Supabase Storage under a temp 'queue/' prefix,
    inserts a row into `upload_queue`, and returns the queue position plus a
    rough ETA.

    Args:
        user_id: Owner of the upload; becomes part of the object path.
        filename: Original file name, stored verbatim in the object path.
            NOTE(review): assumes it contains no path separators — confirm it
            is sanitized upstream.
        file_size_bytes: Size reported by the caller; persisted for quota math.
        file_content: Raw file bytes to stash in temp storage.
        content_type: MIME type; empty values fall back to
            application/octet-stream.

    Returns:
        dict with keys: queued (True), queue_id, position,
        estimated_wait_minutes.
    """
    client = _client()
    queue_id = str(_uuid.uuid4())
    # Bug fix: the path previously ended in the literal "(unknown)" instead of
    # the real filename (templating residue). The stored path must carry the
    # filename so the drained/permanent object naming stays consistent with
    # the rest of the module.
    storage_path = f"queue/{user_id}/{queue_id}/{filename}"
    client.storage.from_("documents").upload(
        path=storage_path,
        file=file_content,
        file_options={"content-type": content_type or "application/octet-stream"},
    )
    # Count items already waiting so we can give an accurate position.
    waiting: int = (
        client.table("upload_queue")
        .select("id", count="exact")
        .eq("status", "waiting")
        .execute()
        .count
        or 0
    )
    position = waiting + 1
    client.table("upload_queue").insert({
        "id": queue_id,
        "user_id": user_id,
        "filename": filename,
        "file_size_bytes": file_size_bytes,
        "content_type": content_type or "application/octet-stream",
        "storage_path": storage_path,
        "status": "waiting",
    }).execute()
    # Linear ETA: items ahead of us (including us) times average process time.
    est_mins = round((position * AVG_PROCESS_SECS) / 60, 1)
    return {
        "queued": True,
        "queue_id": queue_id,
        "position": position,
        "estimated_wait_minutes": est_mins,
    }
def get_queue_status(user_id: str, queue_id: str) -> dict:
    """Return current queue position, ETA, or final doc_id if done."""
    client = _client()
    try:
        fetched = (
            client.table("upload_queue")
            .select("*")
            .eq("id", queue_id)
            .eq("user_id", user_id)
            .single()
            .execute()
        )
    except Exception:
        # .single() raises when no row matches; treat as unknown id.
        return {"status": "not_found"}
    record = fetched.data
    if not record:
        return {"status": "not_found"}

    state = record["status"]
    if state == "done":
        return {"status": "done", "doc_id": record.get("doc_id")}
    if state == "failed":
        return {"status": "failed", "error": record.get("error", "Unknown error")}
    if state == "processing":
        return {"status": "processing", "position": 0, "estimated_wait_minutes": 1}

    # Still waiting: position = items queued strictly before this one, plus 1.
    ahead = (
        client.table("upload_queue")
        .select("id", count="exact")
        .eq("status", "waiting")
        .lt("queued_at", record["queued_at"])
        .execute()
        .count
        or 0
    )
    position = ahead + 1
    return {
        "status": "waiting",
        "position": position,
        "estimated_wait_minutes": round((position * AVG_PROCESS_SECS) / 60, 1),
    }
# ── background queue drainer ─────────────────────────────────────────────────
async def try_process_next_queued() -> bool:
    """
    Process the oldest waiting queue item if storage has freed up.

    Returns True if an item was fully processed (caller can chain calls to
    keep draining); False when storage is still full, the queue is empty,
    another worker claimed the item first, or processing failed.
    Designed to run as a FastAPI BackgroundTask.
    """
    # Lazy imports to avoid circular dependencies.
    from ingestion.parser import parse_file
    from ingestion.chunker import smart_chunk
    from retrieval.embedder import embed_chunks
    from retrieval.vectorstore import store_chunks
    from persistence.tier import get_user_tier, get_expiry

    client = _client()
    # Check storage; bail early if still full.
    if is_storage_near_full():
        return False
    # Grab oldest waiting item.
    result = (
        client.table("upload_queue")
        .select("*")
        .eq("status", "waiting")
        .order("queued_at")
        .limit(1)
        .execute()
    )
    if not result.data:
        return False
    item = result.data[0]
    queue_id = item["id"]
    # Claim the item. Bug fix: the original update was unconditional, so two
    # concurrent requests could both "claim" the same row. Filtering on
    # status="waiting" makes the claim conditional — only one update matches.
    claim = (
        client.table("upload_queue")
        .update({"status": "processing"})
        .eq("id", queue_id)
        .eq("status", "waiting")
        .execute()
    )
    if not claim.data:
        # Another request claimed it between our select and update.
        return False
    try:
        # Retrieve the raw bytes from temp storage.
        file_bytes: bytes = client.storage.from_("documents").download(item["storage_path"])
        user_id = item["user_id"]
        filename = item["filename"]
        file_size = item.get("file_size_bytes", len(file_bytes))
        tier = get_user_tier(user_id)
        expires = get_expiry(tier)
        doc_id = str(_uuid.uuid4())
        # Bug fix: the permanent path previously ended in the literal
        # "(unknown)". The expiry sweeper deletes
        # "{user_id}/{doc_id}/{filename}", so the object must be stored under
        # that exact path — otherwise it is never cleaned up and storage leaks.
        perm_path = f"{user_id}/{doc_id}/{filename}"
        # Move to permanent storage path.
        client.storage.from_("documents").upload(
            path=perm_path,
            file=file_bytes,
            file_options={"content-type": item["content_type"]},
        )
        # Create document metadata row.
        client.table("documents").insert({
            "id": doc_id,
            "user_id": user_id,
            "filename": filename,
            "status": "processing",
            "tier_at_upload": str(tier),
            "expires_at": expires.isoformat(),
            "file_size_bytes": file_size,
        }).execute()
        # Parse → chunk → embed → store.
        text = parse_file(file_bytes, filename)
        chunks = smart_chunk(text)
        embeds = embed_chunks(chunks)
        store_chunks(doc_id, user_id, chunks, embeds, expires)
        client.table("documents").update({
            "status": "ready",
            "chunk_count": len(chunks),
        }).eq("id", doc_id).execute()
        # Mark queue item done.
        client.table("upload_queue").update({
            "status": "done",
            "doc_id": doc_id,
            "processed_at": datetime.now(UTC).isoformat(),
        }).eq("id", queue_id).execute()
        # Remove temp file to free storage (best-effort).
        try:
            client.storage.from_("documents").remove([item["storage_path"]])
        except Exception:
            pass
        return True
    except Exception as exc:
        client.table("upload_queue").update({
            "status": "failed",
            "error": str(exc),
        }).eq("id", queue_id).execute()
        # "failed" is terminal (see get_queue_status), so the temp object is
        # dead weight against the quota — clean it up best-effort too.
        try:
            client.storage.from_("documents").remove([item["storage_path"]])
        except Exception:
            pass
        return False
# ── expiry sweeper ───────────────────────────────────────────────────────────
def sweep_expired_documents() -> int:
    """
    Delete expired documents from Storage + their DB rows.
    Returns count deleted. Call from background tasks so the queue can drain.
    """
    client = _client()
    cutoff = datetime.now(UTC).isoformat()
    stale = (
        client.table("documents")
        .select("id,user_id,filename")
        .lt("expires_at", cutoff)
        .execute()
    )
    removed = 0
    for doc in stale.data or []:
        doc_id = doc["id"]
        object_path = f"{doc['user_id']}/{doc_id}/{doc['filename']}"
        # Every step is best-effort: a failure in one must not stop the
        # sweep or skip the remaining cleanup for this document.
        try:
            client.storage.from_("documents").remove([object_path])
        except Exception:
            pass
        try:
            client.table("chunks").delete().eq("doc_id", doc_id).execute()
        except Exception:
            pass
        try:
            client.table("documents").delete().eq("id", doc_id).execute()
        except Exception:
            pass
        removed += 1
    return removed