# Async Job Queue for Long-Running ML Inference

**Status**: APPROVED
**Created**: 2025-12-12
**Author**: Claude Code Audit

---

## Executive Summary

HuggingFace Spaces has a ~60-second gateway timeout that cannot be bypassed through configuration. DeepISLES ML inference typically takes 30-60 seconds, creating intermittent 504 Gateway Timeout errors.

This spec defines a robust async job queue system that eliminates timeout issues by immediately returning a job ID and using client-side polling for status/results.

## Problem Statement

### Current Architecture (Synchronous)

```
Frontend                    Backend                    ML Inference
    |                          |                            |
    |--POST /api/segment------>|                            |
    |                          |--run_pipeline_on_case()--->|
    |                          |                            |
    |      (30-60s wait)       |        (processing)        |
    |                          |                            |
    |                          |<---result------------------|
    |<--200 OK + JSON----------|                            |
```

**Problem**: HF Spaces proxy times out at ~60s, killing the connection before the ML inference completes. The response is lost even though processing succeeds.

### Target Architecture (Async with Polling)

```
Frontend                    Backend                    ML Inference
    |                          |                            |
    |--POST /api/segment------>|                            |
    |<--202 Accepted + job_id--|                            |
    |                          |--BackgroundTask----------->|
    |                          |                            |
    |--GET /api/jobs/{id}----->|        (processing)        |
    |<--200 {status: running}--|                            |
    |                          |                            |
    |--GET /api/jobs/{id}----->|                            |
    |<--200 {status: running}--|                            |
    |                          |<---result------------------|
    |--GET /api/jobs/{id}----->|                            |
    |<--200 {status: completed,|                            |
    |        result: {...}}----|                            |
```

**Solution**: Initial request returns in <1s. Polling requests are fast (<100ms). No single request exceeds the proxy timeout.

## Technical Design

### 1. Backend Job Store

In-memory dictionary storing job state.
This is appropriate because:

- HF Spaces runs a single uvicorn worker (no multi-worker sync needed)
- Jobs are ephemeral (results cached, cleanup after 1 hour)
- No external dependencies (Redis, DB) required

```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any


class JobStatus(str, Enum):
    PENDING = "pending"      # Job created, not started
    RUNNING = "running"      # Inference in progress
    COMPLETED = "completed"  # Success, results available
    FAILED = "failed"        # Error occurred


@dataclass
class Job:
    id: str
    status: JobStatus
    case_id: str
    fast_mode: bool
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    progress: int = 0  # 0-100 percentage
    progress_message: str = ""
    result: dict[str, Any] | None = None
    error: str | None = None


# Job store; safe without locks under the single-writer pattern
# (only the background task mutates a job, handlers only read).
jobs: dict[str, Job] = {}
```

### 2. API Endpoints

#### POST /api/segment (Modified)

Returns immediately with job ID.

**Request**: Same as before

```json
{
  "case_id": "sub-strokecase0001",
  "fast_mode": true
}
```

**Response**: 202 Accepted

```json
{
  "jobId": "a1b2c3d4",
  "status": "pending",
  "message": "Segmentation job queued"
}
```

#### GET /api/jobs/{job_id}

Poll for job status and results.

**Response (Running)**:

```json
{
  "jobId": "a1b2c3d4",
  "status": "running",
  "progress": 45,
  "progressMessage": "Running DeepISLES inference...",
  "elapsedSeconds": 23.5
}
```

**Response (Completed)**:

```json
{
  "jobId": "a1b2c3d4",
  "status": "completed",
  "progress": 100,
  "progressMessage": "Segmentation complete",
  "elapsedSeconds": 42.3,
  "result": {
    "caseId": "sub-strokecase0001",
    "diceScore": 0.847,
    "volumeMl": 12.34,
    "dwiUrl": "https://...hf.space/files/a1b2c3d4/...",
    "predictionUrl": "https://...hf.space/files/a1b2c3d4/..."
  }
}
```

**Response (Failed)**:

```json
{
  "jobId": "a1b2c3d4",
  "status": "failed",
  "progress": 0,
  "progressMessage": "Error occurred",
  "elapsedSeconds": 5.2,
  "error": "Case not found: sub-invalid"
}
```

