import os import signal import subprocess import threading import time from collections import deque from pathlib import Path import gradio as gr RDAGENT_HOME = Path(os.environ.get("RDAGENT_HOME", "/home/rtivolle/Quant/RD-Agent")).expanduser() LOG_DIR = Path(os.environ.get("RDAGENT_LOG_DIR", RDAGENT_HOME / "git_ignore_folder" / "log")).expanduser() DEFAULT_COMMAND = os.environ.get("RDAGENT_COMMAND", "fin_quant") _PROCESS = None _LOCK = threading.Lock() _LOG_BUFFER = deque(maxlen=4000) def _append_log(line: str) -> None: _LOG_BUFFER.append(line.rstrip("\n")) def _reader_thread(pipe) -> None: try: for line in iter(pipe.readline, ""): if not line: break _append_log(line) finally: pipe.close() def _status() -> str: with _LOCK: if _PROCESS is None: return "idle" code = _PROCESS.poll() if code is None: return f"running (pid {_PROCESS.pid})" return f"finished (exit code {code})" def _latest_trace_folder() -> str: if not LOG_DIR.exists(): return "No log directory found yet." folders = [p for p in LOG_DIR.iterdir() if p.is_dir()] if not folders: return "No trace folder created yet." latest = max(folders, key=lambda p: p.stat().st_mtime) return str(latest) def start_agent(command_name: str) -> tuple[str, str, str]: global _PROCESS with _LOCK: if _PROCESS is not None and _PROCESS.poll() is None: return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder() if not RDAGENT_HOME.exists(): _append_log(f"RD-Agent home does not exist: {RDAGENT_HOME}") return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder() LOG_DIR.mkdir(parents=True, exist_ok=True) _LOG_BUFFER.clear() _append_log(f"Starting rdagent {command_name} in {RDAGENT_HOME}") env = os.environ.copy() env.setdefault("DO_NOT_TRACK", "1") env.setdefault("PREFECT_SERVER_ANALYTICS_ENABLED", "false") _PROCESS = subprocess.Popen( ["rdagent", command_name], cwd=RDAGENT_HOME, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) threading.Thread(target=_reader_thread, args=(_PROCESS.stdout,), daemon=True).start() time.sleep(1) return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder() def stop_agent() -> tuple[str, str, str]: global _PROCESS with _LOCK: if _PROCESS is None or _PROCESS.poll() is not None: return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder() _append_log("Stopping active RD-Agent run...") _PROCESS.send_signal(signal.SIGTERM) time.sleep(1) return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder() def poll_logs() -> tuple[str, str, str]: return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder() with gr.Blocks(title="RD-Agent Quant") as demo: gr.Markdown( """ # RD-Agent Quant Runner Launch and monitor the quantitative RD-Agent loop from a single Gradio app. This app expects `rdagent` to already be installed and configured, plus QLib data to be available. """ ) with gr.Row(): command = gr.Dropdown( choices=["fin_quant", "fin_factor", "fin_model"], value=DEFAULT_COMMAND, label="RD-Agent command", ) start_btn = gr.Button("Start", variant="primary") stop_btn = gr.Button("Stop") refresh_btn = gr.Button("Refresh") status = gr.Textbox(label="Status", value=_status(), interactive=False) trace_path = gr.Textbox(label="Latest trace folder", value=_latest_trace_folder(), interactive=False) logs = gr.Textbox(label="Logs", lines=24, max_lines=30, interactive=False) poller = gr.Timer(2) start_btn.click(start_agent, inputs=command, outputs=[status, logs, trace_path]) stop_btn.click(stop_agent, outputs=[status, logs, trace_path]) refresh_btn.click(poll_logs, outputs=[status, logs, trace_path]) demo.load(poll_logs, outputs=[status, logs, trace_path]) poller.tick(poll_logs, outputs=[status, logs, trace_path]) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")))