| | """ |
| | REST API server for BackgroundFX Pro. |
| | Provides HTTP endpoints for all processing functionality. |
| | """ |
| |
|
| | from fastapi import FastAPI, File, UploadFile, Form, HTTPException, BackgroundTasks, Depends, status |
| | from fastapi.responses import FileResponse, StreamingResponse, JSONResponse |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| | from fastapi.staticfiles import StaticFiles |
| | from pydantic import BaseModel, Field, validator |
| | from typing import Dict, List, Optional, Union, Any |
| | from enum import Enum |
| | import asyncio |
| | import aiofiles |
| | from pathlib import Path |
| | import tempfile |
| | import shutil |
| | import uuid |
| | import time |
| | from datetime import datetime, timedelta |
| | import jwt |
| | import cv2 |
| | import numpy as np |
| | import io |
| | import base64 |
| | from concurrent.futures import ThreadPoolExecutor |
| | import redis |
| | from contextlib import asynccontextmanager |
| |
|
| | from ..utils.logger import setup_logger |
| | from .pipeline import ProcessingPipeline, PipelineConfig, ProcessingMode |
| | from .video_processor import VideoProcessorAPI, StreamConfig, VideoStreamMode |
| | from .batch_processor import BatchProcessor, BatchConfig, BatchItem, BatchPriority |
| |
|
# Module-level logger, created by the project's setup_logger helper and keyed
# by this module's import name.
logger = setup_logger(__name__)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class ServerConfig:
    """Static settings for the API server, grouped by concern.

    All values are plain class attributes; the module-level ``config``
    instance below is what the rest of the file reads.

    NOTE(review): ``SECRET_KEY`` is a placeholder committed to source and has
    to be replaced before any real deployment.
    """

    # --- Network binding and file handling ---
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    UPLOAD_DIR: str = "uploads"
    OUTPUT_DIR: str = "outputs"
    TEMP_DIR: str = "temp"
    MAX_UPLOAD_SIZE: int = 500 * 1024 ** 2  # bytes (500 MiB)
    ALLOWED_EXTENSIONS: List[str] = [
        ".jpg",
        ".jpeg",
        ".png",
        ".mp4",
        ".avi",
        ".mov",
    ]

    # --- JWT signing ---
    SECRET_KEY: str = "your-secret-key-change-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # --- Redis job mirror ---
    REDIS_URL: str = "redis://localhost:6379"
    CACHE_TTL: int = 3600  # passed to SETEX as the expiry, i.e. seconds

    # --- Rate limiting (not referenced by the code visible in this file) ---
    RATE_LIMIT_REQUESTS: int = 100
    RATE_LIMIT_WINDOW: int = 60

    # --- Processing resources ---
    MAX_WORKERS: int = 4
    ENABLE_GPU: bool = True


# Shared settings object used by the endpoints and helpers in this module.
config = ServerConfig()
| |
|
| |
|
| | |
| | |
| | |
| |
|
class BackgroundType(str, Enum):
    """Replacement-background choices a client may request.

    Members are also ``str`` instances, so they compare equal to their raw
    values.
    """

    BLUR = "blur"
    OFFICE = "office"
    GRADIENT = "gradient"
    NATURE = "nature"
    CUSTOM = "custom"  # paired with ``background_url`` on the request models
    NONE = "none"  # the processing tasks pass no background for this value
