""" Rule Enhancement Service using LLM Analyzes rules for edge cases and improves them before saving to database. """ import os from typing import List, Dict, Any, Optional from ..services.llm_client import LLMClient class RuleEnhancer: """ Uses LLM to analyze and enhance admin rules. Identifies edge cases, improves patterns, and suggests better descriptions. """ def __init__(self, llm_client: Optional[LLMClient] = None): self.llm = llm_client or LLMClient( api_key=os.getenv("GROQ_API_KEY"), model=os.getenv("GROQ_MODEL") ) async def enhance_rule( self, rule_text: str, existing_rules: Optional[List[str]] = None, context: Optional[str] = None ) -> Dict[str, Any]: """ Enhance a single rule using LLM analysis. Args: rule_text: The original rule text existing_rules: List of existing rules for context context: Additional context about the rule Returns: Dictionary with enhanced rule data: - rule: Enhanced rule text - pattern: Improved regex pattern - description: Better description - severity: Suggested severity - edge_cases: List of identified edge cases - improvements: List of suggested improvements """ existing_context = "" if existing_rules: existing_context = "\n".join([f"- {r}" for r in existing_rules[:10]]) # Limit to 10 rules context_text = f"\nAdditional context: {context}" if context else "" prompt = f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and provide comprehensive enhancements. Original Rule: "{rule_text}" Existing Rules (for context): {existing_context if existing_context else "None"} {context_text} Your task: 1. Generate a human-readable explanation of what this rule does (2-3 sentences, plain English) 2. Provide 5-8 concrete examples of text/phrases that would match this rule's pattern 3. Suggest 3-5 missing patterns or variations that should also be caught 4. Analyze the rule for potential edge cases and improvements 5. Generate an improved regex pattern that catches more variations 6. Write a clear, comprehensive description 7. Suggest an appropriate severity level (low/medium/high/critical) 8. Identify edge cases that might be missed 9. Suggest improvements Respond in JSON format with the following structure: {{ "rule": "Enhanced rule text (improved version of original)", "pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')", "description": "Clear description of what this rule detects", "severity": "low|medium|high|critical", "explanation": "Human-readable explanation in 2-3 sentences explaining what this rule does and why it's important", "examples": [ "Example text that would match: 'Please share your password'", "Another example: 'My pwd is 12345'", "More examples..." ], "missing_patterns": [ "Pattern variation 1 that should be considered", "Pattern variation 2 that should be considered", "More suggestions..." ], "edge_cases": ["Edge case 1", "Edge case 2", ...], "improvements": ["Improvement 1", "Improvement 2", ...], "keywords": ["keyword1", "keyword2", ...] }} Only return valid JSON, no additional text:""" try: # Add timeout protection - LLM calls can be slow import asyncio response = await asyncio.wait_for( self.llm.simple_call(prompt, temperature=0.3), timeout=30.0 # 30 second timeout per rule ) # Clean up response - remove markdown code blocks if present response = response.strip() if response.startswith("```json"): response = response[7:] if response.startswith("```"): response = response[3:] if response.endswith("```"): response = response[:-3] response = response.strip() import json enhanced_data = json.loads(response) # Ensure all required fields exist result = { "rule": enhanced_data.get("rule", rule_text), "pattern": enhanced_data.get("pattern", rule_text), "description": enhanced_data.get("description", rule_text), "severity": enhanced_data.get("severity", "medium"), "explanation": enhanced_data.get("explanation", f"This rule detects: {rule_text}"), "examples": enhanced_data.get("examples", []), "missing_patterns": enhanced_data.get("missing_patterns", []), "edge_cases": enhanced_data.get("edge_cases", []), "improvements": enhanced_data.get("improvements", []), "keywords": enhanced_data.get("keywords", []) } # Validate severity if result["severity"] not in ["low", "medium", "high", "critical"]: result["severity"] = "medium" return result except asyncio.TimeoutError: # Timeout - generate basic explanation without LLM print(f"LLM enhancement timeout for rule: {rule_text[:50]}...") basic_explanation = self._generate_basic_explanation(rule_text) return { "rule": rule_text, "pattern": rule_text, "description": rule_text, "severity": "medium", "explanation": basic_explanation["explanation"], "examples": basic_explanation["examples"], "missing_patterns": basic_explanation["missing_patterns"], "edge_cases": [], "improvements": ["Enhancement timed out - using basic explanation"], "keywords": [] } except Exception as e: # Fallback to basic explanation if LLM fails print(f"LLM enhancement error: {e}") basic_explanation = self._generate_basic_explanation(rule_text) return { "rule": rule_text, "pattern": rule_text, "description": rule_text, "severity": "medium", "explanation": basic_explanation["explanation"], "examples": basic_explanation["examples"], "missing_patterns": basic_explanation["missing_patterns"], "edge_cases": [], "improvements": [f"Enhancement failed - using basic explanation"], "keywords": [] } async def enhance_rules_bulk( self, rules: List[str], existing_rules: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """ Enhance multiple rules at once. Processes rules sequentially with error handling to avoid timeout. Args: rules: List of rule texts to enhance existing_rules: List of existing rules for context Returns: List of enhanced rule dictionaries """ enhanced_rules = [] for i, rule in enumerate(rules): try: # Enhance each rule individually with timeout protection enhanced = await self.enhance_rule(rule, existing_rules) enhanced_rules.append(enhanced) except Exception as e: # If enhancement fails for one rule, use original rule # This ensures other rules can still be processed print(f"Warning: Rule {i+1}/{len(rules)} enhancement failed: {e}") # Generate basic explanation even on error basic_explanation = self._generate_basic_explanation(rule) enhanced_rules.append({ "rule": rule, "pattern": rule, "description": rule, "severity": "medium", "explanation": basic_explanation["explanation"], "examples": basic_explanation["examples"], "missing_patterns": basic_explanation["missing_patterns"], "edge_cases": [], "improvements": [f"Enhancement skipped - using basic explanation"], "keywords": [] }) return enhanced_rules def _generate_basic_explanation(self, rule_text: str) -> Dict[str, Any]: """ Generate a basic explanation without LLM when enhancement fails or times out. Uses pattern matching and keyword extraction to provide useful information. """ rule_lower = rule_text.lower() # Extract key concepts keywords = [] if any(word in rule_lower for word in ["password", "pwd", "passcode", "credential"]): keywords.append("authentication credentials") if any(word in rule_lower for word in ["api", "key", "token", "secret"]): keywords.append("API keys and tokens") if any(word in rule_lower for word in ["credit", "card", "payment", "bank"]): keywords.append("financial information") if any(word in rule_lower for word in ["share", "send", "disclose", "reveal"]): keywords.append("information sharing") if any(word in rule_lower for word in ["prevent", "block", "stop", "deny"]): keywords.append("prevention") if any(word in rule_lower for word in ["sensitive", "private", "confidential"]): keywords.append("sensitive data") # Generate explanation if keywords: explanation = f"This rule is designed to prevent sharing of {', '.join(keywords)}. It monitors conversations to detect attempts to disclose sensitive information that could compromise security or privacy." else: explanation = f"This rule monitors for: {rule_text}. It helps maintain security and compliance by detecting potentially sensitive information sharing." # Generate basic examples based on keywords examples = [] if "password" in rule_lower or "credential" in rule_lower: examples.extend([ "Can you share your password?", "My password is 12345", "What's your login pwd?", "Here's my passcode: 9876", "The credentials are admin/password123" ]) if "api" in rule_lower or "key" in rule_lower: examples.extend([ "My API key is sk-1234567890", "Here's the access token: xyz123", "The secret key is abc-def-ghi", "API token: bearer_abc123xyz" ]) if "credit" in rule_lower or "card" in rule_lower: examples.extend([ "My credit card number is 4532-1234-5678-9010", "CVV is 123", "Card expiry: 12/25" ]) if "sensitive" in rule_lower or "authentication" in rule_lower: examples.extend([ "Here's my login info", "I'll send you the credentials", "The password is...", "Can I share my account details?" ]) if not examples: # Generic examples based on rule text examples = [ f"Example: '{rule_text[:40]}...'", "Similar variations of the rule text", "Related phrases containing key terms from the rule" ] # Suggest missing patterns missing_patterns = [] if "password" in rule_lower: missing_patterns.extend([ "Consider variations: 'pwd', 'passcode', 'login credentials', 'auth info'" ]) if "api" in rule_lower or "key" in rule_lower: missing_patterns.extend([ "Consider: 'access token', 'secret key', 'auth token', 'bearer token'" ]) if "share" in rule_lower: missing_patterns.extend([ "Consider action verbs: 'send', 'disclose', 'reveal', 'provide', 'give'" ]) if "sensitive" in rule_lower: missing_patterns.extend([ "Consider synonyms: 'confidential', 'private', 'secret', 'classified'" ]) return { "explanation": explanation, "examples": examples[:8], # Limit to 8 examples "missing_patterns": missing_patterns[:5] # Limit to 5 suggestions }