amalCoreFlow / quick_connection_test.py
osamabyc86's picture
Upload 69 files
84c65b6 verified
#!/usr/bin/env python3
# اختبار سريع للتحقق من حالة النظام
import requests
import time
import json
import socket
import os
# استيراد الملفات المحلية
try:
from peer_discovery import PORT, get_local_ip, get_peers
from load_balancer import send_task
from smart_tasks import prime_calculation
except ImportError:
print("⚠️ بعض الوحدات غير متوفرة، جرب تشغيل: python launcher.py --tray")
def get_available_port():
"""الحصول على منفذ متاح"""
try:
from peer_discovery import PORT
return PORT
except:
return 8888 # منفذ افتراضي
def test_local_server():
"""فحص الخادم المحلي"""
port = get_available_port()
endpoints = ["/health", "/status", "/", "/api/status"]
for endpoint in endpoints:
try:
url = f"http://localhost:{port}{endpoint}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
print(f"✅ الخادم المحلي يعمل على: {url}")
try:
data = response.json()
print(f"📊 حالة النظام: {data}")
except:
print("📄 استجابة نصية")
return True
except requests.exceptions.ConnectionError:
continue
except requests.exceptions.Timeout:
print(f"⏰ انتهت المهلة مع: localhost:{port}{endpoint}")
continue
except Exception as e:
print(f"❌ خطأ مع localhost:{port}{endpoint}: {e}")
continue
print("❌ الخادم المحلي غير متاح")
return False
def test_peer_discovery():
"""اختبار اكتشاف الأقران"""
print("🔍 اختبار اكتشاف الأقران...")
try:
# محاولة استيراد واستخدام peer_discovery
from peer_discovery import PEERS, get_peers
local_ip = get_local_ip()
print(f"🌐 IP المحلي: {local_ip}")
print(f"🔌 المنفذ: {get_available_port()}")
peers = list(PEERS) if PEERS else []
if hasattr(get_peers, '__call__'):
additional_peers = get_peers()
if additional_peers:
peers.extend(additional_peers)
print(f"📱 تم اكتشاف {len(peers)} جهاز")
for i, peer in enumerate(peers[:5]): # عرض أول 5 أقران فقط
print(f" {i+1}. {peer}")
return len(peers) > 0
except Exception as e:
print(f"⚠️ تعذر اكتشاف الأقران: {e}")
return False
def test_basic_task():
"""اختبار مهمة بسيطة"""
print("⚙️ اختبار مهمة بسيطة...")
try:
start_time = time.time()
# استخدام prime_calculation مباشرة
result = prime_calculation(1000) # عدد أصغر للاختبار السريع
duration = time.time() - start_time
if "error" in result:
print(f"❌ فشل في المعالجة: {result['error']}")
return False
print(f"✅ تمت المعالجة في {duration:.2f} ثانية")
print(f"📊 النتيجة: {result.get('count', 0)} أعداد أولية")
print(f"🔢 أول 5 أعداد: {result.get('primes', [])[:5]}")
return True
except Exception as e:
print(f"❌ فشل في المعالجة: {e}")
return False
def test_load_balancer():
"""اختبار موزع الحمل"""
print("⚖️ اختبار موزع الحمل...")
try:
from load_balancer import send_task
result = send_task("prime_calculation", 500)
if "error" in result:
print(f"⚠️ موزع الحمل: {result['error']}")
return False
else:
print(f"✅ موزع الحمل يعمل - {result.get('count', 0)} أعداد أولية")
return True
except Exception as e:
print(f"⚠️ تعذر اختبار موزع الحمل: {e}")
return False
def test_network_connectivity():
"""اختبار اتصال الشبكة"""
print("🌐 اختبار اتصال الشبكة...")
# فحص الاتصال بالإنترنت
try:
response = requests.get("http://www.google.com", timeout=5)
print("✅ الاتصال بالإنترنت متاح")
internet = True
except:
print("❌ لا يوجد اتصال بالإنترنت")
internet = False
# فحص الشبكة المحلية
try:
local_ip = socket.gethostbyname(socket.gethostname())
print(f"🏠 الشبكة المحلية: {local_ip}")
local_network = True
except:
print("❌ مشكلة في الشبكة المحلية")
local_network = False
return internet or local_network
def test_connection():
"""اختبار سريع للاتصال"""
print("=" * 50)
print("🚀 اختبار اتصال سريع لنظام AmalOffload")
print("=" * 50)
tests = [
("🌐 اتصال الشبكة", test_network_connectivity),
("🖥️ الخادم المحلي", test_local_server),
("🔍 اكتشاف الأقران", test_peer_discovery),
("⚙️ المهمة الأساسية", test_basic_task),
("⚖️ موزع الحمل", test_load_balancer),
]
results = []
for test_name, test_func in tests:
print(f"\n{test_name}...")
try:
result = test_func()
results.append(result)
time.sleep(1) # فاصل بين الاختبارات
except Exception as e:
print(f"💥 خطأ غير متوقع: {e}")
results.append(False)
print("\n" + "=" * 50)
print("📊 نتائج الاختبار:")
print("=" * 50)
success_count = sum(results)
total_tests = len(results)
for i, (test_name, _) in enumerate(tests):
status = "✅" if results[i] else "❌"
print(f"{status} {test_name}")
print(f"\n🎯 النتيجة النهائية: {success_count}/{total_tests}")
if success_count == total_tests:
print("🎉 النظام يعمل بشكل ممتاز!")
elif success_count >= 3:
print("👍 النظام يعمل بشكل جيد")
else:
print("⚠️ هناك مشاكل تحتاج إصلاح")
return success_count >= 3 # يعتبر ناجحاً إذا نجح 3/5 اختبارات على الأقل
if __name__ == "__main__":
success = test_connection()
if success:
print("\n💡 يمكنك الآن تشغيل:")
print(" python launcher.py --tray (لتشغيل النظام)")
print(" python test_distributed_system.py (لاختبار متقدم)")
else:
print("\n🔧 اقتراحات الإصلاح:")
print(" 1. تأكد من تشغيل: python background_service.py start")
print(" 2. تحقق من إعدادات الجدار الناري")
print(" 3. جرب: python peer_discovery.py (لاكتشاف الأقران)")