Spaces:
Paused
Paused
| """ | |
| Compliance Service - Automated regulatory reporting, risk assessment, and training management. | |
| """ | |
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Any, Optional | |
| from sqlalchemy.orm import Session | |
| from core.database import ( | |
| AccessReview, | |
| ComplianceAuditLog, | |
| RegulatoryReport, | |
| SecurityIncident, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class ComplianceService: | |
| """Consolidated logic for regulatory compliance, reporting, and dashboard metrics""" | |
| def __init__(self, db: Session): | |
| self.db = db | |
| async def log_compliance_event( | |
| self, | |
| action: str, | |
| resource_type: str, | |
| resource_id: str, | |
| user_id: str, | |
| user_role: str, | |
| details: dict, | |
| ) -> Optional[str]: | |
| try: | |
| audit_log = ComplianceAuditLog( | |
| action=action, | |
| resource_type=resource_type, | |
| resource_id=resource_id, | |
| user_id=user_id, | |
| user_role=user_role, | |
| timestamp=datetime.utcnow(), | |
| details=json.dumps(details), | |
| ) | |
| self.db.add(audit_log) | |
| self.db.commit() | |
| return audit_log.id | |
| except Exception as e: | |
| logger.error(f"Failed to log compliance: {e}") | |
| self.db.rollback() | |
| return None | |
| def get_dashboard_metrics(self) -> dict[str, Any]: | |
| last_24h = datetime.utcnow() - timedelta(hours=24) | |
| return { | |
| "recent_audit_events": self.db.query(ComplianceAuditLog) | |
| .filter(ComplianceAuditLog.timestamp >= last_24h) | |
| .count(), | |
| "pending_regulatory_reports": self.db.query(RegulatoryReport) | |
| .filter(RegulatoryReport.filing_status.in_(["draft", "rejected"])) | |
| .count(), | |
| "open_security_incidents": self.db.query(SecurityIncident) | |
| .filter(SecurityIncident.status.in_(["open", "investigating"])) | |
| .count(), | |
| "overdue_access_reviews": self.db.query(AccessReview) | |
| .filter(AccessReview.review_status == "overdue") | |
| .count(), | |
| } | |
| async def cleanup_old_logs(self, days: int = 90) -> int: | |
| """Remove compliance audit logs older than a specific number of days.""" | |
| threshold = datetime.utcnow() - timedelta(days=days) | |
| try: | |
| # Bulk delete older logs | |
| deleted_count = ( | |
| self.db.query(ComplianceAuditLog) | |
| .filter(ComplianceAuditLog.timestamp < threshold) | |
| .delete(synchronize_session=False) | |
| ) | |
| self.db.commit() | |
| logger.info( | |
| f"Compliance log cleanup: Removed {deleted_count} logs older than {days} days." | |
| ) | |
| return deleted_count | |
| except Exception as e: | |
| logger.error(f"Failed to cleanup compliance logs: {e}") | |
| self.db.rollback() | |
| return 0 | |
| # Instance factory | |
| def get_compliance_service(db: Session): | |
| return ComplianceService(db) | |