""" VideoGrab v2.0 — Hugging Face Spaces FastAPI + встроенный HTML интерфейс. Без Gradio — без конфликтов версий. """ import os import re import base64 import json import urllib.request import asyncio from pathlib import Path import yt_dlp from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, FileResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware DOWNLOAD_DIR = Path("/tmp/videograb_downloads") DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) app = FastAPI(title="VideoGrab") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) # ─── Boomstream ─────────────────────────────────────────────────────────────── def is_boomstream(url: str) -> bool: return "play.boomstream.com" in url def resolve_boomstream(url: str): req = urllib.request.Request(url, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" }) with urllib.request.urlopen(req, timeout=15) as resp: html = resp.read().decode("utf-8", errors="replace") match = re.search(r'window\.boomstreamConfig\s*=\s*(\{.+?\});', html, re.DOTALL) if not match: raise ValueError("boomstreamConfig не найден на странице") config = json.loads(match.group(1)) hls_b64 = config.get("mediaData", {}).get("links", {}).get("hls") if not hls_b64: raise ValueError("HLS-ссылка не найдена в конфиге") m3u8_url = base64.b64decode(hls_b64).decode("utf-8") title = config.get("mediaData", {}).get("title") or "boomstream_video" return m3u8_url, title # ─── Format helpers ─────────────────────────────────────────────────────────── QUALITY_MAP = { "best": "bestvideo+bestaudio/best", "4k": "bestvideo[height<=2160]+bestaudio/best", "1440": "bestvideo[height<=1440]+bestaudio/best", "1080": "bestvideo[height<=1080]+bestaudio/best", "720": "bestvideo[height<=720]+bestaudio/best", "480": "bestvideo[height<=480]+bestaudio/best", "360": "bestvideo[height<=360]+bestaudio/best", "240": "bestvideo[height<=240]+bestaudio/best", "worst": "worst", } CODEC_MAP = { "auto": "", "h264": "[vcodec^=avc]", "h265": "[vcodec^=hev]", "vp9": "[vcodec=vp9]", "av1": "[vcodec^=av01]", } CONTAINER_MAP = { "auto": None, "mp4": "mp4", "mkv": "mkv", "webm": "webm", "mov": "mov", "avi": "avi", "flv": "flv", } AUDIO_Q_MAP = {"best": "0", "medium": "5", "low": "9"} def build_format(quality, codec, container, audio_only, audio_quality): if audio_only: return "bestaudio" base = QUALITY_MAP.get(quality, "bestvideo+bestaudio/best") cf = CODEC_MAP.get(codec, "") if cf: base = f"bestvideo{cf}+bestaudio/best" return base def find_latest_file(since_mtime: float): files = sorted(DOWNLOAD_DIR.rglob("*"), key=lambda f: f.stat().st_mtime, reverse=True) for f in files: if f.is_file() and f.stat().st_mtime >= since_mtime: return f return None # ─── Download logic ─────────────────────────────────────────────────────────── def do_download(url, audio_only, quality, codec, container, audio_quality): import time logs = [] if is_boomstream(url): try: url, title = resolve_boomstream(url) logs.append(f"Boomstream: найдена HLS-ссылка для «{title}»") except Exception as e: return None, [f"Boomstream: {e}"] fmt = build_format(quality, codec, container, audio_only, audio_quality) cont = CONTAINER_MAP.get(container) aq = AUDIO_Q_MAP.get(audio_quality, "0") start_mtime = time.time() - 1 ydl_opts = { "format": fmt, "outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"), "noplaylist": True, "quiet": True, } if audio_only: ydl_opts["postprocessors"] = [{ "key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": aq, }] elif cont: ydl_opts["merge_output_format"] = cont for attempt in range(2): try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) filepath = Path(ydl.prepare_filename(info)) if audio_only: filepath = filepath.with_suffix(".mp3") elif cont and filepath.suffix != f".{cont}": filepath = filepath.with_suffix(f".{cont}") if not filepath.exists(): filepath = find_latest_file(start_mtime) if filepath and filepath.exists(): size_mb = filepath.stat().st_size / 1024 / 1024 logs.append(f"Скачано: {filepath.name} ({size_mb:.1f} MB)") return filepath, logs else: return None, logs + ["Файл не найден после скачивания"] except Exception as e: if attempt == 0: logs.append("Формат недоступен, пробую fallback...") ydl_opts["format"] = "bestvideo+bestaudio/best" else: return None, logs + [f"Ошибка: {e}"] return None, logs + ["Не удалось скачать"] # ─── API routes ─────────────────────────────────────────────────────────────── @app.post("/api/download") async def api_download(request: Request): data = await request.json() url = data.get("url", "").strip() if not url: return JSONResponse({"success": False, "logs": ["URL не указан"]}) loop = asyncio.get_event_loop() filepath, logs = await loop.run_in_executor(None, do_download, url, data.get("audio_only", False), data.get("quality", "1080"), data.get("codec", "auto"), data.get("container", "mp4"), data.get("audio_quality", "best")) if filepath: return JSONResponse({"success": True, "logs": logs, "filename": filepath.name, "download_url": f"/file/{filepath.name}"}) return JSONResponse({"success": False, "logs": logs}) @app.post("/api/batch") async def api_batch(request: Request): data = await request.json() urls = [u.strip() for u in data.get("urls", "").splitlines() if u.strip() and not u.startswith("#")] if not urls: return JSONResponse({"success": False, "logs": ["Нет корректных URL"]}) all_logs, results = [], [] loop = asyncio.get_event_loop() for url in urls: filepath, logs = await loop.run_in_executor(None, do_download, url, data.get("audio_only", False), data.get("quality", "1080"), data.get("codec", "auto"), data.get("container", "mp4"), data.get("audio_quality", "best")) all_logs.extend([f"[{url[:50]}]"] + logs) if filepath: results.append({"filename": filepath.name, "download_url": f"/file/{filepath.name}"}) return JSONResponse({"success": True, "logs": all_logs, "files": results}) @app.post("/api/playlist") async def api_playlist(request: Request): data = await request.json() url = data.get("url", "").strip() if not url: return JSONResponse({"success": False, "logs": ["URL не указан"]}) fmt = build_format(data.get("quality","1080"), data.get("codec","auto"), data.get("container","mp4"), data.get("audio_only",False), data.get("audio_quality","best")) cont = CONTAINER_MAP.get(data.get("container","mp4")) aq = AUDIO_Q_MAP.get(data.get("audio_quality","best"), "0") ydl_opts = {"format": fmt, "outtmpl": str(DOWNLOAD_DIR / "%(playlist)s/%(playlist_index)s - %(title)s.%(ext)s"), "quiet": True} if data.get("start"): ydl_opts["playliststart"] = int(data["start"]) if data.get("end"): ydl_opts["playlistend"] = int(data["end"]) if data.get("audio_only"): ydl_opts["postprocessors"] = [{"key":"FFmpegExtractAudio","preferredcodec":"mp3","preferredquality":aq}] elif cont: ydl_opts["merge_output_format"] = cont def run(): try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([url]) return ["Плейлист скачан"] except Exception as e: return [f"Ошибка: {e}"] loop = asyncio.get_event_loop() logs = await loop.run_in_executor(None, run) return JSONResponse({"success": True, "logs": logs}) @app.get("/file/{filename}") async def serve_file(filename: str): safe = Path(filename).name fp = DOWNLOAD_DIR / safe if not fp.exists(): return JSONResponse({"error": "Файл не найден"}, status_code=404) return FileResponse(str(fp), filename=safe) # ─── HTML UI ────────────────────────────────────────────────────────────────── HTML = r""" VideoGrab v2.0

VIDEOGRAB

Скачивай видео из любой точки интернета

📹 Одно Видео
📋 Плейлист
📦 Пакетная
🔍 Boomstream — ссылка будет автоматически декодирована

🎬 Видео

Скачивание...


🎬 Видео

Файлы плейлиста сохраняются на сервере — отдельного скачивания нет

Скачивание...


🎬 Видео

Скачивание...

VideoGrab v2.0 · yt-dlp · YouTube, Vimeo, Boomstream и 1000+ сайтов

""" @app.get("/", response_class=HTMLResponse) async def root(): return HTML