import importlib
import json
import logging
from typing import Any, Dict

import pandas as pd

from feedback.loop_manager import LoopManager


class MultiplexingEngine:
    """Dispatch agents described by the rows of a CSV "matrix".

    Each row must provide the columns ``agent_id``, ``role``,
    ``activation_logic``, ``config`` and ``behavior``. ``config`` and
    ``behavior`` hold JSON-like objects; ``activation_logic`` holds the
    source text of a Python callable that takes one context argument.
    """

    def __init__(self, matrix_path: str):
        """Load the matrix CSV at *matrix_path* and validate its columns.

        Raises:
            ValueError: if a required column is missing from the CSV.
        """
        self.matrix = pd.read_csv(matrix_path)
        self.agents = {}
        self.logger = logging.getLogger("MultiplexingEngine")
        # When True, dispatch_tasks runs agents through LoopManager instead
        # of calling the agent's own execute().
        self.loop_enabled = True
        self._validate_matrix()

    def _validate_matrix(self):
        """Raise ValueError when the matrix lacks a required column."""
        required_columns = {'agent_id', 'role', 'activation_logic', 'config', 'behavior'}
        missing = required_columns - set(self.matrix.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

    def _parse_json_field(self, field: Any) -> Dict[str, Any]:
        """Parse a matrix cell into a dict, returning ``{}`` when unusable.

        The raw text is tried as JSON first, so valid JSON containing
        apostrophes (``{"a": "it's"}``) survives. Only if that fails is the
        single-quote -> double-quote rewrite applied, to accept Python-style
        ``{'a': 1}`` literals. Non-string cells (pandas yields float NaN for
        empty cells) and JSON values that are not objects yield ``{}``.
        """
        if isinstance(field, dict):
            return field
        if not isinstance(field, str):
            # Empty CSV cells arrive as NaN, which has no .replace().
            return {}
        for candidate in (field, field.replace("'", '"')):
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        return {}

    def _parse_behavior(self, behavior_str: str) -> Dict[str, Any]:
        """Parse the ``behavior`` cell; see ``_parse_json_field``."""
        return self._parse_json_field(behavior_str)

    def _parse_security(self, security_str: str) -> Dict[str, Any]:
        """Parse a ``security`` cell; see ``_parse_json_field``."""
        return self._parse_json_field(security_str)

    def _load_agent_class(self, role: str):
        """Import ``agents.<role lower-cased>`` and return its ``<role>`` attribute.

        Returns None (after logging the error) when the import or the
        attribute lookup fails for any reason.
        """
        try:
            module = importlib.import_module(f"agents.{role.lower()}")
            return getattr(module, role)
        except Exception as e:
            # Best-effort by design: a broken agent module must not stop
            # the remaining rows from being dispatched.
            self.logger.error("Agent class load failed: %s", e)
            return None

    def dispatch_tasks(self):
        """Run every matrix row whose activation logic evaluates truthy.

        For each row the agent class is loaded and instantiated with the
        parsed ``config``. With ``loop_enabled`` set, the task is run through
        ``LoopManager(agent_id).run_loop``; otherwise the agent's
        ``execute(config=..., behavior=...)`` is called. The ``output`` entry
        of the result is printed. Any exception raised while handling a row
        is logged and the next row is processed.

        The optional ``security`` column is not consumed here.
        """
        for _, row in self.matrix.iterrows():
            agent_id = row["agent_id"]
            role = row["role"]
            config = self._parse_json_field(row.get("config", "{}"))
            behavior = self._parse_behavior(row.get("behavior", "{}"))
            activation_logic = row.get("activation_logic", "lambda metrics: True")

            try:
                # SECURITY: eval() executes arbitrary code taken from the
                # matrix CSV. Only load matrices from trusted sources; an
                # empty/NaN cell makes eval() raise and is logged below.
                is_active = eval(activation_logic)({})  # pass empty context for now
                if not is_active:
                    continue

                agent_cls = self._load_agent_class(role)
                if not agent_cls:
                    continue

                agent_instance = agent_cls(**config)
                if self.loop_enabled:
                    loop = LoopManager(agent_id)
                    result = loop.run_loop(f"Execute task for {agent_id}")
                else:
                    result = agent_instance.execute(config=config, behavior=behavior)
                print(f"{agent_id} executed: {result['output']}")
            except Exception as e:
                self.logger.error("Dispatch failed for %s: %s", agent_id, e)

    def process_feedback(self, agent_id: str, feedback_data: Dict[str, Any]):
        """Run a LoopManager loop for *agent_id* on the feedback's task input.

        The loop input is ``feedback_data["task_input"]``, or the string
        ``"No input"`` when that key is absent. The ``output`` entry of the
        result is printed; any exception is logged rather than raised.
        """
        try:
            loop = LoopManager(agent_id)
            result = loop.run_loop(feedback_data.get("task_input", "No input"))
            print(f"[Feedback] Loop complete for {agent_id}: {result['output']}")
        except Exception as e:
            self.logger.error("Feedback processing failed for %s: %s", agent_id, e)