# Hugging Face Spaces page header ("Spaces: Sleeping") captured during extraction — not part of the source.
import json
import logging
import os
import shutil
import subprocess
import sys
from typing import Optional, Dict, Any

import pandas as pd
from fastapi import FastAPI, APIRouter, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse, FileResponse

# Import utilities and logic from modular files
from notes.data_extraction import extract_trial_balance_data, analyze_and_save_results
from notes.json_to_excel import json_to_xlsx
from notes.llm_notes_generator import FlexibleFinancialNoteGenerator
from notes.notes_generator import process_json
from utils.utils import clean_value
from utils.utils_normalize import normalize_llm_note_json, normalize_llm_notes_json
# Configure logging for the application
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("financial_notes_api")

# FastAPI application instance; the metadata below feeds the auto-generated
# OpenAPI docs (/docs, /redoc).
app = FastAPI(
    title="Financial Notes Generator API",
    description="API for generating financial notes, balance sheets, cash flow statements, and P&L reports.",
    version="1.0.0"
)
async def startup_event():
    """Log that the application has started.

    NOTE(review): no ``@app.on_event("startup")`` decorator is visible, so
    this handler appears unregistered — the decorator may have been lost
    when this file was extracted; confirm against the original source.
    """
    logger.info("Financial Notes Generator API has started.")
async def shutdown_event():
    """Log that the application is shutting down.

    NOTE(review): no ``@app.on_event("shutdown")`` decorator is visible, so
    this handler appears unregistered — possibly lost in extraction; confirm.
    """
    logger.info("Financial Notes Generator API is shutting down.")
