# Source: finryver-dev — agents/simple_tools.py (commit 35cd5eb, uploaded by dipan004)
"""
Simplified LangChain tools for FinRyver financial statement generation
Focus: Notes, Balance Sheet, P&L, Cash Flow generation only
"""
import json
import logging
import os
import shutil
import subprocess
import time
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

from langchain_core.tools import tool

logger = logging.getLogger(__name__)
@tool
def generate_notes_full_pipeline_from_path(file_path: str, note_numbers: str = "") -> dict:
    """
    Run the full notes generation pipeline (same flow as the /hardcoded route).

    Pipeline: copy input -> parse trial balance -> generate notes JSON ->
    optionally filter by note number -> convert to Excel.

    Args:
        file_path: Path to the uploaded Excel file.
        note_numbers: Optional comma-separated note numbers to keep
            (e.g. "1,2,5"); an empty string keeps all notes.

    Returns:
        dict with "status" ("success"/"error"), "output_xlsx_path" on
        success, and "error" message on failure.
    """
    # Project-local pipeline modules; imported lazily so the tool module
    # loads even when the notes package is unavailable.
    from notes.data_extraction import extract_trial_balance_data, analyze_and_save_results
    from notes.notes_generator import process_json
    from notes.json_to_excel import json_to_xlsx
    try:
        os.makedirs("data/input", exist_ok=True)
        # Copy the file into the pipeline's input dir, keeping the original name
        # (skip the copy if it already lives there).
        file_location = f"data/input/{os.path.basename(file_path)}"
        if os.path.abspath(file_path) != os.path.abspath(file_location):
            shutil.copyfile(file_path, file_location)

        # Step 1: parse the trial balance into structured JSON.
        os.makedirs("data/output1", exist_ok=True)
        structured_data = extract_trial_balance_data(file_location)
        output1_json = "data/output1/parsed_trial_balance.json"
        analyze_and_save_results(structured_data, output1_json)

        # Step 2: generate notes from the parsed data.
        os.makedirs("data/output2", exist_ok=True)
        process_json(output1_json)
        notes_json = "data/output2/notes_output.json"
        with open(notes_json, "r", encoding="utf-8") as f:
            notes_data = json.load(f)

        # The generator may wrap the notes list under "notes" or
        # "trial_balance"; unwrap the first matching key.
        if isinstance(notes_data, dict):
            for key in ("notes", "trial_balance"):
                if key in notes_data:
                    notes_data = notes_data[key]
                    break

        def wrap_notes(notes):
            # json_to_xlsx expects a top-level {"notes": [...]} object.
            return {"notes": notes}

        if note_numbers:
            # Keep only the requested notes; compare as trimmed strings so
            # "5" matches both 5 and "5". Set gives O(1) membership tests.
            numbers = {n.strip() for n in note_numbers.split(",")}
            notes_data = [
                note for note in notes_data
                if str(note.get('note_number', '')).strip() in numbers
            ]
            json_input_for_excel = "data/output2/notes_output_filtered.json"
        else:
            json_input_for_excel = "data/output2/notes_output_wrapped.json"
        with open(json_input_for_excel, "w", encoding="utf-8") as f2:
            json.dump(wrap_notes(notes_data), f2, ensure_ascii=False, indent=2)

        # Step 3: convert the (re-wrapped) notes JSON to Excel.
        os.makedirs("data/output3", exist_ok=True)
        output3_xlsx = "data/output3/final_output.xlsx"
        json_to_xlsx(json_input_for_excel, output3_xlsx)
        return {"status": "success", "output_xlsx_path": output3_xlsx}
    except Exception as e:
        logger.error(f"notes_full_pipeline failed: {e}")
        return {"status": "error", "error": str(e)}
