# Async Job Queue for Long-Running ML Inference
**Status**: APPROVED
**Created**: 2025-12-12
**Author**: Claude Code Audit
---
## Executive Summary
HuggingFace Spaces has a ~60-second gateway timeout that cannot be bypassed through
configuration. DeepISLES ML inference typically takes 30-60 seconds, creating
intermittent 504 Gateway Timeout errors. This spec defines a robust async job queue
system that eliminates timeout issues by immediately returning a job ID and using
client-side polling for status/results.
## Problem Statement
### Current Architecture (Synchronous)
```
Frontend                    Backend                   ML Inference
   |                           |                           |
   |--POST /api/segment------->|                           |
   |                           |--run_pipeline_on_case()-->|
   |                           |                           |
   |      (30-60s wait)        |       (processing)        |
   |                           |                           |
   |                           |<---result-----------------|
   |<--200 OK + JSON-----------|                           |
```
**Problem**: HF Spaces proxy times out at ~60s, killing the connection before
the ML inference completes. The response is lost even though processing succeeds.
### Target Architecture (Async with Polling)
```
Frontend                    Backend                   ML Inference
   |                           |                           |
   |--POST /api/segment------->|                           |
   |<--202 Accepted + job_id---|                           |
   |                           |--BackgroundTask---------->|
   |                           |                           |
   |--GET /api/jobs/{id}------>|       (processing)        |
   |<--200 {status: running}---|                           |
   |                           |                           |
   |--GET /api/jobs/{id}------>|                           |
   |<--200 {status: running}---|                           |
   |                           |<---result-----------------|
   |--GET /api/jobs/{id}------>|                           |
   |<--200 {status: completed, |                           |
   |        result: {...}}-----|                           |
```
**Solution**: Initial request returns in <1s. Polling requests are fast (<100ms).
No single request exceeds the proxy timeout.
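For reference, the whole protocol can be exercised from any HTTP client. A stdlib-only Python sketch of the flow above (`BASE_URL` and the `segment_with_polling` helper are illustrative, not part of the spec; the React hook in section 6 is the real client):

```python
import json
import time
import urllib.request

BASE_URL = "https://example.hf.space"  # placeholder Space URL


def segment_with_polling(case_id: str, fast_mode: bool = True,
                         interval_s: float = 2.0,
                         timeout_s: float = 300.0) -> dict:
    """POST the job, then poll GET /api/jobs/{id} until a terminal state."""
    req = urllib.request.Request(
        f"{BASE_URL}/api/segment",
        data=json.dumps({"case_id": case_id, "fast_mode": fast_mode}).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:  # expect 202 Accepted
        job_id = json.load(resp)["jobId"]

    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        with urllib.request.urlopen(f"{BASE_URL}/api/jobs/{job_id}") as resp:
            status = json.load(resp)
        if status["status"] == "completed":
            return status["result"]
        if status["status"] == "failed":
            raise RuntimeError(status.get("error", "Job failed"))
        time.sleep(interval_s)  # each poll stays well under the proxy timeout
    raise TimeoutError(f"Job {job_id} did not finish within {timeout_s}s")
```

The client-side deadline is deliberately generous (5 minutes); the server, not the proxy, decides when a job has failed.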
## Technical Design
### 1. Backend Job Store
In-memory dictionary storing job state. This is appropriate because:
- HF Spaces runs a single uvicorn worker (no multi-worker sync needed)
- Jobs are ephemeral (results cached, cleanup after 1 hour)
- No external dependencies (Redis, DB) required
```python
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any


class JobStatus(str, Enum):
    PENDING = "pending"      # Job created, not started
    RUNNING = "running"      # Inference in progress
    COMPLETED = "completed"  # Success, results available
    FAILED = "failed"        # Error occurred


@dataclass
class Job:
    id: str
    status: JobStatus
    case_id: str
    fast_mode: bool
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    progress: int = 0  # 0-100 percentage
    progress_message: str = ""
    result: dict[str, Any] | None = None
    error: str | None = None


# Job store: written by the background worker, read by request handlers
# (single-writer pattern; safe with one uvicorn worker)
jobs: dict[str, Job] = {}
```
### 2. API Endpoints
#### POST /api/segment (Modified)
Returns immediately with job ID.
**Request**: Same as before
```json
{
  "case_id": "sub-strokecase0001",
  "fast_mode": true
}
```
**Response**: 202 Accepted
```json
{
  "jobId": "a1b2c3d4",
  "status": "pending",
  "message": "Segmentation job queued"
}
```
#### GET /api/jobs/{job_id}
Poll for job status and results.
**Response (Running)**:
```json
{
  "jobId": "a1b2c3d4",
  "status": "running",
  "progress": 45,
  "progressMessage": "Running DeepISLES inference...",
  "elapsedSeconds": 23.5
}
```
**Response (Completed)**:
```json
{
  "jobId": "a1b2c3d4",
  "status": "completed",
  "progress": 100,
  "progressMessage": "Segmentation complete",
  "elapsedSeconds": 42.3,
  "result": {
    "caseId": "sub-strokecase0001",
    "diceScore": 0.847,
    "volumeMl": 12.34,
    "dwiUrl": "https://...hf.space/files/a1b2c3d4/...",
    "predictionUrl": "https://...hf.space/files/a1b2c3d4/..."
  }
}
```
**Response (Failed)**:
```json
{
  "jobId": "a1b2c3d4",
  "status": "failed",
  "progress": 0,
  "progressMessage": "Error occurred",
  "elapsedSeconds": 5.2,
  "error": "Case not found: sub-invalid"
}
```
**Response (Not Found)**: 404
```json
{
  "detail": "Job not found: xyz123"
}
```
### 3. Background Task Execution
```python
import uuid
from datetime import datetime

from fastapi import BackgroundTasks, Request


@router.post("/segment", response_model=SegmentJobResponse, status_code=202)
def create_segment_job(
    request: Request,
    body: SegmentRequest,
    background_tasks: BackgroundTasks,
) -> SegmentJobResponse:
    """Create a segmentation job and return immediately."""
    job_id = str(uuid.uuid4())[:8]

    # Create job record
    job = Job(
        id=job_id,
        status=JobStatus.PENDING,
        case_id=body.case_id,
        fast_mode=body.fast_mode,
        created_at=datetime.now(),
    )
    jobs[job_id] = job

    # Queue background task (runs after the 202 response is sent)
    background_tasks.add_task(
        run_segmentation_job,
        job_id=job_id,
        case_id=body.case_id,
        fast_mode=body.fast_mode,
        backend_url=get_backend_base_url(request),
    )

    return SegmentJobResponse(
        jobId=job_id,
        status=JobStatus.PENDING,
        message="Segmentation job queued",
    )
```
### 4. Job Execution with Progress Updates
```python
import contextlib
from datetime import datetime


def run_segmentation_job(
    job_id: str,
    case_id: str,
    fast_mode: bool,
    backend_url: str,
) -> None:
    """Execute segmentation in a background thread."""
    job = jobs.get(job_id)
    if not job:
        return

    try:
        # Mark as running
        job.status = JobStatus.RUNNING
        job.started_at = datetime.now()
        job.progress = 10
        job.progress_message = "Loading case data..."

        # Run inference
        output_dir = RESULTS_BASE / job_id
        job.progress = 20
        job.progress_message = "Staging files for DeepISLES..."

        result = run_pipeline_on_case(
            case_id,
            output_dir=output_dir,
            fast=fast_mode,
            compute_dice=True,
            cleanup_staging=True,
            # Future: pass progress_callback for finer updates
        )

        job.progress = 90
        job.progress_message = "Computing metrics..."

        # Compute volume (best-effort; failures leave volume_ml as None)
        volume_ml = None
        with contextlib.suppress(Exception):
            volume_ml = round(compute_volume_ml(result.prediction_mask, threshold=0.5), 2)

        # Build result
        job.progress = 100
        job.progress_message = "Segmentation complete"
        job.status = JobStatus.COMPLETED
        job.completed_at = datetime.now()
        job.result = {
            "caseId": result.case_id,
            "diceScore": result.dice_score,
            "volumeMl": volume_ml,
            "elapsedSeconds": round(result.elapsed_seconds, 2),
            "dwiUrl": f"{backend_url}/files/{job_id}/{result.case_id}/{result.input_files['dwi'].name}",
            "predictionUrl": f"{backend_url}/files/{job_id}/{result.case_id}/{result.prediction_mask.name}",
        }
    except Exception as e:
        job.status = JobStatus.FAILED
        job.completed_at = datetime.now()
        job.error = str(e)
        job.progress_message = "Error occurred"
```
### 5. Job Cleanup (Memory Management)
```python
import shutil
import threading
import time
from datetime import datetime, timedelta

JOB_TTL = timedelta(hours=1)  # Keep completed jobs for 1 hour


def cleanup_old_jobs() -> None:
    """Remove jobs older than TTL to prevent memory leaks."""
    now = datetime.now()
    expired = [
        job_id for job_id, job in jobs.items()
        if job.completed_at and (now - job.completed_at) > JOB_TTL
    ]
    for job_id in expired:
        # Also clean up result files
        result_dir = RESULTS_BASE / job_id
        if result_dir.exists():
            shutil.rmtree(result_dir, ignore_errors=True)
        del jobs[job_id]


# Run cleanup every 10 minutes
def start_cleanup_scheduler() -> None:
    def run() -> None:
        while True:
            time.sleep(600)  # 10 minutes
            cleanup_old_jobs()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
```
### 6. Frontend Polling Hook
```typescript
// hooks/useJobPolling.ts
import { useState, useEffect, useCallback, useRef } from 'react'
import { apiClient, JobStatus, SegmentResponse } from '../api/client'

interface UseJobPollingOptions {
  pollingInterval?: number // ms, default 2000
  onComplete?: (result: SegmentResponse) => void
  onError?: (error: string) => void
}

export function useJobPolling(options: UseJobPollingOptions = {}) {
  const { pollingInterval = 2000, onComplete, onError } = options

  const [jobId, setJobId] = useState<string | null>(null)
  const [status, setStatus] = useState<JobStatus | null>(null)
  const [progress, setProgress] = useState(0)
  const [progressMessage, setProgressMessage] = useState('')
  const [error, setError] = useState<string | null>(null)
  const [isPolling, setIsPolling] = useState(false)

  const intervalRef = useRef<number | null>(null)
  const onCompleteRef = useRef(onComplete)
  const onErrorRef = useRef(onError)

  // Keep callbacks current
  useEffect(() => {
    onCompleteRef.current = onComplete
    onErrorRef.current = onError
  })

  const stopPolling = useCallback(() => {
    if (intervalRef.current) {
      clearInterval(intervalRef.current)
      intervalRef.current = null
    }
    setIsPolling(false)
  }, [])

  const pollJobStatus = useCallback(async (id: string) => {
    try {
      const response = await apiClient.getJobStatus(id)
      setStatus(response.status)
      setProgress(response.progress)
      setProgressMessage(response.progressMessage)

      if (response.status === 'completed' && response.result) {
        stopPolling()
        onCompleteRef.current?.(response.result)
      } else if (response.status === 'failed') {
        stopPolling()
        setError(response.error || 'Job failed')
        onErrorRef.current?.(response.error || 'Job failed')
      }
    } catch (err) {
      // Don't stop polling on network errors - might be transient
      console.warn('Polling error:', err)
    }
  }, [stopPolling])

  const startJob = useCallback(async (caseId: string, fastMode = true) => {
    // Reset state
    setError(null)
    setProgress(0)
    setProgressMessage('Starting...')
    setStatus('pending')

    try {
      // Create job
      const response = await apiClient.createSegmentJob(caseId, fastMode)
      setJobId(response.jobId)
      setStatus(response.status)

      // Start polling
      setIsPolling(true)
      intervalRef.current = window.setInterval(
        () => pollJobStatus(response.jobId),
        pollingInterval
      )

      // Initial poll
      await pollJobStatus(response.jobId)
    } catch (err) {
      const message = err instanceof Error ? err.message : 'Failed to start job'
      setError(message)
      onErrorRef.current?.(message)
    }
  }, [pollingInterval, pollJobStatus])

  // Cleanup on unmount
  useEffect(() => {
    return () => {
      if (intervalRef.current) {
        clearInterval(intervalRef.current)
      }
    }
  }, [])

  return {
    jobId,
    status,
    progress,
    progressMessage,
    error,
    isPolling,
    startJob,
    stopPolling,
  }
}
```
### 7. Frontend API Client Extensions
```typescript
// api/client.ts additions
export type JobStatus = 'pending' | 'running' | 'completed' | 'failed'

export interface CreateJobResponse {
  jobId: string
  status: JobStatus
  message: string
}

export interface JobStatusResponse {
  jobId: string
  status: JobStatus
  progress: number
  progressMessage: string
  elapsedSeconds?: number
  result?: SegmentResponse
  error?: string
}

class ApiClient {
  // ... existing methods ...

  async createSegmentJob(
    caseId: string,
    fastMode: boolean = true,
    signal?: AbortSignal
  ): Promise<CreateJobResponse> {
    const response = await fetch(`${this.baseUrl}/api/segment`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ case_id: caseId, fast_mode: fastMode }),
      signal,
    })

    if (!response.ok) {
      const error = await response.json().catch(() => ({}))
      throw new ApiError(
        `Failed to create job: ${error.detail || response.statusText}`,
        response.status,
        error.detail
      )
    }

    return response.json()
  }

  async getJobStatus(jobId: string, signal?: AbortSignal): Promise<JobStatusResponse> {
    const response = await fetch(`${this.baseUrl}/api/jobs/${jobId}`, { signal })

    if (response.status === 404) {
      throw new ApiError('Job not found', 404)
    }

    if (!response.ok) {
      const error = await response.json().catch(() => ({}))
      throw new ApiError(
        `Failed to get job status: ${error.detail || response.statusText}`,
        response.status,
        error.detail
      )
    }

    return response.json()
  }
}
```
### 8. UI Progress Display
```tsx
// components/ProgressIndicator.tsx
import type { JobStatus } from '../api/client'

interface ProgressIndicatorProps {
  progress: number
  message: string
  status: JobStatus
}

export function ProgressIndicator({ progress, message, status }: ProgressIndicatorProps) {
  return (
    <div className="bg-gray-800 rounded-lg p-4 space-y-3">
      <div className="flex justify-between text-sm">
        <span className="text-gray-400">{message}</span>
        <span className="text-gray-300">{progress}%</span>
      </div>
      <div className="w-full bg-gray-700 rounded-full h-2">
        <div
          className={`h-2 rounded-full transition-all duration-300 ${
            status === 'failed' ? 'bg-red-500' : 'bg-blue-500'
          }`}
          style={{ width: `${progress}%` }}
        />
      </div>
    </div>
  )
}
```
## Implementation Checklist
### Backend
- [ ] Create `job_store.py` with Job dataclass and jobs dict
- [ ] Create Pydantic schemas for job responses
- [ ] Modify POST /api/segment to return 202 with job ID
- [ ] Add GET /api/jobs/{job_id} endpoint
- [ ] Implement background task execution with progress updates
- [ ] Add job cleanup scheduler
- [ ] Update CORS if needed for new endpoint
### Frontend
- [ ] Add job-related types to `types/index.ts`
- [ ] Add API client methods for job creation and polling
- [ ] Create `useJobPolling` hook
- [ ] Create `ProgressIndicator` component
- [ ] Update `useSegmentation` to use job polling
- [ ] Update `App.tsx` to show progress during processing
### Testing
- [ ] Unit tests for job store
- [ ] Unit tests for job endpoints
- [ ] Unit tests for useJobPolling hook
- [ ] E2E test for full job flow
- [ ] Manual test on HF Spaces deployment
### Documentation
- [ ] Update API documentation
- [ ] Update bug tracker with resolution
- [ ] Add architecture diagram
## Migration Strategy
1. **Backend**: Add new endpoints alongside existing. Keep old `/api/segment`
temporarily for backwards compatibility (marked deprecated).
2. **Frontend**: Update to use new job polling system. Old sync behavior removed.
3. **Testing**: Verify on HF Spaces before removing deprecated endpoint.
4. **Cleanup**: Remove deprecated sync endpoint after validation.
## Performance Considerations
| Metric | Before (Sync) | After (Async) |
|--------|--------------|---------------|
| Initial response time | 30-60s | <1s |
| Total request count | 1 | ~15-30 (polling) |
| Timeout risk | HIGH | NONE |
| User feedback | None during wait | Progress updates |
| Network efficiency | 1 large response | Many small responses |
## Alternatives Considered
### 1. SSE (Server-Sent Events)
- **Pros**: Real-time updates, single connection
- **Cons**: Connection stays open (could still timeout), HF proxy issues possible
- **Decision**: Polling is more robust for HF Spaces constraints
### 2. WebSockets
- **Pros**: Bi-directional, real-time
- **Cons**: Known 404 issues on HF Spaces, complex
- **Decision**: Not viable on HF Spaces
### 3. Redis/Celery
- **Pros**: Production-grade, multi-worker support
- **Cons**: Not available on HF Spaces Docker
- **Decision**: In-memory sufficient for single-worker
## References
- [FastAPI Background Tasks](https://fastapi.tiangolo.com/tutorial/background-tasks/)
- [FastAPI Polling Strategy for Long-Running Tasks](https://openillumi.com/en/en-fastapi-long-task-progress-polling/)
- [Managing Background Tasks in FastAPI](https://leapcell.io/blog/managing-background-tasks-and-long-running-operations-in-fastapi)
- [Real Time Polling in React Query 2025](https://samwithcode.in/tutorial/react-js/real-time-polling-in-react-query-2025)
- [504 Gateway Timeout - HF Forums](https://discuss.huggingface.co/t/504-gateway-timeout-with-http-request/24018)