ChaoqianO committed on
Commit
ed65d72
·
1 Parent(s): 2594a62

feat: add path guardrail + preview via load_stroke_data

Browse files
Analyze-stroke/mcp_output/mcp_plugin/mcp_service.py CHANGED
@@ -9,6 +9,7 @@ import time
9
  import warnings
10
  import logging
11
  from typing import Optional, List
 
12
 
13
  # 抑制 DoWhy 和其他库的日志
14
  logging.getLogger('dowhy').setLevel(logging.WARNING)
@@ -47,32 +48,115 @@ DEFAULT_LOG_DIR = os.path.join(MCP_OUTPUT_DIR, "logs")
47
 
48
  mcp = FastMCP("AnalyzeStrokeService")
49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  # ====================== 数据加载工具 ======================
51
 
52
  @mcp.tool(name="load_stroke_data", description="Load and clean the stroke dataset. Returns basic statistics about the data.")
53
- def load_stroke_data_tool(file_path: str = None) -> dict:
54
  """
55
  Load and clean the stroke dataset.
56
 
57
  Args:
58
  file_path (str, optional): Path to the CSV data file. Uses default if not provided.
 
59
 
60
  Returns:
61
  dict: A dictionary containing success, result (data shape, columns, basic stats), and error fields.
62
  """
63
  try:
 
64
  data_file = file_path if file_path else DEFAULT_DATA_FILE
