File size: 8,777 Bytes
7c8af0f af56894 7c8af0f af56894 7c8af0f 1fe7ee6 7c8af0f af56894 7c8af0f af56894 7c8af0f af56894 7c8af0f af56894 7c8af0f f38c41b 5bca226 159cf1b 7c8af0f 159cf1b 7c8af0f 159cf1b 61ee84f 159cf1b 7c8af0f 159cf1b 7c8af0f f38c41b 7c8af0f b6349c3 f38c41b b6349c3 f38c41b b6349c3 6d82175 f38c41b 6d82175 f38c41b 6d82175 f38c41b b6349c3 f38c41b b6349c3 f38c41b 6d82175 f38c41b b6349c3 f38c41b 92ee62a f38c41b 7c8af0f af56894 f38c41b af56894 f38c41b 7c8af0f f38c41b af56894 f38c41b 7c8af0f f38c41b af56894 7c8af0f af56894 f38c41b af56894 f38c41b af56894 f38c41b af56894 4217039 af56894 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
import os
import shutil
import subprocess
import uuid
import logging
import asyncio
from pathlib import Path
from typing import Dict
import httpx
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, status
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from yt_dlp import YoutubeDL
# --- Basic Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Public base URL of the deployment. NOTE(review): read here but not used in
# the visible code — download links are built from request.base_url instead;
# confirm whether this env var is still needed.
BASE_URL = os.getenv("BASE_URL")
# Scratch directory for downloads. NOTE(review): created but never referenced
# by the visible code (outputs go to STATIC_DIR) — possibly vestigial.
TEMP_DIR = Path("/tmp/downloads")
# Directory served at /static; each task writes into STATIC_DIR/<task_id>/.
STATIC_DIR = Path("static")
TEMP_DIR.mkdir(exist_ok=True)
STATIC_DIR.mkdir(exist_ok=True)
# How long a finished download stays on disk before cleanup_file removes it.
FILE_LIFETIME_SECONDS = 1800 # 30 minutes
# --- In-memory store for task statuses ---
# In a real-world, scalable application, you'd use a database like Redis or a message queue.
# For a Hugging Face Space, this simple dictionary is sufficient.
# Maps task_id -> status dict ({"status": ..., plus message/download_url/error}).
task_statuses: Dict[str, Dict] = {}
# --- FastAPI App Initialization ---
app = FastAPI(
    title="Async Video Processor",
    description="An API to process videos asynchronously without timeouts."
)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
# --- Helper Functions and Background Worker ---
async def cleanup_file(filepath: Path):
    """Delete the directory containing *filepath* after FILE_LIFETIME_SECONDS.

    Runs as a fire-and-forget coroutine: sleeps for the file's lifetime and
    then removes the whole per-task output directory (the file's parent).

    Args:
        filepath: Path to the produced file; its parent directory is removed.
    """
    await asyncio.sleep(FILE_LIFETIME_SECONDS)
    try:
        if filepath.parent.exists():
            # BUGFIX: shutil.rmtree is a blocking filesystem call; running it
            # directly in a coroutine stalls the event loop for the duration
            # of the deletion. Off-load it to a worker thread instead.
            await asyncio.to_thread(shutil.rmtree, filepath.parent)
            logging.info(f"Cleaned up directory: {filepath.parent}")
    except Exception as e:
        logging.error(f"Error during cleanup of {filepath.parent}: {e}")
"""def get_best_formats_with_fallback(data: dict, requested_quality: int):
if "formats" not in data:
raise ValueError("The 'formats' key is missing from the Info API response.")
# --- Video Selection (Revised and Improved) ---
video_url = None
# Filter for any video-only stream that has resolution info
video_formats = [
f for f in data.get("formats", [])
if f.get("vcodec") not in (None, "none") and f.get("acodec") == "none" and f.get("height")
]
# Sort by height from best to worst
video_formats.sort(key=lambda f: f["height"], reverse=True)
if not video_formats:
raise ValueError("Could not find any suitable video-only streams in the API response.")
# Try to find the best format that is at or below the requested quality
selected_format = None
for f in video_formats:
if f["height"] <= requested_quality:
selected_format = f
break # Found the best possible match
# If no match was found (e.g., user requested 144p but only 360p+ is available),
# then default to the absolute best quality available.
if selected_format is None:
selected_format = video_formats[0] # Fallback to the best overall
logging.warning(
f"Requested quality ({requested_quality}p) is not available. "
f"Falling back to best available quality: {selected_format.get('height')}p."
)
video_url = selected_format.get("url")
logging.info(
f"Selected video: {selected_format.get('height')}p "
f"(format_id: {selected_format.get('format_id')}, ext: {selected_format.get('ext')})"
)
# --- Audio Selection ---
audio_url = None
audio_formats = [
f for f in data.get("formats", [])
if f.get("acodec") not in (None, "none") and f.get("vcodec") == "none"
]
if audio_formats:
audio_formats.sort(key=lambda f: f.get("abr", 0) or 0, reverse=True)
selected_audio = audio_formats[0]
audio_url = selected_audio.get("url")
logging.info(
f"Selected best audio: format_id {selected_audio.get('format_id')}, "
f"bitrate {selected_audio.get('abr', 'N/A')}k, "
f"codec {selected_audio.get('acodec')}"
)
# --- Final Check ---
if not video_url or not audio_url:
raise ValueError("Could not find a suitable video and/or audio stream.")
return video_url, audio_url
"""
def download_and_merge_with_ytdlp(
    video_url: str,
    quality: int,
    output_path: Path
):
    """Download the best video+audio for *video_url* and save it to *output_path*.

    yt-dlp selects, downloads and (when necessary) merges the streams by
    itself, so no separate info-fetch or ffmpeg invocation is needed.

    Args:
        video_url: Page or media URL understood by yt-dlp.
        quality: Maximum video height in pixels (e.g. 1080).
        output_path: Final file location; callers pass an ``.mp4`` path.

    Raises:
        Exception: Propagates yt-dlp download errors (e.g. video unavailable).
    """
    # Preference order: best separate video (capped at `quality`) + best
    # audio; else a pre-muxed stream at/below the cap that really carries
    # both tracks (`[vcodec!=none][acodec!=none]`); else anything at/below
    # the cap.
    format_selector = (
        f"bestvideo[height<={quality}][vcodec!=none]+bestaudio[acodec!=none]/"
        f"best[vcodec!=none][acodec!=none][height<={quality}]/"
        f"best[height<={quality}]"
    )
    logging.info(f"Using yt-dlp format selector: {format_selector}")
    ydl_opts = {
        'format': format_selector,
        'outtmpl': str(output_path),
        # BUGFIX: when bestvideo+bestaudio are merged, yt-dlp may choose an
        # .mkv container, so the file would not appear at the .mp4 path the
        # caller advertises to clients. Force the merged container to mp4.
        'merge_output_format': 'mp4',
        'quiet': True,
        'noprogress': True,
        # Safe to keep: yt-dlp only uses aria2c for direct file URLs and
        # ignores it for m3u8/DASH streams.
        'external_downloader': 'aria2c',
        'external_downloader_args': [
            '--min-split-size=1M',
            '--max-connection-per-server=16',
            '--max-concurrent-downloads=16',
            '--split=16'
        ],
    }
    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])
