"""
Rule Enhancement Service using LLM
Analyzes rules for edge cases and improves them before saving to database.
"""
import asyncio
import json
import os
from typing import List, Dict, Any, Optional

from ..services.llm_client import LLMClient
class RuleEnhancer:
    """
    Uses an LLM to analyze and enhance admin rules.

    Identifies edge cases, improves regex patterns, and suggests better
    descriptions and severity levels before rules are saved to the database.
    Every failure mode degrades gracefully to the original rule text so a
    flaky LLM backend never blocks rule creation.
    """

    # Severity levels accepted from the LLM; anything else falls back to "medium".
    _VALID_SEVERITIES = frozenset({"low", "medium", "high", "critical"})

    # Per-rule timeout in seconds — LLM calls can be slow.
    _LLM_TIMEOUT = 30.0

    def __init__(self, llm_client: Optional[LLMClient] = None):
        """
        Args:
            llm_client: Pre-configured LLM client. If omitted, one is built
                from the LLM_BACKEND / OLLAMA_URL / GROQ_API_KEY /
                OLLAMA_MODEL environment variables.
        """
        self.llm = llm_client or LLMClient(
            backend=os.getenv("LLM_BACKEND", "ollama"),
            url=os.getenv("OLLAMA_URL"),
            api_key=os.getenv("GROQ_API_KEY"),
            model=os.getenv("OLLAMA_MODEL", "llama3.1:latest")
        )

    @staticmethod
    def _fallback_result(rule_text: str, note: str) -> Dict[str, Any]:
        """Build a result dict that echoes the original rule when enhancement fails.

        Args:
            rule_text: The original rule, used for rule/pattern/description.
            note: Human-readable reason recorded under "improvements".
        """
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": "medium",
            "edge_cases": [],
            "improvements": [note],
            "keywords": []
        }

    @staticmethod
    def _strip_code_fences(response: str) -> str:
        """Remove surrounding markdown code fences (``` / ```json) from an LLM reply."""
        response = response.strip()
        if response.startswith("```json"):
            response = response[7:]
        if response.startswith("```"):
            response = response[3:]
        if response.endswith("```"):
            response = response[:-3]
        return response.strip()

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Args:
            rule_text: The original rule text.
            existing_rules: List of existing rules for context.
            context: Additional context about the rule.

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern
            - description: Better description
            - severity: Suggested severity (low/medium/high/critical)
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements
            - keywords: List of detection keywords

            On timeout or any LLM/parsing error, a fallback dict built from
            the original rule is returned instead of raising.
        """
        existing_context = ""
        if existing_rules:
            # Cap the prompt context at 10 rules to keep it small.
            existing_context = "\n".join(f"- {r}" for r in existing_rules[:10])
        context_text = f"\nAdditional context: {context}" if context else ""
        prompt = f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and enhance it.
Original Rule: "{rule_text}"
Existing Rules (for context):
{existing_context if existing_context else "None"}
{context_text}
Your task:
1. Analyze the rule for potential edge cases and improvements
2. Generate an improved regex pattern that catches more variations
3. Write a clear, comprehensive description
4. Suggest an appropriate severity level (low/medium/high/critical)
5. Identify edge cases that might be missed
6. Suggest improvements
Respond in JSON format with the following structure:
{{
"rule": "Enhanced rule text (improved version of original)",
"pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
"description": "Clear description of what this rule detects",
"severity": "low|medium|high|critical",
"edge_cases": ["Edge case 1", "Edge case 2", ...],
"improvements": ["Improvement 1", "Improvement 2", ...],
"keywords": ["keyword1", "keyword2", ...]
}}
Only return valid JSON, no additional text:"""
        try:
            # LLM calls can be slow — bound each one with a hard timeout.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=self._LLM_TIMEOUT
            )
            enhanced_data = json.loads(self._strip_code_fences(response))
            # Fill any fields the model omitted with safe defaults.
            result = {
                "rule": enhanced_data.get("rule", rule_text),
                "pattern": enhanced_data.get("pattern", rule_text),
                "description": enhanced_data.get("description", rule_text),
                # Normalize case ("High" -> "high") before validating.
                "severity": str(enhanced_data.get("severity", "medium")).lower(),
                "edge_cases": enhanced_data.get("edge_cases", []),
                "improvements": enhanced_data.get("improvements", []),
                "keywords": enhanced_data.get("keywords", [])
            }
            if result["severity"] not in self._VALID_SEVERITIES:
                result["severity"] = "medium"
            return result
        except asyncio.TimeoutError:
            # Timeout — return the original rule untouched.
            print(f"LLM enhancement timeout for rule: {rule_text[:50]}...")
            return self._fallback_result(
                rule_text, "Enhancement timed out - using original rule"
            )
        except Exception as e:
            # Any other failure (bad JSON, client error) — fall back to the original rule.
            print(f"LLM enhancement error: {e}")
            return self._fallback_result(
                rule_text, f"Enhancement failed: {str(e)[:50]}"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules at once.

        Processes rules sequentially with per-rule error handling so one
        failure (or timeout) never aborts the rest of the batch.

        Args:
            rules: List of rule texts to enhance.
            existing_rules: List of existing rules for context.

        Returns:
            List of enhanced rule dictionaries, one per input rule, in order.
        """
        enhanced_rules: List[Dict[str, Any]] = []
        for i, rule in enumerate(rules):
            try:
                # enhance_rule already has its own timeout protection.
                enhanced = await self.enhance_rule(rule, existing_rules)
                enhanced_rules.append(enhanced)
            except Exception as e:
                # Belt-and-braces: keep processing the remaining rules.
                print(f"Warning: Rule {i+1}/{len(rules)} enhancement failed: {e}")
                enhanced_rules.append(
                    self._fallback_result(rule, "Enhancement skipped due to error")
                )
        return enhanced_rules