|
|
import os |
|
|
import shutil |
|
|
import subprocess |
|
|
import uuid |
|
|
import logging |
|
|
import asyncio |
|
|
from pathlib import Path |
|
|
from typing import Dict |
|
|
|
|
|
import httpx |
|
|
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, status |
|
|
from fastapi.responses import JSONResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from yt_dlp import YoutubeDL |
|
|
|
|
|
|
|
|
# Root logger: timestamped INFO-level messages for the whole app.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Optional external base URL override.
# NOTE(review): read here but never used in the visible code — links are built
# from request.base_url instead. Confirm whether this is still needed.
BASE_URL = os.getenv("BASE_URL")

# Scratch directory for in-progress downloads.
# NOTE(review): created but not referenced elsewhere in this file — verify it
# is still required before relying on it.
TEMP_DIR = Path("/tmp/downloads")

# Directory served at /static; finished videos land in per-task subfolders.
STATIC_DIR = Path("static")

TEMP_DIR.mkdir(exist_ok=True)
STATIC_DIR.mkdir(exist_ok=True)

# How long a finished file stays on disk before cleanup (30 minutes).
FILE_LIFETIME_SECONDS = 1800

# In-memory task registry: task_id -> status dict.
# Lost on restart and not shared across workers; suitable for a
# single-process deployment only.
task_statuses: Dict[str, Dict] = {}
|
|
|
|
|
|
|
|
|
|
|
# FastAPI application. Heavy work runs in background tasks so request
# handlers return immediately (202 + status polling).
app = FastAPI(

    title="Async Video Processor",

    description="An API to process videos asynchronously without timeouts."

)

# Serve finished downloads straight from disk at /static/<task_id>/video.mp4.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|
|
|
|
|
|
|
|
|
|
|
async def cleanup_file(filepath: Path):
    """Wait out FILE_LIFETIME_SECONDS, then delete the directory containing *filepath*.

    The whole per-task directory is removed, not just the single file.
    Errors are logged and swallowed — cleanup is best-effort.
    """
    await asyncio.sleep(FILE_LIFETIME_SECONDS)
    try:
        # Guard clause: nothing to do if the directory is already gone.
        if not filepath.parent.exists():
            return
        shutil.rmtree(filepath.parent)
        logging.info(f"Cleaned up directory: {filepath.parent}")
    except Exception as e:
        logging.error(f"Error during cleanup of {filepath.parent}: {e}")
|
|
|
|
|
# NOTE: A manual format-selection helper (get_best_formats_with_fallback)
# previously lived here as a ~190-line commented-out string literal. It was
# superseded by yt-dlp's built-in format selector in
# download_and_merge_with_ytdlp and has been removed; recover it from version
# control if ever needed.
|
|
|
|
|
|
|
|
def download_and_merge_with_ytdlp(
    video_url: str,
    quality: int,
    output_path: Path
):
    """
    Download the best video+audio streams with yt-dlp and merge them into
    *output_path*.

    Args:
        video_url: Page URL of the video to download.
        quality: Maximum video height (e.g. 1080) to select.
        output_path: Destination file path (expected to end in .mp4).

    Raises:
        yt_dlp.utils.DownloadError: If yt-dlp cannot download or merge.
    """
    # Prefer separate video+audio streams capped at the requested height,
    # then a pre-merged file at/below the cap, then anything at/below the cap.
    format_selector = (
        f"bestvideo[height<={quality}][vcodec!=none]+bestaudio[acodec!=none]/"
        f"best[vcodec!=none][acodec!=none][height<={quality}]/"
        f"best[height<={quality}]"
    )

    logging.info(f"Using yt-dlp format selector: {format_selector}")

    ydl_opts = {
        'format': format_selector,
        'outtmpl': str(output_path),
        'quiet': True,
        'noprogress': True,
        # BUG FIX: without this, incompatible streams are merged into .mkv and
        # the file would not land at the .mp4 path the caller publishes.
        'merge_output_format': 'mp4',
    }

    # aria2c accelerates downloads with parallel connections, but it is an
    # external binary — only enable it when it is actually on PATH, otherwise
    # yt-dlp would fail outright.
    if shutil.which('aria2c'):
        ydl_opts['external_downloader'] = 'aria2c'
        ydl_opts['external_downloader_args'] = [
            '--min-split-size=1M',
            '--max-connection-per-server=16',
            '--max-concurrent-downloads=16',
            '--split=16'
        ]

    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
