Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import io | |
| import base64 | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image, ImageFilter, ImageEnhance | |
| from flask import Flask, request, jsonify | |
| import onnx | |
| import onnxruntime as ort | |
| from huggingface_hub import hf_hub_download | |
| app = Flask(__name__) | |
| class SuperUpscaler: | |
| """مُحسِّن دقة يستخدم AI لجميع الصور مهما كان حجمها.""" | |
| def __init__(self): | |
| self.scale = 4 | |
| self.model_path = None | |
| self.onnx_session = None | |
| self.input_shape = None | |
| self.output_shape = None | |
| print("🚀 بدء إعداد Super Upscaler مع ONNX...") | |
| self.setup_complete = self.load_onnx_model() | |
| if self.setup_complete: | |
| print("✅ تم تحميل نموذج ONNX والتحقق منه بنجاح!") | |
| else: | |
| print("⚠️ فشل تحميل نموذج ONNX، سيتم استخدام الطريقة التقليدية.") | |
| def load_onnx_model(self): | |
| """تحميل نموذج ONNX من Hugging Face Hub.""" | |
| try: | |
| print("📥 تحميل نموذج UltraSharp 4x من Hugging Face Hub...") | |
| self.model_path = hf_hub_download( | |
| repo_id="Kim2091/UltraSharp", | |
| filename="ONNX/4x-UltraSharp-fp32-opset14.onnx", | |
| force_download=False, | |
| ) | |
| print(f"✅ تم تنزيل النموذج إلى: {self.model_path}") | |
| onnx_model = onnx.load(self.model_path) | |
| onnx.checker.check_model(onnx_model) | |
| print("✅ تم التحقق من سلامة النموذج.") | |
| session_options = ort.SessionOptions() | |
| session_options.enable_cpu_mem_arena = True | |
| session_options.enable_mem_pattern = True | |
| session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL | |
| session_options.intra_op_num_threads = max(1, os.cpu_count() // 2) | |
| session_options.inter_op_num_threads = 1 | |
| self.onnx_session = ort.InferenceSession( | |
| self.model_path, | |
| sess_options=session_options, | |
| providers=["CPUExecutionProvider"], | |
| ) | |
| input_info = self.onnx_session.get_inputs()[0] | |
| output_info = self.onnx_session.get_outputs()[0] | |
| self.input_shape = input_info.shape | |
| self.output_shape = output_info.shape | |
| print(f"📊 شكل الإدخال: {self.input_shape}") | |
| print(f"📊 شكل الإخراج: {self.output_shape}") | |
| return True | |
| except Exception as exc: | |
| print(f"❌ تعذّر تهيئة نموذج ONNX: {exc}") | |
| self.onnx_session = None | |
| return False | |
| def preprocess_for_ai(self, image, max_size=512): | |
| """ | |
| تحضير الصورة للنموذج بدون تقليل الحجم الشديد. | |
| """ | |
| try: | |
| rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| h, w = rgb_image.shape[:2] | |
| # تقليل فقط إذا كانت الصورة كبيرة جداً | |
| if max(h, w) > max_size: | |
| scale = max_size / max(h, w) | |
| new_w = int(w * scale) | |
| new_h = int(h * scale) | |
| print(f"📐 تصغير من {w}x{h} إلى {new_w}x{new_h} للمعالجة") | |
| rgb_image = cv2.resize(rgb_image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) | |
| h, w = rgb_image.shape[:2] | |
| # جعل الأبعاد قابلة للقسمة على 4 | |
| new_h = ((h + 3) // 4) * 4 | |
| new_w = ((w + 3) // 4) * 4 | |
| if new_h != h or new_w != w: | |
| rgb_image = cv2.resize(rgb_image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4) | |
| print(f"📐 تعديل للأبعاد: {new_w}x{new_h}") | |
| # تطبيع | |
| normalized = rgb_image.astype(np.float32) / 255.0 | |
| transposed = np.transpose(normalized, (2, 0, 1)) | |
| batched = np.expand_dims(transposed, axis=0) | |
| return batched, (h, w) | |
| except Exception as exc: | |
| print(f"❌ خطأ أثناء التحضير للنموذج: {exc}") | |
| return None, None | |
| def enhance_image(self, image): | |
| """رفع دقة الصورة باستخدام AI أو الطريقة التقليدية.""" | |
| try: | |
| print(f"📏 أبعاد الصورة الأصلية: {image.shape[1]}x{image.shape[0]}") | |
| if self.setup_complete and self.onnx_session is not None: | |
| print("🤖 استخدام نموذج UltraSharp عبر ONNX Runtime...") | |
| result = self.enhance_with_ai_smart(image) | |
| if result is not None: | |
| return result | |
| print("⚠️ فشل المعالجة بالذكاء الاصطناعي، سيتم استخدام طريقة لانكزوس.") | |
| else: | |
| print("🔄 استخدام طريقة لانكزوس المحسّنة (بدون AI).") | |
| return self.lanczos_enhanced(image) | |
| except Exception as exc: | |
| print(f"❌ خطأ أثناء رفع الدقة: {exc}") | |
| return self.lanczos_enhanced(image) | |
| def enhance_with_ai_smart(self, image): | |
| """رفع دقة الصورة مع معالجة خاصة للصور الكبيرة.""" | |
| try: | |
| h, w = image.shape[:2] | |
| # استخدام تقسيم للصور الكبيرة جداً فقط | |
| if max(h, w) > 1024: | |
| print("📦 الصورة كبيرة جداً - تقسيم إلى أجزاء...") | |
| return self.process_large_image_in_tiles(image, tile_size=512, overlap=64) | |
| print("🔄 معالجة الصورة كاملة عبر الذكاء الاصطناعي...") | |
| return self.process_single_image_with_ai(image) | |
| except Exception as exc: | |
| print(f"❌ خطأ أثناء استخدام الذكاء الاصطناعي: {exc}") | |
| return None | |
| def process_single_image_with_ai(self, image): | |
| """تشغيل النموذج على صورة واحدة.""" | |
| try: | |
| input_data, original_size = self.preprocess_for_ai(image, max_size=512) | |
| if input_data is None: | |
| return None | |
| input_name = self.onnx_session.get_inputs()[0].name | |
| start_time = time.time() | |
| outputs = self.onnx_session.run(None, {input_name: input_data}) | |
| inference_time = time.time() - start_time | |
| print(f"⏱️ زمن الاستدلال: {inference_time:.2f} ثانية") | |
| enhanced = self.postprocess_from_ai(outputs[0]) | |
| if enhanced is not None: | |
| # تكبير للحجم المطلوب (4x من الأصل) | |
| orig_h, orig_w = image.shape[:2] | |
| target_h, target_w = orig_h * self.scale, orig_w * self.scale | |
| curr_h, curr_w = enhanced.shape[:2] | |
| # إذا كان الحجم أصغر من المطلوب، كبّر | |
| if curr_h < target_h or curr_w < target_w: | |
| print(f"📐 تكبير من {curr_w}x{curr_h} إلى {target_w}x{target_h}") | |
| enhanced = cv2.resize(enhanced, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) | |
| return enhanced | |
| return None | |
| except Exception as exc: | |
| print(f"❌ خطأ أثناء الاستدلال: {exc}") | |
| return None | |
| def process_large_image_in_tiles(self, image, tile_size=512, overlap=64): | |
| """تقسيم الصورة الكبيرة إلى أجزاء ومعالجتها بشكل محسّن - مع إصلاح خطأ الدمج.""" | |
| try: | |
| h, w = image.shape[:2] | |
| print(f"📦 تقسيم صورة {w}x{h} إلى أجزاء {tile_size}x{tile_size}") | |
| output_h, output_w = h * self.scale, w * self.scale | |
| # ✅ تغيير النوع إلى float32 لتجنب خطأ التحويل | |
| final_image = np.zeros((output_h, output_w, 3), dtype=np.float32) | |
| weight_map = np.zeros((output_h, output_w), dtype=np.float32) | |
| processed_tiles = 0 | |
| y_positions = list(range(0, h, tile_size - overlap)) | |
| x_positions = list(range(0, w, tile_size - overlap)) | |
| total_tiles = len(y_positions) * len(x_positions) | |
| for y in y_positions: | |
| for x in x_positions: | |
| y_end = min(y + tile_size, h) | |
| x_end = min(x + tile_size, w) | |
| tile = image[y:y_end, x:x_end] | |
| # معالجة القطعة | |
| enhanced_tile = self.process_single_image_with_ai(tile) | |
| if enhanced_tile is not None: | |
| tile_h, tile_w = enhanced_tile.shape[:2] | |
| y_start = y * self.scale | |
| x_start = x * self.scale | |
| y_end_out = min(y_start + tile_h, output_h) | |
| x_end_out = min(x_start + tile_w, output_w) | |
| # دمج مع وزن لتقليل الحواف | |
| actual_tile_h = y_end_out - y_start | |
| actual_tile_w = x_end_out - x_start | |
| # إنشاء وزن تدريجي للحواف | |
| fade = np.ones((actual_tile_h, actual_tile_w), dtype=np.float32) | |
| fade_size = min(overlap * self.scale, min(actual_tile_h, actual_tile_w) // 4) | |
| if fade_size > 0: | |
| for i in range(fade_size): | |
| alpha = i / fade_size | |
| fade[i, :] *= alpha | |
| fade[-i-1, :] *= alpha | |
| fade[:, i] *= alpha | |
| fade[:, -i-1] *= alpha | |
| # ✅ تحويل enhanced_tile إلى float32 للدمج | |
| enhanced_tile_float = enhanced_tile[:actual_tile_h, :actual_tile_w].astype(np.float32) | |
| for c in range(3): | |
| final_image[y_start:y_end_out, x_start:x_end_out, c] += ( | |
| enhanced_tile_float[:, :, c] * fade | |
| ) | |
| weight_map[y_start:y_end_out, x_start:x_end_out] += fade | |
| processed_tiles += 1 | |
| if processed_tiles % 5 == 0: | |
| print(f"📦 تمت معالجة {processed_tiles}/{total_tiles} من الأجزاء") | |
| # ✅ تطبيع حسب الوزن بشكل صحيح | |
| # تجنب القسمة على صفر | |
| weight_map = np.maximum(weight_map, 1e-6) | |
| for c in range(3): | |
| final_image[:, :, c] = final_image[:, :, c] / weight_map | |
| # ✅ تحويل إلى uint8 في النهاية | |
| final_image = np.clip(final_image, 0, 255).astype(np.uint8) | |
| print("✅ تم دمج الأجزاء بنجاح!") | |
| return final_image | |
| except Exception as exc: | |
| print(f"❌ خطأ أثناء معالجة الأجزاء: {exc}") | |
| import traceback | |
| traceback.print_exc() | |
| return None | |
| def postprocess_from_ai(self, output): | |
| """تحويل مخرجات النموذج إلى صورة BGR.""" | |
| try: | |
| if len(output.shape) == 4: | |
| output = np.squeeze(output, axis=0) | |
| if output.shape[0] == 3: | |
| output = np.transpose(output, (1, 2, 0)) | |
| output = np.clip(output, 0, 1) | |
| output = (output * 255).astype(np.uint8) | |
| bgr = cv2.cvtColor(output, cv2.COLOR_RGB2BGR) | |
| return bgr | |
| except Exception as exc: | |
| print(f"❌ خطأ أثناء تحويل مخرجات النموذج: {exc}") | |
| return None | |
| def lanczos_enhanced(self, image): | |
| """رفع دقة الصورة بطريقة لانكزوس مع تحسينات إضافية.""" | |
| try: | |
| h, w = image.shape[:2] | |
| pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) | |
| # تقليل الضوضاء للصور الصغيرة فقط | |
| if min(w, h) < 300: | |
| pil_image = pil_image.filter(ImageFilter.MedianFilter(size=3)) | |
| new_size = (w * self.scale, h * self.scale) | |
| upscaled = pil_image.resize(new_size, Image.LANCZOS) | |
| # تحسينات خفيفة | |
| enhancer = ImageEnhance.Sharpness(upscaled) | |
| upscaled = enhancer.enhance(1.2) | |
| enhancer = ImageEnhance.Contrast(upscaled) | |
| upscaled = enhancer.enhance(1.05) | |
| return cv2.cvtColor(np.array(upscaled), cv2.COLOR_RGB2BGR) | |
| except Exception as exc: | |
| print(f"❌ خطأ في طريقة لانكزوس: {exc}") | |
| # إرجاع الصورة مكبرة بطريقة بسيطة | |
| h, w = image.shape[:2] | |
| return cv2.resize(image, (w * self.scale, h * self.scale), interpolation=cv2.INTER_CUBIC) | |
| upscaler = SuperUpscaler() | |
| def health_check(): | |
| model_type = "UltraSharp AI 4x" if upscaler.setup_complete else "Enhanced Traditional Upscaler" | |
| return jsonify( | |
| { | |
| "status": "running", | |
| "model_loaded": upscaler.setup_complete, | |
| "model_type": model_type, | |
| "scale_factor": upscaler.scale, | |
| "ai_model_available": upscaler.setup_complete, | |
| "supports_large_images": True, | |
| "message": f"Super Resolution API running with {model_type}", | |
| } | |
| ) | |
| def upscale_image(): | |
| try: | |
| if "image" not in request.files: | |
| return jsonify({"success": False, "error": "لم يتم إرسال صورة"}), 400 | |
| file = request.files["image"] | |
| if file.filename == "": | |
| return jsonify({"success": False, "error": "لم يتم اختيار صورة"}), 400 | |
| start_total = time.time() | |
| image_bytes = file.read() | |
| nparr = np.frombuffer(image_bytes, np.uint8) | |
| image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| if image is None: | |
| return jsonify({"success": False, "error": "فشل في قراءة الصورة"}), 400 | |
| original_height, original_width = image.shape[:2] | |
| print(f"📏 معالجة صورة: {original_width}x{original_height}") | |
| method_used = "UltraSharp AI 4x" if upscaler.setup_complete else "Enhanced Lanczos" | |
| process_start = time.time() | |
| upscaled_image = upscaler.enhance_image(image) | |
| process_time = time.time() - process_start | |
| if upscaled_image is None: | |
| return jsonify({"success": False, "error": "فشل في رفع دقة الصورة"}), 500 | |
| final_h, final_w = upscaled_image.shape[:2] | |
| print(f"✅ زمن المعالجة: {process_time:.2f}s | النتيجة: {final_w}x{final_h}") | |
| # حفظ بجودة عالية | |
| pil_image = Image.fromarray(cv2.cvtColor(upscaled_image, cv2.COLOR_BGR2RGB)) | |
| buffer = io.BytesIO() | |
| pil_image.save(buffer, format="PNG", optimize=False, compress_level=1) | |
| img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") | |
| total_time = time.time() - start_total | |
| return jsonify( | |
| { | |
| "success": True, | |
| "message": f"تم رفع الدقة بنجاح باستخدام {method_used}!", | |
| "image": img_base64, | |
| "original_size": f"{original_width}x{original_height}", | |
| "upscaled_size": f"{final_w}x{final_h}", | |
| "scale_factor": upscaler.scale, | |
| "model_type": method_used, | |
| "processing_time": f"{total_time:.2f}s", | |
| "ai_used": upscaler.setup_complete, | |
| } | |
| ) | |
| except Exception as exc: | |
| print(f"❌ خطأ غير متوقّع: {exc}") | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({"success": False, "error": f"خطأ في معالجة الصورة: {str(exc)}"}), 500 | |
| def home(): | |
| model_status = "🤖 UltraSharp AI 4x" if upscaler.setup_complete else "🔧 Enhanced Traditional" | |
| return f""" | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <title>AI Super Resolution</title> | |
| <style> | |
| body {{ font-family: Arial; padding: 20px; background: #f5f5f5; }} | |
| .container {{ max-width: 800px; margin: 0 auto; background: white; padding: 30px; border-radius: 10px; }} | |
| h1 {{ color: #333; }} | |
| .status {{ padding: 15px; background: #e8f5e9; border-radius: 5px; margin: 20px 0; }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container"> | |
| <h1>🚀 AI Super Resolution API</h1> | |
| <div class="status"> | |
| <p><strong>حالة النموذج:</strong> {model_status}</p> | |
| <p><strong>معامل التكبير:</strong> 4x</p> | |
| <p><strong>جاهز للاستخدام:</strong> ✅</p> | |
| </div> | |
| <h3>نقاط النهاية (Endpoints):</h3> | |
| <ul> | |
| <li><a href="/health">/health</a> - فحص حالة الخدمة</li> | |
| <li>/upscale - رفع دقة الصورة (POST)</li> | |
| </ul> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", 7860)) | |
| print(f"🚀 تشغيل Flask على المنفذ {port}...") | |
| print("=" * 60) | |
| app.run(host="0.0.0.0", port=port, debug=False, threaded=True) |