File size: 12,909 Bytes
a477044
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0452a50
a477044
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adf80ee
a477044
 
 
 
 
 
 
 
adf80ee
 
 
 
 
 
 
 
 
a477044
 
 
 
 
 
 
adf80ee
 
 
 
 
 
 
 
 
 
 
a477044
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adf80ee
 
 
a477044
 
 
 
 
 
 
 
 
 
 
 
adf80ee
a477044
adf80ee
a477044
 
 
 
 
adf80ee
 
 
a477044
adf80ee
a477044
 
 
adf80ee
a477044
adf80ee
a477044
 
 
 
 
adf80ee
 
 
a477044
adf80ee
a477044
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adf80ee
 
a477044
 
 
 
 
adf80ee
 
 
a477044
adf80ee
a477044
 
 
 
adf80ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a477044
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
Rule Enhancement Service using LLM
Analyzes rules for edge cases and improves them before saving to database.
"""

import asyncio
import json
import os
from typing import List, Dict, Any, Optional

from ..services.llm_client import LLMClient


class RuleEnhancer:
    """
    Uses LLM to analyze and enhance admin rules.
    Identifies edge cases, improves patterns, and suggests better descriptions.

    Every public method is best-effort: when the LLM call fails, times out or
    returns something unusable, a heuristic fallback built from the original
    rule text is returned instead of raising.
    """

    # Severity values accepted from the LLM; anything else becomes the default.
    _SEVERITIES = ("low", "medium", "high", "critical")
    _DEFAULT_SEVERITY = "medium"
    # Upper bound, in seconds, on a single LLM call.
    _LLM_TIMEOUT_SECONDS = 30.0
    # Only this many existing rules are quoted in the prompt.
    _MAX_CONTEXT_RULES = 10
    # Generic fallback examples quote at most this many characters of the rule.
    _EXAMPLE_SNIPPET_LENGTH = 40

    # (trigger substrings, label) pairs naming the kind of data a rule protects.
    # Matching is a plain case-insensitive substring test on the rule text.
    _DATA_SUBJECTS = (
        (("password", "pwd", "passcode", "credential"), "authentication credentials"),
        (("api", "key", "token", "secret"), "API keys and tokens"),
        (("credit", "card", "payment", "bank"), "financial information"),
        (("sensitive", "private", "confidential"), "sensitive data"),
    )

    # (trigger substrings, canned example messages) used by the fallback.
    _EXAMPLE_SETS = (
        (("password", "credential"), (
            "Can you share your password?",
            "My password is 12345",
            "What's your login pwd?",
            "Here's my passcode: 9876",
            "The credentials are admin/password123",
        )),
        (("api", "key"), (
            "My API key is sk-1234567890",
            "Here's the access token: xyz123",
            "The secret key is abc-def-ghi",
            "API token: bearer_abc123xyz",
        )),
        (("credit", "card"), (
            "My credit card number is 4532-1234-5678-9010",
            "CVV is 123",
            "Card expiry: 12/25",
        )),
        (("sensitive", "authentication"), (
            "Here's my login info",
            "I'll send you the credentials",
            "The password is...",
            "Can I share my account details?",
        )),
    )

    # (trigger substrings, hint) pairs suggesting variations the rule may miss.
    _VARIATION_HINTS = (
        (("password",),
         "Consider variations: 'pwd', 'passcode', 'login credentials', 'auth info'"),
        (("api", "key"),
         "Consider: 'access token', 'secret key', 'auth token', 'bearer token'"),
        (("share",),
         "Consider action verbs: 'send', 'disclose', 'reveal', 'provide', 'give'"),
        (("sensitive",),
         "Consider synonyms: 'confidential', 'private', 'secret', 'classified'"),
    )

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """
        Args:
            llm_client: Client used for LLM calls. When omitted, one is built
                from the GROQ_API_KEY and GROQ_MODEL environment variables.
        """
        if llm_client is None:
            llm_client = LLMClient(
                api_key=os.getenv("GROQ_API_KEY"),
                model=os.getenv("GROQ_MODEL"),
            )
        self.llm = llm_client

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Never raises for LLM problems: on timeout, client error or an
        unusable reply, a fallback built from the original rule is returned.

        Args:
            rule_text: The original rule text
            existing_rules: List of existing rules for context (only the
                first 10 are included in the prompt)
            context: Additional context about the rule

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern
            - description: Better description
            - severity: One of low/medium/high/critical
            - explanation: Plain-English explanation of the rule
            - examples: List of example texts that would match
            - missing_patterns: List of variations that should also be caught
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements
            - keywords: List of keywords
        """
        prompt = self._build_prompt(rule_text, existing_rules, context)

        try:
            # LLM calls can be slow; bound each one so a single rule cannot
            # stall the caller indefinitely.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=self._LLM_TIMEOUT_SECONDS,
            )
            enhanced_data = self._parse_json_object(response)
            return self._normalize_result(enhanced_data, rule_text)
        except asyncio.TimeoutError:
            print(f"LLM enhancement timeout for rule: {rule_text[:50]}...")
            return self._fallback_result(
                rule_text, "Enhancement timed out - using basic explanation"
            )
        except Exception as e:
            # Deliberately broad: this is the boundary that turns any client
            # or parsing failure into the documented fallback result.
            print(f"LLM enhancement error: {e}")
            return self._fallback_result(
                rule_text, "Enhancement failed - using basic explanation"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules at once.
        Processes rules sequentially; a failure on one rule does not stop
        the remaining rules from being processed.

        Args:
            rules: List of rule texts to enhance
            existing_rules: List of existing rules for context

        Returns:
            List of enhanced rule dictionaries, one per input rule, in order
        """
        enhanced_rules = []
        total = len(rules)

        for position, rule in enumerate(rules, start=1):
            try:
                enhanced = await self.enhance_rule(rule, existing_rules)
            except Exception as e:
                # enhance_rule already handles LLM failures itself; this is a
                # last safety net so one bad rule cannot abort the batch.
                print(f"Warning: Rule {position}/{total} enhancement failed: {e}")
                enhanced = self._fallback_result(
                    rule, "Enhancement skipped - using basic explanation"
                )
            enhanced_rules.append(enhanced)

        return enhanced_rules

    def _build_prompt(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]],
        context: Optional[str]
    ) -> str:
        """Assemble the instruction prompt sent to the LLM for one rule."""
        existing_context = ""
        if existing_rules:
            existing_context = "\n".join(
                f"- {r}" for r in existing_rules[:self._MAX_CONTEXT_RULES]
            )

        context_text = f"\nAdditional context: {context}" if context else ""

        return f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and provide comprehensive enhancements.

Original Rule: "{rule_text}"

Existing Rules (for context):
{existing_context or "None"}
{context_text}

Your task:
1. Generate a human-readable explanation of what this rule does (2-3 sentences, plain English)
2. Provide 5-8 concrete examples of text/phrases that would match this rule's pattern
3. Suggest 3-5 missing patterns or variations that should also be caught
4. Analyze the rule for potential edge cases and improvements
5. Generate an improved regex pattern that catches more variations
6. Write a clear, comprehensive description
7. Suggest an appropriate severity level (low/medium/high/critical)
8. Identify edge cases that might be missed
9. Suggest improvements

Respond in JSON format with the following structure:
{{
    "rule": "Enhanced rule text (improved version of original)",
    "pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
    "description": "Clear description of what this rule detects",
    "severity": "low|medium|high|critical",
    "explanation": "Human-readable explanation in 2-3 sentences explaining what this rule does and why it's important",
    "examples": [
        "Example text that would match: 'Please share your password'",
        "Another example: 'My pwd is 12345'",
        "More examples..."
    ],
    "missing_patterns": [
        "Pattern variation 1 that should be considered",
        "Pattern variation 2 that should be considered",
        "More suggestions..."
    ],
    "edge_cases": ["Edge case 1", "Edge case 2", ...],
    "improvements": ["Improvement 1", "Improvement 2", ...],
    "keywords": ["keyword1", "keyword2", ...]
}}

Only return valid JSON, no additional text:"""

    @staticmethod
    def _parse_json_object(response: str) -> Dict[str, Any]:
        """
        Parse an LLM reply into a JSON object.

        The reply is first parsed as-is. If that fails, the text between the
        first "{" and the last "}" is parsed instead, which tolerates
        Markdown code fences and prose placed around the JSON.

        Raises:
            ValueError: If no JSON object can be recovered from the reply
                (json.JSONDecodeError is a ValueError subclass).
        """
        text = response.strip()
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            start = text.find("{")
            end = text.rfind("}")
            if start == -1 or end < start:
                raise
            data = json.loads(text[start:end + 1])

        if not isinstance(data, dict):
            raise ValueError("LLM response is not a JSON object")
        return data

    @classmethod
    def _normalize_result(cls, data: Dict[str, Any], rule_text: str) -> Dict[str, Any]:
        """
        Coerce parsed LLM output into the result shape of enhance_rule.

        Missing, null, blank or wrongly typed fields are replaced by defaults
        derived from the original rule text.
        """
        severity = data.get("severity")
        # Accept "High", " critical " etc. instead of silently downgrading.
        severity = severity.strip().lower() if isinstance(severity, str) else ""
        if severity not in cls._SEVERITIES:
            severity = cls._DEFAULT_SEVERITY

        return {
            "rule": cls._text_or(data.get("rule"), rule_text),
            "pattern": cls._text_or(data.get("pattern"), rule_text),
            "description": cls._text_or(data.get("description"), rule_text),
            "severity": severity,
            "explanation": cls._text_or(
                data.get("explanation"), f"This rule detects: {rule_text}"
            ),
            "examples": cls._string_list(data.get("examples")),
            "missing_patterns": cls._string_list(data.get("missing_patterns")),
            "edge_cases": cls._string_list(data.get("edge_cases")),
            "improvements": cls._string_list(data.get("improvements")),
            "keywords": cls._string_list(data.get("keywords")),
        }

    @staticmethod
    def _text_or(value: Any, default: str) -> str:
        """Return value when it is a non-blank string, otherwise default."""
        if isinstance(value, str) and value.strip():
            return value
        return default

    @staticmethod
    def _string_list(value: Any) -> List[str]:
        """Coerce value into a list of non-blank strings ([] when unusable)."""
        if isinstance(value, str):
            # A single string where a list was requested: keep it as one item.
            return [value] if value.strip() else []
        if not isinstance(value, (list, tuple)):
            return []
        items = []
        for item in value:
            if item is None:
                continue
            text = item if isinstance(item, str) else str(item)
            if text.strip():
                items.append(text)
        return items

    def _fallback_result(self, rule_text: str, note: str) -> Dict[str, Any]:
        """
        Build the non-LLM result for rule_text.

        The original text is reused for rule, pattern and description, the
        heuristic explanation fills the descriptive fields, and note is
        recorded as the only entry of "improvements".
        """
        basic_explanation = self._generate_basic_explanation(rule_text)
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": self._DEFAULT_SEVERITY,
            "explanation": basic_explanation["explanation"],
            "examples": basic_explanation["examples"],
            "missing_patterns": basic_explanation["missing_patterns"],
            "edge_cases": [],
            "improvements": [note],
            "keywords": [],
        }

    def _generate_basic_explanation(self, rule_text: str) -> Dict[str, Any]:
        """
        Generate a basic explanation without LLM when enhancement fails or times out.
        Uses case-insensitive substring checks on the rule text.

        Args:
            rule_text: The original rule text

        Returns:
            Dictionary with:
            - explanation: Short description of what the rule guards against
            - examples: Up to 8 example messages
            - missing_patterns: Up to 5 suggested variations
        """
        rule_lower = rule_text.lower()

        def mentions(terms):
            return any(term in rule_lower for term in terms)

        # Only data categories go into the sentence below; action words such
        # as "share" or "block" do not name something that can be shared.
        subjects = [label for terms, label in self._DATA_SUBJECTS if mentions(terms)]
        if subjects:
            explanation = f"This rule is designed to prevent sharing of {', '.join(subjects)}. It monitors conversations to detect attempts to disclose sensitive information that could compromise security or privacy."
        else:
            explanation = f"This rule monitors for: {rule_text}. It helps maintain security and compliance by detecting potentially sensitive information sharing."

        examples = []
        for terms, samples in self._EXAMPLE_SETS:
            if mentions(terms):
                examples.extend(samples)
        if not examples:
            # Generic examples based on rule text; mark truncation only when
            # the rule text was actually cut.
            limit = self._EXAMPLE_SNIPPET_LENGTH
            snippet = rule_text[:limit]
            if len(rule_text) > limit:
                snippet += "..."
            examples = [
                f"Example: '{snippet}'",
                "Similar variations of the rule text",
                "Related phrases containing key terms from the rule",
            ]

        missing_patterns = [
            hint for terms, hint in self._VARIATION_HINTS if mentions(terms)
        ]

        return {
            "explanation": explanation,
            "examples": examples[:8],  # Limit to 8 examples
            "missing_patterns": missing_patterns[:5]  # Limit to 5 suggestions
        }