File size: 1,727 Bytes
e83c79a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import redis
import json
import logging
from datetime import datetime

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

class OrchestratorAgent:
    """Create workflows and report health, persisting state in Redis when available.

    Redis is optional. When no host/port is given, or the initial connection
    attempt fails, ``self.redis`` stays ``None`` and workflows are only logged.

    Attributes:
        agents: The ``agents`` argument as supplied by the caller, or an empty
            dict when it was omitted or falsy.
        redis: A ``redis.Redis`` client that answered a ping at construction
            time, or ``None``.
    """

    # Lifetime of a stored workflow record, in seconds (1 day).
    WORKFLOW_TTL_SECONDS = 86400

    def __init__(self, redis_host='localhost', redis_port=6379, agents=None):
        """Set up the agent registry and try to connect to Redis.

        Connection problems are logged as warnings and never raised; the
        orchestrator then runs without persistence.

        Args:
            redis_host: Redis host name. A falsy value disables Redis.
            redis_port: Redis port (converted with ``int``). A falsy value
                disables Redis.
            agents: Optional agent registry to keep on the instance.
        """
        self.agents = agents or {}
        self.redis = None

        if not (redis_host and redis_port):
            return

        try:
            client = redis.Redis(
                host=redis_host,
                port=int(redis_port),
                decode_responses=True,
            )
            # Verify the server is reachable before keeping the client.
            client.ping()
        except Exception as e:
            logger.warning("Redis connection failed (Orchestrator): %s", e)
        else:
            self.redis = client

    def create_workflow(self, workflow_name, payload):
        """Register a new workflow and return its identifier.

        The id has the form ``workflow_<workflow_name>_<unix timestamp>``.
        When Redis is available, a JSON record with the keys ``name``,
        ``payload``, ``status`` and ``created_at`` is stored under that id
        and expires after ``WORKFLOW_TTL_SECONDS``. Failures while
        serializing or storing the record are logged and do not prevent the
        id from being returned.

        Args:
            workflow_name: Name embedded in the id and stored in the record.
            payload: Data stored with the workflow; must be JSON-serializable
                for the record to be saved.

        Returns:
            The generated workflow id string.
        """
        # Read the clock once so the id and the stored created_at agree.
        now = datetime.now()
        workflow_id = f"workflow_{workflow_name}_{now.timestamp()}"
        logger.info("Creating workflow %s", workflow_id)

        # In a real system, you'd decompose this into steps and enqueue them.
        # For this prototype, we just log it and maybe store state.
        if self.redis is None:
            return workflow_id

        record = {
            "name": workflow_name,
            "payload": payload,
            "status": "created",
            "created_at": now.isoformat(),
        }
        try:
            # Write value and TTL in one command: a separate EXPIRE call could
            # fail after SET succeeded and leave a key that never expires.
            self.redis.set(
                workflow_id,
                json.dumps(record),
                ex=self.WORKFLOW_TTL_SECONDS,
            )
        except Exception as e:
            logger.error("Failed to save workflow state: %s", e)

        return workflow_id

    def monitor_agent_health(self):
        """Return a health summary for the orchestrator and its Redis link.

        Returns:
            A dict with ``"orchestrator"`` (always ``"healthy"``) and
            ``"redis"`` (``"connected"`` when the server currently answers a
            ping, otherwise ``"disconnected"``).
        """
        return {
            "orchestrator": "healthy",
            "redis": "connected" if self._redis_alive() else "disconnected",
        }

    def _redis_alive(self):
        """Return True when a Redis client exists and answers a ping."""
        if self.redis is None:
            return False
        try:
            return bool(self.redis.ping())
        except Exception:
            # Any failure to reach the server counts as disconnected.
            return False