Spaces:
Sleeping
Sleeping
File size: 12,909 Bytes
a477044 0452a50 a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 adf80ee a477044 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
"""
Rule Enhancement Service using LLM
Analyzes rules for edge cases and improves them before saving to database.
"""
import os
from typing import List, Dict, Any, Optional
from ..services.llm_client import LLMClient
class RuleEnhancer:
    """
    Uses LLM to analyze and enhance admin rules.
    Identifies edge cases, improves patterns, and suggests better descriptions.

    Enhancement is best-effort: when the LLM call times out, raises, or
    returns something that cannot be parsed as a JSON object, the methods
    return a keyword-based fallback built from the original rule text
    instead of raising.
    """

    # Severity values accepted from the LLM; anything else becomes the default.
    _VALID_SEVERITIES = ("low", "medium", "high", "critical")
    _DEFAULT_SEVERITY = "medium"

    # Upper bound, in seconds, for a single LLM call.
    _LLM_TIMEOUT_SECONDS = 30.0

    # Result fields that must always be lists.
    _LIST_FIELDS = (
        "examples",
        "missing_patterns",
        "edge_cases",
        "improvements",
        "keywords",
    )

    # (trigger substrings, concept label) pairs used to describe a rule.
    _CONCEPT_TRIGGERS = (
        (("password", "pwd", "passcode", "credential"), "authentication credentials"),
        (("api", "key", "token", "secret"), "API keys and tokens"),
        (("credit", "card", "payment", "bank"), "financial information"),
        (("share", "send", "disclose", "reveal"), "information sharing"),
        (("prevent", "block", "stop", "deny"), "prevention"),
        (("sensitive", "private", "confidential"), "sensitive data"),
    )

    # (trigger substrings, canned example phrases) pairs.
    _EXAMPLE_TRIGGERS = (
        (
            ("password", "credential"),
            (
                "Can you share your password?",
                "My password is 12345",
                "What's your login pwd?",
                "Here's my passcode: 9876",
                "The credentials are admin/password123",
            ),
        ),
        (
            ("api", "key"),
            (
                "My API key is sk-1234567890",
                "Here's the access token: xyz123",
                "The secret key is abc-def-ghi",
                "API token: bearer_abc123xyz",
            ),
        ),
        (
            ("credit", "card"),
            (
                "My credit card number is 4532-1234-5678-9010",
                "CVV is 123",
                "Card expiry: 12/25",
            ),
        ),
        (
            ("sensitive", "authentication"),
            (
                "Here's my login info",
                "I'll send you the credentials",
                "The password is...",
                "Can I share my account details?",
            ),
        ),
    )

    # (trigger substrings, suggestion) pairs for variations the rule may miss.
    _MISSING_PATTERN_TRIGGERS = (
        (
            ("password",),
            "Consider variations: 'pwd', 'passcode', 'login credentials', 'auth info'",
        ),
        (
            ("api", "key"),
            "Consider: 'access token', 'secret key', 'auth token', 'bearer token'",
        ),
        (
            ("share",),
            "Consider action verbs: 'send', 'disclose', 'reveal', 'provide', 'give'",
        ),
        (
            ("sensitive",),
            "Consider synonyms: 'confidential', 'private', 'secret', 'classified'",
        ),
    )

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """
        Args:
            llm_client: Client used for LLM calls. When omitted, an LLMClient
                is built from the GROQ_API_KEY and GROQ_MODEL environment
                variables.
        """
        self.llm = llm_client or LLMClient(
            api_key=os.getenv("GROQ_API_KEY"),
            model=os.getenv("GROQ_MODEL"),
        )

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Args:
            rule_text: The original rule text
            existing_rules: List of existing rules for context (only the
                first 10 are included in the prompt)
            context: Additional context about the rule

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern
            - description: Better description
            - severity: One of low/medium/high/critical
            - explanation: Plain-English explanation of the rule
            - examples: List of example texts that would match
            - missing_patterns: List of suggested additional variations
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements
            - keywords: List of keywords

            On timeout or any other failure the dictionary is a fallback
            built from ``rule_text``; ``improvements`` then holds a single
            note saying so.
        """
        # Imported before the ``try`` so the ``except asyncio.TimeoutError``
        # clause below can always resolve the name.
        import asyncio
        import json

        prompt = self._build_prompt(rule_text, existing_rules, context)

        try:
            # LLM calls can be slow; bound the wait per rule.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=self._LLM_TIMEOUT_SECONDS,
            )
            payload = json.loads(self._extract_json(response))
            if not isinstance(payload, dict):
                raise ValueError("LLM response is not a JSON object")
            return self._normalize_result(payload, rule_text)
        except asyncio.TimeoutError:
            # Timeout - generate basic explanation without LLM
            print(f"LLM enhancement timeout for rule: {rule_text[:50]}...")
            return self._fallback_result(
                rule_text, "Enhancement timed out - using basic explanation"
            )
        except Exception as e:
            # Deliberate catch-all: enhancement is optional, so any client or
            # parsing failure degrades to the basic explanation.
            print(f"LLM enhancement error: {e}")
            return self._fallback_result(
                rule_text, "Enhancement failed - using basic explanation"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules at once.
        Processes rules sequentially with error handling to avoid timeout.

        Args:
            rules: List of rule texts to enhance
            existing_rules: List of existing rules for context

        Returns:
            List of enhanced rule dictionaries, one per input rule and in
            the same order
        """
        enhanced_rules = []
        total = len(rules)
        for position, rule in enumerate(rules, start=1):
            try:
                enhanced = await self.enhance_rule(rule, existing_rules)
            except Exception as e:
                # One failing rule must not prevent the others from being
                # processed; fall back to the original rule text.
                print(f"Warning: Rule {position}/{total} enhancement failed: {e}")
                enhanced = self._fallback_result(
                    rule, "Enhancement skipped - using basic explanation"
                )
            enhanced_rules.append(enhanced)
        return enhanced_rules

    def _build_prompt(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]],
        context: Optional[str],
    ) -> str:
        """Build the analysis prompt sent to the LLM for one rule."""
        existing_context = ""
        if existing_rules:
            # Limit to 10 rules to keep the prompt small.
            existing_context = "\n".join(f"- {r}" for r in existing_rules[:10])
        context_text = f"\nAdditional context: {context}" if context else ""
        return f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and provide comprehensive enhancements.
