teoat's picture
Upload core/plugin_system/facades.py with huggingface_hub
3c3608b verified
import logging
from typing import Any, Dict, List, Optional
from core.plugin_system.permissions import PluginPermission
logger = logging.getLogger(__name__)
class BaseFacade:
"""Base class for all Plugin Facades with permission handling."""
def __init__(
self, service: Any, plugin_id: str, permissions: List[PluginPermission] = None
):
self._service = service
self._plugin_id = plugin_id
self._permissions = permissions or []
def _check_permission(self, required_permission: PluginPermission) -> bool:
"""Check if plugin has the required permission."""
if required_permission not in self._permissions:
logger.warning(
f"Plugin {self._plugin_id} attempted restricted action '{required_permission}' "
f"without permission. Granted: {self._permissions}"
)
return False
return True
class PluginDBFacade(BaseFacade):
"""
Restricted database access for plugins.
Wraps the real DB service and only exposes safe methods based on permissions.
"""
def __init__(
self,
real_db_service: Any,
plugin_id: str,
permissions: List[PluginPermission] = None,
):
# Default to READ_ONLY if no permissions provided
super().__init__(
real_db_service, plugin_id, permissions or [PluginPermission.READ_ONLY]
)
def get_user(self, user_id: str) -> Optional[Any]:
"""Safe user retrieval."""
if not self._check_permission(PluginPermission.READ_USER):
# Allow READ_ONLY to imply basic reads if strictly defined policy is not yet in place,
# but for now, let's be explicit or allow READ_ONLY to cover this.
if PluginPermission.READ_ONLY not in self._permissions:
raise PermissionError(
f"Plugin {self._plugin_id} missing 'READ_USER' permission"
)
return self._service.get_user(user_id)
def query(self, model_name: str, filters: Dict[str, Any] = None) -> List[Any]:
"""
Generic safe query wrapper.
Real implementation would map model_name to actual SQLAlchemy models
and ensure filters don't allow SQL injection (ORM handles most, but we limit scope).
"""
if (
PluginPermission.READ_ONLY not in self._permissions
and PluginPermission.READ_DATA not in self._permissions
):
raise PermissionError(f"Plugin {self._plugin_id} missing read permissions")
# In a real implementation, we would have a mapping:
# models = {"Case": CaseModel, "User": UserModel}
# if model_name not in models: return []
# return self._service.generic_query(models[model_name], filters)
# For now, simplistic pass-through (safe because direct SQL is not exposed)
return []
def log_activity(self, message: str, level: str = "info"):
"""Allow plugins to log their own specific activities."""
# Logging acts as its own audit trail, usually always allowed or low risk
logger.info(f"[PLUGIN:{self._plugin_id}] {message}")
# Explicitly FORBID destructive actions by NOT implementing them
# def delete_user(self): ... <-- DOES NOT EXIST