"""
LLM integration module for prompt management and execution
"""

import json
import logging
import os
import re
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, Dict, Optional

logger = logging.getLogger(__name__)

# Maps the public ``analysis_type`` values to prompt file names (without ".md").
_INCIDENT_PROMPTS: Dict[str, str] = {
    "classification": "incident_classification",
    "root_cause": "root_cause_analysis",
    "recommendation": "incident_recommendation",
}

# Matches the first fenced code block in a model reply, with or without a
# "json" info string, so fenced JSON answers can still be parsed.
_FENCED_JSON = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)


class PromptManager:
    """Manages prompts from the prompts directory"""

    def __init__(self, prompts_dir: str = "../prompts"):
        # NOTE: a relative ``prompts_dir`` is resolved against the current
        # working directory of the process, not against this module's location.
        self.prompts_dir = prompts_dir
        self._cache: Dict[str, str] = {}

    def load_prompt(self, prompt_name: str) -> str:
        """
        Load a prompt file from the prompts directory.

        The file ``<prompts_dir>/<prompt_name>.md`` is read once and cached for
        the lifetime of this manager.

        Raises:
            FileNotFoundError: if the prompt file does not exist (also logged).
        """
        cached = self._cache.get(prompt_name)
        if cached is not None:
            return cached

        prompt_file = os.path.join(self.prompts_dir, f"{prompt_name}.md")
        try:
            # Explicit encoding: prompt files must read the same on every platform.
            with open(prompt_file, "r", encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            logger.error("Prompt file not found: %s", prompt_file)
            raise

        self._cache[prompt_name] = content
        return content

    def get_prompt_template(self, prompt_name: str) -> str:
        """
        Extract the prompt template from a prompt file.

        The template is the body of the first triple-backtick fenced block.
        If the file has no complete fenced block, the whole file is returned.
        """
        content = self.load_prompt(prompt_name)

        # Splitting on the fence yields [before, first block, after, ...];
        # the template is therefore parts[1], not parts[2].
        parts = content.split("```")
        if len(parts) < 3:
            return content

        # The text up to the first newline is the fence's info string
        # (e.g. "text" or "markdown"), which is not part of the template.
        info_line, newline, body = parts[1].partition("\n")
        if not newline:
            # Inline form ```template``` with no separate info line.
            return info_line.strip()
        return body.strip()


def _render_template(template: str, **values: Any) -> str:
    """Fill ``{NAME}`` placeholders, tolerating literal braces in the template."""
    try:
        return template.format(**values)
    except (KeyError, IndexError, ValueError):
        # str.format chokes on unescaped braces (e.g. a JSON example inside the
        # prompt). Fall back to replacing only the placeholders we know about,
        # in a single pass so substituted values are never re-scanned.
        if not values:
            return template
        logger.warning(
            "Prompt template contains unescaped braces; substituting known placeholders only"
        )
        known = re.compile(r"\{(" + "|".join(re.escape(name) for name in values) + r")\}")
        return known.sub(lambda match: str(values[match.group(1)]), template)


def _parse_json_response(response: Any, analysis_type: Optional[str] = None) -> Any:
    """
    Parse a model reply as JSON.

    Tries the raw reply first, then the contents of the first fenced code
    block. On success, when ``analysis_type`` is given, the result is tagged
    with it (non-object JSON is wrapped as ``{"result": ...}`` first so the tag
    can be attached). On failure, returns
    ``{"raw_response": ..., "error": "JSON parse error"}``.
    """
    candidates = [response]
    if isinstance(response, str):
        fenced = _FENCED_JSON.search(response)
        if fenced:
            candidates.append(fenced.group(1))

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a reply that is None / not text at all.
            continue
        if analysis_type is not None:
            if not isinstance(parsed, dict):
                parsed = {"result": parsed}
            parsed["analysis_type"] = analysis_type
        return parsed

    logger.error("Failed to parse LLM response as JSON: %s", response)
    return {"raw_response": response, "error": "JSON parse error"}


def _build_alert_prompt(prompt_manager: PromptManager, alerts: list) -> str:
    """Render the ``alert_correlation`` prompt for the given alerts."""
    template = prompt_manager.get_prompt_template("alert_correlation")
    return _render_template(
        template,
        ALERT_COUNT=len(alerts),
        TIME_WINDOW=5,
        INCIDENT_COUNT=0,
        ALERTS_JSON=json.dumps(alerts, indent=2),
    )


def _build_incident_prompt(
    prompt_manager: PromptManager, incident: Dict[str, Any], analysis_type: str
) -> str:
    """Render the incident prompt that corresponds to ``analysis_type``."""
    prompt_name = (
        _INCIDENT_PROMPTS.get(analysis_type) if isinstance(analysis_type, str) else None
    )
    if prompt_name is None:
        raise ValueError(f"Unknown analysis type: {analysis_type}")

    template = prompt_manager.get_prompt_template(prompt_name)
    return _render_template(
        template,
        INCIDENT_DETAILS=json.dumps(incident, indent=2),
        RELATED_ALERTS=json.dumps(incident.get("alerts", []), indent=2),
        SIMILAR_COUNT=0,
        COMMON_CAUSES="[]",
        AVG_MTTR="Unknown",
    )


async def _run_alert_analysis(
    prompt_manager: PromptManager,
    alerts: list,
    send: Callable[[str], Awaitable[str]],
) -> Dict[str, Any]:
    """Build the alert prompt, send it via ``send`` and parse the JSON reply."""
    try:
        prompt = _build_alert_prompt(prompt_manager, alerts)
        response = await send(prompt)
        return _parse_json_response(response)
    except Exception as e:
        logger.error("Alert analysis error: %s", e)
        raise


async def _run_incident_analysis(
    prompt_manager: PromptManager,
    incident: Dict[str, Any],
    analysis_type: str,
    send: Callable[[str], Awaitable[str]],
) -> Dict[str, Any]:
    """Build the incident prompt, send it via ``send`` and parse the JSON reply."""
    try:
        prompt = _build_incident_prompt(prompt_manager, incident, analysis_type)
        response = await send(prompt)
        return _parse_json_response(response, analysis_type)
    except Exception as e:
        logger.error("Incident analysis error: %s", e)
        raise


class LLMClient(ABC):
    """Abstract base class for LLM clients"""

    @abstractmethod
    async def chat(self, messages: list, **kwargs) -> str:
        """Send chat request to LLM"""

    @abstractmethod
    async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
        """Analyze alerts for correlation/classification"""

    @abstractmethod
    async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
        """Perform LLM analysis on incident"""

    async def _ask(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message through :meth:`chat`."""
        return await self.chat([{"role": "user", "content": prompt}])


class OpenAIClient(LLMClient):
    """OpenAI LLM client"""

    def __init__(self, api_key: str, model: str = "gpt-4", temperature: float = 0.7):
        self.api_key = api_key
        self.model = model
        self.temperature = temperature
        self.prompt_manager = PromptManager()

        # The SDK is optional: import lazily so the module loads without it.
        try:
            from openai import AsyncOpenAI
            self.client = AsyncOpenAI(api_key=api_key)
        except ImportError:
            logger.warning("OpenAI client not available")
            self.client = None

    async def chat(self, messages: list, **kwargs) -> str:
        """
        Send chat request to OpenAI.

        ``kwargs`` are forwarded to ``chat.completions.create`` and override the
        client's defaults (``model``, ``temperature``).

        Raises:
            RuntimeError: if the OpenAI SDK could not be imported.
        """
        if not self.client:
            raise RuntimeError("OpenAI client not initialized")

        # Merge instead of passing both explicitly, so a caller-supplied
        # ``temperature`` does not raise "multiple values for keyword argument".
        params: Dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "temperature": self.temperature,
        }
        params.update(kwargs)

        try:
            response = await self.client.chat.completions.create(**params)
            return response.choices[0].message.content
        except Exception as e:
            logger.error("OpenAI API error: %s", e)
            raise

    async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
        """Analyze alerts for correlation using alert_correlation prompt"""
        return await _run_alert_analysis(self.prompt_manager, alerts, self._ask)

    async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
        """
        Perform LLM analysis on incident.

        ``analysis_type`` is one of "classification", "root_cause" or
        "recommendation"; anything else raises ``ValueError``.
        """
        return await _run_incident_analysis(
            self.prompt_manager, incident, analysis_type, self._ask
        )


class AnthropicClient(LLMClient):
    """Anthropic Claude LLM client"""

    def __init__(self, api_key: str, model: str = "claude-3-sonnet-20240229"):
        self.api_key = api_key
        self.model = model
        self.prompt_manager = PromptManager()

        # The SDK is optional: import lazily so the module loads without it.
        try:
            from anthropic import AsyncAnthropic
            self.client = AsyncAnthropic(api_key=api_key)
        except ImportError:
            logger.warning("Anthropic client not available")
            self.client = None

    async def chat(self, messages: list, **kwargs) -> str:
        """
        Send chat request to Anthropic.

        ``kwargs`` are forwarded to ``messages.create`` and override the
        defaults (``model``, ``max_tokens=2000``).

        Raises:
            RuntimeError: if the Anthropic SDK could not be imported.
        """
        if not self.client:
            raise RuntimeError("Anthropic client not initialized")

        # The Messages API takes the system prompt as a top-level ``system``
        # parameter, not as a message role, so lift system messages out.
        system_parts = [m.get("content", "") for m in messages if m.get("role") == "system"]
        conversation = [m for m in messages if m.get("role") != "system"]

        params: Dict[str, Any] = {
            "model": self.model,
            "max_tokens": 2000,
            "messages": conversation,
        }
        if system_parts:
            params["system"] = "\n\n".join(str(part) for part in system_parts)
        params.update(kwargs)

        try:
            response = await self.client.messages.create(**params)
            return response.content[0].text
        except Exception as e:
            logger.error("Anthropic API error: %s", e)
            raise

    async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
        """Analyze alerts for correlation"""
        return await _run_alert_analysis(self.prompt_manager, alerts, self._ask)

    async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
        """
        Perform LLM analysis on incident.

        ``analysis_type`` is one of "classification", "root_cause" or
        "recommendation"; anything else raises ``ValueError``.
        """
        return await _run_incident_analysis(
            self.prompt_manager, incident, analysis_type, self._ask
        )


class OllamaClient(LLMClient):
    """Ollama local LLM client (free, runs locally)"""

    def __init__(self, base_url: str = "http://localhost:11434", model: str = "mistral",
                 temperature: float = 0.7, num_predict: int = 2048):
        self.base_url = base_url.rstrip('/')
        self.model = model
        self.temperature = temperature
        self.num_predict = num_predict
        self.prompt_manager = PromptManager()

        # httpx is optional: import lazily so the module loads without it.
        try:
            import httpx
            self.client = httpx.AsyncClient(timeout=60.0)
        except ImportError:
            logger.warning("httpx not available, install with: pip install httpx")
            self.client = None

    async def aclose(self) -> None:
        """Release the underlying HTTP connection pool, if one was created."""
        if self.client is not None:
            await self.client.aclose()

    async def _call_ollama(self, prompt: str) -> str:
        """
        Call Ollama's ``/api/generate`` endpoint and return the stripped reply.

        Raises:
            RuntimeError: if httpx could not be imported.
        """
        if not self.client:
            raise RuntimeError("Ollama client not initialized")

        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            # Sampling parameters must be nested under "options"; Ollama
            # ignores them when sent as top-level request fields.
            "options": {
                "temperature": self.temperature,
                "num_predict": self.num_predict,
            },
        }

        try:
            response = await self.client.post(f"{self.base_url}/api/generate", json=payload)
            response.raise_for_status()
            result = response.json()
            return result.get("response", "").strip()
        except Exception as e:
            logger.error("Ollama API error: %s", e)
            raise

    async def chat(self, messages: list, **kwargs) -> str:
        """
        Send chat request to Ollama (converts messages to prompt).

        System messages become leading paragraphs, user/assistant messages
        become "User:" / "Assistant:" lines, other roles are skipped.
        ``kwargs`` are accepted for interface compatibility and ignored.
        """
        pieces = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "system":
                pieces.append(f"{content}\n\n")
            elif role == "user":
                pieces.append(f"User: {content}\n")
            elif role == "assistant":
                pieces.append(f"Assistant: {content}\n")

        # Trailing cue so the model continues as the assistant.
        pieces.append("Assistant: ")
        return await self._call_ollama("".join(pieces))

    async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
        """Analyze alerts for correlation"""
        # The rendered prompt is sent as-is, without the chat-style role prefixes.
        return await _run_alert_analysis(self.prompt_manager, alerts, self._call_ollama)

    async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
        """
        Perform LLM analysis on incident.

        ``analysis_type`` is one of "classification", "root_cause" or
        "recommendation"; anything else raises ``ValueError``.
        """
        return await _run_incident_analysis(
            self.prompt_manager, incident, analysis_type, self._call_ollama
        )


class LLMFactory:
    """Factory for creating LLM clients"""

    # Provider name (lower-case) -> client class.
    _PROVIDERS = {
        "ollama": OllamaClient,
        "openai": OpenAIClient,
        "anthropic": AnthropicClient,
    }

    @staticmethod
    def create_client(provider: str, **kwargs) -> LLMClient:
        """
        Create an LLM client based on provider.

        ``provider`` is matched case-insensitively; ``kwargs`` are passed to the
        client's constructor.

        Raises:
            ValueError: if ``provider`` is not a known provider name.
        """
        client_cls = LLMFactory._PROVIDERS.get(provider.lower())
        if client_cls is None:
            raise ValueError(f"Unknown LLM provider: {provider}")
        return client_cls(**kwargs)