shaheerawan3's picture
Update utils/audit.py
ff4402e verified
# utils/audit.py
import streamlit as st
import logging
import json
import uuid
from datetime import datetime
from typing import Optional, Any
class AuditLogger:
def __init__(self):
self.conn = None
self._setup_logging()
def _setup_logging(self):
logging.basicConfig(
filename='audit.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def initialize_with_connection(self, conn):
self.conn = conn
def log_activity(
self,
user_id: str,
action: str,
ip_address: Optional[str] = None,
details: Any = None,
severity: str = 'info'
):
if not self.conn:
logging.error("Database connection not initialized")
return
try:
cursor = self.conn.cursor()
cursor.execute("""
INSERT INTO audit_logs (
id, user_id, action, ip_address, timestamp, details, severity
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
str(uuid.uuid4()),
user_id,
action,
ip_address,
datetime.now().isoformat(),
json.dumps(details) if details else None,
severity
))
self.conn.commit()
logging.info(f"Audit log: {action} by user {user_id}")
except Exception as e:
logging.error(f"Failed to create audit log: {str(e)}")
def get_user_activity(self, user_id: str, limit: int = 50):
if not self.conn:
return []
try:
cursor = self.conn.cursor()
cursor.execute("""
SELECT action, timestamp, details, severity
FROM audit_logs
WHERE user_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (user_id, limit))
return cursor.fetchall()
except Exception as e:
logging.error(f"Failed to fetch user activity: {str(e)}")
return []
# Create a single instance
audit_logger = AuditLogger()