| |
|
| |
|
class QualityPreset(str, Enum):
    """Named quality levels accepted by the request models.

    Members are also ``str`` instances, so they compare equal to their raw
    values.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    ULTRA = "ultra"
| |
|
| |
|
class ProcessingRequest(BaseModel):
    """Options shared by the image and video processing requests."""

    # Which background to composite behind the subject.
    background: BackgroundType = BackgroundType.BLUR
    # Location of a client-supplied background; only consulted for CUSTOM.
    background_url: Optional[str] = None
    quality: QualityPreset = QualityPreset.HIGH
    preserve_original: bool = False

    class Config:
        # Sample payload attached to the generated schema.
        schema_extra = {
            "example": {
                "background": "office",
                "quality": "high",
                "preserve_original": False,
            },
        }
| |
|
| |
|
class ImageProcessingRequest(ProcessingRequest):
    """Options for a single-image job, on top of the shared ones."""

    # Optional pair of integers for resizing.
    # NOTE(review): presumably (width, height) -- order is not shown here.
    resize: Optional[tuple[int, int]] = None
    # Effect names forwarded to PipelineConfig by the image task.
    apply_effects: List[str] = Field(default_factory=list)
    # Used as the file extension of the written result.
    output_format: str = "png"
| |
|
| |
|
class VideoProcessingRequest(ProcessingRequest):
    """Options for a video job, on top of the shared ones.

    NOTE(review): the video task visible in this file reads only
    ``background``; the fields below are accepted but not forwarded there.
    """

    start_time: Optional[float] = None
    end_time: Optional[float] = None
    fps: Optional[float] = None
    resolution: Optional[tuple[int, int]] = None
    codec: str = "h264"
| |
|
| |
|
class BatchProcessingRequest(BaseModel):
    """Payload describing a batch of files to process."""

    # One dict per file; the batch task reads 'input_path' and 'output_path'
    # plus the optional 'id', 'file_type' and 'background' keys.
    items: List[Dict[str, Any]]
    # When False the batch task limits itself to a single worker.
    parallel: bool = True
    # Upper-cased and looked up by name in BatchPriority by the batch task.
    priority: str = "normal"
    callback_url: Optional[str] = None
| |
|
| |
|
class StreamingRequest(BaseModel):
    """Payload for starting a streaming session."""

    # Handed to StreamConfig unchanged as its ``source``.
    source: str
    # Upper-cased and looked up by name in VideoStreamMode.
    stream_type: str = "webcam"
    output_format: str = "hls"
    quality: QualityPreset = QualityPreset.MEDIUM
| |
|
| |
|
class ProcessingResponse(BaseModel):
    """Job record returned by the endpoints and stored by JobManager."""

    job_id: str
    # Values written in this file: "pending", "processing", "completed",
    # "failed", "cancelled".
    status: str
    progress: float = 0.0
    message: Optional[str] = None
    # Download path of the result, set once a job completes.
    result_url: Optional[str] = None
    metadata: Dict[str, Any] = Field(default_factory=dict)
    # Stamped with the local clock when the record is instantiated.
    created_at: datetime = Field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
| |
|
| |
|
class JobStatus(BaseModel):
    """Detailed progress view of a job."""

    job_id: str
    status: str
    progress: float
    current_stage: Optional[str] = None
    time_elapsed: float
    time_remaining: Optional[float] = None
    errors: List[str] = Field(default_factory=list)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class JobManager:
    """Track processing jobs in memory, mirrored to Redis when reachable.

    Attributes:
        jobs: In-memory map from job id to its ``ProcessingResponse``.
        executor: Thread pool sized by ``config.MAX_WORKERS``.
        redis_client: Connected Redis client, or ``None`` when Redis could
            not be reached at construction time.
    """

    def __init__(self):
        self.jobs: Dict[str, ProcessingResponse] = {}
        self.executor = ThreadPoolExecutor(max_workers=config.MAX_WORKERS)
        self.redis_client = None
        try:
            client = redis.from_url(config.REDIS_URL)
            # Building the client alone does not prove the server is
            # reachable; ping so that ``redis_client`` (and the health
            # endpoint that reads it) reflects actual connectivity.
            client.ping()
        except (redis.RedisError, ValueError) as exc:
            logger.warning(f"Redis not available, using in-memory storage: {exc}")
        else:
            self.redis_client = client

    def create_job(self) -> str:
        """Register a new job in the "pending" state and return its id."""
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = ProcessingResponse(
            job_id=job_id,
            status="pending"
        )
        return job_id

    def update_job(self, job_id: str, **kwargs):
        """Apply field updates to a known job and mirror it to Redis.

        Args:
            job_id: Id returned by ``create_job``. Unknown ids are ignored.
            **kwargs: ``ProcessingResponse`` field names and new values;
                names the model does not have are skipped.
        """
        job = self.jobs.get(job_id)
        if job is None:
            return

        for key, value in kwargs.items():
            if hasattr(job, key):
                setattr(job, key, value)

        if self.redis_client is None:
            return
        try:
            self.redis_client.setex(f"job:{job_id}", config.CACHE_TTL, job.json())
        except Exception as exc:
            # Mirroring is best-effort: a Redis outage or a metadata value
            # that cannot be serialised must not fail the job itself.
            logger.warning(f"Could not mirror job {job_id} to Redis: {exc}")

    def get_job(self, job_id: str) -> Optional[ProcessingResponse]:
        """Return the job record, or ``None`` when it is unknown.

        The in-memory map is consulted first; Redis is only a fallback.
        """
        job = self.jobs.get(job_id)
        if job is not None:
            return job

        if self.redis_client is None:
            return None
        try:
            data = self.redis_client.get(f"job:{job_id}")
            if data:
                return ProcessingResponse.parse_raw(data)
        except Exception as exc:
            # Treat an unreachable server or an unreadable record as a miss.
            logger.warning(f"Could not load job {job_id} from Redis: {exc}")

        return None
| |
|
| |
|
| | |
| | |
| | |
| |
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up shared services on startup and release them on shutdown.

    Creates the upload/output/temp directories, then attaches the pipeline,
    video processor, batch processor and job manager to ``app.state``.
    Cleanup runs in a ``finally`` block so it also happens when the serving
    phase ends with an exception or cancellation.

    Args:
        app: The FastAPI application whose ``state`` receives the services.
    """
    logger.info("Starting BackgroundFX Pro API Server")

    for dir_path in [config.UPLOAD_DIR, config.OUTPUT_DIR, config.TEMP_DIR]:
        Path(dir_path).mkdir(parents=True, exist_ok=True)

    app.state.pipeline = ProcessingPipeline(
        PipelineConfig(use_gpu=config.ENABLE_GPU)
    )
    app.state.video_processor = VideoProcessorAPI()
    app.state.batch_processor = BatchProcessor()
    app.state.job_manager = JobManager()

    try:
        yield
    finally:
        logger.info("Shutting down BackgroundFX Pro API Server")
        cleanup_steps = (
            ("pipeline", app.state.pipeline.shutdown),
            ("video processor", app.state.video_processor.cleanup),
            ("batch processor", app.state.batch_processor.cleanup),
            # The job manager owns a thread pool that was never released.
            ("job executor", lambda: app.state.job_manager.executor.shutdown(wait=False)),
        )
        for label, cleanup in cleanup_steps:
            try:
                cleanup()
            except Exception as exc:
                # Keep going so one failing component does not leak the rest.
                logger.error(f"Cleanup of {label} failed: {exc}")
