AdarshDRC's picture
fix: Resolving backend
29bfc1f
"""
src/services/jobs.py — Phase 3: Async upload job queue
"""
import asyncio
import json
import uuid
from typing import Any, Optional
import aiohttp
from src.core.config import (
SUPABASE_URL, SUPABASE_SERVICE_KEY,
USE_ASYNC_UPLOADS,
)
from src.services.cache import cache
# Cache list key holding the IDs of jobs waiting for the background worker
# (pushed with lpush in create_job, popped with rpop in run_worker).
QUEUE_KEY = "upload_jobs_queue"
# TTL passed to cache.set_json for every "job:<id>" record.
JOB_TTL = 86400 # 24 h
# ──────────────────────────────────────────────────────────────
# Supabase helpers
# ──────────────────────────────────────────────────────────────
def _supa_headers() -> dict:
    """
    Build the header set sent with Supabase REST requests.

    The service key is used both as the `apikey` header and as the bearer
    token; bodies are declared as JSON.
    """
    service_key = SUPABASE_SERVICE_KEY
    headers = {"apikey": service_key}
    headers["Authorization"] = f"Bearer {service_key}"
    headers["Content-Type"] = "application/json"
    # "return=minimal" replaced an earlier "return=representation" setting
    # (see the original FIX note about the 200 vs 201 status mismatch).
    headers["Prefer"] = "return=minimal"
    return headers
def _clean_row(row: dict) -> dict:
    """
    Return a copy of `row` without the entries whose value is None.

    Only None is dropped; other falsy values (0, "", False, empty
    containers) are kept. Omitting the key lets Postgres apply the column
    default instead of receiving an explicit null.
    """
    cleaned = {}
    for column, value in row.items():
        if value is None:
            continue
        cleaned[column] = value
    return cleaned
async def _supa_insert(table: str, row: dict) -> bool:
    """
    POST `row` (minus None values) to the Supabase REST endpoint for `table`.

    Returns True when the response status is 200 or 201. Returns False when
    Supabase is not configured, on any other status, or when the request
    raises; each failure is printed rather than propagated.
    """
    configured = bool(SUPABASE_URL and SUPABASE_SERVICE_KEY)
    if not configured:
        print("[Supabase] SUPABASE_URL or SUPABASE_SERVICE_KEY not set — skipping insert")
        return False

    endpoint = f"{SUPABASE_URL}/rest/v1/{table}"
    body_json = _clean_row(row)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, headers=_supa_headers(), json=body_json) as resp:
                if resp.status == 200 or resp.status == 201:
                    return True
                # Log a bounded slice of the response plus the column names
                # that were sent, to help spot schema mismatches.
                detail = await resp.text()
                print(f"[Supabase INSERT ERROR] table={table} status={resp.status} body={detail[:500]}")
                print(f"[Supabase INSERT ERROR] row_keys={list(body_json.keys())}")
                return False
    except Exception as exc:
        print(f"[Supabase INSERT EXCEPTION] table={table} error={exc}")
        return False
async def _supa_patch(table: str, job_id: str, patch: dict) -> bool:
    """
    PATCH the rows of `table` whose job_id equals `job_id` with `patch`.

    None values are stripped from `patch` before sending. Returns True for
    a 200, 201 or 204 response; returns False when Supabase is not
    configured, on any other status, or when the request raises.
    """
    if not (SUPABASE_URL and SUPABASE_SERVICE_KEY):
        return False

    endpoint = f"{SUPABASE_URL}/rest/v1/{table}?job_id=eq.{job_id}"
    changes = _clean_row(patch)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.patch(endpoint, headers=_supa_headers(), json=changes) as resp:
                if resp.status in (200, 201, 204):
                    return True
                detail = await resp.text()
                print(f"[Supabase PATCH ERROR] job_id={job_id} status={resp.status} body={detail[:300]}")
                return False
    except Exception as exc:
        print(f"[Supabase PATCH EXCEPTION] job_id={job_id} error={exc}")
        return False
