| import os |
| import signal |
| import subprocess |
| import threading |
| import time |
| from collections import deque |
| from pathlib import Path |
|
|
| import gradio as gr |
|
|
|
|
| RDAGENT_HOME = Path(os.environ.get("RDAGENT_HOME", "/home/rtivolle/Quant/RD-Agent")).expanduser() |
| LOG_DIR = Path(os.environ.get("RDAGENT_LOG_DIR", RDAGENT_HOME / "git_ignore_folder" / "log")).expanduser() |
| DEFAULT_COMMAND = os.environ.get("RDAGENT_COMMAND", "fin_quant") |
|
|
| _PROCESS = None |
| _LOCK = threading.Lock() |
| _LOG_BUFFER = deque(maxlen=4000) |
|
|
|
|
def _append_log(line: str) -> None:
    """Store one line in the shared log buffer, minus any trailing newlines."""
    trimmed = line.rstrip("\n")
    _LOG_BUFFER.append(trimmed)
|
|
|
|
def _reader_thread(pipe) -> None:
    """Copy every line read from *pipe* into the log buffer until end of stream.

    The pipe is closed when the loop ends, whether normally or through an
    exception.
    """
    try:
        while True:
            chunk = pipe.readline()
            # readline() yields an empty value only at end of stream.
            if not chunk:
                break
            _append_log(chunk)
    finally:
        pipe.close()
|
|
|
|
def _status() -> str:
    """Return a short description of the tracked subprocess.

    Returns:
        ``"idle"`` when nothing has been launched, ``"running (pid N)"`` while
        the process is alive, otherwise ``"finished (exit code N)"``.
    """
    with _LOCK:
        proc = _PROCESS
        if proc is None:
            return "idle"
        exit_code = proc.poll()
        if exit_code is not None:
            return f"finished (exit code {exit_code})"
        return f"running (pid {proc.pid})"
|
|
|
|
def _latest_trace_folder() -> str:
    """Return the most recently modified sub-directory of ``LOG_DIR`` as a string.

    A human-readable placeholder message is returned instead when the log
    directory does not exist or contains no sub-directories.
    """
    if not LOG_DIR.exists():
        return "No log directory found yet."

    candidates = [entry for entry in LOG_DIR.iterdir() if entry.is_dir()]

    newest = None
    newest_mtime = None
    for entry in candidates:
        mtime = entry.stat().st_mtime
        # Strict comparison keeps the first entry on ties, like max().
        if newest is None or mtime > newest_mtime:
            newest, newest_mtime = entry, mtime

    if newest is None:
        return "No trace folder created yet."
    return str(newest)
|
|
|
|
def start_agent(command_name: str) -> tuple[str, str, str]:
    """Launch ``rdagent <command_name>`` unless a run is already active.

    The subprocess is started in ``RDAGENT_HOME`` with stderr merged into
    stdout; a daemon thread copies its output into the shared log buffer.
    If a run is already active, or ``RDAGENT_HOME`` does not exist, nothing is
    launched. A failure to spawn the executable (for example ``rdagent`` not
    being on PATH) is written to the log buffer instead of being raised, so
    the UI shows the reason.

    Args:
        command_name: Sub-command passed as the single argument to ``rdagent``.

    Returns:
        A ``(status, logs, latest_trace_folder)`` tuple for the UI.
    """
    global _PROCESS
    launched = False

    with _LOCK:
        idle = _PROCESS is None or _PROCESS.poll() is not None
        if idle and not RDAGENT_HOME.exists():
            _append_log(f"RD-Agent home does not exist: {RDAGENT_HOME}")
        elif idle:
            LOG_DIR.mkdir(parents=True, exist_ok=True)
            _LOG_BUFFER.clear()
            _append_log(f"Starting rdagent {command_name} in {RDAGENT_HOME}")

            # setdefault keeps any value the operator already exported.
            env = os.environ.copy()
            env.setdefault("DO_NOT_TRACK", "1")
            env.setdefault("PREFECT_SERVER_ANALYTICS_ENABLED", "false")

            try:
                proc = subprocess.Popen(
                    ["rdagent", command_name],
                    cwd=RDAGENT_HOME,
                    env=env,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1,
                )
            except OSError as exc:
                # The buffer was just cleared for this attempt, so forget any
                # earlier finished run: the status should not describe it.
                _PROCESS = None
                _append_log(f"Failed to launch rdagent: {exc}")
            else:
                _PROCESS = proc
                threading.Thread(target=_reader_thread, args=(proc.stdout,), daemon=True).start()
                launched = True

    if launched:
        # Give the new process a moment to emit its first output lines.
        time.sleep(1)

    # `_status()` acquires `_LOCK` itself, so the snapshot is built only after
    # the lock above has been released.
    return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
|
|
|
|
def stop_agent() -> tuple[str, str, str]:
    """Send SIGTERM to the active run, if there is one.

    When no process is running the call only reports the current state.
    After signalling, the function waits one second before taking the
    snapshot so the status has a chance to reflect the exit.

    Returns:
        A ``(status, logs, latest_trace_folder)`` tuple for the UI.
    """
    signalled = False

    with _LOCK:
        if _PROCESS is not None and _PROCESS.poll() is None:
            _append_log("Stopping active RD-Agent run...")
            _PROCESS.send_signal(signal.SIGTERM)
            signalled = True

    if signalled:
        time.sleep(1)

    # `_status()` acquires `_LOCK` itself, so the snapshot is built only after
    # the lock above has been released.
    return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
|
|
|
|
def poll_logs() -> tuple[str, str, str]:
    """Return the current ``(status, logs, latest_trace_folder)`` snapshot."""
    state = _status()
    log_text = "\n".join(_LOG_BUFFER)
    trace = _latest_trace_folder()
    return state, log_text, trace
|
|
|
|
# UI definition: a control row (command picker plus buttons) above three
# read-only text boxes, refreshed on demand, on page load and by a timer.
with gr.Blocks(title="RD-Agent Quant") as demo:
    gr.Markdown(
        """
        # RD-Agent Quant Runner
        Launch and monitor the quantitative RD-Agent loop from a single Gradio app.
        This app expects `rdagent` to already be installed and configured, plus QLib data to be available.
        """
    )

    with gr.Row():
        command = gr.Dropdown(
            label="RD-Agent command",
            choices=["fin_quant", "fin_factor", "fin_model"],
            value=DEFAULT_COMMAND,
        )
        start_btn = gr.Button("Start", variant="primary")
        stop_btn = gr.Button("Stop")
        refresh_btn = gr.Button("Refresh")

    status = gr.Textbox(label="Status", value=_status(), interactive=False)
    trace_path = gr.Textbox(
        label="Latest trace folder",
        value=_latest_trace_folder(),
        interactive=False,
    )
    logs = gr.Textbox(label="Logs", lines=24, max_lines=30, interactive=False)
    poller = gr.Timer(2)

    # Every handler returns (status, logs, trace folder) in this order.
    start_btn.click(fn=start_agent, inputs=command, outputs=[status, logs, trace_path])
    stop_btn.click(fn=stop_agent, outputs=[status, logs, trace_path])
    refresh_btn.click(fn=poll_logs, outputs=[status, logs, trace_path])
    demo.load(fn=poll_logs, outputs=[status, logs, trace_path])
    poller.tick(fn=poll_logs, outputs=[status, logs, trace_path])
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT, defaulting to 7860.
    listen_port = int(os.environ.get("PORT", "7860"))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)
|
|
|
|