async def process_in_background(
    task_id: str,
    video_url: str,
    quality: int,
    base_url_for_links: str,
    background_tasks: BackgroundTasks
):
    """Background worker: download/merge the video and publish its status.

    1. Run yt-dlp in a worker thread to download + merge into a single mp4.
    2. Mark the task complete with a public download URL.
    3. Schedule deletion of the output directory after FILE_LIFETIME_SECONDS.

    Args:
        task_id: UUID used as both the status key and the output folder name.
        video_url: URL handed to yt-dlp.
        quality: Maximum video height in pixels.
        base_url_for_links: Public base URL used to build the download link.
        background_tasks: Unused; kept so the signature stays backward
            compatible. Adding a task to a BackgroundTasks instance from
            inside an already-running background task is a no-op (its
            response finished long ago), which is why cleanup is scheduled
            with asyncio.create_task() below instead.
    """
    task_statuses[task_id] = {"status": "processing", "message": "Processing video..."}
    final_output_dir = STATIC_DIR / task_id
    try:
        # Per-task output directory; task_id is a fresh UUID, so no collision.
        final_output_dir.mkdir()
        final_output_path = final_output_dir / "video.mp4"
        task_statuses[task_id]["message"] = "Downloading and merging video..."
        # yt-dlp is synchronous; run it in a thread so the event loop stays free.
        await asyncio.to_thread(
            download_and_merge_with_ytdlp,
            video_url,
            quality,
            final_output_path
        )
        download_url = f"{base_url_for_links.rstrip('/')}/static/{task_id}/video.mp4"
        task_statuses[task_id] = {
            "status": "complete",
            "download_url": download_url,
            "expires_in": f"{FILE_LIFETIME_SECONDS} seconds"
        }
        # BUGFIX: previously scheduled via background_tasks.add_task(), which
        # never runs at this point (see docstring). Fire-and-forget the
        # delayed cleanup on the running event loop instead.
        asyncio.create_task(cleanup_file(final_output_path))
    except Exception as e:
        # Catches errors from yt-dlp directly, e.g. "video unavailable".
        logging.error(f"Task {task_id} failed: {e}")
        error_message = str(e)
        if "yt-dlp error" in error_message: # You can customize this
            error_message = f"yt-dlp failed: {error_message}"
        task_statuses[task_id] = {"status": "failed", "error": error_message}
        # Don't leave a half-written output directory behind on failure.
        shutil.rmtree(final_output_dir, ignore_errors=True)
# --- API Endpoints ---
@app.post("/api/process", status_code=status.HTTP_202_ACCEPTED)
async def start_processing_job(request: Request, background_tasks: BackgroundTasks):
    """
    Accept a processing job and start it in the background.

    Returns HTTP 202 immediately with a task ID and a status-polling URL.

    Raises:
        HTTPException(400): If the body is not valid JSON, 'url' is missing,
            or 'quality' is not an integer.
    """
    # BUGFIX: a malformed body or non-numeric quality used to raise an
    # unhandled exception (HTTP 500); report a proper 400 instead.
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.")
    video_url = body.get("url")
    if not video_url:
        raise HTTPException(status_code=400, detail="A 'url' is required.")
    try:
        quality = int(body.get("quality", "1080"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="'quality' must be an integer.")
    task_id = str(uuid.uuid4())
    task_statuses[task_id] = {"status": "queued"}
    # We need the base URL of our own app to construct the final download link
    base_url_for_links = str(request.base_url)
    background_tasks.add_task(
        process_in_background, task_id, video_url, quality, base_url_for_links, background_tasks
    )
    status_url = request.url_for('get_job_status', task_id=task_id)
    return {"task_id": task_id, "status_url": str(status_url)}
@app.get("/api/status/{task_id}")
async def get_job_status(task_id: str):
    """
    Poll the status of a background job.

    Returns the task's current status record, or 404 when the task is unknown.
    """
    record = task_statuses.get(task_id)
    if record:
        return record
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")