MogensR commited on
Commit
01ce34f
·
1 Parent(s): 803d2bf

Create api/api_server.py

Browse files
Files changed (1) hide show
  1. api/api_server.py +805 -0
api/api_server.py ADDED
@@ -0,0 +1,805 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ REST API server for BackgroundFX Pro.
3
+ Provides HTTP endpoints for all processing functionality.
4
+ """
5
+
6
+ from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends, status
7
+ from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
8
+ from fastapi.middleware.cors import CORSMiddleware
9
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
10
+ from fastapi.staticfiles import StaticFiles
11
+ from pydantic import BaseModel, Field, validator
12
+ from typing import Dict, List, Optional, Union, Any
13
+ from enum import Enum
14
+ import asyncio
15
+ import aiofiles
16
+ from pathlib import Path
17
+ import tempfile
18
+ import shutil
19
+ import uuid
20
+ import time
21
+ from datetime import datetime, timedelta
22
+ import jwt
23
+ import cv2
24
+ import numpy as np
25
+ import io
26
+ import base64
27
+ from concurrent.futures import ThreadPoolExecutor
28
+ import redis
29
+ from contextlib import asynccontextmanager
30
+
31
+ from ..utils.logger import setup_logger
32
+ from .pipeline import ProcessingPipeline, PipelineConfig, ProcessingMode
33
+ from .video_processor import VideoProcessorAPI, StreamConfig, VideoStreamMode
34
+ from .batch_processor import BatchProcessor, BatchConfig, BatchItem, BatchPriority
35
+
36
+ logger = setup_logger(__name__)
37
+
38
+
39
+ # ============================================================================
40
+ # Configuration and Models
41
+ # ============================================================================
42
+
43
+ class ServerConfig:
44
+ """Server configuration."""
45
+ HOST: str = "0.0.0.0"
46
+ PORT: int = 8000
47
+ UPLOAD_DIR: str = "uploads"
48
+ OUTPUT_DIR: str = "outputs"
49
+ TEMP_DIR: str = "temp"
50
+ MAX_UPLOAD_SIZE: int = 500 * 1024 * 1024 # 500MB
51
+ ALLOWED_EXTENSIONS: List[str] = [".jpg", ".jpeg", ".png", ".mp4", ".avi", ".mov"]
52
+
53
+ # Security
54
+ SECRET_KEY: str = "your-secret-key-change-in-production"
55
+ ALGORITHM: str = "HS256"
56
+ ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
57
+
58
+ # Redis cache
59
+ REDIS_URL: str = "redis://localhost:6379"
60
+ CACHE_TTL: int = 3600 # 1 hour
61
+
62
+ # Rate limiting
63
+ RATE_LIMIT_REQUESTS: int = 100
64
+ RATE_LIMIT_WINDOW: int = 60 # seconds
65
+
66
+ # Processing
67
+ MAX_WORKERS: int = 4
68
+ ENABLE_GPU: bool = True
69
+
70
+
71
+ config = ServerConfig()
72
+
73
+
74
+ # ============================================================================
75
+ # Pydantic Models
76
+ # ============================================================================
77
+
78
+ class BackgroundType(str, Enum):
79
+ """Background types."""
80
+ BLUR = "blur"
81
+ OFFICE = "office"
82
+ GRADIENT = "gradient"
83
+ NATURE = "nature"
84
+ CUSTOM = "custom"
85
+ NONE = "none"
86
+
87
+
88
+ class QualityPreset(str, Enum):
89
+ """Quality presets."""
90
+ LOW = "low"
91
+ MEDIUM = "medium"
92
+ HIGH = "high"
93
+ ULTRA = "ultra"
94
+
95
+
96
+ class ProcessingRequest(BaseModel):
97
+ """Base processing request."""
98
+ background: BackgroundType = BackgroundType.BLUR
99
+ background_url: Optional[str] = None
100
+ quality: QualityPreset = QualityPreset.HIGH
101
+ preserve_original: bool = False
102
+
103
+ class Config:
104
+ schema_extra = {
105
+ "example": {
106
+ "background": "office",
107
+ "quality": "high",
108
+ "preserve_original": False
109
+ }
110
+ }
111
+
112
+
113
+ class ImageProcessingRequest(ProcessingRequest):
114
+ """Image processing request."""
115
+ resize: Optional[tuple[int, int]] = None
116
+ apply_effects: List[str] = Field(default_factory=list)
117
+ output_format: str = "png"
118
+
119
+
120
+ class VideoProcessingRequest(ProcessingRequest):
121
+ """Video processing request."""
122
+ start_time: Optional[float] = None
123
+ end_time: Optional[float] = None
124
+ fps: Optional[float] = None
125
+ resolution: Optional[tuple[int, int]] = None
126
+ codec: str = "h264"
127
+
128
+
129
+ class BatchProcessingRequest(BaseModel):
130
+ """Batch processing request."""
131
+ items: List[Dict[str, Any]]
132
+ parallel: bool = True
133
+ priority: str = "normal"
134
+ callback_url: Optional[str] = None
135
+
136
+
137
+ class StreamingRequest(BaseModel):
138
+ """Streaming request."""
139
+ source: str
140
+ stream_type: str = "webcam"
141
+ output_format: str = "hls"
142
+ quality: QualityPreset = QualityPreset.MEDIUM
143
+
144
+
145
+ class ProcessingResponse(BaseModel):
146
+ """Processing response."""
147
+ job_id: str
148
+ status: str
149
+ progress: float = 0.0
150
+ message: Optional[str] = None
151
+ result_url: Optional[str] = None
152
+ metadata: Dict[str, Any] = Field(default_factory=dict)
153
+ created_at: datetime = Field(default_factory=datetime.now)
154
+ completed_at: Optional[datetime] = None
155
+
156
+
157
+ class JobStatus(BaseModel):
158
+ """Job status response."""
159
+ job_id: str
160
+ status: str
161
+ progress: float
162
+ current_stage: Optional[str] = None
163
+ time_elapsed: float
164
+ time_remaining: Optional[float] = None
165
+ errors: List[str] = Field(default_factory=list)
166
+
167
+
168
+ # ============================================================================
169
+ # Job Management
170
+ # ============================================================================
171
+
172
+ class JobManager:
173
+ """Manage processing jobs."""
174
+
175
+ def __init__(self):
176
+ self.jobs: Dict[str, ProcessingResponse] = {}
177
+ self.executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS)
178
+ self.redis_client = None
179
+ try:
180
+ self.redis_client = redis.from_url(config.REDIS_URL)
181
+ except:
182
+ logger.warning("Redis not available, using in-memory storage")
183
+
184
+ def create_job(self) -> str:
185
+ """Create new job ID."""
186
+ job_id = str(uuid.uuid4())
187
+ self.jobs[job_id] = ProcessingResponse(
188
+ job_id=job_id,
189
+ status="pending"
190
+ )
191
+ return job_id
192
+
193
+ def update_job(self, job_id: str, **kwargs):
194
+ """Update job status."""
195
+ if job_id in self.jobs:
196
+ for key, value in kwargs.items():
197
+ if hasattr(self.jobs[job_id], key):
198
+ setattr(self.jobs[job_id], key, value)
199
+
200
+ # Store in Redis if available
201
+ if self.redis_client:
202
+ try:
203
+ self.redis_client.setex(
204
+ f"job:{job_id}",
205
+ config.CACHE_TTL,
206
+ self.jobs[job_id].json()
207
+ )
208
+ except:
209
+ pass
210
+
211
+ def get_job(self, job_id: str) -> Optional[ProcessingResponse]:
212
+ """Get job status."""
213
+ # Check memory first
214
+ if job_id in self.jobs:
215
+ return self.jobs[job_id]
216
+
217
+ # Check Redis
218
+ if self.redis_client:
219
+ try:
220
+ data = self.redis_client.get(f"job:{job_id}")
221
+ if data:
222
+ return ProcessingResponse.parse_raw(data)
223
+ except:
224
+ pass
225
+
226
+ return None
227
+
228
+
229
+ # ============================================================================
230
+ # FastAPI Application
231
+ # ============================================================================
232
+
233
+ @asynccontextmanager
234
+ async def lifespan(app: FastAPI):
235
+ """Application lifespan manager."""
236
+ # Startup
237
+ logger.info("Starting BackgroundFX Pro API Server")
238
+
239
+ # Create directories
240
+ for dir_path in [config.UPLOAD_DIR, config.OUTPUT_DIR, config.TEMP_DIR]:
241
+ Path(dir_path).mkdir(parents=True, exist_ok=True)
242
+
243
+ # Initialize processors
244
+ app.state.pipeline = ProcessingPipeline(
245
+ PipelineConfig(use_gpu=config.ENABLE_GPU)
246
+ )
247
+ app.state.video_processor = VideoProcessorAPI()
248
+ app.state.batch_processor = BatchProcessor()
249
+ app.state.job_manager = JobManager()
250
+
251
+ yield
252
+
253
+ # Shutdown
254
+ logger.info("Shutting down BackgroundFX Pro API Server")
255
+ app.state.pipeline.shutdown()
256
+ app.state.video_processor.cleanup()
257
+ app.state.batch_processor.cleanup()
258
+
259
+
260
+ app = FastAPI(
261
+ title="BackgroundFX Pro API",
262
+ description="Professional background removal and replacement API",
263
+ version="1.0.0",
264
+ lifespan=lifespan
265
+ )
266
+
267
+ # CORS middleware
268
+ app.add_middleware(
269
+ CORSMiddleware,
270
+ allow_origins=["*"], # Configure appropriately for production
271
+ allow_credentials=True,
272
+ allow_methods=["*"],
273
+ allow_headers=["*"],
274
+ )
275
+
276
+
277
+ # ============================================================================
278
+ # Authentication
279
+ # ============================================================================
280
+
281
+ security = HTTPBearer()
282
+
283
+
284
+ def create_access_token(data: dict) -> str:
285
+ """Create JWT access token."""
286
+ to_encode = data.copy()
287
+ expire = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
288
+ to_encode.update({"exp": expire})
289
+ return jwt.encode(to_encode, config.SECRET_KEY, algorithm=config.ALGORITHM)
290
+
291
+
292
+ def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
293
+ """Verify JWT token."""
294
+ token = credentials.credentials
295
+ try:
296
+ payload = jwt.decode(token, config.SECRET_KEY, algorithms=[config.ALGORITHM])
297
+ username: str = payload.get("sub")
298
+ if username is None:
299
+ raise HTTPException(
300
+ status_code=status.HTTP_401_UNAUTHORIZED,
301
+ detail="Invalid authentication credentials",
302
+ )
303
+ return username
304
+ except jwt.PyJWTError:
305
+ raise HTTPException(
306
+ status_code=status.HTTP_401_UNAUTHORIZED,
307
+ detail="Invalid authentication credentials",
308
+ )
309
+
310
+
311
+ # ============================================================================
312
+ # Health and Status Endpoints
313
+ # ============================================================================
314
+
315
+ @app.get("/")
316
+ async def root():
317
+ """Root endpoint."""
318
+ return {
319
+ "name": "BackgroundFX Pro API",
320
+ "version": "1.0.0",
321
+ "status": "running",
322
+ "endpoints": {
323
+ "health": "/health",
324
+ "docs": "/docs",
325
+ "process_image": "/api/v1/process/image",
326
+ "process_video": "/api/v1/process/video",
327
+ "batch": "/api/v1/batch",
328
+ "stream": "/api/v1/stream"
329
+ }
330
+ }
331
+
332
+
333
+ @app.get("/health")
334
+ async def health_check():
335
+ """Health check endpoint."""
336
+ return {
337
+ "status": "healthy",
338
+ "timestamp": datetime.now().isoformat(),
339
+ "services": {
340
+ "pipeline": "ready",
341
+ "video_processor": "ready",
342
+ "batch_processor": "ready",
343
+ "redis": "connected" if app.state.job_manager.redis_client else "disconnected"
344
+ }
345
+ }
346
+
347
+
348
+ @app.get("/api/v1/stats")
349
+ async def get_statistics(current_user: str = Depends(verify_token)):
350
+ """Get processing statistics."""
351
+ return {
352
+ "pipeline": app.state.pipeline.get_statistics(),
353
+ "video": app.state.video_processor.get_stats(),
354
+ "batch": app.state.batch_processor.get_status()
355
+ }
356
+
357
+
358
+ # ============================================================================
359
+ # Image Processing Endpoints
360
+ # ============================================================================
361
+
362
+ @app.post("/api/v1/process/image", response_model=ProcessingResponse)
363
+ async def process_image(
364
+ background_tasks: BackgroundTasks,
365
+ file: UploadFile = File(...),
366
+ request: ImageProcessingRequest = Depends(),
367
+ current_user: str = Depends(verify_token)
368
+ ):
369
+ """Process a single image."""
370
+
371
+ # Validate file
372
+ if not file.filename.lower().endswith(tuple(config.ALLOWED_EXTENSIONS)):
373
+ raise HTTPException(400, "Invalid file format")
374
+
375
+ if file.size > config.MAX_UPLOAD_SIZE:
376
+ raise HTTPException(413, "File too large")
377
+
378
+ # Create job
379
+ job_id = app.state.job_manager.create_job()
380
+
381
+ # Save uploaded file
382
+ upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{file.filename}"
383
+ async with aiofiles.open(upload_path, 'wb') as f:
384
+ content = await file.read()
385
+ await f.write(content)
386
+
387
+ # Process in background
388
+ background_tasks.add_task(
389
+ process_image_task,
390
+ app.state,
391
+ job_id,
392
+ str(upload_path),
393
+ request
394
+ )
395
+
396
+ return ProcessingResponse(
397
+ job_id=job_id,
398
+ status="processing",
399
+ message="Image processing started"
400
+ )
401
+
402
+
403
+ async def process_image_task(app_state, job_id: str, input_path: str, request: ImageProcessingRequest):
404
+ """Background task for image processing."""
405
+ try:
406
+ # Update job status
407
+ app_state.job_manager.update_job(job_id, status="processing", progress=0.1)
408
+
409
+ # Load image
410
+ image = cv2.imread(input_path)
411
+
412
+ # Prepare background
413
+ background = None
414
+ if request.background == BackgroundType.CUSTOM and request.background_url:
415
+ # Download custom background
416
+ # ... implementation ...
417
+ pass
418
+ elif request.background != BackgroundType.NONE:
419
+ background = request.background.value
420
+
421
+ # Configure pipeline
422
+ config = PipelineConfig(
423
+ quality_preset=request.quality.value,
424
+ apply_effects=request.apply_effects
425
+ )
426
+
427
+ # Process image
428
+ result = app_state.pipeline.process_image(image, background)
429
+
430
+ if result.success:
431
+ # Save output
432
+ output_filename = f"{job_id}_output.{request.output_format}"
433
+ output_path = Path(config.OUTPUT_DIR) / output_filename
434
+ cv2.imwrite(str(output_path), result.output_image)
435
+
436
+ # Update job
437
+ app_state.job_manager.update_job(
438
+ job_id,
439
+ status="completed",
440
+ progress=1.0,
441
+ result_url=f"/api/v1/download/{output_filename}",
442
+ completed_at=datetime.now(),
443
+ metadata={
444
+ "quality_score": result.quality_score,
445
+ "processing_time": result.processing_time
446
+ }
447
+ )
448
+ else:
449
+ app_state.job_manager.update_job(
450
+ job_id,
451
+ status="failed",
452
+ message="Processing failed"
453
+ )
454
+
455
+ except Exception as e:
456
+ logger.error(f"Image processing failed for job {job_id}: {e}")
457
+ app_state.job_manager.update_job(
458
+ job_id,
459
+ status="failed",
460
+ message=str(e)
461
+ )
462
+
463
+
464
+ # ============================================================================
465
+ # Video Processing Endpoints
466
+ # ============================================================================
467
+
468
+ @app.post("/api/v1/process/video", response_model=ProcessingResponse)
469
+ async def process_video(
470
+ background_tasks: BackgroundTasks,
471
+ file: UploadFile = File(...),
472
+ request: VideoProcessingRequest = Depends(),
473
+ current_user: str = Depends(verify_token)
474
+ ):
475
+ """Process a video file."""
476
+
477
+ # Validate file
478
+ if not file.filename.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
479
+ raise HTTPException(400, "Invalid video format")
480
+
481
+ # Create job
482
+ job_id = app_state.job_manager.create_job()
483
+
484
+ # Save uploaded file
485
+ upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{file.filename}"
486
+ async with aiofiles.open(upload_path, 'wb') as f:
487
+ content = await file.read()
488
+ await f.write(content)
489
+
490
+ # Process in background
491
+ background_tasks.add_task(
492
+ process_video_task,
493
+ app.state,
494
+ job_id,
495
+ str(upload_path),
496
+ request
497
+ )
498
+
499
+ return ProcessingResponse(
500
+ job_id=job_id,
501
+ status="processing",
502
+ message="Video processing started"
503
+ )
504
+
505
+
506
+ async def process_video_task(app_state, job_id: str, input_path: str, request: VideoProcessingRequest):
507
+ """Background task for video processing."""
508
+ try:
509
+ # Progress callback
510
+ def progress_callback(progress: float, info: Dict):
511
+ app_state.job_manager.update_job(
512
+ job_id,
513
+ progress=progress,
514
+ metadata=info
515
+ )
516
+
517
+ # Process video
518
+ output_path = Path(config.OUTPUT_DIR) / f"{job_id}_output.mp4"
519
+
520
+ stats = await app_state.video_processor.process_video_async(
521
+ input_path,
522
+ str(output_path),
523
+ background=request.background.value if request.background != BackgroundType.NONE else None,
524
+ progress_callback=progress_callback
525
+ )
526
+
527
+ # Update job
528
+ app_state.job_manager.update_job(
529
+ job_id,
530
+ status="completed",
531
+ progress=1.0,
532
+ result_url=f"/api/v1/download/{output_path.name}",
533
+ completed_at=datetime.now(),
534
+ metadata={
535
+ "frames_processed": stats.frames_processed,
536
+ "processing_fps": stats.processing_fps,
537
+ "avg_quality": stats.avg_quality_score
538
+ }
539
+ )
540
+
541
+ except Exception as e:
542
+ logger.error(f"Video processing failed for job {job_id}: {e}")
543
+ app_state.job_manager.update_job(
544
+ job_id,
545
+ status="failed",
546
+ message=str(e)
547
+ )
548
+
549
+
550
+ # ============================================================================
551
+ # Batch Processing Endpoints
552
+ # ============================================================================
553
+
554
+ @app.post("/api/v1/batch", response_model=ProcessingResponse)
555
+ async def process_batch(
556
+ background_tasks: BackgroundTasks,
557
+ request: BatchProcessingRequest,
558
+ current_user: str = Depends(verify_token)
559
+ ):
560
+ """Process multiple files in batch."""
561
+
562
+ # Create job
563
+ job_id = app.state.job_manager.create_job()
564
+
565
+ # Process in background
566
+ background_tasks.add_task(
567
+ process_batch_task,
568
+ app.state,
569
+ job_id,
570
+ request
571
+ )
572
+
573
+ return ProcessingResponse(
574
+ job_id=job_id,
575
+ status="processing",
576
+ message=f"Batch processing started for {len(request.items)} items"
577
+ )
578
+
579
+
580
+ async def process_batch_task(app_state, job_id: str, request: BatchProcessingRequest):
581
+ """Background task for batch processing."""
582
+ try:
583
+ # Convert request items to BatchItems
584
+ batch_items = []
585
+ for item_data in request.items:
586
+ batch_item = BatchItem(
587
+ id=item_data.get('id', str(uuid.uuid4())),
588
+ input_path=item_data['input_path'],
589
+ output_path=item_data['output_path'],
590
+ file_type=item_data.get('file_type', 'image'),
591
+ priority=BatchPriority[request.priority.upper()],
592
+ background=item_data.get('background')
593
+ )
594
+ batch_items.append(batch_item)
595
+
596
+ # Progress callback
597
+ def progress_callback(progress: float, info: Dict):
598
+ app_state.job_manager.update_job(
599
+ job_id,
600
+ progress=progress,
601
+ metadata=info
602
+ )
603
+
604
+ # Configure batch processor
605
+ batch_config = BatchConfig(
606
+ progress_callback=progress_callback,
607
+ max_workers=config.MAX_WORKERS if request.parallel else 1
608
+ )
609
+
610
+ processor = BatchProcessor(batch_config)
611
+ report = processor.process_batch(batch_items)
612
+
613
+ # Update job
614
+ app_state.job_manager.update_job(
615
+ job_id,
616
+ status="completed",
617
+ progress=1.0,
618
+ completed_at=datetime.now(),
619
+ metadata={
620
+ "total_items": report.total_items,
621
+ "successful_items": report.successful_items,
622
+ "failed_items": report.failed_items,
623
+ "avg_quality": report.quality_metrics.get('avg_quality', 0)
624
+ }
625
+ )
626
+
627
+ # Callback if provided
628
+ if request.callback_url:
629
+ # Send completion callback
630
+ # ... implementation ...
631
+ pass
632
+
633
+ except Exception as e:
634
+ logger.error(f"Batch processing failed for job {job_id}: {e}")
635
+ app_state.job_manager.update_job(
636
+ job_id,
637
+ status="failed",
638
+ message=str(e)
639
+ )
640
+
641
+
642
+ # ============================================================================
643
+ # Streaming Endpoints
644
+ # ============================================================================
645
+
646
+ @app.post("/api/v1/stream/start")
647
+ async def start_stream(
648
+ request: StreamingRequest,
649
+ current_user: str = Depends(verify_token)
650
+ ):
651
+ """Start a streaming session."""
652
+
653
+ # Configure streaming
654
+ stream_config = StreamConfig(
655
+ source=request.source,
656
+ stream_mode=VideoStreamMode[request.stream_type.upper()],
657
+ output_format=request.output_format,
658
+ output_path=f"{config.OUTPUT_DIR}/stream_{uuid.uuid4()}"
659
+ )
660
+
661
+ # Start streaming
662
+ success = app.state.video_processor.start_stream_processing(
663
+ stream_config,
664
+ background=None # Configure as needed
665
+ )
666
+
667
+ if success:
668
+ return {
669
+ "status": "streaming",
670
+ "stream_url": f"/api/v1/stream/live/{stream_config.output_path}",
671
+ "message": "Streaming started"
672
+ }
673
+ else:
674
+ raise HTTPException(500, "Failed to start streaming")
675
+
676
+
677
+ @app.get("/api/v1/stream/stop")
678
+ async def stop_stream(current_user: str = Depends(verify_token)):
679
+ """Stop streaming session."""
680
+ app.state.video_processor.stop_stream_processing()
681
+ return {"status": "stopped", "message": "Streaming stopped"}
682
+
683
+
684
+ @app.get("/api/v1/stream/preview")
685
+ async def get_stream_preview(current_user: str = Depends(verify_token)):
686
+ """Get stream preview frame."""
687
+ frame = app.state.video_processor.get_preview_frame()
688
+
689
+ if frame is not None:
690
+ # Convert to JPEG
691
+ _, buffer = cv2.imencode('.jpg', frame)
692
+ return StreamingResponse(
693
+ io.BytesIO(buffer),
694
+ media_type="image/jpeg"
695
+ )
696
+ else:
697
+ raise HTTPException(404, "No preview available")
698
+
699
+
700
+ # ============================================================================
701
+ # Job Management Endpoints
702
+ # ============================================================================
703
+
704
+ @app.get("/api/v1/job/{job_id}", response_model=ProcessingResponse)
705
+ async def get_job_status(
706
+ job_id: str,
707
+ current_user: str = Depends(verify_token)
708
+ ):
709
+ """Get job status."""
710
+ job = app.state.job_manager.get_job(job_id)
711
+
712
+ if job:
713
+ return job
714
+ else:
715
+ raise HTTPException(404, "Job not found")
716
+
717
+
718
+ @app.get("/api/v1/jobs")
719
+ async def list_jobs(
720
+ current_user: str = Depends(verify_token),
721
+ limit: int = 10,
722
+ offset: int = 0
723
+ ):
724
+ """List recent jobs."""
725
+ jobs = list(app.state.job_manager.jobs.values())
726
+ return {
727
+ "total": len(jobs),
728
+ "jobs": jobs[offset:offset + limit]
729
+ }
730
+
731
+
732
+ @app.delete("/api/v1/job/{job_id}")
733
+ async def cancel_job(
734
+ job_id: str,
735
+ current_user: str = Depends(verify_token)
736
+ ):
737
+ """Cancel a job."""
738
+ # Implementation would depend on your cancellation mechanism
739
+ app.state.job_manager.update_job(job_id, status="cancelled")
740
+ return {"message": "Job cancelled"}
741
+
742
+
743
+ # ============================================================================
744
+ # Download Endpoints
745
+ # ============================================================================
746
+
747
+ @app.get("/api/v1/download/{filename}")
748
+ async def download_file(
749
+ filename: str,
750
+ current_user: str = Depends(verify_token)
751
+ ):
752
+ """Download processed file."""
753
+ file_path = Path(config.OUTPUT_DIR) / filename
754
+
755
+ if file_path.exists():
756
+ return FileResponse(
757
+ path=file_path,
758
+ filename=filename,
759
+ media_type='application/octet-stream'
760
+ )
761
+ else:
762
+ raise HTTPException(404, "File not found")
763
+
764
+
765
+ # ============================================================================
766
+ # WebSocket for Real-time Updates
767
+ # ============================================================================
768
+
769
+ from fastapi import WebSocket, WebSocketDisconnect
770
+
771
+ @app.websocket("/ws/job/{job_id}")
772
+ async def websocket_job_updates(websocket: WebSocket, job_id: str):
773
+ """WebSocket for real-time job updates."""
774
+ await websocket.accept()
775
+
776
+ try:
777
+ while True:
778
+ # Get job status
779
+ job = app.state.job_manager.get_job(job_id)
780
+
781
+ if job:
782
+ await websocket.send_json(job.dict())
783
+
784
+ if job.status in ["completed", "failed", "cancelled"]:
785
+ break
786
+
787
+ await asyncio.sleep(1)
788
+
789
+ except WebSocketDisconnect:
790
+ logger.info(f"WebSocket disconnected for job {job_id}")
791
+
792
+
793
+ # ============================================================================
794
+ # Run Server
795
+ # ============================================================================
796
+
797
+ if __name__ == "__main__":
798
+ import uvicorn
799
+
800
+ uvicorn.run(
801
+ app,
802
+ host=config.HOST,
803
+ port=config.PORT,
804
+ log_level="info"
805
+ )