# agents/generator_validator.py
"""
Generator-Validator Pattern Implementation for Financial Notes
Implements formal Generator-Validator pattern with iterative refinement
"""
import os
import json
import logging
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import uuid
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
    """Result of validating a single generation attempt."""
    is_valid: bool            # True when the output is acceptable as-is
    score: float              # quality score; pipeline compares with ">" (higher is better)
    feedback: List[str]       # human-readable problems, fed back to the generator's refine()
    suggestions: List[str]    # suggested remediation steps
    metadata: Dict[str, Any]  # validator-specific details (e.g. validation_type, file_size)
@dataclass
class GenerationResult:
    """Result of a single generation attempt."""
    success: bool                   # True when the underlying workflow reported "success"
    output_path: Optional[str]      # path to the generated .xlsx, or None on failure
    data: Optional[Dict[str, Any]]  # raw workflow result payload, or None on failure
    error: Optional[str]            # error message when success is False
    metadata: Dict[str, Any]        # execution_id, generation_method, attempt, ...
@dataclass
class FeedbackData:
    """One round of user feedback within an interactive session."""
    session_id: str     # id of the owning InteractiveSession
    feedback_text: str  # raw feedback text typed by the user
    feedback_type: str  # 'text', 'numeric', 'formula', 'suggestion'
    iteration_number: int  # 1-based iteration this feedback belongs to
    timestamp: datetime
    changes_description: Optional[str] = None  # optional summary of the applied change
    udf_function: Optional[str] = None         # generated UDF source code (exec'd later)
    udf_version: Optional[str] = None          # version tag, e.g. "udf_v1_20240101_120000"
@dataclass
class InteractiveSession:
    """State of one interactive feedback loop, persisted as JSON."""
    session_id: str           # UUID4 string identifying the session
    original_file_path: str   # input file the session was started from
    current_iteration: int    # number of feedback rounds recorded so far
    feedback_history: List[FeedbackData]  # one entry per feedback round
    archived_udfs: List[str]  # every generated UDF source string, in order
    final_udf: Optional[str]  # UDF promoted on approval, else None
    status: str               # 'active', 'approved', 'cancelled'
    created_at: datetime
    last_updated: datetime    # bumped on every feedback round and on approval
