import glob
import io
import logging
import os
import shutil
import zipfile
from typing import List

import fastapi
import gdown
import torch
import uvicorn
from librosa import resample
from soundfile import write
from torchaudio import load

# Configure logging before importing sgmse, matching the original statement order.
logging.basicConfig(level=logging.DEBUG)

from sgmse import ScoreModel
from sgmse.util.other import pad_spec


class ModelAPI:
    """HTTP API that wraps a speech-enhancement ``ScoreModel``.

    Noisy ``.wav`` files are uploaded into ``~/.modelapi/noisy_audio``,
    enhanced by the model, written to ``~/.modelapi/enhanced_audio`` and
    served back to the client as a zip archive.
    """

    def __init__(self, host, port):
        """Locate the checkpoint, set sampler settings and build the app.

        Args:
            host: Host passed to ``uvicorn.run``.
            port: Port passed to ``uvicorn.run``.

        Raises:
            FileNotFoundError: If no ``.ckpt`` file sits next to this module.
            RuntimeError: If more than one ``.ckpt`` file sits next to this
                module.
        """
        self.host = host
        self.port = port

        self.base_path = os.path.join(os.path.expanduser("~"), ".modelapi")
        self.noisy_audio_path = os.path.join(self.base_path, "noisy_audio")
        self.enhanced_audio_path = os.path.join(self.base_path, "enhanced_audio")

        # Exactly one checkpoint is expected in the directory of this file.
        app_dir = os.path.dirname(os.path.abspath(__file__))
        ckpt_files = glob.glob(os.path.join(app_dir, "*.ckpt"))
        if not ckpt_files:
            raise FileNotFoundError("No .ckpt file found in app_dir.")
        if len(ckpt_files) > 1:
            raise RuntimeError("Multiple .ckpt files found in app_dir. Please keep only one.")
        self.ckpt_path = ckpt_files[0]

        # Sampler settings forwarded to the model's sampler factories.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.corrector = "ald"
        self.corrector_steps = 1
        self.snr = 0.5
        self.N = 30

        # Populated by _prepare(); kept as None until then so that _enhance()
        # can report a clear error instead of an AttributeError.
        self.model = None

        # Start from empty working directories.
        for audio_path in (self.noisy_audio_path, self.enhanced_audio_path):
            os.makedirs(audio_path, exist_ok=True)
            self._clear_directory(audio_path)

        self.app = fastapi.FastAPI()
        self._setup_routes()

    @staticmethod
    def _clear_directory(directory):
        """Delete every file, symlink and sub-directory inside ``directory``.

        Any ``OSError`` raised while deleting propagates to the caller.
        """
        for entry in os.listdir(directory):
            entry_path = os.path.join(directory, entry)
            if os.path.isfile(entry_path) or os.path.islink(entry_path):
                os.unlink(entry_path)
            elif os.path.isdir(entry_path):
                shutil.rmtree(entry_path)

    def _prepare(self):
        """Miners should modify this function to fit their fine-tuned models.

        This function will make any preparations necessary to initialize the
        speech enhancement model (i.e. downloading checkpoint files, etc.)
        """
        self.model = ScoreModel.load_from_checkpoint(self.ckpt_path, self.device)
        self.model.t_eps = 0.03
        self.model.eval()

    def _build_sampler(self, Y):
        """Return the sampler callable matching the model's SDE configuration.

        Args:
            Y: Padded spectrogram batch as returned by ``pad_spec``.

        Raises:
            ValueError: If the SDE class or its sampler type is not supported.
        """
        sde = self.model.sde
        sde_name = sde.__class__.__name__
        # Use the configured device for every branch so the API also works
        # when CUDA is unavailable (self.device falls back to 'cpu').
        Y = Y.to(self.device)

        if sde_name == 'OUVESDE':
            if sde.sampler_type == 'pc':
                return self.model.get_pc_sampler(
                    'reverse_diffusion', self.corrector, Y, N=self.N,
                    corrector_steps=self.corrector_steps, snr=self.snr)
            if sde.sampler_type == 'ode':
                return self.model.get_ode_sampler(Y, N=self.N)
            raise ValueError(f"Sampler type {sde.sampler_type} not supported")

        if sde_name == 'SBVESDE':
            # A 'pc' sampler type is mapped to 'ode' for this SDE.
            sampler_type = 'ode' if sde.sampler_type == 'pc' else sde.sampler_type
            return self.model.get_sb_sampler(sde=sde, y=Y, sampler_type=sampler_type)

        raise ValueError(f"SDE {sde_name} not supported")

    def _enhance(self):
        """Miners should modify this function to fit their fine-tuned models.

        This function will:
        1. Open each noisy .wav file
        2. Enhance the audio with the model
        3. Save the enhanced audio in .wav format to
           ModelAPI.enhanced_audio_path

        Raises:
            RuntimeError: If the model has not been loaded via ``_prepare``.
            ValueError: If the model's SDE or sampler type is not supported.
        """
        if getattr(self, "model", None) is None:
            raise RuntimeError("Model is not loaded; call /prepare/ before /enhance/.")

        # Sample rate and spectrogram padding depend on the model backbone.
        if self.model.backbone == 'ncsnpp_48k':
            target_sr = 48000
            pad_mode = "reflection"
        elif self.model.backbone == 'ncsnpp_v2':
            target_sr = 16000
            pad_mode = "reflection"
        else:
            target_sr = 16000
            pad_mode = "zero_pad"

        os.makedirs(self.enhanced_audio_path, exist_ok=True)

        noisy_files = sorted(glob.glob(os.path.join(self.noisy_audio_path, '*.wav')))
        for noisy_file in noisy_files:
            # The glob is non-recursive, so the base name identifies the file.
            filename = os.path.basename(noisy_file)

            y, sr = load(noisy_file)
            if sr != target_sr:
                y = torch.tensor(resample(y.numpy(), orig_sr=sr, target_sr=target_sr))

            T_orig = y.size(1)

            # Normalise to unit peak. An all-zero file would give 0 / 0 = NaN,
            # so fall back to a factor of 1 in that case.
            norm_factor = y.abs().max()
            if norm_factor == 0:
                norm_factor = torch.ones_like(norm_factor)
            y = y / norm_factor

            Y = torch.unsqueeze(
                self.model._forward_transform(self.model._stft(y.to(self.device))), 0)
            Y = pad_spec(Y, mode=pad_mode)

            sampler = self._build_sampler(Y)
            sample, _ = sampler()

            # Back to the time domain, trimmed to the original length, then
            # undo the peak normalisation.
            x_hat = self.model.to_audio(sample.squeeze(), T_orig)
            x_hat = x_hat * norm_factor

            write(os.path.join(self.enhanced_audio_path, filename),
                  x_hat.cpu().numpy(), target_sr)

    def _setup_routes(self):
        """
        Setup API routes:

        /status/ : Communicates API status
        /prepare/ : Load the speech enhancement model
        /upload-audio/ : Upload audio files, save to noisy audio directory
        /enhance/ : Enhance audio files, save to enhanced audio directory
        /download-enhanced/ : Download enhanced audio files
        /reset/ : Reset noisy and enhanced file cache
        """
        self.app.get("/status/")(self.get_status)
        self.app.post("/prepare/")(self.prepare)
        self.app.post("/upload-audio/")(self.upload_audio)
        self.app.post("/enhance/")(self.enhance_audio)
        self.app.get("/download-enhanced/")(self.download_enhanced)
        self.app.post("/reset/")(self.reset)

    def get_status(self):
        """Report that the API process is up."""
        return {"container_running": True}

    def prepare(self):
        """Run ``_prepare`` and report success.

        Raises:
            fastapi.HTTPException: With status 500 if preparation fails.
        """
        try:
            self._prepare()
        except Exception as e:
            logging.error(f"Error during preparations: {e}")
            # Must be raised (not returned) for FastAPI to send a 500 response.
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred during model preparation.") from e
        return {'preparations': True}

    def upload_audio(self, files: List[fastapi.UploadFile] = fastapi.File(...)):
        """Store the uploaded files in the noisy audio directory.

        Args:
            files: Uploaded files from the multipart request.

        Returns:
            A dict with the stored file names and ``"status": True``.

        Raises:
            fastapi.HTTPException: With status 400 if a file has no usable
                name, or 500 if writing a file fails.
        """
        uploaded_files = []

        for file in files:
            try:
                # The filename is client-controlled: keep only its final
                # component so it cannot escape the noisy audio directory.
                safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
                if safe_name in ("", ".", ".."):
                    raise fastapi.HTTPException(
                        status_code=400,
                        detail="Uploaded file is missing a valid filename.")

                file_path = os.path.join(self.noisy_audio_path, safe_name)

                # Copy in 1 MiB chunks to avoid holding whole files in memory.
                with open(file_path, "wb") as f:
                    while contents := file.file.read(1024 * 1024):
                        f.write(contents)

                uploaded_files.append(safe_name)

            except fastapi.HTTPException:
                raise
            except Exception as e:
                logging.error(f"Error uploading files: {e}")
                raise fastapi.HTTPException(
                    status_code=500,
                    detail="An error occurred while uploading the noisy files.") from e
            finally:
                file.file.close()

        logging.info(f"uploaded files: {uploaded_files}")

        return {"uploaded_files": uploaded_files, "status": True}

    def enhance_audio(self):
        """Run ``_enhance`` over the uploaded files.

        Raises:
            fastapi.HTTPException: With status 500 if enhancement fails.
        """
        try:
            self._enhance()
        except Exception as e:
            logging.exception(f"Exception occured during enhancement: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail="An error occurred while enhancing the noisy files.") from e
        return {"status": True}

    def download_enhanced(self):
        """Return every enhanced ``.wav`` file bundled in one zip archive.

        Raises:
            fastapi.HTTPException: With status 500 if the archive cannot be
                built.
        """
        try:
            # Build the archive in memory; entries are stored by base name.
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, "w") as zip_file:
                for wav_file in glob.glob(os.path.join(self.enhanced_audio_path, '*.wav')):
                    zip_file.write(wav_file, arcname=os.path.basename(wav_file))

            return fastapi.responses.StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/zip",
                headers={"Content-Disposition": "attachment; filename=enhanced_audio_files.zip"}
            )

        except Exception as e:
            logging.error(f"Error during enhanced files download: {e}")
            raise fastapi.HTTPException(
                status_code=500,
                detail=f"An error occurred while creating the download file: {str(e)}") from e

    def reset(self):
        """
        Removes all audio files in preparation for another batch of enhancement.

        Returns:
            A dict with ``status`` (False if a file could not be removed, in
            which case the reset stops at that file) and the remaining
            contents of the noisy and enhanced directories.
        """
        for directory in (self.noisy_audio_path, self.enhanced_audio_path):
            if not os.path.isdir(directory):
                continue

            for filename in os.listdir(directory):
                filepath = os.path.join(directory, filename)
                if not os.path.isfile(filepath):
                    continue
                try:
                    os.remove(filepath)
                except Exception as e:
                    logging.error(f"Error removing {filepath}: {e}")
                    return {
                        "status": False,
                        "noisy": os.listdir(self.noisy_audio_path),
                        "enhanced": os.listdir(self.enhanced_audio_path),
                    }

        return {
            "status": True,
            "noisy": os.listdir(self.noisy_audio_path),
            "enhanced": os.listdir(self.enhanced_audio_path),
        }

    def run(self):
        """Serve the FastAPI app with uvicorn on the configured host and port."""
        uvicorn.run(self.app, host=self.host, port=self.port)