Spaces:
Sleeping
Sleeping
| """Use case for checking job status.""" | |
| from typing import Optional, Any | |
| import logging | |
| from domain.exceptions.domain_exceptions import JobNotFoundError | |
| from ..dto.extraction_response import JobStatusDTO | |
| logger = logging.getLogger(__name__) | |
| class JobRepository: | |
| """Protocol for job repository.""" | |
| async def get(self, job_id: str) -> Optional[Any]: | |
| ... | |
| class CheckJobStatusUseCase: | |
| """Use case for checking job status.""" | |
| def __init__(self, job_repository: JobRepository): | |
| self.job_repository = job_repository | |
| async def execute(self, job_id: str) -> JobStatusDTO: | |
| """Check the status of a job.""" | |
| # Get job from repository | |
| job_record = await self.job_repository.get(job_id) | |
| if not job_record: | |
| raise JobNotFoundError(job_id) | |
| # Calculate processing time | |
| processing_time = None | |
| if job_record.processing_time: | |
| processing_time = job_record.processing_time | |
| elif job_record.status == "processing": | |
| processing_time = (job_record.updated_at - job_record.created_at).total_seconds() | |
| # Create DTO | |
| return JobStatusDTO( | |
| job_id=job_record.id, | |
| external_job_id=job_record.external_job_id, | |
| status=job_record.status, | |
| created_at=job_record.created_at, | |
| updated_at=job_record.updated_at, | |
| filename=job_record.filename, | |
| file_size_mb=job_record.file_size_mb, | |
| output_format=job_record.output_format, | |
| quality=job_record.quality, | |
| processing_time=processing_time, | |
| error=job_record.error, | |
| download_url=f"/api/v1/jobs/{job_id}/download" if job_record.status == "completed" else None | |
| ) |