Spaces:
Running
Running
| import shutil | |
| import os | |
| import requests | |
| import uuid | |
| from urllib.parse import urlparse | |
| from fastapi import UploadFile, HTTPException | |
# Directory (relative to the process working directory) where uploaded and
# downloaded audio files are written.
UPLOAD_DIR = "uploads"

# Create the directory at import time so the handlers below can write into it
# immediately; exist_ok=True keeps repeated imports/restarts from failing.
os.makedirs(UPLOAD_DIR, exist_ok=True)
async def download_audio_from_url(url: str) -> str:
    """Download the audio at *url* into ``UPLOAD_DIR`` and return its path.

    The file is stored under a freshly generated UUID name. The extension is
    taken from the URL path (query parameters are ignored) and falls back to
    ``.mp3`` when the path has no usable extension.

    Args:
        url: HTTP or HTTPS address of the audio file.

    Returns:
        The path of the saved file, of the form ``"{UPLOAD_DIR}/{uuid}{ext}"``.

    Raises:
        HTTPException: status 400 if the URL is not http(s), the request
            fails or times out, or the file cannot be written.
    """
    # Local import so this block does not depend on a module-level import.
    import asyncio

    def _fetch() -> str:
        """Blocking download; executed in a worker thread."""
        parsed_url = urlparse(url)
        if parsed_url.scheme not in ("http", "https"):
            raise ValueError(f"Unsupported URL scheme: {parsed_url.scheme!r}")
        # NOTE(review): the target host is not restricted, so this can still
        # reach internal addresses (SSRF) -- confirm whether an allow-list
        # is required for this deployment.

        # Parse URL to get path and extension, ignoring query params.
        ext = os.path.splitext(parsed_url.path)[1]
        # Only keep a plain alphanumeric extension; anything else (missing,
        # a bare ".", or odd characters) falls back to the mp3 default.
        if not ext or not ext[1:].isalnum():
            ext = ".mp3"

        # Generate a safe filename that cannot collide with other downloads.
        file_location = f"{UPLOAD_DIR}/{uuid.uuid4()}{ext}"

        try:
            # The context manager releases the connection even on error; the
            # (connect, read) timeout stops a stalled server from hanging
            # the request indefinitely.
            with requests.get(url, stream=True, timeout=(10, 30)) as response:
                response.raise_for_status()
                with open(file_location, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # skip keep-alive chunks
                            f.write(chunk)
        except Exception:
            # Do not leave a truncated file behind after a failed transfer.
            if os.path.exists(file_location):
                os.remove(file_location)
            raise
        return file_location

    try:
        # requests is synchronous: run it in the default executor so the
        # event loop keeps serving other requests during the download.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _fetch)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Failed to download audio: {str(e)}"
        ) from e
async def process_audio(file: UploadFile = None, audio_url: str = None):
    """Store an uploaded or remote audio file and return basic metadata.

    Exactly one source is used: ``file`` takes precedence when both are
    given; otherwise ``audio_url`` is downloaded via
    ``download_audio_from_url``.

    Args:
        file: Uploaded file to copy into ``UPLOAD_DIR``.
        audio_url: URL of an audio file to download into ``UPLOAD_DIR``.

    Returns:
        A dict with the keys ``"filename"`` (name of the stored file),
        ``"size_bytes"`` (size on disk), ``"format"`` (text after the last
        ``"."`` of the filename, or ``""`` when there is none) and
        ``"duration_seconds"`` (currently always the placeholder ``0.0``).

    Raises:
        HTTPException: status 400 if neither source is provided, if the
            uploaded filename is unusable, or if the download fails.
    """
    if file:
        # The filename is client-controlled: drop any directory components
        # (both "/" and "\\" styles) so a name such as "../../x" cannot
        # escape UPLOAD_DIR.
        raw_name = (file.filename or "").replace("\\", "/")
        filename = os.path.basename(raw_name)
        if filename in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid upload filename")

        file_location = f"{UPLOAD_DIR}/{filename}"
        with open(file_location, "wb") as file_object:
            shutil.copyfileobj(file.file, file_object)
    elif audio_url:
        file_location = await download_audio_from_url(audio_url)
        filename = os.path.basename(file_location)
    else:
        raise HTTPException(status_code=400, detail="No audio file or URL provided")

    # In a real scenario, use librosa or pydub to get duration/sample_rate.
    # For now, just return file size as a proxy for 'processed' metadata.
    file_size = os.path.getsize(file_location)

    # Report an empty format rather than the whole filename when the name
    # contains no "." at all.
    _, dot, suffix = filename.rpartition(".")
    audio_format = suffix if dot else ""

    return {
        "filename": filename,
        "size_bytes": file_size,
        "format": audio_format,
        "duration_seconds": 0.0,  # Placeholder
    }