File size: 6,881 Bytes
a477044
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
Rule Enhancement Service using LLM
Analyzes rules for edge cases and improves them before saving to database.
"""

import os
from typing import List, Dict, Any, Optional
from ..services.llm_client import LLMClient


class RuleEnhancer:
    """
    Uses LLM to analyze and enhance admin rules.
    Identifies edge cases, improves patterns, and suggests better descriptions.

    Enhancement is best-effort: when the LLM call times out, raises, or
    returns something that cannot be parsed as a JSON object, the original
    rule text is returned in the same dictionary shape with a note in
    ``improvements`` explaining why.
    """

    # Severity labels accepted from the LLM; anything else becomes the default.
    _SEVERITIES = ("low", "medium", "high", "critical")
    _DEFAULT_SEVERITY = "medium"
    # Upper bound, in seconds, on a single LLM call.
    _LLM_TIMEOUT_SECONDS = 30.0
    # At most this many existing rules are quoted in the prompt.
    _MAX_CONTEXT_RULES = 10

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """
        Store the LLM client used for enhancement.

        Args:
            llm_client: Client to use. When None, an LLMClient is built from
                the LLM_BACKEND, OLLAMA_URL, GROQ_API_KEY and OLLAMA_MODEL
                environment variables.
        """
        # Identity check (not truthiness) so an explicitly supplied client is
        # never discarded just because it happens to evaluate as falsy.
        if llm_client is not None:
            self.llm = llm_client
        else:
            self.llm = LLMClient(
                backend=os.getenv("LLM_BACKEND", "ollama"),
                url=os.getenv("OLLAMA_URL"),
                api_key=os.getenv("GROQ_API_KEY"),
                model=os.getenv("OLLAMA_MODEL", "llama3.1:latest"),
            )

    @classmethod
    def _fallback_result(cls, rule_text: str, note: str) -> Dict[str, Any]:
        """Return the un-enhanced result for rule_text, with note as the only improvement."""
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": cls._DEFAULT_SEVERITY,
            "edge_cases": [],
            "improvements": [note],
            "keywords": [],
        }

    @staticmethod
    def _as_text(value: Any, default: str) -> str:
        """Return value when it is a non-blank string, otherwise default."""
        if isinstance(value, str) and value.strip():
            return value
        return default

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Coerce a JSON field to a list: lists pass through, a non-blank string is wrapped, anything else is empty."""
        if isinstance(value, list):
            return value
        if isinstance(value, str) and value.strip():
            return [value]
        return []

    @classmethod
    def _parse_response(cls, response: str, rule_text: str) -> Dict[str, Any]:
        """
        Turn the raw LLM reply into the normalized result dictionary.

        Args:
            response: Raw text returned by the LLM.
            rule_text: Original rule, used for any missing or blank text field.

        Returns:
            Dictionary with the keys rule, pattern, description, severity,
            edge_cases, improvements and keywords.

        Raises:
            ValueError: If the reply is not valid JSON (json.JSONDecodeError
                is a ValueError subclass) or is not a JSON object.
        """
        import json

        text = response.strip()
        # Keep only the outermost {...} span. This drops markdown code fences
        # as well as any prose the model put before or after the object.
        first, last = text.find("{"), text.rfind("}")
        if first != -1 and last > first:
            text = text[first:last + 1]

        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError("LLM response is not a JSON object")

        # Accept "High", " critical " and similar; reject everything else.
        severity = data.get("severity")
        if isinstance(severity, str):
            severity = severity.strip().lower()
        if severity not in cls._SEVERITIES:
            severity = cls._DEFAULT_SEVERITY

        # dict.get(key, default) does not cover an explicit JSON null, so each
        # field is type-checked instead of relying on the default argument.
        return {
            "rule": cls._as_text(data.get("rule"), rule_text),
            "pattern": cls._as_text(data.get("pattern"), rule_text),
            "description": cls._as_text(data.get("description"), rule_text),
            "severity": severity,
            "edge_cases": cls._as_list(data.get("edge_cases")),
            "improvements": cls._as_list(data.get("improvements")),
            "keywords": cls._as_list(data.get("keywords")),
        }

    def _build_prompt(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]],
        context: Optional[str]
    ) -> str:
        """Build the enhancement prompt for one rule, quoting a capped number of existing rules."""
        existing_context = ""
        if existing_rules:
            existing_context = "\n".join(
                f"- {r}" for r in existing_rules[:self._MAX_CONTEXT_RULES]
            )
        existing_block = existing_context or "None"
        context_text = f"\nAdditional context: {context}" if context else ""

        return f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and enhance it.

Original Rule: "{rule_text}"

Existing Rules (for context):
{existing_block}
{context_text}

Your task:
1. Analyze the rule for potential edge cases and improvements
2. Generate an improved regex pattern that catches more variations
3. Write a clear, comprehensive description
4. Suggest an appropriate severity level (low/medium/high/critical)
5. Identify edge cases that might be missed
6. Suggest improvements

Respond in JSON format with the following structure:
{{
    "rule": "Enhanced rule text (improved version of original)",
    "pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
    "description": "Clear description of what this rule detects",
    "severity": "low|medium|high|critical",
    "edge_cases": ["Edge case 1", "Edge case 2", ...],
    "improvements": ["Improvement 1", "Improvement 2", ...],
    "keywords": ["keyword1", "keyword2", ...]
}}

Only return valid JSON, no additional text:"""

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Never raises for LLM or parsing failures; those produce a fallback
        result built from the original rule text.

        Args:
            rule_text: The original rule text
            existing_rules: List of existing rules for context (only the
                first 10 are included in the prompt)
            context: Additional context about the rule

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern (as suggested by the LLM; it is
              not compiled or validated here)
            - description: Better description
            - severity: One of low/medium/high/critical ("medium" when the
              LLM value is missing or unrecognized)
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements, or a single note
              describing why enhancement fell back to the original rule
            - keywords: List of keywords
        """
        # Imported before the try block so the name is bound for the except clause.
        import asyncio

        prompt = self._build_prompt(rule_text, existing_rules, context)

        try:
            # LLM calls can be slow, so each one is bounded by a timeout.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=self._LLM_TIMEOUT_SECONDS,
            )
            return self._parse_response(response, rule_text)
        except asyncio.TimeoutError:
            print(f"LLM enhancement timeout for rule: {rule_text[:50]}...")
            return self._fallback_result(
                rule_text, "Enhancement timed out - using original rule"
            )
        except Exception as e:
            # Deliberate best-effort boundary: any LLM or parsing failure
            # degrades to the original rule instead of propagating.
            print(f"LLM enhancement error: {e}")
            return self._fallback_result(
                rule_text, f"Enhancement failed: {str(e)[:50]}"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules at once.
        Processes rules sequentially, one LLM call per rule, so a failure on
        one rule does not prevent the others from being processed.

        Args:
            rules: List of rule texts to enhance
            existing_rules: List of existing rules for context

        Returns:
            List of enhanced rule dictionaries, one per input rule and in the
            same order
        """
        enhanced_rules: List[Dict[str, Any]] = []
        total = len(rules)

        for position, rule in enumerate(rules, start=1):
            try:
                enhanced_rules.append(await self.enhance_rule(rule, existing_rules))
            except Exception as e:
                # Keep the output aligned with the input even if a rule fails.
                print(f"Warning: Rule {position}/{total} enhancement failed: {e}")
                enhanced_rules.append(
                    self._fallback_result(rule, "Enhancement skipped due to error")
                )

        return enhanced_rules