@tool
def generate_balance_sheet(file_path: str, user_api_key: Optional[str] = None, **kwargs) -> Dict[str, Any]:
    """
    Generate a balance sheet from a trial balance file using the complete
    3-step subprocess pipeline (extract -> CSV-to-JSON -> generate).

    Args:
        file_path: Path to trial balance Excel file
        user_api_key: OpenRouTER API key for LLM calls; falls back to the
            OPENROUTER_API_KEY environment variable when omitted

    Returns:
        Dict with "status", plus "output_path" on success or "error" on
        failure; "execution_id" and "execution_time" are included for tracing.
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    try:
        env = os.environ.copy()
        # Prefer the caller-supplied key; fall back to the process environment.
        logger.info(f"generate_balance_sheet: user_api_key received = {bool(user_api_key)}")
        if user_api_key:
            env["OPENROUTER_API_KEY"] = user_api_key
            logger.info("generate_balance_sheet: Setting OPENROUTER_API_KEY from user_api_key")
        elif os.getenv("OPENROUTER_API_KEY"):
            env["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
            logger.info("generate_balance_sheet: Using OPENROUTER_API_KEY from environment")
        else:
            logger.warning("generate_balance_sheet: No API key available!")
        env["INPUT_FILE"] = "data/clean_financial_data_bs.json"
        cwd = os.getcwd()

        def _run(args):
            # All steps run with list-args (shell=False) and captured output.
            return subprocess.run(args, env=env, cwd=cwd, capture_output=True, text=True)

        # Step 1: extract balance sheet data from the Excel file.
        result1 = _run(["python", "bs/balance_sheet_data_extractor.py", file_path])
        if result1.returncode != 0:
            return {"status": "error", "error": f"Balance sheet data extraction failed: {result1.stderr}"}
        # Step 2: convert the extracted CSV to JSON.
        result2 = _run(["python", "bs/balance_sheet_csv_to_json_converter.py"])
        if result2.returncode != 0:
            return {"status": "error", "error": f"CSV to JSON conversion failed: {result2.stderr}"}
        # Step 3: generate the balance sheet workbook.
        result3 = _run(["python", "bs/balance_sheet_generator.py"])
        if result3.returncode != 0:
            return {
                "status": "error",
                "error": f"Balance sheet generation failed: {result3.stderr}",
                "execution_id": execution_id,
                # BUGFIX: execution_time was computed but omitted from this branch.
                "execution_time": round(time.time() - start_time, 2)
            }

        # Parse "Output file: <path>" from the generator's stdout.
        output_file_path = None
        if result3.stdout:
            for line in result3.stdout.strip().split('\n'):
                if line.startswith('Output file:'):
                    output_file_path = line.split('Output file:', 1)[1].strip()
                    break
        # Fallback: most recently modified balance_sheet*.xlsx in data/output.
        if not output_file_path or not os.path.exists(output_file_path):
            output_dir = "data/output"
            os.makedirs(output_dir, exist_ok=True)
            output_files = [
                f for f in os.listdir(output_dir)
                if f.endswith('.xlsx') and f.startswith('balance_sheet')
            ]
            if output_files:
                output_files.sort(key=lambda x: os.path.getmtime(os.path.join(output_dir, x)), reverse=True)
                output_file_path = os.path.join(output_dir, output_files[0])

        execution_time = round(time.time() - start_time, 2)
        if output_file_path and os.path.exists(output_file_path):
            return {
                "status": "success",
                "message": "Balance sheet generated successfully",
                "output_path": output_file_path,
                "execution_id": execution_id,
                "execution_time": execution_time
            }
        return {
            "status": "error",
            "error": "Balance sheet generation completed but output file not found",
            "execution_id": execution_id,
            "execution_time": execution_time
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2)
        }
@tool
def generate_pnl_statement(file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Generate a P&L statement from a trial balance file using the complete
    3-step subprocess pipeline (extract -> CSV-to-JSON -> generate).

    Args:
        file_path: Path to trial balance Excel file

    Returns:
        Dict with "status", plus "output_path" on success or "error" on
        failure; "execution_id" and "execution_time" are included for tracing.
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    try:
        env = os.environ.copy()
        env["INPUT_FILE"] = "data/clean_financial_data_pnl.json"
        cwd = os.getcwd()
        # Step 1: extract P&L data from the Excel file.
        result1 = subprocess.run(
            ["python", "pnl/profit_loss_data_extractor.py", file_path],
            env=env, cwd=cwd, capture_output=True, text=True
        )
        if result1.returncode != 0:
            return {"status": "error", "error": f"P&L data extraction failed: {result1.stderr}"}
        # Step 2: convert the extracted CSV to JSON.
        result2 = subprocess.run(
            ["python", "pnl/profit_loss_csv_to_json_converter.py"],
            env=env, cwd=cwd, capture_output=True, text=True
        )
        if result2.returncode != 0:
            return {"status": "error", "error": f"P&L CSV to JSON conversion failed: {result2.stderr}"}
        # Step 3: generate the P&L workbook from the clean JSON.
        result3 = subprocess.run(
            ["python", "pnl/profit_loss_statement_generator.py", "data/clean_financial_data_pnl.json"],
            env=env, cwd=cwd, capture_output=True, text=True
        )
        execution_time = round(time.time() - start_time, 2)
        if result3.returncode == 0:
            return {
                "status": "success",
                "message": "P&L statement generated successfully",
                # The generator writes to this fixed location.
                "output_path": "data/pnl_statement.xlsx",
                "execution_id": execution_id,
                "execution_time": execution_time
            }
        return {
            "status": "error",
            "error": f"P&L generation failed: {result3.stderr}",
            "execution_id": execution_id,
            # BUGFIX: execution_time was computed but omitted from this branch.
            "execution_time": execution_time
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2)
        }
@tool
def generate_cash_flow_statement(file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Generate a cash flow statement from a trial balance file using the
    complete 4-step subprocess pipeline
    (extract -> CSV-to-JSON -> process -> generate).

    Args:
        file_path: Path to trial balance Excel file

    Returns:
        Dict with "status", plus "output_path" on success or "error" on
        failure; "execution_id" and "execution_time" are included for tracing.
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    try:
        env = os.environ.copy()
        # File locations the cf/* scripts read via environment variables.
        env["INPUT_FILE"] = "data/clean_financial_data_cfs.json"
        env["CFS_JSON_INPUT"] = "data/clean_financial_data_cfs.json"
        env["CFS_JSON_OUTPUT"] = "data/extracted_cfs_data.json"
        env["CFS_EXTRACTED_FILE"] = "data/extracted_cfs_data.json"
        env["CFS_OUTPUT_FILE"] = "data/cash_flow_statements.xlsx"
        cwd = os.getcwd()

        def _run(args):
            # All steps run with list-args (shell=False) and captured output.
            return subprocess.run(args, env=env, cwd=cwd, capture_output=True, text=True)

        # Step 1: extract cash flow data from the Excel file.
        result1 = _run(["python", "cf/cash_flow_data_extractor.py", file_path])
        if result1.returncode != 0:
            return {"status": "error", "error": f"Cash flow data extraction failed: {result1.stderr}"}
        # Step 2: convert the extracted CSV to JSON.
        result2 = _run(["python", "cf/cash_flow_csv_to_json_converter.py"])
        if result2.returncode != 0:
            return {"status": "error", "error": f"Cash flow CSV to JSON conversion failed: {result2.stderr}"}
        # Step 3: post-process the extracted data.
        result3 = _run(["python", "cf/cash_flow_data_processor.py"])
        if result3.returncode != 0:
            return {"status": "error", "error": f"Cash flow data processing failed: {result3.stderr}"}
        # Step 4: generate the cash flow workbook.
        result4 = _run(["python", "cf/cash_flow_statement_generator.py"])
        execution_time = round(time.time() - start_time, 2)
        if result4.returncode == 0:
            return {
                "status": "success",
                "message": "Cash flow statement generated successfully",
                # Matches CFS_OUTPUT_FILE set above.
                "output_path": "data/cash_flow_statements.xlsx",
                "execution_id": execution_id,
                "execution_time": execution_time
            }
        return {
            "status": "error",
            "error": f"Cash flow generation failed: {result4.stderr}",
            "execution_id": execution_id,
            # BUGFIX: execution_time was computed but omitted from this branch.
            "execution_time": execution_time
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2)
        }
@tool
def generate_llm_notes(file_path: str, note_numbers: str = "", user_api_key: Optional[str] = None, **kwargs) -> Dict[str, Any]:
    """
    Generate notes using the LLM-based approach (FlexibleFinancialNoteGenerator).

    Pipeline: process trial balance -> LLM note generation (with timeouts) ->
    optional user-defined-function (UDF) post-processing -> Excel conversion.

    Args:
        file_path: Path to trial balance Excel file
        note_numbers: Optional comma-separated note numbers to generate;
            empty string generates all notes
        user_api_key: OpenRouter API key for LLM calls; falls back to the
            OPENROUTER_API_KEY environment variable when omitted

    Returns:
        dict with "status", "output_xlsx_path" on success, "error" on
        failure, plus "execution_id"/"execution_time" (and "timeout": True
        when a subprocess step timed out).
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()
    try:
        env = os.environ.copy()
        # Prefer the caller-supplied key; fall back to the process environment.
        if user_api_key:
            env["OPENROUTER_API_KEY"] = user_api_key
        elif os.getenv("OPENROUTER_API_KEY"):
            env["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
        cwd = os.getcwd()

        # Step 1: process the trial balance data (bounded at 2 minutes).
        logger.info("Step 1: Processing trial balance data")
        result1 = subprocess.run(
            ["python", "notes/llm_notes_data_processor.py", file_path],
            env=env, cwd=cwd, capture_output=True, text=True,
            timeout=120
        )
        if result1.returncode != 0:
            return {"status": "error", "error": f"LLM notes data processing failed: {result1.stderr}"}

        # Step 2: generate notes via the LLM. Specific notes get a 10 minute
        # budget; "all" gets 15 minutes since it makes many more LLM calls.
        logger.info("Step 2: Generating notes using LLM")
        if note_numbers:
            gen_args = ["python", "notes/llm_notes_generator.py", "specific", note_numbers]
            gen_timeout = 600
        else:
            gen_args = ["python", "notes/llm_notes_generator.py", "all", ""]
            gen_timeout = 900
        result2 = subprocess.run(
            gen_args, env=env, cwd=cwd, capture_output=True, text=True,
            timeout=gen_timeout
        )
        if result2.returncode != 0:
            return {"status": "error", "error": f"LLM notes generation failed: {result2.stderr}"}

        # Step 3: convert the generated JSON to Excel.
        logger.info("Step 3: Converting to Excel format")
        input_json = "data/generated_notes/notes.json"
        # Unique output path so concurrent/repeated runs never collide.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_folder = "data/notes_llm_generated"
        os.makedirs(output_folder, exist_ok=True)
        output_excel = f"{output_folder}/new_{timestamp}_{execution_id}.xlsx"

        # The LLM may fail silently and produce nothing; check before converting.
        if not os.path.exists(input_json):
            return {
                "status": "error",
                "error": "No notes JSON file was generated - LLM may have failed to produce any notes",
                "execution_id": execution_id,
                "execution_time": round(time.time() - start_time, 2)
            }

        # Apply UDFs if provided in kwargs (feedback-driven post-processing).
        feedback_context = kwargs.get('feedback_context', {})
        udfs_to_apply = feedback_context.get('udfs', [])
        if udfs_to_apply:
            try:
                with open(input_json, 'r', encoding='utf-8') as f:
                    notes_data = json.load(f)
                for udf_code in udfs_to_apply:
                    try:
                        # SECURITY: exec() runs arbitrary code from feedback_context.
                        # Only safe if UDFs come from a trusted source — do NOT
                        # route untrusted user input here without sandboxing.
                        local_vars = {}
                        exec(udf_code, {"datetime": datetime}, local_vars)
                        # The UDF must define an apply_user_feedback* callable.
                        udf_func = None
                        for var_name, var_value in local_vars.items():
                            if callable(var_value) and var_name.startswith('apply_user_feedback'):
                                udf_func = var_value
                                break
                        if udf_func:
                            notes_data = udf_func(notes_data, feedback_context.get('feedback_type', 'general'))
                            logger.info("Applied UDF successfully")
                    except Exception as e:
                        # A broken UDF must not abort the whole pipeline.
                        logger.warning(f"Failed to apply UDF: {e}")
                        continue
                with open(input_json, 'w', encoding='utf-8') as f:
                    json.dump(notes_data, f, ensure_ascii=False, indent=2)
            except Exception as e:
                logger.error(f"Error applying UDFs to JSON: {e}")

        result3 = subprocess.run(
            ["python", "notes/llm_notes_excel_converter.py", input_json, output_excel],
            env=env, cwd=cwd, capture_output=True, text=True,
            timeout=120
        )
        execution_time = round(time.time() - start_time, 2)
        if result3.returncode == 0:
            return {
                "status": "success",
                "message": "LLM-based notes generated successfully",
                "output_xlsx_path": output_excel,
                "execution_id": execution_id,
                "execution_time": execution_time
            }
        return {
            "status": "error",
            "error": f"Excel conversion failed: {result3.stderr}",
            "execution_id": execution_id,
            "execution_time": execution_time
        }
    except subprocess.TimeoutExpired:
        execution_time = round(time.time() - start_time, 2)
        logger.error(f"LLM notes generation timed out after {execution_time}s")
        return {
            "status": "error",
            "error": f"LLM notes generation timed out. The process took longer than expected. Try generating fewer notes or check your API key.",
            "execution_id": execution_id,
            "execution_time": execution_time,
            "timeout": True
        }
    except Exception as e:
        execution_time = round(time.time() - start_time, 2)
        logger.error(f"LLM notes generation failed: {e}")
        return {
            "status": "error",
            "error": f"LLM notes generation failed: {e}",
            "execution_id": execution_id,
            "execution_time": execution_time
        }
# Simplified tool list - only financial statement generation
# Exported registry consumed by the agent; each entry is a LangChain
# @tool-decorated callable defined above.
FINANCIAL_TOOLS = [
    generate_notes_full_pipeline_from_path,
    generate_balance_sheet,
    generate_pnl_statement,
    generate_cash_flow_statement,
    generate_llm_notes
]