""" agents/cli.py — CLI interactivo para DocOps Agent con HITL y persistencia SQLite Clase 12: Human-in-the-Loop real desde terminal Uso: python agents/cli.py # nuevo thread automático python agents/cli.py --thread mi-sesion # retomar thread existente python agents/cli.py --db /ruta/custom.db # base de datos personalizada Comandos en sesión: /new Crear nuevo thread /thread Cambiar a thread existente /threads Listar todos los threads guardados /history Ver historial de pasos del thread actual /exit Salir """ import argparse import sqlite3 import sys import uuid from langchain_core.messages import HumanMessage from langgraph.checkpoint.sqlite import SqliteSaver from langgraph.types import Command from agents.multi_agent_graph import build_docops_agent # ─── Configuración ─── DEFAULT_DB = "docops_checkpoints.db" # ─── Colores ANSI ─── BOLD = "\033[1m" DIM = "\033[2m" RESET = "\033[0m" GREEN = "\033[32m" YELLOW = "\033[33m" RED = "\033[31m" CYAN = "\033[36m" MAGENTA = "\033[35m" WHITE = "\033[97m" BLUE = "\033[34m" NODE_ICONS = { "planner": ("📋", BLUE), "retriever": ("🔍", CYAN), "executor": ("⚙️ ", YELLOW), "verifier": ("✅", MAGENTA), "human_gate": ("🔐", RED), } # ─── UI helpers ───────────────────────────────────────────── def banner(db_path: str): print(f"\n{BOLD}{WHITE}{'═'*62}{RESET}") print(f"{BOLD}{WHITE} DocOps Agent CLI — HITL Interactivo{RESET}") print(f"{BOLD}{WHITE} Persistencia: {DIM}SQLite → {db_path}{RESET}") print(f"{BOLD}{WHITE}{'═'*62}{RESET}") print(f" {DIM}Comandos:{RESET} " f"{CYAN}/new{RESET} " f"{CYAN}/thread {RESET} " f"{CYAN}/threads{RESET} " f"{CYAN}/history{RESET} " f"{CYAN}/exit{RESET}") print(f"{BOLD}{WHITE}{'═'*62}{RESET}\n") def print_node_event(node_name: str, node_output: dict): icon, color = NODE_ICONS.get(node_name, ("▸", WHITE)) score_str = "" if isinstance(node_output, dict) and "quality_score" in node_output: s = node_output["quality_score"] sc = GREEN if s >= 0.8 else YELLOW if s >= 0.6 else RED score_str = f" {sc}score={s:.2f}{RESET}" print(f" {color}{icon} {node_name}{RESET}{score_str}") def print_interrupt_panel(payload: dict): risk = payload.get("risk_level", "?") score = payload.get("quality_score", 0.0) draft = payload.get("draft_preview", "") rc = RED if risk == "critical" else YELLOW print(f"\n{rc}{BOLD}{'─'*62}{RESET}") print(f"{rc}{BOLD} ⏸ PAUSA — Aprobación humana requerida{RESET}") print(f"{rc}{BOLD}{'─'*62}{RESET}") print(f" {DIM}Nivel de riesgo:{RESET} {rc}{BOLD}{risk.upper()}{RESET} " f"{DIM}Quality score:{RESET} {score:.2f}") print(f"\n{DIM} Draft generado:{RESET}") for line in draft.split("\n"): print(f" {line}") print(f"\n {GREEN}[a]{RESET} Aprobar tal cual") print(f" {YELLOW}[e]{RESET} Editar draft antes de aprobar") print(f" {RED}[r]{RESET} Rechazar") print(f"{rc}{'─'*62}{RESET}") def ask_decision(payload: dict) -> dict: """Solicita la decisión del humano y devuelve el dict para Command(resume=...).""" print_interrupt_panel(payload) while True: try: choice = input(f"\n{BOLD}Decisión [a/e/r]: {RESET}").strip().lower() except (EOFError, KeyboardInterrupt): print(f"\n{YELLOW}Sin input — aprobando automáticamente.{RESET}") return {"approved": True} if choice in ("a", "aprobar", "approve", "y", "s", "si", ""): print(f"{GREEN}✓ Aprobado{RESET}") return {"approved": True} elif choice in ("e", "edit", "editar"): print(f"{YELLOW}Escribe el draft corregido.") print(f"{DIM}(Línea con solo '###' para terminar){RESET}") lines = [] try: while True: line = input() if line.strip() == "###": break lines.append(line) except EOFError: pass edited = "\n".join(lines).strip() if not edited: print(f"{YELLOW}Draft vacío — aprobando sin cambios.{RESET}") return {"approved": True} print(f"{GREEN}✓ Draft editado ({len(edited)} chars){RESET}") return {"approved": True, "edited_draft": edited} elif choice in ("r", "reject", "rechazar", "n", "no"): try: reason = input(f"{RED}Razón del rechazo: {RESET}").strip() except (EOFError, KeyboardInterrupt): reason = "Rechazado por el supervisor" print(f"{RED}✗ Rechazado{RESET}") return {"approved": False, "reason": reason or "Rechazado por el supervisor"} else: print(f"{DIM}Opción no reconocida. Escribe a, e o r.{RESET}") # ─── Lógica principal ──────────────────────────────────────── def get_interrupt_payload(agent, config: dict) -> dict | None: """Extrae el payload del interrupt pendiente, si existe.""" snapshot = agent.get_state(config) if not snapshot.next: return None if hasattr(snapshot, "tasks"): for task in snapshot.tasks: if hasattr(task, "interrupts") and task.interrupts: return task.interrupts[0].value return None def run_query(agent, thread_id: str, query: str): """ Ejecuta una consulta y gestiona el bucle HITL completo. Retorna el dict de valores finales del estado. """ config = {"configurable": {"thread_id": thread_id}} initial_state = { "messages": [HumanMessage(content=query)], "plan": "", "search_results": "", "draft": "", "feedback": "", "quality_score": 0.0, "iteration": 0, } print(f"\n{DIM} thread:{RESET} {CYAN}{thread_id}{RESET}") print(f"{CYAN}{BOLD} Procesando...{RESET}\n") # Primera ejecución con stream para mostrar progreso for event in agent.stream(initial_state, config, stream_mode="updates"): for node_name, node_output in event.items(): print_node_event(node_name, node_output) # Bucle HITL: puede haber múltiples interrupts (p.ej. human_gate_strict) while True: payload = get_interrupt_payload(agent, config) if payload is None: break decision = ask_decision(payload) print(f"\n{CYAN}{BOLD} Reanudando...{RESET}\n") for event in agent.stream(Command(resume=decision), config, stream_mode="updates"): for node_name, node_output in event.items(): print_node_event(node_name, node_output) return agent.get_state(config).values def list_threads(conn: sqlite3.Connection): try: rows = conn.execute( "SELECT thread_id, COUNT(*) as steps " "FROM checkpoints GROUP BY thread_id ORDER BY thread_id" ).fetchall() if not rows: print(f" {DIM}No hay threads guardados.{RESET}\n") else: print(f"\n{BOLD}Threads en la base de datos:{RESET}") for thread_id, steps in rows: print(f" {CYAN}•{RESET} {thread_id} {DIM}({steps} checkpoints){RESET}") print() except Exception as e: print(f" {RED}Error al listar threads: {e}{RESET}\n") def show_history(agent, thread_id: str): from memory.store import get_thread_history history = get_thread_history(agent, thread_id, max_steps=15) if not history: print(f" {DIM}Sin historial para '{thread_id}'.{RESET}\n") return print(f"\n{BOLD}Historial de '{CYAN}{thread_id}{RESET}{BOLD}':{RESET}") for step in history: score = step.get("quality_score") score_str = f"{score:.2f}" if score is not None else " — " sc = GREEN if (score or 0) >= 0.8 else YELLOW if (score or 0) >= 0.6 else RED print(f" {DIM}paso {step['step']:2d}{RESET} → " f"{WHITE}{step['next_node']:<15}{RESET} " f"score={sc}{score_str}{RESET} " f"{DIM}msgs={step['message_count']}{RESET}") print() # ─── Entrypoint ────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="DocOps Agent CLI con HITL") parser.add_argument("--thread", default=None, help="ID del thread a retomar") parser.add_argument("--db", default=DEFAULT_DB, help="Ruta al archivo SQLite") args = parser.parse_args() db_path = args.db banner(db_path) # Abrir conexión SQLite persistente (sobrevive entre ejecuciones) conn = sqlite3.connect(db_path, check_same_thread=False) cp = SqliteSaver(conn) cp.setup() # Construir agente con el checkpointer SQLite agent = build_docops_agent(cp=cp) # Thread inicial thread_id = args.thread or f"docops-{uuid.uuid4().hex[:8]}" print(f"Thread activo: {CYAN}{BOLD}{thread_id}{RESET}\n") try: while True: try: raw = input(f"{BOLD}[{thread_id[:20]}]> {RESET}").strip() except (EOFError, KeyboardInterrupt): print(f"\n{DIM}Saliendo...{RESET}") break if not raw: continue # ─── Comandos ─── if raw == "/exit": print(f"{DIM}¡Hasta luego!{RESET}") break elif raw == "/new": thread_id = f"docops-{uuid.uuid4().hex[:8]}" print(f"{GREEN}Nuevo thread: {CYAN}{thread_id}{RESET}\n") elif raw.startswith("/thread "): thread_id = raw.split(" ", 1)[1].strip() print(f"{GREEN}Thread activo: {CYAN}{thread_id}{RESET}\n") elif raw == "/threads": list_threads(conn) elif raw == "/history": show_history(agent, thread_id) elif raw.startswith("/"): print(f"{DIM}Comando desconocido. Disponibles: " f"/new /thread /threads /history /exit{RESET}\n") # ─── Consulta al agente ─── else: try: result = run_query(agent, thread_id, raw) draft = result.get("draft", "") score = result.get("quality_score", 0.0) iterations = result.get("iteration", 0) sc = GREEN if score >= 0.8 else YELLOW if score >= 0.6 else RED print(f"\n{GREEN}{BOLD}{'─'*62}{RESET}") print(f"{GREEN}{BOLD} Respuesta final{RESET} " f"{DIM}score={sc}{score:.2f}{RESET}{DIM} iter={iterations}{RESET}") print(f"{GREEN}{BOLD}{'─'*62}{RESET}") print(f"{WHITE}{draft}{RESET}") print(f"{GREEN}{'─'*62}{RESET}\n") except KeyboardInterrupt: print(f"\n{YELLOW}Consulta interrumpida.{RESET}\n") except Exception as e: print(f"\n{RED}Error: {e}{RESET}\n") finally: conn.close() if __name__ == "__main__": main()