# Extraction artifact (Hugging Face Spaces page header, status "Sleeping"):
# Spaces: Sleeping / Sleeping
# Stdlib + third-party imports (Flask web app, OpenCV/PIL image I/O, torch for
# optional GPU bicubic upscaling).
from flask import Flask, render_template, jsonify, request, send_file
import torch
import os
import time
import threading
from datetime import datetime
import cv2
from werkzeug.utils import secure_filename
import uuid
import mimetypes
import numpy as np
from PIL import Image
import traceback
# Real-ESRGAN imports with comprehensive error handling.
# REALESRGAN_AVAILABLE / REALESRGAN_ERROR are read throughout the module to
# choose between neural upscaling and the enhanced-bicubic fallback.
REALESRGAN_AVAILABLE = False
REALESRGAN_ERROR = None
try:
    from realesrgan import RealESRGANer
    from basicsr.archs.rrdbnet_arch import RRDBNet
    REALESRGAN_AVAILABLE = True
    print("β Real-ESRGAN successfully imported")
except ImportError as e:
    # Package not installed: remember the reason so the API can report it.
    REALESRGAN_ERROR = str(e)
    print(f"β οΈ Real-ESRGAN not available: {e}")
except Exception as e:
    # Any other import-time failure (e.g. broken native dependencies).
    REALESRGAN_ERROR = str(e)
    print(f"β Real-ESRGAN import error: {e}")
# Configuration: persistent data directories (Docker-volume style layout).
UPLOAD_FOLDER = '/data/uploads'
OUTPUT_FOLDER = '/data/outputs'
MODEL_FOLDER = '/data/models'

# Global application state shared between request handlers and the
# background worker threads started by the upscaling functions.
app_state = {
    "cuda_available": torch.cuda.is_available(),   # snapshot at import time
    "realesrgan_available": REALESRGAN_AVAILABLE,
    "realesrgan_error": REALESRGAN_ERROR,
    "processing_active": False,   # True while a background worker is running
    "logs": [],                   # rolling log buffer (capped at 100 entries)
    "processed_files": [],        # history of completed jobs (dicts)
    "current_model": None,        # name of the loaded Real-ESRGAN model
    "upscaler": None              # RealESRGANer instance, or None
}
def ensure_directories():
    """Create the upload/output/model directories if they do not exist."""
    for path in (UPLOAD_FOLDER, OUTPUT_FOLDER, MODEL_FOLDER):
        try:
            os.makedirs(path, exist_ok=True)
            print(f"β Directory verified: {path}")
        except Exception as err:
            # Best effort only: a failure here is logged, not fatal.
            print(f"β οΈ Error creating directory {path}: {err}")
def allowed_file(filename):
    """Return True when *filename* carries an extension we accept for upload."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp',
                         'mp4', 'avi', 'mov', 'mkv'}
def get_file_mimetype(filename):
    """Return the best-guess MIME type for *filename*.

    Consults the stdlib ``mimetypes`` registry first and falls back to an
    explicit table for the extensions this app accepts.  Fix: the previous
    fallback fabricated the type as ``image/{ext}`` / ``video/{ext}``,
    producing invalid values such as ``image/jpg``, ``video/mov`` and
    ``video/mkv``; the table below returns the registered IANA types.
    Unknown extensions map to ``application/octet-stream``.
    """
    mimetype, _ = mimetypes.guess_type(filename)
    if mimetype is None:
        ext = filename.lower().rsplit('.', 1)[1] if '.' in filename else ''
        fallback = {
            'png': 'image/png', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg',
            'gif': 'image/gif', 'bmp': 'image/bmp', 'tiff': 'image/tiff',
            'webp': 'image/webp',
            'mp4': 'video/mp4', 'avi': 'video/x-msvideo',
            'mov': 'video/quicktime', 'mkv': 'video/x-matroska',
        }
        mimetype = fallback.get(ext, 'application/octet-stream')
    return mimetype
def log_message(message):
    """Append *message*, timestamped, to the in-memory log and echo to stdout.

    The buffer in app_state["logs"] keeps only the 100 most recent entries.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    entry = f"[{stamp}] {message}"
    app_state["logs"].append(entry)
    if len(app_state["logs"]) > 100:
        app_state["logs"] = app_state["logs"][-100:]
    print(entry)
