RamsesCamas's picture
Initial clean commit for HF Space deployment
d0d2f42
"""
agents/cli.py — CLI interactivo para DocOps Agent con HITL y persistencia SQLite
Clase 12: Human-in-the-Loop real desde terminal
Uso:
python agents/cli.py # nuevo thread automático
python agents/cli.py --thread mi-sesion # retomar thread existente
python agents/cli.py --db /ruta/custom.db # base de datos personalizada
Comandos en sesión:
/new Crear nuevo thread
/thread <id> Cambiar a thread existente
/threads Listar todos los threads guardados
/history Ver historial de pasos del thread actual
/exit Salir
"""
import argparse
import sqlite3
import sys
import uuid
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import Command
from agents.multi_agent_graph import build_docops_agent
# ─── Configuración ───
DEFAULT_DB = "docops_checkpoints.db"
# ─── Colores ANSI ───
BOLD = "\033[1m"
DIM = "\033[2m"
RESET = "\033[0m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"
MAGENTA = "\033[35m"
WHITE = "\033[97m"
BLUE = "\033[34m"
NODE_ICONS = {
"planner": ("📋", BLUE),
"retriever": ("🔍", CYAN),
"executor": ("⚙️ ", YELLOW),
"verifier": ("✅", MAGENTA),
"human_gate": ("🔐", RED),
}
# ─── UI helpers ─────────────────────────────────────────────
def banner(db_path: str):
print(f"\n{BOLD}{WHITE}{'═'*62}{RESET}")
print(f"{BOLD}{WHITE} DocOps Agent CLI — HITL Interactivo{RESET}")
print(f"{BOLD}{WHITE} Persistencia: {DIM}SQLite → {db_path}{RESET}")
print(f"{BOLD}{WHITE}{'═'*62}{RESET}")
print(f" {DIM}Comandos:{RESET} "
f"{CYAN}/new{RESET} "
f"{CYAN}/thread <id>{RESET} "
f"{CYAN}/threads{RESET} "
f"{CYAN}/history{RESET} "
f"{CYAN}/exit{RESET}")
print(f"{BOLD}{WHITE}{'═'*62}{RESET}\n")
def print_node_event(node_name: str, node_output: dict):
icon, color = NODE_ICONS.get(node_name, ("▸", WHITE))
score_str = ""
if isinstance(node_output, dict) and "quality_score" in node_output:
s = node_output["quality_score"]
sc = GREEN if s >= 0.8 else YELLOW if s >= 0.6 else RED
score_str = f" {sc}score={s:.2f}{RESET}"
print(f" {color}{icon} {node_name}{RESET}{score_str}")
def print_interrupt_panel(payload: dict):
risk = payload.get("risk_level", "?")
score = payload.get("quality_score", 0.0)
draft = payload.get("draft_preview", "")
rc = RED if risk == "critical" else YELLOW
print(f"\n{rc}{BOLD}{'─'*62}{RESET}")
print(f"{rc}{BOLD} ⏸ PAUSA — Aprobación humana requerida{RESET}")
print(f"{rc}{BOLD}{'─'*62}{RESET}")
print(f" {DIM}Nivel de riesgo:{RESET} {rc}{BOLD}{risk.upper()}{RESET} "
f"{DIM}Quality score:{RESET} {score:.2f}")
print(f"\n{DIM} Draft generado:{RESET}")
for line in draft.split("\n"):
print(f" {line}")
print(f"\n {GREEN}[a]{RESET} Aprobar tal cual")
print(f" {YELLOW}[e]{RESET} Editar draft antes de aprobar")
print(f" {RED}[r]{RESET} Rechazar")
print(f"{rc}{'─'*62}{RESET}")
def ask_decision(payload: dict) -> dict:
"""Solicita la decisión del humano y devuelve el dict para Command(resume=...)."""
print_interrupt_panel(payload)
while True:
try:
choice = input(f"\n{BOLD}Decisión [a/e/r]: {RESET}").strip().lower()
except (EOFError, KeyboardInterrupt):
print(f"\n{YELLOW}Sin input — aprobando automáticamente.{RESET}")
return {"approved": True}
if choice in ("a", "aprobar", "approve", "y", "s", "si", ""):
print(f"{GREEN}✓ Aprobado{RESET}")
return {"approved": True}
elif choice in ("e", "edit", "editar"):
print(f"{YELLOW}Escribe el draft corregido.")
print(f"{DIM}(Línea con solo '###' para terminar){RESET}")
lines = []
try:
while True:
line = input()
if line.strip() == "###":
break
lines.append(line)
except EOFError:
pass
edited = "\n".join(lines).strip()
if not edited:
print(f"{YELLOW}Draft vacío — aprobando sin cambios.{RESET}")
return {"approved": True}
print(f"{GREEN}✓ Draft editado ({len(edited)} chars){RESET}")
return {"approved": True, "edited_draft": edited}
elif choice in ("r", "reject", "rechazar", "n", "no"):
try:
reason = input(f"{RED}Razón del rechazo: {RESET}").strip()
except (EOFError, KeyboardInterrupt):
reason = "Rechazado por el supervisor"
print(f"{RED}✗ Rechazado{RESET}")
return {"approved": False, "reason": reason or "Rechazado por el supervisor"}
else:
print(f"{DIM}Opción no reconocida. Escribe a, e o r.{RESET}")
# ─── Lógica principal ────────────────────────────────────────
def get_interrupt_payload(agent, config: dict) -> dict | None:
"""Extrae el payload del interrupt pendiente, si existe."""
snapshot = agent.get_state(config)
if not snapshot.next:
return None
if hasattr(snapshot, "tasks"):
for task in snapshot.tasks:
if hasattr(task, "interrupts") and task.interrupts:
return task.interrupts[0].value
return None
def run_query(agent, thread_id: str, query: str):
"""
Ejecuta una consulta y gestiona el bucle HITL completo.
Retorna el dict de valores finales del estado.
"""
config = {"configurable": {"thread_id": thread_id}}
initial_state = {
"messages": [HumanMessage(content=query)],
"plan": "",
"search_results": "",
"draft": "",
"feedback": "",
"quality_score": 0.0,
"iteration": 0,
}
print(f"\n{DIM} thread:{RESET} {CYAN}{thread_id}{RESET}")
print(f"{CYAN}{BOLD} Procesando...{RESET}\n")
# Primera ejecución con stream para mostrar progreso
for event in agent.stream(initial_state, config, stream_mode="updates"):
for node_name, node_output in event.items():
print_node_event(node_name, node_output)
# Bucle HITL: puede haber múltiples interrupts (p.ej. human_gate_strict)
while True:
payload = get_interrupt_payload(agent, config)
if payload is None:
break
decision = ask_decision(payload)
print(f"\n{CYAN}{BOLD} Reanudando...{RESET}\n")
for event in agent.stream(Command(resume=decision), config, stream_mode="updates"):
for node_name, node_output in event.items():
print_node_event(node_name, node_output)
return agent.get_state(config).values
def list_threads(conn: sqlite3.Connection):
try:
rows = conn.execute(
"SELECT thread_id, COUNT(*) as steps "
"FROM checkpoints GROUP BY thread_id ORDER BY thread_id"
).fetchall()
if not rows:
print(f" {DIM}No hay threads guardados.{RESET}\n")
else:
print(f"\n{BOLD}Threads en la base de datos:{RESET}")
for thread_id, steps in rows:
print(f" {CYAN}{RESET} {thread_id} {DIM}({steps} checkpoints){RESET}")
print()
except Exception as e:
print(f" {RED}Error al listar threads: {e}{RESET}\n")
def show_history(agent, thread_id: str):
from memory.store import get_thread_history
history = get_thread_history(agent, thread_id, max_steps=15)
if not history:
print(f" {DIM}Sin historial para '{thread_id}'.{RESET}\n")
return
print(f"\n{BOLD}Historial de '{CYAN}{thread_id}{RESET}{BOLD}':{RESET}")
for step in history:
score = step.get("quality_score")
score_str = f"{score:.2f}" if score is not None else " — "
sc = GREEN if (score or 0) >= 0.8 else YELLOW if (score or 0) >= 0.6 else RED
print(f" {DIM}paso {step['step']:2d}{RESET} → "
f"{WHITE}{step['next_node']:<15}{RESET} "
f"score={sc}{score_str}{RESET} "
f"{DIM}msgs={step['message_count']}{RESET}")
print()
# ─── Entrypoint ──────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="DocOps Agent CLI con HITL")
parser.add_argument("--thread", default=None, help="ID del thread a retomar")
parser.add_argument("--db", default=DEFAULT_DB, help="Ruta al archivo SQLite")
args = parser.parse_args()
db_path = args.db
banner(db_path)
# Abrir conexión SQLite persistente (sobrevive entre ejecuciones)
conn = sqlite3.connect(db_path, check_same_thread=False)
cp = SqliteSaver(conn)
cp.setup()
# Construir agente con el checkpointer SQLite
agent = build_docops_agent(cp=cp)
# Thread inicial
thread_id = args.thread or f"docops-{uuid.uuid4().hex[:8]}"
print(f"Thread activo: {CYAN}{BOLD}{thread_id}{RESET}\n")
try:
while True:
try:
raw = input(f"{BOLD}[{thread_id[:20]}]> {RESET}").strip()
except (EOFError, KeyboardInterrupt):
print(f"\n{DIM}Saliendo...{RESET}")
break
if not raw:
continue
# ─── Comandos ───
if raw == "/exit":
print(f"{DIM}¡Hasta luego!{RESET}")
break
elif raw == "/new":
thread_id = f"docops-{uuid.uuid4().hex[:8]}"
print(f"{GREEN}Nuevo thread: {CYAN}{thread_id}{RESET}\n")
elif raw.startswith("/thread "):
thread_id = raw.split(" ", 1)[1].strip()
print(f"{GREEN}Thread activo: {CYAN}{thread_id}{RESET}\n")
elif raw == "/threads":
list_threads(conn)
elif raw == "/history":
show_history(agent, thread_id)
elif raw.startswith("/"):
print(f"{DIM}Comando desconocido. Disponibles: "
f"/new /thread <id> /threads /history /exit{RESET}\n")
# ─── Consulta al agente ───
else:
try:
result = run_query(agent, thread_id, raw)
draft = result.get("draft", "")
score = result.get("quality_score", 0.0)
iterations = result.get("iteration", 0)
sc = GREEN if score >= 0.8 else YELLOW if score >= 0.6 else RED
print(f"\n{GREEN}{BOLD}{'─'*62}{RESET}")
print(f"{GREEN}{BOLD} Respuesta final{RESET} "
f"{DIM}score={sc}{score:.2f}{RESET}{DIM} iter={iterations}{RESET}")
print(f"{GREEN}{BOLD}{'─'*62}{RESET}")
print(f"{WHITE}{draft}{RESET}")
print(f"{GREEN}{'─'*62}{RESET}\n")
except KeyboardInterrupt:
print(f"\n{YELLOW}Consulta interrumpida.{RESET}\n")
except Exception as e:
print(f"\n{RED}Error: {e}{RESET}\n")
finally:
conn.close()
if __name__ == "__main__":
main()