File size: 4,502 Bytes
acf77ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from __future__ import annotations

import logging
import os

from dotenv import load_dotenv

load_dotenv()
import shutil
import threading
import time
from pathlib import Path
from typing import Any
from uuid import uuid4

from fastapi import FastAPI, Response
from openenv.core.env_server.http_server import create_app

from codeforge.environment import CodeForgeEnvironment
from codeforge.models import CodeForgeAction, CodeForgeObservation
from codeforge.tasks import TASKS

# Module-level logger, named after this module.
_log = logging.getLogger(__name__)

# Optional corpus location read from the GROUNDLOOP_CORPUS_PATH environment
# variable. `_corpus_path` is a Path when the variable holds a non-empty
# string and None when it is unset or empty.
_corpus_path_str = os.environ.get("GROUNDLOOP_CORPUS_PATH")
_corpus_path = Path(_corpus_path_str) if _corpus_path_str else None

# ---------------------------------------------------------------------------
# Session-keyed environment pool (SYSTEM_DESIGN §15)
# ---------------------------------------------------------------------------
# Taken by get_session() and create_session() around every access to the two
# dicts below; _expire_stale_sessions() expects its caller to hold it.
_lock = threading.Lock()
# session_id → environment instance registered by create_session().
_sessions: dict[str, CodeForgeEnvironment] = {}
# Timestamps are time.monotonic() readings, refreshed on each successful lookup.
_session_access: dict[str, float] = {}  # session_id → last access timestamp
# Pool capacity; read once at import from CODEFORGE_MAX_SESSIONS (default 10).
_MAX_SESSIONS = int(os.environ.get("CODEFORGE_MAX_SESSIONS", "10"))
# Seconds since last access after which a session is expired; read once at
# import from CODEFORGE_SESSION_TTL (default 3600).
_SESSION_TTL_S = int(os.environ.get("CODEFORGE_SESSION_TTL", "3600"))


def _get_or_create_env() -> CodeForgeEnvironment:
    """Environment factory handed to the OpenEnv ``create_app`` call.

    Despite the name, every call constructs a brand-new
    ``CodeForgeEnvironment`` bound to the configured corpus path; the
    session pool defined in this module is not consulted here (it is used
    by the MCP server layer).
    """
    fresh_env = CodeForgeEnvironment(corpus_path=_corpus_path)
    return fresh_env


def _expire_stale_sessions() -> None:
    """Drop every session whose last access is older than the TTL.

    The caller must already hold ``_lock``; no locking is done here.
    """
    current = time.monotonic()
    # Walk a snapshot so entries can be removed from the live dicts as we go.
    for session_id, last_seen in list(_session_access.items()):
        idle_for = current - last_seen
        if idle_for > _SESSION_TTL_S:
            _sessions.pop(session_id, None)
            _session_access.pop(session_id, None)


def get_session(session_id: str) -> CodeForgeEnvironment | None:
    """Look up a live session and refresh its last-access time.

    Stale sessions are purged first, so an ID whose TTL has lapsed is
    reported as missing. Returns the environment, or ``None`` when the ID
    is unknown or expired.
    """
    with _lock:
        _expire_stale_sessions()
        found = _sessions.get(session_id)
        if found is None:
            return None
        # A successful lookup counts as an access for TTL and LRU purposes.
        _session_access[session_id] = time.monotonic()
        return found


def create_session() -> tuple[str, CodeForgeEnvironment]:
    """Create a new session, register it in the pool, and return it.

    Expired sessions are purged first. If the pool is still at (or above)
    ``_MAX_SESSIONS``, least-recently-used sessions are evicted until there
    is room. A non-positive ``_MAX_SESSIONS`` no longer raises: eviction
    simply stops once the pool is empty, so the pool then holds only the
    newest session.

    Returns:
        A ``(session_id, environment)`` pair, where ``session_id`` is the
        first 16 hex characters of a random UUID4.
    """
    sid = uuid4().hex[:16]
    # The environment is built before the lock is taken — presumably so a
    # slow constructor does not block concurrent lookups.
    env = CodeForgeEnvironment(corpus_path=_corpus_path)
    now = time.monotonic()
    with _lock:
        _expire_stale_sessions()
        # Guard on a non-empty access map: min() raises ValueError on an
        # empty iterable, which happened when the cap was configured <= 0.
        # Each pass removes one entry, so the loop always terminates.
        while _session_access and len(_sessions) >= _MAX_SESSIONS:
            lru_sid = min(_session_access, key=_session_access.__getitem__)
            _sessions.pop(lru_sid, None)
            _session_access.pop(lru_sid, None)
            _log.info("Evicted least-recently-used session %s", lru_sid)
        _sessions[sid] = env
        _session_access[sid] = now
    return sid, env


# ---------------------------------------------------------------------------
# OpenEnv compliance app
# ---------------------------------------------------------------------------
# Build the FastAPI application through OpenEnv's create_app, passing the
# environment factory plus the action and observation models. The route
# handlers defined below are registered on this same app object.
app: FastAPI = create_app(_get_or_create_env, CodeForgeAction, CodeForgeObservation)


@app.get("/", summary="Health check")
def root() -> dict[str, str]:
    # Static liveness payload: no dependency is probed here (the
    # /health/deep route does that). Documented with comments rather than a
    # docstring so the generated OpenAPI description stays unchanged.
    payload: dict[str, str] = {
        "name": "code-forge",
        "version": "0.2.0",
        "status": "ok",
        "docs": "/docs",
    }
    return payload


@app.get("/favicon.ico", include_in_schema=False)
def favicon() -> Response:
    # Answer favicon requests with an empty 204 (No Content) response; the
    # route is excluded from the generated schema.
    no_content = Response(status_code=204)
    return no_content


@app.get("/tasks", summary="List tasks + action schema")
def list_tasks() -> dict[str, Any]:
    # Returns one summary dict per entry in TASKS, followed by the list of
    # accepted action type names. Documented with comments rather than a
    # docstring so the generated OpenAPI description stays unchanged.
    task_entries: list[dict[str, Any]] = []
    for task in TASKS:
        entry = {
            "id": task.task_id,
            "difficulty": task.task_level,
            "brief": task.brief,
            "target_score": task.target_score,
            "max_budget": task.max_budget,
            # Copied into a fresh list so the response never aliases the
            # task's own `tools` container.
            "tools": list(task.tools),
        }
        task_entries.append(entry)

    action_types = [
        "query_kb",
        "query_cluster",
        "interrogate",
        "run_ralph",
        "submit",
        "get_audit",
    ]
    action_schema = {"action_types": action_types}
    return {"tasks": task_entries, "action_schema": action_schema}


@app.get("/health/deep", summary="Deep health check")
def health_check() -> dict[str, Any]:
    """Check all dependencies: corpus file, tools."""
    # A tool counts as available when shutil.which can resolve it.
    tool_names = ("ruff", "mypy", "pytest")
    checks: dict[str, bool] = {
        tool: shutil.which(tool) is not None for tool in tool_names
    }

    # The corpus check fails both when no path is configured and when the
    # configured path is not an existing regular file.
    if _corpus_path is None:
        checks["corpus"] = False
    else:
        checks["corpus"] = Path(_corpus_path).is_file()

    # Any single failing check downgrades the overall status.
    status = "degraded"
    if all(checks.values()):
        status = "ok"
    return {"status": status, "checks": checks}