class InteractiveFeedbackManager:
    """Manages interactive feedback sessions and UDF generation.

    Sessions are persisted to a JSON file so they survive restarts. Each round
    of user feedback is rendered into the source code of a "UDF" (user-defined
    function) named ``apply_user_feedback_v<N>`` that takes the notes data
    structure and returns it with the requested modification applied. The
    generator side later exec()s these strings, so every template below must
    render to valid top-level Python; user text is therefore embedded with the
    ``!r`` conversion so quotes/newlines in feedback cannot break the code.
    """

    def __init__(self, sessions_file: str = "data/interactive_sessions.json"):
        """Create a manager backed by *sessions_file* (loaded eagerly)."""
        self.sessions_file = sessions_file
        self.sessions: Dict[str, InteractiveSession] = {}
        self._load_sessions()

    def _load_sessions(self):
        """Load previously persisted sessions from ``self.sessions_file``."""
        if not os.path.exists(self.sessions_file):
            return
        try:
            with open(self.sessions_file, 'r') as f:
                data = json.load(f)
            for session_id, session_data in data.items():
                # Datetimes are stored as ISO strings; convert them back.
                session_data['created_at'] = datetime.fromisoformat(session_data['created_at'])
                session_data['last_updated'] = datetime.fromisoformat(session_data['last_updated'])
                # Rebuild FeedbackData objects from their serialized dict form.
                feedback_objects = []
                for feedback_dict in session_data['feedback_history']:
                    feedback_dict['timestamp'] = datetime.fromisoformat(feedback_dict['timestamp'])
                    feedback_objects.append(FeedbackData(**feedback_dict))
                session_data['feedback_history'] = feedback_objects
                self.sessions[session_id] = InteractiveSession(**session_data)
        except Exception as e:
            # A corrupt or incompatible file must not prevent startup.
            logger.error(f"Failed to load sessions: {e}")
            self.sessions = {}

    def _save_sessions(self):
        """Persist all sessions to ``self.sessions_file`` as JSON."""
        # Only create the parent directory when the path actually has one;
        # os.makedirs("") would raise for a bare filename.
        directory = os.path.dirname(self.sessions_file)
        if directory:
            os.makedirs(directory, exist_ok=True)
        try:
            data = {}
            for session_id, session in self.sessions.items():
                # Tolerate sessions stored as plain dicts (e.g. legacy data).
                if isinstance(session, dict):
                    session_dict = session
                else:
                    session_dict = {
                        'session_id': session.session_id,
                        'original_file_path': session.original_file_path,
                        'current_iteration': session.current_iteration,
                        'feedback_history': [
                            {
                                'session_id': f.session_id,
                                'feedback_text': f.feedback_text,
                                'feedback_type': f.feedback_type,
                                'iteration_number': f.iteration_number,
                                'timestamp': f.timestamp.isoformat(),
                                'changes_description': f.changes_description,
                                'udf_function': f.udf_function,
                                'udf_version': f.udf_version
                            } for f in session.feedback_history
                        ],
                        'archived_udfs': session.archived_udfs,
                        'final_udf': session.final_udf,
                        'status': session.status,
                        'created_at': session.created_at.isoformat(),
                        'last_updated': session.last_updated.isoformat()
                    }
                data[session_id] = session_dict
            with open(self.sessions_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save sessions: {e}")

    def create_session(self, file_path: str) -> str:
        """Create and persist a new interactive feedback session; return its id."""
        session_id = str(uuid.uuid4())
        session = InteractiveSession(
            session_id=session_id,
            original_file_path=file_path,
            current_iteration=0,
            feedback_history=[],
            archived_udfs=[],
            final_udf=None,
            status='active',
            created_at=datetime.now(),
            last_updated=datetime.now()
        )
        self.sessions[session_id] = session
        self._save_sessions()
        return session_id

    def add_feedback(self, session_id: str, feedback_text: str, feedback_type: str) -> Optional[str]:
        """Record one round of feedback and generate its UDF.

        Returns the new UDF version tag, or None for an unknown session.
        """
        if session_id not in self.sessions:
            return None
        session = self.sessions[session_id]
        session.current_iteration += 1
        # Render UDF source for this round and tag it with a version string.
        udf_function = self._generate_udf_from_feedback(feedback_text, feedback_type, session.current_iteration)
        udf_version = f"udf_v{session.current_iteration}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        feedback_data = FeedbackData(
            session_id=session_id,
            feedback_text=feedback_text,
            feedback_type=feedback_type,
            iteration_number=session.current_iteration,
            timestamp=datetime.now(),
            udf_function=udf_function,
            udf_version=udf_version
        )
        session.feedback_history.append(feedback_data)
        session.archived_udfs.append(udf_function)
        session.last_updated = datetime.now()
        self._save_sessions()
        return udf_version

    def approve_session(self, session_id: str) -> bool:
        """Mark a session approved, promoting its latest UDF to final.

        Returns False for an unknown session. A session with no feedback is
        still approved, but ``final_udf`` stays None.
        """
        if session_id not in self.sessions:
            return False
        session = self.sessions[session_id]
        if session.feedback_history:
            # The most recently generated UDF becomes the final one.
            session.final_udf = session.feedback_history[-1].udf_function
        session.status = 'approved'
        session.last_updated = datetime.now()
        self._save_sessions()
        return True

    def get_session(self, session_id: str) -> Optional["InteractiveSession"]:
        """Return the session for *session_id*, or None if unknown."""
        return self.sessions.get(session_id)

    def _generate_udf_from_feedback(self, feedback_text: str, feedback_type: str, iteration: int) -> str:
        """Render UDF source code appropriate for the feedback type.

        'formula', 'text' and 'suggestion' get dedicated templates; any other
        type (e.g. 'numeric') falls back to the general template.
        """
        if feedback_type == 'formula':
            return self._generate_formula_udf(feedback_text, iteration)
        if feedback_type == 'text':
            return self._generate_text_udf(feedback_text, iteration)
        if feedback_type == 'suggestion':
            return self._generate_suggestion_udf(feedback_text, iteration)
        return self._generate_general_udf(feedback_text, feedback_type, iteration)

    def _generate_text_udf(self, feedback_text: str, iteration: int) -> str:
        """Render the UDF template for plain-text feedback.

        The feedback is appended to a note's assumptions (or a user_notes
        list), targeting a specific note when the text mentions "note <N>".
        NOTE: feedback containing triple quotes would still break the
        generated docstring; code portions are safe via ``!r``.
        """
        return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='text'):
    """
    UDF generated from text feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    import re
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        # Extract target note number
        feedback_lower = {feedback_text!r}.lower()
        note_match = re.search(r'note\\s*(\\d+)', feedback_lower)
        target_note = note_match.group(1) if note_match else None
        for note in notes_data['notes']:
            note_num = note.get('metadata', {{}}).get('note_number', '')
            if not target_note or note_num == target_note:
                # Add text feedback to assumptions or create user_notes field
                if 'assumptions' in note:
                    note['assumptions'] += " [User Note: " + {feedback_text!r} + "]"
                else:
                    note['user_notes'] = note.get('user_notes', [])
                    note['user_notes'].append({feedback_text!r})
    return notes_data
'''

    def _generate_suggestion_udf(self, feedback_text: str, iteration: int) -> str:
        """Render the UDF template for 'suggestion' feedback.

        Records the suggestion on matching notes and parses a couple of common
        phrases into flags (enhanced_breakdown, detail_level).
        """
        return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='suggestion'):
    """
    UDF generated from suggestion feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    import re
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        # Extract target note number
        feedback_lower = {feedback_text!r}.lower()
        note_match = re.search(r'note\\s*(\\d+)', feedback_lower)
        target_note = note_match.group(1) if note_match else None
        for note in notes_data['notes']:
            note_num = note.get('metadata', {{}}).get('note_number', '')
            if not target_note or note_num == target_note:
                # Record the suggestion verbatim
                note['user_suggestions'] = note.get('user_suggestions', [])
                note['user_suggestions'].append({feedback_text!r})
                # Parse common suggestions into flags
                if 'add' in feedback_lower and 'breakdown' in feedback_lower:
                    note['enhanced_breakdown'] = True
                elif 'more detail' in feedback_lower:
                    note['detail_level'] = 'enhanced'
    return notes_data
