# geoforce/agent/runtime.py
# Ubuntu
# fix(agent): use claude-opus-4-7 to match hackathon + repo-wide config
# 72309a5
"""Claude agent runtime for GeoForce-CCHackathon.
Wires the two physics engines — `tools.predict_solver` and
`tools.predict_surrogate` — into an in-process SDK MCP server, then runs
a `ClaudeSDKClient` session that can answer geothermal questions using
those tools.
Run directly to answer Q1:
.venv/bin/python -m agent.runtime \\
"If I drill at x=200m, y=100m, what reservoir temperature will I hit \\
after 1 year of 0.5 kg/s cold water reinjection at x=50m, y=100m?"
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import sys
from pathlib import Path
from typing import Any
import numpy as np
from dotenv import load_dotenv
from claude_agent_sdk import (
AssistantMessage,
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
TextBlock,
ToolUseBlock,
create_sdk_mcp_server,
tool,
)
from tools.monte_carlo import run as _mc_run
from tools.predict_solver import predict as _solver_predict
from tools.predict_surrogate import predict as _surrogate_predict
from tools.sensitivity import run as _sens_run
# Model identifier handed to ClaudeAgentOptions by build_options().
DEFAULT_MODEL = "claude-opus-4-7"
# System prompt for the agent session: describes the four registered tools,
# the scenario dict schema, and the procedure for drill-site questions.
# NOTE(review): step 3 of the drill-site procedure tells the model to read
# result["temperature"][i, j], but the tool payloads built in this file only
# carry summary stats, a down-sampled preview, and an optional "probe" entry.
# Consider pointing the model at probe_x_m / probe_y_m instead — confirm.
SYSTEM_PROMPT = """You are GeoForce, an Indonesian geothermal reservoir engineering agent.
You have four tools:
1. `predict_solver` — runs GeoForce-Solver, a from-scratch implicit
backward-Euler Darcy + energy (conduction + upwind advection) solver.
Use this when the user wants a physics-grounded, benchmark-validated
answer. Slower (seconds), but the numerics are trustworthy.
2. `predict_surrogate` — runs the v1.1 ReservoirCNN surrogate.
Fast (tens of ms), best for sweeps and UQ. 32x32 grid only.
3. `monte_carlo` — Monte Carlo ensemble over parameter distributions.
Returns P10/P50/P90 fields + per-draw scalars. Defaults to surrogate
engine. Use for Q2-style "how confident" / "P10/P50/P90" questions.
4. `sensitivity_oat` — one-at-a-time sensitivity sweep for a scenario.
Ranks parameters by how much they move a chosen scalar metric
(probe_temperature_C, mean_temperature_C, etc). Use to answer
"which parameter matters most?" or Q3-style placement questions.
Scenario dict schema (keys are optional; sensible defaults exist):
- nx, ny (int) grid cell counts
- dx, dy (float, m) cell size
- porosity (float)
- permeability (float, m^2) OR log_permeability (float, log10 m^2)
- rho_rock, cp_rock, lam_rock (floats, SI)
- T_initial (float, degC)
- P_initial (float, Pa) [solver] / base_pressure (float, Pa) [surrogate]
- depth (float, m) [surrogate]
- dt (float, s), n_steps (int) [solver only]
- wells: list of {i:int, j:int, mass_rate:float (kg/s, +inj, -prod),
injection_temperature: float (degC, required if mass_rate>0)}
When answering "if I drill at (x,y), what temperature will I hit?":
1. Translate the user's (x, y) meters to grid cell (i, j) using dx/dy.
2. Build a scenario dict and call `predict_solver`.
3. Read `result["temperature"][i, j]` to get the temperature at the drill
location after the simulated elapsed time (dt * n_steps seconds).
4. Report the temperature, the elapsed simulated time, and any wells that
influenced the field.
Always cite which engine you used and the elapsed wall-clock seconds.
Keep your final answer to ≤ 4 sentences unless more detail is asked for.
"""
def _serialize_scenario_result(result: dict[str, Any]) -> dict[str, Any]:
"""Turn a predict() result into a JSON-serializable summary for the model.
Returning the full (nx, ny) array is too heavy for an LLM tool response,
so we return grid metadata + summary statistics + a down-sampled preview.
"""
t = np.asarray(result["temperature"])
p = np.asarray(result["pressure"])
grid = result["grid"]
# 8x8 preview, bilinear-ish via slicing
stride_x = max(1, t.shape[0] // 8)
stride_y = max(1, t.shape[1] // 8)
t_preview = t[::stride_x, ::stride_y].round(2).tolist()
p_preview = (p[::stride_x, ::stride_y] / 1.0e6).round(3).tolist() # MPa
return {
"engine": result["engine"],
"elapsed_seconds": round(float(result["elapsed_seconds"]), 4),
"grid": grid,
"temperature_C": {
"shape": list(t.shape),
"min": float(t.min()),
"max": float(t.max()),
"mean": float(t.mean()),
"preview_8x8": t_preview,
},
"pressure_MPa": {
"shape": list(p.shape),
"min": float(p.min() / 1.0e6),
"max": float(p.max() / 1.0e6),
"mean": float(p.mean() / 1.0e6),
"preview_8x8": p_preview,
},
}
def _cell_value(result: dict[str, Any], x_m: float, y_m: float) -> dict[str, Any]:
grid = result["grid"]
i = int(round(x_m / grid["dx"] - 0.5))
j = int(round(y_m / grid["dy"] - 0.5))
i = int(np.clip(i, 0, grid["nx"] - 1))
j = int(np.clip(j, 0, grid["ny"] - 1))
return {
"i": i,
"j": j,
"x_cell_center_m": (i + 0.5) * grid["dx"],
"y_cell_center_m": (j + 0.5) * grid["dy"],
"temperature_C": float(result["temperature"][i, j]),
"pressure_MPa": float(result["pressure"][i, j] / 1.0e6),
}
@tool(
    "predict_solver",
    "Run the GeoForce-Solver (implicit Darcy + energy + upwind advection) on "
    "a scenario dict. Returns grid metadata, summary stats, an 8x8 preview of "
    "the final temperature/pressure fields, and the wall-clock runtime. For a "
    "drill-site question, also pass `probe_x_m` and `probe_y_m` to get the "
    "temperature + pressure at that location.",
    {
        "scenario": dict,
        "probe_x_m": float,
        "probe_y_m": float,
    },
)
async def predict_solver_tool(args: dict[str, Any]) -> dict[str, Any]:
    """MCP handler: run the solver engine and return a JSON text summary.

    A missing or falsy ``scenario`` is replaced by an empty dict. When both
    ``probe_x_m`` and ``probe_y_m`` are present in ``args``, the summary
    gains a ``"probe"`` entry with the values at that location.
    """
    simulation = _solver_predict(args.get("scenario") or {})
    summary = _serialize_scenario_result(simulation)

    # The probe is only attached when both coordinates were supplied.
    if {"probe_x_m", "probe_y_m"} <= args.keys():
        x_m = float(args["probe_x_m"])
        y_m = float(args["probe_y_m"])
        summary["probe"] = _cell_value(simulation, x_m, y_m)

    text_block = {"type": "text", "text": json.dumps(summary)}
    return {"content": [text_block]}
@tool(
    "predict_surrogate",
    "Run the v1.1 ReservoirCNN surrogate on a scenario dict. 32x32 grid, "
    "fast (~10-100ms). Returns the same schema as predict_solver. For a "
    "drill-site question, pass `probe_x_m` and `probe_y_m`.",
    {
        "scenario": dict,
        "probe_x_m": float,
        "probe_y_m": float,
    },
)
async def predict_surrogate_tool(args: dict[str, Any]) -> dict[str, Any]:
    """MCP handler: run the surrogate engine and return a JSON text summary.

    A missing or falsy ``scenario`` is replaced by an empty dict. When both
    ``probe_x_m`` and ``probe_y_m`` are present in ``args``, the summary
    gains a ``"probe"`` entry with the values at that location.
    """
    prediction = _surrogate_predict(args.get("scenario") or {})
    summary = _serialize_scenario_result(prediction)

    # The probe is only attached when both coordinates were supplied.
    if {"probe_x_m", "probe_y_m"} <= args.keys():
        x_m = float(args["probe_x_m"])
        y_m = float(args["probe_y_m"])
        summary["probe"] = _cell_value(prediction, x_m, y_m)

    text_block = {"type": "text", "text": json.dumps(summary)}
    return {"content": [text_block]}
def _serialize_mc_result(result: dict[str, Any]) -> dict[str, Any]:
"""Compact MC result for LLM consumption: 8x8 P10/P50/P90 previews + scalars."""
p10 = np.asarray(result["p10"])
p50 = np.asarray(result["p50"])
p90 = np.asarray(result["p90"])
sx = max(1, p50.shape[0] // 8)
sy = max(1, p50.shape[1] // 8)
return {
"engine": result["engine"],
"n_samples": result["n_samples"],
"elapsed_seconds": round(float(result["elapsed_seconds"]), 4),
"temperature_C": {
"shape": list(p50.shape),
"p10_8x8": p10[::sx, ::sy].round(2).tolist(),
"p50_8x8": p50[::sx, ::sy].round(2).tolist(),
"p90_8x8": p90[::sx, ::sy].round(2).tolist(),
},
"scalar_summary": result["scalar_summary"],
}
@tool(
    "monte_carlo",
    "Monte Carlo ensemble over parameter distributions. Pass `scenario` "
    "(base dict), `distributions` (name -> {dist, ...}), optional "
    "`n_samples` (default 200), `engine` ('surrogate' or 'solver', default "
    "surrogate), and `seed`. Returns 8x8 P10/P50/P90 temperature previews "
    "and per-draw scalar summaries.",
    {
        "scenario": dict,
        "distributions": dict,
        "n_samples": int,
        "engine": str,
        "seed": int,
    },
)
async def monte_carlo_tool(args: dict[str, Any]) -> dict[str, Any]:
    """MCP handler: run a Monte Carlo ensemble and return a JSON text summary.

    Missing or falsy ``scenario`` / ``distributions`` become empty dicts.
    ``n_samples`` defaults to 200, ``engine`` to ``"surrogate"`` and ``seed``
    to 0 when absent from ``args``.
    """
    base_scenario = args.get("scenario") or {}
    dists = args.get("distributions") or {}

    run_kwargs: dict[str, Any] = {
        "n_samples": int(args.get("n_samples", 200)),
        "engine": str(args.get("engine", "surrogate")),
        "seed": int(args.get("seed", 0)),
    }
    ensemble = _mc_run(base_scenario, dists, **run_kwargs)

    body = json.dumps(_serialize_mc_result(ensemble))
    return {"content": [{"type": "text", "text": body}]}
@tool(
    "sensitivity_oat",
    "One-at-a-time sensitivity sweep. Pass `scenario`, `params` (name -> "
    "{low, high}), optional `n_points` (default 5), `engine` ('surrogate' "
    "or 'solver'), `metric` ('probe_temperature_C', 'mean_temperature_C', "
    "'min_temperature_C', 'max_temperature_C', 'mean_pressure_MPa'), and "
    "probe_x_m/probe_y_m if using probe_temperature_C. Returns per-parameter "
    "sweep curves plus a ranking by |Δmetric|.",
    {
        "scenario": dict,
        "params": dict,
        "n_points": int,
        "engine": str,
        "metric": str,
        "probe_x_m": float,
        "probe_y_m": float,
    },
)
async def sensitivity_tool(args: dict[str, Any]) -> dict[str, Any]:
    """MCP handler: run a one-at-a-time sweep and return it as JSON text.

    Missing or falsy ``scenario`` / ``params`` become empty dicts. ``engine``
    defaults to ``"surrogate"``, ``n_points`` to 5 and ``metric`` to
    ``"probe_temperature_C"``. Each probe coordinate is forwarded only when
    it is present in ``args``. Floats in the sweep result are rounded in
    place before serialization.
    """
    base_scenario = args.get("scenario") or {}
    sweep_params = args.get("params") or {}

    run_kwargs: dict[str, Any] = dict(
        engine=str(args.get("engine", "surrogate")),
        n_points=int(args.get("n_points", 5)),
        metric=str(args.get("metric", "probe_temperature_C")),
    )
    for coord in ("probe_x_m", "probe_y_m"):
        if coord in args:
            run_kwargs[coord] = float(args[coord])

    sweep = _sens_run(base_scenario, sweep_params, **run_kwargs)

    # The sweep is already small enough for the model; only trim the float
    # precision so the JSON stays compact.
    for curve in sweep["curves"].values():
        curve["values"] = [round(float(value), 6) for value in curve["values"]]
        curve["metric"] = [round(float(sample), 4) for sample in curve["metric"]]
        for field in ("delta", "slope_per_unit"):
            curve[field] = round(float(curve[field]), 4)
    for field in ("baseline_metric", "elapsed_seconds"):
        sweep[field] = round(float(sweep[field]), 4)

    return {"content": [{"type": "text", "text": json.dumps(sweep)}]}
def _load_env() -> None:
    """Load ``.env`` from the directory two levels above this file.

    Exits the process with status 2, after writing a message to stderr, when
    ``ANTHROPIC_API_KEY`` is unset or empty once the file has been loaded.
    """
    env_file = Path(__file__).resolve().parents[1] / ".env"
    load_dotenv(env_file)

    if os.environ.get("ANTHROPIC_API_KEY"):
        return

    sys.stderr.write("ANTHROPIC_API_KEY missing (expected in .env)\n")
    sys.exit(2)
def build_options() -> ClaudeAgentOptions:
    """Assemble the agent options: model, system prompt and the tool server.

    The four tool handlers are registered on an in-process SDK MCP server
    named ``geoforce-tools``, mounted under the ``geoforce`` key, and each
    tool is allow-listed by its ``mcp__geoforce__*`` name.
    """
    handlers = [
        predict_solver_tool,
        predict_surrogate_tool,
        monte_carlo_tool,
        sensitivity_tool,
    ]
    server = create_sdk_mcp_server(name="geoforce-tools", tools=handlers)

    permitted = [
        "mcp__geoforce__predict_solver",
        "mcp__geoforce__predict_surrogate",
        "mcp__geoforce__monte_carlo",
        "mcp__geoforce__sensitivity_oat",
    ]
    return ClaudeAgentOptions(
        model=DEFAULT_MODEL,
        system_prompt=SYSTEM_PROMPT,
        mcp_servers={"geoforce": server},
        allowed_tools=permitted,
        max_turns=12,
    )
async def answer(query: str, *, verbose: bool = True) -> str:
    """Run one query end-to-end and return the final assistant text.

    Args:
        query: Natural-language question sent to the agent.
        verbose: When true, assistant text and ``[tool: ...]`` markers are
            printed to stdout as messages arrive.

    Returns:
        The ``result`` text of the session's ``ResultMessage`` when it is
        non-empty; otherwise the text blocks of the most recent assistant
        message that contained any text; otherwise ``""``. The value no
        longer depends on ``verbose``.
    """
    _load_env()
    options = build_options()

    last_assistant_text = ""
    result_text = ""

    async with ClaudeSDKClient(options=options) as client:
        await client.query(query)
        async for message in client.receive_response():
            if isinstance(message, AssistantMessage):
                # Collect text in both modes. Previously verbose mode printed
                # the text but never kept it, while quiet mode glued the text
                # of every turn together, so the return value differed by mode.
                text_parts: list[str] = []
                for block in message.content:
                    if isinstance(block, TextBlock):
                        text_parts.append(block.text)
                        if verbose:
                            print(block.text, end="", flush=True)
                    elif verbose and isinstance(block, ToolUseBlock):
                        print(f"\n[tool: {block.name}]", flush=True)
                if text_parts:
                    last_assistant_text = "".join(text_parts)
            elif isinstance(message, ResultMessage):
                if verbose:
                    # Terminate the streamed line.
                    print("", flush=True)
                result_text = message.result or ""
                break

    return result_text or last_assistant_text
def main() -> None:
    """CLI entry point: join the positional words into one query and answer it.

    Without ``--quiet`` the answer is streamed by ``answer()`` as it arrives.
    With ``--quiet`` streaming is suppressed and only the final answer text is
    printed once the run completes.
    """
    parser = argparse.ArgumentParser(description="Answer a geothermal question with the GeoForce agent.")
    parser.add_argument("query", nargs="+", help="Natural-language question.")
    parser.add_argument("--quiet", action="store_true", help="Suppress streaming output.")
    args = parser.parse_args()

    question = " ".join(args.query)
    final_text = asyncio.run(answer(question, verbose=not args.quiet))

    # In verbose mode the text has already been streamed. With --quiet nothing
    # has been printed yet, so dropping the return value would leave the CLI
    # silent.
    if args.quiet:
        print(final_text)


if __name__ == "__main__":
    main()