Spaces:
Paused
Paused
File size: 3,315 Bytes
3c3608b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | import logging
from typing import Any, Dict, List, Optional
from core.plugin_system.permissions import PluginPermission
logger = logging.getLogger(__name__)
class BaseFacade:
    """Shared foundation for plugin facades.

    Holds the wrapped service, the owning plugin's id, and the permissions
    granted to that plugin, and offers a helper to test a single permission.
    """

    def __init__(
        self, service: Any, plugin_id: str, permissions: List[PluginPermission] = None
    ):
        """Store the wrapped service, plugin id and granted permissions.

        Args:
            service: The underlying service object the facade delegates to.
            plugin_id: Identifier of the plugin, used in log messages.
            permissions: Permissions granted to the plugin. ``None`` or an
                empty list results in an empty permission list; a non-empty
                list is stored as passed (not copied).
        """
        self._service = service
        self._plugin_id = plugin_id
        self._permissions = permissions if permissions else []

    def _check_permission(self, required_permission: PluginPermission) -> bool:
        """Report whether the plugin holds ``required_permission``.

        A missing permission is logged as a warning (including the list of
        permissions that were granted) before ``False`` is returned. This
        method never raises on denial; callers decide how to react.
        """
        if required_permission in self._permissions:
            return True

        denial = (
            f"Plugin {self._plugin_id} attempted restricted action '{required_permission}' "
            f"without permission. Granted: {self._permissions}"
        )
        logger.warning(denial)
        return False
class PluginDBFacade(BaseFacade):
    """Restricted database access for plugins.

    Wraps the real DB service and only exposes safe methods based on the
    permissions granted to the plugin. Destructive actions are forbidden by
    deliberately NOT implementing them on this facade.
    """

    # Level names accepted by log_activity(); anything else falls back to INFO.
    _LOG_LEVELS = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "warn": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }

    def __init__(
        self,
        real_db_service: Any,
        plugin_id: str,
        permissions: Optional[List[PluginPermission]] = None,
    ):
        """Create a facade around ``real_db_service`` for one plugin.

        Args:
            real_db_service: The real DB service that calls are delegated to.
            plugin_id: Identifier of the plugin, used in log and error messages.
            permissions: Permissions granted to the plugin. When ``None`` or
                empty, the plugin is given ``[PluginPermission.READ_ONLY]``.
        """
        # NOTE(review): an explicitly empty list is also upgraded to READ_ONLY
        # here; kept for backward compatibility -- confirm this is intended.
        super().__init__(
            real_db_service, plugin_id, permissions or [PluginPermission.READ_ONLY]
        )

    def get_user(self, user_id: str) -> Optional[Any]:
        """Return the user for ``user_id`` from the wrapped service.

        Access is allowed when the plugin holds either ``READ_ONLY`` or
        ``READ_USER``.

        Raises:
            PermissionError: If the plugin holds neither permission.
        """
        # READ_ONLY implies basic reads, so test it first. The READ_USER check
        # logs a warning on failure, and that warning should only appear when
        # access is really denied -- not on every permitted READ_ONLY call.
        if (
            PluginPermission.READ_ONLY not in self._permissions
            and not self._check_permission(PluginPermission.READ_USER)
        ):
            raise PermissionError(
                f"Plugin {self._plugin_id} missing 'READ_USER' permission"
            )
        return self._service.get_user(user_id)

    def query(
        self, model_name: str, filters: Optional[Dict[str, Any]] = None
    ) -> List[Any]:
        """Generic safe query wrapper (currently a stub).

        After the permission check passes this always returns an empty list;
        ``model_name`` and ``filters`` are not used yet.

        Raises:
            PermissionError: If the plugin holds neither ``READ_ONLY`` nor
                ``READ_DATA``.
        """
        if (
            PluginPermission.READ_ONLY not in self._permissions
            and PluginPermission.READ_DATA not in self._permissions
        ):
            raise PermissionError(f"Plugin {self._plugin_id} missing read permissions")

        # Planned real implementation: map model_name onto an allow-list of
        # ORM models, e.g.
        #   models = {"Case": CaseModel, "User": UserModel}
        #   if model_name not in models: return []
        #   return self._service.generic_query(models[model_name], filters)
        # Until then nothing is queried, so no direct SQL is exposed.
        return []

    def log_activity(self, message: str, level: str = "info") -> None:
        """Log a plugin-specific activity message, tagged with the plugin id.

        Args:
            message: Text to log.
            level: Case-insensitive level name: ``debug``, ``info``,
                ``warning``/``warn``, ``error`` or ``critical``. Unrecognised
                values are logged at INFO.
        """
        # Logging acts as its own audit trail, so no permission is required.
        level_no = self._LOG_LEVELS.get(str(level).lower(), logging.INFO)
        logger.log(level_no, "[PLUGIN:%s] %s", self._plugin_id, message)

    # Explicitly FORBID destructive actions by NOT implementing them
    # def delete_user(self): ... <-- DOES NOT EXIST
|