File size: 10,940 Bytes
4d94786
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
Давай! Вот старые версии:

```
def _send_discovery_announcement(msg_data):
    """Send *msg_data* once per local port to every IPv4 broadcast address
    and to the IPv6 all-nodes multicast group (ff02::1) on every interface.

    Sending is best-effort: a failure on one address is ignored so the
    remaining interfaces and ports are still tried.
    """
    for port in local_ports:
        # IPv4 broadcast
        for iface in netifaces.interfaces():
            for a in netifaces.ifaddresses(iface).get(netifaces.AF_INET, []):
                if "broadcast" not in a:
                    continue
                try:
                    # `with` guarantees the socket is closed even if sendto fails.
                    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                        sock.sendto(msg_data, (a["broadcast"], port))
                except Exception:
                    # Best-effort: skip addresses that cannot be used.
                    continue

        # IPv6 multicast ff02::1
        for iface in netifaces.interfaces():
            for a in netifaces.ifaddresses(iface).get(netifaces.AF_INET6, []):
                ip6 = a.get("addr")
                if not ip6:
                    continue
                # Link-local source addresses need the interface as scope.
                multicast_addr = f"ff02::1%{iface}" if ip6.startswith("fe80:") else "ff02::1"
                try:
                    with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as sock6:
                        sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF,
                                         socket.if_nametoindex(iface))
                        sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
                        sock6.sendto(msg_data, (multicast_addr, port))
                except Exception:
                    # Best-effort: skip addresses that cannot be used.
                    continue


def udp_discovery():
    """Announce this agent over UDP and record peers that announce themselves.

    Runs forever. Binds one IPv4 and one IPv6 UDP socket on every port in
    ``local_ports``; bind failures are logged and the remaining sockets are
    still used. Every ``DISCOVERY_INTERVAL`` seconds a JSON announcement
    carrying ``my_id``, ``agent_name`` and the configured ``local_addresses``
    is broadcast/multicast. Between announcements the listening sockets are
    polled continuously, and every valid announcement from another agent is
    stored through ``storage.add_or_update_peer`` with source ``"discovery"``
    and status ``"online"``.
    """
    DISCOVERY_INTERVAL = 30
    POLL_TIMEOUT = 0.5
    local_addresses = storage.get_config_value("local_addresses", [])
    msg_data = json.dumps({
        "id": my_id,
        "name": agent_name,
        "addresses": local_addresses
    }).encode("utf-8")

    # Create the UDP sockets used for listening
    listen_sockets = []
    for port in local_ports:
        for family, bind_host, label, shown in (
            (socket.AF_INET, "", "IPv4", "*"),
            (socket.AF_INET6, "::", "IPv6", "[::]"),
        ):
            sock = None
            try:
                sock = socket.socket(family, socket.SOCK_DGRAM)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.bind((bind_host, port))
            except Exception as e:
                # Do not leak the descriptor of a socket that failed to bind.
                if sock is not None:
                    sock.close()
                print(f"[UDP Discovery] {label} bind failed on port {port}: {e}")
                continue
            listen_sockets.append(sock)
            print(f"[UDP Discovery] Listening {label} on {shown}:{port}")

    # Monotonic deadline instead of a blocking sleep: the receive side keeps
    # draining datagrams during the whole interval between announcements.
    next_announce = time.monotonic()
    while True:
        # Send broadcast/multicast when the interval has elapsed
        if time.monotonic() >= next_announce:
            _send_discovery_announcement(msg_data)
            next_announce = time.monotonic() + DISCOVERY_INTERVAL

        if not listen_sockets:
            # select() with no sockets is an error on some platforms.
            time.sleep(POLL_TIMEOUT)
            continue

        # Receive messages
        rlist, _, _ = select.select(listen_sockets, [], [], POLL_TIMEOUT)
        for sock in rlist:
            try:
                data, addr = sock.recvfrom(2048)
                msg = json.loads(data.decode("utf-8"))
                peer_id = msg.get("id")
                # Ignore our own announcements and ones without an id.
                if not peer_id or peer_id == my_id:
                    continue
                name = msg.get("name", "unknown")
                addresses = msg.get("addresses", [])
                storage.add_or_update_peer(peer_id, name, addresses, "discovery", "online")
                print(f"[UDP Discovery] peer={peer_id} from {addr}")
            except Exception as e:
                print(f"[UDP Discovery] receive error: {e}")

---

def _recv_until_closed(sock, limit=64 * 1024):
    """Read from *sock* until the remote side closes or *limit* bytes arrive.

    A single ``recv`` may return only part of the reply, so chunks are
    collected until EOF. A timeout before any data arrived is re-raised; a
    timeout after some data arrived ends the read and returns what was
    received.
    """
    chunks = []
    received = 0
    while received < limit:
        try:
            chunk = sock.recv(limit - received)
        except socket.timeout:
            if not chunks:
                raise
            break
        if not chunk:
            break
        chunks.append(chunk)
        received += len(chunk)
    return b"".join(chunks)


def tcp_peer_exchange():
    """Periodically ask known peers over TCP for the peers they know.

    Runs forever. Every ``PEER_EXCHANGE_INTERVAL`` seconds up to 50 peers are
    loaded from ``storage``. For each peer its ``tcp``/``any`` addresses are
    tried in order until one exchange succeeds: a ``PEER_EXCHANGE_REQUEST``
    handshake with this agent's id, name and addresses is sent, and the JSON
    list returned by the peer is stored through
    ``storage.add_or_update_peer`` with source ``"peer_exchange"``.

    When the remote host is not private, only addresses accepted by
    ``is_public`` are advertised. ``all_addresses`` and ``is_public`` are
    names defined outside this function.
    """
    PEER_EXCHANGE_INTERVAL = 20  # short interval, for debugging
    while True:
        peers = storage.get_known_peers(my_id, limit=50)
        print(f"[PeerExchange] Checking {len(peers)} peers (raw DB)...")

        for peer in peers:
            peer_id = peer["id"] if isinstance(peer, dict) else peer[0]
            addresses_json = peer["addresses"] if isinstance(peer, dict) else peer[1]

            if peer_id == my_id:
                continue

            try:
                addr_list = json.loads(addresses_json)
            except Exception as e:
                print(f"[PeerExchange] JSON decode error for peer {peer_id}: {e}")
                addr_list = []

            for addr in addr_list:
                norm = storage.normalize_address(addr)
                if not norm:
                    continue
                proto, sep, hostport = norm.partition("://")
                if not sep or proto not in ("tcp", "any"):
                    continue
                host, port = storage.parse_hostport(hostport)
                if not host or not port:
                    continue

                print(f"[PeerExchange] Trying {peer_id} at {host}:{port} (proto={proto})")
                try:
                    is_v6 = storage.is_ipv6(host)
                    # IPv6 link-local addresses need an explicit scope id
                    if is_v6 and host.startswith("fe80:"):
                        scope_id = storage.get_ipv6_scope(host)
                        if scope_id is None:
                            print(f"[PeerExchange] Skipping {host}, no scope_id")
                            continue
                        target = (host, port, 0, scope_id)
                    else:
                        target = (host, port)

                    family = socket.AF_INET6 if is_v6 else socket.AF_INET
                    # `with` closes the socket on every path, including
                    # connect/send/recv failures.
                    with socket.socket(family, socket.SOCK_STREAM) as sock:
                        sock.settimeout(3)
                        sock.connect(target)

                        # LAN or Internet: do not leak private addresses to
                        # peers reached over a public address.
                        if storage.is_private(host):
                            send_addresses = all_addresses
                        else:
                            send_addresses = [
                                a for a in all_addresses
                                if is_public(storage.parse_hostport(a.split("://", 1)[-1])[0])
                            ]

                        handshake = {
                            "type": "PEER_EXCHANGE_REQUEST",
                            "id": my_id,
                            "name": agent_name,
                            "addresses": send_addresses,
                        }
                        sock.sendall(json.dumps(handshake).encode("utf-8"))

                        data = _recv_until_closed(sock)

                    if not data:
                        print(f"[PeerExchange] No data from {host}:{port}")
                        continue

                    try:
                        peers_recv = json.loads(data.decode("utf-8"))
                        for p in peers_recv:
                            if p.get("id") and p["id"] != my_id:
                                storage.add_or_update_peer(
                                    p["id"], p.get("name", "unknown"), p.get("addresses", []),
                                    "peer_exchange", "online"
                                )
                        print(f"[PeerExchange] Received {len(peers_recv)} peers from {host}:{port}")
                    except Exception as e:
                        print(f"[PeerExchange] Decode error from {host}:{port} -> {e}")
                        continue

                    # One successful exchange per peer is enough.
                    break
                except Exception as e:
                    print(f"[PeerExchange] Connection to {host}:{port} failed: {e}")

        time.sleep(PEER_EXCHANGE_INTERVAL)

---

def _handle_peer_connection(conn, addr):
    """Serve one accepted connection: read a request and answer it.

    Only ``PEER_EXCHANGE_REQUEST`` messages are answered. The sender is
    stored through ``storage.add_or_update_peer`` with source ``"incoming"``,
    then a JSON list of up to 50 known peers (``id`` and ``addresses``) is
    sent back. When the remote address is not private, only addresses
    accepted by ``is_public`` (defined outside this block) are included.
    The caller is responsible for closing *conn*.
    """
    # A client that connects and never sends must not stall the listener.
    conn.settimeout(5)
    data = conn.recv(64 * 1024)
    if not data:
        return

    try:
        msg = json.loads(data.decode("utf-8"))
    except Exception as e:
        print(f"[TCP Listener] JSON decode error from {addr}: {e}")
        return

    if not isinstance(msg, dict) or msg.get("type") != "PEER_EXCHANGE_REQUEST":
        return

    sender_id = msg.get("id") or f"did:hmp:{addr[0]}:{addr[1]}"
    sender_name = msg.get("name", "unknown")
    sender_addrs = msg.get("addresses", [])

    storage.add_or_update_peer(sender_id, sender_name, sender_addrs,
                               source="incoming", status="online")
    print(f"[TCP Listener] Handshake from {sender_id} ({addr})")

    # LAN or Internet
    is_lan = storage.is_private(addr[0])

    peers_list = []
    for peer in storage.get_known_peers(my_id, limit=50):
        try:
            addresses = json.loads(peer["addresses"])
        except Exception:
            addresses = []

        updated_addresses = []
        for a in addresses:
            # Skip malformed entries instead of aborting the whole reply.
            if not isinstance(a, str):
                continue
            proto, sep, hostport = a.partition("://")
            if not sep:
                continue
            host, port = storage.parse_hostport(hostport)
            if not host or not port:
                continue

            # Filter by LAN/Internet
            if not is_lan and not is_public(host):
                continue

            # IPv6 link-local: append the scope id when one is known
            if storage.is_ipv6(host) and host.startswith("fe80:"):
                scope_id = storage.get_ipv6_scope(host)
                if scope_id:
                    host = f"{host}%{scope_id}"

            updated_addresses.append(f"{proto}://{host}:{port}")

        peers_list.append({"id": peer["id"], "addresses": updated_addresses})

    conn.sendall(json.dumps(peers_list).encode("utf-8"))


def tcp_listener():
    """Accept TCP connections on every local port and answer peer exchanges.

    Runs forever. Opens one IPv4 and one IPv6 listening socket per port in
    ``local_ports``; bind failures are logged and the remaining sockets are
    still served. Each accepted connection is handled synchronously and
    always closed afterwards; errors while handling one connection are
    logged and do not stop the listener.
    """
    listen_sockets = []
    for port in local_ports:
        for family, addr_str in [(socket.AF_INET, ""), (socket.AF_INET6, "::")]:
            # Computed before the try so the failure message can always use it.
            proto_str = "IPv6" if family == socket.AF_INET6 else "IPv4"
            sock = None
            try:
                sock = socket.socket(family, socket.SOCK_STREAM)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.bind((addr_str, port))
                sock.listen(5)
            except Exception as e:
                # Do not leak the descriptor of a socket that failed to bind.
                if sock is not None:
                    sock.close()
                print(f"[TCP Listener] {proto_str} bind failed on port {port}: {e}")
                continue
            listen_sockets.append(sock)
            print(f"[TCP Listener] Listening {proto_str} on {addr_str}:{port}")

    while True:
        if not listen_sockets:
            time.sleep(1)
            continue

        rlist, _, _ = select.select(listen_sockets, [], [], 1)
        for s in rlist:
            try:
                conn, addr = s.accept()
                # `with` closes the connection on every path, errors included.
                with conn:
                    _handle_peer_connection(conn, addr)
            except Exception as e:
                print(f"[TCP Listener] Connection handling error: {e}")
```