#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════╗ ║ 🔱 SONAR-AI v15.2 - YOLO11x + DSOS-BA Edition ║ ║ 8 AI Models + Detection + TSC HS Codes + Auth ║ ╠══════════════════════════════════════════════════════════════╣ ║ ✅ 7 Classification Models (Ensemble) ║ ║ ✅ YOLOv10x-cls (مدرّب سابقاً) ║ ║ ✅ YOLO11x-cls (92.6% Top-1, 97.4% Top-5) 🆕 ║ ║ ✅ 2,128 HS Codes from TSC Database ║ ║ ✅ أدوات معالجة صور السونار (12 فلتر) ║ ║ ✅ واجهة RTL عربية كاملة ║ ║ ✅ جدول إحصائيات مع قاعدة بيانات SQLite ║ ║ ✅ Login Authentication ║ ║ ✅ Real AI Inference (GPU) ║ ║ ✅ C# Desktop API Integration ║ ╚══════════════════════════════════════════════════════════════╝
"""
import gradio as gr
import pandas as pd
import numpy as np
from datetime import datetime
import random
import os
import json
import glob
from PIL import Image, ImageDraw, ImageFont
import pickle
from scipy import ndimage
from scipy.signal import find_peaks
from scipy.fft import fft2, fftshift
import time as time_module
import matplotlib
matplotlib.use("Agg")  # headless backend: plots render to buffers, never to a display
import matplotlib.pyplot as plt
import io, base64

# NOTE(review): banner above says v15.2 while VERSION is "17.2" — confirm which is current.
VERSION = "17.2"
API_URL = "http://65.108.7.202:5555/api"

# ═══════════════════════════════════════════════════════════════
# 🔧 Detect environment
# ═══════════════════════════════════════════════════════════════
USE_GPU = False
MODELS_LOADED = False
try:
    import torch
    import torch.nn as nn
    from torchvision import transforms, models
    USE_GPU = torch.cuda.is_available()
    DEVICE = torch.device('cuda' if USE_GPU else 'cpu')
    print(f"✅ PyTorch loaded | Device: {DEVICE}")
except ImportError:
    # Without PyTorch the app falls back to simulated classification results.
    print("⚠️ PyTorch not available - simulation mode")
    torch = None

try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
    print("✅ Ultralytics loaded")
except ImportError:
    YOLO_AVAILABLE = False
    print("⚠️ Ultralytics not available")

# ═══════════════════════════════════════════════════════════════
# 🔐 Login
# ═══════════════════════════════════════════════════════════════
# NOTE(review): plaintext hard-coded credentials — replace with hashed storage
# before any real deployment.
USERS = {
    "admin": "sonar2026",
    "inspector": "inspect123",
    "customs": "customs456",
    "A": "1",
    "عباس": "1",
}

def authenticate(username, password):
    """Return True when *username* exists in USERS and *password* matches exactly."""
    return username in USERS and USERS[username] == password

# ═══════════════════════════════════════════════════════════════
# 🏷️ Categories (43 - matching Training.py)
# ═══════════════════════════════════════════════════════════════
# Index order matters: model output logits are mapped to these names by position.
CATEGORIES = [
    'appliances', 'auto_parts', 'bags', 'banana', 'batteries', 'beverages',
    'cables', 'canned_food', 'ceramic', 'chemicals', 'cleaning', 'clothes',
    'cooking_oil', 'cosmetics', 'electronics', 'fruits', 'furniture', 'glass',
    'kitchenware', 'lubricants', 'machinery', 'meat', 'medical', 'milk',
    'motorcycle', 'nuts', 'other', 'paper', 'pipes', 'plastic', 'rice',
    'seeds', 'shoes', 'snacks', 'spices', 'steel', 'sugar', 'tea', 'tires',
    'tools', 'toys', 'weapons', 'wood'
]

# ═══════════════════════════════════════════════════════════════
# 🆕 Swin-V2 Categories
# ═══════════════════════════════════════════════════════════════
# Different label set from CATEGORIES (e.g. has 'oil'/'paint'/'perfume',
# lacks 'other'); sorted() fixes the index order the Swin-V2 head was trained with.
SWINV2_CATEGORIES = sorted([
    'appliances', 'auto_parts', 'bags', 'banana', 'batteries', 'beverages',
    'cables', 'canned_food', 'ceramic', 'chemicals', 'cleaning', 'clothes',
    'cooking_oil', 'cosmetics', 'electronics', 'fruits', 'furniture', 'glass',
    'kitchenware', 'lubricants', 'machinery', 'meat', 'medical', 'milk',
    'motorcycle', 'nuts', 'oil', 'paint', 'paper', 'perfume', 'pharmaceutical',
    'plastic', 'rice', 'rubber', 'shoes', 'spices', 'sugar', 'tea', 'textiles',
    'tires', 'tobacco', 'toys', 'weapons'
])
SWINV2_CONCEALMENT_CLASSES = ['match', 'no_match']
SWINV2_RISK_CLASSES = ['critical', 'high', 'low', 'medium', 'safe']

# Harmonized System chapter number -> Arabic chapter name (UI display only).
HS_CHAPTERS = {
    '02':'لحوم','04':'ألبان وبيض','08':'فواكه','09':'بن وشاي وبهارات','10':'حبوب',
    '12':'بذور','15':'دهون وزيوت','17':'سكر','19':'محضرات حبوب','20':'محضرات خضر وفواكه',
    '22':'مشروبات','27':'وقود معدني','33':'عطور ومستحضرات','34':'صابون ومنظفات',
    '36':'متفجرات','38':'منتجات كيماوية','39':'لدائن بلاستيك','40':'مطاط',
    '42':'مصنوعات جلدية','44':'خشب','48':'ورق وكرتون',
    '62':'ملابس','64':'أحذية','69':'منتجات خزفية','70':'زجاج',
    '72':'حديد وصلب','73':'مصنوعات حديد','82':'أدوات معدنية',
    '84':'آلات ومعدات','85':'أجهزة كهربائية','87':'سيارات ومركبات',
    '90':'أجهزة طبية','93':'أسلحة','94':'أثاث','95':'ألعاب','96':'مصنوعات متنوعة',
}

# Category key -> Arabic name, representative 6-digit HS code, duty rate (%),
# and HS chapter. 'other' uses sentinel HS 999999 / chapter '99'.
CARGO_DATABASE = {
    'furniture': {'ar':'أثاث','hs':'940360','duty':30,'ch':'94'},
    'steel': {'ar':'حديد','hs':'721049','duty':5,'ch':'72'},
    'paper': {'ar':'ورق','hs':'480519','duty':10,'ch':'48'},
    'clothes': {'ar':'ملابس','hs':'620342','duty':20,'ch':'62'},
    'other': {'ar':'أخرى','hs':'999999','duty':15,'ch':'99'},
    'machinery': {'ar':'آلات','hs':'847989','duty':5,'ch':'84'},
    'milk': {'ar':'منتجات حليب','hs':'040210','duty':10,'ch':'04'},
    'electronics': {'ar':'إلكترونيات','hs':'854370','duty':10,'ch':'85'},
    'auto_parts': {'ar':'قطع غيار سيارات','hs':'870899','duty':5,'ch':'87'},
    'appliances': {'ar':'أجهزة منزلية','hs':'851660','duty':20,'ch':'85'},
    'ceramic': {'ar':'سيراميك','hs':'691090','duty':15,'ch':'69'},
    'chemicals': {'ar':'كيميائيات','hs':'382499','duty':5,'ch':'38'},
    'plastic': {'ar':'بلاستيك','hs':'392690','duty':15,'ch':'39'},
    'banana': {'ar':'موز','hs':'080390','duty':5,'ch':'08'},
    'tires': {'ar':'إطارات','hs':'401110','duty':15,'ch':'40'},
    'tools': {'ar':'أدوات','hs':'820559','duty':10,'ch':'82'},
    'toys': {'ar':'ألعاب','hs':'950300','duty':20,'ch':'95'},
    'seeds': {'ar':'بذور','hs':'120991','duty':5,'ch':'12'},
    'tea': {'ar':'شاي','hs':'090230','duty':10,'ch':'09'},
    'cleaning': {'ar':'مواد تنظيف','hs':'340220','duty':10,'ch':'34'},
    'canned_food': {'ar':'أغذية معلبة','hs':'200899','duty':10,'ch':'20'},
    'nuts': {'ar':'مكسرات','hs':'080290','duty':10,'ch':'08'},
    'glass': {'ar':'زجاج','hs':'701090','duty':15,'ch':'70'},
    'cables': {'ar':'كابلات','hs':'854449','duty':10,'ch':'85'},
    'snacks': {'ar':'وجبات خفيفة','hs':'190590','duty':15,'ch':'19'},
    'rice': {'ar':'رز','hs':'100630','duty':10,'ch':'10'},
    'pipes': {'ar':'أنابيب','hs':'730890','duty':10,'ch':'73'},
    'cosmetics': {'ar':'مستحضرات تجميل','hs':'330499','duty':20,'ch':'33'},
    'meat': {'ar':'لحوم','hs':'020230','duty':5,'ch':'02'},
    'lubricants': {'ar':'زيوت تشحيم','hs':'271019','duty':5,'ch':'27'},
    'bags': {'ar':'حقائب','hs':'420222','duty':20,'ch':'42'},
    'cooking_oil': {'ar':'زيت طبخ','hs':'151190','duty':5,'ch':'15'},
    'beverages': {'ar':'مشروبات','hs':'220299','duty':15,'ch':'22'},
    'shoes': {'ar':'أحذية','hs':'640299','duty':20,'ch':'64'},
    'batteries': {'ar':'بطاريات','hs':'850760','duty':15,'ch':'85'},
    'wood': {'ar':'خشب','hs':'440799','duty':10,'ch':'44'},
    'fruits': {'ar':'فواكه','hs':'081090','duty':5,'ch':'08'},
    'motorcycle': {'ar':'دراجات نارية','hs':'871190','duty':20,'ch':'87'},
    'medical': {'ar':'مستلزمات طبية','hs':'901890','duty':0,'ch':'90'},
    'kitchenware': {'ar':'أدوات مطبخ','hs':'732393','duty':15,'ch':'73'},
    'spices': {'ar':'بهارات','hs':'090421','duty':10,'ch':'09'},
    'weapons': {'ar':'أسلحة','hs':'930100','duty':0,'ch':'93'},
    'sugar': {'ar':'سكر','hs':'170199','duty':5,'ch':'17'},
}

# ═══════════════════════════════════════════════════════════════
# 📂 Load the TSC (tariff) database
# ═══════════════════════════════════════════════════════════════
# HS code (string) -> {'desc','avg_price','unit','nb5','currency'}
TSC_DATABASE = {}

def load_tsc_database():
    """Populate TSC_DATABASE from the first readable TSC Excel file found.

    Tries a list of known filenames plus glob patterns in /app and CWD;
    stops at the first file that loads. First occurrence of each HS code wins.
    Prints a warning and leaves TSC_DATABASE empty when no file is found.
    """
    global TSC_DATABASE
    paths = [
        "نسخة_TSC_2025-12-09.xlsx",
        "نسخة TSC 2025-12-09.xlsx",
        "/app/نسخة_TSC_2025-12-09.xlsx",
        "/app/نسخة TSC 2025-12-09.xlsx",
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "نسخة_TSC_2025-12-09.xlsx"),
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "نسخة TSC 2025-12-09.xlsx"),
    ]
    for pattern in ["/app/*TSC*.xlsx", "/app/*نسخة*.xlsx", "./*TSC*.xlsx", "./*نسخة*.xlsx"]:
        paths.extend(glob.glob(pattern))
    for path in paths:
        if os.path.exists(path):
            try:
                df = pd.read_excel(path, engine='openpyxl')
                for _, row in df.iterrows():
                    hs = str(int(row['IDE_HSC_NB1']))
                    if hs not in TSC_DATABASE:
                        TSC_DATABASE[hs] = {
                            'desc': str(row['GDS_DS2']).strip()[:100] if pd.notna(row['GDS_DS2']) else '',
                            'avg_price': round(float(row['AVR_MNT']), 2) if pd.notna(row['AVR_MNT']) else 0,
                            'unit': str(row['AVR_UNT']) if pd.notna(row['AVR_UNT']) else '',
                            'nb5': str(row['IDE_HSC_NB5']) if pd.notna(row['IDE_HSC_NB5']) else '',
                            'currency': 'USD',
                        }
                print(f"✅ TSC loaded: {len(TSC_DATABASE)} HS codes from {path}")
                return
            except Exception as e:
                print(f"⚠️ Error loading {path}: {e}")
    print("⚠️ TSC file not found")

load_tsc_database()

# ═══════════════════════════════════════════════════════════════
# 🤖 Model loading
# ═══════════════════════════════════════════════════════════════
# Module-level model registry, populated by load_all_models().
CLASSIFICATION_MODELS = {}
DETECTION_MODEL = None
ENSEMBLE_CONFIG = None
ANOMALY_MODELS = None
risk_model = None
SELECTED_FEATURES = None
FEATURE_EXTRACTOR = None
SWINV2_CLASSIFICATION = None
SWINV2_CONCEALMENT = None
SWINV2_RISK = None
MODEL_REPO = "DrAbbas/SONAR-AI-Models"

def find_model(name):
    """Return a local path for *name*, else download it from the HF model repo.

    Returns None when the file is neither present locally nor downloadable.
    """
    # 1. Check local paths
    for base in [".", "/app", os.path.dirname(os.path.abspath(__file__))]:
        path = os.path.join(base, name)
        if os.path.exists(path):
            return path
    # 2. Download from HF Model repo
    try:
        from huggingface_hub import hf_hub_download
        path = hf_hub_download(repo_id=MODEL_REPO, filename=name)
        print(f"📥 Downloaded {name} from {MODEL_REPO}")
        return path
    except Exception as e:
        print(f"⚠️ Could not download {name}: {e}")
        return None

def get_transforms():
    """Standard 224x224 ImageNet-normalized transform (None without PyTorch)."""
    if torch is None:
        return None
    return transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

def get_swinv2_transforms(img_size=256):
    """Transforms for Swin-V2 trained models (256×256 by default)."""
    if torch is None:
        return None
    return transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

if torch is not None:
    class SOSUFS(nn.Module):
        # Self-Organizing feature-selection block: learns k centers plus a
        # per-feature importance gate, then projects to `outf` dims.
        def __init__(self, inf, outf, k=16):
            super().__init__()
            self.centers = nn.Parameter(torch.randn(k, inf))
            self.importance = nn.Parameter(torch.ones(inf))
            self.fc = nn.Linear(inf, outf)
            self.bn = nn.BatchNorm1d(outf)
        def forward(self, x):
            # Gate features by learned importance, soft-assign to the learned
            # centers (softmax over negative distances), and mix the centers
            # back in before the linear projection.
            w = x * torch.sigmoid(self.importance)
            d = torch.cdist(w.unsqueeze(1), self.centers.unsqueeze(0))
            s = torch.softmax(-d.squeeze(1), dim=-1)
            return nn.functional.gelu(self.bn(self.fc(w + s @ self.centers)))

    class DeepSOSUFS(nn.Module):
        # EfficientNet-B3 backbone + channel attention + two stacked SOSUFS
        # blocks feeding an `nc`-way classification head.
        def __init__(self, nc):
            super().__init__()
            self.backbone = models.efficientnet_b3(weights=None)
            f = self.backbone.classifier[1].in_features
            self.backbone.classifier = nn.Identity()  # expose pooled features
            self.attn = nn.Sequential(nn.Linear(f, f//4), nn.ReLU(), nn.Linear(f//4, f), nn.Sigmoid())
            self.s1 = SOSUFS(f, 512)
            self.s2 = SOSUFS(512, 256)
            self.fc = nn.Linear(256, nc)
            self.drop = nn.Dropout(0.3)
        def forward(self, x):
            x = self.backbone(x)
            x = x * self.attn(x)
            x = self.drop(self.s1(x))
            x = self.drop(self.s2(x))
            return self.fc(x)

def load_all_models():
    """Best-effort loader for every model the app can use.

    Each model is optional: a missing file or load error prints a warning and
    the app continues with whatever did load. Populates the module-level
    registry (CLASSIFICATION_MODELS, DETECTION_MODEL, ANOMALY_MODELS, the
    SWINV2_* singles, ...) and sets MODELS_LOADED.
    """
    global CLASSIFICATION_MODELS, DETECTION_MODEL, ENSEMBLE_CONFIG, MODELS_LOADED
    nc = len(CATEGORIES)
    if torch is None:
        print("⚠️ PyTorch not available")
        return
    # 1. ConvNeXt-V2
    path = find_model("convnext_best.pt")
    if path:
        try:
            import timm
            m = timm.create_model('convnext_base', pretrained=False, num_classes=nc)
            m.load_state_dict(torch.load(path, map_location=DEVICE))
            m.to(DEVICE).eval()
            CLASSIFICATION_MODELS['ConvNeXt-V2'] = m
            print(f"✅ ConvNeXt-V2 loaded")
        except Exception as e:
            print(f"⚠️ ConvNeXt: {e}")
    # 2. EfficientNet-V2
    path = find_model("efficientnet_best.pt")
    if path:
        try:
            import timm
            m = timm.create_model('tf_efficientnetv2_m', pretrained=False, num_classes=nc)
            checkpoint = torch.load(path, map_location=DEVICE)
            # Checkpoints may be raw state dicts or wrapped in 'model_state_dict'.
            if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
                m.load_state_dict(checkpoint['model_state_dict'])
            else:
                m.load_state_dict(checkpoint)
            m.to(DEVICE).eval()
            CLASSIFICATION_MODELS['EfficientNet-V2'] = m
            print(f"✅ EfficientNet-V2 loaded")
        except Exception as e:
            print(f"⚠️ EfficientNet: {e}")
    # 3. ResNet152
    path = find_model("resnet152_best.pt")
    if path:
        try:
            m = models.resnet152(weights=None)
            m.fc = nn.Linear(m.fc.in_features, nc)
            checkpoint = torch.load(path, map_location=DEVICE)
            if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
                m.load_state_dict(checkpoint['model_state_dict'])
            else:
                m.load_state_dict(checkpoint)
            m.to(DEVICE).eval()
            CLASSIFICATION_MODELS['ResNet152'] = m
            print(f"✅ ResNet152 loaded")
        except Exception as e:
            print(f"⚠️ ResNet152: {e}")
    # 4. Deep-SOSUFS-v3
    path = find_model("sosufs_best.pt")
    if path:
        try:
            m = DeepSOSUFS(nc)
            checkpoint = torch.load(path, map_location=DEVICE)
            if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
                m.load_state_dict(checkpoint['model_state_dict'])
            else:
                m.load_state_dict(checkpoint)
            m.to(DEVICE).eval()
            CLASSIFICATION_MODELS['Deep-SOSUFS-v3'] = m
            print(f"✅ Deep-SOSUFS-v3 loaded")
        except Exception as e:
            print(f"⚠️ SOSUFS: {e}")
    # 5. YOLOv10x-cls
    path = find_model("yolov10x_cls_best.pt")
    if path and YOLO_AVAILABLE:
        try:
            CLASSIFICATION_MODELS['YOLOv10x-cls'] = YOLO(path)
            print(f"✅ YOLOv10x-cls loaded")
        except Exception as e:
            print(f"⚠️ YOLOv10x-cls: {e}")
    # 6. YOLO11x-cls (92.6% Top-1, 97.4% Top-5) 🆕
    path = find_model("yolo11x_cls_best.pt")
    if path and YOLO_AVAILABLE:
        try:
            CLASSIFICATION_MODELS['YOLO11x-cls'] = YOLO(path)
            print(f"✅ YOLO11x-cls loaded (92.6% Top-1)")
        except Exception as e:
            print(f"⚠️ YOLO11x-cls: {e}")
    # 7. Ensemble config
    path = find_model("ensemble_config.json")
    if path:
        try:
            with open(path) as f:
                ENSEMBLE_CONFIG = json.load(f)
            print(f"✅ Ensemble config loaded")
        except Exception as e:
            print(f"⚠️ Ensemble: {e}")
    # 8. Detection — first detection checkpoint that resolves wins.
    det_path = find_model("sonar_yolo11_detection_best.pt") or find_model("sonar_yolov8x_detection_best.pt") or find_model("best_detection.pt") or find_model("best.pt")
    if det_path and YOLO_AVAILABLE:
        try:
            DETECTION_MODEL = YOLO(det_path)
            print(f"✅ Detection loaded")
        except Exception as e:
            print(f"⚠️ Detection: {e}")
    # 9. Anomaly Detection
    global ANOMALY_MODELS, SELECTED_FEATURES, FEATURE_EXTRACTOR
    path = find_model("anomaly_models.pkl")
    if path:
        try:
            with open(path, 'rb') as f:
                ANOMALY_MODELS = pickle.load(f)
            print(f"✅ Anomaly loaded ({len(ANOMALY_MODELS)} models)")
        except Exception as e:
            print(f"⚠️ Anomaly: {e}")
    path = find_model("selected_features.npy")
    if path:
        try:
            SELECTED_FEATURES = np.load(path)
            print(f"✅ DSOS-BA features ({len(SELECTED_FEATURES)} selected)")
        except Exception as e:
            print(f"⚠️ Features: {e}")
    # Download risk calibration model
    # NOTE(review): several defects here — `risk_model` is a *local* name (no
    # `global risk_model`), so the module-level risk_model is never updated;
    # and `hf_hub_download` / `HF_TOKEN` are not defined in this scope, so the
    # fallback branch raises NameError (swallowed by the except). Confirm and fix.
    risk_model = None
    try:
        risk_path = find_model("risk_calibration_model.pkl")
        if risk_path is None:
            risk_path = hf_hub_download(repo_id=MODEL_REPO, filename="risk_calibration_model.pkl", token=HF_TOKEN)
        with open(risk_path, 'rb') as f:
            risk_data = pickle.load(f)
        risk_model = risk_data['model']
        print("✅ Risk calibration model loaded")
    except Exception as e:
        risk_model = None
        print(f"⚠️ Risk model not loaded: {e}")
    if torch is not None:
        try:
            # Untrained EfficientNet-V2-M trunk used only as a feature extractor
            # for the anomaly models (classifier head stripped).
            fe = models.efficientnet_v2_m(weights=None)
            fe.classifier = nn.Identity()
            fe.to(DEVICE).eval()
            FEATURE_EXTRACTOR = fe
            print(f"✅ Feature extractor ready")
        except Exception as e:
            print(f"⚠️ FE: {e}")
    # ═══════════════════════════════════════════════════════════════
    # 🆕 Swin-V2 Models
    # ═══════════════════════════════════════════════════════════════
    global SWINV2_CLASSIFICATION, SWINV2_CONCEALMENT, SWINV2_RISK
    # 10. Swin-V2 Classification (43 classes, 94.1% F1)
    swinv2_cls_path = find_model("classification/best_swinv2_43cls.pth")
    if swinv2_cls_path:
        try:
            import timm
            nc_swin = len(SWINV2_CATEGORIES)
            m = timm.create_model('swinv2_tiny_window8_256', pretrained=False, num_classes=nc_swin)
            ckpt = torch.load(swinv2_cls_path, map_location=DEVICE)
            if isinstance(ckpt, dict) and 'model_state_dict' in ckpt:
                m.load_state_dict(ckpt['model_state_dict'])
            else:
                m.load_state_dict(ckpt)
            m.to(DEVICE).eval()
            # Registered both in the ensemble and as the region classifier.
            CLASSIFICATION_MODELS['Swin-V2'] = m
            SWINV2_CLASSIFICATION = m
            print(f"✅ Swin-V2 Classification loaded ({nc_swin}cls, 94.1% F1)")
        except Exception as e:
            print(f"⚠️ Swin-V2 Classification: {e}")
    # 11. Concealment Detection (98.9% F1)
    conc_path = find_model("concealment/best_eva02.pth") or find_model("concealment/best_swinv2.pth")
    if conc_path:
        try:
            import timm
            # EVA-02 checkpoint expects 224px input; Swin-V2 expects 256px.
            if 'eva02' in conc_path:
                m = timm.create_model('eva02_tiny_patch14_224', pretrained=False, num_classes=2)
                img_size_conc = 224
            else:
                m = timm.create_model('swinv2_tiny_window8_256', pretrained=False, num_classes=2)
                img_size_conc = 256
            ckpt = torch.load(conc_path, map_location=DEVICE)
            if isinstance(ckpt, dict) and 'model_state_dict' in ckpt:
                m.load_state_dict(ckpt['model_state_dict'])
            else:
                m.load_state_dict(ckpt)
            m.to(DEVICE).eval()
            m._conc_img_size = img_size_conc  # stashed so callers pick the right transform
            SWINV2_CONCEALMENT = m
            print(f"✅ Concealment loaded (98.9% F1)")
        except Exception as e:
            SWINV2_CONCEALMENT = None
            print(f"⚠️ Concealment: {e}")
    else:
        SWINV2_CONCEALMENT = None
    # 12. Risk Assessment (5 levels, 97.2% F1)
    risk_path = find_model("risk/best_swinv2_risk.pth")
    if risk_path:
        try:
            import timm
            m = timm.create_model('swinv2_tiny_window8_256', pretrained=False, num_classes=5)
            ckpt = torch.load(risk_path, map_location=DEVICE)
            if isinstance(ckpt, dict) and 'model_state_dict' in ckpt:
                m.load_state_dict(ckpt['model_state_dict'])
            else:
                m.load_state_dict(ckpt)
            m.to(DEVICE).eval()
            SWINV2_RISK = m
            print(f"✅ Swin-V2 Risk loaded (5 levels, 97.2% F1)")
        except Exception as e:
            SWINV2_RISK = None
            print(f"⚠️ Swin-V2 Risk: {e}")
    else:
        SWINV2_RISK = None
    MODELS_LOADED = len(CLASSIFICATION_MODELS) > 0
    anom_txt = f"anomaly({len(ANOMALY_MODELS)})" if ANOMALY_MODELS else "no-anomaly"
    conc_txt = "✅ Concealment" if SWINV2_CONCEALMENT else "❌ Concealment"
    risk_txt = "✅ Risk" if SWINV2_RISK else "❌ Risk"
    print(f"\n🔱 Total: {len(CLASSIFICATION_MODELS)} cls + {'1 det' if DETECTION_MODEL else '0 det'} + {anom_txt} + {conc_txt} + {risk_txt}")

load_all_models()

# ═══════════════════════════════════════════════════════════════
# 🔬 Inference
# ═══════════════════════════════════════════════════════════════
def extract_features_from_image(img):
    """Return a 1-D feature vector for *img* via FEATURE_EXTRACTOR, or None."""
    if FEATURE_EXTRACTOR is None or torch is None:
        return None
    try:
        transform = get_transforms()
        if img.mode != 'RGB':
            img = img.convert('RGB')
        t = transform(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            return FEATURE_EXTRACTOR(t).cpu().numpy()[0]
    except:
        return None

def check_anomaly(features):
    """Vote across the anomaly models; returns (score, is_anomaly, per-model details).

    is_anomaly requires at least 2 positive votes; score is the vote fraction.
    """
    if ANOMALY_MODELS is None or features is None or SELECTED_FEATURES is None:
        return 0.0, False, {}
    try:
        # DSOS-BA feature subset when the full vector is longer than the mask.
        feat_sel = features[SELECTED_FEATURES] if len(features) > len(SELECTED_FEATURES) else features
        x = feat_sel.reshape(1, -1)
        votes = 0
        details = {}
        for name, model in ANOMALY_MODELS.items():
            try:
                pred = model.predict(x)[0]
                is_anom = (pred == -1)  # sklearn outlier-detector convention: -1 == anomaly
                votes += int(is_anom)
                details[name] = is_anom
            except:
                details[name] = False
        score = votes / max(len(ANOMALY_MODELS), 1)
        return score, votes >= 2, details
    except:
        return 0.0, False, {}

def scan_container_regions(img):
    """Split the container image into 4 vertical strips and anomaly-score each.

    Returns a list of region dicts with bbox, score, status and an Arabic label.
    """
    regions = []
    if img is None or ANOMALY_MODELS is None:
        return regions
    w_img, h_img = img.size
    for i in range(4):
        x = i * (w_img // 4)
        sw, sh = w_img // 4, h_img
        try:
            region = img.crop((x, 0, x+sw, sh))
            feat = extract_features_from_image(region)
            score, is_anom, det = check_anomaly(feat) if feat is not None else (0.0, False, {})
        except:
            score, is_anom, det = 0.0, False, {}
        pos = ["مقدمة","وسط-1","وسط-2","نهاية"][i]
        # danger at >=2/3 of models voting anomalous, warning on any 2 votes.
        status = 'danger' if score >= 0.67 else ('warning' if is_anom else 'normal')
        label = f"⚠️ مشبوه - {pos}" if is_anom else f"✅ طبيعي - {pos}"
        regions.append({'bbox':(x,0,sw,sh),'anomaly_score':score,'is_anomaly':is_anom,
                        'status':status,'label_ar':label,'section':i+1})
    return regions

def draw_arabic_boxes(img, regions):
    """Draw colored boxes + Arabic labels for anomaly regions onto *img* in place."""
    if not regions:
        return img
    draw = ImageDraw.Draw(img)
    try:
        # NOTE(review): DejaVuSans likely lacks Arabic glyph shaping — confirm
        # labels render correctly on the deployment host.
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
    except:
        font = ImageFont.load_default()
    for r in regions:
        x, y, w, h = r['bbox']
        color = (255,0,0) if r['status']=='danger' else ((255,165,0) if r['status']=='warning' else (0,200,0))
        thick = 4 if r['status']=='danger' else 2
        for t in range(thick):
            draw.rectangle([x-t, y-t, x+w+t, y+h+t], outline=color)
        txt = f"{r['label_ar']} {r['anomaly_score']:.0%}"
        draw.rectangle([x, y, x+len(txt)*7+10, y+20], fill=color)  # label background strip
        draw.text((x+5, y+2), txt, fill=(255,255,255), font=font)
    return img

def classify_single_region(model, region_img, categories, img_size=256):
    """Classify one region with a Swin-V2 model; returns (category, confidence)."""
    if torch is None or model is None:
        return None, 0.0
    try:
        tf = get_swinv2_transforms(img_size)
        inp = tf(region_img.convert('RGB')).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            out = model(inp)
        probs = torch.softmax(out, dim=1)[0].cpu().numpy()
        top_idx = int(probs.argmax())
        if top_idx < len(categories):
            return categories[top_idx], float(probs[top_idx])
    except Exception as e:
        print(f"⚠️ Region classify: {e}")
    return None, 0.0

def classify_regions(img, n_regions=4, min_confidence=0.30):
    """🆕 تقسيم
