File size: 18,381 Bytes
3f23a1b
 
 
 
 
 
 
 
 
 
 
 
 
 
2286c14
3f23a1b
 
 
 
 
2286c14
 
3f23a1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6580e1b
3f23a1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6580e1b
 
 
 
 
 
 
 
3f23a1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
#!/usr/bin/env python3
"""
虫群v8 — 集成核心 (Swarm Integration Core)

三层架构统一入口:
1. 参数化记忆模型 — 数据存参数,模型即数据库
2. 聚合协议 — 临时服务器,按需组网
3. 经济系统 — 算力货币,好友免费,虫皇税收

核心流程:
  用户交互 → 记忆即时写入 → 组建临时服务器 → 调度推理 → 计费 → 结果聚合
"""

import hashlib
import logging
import time
import threading
from datetime import datetime
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

from core.parametric_memory import ParametricMemoryModel, MemoryEncoder
from core.aggregation_protocol.protocol import AggregationProtocol
from core.aggregation_protocol.types import (
    NodeInfo, NodeCapability, NodeRole, PermissionLevel, AggregationStrategy,
)
from core.aggregation_protocol.economy import SwarmEconomy


class SwarmNode:
    """
    虫群节点 — 个人端完整实例

    每个用户运行一个SwarmNode,包含:
    - 参数化记忆模型:存储个人记忆
    - 聚合协议客户端:加入网络、组建临时服务器
    - 经济账户:算力交易

    这是"虫后"的完整实现
    """

    def __init__(
        self,
        node_id: str,
        name: str,
        permission_level: PermissionLevel = PermissionLevel.QUEEN,
        model_config: str = "tiny",
        lora_rank: int = 4,
        write_mode: str = "instant",
        initial_balance: float = 100.0,
    ):
        self.node_id = node_id
        self.name = name
        self.permission_level = permission_level

        # 1. 参数化记忆模型
        self.memory = ParametricMemoryModel(
            model_config=model_config,
            lora_rank=lora_rank,
            write_mode=write_mode,
            accumulate_steps=5,
            micro_epochs=3,
        )

        # 2. 聚合协议
        cap = NodeCapability(model_types=["memory", "chat"], compute_score=1.0, specializations=["memory", "chat"])
        role = NodeRole.HIVE if permission_level == PermissionLevel.OVERMIND else NodeRole.QUEEN
        self.protocol = AggregationProtocol(
            node_id=node_id,
            role=role,
            name=name,
            permission_level=permission_level,
            capabilities=["memory", "chat"],
            compute_score=cap.compute_score,
        )

        # 3. 经济系统
        self.economy = SwarmEconomy()
        self.economy.register_node(node_id, initial_balance=initial_balance)

        # 4. 推理服务
        from core.inference_service import InferenceService, DistributedInferenceBridge
        self.inference = InferenceService(self.memory, node_id)
        self.inference_bridge = DistributedInferenceBridge()
        self.inference_bridge.register_service(node_id, self.inference)

        # 5. API推理 + MOA聚合
        from core.api_inference import APIInferenceManager
        from core.moa_aggregator import MOAAggregator, AggregationStrategy, ModelAnswer
        self.api_manager = APIInferenceManager.from_env()
        self.moa = MOAAggregator(AggregationStrategy.ADAPTIVE)
        self._api_models_available = len(self.api_manager.models) > 0

        # 统计
        self.stats = {
            "memories_stored": 0,
            "queries_processed": 0,
            "remote_calls_made": 0,
            "remote_calls_received": 0,
            "total_cost_cc": 0.0,
            "total_earned_cc": 0.0,
        }

    # ============================================================
    # 生命周期
    # ============================================================

    def start(self):
        """启动节点"""
        self.protocol.start()
        print(f"[SwarmNode] {self.name} 启动完成")

    def stop(self):
        """停止节点"""
        self.memory.save()
        print(f"[SwarmNode] {self.name} 已停止")

    # ============================================================
    # 核心:交互处理
    # ============================================================

    def chat(self, query: str, user_id: str = "default") -> Dict:
        """
        统一交互入口

        流程:
        1. 记忆即时写入(后台)
        2. 本地记忆检索
        3. 如果本地不够,组建临时服务器调用远程
        4. 计费
        5. 返回结果
        """
        start_time = time.time()
        self.stats["queries_processed"] += 1

        # Step 1: 本地记忆检索
        local_result = self.memory.recall(query, max_tokens=64, temperature=0.3)
        local_confidence = local_result.get("confidence", 0.0)

        # Step 2: 本地置信度足够则直接返回
        if local_confidence >= 0.7 and len(local_result.get("response", "")) > 5:
            elapsed = time.time() - start_time
            return {
                "response": local_result["response"],
                "source": "local_memory",
                "confidence": local_confidence,
                "latency_ms": elapsed * 1000,
                "cost_cc": 0.0,
                "node_id": self.node_id,
            }

        # Step 3: 本地不够,尝试远程调用
        remote_result = None
        cost = 0.0

        try:
            online_nodes = self.protocol.discover_nodes()
            available = [n for n in online_nodes if n["node_id"] != self.node_id]

            if available:
                # 提交聚合任务
                task_result = self.protocol.submit_task(
                    query=query,
                    min_nodes=1,
                    max_nodes=min(3, len(available)),
                    strategy=AggregationStrategy.ADAPTIVE_MIX,
                    timeout_sec=15,
                )

                if task_result.get("status") == "completed":
                    remote_result = {
                        "response": task_result.get("final_response", ""),
                        "confidence": task_result.get("confidence", 0.5),
                        "source": "remote_aggregation",
                        "members": task_result.get("members", []),
                    }

                    # 计费
                    members = task_result.get("members", [])
                    for member_id in members:
                        fee = self.economy.call_compute(
                            caller_id=self.node_id,
                            provider_id=member_id,
                            compute_units=100,
                        )
                        cost += fee

                    self.stats["remote_calls_made"] += len(members)
                    self.stats["total_cost_cc"] += cost
        except Exception as e:
            print(f"[SwarmNode] 远程调用失败: {e}")

        # Step 4: 合并结果
        if remote_result and remote_result.get("confidence", 0) > local_confidence:
            final_response = remote_result["response"]
            final_confidence = remote_result["confidence"]
            source = "remote_aggregation"
        else:
            final_response = local_result.get("response", "")
            final_confidence = local_confidence
            source = "local_memory"

        elapsed = time.time() - start_time

        return {
            "response": final_response,
            "source": source,
            "confidence": final_confidence,
            "latency_ms": elapsed * 1000,
            "cost_cc": cost,
            "node_id": self.node_id,
        }

    def smart_query(self, query: str, use_api: bool = True, 
                    use_moa: bool = True, max_api_models: int = 3) -> Dict:
        """
        智能查询:本地记忆 + API推理 + MOA聚合
        
        流程:
        1. 本地参数化记忆检索(0ms网络延迟)
        2. API多模型推理(智谱GLM/NIM)
        3. MOA聚合所有回答 → 最优答案
        """
        start_time = time.time()
        self.stats["queries_processed"] += 1
        
        from core.moa_aggregator import ModelAnswer, QualityScorer
        
        all_answers = []
        
        # Step 1: 本地记忆检索
        local_result = self.memory.recall(query, max_tokens=128, temperature=0.5)
        local_resp = local_result.get("response", "")
        local_conf = local_result.get("confidence", 0.0)
        
        if local_resp and len(local_resp.strip()) > 3:
            all_answers.append(ModelAnswer(
                answer=local_resp, model="local_memory",
                provider="local", confidence=local_conf,
                latency_ms=0, source="memory"
            ))
            # 本地置信度够高且无API需求时直接返回
            if local_conf >= 0.8 and not use_api:
                elapsed = time.time() - start_time
                return {
                    "response": local_resp, "source": "local_memory",
                    "confidence": local_conf, "latency_ms": elapsed * 1000,
                    "cost_cc": 0.0, "contributors": ["local_memory"],
                }
        
        # Step 2: API多模型推理
        api_answers = []
        if use_api and self._api_models_available:
            # 用本地记忆作为上下文提示
            context = ""
            if local_resp:
                context = f"已知信息:{local_resp}\n请结合以上信息回答问题。"
            
            api_results = self.api_manager.infer_multi(
                query, context=context, max_models=max_api_models
            )
            for r in api_results:
                if r.success and r.answer:
                    api_answers.append(ModelAnswer(
                        answer=r.answer, model=r.model,
                        provider=r.provider, confidence=0.7,
                        latency_ms=r.latency_ms, source="api"
                    ))
                    self.stats["api_calls_made"] = self.stats.get("api_calls_made", 0) + 1
        
        all_answers.extend(api_answers)
        
        # 智能过滤:当API回答可用时,本地记忆仅作上下文不参与聚合
        if api_answers and all_answers:
            filtered = [a for a in all_answers if a.source != "memory"]
            if filtered:  # 有API回答则只保留API
                # 本地记忆作为上下文已在API调用时注入
                all_answers = filtered
                logger.info(f"MOA过滤: 移除local_memory,保留{len(filtered)}个API候选")
        
        # Step 3: 分布式推理(如果联网)
        remote_answers = []
        try:
            online_nodes = self.protocol.discover_nodes()
            available = [n for n in online_nodes if n["node_id"] != self.node_id]
            if available:
                for node_info in available[:2]:
                    svc = self.inference_bridge.services.get(node_info["node_id"])
                    if svc:
                        try:
                            r = svc.infer(query, max_tokens=128)
                            if r.get("response"):
                                remote_answers.append(ModelAnswer(
                                    answer=r["response"],
                                    model=node_info["node_id"],
                                    provider="swarm", confidence=r.get("confidence", 0.5),
                                    latency_ms=r.get("latency_ms", 0), source="swarm"
                                ))
                        except:
                            pass
        except:
            pass
        
        all_answers.extend(remote_answers)
        
        # Step 4: MOA聚合
        if use_moa and len(all_answers) > 1:
            moa_result = self.moa.aggregate(all_answers, question=query)
            final_answer = moa_result.final_answer
            final_confidence = moa_result.confidence
            source = f"moa({moa_result.strategy})"
            contributors = moa_result.contributors
        elif all_answers:
            best = max(all_answers, key=lambda a: a.confidence)
            final_answer = best.answer
            final_confidence = best.confidence
            source = best.source
            contributors = [best.model]
        else:
            final_answer = ""
            final_confidence = 0.0
            source = "none"
            contributors = []
        
        elapsed = time.time() - start_time
        
        return {
            "response": final_answer,
            "source": source,
            "confidence": final_confidence,
            "latency_ms": elapsed * 1000,
            "cost_cc": 0.0,
            "contributors": contributors,
            "detail": {
                "local": 1 if local_resp else 0,
                "api": len(api_answers),
                "swarm": len(remote_answers),
                "total_candidates": len(all_answers),
            }
        }

    # ============================================================
    # 记忆操作
    # ============================================================

    def store_memory(self, user_input: str, ai_response: str,
                     memory_type: str = "chat", importance: float = 0.5) -> str:
        """存储交互记忆"""
        mid = self.memory.store(user_input, ai_response,
                                memory_type=memory_type, importance=importance)
        self.stats["memories_stored"] += 1
        return mid

    def recall_memory(self, query: str, max_tokens: int = 64) -> Dict:
        """检索记忆"""
        return self.memory.recall(query, max_tokens=max_tokens)

    # ============================================================
    # 社交与经济
    # ============================================================

    def add_friend(self, friend_id: str):
        """添加好友(双方免费调用)"""
        self.economy.add_friend(self.node_id, friend_id)
        self.economy.add_friend(friend_id, self.node_id)

    def join_circle(self, circle_name: str, member_ids: List[str]):
        """加入圈子(圈内免费/折扣)"""
        self.economy.create_circle(circle_name, self.node_id)
        for mid in member_ids:
            self.economy.join_circle(circle_name, mid)

    def list_compute(self, units: int, price_per_unit: float,
                     friend_free: bool = True):
        """挂出算力出售"""
        from core.aggregation_protocol.economy_types import ResourceType
        self.economy.sell_compute(
            provider_id=self.node_id,
            resource=ResourceType.INFERENCE,
            units=units,
            price=price_per_unit,
            friend_free=friend_free,
        )

    def get_balance(self) -> float:
        """查询余额"""
        acc = self.economy.currency.get_account(self.node_id)
        return acc.balance if acc else 0.0

    # ============================================================
    # 网络操作
    # ============================================================

    def connect_to(self, other_node: 'SwarmNode'):
        """连接到另一个节点(本地模拟)"""
        # 注册对方节点到协议
        other_info = other_node.protocol.node_info
        self.protocol.add_remote_node(
            node_id=other_info.node_id,
            name=other_info.name,
            permission_level=other_info.permission_level,
            capabilities=other_info.capability.specializations,
            compute_score=other_info.capability.compute_score,
        )
        # 注册经济账户
        try:
            bal = self.economy.currency.get_balance(other_node.node_id)
        except:
            bal = None
        if not bal:
            self.economy.register_node(other_node.node_id, initial_balance=100.0)

    def discover_network(self) -> List[Dict]:
        """发现网络中的节点"""
        return self.protocol.discover_nodes()

    # ============================================================
    # 状态
    # ============================================================

    def get_status(self) -> Dict:
        """获取节点完整状态"""
        return {
            "node_id": self.node_id,
            "name": self.name,
            "permission": self.permission_level.value,
            "memory": self.memory.get_status(),
            "network": self.protocol.get_status(),
            "balance": self.get_balance(),
            "stats": self.stats,
        }


