File size: 8,777 Bytes
7c8af0f
 
 
 
 
 
 
af56894
7c8af0f
 
af56894
7c8af0f
 
1fe7ee6
7c8af0f
 
af56894
7c8af0f
 
 
 
 
af56894
 
 
 
 
 
 
7c8af0f
 
 
af56894
 
7c8af0f
 
 
 
af56894
7c8af0f
 
 
 
 
 
 
 
 
f38c41b
5bca226
159cf1b
 
 
 
 
 
 
 
 
 
 
 
7c8af0f
159cf1b
 
 
 
 
 
7c8af0f
159cf1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61ee84f
 
159cf1b
 
 
 
 
 
 
 
 
7c8af0f
159cf1b
 
7c8af0f
f38c41b
7c8af0f
b6349c3
f38c41b
 
 
 
 
b6349c3
f38c41b
 
 
b6349c3
6d82175
 
 
 
f38c41b
6d82175
 
 
f38c41b
6d82175
f38c41b
 
 
b6349c3
f38c41b
 
b6349c3
 
f38c41b
6d82175
 
f38c41b
 
b6349c3
 
 
 
f38c41b
 
 
92ee62a
f38c41b
7c8af0f
af56894
 
 
 
 
 
 
 
 
f38c41b
 
 
 
 
af56894
f38c41b
 
7c8af0f
f38c41b
af56894
 
 
f38c41b
 
 
 
 
 
 
 
 
 
 
7c8af0f
 
f38c41b
 
af56894
 
 
 
7c8af0f
 
af56894
 
 
f38c41b
af56894
f38c41b
 
 
 
 
af56894
f38c41b
 
 
af56894
 
 
4217039
af56894
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import os
import shutil
import subprocess
import uuid
import logging
import asyncio
from pathlib import Path
from typing import Dict

import httpx
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, status
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from yt_dlp import YoutubeDL

# --- Basic Configuration ---
# Root logger: timestamped, INFO level — yt-dlp progress is suppressed separately.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Optional public base URL override (None when unset); NOTE(review): not
# referenced in this view — download links are built from request.base_url.
BASE_URL = os.getenv("BASE_URL")
# Scratch directory; NOTE(review): created but not referenced elsewhere in this view.
TEMP_DIR = Path("/tmp/downloads")
# Final merged videos are written under static/<task_id>/ and served at /static.
STATIC_DIR = Path("static")
TEMP_DIR.mkdir(exist_ok=True)
STATIC_DIR.mkdir(exist_ok=True)
# How long a finished download stays on disk before cleanup_file removes it.
FILE_LIFETIME_SECONDS = 1800  # 30 minutes

# --- In-memory store for task statuses ---
# In a real-world, scalable application, you'd use a database like Redis or a message queue.
# For a Hugging Face Space, this simple dictionary is sufficient.
task_statuses: Dict[str, Dict] = {}


# --- FastAPI App Initialization ---
app = FastAPI(
    title="Async Video Processor",
    description="An API to process videos asynchronously without timeouts."
)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


# --- Helper Functions and Background Worker ---
async def cleanup_file(filepath: Path):
    """Sleep for the file's lifetime, then delete its parent directory.

    Removes the whole per-task directory (not just the file) so nothing
    is left behind. Errors are logged rather than raised — this runs
    detached from any request.
    """
    # Let the download live out its advertised lifetime first.
    await asyncio.sleep(FILE_LIFETIME_SECONDS)
    try:
        if not filepath.parent.exists():
            return
        shutil.rmtree(filepath.parent)
        logging.info(f"Cleaned up directory: {filepath.parent}")
    except Exception as e:
        logging.error(f"Error during cleanup of {filepath.parent}: {e}")

# NOTE: A large commented-out helper (get_best_formats_with_fallback) used to
# live here; it manually picked video/audio streams from an info-API response.
# It was superseded by yt-dlp's built-in format selector in
# download_and_merge_with_ytdlp() and has been removed as dead code.