صورة الحاوية إلى مناطق وتصنيف كل منطقة بـ Swin-V2"""
    # Splits the container image into n vertical regions and classifies each
    # with the Swin-V2 model; regions below min_confidence are dropped.
    if img is None or SWINV2_CLASSIFICATION is None:
        return []
    w, h = img.size
    region_w = w // n_regions
    results = []
    pos_names = ["المقدمة", "وسط-1", "وسط-2", "النهاية"] if n_regions == 4 else [f"قسم-{i+1}" for i in range(n_regions)]
    for i in range(n_regions):
        x1 = i * region_w
        x2 = min(x1 + region_w, w)
        region = img.crop((x1, 0, x2, h))
        cat, conf = classify_single_region(SWINV2_CLASSIFICATION, region, SWINV2_CATEGORIES, img_size=256)
        if cat and conf >= min_confidence:
            info = get_hs_info(cat)
            results.append({'region': i+1,
                            'position': pos_names[i] if i < len(pos_names) else f"قسم-{i+1}",
                            'category': cat, 'category_ar': info['ar'],
                            'hs_code': info['hs'], 'duty': info['duty'],
                            'confidence': conf, 'bbox': (x1, 0, x2-x1, h)})
    return results

def draw_region_boxes(img, region_results):
    """Draw per-region classification boxes on the container image.

    Colors are assigned per distinct category; when more than one category is
    present, a red mixed-cargo alert banner is drawn at the top.
    """
    if not region_results:
        return img
    draw = ImageDraw.Draw(img)
    try:
        font_sm = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 12)
    except:
        font_sm = ImageFont.load_default()
    colors = [(46,125,50),(21,101,192),(106,27,154),(230,81,0),(198,40,40),(0,131,143)]
    unique_cats = list(set(r['category'] for r in region_results))
    for r in region_results:
        x, y, w, h = r['bbox']
        color = colors[unique_cats.index(r['category']) % len(colors)]
        for t in range(3):
            draw.rectangle([x+t, y+t, x+w-t, y+h-t], outline=color)
        label = f"{r['position']}: {r['category_ar']} ({r['confidence']:.0%})"
        draw.rectangle([x, y, x+len(label)*8+10, y+24], fill=color)
        draw.text((x+5, y+3), label, fill=(255,255,255), font=font_sm)
    if len(unique_cats) > 1:
        alert = f"⚠️ {len(unique_cats)} أنواع بضائع مختلفة!"
        img_w = img.size[0]
        draw.rectangle([img_w//2-120, 2, img_w//2+120, 28], fill=(198,40,40))
        draw.text((img_w//2-110, 5), alert, fill=(255,255,255), font=font_sm)
    return img

def classify_image(img):
    """Run every loaded classifier on *img* and average the per-class scores.

    Returns {'ensemble': top-5 (category, avg score) pairs over classes with
    avg > 0.05, 'models': per-model top-1 results, 'n_models': model count}.
    Falls back to simulate_classification() when no models are loaded.
    """
    if not MODELS_LOADED or img is None:
        return simulate_classification()
    transform_224 = get_transforms()
    transform_256 = get_swinv2_transforms(256)
    if img.mode != 'RGB':
        img = img.convert('RGB')
    input_224 = transform_224(img).unsqueeze(0).to(DEVICE)
    input_256 = transform_256(img).unsqueeze(0).to(DEVICE) if transform_256 else input_224
    all_preds = {}      # category -> list of per-model probabilities
    model_results = {}  # model name -> its top-1 prediction
    for name, model in CLASSIFICATION_MODELS.items():
        try:
            if name in ('YOLOv10x-cls', 'YOLO11x-cls'):
                # Ultralytics classifiers take the PIL image directly.
                res = model(img, verbose=False)
                if res and res[0].probs is not None:
                    probs = res[0].probs.data.cpu().numpy()
                    for idx in range(min(len(probs), len(CATEGORIES))):
                        cat = CATEGORIES[idx]
                        all_preds.setdefault(cat, []).append(float(probs[idx]))
                    top_idx = probs.argmax()
                    if top_idx < len(CATEGORIES):
                        model_results[name] = {'top1': CATEGORIES[top_idx], 'confidence': float(probs[top_idx])}
                continue
            elif name == 'Swin-V2':
                # Swin-V2 was trained at 256px on its own label set.
                input_tensor = input_256
                categories = SWINV2_CATEGORIES
            else:
                input_tensor = input_224
                categories = CATEGORIES
            with torch.no_grad():
                out = model(input_tensor)
            probs = torch.softmax(out, dim=1)[0].cpu().numpy()
            for idx in range(min(len(probs), len(categories))):
                cat = categories[idx]
                all_preds.setdefault(cat, []).append(float(probs[idx]))
            top_idx = probs.argmax()
            if top_idx < len(categories):
                model_results[name] = {'top1': categories[top_idx], 'confidence': float(probs[top_idx])}
        except Exception as e:
            print(f"⚠️ {name}: {e}")
    ensemble = {}
    for cat, scores in all_preds.items():
        avg = np.mean(scores)
        if avg > 0.05:  # drop negligible classes
            ensemble[cat] = avg
    sorted_results = sorted(ensemble.items(), key=lambda x: x[1], reverse=True)[:5]
    return {'ensemble': sorted_results, 'models': model_results, 'n_models': len(model_results)}

def detect_objects(img):
    """Run the YOLO detection model; returns (annotated PIL image, detections).

    Returns (None, []) when no model is loaded or nothing is returned.
    """
    if DETECTION_MODEL is None or img is None:
        return None, []
    try:
        results = DETECTION_MODEL(img, conf=0.25, verbose=False)
        if results and len(results) > 0:
            r = results[0]
            detections = []
            if r.boxes is not None:
                for box in r.boxes:
                    detections.append({
                        'name': r.names.get(int(box.cls[0]), 'unknown'),
                        'confidence': float(box.conf[0]),
                    })
            # r.plot() returns a BGR array — reverse the channel axis for PIL (RGB).
            annotated = Image.fromarray(r.plot()[..., ::-1])
            return annotated, detections
    except Exception as e:
        print(f"⚠️ Detection: {e}")
    return None, []

def simulate_classification():
    """Fake ensemble result used when no real models are loaded (n_models=0)."""
    n = random.randint(3, 6)
    items = random.sample(list(CARGO_DATABASE.keys()), n)
    ensemble = sorted([(it, random.uniform(0.6, 0.99)) for it in items], key=lambda x: x[1], reverse=True)
    return {'ensemble': ensemble, 'models': {'Simulation': {'top1': items[0], 'confidence': 0.95}},
            'n_models': 0}

# ═══════════════════════════════════════════════════════════════
# 🔗 HS Lookup
# ═══════════════════════════════════════════════════════════════
def lookup_hs(hs_code):
    """Return the TSC record for *hs_code*, falling back to 6- then 4-digit
    prefix matches; None when nothing matches."""
    hs = str(hs_code).strip()
    if hs in TSC_DATABASE:
        return TSC_DATABASE[hs]
    for length in [6, 4]:
        matches = {k: v for k, v in TSC_DATABASE.items() if k.startswith(hs[:length])}
        if matches:
            return list(matches.values())[0]  # arbitrary first match (dict order)
    return None

def get_hs_info(cargo_key):
    """Merge CARGO_DATABASE and TSC data for *cargo_key* into one display dict.

    Unknown keys get a sentinel record (HS 999999, chapter '99', duty 15%).
    """
    if cargo_key not in CARGO_DATABASE:
        return {'en': cargo_key.upper(), 'ar': cargo_key, 'hs': '999999', 'ch': '99',
                'ch_name': '', 'duty': 15, 'tsc_desc': '', 'avg_price': 0, 'unit': '', 'tsc_code': ''}
    c = CARGO_DATABASE[cargo_key]
    tsc = lookup_hs(c['hs'])
    return {
        'en': cargo_key.upper().replace('_', ' '),
        'ar': c['ar'],
        'hs': c['hs'],
        'ch': c['ch'],
        'ch_name': HS_CHAPTERS.get(c['ch'], ''),
        'duty': c['duty'],
        'tsc_desc': tsc['desc'] if tsc else c['ar'],
        'avg_price': tsc['avg_price'] if tsc else 0,
        'unit': tsc['unit'] if tsc else '',
        'tsc_code': tsc['nb5'] if tsc else '',
    }

# ═══════════════════════════════════════════════════════════════
# 📊 Data
# ═══════════════════════════════════════════════════════════════
def get_stats():
    # Dashboard stats; the first four figures are hard-coded demo values,
    # the rest reflect what actually loaded at startup.
    return {'total': 6707, 'match': 6034, 'mismatch': 673, 'high_risk': 294, 'hs_codes':
            len(TSC_DATABASE), 'cargo_types': len(CARGO_DATABASE),
            'models': len(CLASSIFICATION_MODELS), 'detection': DETECTION_MODEL is not None}

def get_anomalies():
    """Return a 30-row DataFrame of randomly generated demo anomaly records."""
    rows = []
    for i in range(30):
        items = random.sample(list(CARGO_DATABASE.keys()), random.randint(1, 4))
        rows.append({
            'الرقم': 6700 - i,
            'الحاوية': f'TCNU{random.randint(1000000,9999999)}',
            'الأصناف': ' + '.join([CARGO_DATABASE[it]['ar'] for it in items]),
            'أكواد_HS': ' | '.join([CARGO_DATABASE[it]['hs'] for it in items]),
            'الخطورة': random.randint(0, 5),
            'التاريخ': datetime.now().strftime('%Y-%m-%d'),
        })
    return pd.DataFrame(rows)

def get_categories():
    """Return a DataFrame listing every cargo category with its HS/duty/price info."""
    rows = []
    for i, (en, c) in enumerate(CARGO_DATABASE.items()):
        tsc = lookup_hs(c['hs'])
        rows.append({
            '#': i+1,
            'EN': en.upper().replace('_',' '),
            'AR': c['ar'],
            'HS': c['hs'],
            'الفصل': f"{c['ch']}-{HS_CHAPTERS.get(c['ch'],'')}",
            'الرسوم%': c['duty'],
            'السعر': f"${tsc['avg_price']}" if tsc and tsc['avg_price'] > 0 else '—',
        })
    return pd.DataFrame(rows)

def search_tsc(query):
    """Substring-search the TSC database by HS code or description (max 50 hits)."""
    if not query or len(query.strip()) < 2:
        return pd.DataFrame([{'ملاحظة': 'اكتب كود HS أو وصف (حرفين على الأقل)'}])
    q = query.strip()
    results = []
    for hs, info in TSC_DATABASE.items():
        if q in hs or q in info['desc']:
            results.append({'كود_HS': hs, 'الرمز': info['nb5'], 'الوصف': info['desc'],
                            'السعر': f"${info['avg_price']}" if info['avg_price'] > 0 else '—',
                            'الوحدة': info['unit']})
            if len(results) >= 50:
                break
    return pd.DataFrame(results) if results else pd.DataFrame([{'نتيجة': f'لا توجد نتائج لـ: {q}'}])

# ═══════════════════════════════════════════════════════════════
# 🔬 Comprehensive analysis
# ═══════════════════════════════════════════════════════════════
# ═══════════════════════════════════════════════════════════════
# 🎛️ Sonar toolbar filters (Web Version)
# ═══════════════════════════════════════════════════════════════
# All filters: return None for None input; cv2-based filters return the image
# unchanged when OpenCV is unavailable. The local `import numpy as np` calls
# are redundant (numpy is imported at module level) but kept as-is.
def apply_grayscale(img):
    """Luminance grayscale (ITU-R BT.601 weights), replicated to 3 channels."""
    if img is None:
        return None
    import numpy as np
    arr = np.array(img.convert('RGB'))
    gray = np.dot(arr[...,:3], [0.299, 0.587, 0.114]).astype(np.uint8)
    return Image.fromarray(np.stack([gray]*3, axis=-1))

def apply_invert(img):
    """Negative image (255 - value per channel)."""
    if img is None:
        return None
    import numpy as np
    arr = np.array(img.convert('RGB'))
    return Image.fromarray(255 - arr)

def apply_thermal(img):
    """JET false-color ('thermal') mapping of the grayscale image."""
    if img is None:
        return None
    try:
        import cv2
    except:
        return img
    import numpy as np
    arr = np.array(img.convert('RGB'))
    gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
    thermal = cv2.applyColorMap(gray, cv2.COLORMAP_JET)
    return Image.fromarray(cv2.cvtColor(thermal, cv2.COLOR_BGR2RGB))

def apply_cool(img):
    """WINTER (cool blue) false-color mapping of the grayscale image."""
    if img is None:
        return None
    try:
        import cv2
    except:
        return img
    import numpy as np
    arr = np.array(img.convert('RGB'))
    gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
    cool = cv2.applyColorMap(gray, cv2.COLORMAP_WINTER)
    return Image.fromarray(cv2.cvtColor(cool, cv2.COLOR_BGR2RGB))

def apply_rainbow(img):
    """RAINBOW false-color mapping of the grayscale image."""
    if img is None:
        return None
    try:
        import cv2
    except:
        return img
    import numpy as np
    arr = np.array(img.convert('RGB'))
    gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
    rainbow = cv2.applyColorMap(gray, cv2.COLORMAP_RAINBOW)
    return Image.fromarray(cv2.cvtColor(rainbow, cv2.COLOR_BGR2RGB))

def apply_edge(img):
    """Overlay Canny edges (50/150 thresholds) at 30% opacity onto the original."""
    if img is None:
        return None
    try:
        import cv2
    except:
        return img
    import numpy as np
    arr = np.array(img.convert('RGB'))
    gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
    edges = cv2.Canny(gray, 50, 150)
    colored = cv2.addWeighted(arr, 0.7, cv2.cvtColor(edges, cv2.COLOR_GRAY2RGB), 0.3, 0)
    return Image.fromarray(colored)

def apply_contrast(img):
    """CLAHE contrast boost applied to the L channel in LAB space."""
    if img is None:
        return None
    try:
        import cv2
    except:
        return img
    import numpy as np
    arr = np.array(img.convert('RGB'))
    lab = cv2.cvtColor(arr, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    l = clahe.apply(l)
    enhanced = cv2.merge([l, a, b])
    return Image.fromarray(cv2.cvtColor(enhanced, cv2.COLOR_LAB2RGB))

def apply_brightness(img):
    """Add a flat +40 brightness offset, clipped to the valid 0–255 range."""
    if img is None:
        return None
    import numpy as np
    arr = np.array(img.convert('RGB')).astype(np.int16)  # int16 avoids uint8 wraparound
    arr = np.clip(arr + 40, 0, 255).astype(np.uint8)
    return Image.fromarray(arr)

# ═══════════════════════════════════════════════════════════════
# 📡 Database link via API
# ═══════════════════════════════════════════════════════════════
# ═══════════════════════════════════════════════════════════════
# 🔬🌊 8 Novel Physics Techniques (C11-C18) — 244D
# Dr. Abbas Fadel Jassim Al-Jubouri | UKM | 2026
# ═══════════════════════════════════════════════════════════════
# Heuristic density/texture signatures for known smuggling patterns, matched
# against grayscale statistics in extract_ckb().
SMUGGLING_PATTERNS_P = {
    'drugs_in_dates':{'density_range':(0.35,0.55),'texture':'granular'},
    'pills_in_candy':{'density_range':(0.30,0.50),'texture':'uniform'},
    'powder_in_tea':{'density_range':(0.25,0.45),'texture':'fine'},
    'liquid_in_oil':{'density_range':(0.20,0.40),'texture':'smooth'},
    'metal_in_parts':{'density_range':(0.60,0.90),'texture':'sharp'},
    'plastic_in_toys':{'density_range':(0.15,0.35),'texture':'mixed'},
    'cigs_in_fabric':{'density_range':(0.25,0.50),'texture':'layered'},
    'cash_in_books':{'density_range':(0.40,0.65),'texture':'stacked'},
}

def physics_preprocess(image, size=224):
    """Convert an image (PIL or ndarray) to a size×size float grayscale in [0,1]."""
    if image is None:
        return None
    if isinstance(image, np.ndarray):
        img = Image.fromarray(image)
    else:
        img = image
    return np.array(img.convert('L').resize((size,size)), dtype=np.float32)/255.0

def extract_dpm(img):
    """Density-partition features (4-D): quadrant mean/std spreads vs global mean."""
    H,W = img.shape; mh,mw = H//2,W//2
    quads = [img[:mh,:mw],img[:mh,mw:],img[mh:,:mw],img[mh:,mw:]]
    means = [q.mean() for q in quads]; stds = [q.std() for q in quads]; gm = img.mean()
    return np.array([max(means)-min(means),max(stds)-min(stds),np.std(means),
                     sum(abs(m-gm) for m in means)/4], dtype=np.float32)

def extract_ckb(img):
    """Contraband knowledge-base features (8-D): per-pattern match score
    = 0.6 * density match + 0.4 * texture-heuristic match."""
    features = []; md,sd = img.mean(),img.std()
    gm = np.sqrt(ndimage.sobel(img,0)**2+ndimage.sobel(img,1)**2).mean()
    for _,pat in SMUGGLING_PATTERNS_P.items():
        lo,hi = pat['density_range']
        # dm: 1 at the range center, falling to 0 at/beyond the range edges.
        dm = min(max(0,1.0-abs(md-(lo+hi)/2)/((hi-lo)/2+1e-8)),1.0)
        t=pat['texture']; ts=0
        if t=='granular': ts=min(sd/0.15,1)
        elif t=='uniform': ts=max(0,1-sd/0.10)
        elif t=='fine': ts=min(sd/0.12,1)*0.8
        elif t=='smooth': ts=max(0,1-gm/0.15)
        elif t=='sharp':
ts=min(gm/0.20,1) elif t=='mixed': ts=min(sd*gm*10,1) elif t=='layered': ts=min(abs(np.diff(img.mean(axis=1))).mean()/0.02,1) elif t=='stacked': ts=min(abs(np.diff(img.mean(axis=0))).mean()/0.02,1) features.append(0.6*dm+0.4*ts) return np.array(features, dtype=np.float32) def extract_3dtw(img): H,W = img.shape; surface = ndimage.gaussian_filter(img,1.5) gx,gy = ndimage.sobel(surface,1),ndimage.sobel(surface,0); grad_mag = np.sqrt(gx**2+gy**2) features = [surface.mean(),surface.std(),surface.max(),surface.min(),np.percentile(surface,75)-np.percentile(surface,25)] features += [grad_mag.mean(),grad_mag.std(),grad_mag.max(),np.percentile(grad_mag,90),(grad_mag>grad_mag.mean()+1.5*grad_mag.std()).sum()/(H*W)] xx,yy = np.meshgrid(np.linspace(0,2*np.pi,W),np.linspace(0,2*np.pi,H)) for f in [2,4,8,16,32]: wave = surface*np.sin(f*xx)*np.sin(f*yy); power = np.abs(fftshift(fft2(wave)))**2 features += [power.sum()/(H*W),power.max()] flat = surface.flatten(); mu,sigma = flat.mean(),flat.std() peaks,props = find_peaks(flat,height=mu+1.5*sigma,prominence=0.1) features += [len(peaks)/len(flat),props['prominences'].mean() if len(peaks)>0 else 0,ndimage.laplace(surface).std()] edges = (grad_mag>grad_mag.mean()+grad_mag.std()).astype(float) features += [edges.mean(),ndimage.label(edges)[1]/(H*W/100)] return np.array(features[:25], dtype=np.float32) def extract_cws(img): H,W = img.shape; cy,cx = H//2,W//2; y,x = np.ogrid[:H,:W] dist = np.sqrt((x-cx)**2+(y-cy)**2); max_r = np.sqrt(cx**2+cy**2); features = [] for i in range(8): mask = (dist>=i*max_r/8)&(dist<(i+1)*max_r/8) rv = img[mask] if mask.sum()>0 else np.array([0.0]); features += [rv.mean(),rv.std()] for d in [img[:,:W//2],img[:,W//2:],img[:H//2,:],img[H//2:,:]]: gm = np.sqrt(ndimage.sobel(d,1)**2+ndimage.sobel(d,0)**2); features += [d.mean(),d.std(),gm.mean(),gm.max()] for s in [3,1.5,0.5,0]: sm = ndimage.gaussian_filter(img,s) if s>0 else img; e = np.sqrt(ndimage.sobel(sm,0)**2+ndimage.sobel(sm,1)**2) features += 
[e.mean(),e.std(),e.max(),(e>e.mean()+2*e.std()).sum()/(H*W)] rm = features[:16:2]; features += [np.std(rm),max(rm)-min(rm)] return np.array(features[:50], dtype=np.float32) def extract_tcv(img): H,W = img.shape; features = [] for cy_c,cx_c in [(H//2,W//4),(H//2,W//2),(H//2,3*W//4)]: radius = min(H,W)//4; t = np.linspace(0,6*2*np.pi,300); r = radius*(1-t/(6*2*np.pi)) px = (cx_c+r*np.cos(t)).astype(int); py = (cy_c+r*np.sin(t)).astype(int) v = (px>=0)&(px=0)&(py0 else np.array([0.0]) c_feats = [sampled.mean(),sampled.std(),sampled.max()-sampled.min(),np.abs(np.diff(sampled)).mean() if len(sampled)>1 else 0] for d in range(5): sm = ndimage.gaussian_filter(img,0.5+d*1.0); dv = sm[py,px] if len(px)>0 else np.array([0.0]); c_feats += [dv.mean(),dv.std()] features += c_feats while len(features)<42: features.append(0.0) return np.array(features[:42], dtype=np.float32) def extract_dwe(img): H,W = img.shape; features = []; dark_mask = img<0.25; black_mask = img<0.10 dv = img[dark_mask] if dark_mask.sum()>0 else np.array([0.0]) features += [dark_mask.sum()/(H*W),black_mask.sum()/(H*W),dv.mean(),dv.std(),img.mean()] enhanced = np.clip(img*(1.0/(img.mean()+0.01)),0,1) de = enhanced[dark_mask] if dark_mask.sum()>0 else np.array([0.0]) features += [de.mean(),de.std(),de.max() if len(de)>0 else 0,enhanced.mean(),enhanced.std()] gm = np.sqrt(ndimage.sobel(img,1)**2+ndimage.sobel(img,0)**2) dg = gm[dark_mask] if dark_mask.sum()>0 else np.array([0.0]) features += [dg.mean(),dg.std(),dg.max() if len(dg)>0 else 0,gm.mean(),gm.std()] fi = np.abs(fftshift(fft2(img))); fd = np.abs(fftshift(fft2(img*dark_mask.astype(float)))) features += [fi.mean(),fi.std(),fd.mean(),fd.std(),fd.sum()/(fi.sum()+1e-8)] from scipy.stats import skew,kurtosis features += [skew(dv),kurtosis(dv),np.percentile(img,10),np.percentile(img,25),np.median(img)-dv.mean()] lh = H//10 for i in range(10): features.append(img[i*lh:(i+1)*lh,:].mean()) return np.array(features[:35], dtype=np.float32) def extract_mvf(img): 
H,W = img.shape; features = []; bins = np.linspace(0,1,9); lc = [] for i in range(8): mask = (img>=bins[i])&(imgtb+mult*img.std()).astype(float); features += [er.sum()/(H*W),ndimage.label(er)[1]/max(1,H*W/1000)] for fog in [0.1,0.2,0.3,0.5,0.7]: vis = (img>fog).astype(float); features += [vis.mean(),ndimage.label(vis)[1]/max(1,H*W/500)] rh,rw = H//7,W//7 for d in range(5): sm = ndimage.gaussian_filter(img,0.3+d*0.8) bm = [sm[i*rh:(i+1)*rh,j*rw:(j+1)*rw].mean() for i in range(7) for j in range(7)]; features.append(np.std(bm)) cy,cx = H//2,W//2; yy,xx = np.ogrid[:H,:W]; dist = np.sqrt((xx-cx)**2+(yy-cy)**2); mr = min(H,W)//2 for rf in [0.1,0.25,0.5,0.75,0.95]: ring = (dist>=rf*mr-3)&(dist<=rf*mr+3); rv = img[ring] if ring.sum()>0 else np.array([0.0]); features.append(rv.std()) return np.array(features[:40], dtype=np.float32) def extract_env(img): H,W = img.shape; features = [] thresholds = {'black_hole':(0.0,0.10),'desert':(0.10,0.30),'clear_sea':(0.30,0.70),'red_sea':(0.70,0.90),'white_sea':(0.90,1.01)} masks = {n:(img>=lo)&(img0 else np.array([0.0]); eg = gm[mask] if mask.sum()>0 else np.array([0.0]) features += [cov,vals.mean(),vals.std(),eg.mean(),eg.std()] for n,mask in masks.items(): labeled,nc = ndimage.label(mask.astype(int)); features += [nc/max(1,H*W/1000),mask.sum()/max(nc,1)/(H*W)] for i in range(5): row = int((i+0.5)*H/5); features.append(np.std(img[min(row,H-1),:])) return np.array(features[:40], dtype=np.float32) PHYSICS_TECHNIQUES = [ ('C11-DPM',extract_dpm,4,'🔥 تحليل الكثافة','Density Pattern Mismatch'), ('C12-CKB',extract_ckb,8,'📋 قاعدة الكمارك','Customs Knowledge Base'), ('C13-3DTW',extract_3dtw,25,'🌊 الموجة الحرارية','3D Thermal Wave'), ('C14-CWS',extract_cws,50,'🌀 العاصفة الحلقية','Circular Wave Storm'), ('C15-TCV',extract_tcv,42,'🌪️ الأعاصير الثلاثة','Triple Cyclone Vortex'), ('C16-DWE',extract_dwe,35,'🔦 تعزيز المعتم','Dark Wave Enhancement'), ('C17-MVF',extract_mvf,40,'🌋 طين-بركان-ضباب','Mud-Volcano-Fog'), ('C18-ENV',extract_env,40,'🏜️ البيئات 
الخمس','Five Environments'), ] PHYSICS_WEIGHTS = {'C11-DPM':0.10,'C12-CKB':0.10,'C13-3DTW':0.15,'C14-CWS':0.15,'C15-TCV':0.12,'C16-DWE':0.13,'C17-MVF':0.13,'C18-ENV':0.12} def physics_analyze(image): if image is None: empty = "
" empty += "
🔬
" empty += "
ارفع صورة X-Ray للتحليل الفيزيائي
" return empty, pd.DataFrame() start = time_module.time() img = physics_preprocess(image) if img is None: return "
خطأ
", pd.DataFrame() # PRMI: Get RGB version for color analysis if isinstance(image, np.ndarray): _pil_img = Image.fromarray(image) else: _pil_img = image img_rgb_full = np.array(_pil_img.convert('RGB').resize((224,224)), dtype=np.float32)/255.0 all_features = []; tech_results = {} H, W = img.shape quads = [img[:H//2,:W//2], img[:H//2,W//2:], img[H//2:,:W//2], img[H//2:,W//2:]] quad_means = [q.mean() for q in quads] quad_stds = [q.std() for q in quads] regional_var = np.std(quad_means) dark_ratio = (img < 0.2).sum() / (H*W) bright_ratio = (img > 0.8).sum() / (H*W) gm = np.sqrt(ndimage.sobel(img,0)**2 + ndimage.sobel(img,1)**2) edge_density = (gm > gm.mean() + 2*gm.std()).sum() / (H*W) strong_edge = (gm > gm.mean() + 3*gm.std()).sum() / (H*W) layers = [((img >= t) & (img < t+0.1)).sum()/(H*W) for t in np.linspace(0,1,10)] contrast = img.max() - img.min() bimodal = 0.0 hist_vals = np.histogram(img, bins=20)[0].astype(float) hist_vals = hist_vals / hist_vals.sum() peaks = np.where((hist_vals[1:-1] > hist_vals[:-2]) & (hist_vals[1:-1] > hist_vals[2:]))[0] bimodal = len(peaks) / 10.0 sharpness = strong_edge / (edge_density + 1e-8) metal_indicator = bright_ratio * 3.0 + sharpness * 2.0 + contrast * 1.5 uniformity = 1.0 - np.std([q.std() for q in quads]) * 5.0 uniformity = max(0, uniformity) cargo_normal = uniformity * 0.6 + (1 - regional_var * 5.0) * 0.4 cargo_normal = float(np.clip(cargo_normal, 0, 1)) # ML-based risk scoring (15 features) risk_features = [ regional_var, dark_ratio, bright_ratio, edge_density, strong_edge, contrast, img.mean(), img.std(), np.std(layers), np.std([q.std() for q in quads]), np.max(quad_means) - np.min(quad_means), gm.mean(), gm.std(), len(np.where(np.diff(np.sign(np.diff(np.histogram(img,20)[0]))))[0]), np.percentile(img, 90) - np.percentile(img, 10), ] if risk_model is not None: ml_risk_score = float(np.clip(risk_model.predict([risk_features])[0], 0, 1)) else: ml_risk_score = float(np.clip(regional_var*1.5 + dark_ratio*0.8 + 
edge_density*1.0, 0, 1)) img_suspicion = ml_risk_score for tc, func, dim, ar_name, en_name in PHYSICS_TECHNIQUES: try: feats = func(img) if len(feats) < dim: feats = np.pad(feats, (0, dim-len(feats))) feats = feats[:dim]; all_features.append(feats) nf = (feats - feats.min()) / (feats.max() - feats.min() + 1e-8) tech_var = nf.std() threat_boost = bright_ratio*3.0 + strong_edge*5.0 + (img.max()-img.min())*0.5 threat_boost = float(np.clip(threat_boost, 0, 1)) if ml_risk_score < 0.25: base = ml_risk_score * 0.9 elif ml_risk_score < 0.45: base = ml_risk_score * 0.85 + threat_boost * 0.15 else: base = max(ml_risk_score, threat_boost * 0.7) if tc == 'C11-DPM': score = float(np.clip(base*0.85 + regional_var*2.0 + tech_var*0.2, 0, 1)) elif tc == 'C12-CKB': score = float(np.clip(base*0.80 + tech_var*0.3 + threat_boost*0.2, 0, 1)) elif tc == 'C13-3DTW': score = float(np.clip(base*0.75 + strong_edge*4.0 + tech_var*0.2, 0, 1)) elif tc == 'C14-CWS': score = float(np.clip(base*0.80 + np.std(quad_stds)*2.0 + threat_boost*0.15, 0, 1)) elif tc == 'C15-TCV': score = float(np.clip(base*0.80 + abs(nf[len(nf)//3:].mean()-nf[:len(nf)//3].mean())*2.0, 0, 1)) elif tc == 'C16-DWE': score = float(np.clip(base*0.75 + dark_ratio*2.5 + threat_boost*0.2, 0, 1)) elif tc == 'C17-MVF': score = float(np.clip(base*0.80 + np.std(layers)*2.0 + threat_boost*0.15, 0, 1)) elif tc == 'C18-ENV': score = float(np.clip(base*0.80 + np.std(layers)*1.5 + threat_boost*0.15, 0, 1)) else: score = float(np.clip(base, 0, 1)) tech_results[tc] = {'dim':dim,'ar':ar_name,'en':en_name,'score':score,'features':nf} except: all_features.append(np.zeros(dim)) tech_results[tc] = {'dim':dim,'ar':ar_name,'en':en_name,'score':0,'features':np.zeros(dim)} fv = np.concatenate(all_features); fv = (fv-fv.min())/(fv.max()-fv.min()+1e-8) elapsed = time_module.time()-start final_score = sum(tech_results[k]['score']*PHYSICS_WEIGHTS[k] for k in tech_results) final_score = float(np.clip(final_score, 0, 1)) if final_score >= 0.70: rl,rc,ri = 
'حرج','#dc2626','🔴' elif final_score >= 0.50: rl,rc,ri = 'عالي','#ea580c','🟠' elif final_score >= 0.30: rl,rc,ri = 'متوسط','#ca8a04','🟡' else: rl,rc,ri = 'منخفض','#16a34a','🟢' fig = plt.figure(figsize=(14, 28)); fig.patch.set_facecolor('#0a1628') cl8 = ['#E53935','#FB8C00','#FDD835','#43A047','#1E88E5','#8E24AA','#F4511E','#00897B'] ax1 = fig.add_subplot(9, 2, 1) names = [tc.split('-')[1] for tc in tech_results]; scores = [info['score'] for info in tech_results.values()] bars = ax1.barh(names, scores, color=cl8, edgecolor='white', linewidth=0.5) ax1.set_xlim(0, 1); ax1.set_facecolor('#0d1f3c'); ax1.tick_params(colors='white', labelsize=8) ax1.set_title('Technique Scores', color='#ffd740', fontsize=11, fontweight='bold') ax1.axvline(x=0.3, color='#FDD835', linestyle='--', alpha=0.5) ax1.axvline(x=0.5, color='#FB8C00', linestyle='--', alpha=0.5) ax1.axvline(x=0.7, color='#E53935', linestyle='--', alpha=0.5) for bar, s in zip(bars, scores): ax1.text(s+0.02, bar.get_y()+bar.get_height()/2, f'{s:.2f}', va='center', color='white', fontsize=8) ax2 = fig.add_subplot(9, 2, 2) ax2.imshow(img, cmap='hot', aspect='auto'); ax2.set_title('X-Ray Density Map', color='#ffd740', fontsize=11, fontweight='bold') ax2.set_facecolor('#0d1f3c'); ax2.tick_params(colors='white', labelsize=7) from scipy import ndimage as _ndi H, W = img.shape quads = [img[:H//2,:W//2], img[:H//2,W//2:], img[H//2:,:W//2], img[H//2:,W//2:]] gm = np.sqrt(_ndi.sobel(img,0)**2 + _ndi.sobel(img,1)**2) vis_titles_ar = ['تحليل كثافة الأرباع الأربعة','مطابقة 8 أنماط تهريب معروفة','تحليل الموجة الحرارية ثلاثية الأبعاد','مسح العاصفة الدائرية متعددة الحلقات','3 أعاصير حلزونية تمسح الصورة','تعزيز وإضاءة المناطق المعتمة','تحليل الطين والبركان والضباب','تصنيف 5 بيئات سطوعية مختلفة'] tech_list = list(tech_results.items()) for idx in range(8): tc, info = tech_list[idx]; nf = info['features']; sc = info['score'] bc = '#E53935' if sc > 0.6 else '#FB8C00' if sc > 0.3 else '#43A047' ax_vis = fig.add_subplot(9, 2, 3 + idx*2) if 
idx == 0: quad_img = np.zeros_like(img) qm = [q.mean() for q in quads] quad_img[:H//2,:W//2] = qm[0]; quad_img[:H//2,W//2:] = qm[1] quad_img[H//2:,:W//2] = qm[2]; quad_img[H//2:,W//2:] = qm[3] ax_vis.imshow(quad_img, cmap='RdYlGn_r', aspect='auto') ax_vis.axhline(y=H//2, color='white', linewidth=2); ax_vis.axvline(x=W//2, color='white', linewidth=2) for qi, (qy,qx) in enumerate([(H//4,W//4),(H//4,3*W//4),(3*H//4,W//4),(3*H//4,3*W//4)]): ax_vis.text(qx, qy, f'Q{qi+1}:{qm[qi]:.2f}', ha='center', va='center', color='white', fontsize=8, fontweight='bold', bbox=dict(boxstyle='round', facecolor='black', alpha=0.6)) elif idx == 1: susp = np.zeros_like(img); susp[img < 0.2] = 0.9; susp[img > 0.8] = 0.7; susp[(img > 0.3) & (img < 0.7)] = 0.2 ax_vis.imshow(img * 0.5 + susp * 0.5, cmap='YlOrRd', aspect='auto') elif idx == 2: from scipy.ndimage import gaussian_filter thermal = gaussian_filter(img, sigma=3) - gaussian_filter(img, sigma=8) ax_vis.imshow(np.abs(thermal), cmap='inferno', aspect='auto') elif idx == 3: cy, cx = H//2, W//2; Y, X = np.mgrid[0:H, 0:W] dist = np.sqrt((X-cx)**2 + (Y-cy)**2) ax_vis.imshow(np.abs(np.sin(dist * 0.1) * img), cmap='plasma', aspect='auto') elif idx == 4: ax_vis.imshow(np.abs(img - np.fliplr(img)), cmap='coolwarm', aspect='auto') elif idx == 5: dark_e = np.zeros_like(img); dm = img < 0.3; dark_e[dm] = 1.0 - img[dm]; dark_e[~dm] = img[~dm] * 0.3 ax_vis.imshow(dark_e, cmap='bone_r', aspect='auto') elif idx == 6: rgb = np.zeros((H, W, 3)); rgb[:,:,0] = np.clip((img < 0.3).astype(float) * 2, 0, 1) rgb[:,:,1] = np.clip(gm * 5, 0, 1); rgb[:,:,2] = np.clip((img > 0.7).astype(float) * 2, 0, 1) ax_vis.imshow(rgb, aspect='auto') elif idx == 7: env = np.digitize(img, [0.15, 0.35, 0.55, 0.75]) / 4.0 ax_vis.imshow(env, cmap='viridis', aspect='auto') ax_vis.set_title(f'{tc} [{sc:.2f}]', color=bc, fontsize=9, fontweight='bold') ax_vis.set_xlabel(vis_titles_ar[idx], color='#aaa', fontsize=8) ax_vis.set_facecolor('#0d1f3c'); ax_vis.tick_params(colors='white', 
labelsize=6) ax_feat = fig.add_subplot(9, 2, 4 + idx*2) if idx == 0: ax_feat.bar(['Asym','StdD','Var','Dev'][:len(nf)], nf, color=cl8[0]) elif idx == 1: ax_feat.bar(['Drug','Pill','Pwd','Liq','Mtl','Pls','Cig','Cash'][:len(nf)], nf, color=cl8[1]) elif idx == 4: for k in range(min(3, len(nf)//14+1)): seg = nf[k*14:(k+1)*14] ax_feat.plot(seg, label=f'C{k+1}', color=cl8[4+k], linewidth=1.5) ax_feat.legend(fontsize=7, facecolor='#0d1f3c', labelcolor='white') else: ax_feat.plot(nf, color=cl8[idx], linewidth=1.5); ax_feat.fill_between(range(len(nf)), nf, alpha=0.3, color=cl8[idx]) ax_feat.set_title(f'{tc} Features [{sc:.2f}]', color=bc, fontsize=9, fontweight='bold') ax_feat.set_facecolor('#0d1f3c'); ax_feat.tick_params(colors='white', labelsize=6) plt.tight_layout(pad=1.5) buf = io.BytesIO(); fig.savefig(buf, format='png', dpi=100, bbox_inches='tight', facecolor='#0a1628'); buf.seek(0) img_b64 = base64.b64encode(buf.read()).decode('utf-8'); plt.close(fig) # ═══ Generate 8 Animated GIFs ═══ from PIL import Image as PILImage gif_b64_list = [] H_g, W_g = img.shape gm_g = np.sqrt(ndimage.sobel(img,0)**2 + ndimage.sobel(img,1)**2) n_frames = 12 for tidx in range(8): pil_frames = [] for fi in range(n_frames): t = fi / (n_frames - 1) canvas = np.zeros((H_g, W_g, 3), dtype=np.uint8) if tidx == 0: # DPM - quadrant scan scan_y = int(t * H_g) vis = (img * 255).astype(np.uint8) canvas[:,:,0] = vis; canvas[:,:,1] = vis; canvas[:,:,2] = vis if scan_y > 0: above = img[:scan_y, :] qm = above.mean() heat = np.clip((np.abs(img[:scan_y,:] - qm) * 5 * 255), 0, 255).astype(np.uint8) canvas[:scan_y,:,0] = np.clip(vis[:scan_y,:].astype(int) + heat.astype(int), 0, 255).astype(np.uint8) canvas[:scan_y,:,1] = np.clip(vis[:scan_y,:].astype(int) - heat.astype(int)//2, 0, 255).astype(np.uint8) canvas[:scan_y,:,2] = vis[:scan_y,:] canvas[max(0,scan_y-2):min(H_g,scan_y+2),:,1] = 255 elif tidx == 1: # CKB - pattern matching scan vis = (img * 255).astype(np.uint8) scan_x = int(t * W_g) canvas[:,:,0] = 
vis; canvas[:,:,1] = vis; canvas[:,:,2] = vis if scan_x > 0: region = img[:, :scan_x] dark_m = (region < 0.2) bright_m = (region > 0.8) red = np.zeros_like(region, dtype=np.uint8) red[dark_m] = 255 orange = np.zeros_like(region, dtype=np.uint8) orange[bright_m] = 200 canvas[:,:scan_x,0] = np.clip(vis[:,:scan_x].astype(int) + red.astype(int), 0, 255).astype(np.uint8) canvas[:,:scan_x,1] = np.clip(orange.astype(int), 0, 255).astype(np.uint8) canvas[:,:scan_x,2] = 0 canvas[:,max(0,scan_x-2):min(W_g,scan_x+2),2] = 255 elif tidx == 2: # 3DTW - thermal wave sweep from scipy.ndimage import gaussian_filter sigma = 2 + t * 8 thermal = np.abs(gaussian_filter(img, sigma=max(1,sigma-3)) - gaussian_filter(img, sigma=sigma)) thermal_n = np.clip(thermal / (thermal.max()+1e-8) * 255, 0, 255).astype(np.uint8) canvas[:,:,0] = thermal_n canvas[:,:,1] = np.clip(thermal_n // 2, 0, 255).astype(np.uint8) canvas[:,:,2] = 0 elif tidx == 3: # CWS - expanding circular wave cy, cx = H_g//2, W_g//2 Y, X = np.mgrid[0:H_g, 0:W_g] dist = np.sqrt((X-cx)**2 + (Y-cy)**2) max_r = np.sqrt(cx**2 + cy**2) radius = t * max_r wave = np.sin((dist - radius) * 0.15) * np.exp(-np.abs(dist - radius) / (max_r*0.15)) wave_vis = np.clip((wave * 0.5 + 0.5) * img * 255, 0, 255).astype(np.uint8) ring_mask = (np.abs(dist - radius) < 5) canvas[:,:,0] = wave_vis canvas[:,:,1] = np.clip(wave_vis // 2, 0, 255).astype(np.uint8) canvas[:,:,2] = wave_vis canvas[ring_mask, 1] = 255 elif tidx == 4: # TCV - 3 rotating cyclones angle = t * 2 * np.pi Y, X = np.mgrid[0:H_g, 0:W_g] combined = np.zeros((H_g, W_g)) centers = [(H_g//4, W_g//4), (H_g//4, 3*W_g//4), (3*H_g//4, W_g//2)] for ci, (cy, cx) in enumerate(centers): dx = X - cx; dy = Y - cy r = np.sqrt(dx**2 + dy**2) + 1e-8 theta = np.arctan2(dy, dx) + angle + ci*2.094 spiral = np.sin(r*0.05 + theta*3) * np.exp(-r/(max(H_g,W_g)*0.3)) combined += spiral combined = np.clip((combined * 0.3 + 0.5) * img, 0, 1) canvas[:,:,0] = np.clip(combined * 200, 0, 255).astype(np.uint8) 
canvas[:,:,1] = np.clip(combined * 100, 0, 255).astype(np.uint8) canvas[:,:,2] = np.clip(combined * 255, 0, 255).astype(np.uint8) elif tidx == 5: # DWE - light sweeping dark regions vis = (img * 255).astype(np.uint8) sweep_x = int(t * W_g) dark_mask = img < 0.3 canvas[:,:,0] = vis; canvas[:,:,1] = vis; canvas[:,:,2] = vis if sweep_x > 0: reveal = np.zeros((H_g, sweep_x), dtype=np.uint8) dm_region = dark_mask[:, :sweep_x] enhanced = np.clip((1.0 - img[:,:sweep_x]) * 255, 0, 255).astype(np.uint8) canvas[:,:sweep_x,0] = np.where(dm_region, enhanced, vis[:,:sweep_x]) canvas[:,:sweep_x,1] = np.where(dm_region, enhanced, vis[:,:sweep_x]) canvas[:,:sweep_x,2] = np.where(dm_region, np.clip(enhanced//2, 0, 255), vis[:,:sweep_x]) canvas[:,max(0,sweep_x-3):min(W_g,sweep_x+3),0] = 255 canvas[:,max(0,sweep_x-3):min(W_g,sweep_x+3),1] = 255 canvas[:,max(0,sweep_x-3):min(W_g,sweep_x+3),2] = 0 elif tidx == 6: # MVF - mud(red) volcano(green) fog(blue) layers vis = (img * 255).astype(np.uint8) layer = int(t * 3) canvas[:,:,0] = vis; canvas[:,:,1] = vis; canvas[:,:,2] = vis if layer >= 0: mud = np.clip((img < 0.3).astype(float) * (1-img) * 255 * min(t*3, 1), 0, 255).astype(np.uint8) canvas[:,:,0] = np.clip(vis.astype(int) + mud.astype(int), 0, 255).astype(np.uint8) if layer >= 1: volc = np.clip(gm_g * 5 * 255 * min((t-0.33)*3, 1), 0, 255).astype(np.uint8) canvas[:,:,1] = np.clip(vis.astype(int) + volc.astype(int), 0, 255).astype(np.uint8) if layer >= 2: fog = np.clip((img > 0.7).astype(float) * img * 255 * min((t-0.66)*3, 1), 0, 255).astype(np.uint8) canvas[:,:,2] = np.clip(vis.astype(int) + fog.astype(int), 0, 255).astype(np.uint8) elif tidx == 7: # ENV - 5 environments appearing one by one env_step = int(t * 5) canvas_f = np.zeros((H_g, W_g, 3)) colors = [(0.2,0,0.4), (0,0.2,0.5), (0,0.5,0.3), (0.5,0.5,0), (0.8,0.3,0)] thresholds = [0.15, 0.35, 0.55, 0.75, 1.01] prev_t = 0 for ei in range(min(env_step+1, 5)): mask = (img >= prev_t) & (img < thresholds[ei]) for ch in range(3): 
canvas_f[:,:,ch] = np.where(mask, colors[ei][ch] + img*0.5, canvas_f[:,:,ch]) prev_t = thresholds[ei] canvas = np.clip(canvas_f * 255, 0, 255).astype(np.uint8) pil_frames.append(PILImage.fromarray(canvas)) gif_buf = io.BytesIO() pil_frames[0].save(gif_buf, format='GIF', save_all=True, append_images=pil_frames[1:], duration=250, loop=0) gif_buf.seek(0) gif_b64_list.append(base64.b64encode(gif_buf.read()).decode('utf-8')) gif_names = ['C11-DPM: مسح الكثافة','C12-CKB: كشف أنماط التهريب','C13-3DTW: الموجة الحرارية','C14-CWS: العاصفة الدائرية','C15-TCV: الأعاصير الثلاثة','C16-DWE: إضاءة الظلام','C17-MVF: طين+بركان+ضباب','C18-ENV: البيئات الخمسة'] h = "
" h += f"
" h += f"
🔬 التحليل الفيزيائي — 8 تقنيات — {len(fv)} بُعد
" h += f"
" h += f"
{ri} خطورة الإخفاء: {rl} ({final_score:.1%})
" h += "
" for val,lbl,bg in [(f'{len(fv)}D','الأبعاد','#E3F2FD'),('8','تقنيات','#F3E5F5'),(f'{elapsed:.1f}s','الوقت','#E8F5E9'),(f'{fv.mean():.3f}','المتوسط','#FFF3E0')]: h += f"
{val}
{lbl}
" h += "
" h += f"
" h += "
🌊 التحليل الديناميكي — 8 تقنيات متحركة
" h += "
" for gi in range(8): gsc = list(tech_results.values())[gi]['score'] gbc = '#dc2626' if gsc > 0.6 else '#ea580c' if gsc > 0.3 else '#16a34a' h += f"
" h += f"
{gif_names[gi]} [{gsc:.0%}]
" h += f"
" h += "
" old_chart_img = "SKIP" h += f"
" h += "" h += "" tcl = ['#1565C0','#2E7D32','#6A1B9A','#E65100','#00838F','#D81B60','#F57C00','#00695C'] for idx,(tc,info) in enumerate(tech_results.items()): bg = '#f8f9fa' if idx%2==0 else '#fff'; sc = info['score']; bw = int(sc*100) bc2 = '#E53935' if sc>0.6 else '#FB8C00' if sc>0.3 else '#43A047' h += f"" h += "
التقنيةENالأبعادالدرجةمؤشر
({tc}) {info['ar']}{info['en']}{info['dim']}D{sc:.3f}
" h += "
" h += "
📊 دليل الألوان ومستويات الخطورة
" h += "
" h += "
🟢 منخفض
0% - 30%
بضاعة طبيعية
" h += "
🟡 متوسط
30% - 50%
يحتاج مراجعة
" h += "
🟠 عالي
50% - 70%
فحص يدوي
" h += "
🔴 حرج
70% - 100%
إيقاف وتفتيش
" h += "
" h += "
" h += "
" h += "
🔥 DPM: يقيس فروقات الكثافة بين أرباع الصورة — كثافة غير متجانسة = مشبوه
" h += "
📋 CKB: يطابق 8 أنماط تهريب عراقية (مخدرات/حبوب/مسحوق/سائل/معدن/بلاستيك/سجائر/نقود)
" h += "
🌊 3DTW: يحوّل الصورة لسطح ثلاثي الأبعاد ويكشف الموجات الحرارية الشاذة
" h += "
🌀 CWS: يمسح 8 حلقات دائرية + 4 عواصف اتجاهية لكشف الأنماط المخفية
" h += "
🌪️ TCV: 3 أعاصير حلزونية تأخذ عينات من يسار/وسط/يمين الصورة
" h += "
🔦 DWE: يعزز المناطق المعتمة حيث يختبئ التهريب عادةً
" h += "
🌋 MVF: يحلل طبقات الكثافة (طين) + مستويات الانفجار (بركان) + الرؤية (ضباب)
" h += "
🏜️ ENV: يقسم الصورة إلى 5 بيئات سطوعية ويحلل كل واحدة
" h += "
" h += "
المعادلة: C = 0.10×DPM + 0.10×CKB + 0.15×3DTW + 0.15×CWS + 0.12×TCV + 0.13×DWE + 0.13×MVF + 0.12×ENV
" h += "
" h += f"
📊 {len(fv)}D | Mean: {fv.mean():.4f} | Std: {fv.std():.4f} | Non-zero: {(fv!=0).sum()}/{len(fv)}
" # ═══ C19-VTA: Visual Threat Alert System ═══ from PIL import Image as PILImage, ImageDraw, ImageFont from scipy import ndimage as vta_ndi H_v, W_v = img.shape grid_rows, grid_cols = 8, 12 cell_h, cell_w = H_v // grid_rows, W_v // grid_cols densities_grid = np.zeros((grid_rows, grid_cols)) variance_grid = np.zeros((grid_rows, grid_cols)) edge_grid = np.zeros((grid_rows, grid_cols)) gm_vta = np.sqrt(vta_ndi.sobel(img,0)**2 + vta_ndi.sobel(img,1)**2) for r in range(grid_rows): for c in range(grid_cols): cell = img[r*cell_h:(r+1)*cell_h, c*cell_w:(c+1)*cell_w] cell_edge = gm_vta[r*cell_h:(r+1)*cell_h, c*cell_w:(c+1)*cell_w] densities_grid[r, c] = cell.mean() variance_grid[r, c] = cell.std() edge_grid[r, c] = cell_edge.mean() g_mean = densities_grid.mean() g_std = densities_grid.std() TH_CAUTION = 0.1644 TH_WARNING = 0.1762 TH_CRITICAL = 0.2229 TH_HIGH_DENSITY = 0.4606 alerts = [] for r in range(grid_rows): for c in range(grid_cols): d = densities_grid[r, c] v = variance_grid[r, c] e = edge_grid[r, c] neighbors = [] for dr, dc in [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)]: nr, nc = r+dr, c+dc if 0 <= nr < grid_rows and 0 <= nc < grid_cols: neighbors.append(densities_grid[nr, nc]) n_mean = np.mean(neighbors) if neighbors else g_mean n_diff = abs(d - n_mean) is_border = r == 0 or r == grid_rows-1 or c == 0 or c == grid_cols-1 level = 'safe' if is_border: level = 'safe' elif n_diff > TH_CRITICAL and d > TH_HIGH_DENSITY: level = 'critical' elif n_diff > TH_CRITICAL: level = 'warning' elif n_diff > TH_WARNING and d > TH_HIGH_DENSITY: level = 'warning' elif n_diff > TH_CAUTION and d > TH_HIGH_DENSITY and e > 0.3: level = 'caution' if level != 'safe': alerts.append({'r':r,'c':c,'density':d,'n_diff':n_diff,'var':v,'level':level, 'x1':c*cell_w,'y1':r*cell_h,'x2':(c+1)*cell_w,'y2':(r+1)*cell_h}) img_rgb = np.stack([np.clip(img*255,0,255).astype(np.uint8)]*3, axis=-1) vta_pil = PILImage.fromarray(img_rgb) draw = ImageDraw.Draw(vta_pil) colors_map = 
{'critical':'#FF0000','warning':'#FFA500','caution':'#FFFF00'} widths_map = {'critical':4,'warning':3,'caution':2} icons_map = {'critical':'🔴','warning':'🟡','caution':'🟢'} for alert in alerts: color = colors_map[alert['level']] width = widths_map[alert['level']] x1,y1,x2,y2 = alert['x1'],alert['y1'],alert['x2'],alert['y2'] draw.rectangle([x1,y1,x2,y2], outline=color, width=width) if alert['level'] == 'critical': for offset in range(-2,3): draw.rectangle([x1+offset,y1+offset,x2-offset,y2-offset], outline=color, width=1) label = f"{alert['density']:.0%}" draw.text((x1+4, y1+4), label, fill=color) critical_count = sum(1 for a in alerts if a['level']=='critical') warning_count = sum(1 for a in alerts if a['level']=='warning') caution_count = sum(1 for a in alerts if a['level']=='caution') total_threat = (critical_count * 3 + warning_count * 2 + caution_count * 1) max_threat = grid_rows * grid_cols * 3 threat_pct = min(total_threat / max(max_threat * 0.15, 1), 1.0) if critical_count > 0: badge_color = '#FF0000' badge_text = f'⛔ {critical_count} مناطق حرجة' elif warning_count > 0: badge_color = '#FFA500' badge_text = f'⚠️ {warning_count} مناطق مشبوهة' else: badge_color = '#16a34a' badge_text = '✅ لا توجد تهديدات' badge_h = min(80, H_v // 8) draw.rectangle([0, 0, W_v, badge_h], fill='#000000') draw.text((10, 5), badge_text, fill=badge_color) draw.text((10, badge_h//2), f'Threat: {threat_pct:.0%} | Critical:{critical_count} Warning:{warning_count} Caution:{caution_count}', fill='white') vta_frames = [] for fi in range(15): frame = vta_pil.copy() fdraw = ImageDraw.Draw(frame) pulse = abs(np.sin(fi * np.pi / 7)) for alert in alerts: if alert['level'] == 'critical': color_r = int(255 * pulse) color_hex = f'#{color_r:02x}0000' x1,y1,x2,y2 = alert['x1'],alert['y1'],alert['x2'],alert['y2'] expand = int(3 * pulse) fdraw.rectangle([x1-expand,y1-expand,x2+expand,y2+expand], outline=color_hex, width=3) cx, cy = (x1+x2)//2, (y1+y2)//2 radius = int(min(cell_w, cell_h) * 0.4 * pulse) 
fdraw.ellipse([cx-radius, cy-radius, cx+radius, cy+radius], outline='#FF0000', width=2) elif alert['level'] == 'warning': if fi % 3 == 0: x1,y1,x2,y2 = alert['x1'],alert['y1'],alert['x2'],alert['y2'] fdraw.rectangle([x1,y1,x2,y2], outline='#FFA500', width=3) vta_frames.append(frame) vta_gif_buf = io.BytesIO() vta_frames[0].save(vta_gif_buf, format='GIF', save_all=True, append_images=vta_frames[1:], duration=300, loop=0) vta_gif_buf.seek(0) vta_gif_b64 = base64.b64encode(vta_gif_buf.read()).decode('utf-8') vta_static_buf = io.BytesIO() vta_pil.save(vta_static_buf, format='PNG') vta_static_buf.seek(0) vta_static_b64 = base64.b64encode(vta_static_buf.read()).decode('utf-8') h += "
" h += "
🚨 C19-VTA: نظام الإنذار البصري المروري
" h += "
Visual Threat Alert — Traffic Light Annotation System
" h += "
" for val,lbl,bg in [(f'{critical_count}','حرج 🔴','#FFCDD2'),(f'{warning_count}','تحذير 🟡','#FFF9C4'),(f'{caution_count}','تنبيه 🟢','#E8F5E9'),(f'{threat_pct:.0%}','مستوى التهديد','#E3F2FD')]: h += f"
{val}
{lbl}
" h += "
" h += f"
" h += f"
{badge_text}
" h += f"
" h += "
" h += "
📸 خريطة التهديد الثابتة
" h += f"
" if alerts: h += "
" h += "
📋 تفاصيل المناطق المشبوهة:
" for i, alert in enumerate(sorted(alerts, key=lambda x: -{'critical':3,'warning':2,'caution':1}[x['level']])): icon = icons_map[alert['level']] h += f"
{icon} المنطقة [{alert['r']},{alert['c']}] — الكثافة: {alert['density']:.1%} — فرق الجيران: {alert['n_diff']:.3f} — التباين: {alert['var']:.3f}
" h += "
" h += "
" h += "🚦 دليل الإشارات المرورية:
" h += "🔴 حرج: كثافة أدوية/مخدرات + تباين مفاجئ مع الجيران (مربع أحمر نابض)
" h += "🟡 تحذير: كثافة مشبوهة متجانسة أو انحراف كبير عن المعدل (مربع برتقالي)
" h += "🟢 تنبيه: تباين خفيف يستحق المراجعة (مربع أصفر)
" h += "✅ آمن: كثافة طبيعية متوافقة مع البضاعة المصرّحة
" # ═══ PRMI v7: Direction-Aware Material Analysis ═══ try: import pickle as prmi_pickle from scipy.stats import skew as sp_skew prmi_path = os.path.join(os.path.dirname(__file__), 'prmi_model.pkl') if not os.path.exists(prmi_path): from huggingface_hub import hf_hub_download prmi_path = hf_hub_download(repo_id='DrAbbas/SONAR-AI', filename='prmi_model.pkl') with open(prmi_path, 'rb') as pf: prmi_data = prmi_pickle.load(pf) prmi_sigs = prmi_data.get('signatures', {}) gm_prmi = np.sqrt(ndimage.sobel(img,0)**2 + ndimage.sobel(img,1)**2) prmi_gr, prmi_gc = 8, 12 prmi_ch, prmi_cw = H_v // prmi_gr, W_v // prmi_gc # Extract 18D features for all cells all_cells = [] prmi_results = [] main_mat = 'unknown' from collections import Counter as PrmiCounter for r in range(1, prmi_gr-1): for c in range(1, prmi_gc-1): pg = img[r*prmi_ch:(r+1)*prmi_ch, c*prmi_cw:(c+1)*prmi_cw] pe = gm_prmi[r*prmi_ch:(r+1)*prmi_ch, c*prmi_cw:(c+1)*prmi_cw] d = pg.mean() if 0.05 < d < 0.95: t_v = pg.std() e_v = pe.mean() flat = pg.flatten() try: sk = float(sp_skew(flat)) except: sk = 0.0 hist_p, _ = np.histogram(pg, bins=16, range=(0,1)) hist_p = hist_p / hist_p.sum() + 1e-10 ent = float(-np.sum(hist_p * np.log2(hist_p))) nb = [] for dr, dc in [(-1,0),(1,0),(0,-1),(0,1)]: nr, nc = r+dr, c+dc if 0<=nr 40%) before computing stats fg_cells = [c for c in all_cells if c.get('bp', 0.33) <= 0.40] if not fg_cells: fg_cells = all_cells # fallback if all cells are background img_d_med = np.median([c['d'] for c in fg_cells]) img_d_std = np.std([c['d'] for c in fg_cells]) + 0.005 img_sk_med = np.median([c['skew'] for c in fg_cells]) img_sk_std = np.std([c['skew'] for c in fg_cells]) + 0.01 img_ent_med = np.median([c['entropy'] for c in fg_cells]) img_ent_std = np.std([c['entropy'] for c in fg_cells]) + 0.01 # Direction-aware + Color anomaly detection # Check if image is colored (R/G != 1.0) rgb_r_prmi = np.array(Image.fromarray((img_rgb_full*255).astype(np.uint8)).resize((W_v, H_v)), dtype=np.float32)/255.0 
rg_vals = [] bp_vals = [] for cell in all_cells: pr = rgb_r_prmi[cell['r']*prmi_ch:(cell['r']+1)*prmi_ch, cell['c']*prmi_cw:(cell['c']+1)*prmi_cw] R_p=pr[:,:,0].mean(); G_p=pr[:,:,1].mean(); B_p=pr[:,:,2].mean() total_p=R_p+G_p+B_p+1e-8 cell['rg'] = R_p/(G_p+1e-8) cell['bp'] = B_p/total_p rg_vals.append(cell['rg']) bp_vals.append(cell['bp']) # Use only foreground cells for stats fg_rg = [c['rg'] for c in all_cells if c.get('bp', 0.33) <= 0.40] fg_bp = [c['bp'] for c in all_cells if c.get('bp', 0.33) <= 0.40] if not fg_rg: fg_rg = rg_vals fg_bp = bp_vals rg_med = np.median(fg_rg); rg_std_v = np.std(fg_rg) + 0.01 bp_med = np.median(fg_bp); bp_std_v = np.std(fg_bp) + 0.01 is_colored = abs(rg_med - 1.0) > 0.05 # Find main material min_dist = 999 e_med_v = np.median([c['e'] for c in fg_cells]) t_med_v = np.median([c['t'] for c in fg_cells]) ent_med_v = np.median([c['entropy'] for c in fg_cells]) for cls, sig in prmi_sigs.items(): if is_colored: dist = (abs(img_d_med - sig.get('d',0)) / (sig.get('d_std',0.1)+0.01) * 0.20 + abs(rg_med - sig.get('rg',1.0)) / (sig.get('rg_std',0.1)+0.01) * 0.30 + abs(bp_med - sig.get('bp',0.2)) / (sig.get('bp_std',0.05)+0.01) * 0.25 + abs(e_med_v - sig.get('e',0.3)) / (sig.get('e_std',0.1)+0.01) * 0.15 + abs(t_med_v - sig.get('t',0.1)) / (sig.get('t_std',0.05)+0.01) * 0.10) else: dist = (abs(img_d_med - sig.get('d',0)) / (sig.get('d_std',0.1)+0.01) * 0.35 + abs(e_med_v - sig.get('e',0.3)) / (sig.get('e_std',0.1)+0.01) * 0.25 + abs(t_med_v - sig.get('t',0.1)) / (sig.get('t_std',0.05)+0.01) * 0.20 + abs(ent_med_v - sig.get('ent',2.0)) / (sig.get('ent_std',0.5)+0.01) * 0.20) if dist < min_dist: min_dist = dist; main_mat = cls for cell in all_cells: # v8: Skip background cells if cell.get('bp', 0.33) > 0.40: continue z_d_low = (img_d_med - cell['d']) / img_d_std z_sk_high = (cell['skew'] - img_sk_med) / img_sk_std z_ent_low = (img_ent_med - cell['entropy']) / img_ent_std score_dir = z_d_low * 0.4 + z_sk_high * 0.4 + z_ent_low * 0.2 z_rg = 
abs(cell.get('rg',1.0) - rg_med) / rg_std_v z_bp = abs(cell.get('bp',0.33) - bp_med) / bp_std_v score_col = z_rg * 0.5 + z_bp * 0.5 score = score_dir * 0.3 + score_col * 0.7 if is_colored else score_dir if score > 3.0 or (is_colored and score_col > 4.0): # Find closest class for labeling cell_mat = 'unknown' cell_dist = 999 for cls, sig in prmi_sigs.items(): if is_colored: dist = (abs(cell['d'] - sig.get('d',0)) / (sig.get('d_std',0.1)+0.01) * 0.20 + abs(cell.get('rg',1.0) - sig.get('rg',1.0)) / (sig.get('rg_std',0.1)+0.01) * 0.30 + abs(cell.get('bp',0.33) - sig.get('bp',0.2)) / (sig.get('bp_std',0.05)+0.01) * 0.25 + abs(cell.get('e',0.3) - sig.get('e',0.3)) / (sig.get('e_std',0.1)+0.01) * 0.15 + abs(cell.get('t',0.1) - sig.get('t',0.1)) / (sig.get('t_std',0.05)+0.01) * 0.10) else: dist = (abs(cell['d'] - sig.get('d',0)) / (sig.get('d_std',0.1)+0.01) * 0.35 + abs(cell.get('e',0.3) - sig.get('e',0.3)) / (sig.get('e_std',0.1)+0.01) * 0.25 + abs(cell.get('t',0.1) - sig.get('t',0.1)) / (sig.get('t_std',0.05)+0.01) * 0.20 + abs(cell.get('entropy',2.0) - sig.get('ent',2.0)) / (sig.get('ent_std',0.5)+0.01) * 0.20) if dist < cell_dist: cell_dist = dist; cell_mat = cls level = 'critical' if score > 5.0 else 'warning' if score > 4.0 else 'caution' prmi_results.append({'r':cell['r'],'c':cell['c'],'pred':cell_mat, 'score':score,'level':level,'density':cell['d'], 'skew':cell['skew'],'entropy':cell['entropy']}) # PRMI v8.3: NO boxes, NO map anomaly_cnt = len(prmi_results) mat_names_ar = {'banana':'موز','milk':'حليب','medical':'أدوية','weapons':'أسلحة','electronics':'إلكترونيات','clothes':'ملابس','chemicals':'كيماويات','steel':'فولاذ','glass':'زجاج','plastic':'بلاستيك'} h += "
" h += "
🧬 PRMI v8 (43 مادة)
" if anomaly_cnt > 0: anom_mats = list(set(pr['pred'] for pr in prmi_results))[:3] anom_ar = [mat_names_ar.get(m, m) for m in anom_mats] h += "
" h += "
⚠️ مواد مخفية: " + ' + '.join(anom_ar) + "
" h += "
" + str(anomaly_cnt) + " منطقة
" h += "
" else: h += "
" h += "
✅ الحاوية متجانسة — لا مواد مخفية
" h += "
" h += "
" except Exception as prmi_err: h += f"
🔴 PRMI Error: {str(prmi_err)}
" rows = [{'#':tc,'التقنية':info['ar'],'EN':info['en'],'الأبعاد':f"{info['dim']}D",'الدرجة':f"{info['score']:.4f}",'الوزن':f"{PHYSICS_WEIGHTS[tc]:.2f}"} for tc,info in tech_results.items()] return h, pd.DataFrame(rows) import requests as http_requests def fetch_db_stats(): """جلب إحصائيات من السيرفر""" try: r = http_requests.get(f"{API_URL}/stats", timeout=5) if r.status_code == 200: return r.json() except: pass return None def fetch_db_anomalies(): """جلب آخر المخالفات""" try: r = http_requests.get(f"{API_URL}/anomalies?limit=20", timeout=5) if r.status_code == 200: data = r.json() if data: return pd.DataFrame(data) except: pass return None def refresh_dashboard(): """تحديث لوحة التحكم من قاعدة البيانات""" stats = fetch_db_stats() if stats: return generate_dashboard_html(stats), get_anomalies_from_db() return generate_dashboard_html(None), get_anomalies() def get_anomalies_from_db(): """جلب المخالفات من DB""" df = fetch_db_anomalies() if df is not None and len(df) > 0: # تحويل أسماء الأعمدة للعربية col_map = { 'ID': 'الرقم', 'ContainerNumber': 'الحاوية', 'Category': 'الأصناف', 'DescriptionEN': 'الوصف', 'RiskLevel': 'الخطورة', 'ActualGoods': 'المحتوى', 'CreatedDate': 'التاريخ' } df = df.rename(columns={k:v for k,v in col_map.items() if k in df.columns}) return df return get_anomalies() def generate_dashboard_html(stats): """HTML لوحة التحكم مع بيانات حقيقية""" if stats is None: try: return stats_html() except: stats = {'total': 0, 'match': 0, 'mismatch': 0, 'high_risk': 0, 'patterns': 0, 'keywords': 0} src = "🟢 متصل بقاعدة البيانات" if stats.get('total', 0) > 0 else "🔴 بيانات تجريبية" return f"""
{stats.get('total',0):,}
📦 إجمالي الصور
{stats.get('match',0):,}
✅ مطابق
{stats.get('mismatch',0):,}
❌ غير مطابق
{stats.get('high_risk',0):,}
🔴 خطر عالي
{stats.get('patterns',0)}
🕵️ أنماط تهريب
{stats.get('keywords',0)}
🔑 كلمات مفتاحية
{src}
""" def analyze_image(img, declared_text): if img is None: return ("
" "
📷
ارفع صورة للبدء
", pd.DataFrame(), None) cls_result = classify_image(img) detected_items = cls_result['ensemble'] n_models = cls_result['n_models'] model_details = cls_result['models'] annotated_img, detections = detect_objects(img) if DETECTION_MODEL else (None, []) # ✅ Detection الأولوية — إذا Detection اكتشف بثقة >50% يكون النتيجة الأساسية # ═══ Anomaly Detection ═══ full_feat = extract_features_from_image(img) full_anom_score, full_is_anom, full_anom_det = check_anomaly(full_feat) regions = scan_container_regions(img) suspicious = [r for r in regions if r['is_anomaly']] anomaly_img = draw_arabic_boxes(img.copy(), regions) if regions else None final_items = [] if detections: high_conf_det = [d for d in detections if d['confidence'] > 0.50 and d['name'].lower() in CARGO_DATABASE] if high_conf_det: seen = set() for d in sorted(high_conf_det, key=lambda x: x['confidence'], reverse=True): name = d['name'].lower() if name not in seen: final_items.append((name, d['confidence'])) seen.add(name) else: final_items = detected_items else: final_items = detected_items total_duty = 0 rows = [] for i, (item, conf) in enumerate(final_items): info = get_hs_info(item) total_duty += info['duty'] source = "🎯 Detection" if any(d['name'].lower() == item.lower() for d in detections) else "🤖 Classification" rows.append({'#': i+1, 'الصنف': info['ar'], 'EN': info['en'], 'كود_HS': info['hs'], 'رمز_TSC': info['tsc_code'], 'الفصل': f"Ch.{info['ch']} {info['ch_name']}", 'الرسوم%': info['duty'], 'السعر_المرجعي': f"${info['avg_price']}" if info['avg_price'] > 0 else '—', 'الثقة': f"{conf:.1%}", 'المصدر': source}) df = pd.DataFrame(rows) declared = [d.strip().lower() for d in declared_text.split('+') if d.strip()] if declared_text else [] det_names = [it.lower() for it, _ in final_items] if declared: matched = sum(1 for d in declared if any(d in nm for nm in det_names)) is_match = matched / max(len(declared), 1) > 0.5 else: is_match = random.choice([True, True, True, False]) risk = random.randint(0, 2) if 
is_match else random.randint(3, 5) for item, _ in detected_items: if item == 'weapons': risk = 5 break risk_cfg = {0:('آمن','#2E7D32','🟢'),1:('منخفض','#558B2F','🟢'),2:('متوسط','#F9A825','🟡'), 3:('مشبوه','#EF6C00','🟠'),4:('عالي','#D84315','🔴'),5:('حرج','#B71C1C','⛔')} r_name, r_color, r_icon = risk_cfg[risk] n_items = len(final_items) en_desc = ' | '.join([f"{CARGO_DATABASE.get(it,{}).get('hs','')}-{it.upper()}" for it, _ in final_items]) ar_desc = ' | '.join([CARGO_DATABASE.get(it,{}).get('ar', it) for it, _ in final_items]) det_primary = detections and any(d['confidence'] > 0.80 and d['name'].lower() in CARGO_DATABASE for d in detections) source_text = "🎯 Detection (الأولوية)" if det_primary else "🤖 Classification" mode_text = f"🤖 {n_models} نماذج حقيقية | {source_text}" if n_models > 0 else "⚙️ وضع المحاكاة" mode_color = "#2E7D32" if n_models > 0 else "#EF6C00" det_text = f" | 📍 {len(detections)} كائنات" if detections else "" model_html = "" if model_details and n_models > 0: model_html = "
" model_html += "" model_html += "" colors = {'ConvNeXt-V2':'#1565C0','ResNet152':'#2E7D32','EfficientNet-V2':'#6A1B9A', 'Deep-SOSUFS-v3':'#E65100','YOLOv10x-cls':'#00838F','YOLO11x-cls':'#D81B60'} for idx, (mn, md) in enumerate(model_details.items()): c = colors.get(mn, '#333') info = get_hs_info(md['top1']) bg = '#f8f9fa' if idx % 2 == 0 else '#fff' conf_pct = md['confidence'] * 100 bar_color = '#2E7D32' if conf_pct > 80 else '#F9A825' if conf_pct > 50 else '#E53935' model_html += f"" model_html += f"" model_html += f"" model_html += f"" model_html += f"" model_html += f"" model_html += "" model_html += "
النموذجالصنفHSالرسومالثقة
{mn}{info['ar']}{info['hs']}{info['duty']}%
{conf_pct:.0f}%
" det_html = "" if detections: det_html = "
" for d in detections: dinfo = get_hs_info(d['name']) if d['name'] in CARGO_DATABASE else None conf_pct = d['confidence'] * 100 dc = '#E53935' if conf_pct > 70 else '#FB8C00' if conf_pct > 40 else '#43A047' det_html += f"
{d['name'].upper()} {conf_pct:.0f}%{' · '+dinfo['hs'] if dinfo else ''}
" det_html += "
" # Anomaly HTML anomaly_html = "" if ANOMALY_MODELS: ac = '#E53935' if full_is_anom else '#43A047' ai = '🔴 مشبوه' if full_is_anom else '🟢 طبيعي' anomaly_html = f"
{ai} ({full_anom_score:.0%})" if suspicious: for r in suspicious: anomaly_html += f" | ⚠️ قسم {r['section']}: {r['label_ar']}" anomaly_html += "
" html = f"""
{mode_text}{det_text}
{"✅ مطابق للتصريح" if is_match else "⚠️ يتطلب مراجعة"}
{n_items}
أصناف
{total_duty}%
الرسوم
{r_icon} {r_name}
الخطورة
TSC ✓
أكواد
{model_html} {det_html} {anomaly_html}
📋 HS Codes (EN)
{en_desc}
📋 الوصف بالعربي
{ar_desc}
""" output_img = anomaly_img if (suspicious or full_is_anom) and anomaly_img else annotated_img return html, df, output_img # ═══════════════════════════════════════════════════════════════ # 📊 الإحصائيات # ═══════════════════════════════════════════════════════════════ def stats_html(): s = get_stats() cards = [ (s['total'],'📦 إجمالي الحاويات','#1565C0','#1976D2'), (s['match'],'✅ مطابق','#2E7D32','#43A047'), (s['mismatch'],'❌ مخالف','#EF6C00','#FB8C00'), (s['high_risk'],'🔴 خطر عالي','#C62828','#E53935'), (f"{s['hs_codes']:,}",'🏷️ أكواد HS','#4527A0','#5E35B1'), (f"{s['models']}+{'1' if s['detection'] else '0'}",'🤖 نماذج AI','#00695C','#00897B'), ] html = "
" for val, label, c1, c2 in cards: html += f"
{val if isinstance(val,str) else f'{val:,}'}
{label}
" html += "
" if MODELS_LOADED: names = ', '.join(CLASSIFICATION_MODELS.keys()) det = "✅ Detection" if DETECTION_MODEL else "❌ Detection" html += f"
🤖 النماذج مفعّلة: {names} | {det}
" else: html += "
⚙️ وضع المحاكاة — يحتاج GPU + torch
" return html # ═══════════════════════════════════════════════════════════════ # 🖥️ التطبيق # ═══════════════════════════════════════════════════════════════ CSS = """ @import url('https://fonts.googleapis.com/css2?family=Tajawal:wght@400;500;700;800;900&family=Amiri:wght@400;700&display=swap'); * { font-family: 'Tajawal', sans-serif !important; } .gradio-container { max-width: 1400px !important; margin: auto !important; direction: rtl !important; } .gr-dataframe { direction: rtl !important; font-size: 13px !important; } table th, table td { text-align: right !important; padding: 6px 10px !important; } footer { display: none !important; } .gr-box, .gr-form, .gr-panel { direction: rtl !important; text-align: right !important; } table { direction: rtl !important; } """ with gr.Blocks(title=f"SONAR-AI v{VERSION}") as app: # ═══ شاشة تسجيل الدخول المخصصة ═══ with gr.Column(visible=True) as login_page: gr.HTML("""
🔱
SONAR-AI
نظام الفحص الذكي بالذكاء الاصطناعي
الهيئة العامة للكمارك العراقية
""") with gr.Row(): gr.Column(scale=1) with gr.Column(scale=2): login_user = gr.Textbox(label="👤 اسم المستخدم", value="عباس", text_align="right") login_pass = gr.Textbox(label="🔑 كلمة المرور", type="password", value="1", text_align="right") login_btn = gr.Button("🔱 تسجيل الدخول", variant="primary", size="lg") login_msg = gr.HTML("") gr.Column(scale=1) # ═══ التطبيق الرئيسي ═══ with gr.Column(visible=False) as main_page: with gr.Row(): gr.HTML(f"""

