|
|
|
|
|
|
|
|
from blux.agent.advanced.reasoning import ReasoningLayer |
|
|
|
|
|
class MultiAgentManager: |
|
|
""" |
|
|
Multi-agent manager with memory broadcasting, secure monitoring, |
|
|
reasoning integration, and constitutional enforcement. |
|
|
""" |
|
|
|
|
|
def __init__(self, constitution=None): |
|
|
self.agents = {} |
|
|
self.constitution = constitution |
|
|
self.monitor = None |
|
|
self.reasoning = {} |
|
|
|
|
|
def attach_monitor(self, monitor): |
|
|
self.monitor = monitor |
|
|
|
|
|
def register_agent(self, name, agent_instance): |
|
|
self.agents[name] = agent_instance |
|
|
|
|
|
self.reasoning[name] = ReasoningLayer(agent_instance, constitution=self.constitution) |
|
|
if self.monitor: |
|
|
self.monitor.log(name, "agent_registered") |
|
|
|
|
|
def _enforce_constitution(self, agent_name, response): |
|
|
""" |
|
|
Placeholder for rule enforcement. |
|
|
Could veto, flag, or alter responses that violate constitutional rules. |
|
|
""" |
|
|
if self.constitution and "violation" in response: |
|
|
if self.monitor: |
|
|
self.monitor.log(agent_name, "constitutional_violation", {"response": response}) |
|
|
return "[VIOLATION DETECTED]" |
|
|
return response |
|
|
|
|
|
def broadcast_input(self, user_input, user_type="unknown"): |
|
|
results = {} |
|
|
for name, agent in self.agents.items(): |
|
|
reasoning = self.reasoning[name] |
|
|
reasoning_result = reasoning.process(user_input, user_type=user_type) |
|
|
resp = self._enforce_constitution(name, reasoning_result["decision"]) |
|
|
results[name] = resp |
|
|
if self.monitor: |
|
|
self.monitor.log(name, "input_processed", { |
|
|
"input": user_input, |
|
|
"response": resp, |
|
|
"reasoning": reasoning_result |
|
|
}) |
|
|
return results |
|
|
|
|
|
def delegate_task(self, user_input, target_agent=None, user_type="unknown"): |
|
|
if target_agent and target_agent in self.agents: |
|
|
resp = self.broadcast_input(user_input, user_type)[target_agent] |
|
|
if self.monitor: |
|
|
self.monitor.log(target_agent, "task_delegated", {"input": user_input, "response": resp}) |
|
|
return {target_agent: resp} |
|
|
elif self.agents: |
|
|
first_agent_name = next(iter(self.agents)) |
|
|
resp = self.broadcast_input(user_input, user_type)[first_agent_name] |
|
|
if self.monitor: |
|
|
self.monitor.log(first_agent_name, "task_delegated", {"input": user_input, "response": resp}) |
|
|
return {first_agent_name: resp} |
|
|
else: |
|
|
if self.monitor: |
|
|
self.monitor.log("manager", "task_delegated_failed", {"input": user_input}) |
|
|
return {"error": "No agents registered"} |
|
|
|
|
|
def broadcast_memory(self, key, value, user_type="default", priority=1, tags=None): |
|
|
for agent in self.agents.values(): |
|
|
if hasattr(agent, "memory"): |
|
|
agent.memory.add(key, value, user_type=user_type, priority=priority, tags=tags or []) |
|
|
if self.monitor: |
|
|
self.monitor.log(agent.name, "memory_broadcast", {"key": key, "value": value, "tags": tags}) |
|
|
|
|
|
def aggregate_memory(self, key, predictive=True): |
|
|
aggregated = [] |
|
|
for name, agent in self.agents.items(): |
|
|
if hasattr(agent, "memory"): |
|
|
entries = agent.memory.recall(key) |
|
|
if entries: |
|
|
if predictive: |
|
|
weighted = [e for e in entries if "urgent" in e.get("tags", [])] |
|
|
aggregated.extend(weighted or entries) |
|
|
else: |
|
|
aggregated.extend(entries) |
|
|
if self.monitor: |
|
|
self.monitor.log("manager", "memory_aggregated", {"key": key, "entries": aggregated}) |
|
|
return aggregated |
|
|
|
|
|
def resolve_conflict(self, responses, use_prediction=True): |
|
|
enforced = {agent: self._enforce_constitution(agent, resp) for agent, resp in responses.items()} |
|
|
if use_prediction: |
|
|
sorted_agents = sorted( |
|
|
enforced.keys(), |
|
|
key=lambda name: getattr(self.reasoning[name], "predict_behavior", lambda x: [1])(None)[0] if hasattr(self.reasoning[name], "predict_behavior") else 1, |
|
|
reverse=True |
|
|
) |
|
|
resolved = {a: enforced[a] for a in sorted_agents} |
|
|
else: |
|
|
resolved = enforced |
|
|
return "\n".join(f"[{agent}] {resp}" for agent, resp in resolved.items()) |
|
|
|
|
|
def aggregate_responses(self, responses): |
|
|
return "\n".join(f"[{agent}] {resp}" for agent, resp in responses.items())ive aggregation ===== |
|
|
def broadcast_memory(self, key, value, user_type="default", priority=1, tags=None): |
|
|
for agent in self.agents.values(): |
|
|
if hasattr(agent, "memory"): |
|
|
agent.memory.add(key, value, user_type=user_type, priority=priority, tags=tags or []) |
|
|
if self.monitor: |
|
|
self.monitor.log(agent.name, "memory_broadcast", {"key": key, "value": value, "tags": tags}) |
|
|
|
|
|
def aggregate_memory(self, key, predictive=True): |
|
|
""" |
|
|
Aggregates memory across all agents. |
|
|
If predictive=True, weights results based on reasoning predictions. |
|
|
""" |
|
|
aggregated = [] |
|
|
for name, agent in self.agents.items(): |
|
|
if hasattr(agent, "memory"): |
|
|
entries = agent.memory.recall(key) |
|
|
if entries: |
|
|
if predictive: |
|
|
|
|
|
weighted = [e for e in entries if "urgent" in e.get("tags", [])] |
|
|
aggregated.extend(weighted or entries) |
|
|
else: |
|
|
aggregated.extend(entries) |
|
|
if self.monitor: |
|
|
self.monitor.log("manager", "memory_aggregated", {"key": key, "entries": aggregated}) |
|
|
return aggregated |
|
|
|
|
|
|
|
|
def resolve_conflict(self, responses, use_prediction=True): |
|
|
""" |
|
|
Stage 6 conflict resolution: |
|
|
- Enforce constitutional rules |
|
|
- Optionally use reasoning prediction to weight responses |
|
|
""" |
|
|
enforced = {agent: self._enforce_constitution(agent, resp) for agent, resp in responses.items()} |
|
|
if use_prediction: |
|
|
|
|
|
sorted_agents = sorted( |
|
|
enforced.keys(), |
|
|
key=lambda name: self.reasoning[name].predict_behavior("")[0] if hasattr(self.reasoning[name], "predict_behavior") else 1, |
|
|
reverse=True |
|
|
) |
|
|
resolved = {a: enforced[a] for a in sorted_agents} |
|
|
else: |
|
|
resolved = enforced |
|
|
return "\n".join(f"[{agent}] {resp}" for agent, resp in resolved.items()) |
|
|
|
|
|
def aggregate_responses(self, responses): |
|
|
return "\n".join(f"[{agent}] {resp}" for agent, resp in responses.items()) |