BioMCP-explorer / core /runner.py
singhankit16's picture
Deploy BioMCP Explorer
f103ad7
"""
Subprocess wrapper for the biomcp CLI.
Runs biomcp commands and returns structured (JSON) or markdown output.
"""
import json
import os
import subprocess
import shutil
import sys
from typing import Optional
def find_biomcp() -> str:
    """Return the filesystem path of the biomcp executable.

    The PATH lookup via ``shutil.which`` wins when it succeeds. Otherwise a
    fixed list of directories is probed, in order, for ``biomcp.exe`` and then
    ``biomcp``; the first regular file found is returned.

    Raises:
        FileNotFoundError: if neither PATH nor any probed directory contains
            the executable.
    """
    on_path = shutil.which("biomcp")
    if on_path:
        return on_path

    interpreter_dir = os.path.dirname(sys.executable)
    search_dirs = [
        # Next to the running interpreter (pip's script install location).
        os.path.join(interpreter_dir, "Scripts"),
        interpreter_dir,
        # Per-user install locations, including the ones the `py` launcher uses.
        os.path.expanduser("~/.local/bin"),
        os.path.expandvars(r"%LOCALAPPDATA%\Python\pythoncore-3.14-64\Scripts"),
        os.path.expandvars(r"%LOCALAPPDATA%\Python\pythoncore-3.13-64\Scripts"),
        os.path.expandvars(r"%LOCALAPPDATA%\Python\pythoncore-3.12-64\Scripts"),
    ]

    for folder in search_dirs:
        for filename in ("biomcp.exe", "biomcp"):
            candidate = os.path.join(folder, filename)
            if os.path.isfile(candidate):
                return candidate

    raise FileNotFoundError(
        "biomcp not found on PATH. Install with: pip install biomcp-cli"
    )
def _error_result(command: str, error: str, markdown: str = "") -> dict:
    """Build the result dict that ``run`` returns for a failed invocation."""
    return {
        "success": False,
        "command": command,
        "markdown": markdown,
        "data": None,
        "error": error,
    }


def run(
    args: list[str],
    *,
    json_mode: bool = True,
    no_cache: bool = False,
    env_overrides: Optional[dict[str, str]] = None,
    timeout: int = 120,
) -> dict:
    """
    Execute a biomcp CLI command.

    Args:
        args: Arguments appended after the global flags.
        json_mode: Pass ``--json`` and try to parse stdout as JSON.
        no_cache: Pass ``--no-cache``.
        env_overrides: Extra environment variables for the child process;
            entries with empty values are ignored.
        timeout: Seconds to wait before giving up on the command.

    Returns dict with keys:
    - success: bool
    - command: str (the full command string)
    - markdown: str (rendered output; the error text on a non-zero exit,
      empty for timeouts and launch failures)
    - data: dict | list | None (parsed JSON when json_mode=True)
    - error: str | None

    Failures, including a missing biomcp binary, are reported through the
    returned dict rather than raised.
    """
    not_found = "biomcp binary not found. Install with: uv tool install biomcp-cli"

    options = []
    if json_mode:
        options.append("--json")
    if no_cache:
        options.append("--no-cache")
    options.extend(args)

    # Resolve the binary under its own handler: a failed lookup must produce
    # the same error dict as a failed launch, not an exception for the caller.
    try:
        biomcp = find_biomcp()
    except FileNotFoundError:
        return _error_result(" ".join(["biomcp", *options]), not_found)

    cmd = [biomcp, *options]
    command_str = " ".join(cmd)

    env = os.environ.copy()
    if env_overrides:
        # Only non-empty values override the inherited environment.
        env.update({k: v for k, v in env_overrides.items() if v})

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=env,
        )
        stdout = result.stdout.strip()
        stderr = result.stderr.strip()

        if result.returncode != 0:
            error_msg = stderr or stdout or f"Command exited with code {result.returncode}"
            return _error_result(command_str, error_msg, markdown=error_msg)

        # Non-JSON output in json_mode is not an error: fall back to raw stdout.
        data = None
        if json_mode and stdout:
            try:
                data = json.loads(stdout)
            except json.JSONDecodeError:
                data = None

        # For display: render parsed JSON as markdown, otherwise use raw stdout.
        markdown = _json_to_markdown(data) if data is not None else stdout
        return {
            "success": True,
            "command": command_str,
            "markdown": markdown,
            "data": data,
            "error": None,
        }
    except subprocess.TimeoutExpired:
        return _error_result(command_str, f"Command timed out after {timeout}s")
    except FileNotFoundError:
        # The resolved path vanished or is not launchable.
        return _error_result(command_str, not_found)
    except Exception as e:
        # Boundary handler: callers rely on always getting a result dict.
        return _error_result(command_str, str(e))