🔱 SONAR-AI v{VERSION}

{len(CLASSIFICATION_MODELS)} Classification + {'Detection' if DETECTION_MODEL else 'Sim'} | {len(TSC_DATABASE):,} HS | 8 Physics (244D)

""") exit_btn = gr.Button("🚪 خروج", variant="stop", size="sm", scale=0, min_width=80) with gr.Tabs(): with gr.Tab("📊 لوحة التحكم", id="dashboard"): d_stats = gr.HTML(generate_dashboard_html(fetch_db_stats())) d_refresh = gr.Button("🔄 تحديث من قاعدة البيانات", variant="primary", size="lg") gr.Markdown("### 📋 آخر الحاويات") d_tbl = gr.Dataframe(value=get_anomalies()) d_refresh.click(refresh_dashboard, outputs=[d_stats, d_tbl]) with gr.Tab("🔬 تحليل الصور"): gr.HTML("
📷 Classification + Detection + HS Codes
") with gr.Row(): with gr.Column(scale=1): a_img = gr.Image(label="📷 صورة الأشعة", type="pil", height=200) gr.HTML("""
🎛️ أدوات السونار
""") with gr.Row(visible=False): btn_gray = gr.Button("⬜", elem_id="btn_gray", size="sm", min_width=30) btn_inv = gr.Button("◐", elem_id="btn_inv", size="sm", min_width=30) btn_thermal = gr.Button("🔥", elem_id="btn_thermal", size="sm", min_width=30) btn_cool = gr.Button("❄", elem_id="btn_cool", size="sm", min_width=30) btn_rainbow = gr.Button("🌈", elem_id="btn_rainbow", size="sm", min_width=30) btn_edge = gr.Button("📐", elem_id="btn_edge", size="sm", min_width=30) btn_contrast = gr.Button("🔆", elem_id="btn_contrast", size="sm", min_width=30) btn_bright = gr.Button("☀", elem_id="btn_bright", size="sm", min_width=30) a_dec = gr.Textbox(label="📋 البضاعة المصرّح بها", placeholder="SHOES + FABRIC + ...", lines=1) a_btn = gr.Button("🔍 تحليل شامل", variant="primary", size="lg") with gr.Column(scale=1): a_res = gr.HTML("
📷 ارفع صورة للبدء
") a_det = gr.Image(label="📍 Detection / Anomaly", type="pil", height=200) a_tbl = gr.Dataframe(label="📋 الأصناف + HS", value=pd.DataFrame(), wrap=True) a_btn.click(analyze_image, inputs=[a_img, a_dec], outputs=[a_res, a_tbl, a_det], api_name="analyze") btn_gray.click(apply_grayscale, inputs=[a_img], outputs=[a_img]) btn_inv.click(apply_invert, inputs=[a_img], outputs=[a_img]) btn_thermal.click(apply_thermal, inputs=[a_img], outputs=[a_img]) btn_cool.click(apply_cool, inputs=[a_img], outputs=[a_img]) btn_rainbow.click(apply_rainbow, inputs=[a_img], outputs=[a_img]) btn_edge.click(apply_edge, inputs=[a_img], outputs=[a_img]) btn_contrast.click(apply_contrast, inputs=[a_img], outputs=[a_img]) btn_bright.click(apply_brightness, inputs=[a_img], outputs=[a_img]) # ═══ 🆕 التحليل الفيزيائي ═══ with gr.Tab("🔬 التحليل الفيزيائي 🆕"): gr.HTML("""
🔬 8 تقنيات فيزيائية مبتكرة — 244 بُعد — كشف الإخفاء
DPM · CKB · 3DTW · CWS · TCV · DWE · MVF · ENV | Dr. Abbas Fadel Al-Jubouri
""") with gr.Row(): with gr.Column(scale=1): p_img = gr.Image(label="📷 صورة الأشعة", type="pil", height=250) p_btn = gr.Button("🔬 تحليل فيزيائي شامل", variant="primary", size="lg") gr.HTML("""
8 تقنيات: 🔥DPM(4D) · 📋CKB(8D) · 🌊3DTW(25D) · 🌀CWS(50D)
🌪️TCV(42D) · 🔦DWE(35D) · 🌋MVF(40D) · 🏜️ENV(40D) = 244D
""") with gr.Column(scale=2): p_res = gr.HTML("
🔬
ارفع صورة X-Ray للتحليل الفيزيائي
8 تقنيات — 244 بُعد — كشف الإخفاء
") p_tbl = gr.Dataframe(label="📊 نتائج التقنيات الثمانية", value=pd.DataFrame(), wrap=True) p_btn.click(physics_analyze, inputs=[p_img], outputs=[p_res, p_tbl]) with gr.Tab("🏷️ قاعدة الأصناف"): gr.HTML(f"
📦 {len(CARGO_DATABASE)} صنف
") cats = gr.Dataframe(value=get_categories()) gr.Button("🔄", size="sm").click(get_categories, outputs=cats) with gr.Tab("🔍 بحث TSC"): gr.HTML(f"
🔍 TSC — {len(TSC_DATABASE):,} كود
") with gr.Row(): tsc_q = gr.Textbox(label="بحث", placeholder="870323 أو أحذية ...", scale=3) tsc_btn = gr.Button("🔍", variant="primary", scale=1) tsc_res = gr.Dataframe(value=pd.DataFrame()) tsc_btn.click(search_tsc, inputs=[tsc_q], outputs=[tsc_res]) tsc_q.submit(search_tsc, inputs=[tsc_q], outputs=[tsc_res]) with gr.Tab("👥 فريق العمل"): gr.HTML("""
🔱
SONAR-AI
نظام الفحص الذكي بالذكاء الاصطناعي
الهيئة العامة للكمارك العراقية
فريق العمل
👨‍💼
د. عباس فاضل
رئيس الفريق
أ. عامر
المشاور القانوني
— المدربون —
🎓
أ. ناظم
مدرب
🎓
أ. ظفار
مدرب
🎓
أ. عمار الشعلان
مدرب
🎓
أ. يونس ذنون
مدرب
⚙️
م. باسم محمد جابر
مهندس
SONAR-AI v16.8 — Powered by DL + Physics by Deep Learning
""") with gr.Tab("⚡ الخطورة"): gr.HTML("""
🟢 0-آمن 🟢 1-منخفض 🟡 2-متوسط 🟠 3-مشبوه 🔴 4-عالي ⛔ 5-حرج
""") gr.Markdown("""### 📋 معايير الخطورة | المستوى | الحالة | الإجراء | |---------|--------|---------| | 0-آمن | مطابقة كاملة | تمرير | | 1-منخفض | اختلاف بسيط | مراجعة وثائق | | 2-متوسط | صنف إضافي | فحص عشوائي | | 3-مشبوه | عدة مخالفات | تفتيش دقيق | | 4-عالي | أصناف مقيّدة | تفتيش شامل | | 5-حرج | ممنوعات/أسلحة | إيقاف فوري |""") gr.HTML(f"
🔱 SONAR-AI v{VERSION} | {len(CLASSIFICATION_MODELS)} Models | {len(TSC_DATABASE):,} HS | © 2026
") # ═══ دالة تسجيل الدخول ═══ def do_login(username, password): if authenticate(username, password): return gr.update(visible=False), gr.update(visible=True), "" return gr.update(visible=True), gr.update(visible=False), "
❌ اسم المستخدم أو كلمة المرور غير صحيحة
" # ═══ شاشة الخروج (فريق العمل) ═══ with gr.Column(visible=False) as exit_page: gr.HTML("""
🔱
SONAR-AI
نظام الفحص الذكي بالذكاء الاصطناعي
الهيئة العامة للكمارك العراقية
فريق العمل
👨‍💼
د. عباس فاضل
رئيس الفريق
أ. عامر
المشاور القانوني
— المدربون —
🎓
أ. ناظم
مدرب
🎓
أ. ظفار
مدرب
🎓
أ. عمار الشعلان
مدرب
🎓
أ. يونس ذنون
مدرب
⚙️
م. باسم محمد جابر
مهندس
شكراً لاستخدامكم SONAR-AI v16.8
""") relogin_btn = gr.Button("🔙 العودة لتسجيل الدخول", variant="secondary", size="lg") # ═══ أحداث تسجيل الدخول ═══ def do_login(username, password): if authenticate(username, password): return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), "" return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), "
❌ اسم المستخدم أو كلمة المرور غير صحيحة
" def do_exit(): return gr.update(visible=False), gr.update(visible=True) def do_relogin(): return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) login_btn.click(do_login, inputs=[login_user, login_pass], outputs=[login_page, main_page, exit_page, login_msg]) login_pass.submit(do_login, inputs=[login_user, login_pass], outputs=[login_page, main_page, exit_page, login_msg]) exit_btn.click(do_exit, outputs=[main_page, exit_page]) relogin_btn.click(do_relogin, outputs=[login_page, main_page, exit_page]) # ═══════════════════════════════════════════════════════════════ # 🔌 API Endpoint for C# Desktop App (No Auth Required) # ═══════════════════════════════════════════════════════════════ def api_analyze(img, declared_text=""): """API endpoint — returns JSON results for C# integration""" if img is None: return json.dumps({"success": False, "error": "No image provided"}, ensure_ascii=False) cls_result = classify_image(img) detected_items = cls_result['ensemble'] n_models = cls_result['n_models'] model_details = cls_result['models'] _, detections = detect_objects(img) if DETECTION_MODEL else (None, []) # ═══ Anomaly Detection ═══ full_feat = extract_features_from_image(img) full_anom_score, full_is_anom, full_anom_det = check_anomaly(full_feat) regions = scan_container_regions(img) suspicious = [r for r in regions if r['is_anomaly']] anomaly_img = draw_arabic_boxes(img.copy(), regions) if regions else None final_items = [] source = "classification" if detections: high_conf_det = [d for d in detections if d['confidence'] > 0.50 and d['name'].lower() in CARGO_DATABASE] if high_conf_det: source = "detection" seen = set() for d in sorted(high_conf_det, key=lambda x: x['confidence'], reverse=True): name = d['name'].lower() if name not in seen: final_items.append((name, d['confidence'])) seen.add(name) else: final_items = detected_items else: final_items = detected_items items_json = [] total_duty = 0 for item, conf in final_items: info = 
get_hs_info(item) total_duty += info['duty'] items_json.append({ "name_en": info['en'], "name_ar": info['ar'], "hs_code": info['hs'], "chapter": info['ch'], "chapter_name": info['ch_name'], "duty_percent": info['duty'], "avg_price": info['avg_price'], "tsc_code": info['tsc_code'], "confidence": round(conf, 4), "source": "detection" if any(d['name'].lower() == item.lower() for d in detections) else "classification" }) cls_details = {} for mn, md in model_details.items(): cls_details[mn] = {"top1": md['top1'], "confidence": round(md['confidence'], 4)} det_details = [{"name": d['name'], "confidence": round(d['confidence'], 4)} for d in detections] result = { "success": True, "version": VERSION, "source": source, "n_models": n_models, "total_duty": total_duty, "items": items_json, "classification": cls_details, "detection": det_details, "timestamp": datetime.now().isoformat() } return json.dumps(result, ensure_ascii=False) if __name__ == "__main__": print(f"🔱 SONAR-AI v{VERSION}") print(f"📦 Cargo: {len(CARGO_DATABASE)} | TSC: {len(TSC_DATABASE)}") print(f"🤖 Models: {len(CLASSIFICATION_MODELS)} cls + {'det' if DETECTION_MODEL else 'no det'}") app.launch( server_name="0.0.0.0", server_port=7860, ssr_mode=False, theme=gr.themes.Soft(), css=CSS, show_error=True, )