def download_realesrgan_models():
    """Fetch the Real-ESRGAN weight files into MODEL_FOLDER when missing.

    Returns True when every model is present (already cached or freshly
    downloaded), False on the first failure or when Real-ESRGAN is not
    importable.
    """
    if not REALESRGAN_AVAILABLE:
        log_message("β Real-ESRGAN not available for model download")
        return False
    model_urls = {
        'RealESRGAN_x4plus': 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth',
        'RealESRGAN_x2plus': 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.1/RealESRGAN_x2plus.pth'
    }
    try:
        import urllib.request
        for name, url in model_urls.items():
            destination = os.path.join(MODEL_FOLDER, f"{name}.pth")
            if os.path.exists(destination):
                log_message(f"β Model {name} already exists")
                continue
            log_message(f"π₯ Downloading {name}...")
            try:
                urllib.request.urlretrieve(url, destination)
            except Exception as err:
                log_message(f"β Failed to download {name}: {err}")
                return False
            log_message(f"β Downloaded {name}")
        return True
    except Exception as err:
        log_message(f"β Error downloading models: {str(err)}")
        return False
def initialize_realesrgan(model_name='RealESRGAN_x4plus', scale=4):
    """Build and cache a RealESRGANer for *model_name*.

    Downloads the weights if needed, constructs the matching RRDBNet
    architecture, runs a small smoke test, and stores the working upscaler
    in app_state["upscaler"].  Returns the upscaler, or None on any failure.

    NOTE(review): the *scale* parameter is never read — the network scale is
    derived from *model_name* instead; confirm whether callers rely on it.
    """
    if not REALESRGAN_AVAILABLE:
        log_message(f"β Real-ESRGAN not available: {REALESRGAN_ERROR}")
        return None
    try:
        log_message(f"π§ Initializing Real-ESRGAN with {model_name}...")
        model_path = os.path.join(MODEL_FOLDER, f"{model_name}.pth")
        # Check if model exists, download if not
        if not os.path.exists(model_path):
            log_message(f"π₯ Model {model_name} not found, downloading...")
            if not download_realesrgan_models():
                log_message("β Failed to download models")
                return None
        # Verify the weight file is present and non-empty before loading.
        if not os.path.exists(model_path) or os.path.getsize(model_path) == 0:
            log_message(f"β Model file invalid: {model_path}")
            return None
        log_message(f"π Model file verified: {os.path.getsize(model_path) / (1024*1024):.1f}MB")
        # Initialize the model architecture matching the pretrained weights.
        if model_name == 'RealESRGAN_x4plus':
            model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4)
            netscale = 4
        elif model_name == 'RealESRGAN_x2plus':
            model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=2)
            netscale = 2
        else:
            log_message(f"β Unknown model: {model_name}")
            return None
        # Use CPU for maximum compatibility
        device = torch.device('cpu')
        log_message(f"π₯οΈ Using device: {device}")
        # Initialize upscaler with conservative settings
        upscaler = RealESRGANer(
            scale=netscale,
            model_path=model_path,
            model=model,
            tile=400,  # Reasonable tile size for CPU
            tile_pad=10,
            pre_pad=0,
            half=False,  # No half precision on CPU
            device=device
        )
        # Smoke-test the upscaler on a tiny random image before committing it
        # to app_state; a broken install fails here instead of mid-job.
        log_message("π§ͺ Testing Real-ESRGAN with sample image...")
        test_img = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
        try:
            _, _ = upscaler.enhance(test_img, outscale=2)
            log_message("β Real-ESRGAN test successful")
        except Exception as e:
            log_message(f"β Real-ESRGAN test failed: {e}")
            return None
        app_state["upscaler"] = upscaler
        app_state["current_model"] = model_name
        log_message(f"β Real-ESRGAN initialized: {model_name} on {device}")
        return upscaler
    except Exception as e:
        log_message(f"β Error initializing Real-ESRGAN: {str(e)}")
        log_message(f"π Traceback: {traceback.format_exc()}")
        # Reset cached state so later code does not use a half-built upscaler.
        app_state["upscaler"] = None
        app_state["current_model"] = None
        return None
def optimize_gpu():
    """Enable cuDNN autotuning and smoke-test the CUDA device.

    Returns True when CUDA is usable, False otherwise (CPU fallback).
    """
    try:
        if not torch.cuda.is_available():
            log_message("β οΈ CUDA not available, using CPU")
            return False
        torch.backends.cudnn.benchmark = True
        torch.cuda.empty_cache()
        # Quick matmul probe to confirm the device actually works.
        probe = torch.randn(100, 100, device='cuda')
        _ = torch.mm(probe, probe)
        del probe
        torch.cuda.empty_cache()
        log_message("β GPU optimized")
        return True
    except Exception as err:
        log_message(f"β Error optimizing GPU: {str(err)}")
        return False