class SwarmNetwork:
    """
    虫群网络 — 多节点模拟/管理

    用于本地测试和演示
    """

    def __init__(self):
        self.nodes: Dict[str, SwarmNode] = {}
        self.economy = SwarmEconomy()

    def create_node(self, node_id: str, name: str,
                    permission: PermissionLevel = PermissionLevel.QUEEN,
                    model_config: str = "tiny",
                    initial_balance: float = 100.0) -> SwarmNode:
        """创建并注册一个节点"""
        node = SwarmNode(
            node_id=node_id,
            name=name,
            permission_level=permission,
            model_config=model_config,
            initial_balance=initial_balance,
        )
        node.start()
        self.nodes[node_id] = node
        return node

    def connect_all(self):
        """所有节点互相连接"""
        node_list = list(self.nodes.values())
        for i, node_a in enumerate(node_list):
            for j, node_b in enumerate(node_list):
                if i != j:
                    node_a.connect_to(node_b)

    def add_friendship(self, id_a: str, id_b: str):
        """建立好友关系"""
        self.nodes[id_a].add_friend(id_b)

    def create_circle(self, name: str, member_ids: List[str]):
        """创建圈子"""
        creator = member_ids[0] if member_ids else ""
        self.economy.create_circle(name, creator)
        for mid in member_ids:
            self.economy.join_circle(name, mid)
            # 同步到各节点的economy
            if mid in self.nodes:
                self.nodes[mid].economy.create_circle(name, mid)
                for other_id in member_ids:
                    if other_id != mid:
                        self.nodes[mid].economy.join_circle(name, other_id)

    def get_network_status(self) -> Dict:
        """获取全网状态"""
        return {
            "total_nodes": len(self.nodes),
            "nodes": {nid: node.get_status() for nid, node in self.nodes.items()},
        }