"""
Simplified LangChain tools for FinRyver financial statement generation
Focus: Notes, Balance Sheet, P&L, Cash Flow generation only
"""

import json
import logging
import os
import shutil
import subprocess
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Tuple

from langchain_core.tools import tool

logger = logging.getLogger(__name__)

# NOTE: ``@tool`` uses each function's docstring as the tool description that
# is handed to the model, so the tool docstrings below keep their original
# wording. Maintainer-facing documentation lives in ``#`` comments instead.


def _elapsed(start_time: float) -> float:
    """Return the seconds elapsed since *start_time*, rounded to 2 decimals."""
    return round(time.time() - start_time, 2)


def _failure(message: str, execution_id: str, start_time: float, **extra: Any) -> Dict[str, Any]:
    """Build the error payload shared by every subprocess-based tool."""
    payload: Dict[str, Any] = {
        "status": "error",
        "error": message,
        "execution_id": execution_id,
        "execution_time": _elapsed(start_time),
    }
    payload.update(extra)
    return payload


def _success(
    message: str,
    path_key: str,
    output_path: str,
    execution_id: str,
    start_time: float,
) -> Dict[str, Any]:
    """Build the success payload; *path_key* names the output-path entry."""
    return {
        "status": "success",
        "message": message,
        path_key: output_path,
        "execution_id": execution_id,
        "execution_time": _elapsed(start_time),
    }


def _run_script(
    args: Sequence[str],
    env: Dict[str, str],
    cwd: str,
    timeout: Optional[float] = None,
) -> subprocess.CompletedProcess:
    """Run ``python <args...>`` and capture stdout/stderr as text.

    Raises:
        subprocess.TimeoutExpired: if *timeout* is given and exceeded.
    """
    # NOTE(review): "python" is resolved through PATH, which may not be the
    # interpreter running this service - confirm whether sys.executable is
    # the intended one before changing it.
    return subprocess.run(
        ["python", *args],
        env=env,
        cwd=cwd,
        capture_output=True,
        text=True,
        timeout=timeout,
    )


def _run_steps(
    steps: Sequence[Tuple[List[str], str]],
    env: Dict[str, str],
    cwd: str,
) -> Tuple[Optional[str], Optional[subprocess.CompletedProcess]]:
    """Run scripts in order, stopping at the first non-zero exit code.

    Each step is ``(script_args, failure_label)``.

    Returns:
        ``(error_message, last_result)``. ``error_message`` is ``None`` when
        every step succeeded, otherwise ``"<failure_label>: <stderr>"``.
    """
    result = None
    for script_args, failure_label in steps:
        result = _run_script(script_args, env, cwd)
        if result.returncode != 0:
            return f"{failure_label}: {result.stderr}", result
    return None, result


def _env_with_api_key(user_api_key: Optional[str], tool_name: str) -> Dict[str, str]:
    """Copy the process environment, letting *user_api_key* override the key.

    The copy already carries any OPENROUTER_API_KEY set in the environment,
    so the fallback branch only needs to log which source is in effect.
    """
    env = os.environ.copy()
    logger.info("%s: user_api_key received = %s", tool_name, bool(user_api_key))
    if user_api_key:
        env["OPENROUTER_API_KEY"] = user_api_key
        logger.info("%s: Setting OPENROUTER_API_KEY from user_api_key", tool_name)
    elif env.get("OPENROUTER_API_KEY"):
        logger.info("%s: Using OPENROUTER_API_KEY from environment", tool_name)
    else:
        logger.warning("%s: No API key available!", tool_name)
    return env


def _find_balance_sheet_output(stdout: Optional[str]) -> Optional[str]:
    """Locate the balance sheet workbook written by the generator script.

    Prefers the path printed on an ``Output file:`` line of *stdout*; if that
    is missing or does not exist, falls back to the most recently modified
    ``balance_sheet*.xlsx`` in ``data/output``. Returns ``None`` if neither
    yields a file.
    """
    marker = "Output file:"
    reported = None
    for line in (stdout or "").strip().splitlines():
        if line.startswith(marker):
            reported = line[len(marker):].strip()
            break
    if reported and os.path.exists(reported):
        return reported

    output_dir = "data/output"
    os.makedirs(output_dir, exist_ok=True)
    candidates = [
        os.path.join(output_dir, name)
        for name in os.listdir(output_dir)
        if name.endswith(".xlsx") and name.startswith("balance_sheet")
    ]
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime)


