import glob
import io
import logging
import os
import shutil
import threading
import zipfile
from typing import List

import fastapi
import fastapi.responses
import gdown
import torch
import uvicorn
from librosa import resample
from soundfile import write
from torchaudio import load

logging.basicConfig(level=logging.DEBUG)

from sgmse import ScoreModel
from sgmse.util.other import pad_spec


class ModelAPI:
    """FastAPI service that enhances uploaded noisy ``.wav`` files.

    Noisy uploads are stored under ``~/.modelapi/noisy_audio`` and enhanced
    results under ``~/.modelapi/enhanced_audio``. The HTTP routes registered
    in ``_setup_routes`` drive the workflow: prepare the model, upload files,
    enhance them, download the results as a zip archive, and reset.
    """

    def __init__(self, host, port):
        """Set up paths, sampler settings and the FastAPI application.

        Both audio directories are created if missing and emptied, so every
        instance starts from a clean state.

        Args:
            host: Host passed to ``uvicorn.run`` by ``run``.
            port: Port passed to ``uvicorn.run`` by ``run``.

        Raises:
            IndexError: If no ``*.ckpt`` file is found next to this module.
            OSError: If an audio directory cannot be created or emptied.
        """
        self.host = host
        self.port = port

        self.base_path = os.path.join(os.path.expanduser("~"), ".modelapi")
        self.noisy_audio_path = os.path.join(self.base_path, "noisy_audio")
        self.enhanced_audio_path = os.path.join(self.base_path, "enhanced_audio")

        # The checkpoint is expected to sit in the same directory as this file;
        # the first match returned by glob is used.
        app_dir = os.path.dirname(os.path.abspath(__file__))
        self.ckpt_path = glob.glob(os.path.join(app_dir, "*.ckpt"))[0]

        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Sampler settings forwarded to the model's sampler factories.
        self.corrector = "ald"
        self.corrector_steps = 1
        self.snr = 0.5
        self.N = 30

        for audio_path in (self.noisy_audio_path, self.enhanced_audio_path):
            os.makedirs(audio_path, exist_ok=True)

            # Remove leftovers from a previous run; failures propagate.
            for filename in os.listdir(audio_path):
                file_path = os.path.join(audio_path, filename)
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)

        self.app = fastapi.FastAPI()
        self._setup_routes()

    def _prepare(self):
        """Miners should modify this function to fit their fine-tuned models.

        This function will make any preparations necessary to initialize the
        speech enhancement model (i.e. downloading checkpoint files, etc.)

        Here it loads the checkpoint at ``self.ckpt_path`` onto ``self.device``
        and switches the model to evaluation mode.
        """
        self.model = ScoreModel.load_from_checkpoint(self.ckpt_path, self.device)
        self.model.t_eps = 0.03
        self.model.eval()

    def _build_sampler(self, Y):
        """Return the sampler callable matching the model's SDE configuration.

        Args:
            Y: Padded spectrogram batch produced in ``_enhance``.

        Returns:
            The sampler object created by the model for ``Y``.

        Raises:
            ValueError: If the SDE class or its sampler type is not supported.
        """
        sde = self.model.sde
        sde_name = sde.__class__.__name__
        # Always follow self.device so CPU-only hosts work for every SDE type.
        Y = Y.to(self.device)

        if sde_name == "OUVESDE":
            if sde.sampler_type == "pc":
                return self.model.get_pc_sampler(
                    "reverse_diffusion",
                    self.corrector,
                    Y,
                    N=self.N,
                    corrector_steps=self.corrector_steps,
                    snr=self.snr,
                )
            if sde.sampler_type == "ode":
                return self.model.get_ode_sampler(Y, N=self.N)
            raise ValueError(f"Sampler type {sde.sampler_type} not supported")

        if sde_name == "SBVESDE":
            # A "pc" setting is mapped to "ode" for this SDE; other values
            # are passed through unchanged.
            sampler_type = "ode" if sde.sampler_type == "pc" else sde.sampler_type
            return self.model.get_sb_sampler(
                sde=sde, y=Y, sampler_type=sampler_type
            )

        raise ValueError(f"SDE {sde_name} not supported")

    def _enhance(self):
        """
        Miners should modify this function to fit their fine-tuned models.

        This function will:
        1. Open each noisy .wav file
        2. Enhance the audio with the model
        3. Save the enhanced audio in .wav format to ModelAPI.enhanced_audio_path

        Requires ``_prepare`` to have been called so that ``self.model`` exists.

        Raises:
            ValueError: If the model's SDE or sampler type is not supported.
        """
        # Target sample rate and spectrogram padding depend on the backbone.
        if self.model.backbone == "ncsnpp_48k":
            target_sr = 48000
            pad_mode = "reflection"
        elif self.model.backbone == "ncsnpp_v2":
            target_sr = 16000
            pad_mode = "reflection"
            logging.info("using ncsnpp_v2")
        else:
            target_sr = 16000
            pad_mode = "zero_pad"

        os.makedirs(self.enhanced_audio_path, exist_ok=True)

        noisy_files = sorted(glob.glob(os.path.join(self.noisy_audio_path, "*.wav")))
        for noisy_file in noisy_files:
            # The glob is non-recursive, so the basename identifies the file.
            filename = os.path.basename(noisy_file)

            y, sr = load(noisy_file)
            if sr != target_sr:
                y = torch.tensor(resample(y.numpy(), orig_sr=sr, target_sr=target_sr))

            T_orig = y.size(1)

            # Peak-normalize the input; skip scaling for all-zero audio so the
            # division cannot produce NaNs.
            norm_factor = y.abs().max()
            if norm_factor == 0:
                norm_factor = torch.ones_like(norm_factor)
            y = y / norm_factor

            Y = torch.unsqueeze(
                self.model._forward_transform(self.model._stft(y.to(self.device))), 0
            )
            Y = pad_spec(Y, mode=pad_mode)

            sampler = self._build_sampler(Y)
            sample, _ = sampler()

            x_hat = self.model.to_audio(sample.squeeze(), T_orig)

            # Undo the peak normalization applied to the input.
            x_hat = x_hat * norm_factor

            write(
                os.path.join(self.enhanced_audio_path, filename),
                x_hat.cpu().numpy(),
                target_sr,
            )

    def _setup_routes(self):
        """Register the HTTP endpoints on ``self.app``."""
        self.app.get("/status/")(self.get_status)
        self.app.post("/prepare/")(self.prepare)
        self.app.post("/upload-audio/")(self.upload_audio)
        self.app.post("/enhance/")(self.enhance_audio)
        self.app.get("/download-enhanced/")(self.download_enhanced)
        self.app.post("/reset/")(self.reset)

    def get_status(self):
        """Report that the service is up.

        Returns:
            ``{"container_running": True}``.
        """
        return {"container_running": True}

    def prepare(self):
        """Run ``_prepare`` and report success.

        Returns:
            ``{"preparations": True}`` when preparation succeeds.

        Raises:
            fastapi.HTTPException: With status 500 if preparation fails.
        """
        try:
            self._prepare()
        except Exception as e:
            logging.error(f"Error during preparations: {e}")
            # Raise (not return) so the client actually receives a 500.
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred during model preparations.",
            ) from e
        return {"preparations": True}

    def upload_audio(self, files: List[fastapi.UploadFile] = fastapi.File(...)):
        """Store uploaded files in ``self.noisy_audio_path``.

        Args:
            files: Uploaded files; each is written in 1 MiB chunks.

        Returns:
            ``{"uploaded_files": [...], "status": True}`` listing the names
            under which the files were stored.

        Raises:
            fastapi.HTTPException: Status 400 for an unusable filename,
                status 500 if writing a file fails.
        """
        uploaded_files = []

        for file in files:
            try:
                # The filename is client-controlled: keep only its final
                # component so it cannot escape the upload directory.
                safe_name = os.path.basename(file.filename or "")
                if safe_name in ("", ".", ".."):
                    raise fastapi.HTTPException(
                        status_code=400,
                        detail="Uploaded file has an invalid filename.",
                    )

                file_path = os.path.join(self.noisy_audio_path, safe_name)
                with open(file_path, "wb") as f:
                    while contents := file.file.read(1024 * 1024):
                        f.write(contents)

                uploaded_files.append(safe_name)

            except fastapi.HTTPException:
                # Do not convert the deliberate 400 above into a 500.
                raise
            except Exception as e:
                logging.error(f"Error uploading files: {e}")
                raise fastapi.HTTPException(
                    status_code=500,
                    detail="An error occurred while uploading the noisy files.",
                ) from e
            finally:
                file.file.close()

        logging.info(f"uploaded files: {uploaded_files}")

        return {"uploaded_files": uploaded_files, "status": True}

    def enhance_audio(self):
        """Run ``_enhance`` over the uploaded files.

        Returns:
            ``{"status": True}`` when enhancement completes.

        Raises:
            fastapi.HTTPException: With status 500 if enhancement fails.
        """
        try:
            self._enhance()
        except Exception as e:
            logging.error(f"Exception occured during enhancement: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred while enhancing the noisy files.",
            ) from e
        return {"status": True}

    def download_enhanced(self):
        """Return all enhanced ``.wav`` files as a single zip download.

        Returns:
            A ``StreamingResponse`` carrying an in-memory zip archive named
            ``enhanced_audio_files.zip``.

        Raises:
            fastapi.HTTPException: With status 500 if the archive cannot be
                built.
        """
        try:
            zip_buffer = io.BytesIO()

            with zipfile.ZipFile(zip_buffer, "w") as zip_file:
                for wav_file in glob.glob(
                    os.path.join(self.enhanced_audio_path, "*.wav")
                ):
                    # Store entries flat, without the server-side directory.
                    zip_file.write(wav_file, arcname=os.path.basename(wav_file))

            return fastapi.responses.StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/zip",
                headers={
                    "Content-Disposition": "attachment; filename=enhanced_audio_files.zip"
                },
            )

        except Exception as e:
            logging.error(f"Error during enhanced files download: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail=f"An error occurred while creating the download file: {str(e)}",
            ) from e

    def _reset_report(self, status):
        """Build the ``reset`` response with the current directory contents."""

        def remaining(directory):
            # A missing directory simply has nothing left in it.
            return os.listdir(directory) if os.path.isdir(directory) else []

        return {
            "status": status,
            "noisy": remaining(self.noisy_audio_path),
            "enhanced": remaining(self.enhanced_audio_path),
        }

    def reset(self):
        """
        Removes all audio files in preparation for another batch of enhancement.

        Only regular files directly inside the two audio directories are
        removed. Processing stops at the first file that cannot be removed.

        Returns:
            A dict with ``status`` (``False`` if a removal failed) and the
            remaining entries of the ``noisy`` and ``enhanced`` directories.
        """
        for directory in (self.noisy_audio_path, self.enhanced_audio_path):
            if not os.path.isdir(directory):
                continue

            for filename in os.listdir(directory):
                filepath = os.path.join(directory, filename)
                if not os.path.isfile(filepath):
                    continue
                try:
                    os.remove(filepath)
                except OSError as e:
                    logging.error(f"Error removing {filepath}: {e}")
                    return self._reset_report(False)

        return self._reset_report(True)

    def run(self):
        """Serve ``self.app`` with uvicorn on the configured host and port."""
        uvicorn.run(self.app, host=self.host, port=self.port)