Spaces:
Sleeping
Sleeping
| """ | |
| Simplified LangChain tools for FinRyver financial statement generation | |
| Focus: Notes, Balance Sheet, P&L, Cash Flow generation only | |
| """ | |
| from langchain_core.tools import tool | |
| import subprocess | |
| import os | |
| import subprocess | |
| import json | |
| import shutil | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| import logging | |
| logger = logging.getLogger(__name__) | |
def generate_notes_full_pipeline_from_path(file_path: str, note_numbers: str = "") -> dict:
    """
    Run the full hardcoded notes-generation pipeline (as in the /hardcoded
    route) as a callable tool.

    Pipeline: extract trial-balance data -> generate notes JSON ->
    (optionally filter by note number) -> convert to Excel.

    Args:
        file_path: Path to the uploaded Excel file.
        note_numbers: Optional comma-separated note numbers to keep
            (e.g. "1,2,5"); empty string keeps all notes.

    Returns:
        dict with "status" ("success"/"error"), "output_xlsx_path" on
        success, and "error" message on failure.
    """
    # Project-local imports stay inside the function so this module can be
    # imported even when the notes package is unavailable.
    from notes.data_extraction import extract_trial_balance_data, analyze_and_save_results
    from notes.notes_generator import process_json
    from notes.json_to_excel import json_to_xlsx
    try:
        os.makedirs("data/input", exist_ok=True)
        # Stage the upload under data/input, keeping its original name.
        file_location = f"data/input/{os.path.basename(file_path)}"
        if os.path.abspath(file_path) != os.path.abspath(file_location):
            shutil.copyfile(file_path, file_location)

        # Stage 1: parse the trial balance into structured JSON.
        os.makedirs("data/output1", exist_ok=True)
        structured_data = extract_trial_balance_data(file_location)
        output1_json = "data/output1/parsed_trial_balance.json"
        analyze_and_save_results(structured_data, output1_json)

        # Stage 2: generate the notes JSON.
        os.makedirs("data/output2", exist_ok=True)
        process_json(output1_json)
        notes_json = "data/output2/notes_output.json"
        with open(notes_json, "r", encoding="utf-8") as f:
            notes_data = json.load(f)

        # Unwrap a top-level container if the generator wrapped its output.
        if isinstance(notes_data, dict):
            for key in ["notes", "trial_balance"]:
                if key in notes_data:
                    notes_data = notes_data[key]
                    break

        def wrap_notes(notes):
            # The Excel converter expects a {"notes": [...]} envelope.
            return {"notes": notes}

        if note_numbers:
            # Keep only the requested note numbers (compared as trimmed
            # strings, so "1" matches 1 but not "01").
            numbers = [n.strip() for n in note_numbers.split(",")]
            notes_data = [
                note for note in notes_data
                if str(note.get('note_number', '')).strip() in numbers
            ]
            json_input_for_excel = "data/output2/notes_output_filtered.json"
        else:
            json_input_for_excel = "data/output2/notes_output_wrapped.json"
        with open(json_input_for_excel, "w", encoding="utf-8") as f2:
            json.dump(wrap_notes(notes_data), f2, ensure_ascii=False, indent=2)

        # Stage 3: render the (possibly filtered) notes to Excel.
        os.makedirs("data/output3", exist_ok=True)
        output3_xlsx = "data/output3/final_output.xlsx"
        json_to_xlsx(json_input_for_excel, output3_xlsx)
        return {"status": "success", "output_xlsx_path": output3_xlsx}
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception("notes_full_pipeline failed: %s", e)
        return {"status": "error", "error": str(e)}
