File size: 17,949 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
"""
Runtime Security Monitoring & Zero-Trust Architecture

Provides an in-memory security event monitor and FastAPI/Starlette middlewares
for zero-trust access checks, JSON input validation, and security response headers.
"""

import asyncio
import hashlib
import json
import re
import time
from datetime import datetime, timedelta
from typing import Any

from fastapi import Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware

from core.logging import logger


# Security Models
class SecurityEvent(BaseModel):
    """Record of one security-relevant occurrence kept by the runtime monitor."""

    # Identifier assigned by the code that creates the event.
    event_id: str
    # Short machine-readable name, e.g. "brute_force_detected" or "successful_access".
    event_type: str
    severity: str  # critical, high, medium, low, info
    # Client address the event is attributed to.
    source_ip: str
    # User associated with the event, or None when no user is known.
    user_id: str | None
    # What was accessed and how; the middleware in this file passes the
    # request URL as `resource` and the HTTP method as `action`.
    resource: str
    action: str
    # Free-form extra context for the event.
    details: dict[str, Any]
    # Time at which the event object was created.
    timestamp: datetime
    # Numeric risk rating. NOTE(review): the scale is not defined in this
    # file - confirm with the code that consumes it.
    risk_score: int


class ZeroTrustPolicy(BaseModel):
    """Access policy for resources whose path matches ``resource_pattern``.

    ``ip_whitelist`` and ``time_restrictions`` are optional and default to
    ``None``. The explicit defaults matter: under Pydantic v2 a nullable
    annotation without a default is still a *required* field, so policies
    that omit these two fields would fail validation.
    """

    # Stable identifier of the policy.
    policy_id: str
    # Regular expression the protected resource is matched against.
    resource_pattern: str
    # Permissions the caller must hold (all of them).
    required_permissions: list[str]
    # Whether a verified multi-factor login is required.
    mfa_required: bool
    # Optional list of allowed client IPs; None means "no IP restriction".
    # NOTE(review): not enforced by the middleware visible in this file.
    ip_whitelist: list[str] | None = None
    # Optional time-of-access constraints; None means "no restriction".
    # NOTE(review): schema is not defined in this file and it is not enforced here.
    time_restrictions: dict[str, Any] | None = None
    # Risk limit for this resource.
    # NOTE(review): not consulted by the middleware visible in this file.
    risk_threshold: int


class InputValidationRule(BaseModel):
    """One validation constraint applied to a named request-body field.

    Only the attributes relevant to ``validation_type`` need to be supplied;
    every optional attribute defaults to ``None``. The explicit defaults are
    required for Pydantic v2, where a nullable annotation without a default
    is still a mandatory field.
    """

    # Name of the request-body field the rule applies to.
    field_name: str
    validation_type: str  # regex, length, type, custom
    # Regular expression used when validation_type == "regex".
    pattern: str | None = None
    # Inclusive length bounds used when validation_type == "length".
    min_length: int | None = None
    max_length: int | None = None
    # Closed set of permitted values.
    # NOTE(review): not evaluated by the middleware visible in this file.
    allowed_values: list[Any] | None = None
    # Name of a custom validator.
    # NOTE(review): not evaluated by the middleware visible in this file.
    custom_validator: str | None = None


