| import fastapi |
| import shutil |
| import os |
| import zipfile |
| import io |
| import uvicorn |
| import threading |
| import glob |
| from typing import List |
| import torch |
| import gdown |
| from soundfile import write |
| from torchaudio import load |
| from librosa import resample |
| import logging |
|
|
| logging.basicConfig(level=logging.DEBUG) |
|
|
| from sgmse import ScoreModel |
| from sgmse.util.other import pad_spec |
|
|
|
|
class ModelAPI:
    """HTTP service that runs a speech-enhancement model on uploaded audio.

    Two working directories are kept under ``~/.modelapi``: ``noisy_audio``
    receives uploads and ``enhanced_audio`` receives the model output. The
    HTTP routes are registered in ``_setup_routes``.
    """

    def __init__(self, host, port):
        """Configure paths, sampler settings, working directories and the app.

        Args:
            host: Host passed to ``uvicorn.run`` in ``run``.
            port: Port passed to ``uvicorn.run`` in ``run``.

        Raises:
            IndexError: If no ``*.ckpt`` file is found next to this module.
        """
        self.host = host
        self.port = port

        self.base_path = os.path.join(os.path.expanduser("~"), ".modelapi")
        self.noisy_audio_path = os.path.join(self.base_path, "noisy_audio")
        self.enhanced_audio_path = os.path.join(self.base_path, "enhanced_audio")

        # The checkpoint is looked up in the directory containing this file.
        app_dir = os.path.dirname(os.path.abspath(__file__))
        self.ckpt_path = glob.glob(os.path.join(app_dir, "*.ckpt"))[0]
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Sampler settings forwarded to the model's sampler factories.
        self.corrector = "ald"
        self.corrector_steps = 1
        self.snr = 0.5
        self.N = 30

        # Start every run with empty working directories.
        for audio_path in (self.noisy_audio_path, self.enhanced_audio_path):
            os.makedirs(audio_path, exist_ok=True)
            for entry in os.listdir(audio_path):
                entry_path = os.path.join(audio_path, entry)
                if os.path.isfile(entry_path) or os.path.islink(entry_path):
                    os.unlink(entry_path)
                elif os.path.isdir(entry_path):
                    shutil.rmtree(entry_path)

        self.app = fastapi.FastAPI()
        self._setup_routes()

    def _prepare(self):
        """Miners should modify this function to fit their fine-tuned models.

        This function will make any preparations necessary to initialize the
        speech enhancement model (i.e. downloading checkpoint files, etc.)
        """
        self.model = ScoreModel.load_from_checkpoint(self.ckpt_path, self.device)
        self.model.t_eps = 0.03
        self.model.eval()

    def _build_sampler(self, Y):
        """Return the sampler callable matching the model's SDE and sampler type.

        Args:
            Y: Padded spectrogram batch produced in ``_enhance``.

        Raises:
            ValueError: If the SDE class or its sampler type is not supported.
        """
        sde = self.model.sde
        sde_name = sde.__class__.__name__
        # Always follow self.device so CPU-only hosts work as well.
        Y = Y.to(self.device)

        if sde_name == "OUVESDE":
            if sde.sampler_type == "pc":
                return self.model.get_pc_sampler(
                    "reverse_diffusion",
                    self.corrector,
                    Y,
                    N=self.N,
                    corrector_steps=self.corrector_steps,
                    snr=self.snr,
                )
            if sde.sampler_type == "ode":
                return self.model.get_ode_sampler(Y, N=self.N)
            raise ValueError(f"Sampler type {sde.sampler_type} not supported")

        if sde_name == "SBVESDE":
            # A "pc" setting is mapped to "ode" for this SDE.
            sampler_type = "ode" if sde.sampler_type == "pc" else sde.sampler_type
            return self.model.get_sb_sampler(
                sde=sde, y=Y, sampler_type=sampler_type
            )

        raise ValueError(f"SDE {sde_name} not supported")

    def _enhance(self):
        """
        Miners should modify this function to fit their fine-tuned models.

        This function will:
        1. Open each noisy .wav file
        2. Enhance the audio with the model
        3. Save the enhanced audio in .wav format to ModelAPI.enhanced_audio_path
        """
        # Target sample rate and spectrogram padding depend on the backbone.
        if self.model.backbone == "ncsnpp_48k":
            target_sr = 48000
            pad_mode = "reflection"
        elif self.model.backbone == "ncsnpp_v2":
            target_sr = 16000
            pad_mode = "reflection"
            print("using ncsnpp_v2")
        else:
            target_sr = 16000
            pad_mode = "zero_pad"

        os.makedirs(self.enhanced_audio_path, exist_ok=True)

        noisy_files = sorted(glob.glob(os.path.join(self.noisy_audio_path, "*.wav")))
        for noisy_file in noisy_files:
            # The glob is non-recursive, so the basename is the relative name.
            filename = os.path.basename(noisy_file)

            y, sr = load(noisy_file)
            if sr != target_sr:
                y = torch.tensor(
                    resample(y.numpy(), orig_sr=sr, target_sr=target_sr)
                )

            # Remember the length so the output can be cut back to it.
            T_orig = y.size(1)

            # Peak-normalise; a silent clip would otherwise divide by zero
            # and feed NaNs to the model.
            norm_factor = y.abs().max()
            if norm_factor == 0:
                norm_factor = torch.ones_like(norm_factor)
            y = y / norm_factor

            Y = torch.unsqueeze(
                self.model._forward_transform(self.model._stft(y.to(self.device))),
                0,
            )
            Y = pad_spec(Y, mode=pad_mode)

            sampler = self._build_sampler(Y)
            sample, _ = sampler()

            x_hat = self.model.to_audio(sample.squeeze(), T_orig)

            # Undo the peak normalisation applied before inference.
            x_hat = x_hat * norm_factor

            write(
                os.path.join(self.enhanced_audio_path, filename),
                x_hat.cpu().numpy(),
                target_sr,
            )

    def _setup_routes(self):
        """Register the HTTP endpoints on ``self.app``."""
        self.app.get("/status/")(self.get_status)
        self.app.post("/prepare/")(self.prepare)
        self.app.post("/upload-audio/")(self.upload_audio)
        self.app.post("/enhance/")(self.enhance_audio)
        self.app.get("/download-enhanced/")(self.download_enhanced)
        self.app.post("/reset/")(self.reset)

    def get_status(self):
        """Report that the container is up."""
        return {"container_running": True}

    def prepare(self):
        """Run ``_prepare`` and report success.

        Raises:
            fastapi.HTTPException: 500 if ``_prepare`` fails.
        """
        try:
            self._prepare()
            return {"preparations": True}
        except Exception as e:
            logging.error(f"Error during preparations: {e}")
            # Must be raised, not returned, for the client to receive a 500.
            raise fastapi.HTTPException(
                status_code=500, detail="An error occurred during preparations."
            )

    def upload_audio(self, files: List[fastapi.UploadFile] = fastapi.File(...)):
        """Store the uploaded files in ``noisy_audio_path``.

        Args:
            files: Uploaded files; each is written under its base filename.

        Returns:
            Dict with the stored filenames and ``"status": True``.

        Raises:
            fastapi.HTTPException: 400 for an unusable filename, 500 if
                writing a file fails.
        """
        uploaded_files = []

        for file in files:
            try:
                # The filename is client-controlled: keep only its last path
                # component so it cannot escape noisy_audio_path.
                safe_name = os.path.basename(
                    (file.filename or "").replace("\\", "/")
                )
                if safe_name in ("", ".", ".."):
                    raise fastapi.HTTPException(
                        status_code=400,
                        detail="Uploaded file has an invalid filename.",
                    )

                file_path = os.path.join(self.noisy_audio_path, safe_name)
                with open(file_path, "wb") as f:
                    # Copy in 1 MiB chunks instead of reading it all at once.
                    while contents := file.file.read(1024 * 1024):
                        f.write(contents)

                uploaded_files.append(safe_name)

            except fastapi.HTTPException:
                # Let the 400 above through instead of masking it as a 500.
                raise
            except Exception as e:
                logging.error(f"Error uploading files: {e}")
                raise fastapi.HTTPException(
                    status_code=500,
                    detail="An error occurred while uploading the noisy files.",
                )
            finally:
                file.file.close()

        print(f"uploaded files: {uploaded_files}")

        return {"uploaded_files": uploaded_files, "status": True}

    def enhance_audio(self):
        """Run ``_enhance`` over the uploaded files.

        Raises:
            fastapi.HTTPException: 500 if enhancement fails.
        """
        try:
            self._enhance()
            return {"status": True}
        except Exception as e:
            logging.error(f"Exception occured during enhancement: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred while enhancing the noisy files.",
            )

    def download_enhanced(self):
        """Return every enhanced ``.wav`` file bundled in one zip archive.

        Raises:
            fastapi.HTTPException: 500 if the archive cannot be built.
        """
        try:
            zip_buffer = io.BytesIO()

            with zipfile.ZipFile(zip_buffer, "w") as zip_file:
                for wav_file in glob.glob(
                    os.path.join(self.enhanced_audio_path, "*.wav")
                ):
                    zip_file.write(wav_file, arcname=os.path.basename(wav_file))
            zip_buffer.seek(0)

            return fastapi.responses.StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/zip",
                headers={
                    "Content-Disposition": "attachment; filename=enhanced_audio_files.zip"
                },
            )

        except Exception as e:
            logging.error(f"Error during enhanced files download: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail=f"An error occurred while creating the download file: {str(e)}",
            )

    def reset(self):
        """
        Removes all audio files in preparation for another batch of enhancement.

        Returns:
            Dict with ``status`` (False as soon as one removal fails) and the
            remaining entries of both working directories.
        """
        for directory in (self.noisy_audio_path, self.enhanced_audio_path):
            if not os.path.isdir(directory):
                continue

            for filename in os.listdir(directory):
                filepath = os.path.join(directory, filename)
                if not os.path.isfile(filepath):
                    continue
                try:
                    os.remove(filepath)
                except Exception as e:
                    logging.error(f"Error removing {filepath}: {e}")
                    return {
                        "status": False,
                        "noisy": os.listdir(self.noisy_audio_path),
                        "enhanced": os.listdir(self.enhanced_audio_path),
                    }

        return {
            "status": True,
            "noisy": os.listdir(self.noisy_audio_path),
            "enhanced": os.listdir(self.enhanced_audio_path),
        }

    def run(self):
        """Serve the app with uvicorn on the configured host and port."""
        uvicorn.run(self.app, host=self.host, port=self.port)
|
|