# Page-scrape residue (not part of the module): "Spaces: Sleeping / Sleeping"
"""
Rule Enhancement Service using LLM.

Analyzes rules for edge cases and improves them before saving to database.
"""
import asyncio
import json
import os
from typing import List, Dict, Any, Optional

from ..services.llm_client import LLMClient
class RuleEnhancer:
    """
    Uses LLM to analyze and enhance admin rules.

    Identifies edge cases, improves patterns, and suggests better descriptions.
    When the LLM call times out, fails, or returns something that cannot be
    parsed, a keyword-based fallback result with the same keys is returned
    instead of raising.
    """

    # Severity labels accepted from the LLM; anything else becomes DEFAULT_SEVERITY.
    VALID_SEVERITIES = ("low", "medium", "high", "critical")
    DEFAULT_SEVERITY = "medium"

    # Seconds to wait for a single LLM call before using the fallback result.
    DEFAULT_TIMEOUT_SECONDS = 30.0

    # At most this many existing rules are quoted in the prompt.
    MAX_CONTEXT_RULES = 10

    # Class-level default so instances created without __init__ still have one.
    timeout: float = DEFAULT_TIMEOUT_SECONDS

    # Result keys whose values are always returned as lists (in output order).
    _LIST_FIELDS = (
        "examples",
        "missing_patterns",
        "edge_cases",
        "improvements",
        "keywords",
    )

    # (substrings to look for in the lower-cased rule, concept label)
    _CONCEPT_TERMS = (
        (("password", "pwd", "passcode", "credential"), "authentication credentials"),
        (("api", "key", "token", "secret"), "API keys and tokens"),
        (("credit", "card", "payment", "bank"), "financial information"),
        (("share", "send", "disclose", "reveal"), "information sharing"),
        (("prevent", "block", "stop", "deny"), "prevention"),
        (("sensitive", "private", "confidential"), "sensitive data"),
    )

    # (substrings to look for in the lower-cased rule, canned example messages)
    _EXAMPLE_TERMS = (
        (
            ("password", "credential"),
            (
                "Can you share your password?",
                "My password is 12345",
                "What's your login pwd?",
                "Here's my passcode: 9876",
                "The credentials are admin/password123",
            ),
        ),
        (
            ("api", "key"),
            (
                "My API key is sk-1234567890",
                "Here's the access token: xyz123",
                "The secret key is abc-def-ghi",
                "API token: bearer_abc123xyz",
            ),
        ),
        (
            ("credit", "card"),
            (
                "My credit card number is 4532-1234-5678-9010",
                "CVV is 123",
                "Card expiry: 12/25",
            ),
        ),
        (
            ("sensitive", "authentication"),
            (
                "Here's my login info",
                "I'll send you the credentials",
                "The password is...",
                "Can I share my account details?",
            ),
        ),
    )

    # (substrings to look for in the lower-cased rule, suggestion text)
    _MISSING_PATTERN_TERMS = (
        (
            ("password",),
            "Consider variations: 'pwd', 'passcode', 'login credentials', 'auth info'",
        ),
        (
            ("api", "key"),
            "Consider: 'access token', 'secret key', 'auth token', 'bearer token'",
        ),
        (
            ("share",),
            "Consider action verbs: 'send', 'disclose', 'reveal', 'provide', 'give'",
        ),
        (
            ("sensitive",),
            "Consider synonyms: 'confidential', 'private', 'secret', 'classified'",
        ),
    )

    def __init__(
        self,
        llm_client: Optional["LLMClient"] = None,
        timeout: float = DEFAULT_TIMEOUT_SECONDS,
    ):
        """
        Args:
            llm_client: Client used for LLM calls. When omitted (or falsy), an
                LLMClient is built from the GROQ_API_KEY and GROQ_MODEL
                environment variables.
            timeout: Seconds to wait for each LLM call before falling back to
                the basic explanation.
        """
        self.llm = llm_client or LLMClient(
            api_key=os.getenv("GROQ_API_KEY"),
            model=os.getenv("GROQ_MODEL"),
        )
        self.timeout = timeout

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Never raises for LLM problems: on timeout, call failure, or an
        unparseable response, a fallback built from the original rule text and
        _generate_basic_explanation is returned.

        Args:
            rule_text: The original rule text
            existing_rules: List of existing rules for context (only the first
                MAX_CONTEXT_RULES are sent to the LLM)
            context: Additional context about the rule

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern
            - description: Better description
            - severity: One of low/medium/high/critical
            - explanation: Plain-English explanation of the rule
            - examples: List of example texts that would match
            - missing_patterns: List of suggested additional patterns
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements
            - keywords: List of keywords
        """
        prompt = self._build_prompt(rule_text, existing_rules, context)

        try:
            # LLM calls can be slow; bound each one so a single rule cannot
            # stall the caller indefinitely.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=self.timeout,
            )
            enhanced_data = self._parse_llm_json(response)
            return self._normalize_result(enhanced_data, rule_text)
        except asyncio.TimeoutError:
            print(f"LLM enhancement timeout for rule: {rule_text[:50]}...")
            return self._fallback_result(
                rule_text, "Enhancement timed out - using basic explanation"
            )
        except Exception as e:
            # Deliberate best-effort boundary: any failure (network error,
            # invalid JSON, unexpected response type) degrades to the fallback.
            print(f"LLM enhancement error: {e}")
            return self._fallback_result(
                rule_text, "Enhancement failed - using basic explanation"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules at once.

        Rules are processed one after another; each call has its own timeout,
        and a failure on one rule does not prevent the others from being
        processed.

        Args:
            rules: List of rule texts to enhance
            existing_rules: List of existing rules for context

        Returns:
            List of enhanced rule dictionaries, in the same order as `rules`
        """
        enhanced_rules = []
        total = len(rules)
        for position, rule in enumerate(rules, start=1):
            try:
                enhanced_rules.append(await self.enhance_rule(rule, existing_rules))
            except Exception as e:
                # enhance_rule already handles LLM failures itself; this guards
                # against anything unexpected escaping it.
                print(f"Warning: Rule {position}/{total} enhancement failed: {e}")
                enhanced_rules.append(
                    self._fallback_result(
                        rule, "Enhancement skipped - using basic explanation"
                    )
                )
        return enhanced_rules

    def _build_prompt(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]],
        context: Optional[str],
    ) -> str:
        """Build the LLM prompt for one rule, quoting a bounded list of existing rules."""
        existing_context = ""
        if existing_rules:
            existing_context = "\n".join(
                f"- {r}" for r in existing_rules[: self.MAX_CONTEXT_RULES]
            )
        context_text = f"\nAdditional context: {context}" if context else ""

        # The prompt lines start at column 0 on purpose: they are part of the
        # string sent to the LLM, not code indentation.
        return f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and provide comprehensive enhancements.