| router = APIRouter() | |
def process_uploaded_file(file: UploadFile) -> pd.DataFrame:
    """Persist an uploaded trial-balance file and parse it into a DataFrame.

    The upload is saved under ``data/input/``, parsed with
    ``extract_trial_balance_data``, and the analysis result is written to
    ``data/output1/parsed_trial_balance.json``, which is then loaded into a
    pandas DataFrame whose ``amount`` column is normalized via ``clean_value``.

    Args:
        file: The uploaded trial-balance file.

    Returns:
        DataFrame of trial-balance rows with a cleaned ``amount`` column.
    """
    os.makedirs("data/input", exist_ok=True)
    file_location = f"data/input/{file.filename}"
    with open(file_location, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    structured_data = extract_trial_balance_data(file_location)
    # Fix: ensure the output directory exists before writing into it.
    # run_full_pipeline creates data/output1 explicitly; mirror that here to
    # avoid a FileNotFoundError on a fresh deployment.
    os.makedirs("data/output1", exist_ok=True)
    output_file = "data/output1/parsed_trial_balance.json"
    analyze_and_save_results(structured_data, output_file)
    with open(output_file, "r", encoding="utf-8") as f:
        parsed_data = json.load(f)
    # The parsed JSON may be a bare list of rows, or a dict wrapping the rows
    # under "trial_balance"; fall back to the dict itself if neither.
    tb_df = pd.DataFrame(parsed_data if isinstance(parsed_data, list) else parsed_data.get("trial_balance", parsed_data))
    tb_df['amount'] = tb_df['amount'].apply(clean_value)
    return tb_df
async def llm_generate_and_excel(
    file: UploadFile = File(...),
    note_number: Optional[str] = Form(None)
):
    """Generate LLM-backed financial notes from an uploaded trial balance
    and return them as an Excel workbook.

    Args:
        file: Uploaded trial-balance file.
        note_number: Optional comma-separated note numbers; when given, only
            those notes are generated, otherwise all notes are generated.

    Returns:
        FileResponse streaming ``data/generated_notes_excel/notes.xlsx``.

    Raises:
        HTTPException: 500 when the generator cannot be initialized or no
            notes could be generated.
    """
    os.makedirs("data/input", exist_ok=True)
    file_location = f"data/input/{file.filename}"
    with open(file_location, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    # Fix: create every directory this handler writes into up front; the
    # original only created data/generated_notes_excel, so writes to
    # data/output1 and data/generated_notes could fail on a fresh deployment.
    os.makedirs("data/output1", exist_ok=True)
    os.makedirs("data/generated_notes", exist_ok=True)
    os.makedirs("data/generated_notes_excel", exist_ok=True)
    structured_data = extract_trial_balance_data(file_location)
    output_json = "data/output1/parsed_trial_balance.json"
    analyze_and_save_results(structured_data, output_json)
    try:
        generator = FlexibleFinancialNoteGenerator()
    except Exception as e:
        logger.error(f"Generator init failed: {e}")
        raise HTTPException(status_code=500, detail=f"Generator init failed: {e}")
    wrapped_json_path = "data/generated_notes/notes_wrapped.json"
    excel_path = "data/generated_notes_excel/notes.xlsx"
    if note_number:
        # Generate only the requested (comma-separated) note numbers.
        note_numbers = [n.strip() for n in note_number.split(",")]
        all_notes = []
        for n in note_numbers:
            success = generator.generate_note(n, trial_balance_path=output_json)
            if success:
                # generate_note writes its result to notes.json; collect it.
                with open("data/generated_notes/notes.json", "r", encoding="utf-8") as f:
                    note_json = json.load(f)
                all_notes.append(note_json)
        if not all_notes:
            # Fix: mirror the all-notes branch below — fail loudly instead of
            # silently returning an empty workbook when every note failed.
            logger.error("Failed to generate any of the requested notes. LLM API may be down or unreachable.")
            raise HTTPException(status_code=500, detail="Failed to generate any of the requested notes. LLM API may be down or unreachable.")
        with open("data/generated_notes/notes.json", "w", encoding="utf-8") as f:
            json.dump({"notes": all_notes}, f, indent=2, ensure_ascii=False)
        wrapped = normalize_llm_notes_json({"notes": all_notes})
    else:
        results = generator.generate_all_notes(trial_balance_path=output_json)
        if not any(results.values()):
            logger.error("Failed to generate any notes. LLM API may be down or unreachable.")
            raise HTTPException(status_code=500, detail="Failed to generate any notes. LLM API may be down or unreachable.")
        with open("data/generated_notes/notes.json", "r", encoding="utf-8") as f:
            notes_json = json.load(f)
        wrapped = normalize_llm_notes_json(notes_json)
    # Common tail: normalize → wrapped JSON on disk → Excel conversion.
    with open(wrapped_json_path, "w", encoding="utf-8") as f2:
        json.dump(wrapped, f2, ensure_ascii=False, indent=2)
    json_to_xlsx(wrapped_json_path, excel_path)
    return FileResponse(
        excel_path,
        filename=os.path.basename(excel_path),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
async def run_full_pipeline(
    file: UploadFile = File(...),
    note_number: Optional[str] = Form(None)
):
    """Run the deterministic (non-LLM) notes pipeline end to end.

    Steps: save the upload, extract/parse the trial balance (data/output1),
    generate notes via ``process_json`` (data/output2), optionally filter by
    note number, and convert the result to Excel (data/output3).

    Args:
        file: Uploaded trial-balance file.
        note_number: Optional comma-separated note numbers to keep.

    Returns:
        FileResponse streaming ``data/output3/final_output.xlsx``.

    Raises:
        HTTPException: 500 when note generation or Excel conversion fails.
    """
    os.makedirs("data/input", exist_ok=True)
    file_location = f"data/input/{file.filename}"
    with open(file_location, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    os.makedirs("data/output1", exist_ok=True)
    structured_data = extract_trial_balance_data(file_location)
    output1_json = "data/output1/parsed_trial_balance.json"
    analyze_and_save_results(structured_data, output1_json)

    os.makedirs("data/output2", exist_ok=True)
    try:
        process_json(output1_json)
    except ImportError:
        # Fix: the old message blamed a stale 'app/main16_23.py' module; the
        # code actually calls notes.notes_generator.process_json.
        logger.error("notes.notes_generator.process_json could not be imported.")
        raise HTTPException(status_code=500, detail="notes.notes_generator.process_json could not be imported.")
    except Exception as e:
        logger.error(f"notes.notes_generator.process_json failed: {e}")
        raise HTTPException(status_code=500, detail=f"notes.notes_generator.process_json failed: {e}")

    notes_json = "data/output2/notes_output.json"
    with open(notes_json, "r", encoding="utf-8") as f:
        notes_data = json.load(f)
    # Unwrap dict payloads: the notes list may live under "notes" or
    # "trial_balance" depending on what process_json produced.
    if isinstance(notes_data, dict):
        for key in ["notes", "trial_balance"]:
            if key in notes_data:
                notes_data = notes_data[key]
                break

    def wrap_notes(notes):
        # json_to_xlsx expects the notes list under a top-level "notes" key.
        return {"notes": notes}

    if note_number:
        numbers = [n.strip() for n in note_number.split(",")]
        notes_data = [
            note for note in notes_data
            if str(note.get('note_number', '')).strip() in numbers
        ]
        filtered_json = "data/output2/notes_output_filtered.json"
        with open(filtered_json, "w", encoding="utf-8") as f2:
            json.dump(wrap_notes(notes_data), f2, ensure_ascii=False, indent=2)
        json_input_for_excel = filtered_json
    else:
        temp_json = "data/output2/notes_output_wrapped.json"
        with open(temp_json, "w", encoding="utf-8") as f2:
            json.dump(wrap_notes(notes_data), f2, ensure_ascii=False, indent=2)
        json_input_for_excel = temp_json

    os.makedirs("data/output3", exist_ok=True)
    # Hoisted out of the try: a string assignment cannot raise, and the
    # FileResponse below needs the name regardless.
    output3_xlsx = "data/output3/final_output.xlsx"
    try:
        json_to_xlsx(json_input_for_excel, output3_xlsx)
    except ImportError:
        # Fix: old message named a nonexistent 'json_xlsx' module; the code
        # calls notes.json_to_excel.json_to_xlsx.
        logger.error("notes.json_to_excel.json_to_xlsx could not be imported.")
        raise HTTPException(status_code=500, detail="notes.json_to_excel.json_to_xlsx could not be imported.")
    except Exception as e:
        logger.error(f"notes.json_to_excel.json_to_xlsx failed: {e}")
        raise HTTPException(status_code=500, detail=f"notes.json_to_excel.json_to_xlsx failed: {e}")
    return FileResponse(
        output3_xlsx,
        filename=os.path.basename(output3_xlsx),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
def run_subprocess(
    script_path: str,
    args: list,
    env: Dict[str, str],
    cwd: str
) -> subprocess.CompletedProcess:
    """Run a helper Python script as a subprocess and return the result.

    Args:
        script_path: Path (relative to ``cwd``) of the script to execute.
        args: Extra command-line arguments for the script.
        env: Environment for the child process.
        cwd: Working directory for the child process.

    Returns:
        The ``CompletedProcess`` with captured, decoded stdout/stderr.

    Raises:
        HTTPException: 500 with the child's output when it exits non-zero.
    """
    try:
        logger.info(f"Running {script_path} with args {args} in {cwd}")
        result = subprocess.run(
            # Fix: sys.executable guarantees the same interpreter (and
            # virtualenv) as the server; a bare "python" may resolve to a
            # different interpreter or be missing from PATH entirely.
            [sys.executable, script_path] + args,
            capture_output=True,
            text=True,
            check=True,
            env=env,
            cwd=cwd
        )
        logger.debug(f"{script_path} STDOUT:\n{result.stdout}")
        logger.debug(f"{script_path} STDERR:\n{result.stderr}")
        return result
    except subprocess.CalledProcessError as e:
        logger.error(f"{script_path} failed: {e}")
        logger.error(f"STDOUT: {e.stdout}")
        logger.error(f"STDERR: {e.stderr}")
        raise HTTPException(
            status_code=500,
            detail=f"{script_path} failed: {e}\nSTDOUT:\n{e.stdout}\nSTDERR:\n{e.stderr}"
        )
def extract_output_file(stdout: str, keyword: str = "Output file:") -> Optional[str]:
    """Scan subprocess stdout for the first line containing *keyword* and
    return the trimmed text after its last occurrence, or None if absent.
    """
    matching = (ln for ln in stdout.splitlines() if keyword in ln)
    hit = next(matching, None)
    if hit is None:
        return None
    return hit.rsplit(keyword, 1)[-1].strip()
async def bs_from_notes(file: UploadFile = File(...)):
    """Build a balance sheet from an uploaded notes Excel workbook.

    Runs three helper scripts in sequence (bs/sircodebs.py with the uploaded
    file, then bs/csv_json_bs.py, then bs/bl_llm.py) and returns the Excel
    file whose path bl_llm.py reports on stdout via an "Output file:" line.

    Raises:
        HTTPException: 500 (from run_subprocess) when a step fails, or when
            no output path can be determined from bl_llm.py's stdout.
    """
    os.makedirs("data/input", exist_ok=True)
    input_excel_path = os.path.join("data/input", file.filename)
    with open(input_excel_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    logger.info(f"Uploaded Excel saved to: {input_excel_path}")
    logger.info(f"Files in data/input/: {os.listdir('data/input')}")
    env = os.environ.copy()
    # NOTE(review): re-setting the key is redundant (env already copies
    # os.environ) but harmless; kept as-is.
    if os.getenv("OPENROUTER_API_KEY"):
        env["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
    # Tells the downstream scripts where the cleaned JSON lives (presumably
    # read by csv_json_bs.py / bl_llm.py — confirm against those scripts).
    env["INPUT_FILE"] = "data/clean_financial_data_bs.json"
    cwd = os.getenv("PROJECT_ROOT", os.getcwd())
    # Run sircodebs.py
    run_subprocess("bs/sircodebs.py", [input_excel_path], env, cwd)
    logger.info(f"Files in data/csv_notes_bs/: {os.listdir('data/csv_notes_bs') if os.path.exists('data/csv_notes_bs') else 'data/csv_notes_bs does not exist'}")
    # Run csv_json_bs.py
    run_subprocess("bs/csv_json_bs.py", [], env, cwd)
    logger.info(f"data/clean_financial_data_bs.json exists: {os.path.exists('data/clean_financial_data_bs.json')}")
    # Run bl_llm.py
    result = run_subprocess("bs/bl_llm.py", [], env, cwd)
    # bl_llm.py prints the path of the workbook it produced; parse it out.
    output_file = extract_output_file(result.stdout)
    # Resolve relative paths against the project root.
    if output_file and not os.path.isabs(output_file):
        output_file_path = os.path.join(cwd, output_file)
    else:
        output_file_path = output_file
    # Short-circuit on `not output_file` keeps os.path.exists from seeing None.
    if not output_file or not os.path.exists(output_file_path):
        debug_msg = f"\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        logger.error(f"Could not determine output file from bl_llm.py output.{debug_msg}")
        raise HTTPException(status_code=500, detail=f"Could not determine output file from bl_llm.py output.{debug_msg}")
    logger.info(f"Pipeline completed. Output file: {output_file_path}")
    return FileResponse(
        output_file_path,
        filename=os.path.basename(output_file_path),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
async def pnl_from_notes(file: UploadFile = File(...)):
    """Build a profit-and-loss statement from an uploaded notes workbook.

    Runs three helper scripts in sequence (pnl/sircodepnl.py with the
    uploaded file, then pnl/csv_json_pnl.py, then pnl/pnl_note.py) and
    returns the fixed output file data/pnl_statement.xlsx.

    Raises:
        HTTPException: 500 (from run_subprocess) when a step fails, or when
            the expected output workbook does not exist afterwards.
    """
    os.makedirs("data/input", exist_ok=True)
    input_excel_path = os.path.join("data/input", file.filename)
    with open(input_excel_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    logger.info(f"Uploaded Excel saved to: {input_excel_path}")
    logger.info(f"Files in data/input/: {os.listdir('data/input')}")
    env = os.environ.copy()
    # NOTE(review): re-setting the key is redundant (env already copies
    # os.environ) but harmless; kept as-is.
    if os.getenv("OPENROUTER_API_KEY"):
        env["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY")
    # Tells the downstream scripts where the cleaned JSON lives (presumably
    # read by csv_json_pnl.py / pnl_note.py — confirm against those scripts).
    env["INPUT_FILE"] = "data/clean_financial_data_pnl.json"
    cwd = os.getenv("PROJECT_ROOT", os.getcwd())
    # Run sircodepnl.py
    run_subprocess("pnl/sircodepnl.py", [input_excel_path], env, cwd)
    csv_notes_pnl_path = os.path.join(cwd, 'data/csv_notes_pnl')
    logger.info(f"Files in {csv_notes_pnl_path}/: {os.listdir(csv_notes_pnl_path) if os.path.exists(csv_notes_pnl_path) else f'{csv_notes_pnl_path} does not exist'}")
    # Run csv_json_pnl.py
    run_subprocess("pnl/csv_json_pnl.py", [], env, cwd)
    json_path = os.path.join(cwd, 'data/clean_financial_data_pnl.json')
    logger.info(f"data/clean_financial_data_pnl.json exists: {os.path.exists(json_path)}")
    # Run pnl_note.py
    run_subprocess("pnl/pnl_note.py", [], env, cwd)
    # Use fixed output file path (pnl_note.py does not report one on stdout).
    output_file_path = os.path.join(cwd, "data/pnl_statement.xlsx")
    if not os.path.exists(output_file_path):
        logger.error(f"Could not find expected output file for P&L statement: {output_file_path}")
        raise HTTPException(status_code=500, detail=f"Could not find expected output file for P&L statement: {output_file_path}")
    logger.info(f"Pipeline completed. Output file: {output_file_path}")
    return FileResponse(
        output_file_path,
        filename=os.path.basename(output_file_path),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
async def cf_from_notes(file: UploadFile = File(...)):
    """Build cash-flow statements from an uploaded notes Excel workbook.

    Runs four helper scripts in sequence (cf/sircodecf.py with the uploaded
    file, then cf/csv_json_cf.py, cf/cf_middlestep.py and cf/cf_generation.py)
    and returns the fixed output file data/cash_flow_statements.xlsx.

    Raises:
        HTTPException: 500 (from run_subprocess) when a step fails, or when
            the expected output workbook does not exist afterwards.
    """
    os.makedirs("data/input", exist_ok=True)
    input_excel_path = os.path.join("data/input", file.filename)
    with open(input_excel_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    logger.info(f"Uploaded Excel saved to: {input_excel_path}")
    logger.info(f"Files in data/input/: {os.listdir('data/input')}")
    env = os.environ.copy()
    cwd = os.getenv("PROJECT_ROOT", os.getcwd())
    # Step 1: Run sircodecf.py
    run_subprocess("cf/sircodecf.py", [input_excel_path], env, cwd)
    csv_notes_cfs_path = os.path.join(cwd, 'data/csv_notes_cfs')
    logger.info(f"Files in {csv_notes_cfs_path}/: {os.listdir(csv_notes_cfs_path) if os.path.exists(csv_notes_cfs_path) else f'{csv_notes_cfs_path} does not exist'}")
    # Step 2: Run csv_json_cf.py
    run_subprocess("cf/csv_json_cf.py", [], env, cwd)
    json_path = os.path.join(cwd, 'data/clean_financial_data_cfs.json')
    logger.info(f"data/clean_financial_data_cfs.json exists: {os.path.exists(json_path)}")
    # Step 3: Run cf_middlestep.py
    run_subprocess("cf/cf_middlestep.py", [], env, cwd)
    extracted_json_path = os.path.join(cwd, 'data/extracted_cfs_data.json')
    logger.info(f"data/extracted_cfs_data.json exists: {os.path.exists(extracted_json_path)}")
    # Step 4: Run cf_generation.py
    result = run_subprocess("cf/cf_generation.py", [], env, cwd)
    # Fix: the original "fallback" re-checked the exact same joined path, so
    # the second existence check was dead code; one check is equivalent.
    output_file_path = os.path.join(cwd, "data/cash_flow_statements.xlsx")
    if not os.path.exists(output_file_path):
        debug_msg = f"\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        logger.error(f"Could not determine output file from cf_generation.py output.{debug_msg}")
        raise HTTPException(status_code=500, detail=f"Could not determine output file from cf_generation.py output.{debug_msg}")
    logger.info(f"Pipeline completed. Output file: {output_file_path}")
    return FileResponse(
        output_file_path,
        filename=os.path.basename(output_file_path),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    )
# Mount the API router on the application.
# NOTE(review): no @router.* route decorators are visible on the handler
# functions above, so this router appears to register no routes — the
# decorators may have been lost when this file was extracted; confirm
# against the original source.
app.include_router(router)

if __name__ == "__main__":
    import uvicorn
    # Bind on all interfaces; port 7860 is the Hugging Face Spaces default.
    uvicorn.run(app, host="0.0.0.0", port=7860)