Spaces:
Paused
Paused
| import threading | |
| import queue | |
| import time | |
| import json | |
| from typing import Callable, Dict, List | |
| import socket | |
| from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo | |
| import logging | |
| import requests # ✅ تأكد من استيراده | |
| logging.basicConfig(level=logging.INFO) | |
| class PeerRegistry: | |
| def __init__(self): | |
| self._peers = {} | |
| self._zeroconf = Zeroconf() | |
| self.local_node_id = socket.gethostname() | |
| def register_service(self, name: str, port: int, load: float = 0.0): | |
| service_info = ServiceInfo( | |
| "_tasknode._tcp.local.", | |
| f"{name}._tasknode._tcp.local.", | |
| addresses=[socket.inet_aton(self._get_local_ip())], | |
| port=port, | |
| properties={ | |
| b'load': str(load).encode(), # تأكد من أنها bytes | |
| b'node_id': self.local_node_id.encode() | |
| }, | |
| server=f"{name}.local." | |
| ) | |
| self._zeroconf.register_service(service_info) | |
| logging.info(f"✅ Service registered: {name} @ {self._get_local_ip()}:{port}") | |
| def discover_peers(self, timeout: int = 3) -> List[Dict]: | |
| class Listener: | |
| def __init__(self): | |
| self.peers = [] | |
| def add_service(self, zc, type_, name): | |
| info = zc.get_service_info(type_, name) | |
| if info: | |
| ip = socket.inet_ntoa(info.addresses[0]) | |
| peer_data = { | |
| 'ip': ip, | |
| 'port': info.port, | |
| 'load': float(info.properties.get(b'load', b'0')), | |
| 'node_id': info.properties.get(b'node_id', b'unknown').decode(), | |
| 'last_seen': time.time() | |
| } | |
| if peer_data not in self.peers: | |
| self.peers.append(peer_data) | |
| def update_service(self, zc, type_, name): | |
| self.add_service(zc, type_, name) | |
| def remove_service(self, zc, type_, name): | |
| pass # اختياري | |
| listener = Listener() | |
| ServiceBrowser(self._zeroconf, "_tasknode._tcp.local.", listener) | |
| time.sleep(timeout) | |
| return sorted(listener.peers, key=lambda x: x['load']) | |
| def _get_local_ip(self) -> str: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| try: | |
| s.connect(('10.255.255.255', 1)) | |
| ip = s.getsockname()[0] | |
| except Exception: | |
| ip = '127.0.0.1' | |
| finally: | |
| s.close() | |
| return ip | |
| class DistributedExecutor: | |
| def __init__(self, shared_secret: str): | |
| self.peer_registry = PeerRegistry() | |
| self.shared_secret = shared_secret | |
| self.task_queue = queue.PriorityQueue() | |
| self.result_cache = {} | |
| self.available_peers = [] | |
| self._init_peer_discovery() | |
| def _init_peer_discovery(self): | |
| def discovery_loop(): | |
| while True: | |
| self.available_peers = self.peer_registry.discover_peers() | |
| logging.info(f"✅ Discovered peers: {self.available_peers}") | |
| time.sleep(10) | |
| threading.Thread(target=discovery_loop, daemon=True).start() | |
| def submit(self, task_func: Callable, *args, **kwargs): | |
| """إرسال مهمة جديدة للنظام""" | |
| task_id = f"{task_func.__name__}_{time.time()}" | |
| task = { | |
| 'task_id': task_id, | |
| 'function': task_func.__name__, | |
| 'args': args, | |
| 'kwargs': kwargs, | |
| 'sender_id': self.peer_registry.local_node_id | |
| } | |
| if self.available_peers: | |
| # ترتيب الأجهزة: LAN أولاً ثم WAN | |
| lan_peers = [p for p in self.available_peers if self._is_local_ip(p['ip'])] | |
| wan_peers = [p for p in self.available_peers if not self._is_local_ip(p['ip'])] | |
| # اختيار من LAN أولاً | |
| if lan_peers: | |
| peer = min(lan_peers, key=lambda x: x['load']) | |
| logging.info(f"✅ Sending task {task_id} to LAN peer {peer['node_id']}") | |
| else: | |
| # إذا لم تتوفر أجهزة محلية، استخدم WAN | |
| peer = min(wan_peers, key=lambda x: x['load']) | |
| logging.info(f"✅ Sending task {task_id} to WAN peer {peer['node_id']}") | |
| self._send_to_peer(peer, task) | |
| else: | |
| logging.warning("⚠️ لا توجد أجهزة متاحة - سيتم تنفيذ المهمة محلياً") | |
| def _is_local_ip(self, ip: str) -> bool: | |
| """فحص إذا كان IP في الشبكة المحلية""" | |
| return ( | |
| ip.startswith('192.168.') or | |
| ip.startswith('10.') or | |
| ip.startswith('172.') or | |
| ip == '127.0.0.1' | |
| ) | |
| def _send_to_peer(self, peer: Dict, task: Dict): | |
| try: | |
| url = f"http://{peer['ip']}:{peer['port']}/run" | |
| response = requests.post(url, json=task, timeout=10) | |
| response.raise_for_status() | |
| logging.info(f"✅ Response from peer: {response.text}") | |
| return response.json() | |
| except Exception as e: | |
| logging.error(f"❌ فشل إرسال المهمة لـ {peer['node_id']}: {e}") | |
| return None | |
| if __name__ == "__main__": | |
| executor = DistributedExecutor("my_secret_key") | |
| executor.peer_registry.register_service("node1", 7520, load=0.1) | |
| print("✅ نظام توزيع المهام جاهز...") | |
| # مثال لإرسال مهمة: | |
| def example_task(x): | |
| return x * x | |
| executor.submit(example_task, 5) | |