Spaces:
Paused
Paused
| from fastapi import FastAPI, HTTPException, Response | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from audio_separator.separator import Separator | |
| import ffmpeg | |
| from datetime import datetime | |
| import logging | |
| import os | |
| import uuid | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import asyncio | |
| from fastapi.concurrency import run_in_threadpool | |
| from concurrent.futures import ThreadPoolExecutor | |
app = FastAPI()

# Scratch directory that receives both the ffmpeg-extracted clips and the
# separator's output stems.
tmp_directory = "tmp"
# Create it up front so that writing the first extracted clip does not depend
# on the directory already existing (e.g. on a fresh checkout or container).
os.makedirs(tmp_directory, exist_ok=True)

# The separator and its model are set up once, at import time, and shared by
# every request handled by this process.
separator = Separator(output_dir=tmp_directory, log_level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
separator.load_model("UVR-MDX-NET-Inst_Main.onnx")

# Dedicated pools so that blocking separation and ffmpeg work stays off the
# event loop; each pool runs at most 8 jobs at a time.
extractionExecuter = ThreadPoolExecutor(max_workers=8)
ffmpegExecuter = ThreadPoolExecutor(max_workers=8)
class IsolationRequest(BaseModel):
    """Request body describing which slice of a media source to isolate."""

    # Media location; handed to ffmpeg unchanged as its input.
    url: str
    # Offset into the media at which extraction starts (passed to ffmpeg as `ss`).
    start_time: float
    # Length of the slice to extract (passed to ffmpeg as `t`).
    duration_seconds: float
async def isolate_voice(request: IsolationRequest):
    """Extract a slice of audio from a media URL and return its primary stem.

    The requested slice is written to a uniquely named WAV file in
    ``tmp_directory``, run through the shared ``separator`` on the extraction
    thread pool, and the primary stem is returned as the response body.

    Args:
        request: Source URL, start offset and duration of the slice.

    Returns:
        A ``Response`` with media type ``audio/wav`` containing the bytes of
        the primary stem file.

    Raises:
        HTTPException: Status 500 if extraction, separation or reading the
            result fails for any reason.
    """
    # Every temporary file known so far. The ``finally`` clause deletes exactly
    # these, so a failure part-way through never touches an unbound name.
    temp_paths = []
    try:
        extracted_audio_path = f"{tmp_directory}/{uuid.uuid4()}.wav"
        temp_paths.append(extracted_audio_path)

        # TODO switch to CUDA
        await extract_audio(
            request.url,
            request.start_time,
            request.duration_seconds,
            extracted_audio_path,
        )

        # Separation is blocking, so it runs on its own thread pool.
        stem_names = list(
            await asyncio.get_event_loop().run_in_executor(
                extractionExecuter,
                separator.separate,
                extracted_audio_path,
            )
        )
        # Register the outputs for cleanup before unpacking, so they are
        # removed even if the separator returns an unexpected number of stems.
        temp_paths.extend(f"{tmp_directory}/{name}" for name in stem_names)
        primary_stem_output_path, secondary_stem_output_path = stem_names

        with open(f"{tmp_directory}/{primary_stem_output_path}", "rb") as f:
            isolated_audio_data = f.read()
    except Exception as e:
        # logging.exception keeps the traceback, which the bare message lost.
        logging.exception("An error occurred: %s", e)
        raise HTTPException(
            status_code=500, detail="An error occurred during vocal isolation"
        ) from e
    finally:
        # Remove each file independently: one missing file must not prevent
        # the others from being deleted.
        for temp_path in temp_paths:
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logging.warning(
                    f"Error occurred while cleaning up temporary files: {str(cleanup_error)}"
                )
    return Response(content=isolated_audio_data, media_type="audio/wav")
async def extract_audio(
    media_url: str, start_seconds: float, duration_seconds: float, output_path: str
):
    """Write a WAV slice of ``media_url`` to ``output_path`` using ffmpeg.

    The blocking ffmpeg invocation runs on ``ffmpegExecuter`` so the event
    loop stays free, and the elapsed wall-clock time is logged at INFO level.

    Args:
        media_url: Input handed to ffmpeg.
        start_seconds: Passed to ffmpeg as the input option ``ss``.
        duration_seconds: Passed to ffmpeg as the output option ``t``.
        output_path: Destination file; written in ``wav`` format.
    """

    def run_ffmpeg():
        # Build the command step by step, then run it (blocking).
        stream = ffmpeg.input(media_url, ss=start_seconds)
        stream = stream.output(output_path, format="wav", t=duration_seconds)
        # Keep ffmpeg quiet: errors only, no banner, no progress stats.
        stream = stream.global_args("-loglevel", "error", "-hide_banner")
        stream = stream.global_args("-nostats")
        return stream.run()

    started_at = datetime.now()
    loop = asyncio.get_event_loop()
    # Runs on the dedicated ffmpeg pool, not the loop's default executor.
    await loop.run_in_executor(ffmpegExecuter, run_ffmpeg)
    elapsed_seconds = (datetime.now() - started_at).total_seconds()
    logging.info(f"Audio extraction took {elapsed_seconds} seconds")
def scrape_subtitles(video_id, translate_to, translate_from):
    """Fetch subtitles for a video in ``translate_to``, translating if needed.

    Strategies are tried in order and the first one that succeeds wins:

    1. a transcript that already exists in ``translate_to``;
    2. the ``translate_from`` transcript translated to ``translate_to``;
    3. any other listed transcript translated to ``translate_to``.

    Args:
        video_id: Identifier passed to ``YouTubeTranscriptApi.list_transcripts``.
        translate_to: Language code of the wanted subtitles.
        translate_from: Language code of the transcript to translate from.

    Returns:
        Whatever ``fetch()`` returns for the first strategy that works, or
        ``None`` when every strategy fails.

    Raises:
        Any exception raised while listing the transcripts; only the
        per-strategy lookups are treated as best-effort.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(
        video_id,
    )

    # A failed strategy simply falls through to the next one. Catching
    # ``Exception`` rather than using a bare ``except`` keeps that best-effort
    # behaviour without swallowing KeyboardInterrupt / SystemExit.

    # see if translation already exists
    try:
        return transcript_list.find_transcript([translate_to]).fetch()
    except Exception as error:
        logging.debug("No existing %s transcript: %s", translate_to, error)

    # find transcription in video language
    try:
        return (
            transcript_list.find_transcript([translate_from])
            .translate(translate_to)
            .fetch()
        )
    except Exception as error:
        logging.debug(
            "Could not translate %s transcript to %s: %s",
            translate_from,
            translate_to,
            error,
        )

    # search for any other translatable languages
    for transcript in transcript_list:
        try:
            return transcript.translate(translate_to).fetch()
        except Exception as error:
            logging.debug(
                "Could not translate a listed transcript to %s: %s",
                translate_to,
                error,
            )

    return None
def format_language_code(lang: str) -> str:
    """Normalize a language code for the transcript lookup.

    Codes listed in the override table are replaced by their mapped value.
    Any other code is reduced to the part before its first ``-``. The
    reduced code is not looked up again, so ``"he-IL"`` yields ``"he"``
    rather than ``"iw"``.
    """
    # Computed first so that a non-string argument fails the same way
    # regardless of whether it appears in the table.
    base_language = lang.partition("-")[0]

    overrides = {
        "he": "iw",
        "zh": "zh-Hans",
        "zh-TW": "zh-Hant",
    }
    if lang in overrides:
        return overrides[lang]
    return base_language
class SubtitleRequest(BaseModel):
    """Request body identifying a video and the subtitle languages wanted."""

    # Identifier used to list the video's available transcripts.
    video_id: str
    # Language code the subtitles should be delivered in.
    translate_to: str
    # Language code of the transcript to translate from when needed.
    translate_from: str
async def get_subtitles(request: SubtitleRequest):
    """Return subtitles for a video in the requested language.

    Both language codes are normalized with ``format_language_code`` and the
    blocking scrape runs in the thread pool so the event loop stays free.

    Args:
        request: Video id plus target and source language codes.

    Returns:
        A ``JSONResponse`` (status 200) with the subtitles, or a plain
        ``Response`` with body ``"Not available"`` and status 400 when no
        subtitles could be found.

    Raises:
        HTTPException: Status 500 if the scrape raises.
    """
    try:
        subtitles = await run_in_threadpool(
            scrape_subtitles,
            request.video_id,
            format_language_code(request.translate_to),
            format_language_code(request.translate_from),
        )
        if subtitles is None:
            return Response(content="Not available", status_code=400)
        return JSONResponse(content=subtitles, status_code=200)
    except Exception as e:
        # logging.warn is a deprecated alias; logging.warning is the
        # supported spelling.
        logging.warning(e)
        raise HTTPException(
            status_code=500, detail="An error occurred while getting subtitles"
        ) from e
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # uvicorn.run(app, host="0.0.0.0", port=8000) | |