# LATE.IO2 / backend / scripts / multiplexing_engine.py
# AIEINC
# Initial Hugging Face Space deployment
# 5e1dfdc
import importlib
import json
import logging
import pandas as pd
from typing import Dict, Any
from feedback.loop_manager import LoopManager
class MultiplexingEngine:
    """Dispatch agents described by the rows of a CSV "matrix".

    Each row must provide the columns ``agent_id``, ``role``,
    ``activation_logic``, ``config`` and ``behavior``. ``config`` and
    ``behavior`` hold JSON-like object strings; ``activation_logic`` holds the
    source of a one-argument callable that decides whether the row's agent
    should run.
    """

    def __init__(self, matrix_path: str):
        """Load the agent matrix and check that its required columns exist.

        Args:
            matrix_path: Location of the CSV matrix, passed to
                ``pandas.read_csv``.

        Raises:
            ValueError: If any required column is missing from the matrix.
        """
        self.matrix = pd.read_csv(matrix_path)
        self.agents = {}
        self.logger = logging.getLogger("MultiplexingEngine")
        # When True, tasks are run through LoopManager instead of calling the
        # agent's ``execute`` method directly.
        self.loop_enabled = True
        self._validate_matrix()

    def _validate_matrix(self):
        """Raise ``ValueError`` if the matrix lacks any required column."""
        required_columns = {'agent_id', 'role', 'activation_logic', 'config', 'behavior'}
        missing = required_columns - set(self.matrix.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

    def _parse_json_field(self, field: Any) -> Dict[str, Any]:
        """Parse a JSON-like cell value into a dictionary.

        The text is first parsed as-is; only if that fails is the
        single-quote-to-double-quote rewrite tried, so valid JSON containing
        apostrophes is no longer corrupted by the rewrite.

        Args:
            field: Raw cell value. Anything that is not a string (for example
                the float NaN pandas yields for an empty cell) is treated as
                "no data".

        Returns:
            The decoded object, or ``{}`` when the value is not a string,
            cannot be decoded, or does not decode to a dictionary.
        """
        if not isinstance(field, str):
            return {}
        for candidate in (field, field.replace("'", '"')):
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            # Callers unpack the result with ``**``, so only mappings are useful.
            return parsed if isinstance(parsed, dict) else {}
        return {}

    def _parse_behavior(self, behavior_str: str) -> Dict[str, Any]:
        """Parse the ``behavior`` cell; see ``_parse_json_field``."""
        return self._parse_json_field(behavior_str)

    def _parse_security(self, security_str: str) -> Dict[str, Any]:
        """Parse a ``security`` cell; see ``_parse_json_field``."""
        return self._parse_json_field(security_str)

    def _load_agent_class(self, role: str):
        """Import ``agents.<role lower-cased>`` and return its ``role`` attribute.

        Returns:
            The attribute named ``role`` from the imported module, or ``None``
            if the import or lookup fails (the failure is logged).
        """
        try:
            module = importlib.import_module(f"agents.{role.lower()}")
            return getattr(module, role)
        except Exception as e:
            self.logger.error("Agent class load failed: %s", e)
            return None

    def dispatch_tasks(self):
        """Run every agent in the matrix whose activation logic evaluates truthy.

        For each row the agent class is loaded and instantiated with the parsed
        ``config``. The task is then run through ``LoopManager`` when
        ``loop_enabled`` is set, otherwise through the agent's ``execute``
        method. Any failure while handling a row is logged and the next row is
        processed. The optional ``security`` column is not consumed here.
        """
        for _, row in self.matrix.iterrows():
            agent_id = row["agent_id"]
            role = row["role"]
            config = self._parse_json_field(row.get("config", "{}"))
            behavior = self._parse_behavior(row.get("behavior", "{}"))
            activation_logic = row.get("activation_logic", "lambda metrics: True")
            try:
                # SECURITY: eval() executes arbitrary code taken from the CSV.
                # Only load matrices from trusted sources, or replace this with
                # a restricted expression evaluator.
                is_active = eval(activation_logic)({})  # pass empty context for now
                if not is_active:
                    continue

                agent_cls = self._load_agent_class(role)
                if agent_cls is None:
                    # The loader has already logged the reason.
                    continue

                agent_instance = agent_cls(**config)
                if self.loop_enabled:
                    loop = LoopManager(agent_id)
                    result = loop.run_loop(f"Execute task for {agent_id}")
                else:
                    result = agent_instance.execute(config=config, behavior=behavior)
                print(f"{agent_id} executed: {result['output']}")
            except Exception as e:
                self.logger.error("Dispatch failed for %s: %s", agent_id, e)

    def process_feedback(self, agent_id: str, feedback_data: Dict[str, Any]):
        """Run a feedback loop for ``agent_id`` and print its output.

        Args:
            agent_id: Identifier passed to ``LoopManager``.
            feedback_data: Mapping whose ``"task_input"`` entry is used as the
                loop input; ``"No input"`` is used when the key is absent.

        Any exception raised while running the loop is logged, not propagated.
        """
        try:
            loop = LoopManager(agent_id)
            result = loop.run_loop(feedback_data.get("task_input", "No input"))
            print(f"[Feedback] Loop complete for {agent_id}: {result['output']}")
        except Exception as e:
            self.logger.error("Feedback processing failed for %s: %s", agent_id, e)