# NOTE(review): the original export began with stray extraction artifacts
# ("Spaces:", "Configuration error" x2); replaced with this header.
# Colab notebook export: DeOldify-based image & video colorization pipeline.
# -*- coding: utf-8 -*-
"""Copy of Welcome To Colab
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1N6-JcsHJ-9Fk2J2B3DPEQe8OmebXIavh
"""
# Commented out IPython magic to ensure Python compatibility.
# --- Environment setup (Colab cell; lines starting with '!' are IPython shell magics) ---
!mkdir -p models
!git clone https://github.com/jantic/DeOldify.git
# %cd DeOldify
!pip install -r requirements-colab.txt
import sys
# Make the cloned DeOldify repo importable from this runtime.
sys.path.append('/content/DeOldify')
!pip install deoldify opencv-python imageio[ffmpeg] tqdm transformers torch torchvision pillow
!apt update && apt install ffmpeg -y # For video processing
# --- Pretrained weights (SECURITY: .pth checkpoints are pickles; download from trusted sources only) ---
# Example for Artistic model
!wget https://huggingface.co/databuzzword/deoldify-artistic/resolve/main/ColorizeArtistic_gen.pth -O models/ColorizeArtistic_gen.pth
# Example for Stable model
!wget https://huggingface.co/databuzzword/deoldify-stable/resolve/main/ColorizeStable_gen.pth -O models/ColorizeStable_gen.pth
# Create models folder if not exists
import os
os.makedirs("models", exist_ok=True)
# Download video model weights
!wget -O models/ColorizeVideo_gen.pth https://data.deepai.org/deoldify/ColorizeVideo_gen.pth
| # Commented out IPython magic to ensure Python compatibility. | |
| # %%writefile /content/colorize_runner_fixed_optimized.py | |
| # """ | |
| # colorize_runner_fixed_optimized.py | |
| # A robust, patched, zero-surprise runner for DeOldify-based image & video colorization. | |
| # OPTIMIZED VERSION: Added GPU acceleration, batch processing, frame skipping/interpolation, and resizing for 5-10x faster videos. | |
| # | |
| # How to use: | |
| # Terminal: | |
| # python colorize_runner_fixed_optimized.py --image bw.jpg --out colored.jpg | |
| # python colorize_runner_fixed_optimized.py --video bw.mp4 --out colored.mp4 --max-frames 200 --batch-size 8 --skip-interval 2 --resize-factor 0.7 | |
| # | |
| # From notebook (recommended in Colab): | |
| # from colorize_runner_fixed_optimized import colorize_image, colorize_video, main_cli | |
| # colorize_image("/content/bw.jpg", "/content/colored.jpg", render_factor=21) | |
| # # Video: colorize_video("/content/bw.mp4", "/content/colored.mp4", batch_size=8, skip_interval=2) | |
| # # or call main_cli with arg list (it strips notebook args): | |
| # main_cli(["--video", "/content/bw.mp4", "--batch-size", "8"]) | |
| # | |
| # Notes: | |
| # - This script attempts to be tolerant of DeOldify fork differences (different function names & signatures). | |
| # - It patches torch.load to allow older saved objects to unpickle (necessary for many DeOldify .pth files). | |
| # - Security note: unpickling model files can execute code. Only use official/trusted weights. | |
| # - Optimizations: GPU full usage, batching (up to 16 frames), skipping (process every Nth frame + interpolate), resizing (downscale for speed). | |
| # - For Colab: Enable GPU runtime. Install: !pip install deoldify opencv-python imageio[ffmpeg] tqdm transformers torch torchvision | |
| # - Clone DeOldify: !git clone https://github.com/jantic/DeOldify.git; import sys; sys.path.append('/content/DeOldify') | |
| # """ | |
| # | |
| # import os | |
| # import sys | |
| # import shutil | |
| # import tempfile | |
| # import math | |
| # import inspect | |
| # import mimetypes | |
| # import imghdr | |
| # import argparse # For CLI | |
| # from pathlib import Path | |
| # from typing import Optional, Tuple, Dict, List | |
| # import torch | |
| # import cv2 | |
| # import numpy as np | |
| # from PIL import Image | |
| # import time # For timing benchmarks | |
| # import subprocess # For optional FFmpeg | |
| # from tqdm import tqdm | |
| # import imageio | |
| # | |
| # # Optional: transformers (BLIP) for captioning | |
| # try: | |
| # from transformers import BlipProcessor, BlipForConditionalGeneration | |
| # HAS_BLIP = True | |
| # except Exception: | |
| # HAS_BLIP = False | |
| # | |
| # # ------------------------- | |
| # # GPU Setup (Global) | |
| # # ------------------------- | |
| # device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| # print(f"Using device: {device}") | |
| # if torch.cuda.is_available(): | |
| # print(f"GPU: {torch.cuda.get_device_name(0)}") | |
| # print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB") | |
| # | |
| # # Function to move colorizer to GPU (call after loading) | |
| # def move_colorizer_to_gpu(colorizer): | |
| # if hasattr(colorizer, 'model') and colorizer.model is not None: | |
| # colorizer.model = colorizer.model.to(device) | |
| # # Handle if it's a nn.Module directly | |
| # if isinstance(colorizer, torch.nn.Module): | |
| # colorizer = colorizer.to(device) | |
| # # Recurse for nested models (common in DeOldify) | |
| # for attr_name in dir(colorizer): | |
| # attr = getattr(colorizer, attr_name) | |
| # if isinstance(attr, torch.nn.Module): | |
| # setattr(colorizer, attr_name, attr.to(device)) | |
| # print("Colorizer moved to GPU.") | |
| # return colorizer | |
| # | |
| # # ------------------------- | |
| # # PyTorch safety patch for older pickles (DeOldify weights) | |
| # # ------------------------- | |
| # def _patch_torch_load_for_legacy_weights(): | |
| # """ | |
| # Patch torch.load to load legacy DeOldify checkpoints that contain objects | |
| # disallowed by the new 'weights_only=True' default in PyTorch >=2.6. | |
| # | |
| # This patch forces weights_only=False when torch.load is called without an explicit | |
| # weights_only argument. This is necessary to unpickle some older checkpoints. | |
| # SECURITY: Only do this when you trust the checkpoint source (DeOldify official repo). | |
| # """ | |
| # try: | |
| # import torch | |
| # import functools | |
| # except Exception: | |
| # return # torch not installed yet | |
| # | |
| # try: | |
| # # allowlist common globals used by old checkpoints | |
| # safe_list = [functools.partial, torch.nn.modules.batchnorm.BatchNorm2d] | |
| # if hasattr(torch.serialization, "add_safe_globals"): | |
| # try: | |
| # torch.serialization.add_safe_globals(safe_list) | |
| # except Exception: | |
| # # ignore if unavailable | |
| # pass | |
| # except Exception: | |
| # pass | |
| # | |
| # # Monkey-patch torch.load to set weights_only=False by default (only when not provided). | |
| # try: | |
| # old_load = torch.load | |
| # def patched_load(*args, **kwargs): | |
| # if "weights_only" not in kwargs: | |
| # kwargs["weights_only"] = False | |
| # return old_load(*args, **kwargs) | |
| # torch.load = patched_load | |
| # except Exception: | |
| # pass | |
| # | |
| # # Apply patch immediately (harmless if torch isn't present) | |
| # _patch_torch_load_for_legacy_weights() | |
| # | |
| # # ------------------------- | |
| # # Attempt flexible DeOldify import (support various forks/layouts) | |
| # # ------------------------- | |
| # HAS_DEOLDIFY = False | |
| # _get_image_colorizer_fn = None | |
| # | |
| # def _import_deoldify_helpers(): | |
| # """ | |
| # Attempt multiple import paths and capture get_image_colorizer. | |
| # """ | |
| # global HAS_DEOLDIFY, _get_image_colorizer_fn | |
| # if _get_image_colorizer_fn is not None: | |
| # HAS_DEOLDIFY = True | |
| # return | |
| # | |
| # tried = [] | |
| # candidates = [ | |
| # "deoldify.visualize", # typical | |
| # "DeOldify.deoldify.visualize", # other layout if cloned inside package folder | |
| # "deoldify", # fallback: maybe installed differently | |
| # ] | |
| # for modname in candidates: | |
| # try: | |
| # mod = __import__(modname, fromlist=["get_image_colorizer"]) | |
| # if hasattr(mod, "get_image_colorizer"): | |
| # _get_image_colorizer_fn = getattr(mod, "get_image_colorizer") | |
| # HAS_DEOLDIFY = True | |
| # return | |
| # # some forks might provide a different helper name; try to find anything called get_*coloriz* | |
| # for name in dir(mod): | |
| # if "color" in name and "get" in name: | |
| # func = getattr(mod, name) | |
| # if callable(func): | |
| # _get_image_colorizer_fn = func | |
| # HAS_DEOLDIFY = True | |
| # return | |
| # except Exception as e: | |
| # tried.append((modname, str(e))) | |
| # HAS_DEOLDIFY = False | |
| # # no raise - we'll surface friendly error when user calls functions | |
| # | |
| # _import_deoldify_helpers() | |
| # | |
| # # ------------------------- | |
| # # BLIP caption utilities (optional) | |
| # # ------------------------- | |
| # _blip_proc = None | |
| # _blip_model = None | |
| # def _init_blip(model_name: str="Salesforce/blip-image-captioning-base"): | |
| # global _blip_proc, _blip_model, HAS_BLIP | |
| # if not HAS_BLIP: | |
| # return False | |
| # if _blip_proc is None: | |
| # _blip_proc = BlipProcessor.from_pretrained(model_name) | |
| # if _blip_model is None: | |
| # _blip_model = BlipForConditionalGeneration.from_pretrained(model_name).to(device) | |
| # return True | |
| # | |
| # def generate_caption(image_path: str, max_length: int=40) -> Optional[str]: | |
| # if not HAS_BLIP: | |
| # return None | |
| # _init_blip() | |
| # img = Image.open(image_path).convert("RGB") | |
| # inputs = _blip_proc(images=img, return_tensors="pt").to(device) | |
| # with torch.no_grad(): | |
| # out = _blip_model.generate(**inputs, max_length=max_length, num_beams=4) | |
| # caption = _blip_proc.tokenizer.decode(out[0], skip_special_tokens=True) | |
| # return caption | |
| # | |
| # # ------------------------- | |
| # # Helper utilities | |
| # # ------------------------- | |
| # def is_image(path: str) -> bool: | |
| # if not os.path.exists(path): return False | |
| # mt, _ = mimetypes.guess_type(path) | |
| # if mt and mt.startswith("image"): return True | |
| # try: | |
| # if imghdr.what(path) is not None: | |
| # return True | |
| # except Exception: | |
| # pass | |
| # try: | |
| # Image.open(path).verify() | |
| # return True | |
| # except Exception: | |
| # return False | |
| # | |
| # def is_video(path: str) -> bool: | |
| # if not os.path.exists(path): return False | |
| # mt, _ = mimetypes.guess_type(path) | |
| # if mt and mt.startswith("video"): return True | |
| # try: | |
| # cap = cv2.VideoCapture(path) | |
| # ok, _ = cap.read() | |
| # cap.release() | |
| # return ok | |
| # except Exception: | |
| # return False | |
| # | |
| # def detect_media(path: str) -> Optional[str]: | |
| # if is_image(path): return "image" | |
| # if is_video(path): return "video" | |
| # return None | |
| # | |
| # # ------------------------- | |
| # # DeOldify colorizer helper (robust) | |
| # # ------------------------- | |
| # _colorizer_cache = {} | |
| # | |
| # def get_deoldify_colorizer(artistic: bool=True, *args, **kwargs): | |
| # """ | |
| # Load and cache a DeOldify image colorizer object. Accepts various signatures. | |
| # Returns the loaded colorizer object or raises a helpful RuntimeError. | |
| # """ | |
| # if not HAS_DEOLDIFY or _get_image_colorizer_fn is None: | |
| # raise RuntimeError( | |
| # "DeOldify helper not found. Please clone the DeOldify repo and add it to PYTHONPATH " | |
| # "(or install a compatible fork). Example:\n" | |
| # " git clone https://github.com/jantic/DeOldify.git\n" | |
| # " sys.path.append('/content/DeOldify')\n" | |
| # ) | |
| # | |
| # cache_key = ("deoldify_colorizer", artistic) | |
| # if cache_key in _colorizer_cache: | |
| # return _colorizer_cache[cache_key] | |
| # | |
| # # Try to call the function with different parameter names, defensively | |
| # fn = _get_image_colorizer_fn | |
| # signature = None | |
| # try: | |
| # signature = inspect.signature(fn) | |
| # except Exception: | |
| # pass | |
| # | |
| # # Build candidate kwargs based on signature | |
| # call_kwargs = {} | |
| # if signature: | |
| # params = signature.parameters | |
| # if "artistic" in params: | |
| # call_kwargs["artistic"] = artistic | |
| # elif "mode" in params: | |
| # call_kwargs["mode"] = "artistic" if artistic else "stable" | |
| # # some versions accept weights_path or weights_name; leave them out unless provided | |
| # else: | |
| # # unknown signature - just call with a single boolean | |
| # try: | |
| # colorizer = fn(artistic) | |
| # colorizer = move_colorizer_to_gpu(colorizer) | |
| # _colorizer_cache[cache_key] = colorizer | |
| # return colorizer | |
| # except Exception as e: | |
| # raise RuntimeError("Could not call DeOldify helper: " + str(e)) | |
| # | |
| # # attempt call | |
| # try: | |
| # colorizer = fn(**call_kwargs) | |
| # except TypeError: | |
| # # fallback - call with no args | |
| # colorizer = fn() | |
| # colorizer = move_colorizer_to_gpu(colorizer) | |
| # _colorizer_cache[cache_key] = colorizer | |
| # return colorizer | |
| # | |
| # def _find_colorize_method(colorizer): | |
| # """ | |
| # Return a callable that colorizes an image path and returns either: | |
| # - path to output file | |
| # - PIL Image | |
| # - numpy array | |
| # We try common method names across forks. | |
| # """ | |
| # candidates = [ | |
| # "colorize_from_path", | |
| # "colorize_from_file", | |
| # "colorize", | |
| # "get_transformed_image", | |
| # "get_colorized_image", | |
| # "colorize_image" | |
| # ] | |
| # for name in candidates: | |
| # if hasattr(colorizer, name): | |
| # return getattr(colorizer, name) | |
| # # Some colorizers return a method nested under `.colorizer` or similar | |
| # for attr in dir(colorizer): | |
| # if "colorize" in attr and callable(getattr(colorizer, attr)): | |
| # return getattr(colorizer, attr) | |
| # raise RuntimeError("Cannot find a colorize method in loaded DeOldify colorizer object. Inspect the object.") | |
| # | |
| # # ------------------------- | |
| # # Optimized Image colorization (Supports Batches) | |
| # # ------------------------- | |
| # def colorize_image(input_paths_or_arrays, # str path, list of paths, or np.array/list of arrays | |
| # output_paths_or_dir: str, # Single path, list, or dir to save | |
| # render_factor: int = 35, | |
| # produce_caption: bool = True, | |
| # artistic: bool = True, | |
| # batch_size: int = 8, | |
| # resize_factor: float = 1.0) -> List[Dict]: | |
| # """ | |
| # Colorize single image or batch. Returns list of {'output_path': str, 'caption': Optional[str]} | |
| # """ | |
| # is_single = not isinstance(input_paths_or_arrays, (list, tuple)) | |
| # if is_single: | |
| # inputs = [input_paths_or_arrays] | |
| # if isinstance(output_paths_or_dir, str): | |
| # outputs = [output_paths_or_dir] # Single output | |
| # else: | |
| # outputs = [output_paths_or_dir] | |
| # else: | |
| # inputs = input_paths_or_arrays | |
| # if isinstance(output_paths_or_dir, str): # Dir mode | |
| # os.makedirs(output_paths_or_dir, exist_ok=True) | |
| # outputs = [os.path.join(output_paths_or_dir, f"colored_{i:06d}.png") for i in range(len(inputs))] | |
| # else: | |
| # outputs = output_paths_or_dir | |
| # | |
| # colorizer = get_deoldify_colorizer(artistic=artistic) | |
| # colorize_fn = _find_colorize_method(colorizer) | |
| # | |
| # results = [] | |
| # start_time = time.time() | |
| # | |
| # # Process in batches | |
| # for i in tqdm(range(0, len(inputs), batch_size), desc="Batching colorization"): | |
| # batch_inputs = inputs[i:i + batch_size] | |
| # batch_outputs = outputs[i:i + batch_size] | |
| # | |
| # batch_results = [] | |
| # for j, (inp, outp) in enumerate(zip(batch_inputs, batch_outputs)): | |
| # # Load image if path | |
| # if isinstance(inp, str): | |
| # if not os.path.exists(inp): | |
| # raise FileNotFoundError(f"Input not found: {inp}") | |
| # img_array = cv2.imread(inp) | |
| # img_array = cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB) | |
| # else: | |
| # img_array = inp if isinstance(inp, np.ndarray) else np.array(inp) | |
| # | |
| # # Resize for speed (optional) | |
| # orig_shape = img_array.shape[:2] | |
| # if resize_factor != 1.0: | |
| # h, w = int(img_array.shape[0] * resize_factor), int(img_array.shape[1] * resize_factor) | |
| # img_array = cv2.resize(img_array, (w, h)) | |
| # | |
| # # Defensive colorize call | |
| # res = None | |
| # try_patterns = [ | |
| # {"path": inp, "render_factor": render_factor} if isinstance(inp, str) else None, | |
| # {"image": img_array, "render_factor": render_factor}, | |
| # {"render_factor": render_factor}, | |
| # {} | |
| # ] | |
| # for kwargs in try_patterns: | |
| # if kwargs is None: continue | |
| # try: | |
| # res = colorize_fn(**kwargs) | |
| # break | |
| # except TypeError: | |
| # continue | |
| # | |
| # if res is None: | |
| # try: | |
| # res = colorize_fn(inp if isinstance(inp, str) else img_array) | |
| # except Exception as e: | |
| # raise RuntimeError(f"Colorize failed for batch item {j}: {e}") | |
| # | |
| # # Handle result | |
| # final_out = None | |
| # if isinstance(res, str) and os.path.exists(res): | |
| # final_out = res | |
| # shutil.copy(final_out, outp) | |
| # elif isinstance(res, (tuple, list)) and len(res) > 0 and isinstance(res[0], str) and os.path.exists(res[0]): | |
| # shutil.copy(res[0], outp) | |
| # final_out = outp | |
| # elif hasattr(res, "save"): | |
| # res.save(outp) | |
| # final_out = outp | |
| # elif isinstance(res, np.ndarray): | |
| # # Resize back if needed | |
| # if resize_factor != 1.0: | |
| # res = cv2.resize(res, orig_shape[::-1]) | |
| # Image.fromarray(res).save(outp) | |
| # final_out = outp | |
| # else: | |
| # # Fallback copy/search (as in original) | |
| # if isinstance(inp, str): | |
| # shutil.copy(inp, outp) | |
| # else: | |
| # Image.fromarray(img_array).save(outp) | |
| # final_out = outp | |
| # | |
| # # Caption if single image mode | |
| # caption = None | |
| # if produce_caption and HAS_BLIP and is_single: | |
| # try: | |
| # caption = generate_caption(final_out) | |
# Append missing code to complete the file (run this after the previous %%writefile)
# Append the remainder of the runner script that the earlier %%writefile cell
# truncated (it stopped mid-`try` inside colorize_image, right after the
# generate_caption call). The leading indentation of the appended text must
# line up with where the written file left off inside colorize_image.
# NOTE(review): indentation below was reconstructed from context — the export
# destroyed whitespace; verify the appended file parses before running.
with open('/content/colorize_runner_fixed_optimized.py', 'a') as f:
    f.write('''
                except Exception:
                    pass
            batch_results.append({"output_path": final_out, "caption": caption})
        results.extend(batch_results)
    end_time = time.time()
    print(f"Colorized {len(inputs)} item(s) in {end_time - start_time:.2f}s ({len(inputs)/(end_time - start_time):.1f} items/sec)")
    return results[0] if is_single else results
# -------------------------
# Video pipeline (Optimized)
# -------------------------
def extract_frames(video_path: str, frames_dir: str, target_fps: Optional[int] = None, skip_interval: int = 1, use_ffmpeg: bool = False) -> Tuple[int, int]:
    """
    Extract frames from video, optionally skipping for speed.
    Returns (num_extracted_frames, fps)
    """
    os.makedirs(frames_dir, exist_ok=True)
    if use_ffmpeg:
        # FFmpeg for faster extraction (install: !apt install ffmpeg in Colab)
        cap = cv2.VideoCapture(video_path)
        orig_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        cap.release()
        fps = int(round(orig_fps)) if target_fps is None else int(target_fps)
        scale_fps = fps / max(1, skip_interval)
        cmd = [
            'ffmpeg', '-i', video_path,
            '-vf', f'fps={scale_fps}',
            '-y', f'{frames_dir}/frame_%06d.png'
        ]
        result = subprocess.run(cmd, capture_output=True, check=True)
        frame_files = sorted([f for f in os.listdir(frames_dir) if f.endswith('.png')])
        print(f"FFmpeg extracted {len(frame_files)} frames (effective skip: {skip_interval})")
        return len(frame_files), fps
    else:
        # OpenCV with skipping
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video {video_path}")
        orig_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        fps = int(round(orig_fps)) if target_fps is None else int(target_fps)
        interval = max(1, skip_interval)
        idx = 0
        saved = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if idx % interval == 0:
                fname = os.path.join(frames_dir, f"frame_{saved:06d}.png")
                cv2.imwrite(fname, frame)
                saved += 1
            idx += 1
        cap.release()
        print(f"OpenCV extracted {saved} frames (skipped every {interval-1})")
        return saved, fps
def interpolate_skipped_frames(color_dir: str, orig_num_frames: int, skip_interval: int = 1) -> None:
    """
    If frames were skipped, interpolate (blend) to create full sequence.
    Assumes processed frames are in color_dir as frame_000000.png, etc.
    This is a simple linear blend; for better quality, use optical flow (e.g., via OpenCV's DISOpticalFlow).
    """
    if skip_interval <= 1:
        return # No skipping needed
    processed_files = sorted([f for f in os.listdir(color_dir) if f.startswith('frame_') and f.endswith('.png')])
    num_processed = len(processed_files)
    if num_processed == 0:
        return
    # Load processed frames
    processed_frames = []
    for f in processed_files:
        img = cv2.imread(os.path.join(color_dir, f))
        processed_frames.append(img)
    # Generate full sequence with interpolation
    full_frames = []
    for i in range(orig_num_frames):
        # Find nearest processed frames
        proc_idx = i // skip_interval
        if proc_idx >= num_processed:
            proc_idx = num_processed - 1
        prev_frame = processed_frames[proc_idx]
        # Simple hold or blend with next if available
        if proc_idx + 1 < num_processed and i % skip_interval != 0:
            next_frame = processed_frames[proc_idx + 1]
            alpha = (i % skip_interval) / skip_interval
            blended = cv2.addWeighted(prev_frame, 1 - alpha, next_frame, alpha, 0)
            full_frames.append(blended)
        else:
            full_frames.append(prev_frame)
    # Overwrite with full sequence
    for i, frame in enumerate(full_frames):
        fname = os.path.join(color_dir, f"frame_{i:06d}.png")
        cv2.imwrite(fname, frame)
    print(f"Interpolated to {orig_num_frames} full frames.")
def reassemble_video(frames_dir: str, output_path: str, fps: int = 25) -> None:
    """
    Reassemble colored frames into video using imageio (or FFmpeg).
    """
    frame_files = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.startswith('frame_') and f.endswith('.png')])
    if not frame_files:
        raise RuntimeError("No frames found to reassemble.")
    # Use imageio for simplicity (FFmpeg backend if installed)
    with imageio.get_writer(output_path, fps=fps, codec='libx264') as writer:
        for frame_path in tqdm(frame_files, desc="Reassembling video"):
            img = imageio.imread(frame_path)
            writer.append_data(img)
    print(f"Video saved to {output_path}")
def colorize_video(input_path: str,
                   output_path: str,
                   max_frames: Optional[int] = None,
                   batch_size: int = 8,
                   skip_interval: int = 1,
                   resize_factor: float = 1.0,
                   artistic: bool = True,
                   render_factor: int = 35,
                   use_ffmpeg: bool = True,
                   target_fps: Optional[int] = None) -> Dict:
    """
    Full optimized video colorization pipeline.
    Returns {'output_path': str, 'processed_frames': int, 'total_time': float}
    """
    if not is_video(input_path):
        raise ValueError(f"Input {input_path} is not a valid video.")
    start_time = time.time()
    with tempfile.TemporaryDirectory() as temp_dir:
        frames_dir = os.path.join(temp_dir, "frames")
        color_dir = os.path.join(temp_dir, "colored")
        # Step 1: Extract frames (with skipping)
        cap = cv2.VideoCapture(input_path)
        orig_num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()
        extract_num = min(orig_num_frames, max_frames) if max_frames else orig_num_frames
        num_extracted, fps = extract_frames(input_path, frames_dir, target_fps, skip_interval, use_ffmpeg)
        # Step 2: Colorize extracted frames (batch)
        colorize_image(frames_dir, color_dir, render_factor=render_factor, artistic=artistic,
                       batch_size=batch_size, resize_factor=resize_factor, produce_caption=False)
        # Step 3: Interpolate skipped frames
        interpolate_skipped_frames(color_dir, orig_num_frames, skip_interval)
        # Step 4: Reassemble video
        reassemble_video(color_dir, output_path, fps)
    total_time = time.time() - start_time
    print(f"Video colorized in {total_time:.2f}s ({num_extracted} frames processed, {orig_num_frames} total)")
    return {"output_path": output_path, "processed_frames": num_extracted, "total_time": total_time}
# -------------------------
# CLI Interface
# -------------------------
def main_cli(args: Optional[List[str]] = None):
    """
    CLI entrypoint. Call with sys.argv or list.
    """
    parser = argparse.ArgumentParser(description="DeOldify Colorization Runner")
    parser.add_argument("--image", type=str, help="Input image path")
    parser.add_argument("--video", type=str, help="Input video path")
    parser.add_argument("--out", "-o", type=str, required=True, help="Output path")
    parser.add_argument("--render-factor", type=int, default=35, help="Render factor (21-40)")
    parser.add_argument("--artistic", action="store_true", default=True, help="Use artistic mode")
    parser.add_argument("--batch-size", type=int, default=8, help="Batch size for processing")
    parser.add_argument("--skip-interval", type=int, default=1, help="Frame skip interval (1=full)")
    parser.add_argument("--resize-factor", type=float, default=1.0, help="Resize factor for speed (0.5=half size)")
    parser.add_argument("--max-frames", type=int, default=None, help="Max frames to process (videos)")
    if args is None:
        args = sys.argv[1:]
    opts = parser.parse_args(args)
    if opts.image:
        result = colorize_image(opts.image, opts.out, render_factor=opts.render_factor,
                                artistic=opts.artistic, batch_size=opts.batch_size,
                                resize_factor=opts.resize_factor)
        print(f"Colored image: {result['output_path']}")
    elif opts.video:
        result = colorize_video(opts.video, opts.out, max_frames=opts.max_frames,
                                batch_size=opts.batch_size, skip_interval=opts.skip_interval,
                                resize_factor=opts.resize_factor, artistic=opts.artistic,
                                render_factor=opts.render_factor)
        print(f"Colored video: {result['output_path']}")
    else:
        parser.print_help()
if __name__ == "__main__":
    main_cli()
''')
# Confirmation printed once the truncated runner file has been completed.
print("File completed and fixed!")
# Sanity-check import of the runner module that was just written to disk
# (requires the %%writefile + append cells above to have run first).
from colorize_runner_fixed_optimized import colorize_image, detect_media, is_image
print("Import successful!")
# --- 🔹 IMAGE COLORIZATION CELL (with Upload + Download + Control Buttons) 🔹 ---
from datetime import datetime
from IPython.display import display, clear_output
import cv2, os, time
from google.colab import files
import ipywidgets as widgets
def run_image_colorization(input_path, render_factor=35, resize_factor=1.0):
    """
    Enhanced DeOldify Image Colorizer
    ---------------------------------
    ✅ Upload support
    ✅ Auto grayscale detection
    ✅ Before/After preview
    ✅ Download button (Colab-native)
    ✅ Rerun & Clear helpers

    Args:
        input_path: Path to the image file to colorize.
        render_factor: DeOldify render factor (higher = more detail, slower).
        resize_factor: Downscale factor applied before colorizing (1.0 = none).

    Returns:
        Path of the colorized output image.

    Raises:
        FileNotFoundError: if input_path does not exist.
        ValueError: if the path is not a readable image.
    """
    from colorize_runner_fixed_optimized import colorize_image, detect_media, is_image
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"File not found: {input_path}")
    if not is_image(input_path):
        raise ValueError("Provided path is not a valid image.")

    # --- Detect grayscale ---
    img = cv2.imread(input_path)
    if img is None:
        # Bug fix: cv2.imread returns None (no exception) on unreadable files,
        # which previously crashed with AttributeError on img.shape below.
        raise ValueError(f"OpenCV could not read image: {input_path}")
    # Heuristic: single-channel image, or a 3-channel image whose channels are
    # identical to its grayscale conversion, is treated as grayscale.
    gray_check = (
        len(img.shape) < 3 or img.shape[2] == 1
        or (cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) == img[:, :, 0]).all()
    )
    if not gray_check:
        print("⚠️ Image appears already colored — still running for enhancement.")

    # --- Output path (timestamped so re-runs don't overwrite) ---
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = f"/content/colorized_{timestamp}.jpg"

    # --- Colorize ---
    print("🎨 Starting colorization...")
    start_time = time.time()
    result = colorize_image(input_path, output_path, render_factor=render_factor, resize_factor=resize_factor)
    end_time = time.time()
    print(f"✅ Done in {end_time - start_time:.2f}s — saved at {output_path}")

    # --- Before/After display ---
    before = cv2.cvtColor(cv2.imread(input_path), cv2.COLOR_BGR2RGB)
    after = cv2.cvtColor(cv2.imread(result['output_path']), cv2.COLOR_BGR2RGB)
    import matplotlib.pyplot as plt
    plt.figure(figsize=(14, 6))
    plt.subplot(1, 2, 1); plt.imshow(before); plt.title("Before"); plt.axis("off")
    plt.subplot(1, 2, 2); plt.imshow(after); plt.title("After"); plt.axis("off")
    plt.show()

    # --- Caption (optional, only present if BLIP was available) ---
    if result.get('caption'):
        print(f"🧠 Caption: {result['caption']}")

    # --- Buttons ---
    download_btn = widgets.Button(description="⬇️ Download Image", button_style='success', icon='download')
    rerun_btn = widgets.Button(description="🔁 Re-run", button_style='info', icon='refresh')
    clear_btn = widgets.Button(description="🧹 Clear", button_style='warning', icon='trash')
    def on_download(b): files.download(output_path)
    def on_clear(b): clear_output(); print("🧹 Output cleared.")
    # Bug fix: re-run with the SAME render/resize settings — previously the
    # rerun callback silently fell back to the defaults.
    def on_rerun(b): clear_output(); print("🔁 Re-running..."); run_image_colorization(input_path, render_factor=render_factor, resize_factor=resize_factor)
    download_btn.on_click(on_download)
    clear_btn.on_click(on_clear)
    rerun_btn.on_click(on_rerun)
    display(widgets.HBox([download_btn, rerun_btn, clear_btn]))
    return result['output_path']
# --- Upload section ---
# FileUpload widget: the chosen image is saved under /content and then
# colorized via run_image_colorization.
uploader = widgets.FileUpload(accept='image/*', multiple=False)
display(widgets.HTML("<h3>📤 Upload an Image for Colorization</h3>"))
display(uploader)
def handle_upload(change):
    # Observer callback fired when the widget's 'value' trait changes.
    # NOTE(review): .items() assumes ipywidgets 7.x dict-style .value (the
    # Colab default at time of writing); ipywidgets 8 returns a tuple of
    # dicts instead — confirm the installed version.
    if uploader.value:
        for name, file_info in uploader.value.items():
            path = f"/content/{name}"
            with open(path, 'wb') as f:
                f.write(file_info['content'])
            print(f"✅ Uploaded: {path}")
            run_image_colorization(path)
uploader.observe(handle_upload, names='value')
# --- 🔹 VIDEO COLORIZATION CELL (with Upload + Download + Controls) 🔹 ---
import os, time
from IPython.display import display, clear_output
from google.colab import files
import ipywidgets as widgets
def run_video_colorization(input_path):
    """
    DeOldify Video Colorizer with UI
    --------------------------------
    ✅ Upload video support
    ✅ Automatic downscale → colorize → upscale
    ✅ Download button
    ✅ Clear & Rerun helpers

    Args:
        input_path: Path to the source video to colorize.

    Side effects:
        Writes the intermediate and final videos under /content and displays
        download/rerun/clear buttons.
    """
    # Bug fix: main_cli was never imported in this cell, so calling it raised
    # NameError at runtime (only colorize_image & friends were imported above).
    from colorize_runner_fixed_optimized import main_cli

    lowres_video = "/content/video_lowres.mp4"
    colorized_lowres = "/content/sample_color_lowres.mp4"
    final_upscaled = "/content/sample_color_final.mp4"
    print("🎬 Starting video colorization...")

    # --- Step 2: Downscale ---
    # Bug fix: replaced the IPython "!ffmpeg ... $var" cell magic with
    # os.system calls so this function is valid Python inside a def and also
    # works when exported to a plain .py script (matches the Gradio wrapper).
    print("⬇️ Downscaling for faster processing...")
    os.system(f'ffmpeg -y -i "{input_path}" -vf scale=640:-1 -r 15 "{lowres_video}"')

    # --- Step 3: Colorize ---
    print("🎨 Running DeOldify colorization...")
    start_time = time.time()
    main_cli(["--video", lowres_video, "--out", colorized_lowres])
    end_time = time.time()
    print(f"✅ Colorization done in {end_time - start_time:.2f}s.")

    # --- Step 4: Upscale ---
    print("⬆️ Upscaling to 1080p 24fps...")
    os.system(f'ffmpeg -y -i "{colorized_lowres}" -vf scale=1920:1080 -r 24 "{final_upscaled}"')
    print(f"✅ Final video saved at: {final_upscaled}")

    # --- Buttons ---
    download_btn = widgets.Button(description="⬇️ Download Video", button_style='success', icon='download')
    rerun_btn = widgets.Button(description="🔁 Re-run", button_style='info', icon='refresh')
    clear_btn = widgets.Button(description="🧹 Clear", button_style='warning', icon='trash')
    def on_download(b): files.download(final_upscaled)
    def on_clear(b): clear_output(); print("🧹 Output cleared.")
    def on_rerun(b): clear_output(); print("🔁 Re-running..."); run_video_colorization(input_path)
    download_btn.on_click(on_download)
    clear_btn.on_click(on_clear)
    rerun_btn.on_click(on_rerun)
    display(widgets.HBox([download_btn, rerun_btn, clear_btn]))
# --- Upload section ---
# FileUpload widget: the chosen video is saved under /content and then
# colorized via run_video_colorization.
video_uploader = widgets.FileUpload(accept='video/*', multiple=False)
display(widgets.HTML("<h3>📤 Upload a Video for Colorization</h3>"))
display(video_uploader)
def handle_video_upload(change):
    # Observer callback fired when the widget's 'value' trait changes.
    # NOTE(review): dict-style .value assumes ipywidgets 7.x — verify version.
    if video_uploader.value:
        for name, file_info in video_uploader.value.items():
            path = f"/content/{name}"
            with open(path, 'wb') as f:
                f.write(file_info['content'])
            print(f"✅ Uploaded: {path}")
            run_video_colorization(path)
video_uploader.observe(handle_video_upload, names='value')
# Gradio powers the stand-alone web UI below (IPython shell magic).
!pip install gradio
# --- 🔹 AI COLORIZATION WEB APP (Gradio Interface) 🔹 ---
import gradio as gr
import os
import time
import cv2
from colorize_runner_fixed_optimized import colorize_image
# main_cli should already be imported from your existing code
# --- Image Colorization Wrapper for Gradio ---
def colorize_image_app(image):
    """Colorize an uploaded image for the Gradio UI.

    Returns a (output_path, status_message) pair; output_path is None when no
    image was supplied or when colorization failed.
    """
    if image is None:
        return None, "⚠️ Please upload an image first."

    output_path = "/content/colorized_image_gradio.jpg"
    try:
        t0 = time.time()
        colorize_image(image, output_path)
        elapsed = time.time() - t0
    except Exception as err:
        return None, f"❌ Error: {str(err)}"
    return output_path, f"✅ Image colorized successfully in {elapsed:.2f}s!"
# --- Video Colorization Wrapper for Gradio ---
def colorize_video_app(video):
    """Colorize an uploaded video for the Gradio UI.

    Pipeline: downscale (ffmpeg) -> DeOldify colorize (main_cli) -> upscale.
    Returns a (output_path, status_message) pair; output_path is None when no
    video was supplied or when any step failed.
    """
    if video is None:
        return None, "⚠️ Please upload a video first."

    src = video
    low = "/content/video_lowres_gradio.mp4"
    colored = "/content/sample_color_lowres_gradio.mp4"
    final = "/content/sample_color_final_gradio.mp4"
    try:
        print("⬇️ Downscaling video for faster processing...")
        os.system(f'ffmpeg -y -i "{src}" -vf scale=640:-1 -r 15 "{low}"')

        print("🎨 Running DeOldify colorization...")
        t0 = time.time()
        main_cli(["--video", low, "--out", colored])
        elapsed = time.time() - t0
        print(f"✅ Done in {elapsed:.2f}s.")

        print("⬆️ Upscaling to 1080p 24fps...")
        os.system(f'ffmpeg -y -i "{colored}" -vf scale=1920:1080 -r 24 "{final}"')
        return final, f"✅ Video colorized successfully in {elapsed:.2f}s!"
    except Exception as exc:
        return None, f"❌ Error: {str(exc)}"
# --- BUILD GRADIO INTERFACE ---
# Two tabs: image and video colorization, each wired to its wrapper above.
with gr.Blocks() as demo:
    gr.Markdown("""
    # 🎨 AI-Based Image & Video Colorization
    Upload grayscale media and watch it come to life with color!
    """)
    with gr.Tab("🖼️ Image Colorization"):
        # type="filepath" hands the wrapper a path string, not an array.
        img_input = gr.Image(type="filepath", label="Upload Image")
        img_output = gr.Image(label="Colorized Output")
        img_status = gr.Textbox(label="Status", interactive=False)
        gr.Button("🎨 Colorize Image").click(colorize_image_app, inputs=img_input, outputs=[img_output, img_status])
    with gr.Tab("🎬 Video Colorization"):
        vid_input = gr.Video(label="Upload Video")
        vid_output = gr.Video(label="Colorized Output")
        vid_status = gr.Textbox(label="Status", interactive=False)
        gr.Button("🎨 Colorize Video").click(colorize_video_app, inputs=vid_input, outputs=[vid_output, vid_status])
    gr.Markdown("Developed by [Your Name] — Final Year Project 2025 🎓")
# --- LAUNCH APP ---
# share=True creates a public Gradio link (required to reach the app from Colab).
demo.launch(share=True)