Spaces:
Sleeping
Sleeping
feat: Enhance admin rules with file upload, drag-and-drop, chunk processing, and improved UI
a477044
| """ | |
| Rule Enhancement Service using LLM | |
| Analyzes rules for edge cases and improves them before saving to database. | |
| """ | |
| import os | |
| from typing import List, Dict, Any, Optional | |
| from ..services.llm_client import LLMClient | |
class RuleEnhancer:
    """
    Uses an LLM to analyze and enhance admin rules.

    For each rule the LLM is asked for an improved rule text, a regex
    pattern, a description, a severity, edge cases, improvements and
    keywords. Whenever the LLM call times out, fails, or returns something
    that cannot be parsed as a JSON object, the original rule text is
    returned unchanged in the same dictionary shape.
    """

    # Severity labels accepted in results; anything else becomes "medium".
    _VALID_SEVERITIES = ("low", "medium", "high", "critical")

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, llm_client: Optional["LLMClient"] = None):
        """
        Args:
            llm_client: Client used for LLM calls. When omitted, an LLMClient
                is built from the LLM_BACKEND, OLLAMA_URL, GROQ_API_KEY and
                OLLAMA_MODEL environment variables.
        """
        self.llm = llm_client or LLMClient(
            backend=os.getenv("LLM_BACKEND", "ollama"),
            url=os.getenv("OLLAMA_URL"),
            api_key=os.getenv("GROQ_API_KEY"),
            model=os.getenv("OLLAMA_MODEL", "llama3.1:latest"),
        )

    @staticmethod
    def _fallback_result(rule_text: str, note: str) -> Dict[str, Any]:
        """Build the 'enhancement unavailable' result that echoes the original rule."""
        return {
            "rule": rule_text,
            "pattern": rule_text,
            "description": rule_text,
            "severity": "medium",
            "edge_cases": [],
            "improvements": [note],
            "keywords": [],
        }

    @staticmethod
    def _build_prompt(
        rule_text: str,
        existing_rules: Optional[List[str]],
        context: Optional[str],
    ) -> str:
        """Assemble the LLM prompt for one rule."""
        existing_context = ""
        if existing_rules:
            # Only the first 10 existing rules are included in the prompt.
            existing_context = "\n".join(f"- {r}" for r in existing_rules[:10])
        existing_block = existing_context or "None"
        context_text = f"\nAdditional context: {context}" if context else ""

        return f"""You are an expert in policy rule analysis and pattern matching. Analyze the following rule and enhance it.
Original Rule: "{rule_text}"
Existing Rules (for context):
{existing_block}
{context_text}
Your task:
1. Analyze the rule for potential edge cases and improvements
2. Generate an improved regex pattern that catches more variations
3. Write a clear, comprehensive description
4. Suggest an appropriate severity level (low/medium/high/critical)
5. Identify edge cases that might be missed
6. Suggest improvements
Respond in JSON format with the following structure:
{{
"rule": "Enhanced rule text (improved version of original)",
"pattern": "Improved regex pattern (e.g., '.*password.*|.*pwd.*|.*passcode.*')",
"description": "Clear description of what this rule detects",
"severity": "low|medium|high|critical",
"edge_cases": ["Edge case 1", "Edge case 2", ...],
"improvements": ["Improvement 1", "Improvement 2", ...],
"keywords": ["keyword1", "keyword2", ...]
}}
Only return valid JSON, no additional text:"""

    @staticmethod
    def _parse_llm_json(raw: str) -> Dict[str, Any]:
        """
        Extract and decode the JSON object contained in an LLM reply.

        Parsing the outermost ``{...}`` span tolerates markdown code fences
        (with any language tag) as well as text before or after the object.

        Raises:
            ValueError: If the reply is not valid JSON (json.JSONDecodeError
                is a ValueError subclass) or decodes to a non-object value.
        """
        import json

        text = raw.strip()
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end > start:
            text = text[start:end + 1]

        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError("LLM response is not a JSON object")
        return data

    @staticmethod
    def _text_or(value: Any, default: str) -> str:
        """Return *value* if it is a non-blank string, otherwise *default*."""
        if isinstance(value, str) and value.strip():
            return value
        return default

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Coerce a list-like field to a list; a lone non-blank string becomes a 1-item list."""
        if isinstance(value, (list, tuple)):
            return list(value)
        if isinstance(value, str) and value.strip():
            return [value]
        return []

    def _normalize_result(self, data: Dict[str, Any], rule_text: str) -> Dict[str, Any]:
        """Fill missing or malformed fields of the decoded LLM reply with safe defaults."""
        severity = data.get("severity")
        # Accept "HIGH" / " High " etc. instead of silently downgrading them.
        severity = severity.strip().lower() if isinstance(severity, str) else ""
        if severity not in self._VALID_SEVERITIES:
            severity = "medium"

        return {
            "rule": self._text_or(data.get("rule"), rule_text),
            "pattern": self._text_or(data.get("pattern"), rule_text),
            "description": self._text_or(data.get("description"), rule_text),
            "severity": severity,
            "edge_cases": self._as_list(data.get("edge_cases")),
            "improvements": self._as_list(data.get("improvements")),
            "keywords": self._as_list(data.get("keywords")),
        }

    async def enhance_rule(
        self,
        rule_text: str,
        existing_rules: Optional[List[str]] = None,
        context: Optional[str] = None,
        timeout: float = 30.0,
    ) -> Dict[str, Any]:
        """
        Enhance a single rule using LLM analysis.

        Args:
            rule_text: The original rule text
            existing_rules: List of existing rules for context (only the
                first 10 are sent to the LLM)
            context: Additional context about the rule
            timeout: Maximum number of seconds to wait for the LLM call

        Returns:
            Dictionary with enhanced rule data:
            - rule: Enhanced rule text
            - pattern: Improved regex pattern
            - description: Better description
            - severity: One of low/medium/high/critical ("medium" if the
              LLM value is missing or unrecognised)
            - edge_cases: List of identified edge cases
            - improvements: List of suggested improvements
            - keywords: List of keywords

            On timeout or any other failure the original rule text is used
            for rule/pattern/description and ``improvements`` holds a single
            note describing what went wrong.
        """
        import asyncio

        prompt = self._build_prompt(rule_text, existing_rules, context)

        try:
            # LLM calls can be slow, so bound the wait per rule.
            response = await asyncio.wait_for(
                self.llm.simple_call(prompt, temperature=0.3),
                timeout=timeout,
            )
            enhanced_data = self._parse_llm_json(response)
            return self._normalize_result(enhanced_data, rule_text)
        except asyncio.TimeoutError:
            print(f"LLM enhancement timeout for rule: {rule_text[:50]}...")
            return self._fallback_result(
                rule_text, "Enhancement timed out - using original rule"
            )
        except Exception as e:
            # Best-effort: any LLM or parsing failure falls back to the original rule.
            print(f"LLM enhancement error: {e}")
            return self._fallback_result(
                rule_text, f"Enhancement failed: {str(e)[:50]}"
            )

    async def enhance_rules_bulk(
        self,
        rules: List[str],
        existing_rules: Optional[List[str]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Enhance multiple rules, one after another.

        A failure on one rule never prevents the remaining rules from being
        processed; the failing rule is returned in its original form.

        Args:
            rules: List of rule texts to enhance
            existing_rules: List of existing rules for context

        Returns:
            List of enhanced rule dictionaries, in the same order as ``rules``
        """
        enhanced_rules = []
        total = len(rules)
        for position, rule in enumerate(rules, start=1):
            try:
                enhanced = await self.enhance_rule(rule, existing_rules)
            except Exception as e:
                print(f"Warning: Rule {position}/{total} enhancement failed: {e}")
                enhanced = self._fallback_result(
                    rule, "Enhancement skipped due to error"
                )
            enhanced_rules.append(enhanced)
        return enhanced_rules