videoupscaler / app.py
sam12345324's picture
Update app.py
c51de31 verified
import gradio as gr
import cv2
import os
import requests
from tqdm import tqdm
import tempfile
import logging
import time
from PIL import Image
import io
import concurrent.futures
import subprocess
import shutil
import itertools
# ===========================
# CONFIG
# ===========================
UPSCALE_APIS = [
"https://sam12345324-imagemultiupscaler.hf.space/upscale-multiple",
"https://api2.example.com/upscale-multiple", # Replace with your API
"https://api3.example.com/upscale-multiple", # Replace with your API
"https://api4.example.com/upscale-multiple", # Replace with your API
"https://api5.example.com/upscale-multiple", # Replace with your API
"https://api6.example.com/upscale-multiple", # Replace with your API
"https://api7.example.com/upscale-multiple", # Replace with your API
"https://api8.example.com/upscale-multiple", # Replace with your API
]
api_cycle = itertools.cycle(UPSCALE_APIS)
MAX_RETRIES = 3
BATCH_SIZE = 10 # Number of frames per API call
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# ===========================
# Helper Functions
# ===========================
def validate_image(content):
"""Check if the content is a valid image using PIL."""
try:
img = Image.open(io.BytesIO(content))
img.verify()
return True
except:
return False
def download_image(url):
"""Download an image from URL and return content or None."""
try:
img_resp = requests.get(url)
if img_resp.status_code == 200 and validate_image(img_resp.content):
return img_resp.content
else:
logging.warning(f"Invalid or failed download for {url}")
return None
except Exception as e:
logging.warning(f"Download error for {url}: {e}")
return None
def upscale_batch(frame_paths):
"""
Send a batch of frames to the multi-image upscaler API.
Returns a list of downloaded image bytes or None for failed frames.
"""
for attempt in range(1, MAX_RETRIES + 1):
try:
files = []
for path in frame_paths:
# Use 'files' key for each image
files.append(("files", (os.path.basename(path), open(path, "rb"), "image/png")))
api = next(api_cycle)
r = requests.post(api, files=files)
if r.status_code == 200:
data = r.json()
urls = [item["upscaled_url"] for item in data.get("results", []) if item.get("status_code") == 200] # adjust based on API response
if not urls or len(urls) != len(frame_paths):
logging.warning(f"Batch attempt {attempt}: URL count mismatch")
continue
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(download_image, urls))
return results
else:
logging.warning(f"Batch attempt {attempt} failed: Status {r.status_code}")
except Exception as e:
logging.warning(f"Batch attempt {attempt} error: {e}")
time.sleep(1)
return [None]*len(frame_paths)
# ===========================
# Main Upscale Function
# ===========================
def upscale_video(video_file):
logging.info(f"Starting upscale for video: {video_file}")
# Temporary directories
temp_dir = tempfile.mkdtemp()
frames_dir = os.path.join(temp_dir, "frames")
upscaled_dir = os.path.join(temp_dir, "upscaled")
os.makedirs(frames_dir, exist_ok=True)
os.makedirs(upscaled_dir, exist_ok=True)
# ---------------------------
# STEP 1: Extract frames
# ---------------------------
logging.info("Extracting frames...")
cap = cv2.VideoCapture(video_file)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
logging.info(f"Video FPS: {fps}, Total frames: {frame_count}")
frame_paths = []
idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.png")
cv2.imwrite(frame_path, frame)
frame_paths.append(frame_path)
idx += 1
cap.release()
logging.info(f"Finished extracting {len(frame_paths)} frames")
# ---------------------------
# STEP 2: Upscale frames in batches
# ---------------------------
logging.info("Upscaling frames using API in batches...")
upscaled_paths = []
batch_list = [frame_paths[i:i+BATCH_SIZE] for i in range(0, len(frame_paths), BATCH_SIZE)]
with concurrent.futures.ThreadPoolExecutor(max_workers=len(UPSCALE_APIS)) as executor:
batch_results_list = list(tqdm(executor.map(upscale_batch, batch_list), desc="Upscaling batches", total=len(batch_list)))
for batch_idx, batch_results in enumerate(batch_results_list):
for j, content in enumerate(batch_results):
frame_idx = batch_idx * BATCH_SIZE + j
if content:
out_path = os.path.join(upscaled_dir, f"upscaled_{frame_idx:05d}.jpg")
with open(out_path, "wb") as f:
f.write(content)
upscaled_paths.append(out_path)
logging.info(f"Saved upscaled frame {frame_idx}: {out_path}")
else:
# Use original frame if upscaling failed, convert to JPEG
original_path = os.path.join(frames_dir, f"frame_{frame_idx:05d}.png")
out_path = os.path.join(upscaled_dir, f"upscaled_{frame_idx:05d}.jpg")
img = Image.open(original_path)
img.convert('RGB').save(out_path, 'JPEG')
upscaled_paths.append(out_path)
logging.warning(f"Used original frame {frame_idx} due to failed upscaling")
if not upscaled_paths:
raise ValueError("No frames were successfully upscaled. Cannot merge video.")
logging.info(f"Upscaled {len(upscaled_paths)} frames successfully")
# ---------------------------
# STEP 3: Merge frames back to video using FFmpeg
# ---------------------------
logging.info("Merging frames back to video using FFmpeg...")
output_path = os.path.join(temp_dir, "upscaled_video.mp4")
subprocess.run([
'ffmpeg', '-framerate', str(fps), '-i', os.path.join(upscaled_dir, 'upscaled_%05d.jpg'),
'-i', video_file, '-c:v', 'libx264', '-c:a', 'copy', '-map', '0:v', '-map', '1:a',
'-pix_fmt', 'yuv420p', output_path
], check=True)
logging.info(f"Upscaled video saved at: {output_path}")
# ---------------------------
# STEP 4: Cleanup temporary frames
# ---------------------------
logging.info("Cleaning temporary frames...")
for f in frame_paths + upscaled_paths:
try:
os.remove(f)
except Exception as e:
logging.warning(f"Could not remove {f}: {e}")
logging.info("Processing completed successfully")
return output_path
# ===========================
# Gradio UI
# ===========================
demo = gr.Interface(
fn=upscale_video,
inputs=gr.Video(label="Upload Video"),
outputs=gr.Video(label="Upscaled Video"),
title="🎥 Video Upscaler (Batch URL Support)",
description="Uploads a video and upscales it frame by frame using a multi-image URL-returning upscaler API (up to 10 images per batch)."
)
if __name__ == "__main__":
demo.launch()