"""Helpers for storing uploaded or remotely hosted audio in ``UPLOAD_DIR``."""

import asyncio
import os
import re
import shutil
import uuid
from typing import Optional
from urllib.parse import urlparse

import requests
from fastapi import HTTPException, UploadFile

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# (connect, read) timeouts in seconds. Without a timeout ``requests`` waits
# indefinitely on a stalled server.
_REQUEST_TIMEOUT = (10, 30)

# Extension used when the URL path has none, or has one we refuse to trust.
_DEFAULT_EXTENSION = ".mp3"

# The extension is taken from a caller-supplied URL and ends up in a
# filesystem path, so only a short alphanumeric suffix is accepted.
_SAFE_EXTENSION_RE = re.compile(r"\.[A-Za-z0-9]{1,10}")


def _extension_from_url(url: str) -> str:
    """Return the file extension of the URL path, ignoring query params."""
    ext = os.path.splitext(urlparse(url).path)[1]
    if _SAFE_EXTENSION_RE.fullmatch(ext):
        return ext
    return _DEFAULT_EXTENSION


def _safe_upload_name(raw_name: Optional[str]) -> str:
    """Reduce a client-supplied filename to a bare name inside UPLOAD_DIR.

    Raises:
        HTTPException: 400 if nothing usable is left after stripping
            directory components.
    """
    # Treat backslashes as separators too, so Windows-style paths sent by a
    # client cannot smuggle directory components through on POSIX hosts.
    name = os.path.basename((raw_name or "").replace("\\", "/"))
    if name in ("", ".", "..") or "\x00" in name:
        raise HTTPException(status_code=400, detail="Invalid upload filename")
    return name


def _download_to_uploads(url: str) -> str:
    """Blocking download of *url* into UPLOAD_DIR; returns the saved path."""
    # Generate a safe filename; the remote name is never used.
    # The "/" join is kept so the returned path has the same form as before.
    file_location = f"{UPLOAD_DIR}/{uuid.uuid4()}{_extension_from_url(url)}"

    # The context manager releases the streamed connection on every exit path.
    with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        try:
            with open(file_location, "wb") as out:
                for chunk in response.iter_content(chunk_size=8192):
                    out.write(chunk)
        except BaseException:
            # Do not leave a truncated file behind; cleanup is best-effort
            # and must not mask the original error.
            try:
                os.remove(file_location)
            except OSError:
                pass
            raise
    return file_location


async def download_audio_from_url(url: str) -> str:
    """Download the audio at *url* and return the path it was saved to.

    The file is stored in ``UPLOAD_DIR`` under a random UUID name whose
    extension comes from the URL path (``.mp3`` if absent or not a plain
    alphanumeric suffix).

    Raises:
        HTTPException: 400 if the download or the write to disk fails.
    """
    # NOTE(review): *url* is fetched as given, so internal addresses are
    # reachable and the download size is unbounded -- confirm callers only
    # pass trusted URLs or add an allow-list / size cap.
    loop = asyncio.get_running_loop()
    try:
        # ``requests`` is blocking; run it in the default executor so the
        # event loop keeps serving other requests during the transfer.
        return await loop.run_in_executor(None, _download_to_uploads, url)
    except Exception as exc:
        # Boundary handler: every failure is reported to the client as a 400,
        # with the original exception chained for server-side debugging.
        raise HTTPException(
            status_code=400,
            detail=f"Failed to download audio: {str(exc)}",
        ) from exc


async def process_audio(
    file: Optional[UploadFile] = None,
    audio_url: Optional[str] = None,
):
    """Store the given audio and return basic metadata about it.

    Exactly one source is used: *file* takes precedence over *audio_url*.

    Returns:
        A dict with ``filename``, ``size_bytes``, ``format`` (the text after
        the last dot of the filename, or ``""`` if there is no dot) and a
        placeholder ``duration_seconds`` of ``0.0``.

    Raises:
        HTTPException: 400 if neither source is provided, the upload has no
            usable filename, or the download fails.
    """
    # Truthiness (not ``is not None``) is kept so empty values such as ""
    # still fall through to the next source, as before.
    if file:
        # Never trust the client-supplied name as a path: strip directories.
        filename = _safe_upload_name(file.filename)
        file_location = f"{UPLOAD_DIR}/{filename}"
        with open(file_location, "wb") as file_object:
            shutil.copyfileobj(file.file, file_object)
    elif audio_url:
        file_location = await download_audio_from_url(audio_url)
        filename = os.path.basename(file_location)
    else:
        raise HTTPException(status_code=400, detail="No audio file or URL provided")

    # In a real scenario, use librosa or pydub to get duration/sample_rate.
    # For now, just return file size as a proxy for 'processed' metadata.
    _, dot, extension = filename.rpartition(".")
    return {
        "filename": filename,
        "size_bytes": os.path.getsize(file_location),
        # A name without any dot has no format; do not echo the whole name.
        "format": extension if dot else "",
        "duration_seconds": 0.0,  # Placeholder
    }