**Response (Not Found)**: 404

```json
{
  "detail": "Job not found: xyz123"
}
```

### 3. Background Task Execution

```python
import uuid
from datetime import datetime

from fastapi import BackgroundTasks


@router.post("/segment", response_model=SegmentJobResponse, status_code=202)
def create_segment_job(
    request: Request,
    body: SegmentRequest,
    background_tasks: BackgroundTasks,
) -> SegmentJobResponse:
    """Create a segmentation job and return immediately."""
    job_id = str(uuid.uuid4())[:8]

    # Create job record
    job = Job(
        id=job_id,
        status=JobStatus.PENDING,
        case_id=body.case_id,
        fast_mode=body.fast_mode,
        created_at=datetime.now(),
    )
    jobs[job_id] = job

    # Queue background task
    background_tasks.add_task(
        run_segmentation_job,
        job_id=job_id,
        case_id=body.case_id,
        fast_mode=body.fast_mode,
        backend_url=get_backend_base_url(request),
    )

    return SegmentJobResponse(
        jobId=job_id,
        status=JobStatus.PENDING,
        message="Segmentation job queued",
    )
```

### 4. Job Execution with Progress Updates

```python
import contextlib
from datetime import datetime


def run_segmentation_job(
    job_id: str,
    case_id: str,
    fast_mode: bool,
    backend_url: str,
) -> None:
    """Execute segmentation in background thread."""
    job = jobs.get(job_id)
    if not job:
        return

    try:
        # Mark as running
        job.status = JobStatus.RUNNING
        job.started_at = datetime.now()
        job.progress = 10
        job.progress_message = "Loading case data..."

        # Run inference with progress callbacks
        output_dir = RESULTS_BASE / job_id

        job.progress = 20
        job.progress_message = "Staging files for DeepISLES..."

        result = run_pipeline_on_case(
            case_id,
            output_dir=output_dir,
            fast=fast_mode,
            compute_dice=True,
            cleanup_staging=True,
            # Future: pass progress_callback for finer updates
        )

        job.progress = 90
        job.progress_message = "Computing metrics..."
        # Compute volume
        volume_ml = None
        with contextlib.suppress(Exception):
            volume_ml = round(compute_volume_ml(result.prediction_mask, threshold=0.5), 2)

        # Build result
        job.progress = 100
        job.progress_message = "Segmentation complete"
        job.status = JobStatus.COMPLETED
        job.completed_at = datetime.now()
        job.result = {
            "caseId": result.case_id,
            "diceScore": result.dice_score,
            "volumeMl": volume_ml,
            "elapsedSeconds": round(result.elapsed_seconds, 2),
            "dwiUrl": f"{backend_url}/files/{job_id}/{result.case_id}/{result.input_files['dwi'].name}",
            "predictionUrl": f"{backend_url}/files/{job_id}/{result.case_id}/{result.prediction_mask.name}",
        }

    except Exception as e:
        job.status = JobStatus.FAILED
        job.completed_at = datetime.now()
        job.error = str(e)
        job.progress_message = "Error occurred"
```

### 5. Job Cleanup (Memory Management)

```python
import shutil
import threading
import time
from datetime import datetime, timedelta

JOB_TTL = timedelta(hours=1)  # Keep completed jobs for 1 hour


def cleanup_old_jobs() -> None:
    """Remove jobs older than TTL to prevent memory leaks."""
    now = datetime.now()
    expired = [
        job_id
        for job_id, job in jobs.items()
        if job.completed_at and (now - job.completed_at) > JOB_TTL
    ]
    for job_id in expired:
        # Also cleanup result files
        result_dir = RESULTS_BASE / job_id
        if result_dir.exists():
            shutil.rmtree(result_dir, ignore_errors=True)
        del jobs[job_id]


# Run cleanup every 10 minutes
def start_cleanup_scheduler() -> None:
    def run() -> None:
        while True:
            time.sleep(600)  # 10 minutes
            cleanup_old_jobs()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
```
### 6. Frontend Polling Hook