'''

    def _generate_general_udf(self, feedback_text: str, feedback_type: str, iteration: int) -> str:
        """Render the fallback UDF template for any other feedback type."""
        return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type={feedback_type!r}):
    """
    UDF generated from {feedback_type} feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        for note in notes_data['notes']:
            # Append the raw feedback so downstream steps can act on it
            note['user_feedback'] = note.get('user_feedback', [])
            note['user_feedback'].append({{
                'type': {feedback_type!r},
                'text': {feedback_text!r},
                'iteration': {iteration}
            }})
    return notes_data
'''

    def _generate_formula_udf(self, feedback_text: str, iteration: int) -> str:
        """Render a UDF that applies a parsed "a - b" formula to note totals.

        Three regex patterns are tried in order of decreasing flexibility; if
        none match, a generic DataFrame-annotating fallback UDF is emitted.
        """
        import re
        # Patterns: "... = a - b", "total = a - b", "a - b = total"
        patterns = (
            r'=\s*([^-\n]+)\s*-\s*([^\n]+)',
            r'total\s*=\s*(.+?)\s*-\s*(.+?)(?:\s|$)',
            r'(.+?)\s*-\s*(.+?)\s*=\s*total',
        )
        formula_match = None
        for pattern in patterns:
            formula_match = re.search(pattern, feedback_text, re.IGNORECASE)
            if formula_match:
                break
        if formula_match:
            operand1 = formula_match.group(1).strip()
            operand2 = formula_match.group(2).strip()
            udf_code = f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='formula'):
    """
    UDF generated from formula feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    import re
    # Apply formula modifications to the JSON notes structure
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        # Extract the targeted note number from the feedback text
        feedback_lower = {feedback_text!r}.lower()
        note_match = re.search(r'note\\s*(\\d+)', feedback_lower)
        target_note = note_match.group(1) if note_match else None
        # Operand labels parsed from the formula
        operand1, operand2 = {operand1!r}, {operand2!r}
        for note in notes_data['notes']:
            note_num = note.get('metadata', {{}}).get('note_number', '')
            if not target_note or note_num == target_note:
                if 'structure' in note:
                    for item in note['structure']:
                        if 'subcategories' in item:
                            vals = {{}}
                            for sub in item['subcategories']:
                                label = sub.get('label', '').lower()
                                if operand1.lower() in label:
                                    try:
                                        vals[operand1] = float(sub.get('value', 0))
                                    except Exception:
                                        vals[operand1] = 0
                                elif operand2.lower() in label:
                                    try:
                                        vals[operand2] = float(sub.get('value', 0))
                                    except Exception:
                                        vals[operand2] = 0
                            # Only recompute when both operands were found
                            if len(vals) == 2:
                                result = vals[operand1] - vals[operand2]
                                item['total'] = str(result)
                                print(f"Applied formula in note {{note_num}}: {{vals[operand1]}} - {{vals[operand2]}} = {{result}}")
    return notes_data
'''
        else:
            # Fallback for unrecognized formula patterns
            udf_code = f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='formula'):
    """
    UDF generated from formula feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    Note: Could not parse formula pattern, applying general enhancement
    """
    import pandas as pd
    # Apply general formula-related enhancements
    if notes_data and isinstance(notes_data, dict):
        for sheet_name, df in notes_data.items():
            if isinstance(df, pd.DataFrame):
                df_copy = df.copy()
                # Tag any 'total' rows in the first column as calculated fields
                if len(df.columns) >= 1:
                    for pos in range(len(df_copy)):
                        if pd.notna(df_copy.iloc[pos, 0]):
                            cell_value = str(df_copy.iloc[pos, 0])
                            if 'total' in cell_value.lower():
                                df_copy.iloc[pos, 0] = cell_value + ' (Calculated field)'
                notes_data[sheet_name] = df_copy
    return notes_data
'''
        return udf_code
class BaseGenerator(ABC):
    """Abstract base for financial statement generators.

    Concrete subclasses implement generate() and refine(); the base holds the
    allowed attempt budget and a counter of attempts already made.
    """

    def __init__(self, max_attempts: int = 3):
        # Cap on generation attempts, plus a running count of attempts used.
        self.max_attempts = max_attempts
        self.attempts_made = 0

    @abstractmethod
    def generate(self, file_path: str, **kwargs) -> GenerationResult:
        """Produce a GenerationResult for the given input file."""
        ...

    @abstractmethod
    def refine(self, previous_result: GenerationResult, feedback: List[str]) -> GenerationResult:
        """Produce an improved result using validator feedback."""
        ...
class BaseValidator(ABC):
    """Abstract base for financial statement validators."""

    @abstractmethod
    def validate(self, generation_result: GenerationResult) -> ValidationResult:
        """Score and critique a GenerationResult."""
        ...

    @abstractmethod
    def get_validation_criteria(self) -> List[str]:
        """Describe the criteria this validator applies."""
        ...
class LLMNotesGenerator(BaseGenerator):
    """Generator for AI-powered financial notes.

    Delegates the actual work to either the RLHF workflow or the plain
    LangGraph workflow (both imported lazily inside generate()).
    """

    def __init__(self, max_attempts: int = 3, use_rlhf: bool = False, user_api_key: Optional[str] = None):
        # use_rlhf selects the RLHF workflow; user_api_key is forwarded to the
        # underlying workflow for LLM access.
        super().__init__(max_attempts)
        self.use_rlhf = use_rlhf
        self.user_api_key = user_api_key

    def generate(self, file_path: str, **kwargs) -> GenerationResult:
        """Generate notes using AI/LLM approach with feedback integration.

        kwargs may carry 'feedback_context' (a dict with 'session_id' and
        'udfs') which is forwarded to the non-RLHF workflow. Never raises:
        all failures are reported via GenerationResult.error.
        """
        try:
            self.attempts_made += 1
            execution_id = f"notes_llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.attempts_made}"
            # Check for feedback context
            feedback_context = kwargs.get('feedback_context', {})
            session_id = feedback_context.get('session_id')
            udfs_to_apply = feedback_context.get('udfs', [])
            # Choose workflow based on RLHF preference (lazy project imports
            # keep module load cheap and avoid cycles)
            if self.use_rlhf:
                from agents.rlhf_workflows import run_rlhf_workflow
                result = run_rlhf_workflow(file_path, "notes-llm", user_api_key=self.user_api_key)
            else:
                from agents.langgraph import run_workflow
                result = run_workflow(file_path, "notes-llm", feedback_context=feedback_context, user_api_key=self.user_api_key)
            if result["status"] == "success":
                # UDFs are now applied in generate_llm_notes function before Excel conversion
                return GenerationResult(
                    success=True,
                    output_path=result["result"]["output_xlsx_path"],
                    data=result["result"],
                    error=None,
                    metadata={
                        "execution_id": execution_id,
                        "generation_method": "llm",
                        "use_rlhf": self.use_rlhf,
                        "attempt": self.attempts_made,
                        "rlhf_metadata": result["result"].get("rlhf_metadata", {}),
                        "feedback_applied": bool(udfs_to_apply),
                        "udfs_applied_count": len(udfs_to_apply),
                        "session_id": session_id
                    }
                )
            else:
                return GenerationResult(
                    success=False,
                    output_path=None,
                    data=None,
                    error=result.get("error", "Unknown error"),
                    metadata={
                        "execution_id": execution_id,
                        "generation_method": "llm",
                        "use_rlhf": self.use_rlhf,
                        "attempt": self.attempts_made
                    }
                )
        except Exception as e:
            logger.error(f"LLM Notes generation failed: {e}")
            return GenerationResult(
                success=False,
                output_path=None,
                data=None,
                error=str(e),
                metadata={
                    "execution_id": f"error_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                    "generation_method": "llm",
                    "use_rlhf": self.use_rlhf,
                    "attempt": self.attempts_made
                }
            )

    def _apply_udfs_to_result(self, result: Dict[str, Any], udfs: List[str], feedback_context: Dict[str, Any]) -> Dict[str, Any]:
        """Apply UDF source strings to the generation result in order.

        Each UDF is exec'd and the first callable named apply_user_feedback*
        is invoked on result["result"]. Individual UDF failures are logged and
        skipped; on any outer failure the result is returned unchanged.
        NOTE(review): exec of generated code is only safe while UDF sources
        come from this process's own templates, never from untrusted input.
        """
        try:
            # Execute each UDF and apply modifications
            for udf_code in udfs:
                try:
                    # Create a local namespace for UDF execution
                    local_vars = {}
                    exec(udf_code, {"datetime": datetime}, local_vars)
                    # Find the UDF function among the names the exec defined
                    udf_func = None
                    for var_name, var_value in local_vars.items():
                        if callable(var_value) and var_name.startswith('apply_user_feedback'):
                            udf_func = var_value
                            break
                    if udf_func:
                        # Apply the UDF to the result data
                        result["result"] = udf_func(result["result"], feedback_context.get('feedback_type', 'general'))
                except Exception as e:
                    logger.warning(f"Failed to apply UDF: {e}")
                    continue
            return result
        except Exception as e:
            logger.error(f"Error applying UDFs: {e}")
            return result

    def refine(self, previous_result: GenerationResult, feedback: List[str]) -> GenerationResult:
        """Refine LLM notes generation based on feedback.

        If the feedback mentions quality and RLHF is off, retries once with
        RLHF enabled (restoring the flag afterwards); otherwise just retries.
        NOTE(review): generate() above does not put a "file_path" key into
        the data it returns, so the retry may receive None here — confirm the
        workflow payload actually carries "file_path".
        """
        logger.info(f"Refining LLM notes generation with feedback: {feedback}")
        # For LLM generation, we can try different approaches:
        # 1. Switch to RLHF if not already using it
        # 2. Retry with different parameters
        # 3. Use fallback models
        if not self.use_rlhf and "quality" in str(feedback).lower():
            # If quality issues and not using RLHF, try RLHF
            logger.info("Switching to RLHF for better quality")
            original_rlhf = self.use_rlhf
            self.use_rlhf = True
            result = self.generate(previous_result.data.get("file_path") if previous_result.data else None)
            self.use_rlhf = original_rlhf  # Reset for future calls
            return result
        else:
            # Otherwise, just retry
            return self.generate(previous_result.data.get("file_path") if previous_result.data else None)
class NotesValidator(BaseValidator):
    """Validator for financial notes — deliberately minimal.

    Beyond checking that an output file was produced and exists on disk, it
    always passes, because users supply the real validation through
    interactive feedback sessions.
    """

    def validate(self, generation_result: GenerationResult) -> ValidationResult:
        """Run the basic file checks, then approve unconditionally."""
        path = generation_result.output_path
        # Work out which (if any) of the two basic checks failed.
        if not generation_result.success or not path:
            problem = ("Generation failed - no output produced", "Retry generation process")
        elif not os.path.exists(path):
            problem = ("Output file does not exist", "Check file generation process")
        else:
            problem = None
        if problem:
            reason, remedy = problem
            return ValidationResult(
                is_valid=False,
                score=0.0,
                feedback=[reason],
                suggestions=[remedy],
                metadata={"validation_type": "basic_file_check"}
            )
        # Capture the file size for the metadata, then pass: user feedback
        # through interactive sessions replaces automatic validation.
        file_size = os.path.getsize(path) if os.path.exists(path) else 0
        return ValidationResult(
            is_valid=True,   # Always pass - user feedback takes precedence
            score=1.0,       # Perfect score - user will validate
            feedback=[],     # No automatic feedback - user provides feedback
            suggestions=[],  # No automatic suggestions - user provides direction
            metadata={
                "validation_type": "user_feedback_based",
                "file_size": file_size,
                "automatic_validation_disabled": True,
                "reason": "User feedback through interactive sessions replaces automatic validation"
            }
        )

    def get_validation_criteria(self) -> List[str]:
        """List the criteria this validator nominally covers."""
        criteria = [
            "Output file exists and is accessible",
            "File size is reasonable (>1KB)",
            "Metadata contains required fields",
            "For LLM generation: quality score meets threshold",
            "RLHF metadata present when RLHF is enabled",
            "No critical errors in generation process"
        ]
        return criteria
class GeneratorValidatorPipeline:
    """Main pipeline that orchestrates the Generator-Validator pattern.

    Repeatedly generates, validates, and — on failure — asks the generator to
    refine its output, keeping the best (result, validation) pair seen.
    """

    def __init__(self, generator: BaseGenerator, validator: BaseValidator):
        self.generator = generator
        self.validator = validator
        # Parallel histories: generation_history[i] was scored by validation_history[i].
        self.generation_history = []
        self.validation_history = []

    def process(self, file_path: str, **kwargs) -> Tuple[GenerationResult, ValidationResult]:
        """Run the generate -> validate (-> refine) loop on *file_path*.

        Returns the first (result, validation) pair that passes validation,
        or the best-scoring attempt after ``generator.max_attempts`` rounds.
        """
        logger.info("Starting Generator-Validator pipeline")
        best_result = None
        best_validation = None
        for attempt in range(self.generator.max_attempts):
            logger.info(f"Attempt {attempt + 1}/{self.generator.max_attempts}")
            # Generate and validate
            generation_result = self.generator.generate(file_path, **kwargs)
            validation_result = self._record(generation_result)
            logger.info(f"Generation success: {generation_result.success}, Validation score: {validation_result.score}")
            best_result, best_validation = self._better(best_result, best_validation, generation_result, validation_result)
            if validation_result.is_valid:
                logger.info("Validation passed - returning result")
                return generation_result, validation_result
            # Refine before the next attempt. Bug fix: the refined result used
            # to be discarded without ever being validated or recorded; it is
            # now validated, tracked, and returned immediately when valid.
            if attempt < self.generator.max_attempts - 1:
                logger.info(f"Validation failed - refining with feedback: {validation_result.feedback}")
                refined_result = self.generator.refine(generation_result, validation_result.feedback)
                refined_validation = self._record(refined_result)
                best_result, best_validation = self._better(best_result, best_validation, refined_result, refined_validation)
                if refined_validation.is_valid:
                    logger.info("Validation passed after refinement - returning result")
                    return refined_result, refined_validation
        logger.info("All attempts completed - returning best result")
        return best_result, best_validation

    def _record(self, generation_result):
        # Validate a result and append both to the parallel histories.
        validation_result = self.validator.validate(generation_result)
        self.generation_history.append(generation_result)
        self.validation_history.append(validation_result)
        return validation_result

    @staticmethod
    def _better(best_result, best_validation, candidate, candidate_validation):
        # Keep the first attempt unconditionally, then prefer successful
        # generations with strictly higher validation scores.
        if best_result is None or (candidate.success and candidate_validation.score > best_validation.score):
            return candidate, candidate_validation
        return best_result, best_validation

    def get_processing_summary(self) -> Dict[str, Any]:
        """Summarize all attempts recorded by this pipeline instance."""
        return {
            "total_attempts": len(self.generation_history),
            "successful_generations": sum(1 for g in self.generation_history if g.success),
            "validation_scores": [v.score for v in self.validation_history],
            "best_score": max((v.score for v in self.validation_history), default=0),
            "generation_methods": list({g.metadata.get("generation_method") for g in self.generation_history if g.metadata}),
            "validation_criteria": self.validator.get_validation_criteria()
        }
def create_notes_pipeline(use_rlhf: bool = False, user_api_key: Optional[str] = None) -> GeneratorValidatorPipeline:
    """Build the standard notes pipeline: LLM generator plus its validator."""
    return GeneratorValidatorPipeline(
        LLMNotesGenerator(use_rlhf=use_rlhf, user_api_key=user_api_key),
        NotesValidator(),
    )