def _apply_udfs_to_notes_json(json_path: str, feedback_context: Dict[str, Any]) -> None:
    """Apply the UDF sources in ``feedback_context['udfs']`` to the notes JSON.

    Each UDF source is executed and the first callable whose name starts with
    ``apply_user_feedback`` is called as ``fn(notes_data, feedback_type)``;
    its return value replaces the notes data. A failing UDF is logged and
    skipped. The (possibly modified) data is written back to *json_path*.
    Any error while loading or saving is logged and swallowed so the Excel
    conversion still runs on whatever JSON is on disk.
    """
    udf_sources = feedback_context.get("udfs", [])
    if not udf_sources:
        return

    feedback_type = feedback_context.get("feedback_type", "general")
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            notes_data = json.load(f)

        for udf_code in udf_sources:
            try:
                # SECURITY: this executes caller-supplied source code. It must
                # only ever receive trusted input; the namespace below is NOT
                # a sandbox.
                # A single namespace is used on purpose: with separate
                # globals/locals dicts, top-level imports and helper functions
                # in the UDF source are not visible from inside the UDF.
                namespace: Dict[str, Any] = {"datetime": datetime}
                exec(udf_code, namespace)
                udf_func = next(
                    (
                        value
                        for name, value in namespace.items()
                        if callable(value) and name.startswith("apply_user_feedback")
                    ),
                    None,
                )
                if udf_func is not None:
                    notes_data = udf_func(notes_data, feedback_type)
                    logger.info("Applied UDF successfully")
            except Exception as e:
                # Best effort: one broken UDF must not block the others.
                logger.warning(f"Failed to apply UDF: {e}")

        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(notes_data, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"Error applying UDFs to JSON: {e}")


# Runs the in-process notes pipeline: copy the workbook into data/input,
# extract and analyse the trial balance (data/output1), generate notes
# (data/output2), optionally filter them by note number, then convert the
# wrapped JSON to data/output3/final_output.xlsx. Any exception raised by a
# pipeline step is logged and returned as {"status": "error", ...}.
@tool
def generate_notes_full_pipeline_from_path(file_path: str, note_numbers: str = "") -> dict:
    """
    Implements the full notes generation pipeline as in /hardcoded route, but as a tool.
    Args:
        file_path: Path to the uploaded Excel file
        note_numbers: Optional comma-separated note numbers
    Returns:
        dict with status, output_xlsx_path, and error if any
    """
    # Project-local imports stay inside the function, as in the original, so
    # importing this module does not import the notes package.
    from notes.data_extraction import extract_trial_balance_data, analyze_and_save_results
    from notes.notes_generator import process_json
    from notes.json_to_excel import json_to_xlsx

    try:
        # Copy file to input dir with original name
        os.makedirs("data/input", exist_ok=True)
        file_location = f"data/input/{os.path.basename(file_path)}"
        if os.path.abspath(file_path) != os.path.abspath(file_location):
            shutil.copyfile(file_path, file_location)

        os.makedirs("data/output1", exist_ok=True)
        structured_data = extract_trial_balance_data(file_location)
        output1_json = "data/output1/parsed_trial_balance.json"
        analyze_and_save_results(structured_data, output1_json)

        os.makedirs("data/output2", exist_ok=True)
        process_json(output1_json)
        with open("data/output2/notes_output.json", "r", encoding="utf-8") as f:
            notes_data = json.load(f)

        # Unwrap {"notes": [...]} / {"trial_balance": [...]} containers.
        if isinstance(notes_data, dict):
            for key in ("notes", "trial_balance"):
                if key in notes_data:
                    notes_data = notes_data[key]
                    break

        # Empty tokens (e.g. from a trailing comma) are dropped so they cannot
        # match notes that simply have no note_number.
        wanted = {token.strip() for token in note_numbers.split(",") if token.strip()}
        if wanted:
            notes_data = [
                note
                for note in notes_data
                if str(note.get("note_number", "")).strip() in wanted
            ]
            json_input_for_excel = "data/output2/notes_output_filtered.json"
        else:
            json_input_for_excel = "data/output2/notes_output_wrapped.json"

        with open(json_input_for_excel, "w", encoding="utf-8") as f2:
            json.dump({"notes": notes_data}, f2, ensure_ascii=False, indent=2)

        os.makedirs("data/output3", exist_ok=True)
        output3_xlsx = "data/output3/final_output.xlsx"
        json_to_xlsx(json_input_for_excel, output3_xlsx)
        return {"status": "success", "output_xlsx_path": output3_xlsx}
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception(f"notes_full_pipeline failed: {e}")
        return {"status": "error", "error": str(e)}