Original Rule: "{rule_text}"
Existing Rules (for context):
{existing_context if existing_context else "None"}
{context_text}
Your task:
1. Generate a human-readable explanation of what this rule does (2-3 sentences, plain English)
2. Provide 5-8 concrete examples of text/phrases that would match this rule's pattern
3. Suggest 3-5 missing patterns or variations that should also be caught
4. Analyze the rule for potential edge cases and improvements
5. Generate an improved regex pattern that catches more variations
6. Write a clear, comprehensive description
7. Suggest an appropriate severity level (low/medium/high/critical)
8. Identify edge cases that might be missed
9. Suggest improvements
Respond in JSON format with the following structure:
{{
"rule": "Enhanced rule text (improved version of original)",
"pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
"description": "Clear description of what this rule detects",
"severity": "low|medium|high|critical",
"explanation": "Human-readable explanation in 2-3 sentences explaining what this rule does and why it's important",
"examples": [
"Example text that would match: 'Please share your password'",
"Another example: 'My pwd is 12345'",
"More examples..."
],
"missing_patterns": [
"Pattern variation 1 that should be considered",
"Pattern variation 2 that should be considered",
"More suggestions..."
],
"edge_cases": ["Edge case 1", "Edge case 2", ...],
"improvements": ["Improvement 1", "Improvement 2", ...],
"keywords": ["keyword1", "keyword2", ...]
}}
Only return valid JSON, no additional text:"""

    @staticmethod
    def _parse_llm_json(response: str) -> Dict[str, Any]:
        """
        Parse the LLM reply into a dict.

        Strips a surrounding markdown code fence (with or without a "json"
        tag, any case). If the remaining text is not valid JSON, retries on
        the span from the first "{" to the last "}" so that replies with
        extra prose around the object still parse.

        Raises:
            ValueError: If no JSON object can be parsed (json.JSONDecodeError
                is a ValueError subclass) or the parsed value is not an object.
        """
        # NOTE(review): assumes simple_call returns a str - confirm against LLMClient.
        text = response.strip()
        if text.startswith("```"):
            text = text[3:]
            if text[:4].lower() == "json":
                text = text[4:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise
            data = json.loads(text[start:end + 1])

        if not isinstance(data, dict):
            raise ValueError("LLM response is not a JSON object")
        return data

    @classmethod
    def _normalize_result(cls, data: Dict[str, Any], rule_text: str) -> Dict[str, Any]:
        """
        Coerce a parsed LLM reply into the result shape callers expect.

        Text fields fall back to values derived from `rule_text` when missing,
        null, blank, or not a string. Severity is matched case-insensitively
        and defaults to DEFAULT_SEVERITY. List fields are always lists: a bare
        string becomes a one-item list and any other non-list becomes [].
        """

        def text_or(key: str, default: str) -> str:
            value = data.get(key)
            if isinstance(value, str) and value.strip():
                return value
            return default

        severity = data.get("severity")
        severity = severity.strip().lower() if isinstance(severity, str) else ""
        if severity not in cls.VALID_SEVERITIES:
            severity = cls.DEFAULT_SEVERITY

        result: Dict[str, Any] = {
            "rule": text_or("rule", rule_text),
            "pattern": text_or("pattern", rule_text),
            "description": text_or("description", rule_text),
            "severity": severity,
            "explanation": text_or("explanation", f"This rule detects: {rule_text}"),
        }
        for key in cls._LIST_FIELDS:
            result[key] = cls._as_list(data.get(key))
        return result

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return `value` if it is a list, wrap a non-blank string, else return []."""
        if isinstance(value, list):
            return value
        if isinstance(value, str) and value.strip():
            return [value]
        return []

    def _fallback_result(self, rule_text: str, note: str) -> Dict[str, Any]:
        """
        Build the non-LLM result for a rule.

        The original rule text is reused for rule, pattern and description;
        `note` becomes the single entry in "improvements" so callers can tell
        why the LLM enhancement is absent.
        """
        basic = self._generate_basic_explanation(rule_text)
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": self.DEFAULT_SEVERITY,
            "explanation": basic["explanation"],
            "examples": basic["examples"],
            "missing_patterns": basic["missing_patterns"],
            "edge_cases": [],
            "improvements": [note],
            "keywords": [],
        }

    def _generate_basic_explanation(self, rule_text: str) -> Dict[str, Any]:
        """
        Generate a basic explanation without LLM when enhancement fails or times out.

        Works by case-insensitive substring checks against fixed term tables,
        so a term also matches inside longer words.

        Returns:
            Dictionary with:
            - explanation: Sentence(s) describing the rule
            - examples: Up to 8 example messages
            - missing_patterns: Up to 5 suggestions for further variations
        """
        rule_lower = rule_text.lower()

        def mentions(terms) -> bool:
            return any(term in rule_lower for term in terms)

        # Extract key concepts
        keywords = [label for terms, label in self._CONCEPT_TERMS if mentions(terms)]

        if keywords:
            explanation = f"This rule is designed to prevent sharing of {', '.join(keywords)}. It monitors conversations to detect attempts to disclose sensitive information that could compromise security or privacy."
        else:
            explanation = f"This rule monitors for: {rule_text}. It helps maintain security and compliance by detecting potentially sensitive information sharing."

        # Canned examples for every matching topic, in table order
        examples: List[str] = []
        for terms, samples in self._EXAMPLE_TERMS:
            if mentions(terms):
                examples.extend(samples)
        if not examples:
            # Generic examples based on rule text
            examples = [
                f"Example: '{rule_text[:40]}...'",
                "Similar variations of the rule text",
                "Related phrases containing key terms from the rule",
            ]

        missing_patterns = [
            suggestion
            for terms, suggestion in self._MISSING_PATTERN_TERMS
            if mentions(terms)
        ]

        return {
            "explanation": explanation,
            "examples": examples[:8],  # Limit to 8 examples
            "missing_patterns": missing_patterns[:5],  # Limit to 5 suggestions
        }