def generate_balance_sheet(file_path: str, user_api_key: str = None, **kwargs) -> Dict[str, Any]:
    """
    Generate a balance sheet from a trial balance file via the 3-step
    subprocess pipeline (extract -> CSV-to-JSON -> generate).

    Args:
        file_path: Path to trial balance Excel file.
        user_api_key: OpenRouter API key for LLM calls; falls back to the
            OPENROUTER_API_KEY environment variable when omitted.
        **kwargs: Ignored; accepted for tool-call compatibility.

    Returns:
        Dict with "status"; success carries "output_path" and "message",
        errors carry "error". Every return includes "execution_id" and
        "execution_time".
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    def _fail(message: str) -> Dict[str, Any]:
        # Uniform error payload so every exit includes timing info
        # (previously the final-step failure branch omitted execution_time
        # and the step-1/2 failures omitted execution_id entirely).
        return {
            "status": "error",
            "error": message,
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }

    try:
        env = os.environ.copy()
        # Prefer the caller-supplied key; fall back to the process env.
        logger.info(f"generate_balance_sheet: user_api_key received = {bool(user_api_key)}")
        if user_api_key:
            env["OPENROUTER_API_KEY"] = user_api_key
            logger.info("generate_balance_sheet: Setting OPENROUTER_API_KEY from user_api_key")
        elif os.getenv("OPENROUTER_API_KEY"):
            env["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
            logger.info("generate_balance_sheet: Using OPENROUTER_API_KEY from environment")
        else:
            logger.warning("generate_balance_sheet: No API key available!")
        env["INPUT_FILE"] = "data/clean_financial_data_bs.json"
        cwd = os.getcwd()

        # Step 1: extract balance-sheet data from the uploaded workbook.
        result1 = subprocess.run(
            ["python", "bs/balance_sheet_data_extractor.py", file_path],
            env=env, cwd=cwd, capture_output=True, text=True,
        )
        if result1.returncode != 0:
            return _fail(f"Balance sheet data extraction failed: {result1.stderr}")

        # Step 2: convert the extracted CSV to JSON.
        result2 = subprocess.run(
            ["python", "bs/balance_sheet_csv_to_json_converter.py"],
            env=env, cwd=cwd, capture_output=True, text=True,
        )
        if result2.returncode != 0:
            return _fail(f"CSV to JSON conversion failed: {result2.stderr}")

        # Step 3: render the balance sheet workbook.
        result3 = subprocess.run(
            ["python", "bs/balance_sheet_generator.py"],
            env=env, cwd=cwd, capture_output=True, text=True,
        )
        if result3.returncode != 0:
            return _fail(f"Balance sheet generation failed: {result3.stderr}")

        # The generator announces its output path on stdout as
        # "Output file: <path>"; parse it if present.
        output_file_path = None
        if result3.stdout:
            for line in result3.stdout.strip().split('\n'):
                if line.startswith('Output file:'):
                    output_file_path = line.split('Output file:', 1)[1].strip()
                    break

        # Fallback: most recently modified balance_sheet*.xlsx in data/output.
        if not output_file_path or not os.path.exists(output_file_path):
            output_dir = "data/output"
            os.makedirs(output_dir, exist_ok=True)
            candidates = [
                name for name in os.listdir(output_dir)
                if name.endswith('.xlsx') and name.startswith('balance_sheet')
            ]
            if candidates:
                candidates.sort(
                    key=lambda name: os.path.getmtime(os.path.join(output_dir, name)),
                    reverse=True,
                )
                output_file_path = os.path.join(output_dir, candidates[0])

        if output_file_path and os.path.exists(output_file_path):
            return {
                "status": "success",
                "message": "Balance sheet generated successfully",
                "output_path": output_file_path,
                "execution_id": execution_id,
                "execution_time": round(time.time() - start_time, 2),
            }
        return _fail("Balance sheet generation completed but output file not found")
    except Exception as e:
        return _fail(str(e))
def generate_pnl_statement(file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Generate a P&L statement from a trial balance file via the 3-step
    subprocess pipeline (extract -> CSV-to-JSON -> generate).

    Args:
        file_path: Path to trial balance Excel file.
        **kwargs: Ignored; accepted for tool-call compatibility.

    Returns:
        Dict with "status"; success carries "output_path" and "message",
        errors carry "error". Every return includes "execution_id" and
        "execution_time".
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    def _fail(message: str) -> Dict[str, Any]:
        # Uniform error payload so every exit includes timing info
        # (previously the final-step failure branch omitted execution_time).
        return {
            "status": "error",
            "error": message,
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }

    try:
        env = os.environ.copy()
        env["INPUT_FILE"] = "data/clean_financial_data_pnl.json"
        cwd = os.getcwd()

        # Step 1: extract P&L data from the uploaded workbook.
        result1 = subprocess.run(
            ["python", "pnl/profit_loss_data_extractor.py", file_path],
            env=env, cwd=cwd, capture_output=True, text=True,
        )
        if result1.returncode != 0:
            return _fail(f"P&L data extraction failed: {result1.stderr}")

        # Step 2: convert the extracted CSV to JSON.
        result2 = subprocess.run(
            ["python", "pnl/profit_loss_csv_to_json_converter.py"],
            env=env, cwd=cwd, capture_output=True, text=True,
        )
        if result2.returncode != 0:
            return _fail(f"P&L CSV to JSON conversion failed: {result2.stderr}")

        # Step 3: render the statement workbook.
        result3 = subprocess.run(
            ["python", "pnl/profit_loss_statement_generator.py", "data/clean_financial_data_pnl.json"],
            env=env, cwd=cwd, capture_output=True, text=True,
        )
        if result3.returncode != 0:
            return _fail(f"P&L generation failed: {result3.stderr}")

        return {
            "status": "success",
            "message": "P&L statement generated successfully",
            # Fixed path written by the generator script.
            "output_path": "data/pnl_statement.xlsx",
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }
    except Exception as e:
        return _fail(str(e))
def generate_cash_flow_statement(file_path: str, **kwargs) -> Dict[str, Any]:
    """
    Generate a cash flow statement from a trial balance file via the
    4-step subprocess pipeline (extract -> CSV-to-JSON -> process ->
    generate).

    Args:
        file_path: Path to trial balance Excel file.
        **kwargs: Ignored; accepted for tool-call compatibility.

    Returns:
        Dict with "status"; success carries "output_path" and "message",
        errors carry "error". Every return includes "execution_id" and
        "execution_time".
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    def _fail(message: str) -> Dict[str, Any]:
        # Uniform error payload so every exit includes timing info
        # (previously the final-step failure branch omitted execution_time).
        return {
            "status": "error",
            "error": message,
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }

    try:
        env = os.environ.copy()
        # File locations the cf/* scripts read from the environment.
        env["INPUT_FILE"] = "data/clean_financial_data_cfs.json"
        env["CFS_JSON_INPUT"] = "data/clean_financial_data_cfs.json"
        env["CFS_JSON_OUTPUT"] = "data/extracted_cfs_data.json"
        env["CFS_EXTRACTED_FILE"] = "data/extracted_cfs_data.json"
        env["CFS_OUTPUT_FILE"] = "data/cash_flow_statements.xlsx"
        cwd = os.getcwd()

        # Each (command, failure label) runs in order; the pipeline stops
        # at the first non-zero exit code.
        steps = [
            (["python", "cf/cash_flow_data_extractor.py", file_path],
             "Cash flow data extraction failed"),
            (["python", "cf/cash_flow_csv_to_json_converter.py"],
             "Cash flow CSV to JSON conversion failed"),
            (["python", "cf/cash_flow_data_processor.py"],
             "Cash flow data processing failed"),
            (["python", "cf/cash_flow_statement_generator.py"],
             "Cash flow generation failed"),
        ]
        for cmd, label in steps:
            result = subprocess.run(cmd, env=env, cwd=cwd, capture_output=True, text=True)
            if result.returncode != 0:
                return _fail(f"{label}: {result.stderr}")

        return {
            "status": "success",
            "message": "Cash flow statement generated successfully",
            # Matches CFS_OUTPUT_FILE passed to the generator script.
            "output_path": "data/cash_flow_statements.xlsx",
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }
    except Exception as e:
        return _fail(str(e))
def generate_llm_notes(file_path: str, note_numbers: str = "", user_api_key: str = None, **kwargs) -> Dict[str, Any]:
    """
    Generate notes using the LLM-based pipeline (data processor -> LLM
    generator -> Excel converter), optionally applying user-defined
    feedback functions (UDFs) to the generated JSON before conversion.

    Args:
        file_path: Path to trial balance Excel file.
        note_numbers: Optional comma-separated note numbers to generate;
            empty string generates all notes.
        user_api_key: OpenRouter API key for LLM calls; falls back to the
            OPENROUTER_API_KEY environment variable when omitted.
        **kwargs: May carry "feedback_context", a dict with a "udfs" list
            of Python source strings and an optional "feedback_type".

    Returns:
        Dict with "status"; success carries "output_xlsx_path", errors
        carry "error" (plus "timeout": True when a step timed out).
        Every return includes "execution_id" and "execution_time".
    """
    execution_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    def _fail(message: str, **extra) -> Dict[str, Any]:
        # Uniform error payload so every exit includes timing info.
        payload = {
            "status": "error",
            "error": message,
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }
        payload.update(extra)
        return payload

    try:
        env = os.environ.copy()
        # Prefer the caller-supplied key; fall back to the process env.
        if user_api_key:
            env["OPENROUTER_API_KEY"] = user_api_key
        elif os.getenv("OPENROUTER_API_KEY"):
            env["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
        cwd = os.getcwd()

        # Step 1: normalise the trial balance for the LLM prompts.
        logger.info("Step 1: Processing trial balance data")
        result1 = subprocess.run(
            ["python", "notes/llm_notes_data_processor.py", file_path],
            env=env, cwd=cwd, capture_output=True, text=True,
            timeout=120,  # 2 minute cap for plain data processing
        )
        if result1.returncode != 0:
            return _fail(f"LLM notes data processing failed: {result1.stderr}")

        # Step 2: generate the notes via the LLM. "specific" mode takes a
        # comma-separated note list; "all" mode gets a longer timeout.
        logger.info("Step 2: Generating notes using LLM")
        if note_numbers:
            generator_args = ["specific", note_numbers]
            generator_timeout = 600  # 10 minutes for a subset of notes
        else:
            generator_args = ["all", ""]
            generator_timeout = 900  # 15 minutes for the full set
        result2 = subprocess.run(
            ["python", "notes/llm_notes_generator.py", *generator_args],
            env=env, cwd=cwd, capture_output=True, text=True,
            timeout=generator_timeout,
        )
        if result2.returncode != 0:
            return _fail(f"LLM notes generation failed: {result2.stderr}")

        # Step 3: convert the notes JSON to a uniquely named workbook.
        logger.info("Step 3: Converting to Excel format")
        input_json = "data/generated_notes/notes.json"
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_folder = "data/notes_llm_generated"
        os.makedirs(output_folder, exist_ok=True)
        output_excel = f"{output_folder}/new_{timestamp}_{execution_id}.xlsx"

        if not os.path.exists(input_json):
            return _fail("No notes JSON file was generated - LLM may have failed to produce any notes")

        # Apply UDFs if provided in kwargs (best-effort post-processing).
        feedback_context = kwargs.get('feedback_context', {})
        udfs_to_apply = feedback_context.get('udfs', [])
        if udfs_to_apply:
            try:
                with open(input_json, 'r', encoding='utf-8') as f:
                    notes_data = json.load(f)
                for udf_code in udfs_to_apply:
                    try:
                        # SECURITY: exec() runs caller-supplied source code.
                        # The restricted globals are not a sandbox; UDFs must
                        # only ever come from a trusted feedback pipeline,
                        # never directly from end users.
                        local_vars = {}
                        exec(udf_code, {"datetime": datetime}, local_vars)
                        # The UDF contract: a callable whose name starts
                        # with "apply_user_feedback".
                        udf_func = None
                        for var_name, var_value in local_vars.items():
                            if callable(var_value) and var_name.startswith('apply_user_feedback'):
                                udf_func = var_value
                                break
                        if udf_func:
                            notes_data = udf_func(notes_data, feedback_context.get('feedback_type', 'general'))
                            logger.info("Applied UDF successfully")
                    except Exception as e:
                        # A broken UDF is skipped, not fatal.
                        logger.warning(f"Failed to apply UDF: {e}")
                        continue
                # Persist the post-processed notes for the converter.
                with open(input_json, 'w', encoding='utf-8') as f:
                    json.dump(notes_data, f, ensure_ascii=False, indent=2)
            except Exception as e:
                # UDF application is optional; conversion still proceeds.
                logger.error(f"Error applying UDFs to JSON: {e}")

        result3 = subprocess.run(
            ["python", "notes/llm_notes_excel_converter.py", input_json, output_excel],
            env=env, cwd=cwd, capture_output=True, text=True,
            timeout=120,  # 2 minute cap for the Excel conversion
        )
        if result3.returncode != 0:
            return _fail(f"Excel conversion failed: {result3.stderr}")

        return {
            "status": "success",
            "message": "LLM-based notes generated successfully",
            "output_xlsx_path": output_excel,
            "execution_id": execution_id,
            "execution_time": round(time.time() - start_time, 2),
        }
    except subprocess.TimeoutExpired:
        logger.error(f"LLM notes generation timed out after {round(time.time() - start_time, 2)}s")
        return _fail(
            "LLM notes generation timed out. The process took longer than expected. "
            "Try generating fewer notes or check your API key.",
            timeout=True,
        )
    except Exception as e:
        # logger.exception records the traceback, not just the message.
        logger.exception(f"LLM notes generation failed: {e}")
        return _fail(f"LLM notes generation failed: {e}")
# Simplified tool list - only financial statement generation
# Exposed to the agent layer; each entry returns a dict with a "status" key.
FINANCIAL_TOOLS = [
    generate_notes_full_pipeline_from_path,  # rule-based notes pipeline
    generate_balance_sheet,
    generate_pnl_statement,
    generate_cash_flow_statement,
    generate_llm_notes  # LLM-based notes pipeline
]