def upscale_image_4k(input_path, output_path):
    """Upscale one image 4x in a background daemon thread.

    Tries Real-ESRGAN first, then falls back to a sharpened progressive
    bicubic (GPU, then CPU).  Returns immediately; progress and results are
    reported through log_message and app_state ("processing_active",
    "processed_files").
    """
    def process_worker():
        try:
            log_message(f"π¨ Starting 4K upscaling: {os.path.basename(input_path)}")
            app_state["processing_active"] = True
            start_time = time.time()
            # Read image with error handling: OpenCV first, PIL as a fallback
            # for formats cv2 cannot decode.
            try:
                img = cv2.imread(input_path, cv2.IMREAD_COLOR)
                if img is None:
                    pil_img = Image.open(input_path).convert('RGB')
                    img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
            except Exception as e:
                log_message(f"β Error reading image: {e}")
                return
            if img is None:
                log_message("β Error: Could not read image with any method")
                return
            h, w = img.shape[:2]
            log_message(f"π Original resolution: {w}x{h}")
            success = False
            method_used = "Unknown"
            # Try Real-ESRGAN first if the upscaler was initialized.
            if REALESRGAN_AVAILABLE and app_state["upscaler"] is not None:
                try:
                    log_message("π§ Applying Real-ESRGAN neural upscaling...")
                    output, _ = app_state["upscaler"].enhance(img, outscale=4)
                    cv2.imwrite(output_path, output)
                    method_used = f"Real-ESRGAN ({app_state['current_model']})"
                    success = True
                    log_message("β Real-ESRGAN upscaling successful")
                except Exception as e:
                    log_message(f"β οΈ Real-ESRGAN failed: {str(e)}")
                    log_message("π Falling back to enhanced bicubic...")
            else:
                log_message("β οΈ Real-ESRGAN not available, using enhanced bicubic")
            # Fallback to enhanced bicubic if Real-ESRGAN failed or not available
            if not success:
                log_message("π Using enhanced bicubic upscaling...")
                target_h, target_w = h * 4, w * 4
                if torch.cuda.is_available():
                    try:
                        # GPU path: progressive 2x + 2x bicubic, then a 3x3
                        # sharpening convolution applied per channel.
                        device = torch.device('cuda')
                        image_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                        image_tensor = torch.from_numpy(image_rgb).float().to(device) / 255.0
                        image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0)  # HWC -> NCHW
                        with torch.no_grad():
                            # Progressive upscaling for better quality
                            intermediate = torch.nn.functional.interpolate(
                                image_tensor,
                                size=(h * 2, w * 2),
                                mode='bicubic',
                                align_corners=False,
                                antialias=True
                            )
                            upscaled = torch.nn.functional.interpolate(
                                intermediate,
                                size=(target_h, target_w),
                                mode='bicubic',
                                align_corners=False,
                                antialias=True
                            )
                            # Sharpening kernel; its entries sum to 1, so
                            # overall brightness is preserved.
                            kernel = torch.tensor([
                                [-0.5, -1, -0.5],
                                [-1, 7, -1],
                                [-0.5, -1, -0.5]
                            ], dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0)
                            enhanced_channels = []
                            for i in range(3):
                                channel = upscaled[:, i:i+1, :, :]
                                # Reflect padding keeps the output the same size.
                                padded = torch.nn.functional.pad(channel, (1, 1, 1, 1), mode='reflect')
                                enhanced = torch.nn.functional.conv2d(padded, kernel)
                                enhanced_channels.append(enhanced)
                            enhanced = torch.cat(enhanced_channels, dim=1)
                            final_result = torch.clamp(enhanced, 0, 1)
                        result_cpu = final_result.squeeze(0).permute(1, 2, 0).cpu().numpy()
                        result_image = (result_cpu * 255).astype(np.uint8)
                        result_bgr = cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR)
                        cv2.imwrite(output_path, result_bgr)
                        method_used = "Enhanced Bicubic (GPU)"
                        success = True
                        log_message("β GPU enhanced bicubic completed")
                    except Exception as e:
                        log_message(f"β οΈ GPU processing failed: {e}")
                if not success:
                    # CPU path: progressive bicubic resize plus a 70/30 blend
                    # with the sharpened image so sharpening stays subtle.
                    log_message("π» Using CPU enhanced bicubic...")
                    intermediate = cv2.resize(img, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
                    upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
                    # Apply sharpening
                    kernel = np.array([
                        [-0.5, -1, -0.5],
                        [-1, 7, -1],
                        [-0.5, -1, -0.5]
                    ])
                    sharpened = cv2.filter2D(upscaled, -1, kernel)
                    # Blend for final result
                    result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0)
                    cv2.imwrite(output_path, result)
                    method_used = "Enhanced Bicubic (CPU)"
                    success = True
                    log_message("β CPU enhanced bicubic completed")
            if success:
                # Verify the output by re-reading it, then record job metadata.
                try:
                    final_img = cv2.imread(output_path)
                    if final_img is not None:
                        final_h, final_w = final_img.shape[:2]
                        processing_time = time.time() - start_time
                        log_message(f"β Upscaling completed: {final_w}x{final_h}")
                        log_message(f"π Scale factor: {final_w/w:.1f}x")
                        log_message(f"β±οΈ Processing time: {processing_time:.1f}s")
                        log_message(f"π§ Method used: {method_used}")
                        # Add to processed files
                        app_state["processed_files"].append({
                            "input_file": os.path.basename(input_path),
                            "output_file": os.path.basename(output_path),
                            "original_size": f"{w}x{h}",
                            "upscaled_size": f"{final_w}x{final_h}",
                            "method": method_used,
                            "processing_time": f"{processing_time:.1f}s",
                            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        })
                    else:
                        log_message("β Error: Output file could not be read")
                except Exception as e:
                    log_message(f"β Error verifying output: {e}")
            else:
                log_message("β All upscaling methods failed")
        except Exception as e:
            log_message(f"β Critical error in upscaling: {str(e)}")
            log_message(f"π Traceback: {traceback.format_exc()}")
        finally:
            # Always clear the busy flag and free GPU memory.
            app_state["processing_active"] = False
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    thread = threading.Thread(target=process_worker)
    thread.daemon = True
    thread.start()
