Spaces:
Running
Running
| import functools | |
| import numpy as np | |
| from fastapi import ( | |
| File, | |
| UploadFile, | |
| ) | |
| import gradio as gr | |
| from fastapi import APIRouter, BackgroundTasks, Depends, Response, status | |
| from fastapi.responses import FileResponse | |
| from typing import List, Dict, Tuple | |
| from datetime import datetime | |
| import os | |
| from modules.whisper.data_classes import * | |
| from modules.uvr.music_separator import MusicSeparator | |
| from modules.utils.paths import BACKEND_CACHE_DIR | |
| from backend.common.audio import read_audio | |
| from backend.common.models import QueueResponse | |
| from backend.common.config_loader import load_server_config | |
| from backend.common.compresser import get_file_hash, find_file_by_hash | |
| from backend.db.task.models import TaskStatus, TaskType, ResultType | |
| from backend.db.task.dao import add_task_to_db, update_task_status_in_db | |
| from .models import BGMSeparationResult | |
| bgm_separation_router = APIRouter(prefix="/bgm-separation", tags=["BGM Separation"]) | |
| def get_bgm_separation_inferencer() -> 'MusicSeparator': | |
| config = load_server_config()["bgm_separation"] | |
| inferencer = MusicSeparator( | |
| output_dir=os.path.join(BACKEND_CACHE_DIR, "UVR") | |
| ) | |
| inferencer.update_model( | |
| model_name=config["model_size"], | |
| device=config["device"] | |
| ) | |
| return inferencer | |
| def run_bgm_separation( | |
| audio: np.ndarray, | |
| params: BGMSeparationParams, | |
| identifier: str, | |
| ) -> Tuple[np.ndarray, np.ndarray]: | |
| update_task_status_in_db( | |
| identifier=identifier, | |
| update_data={ | |
| "uuid": identifier, | |
| "status": TaskStatus.IN_PROGRESS, | |
| "updated_at": datetime.utcnow() | |
| } | |
| ) | |
| start_time = datetime.utcnow() | |
| instrumental, vocal, filepaths = get_bgm_separation_inferencer().separate( | |
| audio=audio, | |
| model_name=params.uvr_model_size, | |
| device=params.uvr_device, | |
| segment_size=params.segment_size, | |
| save_file=True, | |
| progress=gr.Progress() | |
| ) | |
| instrumental_path, vocal_path = filepaths | |
| elapsed_time = (datetime.utcnow() - start_time).total_seconds() | |
| update_task_status_in_db( | |
| identifier=identifier, | |
| update_data={ | |
| "uuid": identifier, | |
| "status": TaskStatus.COMPLETED, | |
| "result": BGMSeparationResult( | |
| instrumental_hash=get_file_hash(instrumental_path), | |
| vocal_hash=get_file_hash(vocal_path) | |
| ).model_dump(), | |
| "result_type": ResultType.FILEPATH, | |
| "updated_at": datetime.utcnow(), | |
| "duration": elapsed_time | |
| } | |
| ) | |
| return instrumental, vocal | |
| async def bgm_separation( | |
| background_tasks: BackgroundTasks, | |
| file: UploadFile = File(..., description="Audio or video file to separate background music."), | |
| params: BGMSeparationParams = Depends() | |
| ) -> QueueResponse: | |
| if not isinstance(file, np.ndarray): | |
| audio, info = await read_audio(file=file) | |
| else: | |
| audio, info = file, None | |
| identifier = add_task_to_db( | |
| status=TaskStatus.QUEUED, | |
| file_name=file.filename, | |
| audio_duration=info.duration if info else None, | |
| task_type=TaskType.BGM_SEPARATION, | |
| task_params=params.model_dump(), | |
| ) | |
| background_tasks.add_task( | |
| run_bgm_separation, | |
| audio=audio, | |
| params=params, | |
| identifier=identifier | |
| ) | |
| return QueueResponse(identifier=identifier, status=TaskStatus.QUEUED, message="BGM Separation task has queued") | |