""" Security Event Logging Logs all security-related events for audit and monitoring """ import logging from datetime import datetime from typing import Optional, Dict, Any import json import os # Determine log directory - use /tmp on cloud platforms LOG_DIR = 'logs' LOG_FILE = 'logs/security.log' # Try to create logs directory, fall back to /tmp if permission denied try: os.makedirs(LOG_DIR, exist_ok=True) except PermissionError: LOG_DIR = '/tmp' LOG_FILE = '/tmp/security.log' # Configure security logger security_logger = logging.getLogger('security') security_logger.setLevel(logging.INFO) # Create file handler for security events (with fallback to console only) try: security_handler = logging.FileHandler(LOG_FILE) security_handler.setLevel(logging.INFO) # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) security_handler.setFormatter(formatter) security_logger.addHandler(security_handler) except PermissionError: # Fall back to console logging only console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler.setFormatter(formatter) security_logger.addHandler(console_handler) def log_login_attempt(email: str, ip: str, success: bool, reason: Optional[str] = None): """Log login attempt""" event = { 'event_type': 'login_attempt', 'email': email, 'ip_address': ip, 'success': success, 'timestamp': datetime.utcnow().isoformat(), 'reason': reason } if success: security_logger.info(f"Successful login: {json.dumps(event)}") else: security_logger.warning(f"Failed login attempt: {json.dumps(event)}") def log_registration(email: str, ip: str, success: bool, reason: Optional[str] = None): """Log registration attempt""" event = { 'event_type': 'registration', 'email': email, 'ip_address': ip, 'success': success, 'timestamp': datetime.utcnow().isoformat(), 'reason': reason } if success: security_logger.info(f"New user registration: {json.dumps(event)}") else: security_logger.warning(f"Failed registration: {json.dumps(event)}") def log_rate_limit_exceeded(endpoint: str, ip: str): """Log rate limit exceeded""" event = { 'event_type': 'rate_limit_exceeded', 'endpoint': endpoint, 'ip_address': ip, 'timestamp': datetime.utcnow().isoformat() } security_logger.warning(f"Rate limit exceeded: {json.dumps(event)}") def log_account_lockout(email: str, ip: str, duration_minutes: int): """Log account lockout""" event = { 'event_type': 'account_lockout', 'email': email, 'ip_address': ip, 'duration_minutes': duration_minutes, 'timestamp': datetime.utcnow().isoformat() } security_logger.warning(f"Account locked: {json.dumps(event)}") def log_unauthorized_access(endpoint: str, ip: str, user_id: Optional[str] = None): """Log unauthorized access attempt""" event = { 'event_type': 'unauthorized_access', 'endpoint': endpoint, 'ip_address': ip, 'user_id': user_id, 'timestamp': datetime.utcnow().isoformat() } security_logger.warning(f"Unauthorized access attempt: {json.dumps(event)}") def log_suspicious_activity(activity_type: str, details: Dict[str, Any], ip: str, user_id: Optional[str] = None): """Log suspicious activity""" event = { 'event_type': 'suspicious_activity', 'activity_type': activity_type, 'details': details, 'ip_address': ip, 'user_id': user_id, 'timestamp': datetime.utcnow().isoformat() } security_logger.warning(f"Suspicious activity detected: {json.dumps(event)}") def log_data_access(user_id: str, data_type: str, action: str, ip: str): """Log sensitive data access""" event = { 'event_type': 'data_access', 'user_id': user_id, 'data_type': data_type, 'action': action, 'ip_address': ip, 'timestamp': datetime.utcnow().isoformat() } security_logger.info(f"Data access: {json.dumps(event)}") def log_kyc_event(user_id: str, event_type: str, status: str, ip: str): """Log KYC related events""" event = { 'event_type': 'kyc_event', 'user_id': user_id, 'kyc_event_type': event_type, 'status': status, 'ip_address': ip, 'timestamp': datetime.utcnow().isoformat() } security_logger.info(f"KYC event: {json.dumps(event)}") def log_transaction_event(user_id: str, transaction_type: str, amount: float, property_id: Optional[str], ip: str): """Log transaction events""" event = { 'event_type': 'transaction', 'user_id': user_id, 'transaction_type': transaction_type, 'amount': amount, 'property_id': property_id, 'ip_address': ip, 'timestamp': datetime.utcnow().isoformat() } security_logger.info(f"Transaction event: {json.dumps(event)}")