async def _supa_get(table: str, job_id: str) -> Optional[dict]:
    """
    Fetch the first row of `table` whose job_id column equals `job_id`.

    Returns the row as a dict, or None when Supabase is not configured, no
    row matches, the response status is not 200, or the request raises.
    Failures are printed rather than propagated.
    """
    if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
        return None
    url = f"{SUPABASE_URL}/rest/v1/{table}"
    # FIX: pass the filter through `params` so aiohttp percent-encodes it.
    # Interpolating job_id into the URL meant a value containing "&", "#" or
    # "%" could add or truncate query parameters on a request that is made
    # with the service key.
    params = {"select": "*", "job_id": f"eq.{job_id}", "limit": "1"}
    headers = {**_supa_headers(), "Prefer": "return=representation"}
    try:
        async with aiohttp.ClientSession() as s:
            async with s.get(url, headers=headers, params=params) as r:
                if r.status != 200:
                    # Log non-200 responses like the insert/patch helpers do
                    # instead of dropping them silently.
                    body = await r.text()
                    print(f"[Supabase GET ERROR] job_id={job_id} status={r.status} body={body[:300]}")
                    return None
                data = await r.json()
                return data[0] if data else None
    except Exception as e:
        print(f"[Supabase GET EXCEPTION] job_id={job_id} error={e}")
        return None
# ──────────────────────────────────────────────────────────────
# Public API
# ──────────────────────────────────────────────────────────────
async def create_job(
    user_id: str,
    folder: str,
    total_files: int,
    job_payload: dict,
) -> str:
    """
    Register a new upload job and enqueue it for the background worker.

    A fresh UUID4 job id is generated, a row is inserted into the
    `upload_jobs` Supabase table (best effort), the full record including
    `job_payload` is stored in the cache under `job:<id>`, and the id is
    pushed onto the worker queue. Returns the job id.
    """
    job_id = str(uuid.uuid4())

    # Only the core columns are sent; result/error and timestamp columns
    # are left out so the database can apply its own defaults.
    row = dict(
        job_id=job_id,
        user_id=user_id or "anonymous",
        folder=folder,
        status="pending",
        total_files=total_files,
        processed_files=0,
    )

    if await _supa_insert("upload_jobs", row):
        print(f"[Jobs] Created job {job_id} in Supabase ✓")
    else:
        print(f"[Jobs] Supabase insert FAILED for job {job_id} — job will still work via Redis")

    # The cached record is what polling and the worker read, so it is
    # written regardless of whether the Supabase insert succeeded.
    record = dict(row)
    record["payload"] = job_payload
    record["status"] = "pending"
    await cache.set_json(f"job:{job_id}", record, ttl=JOB_TTL)

    await cache.lpush(QUEUE_KEY, job_id)
    return job_id
async def get_job_status(job_id: str) -> Optional[dict]:
    """
    Return the current record for `job_id`, or None if it cannot be found.

    The cached record is preferred and is returned without its `payload`
    entry; when the cache has nothing, the `upload_jobs` table is queried.
    """
    record = await cache.get_json(f"job:{job_id}")
    if not record:
        # Cache miss (or empty record): fall back to Supabase.
        return await _supa_get("upload_jobs", job_id)
    record.pop("payload", None)
    return record
async def update_job_progress(job_id: str, processed: int, total: int) -> None:
    """
    Mark the job as "processing" and record how many files are done.

    The change is patched into the `upload_jobs` table and merged into the
    cached record (an empty record is used if the cache has none).
    `total` is accepted but not used here.
    """
    changes = {"status": "processing", "processed_files": processed}
    await _supa_patch("upload_jobs", job_id, changes)

    cache_key = f"job:{job_id}"
    record = await cache.get_json(cache_key)
    if not record:
        record = {}
    record.update(changes)
    await cache.set_json(cache_key, record, ttl=JOB_TTL)
async def complete_job(job_id: str, result: dict) -> None:
    """
    Mark the job as "completed" and store its result.

    `processed_files` is taken from `result["files"]` (0 if absent). The
    Supabase patch carries the result as a JSON-encoded string, while the
    cached record keeps the result as a dict and drops the job payload.
    """
    # NOTE(review): the result is sent as a JSON string for a JSONB column;
    # confirm the column ends up holding an object rather than a string.
    serialized = json.dumps(result)
    changes = {
        "status": "completed",
        "processed_files": result.get("files", 0),
        "result": serialized,
    }
    await _supa_patch("upload_jobs", job_id, changes)

    cache_key = f"job:{job_id}"
    record = await cache.get_json(cache_key)
    if not record:
        record = {}
    record.update(changes)
    record["result"] = result  # the cache keeps the un-serialized dict
    record.pop("payload", None)
    await cache.set_json(cache_key, record, ttl=JOB_TTL)
