finryver-dev / agents /langgraph.py
dipan004's picture
Update agents/langgraph.py (#12)
2f16cc8 verified
from typing import TypedDict, Dict, Any, List, Annotated, Optional
import time, uuid, os
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from agents.simple_tools import (
generate_notes_full_pipeline_from_path,
generate_balance_sheet,
generate_pnl_statement,
generate_cash_flow_statement,
generate_llm_notes,
)
class FinancialAgentState(TypedDict):
    """State dictionary carried through a single-node financial workflow.

    ``run_workflow`` builds the initial state; the ``run`` node created by
    ``make_workflow`` fills in the outcome and timing fields.
    """

    # Message history; "History" is attached as Annotated metadata.
    # run_workflow seeds this with one HumanMessage describing the request.
    messages: Annotated[List[BaseMessage], "History"]
    # Path forwarded to the tool as its ``file_path`` argument.
    file_path: str
    # Value returned by the tool's ``.invoke()`` call ({} before the node runs).
    result: Dict[str, Any]
    # "" initially; "success" or "error" once the node has run.
    status: str
    # time.time() captured just before the tool is invoked.
    start_time: float
    # time.time() captured after the tool call (or its exception) is handled.
    end_time: float
    # The result's "error" entry, or str(exception) if the tool call raised.
    error: str
    # Optional value forwarded to the tool as ``user_api_key``.
    user_api_key: Optional[str]
    # Optional value forwarded to the tool as ``feedback_context``.
    feedback_context: Optional[Dict[str, Any]]
def make_workflow(tool_func):
    """Compile a one-node LangGraph workflow that runs ``tool_func``.

    The graph consists of a single node named ``"run"`` that is both the
    entry point and directly connected to ``END``. The node calls
    ``tool_func.invoke(params)`` and records the outcome on the state.

    Args:
        tool_func: Object exposing ``.invoke(params: dict)``. It is expected
            to return a dict, normally carrying a ``"status"`` key and, on
            failure, an ``"error"`` key.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.
    """
    # Optional state entries forwarded to the tool, in this order.
    optional_keys = ("feedback_context", "user_api_key")

    def node(state: FinancialAgentState) -> FinancialAgentState:
        """Invoke the tool and store result, status, error and timestamps."""
        state["start_time"] = time.time()
        try:
            tool_params = {"file_path": state["file_path"]}
            # Forward optional inputs only when they actually carry a value.
            # run_workflow pre-populates these keys with None, so a bare
            # "key in state" check would always pass an explicit None, which
            # a tool schema may reject (NOTE(review): depends on how each
            # tool declares the argument) instead of using its own default.
            for key in optional_keys:
                value = state.get(key)
                if value is not None:
                    tool_params[key] = value

            # Use .invoke() to avoid the deprecation warning of calling the
            # tool directly.
            result = tool_func.invoke(tool_params)
            state["result"] = result

            if isinstance(result, dict):
                succeeded = result.get("status") == "success"
                state["status"] = "success" if succeeded else "error"
                reported = result.get("error", "")
                # Keep the declared ``str`` type even if the tool sent None.
                state["error"] = "" if reported is None else reported
            else:
                # Report the contract violation explicitly rather than
                # surfacing an AttributeError about a missing ``.get``.
                state["status"] = "error"
                state["error"] = (
                    f"Tool returned {type(result).__name__}; expected a dict"
                )
        except Exception as e:
            # Boundary handler: any tool failure becomes an error state so
            # the graph run itself does not raise.
            state["status"] = "error"
            state["error"] = str(e)
        state["end_time"] = time.time()
        return state

    wf = StateGraph(FinancialAgentState)
    wf.add_node("run", node)
    wf.set_entry_point("run")
    wf.add_edge("run", END)
    return wf.compile()
# Registry of compiled workflows, keyed by the short "kind" name that
# run_workflow looks up. Each entry wraps one tool in its own one-node graph.
workflows = {
    kind: make_workflow(tool)
    for kind, tool in (
        ("notes", generate_notes_full_pipeline_from_path),
        ("pnl", generate_pnl_statement),
        ("bs", generate_balance_sheet),
        ("cf", generate_cash_flow_statement),
        ("notes-llm", generate_llm_notes),
    )
}
def run_workflow(file_path: str, kind: str, **kwargs) -> Dict[str, Any]:
    """Run the workflow registered under ``kind`` against ``file_path``.

    Args:
        file_path: Path stored on the state and forwarded to the tool.
        kind: Key into the module-level ``workflows`` registry.
        **kwargs: Optional ``feedback_context`` and ``user_api_key`` values
            copied onto the initial state. Other keywords are ignored.

    Returns:
        The final state returned by the compiled workflow's ``invoke``.

    Raises:
        KeyError: If ``kind`` is not a registered workflow.
    """
    # Resolve the workflow first so an unknown kind fails fast, with a
    # message that lists the valid choices instead of a bare key.
    try:
        workflow = workflows[kind]
    except KeyError:
        raise KeyError(
            f"Unknown workflow kind {kind!r}; expected one of {sorted(workflows)}"
        ) from None

    state = FinancialAgentState(
        messages=[HumanMessage(content=f"Run {kind} for {file_path}")],
        file_path=file_path,
        result={},
        status="",
        # Floats, matching the declared field types.
        start_time=0.0,
        end_time=0.0,
        error="",
        # Absent keywords default to None, same as an unset optional input.
        user_api_key=kwargs.get("user_api_key"),
        feedback_context=kwargs.get("feedback_context"),
    )
    return workflow.invoke(state)