# Runtime Security Monitor
class RuntimeSecurityMonitor:
    """Real-time security monitoring and threat detection"""

    def __init__(self):
        self.security_events: list[SecurityEvent] = []
        self.active_threats: dict[str, int] = {}
        self.suspicious_ips: set[str] = set()
        self.brute_force_attempts: dict[str, list[datetime]] = {}

        # Security thresholds
        self.max_failed_attempts = 5
        self.suspicious_activity_window = 300  # 5 minutes
        self.block_duration = 900  # 15 minutes
        self._background_tasks: list[asyncio.Task] = []

        # Initialize monitoring
        security_task = asyncio.create_task(self._background_security_monitor())
        self._background_tasks.append(security_task)

    async def _background_security_monitor(self):
        """Background security monitoring task"""
        while True:
            try:
                await self._analyze_security_patterns()
                await self._cleanup_expired_blocks()
                await asyncio.sleep(60)  # Check every minute
            except Exception as e:
                logger.error(f"Security monitor error: {e}")

    async def _analyze_security_patterns(self):
        """Analyze security patterns for threats"""
        # Check for brute force attempts
        for ip, attempts in self.brute_force_attempts.items():
            recent_attempts = [
                attempt
                for attempt in attempts
                if (datetime.now() - attempt).seconds < self.suspicious_activity_window
            ]

            if len(recent_attempts) >= self.max_failed_attempts:
                self.suspicious_ips.add(ip)
                await self._log_security_event(
                    event_type="brute_force_detected",
                    severity="high",
                    source_ip=ip,
                    details={"attempts": len(recent_attempts)},
                )

    async def _cleanup_expired_blocks(self):
        """Clean up expired IP blocks"""
        # This would be enhanced with Redis for distributed blocking

    async def _log_security_event(
        self,
        event_type: str,
        severity: str,
        source_ip: str,
        user_id: str | None = None,
        resource: str = "",
        action: str = "",
        details: dict[str, Any] | None = None,
        risk_score: int = 0,
    ):
        """Log a security event"""
        event = SecurityEvent(
            event_id=f"sec_{int(time.time())}_{hashlib.sha256(f'{event_type}{source_ip}'.encode()).hexdigest()[:16]}",
            event_type=event_type,
            severity=severity,
            source_ip=source_ip,
            user_id=user_id,
            resource=resource,
            action=action,
            details=details or {},
            timestamp=datetime.now(),
            risk_score=risk_score,
        )

        self.security_events.append(event)

        # Keep only recent events
        if len(self.security_events) > 1000:
            self.security_events = self.security_events[-500:]

        # Log to security monitoring system
        logger.warning(f"SECURITY_EVENT: {event_type} from {source_ip} - {severity}")

    def is_ip_blocked(self, ip: str) -> bool:
        """Check if IP is blocked"""
        return ip in self.suspicious_ips

    def record_failed_attempt(self, ip: str):
        """Record a failed authentication attempt"""
        if ip not in self.brute_force_attempts:
            self.brute_force_attempts[ip] = []

        self.brute_force_attempts[ip].append(datetime.now())

        # Cleanup old attempts
        cutoff = datetime.now() - timedelta(seconds=self.suspicious_activity_window)
        self.brute_force_attempts[ip] = [
            attempt for attempt in self.brute_force_attempts[ip] if attempt > cutoff
        ]