Original Rule: "{rule_text}"
Existing Rules (for context):
{existing_context if existing_context else "None"}
{context_text}
Your task:
1. Generate a human-readable explanation of what this rule does (2-3 sentences, plain English)
2. Provide 5-8 concrete examples of text/phrases that would match this rule's pattern
3. Suggest 3-5 missing patterns or variations that should also be caught
4. Analyze the rule for potential edge cases and improvements
5. Generate an improved regex pattern that catches more variations
6. Write a clear, comprehensive description
7. Suggest an appropriate severity level (low/medium/high/critical)
8. Identify edge cases that might be missed
9. Suggest improvements
Respond in JSON format with the following structure:
{{
"rule": "Enhanced rule text (improved version of original)",
"pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
"description": "Clear description of what this rule detects",
"severity": "low|medium|high|critical",
"explanation": "Human-readable explanation in 2-3 sentences explaining what this rule does and why it's important",
"examples": [
"Example text that would match: 'Please share your password'",
"Another example: 'My pwd is 12345'",
"More examples..."
],
"missing_patterns": [
"Pattern variation 1 that should be considered",
"Pattern variation 2 that should be considered",
"More suggestions..."
],
"edge_cases": ["Edge case 1", "Edge case 2", ...],
"improvements": ["Improvement 1", "Improvement 2", ...],
"keywords": ["keyword1", "keyword2", ...]
}}
Only return valid JSON, no additional text:"""

    @staticmethod
    def _extract_json(response: str) -> str:
        """
        Return the JSON object embedded in an LLM reply.

        Models often wrap the object in markdown code fences or add prose
        before/after it, so the outermost ``{ ... }`` span is returned. When
        no such span exists the stripped reply is returned unchanged and the
        caller's JSON parser reports the error.
        """
        cleaned = response.strip()
        start = cleaned.find("{")
        end = cleaned.rfind("}")
        if start != -1 and end > start:
            return cleaned[start:end + 1]
        return cleaned

    @classmethod
    def _normalize_result(cls, data: Dict[str, Any], rule_text: str) -> Dict[str, Any]:
        """
        Coerce a parsed LLM reply into the documented result shape.

        Missing, null, empty or wrongly-typed text fields fall back to values
        derived from ``rule_text``; list fields always come back as lists;
        severity is matched case-insensitively and defaults to "medium".
        """
        result: Dict[str, Any] = {
            "rule": cls._text_or_default(data.get("rule"), rule_text),
            "pattern": cls._text_or_default(data.get("pattern"), rule_text),
            "description": cls._text_or_default(data.get("description"), rule_text),
            "severity": cls._DEFAULT_SEVERITY,
            "explanation": cls._text_or_default(
                data.get("explanation"), f"This rule detects: {rule_text}"
            ),
        }

        severity = data.get("severity")
        if isinstance(severity, str):
            severity = severity.strip().lower()
            if severity in cls._VALID_SEVERITIES:
                result["severity"] = severity

        for field in cls._LIST_FIELDS:
            result[field] = cls._as_list(data.get(field))
        return result

    @staticmethod
    def _text_or_default(value: Any, default: str) -> str:
        """Return ``value`` when it is a non-blank string, else ``default``."""
        if isinstance(value, str) and value.strip():
            return value
        return default

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return ``value`` as a list; a lone non-blank string becomes a one-item list."""
        if isinstance(value, list):
            return value
        if isinstance(value, str) and value.strip():
            return [value]
        return []

    def _fallback_result(self, rule_text: str, note: str) -> Dict[str, Any]:
        """
        Build the result returned when LLM enhancement is unavailable.

        The original rule text is reused for rule/pattern/description and
        ``note`` is reported as the only entry in ``improvements``.
        """
        basic_explanation = self._generate_basic_explanation(rule_text)
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": self._DEFAULT_SEVERITY,
            "explanation": basic_explanation["explanation"],
            "examples": basic_explanation["examples"],
            "missing_patterns": basic_explanation["missing_patterns"],
            "edge_cases": [],
            "improvements": [note],
            "keywords": [],
        }

    def _generate_basic_explanation(self, rule_text: str) -> Dict[str, Any]:
        """
        Generate a basic explanation without LLM when enhancement fails or times out.
        Uses keyword lookups on the lower-cased rule text to provide useful information.

        Matching is by plain substring, so a trigger such as "key" also
        fires on words that merely contain it (e.g. "monkey").

        Returns:
            Dictionary with:
            - explanation: Generated description of the rule
            - examples: Up to 8 example phrases
            - missing_patterns: Up to 5 suggested variations
        """
        rule_lower = rule_text.lower()

        def mentions(triggers) -> bool:
            return any(word in rule_lower for word in triggers)

        # Extract key concepts
        keywords = [
            label for triggers, label in self._CONCEPT_TRIGGERS if mentions(triggers)
        ]

        # Generate explanation
        if keywords:
            explanation = f"This rule is designed to prevent sharing of {', '.join(keywords)}. It monitors conversations to detect attempts to disclose sensitive information that could compromise security or privacy."
        else:
            explanation = f"This rule monitors for: {rule_text}. It helps maintain security and compliance by detecting potentially sensitive information sharing."

        # Generate basic examples based on keywords
        examples: List[str] = []
        for triggers, phrases in self._EXAMPLE_TRIGGERS:
            if mentions(triggers):
                examples.extend(phrases)
        if not examples:
            # Generic examples based on rule text
            examples = [
                f"Example: '{rule_text[:40]}...'",
                "Similar variations of the rule text",
                "Related phrases containing key terms from the rule",
            ]

        # Suggest missing patterns
        missing_patterns = [
            suggestion
            for triggers, suggestion in self._MISSING_PATTERN_TRIGGERS
            if mentions(triggers)
        ]

        return {
            "explanation": explanation,
            "examples": examples[:8],  # Limit to 8 examples
            "missing_patterns": missing_patterns[:5],  # Limit to 5 suggestions
        }
|