DocAgentSystem / agents /docops_agent.py
RamsesCamas's picture
Initial clean commit for HF Space deployment
d0d2f42
"""Agente DocOps — pipeline secuencial para consultas sobre documentos.
Implementa un agente simple basado en pipeline lineal:
Retrieve → Process → Generate
Cada paso es una función decorada con ``@pipeline_step`` del módulo
de orquestación. El agente conecta las herramientas del ``ToolRegistry``
con un LLM (Groq vía OpenAI API) para responder preguntas usando
contexto recuperado de documentos reales indexados en ChromaDB.
Este es un agente "clásico" sin ciclos de razonamiento — ejecuta los
pasos en orden fijo y produce una respuesta.
"""
from __future__ import annotations
import logging
import os
import time
from dataclasses import dataclass, field
from dotenv import load_dotenv
from openai import OpenAI
from orchestration.pipelines import Pipeline, PipelineResult, pipeline_step
from orchestration.tools import (
ToolDefinition,
ToolRegistry,
get_current_datetime,
)
from rag.vectorstore import create_vectorstore, search, SearchResult
load_dotenv()
logger = logging.getLogger("agents.docops")
# ---------------------------------------------------------------------------
# Colores ANSI
# ---------------------------------------------------------------------------
BOLD = "\033[1m"
RESET = "\033[0m"
CYAN = "\033[96m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
MAGENTA = "\033[95m"
RED = "\033[91m"
DIM = "\033[2m"
BLUE = "\033[94m"
WHITE = "\033[97m"
# ---------------------------------------------------------------------------
# Configuración LLM (Groq vía OpenAI-compatible API)
# ---------------------------------------------------------------------------
_GROQ_BASE_URL = os.environ.get("GROQ_BASE_URL", "https://api.groq.com/openai/v1")
_GROQ_MODEL = os.environ.get("GROQ_MODEL", "openai/gpt-oss-120b")
_SYSTEM_PROMPT = (
"Eres un asistente de documentación técnica. "
"Responde basándote ÚNICAMENTE en el contexto proporcionado. "
"Si no encuentras la respuesta en el contexto, di 'No tengo "
"información suficiente para responder esa pregunta.' "
"Responde en español de forma clara y concisa."
)
def _print_header(title: str, color: str = CYAN) -> None:
print(f"\n{color}{BOLD}{'=' * 70}{RESET}")
print(f"{color}{BOLD} {title}{RESET}")
print(f"{color}{BOLD}{'=' * 70}{RESET}")
def _print_step(step_name: str, color: str, msg: str) -> None:
print(f"{color}{BOLD}[{step_name}]{RESET} {msg}")
def _print_metric(label: str, value: str, color: str = DIM) -> None:
print(f" {color}{label}: {RESET}{value}")
# ---------------------------------------------------------------------------
# Resultado del agente
# ---------------------------------------------------------------------------
@dataclass
class AgentResult:
"""Resultado de la ejecución del agente DocOps."""
query: str
context: str = ""
answer: str = ""
pipeline_result: PipelineResult | None = None
prompt_tokens: int = 0
completion_tokens: int = 0
retrieved_chunks: list = field(default_factory=list)
@property
def success(self) -> bool:
return self.pipeline_result is not None and self.pipeline_result.success
def summary(self) -> str:
status = f"{GREEN}SUCCESS{RESET}" if self.success else f"{RED}FAILED{RESET}"
total_tokens = self.prompt_tokens + self.completion_tokens
duration = self.pipeline_result.total_duration if self.pipeline_result else 0.0
lines = [
f"\n{WHITE}{BOLD}{'─' * 70}{RESET}",
f"{WHITE}{BOLD} RESUMEN DEL AGENTE{RESET}",
f"{WHITE}{BOLD}{'─' * 70}{RESET}",
f" Status: {status}",
f" Query: {self.query}",
f" Chunks usados: {len(self.retrieved_chunks)}",
f" Contexto: {len(self.context)} chars",
f" Tokens: {YELLOW}{total_tokens}{RESET} (prompt={self.prompt_tokens}, completion={self.completion_tokens})",
f" Duración: {YELLOW}{duration:.2f}s{RESET}",
]
if self.pipeline_result:
lines.append(f"\n {DIM}Desglose por paso:{RESET}")
step_names = ["RETRIEVE", "PROCESS", "GENERATE"]
step_colors = [CYAN, MAGENTA, GREEN]
for i, step in enumerate(self.pipeline_result.steps):
name = step_names[i] if i < len(step_names) else f"STEP {i+1}"
color = step_colors[i] if i < len(step_colors) else DIM
mark = f"{GREEN}OK{RESET}" if step.success else f"{RED}FAIL{RESET}"
lines.append(
f" {color}{BOLD}[{name}]{RESET} {mark}{step.duration_seconds:.3f}s"
)
if step.error:
lines.append(f" {RED}Error: {step.error}{RESET}")
if self.answer:
lines.append(f"\n {GREEN}{BOLD}Respuesta:{RESET}")
lines.append(f" {self.answer}")
lines.append(f"{WHITE}{BOLD}{'─' * 70}{RESET}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Agente DocOps
# ---------------------------------------------------------------------------
class DocOpsAgent:
"""Agente secuencial para consultas sobre documentos.
Ejecuta un pipeline de 3 pasos:
1. **Retrieve**: busca documentos reales en ChromaDB via ToolRegistry.
2. **Process**: formatea el contexto recuperado con metadata.
3. **Generate**: envía el contexto + pregunta al LLM.
Args:
registry: Registro de herramientas. Si no se provee, construye uno
conectado a la colección ChromaDB ``novatech_docs``.
model: Modelo de Groq a utilizar.
temperature: Temperatura para la generación.
system_prompt: Prompt del sistema para el LLM.
collection_name: Nombre de la colección en ChromaDB.
chroma_dir: Directorio de persistencia de ChromaDB.
"""
def __init__(
self,
registry: ToolRegistry | None = None,
model: str | None = None,
temperature: float = 0.2,
system_prompt: str | None = None,
collection_name: str = "novatech_docs",
chroma_dir: str = "./chroma_db",
) -> None:
api_key = os.environ.get("GROQ_API_KEY", "")
if not api_key:
raise ValueError("Missing GROQ_API_KEY environment variable.")
self.client = OpenAI(api_key=api_key, base_url=_GROQ_BASE_URL)
self.model = model or _GROQ_MODEL
self.temperature = temperature
self.system_prompt = system_prompt or _SYSTEM_PROMPT
self.collection_name = collection_name
self.chroma_dir = chroma_dir
self.registry = registry or self._default_registry()
def run(self, query: str) -> AgentResult:
"""Ejecuta el pipeline secuencial para responder la consulta.
Args:
query: Pregunta del usuario.
Returns:
``AgentResult`` con contexto, respuesta y métricas.
"""
agent_result = AgentResult(query=query)
_print_header(f"QUERY: {query}")
retrieve = self._make_retrieve_step(agent_result)
process = self._make_process_step()
generate = self._make_generate_step(query, agent_result)
pipe = Pipeline(
name="docops",
steps=[retrieve, process, generate],
)
pipeline_result = pipe.run(query)
agent_result.pipeline_result = pipeline_result
if pipeline_result.success:
agent_result.answer = pipeline_result.final_output
else:
agent_result.answer = "Error: el pipeline no completó todos los pasos."
# Imprimir respuesta final
print(f"\n{GREEN}{BOLD} RESPUESTA:{RESET}")
print(f"{GREEN} {agent_result.answer}{RESET}")
return agent_result
def _make_retrieve_step(self, agent_result: AgentResult):
"""Crea el paso de recuperación de documentos reales."""
registry = self.registry
@pipeline_step(name="retrieve", max_retries=2, timeout_seconds=10)
def retrieve(query: str) -> list[dict]:
_print_step("RETRIEVE", CYAN, f"Buscando documentos para: '{query}'")
t0 = time.time()
raw = registry.execute_tool(
"search_documents", {"query": query, "top_k": 3}
)
elapsed = time.time() - t0
_print_metric("Tiempo de búsqueda", f"{elapsed:.3f}s", CYAN)
# Parsear los resultados (vienen como string serializado del tool)
# La herramienta real retorna una lista de dicts
if isinstance(raw, list):
chunks = raw
else:
# Si viene como string (del execute_tool), intentar evaluar
try:
import ast
chunks = ast.literal_eval(raw)
except (ValueError, SyntaxError):
chunks = [{"content": raw, "source": "raw", "score": 0.0}]
agent_result.retrieved_chunks = chunks
_print_metric("Chunks recuperados", str(len(chunks)), CYAN)
for i, chunk in enumerate(chunks):
source = os.path.basename(chunk.get("source", "?"))
score = chunk.get("score", 0.0)
preview = chunk.get("content", "")[:100].replace("\n", " ")
print(
f" {CYAN}{i+1}. [{score:.3f}] {BOLD}{source}{RESET}"
f" {DIM}{preview}...{RESET}"
)
return chunks
return retrieve
def _make_process_step(self):
"""Crea el paso de procesamiento del contexto."""
@pipeline_step(name="process", max_retries=1, timeout_seconds=5)
def process(chunks: list[dict]) -> str:
_print_step("PROCESS", MAGENTA, f"Formateando {len(chunks)} chunks como contexto")
context_parts = []
total_chars = 0
for i, chunk in enumerate(chunks):
source = os.path.basename(chunk.get("source", "desconocido"))
content = chunk.get("content", "")
block = f"[Fuente: {source}]\n{content}"
context_parts.append(block)
total_chars += len(content)
context = "\n\n---\n\n".join(context_parts)
# Agregar timestamp
try:
timestamp = self.registry.execute_tool("get_current_datetime", {})
context += f"\n\n[Consulta realizada: {timestamp}]"
except Exception:
pass
_print_metric("Caracteres de contexto", str(total_chars), MAGENTA)
_print_metric("Fuentes utilizadas",
", ".join(os.path.basename(c.get("source", "?")) for c in chunks),
MAGENTA)
return context
return process
def _make_generate_step(self, query: str, agent_result: AgentResult):
"""Crea el paso de generación con el LLM."""
client = self.client
model = self.model
temperature = self.temperature
system_prompt = self.system_prompt
@pipeline_step(name="generate", max_retries=2, timeout_seconds=30)
def generate(context: str) -> str:
agent_result.context = context
_print_step("GENERATE", GREEN, f"Enviando al LLM ({model})")
_print_metric("Temperatura", str(temperature), GREEN)
_print_metric("Contexto enviado", f"{len(context)} chars", GREEN)
t0 = time.time()
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": (
f"CONTEXTO:\n{context}\n\n"
f"PREGUNTA: {query}"
),
},
],
temperature=temperature,
max_tokens=1024,
)
llm_elapsed = time.time() - t0
usage = response.usage
prompt_tokens = 0
completion_tokens = 0
if usage:
prompt_tokens = getattr(usage, "prompt_tokens", 0) or 0
completion_tokens = getattr(usage, "completion_tokens", 0) or 0
agent_result.prompt_tokens = prompt_tokens
agent_result.completion_tokens = completion_tokens
answer = response.choices[0].message.content or ""
_print_metric("Latencia LLM", f"{llm_elapsed:.2f}s", GREEN)
_print_metric("Prompt tokens", str(prompt_tokens), GREEN)
_print_metric("Completion tokens", str(completion_tokens), GREEN)
_print_metric("Total tokens", str(prompt_tokens + completion_tokens), GREEN)
return answer
return generate
def _default_registry(self) -> ToolRegistry:
"""Crea un registro con herramientas conectadas a ChromaDB real."""
registry = ToolRegistry()
collection = create_vectorstore(self.collection_name, self.chroma_dir)
def real_search(query: str, top_k: int = 3) -> list[dict]:
"""Busca en ChromaDB y retorna chunks reales."""
results: list[SearchResult] = search(collection, query, n_results=top_k)
return [
{
"content": r.content,
"source": r.metadata.get("source", "desconocido"),
"score": r.score,
"chunk_id": r.chunk_id,
}
for r in results
]
registry.register(ToolDefinition(
name="search_documents",
description="Busca documentos relevantes en la base de conocimiento.",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Consulta de búsqueda",
},
"top_k": {
"type": "integer",
"description": "Número de resultados (default: 3)",
},
},
"required": ["query"],
},
function=real_search,
))
registry.register(ToolDefinition(
name="get_current_datetime",
description="Obtiene la fecha y hora actual.",
parameters={"type": "object", "properties": {}},
function=get_current_datetime,
))
return registry
# ---------------------------------------------------------------------------
# Demo
# ---------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(name)s | %(message)s")
agent = DocOpsAgent()
queries = [
"¿Cuál es el horario de trabajo?",
"¿Cuántos días de vacaciones corresponden el primer año?",
"¿Qué equipo de cómputo reciben los desarrolladores?",
]
for q in queries:
result = agent.run(q)
print(result.summary())