# finryver-dev / agents /langgraph.py
# Sahil Garg
# udf generation is dynamic, different files, udf application on json
# 29ee329
# raw
# history blame
# 2.45 kB
from typing import TypedDict, Dict, Any, List, Annotated
import time, uuid, os
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from agents.simple_tools import (
generate_notes_full_pipeline_from_path,
generate_balance_sheet,
generate_pnl_statement,
generate_cash_flow_statement,
generate_llm_notes,
)
class _OptionalAgentState(TypedDict, total=False):
    """State keys that may be absent from a workflow run."""

    # Extra context forwarded to the tool as ``feedback_context`` when the
    # caller supplies it. It has to be declared in the schema: LangGraph only
    # carries declared keys from the input state through to the node.
    feedback_context: Any


class FinancialAgentState(_OptionalAgentState):
    """State dictionary passed through each financial workflow graph.

    Every key below is required; ``feedback_context`` (inherited from the
    ``total=False`` base) is optional.
    """

    messages: Annotated[List[BaseMessage], "History"]  # message history for the run
    file_path: str  # path handed to the tool as ``file_path``
    result: Dict[str, Any]  # raw value returned by the tool
    status: str  # "success" or "error" once the node has run
    start_time: float  # time.time() taken when the node starts
    end_time: float  # time.time() taken when the node finishes
    error: str  # error text reported by the tool or the caught exception
def make_workflow(tool_func):
    """Compile a single-node graph that runs ``tool_func`` on the state.

    The node calls ``tool_func.invoke`` with the state's ``file_path`` (plus
    ``feedback_context`` when that key is present), then records the tool's
    result, a "success"/"error" status, the error text, and start/end
    timestamps on the state.

    Args:
        tool_func: Object exposing ``invoke(params)``; the returned value is
            read with ``.get("status")`` and ``.get("error")``.

    Returns:
        The compiled graph with one node named "run" wired from the entry
        point to ``END``.
    """

    def node(state: FinancialAgentState) -> FinancialAgentState:
        state["start_time"] = time.time()
        try:
            # file_path is mandatory; feedback_context rides along only when
            # the state actually carries it.
            call_args = {"file_path": state["file_path"]}
            call_args.update(
                {key: state[key] for key in ("feedback_context",) if key in state}
            )
            # .invoke() is used to avoid the deprecation warning.
            outcome = tool_func.invoke(call_args)
            state["result"] = outcome
            if outcome.get("status") == "success":
                state["status"] = "success"
            else:
                state["status"] = "error"
            state["error"] = outcome.get("error", "")
        except Exception as exc:
            # Any failure (including a malformed tool result) is reported on
            # the state instead of propagating out of the graph.
            state["status"] = "error"
            state["error"] = str(exc)
        state["end_time"] = time.time()
        return state

    graph = StateGraph(FinancialAgentState)
    graph.add_node("run", node)
    graph.set_entry_point("run")
    graph.add_edge("run", END)
    return graph.compile()
# Registry of compiled workflows keyed by the `kind` string accepted by
# run_workflow. Each graph is built once, when this module is imported.
workflows = {
    kind: make_workflow(tool)
    for kind, tool in (
        ("notes", generate_notes_full_pipeline_from_path),
        ("pnl", generate_pnl_statement),
        ("bs", generate_balance_sheet),
        ("cf", generate_cash_flow_statement),
        ("notes-llm", generate_llm_notes),
    )
}
def run_workflow(file_path: str, kind: str, **kwargs) -> Dict[str, Any]:
    """Run the workflow registered under ``kind`` against ``file_path``.

    Args:
        file_path: Path stored in the initial state as ``file_path``.
        kind: Key into the module-level ``workflows`` registry.
        **kwargs: Only ``feedback_context`` is read; when given (even as
            ``None``) it is added to the initial state. Other keyword
            arguments are ignored.

    Returns:
        The final state returned by the workflow's ``invoke``.

    Raises:
        KeyError: If ``kind`` is not a registered workflow.
    """
    # Resolve the workflow first so an unknown kind fails fast with a message
    # listing the valid choices, rather than a bare KeyError(kind).
    try:
        workflow = workflows[kind]
    except KeyError:
        raise KeyError(
            f"Unknown workflow kind {kind!r}; expected one of {sorted(workflows)}"
        ) from None

    state = FinancialAgentState(
        messages=[HumanMessage(content=f"Run {kind} for {file_path}")],
        file_path=file_path,
        result={},
        status="",
        # Float placeholders match the declared field type; the node
        # overwrites both with time.time().
        start_time=0.0,
        end_time=0.0,
        error="",
    )
    # NOTE(review): the graph only sees this key if the state schema declares
    # `feedback_context` — confirm against FinancialAgentState.
    if "feedback_context" in kwargs:
        state["feedback_context"] = kwargs["feedback_context"]

    final = workflow.invoke(state)
    return final