Spaces:
Paused
Paused
| from fastapi import FastAPI, BackgroundTasks, HTTPException, Request | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| import uuid | |
| import os | |
| from app.utils import download_video | |
| import logging | |
# Configure logging first so the service logger below inherits INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("yt-downloader")

# Directory scanned for ``.mp4`` files when old downloads are pruned.
VIDEO_DIR = "/tmp"

# The FastAPI application object for this module.
app = FastAPI()
class DownloadRequest(BaseModel):
    """Request body for a download: a video URL plus a cookie string."""

    # Address of the video to fetch; the download handler only accepts
    # URLs that start with a YouTube prefix.
    url: str
    # Cookie string handed to ``download_video``; the download handler
    # requires it to be non-empty and to contain at least one '='.
    cookie: str
def health_check():
    """Return a fixed payload reporting that the API is up.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered on ``app``.
    """
    report = dict(
        status="ok",
        message="YouTube Downloader API is running.",
    )
    return report
def _prune_old_videos(directory, max_files=10):
    """Delete the oldest ``.mp4`` files in *directory* until fewer than *max_files* remain."""
    stamped = []
    for name in os.listdir(directory):
        if not name.endswith('.mp4'):
            continue
        path = os.path.join(directory, name)
        try:
            stamped.append((os.path.getctime(path), path))
        except OSError:
            # The file vanished between listing and stat (e.g. it was served
            # and removed by another request); there is nothing to prune.
            continue
    # Oldest first; sorting on the timestamp only keeps listing order for ties.
    stamped.sort(key=lambda entry: entry[0])
    # Leave at most ``max_files - 1`` files so the new download fits the cap.
    excess = max(len(stamped) - max_files + 1, 0)
    for _, old_file in stamped[:excess]:
        try:
            os.remove(old_file)
            logger.info(f"Removed old file: {old_file}")
        except OSError as e:
            # Best effort: a file that cannot be removed must not fail the request.
            logger.error(f"Failed to remove old file: {old_file}, {e}")


async def download(payload: DownloadRequest, request: Request):
    """Validate the request, download the video and return a link to fetch it.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered on ``app``.

    Args:
        payload: Request body carrying the video ``url`` and a ``cookie`` string.
        request: Incoming request; its ``base_url`` is used to build the link.

    Returns:
        ``{"download_url": "<base_url>/video/<video_id>"}``.

    Raises:
        HTTPException: 400 when the URL does not start with an accepted
            YouTube prefix or the cookie is empty / contains no ``'='``;
            500 (with the error text as detail) when the download step fails.
    """
    # Imported here so this handler brings its own dependency into scope.
    from fastapi.concurrency import run_in_threadpool

    # Only YouTube URLs are allowed.
    if not payload.url.startswith(("https://www.youtube.com/", "https://youtu.be/")):
        logger.warning(f"Blocked non-YouTube URL: {payload.url}")
        raise HTTPException(status_code=400, detail="Only YouTube URLs are allowed.")

    # Minimal cookie validation: it must contain at least one '='.
    if not payload.cookie or '=' not in payload.cookie:
        logger.warning("Invalid cookie input")
        raise HTTPException(status_code=400, detail="Invalid cookie format.")

    # Cap the number of temporary files. The filesystem work is blocking, so it
    # runs in the thread pool instead of on the event loop.
    await run_in_threadpool(_prune_old_videos, VIDEO_DIR)

    try:
        video_id = str(uuid.uuid4())
        # download_video is a synchronous call; running it directly inside this
        # coroutine would stall every other request until it finished.
        video_path = await run_in_threadpool(
            download_video, payload.url, payload.cookie, video_id=video_id
        )
        # Build the link from the host the client used for this request.
        base_url = str(request.base_url).rstrip('/')
        download_url = f"{base_url}/video/{video_id}"
        logger.info(f"Video downloaded: {video_path}, download_url: {download_url}")
        return {"download_url": download_url}
    except Exception as e:
        # Top-level boundary: log with traceback and report the failure as a 500.
        logger.exception(f"API download error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
def get_video(video_id: str, background_tasks: BackgroundTasks):
    """Serve a previously downloaded video once, then schedule its deletion.

    NOTE(review): no route decorator is visible in this view of the file;
    confirm how this handler is registered on ``app``.

    Args:
        video_id: Identifier of the video; must be a UUID, which is the format
            the download handler generates for its links.
        background_tasks: Task collector used to remove the file after the
            response has been sent.

    Returns:
        A ``FileResponse`` for ``<VIDEO_DIR>/<video_id>.mp4``, or a 404
        ``JSONResponse`` when the id is not a UUID or the file does not exist.
    """
    # video_id is caller-supplied and ends up in a filesystem path that is both
    # served and deleted, so accept nothing but a well-formed UUID and build the
    # path from its canonical form.
    try:
        canonical_id = str(uuid.UUID(video_id))
    except ValueError:
        return JSONResponse({"error": "Not found"}, status_code=404)

    filepath = os.path.join(VIDEO_DIR, f"{canonical_id}.mp4")
    if not os.path.exists(filepath):
        return JSONResponse({"error": "Not found"}, status_code=404)

    # Remove the file after the response is sent, so each video is served once.
    background_tasks.add_task(os.remove, filepath)
    return FileResponse(filepath, filename=f"{canonical_id}.mp4", media_type="video/mp4")