File size: 4,502 Bytes
acf77ab | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | from __future__ import annotations
import logging
import os
from dotenv import load_dotenv
load_dotenv()
import shutil
import threading
import time
from pathlib import Path
from typing import Any
from uuid import uuid4
from fastapi import FastAPI, Response
from openenv.core.env_server.http_server import create_app
from codeforge.environment import CodeForgeEnvironment
from codeforge.models import CodeForgeAction, CodeForgeObservation
from codeforge.tasks import TASKS
_log = logging.getLogger(__name__)
_corpus_path_str = os.environ.get("GROUNDLOOP_CORPUS_PATH")
_corpus_path = Path(_corpus_path_str) if _corpus_path_str else None
# ---------------------------------------------------------------------------
# Session-keyed environment pool (SYSTEM_DESIGN §15)
# ---------------------------------------------------------------------------
_lock = threading.Lock()
_sessions: dict[str, CodeForgeEnvironment] = {}
_session_access: dict[str, float] = {} # session_id → last access timestamp
_MAX_SESSIONS = int(os.environ.get("CODEFORGE_MAX_SESSIONS", "10"))
_SESSION_TTL_S = int(os.environ.get("CODEFORGE_SESSION_TTL", "3600"))
def _get_or_create_env() -> CodeForgeEnvironment:
"""For OpenEnv compliance — creates a single-session env.
The session pool below is used by the MCP server layer.
"""
return CodeForgeEnvironment(corpus_path=_corpus_path)
def _expire_stale_sessions() -> None:
"""Remove sessions older than TTL. Must hold _lock."""
now = time.monotonic()
expired = [
sid for sid, ts in _session_access.items()
if now - ts > _SESSION_TTL_S
]
for sid in expired:
_sessions.pop(sid, None)
_session_access.pop(sid, None)
def get_session(session_id: str) -> CodeForgeEnvironment | None:
"""Retrieve an existing session by ID. Returns None if expired or missing."""
with _lock:
_expire_stale_sessions()
env = _sessions.get(session_id)
if env is not None:
_session_access[session_id] = time.monotonic()
return env
def create_session() -> tuple[str, CodeForgeEnvironment]:
"""Create a new session. Evicts LRU session if at capacity."""
sid = uuid4().hex[:16]
env = CodeForgeEnvironment(corpus_path=_corpus_path)
now = time.monotonic()
with _lock:
_expire_stale_sessions()
if len(_sessions) >= _MAX_SESSIONS:
# Evict least-recently-used session
lru_sid = min(_session_access, key=_session_access.get) # type: ignore[arg-type]
_sessions.pop(lru_sid, None)
_session_access.pop(lru_sid, None)
_sessions[sid] = env
_session_access[sid] = now
return sid, env
# ---------------------------------------------------------------------------
# OpenEnv compliance app
# ---------------------------------------------------------------------------
app: FastAPI = create_app(_get_or_create_env, CodeForgeAction, CodeForgeObservation)
@app.get("/", summary="Health check")
def root() -> dict[str, str]:
return {"name": "code-forge", "version": "0.2.0", "status": "ok", "docs": "/docs"}
@app.get("/favicon.ico", include_in_schema=False)
def favicon() -> Response:
return Response(status_code=204)
@app.get("/tasks", summary="List tasks + action schema")
def list_tasks() -> dict[str, Any]:
return {
"tasks": [
{
"id": t.task_id,
"difficulty": t.task_level,
"brief": t.brief,
"target_score": t.target_score,
"max_budget": t.max_budget,
"tools": list(t.tools),
}
for t in TASKS
],
"action_schema": {
"action_types": [
"query_kb",
"query_cluster",
"interrogate",
"run_ralph",
"submit",
"get_audit",
],
},
}
@app.get("/health/deep", summary="Deep health check")
def health_check() -> dict[str, Any]:
"""Check all dependencies: corpus file, tools."""
checks: dict[str, bool] = {
"ruff": shutil.which("ruff") is not None,
"mypy": shutil.which("mypy") is not None,
"pytest": shutil.which("pytest") is not None,
}
corpus_ok = _corpus_path is not None and Path(_corpus_path).is_file()
checks["corpus"] = corpus_ok
all_ok = all(checks.values())
return {"status": "ok" if all_ok else "degraded", "checks": checks}
|