# fastapi-rc / main.py (HuggingFace Space, commit 014e519 by ozipoetra)
import asyncio
import math
import os
import re
import secrets
import shutil
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Literal, Optional

import httpx
import psutil
from fastapi import Body, FastAPI, HTTPException, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel, Field, HttpUrl
# --- Configuration ---
API_TOKEN = os.getenv("API_TOKEN", "supersecret")
PIXELDRAIN_KEY = os.getenv("PIXELDRAIN_KEY")
YTDLP_COOKIES = os.getenv("YTDLP_COOKIES", "") # Path to cookies.txt OR cookies string content
# Handle cookies: if it's a string (not a file path), create temp file
COOKIES_FILE = None
if YTDLP_COOKIES:
if os.path.exists(YTDLP_COOKIES):
# It's a valid file path
COOKIES_FILE = YTDLP_COOKIES
else:
# It's a cookies string - create temporary file.
# IMPORTANT: HuggingFace secrets and most env vars store multiline
# values with literal \n (backslash+n) instead of real newlines.
# Netscape cookie format requires real newlines β€” convert here.
cookie_content = YTDLP_COOKIES.replace("\\n", "\n").replace("\\t", "\t")
COOKIES_FILE = "/tmp/cookies_from_env.txt"
try:
with open(COOKIES_FILE, 'w') as f:
f.write(cookie_content)
cookie_lines = [l for l in cookie_content.splitlines()
if l.strip() and not l.startswith("#")]
print(f"βœ… Cookies file created: {len(cookie_lines)} entries β†’ {COOKIES_FILE}")
if len(cookie_lines) == 0:
print("⚠️ WARNING: 0 cookie entries parsed β€” check YTDLP_COOKIES format (Netscape required).")
except Exception as e:
print(f"❌ Failed to create cookies file: {e}")
COOKIES_FILE = None
# Log configuration at startup
print("=" * 60)
print("🎬 ANIME VIDEO COMPRESSOR - Configuration")
print("=" * 60)
print(f"API Token: {'βœ… Set' if API_TOKEN else '❌ Not set'}")
print(f"Pixeldrain Key: {'βœ… Set' if PIXELDRAIN_KEY else '❌ Not set'}")
print(f"Cookies: {'βœ… Loaded' if COOKIES_FILE else '❌ Not loaded'}")
print("=" * 60)
# Hugging Face Ephemeral Disk Paths (Fast NVMe /tmp)
TEMP_DIR = Path("/tmp/downloads")
EXTRACT_DIR = Path("/tmp/extracted")
# Settings
NUM_WORKERS = 1 # Concurrent jobs
MAX_ARCHIVE_SIZE = 10 * 1024**3 # 10GB Archive Limit (Since /tmp is ~50GB)
# Supported Extensions
VIDEO_EXTS = {'.mp4', '.mkv', '.mov', '.webm', '.avi', '.flv', '.wmv', '.m4v'}
ARCHIVE_EXTS = {'.zip', '.rar', '.7z', '.tar', '.gz', '.xz', '.bz2'}
# --- Compression Presets ---
COMPRESSION_PRESETS = {
# Fast WebM presets (optimized for 2 vCPU)
"webm_anime_720p_fast": {
"codec": "libvpx-vp9",
"format": "webm",
"resolution": "1280:720",
"crf": "31",
"preset": None,
"tune": None,
"profile": None,
"audio_codec": "libopus",
"audio_bitrate": "64k",
"extra_params": ["-row-mt", "1", "-tile-columns", "1", "-threads", "2", "-deadline", "realtime", "-cpu-used", "4"],
"description": "WebM VP9 Fast - Quick encode, 720p (2-3x faster)"
},
"webm_anime_1080p_fast": {
"codec": "libvpx-vp9",
"format": "webm",
"resolution": "1920:1080",
"crf": "32",
"preset": None,
"tune": None,
"profile": None,
"audio_codec": "libopus",
"audio_bitrate": "96k",
"extra_params": ["-row-mt", "1", "-tile-columns", "2", "-threads", "2", "-deadline", "realtime", "-cpu-used", "4"],
"description": "WebM VP9 Fast - Quick encode, 1080p source quality"
},
# Fast H.264 presets
"mp4_anime_720p_fast": {
"codec": "libx264",
"format": "mp4",
"resolution": "1280:720",
"crf": "23",
"preset": "veryfast",
"tune": "animation",
"profile": "high",
"audio_codec": "aac",
"audio_bitrate": "96k",
"extra_params": [],
"description": "MP4 H.264 Fast - Quick encode, 720p (3-4x faster)"
},
"mp4_anime_1080p_fast": {
"codec": "libx264",
"format": "mp4",
"resolution": "1920:1080",
"crf": "23",
"preset": "veryfast",
"tune": "animation",
"profile": "high",
"audio_codec": "aac",
"audio_bitrate": "128k",
"extra_params": [],
"description": "MP4 H.264 Fast - Quick encode, 1080p source quality"
},
# H.265 presets (better compression, slower)
"mp4_anime_720p_h265": {
"codec": "libx265",
"format": "mp4",
"resolution": "1280:720",
"crf": "26",
"preset": "fast",
"tune": None,
"profile": None,
"audio_codec": "aac",
"audio_bitrate": "96k",
"extra_params": ["-x265-params", "aq-mode=3"],
"description": "MP4 H.265 - Better compression than H.264, 720p"
},
"mp4_anime_1080p_h265": {
"codec": "libx265",
"format": "mp4",
"resolution": "1920:1080",
"crf": "26",
"preset": "fast",
"tune": None,
"profile": None,
"audio_codec": "aac",
"audio_bitrate": "128k",
"extra_params": ["-x265-params", "aq-mode=3"],
"description": "MP4 H.265 - Better compression than H.264, 1080p"
},
# Mirror (no compression)
"mirror": {
"codec": "copy",
"format": "auto", # Keep original format
"resolution": None,
"crf": None,
"preset": None,
"tune": None,
"profile": None,
"audio_codec": "copy",
"audio_bitrate": None,
"extra_params": [],
"description": "Mirror - No compression, just re-upload (instant)"
}
}
# --- Pydantic Models ---
class CompressionSettings(BaseModel):
"""Custom compression settings for advanced users"""
codec: Optional[Literal["libx264", "libvpx-vp9", "libx265", "copy"]] = Field(None, description="Video codec (copy = no re-encoding)")
format: Optional[Literal["mp4", "webm", "mkv", "auto"]] = Field(None, description="Output format (auto = keep original)")
resolution: Optional[str] = Field(None, description="Output resolution (e.g., '1280:720', '1920:1080', leave empty for original)", pattern=r"^\d+:\d+$")
crf: Optional[int] = Field(None, ge=0, le=51, description="Constant Rate Factor (0-51, lower=better quality, ignored if codec=copy)")
preset: Optional[Literal["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"]] = Field(None, description="Encoding speed preset (H.264/H.265 only)")
tune: Optional[Literal["film", "animation", "grain", "stillimage", "fastdecode", "zerolatency"]] = Field(None, description="Encoding tune (H.264/H.265 only)")
profile: Optional[Literal["baseline", "main", "high", "high10"]] = Field(None, description="Encoding profile (H.264 only)")
audio_codec: Optional[Literal["aac", "libopus", "libmp3lame", "copy"]] = Field(None, description="Audio codec (copy = no re-encoding)")
audio_bitrate: Optional[str] = Field(None, description="Audio bitrate (e.g., '64k', '128k', ignored if audio_codec=copy)", pattern=r"^\d+k$")
class Config:
json_schema_extra = {
"example": {
"codec": "libvpx-vp9",
"format": "webm",
"resolution": "1280:720",
"crf": 32,
"audio_codec": "libopus",
"audio_bitrate": "64k"
}
}
class CompressionRequest(BaseModel):
"""Compression request with optional custom settings"""
video_url: HttpUrl
preset: Optional[Literal[
"webm_anime_720p_fast", "webm_anime_1080p_fast",
"mp4_anime_720p_fast", "mp4_anime_1080p_fast",
"mp4_anime_720p_h265", "mp4_anime_1080p_h265",
"mirror"
]] = Field(
"webm_anime_720p_fast",
description="Preset configuration (fast presets optimized for 2 vCPU)"
)
custom_settings: Optional[CompressionSettings] = Field(None, description="Override preset with custom settings")
is_ytdlp: Optional[bool] = Field(False, description="Use yt-dlp to download (supports YouTube, Twitter, etc.)")
class Config:
json_schema_extra = {
"example": {
"video_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
"preset": "mp4_anime_720p_fast",
"is_ytdlp": True
}
}
# --- Global State ---
tasks = {}
video_queue = asyncio.Queue()
active_files = set() # Track files currently being processed
# --- Helper Functions ---
def get_container_stats():
"""Reads resource usage from Docker container (cgroup v1 and v2 support)."""
stats = {}
# === CPU Usage ===
try:
stats["cpu"] = round(psutil.cpu_percent(interval=0.1), 1)
except:
stats["cpu"] = 0.0
# === MEMORY Usage (Docker Cgroup) ===
try:
# Try cgroup v2 first (newer Docker versions)
if os.path.exists("/sys/fs/cgroup/memory.current"):
with open("/sys/fs/cgroup/memory.current", "r") as f:
used = int(f.read().strip())
with open("/sys/fs/cgroup/memory.max", "r") as f:
limit_str = f.read().strip()
limit = int(limit_str) if limit_str != "max" else 16 * 1024**3 # Default 16GB
# Fallback to cgroup v1 (Hugging Face Spaces uses this)
else:
with open("/sys/fs/cgroup/memory/memory.usage_in_bytes", "r") as f:
used = int(f.read().strip())
with open("/sys/fs/cgroup/memory/memory.limit_in_bytes", "r") as f:
limit = int(f.read().strip())
# If limit is absurdly high (> 1TB), it means no limit set
if limit > 1024**4:
limit = 16 * 1024**3 # Default to 16GB
stats["ram_percent"] = round((used / limit) * 100, 1)
stats["ram_used_mb"] = round(used / 1024**2, 0)
stats["ram_limit_mb"] = round(limit / 1024**2, 0)
except Exception as e:
# Absolute fallback (for local testing)
mem = psutil.virtual_memory()
stats["ram_percent"] = round(mem.percent, 1)
stats["ram_used_mb"] = round(mem.used / 1024**2, 0)
stats["ram_limit_mb"] = round(mem.total / 1024**2, 0)
# === DISK Usage (Container /tmp) ===
try:
# Always read from /tmp (Docker ephemeral storage)
disk = shutil.disk_usage("/tmp")
stats["disk_used_gb"] = round((disk.total - disk.free) / 1024**3, 2)
stats["disk_free_gb"] = round(disk.free / 1024**3, 2)
stats["disk_total_gb"] = round(disk.total / 1024**3, 2)
stats["disk_percent"] = round(((disk.total - disk.free) / disk.total) * 100, 1)
except:
stats["disk_free_gb"] = 0
stats["disk_total_gb"] = 0
stats["disk_percent"] = 0
return stats
def get_filename_from_headers(headers, url):
cd = headers.get("Content-Disposition")
if cd:
fname = re.findall(r'filename="?([^";]+)"?', cd)
if fname: return fname[0]
return os.path.basename(str(url).split("?")[0]) or "file.bin"
async def get_video_duration(file_path):
cmd = [
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(file_path)
]
process = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
stdout, _ = await process.communicate()
try: return float(stdout.decode().strip())
except: return 0.0
async def extract_archive(archive_path, extract_to):
"""Uses official 7zz to extract archives, handles RAR5."""
extract_to.mkdir(parents=True, exist_ok=True)
# 7zz x = Extract with full paths, -y = Yes to all
cmd = ["7zz", "x", str(archive_path), f"-o{str(extract_to)}", "-y"]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# Verify extraction success by looking for video files
videos_found = False
for ext in VIDEO_EXTS:
if any(extract_to.rglob(f"*{ext}")):
videos_found = True
break
if not videos_found:
error_msg = stderr.decode() or stdout.decode()
print(f"Extraction Failed: {error_msg}")
raise Exception("Archive extraction failed or contained no videos.")
print(f"Extraction finished. Videos found.")
def build_ffmpeg_command(input_file, output_file, settings_dict):
"""Builds FFmpeg command based on settings dictionary"""
cmd = ["ffmpeg", "-y", "-i", str(input_file)]
# Mirror mode (no encoding)
if settings_dict["codec"] == "copy":
cmd.extend(["-c:v", "copy", "-c:a", "copy"])
cmd.append(str(output_file))
return cmd
# Normal encoding
cmd.extend(["-threads", "2"]) # Optimized for 2 vCPU
# Video filters - always apply scaling with lanczos for quality
if settings_dict["resolution"]:
vf_parts = [f"scale={settings_dict['resolution']}:flags=lanczos"]
# Add unsharp filter for anime (helps with compression artifacts)
if settings_dict.get("tune") == "animation" or settings_dict["codec"] == "libvpx-vp9":
vf_parts.append("unsharp=3:3:0.3:3:3:0.0")
cmd.extend(["-vf", ",".join(vf_parts)])
# Video codec
cmd.extend(["-c:v", settings_dict["codec"]])
# CRF
if settings_dict["codec"] in ["libx264", "libx265"]:
cmd.extend(["-crf", str(settings_dict["crf"])])
elif settings_dict["codec"] == "libvpx-vp9":
cmd.extend(["-crf", str(settings_dict["crf"]), "-b:v", "0"]) # VP9 needs -b:v 0 for CRF mode
# Preset (H.264/H.265 only)
if settings_dict["preset"]:
cmd.extend(["-preset", settings_dict["preset"]])
# Tune (H.264/H.265 only)
if settings_dict["tune"]:
cmd.extend(["-tune", settings_dict["tune"]])
# Profile (H.264 only)
if settings_dict["profile"] and settings_dict["codec"] == "libx264":
cmd.extend(["-profile:v", settings_dict["profile"]])
# Pixel format
if settings_dict["codec"] in ["libx264", "libx265"]:
cmd.extend(["-pix_fmt", "yuv420p"])
elif settings_dict["codec"] == "libvpx-vp9":
cmd.extend(["-pix_fmt", "yuv420p"])
# Extra codec-specific parameters
if settings_dict.get("extra_params"):
cmd.extend(settings_dict["extra_params"])
# Audio encoding
cmd.extend(["-c:a", settings_dict["audio_codec"]])
if settings_dict["audio_codec"] != "copy":
cmd.extend(["-b:a", settings_dict["audio_bitrate"]])
if settings_dict["audio_codec"] != "libopus": # Opus handles channels automatically
cmd.extend(["-ac", "2", "-ar", "48000" if settings_dict["audio_codec"] == "libopus" else "44100"])
# Format-specific optimizations
if settings_dict["format"] == "mp4":
cmd.extend(["-movflags", "+faststart"])
if settings_dict["codec"] == "libx264":
cmd.extend(["-level", "4.1"]) # Compatibility
# Progress tracking
cmd.extend(["-progress", "pipe:1", "-nostats"])
cmd.extend(["-max_muxing_queue_size", "1024"])
cmd.append(str(output_file))
return cmd
# --- Worker Logic ---
async def janitor():
"""Aggressive cleanup to keep Disk Free."""
while True:
await asyncio.sleep(300) # Check every 5 mins
now = time.time()
# Clean Tasks
stale_ids = [tid for tid, t in tasks.items() if now - t.get("created_at", 0) > 86400]
for tid in stale_ids: del tasks[tid]
# Clean Files - BUT SKIP FILES IN ACTIVE USE
for folder in [TEMP_DIR, EXTRACT_DIR]:
if not folder.exists(): continue
for item in folder.glob("*"):
try:
# SKIP if file/folder is in active use
if str(item) in active_files:
continue
# Delete if older than 2 hours (increased from 30 mins)
# This gives queued videos more time to be processed
if item.stat().st_mtime < now - 7200:
if item.is_file(): item.unlink()
elif item.is_dir(): shutil.rmtree(item)
print(f"Janitor: Cleaned up {item}")
except Exception as e:
print(f"Janitor: Could not delete {item}: {e}")
async def worker(worker_id):
print(f"Worker {worker_id} started.")
async with httpx.AsyncClient(follow_redirects=True, timeout=300.0) as client:
while True:
job = await video_queue.get()
task_id = job["id"]
# Define paths early for cleanup in 'finally'
input_file = TEMP_DIR / f"{task_id}_in"
output_file = None # Will be set based on format
unique_extract_path = EXTRACT_DIR / task_id
try:
# --- STEP 1: ACQUIRE FILE ---
if job["type"] == "url":
tasks[task_id]["status"] = "downloading"
# Use yt-dlp if requested
if job.get("is_ytdlp", False):
print(f"Using yt-dlp for {job['payload']}")
# Run yt-dlp to download (default downloader)
yt_output = TEMP_DIR / f"{task_id}_ytdlp.%(ext)s"
cmd = [
"yt-dlp",
"-o", str(yt_output),
"--no-playlist",
"--format", "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
"--progress",
"--newline",
job["payload"]
]
# Add cookies if provided
if COOKIES_FILE and os.path.exists(COOKIES_FILE):
cmd.extend(["--cookies", COOKIES_FILE])
print(f"Using cookies: {COOKIES_FILE}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Track yt-dlp progress
while True:
line = await process.stdout.readline()
if not line:
break
text = line.decode().strip()
print(f"yt-dlp: {text}") # Debug output
# Parse yt-dlp progress: [download] 45.2% of 123.45MiB at 5.67MiB/s
if "[download]" in text and "%" in text:
try:
# Extract percentage
percent_match = re.search(r'(\d+\.?\d*)%', text)
if percent_match:
percent = int(float(percent_match.group(1)))
tasks[task_id]["progress"] = min(99, percent)
# Extract speed - try multiple patterns
speed_match = re.search(r'at\s+([\d.]+[KMG]iB/s)', text)
if not speed_match:
speed_match = re.search(r'([\d.]+[KMG]iB/s)', text)
if speed_match:
tasks[task_id]["download_speed"] = speed_match.group(1)
print(f"Progress: {tasks[task_id].get('progress')}%, Speed: {tasks[task_id].get('download_speed')}")
except Exception as e:
print(f"Parse error: {e}")
stderr_output = await process.stderr.read()
if process.returncode != 0:
error_msg = stderr_output.decode()[-1000:] # more context
cookies_loaded = bool(COOKIES_FILE and os.path.exists(COOKIES_FILE))
print(f"yt-dlp stderr tail:\n{error_msg}") # always log full error
# Order matters: most specific checks first.
# Bot/sign-in detection (happens even WITH cookies if they're stale)
if "Sign in to confirm" in error_msg or "bot" in error_msg.lower():
if cookies_loaded:
raise Exception(
"YouTube bot-detection triggered despite cookies being loaded. "
"Your cookies may be expired or from the wrong account. "
"Re-export fresh cookies from a logged-in browser and update YTDLP_COOKIES."
)
else:
raise Exception(
"YouTube requires login cookies. Add Netscape-format cookies to YTDLP_COOKIES env var."
)
# 403 Forbidden
elif "403" in error_msg or "HTTP Error 403" in error_msg:
if cookies_loaded:
raise Exception(
"HTTP 403: Server refused even with cookies. "
"Cookies may be expired/invalid, or video is region-locked."
)
else:
raise Exception("HTTP 403 Forbidden. Try adding cookies or check if video is region-locked.")
# Video gone / private
elif "Video unavailable" in error_msg or "Private video" in error_msg:
raise Exception("Video is unavailable, private, or removed.")
# DNS failure
elif "No address associated with hostname" in error_msg or "Errno -5" in error_msg:
raise Exception("DNS error: Cannot resolve hostname. Check network or try a different URL.")
# Repeated webpage downloads = silent bot block with no cookies
elif not cookies_loaded and "youtube" in job["payload"].lower():
raise Exception(
"YouTube download failed (no cookies loaded). "
f"Add Netscape-format cookies to YTDLP_COOKIES. yt-dlp error: {error_msg[-200:]}"
)
else:
raise Exception(f"yt-dlp failed: {error_msg[-300:]}")
# Find the downloaded file
downloaded_files = list(TEMP_DIR.glob(f"{task_id}_ytdlp.*"))
if not downloaded_files:
raise Exception("yt-dlp downloaded no files")
input_file = downloaded_files[0]
active_files.add(str(input_file))
tasks[task_id]["filename"] = input_file.name
tasks[task_id]["download_speed"] = None # Clear speed after download
tasks[task_id]["progress"] = 0 # Reset for compression phase
else:
# Direct download with curl
print(f"Using curl for {job['payload']}")
# First, try to get filename from Content-Disposition header
fname = None
try:
resp = await client.head(job["payload"])
fname = get_filename_from_headers(resp.headers, job["payload"])
except:
pass
# Fallback to URL if no Content-Disposition
if not fname:
fname = os.path.basename(str(job["payload"]).split("?")[0])
# Set extension
ext = os.path.splitext(fname)[1].lower() or ".bin"
input_file = input_file.with_suffix(ext)
# curl command:
# -L follow redirects
# --progress-bar machine-readable progress on stderr
# -o output file path
# We redirect stderr to stdout so we can read one stream.
# curl --progress-bar writes a single line like:
# ########## 27.3% 123.45M 4.56M/s 0:00:05
cmd = [
"curl",
"-L",
"--progress-bar",
"-o", str(input_file),
job["payload"]
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT # merge stderr β†’ stdout
)
# curl --progress-bar writes \r-terminated lines (no \n).
# We read raw chunks and split on \r to get each update.
buf = b""
while True:
chunk = await process.stdout.read(256)
if not chunk:
break
buf += chunk
# Split on carriage-return (progress bar updates)
parts = buf.split(b"\r")
buf = parts[-1] # keep incomplete last segment
for part in parts[:-1]:
text = part.decode(errors="replace").strip()
if not text:
continue
print(f"curl: {text}")
try:
# curl --progress-bar line (after '#' block):
# "###### 27.3% 123.45M 4.56M/s 0:00:05"
# We look for the percentage token and speed token.
pct_match = re.search(r'(\d+(?:\.\d+)?)\s*%', text)
if pct_match:
percent = int(float(pct_match.group(1)))
tasks[task_id]["progress"] = min(99, percent)
print(f"βœ“ Progress: {percent}%")
# Speed: e.g. "4.56M/s" or "512k/s" or "1.23G/s"
speed_match = re.search(r'([\d.]+[kKMG](?:iB)?/s)', text)
if speed_match:
tasks[task_id]["download_speed"] = speed_match.group(1)
print(f"βœ“ Speed: {speed_match.group(1)}")
except Exception:
pass # Silent fail on parsing errors
await process.wait()
if process.returncode != 0:
raise Exception("curl download failed")
active_files.add(str(input_file))
tasks[task_id]["filename"] = fname
tasks[task_id]["download_speed"] = None # Clear speed after download
else:
# File from extraction
input_file = Path(job["payload"])
active_files.add(str(input_file))
# --- STEP 2: CHECK IF ARCHIVE ---
if input_file.suffix.lower() in ARCHIVE_EXTS:
tasks[task_id]["status"] = "extracting"
await extract_archive(input_file, unique_extract_path)
# PROTECT extraction directory
active_files.add(str(unique_extract_path))
# Find all videos and queue them
videos = []
for ext in VIDEO_EXTS:
videos.extend(unique_extract_path.rglob(f"*{ext}"))
if not videos: raise Exception("No videos found in archive")
# Queue each video as separate job (inherit settings from parent)
for video_path in sorted(videos):
# PROTECT each video file
active_files.add(str(video_path))
new_task_id = os.urandom(4).hex()
tasks[new_task_id] = {
"url": tasks[task_id]["url"],
"status": "queued",
"progress": 0,
"filename": video_path.name,
"created_at": time.time(),
"settings": tasks[task_id]["settings"] # Inherit settings
}
await video_queue.put({"type": "file", "payload": str(video_path), "id": new_task_id})
tasks[task_id]["status"] = f"extracted {len(videos)} videos"
else:
# --- COMPRESSION LOGIC ---
safe_name = "".join([c for c in tasks[task_id]["filename"] if c.isalnum() or c in "._- "])
base_name = os.path.splitext(safe_name)[0]
# Get compression settings for this task
settings = tasks[task_id]["settings"]
# Determine output format
if settings["format"] == "auto":
# Keep original extension for mirror mode
output_format = input_file.suffix.lstrip('.')
else:
output_format = settings["format"]
output_file = TEMP_DIR / f"{task_id}_out.{output_format}"
# PROTECT output file from janitor
active_files.add(str(output_file))
# Mirror mode (just copy)
if settings["codec"] == "copy":
tasks[task_id]["status"] = "mirroring"
tasks[task_id]["progress"] = 50
# Just copy the file
import shutil
shutil.copy2(input_file, output_file)
tasks[task_id]["progress"] = 100
else:
# Normal compression
duration = await get_video_duration(input_file)
tasks[task_id]["status"] = "compressing"
# Build FFmpeg command
cmd = build_ffmpeg_command(input_file, output_file, settings)
print(f"FFmpeg command: {' '.join(cmd)}")
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
while True:
line = await process.stdout.readline()
if not line: break
text = line.decode().strip()
if "out_time_ms=" in text and duration > 0:
try:
ms = int(text.split("=")[1])
percent = min(99, round((ms / 1000000) / duration * 100, 1))
tasks[task_id]["progress"] = percent
except: pass
_, stderr = await process.communicate()
if process.returncode != 0:
raise Exception(f"FFmpeg failed: {stderr.decode()[-500:]}")
# --- UPLOAD ---
tasks[task_id]["status"] = "uploading"
tasks[task_id]["progress"] = 100
if output_file.exists():
file_size_mb = output_file.stat().st_size / (1024 * 1024)
tasks[task_id]["output_size_mb"] = round(file_size_mb, 2)
with open(output_file, "rb") as f:
upload_filename = f"{base_name}_compressed.{output_format}"
up_res = await client.post(
"https://pixeldrain.com/api/file",
auth=("", PIXELDRAIN_KEY),
files={"file": (upload_filename, f)},
timeout=None
)
up_data = up_res.json()
if "id" in up_data:
tasks[task_id].update({
"status": "completed",
"url": f"https://pixeldrain.com/u/{up_data['id']}",
"finished_at": time.time()
})
else: raise Exception(f"Upload failed: {up_data}")
else:
raise Exception("Output file missing")
except Exception as e:
print(f"Task {task_id} Failed: {e}")
tasks[task_id]["status"] = f"failed: {str(e)[:100]}"
finally:
# --- CLEANUP ---
# Remove from active protection
if 'input_file' in locals():
active_files.discard(str(input_file))
if input_file.exists():
try: input_file.unlink()
except: pass
if 'output_file' in locals() and output_file:
active_files.discard(str(output_file))
if output_file.exists():
try: output_file.unlink()
except: pass
# For extracted files, only remove from protection if it was a source file
if job["type"] == "file":
source_path = Path(job["payload"])
active_files.discard(str(source_path))
# Clean up extraction directory only if all its videos are processed
if 'unique_extract_path' in locals() and unique_extract_path.exists():
# Check if any files in this directory are still protected
has_active_files = any(
str(path).startswith(str(unique_extract_path))
for path in active_files
)
if not has_active_files:
try:
shutil.rmtree(unique_extract_path)
active_files.discard(str(unique_extract_path))
print(f"Cleaned extraction folder: {unique_extract_path}")
except Exception as e:
print(f"Could not clean {unique_extract_path}: {e}")
video_queue.task_done()
# --- App Lifecycle ---
@asynccontextmanager
async def lifespan(app: FastAPI):
TEMP_DIR.mkdir(parents=True, exist_ok=True)
EXTRACT_DIR.mkdir(parents=True, exist_ok=True)
workers = [asyncio.create_task(worker(i)) for i in range(NUM_WORKERS)]
cleaner = asyncio.create_task(janitor())
yield
for w in workers: w.cancel()
cleaner.cancel()
app = FastAPI(
lifespan=lifespan,
title="Anime Video Compression API",
description="Compress anime videos for Blogger/Blogspot (<100MB) with WebM/VP9 optimization",
version="2.0.0"
)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
@app.get("/")
async def root():
return {
"msg": "Anime Compression API Online",
"version": "2.0.0",
"recommended_preset": "webm_anime_720p",
"endpoints": {
"/compress": "POST - Compress video",
"/status": "GET - System status",
"/presets": "GET - Available presets"
}
}
@app.get("/presets")
async def get_presets():
"""Get all available compression presets"""
return {
"presets": {
name: {
"description": preset["description"],
"codec": preset["codec"],
"format": preset["format"],
"resolution": preset["resolution"],
"crf": preset["crf"],
"audio_codec": preset["audio_codec"],
"audio_bitrate": preset["audio_bitrate"]
}
for name, preset in COMPRESSION_PRESETS.items()
},
"recommended": "webm_anime_720p"
}
@app.get("/status")
async def get_status():
return {
"system": get_container_stats(),
"workers_active": NUM_WORKERS,
"queue_depth": video_queue.qsize(),
"tasks": tasks
}
@app.post("/compress")
async def compress_video(
request: CompressionRequest,
auth: HTTPAuthorizationCredentials = Security(HTTPBearer())
):
"""
Compress a video file with customizable settings.
**Recommended for Blogger**: Use `webm_anime_720p` preset (default)
**Presets**:
- `webm_anime_720p`: Best quality/size ratio for Blogger (recommended)
- `webm_anime_480p`: Smallest size, lower quality
- `mp4_anime_720p`: MP4 format for compatibility
- `mp4_anime_480p`: MP4 format, smaller size
**Custom Settings**: Override any preset parameter using `custom_settings`
"""
if auth.credentials != API_TOKEN:
raise HTTPException(403, "Invalid API token")
task_id = os.urandom(4).hex()
filename = "Unknown File"
# Pre-fetch Metadata (skip for yt-dlp URLs)
if not request.is_ytdlp:
try:
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
resp = await client.head(str(request.video_url))
if resp.status_code != 200:
resp = await client.get(str(request.video_url), headers={"Range": "bytes=0-0"})
filename = get_filename_from_headers(resp.headers, request.video_url)
except:
filename = os.path.basename(str(request.video_url).split("?")[0])
else:
filename = f"ytdlp_{task_id}"
# Build final settings (preset + custom overrides)
preset = COMPRESSION_PRESETS[request.preset]
final_settings = preset.copy()
# Apply custom settings if provided
if request.custom_settings:
custom_dict = request.custom_settings.model_dump(exclude_none=True)
final_settings.update(custom_dict)
# If codec changed, adjust extra_params accordingly
if "codec" in custom_dict:
if custom_dict["codec"] == "libvpx-vp9" and "extra_params" not in custom_dict:
final_settings["extra_params"] = ["-row-mt", "1", "-tile-columns", "1", "-threads", "2", "-deadline", "realtime", "-cpu-used", "4"]
elif custom_dict["codec"] in ["libx264", "libx265"] and "extra_params" not in custom_dict:
final_settings["extra_params"] = []
tasks[task_id] = {
"url": str(request.video_url),
"status": "queued",
"progress": 0,
"filename": filename,
"created_at": time.time(),
"preset": request.preset,
"settings": final_settings,
"is_ytdlp": request.is_ytdlp
}
await video_queue.put({
"type": "url",
"payload": str(request.video_url),
"id": task_id,
"is_ytdlp": request.is_ytdlp
})
return {
"task_id": task_id,
"filename": filename,
"preset": request.preset,
"output_format": final_settings["format"] if final_settings["format"] != "auto" else "original",
"message": "Task Queued"
}