Nadasr committed on
Commit
1b963f1
·
verified ·
1 Parent(s): c9f8ea8

Upload 2 files

Browse files
Files changed (2) hide show
  1. utils/logger.py +81 -0
  2. utils/validators.py +107 -0
utils/logger.py ADDED
@@ -0,0 +1,81 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Logging utility for agent actions - Policy Summarizer
3
+ """
4
+ import logging
5
+ import time
6
+ from typing import Optional, List
7
+ from functools import wraps
8
+
9
# Root logging setup: INFO and above, pipe-separated fields, second-resolution
# timestamps. This runs at import time.
_LOG_FORMAT = '%(asctime)s | %(levelname)s | %(name)s | %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)

# Named logger used for every record emitted by this module.
logger = logging.getLogger('PolicySummarizer')

# In-memory list of agent-action entries, kept so the UI can display them.
_agent_logs: List[dict] = []
20
+
21
+
22
def get_logs() -> List[dict]:
    """Return a snapshot of all logged agent actions.

    Returns:
        A new list holding a copy of each stored entry. Both the list and
        the entry dicts are copies, so callers may modify the result
        without altering the entries kept by this module.
    """
    # A plain list.copy() would still share the entry dicts with callers.
    return [dict(entry) for entry in _agent_logs]
25
+
26
+
27
def clear_logs() -> None:
    """Remove every stored agent-action entry.

    The list is emptied in place instead of being rebound to a new object,
    so any code that already holds a reference to the log list sees the
    cleared state as well.
    """
    _agent_logs.clear()
31
+
32
+
33
def _shorten_summary(text: Optional[str], limit: int = 200) -> str:
    """Return *text* as a string, cut to *limit* characters plus "..." when longer."""
    text = "" if text is None else str(text)
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def log_agent_action(
    agent_name: str,
    action: str,
    input_summary: str,
    output_summary: str,
    duration_seconds: float,
    success: bool = True,
    error: Optional[str] = None
):
    """Record one agent action in memory and emit it through the logger.

    Args:
        agent_name: Name of the agent that performed the action.
        action: Short description of what was done.
        input_summary: Summary of the input; stored truncated to 200
            characters (with "..." appended). ``None`` is stored as "".
        output_summary: Summary of the output; truncated the same way.
        duration_seconds: Elapsed time; stored rounded to 2 decimals.
        success: Whether the action succeeded.
        error: Optional error description; logged at ERROR level when set.

    Only the agent name, action, duration and error are written to the
    logger; the summaries are kept in the in-memory entry only.
    """
    log_entry = {
        "agent_name": agent_name,
        "action": action,
        # Coerced and truncated so a missing or oversized summary cannot
        # crash logging or bloat the stored entry.
        "input_summary": _shorten_summary(input_summary),
        "output_summary": _shorten_summary(output_summary),
        "duration_seconds": round(duration_seconds, 2),
        "success": success,
        "error": error
    }

    _agent_logs.append(log_entry)

    status = "✓" if success else "✗"
    # Lazy %-style arguments: the message is only formatted if it is emitted.
    logger.info("%s [%s] %s (%.2fs)", status, agent_name, action, duration_seconds)

    if error:
        logger.error(" Error: %s", error)
60
+
61
+
62
def format_logs_for_display() -> str:
    """Render the stored agent actions as Markdown text for the UI.

    Returns:
        "No logs yet." when nothing has been recorded; otherwise a heading
        followed by one numbered section per entry showing its agent name,
        action, status icon, duration and (when present) error.
    """
    if not _agent_logs:
        return "No logs yet."

    report = ["## Agent Activity Log\n"]

    for step, entry in enumerate(_agent_logs, start=1):
        if entry["success"]:
            icon = "✅"
        else:
            icon = "❌"

        section = [
            f"### Step {step}: {entry['agent_name']}",
            f"- **Action:** {entry['action']}",
            f"- **Status:** {icon}",
            f"- **Duration:** {entry['duration_seconds']}s",
        ]

        # The error line is only shown for entries that carry one.
        if entry.get("error"):
            section.append(f"- **Error:** {entry['error']}")

        # Blank line separates consecutive sections.
        section.append("")
        report.extend(section)

    return "\n".join(report)
utils/validators.py ADDED
@@ -0,0 +1,107 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Input validation utilities - Policy Summarizer
3
+ """
4
+ import re
5
+ from urllib.parse import urlparse
6
+ from typing import Tuple
7
+
8
# Maximum number of characters of content to process
MAX_CONTENT_LENGTH = 50000

