Upload orchestrator.py
Browse files- orchestrator.py +48 -0
orchestrator.py
ADDED
|
@@ -0,0 +1,48 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import redis
|
| 2 |
+
import json
|
| 3 |
+
import logging
|
| 4 |
+
from datetime import datetime
|
| 5 |
+
|
| 6 |
+
logger = logging.getLogger(__name__)
|
| 7 |
+
|
| 8 |
+
class OrchestratorAgent:
|
| 9 |
+
def __init__(self, redis_host='localhost', redis_port=6379, agents=None):
|
| 10 |
+
self.agents = agents or {}
|
| 11 |
+
self.redis = None
|
| 12 |
+
|
| 13 |
+
if redis_host and redis_port:
|
| 14 |
+
try:
|
| 15 |
+
self.redis = redis.Redis(host=redis_host, port=int(redis_port), decode_responses=True)
|
| 16 |
+
# Test connection
|
| 17 |
+
self.redis.ping()
|
| 18 |
+
except Exception as e:
|
| 19 |
+
logger.warning(f"Redis connection failed (Orchestrator): {e}")
|
| 20 |
+
self.redis = None
|
| 21 |
+
|
| 22 |
+
def create_workflow(self, workflow_name, payload):
|
| 23 |
+
workflow_id = f"workflow_{workflow_name}_{datetime.now().timestamp()}"
|
| 24 |
+
logger.info(f"Creating workflow {workflow_id}")
|
| 25 |
+
|
| 26 |
+
# In a real system, you'd decompose this into steps and enqueue them
|
| 27 |
+
# For this prototype, we just log it and maybe store state
|
| 28 |
+
|
| 29 |
+
if self.redis:
|
| 30 |
+
try:
|
| 31 |
+
self.redis.set(workflow_id, json.dumps({
|
| 32 |
+
"name": workflow_name,
|
| 33 |
+
"payload": payload,
|
| 34 |
+
"status": "created",
|
| 35 |
+
"created_at": datetime.now().isoformat()
|
| 36 |
+
}))
|
| 37 |
+
self.redis.expire(workflow_id, 86400) # 1 day expiry
|
| 38 |
+
except Exception as e:
|
| 39 |
+
logger.error(f"Failed to save workflow state: {e}")
|
| 40 |
+
|
| 41 |
+
return workflow_id
|
| 42 |
+
|
| 43 |
+
def monitor_agent_health(self):
|
| 44 |
+
# Check system health
|
| 45 |
+
return {
|
| 46 |
+
"orchestrator": "healthy",
|
| 47 |
+
"redis": "connected" if self.redis else "disconnected"
|
| 48 |
+
}
|