File size: 4,428 Bytes
efb94c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import signal
import subprocess
import threading
import time
from collections import deque
from pathlib import Path

import gradio as gr


RDAGENT_HOME = Path(os.environ.get("RDAGENT_HOME", "/home/rtivolle/Quant/RD-Agent")).expanduser()
LOG_DIR = Path(os.environ.get("RDAGENT_LOG_DIR", RDAGENT_HOME / "git_ignore_folder" / "log")).expanduser()
DEFAULT_COMMAND = os.environ.get("RDAGENT_COMMAND", "fin_quant")

_PROCESS = None
_LOCK = threading.Lock()
_LOG_BUFFER = deque(maxlen=4000)


def _append_log(line: str) -> None:
    _LOG_BUFFER.append(line.rstrip("\n"))


def _reader_thread(pipe) -> None:
    try:
        for line in iter(pipe.readline, ""):
            if not line:
                break
            _append_log(line)
    finally:
        pipe.close()


def _status() -> str:
    with _LOCK:
        if _PROCESS is None:
            return "idle"
        code = _PROCESS.poll()
        if code is None:
            return f"running (pid {_PROCESS.pid})"
        return f"finished (exit code {code})"


def _latest_trace_folder() -> str:
    if not LOG_DIR.exists():
        return "No log directory found yet."
    folders = [p for p in LOG_DIR.iterdir() if p.is_dir()]
    if not folders:
        return "No trace folder created yet."
    latest = max(folders, key=lambda p: p.stat().st_mtime)
    return str(latest)


def start_agent(command_name: str) -> tuple[str, str, str]:
    global _PROCESS
    with _LOCK:
        if _PROCESS is not None and _PROCESS.poll() is None:
            return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()

        if not RDAGENT_HOME.exists():
            _append_log(f"RD-Agent home does not exist: {RDAGENT_HOME}")
            return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()

        LOG_DIR.mkdir(parents=True, exist_ok=True)
        _LOG_BUFFER.clear()
        _append_log(f"Starting rdagent {command_name} in {RDAGENT_HOME}")

        env = os.environ.copy()
        env.setdefault("DO_NOT_TRACK", "1")
        env.setdefault("PREFECT_SERVER_ANALYTICS_ENABLED", "false")

        _PROCESS = subprocess.Popen(
            ["rdagent", command_name],
            cwd=RDAGENT_HOME,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
        threading.Thread(target=_reader_thread, args=(_PROCESS.stdout,), daemon=True).start()

    time.sleep(1)
    return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()


def stop_agent() -> tuple[str, str, str]:
    global _PROCESS
    with _LOCK:
        if _PROCESS is None or _PROCESS.poll() is not None:
            return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()

        _append_log("Stopping active RD-Agent run...")
        _PROCESS.send_signal(signal.SIGTERM)

    time.sleep(1)
    return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()


def poll_logs() -> tuple[str, str, str]:
    return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()


with gr.Blocks(title="RD-Agent Quant") as demo:
    gr.Markdown(
        """
        # RD-Agent Quant Runner
        Launch and monitor the quantitative RD-Agent loop from a single Gradio app.
        This app expects `rdagent` to already be installed and configured, plus QLib data to be available.
        """
    )

    with gr.Row():
        command = gr.Dropdown(
            choices=["fin_quant", "fin_factor", "fin_model"],
            value=DEFAULT_COMMAND,
            label="RD-Agent command",
        )
        start_btn = gr.Button("Start", variant="primary")
        stop_btn = gr.Button("Stop")
        refresh_btn = gr.Button("Refresh")

    status = gr.Textbox(label="Status", value=_status(), interactive=False)
    trace_path = gr.Textbox(label="Latest trace folder", value=_latest_trace_folder(), interactive=False)
    logs = gr.Textbox(label="Logs", lines=24, max_lines=30, interactive=False)
    poller = gr.Timer(2)

    start_btn.click(start_agent, inputs=command, outputs=[status, logs, trace_path])
    stop_btn.click(stop_agent, outputs=[status, logs, trace_path])
    refresh_btn.click(poll_logs, outputs=[status, logs, trace_path])
    demo.load(poll_logs, outputs=[status, logs, trace_path])
    poller.tick(poll_logs, outputs=[status, logs, trace_path])


if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")))