Spaces:
Sleeping
Sleeping
File size: 11,692 Bytes
d0d2f42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 | """
agents/cli.py — CLI interactivo para DocOps Agent con HITL y persistencia SQLite
Clase 12: Human-in-the-Loop real desde terminal
Uso:
python agents/cli.py # nuevo thread automático
python agents/cli.py --thread mi-sesion # retomar thread existente
python agents/cli.py --db /ruta/custom.db # base de datos personalizada
Comandos en sesión:
/new Crear nuevo thread
/thread <id> Cambiar a thread existente
/threads Listar todos los threads guardados
/history Ver historial de pasos del thread actual
/exit Salir
"""
import argparse
import sqlite3
import sys
import uuid
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import Command
from agents.multi_agent_graph import build_docops_agent
# ─── Configuración ───
DEFAULT_DB = "docops_checkpoints.db"
# ─── Colores ANSI ───
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
MAGENTA = "\033[35m"
WHITE = "\033[97m"
BLUE = "\033[34m"
NODE_ICONS = {
"planner": ("📋", BLUE),
"retriever": ("🔍", CYAN),
"executor": ("⚙️ ", YELLOW),
"verifier": ("✅", MAGENTA),
"human_gate": ("🔐", RED),
}
# ─── UI helpers ─────────────────────────────────────────────
def banner(db_path: str):
print(f"\n{BOLD}{WHITE}{'═'*62}{RESET}")
print(f"{BOLD}{WHITE} DocOps Agent CLI — HITL Interactivo{RESET}")
print(f"{BOLD}{WHITE} Persistencia: {DIM}SQLite → {db_path}{RESET}")
print(f"{BOLD}{WHITE}{'═'*62}{RESET}")
print(f" {DIM}Comandos:{RESET} "
f"{CYAN}/new{RESET} "
f"{CYAN}/thread <id>{RESET} "
f"{CYAN}/threads{RESET} "
f"{CYAN}/history{RESET} "
f"{CYAN}/exit{RESET}")
print(f"{BOLD}{WHITE}{'═'*62}{RESET}\n")
def print_node_event(node_name: str, node_output: dict):
icon, color = NODE_ICONS.get(node_name, ("▸", WHITE))
score_str = ""
if isinstance(node_output, dict) and "quality_score" in node_output:
s = node_output["quality_score"]
sc = GREEN if s >= 0.8 else YELLOW if s >= 0.6 else RED
score_str = f" {sc}score={s:.2f}{RESET}"
print(f" {color}{icon} {node_name}{RESET}{score_str}")
def print_interrupt_panel(payload: dict):
risk = payload.get("risk_level", "?")
score = payload.get("quality_score", 0.0)
draft = payload.get("draft_preview", "")
rc = RED if risk == "critical" else YELLOW
print(f"\n{rc}{BOLD}{'─'*62}{RESET}")
print(f"{rc}{BOLD} ⏸ PAUSA — Aprobación humana requerida{RESET}")
print(f"{rc}{BOLD}{'─'*62}{RESET}")
print(f" {DIM}Nivel de riesgo:{RESET} {rc}{BOLD}{risk.upper()}{RESET} "
f"{DIM}Quality score:{RESET} {score:.2f}")
print(f"\n{DIM} Draft generado:{RESET}")
for line in draft.split("\n"):
print(f" {line}")
print(f"\n {GREEN}[a]{RESET} Aprobar tal cual")
print(f" {YELLOW}[e]{RESET} Editar draft antes de aprobar")
print(f" {RED}[r]{RESET} Rechazar")
print(f"{rc}{'─'*62}{RESET}")
def ask_decision(payload: dict) -> dict:
"""Solicita la decisión del humano y devuelve el dict para Command(resume=...)."""
print_interrupt_panel(payload)
while True:
try:
choice = input(f"\n{BOLD}Decisión [a/e/r]: {RESET}").strip().lower()
except (EOFError, KeyboardInterrupt):
print(f"\n{YELLOW}Sin input — aprobando automáticamente.{RESET}")
return {"approved": True}
if choice in ("a", "aprobar", "approve", "y", "s", "si", ""):
print(f"{GREEN}✓ Aprobado{RESET}")
return {"approved": True}
elif choice in ("e", "edit", "editar"):
print(f"{YELLOW}Escribe el draft corregido.")
print(f"{DIM}(Línea con solo '###' para terminar){RESET}")
lines = []
try:
while True:
line = input()
if line.strip() == "###":
break
lines.append(line)
except EOFError:
pass
edited = "\n".join(lines).strip()
if not edited:
print(f"{YELLOW}Draft vacío — aprobando sin cambios.{RESET}")
return {"approved": True}
print(f"{GREEN}✓ Draft editado ({len(edited)} chars){RESET}")
return {"approved": True, "edited_draft": edited}
elif choice in ("r", "reject", "rechazar", "n", "no"):
try:
reason = input(f"{RED}Razón del rechazo: {RESET}").strip()
except (EOFError, KeyboardInterrupt):
reason = "Rechazado por el supervisor"
print(f"{RED}✗ Rechazado{RESET}")
return {"approved": False, "reason": reason or "Rechazado por el supervisor"}
else:
print(f"{DIM}Opción no reconocida. Escribe a, e o r.{RESET}")
# ─── Lógica principal ────────────────────────────────────────
def get_interrupt_payload(agent, config: dict) -> dict | None:
"""Extrae el payload del interrupt pendiente, si existe."""
snapshot = agent.get_state(config)
if not snapshot.next:
return None
if hasattr(snapshot, "tasks"):
for task in snapshot.tasks:
if hasattr(task, "interrupts") and task.interrupts:
return task.interrupts[0].value
return None
def run_query(agent, thread_id: str, query: str):
"""
Ejecuta una consulta y gestiona el bucle HITL completo.
Retorna el dict de valores finales del estado.
"""
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"messages": [HumanMessage(content=query)],
"plan": "",
"search_results": "",
"draft": "",
"feedback": "",
"quality_score": 0.0,
"iteration": 0,
}
print(f"\n{DIM} thread:{RESET} {CYAN}{thread_id}{RESET}")
print(f"{CYAN}{BOLD} Procesando...{RESET}\n")
# Primera ejecución con stream para mostrar progreso
for event in agent.stream(initial_state, config, stream_mode="updates"):
for node_name, node_output in event.items():
print_node_event(node_name, node_output)
# Bucle HITL: puede haber múltiples interrupts (p.ej. human_gate_strict)
while True:
payload = get_interrupt_payload(agent, config)
if payload is None:
break
decision = ask_decision(payload)
print(f"\n{CYAN}{BOLD} Reanudando...{RESET}\n")
for event in agent.stream(Command(resume=decision), config, stream_mode="updates"):
for node_name, node_output in event.items():
print_node_event(node_name, node_output)
return agent.get_state(config).values
def list_threads(conn: sqlite3.Connection):
try:
rows = conn.execute(
"SELECT thread_id, COUNT(*) as steps "
"FROM checkpoints GROUP BY thread_id ORDER BY thread_id"
).fetchall()
if not rows:
print(f" {DIM}No hay threads guardados.{RESET}\n")
else:
print(f"\n{BOLD}Threads en la base de datos:{RESET}")
for thread_id, steps in rows:
print(f" {CYAN}•{RESET} {thread_id} {DIM}({steps} checkpoints){RESET}")
print()
except Exception as e:
print(f" {RED}Error al listar threads: {e}{RESET}\n")
def show_history(agent, thread_id: str):
from memory.store import get_thread_history
history = get_thread_history(agent, thread_id, max_steps=15)
if not history:
print(f" {DIM}Sin historial para '{thread_id}'.{RESET}\n")
return
print(f"\n{BOLD}Historial de '{CYAN}{thread_id}{RESET}{BOLD}':{RESET}")
for step in history:
score = step.get("quality_score")
score_str = f"{score:.2f}" if score is not None else " — "
sc = GREEN if (score or 0) >= 0.8 else YELLOW if (score or 0) >= 0.6 else RED
print(f" {DIM}paso {step['step']:2d}{RESET} → "
f"{WHITE}{step['next_node']:<15}{RESET} "
f"score={sc}{score_str}{RESET} "
f"{DIM}msgs={step['message_count']}{RESET}")
print()
# ─── Entrypoint ──────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="DocOps Agent CLI con HITL")
parser.add_argument("--thread", default=None, help="ID del thread a retomar")
parser.add_argument("--db", default=DEFAULT_DB, help="Ruta al archivo SQLite")
args = parser.parse_args()
db_path = args.db
banner(db_path)
# Abrir conexión SQLite persistente (sobrevive entre ejecuciones)
conn = sqlite3.connect(db_path, check_same_thread=False)
cp = SqliteSaver(conn)
cp.setup()
# Construir agente con el checkpointer SQLite
agent = build_docops_agent(cp=cp)
# Thread inicial
thread_id = args.thread or f"docops-{uuid.uuid4().hex[:8]}"
print(f"Thread activo: {CYAN}{BOLD}{thread_id}{RESET}\n")
try:
while True:
try:
raw = input(f"{BOLD}[{thread_id[:20]}]> {RESET}").strip()
except (EOFError, KeyboardInterrupt):
print(f"\n{DIM}Saliendo...{RESET}")
break
if not raw:
continue
# ─── Comandos ───
if raw == "/exit":
print(f"{DIM}¡Hasta luego!{RESET}")
break
elif raw == "/new":
thread_id = f"docops-{uuid.uuid4().hex[:8]}"
print(f"{GREEN}Nuevo thread: {CYAN}{thread_id}{RESET}\n")
elif raw.startswith("/thread "):
thread_id = raw.split(" ", 1)[1].strip()
print(f"{GREEN}Thread activo: {CYAN}{thread_id}{RESET}\n")
elif raw == "/threads":
list_threads(conn)
elif raw == "/history":
show_history(agent, thread_id)
elif raw.startswith("/"):
print(f"{DIM}Comando desconocido. Disponibles: "
f"/new /thread <id> /threads /history /exit{RESET}\n")
# ─── Consulta al agente ───
else:
try:
result = run_query(agent, thread_id, raw)
draft = result.get("draft", "")
score = result.get("quality_score", 0.0)
iterations = result.get("iteration", 0)
sc = GREEN if score >= 0.8 else YELLOW if score >= 0.6 else RED
print(f"\n{GREEN}{BOLD}{'─'*62}{RESET}")
print(f"{GREEN}{BOLD} Respuesta final{RESET} "
f"{DIM}score={sc}{score:.2f}{RESET}{DIM} iter={iterations}{RESET}")
print(f"{GREEN}{BOLD}{'─'*62}{RESET}")
print(f"{WHITE}{draft}{RESET}")
print(f"{GREEN}{'─'*62}{RESET}\n")
except KeyboardInterrupt:
print(f"\n{YELLOW}Consulta interrumpida.{RESET}\n")
except Exception as e:
print(f"\n{RED}Error: {e}{RESET}\n")
finally:
conn.close()
if __name__ == "__main__":
main()
|