# utils/audit.py
"""Audit trail helper.

Writes audit records to an ``audit_logs`` table through a DB-API connection
supplied by the caller, and mirrors each event to the ``audit.log`` file via
the standard ``logging`` module.
"""

import json
import logging
import uuid
from contextlib import closing
from datetime import datetime
from typing import Any, Optional

try:
    # Not used by this module; the import is kept so the name ``st`` remains
    # defined here. Its absence is tolerated so the audit logger can be
    # imported in environments without Streamlit installed.
    import streamlit as st
except ImportError:
    st = None

logger = logging.getLogger(__name__)


class AuditLogger:
    """Persist and query audit records through an injected DB connection.

    The connection is not created here: call ``initialize_with_connection``
    before logging. Until then ``log_activity`` only reports an error to the
    log file and ``get_user_activity`` returns an empty list.
    """

    def __init__(self):
        self.conn = None
        self._setup_logging()

    def _setup_logging(self) -> None:
        """Route log output to ``audit.log`` at INFO level.

        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so an application-level logging setup takes precedence.
        """
        logging.basicConfig(
            filename='audit.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    def initialize_with_connection(self, conn) -> None:
        """Attach the DB-API connection used for all later reads and writes."""
        self.conn = conn

    @staticmethod
    def _serialize_details(details: Any) -> Optional[str]:
        """Return *details* as a JSON string, or ``None`` when it is ``None``.

        Only ``None`` maps to NULL; falsy payloads such as ``{}``, ``[]`` or
        ``0`` are real data and are serialized.
        """
        if details is None:
            return None
        # default=str is deliberate: values json cannot encode (datetime,
        # UUID, Decimal, ...) are stored as their string form instead of
        # aborting the write and losing the whole audit record.
        return json.dumps(details, default=str)

    def log_activity(
        self,
        user_id: str,
        action: str,
        ip_address: Optional[str] = None,
        details: Any = None,
        severity: str = 'info'
    ) -> None:
        """Insert one audit record and commit it.

        Best effort: any failure is written to the log file and swallowed so
        that auditing never breaks the calling code path.

        Args:
            user_id: Identifier of the acting user.
            action: Name of the action being recorded.
            ip_address: Optional client address.
            details: Optional payload, stored as JSON text.
            severity: Severity label stored with the record.
        """
        if self.conn is None:
            logger.error("Database connection not initialized")
            return

        try:
            record = (
                str(uuid.uuid4()),
                user_id,
                action,
                ip_address,
                # NOTE(review): naive local time. Kept as-is because
                # get_user_activity sorts on this string and existing rows
                # use the same format.
                datetime.now().isoformat(),
                self._serialize_details(details),
                severity,
            )
            with closing(self.conn.cursor()) as cursor:
                cursor.execute(
                    """
                    INSERT INTO audit_logs (
                        id, user_id, action, ip_address,
                        timestamp, details, severity
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    record,
                )
                self.conn.commit()
            logger.info("Audit log: %s by user %s", action, user_id)
        except Exception as e:
            logger.error("Failed to create audit log: %s", e)

    def get_user_activity(self, user_id: str, limit: int = 50) -> list:
        """Return the most recent audit rows for *user_id*, newest first.

        Args:
            user_id: Identifier of the user whose activity is requested.
            limit: Maximum number of rows to return.

        Returns:
            Rows of ``(action, timestamp, details, severity)`` as produced by
            the cursor's ``fetchall``; an empty list when no connection is
            attached or the query fails.
        """
        if self.conn is None:
            return []

        try:
            with closing(self.conn.cursor()) as cursor:
                cursor.execute(
                    """
                    SELECT action, timestamp, details, severity
                    FROM audit_logs
                    WHERE user_id = ?
                    ORDER BY timestamp DESC
                    LIMIT ?
                    """,
                    (user_id, limit),
                )
                return cursor.fetchall()
        except Exception as e:
            logger.error("Failed to fetch user activity: %s", e)
            return []


# Create a single instance
audit_logger = AuditLogger()