|
|
|
|
|
async def process_in_background(
    task_id: str,
    video_url: str,
    quality: int,
    base_url_for_links: str,
    background_tasks: BackgroundTasks
):
    """
    Background worker: download/merge the video, publish the download URL in
    task_statuses, and schedule deletion after FILE_LIFETIME_SECONDS.

    Args:
        task_id: Unique id used for status tracking and the output directory.
        video_url: URL of the video to process.
        quality: Maximum video height to download.
        base_url_for_links: Public base URL used to build the download link.
        background_tasks: Kept for interface compatibility; no longer used
            (see BUG FIX note below).
    """
    task_statuses[task_id] = {"status": "processing", "message": "Processing video..."}

    final_output_dir = STATIC_DIR / task_id
    try:
        final_output_dir.mkdir()
        final_output_path = final_output_dir / "video.mp4"

        task_statuses[task_id]["message"] = "Downloading and merging video..."

        # yt-dlp is blocking; run it in a worker thread so the event loop
        # stays responsive to status-polling requests.
        await asyncio.to_thread(
            download_and_merge_with_ytdlp,
            video_url,
            quality,
            final_output_path
        )

        download_url = f"{base_url_for_links.rstrip('/')}/static/{task_id}/video.mp4"
        task_statuses[task_id] = {
            "status": "complete",
            "download_url": download_url,
            "expires_in": f"{FILE_LIFETIME_SECONDS} seconds"
        }

        # BUG FIX: the previous code called background_tasks.add_task() here,
        # but the request's BackgroundTasks were already consumed when the 202
        # response was sent, so cleanup never ran and files leaked forever.
        # Schedule the delayed cleanup directly on the event loop instead.
        asyncio.create_task(cleanup_file(final_output_path))

    except Exception as e:
        logging.error(f"Task {task_id} failed: {e}")

        error_message = str(e)
        if "yt-dlp error" in error_message:
            error_message = f"yt-dlp failed: {error_message}"
        task_statuses[task_id] = {"status": "failed", "error": error_message}

        # Remove any partial output so failed tasks don't leak disk space.
        shutil.rmtree(final_output_dir, ignore_errors=True)
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/process", status_code=status.HTTP_202_ACCEPTED)
async def start_processing_job(request: Request, background_tasks: BackgroundTasks):
    """
    Accepts a job and starts it in the background. Returns a task ID immediately.

    Body (JSON):
        url: Required video URL.
        quality: Optional maximum height (default 1080).

    Raises:
        HTTPException 400: If 'url' is missing or 'quality' is not an integer.
    """
    body = await request.json()
    video_url = body.get("url")

    if not video_url:
        raise HTTPException(status_code=400, detail="A 'url' is required.")

    # BUG FIX: a non-numeric quality previously raised an unhandled
    # ValueError (HTTP 500); reject it with an explicit 400 instead.
    try:
        quality = int(body.get("quality", "1080"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="'quality' must be an integer.")

    task_id = str(uuid.uuid4())
    task_statuses[task_id] = {"status": "queued"}

    # Use the scheme/host the client actually hit so returned links work
    # behind proxies and on any deployment host.
    base_url_for_links = str(request.base_url)

    background_tasks.add_task(
        process_in_background, task_id, video_url, quality, base_url_for_links, background_tasks
    )

    status_url = request.url_for('get_job_status', task_id=task_id)
    return {"task_id": task_id, "status_url": str(status_url)}
|
|
|
|
|
|
|
|
@app.get("/api/status/{task_id}")
async def get_job_status(task_id: str):
    """
    Allows the client to poll for the status of a background job.
    """
    # Return the stored status dict if the task is known; 404 otherwise.
    info = task_statuses.get(task_id)
    if info:
        return info
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")