# Zero-Trust Security Middleware
class ZeroTrustMiddleware(BaseHTTPMiddleware):
    """Zero-trust security middleware implementing continuous verification.

    For every request the middleware:

    1. rejects clients whose IP the security monitor reports as blocked,
    2. applies the first policy whose ``resource_pattern`` matches the
       request path (required permissions, then MFA), and
    3. logs an audit event for responses with a status code below 400.

    ``user_id``, ``permissions`` and ``mfa_verified`` are read from
    ``request.state`` and default to ``None`` / ``[]`` / ``False`` when
    absent. NOTE(review): presumably an authentication middleware that runs
    earlier populates them - verify the middleware order.
    """

    def __init__(self, app, security_monitor: RuntimeSecurityMonitor):
        super().__init__(app)
        self.security_monitor = security_monitor
        self.policies: dict[str, ZeroTrustPolicy] = {}

        # Load zero-trust policies
        self._load_policies()

    def _load_policies(self):
        """Load zero-trust security policies."""
        # In production, this would load from database or config.
        # Optional fields are passed explicitly so construction does not
        # depend on the model declaring defaults for them.
        self.policies = {
            "/api/v1/cases": ZeroTrustPolicy(
                policy_id="cases_access",
                resource_pattern=r"/api/v1/cases.*",
                required_permissions=["cases.read"],
                mfa_required=False,
                ip_whitelist=None,
                time_restrictions=None,
                risk_threshold=3,
            ),
            "/api/v1/admin": ZeroTrustPolicy(
                policy_id="admin_access",
                resource_pattern=r"/api/v1/admin.*",
                required_permissions=["admin.access"],
                mfa_required=True,
                ip_whitelist=None,
                time_restrictions=None,
                risk_threshold=1,
            ),
        }

    async def dispatch(self, request: Request, call_next):
        """Enforce IP blocking and zero-trust policies, then forward the request.

        Returns a 403 JSON response when the client IP is blocked, when the
        caller lacks a required permission, or when MFA is required but not
        verified; otherwise returns the downstream response.
        """
        # Extract security context
        client_ip = self._get_client_ip(request)
        user_id = getattr(request.state, "user_id", None)
        # Tolerate an explicit None stored on request.state.
        user_permissions = getattr(request.state, "permissions", None) or []
        resource = str(request.url)
        # Policy patterns are anchored at the start and written as paths
        # ("/api/v1/..."), so they must be matched against the URL path;
        # the full URL starts with the scheme and would never match.
        path = request.url.path

        # Check if IP is blocked
        if self.security_monitor.is_ip_blocked(client_ip):
            await self.security_monitor._log_security_event(
                event_type="blocked_ip_access",
                severity="critical",
                source_ip=client_ip,
                user_id=user_id,
                resource=resource,
                action=request.method,
            )
            return JSONResponse(
                status_code=403,
                content={
                    "error": "Access denied",
                    "reason": "IP blocked due to security policy",
                },
            )

        # Apply zero-trust policies (only the first matching policy is used)
        for policy in self.policies.values():
            if not re.match(policy.resource_pattern, path):
                continue

            # Check permissions
            if not self._has_required_permissions(
                user_permissions, policy.required_permissions
            ):
                await self.security_monitor._log_security_event(
                    event_type="insufficient_permissions",
                    severity="high",
                    source_ip=client_ip,
                    user_id=user_id,
                    resource=resource,
                    action=request.method,
                )
                return JSONResponse(
                    status_code=403,
                    content={
                        "error": "Access denied",
                        "reason": "Insufficient permissions",
                    },
                )

            # Check MFA requirement
            if policy.mfa_required and not getattr(
                request.state, "mfa_verified", False
            ):
                # Record the denial like the permission failure above so
                # every policy rejection is visible in the audit trail.
                await self.security_monitor._log_security_event(
                    event_type="mfa_required",
                    severity="high",
                    source_ip=client_ip,
                    user_id=user_id,
                    resource=resource,
                    action=request.method,
                )
                return JSONResponse(
                    status_code=403,
                    content={
                        "error": "MFA required",
                        "reason": "Multi-factor authentication required",
                    },
                )

            # Additional zero-trust checks could be added here
            break

        # Continue with request
        response = await call_next(request)

        # Log successful access for audit
        if response.status_code < 400:
            await self.security_monitor._log_security_event(
                event_type="successful_access",
                severity="info",
                source_ip=client_ip,
                user_id=user_id,
                resource=resource,
                action=request.method,
            )

        return response

    def _get_client_ip(self, request: Request) -> str:
        """Extract the client IP from the request.

        Prefers the first entry of ``X-Forwarded-For`` and falls back to the
        socket peer address, or ``"unknown"`` when neither is available.
        """
        # SECURITY NOTE: X-Forwarded-For is client-controlled unless a
        # trusted proxy overwrites it. If the app is reachable directly, a
        # caller can spoof this header to evade an IP block or to get
        # another address blocked. Confirm the deployment strips or sets it.
        x_forwarded_for = request.headers.get("X-Forwarded-For")
        if x_forwarded_for:
            # Take the first IP in the chain (original client)
            first_hop = x_forwarded_for.split(",")[0].strip()
            if first_hop:
                return first_hop

        # Fall back to direct connection
        return request.client.host if request.client else "unknown"

    def _has_required_permissions(
        self, user_permissions: list[str], required_permissions: list[str]
    ) -> bool:
        """Return True if every required permission is in ``user_permissions``."""
        return all(perm in user_permissions for perm in required_permissions)


