import json
import logging
import queue
import socket
import threading
import time
import urllib.request
from typing import Callable, Dict, List

from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo
class PeerRegistry:
    """Advertise this node over mDNS and discover other task nodes.

    A single ``Zeroconf`` instance is used both to register the local
    ``_tasknode._tcp`` service and to browse for peers advertising the
    same service type.
    """

    # mDNS service type shared by every node that takes part in the system.
    SERVICE_TYPE = "_tasknode._tcp.local."

    def __init__(self):
        self._peers = {}
        self._zeroconf = Zeroconf()
        # The hostname doubles as the node identifier advertised to peers.
        self.local_node_id = socket.gethostname()

    def register_service(self, name: str, port: int, load: float = 0.0):
        """Advertise this node on the local network.

        Args:
            name: Instance name; used for both the service name and the
                ``<name>.local.`` server record.
            port: TCP port advertised for this node.
            load: Load figure published in the service properties (as text).
        """
        service_info = ServiceInfo(
            self.SERVICE_TYPE,
            f"{name}.{self.SERVICE_TYPE}",
            addresses=[socket.inet_aton(self._get_local_ip())],
            port=port,
            properties={'load': str(load), 'node_id': self.local_node_id},
            server=f"{name}.local.",
        )
        self._zeroconf.register_service(service_info)

    def discover_peers(self, timeout: int = 3) -> List[Dict]:
        """Browse for task nodes for ``timeout`` seconds.

        Returns:
            One dict per discovered peer with the keys ``ip``, ``port``,
            ``load``, ``node_id`` and ``last_seen``, sorted by ascending
            ``load``. Advertisements that cannot be parsed are skipped.
        """
        parse = self._peer_from_info

        class Listener:
            """Collects a peer record for every service that appears."""

            def __init__(self):
                self.peers = []

            def add_service(self, zc, type_, name):
                info = zc.get_service_info(type_, name)
                if not info:
                    return
                peer = parse(info)
                if peer is not None:
                    self.peers.append(peer)

            def update_service(self, zc, type_, name):
                """Part of the zeroconf listener interface; updates are ignored."""

            def remove_service(self, zc, type_, name):
                """Part of the zeroconf listener interface; removals are ignored."""

        listener = Listener()
        browser = ServiceBrowser(self._zeroconf, self.SERVICE_TYPE, listener)
        try:
            time.sleep(timeout)
        finally:
            # Without this every call would leave another browser running.
            browser.cancel()
        return sorted(listener.peers, key=lambda peer: peer['load'])

    @staticmethod
    def _peer_from_info(info):
        """Convert a resolved service record into a peer dict.

        ``info`` only needs ``addresses``, ``port`` and ``properties``
        attributes. Returns ``None`` when the record has no address or its
        ``load`` / ``node_id`` properties are missing or malformed, so one
        bad advertisement cannot break discovery.
        """
        try:
            return {
                'ip': socket.inet_ntoa(info.addresses[0]),
                'port': info.port,
                'load': float(info.properties[b'load']),
                'node_id': info.properties[b'node_id'].decode(),
                'last_seen': time.time(),
            }
        except (IndexError, KeyError, ValueError, TypeError,
                AttributeError, OSError) as exc:
            logging.debug("Ignoring malformed peer advertisement: %s", exc)
            return None

    def close(self):
        """Shut down the underlying ``Zeroconf`` instance."""
        self._zeroconf.close()

    def _get_local_ip(self) -> str:
        """Return the local IPv4 address used for outbound traffic.

        Falls back to ``127.0.0.1`` when no route is available.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            try:
                # Connecting a UDP socket sends no packet; it only makes the
                # OS choose the outgoing interface, whose address we read.
                probe.connect(('10.255.255.255', 1))
                return probe.getsockname()[0]
            except OSError:
                return '127.0.0.1'
class DistributedExecutor:
    """Dispatch tasks to the least-loaded peer found on the local network.

    A daemon thread refreshes ``available_peers`` in the background;
    ``submit`` sends each task to the peer with the lowest advertised load
    and runs it locally when no peer is known.
    """

    def __init__(self, shared_secret: str):
        """Create the executor and start background peer discovery.

        Args:
            shared_secret: Stored on the instance.
                NOTE(review): nothing in this class reads it yet — confirm
                whether requests are meant to be authenticated with it.
        """
        # Configure logging first so messages from the discovery thread
        # are not lost.
        logging.basicConfig(level=logging.INFO)
        self.peer_registry = PeerRegistry()
        self.shared_secret = shared_secret
        self.task_queue = queue.PriorityQueue()
        self.result_cache = {}
        # Must exist before the first discovery round completes, otherwise
        # an early submit() raises AttributeError.
        self.available_peers = []
        self._init_peer_discovery()

    def _init_peer_discovery(self):
        """Start the daemon thread that keeps ``available_peers`` fresh."""
        def discovery_loop():
            while True:
                try:
                    self.available_peers = self.peer_registry.discover_peers()
                except Exception:
                    # Keep the thread alive; the previous peer list stays
                    # in use until the next successful round.
                    logging.exception("Peer discovery failed")
                time.sleep(10)

        threading.Thread(target=discovery_loop, daemon=True).start()

    def submit(self, task_func: Callable, *args, **kwargs):
        """Submit a new task to the system.

        The task is described by the function's ``__name__`` plus its
        arguments and sent to the peer with the lowest load. When no peer
        is known the function is called locally instead.

        Returns:
            The decoded JSON response from the peer (``None`` if sending
            failed), or the function's own return value when run locally.
        """
        task_id = f"{task_func.__name__}_{time.time()}"
        task = {
            'task_id': task_id,
            'function': task_func.__name__,
            'args': args,
            'kwargs': kwargs,
            'sender_id': self.peer_registry.local_node_id,
        }

        # Read the attribute once: the discovery thread may rebind it
        # between the emptiness check and the selection.
        peers = self.available_peers
        if peers:
            peer = min(peers, key=lambda candidate: candidate['load'])
            return self._send_to_peer(peer, task)

        logging.warning("لا توجد أجهزة متاحة - سيتم تنفيذ المهمة محلياً")
        return task_func(*args, **kwargs)

    def _send_to_peer(self, peer: Dict, task: Dict):
        """Send a task to another node.

        POSTs ``task`` as JSON to ``http://<ip>:<port>/run`` with a
        10-second timeout.

        Returns:
            The decoded JSON response, or ``None`` if the request could not
            be built, sent, or decoded (the failure is logged).
        """
        try:
            url = f"http://{peer['ip']}:{peer['port']}/run"
            request = urllib.request.Request(
                url,
                data=json.dumps(task).encode('utf-8'),
                headers={'Content-Type': 'application/json'},
                method='POST',
            )
            # urlopen raises HTTPError for 4xx/5xx responses, so a failing
            # status ends up in the handler below.
            with urllib.request.urlopen(request, timeout=10) as response:
                return json.loads(response.read().decode('utf-8'))
        except Exception as e:
            # Deliberately broad: delivery is best-effort and must never
            # crash the caller.
            logging.error("فشل إرسال المهمة لـ %s: %s", peer.get('node_id'), e)
            return None
if __name__ == "__main__":
    # Demo entry point: start an executor, then advertise this machine as
    # "node1" on port 7520 with an initial load of 0.1.
    executor = DistributedExecutor("my_secret_key")
    registry = executor.peer_registry
    registry.register_service("node1", 7520, load=0.1)
    print("نظام توزيع المهام جاهز...")