multi-agent-lab / modal_app.py
agharsallah
feat(observability): instrument registry, tools, session, modal (Units 7,8,10,11)
cb2de15
Raw
History Blame Contribute Delete
3.39 kB
"""Optional serverless deployment — run a scenario on a schedule with Modal.

The ledger lives on a persistent Modal Volume, so each scheduled invocation
restores the run and advances it by one episode. This is the long-running story
(ADR-0013, docs/architecture/long-running.md) in deployed form. Not imported by
the app or tests — used only when you deploy it:

    modal run modal_app.py     # one-off episode
    modal deploy modal_app.py  # schedule it (hourly by default)
"""
from __future__ import annotations
import modal
# Name passed to modal.App when the application object is created.
APP_NAME = "multi-agent-land"
# Scenario used by run_episode when no scenario_name is supplied.
DEFAULT_SCENARIO = "thousand-token-wood"
# Default number of ticks run_episode advances per invocation.
TICKS_PER_EPISODE = 60
# Container image: the entire repository (code + config/) is copied to /root so
# the registry resolves config through the same __file__ logic it uses locally.
# Dependencies are installed from pyproject.toml, the single source of truth
# (pinned locally by uv.lock). Caches, VCS data, virtualenvs and local run
# artifacts are excluded from the copy.
image = modal.Image.debian_slim().pip_install_from_pyproject("pyproject.toml").add_local_dir(
    ".",
    remote_path="/root",
    ignore=[
        "__pycache__",
        "*.pyc",
        ".venv",
        ".git",
        "runs",
        "*.db",
        ".pytest_cache",
        ".ruff_cache",
    ],
)
# Modal application object; run_episode and main are registered on it via decorators.
app = modal.App(APP_NAME)
# Named Modal Volume (created if it does not exist yet); run_episode mounts it at
# /data to keep the ledger and snapshot files between invocations.
volume = modal.Volume.from_name("multi-agent-land-runs", create_if_missing=True)
@app.function(image=image, volumes={"/data": volume}, schedule=modal.Cron("0 * * * *"))
def run_episode(scenario_name: str = DEFAULT_SCENARIO, n_ticks: int = TICKS_PER_EPISODE) -> dict:
    """Restore (or start) a scenario run and advance it by one episode.

    The ledger is a ``SqlAlchemyLedger`` when ``database_url()`` is truthy,
    otherwise a ``SQLiteLedger`` stored at ``/data/<scenario_name>.db`` on the
    mounted volume. The conductor is restored from the ledger; if nothing could
    be restored it is reset with the scenario's default seed. It is then stepped
    ``n_ticks`` ticks, the ledger is closed, and the volume is committed.

    The ledger is closed on every exit path, including failures while building
    the conductor or stepping it. The volume is committed explicitly only after
    a successful episode.

    Args:
        scenario_name: Scenario to build from the default registry; also used to
            name the ledger and snapshot files under ``/data``.
        n_ticks: Number of ticks passed to ``Conductor.step``.

    Returns:
        A dict with keys ``"scenario"``, ``"turn"`` (the conductor's turn after
        stepping) and ``"stats"`` (the governor's stats).
    """
    # All non-modal imports are function-local; the module top level only needs `modal`.
    import os
    from contextlib import ExitStack
    from pathlib import Path

    from src import observability as obs
    from src.core.conductor import Conductor
    from src.core.ledger_factory import database_url
    from src.core.registry import default_registry
    from src.core.sqlite_ledger import SQLiteLedger
    from src.tools.builtins import default_tool_registry

    obs.configure()
    db_path = f"/data/{scenario_name}.db"
    reg = default_registry()

    # Durable event store when DATABASE_URL is set (ADR-0014); otherwise the
    # SQLite ledger on the persistent Modal Volume (the default deployment path).
    # Evaluated once and reused for both the branch and the start log.
    durable = bool(database_url())
    if durable:
        from src.core.sqlalchemy_ledger import SqlAlchemyLedger

        ledger = SqlAlchemyLedger.from_file(os.environ["DATABASE_URL"])
    else:
        ledger = SQLiteLedger.from_file(db_path) if Path(db_path).exists() else SQLiteLedger(db_path)

    with ExitStack() as cleanup:
        # Registered immediately after opening so any failure below still
        # releases the ledger instead of leaking the handle.
        cleanup.callback(ledger.close)

        conductor = Conductor(
            reg.build_scenario(scenario_name, tools=default_tool_registry()),
            governor=reg.governor_for(scenario_name),
            ledger=ledger,
            snapshot_every=20,
            snapshot_path=f"/data/{scenario_name}.snapshot.db",
        )

        with obs.span("modal.run_episode", **{"scenario": scenario_name, "modal.n_ticks": n_ticks}):
            restored = conductor.restore()
            if not restored:
                # Nothing to resume from: start a fresh run with the scenario's default seed.
                conductor.reset(conductor.scenario.default_seed)
            obs.log(
                "modal.episode.start",
                scenario=scenario_name,
                n_ticks=n_ticks,
                restored=restored,
                durable=durable,
            )
            conductor.step(n_ticks=n_ticks)

            # Close the ledger now (ExitStack.close() unwinds and empties the
            # stack, so it will not run a second time on exit), keeping the
            # original close-before-commit ordering.
            cleanup.close()
            volume.commit()  # persist the ledger for the next scheduled run

            stats = conductor.governor.stats
            obs.log("modal.episode.done", scenario=scenario_name, turn=conductor.turn, stats=stats)
            return {"scenario": scenario_name, "turn": conductor.turn, "stats": stats}
@app.local_entrypoint()
def main() -> None:
    """Run a single episode remotely with the default arguments and print its result."""
    outcome = run_episode.remote()
    print(outcome)