Spaces:
Running
Running
| # load_balancer.py | |
| import peer_discovery | |
| import requests | |
| import time | |
| import smart_tasks | |
| import psutil | |
| import socket | |
| import threading | |
| import logging | |
| from datetime import datetime | |
| from peer_discovery import PORT | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| class SmartLoadBalancer: | |
| def __init__(self): | |
| self.peer_stats = {} | |
| self.local_hostname = socket.gethostname() | |
| self.local_ip = self.get_local_ip() | |
| self.running = True | |
| def get_local_ip(self): | |
| """الحصول على IP المحلي""" | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| s.connect(("8.8.8.8", 80)) | |
| ip = s.getsockname()[0] | |
| s.close() | |
| return ip | |
| except: | |
| return "127.0.0.1" | |
| def send(self, peer, func, *args, **kwargs): | |
| """إرسال مهمة إلى نظير""" | |
| try: | |
| url = f"{peer}/run" if not peer.endswith("/run") else peer | |
| payload = { | |
| "func": func, | |
| "args": list(args), | |
| "kwargs": kwargs | |
| } | |
| logging.info(f"📤 إرسال مهمة {func} إلى {peer}") | |
| response = requests.post(url, json=payload, timeout=15) | |
| if response.status_code == 200: | |
| result = response.json() | |
| logging.info(f"✅ تم استلام النتيجة من {peer}") | |
| return result | |
| else: | |
| logging.error(f"❌ خطأ من {peer}: {response.status_code}") | |
| return {"error": f"HTTP {response.status_code}"} | |
| except requests.exceptions.Timeout: | |
| logging.error(f"⏰ انتهت المهلة مع {peer}") | |
| return {"error": "Request timeout"} | |
| except requests.exceptions.ConnectionError: | |
| logging.error(f"🔌 تعذر الاتصال بـ {peer}") | |
| return {"error": "Connection failed"} | |
| except Exception as e: | |
| logging.error(f"❌ خطأ غير متوقع مع {peer}: {str(e)}") | |
| return {"error": str(e)} | |
| def choose_peer(self): | |
| """اختيار أفضل جهاز مع خوارزمية متطورة""" | |
| available_peers = self.get_available_peers() | |
| if not available_peers: | |
| logging.info("🔍 لم يتم العثور على أقران متاحة") | |
| return None | |
| # تقييم كل نظير | |
| peer_scores = [] | |
| for peer in available_peers: | |
| score = self.evaluate_peer(peer) | |
| if score > 0: | |
| peer_scores.append((peer, score)) | |
| if not peer_scores: | |
| return None | |
| # اختيار النظير بأعلى درجة | |
| best_peer = max(peer_scores, key=lambda x: x[1])[0] | |
| logging.info(f"🏆 أفضل نظير مختار: {best_peer}") | |
| return best_peer | |
| def get_available_peers(self): | |
| """الحصول على قائمة الأقران المتاحة""" | |
| peers = [] | |
| # اكتشاف الأقران المحليين | |
| local_peers = self.discover_local_peers() | |
| peers.extend(local_peers) | |
| # إضافة الأقران من peer_discovery | |
| for peer in list(peer_discovery.PEERS): | |
| if peer not in peers: | |
| peers.append(peer) | |
| # إزالة التكرارات والنظير المحلي | |
| unique_peers = [] | |
| for peer in peers: | |
| if self.is_self_peer(peer): | |
| continue | |
| if peer not in unique_peers: | |
| unique_peers.append(peer) | |
| logging.info(f"👥 الأقران المتاحون: {len(unique_peers)}") | |
| return unique_peers | |
| def discover_local_peers(self): | |
| """اكتشاف الأقران على الشبكة المحلية""" | |
| local_peers = [] | |
| base_ip = ".".join(self.local_ip.split(".")[:-1]) # الحصول على 192.168.1 | |
| # فحص نطاق IPs المحلي | |
| for i in range(1, 255): | |
| if i == int(self.local_ip.split(".")[-1]): | |
| continue # تخطي الذات | |
| ip = f"{base_ip}.{i}" | |
| peer_url = f"http://{ip}:{PORT}" | |
| # فحص سريع للتوصيل | |
| if self.check_peer_availability(peer_url): | |
| local_peers.append(peer_url) | |
| return local_peers | |
| def check_peer_availability(self, peer_url): | |
| """فحص توفر النظير""" | |
| try: | |
| response = requests.get(f"{peer_url}/status", timeout=2) | |
| return response.status_code == 200 | |
| except: | |
| return False | |
| def is_self_peer(self, peer_url): | |
| """فحص إذا كان النظير هو الجهاز الحالي""" | |
| if f"://{self.local_ip}:" in peer_url: | |
| return True | |
| if f"://localhost:" in peer_url: | |
| return True | |
| if f"://127.0.0.1:" in peer_url: | |
| return True | |
| return False | |
| def evaluate_peer(self, peer): | |
| """تقييم أداء النظير""" | |
| try: | |
| # الحصول على حالة النظام | |
| status_url = peer.replace("/run", "/status") if "/run" in peer else f"{peer}/status" | |
| response = requests.get(status_url, timeout=3) | |
| if response.status_code == 200: | |
| status = response.json() | |
| # حساب الدرجة | |
| score = 100 | |
| # خصم حسب استخدام CPU | |
| cpu_usage = status.get("cpu_usage", 50) | |
| score -= cpu_usage * 0.5 | |
| # خصم حسب استخدام الذاكرة | |
| memory_usage = status.get("memory_usage", 50) | |
| score -= memory_usage * 0.3 | |
| # مكافأة للاتصالات المحلية | |
| if self.is_local_peer(peer): | |
| score += 20 | |
| # خصم للاتصال البطيء | |
| response_time = response.elapsed.total_seconds() | |
| score -= response_time * 10 | |
| return max(0, score) | |
| except Exception as e: | |
| logging.debug(f"تعذر تقييم {peer}: {str(e)}") | |
| return 0 | |
| def is_local_peer(self, peer): | |
| """فحص إذا كان النظير محلي""" | |
| peer_ip = peer.split("://")[1].split(":")[0] if "://" in peer else peer.split(":")[0] | |
| return self.is_local_ip(peer_ip) | |
| def is_local_ip(self, ip): | |
| """فحص إذا كان IP محلي""" | |
| return ( | |
| ip.startswith('192.168.') or | |
| ip.startswith('10.') or | |
| ip.startswith('172.') or | |
| ip in ['127.0.0.1', 'localhost', self.local_ip] | |
| ) | |
| def execute_task(self, func_name, *args, **kwargs): | |
| """تنفيذ المهمة إما محلياً أو عن بعد""" | |
| peer = self.choose_peer() | |
| if peer: | |
| logging.info(f"🛰️ إرسال مهمة {func_name} إلى {peer}") | |
| result = self.send(peer, func_name, *args, **kwargs) | |
| if "error" not in result: | |
| return result | |
| else: | |
| logging.warning(f"❌ فشل الإرسال إلى {peer}: {result['error']}") | |
| # التنفيذ المحلي كبديل | |
| logging.info(f"⚙️ تنفيذ محلي للمهمة {func_name}") | |
| return self.execute_locally(func_name, *args, **kwargs) | |
| def execute_locally(self, func_name, *args, **kwargs): | |
| """تنفيذ المهمة محلياً""" | |
| try: | |
| if hasattr(smart_tasks, func_name): | |
| func = getattr(smart_tasks, func_name) | |
| result = func(*args, **kwargs) | |
| return result | |
| else: | |
| return {"error": f"الدالة {func_name} غير موجودة"} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def start_monitoring(self): | |
| """بدء مراقبة الأقران""" | |
| def monitor_loop(): | |
| while self.running: | |
| try: | |
| available_peers = self.get_available_peers() | |
| logging.info(f"📊 المراقبة: {len(available_peers)} أقران متاحون") | |
| time.sleep(30) | |
| except Exception as e: | |
| logging.error(f"خطأ في المراقبة: {e}") | |
| time.sleep(60) | |
| monitor_thread = threading.Thread(target=monitor_loop, daemon=True) | |
| monitor_thread.start() | |
| def stop(self): | |
| """إيقاف موازن الحمل""" | |
| self.running = False | |
| # إنشاء نسخة عالمية | |
| load_balancer = SmartLoadBalancer() | |
| def send_task(func_name, *args, **kwargs): | |
| """وظيفة مساعدة لإرسال المهام""" | |
| return load_balancer.execute_task(func_name, *args, **kwargs) | |
| # التشغيل الرئيسي | |
| def main(): | |
| logging.info("🚀 بدء تشغيل موازن الحمل الذكي") | |
| # بدء المراقبة | |
| load_balancer.start_monitoring() | |
| try: | |
| while load_balancer.running: | |
| # تنفيذ مهام اختبارية | |
| result = load_balancer.execute_task("prime_calculation", 20000) | |
| if "error" in result: | |
| logging.error(f"❌ خطأ في التنفيذ: {result['error']}") | |
| else: | |
| primes_count = result.get('count', 0) | |
| logging.info(f"✅ تم تنفيذ المهمة - عدد الأعداد الأولية: {primes_count}") | |
| time.sleep(15) | |
| except KeyboardInterrupt: | |
| logging.info("🛑 إيقاف موازن الحمل...") | |
| load_balancer.stop() | |
| if __name__ == "__main__": | |
| main() | |