from __future__ import annotations

import logging
from typing import Any, Dict

from langgraph.graph import END, START, StateGraph

from ai_business_automation_agent.agents.decision_agent import decision_route, run_decision_agent
from ai_business_automation_agent.agents.extraction_agent import run_extraction_agent
from ai_business_automation_agent.agents.reporting_agent import run_reporting_agent
from ai_business_automation_agent.agents.validation_agent import run_validation_agent
from ai_business_automation_agent.agents.vendor_verification_agent import run_vendor_verification_agent
from ai_business_automation_agent.llm import get_groq_llm
from ai_business_automation_agent.tools.erp_tool import simulate_erp_update
from ai_business_automation_agent.tools.web_search_tool import TavilyWebSearchTool
from ai_business_automation_agent.utils import append_agent_log
from ai_business_automation_agent.workflow.state_schema import InvoiceState

logger = logging.getLogger(__name__)


def build_workflow():
    """Assemble and compile the invoice-processing graph.

    The graph runs extraction -> vendor verification -> validation ->
    decision in sequence. After the decision node, ``decision_route`` selects
    the next step: ``"approved"`` goes through the ERP update node before
    reporting, while ``"manual_review"`` and ``"rejected"`` go straight to
    reporting. Reporting is always the final node.

    Returns:
        The compiled graph, as returned by ``StateGraph.compile()``.
    """
    llm = get_groq_llm(model="llama-3.3-70b-versatile", temperature=0.0)

    # Web search is optional: if the tool cannot be constructed, continue with
    # ``None`` and let the vendor verification agent run without it.
    try:
        web_search = TavilyWebSearchTool()
    except Exception as e:
        logger.warning("Tavily web search disabled: %s", e)
        web_search = None

    # Each node is a thin closure that binds the shared ``llm`` (and, for
    # vendor verification, ``web_search``) to the corresponding agent runner.
    def extraction_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Run the extraction agent on the current state."""
        return run_extraction_agent(state, llm)

    def vendor_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Run the vendor verification agent; ``web_search`` may be ``None``."""
        return run_vendor_verification_agent(state, llm, web_search)

    def validation_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Run the validation agent on the current state."""
        return run_validation_agent(state, llm)

    def decision_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Run the decision agent on the current state."""
        return run_decision_agent(state, llm)

    def erp_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Push the extracted data to the simulated ERP and record the outcome.

        Returns a state update containing ``erp_update_status`` merged with
        whatever updates ``append_agent_log`` returns. A failure of the ERP
        call is captured in the state rather than raised.
        """
        extracted = state.get("extracted_data") or {}

        # Only the ERP call is guarded, so a failure while recording the
        # success log entry is not misreported as a failed ERP update.
        try:
            status = simulate_erp_update(extracted)
        except Exception as e:
            # Keep the traceback in the application log; the state only
            # carries the message string.
            logger.exception("ERP update failed")
            err = {"status": "failed", "message": str(e)}
            updates = {"erp_update_status": err}
            updates.update(append_agent_log(state, agent="erp_tool", event="error", payload=err))
            return updates

        updates = {"erp_update_status": status}
        updates.update(append_agent_log(state, agent="erp_tool", event="ok", payload=status))
        return updates

    def reporting_node(state: Dict[str, Any]) -> Dict[str, Any]:
        """Run the reporting agent on the current state."""
        return run_reporting_agent(state, llm)

    graph = StateGraph(InvoiceState)

    graph.add_node("extraction_agent", extraction_node)
    graph.add_node("vendor_verification_agent", vendor_node)
    graph.add_node("validation_agent", validation_node)
    graph.add_node("decision_agent", decision_node)
    graph.add_node("erp_update_tool", erp_node)
    graph.add_node("reporting_agent", reporting_node)

    # Linear pipeline up to the decision.
    graph.add_edge(START, "extraction_agent")
    graph.add_edge("extraction_agent", "vendor_verification_agent")
    graph.add_edge("vendor_verification_agent", "validation_agent")
    graph.add_edge("validation_agent", "decision_agent")

    # Only approved invoices reach the ERP update; every outcome ends in
    # the reporting node.
    graph.add_conditional_edges(
        "decision_agent",
        decision_route,
        {
            "approved": "erp_update_tool",
            "manual_review": "reporting_agent",
            "rejected": "reporting_agent",
        },
    )
    graph.add_edge("erp_update_tool", "reporting_agent")
    graph.add_edge("reporting_agent", END)

    return graph.compile()


def run_workflow(email_content: str) -> Dict[str, Any]:
    """Run the invoice workflow once for a single email.

    A fresh workflow is built on every call.

    Args:
        email_content: Text stored under ``email_content`` in the initial
            state.

    Returns:
        The result of invoking the compiled graph on the initial state.
    """
    app = build_workflow()
    initial_state: InvoiceState = {"email_content": email_content, "agent_logs": []}
    return app.invoke(initial_state)