saemstunes commited on
Commit
1069dad
·
verified ·
1 Parent(s): 24d3062

Create security_system.py

Browse files
Files changed (1) hide show
  1. src/security_system.py +87 -0
src/security_system.py ADDED
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+ import time
3
+ from datetime import datetime, timedelta
4
+ from typing import Dict, List, Optional
5
+ import logging
6
+
7
+ class SecuritySystem:
8
+ """Security system for input validation and rate limiting"""
9
+
10
+ def __init__(self):
11
+ self.rate_limits = {}
12
+ self.suspicious_patterns = [
13
+ r"(?i)(password|token|key|secret)",
14
+ r"(?i)(delete|drop|alter|update).*table",
15
+ r"(?i)(script|javascript|onload|onerror)",
16
+ r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
17
+ r"(?i)(admin|root|sudo)"
18
+ ]
19
+ self.setup_logging()
20
+
21
+ def setup_logging(self):
22
+ """Setup logging"""
23
+ self.logger = logging.getLogger(__name__)
24
+
25
+ def check_request(self, query: str, user_id: str) -> Dict:
26
+ """Check request for security issues"""
27
+ result = {
28
+ "is_suspicious": False,
29
+ "alerts": [],
30
+ "risk_score": 0
31
+ }
32
+
33
+ # Rate limiting
34
+ if not self.check_rate_limit(user_id):
35
+ result["is_suspicious"] = True
36
+ result["alerts"].append("Rate limit exceeded")
37
+ result["risk_score"] = 100
38
+ return result
39
+
40
+ # Pattern matching
41
+ for pattern in self.suspicious_patterns:
42
+ if re.search(pattern, query):
43
+ result["alerts"].append(f"Suspicious pattern: {pattern}")
44
+ result["risk_score"] += 20
45
+
46
+ # Query length analysis
47
+ if len(query) > 10000:
48
+ result["alerts"].append("Excessively long query")
49
+ result["risk_score"] += 30
50
+
51
+ # Special character analysis
52
+ special_chars = len(re.findall(r'[^\w\s]', query))
53
+ if special_chars > len(query) * 0.3:
54
+ result["alerts"].append("High percentage of special characters")
55
+ result["risk_score"] += 25
56
+
57
+ # Determine if suspicious
58
+ if result["risk_score"] >= 50:
59
+ result["is_suspicious"] = True
60
+
61
+ return result
62
+
63
+ def check_rate_limit(self, user_id: str, requests_per_minute: int = 60) -> bool:
64
+ """Check rate limit for user"""
65
+ current_time = datetime.now()
66
+
67
+ # Clean old entries
68
+ self.rate_limits[user_id] = [
69
+ t for t in self.rate_limits.get(user_id, [])
70
+ if current_time - t < timedelta(minutes=1)
71
+ ]
72
+
73
+ # Check rate limit
74
+ if len(self.rate_limits[user_id]) >= requests_per_minute:
75
+ return False
76
+
77
+ # Add current request
78
+ self.rate_limits[user_id].append(current_time)
79
+ return True
80
+
81
+ def sanitize_input(self, text: str) -> str:
82
+ """Sanitize user input"""
83
+ # Remove potentially dangerous characters
84
+ sanitized = re.sub(r'[<>"\'&]', '', text)
85
+ sanitized = re.sub(r'(\b)(DROP|DELETE|INSERT|UPDATE)(\b)', '', sanitized, flags=re.IGNORECASE)
86
+ sanitized = re.sub(r';\s*\w+', '', sanitized)
87
+ return sanitized.strip()