import logging from typing import Any, Dict, List, Optional from core.plugin_system.permissions import PluginPermission logger = logging.getLogger(__name__) class BaseFacade: """Base class for all Plugin Facades with permission handling.""" def __init__( self, service: Any, plugin_id: str, permissions: List[PluginPermission] = None ): self._service = service self._plugin_id = plugin_id self._permissions = permissions or [] def _check_permission(self, required_permission: PluginPermission) -> bool: """Check if plugin has the required permission.""" if required_permission not in self._permissions: logger.warning( f"Plugin {self._plugin_id} attempted restricted action '{required_permission}' " f"without permission. Granted: {self._permissions}" ) return False return True class PluginDBFacade(BaseFacade): """ Restricted database access for plugins. Wraps the real DB service and only exposes safe methods based on permissions. """ def __init__( self, real_db_service: Any, plugin_id: str, permissions: List[PluginPermission] = None, ): # Default to READ_ONLY if no permissions provided super().__init__( real_db_service, plugin_id, permissions or [PluginPermission.READ_ONLY] ) def get_user(self, user_id: str) -> Optional[Any]: """Safe user retrieval.""" if not self._check_permission(PluginPermission.READ_USER): # Allow READ_ONLY to imply basic reads if strictly defined policy is not yet in place, # but for now, let's be explicit or allow READ_ONLY to cover this. if PluginPermission.READ_ONLY not in self._permissions: raise PermissionError( f"Plugin {self._plugin_id} missing 'READ_USER' permission" ) return self._service.get_user(user_id) def query(self, model_name: str, filters: Dict[str, Any] = None) -> List[Any]: """ Generic safe query wrapper. Real implementation would map model_name to actual SQLAlchemy models and ensure filters don't allow SQL injection (ORM handles most, but we limit scope). """ if ( PluginPermission.READ_ONLY not in self._permissions and PluginPermission.READ_DATA not in self._permissions ): raise PermissionError(f"Plugin {self._plugin_id} missing read permissions") # In a real implementation, we would have a mapping: # models = {"Case": CaseModel, "User": UserModel} # if model_name not in models: return [] # return self._service.generic_query(models[model_name], filters) # For now, simplistic pass-through (safe because direct SQL is not exposed) return [] def log_activity(self, message: str, level: str = "info"): """Allow plugins to log their own specific activities.""" # Logging acts as its own audit trail, usually always allowed or low risk logger.info(f"[PLUGIN:{self._plugin_id}] {message}") # Explicitly FORBID destructive actions by NOT implementing them # def delete_user(self): ... <-- DOES NOT EXIST