test / behavior_backend /app /api /routes /processing.py
hibatorrahmen's picture
added fixes
1d14811
from fastapi import APIRouter, Depends, BackgroundTasks
from sqlalchemy.orm import Session
from app.db.base import get_db
from app.models.processing import ProcessingRequest, ProcessingStatus
from app.services.processing.processing_service import ProcessingService
from app.utils.logging_utils import setup_logger, log_success
from app.utils.auth import get_current_active_user, get_api_key_user
from app.db.models import User
"""
Video Processing API Routes
==========================
This module provides API endpoints for video processing operations, including:
- Initiating video processing
- Checking processing status
- Retrieving processing results
All endpoints are prefixed with '/processing' and include appropriate logging
with endpoint identification in square brackets.
"""
# Setup logger for this module
logger = setup_logger("processing_router")
router = APIRouter(
prefix="/processing",
tags=["processing"],
responses={404: {"description": "Not found"}},
)
@router.post("", response_model=ProcessingStatus)
async def process_video(
request: ProcessingRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Initiate processing of a video.
This endpoint accepts a video ID and initiates the processing pipeline
as a background task. It returns a processing status object with the
video ID that can be used to check the status later.
Args:
request (ProcessingRequest): Request object containing the video ID and processing options
background_tasks (BackgroundTasks): FastAPI background tasks manager
db (Session): Database session dependency
current_user (User): Current active user dependency
Returns:
ProcessingStatus: Object containing the video ID and initial processing status
Example:
POST /processing
{
"video_id": "vid-12345",
"frame_rate": 5,
"language": "en",
"generate_annotated_video": true
}
"""
logger.info(f"[process_video] Received request to process video: {request.video_id}")
processing_service = ProcessingService(db)
result = await processing_service.process_video(request, background_tasks)
logger.info(f" : {result.video_id}")
return result
@router.get("/status/{video_id}", response_model=ProcessingStatus)
async def get_processing_status(
video_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Get the current processing status of a video.
This endpoint retrieves the current status of a video processing job
using the video ID returned from the process_video endpoint.
Args:
video_id (str): Unique identifier for the video processing job
db (Session): Database session dependency
current_user (User): Current active user dependency
Returns:
ProcessingStatus: Object containing the video ID and current processing status
Example:
GET /processing/status/vid-12345
"""
logger.info(f"[get_processing_status] Checking status for video ID: {video_id}")
processing_service = ProcessingService(db)
status = processing_service.get_processing_status(video_id)
logger.info(f"[get_processing_status] Status for video ID {video_id}: {status.status}")
return status
@router.get("/results/{video_id}")
async def get_processing_results(
video_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""
Get the results of a completed video processing job.
This endpoint retrieves the full results of a video processing job
after it has completed. It should only be called after the status
endpoint indicates that processing is complete.
Args:
video_id (str): Unique identifier for the video processing job
db (Session): Database session dependency
current_user (User): Current active user dependency
Returns:
dict: Processing results including behavior analytics data
Example:
GET /processing/results/vid-12345
"""
logger.info(f"[get_processing_results] Retrieving results for video ID: {video_id}")
processing_service = ProcessingService(db)
results = processing_service.get_processing_results(video_id)
log_success(logger, f"[get_processing_results] Successfully retrieved results for video ID: {video_id}")
return results
@router.post("/direct", response_model=ProcessingStatus)
@router.post("/direct/", response_model=ProcessingStatus)
async def process_video_direct(
request: ProcessingRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
api_key_valid: bool = Depends(get_api_key_user)
):
"""
Initiate processing of a video using API key authentication.
This endpoint accepts a video ID and initiates the processing pipeline
as a background task. It is secured with API key authentication.
Args:
request (ProcessingRequest): Request object containing the video ID and processing options
background_tasks (BackgroundTasks): FastAPI background tasks manager
db (Session): Database session dependency
api_key_valid (bool): API key validation dependency
Returns:
ProcessingStatus: Object containing the video ID and initial processing status
Example:
POST /processing/direct
X-API-Key: your-api-key
{
"video_id": "vid-12345",
"frame_rate": 5,
"language": "en",
"generate_annotated_video": true
}
"""
logger.info(f"[process_video_direct] Received request to process video: {request.video_id}")
processing_service = ProcessingService(db)
result = await processing_service.process_video(request, background_tasks)
logger.info(f"[process_video_direct] Started processing for video ID: {result.video_id}")
return result
@router.get("/direct/status/{video_id}", response_model=ProcessingStatus)
async def get_processing_status_direct(
video_id: str,
db: Session = Depends(get_db),
api_key_valid: bool = Depends(get_api_key_user)
):
"""
Get the current processing status of a video using API key authentication.
This endpoint retrieves the current status of a video processing job.
It is secured with API key authentication.
Args:
video_id (str): Unique identifier for the video processing job
db (Session): Database session dependency
api_key_valid (bool): API key validation dependency
Returns:
ProcessingStatus: Object containing the video ID and current processing status
Example:
GET /processing/direct/status/vid-12345
X-API-Key: your-api-key
"""
logger.info(f"[get_processing_status_direct] Checking status for video ID: {video_id}")
processing_service = ProcessingService(db)
try:
status = processing_service.get_processing_status(video_id)
logger.info(f"[get_processing_status_direct] Status for video ID {video_id}: {status.status}, Progress: {status.progress}")
return status
except Exception as e:
logger.error(f"[get_processing_status_direct] Error getting status: {str(e)}")
raise
@router.get("/direct/results/{video_id}")
async def get_processing_results_direct(
video_id: str,
db: Session = Depends(get_db),
api_key_valid: bool = Depends(get_api_key_user)
):
"""
Get the results of a completed video processing job using API key authentication.
This endpoint retrieves the full results of a video processing job
after it has completed. It is secured with API key authentication.
Args:
video_id (str): Unique identifier for the video processing job
db (Session): Database session dependency
api_key_valid (bool): API key validation dependency
Returns:
dict: Processing results including behavior analytics data
Example:
GET /processing/direct/results/vid-12345
X-API-Key: your-api-key
"""
logger.info(f"[get_processing_results_direct] Retrieving results for video ID: {video_id}")
processing_service = ProcessingService(db)
results = processing_service.get_processing_results(video_id)
log_success(logger, f"[get_processing_results_direct] Successfully retrieved results for video ID: {video_id}")
return results