""" Rule Enhancement Service using LLM Analyzes rules for edge cases and improves them before saving to database. """ import os from typing import List, Dict, Any, Optional from ..services.llm_client import LLMClient class RuleEnhancer: """ Uses LLM to analyze and enhance admin rules. Identifies edge cases, improves patterns, and suggests better descriptions. """ def __init__(self, llm_client: Optional[LLMClient] = None): self.llm = llm_client or LLMClient( backend=os.getenv("LLM_BACKEND", "ollama"), url=os.getenv("OLLAMA_URL"), api_key=os.getenv("GROQ_API_KEY"), model=os.getenv("OLLAMA_MODEL", "llama3.1:latest") ) async def enhance_rule( self, rule_text: str, existing_rules: Optional[List[str]] = None, context: Optional[str] = None ) -> Dict[str, Any]: """ Enhance a single rule using LLM analysis. Args: rule_text: The original rule text existing_rules: List of existing rules for context context: Additional context about the rule Returns: Dictionary with enhanced rule data: - rule: Enhanced rule text - pattern: Improved regex pattern - description: Better description - severity: Suggested severity - edge_cases: List of identified edge cases - improvements: List of suggested improvements """ existing_context = "" if existing_rules: existing_context = "\n".join([f"- {r}" for r in existing_rules[:10]]) # Limit to 10 rules context_text = f"\nAdditional context: {context}" if context else "" prompt = f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and enhance it. Original Rule: "{rule_text}" Existing Rules (for context): {existing_context if existing_context else "None"} {context_text} Your task: 1. Analyze the rule for potential edge cases and improvements 2. Generate an improved regex pattern that catches more variations 3. Write a clear, comprehensive description 4. Suggest an appropriate severity level (low/medium/high/critical) 5. Identify edge cases that might be missed 6. Suggest improvements Respond in JSON format with the following structure: {{ "rule": "Enhanced rule text (improved version of original)", "pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')", "description": "Clear description of what this rule detects", "severity": "low|medium|high|critical", "edge_cases": ["Edge case 1", "Edge case 2", ...], "improvements": ["Improvement 1", "Improvement 2", ...], "keywords": ["keyword1", "keyword2", ...] }} Only return valid JSON, no additional text:""" try: # Add timeout protection - LLM calls can be slow import asyncio response = await asyncio.wait_for( self.llm.simple_call(prompt, temperature=0.3), timeout=30.0 # 30 second timeout per rule ) # Clean up response - remove markdown code blocks if present response = response.strip() if response.startswith("```json"): response = response[7:] if response.startswith("```"): response = response[3:] if response.endswith("```"): response = response[:-3] response = response.strip() import json enhanced_data = json.loads(response) # Ensure all required fields exist result = { "rule": enhanced_data.get("rule", rule_text), "pattern": enhanced_data.get("pattern", rule_text), "description": enhanced_data.get("description", rule_text), "severity": enhanced_data.get("severity", "medium"), "edge_cases": enhanced_data.get("edge_cases", []), "improvements": enhanced_data.get("improvements", []), "keywords": enhanced_data.get("keywords", []) } # Validate severity if result["severity"] not in ["low", "medium", "high", "critical"]: result["severity"] = "medium" return result except asyncio.TimeoutError: # Timeout - return original rule print(f"LLM enhancement timeout for rule: {rule_text[:50]}...") return { "rule": rule_text, "pattern": rule_text, "description": rule_text, "severity": "medium", "edge_cases": [], "improvements": ["Enhancement timed out - using original rule"], "keywords": [] } except Exception as e: # Fallback to original rule if LLM fails print(f"LLM enhancement error: {e}") return { "rule": rule_text, "pattern": rule_text, "description": rule_text, "severity": "medium", "edge_cases": [], "improvements": [f"Enhancement failed: {str(e)[:50]}"], "keywords": [] } async def enhance_rules_bulk( self, rules: List[str], existing_rules: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """ Enhance multiple rules at once. Processes rules sequentially with error handling to avoid timeout. Args: rules: List of rule texts to enhance existing_rules: List of existing rules for context Returns: List of enhanced rule dictionaries """ enhanced_rules = [] for i, rule in enumerate(rules): try: # Enhance each rule individually with timeout protection enhanced = await self.enhance_rule(rule, existing_rules) enhanced_rules.append(enhanced) except Exception as e: # If enhancement fails for one rule, use original rule # This ensures other rules can still be processed print(f"Warning: Rule {i+1}/{len(rules)} enhancement failed: {e}") enhanced_rules.append({ "rule": rule, "pattern": rule, "description": rule, "severity": "medium", "edge_cases": [], "improvements": [f"Enhancement skipped due to error"], "keywords": [] }) return enhanced_rules