Spaces:
Running
Running
File size: 7,434 Bytes
042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
#!/usr/bin/env python3
# اختبار سريع للتحقق من حالة النظام
import requests
import time
import json
import socket
import os
# استيراد الملفات المحلية
try:
from peer_discovery import PORT, get_local_ip, get_peers
from load_balancer import send_task
from smart_tasks import prime_calculation
except ImportError:
print("⚠️ بعض الوحدات غير متوفرة، جرب تشغيل: python launcher.py --tray")
def get_available_port():
"""الحصول على منفذ متاح"""
try:
from peer_discovery import PORT
return PORT
except:
return 8888 # منفذ افتراضي
def test_local_server():
"""فحص الخادم المحلي"""
port = get_available_port()
endpoints = ["/health", "/status", "/", "/api/status"]
for endpoint in endpoints:
try:
url = f"http://localhost:{port}{endpoint}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
print(f"✅ الخادم المحلي يعمل على: {url}")
try:
data = response.json()
print(f"📊 حالة النظام: {data}")
except:
print("📄 استجابة نصية")
return True
except requests.exceptions.ConnectionError:
continue
except requests.exceptions.Timeout:
print(f"⏰ انتهت المهلة مع: localhost:{port}{endpoint}")
continue
except Exception as e:
print(f"❌ خطأ مع localhost:{port}{endpoint}: {e}")
continue
print("❌ الخادم المحلي غير متاح")
return False
def test_peer_discovery():
"""اختبار اكتشاف الأقران"""
print("🔍 اختبار اكتشاف الأقران...")
try:
# محاولة استيراد واستخدام peer_discovery
from peer_discovery import PEERS, get_peers
local_ip = get_local_ip()
print(f"🌐 IP المحلي: {local_ip}")
print(f"🔌 المنفذ: {get_available_port()}")
peers = list(PEERS) if PEERS else []
if hasattr(get_peers, '__call__'):
additional_peers = get_peers()
if additional_peers:
peers.extend(additional_peers)
print(f"📱 تم اكتشاف {len(peers)} جهاز")
for i, peer in enumerate(peers[:5]): # عرض أول 5 أقران فقط
print(f" {i+1}. {peer}")
return len(peers) > 0
except Exception as e:
print(f"⚠️ تعذر اكتشاف الأقران: {e}")
return False
def test_basic_task():
"""اختبار مهمة بسيطة"""
print("⚙️ اختبار مهمة بسيطة...")
try:
start_time = time.time()
# استخدام prime_calculation مباشرة
result = prime_calculation(1000) # عدد أصغر للاختبار السريع
duration = time.time() - start_time
if "error" in result:
print(f"❌ فشل في المعالجة: {result['error']}")
return False
print(f"✅ تمت المعالجة في {duration:.2f} ثانية")
print(f"📊 النتيجة: {result.get('count', 0)} أعداد أولية")
print(f"🔢 أول 5 أعداد: {result.get('primes', [])[:5]}")
return True
except Exception as e:
print(f"❌ فشل في المعالجة: {e}")
return False
def test_load_balancer():
"""اختبار موزع الحمل"""
print("⚖️ اختبار موزع الحمل...")
try:
from load_balancer import send_task
result = send_task("prime_calculation", 500)
if "error" in result:
print(f"⚠️ موزع الحمل: {result['error']}")
return False
else:
print(f"✅ موزع الحمل يعمل - {result.get('count', 0)} أعداد أولية")
return True
except Exception as e:
print(f"⚠️ تعذر اختبار موزع الحمل: {e}")
return False
def test_network_connectivity():
"""اختبار اتصال الشبكة"""
print("🌐 اختبار اتصال الشبكة...")
# فحص الاتصال بالإنترنت
try:
response = requests.get("http://www.google.com", timeout=5)
print("✅ الاتصال بالإنترنت متاح")
internet = True
except:
print("❌ لا يوجد اتصال بالإنترنت")
internet = False
# فحص الشبكة المحلية
try:
local_ip = socket.gethostbyname(socket.gethostname())
print(f"🏠 الشبكة المحلية: {local_ip}")
local_network = True
except:
print("❌ مشكلة في الشبكة المحلية")
local_network = False
return internet or local_network
def test_connection():
"""اختبار سريع للاتصال"""
print("=" * 50)
print("🚀 اختبار اتصال سريع لنظام AmalOffload")
print("=" * 50)
tests = [
("🌐 اتصال الشبكة", test_network_connectivity),
("🖥️ الخادم المحلي", test_local_server),
("🔍 اكتشاف الأقران", test_peer_discovery),
("⚙️ المهمة الأساسية", test_basic_task),
("⚖️ موزع الحمل", test_load_balancer),
]
results = []
for test_name, test_func in tests:
print(f"\n{test_name}...")
try:
result = test_func()
results.append(result)
time.sleep(1) # فاصل بين الاختبارات
except Exception as e:
print(f"💥 خطأ غير متوقع: {e}")
results.append(False)
print("\n" + "=" * 50)
print("📊 نتائج الاختبار:")
print("=" * 50)
success_count = sum(results)
total_tests = len(results)
for i, (test_name, _) in enumerate(tests):
status = "✅" if results[i] else "❌"
print(f"{status} {test_name}")
print(f"\n🎯 النتيجة النهائية: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 النظام يعمل بشكل ممتاز!")
elif success_count >= 3:
print("👍 النظام يعمل بشكل جيد")
else:
print("⚠️ هناك مشاكل تحتاج إصلاح")
return success_count >= 3 # يعتبر ناجحاً إذا نجح 3/5 اختبارات على الأقل
if __name__ == "__main__":
success = test_connection()
if success:
print("\n💡 يمكنك الآن تشغيل:")
print(" python launcher.py --tray (لتشغيل النظام)")
print(" python test_distributed_system.py (لاختبار متقدم)")
else:
print("\n🔧 اقتراحات الإصلاح:")
print(" 1. تأكد من تشغيل: python background_service.py start")
print(" 2. تحقق من إعدادات الجدار الناري")
print(" 3. جرب: python peer_discovery.py (لاكتشاف الأقران)")
|