Debashis
Deploy AIMS Full-Stack - 2026-03-28 18:01:53
45ab2bd
"""
LLM integration module for prompt management and execution
"""
import json
import logging
from typing import Dict, Any, Optional
from abc import ABC, abstractmethod
import os
logger = logging.getLogger(__name__)
class PromptManager:
"""Manages prompts from the prompts directory"""
def __init__(self, prompts_dir: str = "../prompts"):
self.prompts_dir = prompts_dir
self._cache: Dict[str, str] = {}
def load_prompt(self, prompt_name: str) -> str:
"""Load a prompt file from the prompts directory"""
if prompt_name in self._cache:
return self._cache[prompt_name]
prompt_file = os.path.join(self.prompts_dir, f"{prompt_name}.md")
try:
with open(prompt_file, 'r') as f:
content = f.read()
self._cache[prompt_name] = content
return content
except FileNotFoundError:
logger.error(f"Prompt file not found: {prompt_file}")
raise
def get_prompt_template(self, prompt_name: str) -> str:
"""
Extract the prompt template from a prompt file
Template is expected to be within triple backticks
"""
content = self.load_prompt(prompt_name)
# Extract template from markdown
start_marker = "```"
parts = content.split(start_marker)
if len(parts) >= 3:
# Typically: [header, language marker line, actual template, rest]
return parts[2].strip()
return content
class LLMClient(ABC):
"""Abstract base class for LLM clients"""
@abstractmethod
async def chat(self, messages: list, **kwargs) -> str:
"""Send chat request to LLM"""
pass
@abstractmethod
async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
"""Analyze alerts for correlation/classification"""
pass
@abstractmethod
async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
"""Perform LLM analysis on incident"""
pass
class OpenAIClient(LLMClient):
"""OpenAI LLM client"""
def __init__(self, api_key: str, model: str = "gpt-4", temperature: float = 0.7):
self.api_key = api_key
self.model = model
self.temperature = temperature
self.prompt_manager = PromptManager()
# Initialize OpenAI client (will be imported when needed)
try:
from openai import AsyncOpenAI
self.client = AsyncOpenAI(api_key=api_key)
except ImportError:
logger.warning("OpenAI client not available")
self.client = None
async def chat(self, messages: list, **kwargs) -> str:
"""Send chat request to OpenAI"""
if not self.client:
raise RuntimeError("OpenAI client not initialized")
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
**kwargs
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"OpenAI API error: {e}")
raise
async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
"""Analyze alerts for correlation using alert_correlation prompt"""
try:
template = self.prompt_manager.get_prompt_template("alert_correlation")
prompt = template.format(
ALERT_COUNT=len(alerts),
TIME_WINDOW=5,
INCIDENT_COUNT=0,
ALERTS_JSON=json.dumps(alerts, indent=2)
)
response = await self.chat([{"role": "user", "content": prompt}])
# Parse JSON response
try:
result = json.loads(response)
return result
except json.JSONDecodeError:
logger.error(f"Failed to parse LLM response as JSON: {response}")
return {"raw_response": response, "error": "JSON parse error"}
except Exception as e:
logger.error(f"Alert analysis error: {e}")
raise
async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
"""Perform LLM analysis on incident"""
try:
if analysis_type == "classification":
template = self.prompt_manager.get_prompt_template("incident_classification")
elif analysis_type == "root_cause":
template = self.prompt_manager.get_prompt_template("root_cause_analysis")
elif analysis_type == "recommendation":
template = self.prompt_manager.get_prompt_template("incident_recommendation")
else:
raise ValueError(f"Unknown analysis type: {analysis_type}")
prompt = template.format(
INCIDENT_DETAILS=json.dumps(incident, indent=2),
RELATED_ALERTS=json.dumps(incident.get("alerts", []), indent=2),
SIMILAR_COUNT=0,
COMMON_CAUSES="[]",
AVG_MTTR="Unknown"
)
response = await self.chat([{"role": "user", "content": prompt}])
# Parse JSON response
try:
result = json.loads(response)
result["analysis_type"] = analysis_type
return result
except json.JSONDecodeError:
logger.error(f"Failed to parse LLM response as JSON: {response}")
return {"raw_response": response, "error": "JSON parse error"}
except Exception as e:
logger.error(f"Incident analysis error: {e}")
raise
class AnthropicClient(LLMClient):
"""Anthropic Claude LLM client"""
def __init__(self, api_key: str, model: str = "claude-3-sonnet-20240229"):
self.api_key = api_key
self.model = model
self.prompt_manager = PromptManager()
# Initialize Anthropic client
try:
from anthropic import AsyncAnthropic
self.client = AsyncAnthropic(api_key=api_key)
except ImportError:
logger.warning("Anthropic client not available")
self.client = None
class OllamaClient(LLMClient):
"""Ollama local LLM client (free, runs locally)"""
def __init__(self, base_url: str = "http://localhost:11434", model: str = "mistral", temperature: float = 0.7, num_predict: int = 2048):
self.base_url = base_url.rstrip('/')
self.model = model
self.temperature = temperature
self.num_predict = num_predict
self.prompt_manager = PromptManager()
# Initialize HTTP client for Ollama
try:
import httpx
self.client = httpx.AsyncClient(timeout=60.0)
except ImportError:
logger.warning("httpx not available, install with: pip install httpx")
self.client = None
async def _call_ollama(self, prompt: str) -> str:
"""Call Ollama API"""
if not self.client:
raise RuntimeError("Ollama client not initialized")
try:
response = await self.client.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"temperature": self.temperature,
"num_predict": self.num_predict,
"stream": False
}
)
response.raise_for_status()
result = response.json()
return result.get("response", "").strip()
except Exception as e:
logger.error(f"Ollama API error: {e}")
raise
async def chat(self, messages: list, **kwargs) -> str:
"""Send chat request to Ollama (converts messages to prompt)"""
# Convert messages format to simple prompt
prompt = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
prompt += f"{content}\n\n"
elif role == "user":
prompt += f"User: {content}\n"
elif role == "assistant":
prompt += f"Assistant: {content}\n"
prompt += "Assistant: "
return await self._call_ollama(prompt)
async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
"""Analyze alerts for correlation"""
try:
template = self.prompt_manager.get_prompt_template("alert_correlation")
prompt = template.format(
ALERT_COUNT=len(alerts),
TIME_WINDOW=5,
INCIDENT_COUNT=0,
ALERTS_JSON=json.dumps(alerts, indent=2)
)
response = await self._call_ollama(prompt)
try:
result = json.loads(response)
return result
except json.JSONDecodeError:
logger.error(f"Failed to parse LLM response as JSON: {response}")
return {"raw_response": response, "error": "JSON parse error"}
except Exception as e:
logger.error(f"Alert analysis error: {e}")
raise
async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
"""Perform LLM analysis on incident"""
try:
if analysis_type == "classification":
template = self.prompt_manager.get_prompt_template("incident_classification")
elif analysis_type == "root_cause":
template = self.prompt_manager.get_prompt_template("root_cause_analysis")
elif analysis_type == "recommendation":
template = self.prompt_manager.get_prompt_template("incident_recommendation")
else:
raise ValueError(f"Unknown analysis type: {analysis_type}")
prompt = template.format(
INCIDENT_DETAILS=json.dumps(incident, indent=2),
RELATED_ALERTS=json.dumps(incident.get("alerts", []), indent=2),
SIMILAR_COUNT=0,
COMMON_CAUSES="[]",
AVG_MTTR="Unknown"
)
response = await self._call_ollama(prompt)
try:
result = json.loads(response)
result["analysis_type"] = analysis_type
return result
except json.JSONDecodeError:
logger.error(f"Failed to parse LLM response as JSON: {response}")
return {"raw_response": response, "error": "JSON parse error"}
except Exception as e:
logger.error(f"Incident analysis error: {e}")
raise
async def chat(self, messages: list, **kwargs) -> str:
"""Send chat request to Anthropic"""
if not self.client:
raise RuntimeError("Anthropic client not initialized")
try:
response = await self.client.messages.create(
model=self.model,
max_tokens=2000,
messages=messages,
**kwargs
)
return response.content[0].text
except Exception as e:
logger.error(f"Anthropic API error: {e}")
raise
async def analyze_alerts(self, alerts: list) -> Dict[str, Any]:
"""Analyze alerts for correlation"""
try:
template = self.prompt_manager.get_prompt_template("alert_correlation")
prompt = template.format(
ALERT_COUNT=len(alerts),
TIME_WINDOW=5,
INCIDENT_COUNT=0,
ALERTS_JSON=json.dumps(alerts, indent=2)
)
response = await self.chat([{"role": "user", "content": prompt}])
try:
result = json.loads(response)
return result
except json.JSONDecodeError:
logger.error(f"Failed to parse LLM response as JSON: {response}")
return {"raw_response": response, "error": "JSON parse error"}
except Exception as e:
logger.error(f"Alert analysis error: {e}")
raise
async def analyze_incident(self, incident: Dict[str, Any], analysis_type: str) -> Dict[str, Any]:
"""Perform LLM analysis on incident"""
try:
if analysis_type == "classification":
template = self.prompt_manager.get_prompt_template("incident_classification")
elif analysis_type == "root_cause":
template = self.prompt_manager.get_prompt_template("root_cause_analysis")
elif analysis_type == "recommendation":
template = self.prompt_manager.get_prompt_template("incident_recommendation")
else:
raise ValueError(f"Unknown analysis type: {analysis_type}")
prompt = template.format(
INCIDENT_DETAILS=json.dumps(incident, indent=2),
RELATED_ALERTS=json.dumps(incident.get("alerts", []), indent=2),
SIMILAR_COUNT=0,
COMMON_CAUSES="[]"
)
response = await self.chat([{"role": "user", "content": prompt}])
try:
result = json.loads(response)
result["analysis_type"] = analysis_type
return result
except json.JSONDecodeError:
logger.error(f"Failed to parse LLM response as JSON: {response}")
return {"raw_response": response, "error": "JSON parse error"}
except Exception as e:
logger.error(f"Incident analysis error: {e}")
raise
class LLMFactory:
"""Factory for creating LLM clients"""
@staticmethod
def create_client(provider: str, **kwargs) -> LLMClient:
"""Create an LLM client based on provider"""
if provider.lower() == "ollama":
return OllamaClient(**kwargs)
elif provider.lower() == "openai":
return OpenAIClient(**kwargs)
elif provider.lower() == "anthropic":
return AnthropicClient(**kwargs)
else:
raise ValueError(f"Unknown LLM provider: {provider}")