File size: 5,690 Bytes
b458825
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import threading
import queue
import time
import json
from typing import Callable, Dict, List
import socket
from zeroconf import Zeroconf, ServiceBrowser, ServiceInfo
import logging
import requests  # ✅ تأكد من استيراده

# Configure the root logger at import time so INFO-level records are not filtered out.
logging.basicConfig(level=logging.INFO)

class PeerRegistry:
    """Advertise this node over mDNS (zeroconf) and discover other task nodes.

    Services are published and browsed under the ``_tasknode._tcp.local.``
    service type. Every advertised service carries two TXT properties:
    ``load`` (a float rendered as text) and ``node_id`` (this host's name).
    """

    SERVICE_TYPE = "_tasknode._tcp.local."

    def __init__(self):
        self._peers = {}
        self._zeroconf = Zeroconf()
        self.local_node_id = socket.gethostname()

    def register_service(self, name: str, port: int, load: float = 0.0):
        """Publish this node as ``<name>._tasknode._tcp.local.``.

        Args:
            name: Instance name; also used to build the ``<name>.local.`` host.
            port: Port number advertised for the service.
            load: Load figure advertised in the ``load`` TXT property.
        """
        # Resolve the address once so the record and the log line agree.
        local_ip = self._get_local_ip()
        service_info = ServiceInfo(
            self.SERVICE_TYPE,
            f"{name}.{self.SERVICE_TYPE}",
            addresses=[socket.inet_aton(local_ip)],
            port=port,
            properties={
                b'load': str(load).encode(),  # property values are sent as bytes
                b'node_id': self.local_node_id.encode(),
            },
            server=f"{name}.local.",
        )
        self._zeroconf.register_service(service_info)
        logging.info(f"✅ Service registered: {name} @ {local_ip}:{port}")

    def discover_peers(self, timeout: int = 3) -> List[Dict]:
        """Browse for task nodes for *timeout* seconds and return the findings.

        Args:
            timeout: How long, in seconds, to keep the browser running.

        Returns:
            One dict per discovered service (keys ``ip``, ``port``, ``load``,
            ``node_id``, ``last_seen``), sorted by ascending ``load``.
        """

        class Listener:
            """Collects the latest peer record for each service name."""

            def __init__(self):
                # Keyed by service name so an update replaces the earlier
                # record instead of appending a near-duplicate.
                self.peers_by_name = {}

            def add_service(self, zc, type_, name):
                peer = PeerRegistry._peer_from_info(
                    zc.get_service_info(type_, name)
                )
                if peer is not None:
                    self.peers_by_name[name] = peer

            def update_service(self, zc, type_, name):
                self.add_service(zc, type_, name)

            def remove_service(self, zc, type_, name):
                # A service withdrawn during the window is no longer a peer.
                self.peers_by_name.pop(name, None)

        listener = Listener()
        browser = ServiceBrowser(self._zeroconf, self.SERVICE_TYPE, listener)
        try:
            time.sleep(timeout)
        finally:
            # Stop the browser; otherwise every call leaves another one
            # running on the shared Zeroconf instance.
            browser.cancel()
        return sorted(
            listener.peers_by_name.values(), key=lambda peer: peer['load']
        )

    @staticmethod
    def _peer_from_info(info):
        """Convert a zeroconf service-info object into a peer dict.

        Args:
            info: Object exposing ``addresses`` (packed address bytes),
                ``port`` and ``properties`` (bytes-keyed mapping), or ``None``.

        Returns:
            The peer dict, or ``None`` when *info* is missing or carries no
            4-byte (IPv4) address.
        """
        if not info:
            return None
        # inet_ntoa only accepts 4-byte packed addresses; skip anything else.
        packed_ipv4 = next(
            (raw for raw in (info.addresses or ()) if len(raw) == 4), None
        )
        if packed_ipv4 is None:
            return None

        properties = info.properties or {}
        try:
            load = float(properties.get(b'load') or b'0')
        except (TypeError, ValueError):
            # A malformed load must not abort discovery of the other peers.
            load = 0.0
        raw_node_id = properties.get(b'node_id') or b'unknown'

        return {
            'ip': socket.inet_ntoa(packed_ipv4),
            'port': info.port,
            'load': load,
            'node_id': raw_node_id.decode(errors='replace'),
            'last_seen': time.time(),
        }

    def close(self):
        """Shut down the underlying Zeroconf instance."""
        self._zeroconf.close()

    def _get_local_ip(self) -> str:
        """Return the local IPv4 address used for outbound traffic.

        Falls back to ``127.0.0.1`` when the socket call fails.
        """
        # connect() on a datagram socket sends nothing; it only makes the OS
        # pick a local address, which getsockname() then reports.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            try:
                probe.connect(('10.255.255.255', 1))
                return probe.getsockname()[0]
            except OSError:
                return '127.0.0.1'

