Spaces:
Running
Running
File size: 10,198 Bytes
042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 042d8bf 84c65b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# load_balancer.py
import peer_discovery
import requests
import time
import smart_tasks
import psutil
import socket
import threading
import logging
from datetime import datetime
from peer_discovery import PORT
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class SmartLoadBalancer:
def __init__(self):
self.peer_stats = {}
self.local_hostname = socket.gethostname()
self.local_ip = self.get_local_ip()
self.running = True
def get_local_ip(self):
"""الحصول على IP المحلي"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except:
return "127.0.0.1"
def send(self, peer, func, *args, **kwargs):
"""إرسال مهمة إلى نظير"""
try:
url = f"{peer}/run" if not peer.endswith("/run") else peer
payload = {
"func": func,
"args": list(args),
"kwargs": kwargs
}
logging.info(f"📤 إرسال مهمة {func} إلى {peer}")
response = requests.post(url, json=payload, timeout=15)
if response.status_code == 200:
result = response.json()
logging.info(f"✅ تم استلام النتيجة من {peer}")
return result
else:
logging.error(f"❌ خطأ من {peer}: {response.status_code}")
return {"error": f"HTTP {response.status_code}"}
except requests.exceptions.Timeout:
logging.error(f"⏰ انتهت المهلة مع {peer}")
return {"error": "Request timeout"}
except requests.exceptions.ConnectionError:
logging.error(f"🔌 تعذر الاتصال بـ {peer}")
return {"error": "Connection failed"}
except Exception as e:
logging.error(f"❌ خطأ غير متوقع مع {peer}: {str(e)}")
return {"error": str(e)}
def choose_peer(self):
"""اختيار أفضل جهاز مع خوارزمية متطورة"""
available_peers = self.get_available_peers()
if not available_peers:
logging.info("🔍 لم يتم العثور على أقران متاحة")
return None
# تقييم كل نظير
peer_scores = []
for peer in available_peers:
score = self.evaluate_peer(peer)
if score > 0:
peer_scores.append((peer, score))
if not peer_scores:
return None
# اختيار النظير بأعلى درجة
best_peer = max(peer_scores, key=lambda x: x[1])[0]
logging.info(f"🏆 أفضل نظير مختار: {best_peer}")
return best_peer
def get_available_peers(self):
"""الحصول على قائمة الأقران المتاحة"""
peers = []
# اكتشاف الأقران المحليين
local_peers = self.discover_local_peers()
peers.extend(local_peers)
# إضافة الأقران من peer_discovery
for peer in list(peer_discovery.PEERS):
if peer not in peers:
peers.append(peer)
# إزالة التكرارات والنظير المحلي
unique_peers = []
for peer in peers:
if self.is_self_peer(peer):
continue
if peer not in unique_peers:
unique_peers.append(peer)
logging.info(f"👥 الأقران المتاحون: {len(unique_peers)}")
return unique_peers
def discover_local_peers(self):
"""اكتشاف الأقران على الشبكة المحلية"""
local_peers = []
base_ip = ".".join(self.local_ip.split(".")[:-1]) # الحصول على 192.168.1
# فحص نطاق IPs المحلي
for i in range(1, 255):
if i == int(self.local_ip.split(".")[-1]):
continue # تخطي الذات
ip = f"{base_ip}.{i}"
peer_url = f"http://{ip}:{PORT}"
# فحص سريع للتوصيل
if self.check_peer_availability(peer_url):
local_peers.append(peer_url)
return local_peers
def check_peer_availability(self, peer_url):
"""فحص توفر النظير"""
try:
response = requests.get(f"{peer_url}/status", timeout=2)
return response.status_code == 200
except:
return False
def is_self_peer(self, peer_url):
"""فحص إذا كان النظير هو الجهاز الحالي"""
if f"://{self.local_ip}:" in peer_url:
return True
if f"://localhost:" in peer_url:
return True
if f"://127.0.0.1:" in peer_url:
return True
return False
def evaluate_peer(self, peer):
"""تقييم أداء النظير"""
try:
# الحصول على حالة النظام
status_url = peer.replace("/run", "/status") if "/run" in peer else f"{peer}/status"
response = requests.get(status_url, timeout=3)
if response.status_code == 200:
status = response.json()
# حساب الدرجة
score = 100
# خصم حسب استخدام CPU
cpu_usage = status.get("cpu_usage", 50)
score -= cpu_usage * 0.5
# خصم حسب استخدام الذاكرة
memory_usage = status.get("memory_usage", 50)
score -= memory_usage * 0.3
# مكافأة للاتصالات المحلية
if self.is_local_peer(peer):
score += 20
# خصم للاتصال البطيء
response_time = response.elapsed.total_seconds()
score -= response_time * 10
return max(0, score)
except Exception as e:
logging.debug(f"تعذر تقييم {peer}: {str(e)}")
return 0
def is_local_peer(self, peer):
"""فحص إذا كان النظير محلي"""
peer_ip = peer.split("://")[1].split(":")[0] if "://" in peer else peer.split(":")[0]
return self.is_local_ip(peer_ip)
def is_local_ip(self, ip):
"""فحص إذا كان IP محلي"""
return (
ip.startswith('192.168.') or
ip.startswith('10.') or
ip.startswith('172.') or
ip in ['127.0.0.1', 'localhost', self.local_ip]
)
def execute_task(self, func_name, *args, **kwargs):
"""تنفيذ المهمة إما محلياً أو عن بعد"""
peer = self.choose_peer()
if peer:
logging.info(f"🛰️ إرسال مهمة {func_name} إلى {peer}")
result = self.send(peer, func_name, *args, **kwargs)
if "error" not in result:
return result
else:
logging.warning(f"❌ فشل الإرسال إلى {peer}: {result['error']}")
# التنفيذ المحلي كبديل
logging.info(f"⚙️ تنفيذ محلي للمهمة {func_name}")
return self.execute_locally(func_name, *args, **kwargs)
def execute_locally(self, func_name, *args, **kwargs):
"""تنفيذ المهمة محلياً"""
try:
if hasattr(smart_tasks, func_name):
func = getattr(smart_tasks, func_name)
result = func(*args, **kwargs)
return result
else:
return {"error": f"الدالة {func_name} غير موجودة"}
except Exception as e:
return {"error": str(e)}
def start_monitoring(self):
"""بدء مراقبة الأقران"""
def monitor_loop():
while self.running:
try:
available_peers = self.get_available_peers()
logging.info(f"📊 المراقبة: {len(available_peers)} أقران متاحون")
time.sleep(30)
except Exception as e:
logging.error(f"خطأ في المراقبة: {e}")
time.sleep(60)
monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
monitor_thread.start()
def stop(self):
"""إيقاف موازن الحمل"""
self.running = False
# إنشاء نسخة عالمية
load_balancer = SmartLoadBalancer()
def send_task(func_name, *args, **kwargs):
"""وظيفة مساعدة لإرسال المهام"""
return load_balancer.execute_task(func_name, *args, **kwargs)
# التشغيل الرئيسي
def main():
logging.info("🚀 بدء تشغيل موازن الحمل الذكي")
# بدء المراقبة
load_balancer.start_monitoring()
try:
while load_balancer.running:
# تنفيذ مهام اختبارية
result = load_balancer.execute_task("prime_calculation", 20000)
if "error" in result:
logging.error(f"❌ خطأ في التنفيذ: {result['error']}")
else:
primes_count = result.get('count', 0)
logging.info(f"✅ تم تنفيذ المهمة - عدد الأعداد الأولية: {primes_count}")
time.sleep(15)
except KeyboardInterrupt:
logging.info("🛑 إيقاف موازن الحمل...")
load_balancer.stop()
if __name__ == "__main__":
main()
|