teoat commited on
Commit
3c3608b
·
verified ·
1 Parent(s): a61bfa6

Upload core/plugin_system/facades.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. core/plugin_system/facades.py +85 -0
core/plugin_system/facades.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import Any, Dict, List, Optional
3
+
4
+ from core.plugin_system.permissions import PluginPermission
5
+
6
+ logger = logging.getLogger(__name__)
7
+
8
+
9
+ class BaseFacade:
10
+ """Base class for all Plugin Facades with permission handling."""
11
+
12
+ def __init__(
13
+ self, service: Any, plugin_id: str, permissions: List[PluginPermission] = None
14
+ ):
15
+ self._service = service
16
+ self._plugin_id = plugin_id
17
+ self._permissions = permissions or []
18
+
19
+ def _check_permission(self, required_permission: PluginPermission) -> bool:
20
+ """Check if plugin has the required permission."""
21
+ if required_permission not in self._permissions:
22
+ logger.warning(
23
+ f"Plugin {self._plugin_id} attempted restricted action '{required_permission}' "
24
+ f"without permission. Granted: {self._permissions}"
25
+ )
26
+ return False
27
+ return True
28
+
29
+
30
+ class PluginDBFacade(BaseFacade):
31
+ """
32
+ Restricted database access for plugins.
33
+ Wraps the real DB service and only exposes safe methods based on permissions.
34
+ """
35
+
36
+ def __init__(
37
+ self,
38
+ real_db_service: Any,
39
+ plugin_id: str,
40
+ permissions: List[PluginPermission] = None,
41
+ ):
42
+ # Default to READ_ONLY if no permissions provided
43
+ super().__init__(
44
+ real_db_service, plugin_id, permissions or [PluginPermission.READ_ONLY]
45
+ )
46
+
47
+ def get_user(self, user_id: str) -> Optional[Any]:
48
+ """Safe user retrieval."""
49
+ if not self._check_permission(PluginPermission.READ_USER):
50
+ # Allow READ_ONLY to imply basic reads if strictly defined policy is not yet in place,
51
+ # but for now, let's be explicit or allow READ_ONLY to cover this.
52
+ if PluginPermission.READ_ONLY not in self._permissions:
53
+ raise PermissionError(
54
+ f"Plugin {self._plugin_id} missing 'READ_USER' permission"
55
+ )
56
+
57
+ return self._service.get_user(user_id)
58
+
59
+ def query(self, model_name: str, filters: Dict[str, Any] = None) -> List[Any]:
60
+ """
61
+ Generic safe query wrapper.
62
+ Real implementation would map model_name to actual SQLAlchemy models
63
+ and ensure filters don't allow SQL injection (ORM handles most, but we limit scope).
64
+ """
65
+ if (
66
+ PluginPermission.READ_ONLY not in self._permissions
67
+ and PluginPermission.READ_DATA not in self._permissions
68
+ ):
69
+ raise PermissionError(f"Plugin {self._plugin_id} missing read permissions")
70
+
71
+ # In a real implementation, we would have a mapping:
72
+ # models = {"Case": CaseModel, "User": UserModel}
73
+ # if model_name not in models: return []
74
+ # return self._service.generic_query(models[model_name], filters)
75
+
76
+ # For now, simplistic pass-through (safe because direct SQL is not exposed)
77
+ return []
78
+
79
+ def log_activity(self, message: str, level: str = "info"):
80
+ """Allow plugins to log their own specific activities."""
81
+ # Logging acts as its own audit trail, usually always allowed or low risk
82
+ logger.info(f"[PLUGIN:{self._plugin_id}] {message}")
83
+
84
+ # Explicitly FORBID destructive actions by NOT implementing them
85
+ # def delete_user(self): ... <-- DOES NOT EXIST