```typescript
// hooks/useJobPolling.ts
import { useState, useEffect, useCallback, useRef } from 'react'
import { apiClient, JobStatus, SegmentResponse } from '../api/client'

interface UseJobPollingOptions {
  pollingInterval?: number // ms, default 2000
  onComplete?: (result: SegmentResponse) => void
  onError?: (error: string) => void
}

export function useJobPolling(options: UseJobPollingOptions = {}) {
  const { pollingInterval = 2000, onComplete, onError } = options

  const [jobId, setJobId] = useState<string | null>(null)
  const [status, setStatus] = useState<JobStatus | null>(null)
  const [progress, setProgress] = useState(0)
  const [progressMessage, setProgressMessage] = useState('')
  const [error, setError] = useState<string | null>(null)
  const [isPolling, setIsPolling] = useState(false)

  const intervalRef = useRef<number | null>(null)
  const onCompleteRef = useRef(onComplete)
  const onErrorRef = useRef(onError)

  // Keep callbacks current
  useEffect(() => {
    onCompleteRef.current = onComplete
    onErrorRef.current = onError
  })

  const stopPolling = useCallback(() => {
    if (intervalRef.current) {
      clearInterval(intervalRef.current)
      intervalRef.current = null
    }
    setIsPolling(false)
  }, [])

  const pollJobStatus = useCallback(async (id: string) => {
    try {
      const response = await apiClient.getJobStatus(id)
      setStatus(response.status)
      setProgress(response.progress)
      setProgressMessage(response.progressMessage)

      if (response.status === 'completed' && response.result) {
        stopPolling()
        onCompleteRef.current?.(response.result)
      } else if (response.status === 'failed') {
        stopPolling()
        setError(response.error || 'Job failed')
        onErrorRef.current?.(response.error || 'Job failed')
      }
    } catch (err) {
      // Don't stop polling on network errors - might be transient
      console.warn('Polling error:', err)
    }
  }, [stopPolling])

  const startJob = useCallback(async (caseId: string, fastMode = true) => {
    // Reset state
    setError(null)
    setProgress(0)
    setProgressMessage('Starting...')
    setStatus('pending')

    try {
      // Create job
      const response = await
        apiClient.createSegmentJob(caseId, fastMode)
      setJobId(response.jobId)
      setStatus(response.status)

      // Start polling
      setIsPolling(true)
      intervalRef.current = window.setInterval(
        () => pollJobStatus(response.jobId),
        pollingInterval
      )

      // Initial poll
      await pollJobStatus(response.jobId)
    } catch (err) {
      const message = err instanceof Error ? err.message : 'Failed to start job'
      setError(message)
      onErrorRef.current?.(message)
    }
  }, [pollingInterval, pollJobStatus])

  // Cleanup on unmount
  useEffect(() => {
    return () => {
      if (intervalRef.current) {
        clearInterval(intervalRef.current)
      }
    }
  }, [])

  return {
    jobId,
    status,
    progress,
    progressMessage,
    error,
    isPolling,
    startJob,
    stopPolling,
  }
}
```

### 7. Frontend API Client Extensions

```typescript
// api/client.ts additions
export type JobStatus = 'pending' | 'running' | 'completed' | 'failed'

export interface CreateJobResponse {
  jobId: string
  status: JobStatus
  message: string
}

export interface JobStatusResponse {
  jobId: string
  status: JobStatus
  progress: number
  progressMessage: string
  elapsedSeconds?: number
  result?: SegmentResponse
  error?: string
}

class ApiClient {
  // ... existing methods ...
  async createSegmentJob(
    caseId: string,
    fastMode: boolean = true,
    signal?: AbortSignal
  ): Promise<CreateJobResponse> {
    const response = await fetch(`${this.baseUrl}/api/segment`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ case_id: caseId, fast_mode: fastMode }),
      signal,
    })
    if (!response.ok) {
      const error = await response.json().catch(() => ({}))
      throw new ApiError(
        `Failed to create job: ${error.detail || response.statusText}`,
        response.status,
        error.detail
      )
    }
    return response.json()
  }

  async getJobStatus(jobId: string, signal?: AbortSignal): Promise<JobStatusResponse> {
    const response = await fetch(`${this.baseUrl}/api/jobs/${jobId}`, { signal })
    if (response.status === 404) {
      throw new ApiError('Job not found', 404)
    }
    if (!response.ok) {
      const error = await response.json().catch(() => ({}))
      throw new ApiError(
        `Failed to get job status: ${error.detail || response.statusText}`,
        response.status,
        error.detail
      )
    }
    return response.json()
  }
}
```

