|
|
import redis |
|
|
import json |
|
|
import logging |
|
|
from datetime import datetime |
|
|
|
|
|
logger = logging.getLogger(__name__)


class OrchestratorAgent:
    """Create workflows and, when Redis is reachable, persist their state.

    Redis is optional. If no host/port is given, or the initial connection
    attempt fails, ``self.redis`` is ``None`` and the agent keeps working
    without persistence.
    """

    # Lifetime of a stored workflow record, in seconds (24 hours).
    WORKFLOW_TTL_SECONDS = 86400

    def __init__(self, redis_host='localhost', redis_port=6379, agents=None):
        """Store the agent registry and try to connect to Redis.

        Args:
            redis_host: Redis host name; a falsy value disables Redis.
            redis_port: Redis port (converted with ``int()``); a falsy value
                disables Redis.
            agents: Caller-supplied agent registry, stored as-is. Any falsy
                value (including ``None``) is replaced by a new empty dict.
        """
        self.agents = agents or {}
        self.redis = None

        if not (redis_host and redis_port):
            return

        try:
            client = redis.Redis(
                host=redis_host,
                port=int(redis_port),
                decode_responses=True,
            )
            # Constructing the client does not prove the server is reachable,
            # so ping before keeping the client.
            client.ping()
        except Exception as e:
            # Best effort: a missing Redis must not prevent construction.
            logger.warning("Redis connection failed (Orchestrator): %s", e)
        else:
            self.redis = client

    def create_workflow(self, workflow_name, payload) -> str:
        """Create a workflow id and persist its initial state if Redis is set.

        Args:
            workflow_name: Name embedded in the generated id and stored in
                the record.
            payload: Data stored with the record; it must be JSON
                serializable for the record to be saved.

        Returns:
            The id, formatted as ``workflow_<name>_<POSIX timestamp>``. It is
            returned even when saving the record fails.
        """
        # Capture the time once so the id and "created_at" describe the same
        # instant.
        now = datetime.now()
        workflow_id = f"workflow_{workflow_name}_{now.timestamp()}"
        logger.info("Creating workflow %s", workflow_id)

        if self.redis:
            try:
                record = json.dumps({
                    "name": workflow_name,
                    "payload": payload,
                    "status": "created",
                    "created_at": now.isoformat(),
                })
                # Write the value and its TTL in one command so a failure
                # between two calls cannot leave a key that never expires.
                self.redis.set(
                    workflow_id, record, ex=self.WORKFLOW_TTL_SECONDS
                )
            except Exception as e:
                # Persistence is best effort; the caller still gets the id.
                logger.error("Failed to save workflow state: %s", e)

        return workflow_id

    def monitor_agent_health(self) -> dict:
        """Report orchestrator and Redis status.

        Returns:
            A dict with ``"orchestrator"`` (always ``"healthy"``) and
            ``"redis"`` (``"connected"`` when a client exists and answers a
            ping, otherwise ``"disconnected"``).
        """
        redis_status = "disconnected"
        if self.redis:
            try:
                # A client object only shows that the connection worked at
                # construction time; ping to check it still does.
                self.redis.ping()
            except Exception as e:
                logger.warning(
                    "Redis health check failed (Orchestrator): %s", e
                )
            else:
                redis_status = "connected"

        return {
            "orchestrator": "healthy",
            "redis": redis_status,
        }
|
|
|