def upscale_video_4k(input_path, output_path):
    """Upscale a video 4x frame-by-frame in a background daemon thread.

    Uses sharpened bicubic per frame (not Real-ESRGAN, for speed) and writes
    an mp4v-encoded file.  Returns immediately; progress is reported through
    log_message and app_state.
    """
    def process_worker():
        try:
            log_message(f"π¬ Starting video upscaling: {os.path.basename(input_path)}")
            app_state["processing_active"] = True
            # Open video
            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                log_message("β Error: Could not open video")
                return
            # Get video properties.
            # NOTE(review): int() truncates fractional frame rates
            # (29.97 -> 29), and a 0 fps reading would make the writer
            # invalid — TODO confirm acceptable for expected inputs.
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            log_message(f"πΉ Video: {w}x{h}, {fps}FPS, {frame_count} frames")
            # Configure output writer at 4x resolution.
            target_w, target_h = w * 4, h * 4
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (target_w, target_h))
            # Process frames
            frame_num = 0
            start_time = time.time()
            while True:
                ret, frame = cap.read()
                if not ret:
                    break  # end of stream
                frame_num += 1
                try:
                    # Enhanced bicubic for video frames (faster than Real-ESRGAN)
                    upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
                    # Light sharpening, blended 80/20 to keep artifacts low.
                    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
                    sharpened = cv2.filter2D(upscaled_frame, -1, kernel)
                    final_frame = cv2.addWeighted(upscaled_frame, 0.8, sharpened, 0.2, 0)
                    out.write(final_frame)
                    # Progress logging every 30 frames.
                    if frame_num % 30 == 0:
                        progress = (frame_num / frame_count) * 100
                        elapsed = time.time() - start_time
                        eta = (elapsed / frame_num) * (frame_count - frame_num)
                        log_message(f"ποΈ Frame {frame_num}/{frame_count} ({progress:.1f}%) - ETA: {eta:.0f}s")
                except Exception as e:
                    # Abort on the first failed frame.
                    log_message(f"β οΈ Error processing frame {frame_num}: {e}")
                    break
            cap.release()
            out.release()
            # Verify the output exists and is non-empty, then record metadata.
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                file_size = os.path.getsize(output_path)
                total_time = time.time() - start_time
                log_message(f"β Video upscaling completed: {target_w}x{target_h}")
                log_message(f"π Output size: {file_size / (1024**2):.1f}MB")
                log_message(f"β±οΈ Total time: {total_time:.1f}s")
                # Add to processed files
                app_state["processed_files"].append({
                    "input_file": os.path.basename(input_path),
                    "output_file": os.path.basename(output_path),
                    "original_size": f"{w}x{h}",
                    "upscaled_size": f"{target_w}x{target_h}",
                    "frame_count": frame_count,
                    "fps": fps,
                    "method": "Enhanced Bicubic (Video)",
                    "processing_time": f"{total_time:.1f}s",
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                })
            else:
                log_message("β Video processing failed")
        except Exception as e:
            log_message(f"β Error processing video: {str(e)}")
            log_message(f"π Traceback: {traceback.format_exc()}")
        finally:
            # Always clear the busy flag and free GPU memory.
            app_state["processing_active"] = False
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
    thread = threading.Thread(target=process_worker)
    thread.daemon = True
    thread.start()
