# load_balancer.py import peer_discovery import requests import time import smart_tasks import psutil import socket import threading import logging from datetime import datetime from peer_discovery import PORT logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class SmartLoadBalancer: def __init__(self): self.peer_stats = {} self.local_hostname = socket.gethostname() self.local_ip = self.get_local_ip() self.running = True def get_local_ip(self): """الحصول على IP المحلي""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except: return "127.0.0.1" def send(self, peer, func, *args, **kwargs): """إرسال مهمة إلى نظير""" try: url = f"{peer}/run" if not peer.endswith("/run") else peer payload = { "func": func, "args": list(args), "kwargs": kwargs } logging.info(f"📤 إرسال مهمة {func} إلى {peer}") response = requests.post(url, json=payload, timeout=15) if response.status_code == 200: result = response.json() logging.info(f"✅ تم استلام النتيجة من {peer}") return result else: logging.error(f"❌ خطأ من {peer}: {response.status_code}") return {"error": f"HTTP {response.status_code}"} except requests.exceptions.Timeout: logging.error(f"⏰ انتهت المهلة مع {peer}") return {"error": "Request timeout"} except requests.exceptions.ConnectionError: logging.error(f"🔌 تعذر الاتصال بـ {peer}") return {"error": "Connection failed"} except Exception as e: logging.error(f"❌ خطأ غير متوقع مع {peer}: {str(e)}") return {"error": str(e)} def choose_peer(self): """اختيار أفضل جهاز مع خوارزمية متطورة""" available_peers = self.get_available_peers() if not available_peers: logging.info("🔍 لم يتم العثور على أقران متاحة") return None # تقييم كل نظير peer_scores = [] for peer in available_peers: score = self.evaluate_peer(peer) if score > 0: peer_scores.append((peer, score)) if not peer_scores: return None # اختيار النظير بأعلى درجة best_peer = max(peer_scores, key=lambda x: x[1])[0] logging.info(f"🏆 أفضل نظير مختار: {best_peer}") return best_peer def get_available_peers(self): """الحصول على قائمة الأقران المتاحة""" peers = [] # اكتشاف الأقران المحليين local_peers = self.discover_local_peers() peers.extend(local_peers) # إضافة الأقران من peer_discovery for peer in list(peer_discovery.PEERS): if peer not in peers: peers.append(peer) # إزالة التكرارات والنظير المحلي unique_peers = [] for peer in peers: if self.is_self_peer(peer): continue if peer not in unique_peers: unique_peers.append(peer) logging.info(f"👥 الأقران المتاحون: {len(unique_peers)}") return unique_peers def discover_local_peers(self): """اكتشاف الأقران على الشبكة المحلية""" local_peers = [] base_ip = ".".join(self.local_ip.split(".")[:-1]) # الحصول على 192.168.1 # فحص نطاق IPs المحلي for i in range(1, 255): if i == int(self.local_ip.split(".")[-1]): continue # تخطي الذات ip = f"{base_ip}.{i}" peer_url = f"http://{ip}:{PORT}" # فحص سريع للتوصيل if self.check_peer_availability(peer_url): local_peers.append(peer_url) return local_peers def check_peer_availability(self, peer_url): """فحص توفر النظير""" try: response = requests.get(f"{peer_url}/status", timeout=2) return response.status_code == 200 except: return False def is_self_peer(self, peer_url): """فحص إذا كان النظير هو الجهاز الحالي""" if f"://{self.local_ip}:" in peer_url: return True if f"://localhost:" in peer_url: return True if f"://127.0.0.1:" in peer_url: return True return False def evaluate_peer(self, peer): """تقييم أداء النظير""" try: # الحصول على حالة النظام status_url = peer.replace("/run", "/status") if "/run" in peer else f"{peer}/status" response = requests.get(status_url, timeout=3) if response.status_code == 200: status = response.json() # حساب الدرجة score = 100 # خصم حسب استخدام CPU cpu_usage = status.get("cpu_usage", 50) score -= cpu_usage * 0.5 # خصم حسب استخدام الذاكرة memory_usage = status.get("memory_usage", 50) score -= memory_usage * 0.3 # مكافأة للاتصالات المحلية if self.is_local_peer(peer): score += 20 # خصم للاتصال البطيء response_time = response.elapsed.total_seconds() score -= response_time * 10 return max(0, score) except Exception as e: logging.debug(f"تعذر تقييم {peer}: {str(e)}") return 0 def is_local_peer(self, peer): """فحص إذا كان النظير محلي""" peer_ip = peer.split("://")[1].split(":")[0] if "://" in peer else peer.split(":")[0] return self.is_local_ip(peer_ip) def is_local_ip(self, ip): """فحص إذا كان IP محلي""" return ( ip.startswith('192.168.') or ip.startswith('10.') or ip.startswith('172.') or ip in ['127.0.0.1', 'localhost', self.local_ip] ) def execute_task(self, func_name, *args, **kwargs): """تنفيذ المهمة إما محلياً أو عن بعد""" peer = self.choose_peer() if peer: logging.info(f"🛰️ إرسال مهمة {func_name} إلى {peer}") result = self.send(peer, func_name, *args, **kwargs) if "error" not in result: return result else: logging.warning(f"❌ فشل الإرسال إلى {peer}: {result['error']}") # التنفيذ المحلي كبديل logging.info(f"⚙️ تنفيذ محلي للمهمة {func_name}") return self.execute_locally(func_name, *args, **kwargs) def execute_locally(self, func_name, *args, **kwargs): """تنفيذ المهمة محلياً""" try: if hasattr(smart_tasks, func_name): func = getattr(smart_tasks, func_name) result = func(*args, **kwargs) return result else: return {"error": f"الدالة {func_name} غير موجودة"} except Exception as e: return {"error": str(e)} def start_monitoring(self): """بدء مراقبة الأقران""" def monitor_loop(): while self.running: try: available_peers = self.get_available_peers() logging.info(f"📊 المراقبة: {len(available_peers)} أقران متاحون") time.sleep(30) except Exception as e: logging.error(f"خطأ في المراقبة: {e}") time.sleep(60) monitor_thread = threading.Thread(target=monitor_loop, daemon=True) monitor_thread.start() def stop(self): """إيقاف موازن الحمل""" self.running = False # إنشاء نسخة عالمية load_balancer = SmartLoadBalancer() def send_task(func_name, *args, **kwargs): """وظيفة مساعدة لإرسال المهام""" return load_balancer.execute_task(func_name, *args, **kwargs) # التشغيل الرئيسي def main(): logging.info("🚀 بدء تشغيل موازن الحمل الذكي") # بدء المراقبة load_balancer.start_monitoring() try: while load_balancer.running: # تنفيذ مهام اختبارية result = load_balancer.execute_task("prime_calculation", 20000) if "error" in result: logging.error(f"❌ خطأ في التنفيذ: {result['error']}") else: primes_count = result.get('count', 0) logging.info(f"✅ تم تنفيذ المهمة - عدد الأعداد الأولية: {primes_count}") time.sleep(15) except KeyboardInterrupt: logging.info("🛑 إيقاف موازن الحمل...") load_balancer.stop() if __name__ == "__main__": main()