Spaces:
Sleeping
Sleeping
| import re | |
| import os | |
| import unicodedata | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| from pytube import YouTube | |
| from urllib.parse import unquote | |
| app = FastAPI() | |
| # carpeta temp | |
| TEMP_FOLDER = "temp" | |
| if not os.path.exists(TEMP_FOLDER): | |
| os.makedirs(TEMP_FOLDER) | |
| temp_path = os.path.join(os.getcwd(), TEMP_FOLDER) | |
| # middleware CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def limpiar_temp(): | |
| for file in os.listdir(temp_path): | |
| os.remove(os.path.join(temp_path, file)) | |
| def change_song_name(titulo): | |
| titulo_normalizado = unicodedata.normalize("NFKD", titulo) | |
| # Limpiar el título para usarlo como nombre de archivo | |
| caracteres_no_permitidos = r"[\"\'/\\:*?<>|,]" | |
| titulo_limpio = re.sub(caracteres_no_permitidos, "", titulo_normalizado) | |
| # Codificar y decodificar para manejar la codificación Unicode | |
| titulo_limpio = titulo_limpio.encode("ascii", errors="ignore").decode("ascii") | |
| # print(titulo_limpio) | |
| return titulo_limpio | |
| class Item(BaseModel): | |
| url: str | |
| def download_song(url: str): | |
| try: | |
| limpiar_temp() | |
| yt = YouTube(url) | |
| titulo = yt.title | |
| titulo_limpio = change_song_name(titulo) | |
| stream = yt.streams.filter(only_audio=True).first() | |
| if stream: | |
| stream.download(temp_path) | |
| else: | |
| raise HTTPException( | |
| status_code=500, detail="No se pudo obtener el stream de audio" | |
| ) | |
| # print("Archivos en temp después de la descarga:", os.listdir(temp_path)) | |
| for file in os.listdir(temp_path): | |
| if file.endswith(".mp4"): | |
| src_path = os.path.join(temp_path, file) | |
| dst_path = os.path.join(temp_path, f"{titulo_limpio}.mp3") | |
| os.rename(src_path, dst_path) | |
| # print("Archivos en temp después del cambio de nombre:", os.listdir(temp_path)) | |
| return FileResponse( | |
| path=f"{temp_path}/{titulo_limpio}.mp3", | |
| filename=f"{titulo_limpio}.mp3", | |
| media_type="audio/mpeg", | |
| ) | |
| except Exception as e: | |
| print(f"Error durante la descarga: {e}") | |
| raise HTTPException(status_code=500, detail="Error interno del servidor") | |
| async def download_song_post(item: Item): | |
| return download_song(item.url) | |
| def download_info(): | |
| return { | |
| "message": "La api de descargas se llama /download y recibe un json con la str(url) de youtube" | |
| } | |
| def saludo(): | |
| return {"message": "Hola mundo"} | |