amalCoreFlow / load_balancer.py
osamabyc86's picture
Upload 69 files
84c65b6 verified
# load_balancer.py
import peer_discovery
import requests
import time
import smart_tasks
import psutil
import socket
import threading
import logging
from datetime import datetime
from peer_discovery import PORT
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class SmartLoadBalancer:
def __init__(self):
self.peer_stats = {}
self.local_hostname = socket.gethostname()
self.local_ip = self.get_local_ip()
self.running = True
def get_local_ip(self):
"""الحصول على IP المحلي"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except:
return "127.0.0.1"
def send(self, peer, func, *args, **kwargs):
"""إرسال مهمة إلى نظير"""
try:
url = f"{peer}/run" if not peer.endswith("/run") else peer
payload = {
"func": func,
"args": list(args),
"kwargs": kwargs
}
logging.info(f"📤 إرسال مهمة {func} إلى {peer}")
response = requests.post(url, json=payload, timeout=15)
if response.status_code == 200:
result = response.json()
logging.info(f"✅ تم استلام النتيجة من {peer}")
return result
else:
logging.error(f"❌ خطأ من {peer}: {response.status_code}")
return {"error": f"HTTP {response.status_code}"}
except requests.exceptions.Timeout:
logging.error(f"⏰ انتهت المهلة مع {peer}")
return {"error": "Request timeout"}
except requests.exceptions.ConnectionError:
logging.error(f"🔌 تعذر الاتصال بـ {peer}")
return {"error": "Connection failed"}
except Exception as e:
logging.error(f"❌ خطأ غير متوقع مع {peer}: {str(e)}")
return {"error": str(e)}
def choose_peer(self):
"""اختيار أفضل جهاز مع خوارزمية متطورة"""
available_peers = self.get_available_peers()
if not available_peers:
logging.info("🔍 لم يتم العثور على أقران متاحة")
return None
# تقييم كل نظير
peer_scores = []
for peer in available_peers:
score = self.evaluate_peer(peer)
if score > 0:
peer_scores.append((peer, score))
if not peer_scores:
return None
# اختيار النظير بأعلى درجة
best_peer = max(peer_scores, key=lambda x: x[1])[0]
logging.info(f"🏆 أفضل نظير مختار: {best_peer}")
return best_peer
def get_available_peers(self):
"""الحصول على قائمة الأقران المتاحة"""
peers = []
# اكتشاف الأقران المحليين
local_peers = self.discover_local_peers()
peers.extend(local_peers)
# إضافة الأقران من peer_discovery
for peer in list(peer_discovery.PEERS):
if peer not in peers:
peers.append(peer)
# إزالة التكرارات والنظير المحلي
unique_peers = []
for peer in peers:
if self.is_self_peer(peer):
continue
if peer not in unique_peers:
unique_peers.append(peer)
logging.info(f"👥 الأقران المتاحون: {len(unique_peers)}")
return unique_peers
def discover_local_peers(self):
"""اكتشاف الأقران على الشبكة المحلية"""
local_peers = []
base_ip = ".".join(self.local_ip.split(".")[:-1]) # الحصول على 192.168.1
# فحص نطاق IPs المحلي
for i in range(1, 255):
if i == int(self.local_ip.split(".")[-1]):
continue # تخطي الذات
ip = f"{base_ip}.{i}"
peer_url = f"http://{ip}:{PORT}"
# فحص سريع للتوصيل
if self.check_peer_availability(peer_url):
local_peers.append(peer_url)
return local_peers
def check_peer_availability(self, peer_url):
"""فحص توفر النظير"""
try:
response = requests.get(f"{peer_url}/status", timeout=2)
return response.status_code == 200
except:
return False
def is_self_peer(self, peer_url):
"""فحص إذا كان النظير هو الجهاز الحالي"""
if f"://{self.local_ip}:" in peer_url:
return True
if f"://localhost:" in peer_url:
return True
if f"://127.0.0.1:" in peer_url:
return True
return False
def evaluate_peer(self, peer):
"""تقييم أداء النظير"""
try:
# الحصول على حالة النظام
status_url = peer.replace("/run", "/status") if "/run" in peer else f"{peer}/status"
response = requests.get(status_url, timeout=3)
if response.status_code == 200:
status = response.json()
# حساب الدرجة
score = 100
# خصم حسب استخدام CPU
cpu_usage = status.get("cpu_usage", 50)
score -= cpu_usage * 0.5
# خصم حسب استخدام الذاكرة
memory_usage = status.get("memory_usage", 50)
score -= memory_usage * 0.3
# مكافأة للاتصالات المحلية
if self.is_local_peer(peer):
score += 20
# خصم للاتصال البطيء
response_time = response.elapsed.total_seconds()
score -= response_time * 10
return max(0, score)
except Exception as e:
logging.debug(f"تعذر تقييم {peer}: {str(e)}")
return 0
def is_local_peer(self, peer):
"""فحص إذا كان النظير محلي"""
peer_ip = peer.split("://")[1].split(":")[0] if "://" in peer else peer.split(":")[0]
return self.is_local_ip(peer_ip)
def is_local_ip(self, ip):
"""فحص إذا كان IP محلي"""
return (
ip.startswith('192.168.') or
ip.startswith('10.') or
ip.startswith('172.') or
ip in ['127.0.0.1', 'localhost', self.local_ip]
)
def execute_task(self, func_name, *args, **kwargs):
"""تنفيذ المهمة إما محلياً أو عن بعد"""
peer = self.choose_peer()
if peer:
logging.info(f"🛰️ إرسال مهمة {func_name} إلى {peer}")
result = self.send(peer, func_name, *args, **kwargs)
if "error" not in result:
return result
else:
logging.warning(f"❌ فشل الإرسال إلى {peer}: {result['error']}")
# التنفيذ المحلي كبديل
logging.info(f"⚙️ تنفيذ محلي للمهمة {func_name}")
return self.execute_locally(func_name, *args, **kwargs)
def execute_locally(self, func_name, *args, **kwargs):
"""تنفيذ المهمة محلياً"""
try:
if hasattr(smart_tasks, func_name):
func = getattr(smart_tasks, func_name)
result = func(*args, **kwargs)
return result
else:
return {"error": f"الدالة {func_name} غير موجودة"}
except Exception as e:
return {"error": str(e)}
def start_monitoring(self):
"""بدء مراقبة الأقران"""
def monitor_loop():
while self.running:
try:
available_peers = self.get_available_peers()
logging.info(f"📊 المراقبة: {len(available_peers)} أقران متاحون")
time.sleep(30)
except Exception as e:
logging.error(f"خطأ في المراقبة: {e}")
time.sleep(60)
monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
monitor_thread.start()
def stop(self):
"""إيقاف موازن الحمل"""
self.running = False
# إنشاء نسخة عالمية
load_balancer = SmartLoadBalancer()
def send_task(func_name, *args, **kwargs):
"""وظيفة مساعدة لإرسال المهام"""
return load_balancer.execute_task(func_name, *args, **kwargs)
# التشغيل الرئيسي
def main():
logging.info("🚀 بدء تشغيل موازن الحمل الذكي")
# بدء المراقبة
load_balancer.start_monitoring()
try:
while load_balancer.running:
# تنفيذ مهام اختبارية
result = load_balancer.execute_task("prime_calculation", 20000)
if "error" in result:
logging.error(f"❌ خطأ في التنفيذ: {result['error']}")
else:
primes_count = result.get('count', 0)
logging.info(f"✅ تم تنفيذ المهمة - عدد الأعداد الأولية: {primes_count}")
time.sleep(15)
except KeyboardInterrupt:
logging.info("🛑 إيقاف موازن الحمل...")
load_balancer.stop()
if __name__ == "__main__":
main()