ImageQuality / app.py
MahmoudNabilMohamed's picture
Update app.py
482fe50 verified
import os
import time
import io
import base64
import cv2
import numpy as np
from PIL import Image, ImageFilter, ImageEnhance
from flask import Flask, request, jsonify
import onnx
import onnxruntime as ort
from huggingface_hub import hf_hub_download
app = Flask(__name__)
class SuperUpscaler:
"""مُحسِّن دقة يستخدم AI لجميع الصور مهما كان حجمها."""
def __init__(self):
self.scale = 4
self.model_path = None
self.onnx_session = None
self.input_shape = None
self.output_shape = None
print("🚀 بدء إعداد Super Upscaler مع ONNX...")
self.setup_complete = self.load_onnx_model()
if self.setup_complete:
print("✅ تم تحميل نموذج ONNX والتحقق منه بنجاح!")
else:
print("⚠️ فشل تحميل نموذج ONNX، سيتم استخدام الطريقة التقليدية.")
def load_onnx_model(self):
"""تحميل نموذج ONNX من Hugging Face Hub."""
try:
print("📥 تحميل نموذج UltraSharp 4x من Hugging Face Hub...")
self.model_path = hf_hub_download(
repo_id="Kim2091/UltraSharp",
filename="ONNX/4x-UltraSharp-fp32-opset14.onnx",
force_download=False,
)
print(f"✅ تم تنزيل النموذج إلى: {self.model_path}")
onnx_model = onnx.load(self.model_path)
onnx.checker.check_model(onnx_model)
print("✅ تم التحقق من سلامة النموذج.")
session_options = ort.SessionOptions()
session_options.enable_cpu_mem_arena = True
session_options.enable_mem_pattern = True
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
session_options.intra_op_num_threads = max(1, os.cpu_count() // 2)
session_options.inter_op_num_threads = 1
self.onnx_session = ort.InferenceSession(
self.model_path,
sess_options=session_options,
providers=["CPUExecutionProvider"],
)
input_info = self.onnx_session.get_inputs()[0]
output_info = self.onnx_session.get_outputs()[0]
self.input_shape = input_info.shape
self.output_shape = output_info.shape
print(f"📊 شكل الإدخال: {self.input_shape}")
print(f"📊 شكل الإخراج: {self.output_shape}")
return True
except Exception as exc:
print(f"❌ تعذّر تهيئة نموذج ONNX: {exc}")
self.onnx_session = None
return False
def preprocess_for_ai(self, image, max_size=512):
"""
تحضير الصورة للنموذج بدون تقليل الحجم الشديد.
"""
try:
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
h, w = rgb_image.shape[:2]
# تقليل فقط إذا كانت الصورة كبيرة جداً
if max(h, w) > max_size:
scale = max_size / max(h, w)
new_w = int(w * scale)
new_h = int(h * scale)
print(f"📐 تصغير من {w}x{h} إلى {new_w}x{new_h} للمعالجة")
rgb_image = cv2.resize(rgb_image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
h, w = rgb_image.shape[:2]
# جعل الأبعاد قابلة للقسمة على 4
new_h = ((h + 3) // 4) * 4
new_w = ((w + 3) // 4) * 4
if new_h != h or new_w != w:
rgb_image = cv2.resize(rgb_image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
print(f"📐 تعديل للأبعاد: {new_w}x{new_h}")
# تطبيع
normalized = rgb_image.astype(np.float32) / 255.0
transposed = np.transpose(normalized, (2, 0, 1))
batched = np.expand_dims(transposed, axis=0)
return batched, (h, w)
except Exception as exc:
print(f"❌ خطأ أثناء التحضير للنموذج: {exc}")
return None, None
def enhance_image(self, image):
"""رفع دقة الصورة باستخدام AI أو الطريقة التقليدية."""
try:
print(f"📏 أبعاد الصورة الأصلية: {image.shape[1]}x{image.shape[0]}")
if self.setup_complete and self.onnx_session is not None:
print("🤖 استخدام نموذج UltraSharp عبر ONNX Runtime...")
result = self.enhance_with_ai_smart(image)
if result is not None:
return result
print("⚠️ فشل المعالجة بالذكاء الاصطناعي، سيتم استخدام طريقة لانكزوس.")
else:
print("🔄 استخدام طريقة لانكزوس المحسّنة (بدون AI).")
return self.lanczos_enhanced(image)
except Exception as exc:
print(f"❌ خطأ أثناء رفع الدقة: {exc}")
return self.lanczos_enhanced(image)
def enhance_with_ai_smart(self, image):
"""رفع دقة الصورة مع معالجة خاصة للصور الكبيرة."""
try:
h, w = image.shape[:2]
# استخدام تقسيم للصور الكبيرة جداً فقط
if max(h, w) > 1024:
print("📦 الصورة كبيرة جداً - تقسيم إلى أجزاء...")
return self.process_large_image_in_tiles(image, tile_size=512, overlap=64)
print("🔄 معالجة الصورة كاملة عبر الذكاء الاصطناعي...")
return self.process_single_image_with_ai(image)
except Exception as exc:
print(f"❌ خطأ أثناء استخدام الذكاء الاصطناعي: {exc}")
return None
def process_single_image_with_ai(self, image):
"""تشغيل النموذج على صورة واحدة."""
try:
input_data, original_size = self.preprocess_for_ai(image, max_size=512)
if input_data is None:
return None
input_name = self.onnx_session.get_inputs()[0].name
start_time = time.time()
outputs = self.onnx_session.run(None, {input_name: input_data})
inference_time = time.time() - start_time
print(f"⏱️ زمن الاستدلال: {inference_time:.2f} ثانية")
enhanced = self.postprocess_from_ai(outputs[0])
if enhanced is not None:
# تكبير للحجم المطلوب (4x من الأصل)
orig_h, orig_w = image.shape[:2]
target_h, target_w = orig_h * self.scale, orig_w * self.scale
curr_h, curr_w = enhanced.shape[:2]
# إذا كان الحجم أصغر من المطلوب، كبّر
if curr_h < target_h or curr_w < target_w:
print(f"📐 تكبير من {curr_w}x{curr_h} إلى {target_w}x{target_h}")
enhanced = cv2.resize(enhanced, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
return enhanced
return None
except Exception as exc:
print(f"❌ خطأ أثناء الاستدلال: {exc}")
return None
def process_large_image_in_tiles(self, image, tile_size=512, overlap=64):
"""تقسيم الصورة الكبيرة إلى أجزاء ومعالجتها بشكل محسّن - مع إصلاح خطأ الدمج."""
try:
h, w = image.shape[:2]
print(f"📦 تقسيم صورة {w}x{h} إلى أجزاء {tile_size}x{tile_size}")
output_h, output_w = h * self.scale, w * self.scale
# ✅ تغيير النوع إلى float32 لتجنب خطأ التحويل
final_image = np.zeros((output_h, output_w, 3), dtype=np.float32)
weight_map = np.zeros((output_h, output_w), dtype=np.float32)
processed_tiles = 0
y_positions = list(range(0, h, tile_size - overlap))
x_positions = list(range(0, w, tile_size - overlap))
total_tiles = len(y_positions) * len(x_positions)
for y in y_positions:
for x in x_positions:
y_end = min(y + tile_size, h)
x_end = min(x + tile_size, w)
tile = image[y:y_end, x:x_end]
# معالجة القطعة
enhanced_tile = self.process_single_image_with_ai(tile)
if enhanced_tile is not None:
tile_h, tile_w = enhanced_tile.shape[:2]
y_start = y * self.scale
x_start = x * self.scale
y_end_out = min(y_start + tile_h, output_h)
x_end_out = min(x_start + tile_w, output_w)
# دمج مع وزن لتقليل الحواف
actual_tile_h = y_end_out - y_start
actual_tile_w = x_end_out - x_start
# إنشاء وزن تدريجي للحواف
fade = np.ones((actual_tile_h, actual_tile_w), dtype=np.float32)
fade_size = min(overlap * self.scale, min(actual_tile_h, actual_tile_w) // 4)
if fade_size > 0:
for i in range(fade_size):
alpha = i / fade_size
fade[i, :] *= alpha
fade[-i-1, :] *= alpha
fade[:, i] *= alpha
fade[:, -i-1] *= alpha
# ✅ تحويل enhanced_tile إلى float32 للدمج
enhanced_tile_float = enhanced_tile[:actual_tile_h, :actual_tile_w].astype(np.float32)
for c in range(3):
final_image[y_start:y_end_out, x_start:x_end_out, c] += (
enhanced_tile_float[:, :, c] * fade
)
weight_map[y_start:y_end_out, x_start:x_end_out] += fade
processed_tiles += 1
if processed_tiles % 5 == 0:
print(f"📦 تمت معالجة {processed_tiles}/{total_tiles} من الأجزاء")
# ✅ تطبيع حسب الوزن بشكل صحيح
# تجنب القسمة على صفر
weight_map = np.maximum(weight_map, 1e-6)
for c in range(3):
final_image[:, :, c] = final_image[:, :, c] / weight_map
# ✅ تحويل إلى uint8 في النهاية
final_image = np.clip(final_image, 0, 255).astype(np.uint8)
print("✅ تم دمج الأجزاء بنجاح!")
return final_image
except Exception as exc:
print(f"❌ خطأ أثناء معالجة الأجزاء: {exc}")
import traceback
traceback.print_exc()
return None
def postprocess_from_ai(self, output):
"""تحويل مخرجات النموذج إلى صورة BGR."""
try:
if len(output.shape) == 4:
output = np.squeeze(output, axis=0)
if output.shape[0] == 3:
output = np.transpose(output, (1, 2, 0))
output = np.clip(output, 0, 1)
output = (output * 255).astype(np.uint8)
bgr = cv2.cvtColor(output, cv2.COLOR_RGB2BGR)
return bgr
except Exception as exc:
print(f"❌ خطأ أثناء تحويل مخرجات النموذج: {exc}")
return None
def lanczos_enhanced(self, image):
"""رفع دقة الصورة بطريقة لانكزوس مع تحسينات إضافية."""
try:
h, w = image.shape[:2]
pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
# تقليل الضوضاء للصور الصغيرة فقط
if min(w, h) < 300:
pil_image = pil_image.filter(ImageFilter.MedianFilter(size=3))
new_size = (w * self.scale, h * self.scale)
upscaled = pil_image.resize(new_size, Image.LANCZOS)
# تحسينات خفيفة
enhancer = ImageEnhance.Sharpness(upscaled)
upscaled = enhancer.enhance(1.2)
enhancer = ImageEnhance.Contrast(upscaled)
upscaled = enhancer.enhance(1.05)
return cv2.cvtColor(np.array(upscaled), cv2.COLOR_RGB2BGR)
except Exception as exc:
print(f"❌ خطأ في طريقة لانكزوس: {exc}")
# إرجاع الصورة مكبرة بطريقة بسيطة
h, w = image.shape[:2]
return cv2.resize(image, (w * self.scale, h * self.scale), interpolation=cv2.INTER_CUBIC)
upscaler = SuperUpscaler()
@app.route("/health", methods=["GET"])
def health_check():
model_type = "UltraSharp AI 4x" if upscaler.setup_complete else "Enhanced Traditional Upscaler"
return jsonify(
{
"status": "running",
"model_loaded": upscaler.setup_complete,
"model_type": model_type,
"scale_factor": upscaler.scale,
"ai_model_available": upscaler.setup_complete,
"supports_large_images": True,
"message": f"Super Resolution API running with {model_type}",
}
)
@app.route("/upscale", methods=["POST"])
def upscale_image():
try:
if "image" not in request.files:
return jsonify({"success": False, "error": "لم يتم إرسال صورة"}), 400
file = request.files["image"]
if file.filename == "":
return jsonify({"success": False, "error": "لم يتم اختيار صورة"}), 400
start_total = time.time()
image_bytes = file.read()
nparr = np.frombuffer(image_bytes, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if image is None:
return jsonify({"success": False, "error": "فشل في قراءة الصورة"}), 400
original_height, original_width = image.shape[:2]
print(f"📏 معالجة صورة: {original_width}x{original_height}")
method_used = "UltraSharp AI 4x" if upscaler.setup_complete else "Enhanced Lanczos"
process_start = time.time()
upscaled_image = upscaler.enhance_image(image)
process_time = time.time() - process_start
if upscaled_image is None:
return jsonify({"success": False, "error": "فشل في رفع دقة الصورة"}), 500
final_h, final_w = upscaled_image.shape[:2]
print(f"✅ زمن المعالجة: {process_time:.2f}s | النتيجة: {final_w}x{final_h}")
# حفظ بجودة عالية
pil_image = Image.fromarray(cv2.cvtColor(upscaled_image, cv2.COLOR_BGR2RGB))
buffer = io.BytesIO()
pil_image.save(buffer, format="PNG", optimize=False, compress_level=1)
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
total_time = time.time() - start_total
return jsonify(
{
"success": True,
"message": f"تم رفع الدقة بنجاح باستخدام {method_used}!",
"image": img_base64,
"original_size": f"{original_width}x{original_height}",
"upscaled_size": f"{final_w}x{final_h}",
"scale_factor": upscaler.scale,
"model_type": method_used,
"processing_time": f"{total_time:.2f}s",
"ai_used": upscaler.setup_complete,
}
)
except Exception as exc:
print(f"❌ خطأ غير متوقّع: {exc}")
import traceback
traceback.print_exc()
return jsonify({"success": False, "error": f"خطأ في معالجة الصورة: {str(exc)}"}), 500
@app.route("/", methods=["GET"])
def home():
model_status = "🤖 UltraSharp AI 4x" if upscaler.setup_complete else "🔧 Enhanced Traditional"
return f"""
<!DOCTYPE html>
<html>
<head>
<title>AI Super Resolution</title>
<style>
body {{ font-family: Arial; padding: 20px; background: #f5f5f5; }}
.container {{ max-width: 800px; margin: 0 auto; background: white; padding: 30px; border-radius: 10px; }}
h1 {{ color: #333; }}
.status {{ padding: 15px; background: #e8f5e9; border-radius: 5px; margin: 20px 0; }}
</style>
</head>
<body>
<div class="container">
<h1>🚀 AI Super Resolution API</h1>
<div class="status">
<p><strong>حالة النموذج:</strong> {model_status}</p>
<p><strong>معامل التكبير:</strong> 4x</p>
<p><strong>جاهز للاستخدام:</strong> ✅</p>
</div>
<h3>نقاط النهاية (Endpoints):</h3>
<ul>
<li><a href="/health">/health</a> - فحص حالة الخدمة</li>
<li>/upscale - رفع دقة الصورة (POST)</li>
</ul>
</div>
</body>
</html>
"""
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
print(f"🚀 تشغيل Flask على المنفذ {port}...")
print("=" * 60)
app.run(host="0.0.0.0", port=port, debug=False, threaded=True)