#!/usr/bin/env python3 # اختبار سريع للتحقق من حالة النظام import requests import time import json import socket import os # استيراد الملفات المحلية try: from peer_discovery import PORT, get_local_ip, get_peers from load_balancer import send_task from smart_tasks import prime_calculation except ImportError: print("⚠️ بعض الوحدات غير متوفرة، جرب تشغيل: python launcher.py --tray") def get_available_port(): """الحصول على منفذ متاح""" try: from peer_discovery import PORT return PORT except: return 8888 # منفذ افتراضي def test_local_server(): """فحص الخادم المحلي""" port = get_available_port() endpoints = ["/health", "/status", "/", "/api/status"] for endpoint in endpoints: try: url = f"http://localhost:{port}{endpoint}" response = requests.get(url, timeout=5) if response.status_code == 200: print(f"✅ الخادم المحلي يعمل على: {url}") try: data = response.json() print(f"📊 حالة النظام: {data}") except: print("📄 استجابة نصية") return True except requests.exceptions.ConnectionError: continue except requests.exceptions.Timeout: print(f"⏰ انتهت المهلة مع: localhost:{port}{endpoint}") continue except Exception as e: print(f"❌ خطأ مع localhost:{port}{endpoint}: {e}") continue print("❌ الخادم المحلي غير متاح") return False def test_peer_discovery(): """اختبار اكتشاف الأقران""" print("🔍 اختبار اكتشاف الأقران...") try: # محاولة استيراد واستخدام peer_discovery from peer_discovery import PEERS, get_peers local_ip = get_local_ip() print(f"🌐 IP المحلي: {local_ip}") print(f"🔌 المنفذ: {get_available_port()}") peers = list(PEERS) if PEERS else [] if hasattr(get_peers, '__call__'): additional_peers = get_peers() if additional_peers: peers.extend(additional_peers) print(f"📱 تم اكتشاف {len(peers)} جهاز") for i, peer in enumerate(peers[:5]): # عرض أول 5 أقران فقط print(f" {i+1}. {peer}") return len(peers) > 0 except Exception as e: print(f"⚠️ تعذر اكتشاف الأقران: {e}") return False def test_basic_task(): """اختبار مهمة بسيطة""" print("⚙️ اختبار مهمة بسيطة...") try: start_time = time.time() # استخدام prime_calculation مباشرة result = prime_calculation(1000) # عدد أصغر للاختبار السريع duration = time.time() - start_time if "error" in result: print(f"❌ فشل في المعالجة: {result['error']}") return False print(f"✅ تمت المعالجة في {duration:.2f} ثانية") print(f"📊 النتيجة: {result.get('count', 0)} أعداد أولية") print(f"🔢 أول 5 أعداد: {result.get('primes', [])[:5]}") return True except Exception as e: print(f"❌ فشل في المعالجة: {e}") return False def test_load_balancer(): """اختبار موزع الحمل""" print("⚖️ اختبار موزع الحمل...") try: from load_balancer import send_task result = send_task("prime_calculation", 500) if "error" in result: print(f"⚠️ موزع الحمل: {result['error']}") return False else: print(f"✅ موزع الحمل يعمل - {result.get('count', 0)} أعداد أولية") return True except Exception as e: print(f"⚠️ تعذر اختبار موزع الحمل: {e}") return False def test_network_connectivity(): """اختبار اتصال الشبكة""" print("🌐 اختبار اتصال الشبكة...") # فحص الاتصال بالإنترنت try: response = requests.get("http://www.google.com", timeout=5) print("✅ الاتصال بالإنترنت متاح") internet = True except: print("❌ لا يوجد اتصال بالإنترنت") internet = False # فحص الشبكة المحلية try: local_ip = socket.gethostbyname(socket.gethostname()) print(f"🏠 الشبكة المحلية: {local_ip}") local_network = True except: print("❌ مشكلة في الشبكة المحلية") local_network = False return internet or local_network def test_connection(): """اختبار سريع للاتصال""" print("=" * 50) print("🚀 اختبار اتصال سريع لنظام AmalOffload") print("=" * 50) tests = [ ("🌐 اتصال الشبكة", test_network_connectivity), ("🖥️ الخادم المحلي", test_local_server), ("🔍 اكتشاف الأقران", test_peer_discovery), ("⚙️ المهمة الأساسية", test_basic_task), ("⚖️ موزع الحمل", test_load_balancer), ] results = [] for test_name, test_func in tests: print(f"\n{test_name}...") try: result = test_func() results.append(result) time.sleep(1) # فاصل بين الاختبارات except Exception as e: print(f"💥 خطأ غير متوقع: {e}") results.append(False) print("\n" + "=" * 50) print("📊 نتائج الاختبار:") print("=" * 50) success_count = sum(results) total_tests = len(results) for i, (test_name, _) in enumerate(tests): status = "✅" if results[i] else "❌" print(f"{status} {test_name}") print(f"\n🎯 النتيجة النهائية: {success_count}/{total_tests}") if success_count == total_tests: print("🎉 النظام يعمل بشكل ممتاز!") elif success_count >= 3: print("👍 النظام يعمل بشكل جيد") else: print("⚠️ هناك مشاكل تحتاج إصلاح") return success_count >= 3 # يعتبر ناجحاً إذا نجح 3/5 اختبارات على الأقل if __name__ == "__main__": success = test_connection() if success: print("\n💡 يمكنك الآن تشغيل:") print(" python launcher.py --tray (لتشغيل النظام)") print(" python test_distributed_system.py (لاختبار متقدم)") else: print("\n🔧 اقتراحات الإصلاح:") print(" 1. تأكد من تشغيل: python background_service.py start") print(" 2. تحقق من إعدادات الجدار الناري") print(" 3. جرب: python peer_discovery.py (لاكتشاف الأقران)")