# AI_BUSINESS_PROCESS_AUTOMATION / agents / validation_agent.py
# Author: parthib07 — "Upload 52 files" (commit 61411b5, verified)
from __future__ import annotations
import json
import logging
import os
from typing import Any, Dict, List
from langchain_core.messages import HumanMessage
from ai_business_automation_agent.prompts.validation_prompt import VALIDATION_PROMPT
from ai_business_automation_agent.utils import append_agent_log, parse_llm_json
from ai_business_automation_agent.vectorstore.pinecone_client import PineconeVectorStore
logger = logging.getLogger(__name__)
def _format_policy_context(chunks: List[Dict[str, Any]]) -> str:
if not chunks:
return "No policy context available."
lines = []
for c in chunks:
score = c.get("score")
text = (c.get("text") or "").strip()
if text:
lines.append(f"- (score={score}) {text}")
return "\n".join(lines).strip() or "No policy context available."
def _response_text(resp: Any) -> str:
    """Return the textual content of an LLM response as a plain string.

    Chat-model messages expose ``content`` that is usually a ``str`` but may be
    a list of content blocks (plain strings or dicts carrying a ``"text"``
    key); those text parts are concatenated so the JSON parser always receives
    a string. Objects without a ``content`` attribute (or with ``None``) fall
    back to ``str(resp)``.
    """
    content = getattr(resp, "content", None)
    if content is None:
        return str(resp)
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: List[str] = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        return "".join(parts)
    return str(content)


def _retrieve_policy_context(
    extracted: Dict[str, Any], vendor_ver: Dict[str, Any]
) -> tuple[str, Dict[str, Any]]:
    """Fetch policy snippets relevant to this invoice from the vector store.

    Returns ``(policy_context, rag_payload)``: the prompt-ready text produced by
    ``_format_policy_context`` and ``{"retrieved": chunks}`` on success, or the
    placeholder "No policy context available." and ``{"error": "<message>"}``
    on failure. Retrieval is best-effort: any exception is logged as a warning
    and swallowed so validation can still run without policy context.
    """
    try:
        vs = PineconeVectorStore(namespace="policies")
        # Seeding runs unless SEED_VECTORSTORE is set to something other than 1/true/yes.
        if os.getenv("SEED_VECTORSTORE", "true").lower() in {"1", "true", "yes"}:
            vs.seed_default_policies()
        query = json.dumps(
            {
                "invoice": extracted.get("invoice", {}),
                "vendor": extracted.get("vendor", {}),
                "vendor_verification": vendor_ver,
            },
            ensure_ascii=False,
        )
        chunks = vs.retrieve(query, top_k=5)
        # Formatting stays inside the try so a malformed chunk degrades to
        # "no context" instead of failing the whole validation step.
        return _format_policy_context(chunks), {"retrieved": chunks}
    except Exception as e:
        logger.warning("Pinecone retrieval unavailable: %s", e)
        return "No policy context available.", {"error": str(e)}


def run_validation_agent(state: Dict[str, Any], llm) -> Dict[str, Any]:
    """Validate extracted invoice data against vendor checks and policy context.

    Reads ``state["extracted_data"]`` and ``state["vendor_verification"]``
    (missing or empty values become ``{}``), retrieves up to five policy chunks
    from the Pinecone ``"policies"`` namespace, and asks ``llm`` for a JSON
    validation verdict built from ``VALIDATION_PROMPT``.

    Args:
        state: Workflow state dict.
        llm: Chat model exposing ``invoke(messages)``.

    Returns:
        A partial state update containing ``"validation_status"`` — the model's
        JSON object with an added ``"rag"`` key, or, when the reply is not a
        parseable JSON object, a ``"needs_review"`` record with a
        ``PARSING_ERROR`` issue and the raw model output — merged with the
        results of ``append_agent_log`` for the "ok"/"error", "rag", "prompt"
        and "raw_response" events.

    Raises:
        Exceptions from ``json.dumps``/``VALIDATION_PROMPT.format`` on the
        prompt inputs or from ``llm.invoke`` propagate. Vector-store failures
        do not; they are logged and recorded under ``"rag"``.
    """
    extracted = state.get("extracted_data") or {}
    vendor_ver = state.get("vendor_verification") or {}
    policy_context, rag_payload = _retrieve_policy_context(extracted, vendor_ver)

    prompt = VALIDATION_PROMPT.format(
        extracted_json=json.dumps(extracted, ensure_ascii=False),
        vendor_verification_json=json.dumps(vendor_ver, ensure_ascii=False),
        policy_context=policy_context,
    )
    resp = llm.invoke([HumanMessage(content=prompt)])
    text = _response_text(resp)
    parsed, err = parse_llm_json(text)
    if not err and not isinstance(parsed, dict):
        # A JSON array/scalar (or None) cannot carry the status fields and would
        # crash on ``parsed["rag"]`` below; route it to manual review instead.
        err = f"Expected a JSON object from the model, got {type(parsed).__name__}"

    updates: Dict[str, Any] = {}

    def _log(event: str, payload: Any) -> None:
        # NOTE(review): append_agent_log lives in utils and is not visible here.
        # Passing state merged with the updates accumulated so far lets each
        # entry build on the previous one; calling it with the bare ``state``
        # each time would, if the helper derives its result from ``state``
        # without mutating it, make every call overwrite the last so only the
        # final event survived. Confirm against utils.append_agent_log.
        updates.update(
            append_agent_log(
                {**state, **updates}, agent="validation", event=event, payload=payload
            )
        )

    if err:
        logger.warning("Validation JSON parse error: %s", err)
        updates["validation_status"] = {
            "status": "needs_review",
            "issues": [{"code": "PARSING_ERROR", "severity": "high", "message": err}],
            "compliance_flags": [],
            "validated_fields": {},
            "recommendation": "manual_review",
            "raw_model_output": text,
            "rag": rag_payload,
        }
        _log("error", {"error": err})
    else:
        parsed["rag"] = rag_payload
        updates["validation_status"] = parsed
        _log("ok", parsed)
    _log("rag", rag_payload)
    _log("prompt", {"prompt": prompt})
    _log("raw_response", {"text": text})
    return updates