### 8. UI Progress Display

```tsx
// components/ProgressIndicator.tsx
interface ProgressIndicatorProps {
  progress: number
  message: string
  status: JobStatus
}

export function ProgressIndicator({ progress, message, status }: ProgressIndicatorProps) {
  return (
    <div className="progress-indicator" role="status" data-status={status}>
      {/* class names are placeholders for the app's styles */}
      <div className="progress-bar" style={{ width: `${progress}%` }} />
      <span className="progress-message">{message}</span>
      <span className="progress-percent">{progress}%</span>
    </div>
  )
}
```

## Implementation Checklist

### Backend

- [ ] Create `job_store.py` with Job dataclass and jobs dict
- [ ] Create Pydantic schemas for job responses
- [ ] Modify POST /api/segment to return 202 with job ID
- [ ] Add GET /api/jobs/{job_id} endpoint
- [ ] Implement background task execution with progress updates
- [ ] Add job cleanup scheduler
- [ ] Update CORS if needed for new endpoint

### Frontend

- [ ] Add job-related types to `types/index.ts`
- [ ] Add API client methods for job creation and polling
- [ ] Create `useJobPolling` hook
- [ ] Create `ProgressIndicator` component
- [ ] Update `useSegmentation` to use job polling
- [ ] Update `App.tsx` to show progress during processing

### Testing

- [ ] Unit tests for job store
- [ ] Unit tests for job endpoints
- [ ] Unit tests for useJobPolling hook
- [ ] E2E test for full job flow
- [ ] Manual test on HF Spaces deployment

### Documentation

- [ ] Update API documentation
- [ ] Update bug tracker with resolution
- [ ] Add architecture diagram

## Migration Strategy

1. **Backend**: Add new endpoints alongside existing. Keep old `/api/segment` temporarily for backwards compatibility (marked deprecated).
2. **Frontend**: Update to use new job polling system. Old sync behavior removed.
3. **Testing**: Verify on HF Spaces before removing deprecated endpoint.
4. **Cleanup**: Remove deprecated sync endpoint after validation.

## Performance Considerations

| Metric | Before (Sync) | After (Async) |
|--------|---------------|---------------|
| Initial response time | 30-60s | <1s |
| Total request count | 1 | ~15-30 (polling) |
| Timeout risk | HIGH | NONE |
| User feedback | None during wait | Progress updates |
| Network efficiency | 1 large response | Many small responses |

## Alternatives Considered

### 1. SSE (Server-Sent Events)

- **Pros**: Real-time updates, single connection
- **Cons**: Connection stays open (could still time out), HF proxy issues possible
- **Decision**: Polling is more robust for HF Spaces constraints

### 2. WebSockets

- **Pros**: Bi-directional, real-time
- **Cons**: Known 404 issues on HF Spaces, complex
- **Decision**: Not viable on HF Spaces

### 3. Redis/Celery

- **Pros**: Production-grade, multi-worker support
- **Cons**: Not available on HF Spaces Docker
- **Decision**: In-memory store is sufficient for a single worker

## References

- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)
- [FastAPI Polling Strategy for Long-Running Tasks](https://openillumi.com/en/en-fastapi-long-task-progress-polling/)
- [Managing Background Tasks in FastAPI](https://leapcell.io/blog/managing-background-tasks-and-long-running-operations-in-fastapi)
- [Real Time Polling in React Query 2025](https://samwithcode.in/tutorial/react-js/real-time-polling-in-react-query-2025)
- [504 Gateway Timeout - HF Forums](https://discuss.huggingface.co/t/504-gateway-timeout-with-http-request/24018)