""" src/services/jobs.py — Phase 3: Async upload job queue """ import asyncio import json import uuid from typing import Any, Optional import aiohttp from src.core.config import ( SUPABASE_URL, SUPABASE_SERVICE_KEY, USE_ASYNC_UPLOADS, ) from src.services.cache import cache QUEUE_KEY = "upload_jobs_queue" JOB_TTL = 86400 # 24 h # ────────────────────────────────────────────────────────────── # Supabase helpers # ────────────────────────────────────────────────────────────── def _supa_headers() -> dict: return { "apikey": SUPABASE_SERVICE_KEY, "Authorization": f"Bearer {SUPABASE_SERVICE_KEY}", "Content-Type": "application/json", "Prefer": "return=minimal", # FIX: was "return=representation" which requires 200 not 201 } def _clean_row(row: dict) -> dict: """ Remove None values before sending to Supabase. Supabase REST rejects Python None in JSON — omit the key entirely and let Postgres use the column default instead. """ return {k: v for k, v in row.items() if v is not None} async def _supa_insert(table: str, row: dict) -> bool: """Returns True on success, False on failure. Logs errors explicitly.""" if not SUPABASE_URL or not SUPABASE_SERVICE_KEY: print(f"[Supabase] SUPABASE_URL or SUPABASE_SERVICE_KEY not set — skipping insert") return False url = f"{SUPABASE_URL}/rest/v1/{table}" clean = _clean_row(row) try: async with aiohttp.ClientSession() as s: async with s.post(url, headers=_supa_headers(), json=clean) as r: if r.status in (200, 201): return True else: body = await r.text() print(f"[Supabase INSERT ERROR] table={table} status={r.status} body={body[:500]}") print(f"[Supabase INSERT ERROR] row_keys={list(clean.keys())}") return False except Exception as e: print(f"[Supabase INSERT EXCEPTION] table={table} error={e}") return False async def _supa_patch(table: str, job_id: str, patch: dict) -> bool: if not SUPABASE_URL or not SUPABASE_SERVICE_KEY: return False url = f"{SUPABASE_URL}/rest/v1/{table}?job_id=eq.{job_id}" clean = _clean_row(patch) try: async with aiohttp.ClientSession() as s: async with s.patch(url, headers=_supa_headers(), json=clean) as r: if r.status not in (200, 201, 204): body = await r.text() print(f"[Supabase PATCH ERROR] job_id={job_id} status={r.status} body={body[:300]}") return False return True except Exception as e: print(f"[Supabase PATCH EXCEPTION] job_id={job_id} error={e}") return False async def _supa_get(table: str, job_id: str) -> Optional[dict]: if not SUPABASE_URL or not SUPABASE_SERVICE_KEY: return None # Use select= to avoid ambiguity with column names url = f"{SUPABASE_URL}/rest/v1/{table}?select=*&job_id=eq.{job_id}&limit=1" headers = {**_supa_headers(), "Prefer": "return=representation"} try: async with aiohttp.ClientSession() as s: async with s.get(url, headers=headers) as r: if r.status == 200: data = await r.json() return data[0] if data else None except Exception as e: print(f"[Supabase GET EXCEPTION] job_id={job_id} error={e}") return None # ────────────────────────────────────────────────────────────── # Public API # ────────────────────────────────────────────────────────────── async def create_job( user_id: str, folder: str, total_files: int, job_payload: dict, ) -> str: job_id = str(uuid.uuid4()) # FIX: Only send columns that definitely exist in the table. # Do NOT send result/error (null) — let Postgres default them. # Do NOT send created_at/updated_at if table has DEFAULT NOW(). row = { "job_id": job_id, "user_id": user_id or "anonymous", "folder": folder, "status": "pending", "total_files": total_files, "processed_files": 0, } success = await _supa_insert("upload_jobs", row) if success: print(f"[Jobs] Created job {job_id} in Supabase ✓") else: print(f"[Jobs] Supabase insert FAILED for job {job_id} — job will still work via Redis") # Redis is the source of truth for polling — works even if Supabase fails await cache.set_json( f"job:{job_id}", {**row, "payload": job_payload, "status": "pending"}, ttl=JOB_TTL, ) await cache.lpush(QUEUE_KEY, job_id) return job_id async def get_job_status(job_id: str) -> Optional[dict]: # Check Redis first (fast path) cached = await cache.get_json(f"job:{job_id}") if cached: cached.pop("payload", None) return cached # Fallback to Supabase return await _supa_get("upload_jobs", job_id) async def update_job_progress(job_id: str, processed: int, total: int) -> None: patch = { "status": "processing", "processed_files": processed, } await _supa_patch("upload_jobs", job_id, patch) cached = await cache.get_json(f"job:{job_id}") or {} cached.update(patch) await cache.set_json(f"job:{job_id}", cached, ttl=JOB_TTL) async def complete_job(job_id: str, result: dict) -> None: patch = { "status": "completed", "processed_files": result.get("files", 0), "result": json.dumps(result), # JSONB column — serialize explicitly } await _supa_patch("upload_jobs", job_id, patch) cached = await cache.get_json(f"job:{job_id}") or {} cached.update({**patch, "result": result}) # keep as dict in Redis cached.pop("payload", None) await cache.set_json(f"job:{job_id}", cached, ttl=JOB_TTL) async def fail_job(job_id: str, error: str) -> None: patch = { "status": "failed", "error": str(error)[:500], # truncate to avoid DB limit issues } await _supa_patch("upload_jobs", job_id, patch) cached = await cache.get_json(f"job:{job_id}") or {} cached.update(patch) cached.pop("payload", None) await cache.set_json(f"job:{job_id}", cached, ttl=JOB_TTL) # ────────────────────────────────────────────────────────────── # Background worker # ────────────────────────────────────────────────────────────── async def run_worker(app_state) -> None: print("[JobWorker] started") while True: try: job_id = await cache.rpop(QUEUE_KEY) if not job_id: await asyncio.sleep(2) continue print(f"[JobWorker] picked up job {job_id}") cached = await cache.get_json(f"job:{job_id}") if not cached: print(f"[JobWorker] job {job_id} not found in Redis — skipping") continue payload = cached.get("payload", {}) await _execute_upload_job(job_id, payload, app_state) except asyncio.CancelledError: print("[JobWorker] cancelled — shutting down") break except Exception as e: print(f"[JobWorker] unhandled error: {e}") await asyncio.sleep(5) async def _execute_upload_job(job_id: str, payload: dict, app_state) -> None: from src.services.db_client import pinecone_pool, ensure_indexes from src.api.upload import _process_one_file, _batch_upsert_all files_data: list[dict] = payload.get("files_data", []) folder: str = payload.get("folder", "uncategorized") detect_faces: bool = payload.get("detect_faces", True) user_id: str = payload.get("user_id", "anonymous") keys: dict = payload.get("keys", {}) total = len(files_data) print(f"[JobWorker] executing job {job_id}: {total} files in '{folder}'") try: pc = pinecone_pool.get(keys["pinecone_key"]) created = await asyncio.to_thread(ensure_indexes, pc) if created: await asyncio.sleep(8) CHUNK = 10 all_results = [] processed = 0 for chunk_start in range(0, total, CHUNK): chunk = files_data[chunk_start:chunk_start + CHUNK] chunk_results = await asyncio.gather(*[ _process_one_file( file_bytes=bytes(f["bytes"]), folder=folder, detect_faces=detect_faces, keys=keys, ai=app_state.ai, sem=app_state.ai_semaphore, ) for f in chunk ]) all_results.extend(chunk_results) processed += len(chunk) await update_job_progress(job_id, processed, total) print(f"[JobWorker] job {job_id}: {processed}/{total} processed") summary = await _batch_upsert_all(results=all_results, folder=folder, pc=pc) await complete_job(job_id, { "files": len(summary["uploaded_urls"]), "urls": summary["uploaded_urls"], "summary": { "arcface_vecs": summary["arcface_vecs"], "adaface_vecs": summary["adaface_vecs"], "object_vecs": summary["object_vecs"], }, }) print(f"[JobWorker] job {job_id} COMPLETED ✓") except Exception as e: print(f"[JobWorker] job {job_id} FAILED: {e}") import traceback traceback.print_exc() await fail_job(job_id, str(e)) # ────────────────────────────────────────────────────────────── # Utility # ────────────────────────────────────────────────────────────── def _iso_now() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat()