File size: 4,428 Bytes
efb94c0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | import os
import signal
import subprocess
import threading
import time
from collections import deque
from pathlib import Path
import gradio as gr
RDAGENT_HOME = Path(os.environ.get("RDAGENT_HOME", "/home/rtivolle/Quant/RD-Agent")).expanduser()
LOG_DIR = Path(os.environ.get("RDAGENT_LOG_DIR", RDAGENT_HOME / "git_ignore_folder" / "log")).expanduser()
DEFAULT_COMMAND = os.environ.get("RDAGENT_COMMAND", "fin_quant")
_PROCESS = None
_LOCK = threading.Lock()
_LOG_BUFFER = deque(maxlen=4000)
def _append_log(line: str) -> None:
_LOG_BUFFER.append(line.rstrip("\n"))
def _reader_thread(pipe) -> None:
try:
for line in iter(pipe.readline, ""):
if not line:
break
_append_log(line)
finally:
pipe.close()
def _status() -> str:
with _LOCK:
if _PROCESS is None:
return "idle"
code = _PROCESS.poll()
if code is None:
return f"running (pid {_PROCESS.pid})"
return f"finished (exit code {code})"
def _latest_trace_folder() -> str:
if not LOG_DIR.exists():
return "No log directory found yet."
folders = [p for p in LOG_DIR.iterdir() if p.is_dir()]
if not folders:
return "No trace folder created yet."
latest = max(folders, key=lambda p: p.stat().st_mtime)
return str(latest)
def start_agent(command_name: str) -> tuple[str, str, str]:
global _PROCESS
with _LOCK:
if _PROCESS is not None and _PROCESS.poll() is None:
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
if not RDAGENT_HOME.exists():
_append_log(f"RD-Agent home does not exist: {RDAGENT_HOME}")
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
LOG_DIR.mkdir(parents=True, exist_ok=True)
_LOG_BUFFER.clear()
_append_log(f"Starting rdagent {command_name} in {RDAGENT_HOME}")
env = os.environ.copy()
env.setdefault("DO_NOT_TRACK", "1")
env.setdefault("PREFECT_SERVER_ANALYTICS_ENABLED", "false")
_PROCESS = subprocess.Popen(
["rdagent", command_name],
cwd=RDAGENT_HOME,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
threading.Thread(target=_reader_thread, args=(_PROCESS.stdout,), daemon=True).start()
time.sleep(1)
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
def stop_agent() -> tuple[str, str, str]:
global _PROCESS
with _LOCK:
if _PROCESS is None or _PROCESS.poll() is not None:
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
_append_log("Stopping active RD-Agent run...")
_PROCESS.send_signal(signal.SIGTERM)
time.sleep(1)
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
def poll_logs() -> tuple[str, str, str]:
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
with gr.Blocks(title="RD-Agent Quant") as demo:
gr.Markdown(
"""
# RD-Agent Quant Runner
Launch and monitor the quantitative RD-Agent loop from a single Gradio app.
This app expects `rdagent` to already be installed and configured, plus QLib data to be available.
"""
)
with gr.Row():
command = gr.Dropdown(
choices=["fin_quant", "fin_factor", "fin_model"],
value=DEFAULT_COMMAND,
label="RD-Agent command",
)
start_btn = gr.Button("Start", variant="primary")
stop_btn = gr.Button("Stop")
refresh_btn = gr.Button("Refresh")
status = gr.Textbox(label="Status", value=_status(), interactive=False)
trace_path = gr.Textbox(label="Latest trace folder", value=_latest_trace_folder(), interactive=False)
logs = gr.Textbox(label="Logs", lines=24, max_lines=30, interactive=False)
poller = gr.Timer(2)
start_btn.click(start_agent, inputs=command, outputs=[status, logs, trace_path])
stop_btn.click(stop_agent, outputs=[status, logs, trace_path])
refresh_btn.click(poll_logs, outputs=[status, logs, trace_path])
demo.load(poll_logs, outputs=[status, logs, trace_path])
poller.tick(poll_logs, outputs=[status, logs, trace_path])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")))
|