Spaces:
Running
Running
| # Audio utilities: ffmpeg, normalization, etc. | |
| from asyncio.log import logger | |
| import subprocess | |
| import shlex | |
| import uuid | |
| import requests | |
| from pathlib import Path | |
| import soundfile as sf | |
| from app.config.settings import TMP_DIR, MAX_UPLOAD_BYTES, CLOUDINARY_API_KEY, CLOUDINARY_API_SECRET, CLOUDINARY_CLOUD_NAME | |
| import cloudinary | |
| import cloudinary.uploader | |
| import os | |
def save_upload_file(upload_file, dest_path: str):
    """Stream an uploaded file object to ``dest_path`` on disk.

    The source is read through ``upload_file.file.read`` in 1 MiB pieces so
    the whole payload is never held in memory at once. ``dest_path`` is
    opened in binary write mode, so an existing file is overwritten.
    """
    piece_size = 1024 * 1024
    with open(dest_path, "wb") as sink:
        # An empty read marks the end of the upload stream.
        while piece := upload_file.file.read(piece_size):
            sink.write(piece)
def download_file_from_url(url: str, dest_path: str, timeout=30):
    """Download a remote file to ``dest_path``, enforcing a size cap.

    The body is streamed in 8 KiB chunks and the running byte count is
    checked against ``MAX_UPLOAD_BYTES`` before each chunk is written.

    Args:
        url: Address passed to ``requests.get``.
        dest_path: Local file that receives the bytes (opened with "wb").
        timeout: Forwarded to ``requests.get``.

    Raises:
        requests.HTTPError: From ``raise_for_status()`` on an error status.
        ValueError: If more than ``MAX_UPLOAD_BYTES`` bytes are received.
            Chunks written before the limit was hit remain in ``dest_path``.
    """
    # With stream=True the connection is held until the body is fully
    # consumed or the response is closed. The context manager closes it on
    # every exit path, including the "too large" abort below.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        received = 0
        with open(dest_path, "wb") as out:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                received += len(chunk)
                if received > MAX_UPLOAD_BYTES:
                    raise ValueError("Remote file too large")
                out.write(chunk)
def get_audio_info(path: str):
    """Probe an audio file with soundfile and summarise it.

    Returns a dict with keys ``"duration"`` (frames divided by sample rate),
    ``"samplerate"`` and ``"channels"``. Any exception raised while probing
    is swallowed and ``None`` is returned instead (best-effort lookup).
    """
    try:
        meta = sf.info(path)
        rate = meta.samplerate
        return {
            "duration": meta.frames / rate,
            "samplerate": rate,
            "channels": meta.channels,
        }
    except Exception:
        # Deliberately broad: callers get None for any unreadable input.
        return None
def ensure_wav_16k_mono(src_path: str, dest_path: str):
    """Convert any audio file to WAV PCM16, 16 kHz, mono using ffmpeg.

    Args:
        src_path: Input file handed to ffmpeg via ``-i``.
        dest_path: Output file; overwritten if present (``-y``).

    Returns:
        ``dest_path``, unchanged.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            carries ffmpeg's stderr output.
    """
    # Pass an argv list instead of building a quoted string and re-splitting
    # it: a path containing a double quote would otherwise be tokenised
    # incorrectly and could smuggle extra arguments into the ffmpeg call.
    argv = [
        "ffmpeg",
        "-v", "error",
        "-y",
        "-i", str(src_path),
        "-ar", "16000",
        "-ac", "1",
        "-acodec", "pcm_s16le",
        str(dest_path),
    ]
    proc = subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"ffmpeg convert failed: {proc.stderr.decode(errors='ignore')}"
        )
    return dest_path
def make_temp_path(suffix=".wav"):
    """Build a randomly named path inside TMP_DIR that ends with ``suffix``.

    Only the path string is produced; no file is created.
    """
    filename = f"{uuid.uuid4().hex}{suffix}"
    return str(Path(TMP_DIR).joinpath(filename))
# Configure the Cloudinary SDK a single time, when this module is imported,
# using the credentials pulled from app settings.
cloudinary.config(
    secure=True,
    cloud_name=CLOUDINARY_CLOUD_NAME,
    api_secret=CLOUDINARY_API_SECRET,
    api_key=CLOUDINARY_API_KEY,
)
| def upload_temp_audio( | |
| local_path: str, | |
| *, | |
| folder: str = "asr_uploads", | |
| public_id: str | None = None, | |
| ttl: int = 3600, | |
| ) -> str: | |
| """ | |
| Upload audio file to Cloudinary and return public URL. | |
| File can be safely deleted locally after upload. | |
| """ | |
| if not os.path.exists(local_path): | |
| raise FileNotFoundError(local_path) | |
| logger.info("Uploading audio to Cloudinary: %s", local_path) | |
| result = cloudinary.uploader.upload( | |
| local_path, | |
| resource_type="video", # ⚠️ audio MUST use video | |
| folder=folder, | |
| public_id=public_id, | |
| overwrite=True, | |
| invalidate=True, | |
| ) | |
| url = result.get("secure_url") | |
| if not url: | |
| raise RuntimeError("Cloudinary upload failed") | |
| logger.info("Uploaded audio -> %s", url) | |
| return url |