def run_markdown(
    args: list[str],
    *,
    no_cache: bool = False,
    env_overrides: Optional[dict[str, str]] = None,
    timeout: int = 120,
) -> dict:
    """Invoke ``run`` with JSON mode switched off, so no --json flag is passed.

    All other options are forwarded unchanged; the return value is whatever
    ``run`` returns.
    """
    forwarded = {
        "json_mode": False,
        "no_cache": no_cache,
        "env_overrides": env_overrides,
        "timeout": timeout,
    }
    return run(args, **forwarded)
def _is_small_dict(d: dict) -> bool:
"""Check if a dict is simple enough to render inline."""
return (len(d) <= 4
and all(isinstance(v, (str, int, float, bool, type(None))) for v in d.values()))
def _json_to_markdown(data, level=1) -> str:
"""Convert structured JSON response to readable markdown."""
if isinstance(data, str):
return data
if isinstance(data, (int, float, bool)):
return str(data)
if data is None:
return ""
if isinstance(data, list):
if not data:
return ""
# List of simple values
if all(isinstance(v, (str, int, float, bool)) for v in data):
return ", ".join(str(v) for v in data)
# List of small dicts — render as compact table-like rows
if all(isinstance(v, dict) and _is_small_dict(v) for v in data):
rows = []
for item in data[:15]:
parts = [f"{v}" for v in item.values() if v is not None and v != ""]
rows.append(" · ".join(parts))
result = "\n".join(f"- {r}" for r in rows)
if len(data) > 15:
result += f"\n- *(+{len(data) - 15} more)*"
return result
# List of larger dicts — render each as a block
parts = []
for i, item in enumerate(data, 1):
if isinstance(item, dict):
label = (item.get("label") or item.get("name") or item.get("title")
or item.get("primary_id") or item.get("id") or f"Item {i}")
hdr = "#" * min(level + 1, 5)
parts.append(f"{hdr} {i}. {label}")
parts.append(_format_dict(item, level + 1))
else:
parts.append(f"- {item}")
return "\n\n".join(parts)
if isinstance(data, dict):
return _format_dict(data, level)
return str(data)
def _format_dict(d: dict, level: int = 1) -> str:
"""Format a dict as readable markdown key-value pairs."""
lines = []
skip = {"_meta", "_links"}
for key, value in d.items():
if key in skip or key.startswith("_"):
continue
nice_key = key.replace("_", " ").replace("-", " ").title()
if value is None or value == "" or value == []:
continue
if isinstance(value, (str, int, float, bool)):
lines.append(f"**{nice_key}:** {value}")
elif isinstance(value, list):
if all(isinstance(v, (str, int, float)) for v in value):
shown = ", ".join(str(v) for v in value[:8])
more = f" *(+{len(value) - 8} more)*" if len(value) > 8 else ""
lines.append(f"**{nice_key}:** {shown}{more}")
elif all(isinstance(v, dict) and _is_small_dict(v) for v in value):
hdr = "#" * min(level + 1, 5)
lines.append(f"\n{hdr} {nice_key}\n")
lines.append(_json_to_markdown(value, level + 1))
else:
hdr = "#" * min(level + 1, 5)
lines.append(f"\n{hdr} {nice_key}\n")
lines.append(_json_to_markdown(value, level + 1))
elif isinstance(value, dict):
hdr = "#" * min(level + 1, 5)
lines.append(f"\n{hdr} {nice_key}\n")
lines.append(_format_dict(value, level + 1))
return "\n\n".join(lines)