teoat's picture
fix(backend): fix port and health check robustness
d29a5a0 verified
"""
Compliance Service - Automated regulatory reporting, risk assessment, and training management.
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Optional
from sqlalchemy.orm import Session
from core.database import (
AccessReview,
ComplianceAuditLog,
RegulatoryReport,
SecurityIncident,
)
logger = logging.getLogger(__name__)
class ComplianceService:
"""Consolidated logic for regulatory compliance, reporting, and dashboard metrics"""
def __init__(self, db: Session):
self.db = db
async def log_compliance_event(
self,
action: str,
resource_type: str,
resource_id: str,
user_id: str,
user_role: str,
details: dict,
) -> Optional[str]:
try:
audit_log = ComplianceAuditLog(
action=action,
resource_type=resource_type,
resource_id=resource_id,
user_id=user_id,
user_role=user_role,
timestamp=datetime.utcnow(),
details=json.dumps(details),
)
self.db.add(audit_log)
self.db.commit()
return audit_log.id
except Exception as e:
logger.error(f"Failed to log compliance: {e}")
self.db.rollback()
return None
def get_dashboard_metrics(self) -> dict[str, Any]:
last_24h = datetime.utcnow() - timedelta(hours=24)
return {
"recent_audit_events": self.db.query(ComplianceAuditLog)
.filter(ComplianceAuditLog.timestamp >= last_24h)
.count(),
"pending_regulatory_reports": self.db.query(RegulatoryReport)
.filter(RegulatoryReport.filing_status.in_(["draft", "rejected"]))
.count(),
"open_security_incidents": self.db.query(SecurityIncident)
.filter(SecurityIncident.status.in_(["open", "investigating"]))
.count(),
"overdue_access_reviews": self.db.query(AccessReview)
.filter(AccessReview.review_status == "overdue")
.count(),
}
async def cleanup_old_logs(self, days: int = 90) -> int:
"""Remove compliance audit logs older than a specific number of days."""
threshold = datetime.utcnow() - timedelta(days=days)
try:
# Bulk delete older logs
deleted_count = (
self.db.query(ComplianceAuditLog)
.filter(ComplianceAuditLog.timestamp < threshold)
.delete(synchronize_session=False)
)
self.db.commit()
logger.info(
f"Compliance log cleanup: Removed {deleted_count} logs older than {days} days."
)
return deleted_count
except Exception as e:
logger.error(f"Failed to cleanup compliance logs: {e}")
self.db.rollback()
return 0
# Instance factory
def get_compliance_service(db: Session):
return ComplianceService(db)