# finryver-dev / app.py — Sahil Garg
# Commit 6611563: UDF added to /notes-llm along with RLHF support.
from fastapi import FastAPI, APIRouter, UploadFile, File, HTTPException, Query, Form
from fastapi.responses import FileResponse
from typing import Optional
import os
import shutil
import logging
import json
from dotenv import load_dotenv
from agents.simple_tools import generate_notes_full_pipeline_from_path
from agents.generator_validator import create_notes_pipeline, InteractiveFeedbackManager
from agents.langgraph import run_workflow
from agents.rlhf_workflows import run_rlhf_workflow
from agents.rlhf_routes import rlhf_router
# Load environment variables (e.g. API keys, model config) from a local .env file.
load_dotenv()
# Configure logging for the application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("financial_notes_api")
# FastAPI application serving the financial-notes generation endpoints.
app = FastAPI(
title="Financial Notes Generator API",
description="API for generating financial notes, balance sheets, cash flow statements, and P&L reports with RLHF capabilities and Interactive Feedback.",
version="1.0.0"
)
# Manages interactive feedback sessions used by the /notes-llm endpoints.
feedback_manager = InteractiveFeedbackManager()
# Mount RLHF-specific routes defined in agents.rlhf_routes.
app.include_router(rlhf_router)
@app.on_event("startup")
async def startup_event():
    """Log that the API process has come up."""
    logger.info("Financial Notes Generator API has started.")
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the API process is going down."""
    logger.info("Financial Notes Generator API is shutting down.")
# Router collecting all financial-statement endpoints; mounted on the app at the bottom of the file.
router = APIRouter()
@router.post("/notes-llm")
async def notes_llm_route(file: UploadFile = File(...), use_rlhf: bool = Query(False)):
    """Generate AI-powered financial notes using the Generator-Validator
    pattern with Interactive Feedback.

    The uploaded workbook is saved under ``data/input/`` and pushed through
    ``create_notes_pipeline``. On success the generated Excel file is
    returned, with metadata (validation score, attempts, interactive session
    id, optional RLHF scores) exposed via ``X-*`` response headers. On
    failure a structured 500 is raised with the pipeline's error detail.
    """
    # basename() strips any directory components from the client-supplied
    # filename, preventing path traversal (e.g. "../../etc/passwd").
    safe_name = os.path.basename(file.filename or "upload")
    file_path = f"data/input/{safe_name}"
    os.makedirs("data/input", exist_ok=True)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    try:
        # Create Generator-Validator pipeline for LLM notes.
        pipeline = create_notes_pipeline(use_rlhf=use_rlhf)
        # Process through pipeline.
        generation_result, validation_result = pipeline.process(file_path)
        # Log processing summary.
        summary = pipeline.get_processing_summary()
        logger.info(f"LLM Notes Pipeline Summary: {summary}")
        if generation_result.success and validation_result.is_valid:
            # Create an interactive feedback session so the caller can iterate.
            session_id = feedback_manager.create_session(file_path)
            response = FileResponse(
                generation_result.output_path,
                filename=os.path.basename(generation_result.output_path)
            )
            # Add comprehensive metadata to headers.
            response.headers["X-Generation-Method"] = "llm"
            response.headers["X-Validation-Score"] = str(validation_result.score)
            response.headers["X-Attempts-Made"] = str(generation_result.metadata.get("attempt", 1))
            response.headers["X-Execution-ID"] = generation_result.metadata.get("execution_id", "")
            response.headers["X-Session-ID"] = session_id  # Interactive session ID
            response.headers["X-Interactive-Enabled"] = "true"  # Interactive mode available
            # Add RLHF metadata if available.
            if use_rlhf and "rlhf_metadata" in generation_result.metadata:
                rlhf_data = generation_result.metadata["rlhf_metadata"]
                response.headers["X-RLHF-Statement-ID"] = str(rlhf_data.get("statement_id", ""))
                response.headers["X-RLHF-Quality-Score"] = str(rlhf_data.get("predicted_quality", ""))
                response.headers["X-RLHF-Confidence"] = str(rlhf_data.get("confidence_score", ""))
            # Add validation feedback.
            if validation_result.feedback:
                response.headers["X-Validation-Feedback"] = json.dumps(validation_result.feedback)
            return response
        else:
            error_detail = {
                "generation_error": generation_result.error,
                "validation_feedback": validation_result.feedback,
                "validation_score": validation_result.score,
                "attempts_made": generation_result.metadata.get("attempt", 1),
                "processing_summary": summary
            }
            raise HTTPException(status_code=500, detail=json.dumps(error_detail))
    except HTTPException:
        # Re-raise unchanged: without this clause the structured 500 raised
        # above is caught by the generic handler below and its JSON detail
        # is replaced with a generic wrapper message.
        raise
    except Exception as e:
        logger.error(f"LLM Notes pipeline failed: {e}")
        raise HTTPException(status_code=500, detail=f"Pipeline processing failed: {str(e)}")
@router.post("/notes-llm/feedback")
async def submit_feedback(
    session_id: str = Form(...),
    feedback_text: str = Form(...),
    feedback_type: str = Form(..., regex="^(text|numeric|formula|suggestion)$")
):
    """Submit user feedback for iterative improvement.

    Records the feedback against the interactive session and returns the
    UDF version generated from it; raises 404 if the session is unknown.
    """
    try:
        # Add feedback and generate a UDF version from it.
        udf_version = feedback_manager.add_feedback(session_id, feedback_text, feedback_type)
        if udf_version is None:
            raise HTTPException(status_code=404, detail="Session not found")
        return {
            "status": "success",
            "session_id": session_id,
            "udf_version": udf_version,
            "iteration": feedback_manager.get_session(session_id).current_iteration,
            "message": "Feedback submitted and UDF generated successfully"
        }
    except HTTPException:
        # Propagate the 404 untouched — the generic handler below would
        # otherwise re-wrap it as a 500.
        raise
    except Exception as e:
        logger.error(f"Feedback submission failed: {e}")
        raise HTTPException(status_code=500, detail=f"Feedback submission failed: {str(e)}")
@router.post("/notes-llm/approve")
async def approve_session(session_id: str = Form(...)):
    """Approve the current session and set its final UDF.

    Returns a summary of the approved session (final UDF, iteration count,
    archived UDF count); raises 404 if the session is unknown.
    """
    try:
        success = feedback_manager.approve_session(session_id)
        if not success:
            raise HTTPException(status_code=404, detail="Session not found")
        session = feedback_manager.get_session(session_id)
        return {
            "status": "approved",
            "session_id": session_id,
            "final_udf": session.final_udf,
            "total_iterations": session.current_iteration,
            "archived_udfs_count": len(session.archived_udfs),
            "message": "Session approved and final UDF set"
        }
    except HTTPException:
        # Propagate the 404 untouched — the generic handler below would
        # otherwise re-wrap it as a 500.
        raise
    except Exception as e:
        logger.error(f"Session approval failed: {e}")
        raise HTTPException(status_code=500, detail=f"Session approval failed: {str(e)}")
@router.get("/notes-llm/session/{session_id}")
async def get_session_info(session_id: str):
    """Get session information and full feedback history.

    Returns the session's status, iteration counters, final UDF, and one
    entry per recorded feedback; raises 404 if the session is unknown.
    """
    try:
        session = feedback_manager.get_session(session_id)
        if not session:
            raise HTTPException(status_code=404, detail="Session not found")
        return {
            "session_id": session.session_id,
            "status": session.status,
            "current_iteration": session.current_iteration,
            "total_feedbacks": len(session.feedback_history),
            "archived_udfs_count": len(session.archived_udfs),
            "final_udf": session.final_udf,
            "created_at": session.created_at.isoformat(),
            "last_updated": session.last_updated.isoformat(),
            "feedback_history": [
                {
                    "iteration": f.iteration_number,
                    "feedback_type": f.feedback_type,
                    "feedback_text": f.feedback_text,
                    "udf_version": f.udf_version,
                    "timestamp": f.timestamp.isoformat(),
                    "changes_description": f.changes_description
                } for f in session.feedback_history
            ]
        }
    except HTTPException:
        # Propagate the 404 untouched — the generic handler below would
        # otherwise re-wrap it as a 500.
        raise
    except Exception as e:
        logger.error(f"Session info retrieval failed: {e}")
        raise HTTPException(status_code=500, detail=f"Session info retrieval failed: {str(e)}")
@router.post("/notes-llm/generate")
async def generate_with_feedback(
    session_id: str = Form(...),
    file: UploadFile = File(...)
):
    """Generate notes using the session's accumulated feedback and UDFs.

    The uploaded workbook is re-processed through the Generator-Validator
    pipeline with a feedback context built from the session's archived UDFs
    and feedback history. Raises 404 for an unknown session and 400 when the
    session is no longer active.
    """
    try:
        session = feedback_manager.get_session(session_id)
        if not session:
            raise HTTPException(status_code=404, detail="Session not found")
        if session.status != 'active':
            raise HTTPException(status_code=400, detail=f"Session is {session.status}")
        # Save uploaded file. basename() strips any directory components from
        # the client-supplied name, preventing path traversal.
        safe_name = os.path.basename(file.filename or "upload")
        file_path = f"data/input/{safe_name}"
        os.makedirs("data/input", exist_ok=True)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Create pipeline with feedback integration (RLHF is handled by the
        # session workflow, not here).
        pipeline = create_notes_pipeline(use_rlhf=False)
        # Prepare feedback context for the generator.
        feedback_context = {
            'session_id': session_id,
            'udfs': session.archived_udfs,  # Pass all archived UDFs
            'feedback_history': [
                {
                    'text': f.feedback_text,
                    'type': f.feedback_type,
                    'iteration': f.iteration_number
                } for f in session.feedback_history
            ],
            'current_iteration': session.current_iteration
        }
        # Process through pipeline with feedback context.
        generation_result, validation_result = pipeline.process(file_path, feedback_context=feedback_context)
        if generation_result.success and validation_result.is_valid:
            response = FileResponse(
                generation_result.output_path,
                filename=os.path.basename(generation_result.output_path)
            )
            # Add session and feedback metadata.
            response.headers["X-Session-ID"] = session_id
            response.headers["X-Iteration"] = str(session.current_iteration)
            response.headers["X-Feedbacks-Applied"] = str(len(session.feedback_history))
            response.headers["X-UDFs-Archived"] = str(len(session.archived_udfs))
            # Add generation metadata.
            response.headers["X-Generation-Method"] = "llm_with_feedback"
            response.headers["X-Validation-Score"] = str(validation_result.score)
            response.headers["X-Execution-ID"] = generation_result.metadata.get("execution_id", "")
            return response
        else:
            error_detail = {
                "generation_error": generation_result.error,
                "validation_feedback": validation_result.feedback,
                "validation_score": validation_result.score,
                "session_id": session_id,
                "current_iteration": session.current_iteration
            }
            raise HTTPException(status_code=500, detail=json.dumps(error_detail))
    except HTTPException:
        # Re-raise the structured 400/404/500 responses unchanged.
        raise
    except Exception as e:
        logger.error(f"Feedback-based generation failed: {e}")
        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")
@router.post("/notes")
async def notes_route(file: UploadFile = File(...)):
    """Generate financial notes directly from an uploaded file.

    Saves the upload under ``data/input/`` and runs the non-interactive
    pipeline; returns the generated Excel file or a 500 with the pipeline's
    error message.
    """
    try:
        # basename() strips any directory components from the client-supplied
        # name, preventing path traversal outside data/input/.
        safe_name = os.path.basename(file.filename or "upload")
        file_path = f"data/input/{safe_name}"
        os.makedirs("data/input", exist_ok=True)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Generate notes directly.
        result = generate_notes_full_pipeline_from_path(file_path)
        if result["status"] == "success":
            # Return the generated Excel file.
            output_path = result["output_xlsx_path"]
            return FileResponse(output_path, filename=os.path.basename(output_path))
        # If generation failed, surface the pipeline's error message.
        raise HTTPException(status_code=500, detail=result.get("error", "Notes generation failed"))
    except HTTPException:
        # Keep the 500 raised above intact — the generic handler below would
        # otherwise re-wrap it and lose the pipeline's error detail.
        raise
    except Exception as e:
        logger.error(f"Error in notes generation: {e}")
        raise HTTPException(status_code=500, detail=f"Error generating notes: {str(e)}")
@router.post("/pnl")
async def pnl_route(file: UploadFile = File(...), use_rlhf: bool = Query(False)):
    """Generate a P&L statement from the uploaded workbook.

    Runs the RLHF workflow when ``use_rlhf`` is true, otherwise the plain
    workflow; returns the generated Excel file with any RLHF scores exposed
    in response headers, or a 500 carrying the workflow's error.
    """
    # basename() strips client-supplied directory components (path traversal).
    safe_name = os.path.basename(file.filename or "upload")
    file_path = f"data/input/{safe_name}"
    os.makedirs("data/input", exist_ok=True)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    # Choose workflow based on RLHF preference.
    if use_rlhf:
        result = run_rlhf_workflow(file_path, "pnl")
    else:
        result = run_workflow(file_path, "pnl")
    if result["status"] == "success":
        # Hoist the (defaulted) output path so it is computed once, not twice.
        output_path = result["result"].get("output_path", "data/pnl_statement.xlsx")
        response = FileResponse(output_path, filename=os.path.basename(output_path))
        # Add RLHF metadata to headers if available.
        if "rlhf_metadata" in result.get("result", {}):
            rlhf_data = result["result"]["rlhf_metadata"]
            response.headers["X-RLHF-Statement-ID"] = str(rlhf_data.get("statement_id", ""))
            response.headers["X-RLHF-Quality-Score"] = str(rlhf_data.get("predicted_quality", ""))
            response.headers["X-RLHF-Confidence"] = str(rlhf_data.get("confidence_score", ""))
        return response
    raise HTTPException(status_code=500, detail=result["error"])
@router.post("/bs")
async def bs_route(file: UploadFile = File(...), use_rlhf: bool = Query(False)):
    """Generate a balance sheet from the uploaded workbook.

    Runs the RLHF workflow when ``use_rlhf`` is true, otherwise the plain
    workflow. If the workflow does not report a usable output path, falls
    back to the first ``.xlsx`` file found in ``data/output/``.
    """
    # basename() strips client-supplied directory components (path traversal).
    safe_name = os.path.basename(file.filename or "upload")
    file_path = f"data/input/{safe_name}"
    os.makedirs("data/input", exist_ok=True)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    # Choose workflow based on RLHF preference.
    if use_rlhf:
        result = run_rlhf_workflow(file_path, "bs")
    else:
        result = run_workflow(file_path, "bs")
    if result["status"] == "success":
        output_file = result["result"].get("output_path")
        if not output_file or not os.path.isfile(output_file):
            # Fall back to the first .xlsx file in data/output/ (files only,
            # not directories). NOTE(review): os.listdir order is arbitrary,
            # so "first" is not deterministic across runs.
            output_dir = "data/output/"
            xlsx_files = []
            # Guard: os.listdir raises FileNotFoundError if the directory is
            # missing, which would surface as an unhandled server error.
            if os.path.isdir(output_dir):
                xlsx_files = [
                    f for f in os.listdir(output_dir)
                    if f.endswith('.xlsx') and os.path.isfile(os.path.join(output_dir, f))
                ]
            if xlsx_files:
                output_file = os.path.join(output_dir, xlsx_files[0])
            else:
                raise HTTPException(status_code=500, detail="No balance sheet Excel file produced")
        response = FileResponse(output_file, filename=os.path.basename(output_file))
        # Add RLHF metadata to headers if available.
        if "rlhf_metadata" in result.get("result", {}):
            rlhf_data = result["result"]["rlhf_metadata"]
            response.headers["X-RLHF-Statement-ID"] = str(rlhf_data.get("statement_id", ""))
            response.headers["X-RLHF-Quality-Score"] = str(rlhf_data.get("predicted_quality", ""))
            response.headers["X-RLHF-Confidence"] = str(rlhf_data.get("confidence_score", ""))
        return response
    else:
        raise HTTPException(status_code=500, detail=result["error"])
@router.post("/cf")
async def cf_route(file: UploadFile = File(...), use_rlhf: bool = Query(False)):
    """Generate a cash flow statement from the uploaded workbook.

    Runs the RLHF workflow when ``use_rlhf`` is true, otherwise the plain
    workflow; returns the generated Excel file with any RLHF scores exposed
    in response headers, or a 500 carrying the workflow's error.
    """
    # basename() strips client-supplied directory components (path traversal).
    safe_name = os.path.basename(file.filename or "upload")
    file_path = f"data/input/{safe_name}"
    os.makedirs("data/input", exist_ok=True)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    # Choose workflow based on RLHF preference.
    if use_rlhf:
        result = run_rlhf_workflow(file_path, "cf")
    else:
        result = run_workflow(file_path, "cf")
    if result["status"] == "success":
        # Hoist the (defaulted) output path so it is computed once, not twice.
        output_path = result["result"].get("output_path", "data/cash_flow_statements.xlsx")
        response = FileResponse(output_path, filename=os.path.basename(output_path))
        # Add RLHF metadata to headers if available.
        if "rlhf_metadata" in result.get("result", {}):
            rlhf_data = result["result"]["rlhf_metadata"]
            response.headers["X-RLHF-Statement-ID"] = str(rlhf_data.get("statement_id", ""))
            response.headers["X-RLHF-Quality-Score"] = str(rlhf_data.get("predicted_quality", ""))
            response.headers["X-RLHF-Confidence"] = str(rlhf_data.get("confidence_score", ""))
        return response
    raise HTTPException(status_code=500, detail=result["error"])
# Register all endpoints defined above on the application.
app.include_router(router)

if __name__ == "__main__":
    # Run a local development server when executed directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)