File size: 3,277 Bytes
c9eaa2d b0546d3 ec78ba8 b0546d3 c6cea2c 9c06673 b0546d3 c6cea2c b0546d3 c6cea2c c9eaa2d c6cea2c b0546d3 e992ee3 c6cea2c e992ee3 b0546d3 c6cea2c c9eaa2d c6cea2c b0546d3 c6cea2c b0546d3 c6cea2c c9eaa2d c6cea2c c9eaa2d c6cea2c c9eaa2d c6cea2c c9eaa2d c6cea2c c9eaa2d c6cea2c c9eaa2d c6cea2c c9eaa2d c6cea2c b0546d3 c6cea2c b0546d3 c6cea2c b0546d3 c6cea2c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | # demo/orchestrator.py - COMPLETE FIXED VERSION
from __future__ import annotations
import logging
import asyncio
from typing import Any, Dict, Optional, List
import time
logger = logging.getLogger(__name__)
# Import mock ARF functions
try:
from demo.mock_arf import (
simulate_arf_analysis,
run_rag_similarity_search,
create_mock_healing_intent,
calculate_pattern_confidence
)
MOCK_ARF_AVAILABLE = True
logger.info("Mock ARF functions available")
except ImportError as e:
logger.warning(f"Mock ARF functions not available: {e}")
MOCK_ARF_AVAILABLE = False
class DemoOrchestrator:
"""
Orchestrates demo scenarios with proper agent workflow.
"""
def __init__(self, enable_streamlit: bool = False):
self.enable_streamlit = enable_streamlit
logger.info("DemoOrchestrator initialized")
async def analyze_incident(self, scenario_name: str, scenario_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Analyze an incident using the ARF agent workflow.
This is the method called by app.py
"""
logger.info(f"Analyzing incident: {scenario_name}")
if not MOCK_ARF_AVAILABLE:
logger.error("Mock ARF functions not available")
return {
"status": "error",
"message": "Mock ARF functions not available",
"scenario": scenario_name
}
try:
# Step 1: Detection Agent
logger.debug("Running detection agent...")
detection_result = simulate_arf_analysis(scenario_data)
# Step 2: Recall Agent
logger.debug("Running recall agent...")
similar_incidents = run_rag_similarity_search(scenario_data)
# Step 3: Decision Agent
logger.debug("Running decision agent...")
confidence = calculate_pattern_confidence(scenario_data, similar_incidents)
healing_intent = create_mock_healing_intent(scenario_data, similar_incidents, confidence)
# Simulate processing time
await asyncio.sleep(0.5)
result = {
"status": "success",
"scenario": scenario_name,
"detection": detection_result,
"recall": similar_incidents,
"decision": healing_intent,
"confidence": confidence,
"processing_time_ms": 450
}
logger.info(f"Analysis complete for {scenario_name}")
return result
except Exception as e:
logger.error(f"Error analyzing incident: {e}", exc_info=True)
return {
"status": "error",
"message": str(e),
"scenario": scenario_name
}
def run_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
"""
Run a demo scenario (legacy method).
"""
logger.info("Running scenario: %s", scenario.get("name", "unknown"))
return {
"scenario": scenario.get("name"),
"status": "completed",
"output": scenario,
} |