# IntegraChat — backend/api/services/rule_enhancer.py
# (Provenance: commit a477044, "feat: Enhance admin rules with file upload,
# drag-and-drop, chunk processing, and improved UI")
"""
Rule Enhancement Service using LLM
Analyzes rules for edge cases and improves them before saving to database.
"""
import asyncio
import json
import logging
import os
from typing import Any, Dict, List, Optional

from ..services.llm_client import LLMClient
class RuleEnhancer:
    """
    Uses an LLM to analyze and enhance admin rules.

    Identifies edge cases, improves regex patterns, and suggests better
    descriptions and severity levels before rules are saved to the database.
    Every public method degrades gracefully: if the LLM call fails or times
    out, the original rule text is echoed back in all fields.
    """

    # Severity values the rest of the pipeline understands; anything else
    # returned by the LLM is coerced to "medium".
    _VALID_SEVERITIES = frozenset({"low", "medium", "high", "critical"})

    # Per-rule wall-clock budget for the LLM call — LLM calls can be slow.
    _LLM_TIMEOUT_SECONDS = 30.0

    _log = logging.getLogger(__name__)

    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """
        Args:
            llm_client: Pre-configured LLM client. When omitted, a client is
                built from environment variables (LLM_BACKEND, OLLAMA_URL,
                GROQ_API_KEY, OLLAMA_MODEL) with Ollama defaults.
        """
        self.llm = llm_client or LLMClient(
            backend=os.getenv("LLM_BACKEND", "ollama"),
            url=os.getenv("OLLAMA_URL"),
            api_key=os.getenv("GROQ_API_KEY"),
            model=os.getenv("OLLAMA_MODEL", "llama3.1:latest"),
        )

    @staticmethod
    def _fallback_result(rule_text: str, note: str) -> Dict[str, Any]:
        """Build the unenhanced fallback payload, echoing the original rule.

        Used whenever enhancement cannot complete; *note* explains why in
        the ``improvements`` field.
        """
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": "medium",
            "edge_cases": [],
            "improvements": [note],
            "keywords": [],
        }

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove surrounding markdown code fences from an LLM response."""
        text = text.strip()
        if text.startswith("```json"):
            text = text[len("```json"):]
        if text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

    @staticmethod
    def _build_prompt(
        rule_text: str,
        existing_rules: Optional[List[str]],
        context: Optional[str],
    ) -> str:
        """Assemble the enhancement prompt for a single rule."""
        existing_context = ""
        if existing_rules:
            # Cap at 10 rules to keep the prompt (and latency) bounded.
            existing_context = "\n".join(f"- {r}" for r in existing_rules[:10])
        shown_existing = existing_context if existing_context else "None"
        context_text = f"\nAdditional context: {context}" if context else ""
        return f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and enhance it.
Original Rule: "{rule_text}"
Existing Rules (for context):
{shown_existing}
{context_text}
Your task:
1. Analyze the rule for potential edge cases and improvements
2. Generate an improved regex pattern that catches more variations
3. Write a clear, comprehensive description
4. Suggest an appropriate severity level (low/medium/high/critical)
5. Identify edge cases that might be missed
6. Suggest improvements
Respond in JSON format with the following structure:
{{
    "rule": "Enhanced rule text (improved version of original)",
    "pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
    "description": "Clear description of what this rule detects",
    "severity": "low|medium|high|critical",
    "edge_cases": ["Edge case 1", "Edge case 2", ...],
    "improvements": ["Improvement 1", "Improvement 2", ...],
    "keywords": ["keyword1", "keyword2", ...]
}}
Only return valid JSON, no additional text:"""

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Args:
            rule_text: The original rule text.
            existing_rules: List of existing rules for context (first 10 used).
            context: Additional context about the rule.

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern
            - description: Better description
            - severity: Suggested severity (always one of low/medium/high/critical)
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements
            - keywords: List of extracted keywords

            On timeout or any LLM/parse failure the original rule is echoed
            back via the fallback payload — this method does not raise.
        """
        prompt = self._build_prompt(rule_text, existing_rules, context)
        try:
            # Timeout protection: never let one slow LLM call hang the request.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=self._LLM_TIMEOUT_SECONDS,
            )
            # Models often wrap JSON in markdown fences despite instructions.
            enhanced_data = json.loads(self._strip_code_fences(response))
            # Ensure every required field exists, defaulting to the original
            # rule text so downstream consumers always get usable values.
            result = {
                "rule": enhanced_data.get("rule", rule_text),
                "pattern": enhanced_data.get("pattern", rule_text),
                "description": enhanced_data.get("description", rule_text),
                "severity": enhanced_data.get("severity", "medium"),
                "edge_cases": enhanced_data.get("edge_cases", []),
                "improvements": enhanced_data.get("improvements", []),
                "keywords": enhanced_data.get("keywords", []),
            }
            # Coerce any out-of-vocabulary severity the model invents.
            if result["severity"] not in self._VALID_SEVERITIES:
                result["severity"] = "medium"
            return result
        except asyncio.TimeoutError:
            self._log.warning(
                "LLM enhancement timeout for rule: %s...", rule_text[:50]
            )
            return self._fallback_result(
                rule_text, "Enhancement timed out - using original rule"
            )
        except Exception as e:
            # Broad catch is deliberate: enhancement is best-effort and must
            # never block saving the rule. Failures are logged with traceback.
            self._log.exception("LLM enhancement error: %s", e)
            return self._fallback_result(
                rule_text, f"Enhancement failed: {str(e)[:50]}"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules at once.

        Processes rules sequentially (with per-rule timeout protection inside
        :meth:`enhance_rule`) so one failure never blocks the rest.

        Args:
            rules: List of rule texts to enhance.
            existing_rules: List of existing rules for context.

        Returns:
            List of enhanced rule dictionaries, one per input rule, in order.
        """
        enhanced_rules: List[Dict[str, Any]] = []
        total = len(rules)
        for i, rule in enumerate(rules, start=1):
            try:
                enhanced_rules.append(await self.enhance_rule(rule, existing_rules))
            except Exception as e:
                # enhance_rule already swallows its own errors; this guards
                # against anything unexpected so remaining rules still process.
                self._log.warning(
                    "Rule %d/%d enhancement failed: %s", i, total, e
                )
                enhanced_rules.append(
                    self._fallback_result(rule, "Enhancement skipped due to error")
                )
        return enhanced_rules