blux-ca / ca /agent /advanced /multi_agent.py
Justadudeinspace
restructure and upgrade all ca python files
2c5ae19
# blux/agent/advanced/multi_agent.py
from blux.agent.advanced.reasoning import ReasoningLayer
class MultiAgentManager:
"""
Multi-agent manager with memory broadcasting, secure monitoring,
reasoning integration, and constitutional enforcement.
"""
def __init__(self, constitution=None):
self.agents = {}
self.constitution = constitution
self.monitor = None
self.reasoning = {}
def attach_monitor(self, monitor):
self.monitor = monitor
def register_agent(self, name, agent_instance):
self.agents[name] = agent_instance
# Attach reasoning layer per agent
self.reasoning[name] = ReasoningLayer(agent_instance, constitution=self.constitution)
if self.monitor:
self.monitor.log(name, "agent_registered")
def _enforce_constitution(self, agent_name, response):
"""
Placeholder for rule enforcement.
Could veto, flag, or alter responses that violate constitutional rules.
"""
if self.constitution and "violation" in response:
if self.monitor:
self.monitor.log(agent_name, "constitutional_violation", {"response": response})
return "[VIOLATION DETECTED]"
return response
def broadcast_input(self, user_input, user_type="unknown"):
results = {}
for name, agent in self.agents.items():
reasoning = self.reasoning[name]
reasoning_result = reasoning.process(user_input, user_type=user_type)
resp = self._enforce_constitution(name, reasoning_result["decision"])
results[name] = resp
if self.monitor:
self.monitor.log(name, "input_processed", {
"input": user_input,
"response": resp,
"reasoning": reasoning_result
})
return results
def delegate_task(self, user_input, target_agent=None, user_type="unknown"):
if target_agent and target_agent in self.agents:
resp = self.broadcast_input(user_input, user_type)[target_agent]
if self.monitor:
self.monitor.log(target_agent, "task_delegated", {"input": user_input, "response": resp})
return {target_agent: resp}
elif self.agents:
first_agent_name = next(iter(self.agents))
resp = self.broadcast_input(user_input, user_type)[first_agent_name]
if self.monitor:
self.monitor.log(first_agent_name, "task_delegated", {"input": user_input, "response": resp})
return {first_agent_name: resp}
else:
if self.monitor:
self.monitor.log("manager", "task_delegated_failed", {"input": user_input})
return {"error": "No agents registered"}
def broadcast_memory(self, key, value, user_type="default", priority=1, tags=None):
for agent in self.agents.values():
if hasattr(agent, "memory"):
agent.memory.add(key, value, user_type=user_type, priority=priority, tags=tags or [])
if self.monitor:
self.monitor.log(agent.name, "memory_broadcast", {"key": key, "value": value, "tags": tags})
def aggregate_memory(self, key, predictive=True):
aggregated = []
for name, agent in self.agents.items():
if hasattr(agent, "memory"):
entries = agent.memory.recall(key)
if entries:
if predictive:
weighted = [e for e in entries if "urgent" in e.get("tags", [])]
aggregated.extend(weighted or entries)
else:
aggregated.extend(entries)
if self.monitor:
self.monitor.log("manager", "memory_aggregated", {"key": key, "entries": aggregated})
return aggregated
def resolve_conflict(self, responses, use_prediction=True):
enforced = {agent: self._enforce_constitution(agent, resp) for agent, resp in responses.items()}
if use_prediction:
sorted_agents = sorted(
enforced.keys(),
key=lambda name: getattr(self.reasoning[name], "predict_behavior", lambda x: [1])(None)[0] if hasattr(self.reasoning[name], "predict_behavior") else 1,
reverse=True
)
resolved = {a: enforced[a] for a in sorted_agents}
else:
resolved = enforced
return "\n".join(f"[{agent}] {resp}" for agent, resp in resolved.items())
def aggregate_responses(self, responses):
return "\n".join(f"[{agent}] {resp}" for agent, resp in responses.items())ive aggregation =====
def broadcast_memory(self, key, value, user_type="default", priority=1, tags=None):
for agent in self.agents.values():
if hasattr(agent, "memory"):
agent.memory.add(key, value, user_type=user_type, priority=priority, tags=tags or [])
if self.monitor:
self.monitor.log(agent.name, "memory_broadcast", {"key": key, "value": value, "tags": tags})
def aggregate_memory(self, key, predictive=True):
"""
Aggregates memory across all agents.
If predictive=True, weights results based on reasoning predictions.
"""
aggregated = []
for name, agent in self.agents.items():
if hasattr(agent, "memory"):
entries = agent.memory.recall(key)
if entries:
if predictive:
# Simple weighting: prioritize entries tagged as urgent or from predicted struggler
weighted = [e for e in entries if "urgent" in e.get("tags", [])]
aggregated.extend(weighted or entries)
else:
aggregated.extend(entries)
if self.monitor:
self.monitor.log("manager", "memory_aggregated", {"key": key, "entries": aggregated})
return aggregated
# ===== Stage 6: Response aggregation & advanced conflict resolution =====
def resolve_conflict(self, responses, use_prediction=True):
"""
Stage 6 conflict resolution:
- Enforce constitutional rules
- Optionally use reasoning prediction to weight responses
"""
enforced = {agent: self._enforce_constitution(agent, resp) for agent, resp in responses.items()}
if use_prediction:
# Simple prediction-based sorting: 'struggler' > 'neutral' > 'indulgent'
sorted_agents = sorted(
enforced.keys(),
key=lambda name: self.reasoning[name].predict_behavior("")[0] if hasattr(self.reasoning[name], "predict_behavior") else 1,
reverse=True
)
resolved = {a: enforced[a] for a in sorted_agents}
else:
resolved = enforced
return "\n".join(f"[{agent}] {resp}" for agent, resp in resolved.items())
def aggregate_responses(self, responses):
return "\n".join(f"[{agent}] {resp}" for agent, resp in responses.items())