petter2025's picture
Update demo/orchestrator.py
c9eaa2d verified
raw
history blame
3.28 kB
# demo/orchestrator.py - COMPLETE FIXED VERSION
from __future__ import annotations
import logging
import asyncio
from typing import Any, Dict, Optional, List
import time
logger = logging.getLogger(__name__)
# Import mock ARF functions
try:
from demo.mock_arf import (
simulate_arf_analysis,
run_rag_similarity_search,
create_mock_healing_intent,
calculate_pattern_confidence
)
MOCK_ARF_AVAILABLE = True
logger.info("Mock ARF functions available")
except ImportError as e:
logger.warning(f"Mock ARF functions not available: {e}")
MOCK_ARF_AVAILABLE = False
class DemoOrchestrator:
"""
Orchestrates demo scenarios with proper agent workflow.
"""
def __init__(self, enable_streamlit: bool = False):
self.enable_streamlit = enable_streamlit
logger.info("DemoOrchestrator initialized")
async def analyze_incident(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Analyze an incident using the ARF agent workflow.
This is the method called by app.py
"""
logger.info(f"Analyzing incident: {scenario_name}")
if not MOCK_ARF_AVAILABLE:
logger.error("Mock ARF functions not available")
return {
"status": "error",
"message": "Mock ARF functions not available",
"scenario": scenario_name
}
try:
# Step 1: Detection Agent
logger.debug("Running detection agent...")
detection_result = simulate_arf_analysis(scenario_data)
# Step 2: Recall Agent
logger.debug("Running recall agent...")
similar_incidents = run_rag_similarity_search(scenario_data)
# Step 3: Decision Agent
logger.debug("Running decision agent...")
confidence = calculate_pattern_confidence(scenario_data, similar_incidents)
healing_intent = create_mock_healing_intent(scenario_data, similar_incidents, confidence)
# Simulate processing time
await asyncio.sleep(0.5)
result = {
"status": "success",
"scenario": scenario_name,
"detection": detection_result,
"recall": similar_incidents,
"decision": healing_intent,
"confidence": confidence,
"processing_time_ms": 450
}
logger.info(f"Analysis complete for {scenario_name}")
return result
except Exception as e:
logger.error(f"Error analyzing incident: {e}", exc_info=True)
return {
"status": "error",
"message": str(e),
"scenario": scenario_name
}
def run_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
"""
Run a demo scenario (legacy method).
"""
logger.info("Running scenario: %s", scenario.get("name", "unknown"))
return {
"scenario": scenario.get("name"),
"status": "completed",
"output": scenario,
}