65
- loader = DataLoader(data_file)
66
- df = loader.load_and_clean()
67
-
68
- result = {
69
- "shape": df.shape,
70
- "columns": list(df.columns),
71
- "stroke_distribution": df['stroke'].value_counts().to_dict(),
72
- "missing_values": df.isnull().sum().to_dict(),
73
- "numeric_stats": df.describe().to_dict()
74
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  return {"success": True, "result": result, "error": None}
 
 
76
  except Exception as e:
77
  return {"success": False, "result": None, "error": str(e)}
78
 
 
9
  import warnings
10
  import logging
11
  from typing import Optional, List
12
+ from pathlib import Path
13
 
14
  # 抑制 DoWhy 和其他库的日志
15
  logging.getLogger('dowhy').setLevel(logging.WARNING)
 
48
 
49
  mcp = FastMCP("AnalyzeStrokeService")
50
 
51
+ # ====================== Guardrails (path allowlist) ======================
52
+ try:
53
+ from path_guardrail import PathGuardrailConfig, GuardrailViolation, resolve_and_validate_path
54
+ except Exception:
55
+ PathGuardrailConfig = None # type: ignore
56
+ GuardrailViolation = Exception # type: ignore
57
+ resolve_and_validate_path = None # type: ignore
58
+
59
+
60
def _guard_cfg():
    """
    Build the path-guardrail configuration used by the MCP tools.

    Returns ``None`` when ``PathGuardrailConfig`` is unavailable (it is set to
    ``None`` if the guardrail module could not be imported). Otherwise the
    config is produced by ``PathGuardrailConfig.from_env`` with demo-friendly
    defaults:

    - allowlist: ``<PROJECT_ROOT>/patient_data``
    - audit log: ``<MCP_OUTPUT_DIR>/logs/guardrail_audit.jsonl``
    """
    if PathGuardrailConfig is None:
        return None

    fallback_allow_roots = [os.path.join(PROJECT_ROOT, "patient_data")]
    fallback_audit_log = os.path.join(MCP_OUTPUT_DIR, "logs", "guardrail_audit.jsonl")

    config = PathGuardrailConfig.from_env(
        project_root=PROJECT_ROOT,
        default_allow_roots=fallback_allow_roots,
        default_audit_log=fallback_audit_log,
    )
    return config
77
+
78
+
79
def _guard_path(user_path: str, *, tool_name: str, purpose: str) -> str:
    """
    Run a caller-supplied path through the path guardrail.

    Args:
        user_path: Path string as received from the tool caller.
        tool_name: Name of the MCP tool performing the access (for auditing).
        purpose: Short label describing why the path is accessed (for auditing).

    Returns:
        The resolved path as a string. If the guardrail module is unavailable
        and guardrails were not requested, ``user_path`` is returned unchanged.

    Raises:
        GuardrailViolation: If the guardrail rejects the path.
        RuntimeError: If ``MCP_ENABLE_GUARDRAILS`` requests guardrails but the
            guardrail module could not be loaded.
    """
    cfg = _guard_cfg()
    if cfg is None or resolve_and_validate_path is None:
        # Fail closed: an operator who switched guardrails on must not get
        # unchecked paths just because the guardrail module failed to import.
        requested = os.environ.get("MCP_ENABLE_GUARDRAILS", "0").strip().lower()
        if requested in ("1", "true", "yes", "on"):
            raise RuntimeError(
                "Path guardrails are enabled but the path_guardrail module is unavailable"
            )
        return user_path

    resolved = resolve_and_validate_path(
        cfg=cfg,
        user_path=user_path,
        tool_name=tool_name,
        purpose=purpose,
    )
    return str(resolved)
85
+
86
  # ====================== 数据加载工具 ======================
87
 
88
  @mcp.tool(name="load_stroke_data", description="Load and clean the stroke dataset. Returns basic statistics about the data.")
89
def load_stroke_data_tool(file_path: str = None, preview_rows: int = 0) -> dict:
    """
    Load and clean the stroke dataset, optionally returning a small raw preview.

    Args:
        file_path (str, optional): Path to the CSV data file. Uses the default
            data file if not provided. The path is passed through the path
            guardrail before any read.
        preview_rows (int, optional): If >0, return the first N raw rows as a
            lightweight preview. Clamped to the range 0..10; values that cannot
            be converted to an integer are treated as 0.

    Returns:
        dict: ``{"success", "result", "error"}``.
            - Cleaning succeeded: ``result`` holds the resolved path, shape,
              columns, stroke distribution, missing values, numeric stats and
              the optional preview.
            - Cleaning failed but a preview was read: ``success`` is True and
              ``result`` holds the preview, raw columns and ``clean_error``.
            - Cleaning failed and no preview was requested: ``success`` is
              False and ``error`` carries the cleaning error.
            - Any other failure (including a guardrail rejection):
              ``success`` is False and ``error`` carries the message.
    """
    try:
        data_file = file_path if file_path else DEFAULT_DATA_FILE
        guarded_file = _guard_path(data_file, tool_name="load_stroke_data", purpose="read_csv")

        # Normalise the preview size; bad input silently disables the preview.
        try:
            n = int(preview_rows) if preview_rows is not None else 0
        except (TypeError, ValueError, OverflowError):
            n = 0
        n = max(0, min(n, 10))

        preview = None
        raw_columns = None
        if n > 0:
            df_preview = pd.read_csv(guarded_file, nrows=n)
            raw_columns = list(df_preview.columns)
            # NOTE(review): when both columns exist the preview is narrowed to
            # exactly the identifying fields; confirm this is intended outside
            # a demo setting.
            if "name" in df_preview.columns and "national_id" in df_preview.columns:
                preview = df_preview[["name", "national_id"]].to_dict(orient="records")
            else:
                preview = df_preview.to_dict(orient="records")

        # Full stroke cleaning may fail for non-stroke CSVs; that is tolerated
        # only when there is a preview to return instead.
        cleaned = None
        cleaned_error = None
        try:
            cleaned = DataLoader(guarded_file).load_and_clean()
        except Exception as e:
            cleaned_error = f"{type(e).__name__}: {e}"

        if cleaned is None and preview is None:
            # Nothing useful to return: report the failure instead of an
            # all-None "success" payload.
            return {
                "success": False,
                "result": None,
                "error": cleaned_error or "Data loading returned no data",
            }

        if cleaned is not None:
            stroke_distribution = (
                cleaned["stroke"].value_counts().to_dict() if "stroke" in cleaned.columns else None
            )
            result = {
                "resolved_path": guarded_file,
                "shape": cleaned.shape,
                "columns": list(cleaned.columns),
                "stroke_distribution": stroke_distribution,
                "missing_values": cleaned.isnull().sum().to_dict(),
                "numeric_stats": cleaned.describe(numeric_only=True).to_dict(),
                "preview_rows": preview,
            }
        else:
            # Fallback: just return preview + header info for non-stroke CSVs.
            result = {
                "resolved_path": guarded_file,
                "shape": None,
                "columns": raw_columns,
                "stroke_distribution": None,
                "missing_values": None,
                "numeric_stats": None,
                "preview_rows": preview,
                "note": "File did not match stroke schema; returning preview only.",
                "clean_error": cleaned_error,
            }

        return {"success": True, "result": result, "error": None}
    except Exception as e:
        # Guardrail rejections are ordinary exceptions and are reported the
        # same way as any other failure.
        return {"success": False, "result": None, "error": str(e)}
162
 
Analyze-stroke/mcp_output/mcp_plugin/path_guardrail.py ADDED
@@ -0,0 +1,143 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import json
4
+ import os
5
+ from dataclasses import dataclass
6
+ from datetime import datetime, timezone
7
+ from pathlib import Path
8
+ from typing import Iterable
9
+
10
+
11
class GuardrailViolation(ValueError):
    """Raised when a user-supplied path is empty or resolves outside the allowlisted roots."""
13
+
14
+
15
+ def _utc_iso() -> str:
16
+ return datetime.now(timezone.utc).isoformat()
17
+
18
+
19
+ def _split_allowlist(value: str | None) -> list[str]:
20
+ if not value:
21
+ return []
22
+ return [v.strip() for v in value.split(",") if v.strip()]
23
+
24
+
25
+ def _is_subpath(path: Path, root: Path) -> bool:
26
+ """True if path is within root (or equals root), with drive-aware handling."""
27
+ try:
28
+ # This fails on Windows when drives differ.
29
+ path.relative_to(root)
30
+ return True
31
+ except Exception:
32
+ return False
33
+
34
+
35
@dataclass(frozen=True)
class PathGuardrailConfig:
    """Immutable settings for the path guardrail."""

    # Whether allowlist enforcement is active (MCP_ENABLE_GUARDRAILS).
    enabled: bool
    # Resolved project root; relative paths are anchored here.
    base_dir: Path
    # Resolved directories under which paths are permitted.
    allow_roots: tuple[Path, ...]
    # Resolved location of the JSONL audit log.
    audit_log_path: Path

    @classmethod
    def from_env(
        cls,
        *,
        project_root: str,
        default_allow_roots: Iterable[str],
        default_audit_log: str,
    ) -> "PathGuardrailConfig":
        """Build a config from environment variables, falling back to the given defaults.

        Environment variables read:
            MCP_ENABLE_GUARDRAILS: "1"/"true"/"yes"/"on" (case-insensitive) enables enforcement.
            MCP_PATH_ALLOWLIST: comma-separated allowlist roots; replaces
                ``default_allow_roots`` when it contains at least one entry.
            MCP_GUARDRAIL_AUDIT_LOG: audit log path; blank values fall back to
                ``default_audit_log``.

        Relative allowlist entries and a relative audit log path are resolved
        against ``project_root``, the same anchor used for user paths.
        """
        flag = os.environ.get("MCP_ENABLE_GUARDRAILS", "0").strip().lower()
        enabled = flag in ("1", "true", "yes", "on")
        base_dir = Path(project_root).resolve()

        allow_env = _split_allowlist(os.environ.get("MCP_PATH_ALLOWLIST"))
        allow_values = allow_env if allow_env else list(default_allow_roots)
        # Joining onto base_dir leaves absolute entries untouched and anchors
        # relative ones at the project root instead of the process cwd.
        allow_roots = tuple((base_dir / v).resolve() for v in allow_values)

        # A blank override would otherwise resolve to base_dir itself (a directory).
        audit_log = (os.environ.get("MCP_GUARDRAIL_AUDIT_LOG") or "").strip() or default_audit_log
        audit_log_path = (base_dir / audit_log).resolve()

        return cls(
            enabled=enabled,
            base_dir=base_dir,
            allow_roots=allow_roots,
            audit_log_path=audit_log_path,
        )
64
+
65
+
66
def audit_event(cfg: PathGuardrailConfig, payload: dict) -> None:
    """Append one timestamped JSON line for *payload* to the audit log.

    Best effort: the log directory is created if needed, and any failure
    while creating, serialising or writing is swallowed so that auditing can
    never break the calling tool.
    """
    try:
        log_file = cfg.audit_log_path
        log_file.parent.mkdir(parents=True, exist_ok=True)
        line = json.dumps({"ts": _utc_iso(), **payload}, ensure_ascii=False)
        with log_file.open("a", encoding="utf-8") as handle:
            handle.write(line + "\n")
    except Exception:
        # Never break tool execution due to logging.
        return
75
+
76
+
77
def resolve_and_validate_path(
    *,
    cfg: PathGuardrailConfig,
    user_path: str,
    tool_name: str,
    purpose: str,
) -> Path:
    """
    Resolve a user-supplied path and check it against the allowlist.

    Relative input is anchored at ``cfg.base_dir``; absolute input is used as
    given. The result is resolved (non-strict) before the check, so ``../``
    traversal and absolute paths are judged by where they actually point.
    Every decision is passed to ``audit_event``.

    Returns:
        The resolved path, when guardrails are disabled or the path lies
        under one of ``cfg.allow_roots``.

    Raises:
        GuardrailViolation: If the path is empty/blank, or guardrails are
            enabled and the resolved path is outside every allowlisted root.
    """
    raw = (user_path or "").strip()
    if not raw:
        raise GuardrailViolation("Empty path is not allowed")

    # Treat user input as relative to project root unless it is an absolute path.
    candidate = Path(raw)
    if not candidate.is_absolute():
        candidate = cfg.base_dir / candidate
    resolved = candidate.resolve(strict=False)

    def _record(decision: str, reason: str, *, include_roots: bool) -> None:
        # Key order is kept stable so audit lines stay byte-compatible.
        entry = {
            "decision": decision,
            "reason": reason,
            "tool": tool_name,
            "purpose": purpose,
            "user_path": raw,
            "resolved_path": str(resolved),
        }
        if include_roots:
            entry["allow_roots"] = [str(r) for r in cfg.allow_roots]
        audit_event(cfg, entry)

    if not cfg.enabled:
        _record("ALLOW", "GUARDRAILS_DISABLED", include_roots=False)
        return resolved

    for root in cfg.allow_roots:
        if _is_subpath(resolved, root):
            _record("ALLOW", "WITHIN_ALLOWLIST", include_roots=True)
            return resolved

    _record("BLOCK", "OUTSIDE_ALLOWLIST", include_roots=True)
    raise GuardrailViolation(
        f"Path is not allowed. user_path={raw!r} resolved={str(resolved)!r} allow_roots={[str(r) for r in cfg.allow_roots]}"
    )
142
+
143
+
SECURITY_DEMO.md ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ### 目标:演示 “prompt -> MCP tool -> 路径穿越泄露” 以及 “开启 Guardrail 后阻断”
2
+
3
+ 本仓库内的 `Analyze-stroke/secrets/national_id.csv` 为 **完全虚构** 的演示数据(`SYNTHETIC_DEMO_ONLY`)。
4
+
5
+ ### 演示用 MCP 工具
6
+
7
+ - **`load_stroke_data(file_path, preview_rows=0)`**:加载并返回数据统计;当 `preview_rows > 0` 时(演示中使用 `3`,上限为 10 行),为便于调试会附带返回前几行预览(如果未做路径防护可能意外暴露敏感文件)。
8
+
9
+ ### Before / After 开关(环境变量)
10
+
11
+ - **`MCP_ENABLE_GUARDRAILS`**
12
+ - `0`(默认):关闭路径防护(存在泄露风险)
13
+ - `1`:开启路径防护(阻断越界路径)
14
+
15
+ - **`MCP_PATH_ALLOWLIST`**(可选)
16
+ - 逗号分隔的**绝对路径**白名单根目录(read allowlist)
17
+ - 为空时(且 guardrail=1),默认只允许 `<project_root>/patient_data`
18
+
19
+ - **`MCP_GUARDRAIL_AUDIT_LOG`**(可选)
20
+ - 审计日志路径(默认:`Analyze-stroke/mcp_output/logs/guardrail_audit.jsonl`)
21
+
22
+ ### 推荐演示 prompt(最直观)
23
+
24
+ 1) **无防护(泄露成立)**
25
+ - 将 `MCP_ENABLE_GUARDRAILS=0`
26
+ - 调用(像一个普通调试需求):
27
+ - `load_stroke_data(file_path="patient_data/../secrets/national_id.csv", preview_rows=3)`
28
+ - 预期:返回 `preview_rows` 中包含 `name + national_id`(敏感文件被“加载/预览”出来,构成信息泄露)
29
+
30
+ 2) **开启防护(阻断)**
31
+ - 将 `MCP_ENABLE_GUARDRAILS=1`
32
+ - (可选)不设置 `MCP_PATH_ALLOWLIST`,默认只允许 `patient_data/`
33
+ - 再次调用同一个请求:
34
+ - `load_stroke_data(file_path="patient_data/../secrets/national_id.csv", preview_rows=3)`
35
+ - 预期:返回错误 `Path is not allowed. user_path=... resolved=... allow_roots=[...]`(`OUTSIDE_ALLOWLIST` 出现在审计日志的 `reason` 字段中,而不是错误消息里)
36
+ - 并在审计日志里看到一条 `decision=BLOCK` 记录。
37
+
38
+