testqlib / app.py
awax1122's picture
Create app.py
efb94c0 verified
import os
import signal
import subprocess
import threading
import time
from collections import deque
from pathlib import Path
import gradio as gr
RDAGENT_HOME = Path(os.environ.get("RDAGENT_HOME", "/home/rtivolle/Quant/RD-Agent")).expanduser()
LOG_DIR = Path(os.environ.get("RDAGENT_LOG_DIR", RDAGENT_HOME / "git_ignore_folder" / "log")).expanduser()
DEFAULT_COMMAND = os.environ.get("RDAGENT_COMMAND", "fin_quant")
_PROCESS = None
_LOCK = threading.Lock()
_LOG_BUFFER = deque(maxlen=4000)
def _append_log(line: str) -> None:
_LOG_BUFFER.append(line.rstrip("\n"))
def _reader_thread(pipe) -> None:
try:
for line in iter(pipe.readline, ""):
if not line:
break
_append_log(line)
finally:
pipe.close()
def _status() -> str:
with _LOCK:
if _PROCESS is None:
return "idle"
code = _PROCESS.poll()
if code is None:
return f"running (pid {_PROCESS.pid})"
return f"finished (exit code {code})"
def _latest_trace_folder() -> str:
if not LOG_DIR.exists():
return "No log directory found yet."
folders = [p for p in LOG_DIR.iterdir() if p.is_dir()]
if not folders:
return "No trace folder created yet."
latest = max(folders, key=lambda p: p.stat().st_mtime)
return str(latest)
def start_agent(command_name: str) -> tuple[str, str, str]:
global _PROCESS
with _LOCK:
if _PROCESS is not None and _PROCESS.poll() is None:
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
if not RDAGENT_HOME.exists():
_append_log(f"RD-Agent home does not exist: {RDAGENT_HOME}")
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
LOG_DIR.mkdir(parents=True, exist_ok=True)
_LOG_BUFFER.clear()
_append_log(f"Starting rdagent {command_name} in {RDAGENT_HOME}")
env = os.environ.copy()
env.setdefault("DO_NOT_TRACK", "1")
env.setdefault("PREFECT_SERVER_ANALYTICS_ENABLED", "false")
_PROCESS = subprocess.Popen(
["rdagent", command_name],
cwd=RDAGENT_HOME,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
threading.Thread(target=_reader_thread, args=(_PROCESS.stdout,), daemon=True).start()
time.sleep(1)
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
def stop_agent() -> tuple[str, str, str]:
global _PROCESS
with _LOCK:
if _PROCESS is None or _PROCESS.poll() is not None:
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
_append_log("Stopping active RD-Agent run...")
_PROCESS.send_signal(signal.SIGTERM)
time.sleep(1)
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
def poll_logs() -> tuple[str, str, str]:
return _status(), "\n".join(_LOG_BUFFER), _latest_trace_folder()
with gr.Blocks(title="RD-Agent Quant") as demo:
gr.Markdown(
"""
# RD-Agent Quant Runner
Launch and monitor the quantitative RD-Agent loop from a single Gradio app.
This app expects `rdagent` to already be installed and configured, plus QLib data to be available.
"""
)
with gr.Row():
command = gr.Dropdown(
choices=["fin_quant", "fin_factor", "fin_model"],
value=DEFAULT_COMMAND,
label="RD-Agent command",
)
start_btn = gr.Button("Start", variant="primary")
stop_btn = gr.Button("Stop")
refresh_btn = gr.Button("Refresh")
status = gr.Textbox(label="Status", value=_status(), interactive=False)
trace_path = gr.Textbox(label="Latest trace folder", value=_latest_trace_folder(), interactive=False)
logs = gr.Textbox(label="Logs", lines=24, max_lines=30, interactive=False)
poller = gr.Timer(2)
start_btn.click(start_agent, inputs=command, outputs=[status, logs, trace_path])
stop_btn.click(stop_agent, outputs=[status, logs, trace_path])
refresh_btn.click(poll_logs, outputs=[status, logs, trace_path])
demo.load(poll_logs, outputs=[status, logs, trace_path])
poller.tick(poll_logs, outputs=[status, logs, trace_path])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")))