Spaces:
Running
Running
| import asyncio | |
| from dataclasses import dataclass | |
| from backend.separator import StemSeparatorService | |
| from backend import file_manager | |
| class JobProgress: | |
| state: str = "queued" | |
| progress: float = 0.0 | |
| message: str = "Waiting in queue..." | |
| stems: dict[str, str] | None = None | |
| error: str | None = None | |
# Module-level state shared by the functions in this module.

# The job queue; stays None until get_queue() creates it on first use.
_queue: asyncio.Queue | None = None

# Progress records keyed by job id.
jobs: dict[str, JobProgress] = {}
def get_queue() -> asyncio.Queue:
    """Return the shared job queue, creating it on the first call.

    The queue is bounded to 5 pending jobs. Every later call returns the
    same instance.
    """
    global _queue
    queue = _queue
    if queue is None:
        # First use: build the queue and remember it for later callers.
        queue = _queue = asyncio.Queue(maxsize=5)
    return queue
def get_job_progress(job_id: str) -> JobProgress | None:
    """Look up the progress record for *job_id*.

    Returns the record stored in ``jobs``, or ``None`` when no job with
    that id is known.
    """
    try:
        return jobs[job_id]
    except KeyError:
        return None
async def enqueue_job(job_id: str, stems: list[str], output_format: str) -> bool:
    """Enqueue a separation job without ever waiting for queue space.

    Args:
        job_id: Identifier under which progress is tracked in ``jobs``.
        stems: Stem names forwarded unchanged to the worker.
        output_format: Output format forwarded unchanged to the worker.

    Returns:
        True if the job was accepted, False if the queue is full. A
        rejected job leaves ``jobs`` untouched.
    """
    try:
        # put_nowait either succeeds immediately or raises; unlike a
        # full()-check followed by ``await put()``, it can never suspend.
        get_queue().put_nowait((job_id, stems, output_format))
    except asyncio.QueueFull:
        return False

    # No await between the put and this assignment, so the worker cannot
    # observe the queued job before its progress record exists.
    jobs[job_id] = JobProgress()
    return True
async def worker_loop():
    """Single worker that processes separation jobs sequentially.

    Runs forever: takes ``(job_id, stems, output_format)`` tuples from the
    shared queue, runs the separator in the default executor so the event
    loop stays responsive, and mirrors the outcome into the job's
    ``JobProgress`` record in ``jobs``.

    Failures never stop the worker. A missing input file or any exception
    raised while processing marks that job as ``"error"`` and the loop
    moves on to the next job. ``task_done()`` is called for every item
    taken from the queue, whatever the outcome.
    """
    # Status text per state; built once instead of on every callback.
    state_messages = {
        "loading_model": "Loading BS-RoFormer model...",
        "separating": "Separating stems...",
        "finalizing": "Finalizing output files...",
        "done": "Separation complete!",
    }

    separator = StemSeparatorService()
    q = get_queue()
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()

    while True:
        job_id, stems, output_format = await q.get()
        try:
            # Normally created by enqueue_job; recreate it defensively if
            # the record is missing.
            progress = jobs.get(job_id)
            if progress is None:
                progress = JobProgress()
                jobs[job_id] = progress

            def update_progress(state: str, pct: float):
                # Handed to separator.separate below, which runs in the
                # executor -- presumably invoked from that worker thread.
                progress.state = state
                progress.progress = pct
                progress.message = state_messages.get(state, f"{state}...")

            input_file = file_manager.get_input_file(job_id)
            if input_file is None:
                progress.state = "error"
                progress.error = "Input file not found"
                progress.message = "Error: input file not found"
                continue  # the finally clause still marks the task done

            output_dir = str(file_manager.get_output_dir(job_id))

            # Run separation in a thread to avoid blocking the event loop
            result = await loop.run_in_executor(
                None,
                separator.separate,
                str(input_file),
                output_dir,
                stems,
                output_format,
                update_progress,
            )

            progress.state = "done"
            progress.progress = 1.0
            progress.message = "Separation complete!"
            progress.stems = result
        except Exception as e:
            # Top-level boundary: record the failure on the job and keep
            # the worker alive for the next one.
            progress = jobs.setdefault(job_id, JobProgress())
            progress.state = "error"
            progress.error = str(e)
            progress.message = f"Error: {e}"
        finally:
            q.task_done()