|
|
import importlib |
|
|
import json |
|
|
import logging |
|
|
import pandas as pd |
|
|
from typing import Dict, Any |
|
|
from feedback.loop_manager import LoopManager |
|
|
|
|
|
class MultiplexingEngine: |
|
|
def __init__(self, matrix_path: str): |
|
|
self.matrix = pd.read_csv(matrix_path) |
|
|
self.agents = {} |
|
|
self.logger = logging.getLogger("MultiplexingEngine") |
|
|
self.loop_enabled = True |
|
|
self._validate_matrix() |
|
|
|
|
|
def _validate_matrix(self): |
|
|
required_columns = {'agent_id', 'role', 'activation_logic', 'config', 'behavior'} |
|
|
missing = required_columns - set(self.matrix.columns) |
|
|
if missing: |
|
|
raise ValueError(f"Missing required columns: {missing}") |
|
|
|
|
|
def _parse_json_field(self, field: str) -> Dict[str, Any]: |
|
|
try: |
|
|
return json.loads(field.replace("'", '"')) |
|
|
except json.JSONDecodeError: |
|
|
return {} |
|
|
|
|
|
def _parse_behavior(self, behavior_str: str) -> Dict[str, Any]: |
|
|
return self._parse_json_field(behavior_str) |
|
|
|
|
|
def _parse_security(self, security_str: str) -> Dict[str, Any]: |
|
|
return self._parse_json_field(security_str) |
|
|
|
|
|
def _load_agent_class(self, role: str): |
|
|
try: |
|
|
module = importlib.import_module(f"agents.{role.lower()}") |
|
|
return getattr(module, role) |
|
|
except Exception as e: |
|
|
self.logger.error(f"Agent class load failed: {str(e)}") |
|
|
return None |
|
|
|
|
|
def dispatch_tasks(self): |
|
|
for _, row in self.matrix.iterrows(): |
|
|
agent_id = row["agent_id"] |
|
|
role = row["role"] |
|
|
config = self._parse_json_field(row.get("config", "{}")) |
|
|
behavior = self._parse_behavior(row.get("behavior", "{}")) |
|
|
security = self._parse_security(row.get("security", "{}")) |
|
|
activation_logic = row.get("activation_logic", "lambda metrics: True") |
|
|
|
|
|
try: |
|
|
if eval(activation_logic)({}): |
|
|
AgentCls = self._load_agent_class(role) |
|
|
if AgentCls: |
|
|
agent_instance = AgentCls(**config) |
|
|
if self.loop_enabled: |
|
|
loop = LoopManager(agent_id) |
|
|
result = loop.run_loop(f"Execute task for {agent_id}") |
|
|
else: |
|
|
result = agent_instance.execute(config=config, behavior=behavior) |
|
|
print(f"{agent_id} executed: {result['output']}") |
|
|
except Exception as e: |
|
|
self.logger.error(f"Dispatch failed for {agent_id}: {str(e)}") |
|
|
|
|
|
def process_feedback(self, agent_id: str, feedback_data: Dict[str, Any]): |
|
|
try: |
|
|
loop = LoopManager(agent_id) |
|
|
result = loop.run_loop(feedback_data.get("task_input", "No input")) |
|
|
print(f"[Feedback] Loop complete for {agent_id}: {result['output']}") |
|
|
except Exception as e: |
|
|
self.logger.error(f"Feedback processing failed for {agent_id}: {str(e)}") |