# Initialize directories at import time so uploads can be saved immediately.
ensure_directories()
def force_init_realesrgan():
    """Run the full Real-ESRGAN bootstrap (download weights, then init).

    Returns True on success, False on any failure; every step is logged.
    """
    log_message("π§ Attempting Real-ESRGAN initialization...")
    if not REALESRGAN_AVAILABLE:
        log_message(f"β Real-ESRGAN package not available: {REALESRGAN_ERROR}")
        return False
    try:
        # Step 1: make sure the weight files are on disk.
        log_message("π₯ Checking/downloading Real-ESRGAN models...")
        if not download_realesrgan_models():
            log_message("β Model download failed")
            return False
        # Step 2: build and smoke-test the upscaler.
        log_message("π§ Creating Real-ESRGAN upscaler...")
        if initialize_realesrgan():
            log_message("β Real-ESRGAN initialized successfully!")
            return True
        log_message("β Real-ESRGAN initialization returned None")
        return False
    except Exception as err:
        log_message(f"β Real-ESRGAN initialization error: {str(err)}")
        log_message(f"π Traceback: {traceback.format_exc()}")
        return False
# Try to initialize Real-ESRGAN at import time (best effort; the app still
# works with the enhanced-bicubic fallback when this fails).
log_message("π Starting Real-ESRGAN initialization...")
if REALESRGAN_AVAILABLE:
    force_init_realesrgan()
else:
    log_message("β οΈ Real-ESRGAN not available, will use enhanced bicubic fallback")

app = Flask(__name__)
def index():
    """Serve the single-page UI template.

    NOTE(review): no @app.route decorator is visible on this handler (or on
    any handler below), so as written no route is ever registered with the
    Flask app — the decorators were probably lost in extraction. TODO
    confirm against the original app.py and restore them.
    """
    return render_template('index.html')
