alembic / model_runtime.py
GitHub Actions
Deploy alembic from b8a1bc0
b68aeed
Raw
History Blame Contribute Delete
2.43 kB
"""HTTP client for the Alembic Modal backend."""
import os
import httpx
API_URL = os.environ.get("ALEMBIC_API_URL", "")
TIMEOUT = 120.0
async def _post(endpoint: str, payload: dict) -> dict:
"""Send POST request to the Modal backend."""
if not API_URL:
raise RuntimeError("ALEMBIC_API_URL not set and not in mock mode")
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
resp = await client.post(f"{API_URL}{endpoint}", json=payload)
resp.raise_for_status()
return resp.json()
async def chat(messages: list[dict], user_id: str, state: dict) -> dict:
"""Send chat messages to the agent and get next action.
Returns dict with keys:
action: "show_connect" | "show_app" | "building" | "error" | "reply"
message: str (assistant reply text)
data: dict (action-specific payload)
"""
return await _post("/chat", {
"messages": messages,
"user_id": user_id,
"state": state,
})
async def generate(requirements: str, design_prefs: dict, user_id: str) -> dict:
"""Trigger mini-app code generation.
Returns dict with keys:
status: "success" | "error"
app_url: str (deployed mini-app URL)
code: str (generated source)
"""
return await _post("/generate", {
"requirements": requirements,
"design_prefs": design_prefs,
"user_id": user_id,
})
async def fix(code: str, traceback: str) -> dict:
"""Send broken code + traceback for auto-fix.
Returns dict with keys:
status: "success" | "error"
code: str (fixed source)
explanation: str
"""
return await _post("/fix", {
"code": code,
"traceback": traceback,
})
async def get_connect_url(user_id: str, app_slug: str) -> str:
"""Get OAuth connect URL for a third-party service.
Returns the URL string the user should visit to authorize.
"""
result = await _post("/connect/url", {
"user_id": user_id,
"app_slug": app_slug,
})
return result.get("url", "")
async def is_connected(user_id: str, app_slug: str) -> bool:
"""Check if user has connected a third-party service.
Returns True if the service is authorized for this user.
"""
result = await _post("/connect/status", {
"user_id": user_id,
"app_slug": app_slug,
})
return result.get("connected", False)