# Comprehensive Input Validation Middleware
class InputValidationMiddleware(BaseHTTPMiddleware):
    """Advanced input validation with security-focused rules.

    JSON bodies of POST/PUT/PATCH requests are parsed and every top-level
    field that has entries in ``validation_rules`` is checked. A failing
    request is answered with HTTP 400; a passing body is stored on
    ``request.state.validated_body``.

    NOTE(review): the body is read here, before the route handler. Confirm
    that the installed Starlette version replays the body to downstream
    handlers when it is consumed inside a ``BaseHTTPMiddleware``.
    """

    # Heuristic patterns, compiled once. They are applied to an upper-cased
    # and a lower-cased copy of the value respectively.
    # NOTE(review): the keyword check also flags ordinary prose containing
    # words such as "update" or "select".
    _SQL_KEYWORDS = re.compile(
        r"(\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER)\b)"
    )
    _XSS_MARKERS = re.compile(r"<script|<iframe|<object|<embed")

    def __init__(self, app):
        super().__init__(app)
        self.validation_rules: dict[str, list[InputValidationRule]] = {}

        # Load validation rules
        self._load_validation_rules()

    @staticmethod
    def _rule(
        field_name: str,
        validation_type: str,
        *,
        pattern: str | None = None,
        min_length: int | None = None,
        max_length: int | None = None,
    ) -> InputValidationRule:
        """Build a rule with every optional attribute passed explicitly."""
        return InputValidationRule(
            field_name=field_name,
            validation_type=validation_type,
            pattern=pattern,
            min_length=min_length,
            max_length=max_length,
            allowed_values=None,
            custom_validator=None,
        )

    def _load_validation_rules(self):
        """Load the built-in validation rules, keyed by field name."""
        rule = self._rule
        self.validation_rules = {
            "user_id": [
                rule("user_id", "regex", pattern=r"^[a-zA-Z0-9_-]{1,50}$"),
                rule("user_id", "length", min_length=1, max_length=50),
            ],
            "email": [
                rule(
                    "email",
                    "regex",
                    pattern=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
                ),
                # Length rule for the email field: at most 254 characters.
                rule("email", "length", max_length=254),
            ],
            "case_title": [
                rule("case_title", "length", min_length=1, max_length=200),
                rule("case_title", "regex", pattern=r"^[a-zA-Z0-9\s\-_.,!?()]+$"),
            ],
        }

    @staticmethod
    def _is_json_content_type(content_type: str | None) -> bool:
        """Return True for ``application/json``, ignoring parameters and case."""
        if not content_type:
            return False
        # "application/json; charset=utf-8" must be validated as well.
        media_type = content_type.split(";", 1)[0].strip().lower()
        return media_type == "application/json"

    async def dispatch(self, request: Request, call_next):
        """Validate JSON request bodies before passing the request on.

        Returns a 400 JSON response when the body is not valid JSON or when
        a field violates its rules; otherwise the downstream response.
        """
        # Only validate JSON requests
        if request.method in (
            "POST",
            "PUT",
            "PATCH",
        ) and self._is_json_content_type(request.headers.get("content-type")):
            try:
                # Read and validate request body
                body = await request.json()

                # Validate against rules
                validation_errors = self._validate_input(body)

                if validation_errors:
                    return JSONResponse(
                        status_code=400,
                        content={
                            "error": "Input validation failed",
                            "validation_errors": validation_errors,
                        },
                    )

                # Store validated body for downstream use
                request.state.validated_body = body

            except json.JSONDecodeError:
                return JSONResponse(
                    status_code=400, content={"error": "Invalid JSON format"}
                )
            except Exception as e:
                logger.error(f"Input validation error: {e}")
                return JSONResponse(
                    status_code=400, content={"error": "Input validation failed"}
                )

        response = await call_next(request)
        return response

    def _validate_input(self, data: dict[str, Any]) -> list[str]:
        """Validate the top-level fields of ``data`` against the rules.

        Returns a list of human-readable error strings; empty when every
        field passes. Bodies that are not JSON objects (arrays, scalars)
        have no named fields and therefore produce no errors.
        """
        if not isinstance(data, dict):
            return []

        errors: list[str] = []
        for field_name, value in data.items():
            if field_name in self.validation_rules:
                errors.extend(self._validate_field(field_name, value))

        return errors

    def _validate_field(self, field_name: str, value: Any) -> list[str]:
        """Validate a single field against its rules.

        Each rule for ``field_name`` is applied in order, then the SQL/XSS
        heuristics run once for string values. Returns the collected error
        strings.
        """
        errors: list[str] = []

        for rule in self.validation_rules[field_name]:
            try:
                if rule.validation_type == "regex" and rule.pattern:
                    # fullmatch: with re.match a trailing "\n" would slip
                    # past a pattern that ends in "$".
                    if not re.fullmatch(rule.pattern, str(value)):
                        errors.append(f"{field_name}: Invalid format")

                elif rule.validation_type == "length":
                    length = len(str(value))
                    if rule.min_length is not None and length < rule.min_length:
                        errors.append(
                            f"{field_name}: Too short (minimum {rule.min_length})"
                        )
                    if rule.max_length is not None and length > rule.max_length:
                        errors.append(
                            f"{field_name}: Too long (maximum {rule.max_length})"
                        )

                elif rule.validation_type == "type":
                    # Type validation is not implemented yet.
                    pass

            except Exception as e:
                errors.append(f"{field_name}: Validation error - {e!s}")

        # Additional security checks: run once per field rather than once
        # per rule, so a single finding is not reported several times.
        if isinstance(value, str):
            # Check for SQL injection patterns
            if self._SQL_KEYWORDS.search(value.upper()):
                errors.append(f"{field_name}: Potential SQL injection detected")

            # Check for XSS patterns
            if self._XSS_MARKERS.search(value.lower()):
                errors.append(f"{field_name}: Potential XSS attack detected")

        return errors


