File size: 4,217 Bytes
1dd36e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# distributed_executor.py
import threading
import queue
import time
import json
from typing import Callable, Dict, List
import socket
from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo
import logging

class PeerRegistry:
    def __init__(self):
        self._peers = {}
        self._zeroconf = Zeroconf()
        self.local_node_id = socket.gethostname()
    
    def register_service(self, name: str, port: int, load: float = 0.0):
        service_info = ServiceInfo(
            "_tasknode._tcp.local.",
            f"{name}._tasknode._tcp.local.",
            addresses=[socket.inet_aton(self._get_local_ip())],
            port=port,
            properties={'load': str(load), 'node_id': self.local_node_id},
            server=f"{name}.local."
        )
        self._zeroconf.register_service(service_info)
    
    def discover_peers(self, timeout: int = 3) -> List[Dict]:
        class Listener:
            def __init__(self):
                self.peers = []
            
            def add_service(self, zc, type_, name):
                info = zc.get_service_info(type_, name)
                if info:
                    ip = socket.inet_ntoa(info.addresses[0])
                    self.peers.append({
                        'ip': ip,
                        'port': info.port,
                        'load': float(info.properties[b'load']),
                        'node_id': info.properties[b'node_id'].decode(),
                        'last_seen': time.time()
                    })
        
        listener = Listener()
        browser = ServiceBrowser(self._zeroconf, "_tasknode._tcp.local.", listener)
        time.sleep(timeout)
        return sorted(listener.peers, key=lambda x: x['load'])
    
    def _get_local_ip(self) -> str:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(('10.255.255.255', 1))
            ip = s.getsockname()[0]
        except:
            ip = '127.0.0.1'
        finally:
            s.close()
        return ip

class DistributedExecutor:
    def __init__(self, shared_secret: str):
        self.peer_registry = PeerRegistry()
        self.shared_secret = shared_secret
        self.task_queue = queue.PriorityQueue()
        self.result_cache = {}
        self._init_peer_discovery()
        logging.basicConfig(level=logging.INFO)
    
    def _init_peer_discovery(self):
        def discovery_loop():
            while True:
                self.available_peers = self.peer_registry.discover_peers()
                time.sleep(10)
        
        threading.Thread(target=discovery_loop, daemon=True).start()
    
    def submit(self, task_func: Callable, *args, **kwargs):
        """إرسال مهمة جديدة للنظام"""
        task_id = f"{task_func.__name__}_{time.time()}"
        
        # تجهيز المهمة
        task = {
            'task_id': task_id,
            'function': task_func.__name__,
            'args': args,
            'kwargs': kwargs,
            'sender_id': self.peer_registry.local_node_id
        }
        
        # إرسال المهمة (تبسيط الإرسال في هذا المثال)
        if self.available_peers:
            peer = min(self.available_peers, key=lambda x: x['load'])
            self._send_to_peer(peer, task)
        else:
            logging.warning("لا توجد أجهزة متاحة - سيتم تنفيذ المهمة محلياً")
    
    def _send_to_peer(self, peer: Dict, task: Dict):
        """إرسال مهمة إلى جهاز آخر"""
        try:
            url = f"http://{peer['ip']}:{peer['port']}/run"
            response = requests.post(
                url,
                json=task,
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logging.error(f"فشل إرسال المهمة لـ {peer['node_id']}: {str(e)}")
            return None

# نموذج استخدام مبسط
if __name__ == "__main__":
    executor = DistributedExecutor("my_secret_key")
    executor.peer_registry.register_service("node1", 7520, load=0.1)
    print("نظام توزيع المهام جاهز...")