async def fail_job(job_id: str, error: str) -> None:
    """
    Mark the job as "failed" and record the error text.

    The error is stringified and cut to 500 characters, patched into the
    `upload_jobs` table, and merged into the cached record, which also
    loses its job payload.
    """
    message = str(error)[:500]
    changes = {"status": "failed", "error": message}
    await _supa_patch("upload_jobs", job_id, changes)

    cache_key = f"job:{job_id}"
    record = await cache.get_json(cache_key)
    if not record:
        record = {}
    record.update(changes)
    record.pop("payload", None)
    await cache.set_json(cache_key, record, ttl=JOB_TTL)
# ──────────────────────────────────────────────────────────────
# Background worker
# ──────────────────────────────────────────────────────────────
async def run_worker(app_state) -> None:
    """
    Poll the job queue forever and execute each job that is popped.

    An empty queue is re-polled after 2 seconds. Job ids with no cached
    record are skipped. Cancellation ends the loop; any other exception is
    printed and followed by a 5-second pause before polling resumes.
    """
    print("[JobWorker] started")
    while True:
        try:
            job_id = await cache.rpop(QUEUE_KEY)
            if job_id:
                print(f"[JobWorker] picked up job {job_id}")
                record = await cache.get_json(f"job:{job_id}")
                if record:
                    job_payload = record.get("payload", {})
                    await _execute_upload_job(job_id, job_payload, app_state)
                else:
                    print(f"[JobWorker] job {job_id} not found in Redis — skipping")
            else:
                # Nothing queued: back off briefly before polling again.
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            print("[JobWorker] cancelled — shutting down")
            return
        except Exception as exc:
            print(f"[JobWorker] unhandled error: {exc}")
            await asyncio.sleep(5)
async def _execute_upload_job(job_id: str, payload: dict, app_state) -> None:
    """
    Run one upload job: process its files in chunks, upsert, record outcome.

    Files from `payload["files_data"]` are processed ten at a time with
    `_process_one_file`, progress is reported after every chunk, and all
    results are then handed to `_batch_upsert_all`. On success the job is
    completed with the uploaded URLs and vector counts; any exception
    raised inside the run is printed with its traceback and the job is
    marked failed.
    """
    # Imported at call time, as in the original (reason not shown in this file).
    from src.services.db_client import pinecone_pool, ensure_indexes
    from src.api.upload import _process_one_file, _batch_upsert_all

    files_data = payload.get("files_data", [])
    folder = payload.get("folder", "uncategorized")
    detect_faces = payload.get("detect_faces", True)
    user_id = payload.get("user_id", "anonymous")  # read but not used below
    keys = payload.get("keys", {})
    total = len(files_data)
    print(f"[JobWorker] executing job {job_id}: {total} files in '{folder}'")

    try:
        pc = pinecone_pool.get(keys["pinecone_key"])
        indexes_created = await asyncio.to_thread(ensure_indexes, pc)
        if indexes_created:
            # Pause after index creation before using the indexes.
            await asyncio.sleep(8)

        batch_size = 10
        outcomes = []
        done = 0
        for offset in range(0, total, batch_size):
            batch = files_data[offset:offset + batch_size]
            pending = [
                _process_one_file(
                    file_bytes=bytes(item["bytes"]),
                    folder=folder,
                    detect_faces=detect_faces,
                    keys=keys,
                    ai=app_state.ai,
                    sem=app_state.ai_semaphore,
                )
                for item in batch
            ]
            batch_outcomes = await asyncio.gather(*pending)
            outcomes.extend(batch_outcomes)
            done += len(batch)
            await update_job_progress(job_id, done, total)
            print(f"[JobWorker] job {job_id}: {done}/{total} processed")

        summary = await _batch_upsert_all(results=outcomes, folder=folder, pc=pc)
        uploaded = summary["uploaded_urls"]
        vector_counts = {
            "arcface_vecs": summary["arcface_vecs"],
            "adaface_vecs": summary["adaface_vecs"],
            "object_vecs": summary["object_vecs"],
        }
        job_result = {
            "files": len(uploaded),
            "urls": uploaded,
            "summary": vector_counts,
        }
        await complete_job(job_id, job_result)
        print(f"[JobWorker] job {job_id} COMPLETED ✓")
    except Exception as exc:
        print(f"[JobWorker] job {job_id} FAILED: {exc}")
        import traceback
        traceback.print_exc()
        await fail_job(job_id, str(exc))
# ──────────────────────────────────────────────────────────────
# Utility
# ──────────────────────────────────────────────────────────────
def _iso_now() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    import datetime as _dt

    current = _dt.datetime.now(tz=_dt.timezone.utc)
    return current.isoformat()