# Enhanced Security Headers
class AdvancedSecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Advanced security headers with dynamic content security policy.

    A fresh CSP nonce is generated for every request *before* the handler
    runs and exposed as ``request.state.csp_nonce`` so that markup rendered
    by the handler can carry the same nonce that is later announced in the
    ``Content-Security-Policy`` header. The security headers are then set
    on the downstream response.
    """

    def __init__(self, app):
        super().__init__(app)
        # Not used by this class; kept so existing attribute access keeps working.
        self.nonce_cache: dict[str, str] = {}

    async def dispatch(self, request: Request, call_next):
        """Run the request and attach the security headers to its response."""
        # Generate nonce for CSP. This has to happen before call_next:
        # a nonce created after the response is rendered cannot appear in
        # any script tag of that response.
        nonce = self._generate_nonce()
        request.state.csp_nonce = nonce

        response = await call_next(request)

        # Enhanced security headers
        headers = {
            # Content Security Policy with nonce
            "Content-Security-Policy": (
                f"default-src 'self'; script-src 'self' 'nonce-{nonce}'; "
                f"style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; "
                f"font-src 'self' data:; connect-src 'self' wss: https:; frame-ancestors 'none';"
            ),
            # Security headers
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "X-XSS-Protection": "1; mode=block",
            "Strict-Transport-Security": "max-age=31536000; includeSubDomains; preload",
            "Referrer-Policy": "strict-origin-when-cross-origin",
            # Additional security headers
            "X-Permitted-Cross-Domain-Policies": "none",
            "Cross-Origin-Embedder-Policy": "require-corp",
            "Cross-Origin-Opener-Policy": "same-origin",
            "Cross-Origin-Resource-Policy": "same-origin",
            # Feature policy restrictions
            "Permissions-Policy": (
                "camera=(), microphone=(), geolocation=(), payment=(), usb=(), "
                "magnetometer=(), accelerometer=(), gyroscope=(), ambient-light-sensor=(), "
                "autoplay=(), encrypted-media=(), fullscreen=(self), picture-in-picture=()"
            ),
        }

        # Apply headers to response
        for header_name, header_value in headers.items():
            response.headers[header_name] = header_value

        # Also expose the nonce as a response header (kept for existing consumers).
        response.headers["X-Nonce"] = nonce

        return response

    def _generate_nonce(self) -> str:
        """Generate a cryptographically secure, URL-safe nonce."""
        import secrets

        return secrets.token_urlsafe(16)


# Initialize security components.
# Module-level singleton: RuntimeSecurityMonitor.__init__ runs when this
# module is imported, and every importer shares this one instance.
security_monitor = RuntimeSecurityMonitor()

# Export for use in main.py: the three middlewares, the monitor class, and
# the shared monitor instance created above.
__all__ = [
    "AdvancedSecurityHeadersMiddleware",
    "InputValidationMiddleware",
    "RuntimeSecurityMonitor",
    "ZeroTrustMiddleware",
    "security_monitor",
]