Spaces:
Sleeping
Sleeping
| """ | |
| Security Event Logging | |
| Logs all security-related events for audit and monitoring | |
| """ | |
| import logging | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any | |
| import json | |
| import os | |
| # Determine log directory - use /tmp on cloud platforms | |
| LOG_DIR = 'logs' | |
| LOG_FILE = 'logs/security.log' | |
| # Try to create logs directory, fall back to /tmp if permission denied | |
| try: | |
| os.makedirs(LOG_DIR, exist_ok=True) | |
| except PermissionError: | |
| LOG_DIR = '/tmp' | |
| LOG_FILE = '/tmp/security.log' | |
| # Configure security logger | |
| security_logger = logging.getLogger('security') | |
| security_logger.setLevel(logging.INFO) | |
| # Create file handler for security events (with fallback to console only) | |
| try: | |
| security_handler = logging.FileHandler(LOG_FILE) | |
| security_handler.setLevel(logging.INFO) | |
| # Create formatter | |
| formatter = logging.Formatter( | |
| '%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| security_handler.setFormatter(formatter) | |
| security_logger.addHandler(security_handler) | |
| except PermissionError: | |
| # Fall back to console logging only | |
| console_handler = logging.StreamHandler() | |
| console_handler.setLevel(logging.INFO) | |
| formatter = logging.Formatter( | |
| '%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| console_handler.setFormatter(formatter) | |
| security_logger.addHandler(console_handler) | |
| def log_login_attempt(email: str, ip: str, success: bool, reason: Optional[str] = None): | |
| """Log login attempt""" | |
| event = { | |
| 'event_type': 'login_attempt', | |
| 'email': email, | |
| 'ip_address': ip, | |
| 'success': success, | |
| 'timestamp': datetime.utcnow().isoformat(), | |
| 'reason': reason | |
| } | |
| if success: | |
| security_logger.info(f"Successful login: {json.dumps(event)}") | |
| else: | |
| security_logger.warning(f"Failed login attempt: {json.dumps(event)}") | |
| def log_registration(email: str, ip: str, success: bool, reason: Optional[str] = None): | |
| """Log registration attempt""" | |
| event = { | |
| 'event_type': 'registration', | |
| 'email': email, | |
| 'ip_address': ip, | |
| 'success': success, | |
| 'timestamp': datetime.utcnow().isoformat(), | |
| 'reason': reason | |
| } | |
| if success: | |
| security_logger.info(f"New user registration: {json.dumps(event)}") | |
| else: | |
| security_logger.warning(f"Failed registration: {json.dumps(event)}") | |
| def log_rate_limit_exceeded(endpoint: str, ip: str): | |
| """Log rate limit exceeded""" | |
| event = { | |
| 'event_type': 'rate_limit_exceeded', | |
| 'endpoint': endpoint, | |
| 'ip_address': ip, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.warning(f"Rate limit exceeded: {json.dumps(event)}") | |
| def log_account_lockout(email: str, ip: str, duration_minutes: int): | |
| """Log account lockout""" | |
| event = { | |
| 'event_type': 'account_lockout', | |
| 'email': email, | |
| 'ip_address': ip, | |
| 'duration_minutes': duration_minutes, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.warning(f"Account locked: {json.dumps(event)}") | |
| def log_unauthorized_access(endpoint: str, ip: str, user_id: Optional[str] = None): | |
| """Log unauthorized access attempt""" | |
| event = { | |
| 'event_type': 'unauthorized_access', | |
| 'endpoint': endpoint, | |
| 'ip_address': ip, | |
| 'user_id': user_id, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.warning(f"Unauthorized access attempt: {json.dumps(event)}") | |
| def log_suspicious_activity(activity_type: str, details: Dict[str, Any], ip: str, user_id: Optional[str] = None): | |
| """Log suspicious activity""" | |
| event = { | |
| 'event_type': 'suspicious_activity', | |
| 'activity_type': activity_type, | |
| 'details': details, | |
| 'ip_address': ip, | |
| 'user_id': user_id, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.warning(f"Suspicious activity detected: {json.dumps(event)}") | |
| def log_data_access(user_id: str, data_type: str, action: str, ip: str): | |
| """Log sensitive data access""" | |
| event = { | |
| 'event_type': 'data_access', | |
| 'user_id': user_id, | |
| 'data_type': data_type, | |
| 'action': action, | |
| 'ip_address': ip, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.info(f"Data access: {json.dumps(event)}") | |
| def log_kyc_event(user_id: str, event_type: str, status: str, ip: str): | |
| """Log KYC related events""" | |
| event = { | |
| 'event_type': 'kyc_event', | |
| 'user_id': user_id, | |
| 'kyc_event_type': event_type, | |
| 'status': status, | |
| 'ip_address': ip, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.info(f"KYC event: {json.dumps(event)}") | |
| def log_transaction_event(user_id: str, transaction_type: str, amount: float, property_id: Optional[str], ip: str): | |
| """Log transaction events""" | |
| event = { | |
| 'event_type': 'transaction', | |
| 'user_id': user_id, | |
| 'transaction_type': transaction_type, | |
| 'amount': amount, | |
| 'property_id': property_id, | |
| 'ip_address': ip, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| security_logger.info(f"Transaction event: {json.dumps(event)}") | |