atriumchain-api / utils /security_logger.py
Jainish1808's picture
Upload utils/security_logger.py with huggingface_hub
2724228 verified
"""
Security Event Logging
Logs all security-related events for audit and monitoring
"""
import logging
from datetime import datetime
from typing import Optional, Dict, Any
import json
import os
# Determine log directory - prefer ./logs, fall back to /tmp when the working
# directory cannot be written to (common on cloud platforms).
LOG_DIR = 'logs'
try:
    os.makedirs(LOG_DIR, exist_ok=True)
except OSError:
    # Catch OSError rather than only PermissionError: a read-only filesystem
    # or a regular file named "logs" raises a different OSError subclass and
    # would otherwise abort the import instead of triggering the fallback.
    LOG_DIR = '/tmp'
LOG_FILE = f'{LOG_DIR}/security.log'

# Configure security logger
security_logger = logging.getLogger('security')
security_logger.setLevel(logging.INFO)

# Single formatter shared by whichever handler ends up being installed.
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Prefer a file handler for security events; if the log file cannot be
# opened for any OS-level reason, fall back to console logging only.
try:
    security_handler = logging.FileHandler(LOG_FILE)
except OSError:
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    security_logger.addHandler(console_handler)
else:
    security_handler.setLevel(logging.INFO)
    security_handler.setFormatter(formatter)
    security_logger.addHandler(security_handler)
def log_login_attempt(email: str, ip: str, success: bool, reason: Optional[str] = None):
    """Record the outcome of a login attempt in the security log.

    The event is serialized to JSON and written at INFO level when the
    attempt succeeded, or at WARNING level when it failed.

    Args:
        email: Account identifier supplied with the attempt.
        ip: Address the attempt originated from.
        success: Whether the login succeeded.
        reason: Optional explanation, stored as-is in the event.
    """
    payload = json.dumps(dict(
        event_type='login_attempt',
        email=email,
        ip_address=ip,
        success=success,
        timestamp=datetime.utcnow().isoformat(),
        reason=reason,
    ))

    # Handle the failure path first so the success path reads straight through.
    if not success:
        security_logger.warning("Failed login attempt: " + payload)
        return
    security_logger.info("Successful login: " + payload)
def log_registration(email: str, ip: str, success: bool, reason: Optional[str] = None):
    """Record a user registration attempt in the security log.

    Successful registrations are logged at INFO level and failed ones at
    WARNING level; both carry the same JSON-encoded event.

    Args:
        email: Address used for the registration.
        ip: Address the request originated from.
        success: Whether the registration succeeded.
        reason: Optional explanation, stored as-is in the event.
    """
    record = {
        'event_type': 'registration',
        'email': email,
        'ip_address': ip,
        'success': success,
        'timestamp': datetime.utcnow().isoformat(),
        'reason': reason,
    }

    # Pick the log method and message label together based on the outcome.
    if success:
        emit, label = security_logger.info, "New user registration"
    else:
        emit, label = security_logger.warning, "Failed registration"
    emit(f"{label}: {json.dumps(record)}")
def log_rate_limit_exceeded(endpoint: str, ip: str):
    """Write a WARNING entry noting that a rate limit was exceeded.

    Args:
        endpoint: The endpoint whose limit was hit.
        ip: Address the requests originated from.
    """
    occurred_at = datetime.utcnow().isoformat()
    serialized = json.dumps({
        'event_type': 'rate_limit_exceeded',
        'endpoint': endpoint,
        'ip_address': ip,
        'timestamp': occurred_at,
    })
    security_logger.warning("Rate limit exceeded: " + serialized)
def log_account_lockout(email: str, ip: str, duration_minutes: int):
    """Write a WARNING entry noting that an account was locked.

    Args:
        email: Account that was locked.
        ip: Address associated with the lockout.
        duration_minutes: Lockout length, stored as-is in the event.
    """
    lockout = dict(
        event_type='account_lockout',
        email=email,
        ip_address=ip,
        duration_minutes=duration_minutes,
        timestamp=datetime.utcnow().isoformat(),
    )
    message = "Account locked: " + json.dumps(lockout)
    security_logger.warning(message)