def api_system():
    """Get comprehensive system information.

    Reports GPU/CUDA status and memory, Real-ESRGAN availability and which
    model weights exist on disk, and upload/output storage usage.  Always
    returns JSON: {"success": bool, "data": {...}} or an error message.

    NOTE(review): no @app.route decorator is visible — likely lost in
    extraction; TODO confirm the original route path.
    """
    try:
        info = {}
        # GPU Info
        if torch.cuda.is_available():
            info["gpu_available"] = True
            info["gpu_name"] = torch.cuda.get_device_name()
            total_memory = torch.cuda.get_device_properties(0).total_memory
            allocated_memory = torch.cuda.memory_allocated()
            info["gpu_memory"] = f"{total_memory / (1024**3):.1f}GB"
            info["gpu_memory_used"] = f"{allocated_memory / (1024**3):.1f}GB"
            info["gpu_memory_free"] = f"{(total_memory - allocated_memory) / (1024**3):.1f}GB"
            info["cuda_version"] = torch.version.cuda
        else:
            info["gpu_available"] = False
            info["gpu_name"] = "CPU Only"
            info["gpu_memory"] = "N/A"
            info["gpu_memory_used"] = "N/A"
            info["gpu_memory_free"] = "N/A"
            info["cuda_version"] = "Not available"
        info["pytorch_version"] = torch.__version__
        # Real-ESRGAN info
        info["realesrgan_available"] = REALESRGAN_AVAILABLE
        info["realesrgan_initialized"] = app_state["upscaler"] is not None
        info["current_model"] = app_state.get("current_model", "None")
        info["realesrgan_error"] = REALESRGAN_ERROR
        # Check which model weight files exist on disk.
        models_status = {}
        if REALESRGAN_AVAILABLE:
            models = ['RealESRGAN_x4plus', 'RealESRGAN_x2plus']
            for model in models:
                model_path = os.path.join(MODEL_FOLDER, f"{model}.pth")
                models_status[model] = os.path.exists(model_path)
        info["models_downloaded"] = models_status
        # Storage info: total file sizes in the upload and output folders.
        try:
            upload_files = os.listdir(UPLOAD_FOLDER) if os.path.exists(UPLOAD_FOLDER) else []
            output_files = os.listdir(OUTPUT_FOLDER) if os.path.exists(OUTPUT_FOLDER) else []
            upload_size = sum(os.path.getsize(os.path.join(UPLOAD_FOLDER, f))
                              for f in upload_files if os.path.isfile(os.path.join(UPLOAD_FOLDER, f)))
            output_size = sum(os.path.getsize(os.path.join(OUTPUT_FOLDER, f))
                              for f in output_files if os.path.isfile(os.path.join(OUTPUT_FOLDER, f)))
            info["storage_uploads"] = f"{upload_size / (1024**2):.1f}MB"
            info["storage_outputs"] = f"{output_size / (1024**2):.1f}MB"
            info["upload_files_count"] = len(upload_files)
            info["output_files_count"] = len(output_files)
        except Exception as e:
            # Best effort: report placeholders rather than failing the call.
            info["storage_uploads"] = "Error"
            info["storage_outputs"] = "Error"
            info["upload_files_count"] = 0
            info["output_files_count"] = 0
        return jsonify({"success": True, "data": info})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