class DistributedExecutor:
    """Dispatch tasks to the least-loaded discovered peer over HTTP.

    A daemon thread keeps ``available_peers`` refreshed from the peer
    registry. ``submit`` prefers peers on a private (LAN) address, falls back
    to the remaining peers, and runs the task in-process when no peer is known.

    ``shared_secret``, ``task_queue`` and ``result_cache`` are stored on the
    instance but are not used by any method of this class.
    """

    # RFC 1918 private IPv4 ranges that _is_local_ip treats as "LAN".
    _PRIVATE_NETWORKS = ('10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16')

    def __init__(self, shared_secret: str):
        self.peer_registry = PeerRegistry()
        self.shared_secret = shared_secret
        self.task_queue = queue.PriorityQueue()
        self.result_cache = {}
        self.available_peers = []
        self._init_peer_discovery()

    def _init_peer_discovery(self):
        """Start the daemon thread that refreshes ``available_peers``."""

        def discovery_loop():
            while True:
                try:
                    self.available_peers = self.peer_registry.discover_peers()
                    logging.info(f"✅ Discovered peers: {self.available_peers}")
                except Exception:
                    # Keep the loop alive: one failed round must not stop
                    # peer discovery for the rest of the process lifetime.
                    logging.exception("Peer discovery failed; will retry")
                time.sleep(10)

        threading.Thread(target=discovery_loop, daemon=True).start()

    def submit(self, task_func: Callable, *args, **kwargs):
        """Submit a new task to the system.

        Args:
            task_func: Callable whose ``__name__`` identifies the task to the
                peer; it is invoked directly only when no peer is available.
            *args: Positional arguments forwarded with the task.
            **kwargs: Keyword arguments forwarded with the task.

        Returns:
            The decoded JSON reply of the chosen peer (``None`` if the request
            failed), or the return value of ``task_func`` when it had to be
            run locally because no peer is known.
        """
        # Snapshot the list: the discovery thread rebinds the attribute, and
        # re-reading it between the check and the selection could yield an
        # empty list.
        peers = self.available_peers
        if not peers:
            logging.warning("⚠️ لا توجد أجهزة متاحة - سيتم تنفيذ المهمة محلياً")
            # Run locally, as the warning above announces.
            return task_func(*args, **kwargs)

        task_id = f"{task_func.__name__}_{time.time()}"
        task = {
            'task_id': task_id,
            'function': task_func.__name__,
            'args': args,
            'kwargs': kwargs,
            'sender_id': self.peer_registry.local_node_id
        }

        # Peer ordering: LAN first, then WAN.
        lan_peers = [p for p in peers if self._is_local_ip(p['ip'])]
        if lan_peers:
            peer = min(lan_peers, key=lambda x: x['load'])
            logging.info(f"✅ Sending task {task_id} to LAN peer {peer['node_id']}")
        else:
            # No LAN peer, so every entry in the snapshot is a WAN peer.
            peer = min(peers, key=lambda x: x['load'])
            logging.info(f"✅ Sending task {task_id} to WAN peer {peer['node_id']}")

        return self._send_to_peer(peer, task)

    def _is_local_ip(self, ip: str) -> bool:
        """Return True if *ip* is a loopback or RFC 1918 private address.

        Strings that are not valid IP addresses yield False.
        """
        import ipaddress

        try:
            address = ipaddress.ip_address(ip)
        except ValueError:
            return False
        if address.is_loopback:
            return True
        return any(
            address in ipaddress.ip_network(network)
            for network in self._PRIVATE_NETWORKS
        )

    def _send_to_peer(self, peer: Dict, task: Dict):
        """POST *task* as JSON to the peer's ``/run`` endpoint.

        Args:
            peer: Peer dict; ``ip``, ``port`` and ``node_id`` are read.
            task: JSON-serialisable task description.

        Returns:
            The decoded JSON response, or ``None`` if anything went wrong.
        """
        try:
            url = f"http://{peer['ip']}:{peer['port']}/run"
            response = requests.post(url, json=task, timeout=10)
            response.raise_for_status()
            logging.info(f"✅ Response from peer: {response.text}")
            return response.json()
        except Exception as e:
            # Deliberately broad: dispatch is best-effort, so any failure
            # (network, HTTP status, serialisation, bad JSON) is logged and
            # reported to the caller as None.
            logging.error(f"❌ فشل إرسال المهمة لـ {peer['node_id']}: {e}")
            return None

if __name__ == "__main__":
    # Demo task: square the argument.
    def example_task(x):
        return x * x

    # Create the executor, advertise this node as "node1" on port 7520,
    # then announce readiness on stdout.
    node = DistributedExecutor("my_secret_key")
    node.peer_registry.register_service("node1", 7520, load=0.1)
    print("✅ نظام توزيع المهام جاهز...")

    # Example of submitting a task.
    node.submit(example_task, 5)