| |
|
| |
|
# The ASGI application; startup and shutdown work lives in ``lifespan``.
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="BackgroundFX Pro API",
    description="Professional background removal and replacement API",
)

# Cross-origin access is fully open.
# NOTE(review): wildcard origins combined with allow_credentials=True lets
# any site make credentialed requests -- confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Bearer-token scheme; verify_token depends on it to receive the credentials
# sent with each request.
security = HTTPBearer()
| |
|
| |
|
def create_access_token(data: dict) -> str:
    """Create a signed JWT carrying ``data`` plus an expiry claim.

    Args:
        data: Claims to embed. The dict is not modified.

    Returns:
        The token encoded with ``config.SECRET_KEY`` and ``config.ALGORITHM``,
        expiring ``config.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.
    """
    # Imported here because the module only imports datetime and timedelta.
    from datetime import timezone

    # An aware UTC timestamp replaces the deprecated, naive datetime.utcnow().
    expires_at = datetime.now(timezone.utc) + timedelta(
        minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, config.SECRET_KEY, algorithm=config.ALGORITHM)
| |
|
| |
|
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Validate the bearer token and return the user it identifies.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or verified, or
            when it carries no ``sub`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        # Tells clients which scheme to retry with.
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(
            credentials.credentials,
            config.SECRET_KEY,
            algorithms=[config.ALGORITHM],
        )
    except jwt.PyJWTError as exc:
        # Chain the cause so the decode failure stays visible in tracebacks.
        raise unauthorized from exc

    username = payload.get("sub")
    if username is None:
        raise unauthorized
    return username
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.get("/")
async def root():
    """Describe the service and point at its main endpoints."""
    endpoint_index = {
        "health": "/health",
        "docs": "/docs",
        "process_image": "/api/v1/process/image",
        "process_video": "/api/v1/process/video",
        "batch": "/api/v1/batch",
        "stream": "/api/v1/stream",
    }
    return {
        "name": "BackgroundFX Pro API",
        "version": "1.0.0",
        "status": "running",
        "endpoints": endpoint_index,
    }
| |
|
| |
|
@app.get("/health")
async def health_check():
    """Report service health.

    The pipeline, video and batch entries are fixed strings; only the Redis
    entry is derived, from whether the job manager holds a client object.
    """
    if app.state.job_manager.redis_client:
        redis_state = "connected"
    else:
        redis_state = "disconnected"

    service_states = {
        "pipeline": "ready",
        "video_processor": "ready",
        "batch_processor": "ready",
        "redis": redis_state,
    }
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": service_states,
    }
| |
|
| |
|
@app.get("/api/v1/stats")
async def get_statistics(current_user: str = Depends(verify_token)):
    """Collect statistics from the three processing components.

    Args:
        current_user: Authenticated user; only used to enforce the token.
    """
    services = app.state
    pipeline_stats = services.pipeline.get_statistics()
    video_stats = services.video_processor.get_stats()
    batch_status = services.batch_processor.get_status()
    return {
        "pipeline": pipeline_stats,
        "video": video_stats,
        "batch": batch_status,
    }
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.post("/api/v1/process/image", response_model=ProcessingResponse)
async def process_image(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    request: ImageProcessingRequest = Depends(),
    current_user: str = Depends(verify_token)
):
    """Accept an image upload and queue it for background processing.

    Args:
        background_tasks: Queue the processing task is appended to.
        file: The uploaded image.
        request: Processing options.
        current_user: Authenticated user; only used to enforce the token.

    Returns:
        A ``ProcessingResponse`` with the new job id and status "processing".

    Raises:
        HTTPException: 400 for a missing or non-image filename, 413 when the
            upload exceeds ``config.MAX_UPLOAD_SIZE``.
    """
    # Keep only the last path component so a crafted name such as
    # "../../x.png" cannot place the upload outside UPLOAD_DIR.
    safe_name = Path(file.filename or "").name

    # ALLOWED_EXTENSIONS also lists video containers; those belong to the
    # video endpoint and cannot be decoded as an image.
    video_extensions = (".mp4", ".avi", ".mov", ".mkv")
    image_extensions = tuple(
        ext for ext in config.ALLOWED_EXTENSIONS if ext not in video_extensions
    )
    if not safe_name.lower().endswith(image_extensions):
        raise HTTPException(400, "Invalid file format")

    # The declared size may be absent, so check it when present and check
    # the bytes actually received as well.
    declared_size = getattr(file, "size", None)
    if declared_size is not None and declared_size > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    content = await file.read()
    if len(content) > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    # Create the job only after validation so rejected uploads leave no record.
    job_id = app.state.job_manager.create_job()

    upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{safe_name}"
    async with aiofiles.open(upload_path, 'wb') as f:
        await f.write(content)

    background_tasks.add_task(
        process_image_task,
        app.state,
        job_id,
        str(upload_path),
        request
    )

    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message="Image processing started"
    )
| |
|
| |
|
async def process_image_task(app_state, job_id: str, input_path: str, request: ImageProcessingRequest):
    """Process one uploaded image and record the outcome on its job.

    Any exception is logged and turns the job into "failed" with the
    exception text as its message; nothing is raised to the caller.

    Args:
        app_state: Application state exposing ``job_manager`` and ``pipeline``.
        job_id: Id of the job to update.
        input_path: Path of the uploaded image on disk.
        request: Processing options supplied by the client.
    """
    try:
        app_state.job_manager.update_job(job_id, status="processing", progress=0.1)

        # The format ends up in a filename, so refuse anything that is not a
        # plain alphanumeric extension (no dots or path separators).
        if not request.output_format.isalnum():
            raise ValueError(f"Unsupported output format: {request.output_format!r}")

        # imread signals an unreadable or undecodable file by returning None.
        image = cv2.imread(input_path)
        if image is None:
            raise ValueError("Could not decode uploaded image")

        background = None
        if request.background == BackgroundType.CUSTOM and request.background_url:
            # TODO: fetching a custom background is not implemented; the
            # image is processed without a replacement background.
            pass
        elif request.background != BackgroundType.NONE:
            background = request.background.value

        # Named so it no longer shadows the module-level ``config`` that the
        # output path below relies on.
        # NOTE(review): this object is built but never handed to the
        # pipeline, so quality/effects are currently not applied -- wire it
        # in once the pipeline's interface for per-request config is confirmed.
        pipeline_config = PipelineConfig(
            quality_preset=request.quality.value,
            apply_effects=request.apply_effects
        )

        # NOTE(review): imread/process_image/imwrite are synchronous calls
        # inside a coroutine and occupy the event loop while they run.
        result = app_state.pipeline.process_image(image, background)

        if result.success:
            output_filename = f"{job_id}_output.{request.output_format}"
            output_path = Path(config.OUTPUT_DIR) / output_filename
            if not cv2.imwrite(str(output_path), result.output_image):
                raise IOError("Could not write processed image")

            app_state.job_manager.update_job(
                job_id,
                status="completed",
                progress=1.0,
                result_url=f"/api/v1/download/{output_filename}",
                completed_at=datetime.now(),
                metadata={
                    "quality_score": result.quality_score,
                    "processing_time": result.processing_time
                }
            )
        else:
            app_state.job_manager.update_job(
                job_id,
                status="failed",
                message="Processing failed"
            )

    except Exception as e:
        logger.error(f"Image processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(
            job_id,
            status="failed",
            message=str(e)
        )
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.post("/api/v1/process/video", response_model=ProcessingResponse)
async def process_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    request: VideoProcessingRequest = Depends(),
    current_user: str = Depends(verify_token)
):
    """Accept a video upload and queue it for background processing.

    Args:
        background_tasks: Queue the processing task is appended to.
        file: The uploaded video.
        request: Processing options.
        current_user: Authenticated user; only used to enforce the token.

    Returns:
        A ``ProcessingResponse`` with the new job id and status "processing".

    Raises:
        HTTPException: 400 for a missing or unsupported filename, 413 when
            the upload exceeds ``config.MAX_UPLOAD_SIZE``.
    """
    # Keep only the last path component so a crafted name cannot place the
    # upload outside UPLOAD_DIR.
    safe_name = Path(file.filename or "").name
    if not safe_name.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
        raise HTTPException(400, "Invalid video format")

    # Same size limit as the image endpoint; the declared size may be absent,
    # so the received bytes are checked too.
    declared_size = getattr(file, "size", None)
    if declared_size is not None and declared_size > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    content = await file.read()
    if len(content) > config.MAX_UPLOAD_SIZE:
        raise HTTPException(413, "File too large")

    # The job manager lives on ``app.state``; there is no ``app_state`` name
    # in this function's scope.
    job_id = app.state.job_manager.create_job()

    upload_path = Path(config.UPLOAD_DIR) / f"{job_id}_{safe_name}"
    async with aiofiles.open(upload_path, 'wb') as f:
        await f.write(content)

    background_tasks.add_task(
        process_video_task,
        app.state,
        job_id,
        str(upload_path),
        request
    )

    return ProcessingResponse(
        job_id=job_id,
        status="processing",
        message="Video processing started"
    )
| |
|
| |
|
async def process_video_task(app_state, job_id: str, input_path: str, request: VideoProcessingRequest):
    """Run video processing for a job and record the outcome.

    Any exception is logged and turns the job into "failed" with the
    exception text as its message.

    Args:
        app_state: Application state exposing ``job_manager`` and
            ``video_processor``.
        job_id: Id of the job to update.
        input_path: Path of the uploaded video on disk.
        request: Processing options; only ``background`` is read here.
    """
    jobs = app_state.job_manager

    try:
        def progress_callback(progress: float, info: Dict):
            # Each report replaces the job's progress and metadata.
            jobs.update_job(job_id, progress=progress, metadata=info)

        destination = Path(config.OUTPUT_DIR) / f"{job_id}_output.mp4"

        # NONE means "no replacement", which the processor receives as None.
        if request.background != BackgroundType.NONE:
            chosen_background = request.background.value
        else:
            chosen_background = None

        stats = await app_state.video_processor.process_video_async(
            input_path,
            str(destination),
            background=chosen_background,
            progress_callback=progress_callback,
        )

        jobs.update_job(
            job_id,
            status="completed",
            progress=1.0,
            result_url=f"/api/v1/download/{destination.name}",
            completed_at=datetime.now(),
            metadata={
                "frames_processed": stats.frames_processed,
                "processing_fps": stats.processing_fps,
                "avg_quality": stats.avg_quality_score,
            },
        )

    except Exception as exc:
        logger.error(f"Video processing failed for job {job_id}: {exc}")
        jobs.update_job(job_id, status="failed", message=str(exc))
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.post("/api/v1/batch", response_model=ProcessingResponse)
async def process_batch(
    background_tasks: BackgroundTasks,
    request: BatchProcessingRequest,
    current_user: str = Depends(verify_token)
):
    """Register a batch job and queue it for background processing.

    Args:
        background_tasks: Queue the batch task is appended to.
        request: The batch description.
        current_user: Authenticated user; only used to enforce the token.

    Returns:
        A ``ProcessingResponse`` with the new job id and status "processing".
    """
    services = app.state
    new_job_id = services.job_manager.create_job()

    background_tasks.add_task(process_batch_task, services, new_job_id, request)

    item_count = len(request.items)
    return ProcessingResponse(
        job_id=new_job_id,
        status="processing",
        message=f"Batch processing started for {item_count} items",
    )
| |
|
| |
|
async def process_batch_task(app_state, job_id: str, request: BatchProcessingRequest):
    """Run a batch of items and record the outcome on its job.

    Any exception is logged and turns the job into "failed" with the
    exception text as its message.

    Args:
        app_state: Application state exposing ``job_manager``.
        job_id: Id of the job to update.
        request: Batch description supplied by the client.
    """
    try:
        # Resolve the priority once instead of once per item, and report an
        # unknown name clearly rather than as a bare KeyError.
        try:
            priority = BatchPriority[request.priority.upper()]
        except KeyError:
            raise ValueError(f"Unknown priority: {request.priority!r}") from None

        # NOTE(review): input_path/output_path come straight from the client
        # and are not restricted to the upload/output directories.
        batch_items = []
        for index, item_data in enumerate(request.items):
            missing = [
                key for key in ("input_path", "output_path") if key not in item_data
            ]
            if missing:
                raise ValueError(
                    f"Batch item {index} is missing required field(s): {', '.join(missing)}"
                )
            batch_items.append(
                BatchItem(
                    id=item_data.get('id', str(uuid.uuid4())),
                    input_path=item_data['input_path'],
                    output_path=item_data['output_path'],
                    file_type=item_data.get('file_type', 'image'),
                    priority=priority,
                    background=item_data.get('background')
                )
            )

        def progress_callback(progress: float, info: Dict):
            # Each report replaces the job's progress and metadata.
            app_state.job_manager.update_job(
                job_id,
                progress=progress,
                metadata=info
            )

        batch_config = BatchConfig(
            progress_callback=progress_callback,
            max_workers=config.MAX_WORKERS if request.parallel else 1
        )

        processor = BatchProcessor(batch_config)
        # process_batch is a synchronous call; run it in a worker thread so
        # the event loop keeps serving other requests meanwhile.
        report = await asyncio.to_thread(processor.process_batch, batch_items)

        app_state.job_manager.update_job(
            job_id,
            status="completed",
            progress=1.0,
            completed_at=datetime.now(),
            metadata={
                "total_items": report.total_items,
                "successful_items": report.successful_items,
                "failed_items": report.failed_items,
                "avg_quality": report.quality_metrics.get('avg_quality', 0)
            }
        )

        if request.callback_url:
            # TODO: notifying callback_url is not implemented.
            pass

    except Exception as e:
        logger.error(f"Batch processing failed for job {job_id}: {e}")
        app_state.job_manager.update_job(
            job_id,
            status="failed",
            message=str(e)
        )
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.post("/api/v1/stream/start")
async def start_stream(
    request: StreamingRequest,
    current_user: str = Depends(verify_token)
):
    """Start a streaming session.

    Args:
        request: Stream source and output options.
        current_user: Authenticated user; only used to enforce the token.

    Returns:
        A dict with the status, the stream URL and a message.

    Raises:
        HTTPException: 400 when ``stream_type`` names no ``VideoStreamMode``
            member, 500 when the processor reports it could not start.
    """
    # An unknown stream type is a client error, not a server fault.
    try:
        stream_mode = VideoStreamMode[request.stream_type.upper()]
    except KeyError:
        raise HTTPException(
            400, f"Unsupported stream type: {request.stream_type}"
        ) from None

    stream_config = StreamConfig(
        source=request.source,
        stream_mode=stream_mode,
        output_format=request.output_format,
        output_path=f"{config.OUTPUT_DIR}/stream_{uuid.uuid4()}"
    )

    success = app.state.video_processor.start_stream_processing(
        stream_config,
        background=None
    )

    if not success:
        raise HTTPException(500, "Failed to start streaming")

    return {
        "status": "streaming",
        "stream_url": f"/api/v1/stream/live/{stream_config.output_path}",
        "message": "Streaming started"
    }
| |
|
| |
|
@app.get("/api/v1/stream/stop")
async def stop_stream(current_user: str = Depends(verify_token)):
    """Ask the video processor to stop streaming and confirm.

    NOTE(review): this changes server state but is exposed as GET.

    Args:
        current_user: Authenticated user; only used to enforce the token.
    """
    processor = app.state.video_processor
    processor.stop_stream_processing()

    confirmation = {"status": "stopped", "message": "Streaming stopped"}
    return confirmation
| |
|
| |
|
@app.get("/api/v1/stream/preview")
async def get_stream_preview(current_user: str = Depends(verify_token)):
    """Return the current stream preview frame as a JPEG.

    Args:
        current_user: Authenticated user; only used to enforce the token.

    Raises:
        HTTPException: 404 when the processor has no preview frame, 500 when
            the frame cannot be encoded.
    """
    frame = app.state.video_processor.get_preview_frame()
    if frame is None:
        raise HTTPException(404, "No preview available")

    # imencode reports failure through its first return value.
    encoded_ok, buffer = cv2.imencode('.jpg', frame)
    if not encoded_ok:
        raise HTTPException(500, "Failed to encode preview frame")

    return StreamingResponse(
        io.BytesIO(buffer.tobytes()),
        media_type="image/jpeg"
    )
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.get("/api/v1/job/{job_id}", response_model=ProcessingResponse)
async def get_job_status(
    job_id: str,
    current_user: str = Depends(verify_token)
):
    """Look up a job record by id.

    Args:
        job_id: Id returned when the job was created.
        current_user: Authenticated user; only used to enforce the token.

    Raises:
        HTTPException: 404 when the job manager has no such job.
    """
    record = app.state.job_manager.get_job(job_id)
    if not record:
        raise HTTPException(404, "Job not found")
    return record
| |
|
| |
|
@app.get("/api/v1/jobs")
async def list_jobs(
    current_user: str = Depends(verify_token),
    limit: int = 10,
    offset: int = 0
):
    """List jobs held in memory, one page at a time.

    Args:
        current_user: Authenticated user; only used to enforce the token.
        limit: Maximum number of jobs to return; negatives count as 0.
        offset: Number of jobs to skip; negatives count as 0.

    Returns:
        A dict with the ``total`` job count and the requested ``jobs`` page.
    """
    # Negative values would otherwise be read as from-the-end slice indices
    # and return an unrelated window of jobs.
    offset = max(offset, 0)
    limit = max(limit, 0)

    jobs = list(app.state.job_manager.jobs.values())
    return {
        "total": len(jobs),
        "jobs": jobs[offset:offset + limit]
    }
| |
|
| |
|
@app.delete("/api/v1/job/{job_id}")
async def cancel_job(
    job_id: str,
    current_user: str = Depends(verify_token)
):
    """Mark a job as cancelled.

    This only changes the recorded status; a background task that is already
    running is not interrupted and may still overwrite the status later.

    Args:
        job_id: Id of the job to cancel.
        current_user: Authenticated user; only used to enforce the token.

    Raises:
        HTTPException: 404 when the job is unknown, 409 when it has already
            completed or failed.
    """
    job = app.state.job_manager.get_job(job_id)
    if job is None:
        raise HTTPException(404, "Job not found")

    # A finished job keeps its outcome instead of being relabelled.
    if job.status in ("completed", "failed"):
        raise HTTPException(409, "Job already finished")

    app.state.job_manager.update_job(job_id, status="cancelled")
    return {"message": "Job cancelled"}
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.get("/api/v1/download/{filename}")
async def download_file(
    filename: str,
    current_user: str = Depends(verify_token)
):
    """Send a processed file from the output directory.

    Args:
        filename: Name of a file directly inside ``config.OUTPUT_DIR``.
        current_user: Authenticated user; only used to enforce the token.

    Raises:
        HTTPException: 404 when the name does not resolve to a regular file
            directly inside the output directory.
    """
    output_dir = Path(config.OUTPUT_DIR).resolve()
    file_path = (output_dir / filename).resolve()

    # Resolving first means names like ".." or links that lead elsewhere are
    # rejected; is_file() also excludes directories, which exists() accepted.
    if file_path.parent != output_dir or not file_path.is_file():
        raise HTTPException(404, "File not found")

    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type='application/octet-stream'
    )
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | from fastapi import WebSocket, WebSocketDisconnect |
| |
|
@app.websocket("/ws/job/{job_id}")
async def websocket_job_updates(websocket: WebSocket, job_id: str):
    """Push a job's record to the client once per second until it finishes.

    The connection is closed when the job reaches "completed", "failed" or
    "cancelled", or immediately (policy-violation code) when the job is
    unknown.

    NOTE(review): unlike the HTTP endpoints, this route performs no token
    check.

    Args:
        websocket: The client connection.
        job_id: Id of the job to follow.
    """
    # Imported here because the module does not import json at the top.
    import json

    await websocket.accept()

    try:
        while True:
            job = app.state.job_manager.get_job(job_id)

            if job is None:
                # Polling forever for a job that does not exist would hold
                # the connection open with nothing to send.
                await websocket.close(code=1008)
                return

            # job.dict() holds datetime objects that send_json cannot
            # serialise; round-trip through the model's own JSON encoding.
            await websocket.send_json(json.loads(job.json()))

            if job.status in ["completed", "failed", "cancelled"]:
                await websocket.close()
                return

            await asyncio.sleep(1)

    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected for job {job_id}")
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # Running the module directly serves the app with uvicorn on the
    # configured host and port.
    import uvicorn

    bind_options = {"host": config.HOST, "port": config.PORT}
    uvicorn.run(app, log_level="info", **bind_options)