File size: 3,315 Bytes
3c3608b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
from typing import Any, Dict, List, Optional

from core.plugin_system.permissions import PluginPermission

logger = logging.getLogger(__name__)


class BaseFacade:
    """Shared foundation for plugin facades: wrapped service plus permission checks."""

    def __init__(
        self, service: Any, plugin_id: str, permissions: List[PluginPermission] = None
    ):
        """Store the wrapped service, the owning plugin's id and its permissions.

        Args:
            service: The real service object this facade wraps.
            plugin_id: Identifier of the plugin, used in log messages.
            permissions: Permissions granted to the plugin. A missing or
                empty value is stored as an empty list.
        """
        self._service, self._plugin_id = service, plugin_id
        self._permissions = permissions if permissions else []

    def _check_permission(self, required_permission: PluginPermission) -> bool:
        """Report whether ``required_permission`` has been granted to this plugin.

        A failed check is recorded as a warning (naming the plugin, the
        requested permission and the granted ones) before ``False`` is
        returned. This method never raises on denial; callers decide how
        to react.
        """
        if required_permission in self._permissions:
            return True

        denial = (
            f"Plugin {self._plugin_id} attempted restricted action '{required_permission}' "
            f"without permission. Granted: {self._permissions}"
        )
        logger.warning(denial)
        return False


class PluginDBFacade(BaseFacade):
    """
    Restricted database access for plugins.

    Wraps the real DB service and only exposes safe methods based on the
    permissions granted to the plugin. Destructive operations are
    deliberately not implemented on this class, so they cannot be reached
    through it.
    """

    # Whitelist of level names accepted by log_activity(). Anything else
    # falls back to INFO, so a plugin cannot push arbitrary values into the
    # logging call.
    _LOG_LEVELS = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "warn": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }

    def __init__(
        self,
        real_db_service: Any,
        plugin_id: str,
        permissions: Optional[List[PluginPermission]] = None,
    ):
        """Create a facade around ``real_db_service`` for one plugin.

        Args:
            real_db_service: The real DB service being wrapped.
            plugin_id: Identifier of the plugin, used in log and error messages.
            permissions: Permissions granted to the plugin. Defaults to
                ``[PluginPermission.READ_ONLY]`` when omitted.
        """
        # NOTE(review): because of `or`, an explicit empty list is also
        # replaced by the READ_ONLY default. Kept for backward compatibility;
        # confirm whether "no permissions at all" should be expressible.
        super().__init__(
            real_db_service, plugin_id, permissions or [PluginPermission.READ_ONLY]
        )

    def _require_read(
        self, specific_permission: PluginPermission, denial_message: str
    ) -> None:
        """Allow the call if the plugin holds READ_ONLY or ``specific_permission``.

        READ_ONLY is tested first so that an allowed call never produces the
        "restricted action" warning emitted by ``_check_permission``; that
        warning is only logged when access is actually denied.

        Raises:
            PermissionError: With ``denial_message`` if neither permission is granted.
        """
        if PluginPermission.READ_ONLY in self._permissions:
            return
        if not self._check_permission(specific_permission):
            raise PermissionError(denial_message)

    def get_user(self, user_id: str) -> Optional[Any]:
        """Safe user retrieval.

        Args:
            user_id: Identifier forwarded to the wrapped service's ``get_user``.

        Returns:
            Whatever the wrapped service's ``get_user`` returns.

        Raises:
            PermissionError: If the plugin holds neither READ_USER nor READ_ONLY.
        """
        # READ_ONLY currently implies basic user reads until a stricter
        # policy is defined.
        self._require_read(
            PluginPermission.READ_USER,
            f"Plugin {self._plugin_id} missing 'READ_USER' permission",
        )
        return self._service.get_user(user_id)

    def query(
        self, model_name: str, filters: Optional[Dict[str, Any]] = None
    ) -> List[Any]:
        """
        Generic safe query wrapper (placeholder).

        Currently only the permission check is performed; ``model_name`` and
        ``filters`` are not used yet and an empty list is always returned.

        Args:
            model_name: Name of the model to query (unused for now).
            filters: Optional filter mapping (unused for now).

        Returns:
            An empty list.

        Raises:
            PermissionError: If the plugin holds neither READ_ONLY nor READ_DATA.
        """
        self._require_read(
            PluginPermission.READ_DATA,
            f"Plugin {self._plugin_id} missing read permissions",
        )

        # TODO: real implementation should map model_name to an explicit
        # whitelist of models (e.g. {"Case": CaseModel, "User": UserModel}),
        # return [] for unknown names, and delegate to the wrapped service
        # with the resolved model and filters.

        # Stub result: nothing is passed through to the DB service yet, so no
        # direct SQL is exposed.
        return []

    def log_activity(self, message: str, level: str = "info") -> None:
        """Allow plugins to log their own specific activities.

        Args:
            message: Text to log; it is prefixed with the plugin id.
            level: Case-insensitive level name ("debug", "info", "warning",
                "warn", "error", "critical"). Unknown values are logged at INFO.
        """
        # Logging acts as its own audit trail, so no permission is required.
        level_no = self._LOG_LEVELS.get(str(level).lower(), logging.INFO)
        logger.log(level_no, "[PLUGIN:%s] %s", self._plugin_id, message)

    # Explicitly FORBID destructive actions by NOT implementing them
    # def delete_user(self): ...  <-- DOES NOT EXIST