| """Multi-specialist consultation — orchestrate the existing swarm-py |
| agent swarm to produce a multidisciplinary opinion on the digital twin. |
| |
| For rare diseases, a multidisciplinary team (MDT) is the gold standard. |
| We simulate one by routing the patient subgraph + summary through the |
| swarm-py agents tagged with each clinical specialty (cardio, neuro, |
| genetics, pediatrics, pharma, immuno, hepato, etc.) and synthesizing |
| their opinions into a structured `ConsultSpec`. |
| |
| Bootstrap path: re-uses `engine_v2`/`agent_coordinator` if available. |
| Phase-2: dedicated specialist agents fine-tuned per discipline. |
| """ |
| from __future__ import annotations |
| import asyncio |
| import logging |
| from typing import Optional |
|
|
| from .types import ConsultSpec, SpecialistOpinion |
|
|
| logger = logging.getLogger("gemeo.consult") |
|
|
|
|
| |
| DEFAULT_PANEL = [ |
| "geneticist", |
| "neurologist", |
| "pediatrician", |
| "immunologist", |
| "cardiologist", |
| "pharmacologist", |
| ] |
|
|
|
|
| async def _ask_specialist(specialty: str, prompt: str) -> Optional[dict]: |
| """Route the prompt to a specialist agent. |
| |
| First tries the swarm's native specialist routing; falls back to a |
| structured LLM call if the swarm is unavailable. |
| """ |
| try: |
| from agent_coordinator import ask_specialist |
| return await ask_specialist(specialty=specialty, prompt=prompt) |
| except Exception: |
| pass |
|
|
| |
| try: |
| from llm_router import call_llm |
| sys = ( |
| f"You are a senior {specialty}. Review the rare-disease case " |
| f"and produce a JSON object with keys: opinion (str), " |
| f"confidence (float 0..1), key_concerns (list[str]), " |
| f"recommended_next_steps (list[str]), " |
| f"red_flags_for_my_specialty (list[str])." |
| ) |
| out = await call_llm(system=sys, user=prompt, json_mode=True, max_tokens=600) |
| if isinstance(out, dict): |
| return out |
| import json |
| return json.loads(out) |
| except Exception as e: |
| logger.warning(f"specialist {specialty} failed: {e}") |
| return None |
|
|
|
|
| def _serialize_twin_for_prompt(twin) -> str: |
| """Compact human-readable summary for specialist consumption.""" |
| if twin is None: |
| return "" |
| parts = [] |
| if twin.diagnoses: |
| top = twin.diagnoses[:3] |
| parts.append("Top diagnoses: " + "; ".join( |
| f"{d.get('name','?')} (ORPHA:{d.get('orpha','?')}, p={d.get('probability', 0):.2f})" |
| for d in top |
| )) |
| if twin.subgraph and twin.subgraph.paths: |
| parts.append("Reasoning paths: " + "; ".join( |
| f"{p['target_name']}: {' → '.join(s.get('label','?') for s in p['steps'])}" |
| for p in twin.subgraph.paths[:2] |
| )) |
| if twin.risk: |
| parts.append( |
| f"Risk: severity={twin.risk.overall_severity:.2f}, " |
| f"progression={twin.risk.progression_risk:.2f}, " |
| f"urgency={twin.risk.treatment_urgency:.2f}" |
| ) |
| if twin.trajectory: |
| for h in twin.trajectory.horizons[:2]: |
| parts.append(f"T+{h.months}m: {h.state[:120]}") |
| if twin.drugs and twin.drugs.candidates: |
| parts.append("Drug candidates: " + ", ".join( |
| d.get("name", "?") for d in twin.drugs.candidates[:5] |
| )) |
| if twin.next_questions: |
| parts.append("Next questions: " + "; ".join( |
| f"{q.name} ({q.hpo_id})" for q in twin.next_questions[:3] |
| )) |
| return "\n".join(parts) |
|
|
|
|
| async def consult( |
| twin, |
| *, |
| panel: list[str] = None, |
| question: str = "Synthesise your opinion on this case.", |
| ) -> ConsultSpec: |
| """Run a multi-specialist consult on the twin.""" |
| panel = panel or DEFAULT_PANEL |
| summary = _serialize_twin_for_prompt(twin) |
| if not summary: |
| return ConsultSpec(opinions=[], synthesis="No twin context available.", panel=panel) |
|
|
| prompt = f"{summary}\n\nQuestion: {question}\n\nReturn JSON." |
|
|
| opinions = await asyncio.gather(*[ |
| _ask_specialist(sp, prompt) for sp in panel |
| ], return_exceptions=True) |
|
|
| parsed: list = [] |
| for sp, opin in zip(panel, opinions): |
| if isinstance(opin, Exception) or not opin: |
| parsed.append(SpecialistOpinion( |
| specialty=sp, opinion="(unavailable)", confidence=0.0, |
| key_concerns=[], recommended_next_steps=[], red_flags=[], |
| )) |
| continue |
| parsed.append(SpecialistOpinion( |
| specialty=sp, |
| opinion=str(opin.get("opinion", ""))[:600], |
| confidence=float(opin.get("confidence", 0.6) or 0.6), |
| key_concerns=list(opin.get("key_concerns", []))[:5], |
| recommended_next_steps=list(opin.get("recommended_next_steps", []))[:5], |
| red_flags=list(opin.get("red_flags_for_my_specialty", opin.get("red_flags", [])))[:5], |
| )) |
|
|
| |
| all_concerns = [c for o in parsed for c in (o.key_concerns or [])] |
| from collections import Counter |
| top_concerns = [c for c, _ in Counter(all_concerns).most_common(5)] |
| all_steps = [s for o in parsed for s in (o.recommended_next_steps or [])] |
| top_steps = [s for s, _ in Counter(all_steps).most_common(5)] |
|
|
| synthesis_lines = [] |
| if top_concerns: |
| synthesis_lines.append("Shared concerns: " + "; ".join(top_concerns)) |
| if top_steps: |
| synthesis_lines.append("Converged next steps: " + "; ".join(top_steps)) |
| flags = [f for o in parsed for f in (o.red_flags or [])] |
| if flags: |
| synthesis_lines.append("Red flags raised: " + "; ".join(flags[:5])) |
|
|
| synthesis = "\n".join(synthesis_lines) or "No consensus signal." |
|
|
| return ConsultSpec( |
| opinions=parsed, |
| synthesis=synthesis, |
| panel=panel, |
| ) |
|
|