def download_and_merge_with_ytdlp(
    video_url: str,
    quality: int,
    output_path: Path
):
    """
    Download and merge the best video+audio streams for *video_url* via yt-dlp.

    Args:
        video_url: URL understood by yt-dlp.
        quality: Maximum video height in pixels (e.g. 1080).
        output_path: Destination file; callers expect an .mp4 at this exact path.

    Raises:
        yt_dlp.utils.DownloadError (a subclass of Exception) on any
        download/merge failure; callers catch broad Exception.
    """
    # Prefer separate best video (capped at `quality`) + best audio, falling
    # back to a pre-muxed stream, then to anything within the height cap.
    format_selector = (
        f"bestvideo[height<={quality}][vcodec!=none]+bestaudio[acodec!=none]/"
        f"best[vcodec!=none][acodec!=none][height<={quality}]/"
        f"best[height<={quality}]"
    )

    logging.info(f"Using yt-dlp format selector: {format_selector}")

    ydl_opts = {
        'format': format_selector,
        'outtmpl': str(output_path),
        'quiet': True,
        'noprogress': True,

        # BUGFIX: without this, yt-dlp may merge incompatible codecs into an
        # .mkv, producing "video.mkv" while the API hands out a ".../video.mp4"
        # link. Force the merged container to match the advertised path.
        'merge_output_format': 'mp4',

        # Safe to keep: yt-dlp only uses the external downloader for direct
        # file URLs and ignores it for m3u8/DASH streams.
        'external_downloader': 'aria2c',
        'external_downloader_args': [
            '--min-split-size=1M',
            '--max-connection-per-server=16',
            '--max-concurrent-downloads=16',
            '--split=16'
        ],
    }

    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])

async def process_in_background(
    task_id: str,
    video_url: str,
    quality: int,
    base_url_for_links: str,
    background_tasks: BackgroundTasks
):
    """
    Background worker: download+merge the video, publish its URL, schedule cleanup.

    Args:
        task_id: UUID key into the in-memory ``task_statuses`` store.
        video_url: Source URL passed through to yt-dlp.
        quality: Maximum video height in pixels.
        base_url_for_links: Public base URL used to build the download link.
        background_tasks: Kept for signature compatibility only — see BUGFIX
            note below; it is no longer used for scheduling cleanup.
    """
    task_statuses[task_id] = {"status": "processing", "message": "Processing video..."}

    # Each task gets its own directory so cleanup can remove it wholesale.
    final_output_dir = STATIC_DIR / task_id
    try:
        final_output_dir.mkdir()
        final_output_path = final_output_dir / "video.mp4"

        task_statuses[task_id]["message"] = "Downloading and merging video..."

        # yt-dlp is blocking; run it in a worker thread so the event loop stays free.
        await asyncio.to_thread(
            download_and_merge_with_ytdlp,
            video_url,
            quality,
            final_output_path
        )

        download_url = f"{base_url_for_links.rstrip('/')}/static/{task_id}/video.mp4"
        task_statuses[task_id] = {
            "status": "complete",
            "download_url": download_url,
            "expires_in": f"{FILE_LIFETIME_SECONDS} seconds"
        }

        # BUGFIX: this coroutine runs *after* the HTTP response has been sent,
        # and Starlette never executes tasks added to BackgroundTasks at that
        # point — so cleanup_file was silently never scheduled and files
        # accumulated forever. Schedule it directly on the running event loop.
        asyncio.create_task(cleanup_file(final_output_path))

    except Exception as e:
        # Catches yt-dlp errors directly, e.g. "video unavailable".
        logging.error(f"Task {task_id} failed: {e}")
        error_message = str(e)
        if "yt-dlp error" in error_message:
            error_message = f"yt-dlp failed: {error_message}"
        task_statuses[task_id] = {"status": "failed", "error": error_message}
        # Remove any partial output so failed tasks don't leak disk space.
        shutil.rmtree(final_output_dir, ignore_errors=True)


# --- API Endpoints ---
@app.post("/api/process", status_code=status.HTTP_202_ACCEPTED)
async def start_processing_job(request: Request, background_tasks: BackgroundTasks):
    """
    Accepts a job and starts it in the background. Returns a task ID immediately.

    Body (JSON object): ``url`` (required), ``quality`` (optional, int-like,
    default 1080 — maximum video height).

    Raises:
        HTTPException 400: missing/invalid body, missing 'url', bad 'quality'.
    """
    # BUGFIX: malformed JSON or a non-object body previously escaped as a 500;
    # report client errors as 400s instead.
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    video_url = body.get("url")
    if not video_url:
        raise HTTPException(status_code=400, detail="A 'url' is required.")

    # BUGFIX: a non-numeric 'quality' raised ValueError -> 500; validate it.
    try:
        quality = int(body.get("quality", "1080"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="'quality' must be an integer.")

    task_id = str(uuid.uuid4())
    task_statuses[task_id] = {"status": "queued"}

    # We need the base URL of our own app to construct the final download link.
    base_url_for_links = str(request.base_url)

    background_tasks.add_task(
        process_in_background, task_id, video_url, quality, base_url_for_links, background_tasks
    )

    status_url = request.url_for('get_job_status', task_id=task_id)
    return {"task_id": task_id, "status_url": str(status_url)}


@app.get("/api/status/{task_id}")
async def get_job_status(task_id: str):
    """
    Poll endpoint: return the current status dict for a background job.
    """
    status_info = task_statuses.get(task_id)
    if status_info:
        return status_info
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")