Spaces:
Sleeping
Sleeping
| """Audio extraction API routes.""" | |
| from fastapi import APIRouter, BackgroundTasks, UploadFile, Form, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from dataclasses import asdict | |
| from typing import Optional | |
| from ..dependencies import ValidatedVideo, ExtractionParams, UseCases, Services, BearerToken | |
| from ..responses import JobCreatedResponse | |
| from application.dto.extraction_request import ExtractionRequestDTO | |
| from domain.entities.job import Job | |
| from domain.exceptions.domain_exceptions import ( | |
| ValidationError, | |
| DuplicateExternalJobIdError, | |
| InvalidExternalJobIdFormatError | |
| ) | |
# Router for the audio-extraction endpoints; the application mounts it via
# include_router (prefix/tags presumably supplied there — confirm).
# NOTE(review): no @router decorator appears in this file chunk, so the
# route registration for extract_audio may have been lost — verify.
router = APIRouter()
async def extract_audio(
    background_tasks: BackgroundTasks,
    video: ValidatedVideo,
    params: ExtractionParams,
    token: BearerToken,
    use_cases: UseCases,
    services: Services,
    job_id: Optional[str] = Form(None, description="Optional external job identifier")
):
    """
    Extract audio from an uploaded video file.

    All videos are processed asynchronously regardless of file size.
    Responds 202 with a job payload that can be used to:
      - check processing status:  GET /jobs/{job_id}
      - download the result:      GET /jobs/{job_id}/download

    Raises:
        HTTPException 400: malformed external job ID, duplicate external
            job ID, or a domain validation failure while creating the job.
        HTTPException 500: unexpected failure constructing the job.
    """
    # NOTE(review): no @router decorator is visible on this handler — confirm
    # the route registration was not lost, since `router` is otherwise unused.

    # Reject a malformed caller-supplied external job ID before doing any work.
    if job_id:
        try:
            use_cases.validation_service.validate_external_job_id(job_id)
        except InvalidExternalJobIdFormatError as e:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": "Invalid external job ID format",
                    "details": str(e),
                    "code": "INVALID_EXTERNAL_JOB_ID_FORMAT",
                    "field": "job_id",
                    "value": e.job_id
                }
            ) from e

    file_size = _get_file_size(video)

    # Create the job entity up front so its ID can name the stored upload.
    try:
        job = Job.create_new(
            video_filename=video.filename,
            file_size_bytes=file_size,
            output_format=params.output_format,
            quality=params.quality,
            external_job_id=job_id,
            bearer_token=token
        )
    except ValidationError as e:
        # Domain validation failures are client errors. (ValidationError was
        # imported but previously unused — the old blanket handler reported
        # bad input as an internal 500.)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Truly unexpected failure while constructing the job.
        raise HTTPException(status_code=500, detail=f"Failed to create job: {str(e)}") from e

    # Persist the upload keyed by the job's ID so later stages can find it.
    file_path = await services.file_repository.save_stream(
        video,
        video.filename,
        job_id=job.id
    )

    extraction_dto = ExtractionRequestDTO(
        video_filename=video.filename,
        video_file_path=file_path,
        video_file_size=file_size,
        output_format=params.output_format,
        quality=params.quality,
        content_type=video.content_type
    )

    # Hand off to the async use case with the pre-created job.
    try:
        result = await use_cases.extract_audio_async.execute_with_job(
            job,
            extraction_dto,
            background_tasks
        )
        return JSONResponse(
            content=asdict(result),
            status_code=202
        )
    except DuplicateExternalJobIdError as e:
        # The stored upload is orphaned if the job was rejected — remove it.
        await services.file_repository.delete_file(file_path)
        raise HTTPException(
            status_code=400,
            detail={
                "error": str(e),
                "code": "DUPLICATE_EXTERNAL_JOB_ID",
                "external_job_id": e.external_job_id
            }
        ) from e
    except Exception:
        # Clean up the stored upload, then let the framework surface the error.
        await services.file_repository.delete_file(file_path)
        raise
def _get_file_size(video: UploadFile) -> int:
    """Return the byte length of *video*'s underlying file object.

    Measures by seeking to the end and reading the offset, then rewinds
    the stream so subsequent consumers read from the beginning.
    """
    stream = video.file
    stream.seek(0, 2)  # 2 == SEEK_END: jump to end of stream
    try:
        return stream.tell()
    finally:
        stream.seek(0)  # rewind for the next reader