def api_upload():
    """Handle a file upload and kick off background 4K upscaling.

    Expects a multipart form with a 'file' field.  Saves the upload under a
    fresh UUID, dispatches to the image or video worker, and returns the
    identifiers the client needs to poll and download the result.

    Fixes: the upload log line contained a literal '(unknown)' extraction
    artifact instead of the filename placeholder; also, media_type would be
    unbound (NameError) if the extension passed allowed_file() but matched
    neither branch below — now rejected explicitly.

    NOTE(review): no @app.route decorator is visible — likely lost in
    extraction; TODO confirm the original route path and methods.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"success": False, "error": "No file provided"})
        file = request.files['file']
        if file.filename == '':
            return jsonify({"success": False, "error": "No file selected"})
        if file and allowed_file(file.filename):
            file_id = str(uuid.uuid4())
            filename = secure_filename(file.filename)
            file_ext = filename.rsplit('.', 1)[1].lower()
            input_filename = f"{file_id}_input.{file_ext}"
            input_path = os.path.join(UPLOAD_FOLDER, input_filename)
            file.save(input_path)
            output_filename = f"{file_id}_4k.{file_ext}"
            output_path = os.path.join(OUTPUT_FOLDER, output_filename)
            # Dispatch by media kind; both workers return immediately and
            # process in a background thread.
            if file_ext in ['png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp']:
                upscale_image_4k(input_path, output_path)
                media_type = "image"
            elif file_ext in ['mp4', 'avi', 'mov', 'mkv']:
                upscale_video_4k(input_path, output_path)
                media_type = "video"
            else:
                # Defensive: keep in sync with allowed_file(); without this
                # branch media_type below would be unbound.
                return jsonify({"success": False, "error": "File type not allowed"})
            log_message(f"π€ File uploaded: {filename}")
            log_message("π― Starting 4K upscaling process...")
            return jsonify({
                "success": True,
                "file_id": file_id,
                "filename": filename,
                "output_filename": output_filename,
                "media_type": media_type,
                "message": "Upload successful, processing started"
            })
        else:
            return jsonify({"success": False, "error": "File type not allowed"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
def api_processing_status():
    """Report whether a job is running, plus the processed-file history."""
    payload = {
        "success": True,
        "processing": app_state["processing_active"],
        "processed_files": app_state["processed_files"]
    }
    return jsonify(payload)
def api_download(filename):
    """Send a processed file from OUTPUT_FOLDER as an attachment.

    Fix: download_name contained a literal '(unknown)' extraction artifact;
    it now embeds the requested filename as "4k_upscaled_<filename>".

    NOTE(review): no @app.route decorator is visible — likely lost in
    extraction; TODO confirm. Also *filename* is joined into OUTPUT_FOLDER
    without an explicit traversal check; Flask's default <string> converter
    rejects '/', but verify the original route's converter.
    """
    try:
        file_path = os.path.join(OUTPUT_FOLDER, filename)
        if os.path.exists(file_path):
            mimetype = get_file_mimetype(filename)
            return send_file(
                file_path,
                as_attachment=True,
                download_name=f"4k_upscaled_{filename}",
                mimetype=mimetype
            )
        else:
            return jsonify({"error": "File not found"}), 404
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def api_preview(filename):
    """Stream a processed file inline (no attachment) for in-browser preview."""
    try:
        target = os.path.join(OUTPUT_FOLDER, filename)
        if not os.path.exists(target):
            return jsonify({"error": "File not found"}), 404
        return send_file(target, mimetype=get_file_mimetype(filename))
    except Exception as err:
        return jsonify({"error": str(err)}), 500
def api_logs():
    """Return the in-memory application log buffer as JSON."""
    return jsonify({"success": True, "logs": app_state["logs"]})
def api_clear_logs():
    """Empty the log buffer, then record that it was cleared."""
    app_state["logs"] = []
    log_message("π§Ή Logs cleared")
    return jsonify({"success": True, "message": "Logs cleared"})
def api_optimize_gpu():
    """Run GPU optimization on demand and report whether it succeeded."""
    try:
        return jsonify({"success": optimize_gpu()})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
def api_init_realesrgan():
    """Manually (re)initialize Real-ESRGAN via the bootstrap helper."""
    try:
        if not REALESRGAN_AVAILABLE:
            return jsonify({
                "success": False,
                "error": f"Real-ESRGAN not available: {REALESRGAN_ERROR}"
            })
        if force_init_realesrgan():
            return jsonify({"success": True, "message": "Real-ESRGAN initialized successfully"})
        return jsonify({"success": False, "error": "Failed to initialize Real-ESRGAN"})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
def api_clear_cache():
    """Free the CUDA cache (when present) and reset the processed-file history."""
    try:
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        app_state["processed_files"] = []
        log_message("π§Ή Cache and history cleared")
        return jsonify({"success": True, "message": "Cache cleared"})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
def api_test_realesrgan():
    """Diagnostic endpoint: re-check that Real-ESRGAN can be imported now."""
    try:
        if not REALESRGAN_AVAILABLE:
            return jsonify({
                "success": False,
                "error": f"Real-ESRGAN not available: {REALESRGAN_ERROR}",
                "details": {
                    "import_error": REALESRGAN_ERROR,
                    "numpy_available": True,
                    "torch_available": True,
                    "opencv_available": True
                }
            })
        # Re-attempt the imports to verify the environment is still intact.
        try:
            from realesrgan import RealESRGANer
            from basicsr.archs.rrdbnet_arch import RRDBNet
            imported_ok, failure = True, None
        except Exception as err:
            imported_ok, failure = False, str(err)
        return jsonify({
            "success": imported_ok,
            "error": failure,
            "details": {
                "realesrgan_available": REALESRGAN_AVAILABLE,
                "import_error": failure,
                "current_model": app_state.get("current_model"),
                "upscaler_initialized": app_state["upscaler"] is not None
            }
        })
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
if __name__ == '__main__':
    # Initialize system
    log_message("π 4K Upscaler starting...")
    try:
        # Optimize GPU if available
        if optimize_gpu():
            log_message("β GPU optimization completed")
        else:
            log_message("β οΈ Using CPU mode")
        log_message("β 4K Upscaler ready")
        log_message("π€ Upload images or videos to upscale to 4K resolution")
        if REALESRGAN_AVAILABLE:
            log_message("π§ Real-ESRGAN neural upscaling available")
        else:
            log_message("β οΈ Real-ESRGAN not available, using enhanced bicubic fallback")
    except Exception as e:
        # Startup problems are logged but do not prevent serving requests.
        log_message(f"β Initialization error: {str(e)}")
        log_message("β οΈ Starting in fallback mode...")
    # Run application (port 7860 matches the Hugging Face Spaces convention;
    # threaded=True so request handlers can run alongside worker threads).
    try:
        app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)
    except Exception as e:
        log_message(f"β Server startup error: {str(e)}")
        print(f"Critical error: {str(e)}")