import os import time import io import base64 import cv2 import numpy as np from PIL import Image, ImageFilter, ImageEnhance from flask import Flask, request, jsonify import onnx import onnxruntime as ort from huggingface_hub import hf_hub_download app = Flask(__name__) class SuperUpscaler: """مُحسِّن دقة يستخدم AI لجميع الصور مهما كان حجمها.""" def __init__(self): self.scale = 4 self.model_path = None self.onnx_session = None self.input_shape = None self.output_shape = None print("🚀 بدء إعداد Super Upscaler مع ONNX...") self.setup_complete = self.load_onnx_model() if self.setup_complete: print("✅ تم تحميل نموذج ONNX والتحقق منه بنجاح!") else: print("⚠️ فشل تحميل نموذج ONNX، سيتم استخدام الطريقة التقليدية.") def load_onnx_model(self): """تحميل نموذج ONNX من Hugging Face Hub.""" try: print("📥 تحميل نموذج UltraSharp 4x من Hugging Face Hub...") self.model_path = hf_hub_download( repo_id="Kim2091/UltraSharp", filename="ONNX/4x-UltraSharp-fp32-opset14.onnx", force_download=False, ) print(f"✅ تم تنزيل النموذج إلى: {self.model_path}") onnx_model = onnx.load(self.model_path) onnx.checker.check_model(onnx_model) print("✅ تم التحقق من سلامة النموذج.") session_options = ort.SessionOptions() session_options.enable_cpu_mem_arena = True session_options.enable_mem_pattern = True session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL session_options.intra_op_num_threads = max(1, os.cpu_count() // 2) session_options.inter_op_num_threads = 1 self.onnx_session = ort.InferenceSession( self.model_path, sess_options=session_options, providers=["CPUExecutionProvider"], ) input_info = self.onnx_session.get_inputs()[0] output_info = self.onnx_session.get_outputs()[0] self.input_shape = input_info.shape self.output_shape = output_info.shape print(f"📊 شكل الإدخال: {self.input_shape}") print(f"📊 شكل الإخراج: {self.output_shape}") return True except Exception as exc: print(f"❌ تعذّر تهيئة نموذج ONNX: {exc}") self.onnx_session = None return False def preprocess_for_ai(self, image, max_size=512): """ تحضير الصورة للنموذج بدون تقليل الحجم الشديد. """ try: rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) h, w = rgb_image.shape[:2] # تقليل فقط إذا كانت الصورة كبيرة جداً if max(h, w) > max_size: scale = max_size / max(h, w) new_w = int(w * scale) new_h = int(h * scale) print(f"📐 تصغير من {w}x{h} إلى {new_w}x{new_h} للمعالجة") rgb_image = cv2.resize(rgb_image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) h, w = rgb_image.shape[:2] # جعل الأبعاد قابلة للقسمة على 4 new_h = ((h + 3) // 4) * 4 new_w = ((w + 3) // 4) * 4 if new_h != h or new_w != w: rgb_image = cv2.resize(rgb_image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) print(f"📐 تعديل للأبعاد: {new_w}x{new_h}") # تطبيع normalized = rgb_image.astype(np.float32) / 255.0 transposed = np.transpose(normalized, (2, 0, 1)) batched = np.expand_dims(transposed, axis=0) return batched, (h, w) except Exception as exc: print(f"❌ خطأ أثناء التحضير للنموذج: {exc}") return None, None def enhance_image(self, image): """رفع دقة الصورة باستخدام AI أو الطريقة التقليدية.""" try: print(f"📏 أبعاد الصورة الأصلية: {image.shape[1]}x{image.shape[0]}") if self.setup_complete and self.onnx_session is not None: print("🤖 استخدام نموذج UltraSharp عبر ONNX Runtime...") result = self.enhance_with_ai_smart(image) if result is not None: return result print("⚠️ فشل المعالجة بالذكاء الاصطناعي، سيتم استخدام طريقة لانكزوس.") else: print("🔄 استخدام طريقة لانكزوس المحسّنة (بدون AI).") return self.lanczos_enhanced(image) except Exception as exc: print(f"❌ خطأ أثناء رفع الدقة: {exc}") return self.lanczos_enhanced(image) def enhance_with_ai_smart(self, image): """رفع دقة الصورة مع معالجة خاصة للصور الكبيرة.""" try: h, w = image.shape[:2] # استخدام تقسيم للصور الكبيرة جداً فقط if max(h, w) > 1024: print("📦 الصورة كبيرة جداً - تقسيم إلى أجزاء...") return self.process_large_image_in_tiles(image, tile_size=512, overlap=64) print("🔄 معالجة الصورة كاملة عبر الذكاء الاصطناعي...") return self.process_single_image_with_ai(image) except Exception as exc: print(f"❌ خطأ أثناء استخدام الذكاء الاصطناعي: {exc}") return None def process_single_image_with_ai(self, image): """تشغيل النموذج على صورة واحدة.""" try: input_data, original_size = self.preprocess_for_ai(image, max_size=512) if input_data is None: return None input_name = self.onnx_session.get_inputs()[0].name start_time = time.time() outputs = self.onnx_session.run(None, {input_name: input_data}) inference_time = time.time() - start_time print(f"⏱️ زمن الاستدلال: {inference_time:.2f} ثانية") enhanced = self.postprocess_from_ai(outputs[0]) if enhanced is not None: # تكبير للحجم المطلوب (4x من الأصل) orig_h, orig_w = image.shape[:2] target_h, target_w = orig_h * self.scale, orig_w * self.scale curr_h, curr_w = enhanced.shape[:2] # إذا كان الحجم أصغر من المطلوب، كبّر if curr_h < target_h or curr_w < target_w: print(f"📐 تكبير من {curr_w}x{curr_h} إلى {target_w}x{target_h}") enhanced = cv2.resize(enhanced, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) return enhanced return None except Exception as exc: print(f"❌ خطأ أثناء الاستدلال: {exc}") return None def process_large_image_in_tiles(self, image, tile_size=512, overlap=64): """تقسيم الصورة الكبيرة إلى أجزاء ومعالجتها بشكل محسّن - مع إصلاح خطأ الدمج.""" try: h, w = image.shape[:2] print(f"📦 تقسيم صورة {w}x{h} إلى أجزاء {tile_size}x{tile_size}") output_h, output_w = h * self.scale, w * self.scale # ✅ تغيير النوع إلى float32 لتجنب خطأ التحويل final_image = np.zeros((output_h, output_w, 3), dtype=np.float32) weight_map = np.zeros((output_h, output_w), dtype=np.float32) processed_tiles = 0 y_positions = list(range(0, h, tile_size - overlap)) x_positions = list(range(0, w, tile_size - overlap)) total_tiles = len(y_positions) * len(x_positions) for y in y_positions: for x in x_positions: y_end = min(y + tile_size, h) x_end = min(x + tile_size, w) tile = image[y:y_end, x:x_end] # معالجة القطعة enhanced_tile = self.process_single_image_with_ai(tile) if enhanced_tile is not None: tile_h, tile_w = enhanced_tile.shape[:2] y_start = y * self.scale x_start = x * self.scale y_end_out = min(y_start + tile_h, output_h) x_end_out = min(x_start + tile_w, output_w) # دمج مع وزن لتقليل الحواف actual_tile_h = y_end_out - y_start actual_tile_w = x_end_out - x_start # إنشاء وزن تدريجي للحواف fade = np.ones((actual_tile_h, actual_tile_w), dtype=np.float32) fade_size = min(overlap * self.scale, min(actual_tile_h, actual_tile_w) // 4) if fade_size > 0: for i in range(fade_size): alpha = i / fade_size fade[i, :] *= alpha fade[-i-1, :] *= alpha fade[:, i] *= alpha fade[:, -i-1] *= alpha # ✅ تحويل enhanced_tile إلى float32 للدمج enhanced_tile_float = enhanced_tile[:actual_tile_h, :actual_tile_w].astype(np.float32) for c in range(3): final_image[y_start:y_end_out, x_start:x_end_out, c] += ( enhanced_tile_float[:, :, c] * fade ) weight_map[y_start:y_end_out, x_start:x_end_out] += fade processed_tiles += 1 if processed_tiles % 5 == 0: print(f"📦 تمت معالجة {processed_tiles}/{total_tiles} من الأجزاء") # ✅ تطبيع حسب الوزن بشكل صحيح # تجنب القسمة على صفر weight_map = np.maximum(weight_map, 1e-6) for c in range(3): final_image[:, :, c] = final_image[:, :, c] / weight_map # ✅ تحويل إلى uint8 في النهاية final_image = np.clip(final_image, 0, 255).astype(np.uint8) print("✅ تم دمج الأجزاء بنجاح!") return final_image except Exception as exc: print(f"❌ خطأ أثناء معالجة الأجزاء: {exc}") import traceback traceback.print_exc() return None def postprocess_from_ai(self, output): """تحويل مخرجات النموذج إلى صورة BGR.""" try: if len(output.shape) == 4: output = np.squeeze(output, axis=0) if output.shape[0] == 3: output = np.transpose(output, (1, 2, 0)) output = np.clip(output, 0, 1) output = (output * 255).astype(np.uint8) bgr = cv2.cvtColor(output, cv2.COLOR_RGB2BGR) return bgr except Exception as exc: print(f"❌ خطأ أثناء تحويل مخرجات النموذج: {exc}") return None def lanczos_enhanced(self, image): """رفع دقة الصورة بطريقة لانكزوس مع تحسينات إضافية.""" try: h, w = image.shape[:2] pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) # تقليل الضوضاء للصور الصغيرة فقط if min(w, h) < 300: pil_image = pil_image.filter(ImageFilter.MedianFilter(size=3)) new_size = (w * self.scale, h * self.scale) upscaled = pil_image.resize(new_size, Image.LANCZOS) # تحسينات خفيفة enhancer = ImageEnhance.Sharpness(upscaled) upscaled = enhancer.enhance(1.2) enhancer = ImageEnhance.Contrast(upscaled) upscaled = enhancer.enhance(1.05) return cv2.cvtColor(np.array(upscaled), cv2.COLOR_RGB2BGR) except Exception as exc: print(f"❌ خطأ في طريقة لانكزوس: {exc}") # إرجاع الصورة مكبرة بطريقة بسيطة h, w = image.shape[:2] return cv2.resize(image, (w * self.scale, h * self.scale), interpolation=cv2.INTER_CUBIC) upscaler = SuperUpscaler() @app.route("/health", methods=["GET"]) def health_check(): model_type = "UltraSharp AI 4x" if upscaler.setup_complete else "Enhanced Traditional Upscaler" return jsonify( { "status": "running", "model_loaded": upscaler.setup_complete, "model_type": model_type, "scale_factor": upscaler.scale, "ai_model_available": upscaler.setup_complete, "supports_large_images": True, "message": f"Super Resolution API running with {model_type}", } ) @app.route("/upscale", methods=["POST"]) def upscale_image(): try: if "image" not in request.files: return jsonify({"success": False, "error": "لم يتم إرسال صورة"}), 400 file = request.files["image"] if file.filename == "": return jsonify({"success": False, "error": "لم يتم اختيار صورة"}), 400 start_total = time.time() image_bytes = file.read() nparr = np.frombuffer(image_bytes, np.uint8) image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if image is None: return jsonify({"success": False, "error": "فشل في قراءة الصورة"}), 400 original_height, original_width = image.shape[:2] print(f"📏 معالجة صورة: {original_width}x{original_height}") method_used = "UltraSharp AI 4x" if upscaler.setup_complete else "Enhanced Lanczos" process_start = time.time() upscaled_image = upscaler.enhance_image(image) process_time = time.time() - process_start if upscaled_image is None: return jsonify({"success": False, "error": "فشل في رفع دقة الصورة"}), 500 final_h, final_w = upscaled_image.shape[:2] print(f"✅ زمن المعالجة: {process_time:.2f}s | النتيجة: {final_w}x{final_h}") # حفظ بجودة عالية pil_image = Image.fromarray(cv2.cvtColor(upscaled_image, cv2.COLOR_BGR2RGB)) buffer = io.BytesIO() pil_image.save(buffer, format="PNG", optimize=False, compress_level=1) img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") total_time = time.time() - start_total return jsonify( { "success": True, "message": f"تم رفع الدقة بنجاح باستخدام {method_used}!", "image": img_base64, "original_size": f"{original_width}x{original_height}", "upscaled_size": f"{final_w}x{final_h}", "scale_factor": upscaler.scale, "model_type": method_used, "processing_time": f"{total_time:.2f}s", "ai_used": upscaler.setup_complete, } ) except Exception as exc: print(f"❌ خطأ غير متوقّع: {exc}") import traceback traceback.print_exc() return jsonify({"success": False, "error": f"خطأ في معالجة الصورة: {str(exc)}"}), 500 @app.route("/", methods=["GET"]) def home(): model_status = "🤖 UltraSharp AI 4x" if upscaler.setup_complete else "🔧 Enhanced Traditional" return f""" AI Super Resolution

🚀 AI Super Resolution API

حالة النموذج: {model_status}

معامل التكبير: 4x

جاهز للاستخدام:

نقاط النهاية (Endpoints):

""" if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) print(f"🚀 تشغيل Flask على المنفذ {port}...") print("=" * 60) app.run(host="0.0.0.0", port=port, debug=False, threaded=True)