# DGX_AI / codeforge / app.py
# Commit acf77ab by vasiuuu: "Initial commit for CodeForge GRPO training"
from __future__ import annotations
import logging
import os
from dotenv import load_dotenv
load_dotenv()
import shutil
import threading
import time
from pathlib import Path
from typing import Any
from uuid import uuid4
from fastapi import FastAPI, Response
from openenv.core.env_server.http_server import create_app
from codeforge.environment import CodeForgeEnvironment
from codeforge.models import CodeForgeAction, CodeForgeObservation
from codeforge.tasks import TASKS
_log = logging.getLogger(__name__)
# Optional corpus location: an unset *or empty* GROUNDLOOP_CORPUS_PATH yields None.
_corpus_path_str: str | None = os.environ.get("GROUNDLOOP_CORPUS_PATH")
_corpus_path: Path | None = None
if _corpus_path_str:
    _corpus_path = Path(_corpus_path_str)
# ---------------------------------------------------------------------------
# Session-keyed environment pool (SYSTEM_DESIGN §15)
# ---------------------------------------------------------------------------
# Guards _sessions and _session_access; the pool helpers below take it (or
# require the caller to hold it) before touching either dict.
_lock = threading.Lock()
_sessions: dict[str, CodeForgeEnvironment] = {}  # session_id → environment
_session_access: dict[str, float] = {}  # session_id → last access (time.monotonic())


def _env_int(name: str, default: int, *, minimum: int = 1) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset.
        minimum: Smallest accepted value; lower values are clamped to it with
            a warning. A pool size <= 0 would make LRU eviction look for a
            victim in an empty pool, and a TTL <= 0 would expire sessions on
            their very next lookup.

    Returns:
        The parsed (and possibly clamped) integer.

    Raises:
        ValueError: The variable is set but is not an integer; the message
            names the variable so misconfiguration is easy to diagnose.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError as exc:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from exc
    if value < minimum:
        logging.getLogger(__name__).warning(
            "%s=%d is below the minimum of %d; using %d", name, value, minimum, minimum
        )
        return minimum
    return value


_MAX_SESSIONS = _env_int("CODEFORGE_MAX_SESSIONS", 10)
_SESSION_TTL_S = _env_int("CODEFORGE_SESSION_TTL", 3600)
def _get_or_create_env() -> CodeForgeEnvironment:
    """Environment factory handed to ``create_app`` for OpenEnv compliance.

    Despite the name, nothing is cached: every call constructs a brand-new
    ``CodeForgeEnvironment`` bound to ``_corpus_path``. The session pool
    (``get_session`` / ``create_session``) is kept separately for the MCP
    server layer.
    """
    corpus = _corpus_path
    fresh_env = CodeForgeEnvironment(corpus_path=corpus)
    return fresh_env
def _expire_stale_sessions() -> None:
    """Drop every session whose last access is more than ``_SESSION_TTL_S`` ago.

    Does no locking of its own: the caller must already hold ``_lock``.
    """
    current = time.monotonic()
    # Materialise the victims first so the dicts are not mutated mid-iteration.
    stale_ids = [
        session_id
        for session_id, last_seen in _session_access.items()
        if current - last_seen > _SESSION_TTL_S
    ]
    for session_id in stale_ids:
        _sessions.pop(session_id, None)
        del _session_access[session_id]
def get_session(session_id: str) -> CodeForgeEnvironment | None:
    """Look up a live session by ID and refresh its last-access time.

    Stale sessions are purged first, so an expired ID yields ``None`` just
    like an unknown one.
    """
    with _lock:
        _expire_stale_sessions()
        found = _sessions.get(session_id)
        if found is None:
            return None
        # Touch the session so LRU eviction and TTL expiry see it as recent.
        _session_access[session_id] = time.monotonic()
        return found
def create_session() -> tuple[str, CodeForgeEnvironment]:
    """Create and register a new session, evicting LRU sessions at capacity.

    The environment is constructed before ``_lock`` is taken, so a slow
    constructor does not block concurrent lookups.

    Returns:
        ``(session_id, env)``: a 16-hex-character ID and the new environment.
    """
    sid = uuid4().hex[:16]
    env = CodeForgeEnvironment(corpus_path=_corpus_path)
    now = time.monotonic()
    with _lock:
        _expire_stale_sessions()
        # Loop with an emptiness guard: with a non-positive _MAX_SESSIONS,
        # min() on an empty _session_access would raise ValueError.
        while _session_access and len(_sessions) >= _MAX_SESSIONS:
            lru_sid = min(_session_access, key=_session_access.__getitem__)
            _sessions.pop(lru_sid, None)
            _session_access.pop(lru_sid, None)
            _log.info(
                "Evicted least-recently-used session %s (capacity %d)",
                lru_sid,
                _MAX_SESSIONS,
            )
        _sessions[sid] = env
        _session_access[sid] = now
    return sid, env
# ---------------------------------------------------------------------------
# OpenEnv compliance app
# ---------------------------------------------------------------------------
# NOTE(review): argument roles assumed from OpenEnv's create_app — confirm:
# an environment factory, the action model, and the observation model.
app: FastAPI = create_app(
    _get_or_create_env,
    CodeForgeAction,
    CodeForgeObservation,
)
@app.get("/", summary="Health check")
def root() -> dict[str, str]:
    """Shallow liveness probe: static service name, version, status and docs path."""
    payload = {
        "name": "code-forge",
        "version": "0.2.0",
        "status": "ok",
        "docs": "/docs",
    }
    return payload
@app.get("/favicon.ico", include_in_schema=False)
def favicon() -> Response:
    """Answer browser favicon requests with an empty 204 No Content response."""
    no_content = Response(status_code=204)
    return no_content
@app.get("/tasks", summary="List tasks + action schema")
def list_tasks() -> dict[str, Any]:
    """Describe every task in ``TASKS`` plus the accepted action types.

    Each task entry exposes its id, difficulty level, brief, target score,
    maximum budget and tool list. ``action_schema.action_types`` is a static,
    hand-maintained list.
    """
    # NOTE(review): hard-coded; confirm it stays in sync with CodeForgeAction.
    action_types = [
        "query_kb",
        "query_cluster",
        "interrogate",
        "run_ralph",
        "submit",
        "get_audit",
    ]
    task_entries: list[dict[str, Any]] = []
    for task in TASKS:
        task_entries.append(
            {
                "id": task.task_id,
                "difficulty": task.task_level,
                "brief": task.brief,
                "target_score": task.target_score,
                "max_budget": task.max_budget,
                "tools": list(task.tools),
            }
        )
    return {"tasks": task_entries, "action_schema": {"action_types": action_types}}
@app.get("/health/deep", summary="Deep health check")
def health_check() -> dict[str, Any]:
    """Report whether external dependencies are available.

    Checks that the ``ruff``, ``mypy`` and ``pytest`` executables are on
    ``PATH`` and that the corpus path (from ``GROUNDLOOP_CORPUS_PATH``) is
    set and points at an existing file. ``status`` is ``"ok"`` only when
    every check passes, otherwise ``"degraded"``.
    """
    checks: dict[str, bool] = {
        tool: shutil.which(tool) is not None for tool in ("ruff", "mypy", "pytest")
    }
    corpus = _corpus_path
    checks["corpus"] = corpus is not None and Path(corpus).is_file()
    status = "ok" if all(checks.values()) else "degraded"
    return {"status": status, "checks": checks}