# Runs the three balance sheet scripts under bs/ as subprocesses and returns
# a status dict. On success "output_path" is the workbook reported by the
# generator (or the newest balance_sheet*.xlsx in data/output). Every result
# carries "execution_id" and "execution_time".
@tool
def generate_balance_sheet(file_path: str, user_api_key: str = None, **kwargs) -> Dict[str, Any]:
    """
    Generate balance sheet from trial balance file using complete pipeline
    Args:
        file_path: Path to trial balance Excel file
        user_api_key: OpenRouter API key for LLM calls
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    try:
        env = _env_with_api_key(user_api_key, "generate_balance_sheet")
        env["INPUT_FILE"] = "data/clean_financial_data_bs.json"

        steps = [
            (["bs/balance_sheet_data_extractor.py", file_path], "Balance sheet data extraction failed"),
            (["bs/balance_sheet_csv_to_json_converter.py"], "CSV to JSON conversion failed"),
            (["bs/balance_sheet_generator.py"], "Balance sheet generation failed"),
        ]
        error, last_result = _run_steps(steps, env, os.getcwd())
        if error is not None:
            return _failure(error, execution_id, start_time)

        output_file_path = _find_balance_sheet_output(last_result.stdout)
        if output_file_path and os.path.exists(output_file_path):
            return _success(
                "Balance sheet generated successfully",
                "output_path",
                output_file_path,
                execution_id,
                start_time,
            )
        return _failure(
            "Balance sheet generation completed but output file not found",
            execution_id,
            start_time,
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return _failure(str(e), execution_id, start_time)


# Runs the three P&L scripts under pnl/ as subprocesses and returns a status
# dict. On success "output_path" is the fixed path data/pnl_statement.xlsx
# (its existence is not checked here). Every result carries "execution_id"
# and "execution_time".
@tool
def generate_pnl_statement(file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Generate P&L statement from trial balance file using complete pipeline
    Args:
        file_path: Path to trial balance Excel file
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    try:
        env = os.environ.copy()
        env["INPUT_FILE"] = "data/clean_financial_data_pnl.json"

        steps = [
            (["pnl/profit_loss_data_extractor.py", file_path], "P&L data extraction failed"),
            (["pnl/profit_loss_csv_to_json_converter.py"], "P&L CSV to JSON conversion failed"),
            (
                ["pnl/profit_loss_statement_generator.py", "data/clean_financial_data_pnl.json"],
                "P&L generation failed",
            ),
        ]
        error, _ = _run_steps(steps, env, os.getcwd())
        if error is not None:
            return _failure(error, execution_id, start_time)

        return _success(
            "P&L statement generated successfully",
            "output_path",
            "data/pnl_statement.xlsx",
            execution_id,
            start_time,
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return _failure(str(e), execution_id, start_time)


# Runs the four cash flow scripts under cf/ as subprocesses and returns a
# status dict. The CFS_* environment variables hand the intermediate and
# output file paths to those scripts. On success "output_path" is the fixed
# path data/cash_flow_statements.xlsx (its existence is not checked here).
@tool
def generate_cash_flow_statement(file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Generate cash flow statement from trial balance file using complete pipeline
    Args:
        file_path: Path to trial balance Excel file
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    try:
        env = os.environ.copy()
        env.update(
            {
                "INPUT_FILE": "data/clean_financial_data_cfs.json",
                "CFS_JSON_INPUT": "data/clean_financial_data_cfs.json",
                "CFS_JSON_OUTPUT": "data/extracted_cfs_data.json",
                "CFS_EXTRACTED_FILE": "data/extracted_cfs_data.json",
                "CFS_OUTPUT_FILE": "data/cash_flow_statements.xlsx",
            }
        )

        steps = [
            (["cf/cash_flow_data_extractor.py", file_path], "Cash flow data extraction failed"),
            (["cf/cash_flow_csv_to_json_converter.py"], "Cash flow CSV to JSON conversion failed"),
            (["cf/cash_flow_data_processor.py"], "Cash flow data processing failed"),
            (["cf/cash_flow_statement_generator.py"], "Cash flow generation failed"),
        ]
        error, _ = _run_steps(steps, env, os.getcwd())
        if error is not None:
            return _failure(error, execution_id, start_time)

        return _success(
            "Cash flow statement generated successfully",
            "output_path",
            "data/cash_flow_statements.xlsx",
            execution_id,
            start_time,
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return _failure(str(e), execution_id, start_time)


# Runs the LLM notes scripts under notes/ as subprocesses with per-step
# timeouts (120s processing, 600s/900s generation, 120s Excel conversion),
# optionally applying user-feedback UDFs passed as
# kwargs["feedback_context"] = {"udfs": [...], "feedback_type": ...} to the
# generated JSON before conversion. A timeout yields an error dict with
# "timeout": True.
@tool
def generate_llm_notes(file_path: str, note_numbers: str = "", user_api_key: str = None, **kwargs) -> Dict[str, Any]:
    """
    Generate notes using LLM-based approach (FlexibleFinancialNoteGenerator)
    Args:
        file_path: Path to trial balance Excel file
        note_numbers: Optional comma-separated note numbers to generate
        user_api_key: OpenRouter API key for LLM calls
    Returns:
        dict with status, output_xlsx_path, and error if any
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    try:
        env = _env_with_api_key(user_api_key, "generate_llm_notes")
        cwd = os.getcwd()

        # Step 1: Run LLM notes data processor
        logger.info("Step 1: Processing trial balance data")
        processed = _run_script(
            ["notes/llm_notes_data_processor.py", file_path], env, cwd, timeout=120
        )
        if processed.returncode != 0:
            return _failure(
                f"LLM notes data processing failed: {processed.stderr}",
                execution_id,
                start_time,
            )

        # Step 2: Run LLM notes generator (generating all notes gets a longer timeout)
        logger.info("Step 2: Generating notes using LLM")
        if note_numbers:
            generator_args = ["notes/llm_notes_generator.py", "specific", note_numbers]
            generator_timeout = 600
        else:
            generator_args = ["notes/llm_notes_generator.py", "all", ""]
            generator_timeout = 900
        generated = _run_script(generator_args, env, cwd, timeout=generator_timeout)
        if generated.returncode != 0:
            return _failure(
                f"LLM notes generation failed: {generated.stderr}",
                execution_id,
                start_time,
            )

        # Step 3: Convert to Excel
        logger.info("Step 3: Converting to Excel format")
        input_json = "data/generated_notes/notes.json"

        # Timestamp plus execution id keeps the output path unique per run.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_folder = "data/notes_llm_generated"
        os.makedirs(output_folder, exist_ok=True)
        output_excel = f"{output_folder}/new_{timestamp}_{execution_id}.xlsx"

        if not os.path.exists(input_json):
            return _failure(
                "No notes JSON file was generated - LLM may have failed to produce any notes",
                execution_id,
                start_time,
            )

        # ``or {}`` also covers an explicit feedback_context=None.
        feedback_context = kwargs.get("feedback_context") or {}
        _apply_udfs_to_notes_json(input_json, feedback_context)

        converted = _run_script(
            ["notes/llm_notes_excel_converter.py", input_json, output_excel],
            env,
            cwd,
            timeout=120,
        )
        if converted.returncode != 0:
            return _failure(
                f"Excel conversion failed: {converted.stderr}", execution_id, start_time
            )

        return _success(
            "LLM-based notes generated successfully",
            "output_xlsx_path",
            output_excel,
            execution_id,
            start_time,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"LLM notes generation timed out after {_elapsed(start_time)}s")
        return _failure(
            "LLM notes generation timed out. The process took longer than expected. "
            "Try generating fewer notes or check your API key.",
            execution_id,
            start_time,
            timeout=True,
        )
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        logger.exception(f"LLM notes generation failed: {e}")
        return _failure(f"LLM notes generation failed: {e}", execution_id, start_time)


# Simplified tool list - only financial statement generation
FINANCIAL_TOOLS = [
    generate_notes_full_pipeline_from_path,
    generate_balance_sheet,
    generate_pnl_statement,
    generate_cash_flow_statement,
    generate_llm_notes,
]