# Syntactic URL check only (it does not decide which hosts are safe to fetch):
# http/https scheme, then a dotted DNS name, "localhost" or a dotted-quad
# host, an optional port, and an optional path or query.
URL_PATTERN = re.compile(
    r'^https?://'
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+'
    # TLD: up to 63 letters (a DNS label's maximum) or a punycode "xn--"
    # label, so domains such as example.technology are accepted.
    r'(?:[A-Z]{2,63}|XN--[A-Z0-9-]{1,59})\.?|'
    r'localhost|'
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    r'(?::\d+)?'
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)
19
+
20
+
21
def validate_url(url: str) -> Tuple[bool, str]:
    """Check that *url* is a well-formed http(s) URL that is safe to scrape.

    Args:
        url: Candidate URL; surrounding whitespace is ignored.

    Returns:
        ``(True, "")`` when the URL is accepted, otherwise ``(False, reason)``
        with a human-readable explanation.

    Hosts that are ``localhost`` (or a ``.localhost`` name), or a literal IP
    address in a loopback, private, link-local, reserved, multicast or
    unspecified range, are rejected. No DNS lookup is performed, so a public
    host name that resolves to an internal address is NOT detected here.
    """
    # Local import keeps this function self-contained.
    import ipaddress

    if not url or not isinstance(url, str):
        return False, "URL cannot be empty"

    url = url.strip()

    if len(url) > 2048:
        return False, "URL is too long (max 2048 characters)"

    if not URL_PATTERN.match(url):
        return False, "Invalid URL format. Must start with http:// or https://"

    try:
        parsed = urlparse(url)
    except ValueError as e:
        return False, f"Failed to parse URL: {str(e)}"

    if parsed.scheme not in ['http', 'https']:
        return False, "URL must use http or https protocol"

    if not parsed.netloc:
        return False, "URL must have a valid domain"

    # Normalise: lower-case and drop a trailing root dot ("localhost.").
    hostname = (parsed.hostname or "").lower().rstrip(".")

    blocked_hosts = ['localhost', '127.0.0.1', '0.0.0.0', '::1']
    if hostname in blocked_hosts or hostname.endswith(".localhost"):
        return False, "Cannot scrape localhost or private addresses"

    try:
        address = ipaddress.ip_address(hostname)
    except ValueError:
        address = None  # Not an IP literal; treated as a DNS name.

    if address is not None and (
        address.is_loopback
        or address.is_private
        or address.is_link_local
        or address.is_reserved
        or address.is_multicast
        or address.is_unspecified
    ):
        return False, "Cannot scrape localhost or private addresses"

    return True, ""
50
+
51
+
52
def is_likely_policy_url(url: str) -> bool:
    """Return True when *url* contains a keyword typical of policy pages.

    This is a case-insensitive substring check against a fixed keyword
    list, so it is a heuristic only. Empty or non-string input returns
    False instead of raising.
    """
    if not url or not isinstance(url, str):
        return False

    keywords = ('privacy', 'policy', 'terms', 'tos', 'legal', 'service', 'conditions')
    url_lower = url.lower()
    return any(keyword in url_lower for keyword in keywords)
57
+
58
+
59
# One or more qualifier words that may sit between the verb and
# "instructions", so "ignore all previous instructions" and
# "disregard the above instructions" are matched as well as the
# single-qualifier forms.
_INJECTION_QUALIFIERS = r'(?:(?:all|any|the|previous|prior|above)\s+)+'

# Phrases that get replaced with "[FILTERED]"; compiled once at import time.
_INJECTION_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'(?:ignore|disregard|forget)\s+' + _INJECTION_QUALIFIERS + r'instructions',
        r'new\s+instructions?\s*:',
        r'system\s*:\s*',
    )
)


def sanitize_text(text: str) -> str:
    """Clean scraped text and mask common prompt-injection phrases.

    Steps, in order: remove NUL characters, collapse runs of three or more
    newlines to two, collapse runs of three or more spaces, replace known
    injection phrases with "[FILTERED]", and strip surrounding whitespace.

    The phrase list is a best-effort filter, not a complete defence
    against prompt injection.

    Args:
        text: Raw text; a falsy value yields "".

    Returns:
        The cleaned text.
    """
    if not text:
        return ""

    text = text.replace('\x00', '')
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r' {3,}', ' ', text)

    for pattern in _INJECTION_PATTERNS:
        text = pattern.sub('[FILTERED]', text)

    return text.strip()
81
+
82
+
83
def truncate_content(content: str, max_length: int = MAX_CONTENT_LENGTH) -> str:
    """Cut *content* to at most *max_length* characters, preferring a sentence end.

    Args:
        content: Text to shorten; a falsy value yields "".
        max_length: Character budget for the kept text. Negative values are
            treated as 0.

    Returns:
        *content* unchanged when it fits. Otherwise the first *max_length*
        characters, cut back to the last "." when that period lies in the
        final 20% of the budget, followed by a truncation notice. The notice
        is appended after the cut, so the result is longer than *max_length*.
    """
    if not content:
        return ""

    # A negative length would make the slice below count from the end.
    max_length = max(max_length, 0)

    if len(content) <= max_length:
        return content

    truncated = content[:max_length]
    last_period = truncated.rfind('.')

    # Only back up to a sentence end if little text is lost by doing so.
    if last_period > max_length * 0.8:
        truncated = truncated[:last_period + 1]

    return truncated + "\n\n[Content truncated due to length...]"
95
+
96
+
97
def validate_content_length(content: str, min_words: int = 50) -> Tuple[bool, str]:
    """Check that extracted content is non-empty and long enough to summarise.

    Args:
        content: Text extracted from the page.
        min_words: Minimum number of whitespace-separated words required.
            Defaults to 50.

    Returns:
        ``(True, "")`` when the content is acceptable, otherwise
        ``(False, reason)`` describing why it was rejected.
    """
    if not content or not content.strip():
        return False, "No content was extracted from the page"

    word_count = len(content.split())

    if word_count < min_words:
        return False, f"Content too short ({word_count} words). This may not be a valid policy page."

    return True, ""