""" memory/store.py — Configuración de checkpointing para DocOps Agent Clase 12: Memoria persistente Provee el checkpointer que persiste el estado del grafo. Cambiar de desarrollo a producción solo requiere cambiar la instancia. """ import os from dotenv import load_dotenv load_dotenv() # ─── SELECCIÓN DE CHECKPOINTER POR ENTORNO ─── ENVIRONMENT = os.getenv("DOCOPS_ENV", "development") def get_checkpointer(): """ Factory que retorna el checkpointer apropiado según el entorno. Entornos: - "development": MemorySaver (RAM, se pierde al morir el proceso) - "staging": SqliteSaver (archivo .db, sobrevive reinicios) - "production": PostgresSaver (PostgreSQL, multi-proceso, durable) """ if ENVIRONMENT == "production": try: from langgraph.checkpoint.postgres import PostgresSaver DATABASE_URL = os.getenv("DATABASE_URL") if not DATABASE_URL: raise ValueError( "DATABASE_URL requerido para entorno de producción" ) return PostgresSaver.from_conn_string(DATABASE_URL) except ImportError: print( "WARN: langgraph-checkpoint-postgres no instalado. " "Usando SqliteSaver como fallback." ) # Fallback a SQLite from langgraph.checkpoint.sqlite import SqliteSaver db_path = os.getenv("SQLITE_DB_PATH", "docops_checkpoints.db") return SqliteSaver.from_conn_string(db_path) elif ENVIRONMENT == "staging": from langgraph.checkpoint.sqlite import SqliteSaver db_path = os.getenv("SQLITE_DB_PATH", "docops_checkpoints.db") return SqliteSaver.from_conn_string(db_path) else: # development (default) from langgraph.checkpoint.memory import MemorySaver return MemorySaver() # ─── INSTANCIA GLOBAL ─── checkpointer = get_checkpointer() # ─── UTILIDADES DE ESTADO ─── def inspect_thread(graph, thread_id: str) -> dict: """ Inspecciona el estado actual de un thread. Args: graph: El grafo compilado (docops_agent) thread_id: ID del thread a inspeccionar Returns: dict con next_node, values y metadata """ config = {"configurable": {"thread_id": thread_id}} snapshot = graph.get_state(config) return { "thread_id": thread_id, "next_node": snapshot.next if snapshot.next else ["END"], "values": { k: (str(v)[:200] + "..." if len(str(v)) > 200 else v) for k, v in snapshot.values.items() if k != "messages" # Excluir mensajes para legibilidad }, "message_count": len(snapshot.values.get("messages", [])), "has_checkpoint": snapshot.config is not None, } def get_thread_history(graph, thread_id: str, max_steps: int = 20) -> list: """ Retorna el historial de pasos de un thread para auditoría. Args: graph: El grafo compilado thread_id: ID del thread max_steps: Máximo de pasos a retornar Returns: Lista de dicts con info de cada paso """ config = {"configurable": {"thread_id": thread_id}} history = [] for i, state in enumerate(graph.get_state_history(config)): if i >= max_steps: break history.append({ "step": i, "next_node": state.next[0] if state.next else "END", "quality_score": state.values.get("quality_score", None), "iteration": state.values.get("iteration", 0), "message_count": len(state.values.get("messages", [])), "config": state.config, }) return history def rollback_thread(graph, thread_id: str, steps_back: int = 1): """ Rebobina un thread N pasos hacia atrás. Args: graph: El grafo compilado thread_id: ID del thread steps_back: Cuántos pasos retroceder Returns: El config del checkpoint restaurado, o None si no hay historial """ config = {"configurable": {"thread_id": thread_id}} history = list(graph.get_state_history(config)) if steps_back >= len(history): print(f"Solo hay {len(history)} pasos. No se puede retroceder {steps_back}.") return None target = history[steps_back] graph.update_state(target.config, {}) print(f"Thread '{thread_id}' rebobinado {steps_back} paso(s).") return target.config # ─── MAIN (para testing) ─── if __name__ == "__main__": print(f"Entorno: {ENVIRONMENT}") print(f"Checkpointer: {type(checkpointer).__name__}") print("memory/store.py cargado correctamente.")