#!/usr/bin/env python3 """ 虫群v8 — 集成核心 (Swarm Integration Core) 三层架构统一入口: 1. 参数化记忆模型 — 数据存参数,模型即数据库 2. 聚合协议 — 临时服务器,按需组网 3. 经济系统 — 算力货币,好友免费,虫皇税收 核心流程: 用户交互 → 记忆即时写入 → 组建临时服务器 → 调度推理 → 计费 → 结果聚合 """ import hashlib import logging import time import threading from datetime import datetime from typing import Dict, List, Optional logger = logging.getLogger(__name__) from core.parametric_memory import ParametricMemoryModel, MemoryEncoder from core.aggregation_protocol.protocol import AggregationProtocol from core.aggregation_protocol.types import ( NodeInfo, NodeCapability, NodeRole, PermissionLevel, AggregationStrategy, ) from core.aggregation_protocol.economy import SwarmEconomy class SwarmNode: """ 虫群节点 — 个人端完整实例 每个用户运行一个SwarmNode,包含: - 参数化记忆模型:存储个人记忆 - 聚合协议客户端:加入网络、组建临时服务器 - 经济账户:算力交易 这是"虫后"的完整实现 """ def __init__( self, node_id: str, name: str, permission_level: PermissionLevel = PermissionLevel.QUEEN, model_config: str = "tiny", lora_rank: int = 4, write_mode: str = "instant", initial_balance: float = 100.0, ): self.node_id = node_id self.name = name self.permission_level = permission_level # 1. 参数化记忆模型 self.memory = ParametricMemoryModel( model_config=model_config, lora_rank=lora_rank, write_mode=write_mode, accumulate_steps=5, micro_epochs=3, ) # 2. 聚合协议 cap = NodeCapability(model_types=["memory", "chat"], compute_score=1.0, specializations=["memory", "chat"]) role = NodeRole.HIVE if permission_level == PermissionLevel.OVERMIND else NodeRole.QUEEN self.protocol = AggregationProtocol( node_id=node_id, role=role, name=name, permission_level=permission_level, capabilities=["memory", "chat"], compute_score=cap.compute_score, ) # 3. 经济系统 self.economy = SwarmEconomy() self.economy.register_node(node_id, initial_balance=initial_balance) # 4. 推理服务 from core.inference_service import InferenceService, DistributedInferenceBridge self.inference = InferenceService(self.memory, node_id) self.inference_bridge = DistributedInferenceBridge() self.inference_bridge.register_service(node_id, self.inference) # 5. API推理 + MOA聚合 from core.api_inference import APIInferenceManager from core.moa_aggregator import MOAAggregator, AggregationStrategy, ModelAnswer self.api_manager = APIInferenceManager.from_env() self.moa = MOAAggregator(AggregationStrategy.ADAPTIVE) self._api_models_available = len(self.api_manager.models) > 0 # 统计 self.stats = { "memories_stored": 0, "queries_processed": 0, "remote_calls_made": 0, "remote_calls_received": 0, "total_cost_cc": 0.0, "total_earned_cc": 0.0, } # ============================================================ # 生命周期 # ============================================================ def start(self): """启动节点""" self.protocol.start() print(f"[SwarmNode] {self.name} 启动完成") def stop(self): """停止节点""" self.memory.save() print(f"[SwarmNode] {self.name} 已停止") # ============================================================ # 核心:交互处理 # ============================================================ def chat(self, query: str, user_id: str = "default") -> Dict: """ 统一交互入口 流程: 1. 记忆即时写入(后台) 2. 本地记忆检索 3. 如果本地不够,组建临时服务器调用远程 4. 计费 5. 返回结果 """ start_time = time.time() self.stats["queries_processed"] += 1 # Step 1: 本地记忆检索 local_result = self.memory.recall(query, max_tokens=64, temperature=0.3) local_confidence = local_result.get("confidence", 0.0) # Step 2: 本地置信度足够则直接返回 if local_confidence >= 0.7 and len(local_result.get("response", "")) > 5: elapsed = time.time() - start_time return { "response": local_result["response"], "source": "local_memory", "confidence": local_confidence, "latency_ms": elapsed * 1000, "cost_cc": 0.0, "node_id": self.node_id, } # Step 3: 本地不够,尝试远程调用 remote_result = None cost = 0.0 try: online_nodes = self.protocol.discover_nodes() available = [n for n in online_nodes if n["node_id"] != self.node_id] if available: # 提交聚合任务 task_result = self.protocol.submit_task( query=query, min_nodes=1, max_nodes=min(3, len(available)), strategy=AggregationStrategy.ADAPTIVE_MIX, timeout_sec=15, ) if task_result.get("status") == "completed": remote_result = { "response": task_result.get("final_response", ""), "confidence": task_result.get("confidence", 0.5), "source": "remote_aggregation", "members": task_result.get("members", []), } # 计费 members = task_result.get("members", []) for member_id in members: fee = self.economy.call_compute( caller_id=self.node_id, provider_id=member_id, compute_units=100, ) cost += fee self.stats["remote_calls_made"] += len(members) self.stats["total_cost_cc"] += cost except Exception as e: print(f"[SwarmNode] 远程调用失败: {e}") # Step 4: 合并结果 if remote_result and remote_result.get("confidence", 0) > local_confidence: final_response = remote_result["response"] final_confidence = remote_result["confidence"] source = "remote_aggregation" else: final_response = local_result.get("response", "") final_confidence = local_confidence source = "local_memory" elapsed = time.time() - start_time return { "response": final_response, "source": source, "confidence": final_confidence, "latency_ms": elapsed * 1000, "cost_cc": cost, "node_id": self.node_id, } def smart_query(self, query: str, use_api: bool = True, use_moa: bool = True, max_api_models: int = 3) -> Dict: """ 智能查询:本地记忆 + API推理 + MOA聚合 流程: 1. 本地参数化记忆检索(0ms网络延迟) 2. API多模型推理(智谱GLM/NIM) 3. MOA聚合所有回答 → 最优答案 """ start_time = time.time() self.stats["queries_processed"] += 1 from core.moa_aggregator import ModelAnswer, QualityScorer all_answers = [] # Step 1: 本地记忆检索 local_result = self.memory.recall(query, max_tokens=128, temperature=0.5) local_resp = local_result.get("response", "") local_conf = local_result.get("confidence", 0.0) if local_resp and len(local_resp.strip()) > 3: all_answers.append(ModelAnswer( answer=local_resp, model="local_memory", provider="local", confidence=local_conf, latency_ms=0, source="memory" )) # 本地置信度够高且无API需求时直接返回 if local_conf >= 0.8 and not use_api: elapsed = time.time() - start_time return { "response": local_resp, "source": "local_memory", "confidence": local_conf, "latency_ms": elapsed * 1000, "cost_cc": 0.0, "contributors": ["local_memory"], } # Step 2: API多模型推理 api_answers = [] if use_api and self._api_models_available: # 用本地记忆作为上下文提示 context = "" if local_resp: context = f"已知信息:{local_resp}\n请结合以上信息回答问题。" api_results = self.api_manager.infer_multi( query, context=context, max_models=max_api_models ) for r in api_results: if r.success and r.answer: api_answers.append(ModelAnswer( answer=r.answer, model=r.model, provider=r.provider, confidence=0.7, latency_ms=r.latency_ms, source="api" )) self.stats["api_calls_made"] = self.stats.get("api_calls_made", 0) + 1 all_answers.extend(api_answers) # 智能过滤:当API回答可用时,本地记忆仅作上下文不参与聚合 if api_answers and all_answers: filtered = [a for a in all_answers if a.source != "memory"] if filtered: # 有API回答则只保留API # 本地记忆作为上下文已在API调用时注入 all_answers = filtered logger.info(f"MOA过滤: 移除local_memory,保留{len(filtered)}个API候选") # Step 3: 分布式推理(如果联网) remote_answers = [] try: online_nodes = self.protocol.discover_nodes() available = [n for n in online_nodes if n["node_id"] != self.node_id] if available: for node_info in available[:2]: svc = self.inference_bridge.services.get(node_info["node_id"]) if svc: try: r = svc.infer(query, max_tokens=128) if r.get("response"): remote_answers.append(ModelAnswer( answer=r["response"], model=node_info["node_id"], provider="swarm", confidence=r.get("confidence", 0.5), latency_ms=r.get("latency_ms", 0), source="swarm" )) except: pass except: pass all_answers.extend(remote_answers) # Step 4: MOA聚合 if use_moa and len(all_answers) > 1: moa_result = self.moa.aggregate(all_answers, question=query) final_answer = moa_result.final_answer final_confidence = moa_result.confidence source = f"moa({moa_result.strategy})" contributors = moa_result.contributors elif all_answers: best = max(all_answers, key=lambda a: a.confidence) final_answer = best.answer final_confidence = best.confidence source = best.source contributors = [best.model] else: final_answer = "" final_confidence = 0.0 source = "none" contributors = [] elapsed = time.time() - start_time return { "response": final_answer, "source": source, "confidence": final_confidence, "latency_ms": elapsed * 1000, "cost_cc": 0.0, "contributors": contributors, "detail": { "local": 1 if local_resp else 0, "api": len(api_answers), "swarm": len(remote_answers), "total_candidates": len(all_answers), } } # ============================================================ # 记忆操作 # ============================================================ def store_memory(self, user_input: str, ai_response: str, memory_type: str = "chat", importance: float = 0.5) -> str: """存储交互记忆""" mid = self.memory.store(user_input, ai_response, memory_type=memory_type, importance=importance) self.stats["memories_stored"] += 1 return mid def recall_memory(self, query: str, max_tokens: int = 64) -> Dict: """检索记忆""" return self.memory.recall(query, max_tokens=max_tokens) # ============================================================ # 社交与经济 # ============================================================ def add_friend(self, friend_id: str): """添加好友(双方免费调用)""" self.economy.add_friend(self.node_id, friend_id) self.economy.add_friend(friend_id, self.node_id) def join_circle(self, circle_name: str, member_ids: List[str]): """加入圈子(圈内免费/折扣)""" self.economy.create_circle(circle_name, self.node_id) for mid in member_ids: self.economy.join_circle(circle_name, mid) def list_compute(self, units: int, price_per_unit: float, friend_free: bool = True): """挂出算力出售""" from core.aggregation_protocol.economy_types import ResourceType self.economy.sell_compute( provider_id=self.node_id, resource=ResourceType.INFERENCE, units=units, price=price_per_unit, friend_free=friend_free, ) def get_balance(self) -> float: """查询余额""" acc = self.economy.currency.get_account(self.node_id) return acc.balance if acc else 0.0 # ============================================================ # 网络操作 # ============================================================ def connect_to(self, other_node: 'SwarmNode'): """连接到另一个节点(本地模拟)""" # 注册对方节点到协议 other_info = other_node.protocol.node_info self.protocol.add_remote_node( node_id=other_info.node_id, name=other_info.name, permission_level=other_info.permission_level, capabilities=other_info.capability.specializations, compute_score=other_info.capability.compute_score, ) # 注册经济账户 try: bal = self.economy.currency.get_balance(other_node.node_id) except: bal = None if not bal: self.economy.register_node(other_node.node_id, initial_balance=100.0) def discover_network(self) -> List[Dict]: """发现网络中的节点""" return self.protocol.discover_nodes() # ============================================================ # 状态 # ============================================================ def get_status(self) -> Dict: """获取节点完整状态""" return { "node_id": self.node_id, "name": self.name, "permission": self.permission_level.value, "memory": self.memory.get_status(), "network": self.protocol.get_status(), "balance": self.get_balance(), "stats": self.stats, } class SwarmNetwork: """ 虫群网络 — 多节点模拟/管理 用于本地测试和演示 """ def __init__(self): self.nodes: Dict[str, SwarmNode] = {} self.economy = SwarmEconomy() def create_node(self, node_id: str, name: str, permission: PermissionLevel = PermissionLevel.QUEEN, model_config: str = "tiny", initial_balance: float = 100.0) -> SwarmNode: """创建并注册一个节点""" node = SwarmNode( node_id=node_id, name=name, permission_level=permission, model_config=model_config, initial_balance=initial_balance, ) node.start() self.nodes[node_id] = node return node def connect_all(self): """所有节点互相连接""" node_list = list(self.nodes.values()) for i, node_a in enumerate(node_list): for j, node_b in enumerate(node_list): if i != j: node_a.connect_to(node_b) def add_friendship(self, id_a: str, id_b: str): """建立好友关系""" self.nodes[id_a].add_friend(id_b) def create_circle(self, name: str, member_ids: List[str]): """创建圈子""" creator = member_ids[0] if member_ids else "" self.economy.create_circle(name, creator) for mid in member_ids: self.economy.join_circle(name, mid) # 同步到各节点的economy if mid in self.nodes: self.nodes[mid].economy.create_circle(name, mid) for other_id in member_ids: if other_id != mid: self.nodes[mid].economy.join_circle(name, other_id) def get_network_status(self) -> Dict: """获取全网状态""" return { "total_nodes": len(self.nodes), "nodes": {nid: node.get_status() for nid, node in self.nodes.items()}, }