def log_unauthorized_access(endpoint: str, ip: str, user_id: Optional[str] = None):
    """Write a WARNING entry for an unauthorized access attempt.

    Args:
        endpoint: The endpoint that was requested.
        ip: Address the request originated from.
        user_id: Identifier of the user, if one is known; otherwise None.
    """
    attempt = {}
    attempt['event_type'] = 'unauthorized_access'
    attempt['endpoint'] = endpoint
    attempt['ip_address'] = ip
    attempt['user_id'] = user_id
    attempt['timestamp'] = datetime.utcnow().isoformat()

    encoded = json.dumps(attempt)
    security_logger.warning(f"Unauthorized access attempt: {encoded}")
def log_suspicious_activity(activity_type: str, details: Dict[str, Any], ip: str, user_id: Optional[str] = None):
    """Write a WARNING entry describing suspicious activity.

    Args:
        activity_type: Short label for the kind of activity observed.
        details: Free-form, caller-supplied context embedded in the event.
            Values that JSON cannot encode natively are stringified.
        ip: Address the activity originated from.
        user_id: Identifier of the user, if one is known; otherwise None.
    """
    event = {
        'event_type': 'suspicious_activity',
        'activity_type': activity_type,
        'details': details,
        'ip_address': ip,
        'user_id': user_id,
        'timestamp': datetime.utcnow().isoformat(),
    }
    # `details` is Dict[str, Any], so it can carry values json cannot encode
    # (datetime, Decimal, ...). Falling back to str() keeps the audit entry
    # instead of raising TypeError in the middle of handling the event.
    # Output for already-serializable input is unchanged.
    serialized = json.dumps(event, default=str)
    security_logger.warning(f"Suspicious activity detected: {serialized}")
def log_data_access(user_id: str, data_type: str, action: str, ip: str):
    """Write an INFO entry recording access to sensitive data.

    Args:
        user_id: Identifier of the user performing the access.
        data_type: Kind of data that was accessed.
        action: What was done with the data.
        ip: Address the request originated from.
    """
    stamp = datetime.utcnow().isoformat()
    access = dict(
        event_type='data_access',
        user_id=user_id,
        data_type=data_type,
        action=action,
        ip_address=ip,
        timestamp=stamp,
    )
    security_logger.info("Data access: " + json.dumps(access))
def log_kyc_event(user_id: str, event_type: str, status: str, ip: str):
    """Write an INFO entry for a KYC-related event.

    The caller's ``event_type`` is stored under the ``kyc_event_type`` key;
    the entry's own ``event_type`` key is always ``'kyc_event'``.

    Args:
        user_id: Identifier of the user the event concerns.
        event_type: Kind of KYC event.
        status: Status value stored as-is in the event.
        ip: Address the request originated from.
    """
    kyc_record = {
        'event_type': 'kyc_event',
        'user_id': user_id,
        'kyc_event_type': event_type,
        'status': status,
        'ip_address': ip,
        'timestamp': datetime.utcnow().isoformat(),
    }
    body = json.dumps(kyc_record)
    security_logger.info(f"KYC event: {body}")
def log_transaction_event(user_id: str, transaction_type: str, amount: float, property_id: Optional[str], ip: str):
    """Write an INFO entry for a transaction.

    Args:
        user_id: Identifier of the user performing the transaction.
        transaction_type: Kind of transaction.
        amount: Transaction amount, stored as-is in the event.
        property_id: Identifier of the related property, or None.
        ip: Address the request originated from.
    """
    fields = (
        ('event_type', 'transaction'),
        ('user_id', user_id),
        ('transaction_type', transaction_type),
        ('amount', amount),
        ('property_id', property_id),
        ('ip_address', ip),
        ('timestamp', datetime.utcnow().isoformat()),
    )
    # dict() preserves pair order, so the serialized key order is unchanged.
    serialized = json.dumps(dict(fields))
    security_logger.info("Transaction event: " + serialized)