| |
| from __future__ import annotations |
|
|
| import logging |
| import asyncio |
| from typing import Any, Dict, Optional, List |
| import time |
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Optional dependency: the mock ARF helpers live in ``demo.mock_arf``.
# MOCK_ARF_AVAILABLE records whether that import succeeded so callers can
# report a clean error instead of failing on an undefined name.
try:
    # Only the import is guarded, so an ImportError can only originate here.
    from demo.mock_arf import (
        simulate_arf_analysis,
        run_rag_similarity_search,
        create_mock_healing_intent,
        calculate_pattern_confidence,
    )
except ImportError as e:
    # Lazy %-style arguments, matching the logging style used elsewhere in
    # this module; the emitted message text is unchanged.
    logger.warning("Mock ARF functions not available: %s", e)
    MOCK_ARF_AVAILABLE = False
else:
    MOCK_ARF_AVAILABLE = True
    logger.info("Mock ARF functions available")
|
|
|
|
class DemoOrchestrator:
    """
    Orchestrates demo scenarios with proper agent workflow.

    ``analyze_incident`` runs the mock detection, recall and decision steps
    (the helper functions imported from ``demo.mock_arf``) in sequence and
    packages their return values into a single result dictionary.
    ``run_scenario`` is a legacy entry point that simply echoes its input.
    """

    # Artificial pause, in seconds, awaited after the mock steps have run.
    _SIMULATED_LATENCY_SECONDS = 0.5

    def __init__(self, enable_streamlit: bool = False):
        """
        Create an orchestrator.

        Args:
            enable_streamlit: Stored on the instance as ``enable_streamlit``.
                No method of this class reads it.
        """
        self.enable_streamlit = enable_streamlit
        logger.info("DemoOrchestrator initialized")

    async def analyze_incident(
        self, scenario_name: str, scenario_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Analyze an incident using the ARF agent workflow.
        This is the method called by app.py

        Args:
            scenario_name: Label for the scenario; used in log messages and
                echoed back under the ``"scenario"`` key.
            scenario_data: Scenario payload passed unchanged to each mock
                helper function.

        Returns:
            On success, a dict with ``"status": "success"`` plus the keys
            ``"scenario"``, ``"detection"``, ``"recall"``, ``"decision"``,
            ``"confidence"`` and ``"processing_time_ms"`` (measured elapsed
            time of the workflow, rounded to whole milliseconds).

            On failure, a dict with ``"status": "error"``, a ``"message"``
            and the ``"scenario"`` name. This covers both the case where the
            mock helpers could not be imported and the case where any step
            raises an ``Exception``; such exceptions are logged with their
            traceback and are not propagated to the caller.
        """
        logger.info("Analyzing incident: %s", scenario_name)

        if not MOCK_ARF_AVAILABLE:
            logger.error("Mock ARF functions not available")
            return {
                "status": "error",
                "message": "Mock ARF functions not available",
                "scenario": scenario_name,
            }

        # Monotonic clock so the reported duration reflects what actually
        # happened instead of a fixed placeholder value.
        started = time.perf_counter()

        try:
            logger.debug("Running detection agent...")
            detection_result = simulate_arf_analysis(scenario_data)

            logger.debug("Running recall agent...")
            similar_incidents = run_rag_similarity_search(scenario_data)

            logger.debug("Running decision agent...")
            confidence = calculate_pattern_confidence(scenario_data, similar_incidents)
            healing_intent = create_mock_healing_intent(
                scenario_data, similar_incidents, confidence
            )

            # Simulated processing delay; yields control to the event loop.
            await asyncio.sleep(self._SIMULATED_LATENCY_SECONDS)

            elapsed_ms = round((time.perf_counter() - started) * 1000)

            result = {
                "status": "success",
                "scenario": scenario_name,
                "detection": detection_result,
                "recall": similar_incidents,
                "decision": healing_intent,
                "confidence": confidence,
                "processing_time_ms": elapsed_ms,
            }

            logger.info("Analysis complete for %s", scenario_name)
            return result

        except Exception as e:
            # Boundary handler: the caller receives an error dict rather than
            # an exception. logger.exception logs at ERROR level and attaches
            # the active traceback.
            logger.exception("Error analyzing incident: %s", e)
            return {
                "status": "error",
                "message": str(e),
                "scenario": scenario_name,
            }

    def run_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run a demo scenario (legacy method).

        Args:
            scenario: Scenario mapping; only its ``"name"`` key is read.

        Returns:
            A dict with ``"scenario"`` (the value of ``scenario["name"]``, or
            ``None`` when the key is absent), ``"status": "completed"`` and
            ``"output"`` (the same ``scenario`` object that was passed in,
            not a copy).
        """
        logger.info("Running scenario: %s", scenario.get("name", "unknown"))

        return {
            "scenario": scenario.get("name"),
            "status": "completed",
            "output": scenario,
        }