Spaces:
Paused
Paused
| import gradio as gr | |
| import cv2 | |
| import os | |
| import requests | |
| from tqdm import tqdm | |
| import tempfile | |
| import logging | |
| import time | |
| from PIL import Image | |
| import io | |
| import concurrent.futures | |
| import subprocess | |
| import shutil | |
| import itertools | |
| # =========================== | |
| # CONFIG | |
| # =========================== | |
| UPSCALE_APIS = [ | |
| "https://sam12345324-imagemultiupscaler.hf.space/upscale-multiple", | |
| "https://api2.example.com/upscale-multiple", # Replace with your API | |
| "https://api3.example.com/upscale-multiple", # Replace with your API | |
| "https://api4.example.com/upscale-multiple", # Replace with your API | |
| "https://api5.example.com/upscale-multiple", # Replace with your API | |
| "https://api6.example.com/upscale-multiple", # Replace with your API | |
| "https://api7.example.com/upscale-multiple", # Replace with your API | |
| "https://api8.example.com/upscale-multiple", # Replace with your API | |
| ] | |
| api_cycle = itertools.cycle(UPSCALE_APIS) | |
| MAX_RETRIES = 3 | |
| BATCH_SIZE = 10 # Number of frames per API call | |
| # Setup logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(message)s" | |
| ) | |
| # =========================== | |
| # Helper Functions | |
| # =========================== | |
| def validate_image(content): | |
| """Check if the content is a valid image using PIL.""" | |
| try: | |
| img = Image.open(io.BytesIO(content)) | |
| img.verify() | |
| return True | |
| except: | |
| return False | |
| def download_image(url): | |
| """Download an image from URL and return content or None.""" | |
| try: | |
| img_resp = requests.get(url) | |
| if img_resp.status_code == 200 and validate_image(img_resp.content): | |
| return img_resp.content | |
| else: | |
| logging.warning(f"Invalid or failed download for {url}") | |
| return None | |
| except Exception as e: | |
| logging.warning(f"Download error for {url}: {e}") | |
| return None | |
| def upscale_batch(frame_paths): | |
| """ | |
| Send a batch of frames to the multi-image upscaler API. | |
| Returns a list of downloaded image bytes or None for failed frames. | |
| """ | |
| for attempt in range(1, MAX_RETRIES + 1): | |
| try: | |
| files = [] | |
| for path in frame_paths: | |
| # Use 'files' key for each image | |
| files.append(("files", (os.path.basename(path), open(path, "rb"), "image/png"))) | |
| api = next(api_cycle) | |
| r = requests.post(api, files=files) | |
| if r.status_code == 200: | |
| data = r.json() | |
| urls = [item["upscaled_url"] for item in data.get("results", []) if item.get("status_code") == 200] # adjust based on API response | |
| if not urls or len(urls) != len(frame_paths): | |
| logging.warning(f"Batch attempt {attempt}: URL count mismatch") | |
| continue | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: | |
| results = list(executor.map(download_image, urls)) | |
| return results | |
| else: | |
| logging.warning(f"Batch attempt {attempt} failed: Status {r.status_code}") | |
| except Exception as e: | |
| logging.warning(f"Batch attempt {attempt} error: {e}") | |
| time.sleep(1) | |
| return [None]*len(frame_paths) | |
| # =========================== | |
| # Main Upscale Function | |
| # =========================== | |
| def upscale_video(video_file): | |
| logging.info(f"Starting upscale for video: {video_file}") | |
| # Temporary directories | |
| temp_dir = tempfile.mkdtemp() | |
| frames_dir = os.path.join(temp_dir, "frames") | |
| upscaled_dir = os.path.join(temp_dir, "upscaled") | |
| os.makedirs(frames_dir, exist_ok=True) | |
| os.makedirs(upscaled_dir, exist_ok=True) | |
| # --------------------------- | |
| # STEP 1: Extract frames | |
| # --------------------------- | |
| logging.info("Extracting frames...") | |
| cap = cv2.VideoCapture(video_file) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| logging.info(f"Video FPS: {fps}, Total frames: {frame_count}") | |
| frame_paths = [] | |
| idx = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.png") | |
| cv2.imwrite(frame_path, frame) | |
| frame_paths.append(frame_path) | |
| idx += 1 | |
| cap.release() | |
| logging.info(f"Finished extracting {len(frame_paths)} frames") | |
| # --------------------------- | |
| # STEP 2: Upscale frames in batches | |
| # --------------------------- | |
| logging.info("Upscaling frames using API in batches...") | |
| upscaled_paths = [] | |
| batch_list = [frame_paths[i:i+BATCH_SIZE] for i in range(0, len(frame_paths), BATCH_SIZE)] | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=len(UPSCALE_APIS)) as executor: | |
| batch_results_list = list(tqdm(executor.map(upscale_batch, batch_list), desc="Upscaling batches", total=len(batch_list))) | |
| for batch_idx, batch_results in enumerate(batch_results_list): | |
| for j, content in enumerate(batch_results): | |
| frame_idx = batch_idx * BATCH_SIZE + j | |
| if content: | |
| out_path = os.path.join(upscaled_dir, f"upscaled_{frame_idx:05d}.jpg") | |
| with open(out_path, "wb") as f: | |
| f.write(content) | |
| upscaled_paths.append(out_path) | |
| logging.info(f"Saved upscaled frame {frame_idx}: {out_path}") | |
| else: | |
| # Use original frame if upscaling failed, convert to JPEG | |
| original_path = os.path.join(frames_dir, f"frame_{frame_idx:05d}.png") | |
| out_path = os.path.join(upscaled_dir, f"upscaled_{frame_idx:05d}.jpg") | |
| img = Image.open(original_path) | |
| img.convert('RGB').save(out_path, 'JPEG') | |
| upscaled_paths.append(out_path) | |
| logging.warning(f"Used original frame {frame_idx} due to failed upscaling") | |
| if not upscaled_paths: | |
| raise ValueError("No frames were successfully upscaled. Cannot merge video.") | |
| logging.info(f"Upscaled {len(upscaled_paths)} frames successfully") | |
| # --------------------------- | |
| # STEP 3: Merge frames back to video using FFmpeg | |
| # --------------------------- | |
| logging.info("Merging frames back to video using FFmpeg...") | |
| output_path = os.path.join(temp_dir, "upscaled_video.mp4") | |
| subprocess.run([ | |
| 'ffmpeg', '-framerate', str(fps), '-i', os.path.join(upscaled_dir, 'upscaled_%05d.jpg'), | |
| '-i', video_file, '-c:v', 'libx264', '-c:a', 'copy', '-map', '0:v', '-map', '1:a', | |
| '-pix_fmt', 'yuv420p', output_path | |
| ], check=True) | |
| logging.info(f"Upscaled video saved at: {output_path}") | |
| # --------------------------- | |
| # STEP 4: Cleanup temporary frames | |
| # --------------------------- | |
| logging.info("Cleaning temporary frames...") | |
| for f in frame_paths + upscaled_paths: | |
| try: | |
| os.remove(f) | |
| except Exception as e: | |
| logging.warning(f"Could not remove {f}: {e}") | |
| logging.info("Processing completed successfully") | |
| return output_path | |
| # =========================== | |
| # Gradio UI | |
| # =========================== | |
| demo = gr.Interface( | |
| fn=upscale_video, | |
| inputs=gr.Video(label="Upload Video"), | |
| outputs=gr.Video(label="Upscaled Video"), | |
| title="🎥 Video Upscaler (Batch URL Support)", | |
| description="Uploads a video and upscales it frame by frame using a multi-image URL-returning upscaler API (up to 10 images per batch)." | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |