"""
Image Enhancement Utility using Real-ESRGAN-ncnn-vulkan

Enhances blurry/low-quality images before VLM processing.
"""
|
|
|
|
|
import contextlib
import os
import platform
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile
from pathlib import Path

from PIL import Image
|
|
|
|
|
|
|
|
class ImageEnhancer:
    """Upscale images with the Real-ESRGAN-ncnn-vulkan command-line tool.

    On construction the enhancer looks for an installation under
    ``<base_dir>/utils/realesrgan`` and, when none is found, attempts to
    download one (Windows only).  The public methods are best-effort: when
    the tool is unavailable or a run fails they hand back the original image.

    Attributes:
        base_dir: Root directory under which the tool is stored.
        enhancer_dir: ``base_dir / "utils" / "realesrgan"``.
        executable_path: Path of the detected executable, or ``None``.
        models_path: Directory passed to the tool via ``-m`` when it exists.
        is_available: ``True`` once a usable executable has been found.
    """

    REALESRGAN_VERSION = "v0.2.0"
    # NOTE: the asset file name below embeds the version as a literal, so it
    # has to be edited together with REALESRGAN_VERSION.
    REALESRGAN_WINDOWS_URL = (
        "https://github.com/xinntao/Real-ESRGAN-ncnn-vulkan/releases/download/"
        f"{REALESRGAN_VERSION}/realesrgan-ncnn-vulkan-v0.2.0-windows.zip"
    )

    def __init__(self, base_dir: "str | None" = None):
        """Initialize the enhancer and run the installation check.

        Args:
            base_dir: Base directory for storing the executable and models.
                Defaults to the parent of the directory containing this file.
        """
        if base_dir is None:
            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

        self.base_dir = Path(base_dir)
        self.enhancer_dir = self.base_dir / "utils" / "realesrgan"
        self.executable_path = None
        self.models_path = None
        self.is_available = False

        self._setup_enhancer()

    def _setup_enhancer(self):
        """Detect an existing installation, downloading one if necessary.

        Never raises: any failure is reported on stdout and leaves
        ``is_available`` set to ``False``.
        """
        try:
            if self._check_existing_installation():
                print("✅ Real-ESRGAN enhancer already installed")
                self.is_available = True
                return

            print("📥 Downloading Real-ESRGAN enhancer...")
            self._download_and_extract()

            # Re-run detection so executable_path/models_path point at the
            # freshly extracted files.
            if self._check_existing_installation():
                print("✅ Real-ESRGAN enhancer installed successfully")
                self.is_available = True
            else:
                print("⚠️ Real-ESRGAN enhancer setup incomplete")
                self.is_available = False
        except Exception as e:
            print(f"⚠️ Failed to setup Real-ESRGAN enhancer: {e}")
            print(" Image enhancement will be skipped")
            self.is_available = False

    def _check_existing_installation(self) -> bool:
        """Locate the executable (and models directory) under ``enhancer_dir``.

        Sets ``executable_path`` and ``models_path`` as a side effect.

        Returns:
            ``True`` if an executable file was found, ``False`` otherwise.
        """
        if not self.enhancer_dir.exists():
            return False

        on_windows = platform.system() == "Windows"
        exe_name = "realesrgan-ncnn-vulkan.exe" if on_windows else "realesrgan-ncnn-vulkan"
        candidates = [
            self.enhancer_dir / exe_name,
            self.enhancer_dir / "realesrgan-ncnn-vulkan" / exe_name,
            self.enhancer_dir / "realesrgan-ncnn-vulkan-v0.2.0-ubuntu" / exe_name,
        ]

        for path in candidates:
            # is_file() rather than exists(): off Windows the first candidate
            # has the same name as the directory that holds the second one,
            # and a directory also carries execute bits.
            if not path.is_file():
                continue
            if not on_windows and not (path.stat().st_mode & 0o111):
                continue

            self.executable_path = path
            print(f" Found executable: {path}")

            for models_dir in (path.parent / "models", self.enhancer_dir / "models"):
                if models_dir.exists():
                    self.models_path = models_dir
                    print(f" Found models: {models_dir}")
                    return True

            # The executable alone counts as installed; enhance_image() only
            # passes -m when this directory actually exists.
            print(" Models directory not found, but executable is available")
            self.models_path = path.parent / "models"
            return True

        return False

    def _download_and_extract(self):
        """Download the Windows release archive and unpack it into ``enhancer_dir``.

        Does nothing except print a notice on other platforms.  Download and
        extraction errors are reported on stdout rather than raised; the
        archive is removed afterwards whether or not it could be used.
        """
        if platform.system() != "Windows":
            print("⚠️ Auto-download only supported on Windows. Please manually install Real-ESRGAN-ncnn-vulkan")
            return

        self.enhancer_dir.mkdir(parents=True, exist_ok=True)

        zip_path = self.enhancer_dir / "realesrgan.zip"
        print(f" Downloading from {self.REALESRGAN_WINDOWS_URL}...")

        try:
            try:
                urllib.request.urlretrieve(self.REALESRGAN_WINDOWS_URL, zip_path)
            except Exception as e:
                print(f" Download failed: {e}")
                return

            print(" Extracting files...")
            try:
                with zipfile.ZipFile(zip_path, "r") as zip_ref:
                    zip_ref.extractall(self.enhancer_dir)
            except Exception as e:
                print(f" Extraction failed: {e}")
                return
        finally:
            # Also drop a partial or corrupt archive so a failed attempt does
            # not leave it lying around in enhancer_dir.
            with contextlib.suppress(FileNotFoundError):
                zip_path.unlink()

        print(" Setup complete!")

    def enhance_image(self, image_path: str, scale: int = 2,
                      model_name: str = "realesrgan-x4plus",
                      timeout: float = 30) -> str:
        """Enhance an image file using Real-ESRGAN.

        The result is written next to the input as
        ``<stem>_enhanced<suffix>``.

        Args:
            image_path: Path to input image
            scale: Upscale ratio (2, 3, or 4)
            model_name: Model to use (realesrgan-x4plus,
                realesrgan-x4plus-anime, realesrnet-x4plus)
            timeout: Seconds to wait for the tool before giving up.

        Returns:
            Path to the enhanced image, or ``image_path`` unchanged when the
            enhancer is unavailable, times out, or fails.
        """
        if not self.is_available:
            print("⚠️ Enhancement not available, using original image")
            return image_path

        input_path = Path(image_path)
        output_path = input_path.parent / f"{input_path.stem}_enhanced{input_path.suffix}"

        try:
            # NOTE(review): the output keeps the input's extension while
            # "-f png" is passed - confirm which of the two the tool honours.
            cmd = [
                str(self.executable_path),
                "-i", str(image_path),
                "-o", str(output_path),
                "-n", model_name,
                "-s", str(scale),
                "-f", "png",
            ]
            if self.models_path and self.models_path.exists():
                cmd.extend(["-m", str(self.models_path)])

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                # CREATE_NO_WINDOW only exists on Windows; the conditional
                # keeps the attribute from being looked up elsewhere.
                creationflags=subprocess.CREATE_NO_WINDOW if platform.system() == "Windows" else 0,
            )
        except subprocess.TimeoutExpired:
            print("⚠️ Enhancement timeout, using original image")
            return image_path
        except Exception as e:
            print(f"⚠️ Enhancement error: {e}, using original image")
            return image_path

        if result.returncode == 0 and output_path.exists():
            print(f"✨ Image enhanced successfully (scale={scale}x)")
            return str(output_path)

        if result.stderr:
            print(f"⚠️ Enhancement failed: {result.stderr}")
        print(" Using original image")
        return image_path

    def enhance_pil_image(self, pil_image: "Image.Image", scale: int = 2,
                          model_name: str = "realesrgan-x4plus",
                          timeout: float = 30) -> "Image.Image":
        """Enhance a PIL Image object.

        The image is written to a temporary PNG, run through
        :meth:`enhance_image`, and the result is loaded back as RGB.  Both
        temporary files are removed before returning.

        Args:
            pil_image: PIL Image object
            scale: Upscale ratio (2, 3, or 4)
            model_name: Model to use
            timeout: Seconds to wait for the tool before giving up.

        Returns:
            Enhanced PIL Image object (RGB), or ``pil_image`` itself when
            enhancement is unavailable or did not produce a result.
        """
        if not self.is_available:
            return pil_image

        # Only the unique name is needed; the handle is closed before PIL
        # writes to the path.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_input:
            temp_input_path = temp_input.name

        try:
            # Saving inside the try guarantees the temp file is removed even
            # when PIL cannot write the image.
            pil_image.save(temp_input_path, "PNG")

            enhanced_path = self.enhance_image(temp_input_path, scale, model_name, timeout)
            if enhanced_path == temp_input_path:
                return pil_image

            try:
                # The context manager releases the file handle before the
                # enhanced file is deleted below.
                with Image.open(enhanced_path) as enhanced_file:
                    return enhanced_file.convert("RGB")
            finally:
                self._discard_file(enhanced_path)
        finally:
            self._discard_file(temp_input_path)

    @staticmethod
    def _discard_file(path):
        """Delete *path*, ignoring filesystem errors (best-effort cleanup)."""
        with contextlib.suppress(OSError):
            os.unlink(path)
|
|
|
|
|
|
|
|
|
|
|
# Module-wide ImageEnhancer shared by get_enhancer(); created on first use.
_enhancer_instance = None


def get_enhancer() -> ImageEnhancer:
    """Return the global enhancer instance, constructing it on the first call."""
    global _enhancer_instance
    if _enhancer_instance is not None:
        return _enhancer_instance
    _enhancer_instance = ImageEnhancer()
    return _enhancer_instance
|
|
|