imtrt004 committed on
Commit
c19ed49
·
1 Parent(s): 96e8fa5

feat: storage capacity queue — defer uploads when 95% full, sweep expired docs, drain queue

Browse files
Files changed (2) hide show
  1. app.py +65 -4
  2. persistence/queue.py +283 -0
app.py CHANGED
@@ -21,6 +21,15 @@ from persistence.tier import (
21
  check_message_limit,
22
  Tier,
23
  )
 
 
 
 
 
 
 
 
 
24
 
25
 
26
  def _supa():
@@ -65,10 +74,24 @@ async def upload(
65
  bg: BackgroundTasks,
66
  ):
67
  content = await file.read()
68
- ok, msg = can_upload(user_id, len(content))
 
 
69
  if not ok:
70
  raise HTTPException(status_code=403, detail=msg)
71
 
 
 
 
 
 
 
 
 
 
 
 
 
72
  tier = get_user_tier(user_id)
73
  expires = get_expiry(tier)
74
  doc_id = str(uuid.uuid4())
@@ -81,7 +104,7 @@ async def upload(
81
  file_options={"content-type": file.content_type or "application/octet-stream"},
82
  )
83
 
84
- # Create doc metadata row
85
  supa.table("documents").insert({
86
  "id": doc_id,
87
  "user_id": user_id,
@@ -89,6 +112,7 @@ async def upload(
89
  "status": "processing",
90
  "tier_at_upload": str(tier),
91
  "expires_at": expires.isoformat(),
 
92
  }).execute()
93
 
94
  # Process in background (parse → chunk → embed → store)
@@ -206,6 +230,43 @@ async def doc_status(doc_id: str):
206
  return result.data
207
 
208
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
  @app.get("/health")
210
- def health():
211
- return {"status": "alive", "model": "Qwen3-4B-Instruct-Q4_K_M"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  check_message_limit,
22
  Tier,
23
  )
24
+ from persistence.queue import (
25
+ is_storage_near_full,
26
+ get_used_storage_bytes,
27
+ enqueue_upload,
28
+ get_queue_status,
29
+ try_process_next_queued,
30
+ sweep_expired_documents,
31
+ QUOTA_BYTES,
32
+ )
33
 
34
 
35
  def _supa():
 
74
  bg: BackgroundTasks,
75
  ):
76
  content = await file.read()
77
+ file_size = len(content)
78
+
79
+ ok, msg = can_upload(user_id, file_size)
80
  if not ok:
81
  raise HTTPException(status_code=403, detail=msg)
82
 
83
+ # ── Storage capacity gate ─────────────────────────────────────────────
84
+ if is_storage_near_full(file_size):
85
+ # Queue the upload; it will be processed once expired docs are purged
86
+ result = enqueue_upload(
87
+ user_id=user_id,
88
+ filename=file.filename or "upload",
89
+ file_size_bytes=file_size,
90
+ file_content=content,
91
+ content_type=file.content_type or "application/octet-stream",
92
+ )
93
+ return result # {queued: true, queue_id, position, estimated_wait_minutes}
94
+
95
  tier = get_user_tier(user_id)
96
  expires = get_expiry(tier)
97
  doc_id = str(uuid.uuid4())
 
104
  file_options={"content-type": file.content_type or "application/octet-stream"},
105
  )
106
 
107
+ # Create doc metadata row (include file size for storage accounting)
108
  supa.table("documents").insert({
109
  "id": doc_id,
110
  "user_id": user_id,
 
112
  "status": "processing",
113
  "tier_at_upload": str(tier),
114
  "expires_at": expires.isoformat(),
115
+ "file_size_bytes": file_size,
116
  }).execute()
117
 
118
  # Process in background (parse → chunk → embed → store)
 
230
  return result.data
231
 
232
 
233
+ @app.get("/queue-status/{user_id}/{queue_id}")
234
+ async def queue_status(user_id: str, queue_id: str, bg: BackgroundTasks):
235
+ """Poll upload queue position and ETA. Returns {status, position, estimated_wait_minutes} or {status:'done', doc_id}."""
236
+ # Opportunistically sweep expired docs and drain queue in background
237
+ bg.add_task(_sweep_and_drain)
238
+ return get_queue_status(user_id, queue_id)
239
+
240
+
241
+ @app.get("/storage-status")
242
+ async def storage_status():
243
+ """Current storage usage as bytes and percentage of quota."""
244
+ used = get_used_storage_bytes()
245
+ return {
246
+ "used_bytes": used,
247
+ "quota_bytes": QUOTA_BYTES,
248
+ "used_pct": round(used / QUOTA_BYTES * 100, 1),
249
+ "near_full": is_storage_near_full(),
250
+ }
251
+
252
+
253
  @app.get("/health")
254
+ async def health(bg: BackgroundTasks):
255
+ # Sweep + drain queue every health check (CF ping every 25 min)
256
+ bg.add_task(_sweep_and_drain)
257
+ return {"status": "alive", "model": "ReFusion 3.0 by Md Tusar Akon"}
258
+
259
+
260
async def _sweep_and_drain():
    """Purge expired documents, then drain as many queued uploads as now fit."""
    try:
        freed = sweep_expired_documents()
        if freed:
            # At most one queued item per freed doc; stop at the first that
            # cannot be processed (storage still full or queue empty).
            remaining = freed
            while remaining > 0 and await try_process_next_queued():
                remaining -= 1
        else:
            # Nothing expired — still give the queue one chance to advance.
            await try_process_next_queued()
    except Exception as exc:
        print(f"⚠️ sweep_and_drain error: {exc}")
persistence/queue.py ADDED
@@ -0,0 +1,283 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Upload queue: when Supabase Storage is ≥95% full, uploads are queued.
3
+ The queue is stored in the `upload_queue` table. A background sweep runs
4
+ on every /upload and /health request to drain the queue when space frees up
5
+ (docs expire and are cleaned up by the expiry sweeper in app.py).
6
+ """
7
+ from __future__ import annotations
8
+
9
+ from datetime import datetime, UTC
10
+ from supabase import create_client
11
+ import os
12
+ import uuid as _uuid
13
+
14
+ # ── configurable limits ──────────────────────────────────────────────────────
15
+ # Default: 950 MB = ~95% of Supabase free 1 GB storage tier
16
+ QUOTA_BYTES: int = int(os.environ.get("STORAGE_QUOTA_BYTES", str(950 * 1024 * 1024)))
17
+ # Average seconds to fully process one document (parse+embed)
18
+ AVG_PROCESS_SECS: int = int(os.environ.get("AVG_PROCESS_SECS", "120"))
19
+
20
+
21
def _client():
    """Build a fresh Supabase client from the SUPABASE_URL / SUPABASE_KEY env vars."""
    url = os.environ["SUPABASE_URL"]
    key = os.environ["SUPABASE_KEY"]
    return create_client(url, key)
23
+
24
+
25
+ # ── storage accounting ───────────────────────────────────────────────────────
26
+
27
def get_used_storage_bytes() -> int:
    """Best-effort total of bytes currently occupying the `documents` bucket.

    Sums ``file_size_bytes`` over:

    * every non-failed ``documents`` row — rows are only deleted by
      ``sweep_expired_documents()``, so an existing row means the file is
      still physically in storage even if ``expires_at`` has passed
      (the previous ``expires_at > now`` filter undercounted here);
    * queue items in 'waiting'/'processing' state, whose temp copies under
      ``queue/…`` live in the same bucket until processed.

    Missing/NULL sizes count as 0 rather than raising.
    """
    client = _client()

    docs = (
        client.table("documents")
        .select("file_size_bytes")
        .neq("status", "error")
        .execute()
        .data
        or []
    )
    used = sum(row.get("file_size_bytes") or 0 for row in docs)

    # Temp files written by enqueue_upload() also consume the quota.
    queued = (
        client.table("upload_queue")
        .select("file_size_bytes")
        .in_("status", ["waiting", "processing"])
        .execute()
        .data
        or []
    )
    used += sum(row.get("file_size_bytes") or 0 for row in queued)

    return used
39
+
40
+
41
def is_storage_near_full(new_file_bytes: int = 0) -> bool:
    """True when current usage plus an optional pending upload reaches the quota."""
    projected = get_used_storage_bytes() + new_file_bytes
    return projected >= QUOTA_BYTES
43
+
44
+
45
+ # ── queue management ─────────────────────────────────────────────────────────
46
+
47
def enqueue_upload(
    user_id: str,
    filename: str,
    file_size_bytes: int,
    file_content: bytes,
    content_type: str,
) -> dict:
    """
    Park an upload until storage frees up.

    Stores the raw bytes in Supabase Storage under a temp ``queue/`` prefix,
    inserts a row into ``upload_queue``, and returns the caller's queue
    position plus a rough ETA.

    Returns: ``{"queued": True, "queue_id", "position", "estimated_wait_minutes"}``
    """
    client = _client()
    queue_id = str(_uuid.uuid4())

    # Use only the basename so a hostile filename (e.g. "../../x") cannot
    # escape the queue/<user>/<queue_id>/ prefix. (Was a hard-coded
    # placeholder before, which also lost the real filename.)
    safe_name = os.path.basename(filename) or "upload"
    storage_path = f"queue/{user_id}/{queue_id}/{safe_name}"

    client.storage.from_("documents").upload(
        path=storage_path,
        file=file_content,
        file_options={"content-type": content_type or "application/octet-stream"},
    )

    # Count items already waiting so we can report a position.
    # NOTE(review): count-then-insert is racy under concurrent uploads; the
    # position is advisory only.
    waiting: int = (
        client.table("upload_queue")
        .select("id", count="exact")
        .eq("status", "waiting")
        .execute()
        .count
        or 0
    )
    position = waiting + 1

    # NOTE(review): get_queue_status() orders by `queued_at`, which is not set
    # here — assumes the column has a DB-side default (now()); confirm schema.
    client.table("upload_queue").insert({
        "id": queue_id,
        "user_id": user_id,
        "filename": filename,
        "file_size_bytes": file_size_bytes,
        "content_type": content_type or "application/octet-stream",
        "storage_path": storage_path,
        "status": "waiting",
    }).execute()

    est_mins = round((position * AVG_PROCESS_SECS) / 60, 1)
    return {
        "queued": True,
        "queue_id": queue_id,
        "position": position,
        "estimated_wait_minutes": est_mins,
    }
96
+
97
+
98
def get_queue_status(user_id: str, queue_id: str) -> dict:
    """Look up one queue item and report where it stands.

    Returns one of:
      {"status": "not_found"}
      {"status": "done", "doc_id": ...}
      {"status": "failed", "error": ...}
      {"status": "processing", "position": 0, "estimated_wait_minutes": 1}
      {"status": "waiting", "position": N, "estimated_wait_minutes": M}
    """
    client = _client()

    try:
        fetched = (
            client.table("upload_queue")
            .select("*")
            .eq("id", queue_id)
            .eq("user_id", user_id)
            .single()
            .execute()
        )
    except Exception:
        # .single() raises when no (or multiple) rows match.
        return {"status": "not_found"}

    item = fetched.data
    if not item:
        return {"status": "not_found"}

    state = item["status"]
    if state == "done":
        return {"status": "done", "doc_id": item.get("doc_id")}
    if state == "failed":
        return {"status": "failed", "error": item.get("error", "Unknown error")}
    if state == "processing":
        return {"status": "processing", "position": 0, "estimated_wait_minutes": 1}

    # Still waiting: position = 1 + number of older waiting items.
    ahead = (
        client.table("upload_queue")
        .select("id", count="exact")
        .eq("status", "waiting")
        .lt("queued_at", item["queued_at"])
        .execute()
        .count
        or 0
    )
    position = ahead + 1

    return {
        "status": "waiting",
        "position": position,
        "estimated_wait_minutes": round((position * AVG_PROCESS_SECS) / 60, 1),
    }
141
+
142
+
143
+ # ── background queue drainer ─────────────────────────────────────────────────
144
+
145
async def try_process_next_queued() -> bool:
    """
    Process the oldest waiting queue item if storage has freed up.

    Returns True when an item was fully processed (callers may chain calls
    to drain several items). Designed to run as a FastAPI BackgroundTask.
    """
    # Lazy imports to avoid circular dependencies
    from ingestion.parser import parse_file
    from ingestion.chunker import smart_chunk
    from retrieval.embedder import embed_chunks
    from retrieval.vectorstore import store_chunks
    from persistence.tier import get_user_tier, get_expiry

    client = _client()

    # Bail early while storage is still at/over the quota.
    if is_storage_near_full():
        return False

    # Oldest waiting item first (FIFO by queued_at).
    result = (
        client.table("upload_queue")
        .select("*")
        .eq("status", "waiting")
        .order("queued_at")
        .limit(1)
        .execute()
    )
    if not result.data:
        return False

    item = result.data[0]
    queue_id = item["id"]

    # Claim with a *conditional* update (waiting → processing only): if a
    # concurrent request already claimed it, zero rows come back and we stop.
    # NOTE(review): relies on the client returning updated rows (the default
    # "representation" return preference) — confirm client config.
    claimed = (
        client.table("upload_queue")
        .update({"status": "processing"})
        .eq("id", queue_id)
        .eq("status", "waiting")
        .execute()
    )
    if not claimed.data:
        return False

    try:
        # Pull the raw bytes back out of temp storage.
        file_bytes: bytes = client.storage.from_("documents").download(item["storage_path"])

        user_id = item["user_id"]
        filename = item["filename"]
        # `or` also covers a present-but-NULL file_size_bytes column.
        file_size = item.get("file_size_bytes") or len(file_bytes)
        tier = get_user_tier(user_id)
        expires = get_expiry(tier)
        doc_id = str(_uuid.uuid4())
        # Must mirror sweep_expired_documents(), which deletes
        # f"{user_id}/{id}/{filename}" — a mismatched path here would leave
        # the file in storage forever after expiry.
        perm_path = f"{user_id}/{doc_id}/{filename}"

        # Move to the permanent storage path.
        client.storage.from_("documents").upload(
            path=perm_path,
            file=file_bytes,
            file_options={"content-type": item["content_type"]},
        )

        # Create the document metadata row (size feeds storage accounting).
        client.table("documents").insert({
            "id": doc_id,
            "user_id": user_id,
            "filename": filename,
            "status": "processing",
            "tier_at_upload": str(tier),
            "expires_at": expires.isoformat(),
            "file_size_bytes": file_size,
        }).execute()

        # Parse → chunk → embed → store
        text = parse_file(file_bytes, filename)
        chunks = smart_chunk(text)
        embeds = embed_chunks(chunks)
        store_chunks(doc_id, user_id, chunks, embeds, expires)

        client.table("documents").update({
            "status": "ready",
            "chunk_count": len(chunks),
        }).eq("id", doc_id).execute()

        # Mark the queue item done.
        client.table("upload_queue").update({
            "status": "done",
            "doc_id": doc_id,
            "processed_at": datetime.now(UTC).isoformat(),
        }).eq("id", queue_id).execute()

        # Best-effort removal of the temp copy to free storage.
        try:
            client.storage.from_("documents").remove([item["storage_path"]])
        except Exception:
            pass

        return True

    except Exception as exc:
        client.table("upload_queue").update({
            "status": "failed",
            "error": str(exc),
        }).eq("id", queue_id).execute()
        # NOTE(review): the temp file under queue/ is kept on failure, so a
        # failed item holds storage until cleaned up — consider a reaper.
        return False
244
+
245
+
246
+ # ── expiry sweeper ───────────────────────────────────────────────────────────
247
+
248
def sweep_expired_documents() -> int:
    """
    Purge expired documents: storage object, chunk rows, then the doc row.

    Returns how many documents were actually removed (i.e. the metadata-row
    delete succeeded); callers use this count to decide how many queued
    uploads to drain, so failed deletions must not inflate it — a doc whose
    row delete failed stays in the table and is retried on the next sweep.
    Storage/chunk deletes remain best-effort so one bad object cannot wedge
    the whole sweep.
    """
    client = _client()
    now = datetime.now(UTC).isoformat()

    expired = (
        client.table("documents")
        .select("id,user_id,filename")
        .lt("expires_at", now)
        .execute()
        .data
        or []
    )

    deleted = 0
    for doc in expired:
        # Best-effort: remove the stored file…
        try:
            path = f"{doc['user_id']}/{doc['id']}/{doc['filename']}"
            client.storage.from_("documents").remove([path])
        except Exception:
            pass
        # …and its chunk rows.
        try:
            client.table("chunks").delete().eq("doc_id", doc["id"]).execute()
        except Exception:
            pass
        # Only count this doc once its metadata row is really gone.
        try:
            client.table("documents").delete().eq("id", doc["id"]).execute()
        except Exception:
            continue
        deleted += 1

    return deleted