""" Generator-Validator Pattern Implementation for Financial Notes Implements formal Generator-Validator pattern with iterative refinement """ import os import json import logging from abc import ABC, abstractmethod from typing import Dict, Any, List, Optional, Tuple from dataclasses import dataclass from datetime import datetime import uuid logger = logging.getLogger(__name__) @dataclass class ValidationResult: """Result of validation process""" is_valid: bool score: float feedback: List[str] suggestions: List[str] metadata: Dict[str, Any] @dataclass class GenerationResult: """Result of generation process""" success: bool output_path: Optional[str] data: Optional[Dict[str, Any]] error: Optional[str] metadata: Dict[str, Any] @dataclass class FeedbackData: """User feedback for iterative improvement""" session_id: str feedback_text: str feedback_type: str # 'text', 'numeric', 'formula', 'suggestion' iteration_number: int timestamp: datetime changes_description: Optional[str] = None udf_function: Optional[str] = None udf_version: Optional[str] = None @dataclass class InteractiveSession: """Session data for interactive feedback loop""" session_id: str original_file_path: str current_iteration: int feedback_history: List[FeedbackData] archived_udfs: List[str] final_udf: Optional[str] status: str # 'active', 'approved', 'cancelled' created_at: datetime last_updated: datetime class InteractiveFeedbackManager: """Manages interactive feedback sessions and UDF generation""" def __init__(self, sessions_file: str = "data/interactive_sessions.json"): self.sessions_file = sessions_file self.sessions: Dict[str, InteractiveSession] = {} self._load_sessions() def _load_sessions(self): """Load existing sessions from file""" if os.path.exists(self.sessions_file): try: with open(self.sessions_file, 'r') as f: data = json.load(f) for session_id, session_data in data.items(): # Convert datetime strings back to datetime objects session_data['created_at'] = datetime.fromisoformat(session_data['created_at']) session_data['last_updated'] = datetime.fromisoformat(session_data['last_updated']) # Convert feedback history dictionaries back to FeedbackData objects feedback_objects = [] for feedback_dict in session_data['feedback_history']: feedback_dict['timestamp'] = datetime.fromisoformat(feedback_dict['timestamp']) feedback_objects.append(FeedbackData(**feedback_dict)) session_data['feedback_history'] = feedback_objects self.sessions[session_id] = InteractiveSession(**session_data) except Exception as e: logger.error(f"Failed to load sessions: {e}") self.sessions = {} def _save_sessions(self): """Save sessions to file""" os.makedirs(os.path.dirname(self.sessions_file), exist_ok=True) try: data = {} for session_id, session in self.sessions.items(): # Handle case where session might be a dict instead of InteractiveSession object if isinstance(session, dict): session_dict = session else: session_dict = { 'session_id': session.session_id, 'original_file_path': session.original_file_path, 'current_iteration': session.current_iteration, 'feedback_history': [ { 'session_id': f.session_id, 'feedback_text': f.feedback_text, 'feedback_type': f.feedback_type, 'iteration_number': f.iteration_number, 'timestamp': f.timestamp.isoformat(), 'changes_description': f.changes_description, 'udf_function': f.udf_function, 'udf_version': f.udf_version } for f in session.feedback_history ], 'archived_udfs': session.archived_udfs, 'final_udf': session.final_udf, 'status': session.status, 'created_at': session.created_at.isoformat(), 'last_updated': session.last_updated.isoformat() } data[session_id] = session_dict with open(self.sessions_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save sessions: {e}") def create_session(self, file_path: str) -> str: """Create a new interactive feedback session""" session_id = str(uuid.uuid4()) session = InteractiveSession( session_id=session_id, original_file_path=file_path, current_iteration=0, feedback_history=[], archived_udfs=[], final_udf=None, status='active', created_at=datetime.now(), last_updated=datetime.now() ) self.sessions[session_id] = session self._save_sessions() return session_id def add_feedback(self, session_id: str, feedback_text: str, feedback_type: str) -> Optional[str]: """Add user feedback and generate UDF""" if session_id not in self.sessions: return None session = self.sessions[session_id] session.current_iteration += 1 # Generate UDF based on feedback udf_function = self._generate_udf_from_feedback(feedback_text, feedback_type, session.current_iteration) udf_version = f"udf_v{session.current_iteration}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Create feedback data feedback_data = FeedbackData( session_id=session_id, feedback_text=feedback_text, feedback_type=feedback_type, iteration_number=session.current_iteration, timestamp=datetime.now(), udf_function=udf_function, udf_version=udf_version ) session.feedback_history.append(feedback_data) session.archived_udfs.append(udf_function) session.last_updated = datetime.now() self._save_sessions() return udf_version def approve_session(self, session_id: str) -> bool: """Approve the current session and set final UDF""" if session_id not in self.sessions: return False session = self.sessions[session_id] if session.feedback_history: # Set the last UDF as final session.final_udf = session.feedback_history[-1].udf_function session.status = 'approved' session.last_updated = datetime.now() self._save_sessions() return True def get_session(self, session_id: str) -> Optional[InteractiveSession]: """Get session by ID""" return self.sessions.get(session_id) def _generate_udf_from_feedback(self, feedback_text: str, feedback_type: str, iteration: int) -> str: """Generate UDF function based on user feedback with actual analysis""" # Analyze feedback content and create meaningful modifications feedback_lower = feedback_text.lower() # Determine what modifications to apply based on feedback apply_detailed_depreciation = 'depreciation' in feedback_lower and 'asset' in feedback_lower apply_increase_detail = 'detail' in feedback_lower # Handle different feedback types if feedback_type == 'formula': return self._generate_formula_udf(feedback_text, iteration) elif feedback_type == 'text': return self._generate_text_udf(feedback_text, iteration) elif feedback_type == 'suggestion': return self._generate_suggestion_udf(feedback_text, iteration) else: return self._generate_general_udf(feedback_text, feedback_type, iteration) def _generate_text_udf(self, feedback_text: str, iteration: int) -> str: """Generate UDF for text feedback""" return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='text'): """ UDF generated from text feedback iteration {iteration} Original Feedback: {feedback_text} Generated: {datetime.now().isoformat()} """ import re if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data: # Extract target note number feedback_lower = "{feedback_text}".lower() note_match = re.search(r'note\\s*(\\d+)', feedback_lower) target_note = note_match.group(1) if note_match else None for note in notes_data['notes']: note_num = note.get('metadata', {{}}).get('note_number', '') if not target_note or note_num == target_note: # Add text feedback to assumptions or create user_notes field if 'assumptions' in note: note['assumptions'] += f" [User Note: {feedback_text}]" else: note['user_notes'] = note.get('user_notes', []) note['user_notes'].append(feedback_text) return notes_data ''' def _generate_suggestion_udf(self, feedback_text: str, iteration: int) -> str: """Generate UDF for suggestion feedback""" return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='suggestion'): """ UDF generated from suggestion feedback iteration {iteration} Original Feedback: {feedback_text} Generated: {datetime.now().isoformat()} """ import re if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data: # Extract target note number feedback_lower = "{feedback_text}".lower() note_match = re.search(r'note\\s*(\\d+)', feedback_lower) target_note = note_match.group(1) if note_match else None for note in notes_data['notes']: note_num = note.get('metadata', {{}}).get('note_number', '') if not target_note or note_num == target_note: # Apply suggestions note['user_suggestions'] = note.get('user_suggestions', []) note['user_suggestions'].append(feedback_text) # Parse common suggestions if 'add' in feedback_lower and 'breakdown' in feedback_lower: note['enhanced_breakdown'] = True elif 'more detail' in feedback_lower: note['detail_level'] = 'enhanced' return notes_data ''' def _generate_general_udf(self, feedback_text: str, feedback_type: str, iteration: int) -> str: """Generate general UDF for other feedback types""" return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='{feedback_type}'): """ UDF generated from {feedback_type} feedback iteration {iteration} Original Feedback: {feedback_text} Generated: {datetime.now().isoformat()} """ if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data: for note in notes_data['notes']: # Apply general feedback note['user_feedback'] = note.get('user_feedback', []) note['user_feedback'].append({{ 'type': '{feedback_type}', 'text': '{feedback_text}', 'iteration': {iteration} }}) return notes_data ''' def _generate_formula_udf(self, feedback_text: str, iteration: int) -> str: """Generate UDF specifically for formula feedback""" import re # Parse the formula from feedback text # First try the flexible pattern that captures full operand names formula_match = re.search(r'=\s*([^-\n]+)\s*-\s*([^\n]+)', feedback_text, re.IGNORECASE) if formula_match: operand1 = formula_match.group(1).strip() operand2 = formula_match.group(2).strip() else: # Fallback to other patterns formula_match = re.search(r'total\s*=\s*(.+?)\s*-\s*(.+?)(?:\s|$)', feedback_text, re.IGNORECASE) if formula_match: operand1 = formula_match.group(1).strip() operand2 = formula_match.group(2).strip() else: formula_match = re.search(r'(.+?)\s*-\s*(.+?)\s*=\s*total', feedback_text, re.IGNORECASE) if formula_match: operand1 = formula_match.group(1).strip() operand2 = formula_match.group(2).strip() if formula_match: operand1 = formula_match.group(1).strip() operand2 = formula_match.group(2).strip() udf_code = f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='formula'): """ UDF generated from formula feedback iteration {iteration} Original Feedback: {feedback_text} Generated: {datetime.now().isoformat()} """ import re # Apply formula modifications to JSON structure if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data: # Extract note number and formula from feedback feedback_lower = "{feedback_text}".lower() note_match = re.search(r'note\\s*(\\d+)', feedback_lower) target_note = note_match.group(1) if note_match else None # Parse formula operators operand1, operand2 = "{operand1}", "{operand2}" for note in notes_data['notes']: note_num = note.get('metadata', {{}}).get('note_number', '') if not target_note or note_num == target_note: if 'structure' in note: for item in note['structure']: if 'subcategories' in item: vals = {{}} for sub in item['subcategories']: label = sub.get('label', '').lower() if operand1.lower() in label: try: vals[operand1] = float(sub.get('value', 0)) except: vals[operand1] = 0 elif operand2.lower() in label: try: vals[operand2] = float(sub.get('value', 0)) except: vals[operand2] = 0 if len(vals) == 2: result = vals[operand1] - vals[operand2] item['total'] = str(result) print(f"Applied formula in note {{note_num}}: {{vals[operand1]}} - {{vals[operand2]}} = {{result}}") return notes_data ''' else: # Fallback for unrecognized formula patterns udf_code = f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='formula'): """ UDF generated from formula feedback iteration {iteration} Original Feedback: {feedback_text} Generated: {datetime.now().isoformat()} Note: Could not parse formula pattern, applying general enhancement """ import pandas as pd # Apply general formula-related enhancements if notes_data and isinstance(notes_data, dict): for sheet_name, df in notes_data.items(): if isinstance(df, pd.DataFrame): df_copy = df.copy() # Add formula indicators to relevant cells if len(df.columns) >= 1: for idx in df_copy.index: if pd.notna(df_copy.iloc[idx, 0]): cell_value = str(df_copy.iloc[idx, 0]) if 'total' in cell_value.lower(): df_copy.iloc[idx, 0] = cell_value + ' (Calculated field)' notes_data[sheet_name] = df_copy return notes_data ''' return udf_code class BaseGenerator(ABC): """Abstract base class for financial statement generators""" def __init__(self, max_attempts: int = 3): self.max_attempts = max_attempts self.attempts_made = 0 @abstractmethod def generate(self, file_path: str, **kwargs) -> GenerationResult: """Generate financial statement from input file""" pass @abstractmethod def refine(self, previous_result: GenerationResult, feedback: List[str]) -> GenerationResult: """Refine generation based on validation feedback""" pass class BaseValidator(ABC): """Abstract base class for financial statement validators""" @abstractmethod def validate(self, generation_result: GenerationResult) -> ValidationResult: """Validate the generated financial statement""" pass @abstractmethod def get_validation_criteria(self) -> List[str]: """Return list of validation criteria""" pass class LLMNotesGenerator(BaseGenerator): """Generator for AI-powered financial notes""" def __init__(self, max_attempts: int = 3, use_rlhf: bool = False, user_api_key: Optional[str] = None): super().__init__(max_attempts) self.use_rlhf = use_rlhf self.user_api_key = user_api_key def generate(self, file_path: str, **kwargs) -> GenerationResult: """Generate notes using AI/LLM approach with feedback integration""" try: self.attempts_made += 1 execution_id = f"notes_llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.attempts_made}" # Check for feedback context feedback_context = kwargs.get('feedback_context', {}) session_id = feedback_context.get('session_id') udfs_to_apply = feedback_context.get('udfs', []) # Choose workflow based on RLHF preference if self.use_rlhf: from agents.rlhf_workflows import run_rlhf_workflow result = run_rlhf_workflow(file_path, "notes-llm", user_api_key=self.user_api_key) else: from agents.langgraph import run_workflow result = run_workflow(file_path, "notes-llm", feedback_context=feedback_context, user_api_key=self.user_api_key) if result["status"] == "success": # UDFs are now applied in generate_llm_notes function before Excel conversion return GenerationResult( success=True, output_path=result["result"]["output_xlsx_path"], data=result["result"], error=None, metadata={ "execution_id": execution_id, "generation_method": "llm", "use_rlhf": self.use_rlhf, "attempt": self.attempts_made, "rlhf_metadata": result["result"].get("rlhf_metadata", {}), "feedback_applied": bool(udfs_to_apply), "udfs_applied_count": len(udfs_to_apply), "session_id": session_id } ) else: return GenerationResult( success=False, output_path=None, data=None, error=result.get("error", "Unknown error"), metadata={ "execution_id": execution_id, "generation_method": "llm", "use_rlhf": self.use_rlhf, "attempt": self.attempts_made } ) except Exception as e: logger.error(f"LLM Notes generation failed: {e}") return GenerationResult( success=False, output_path=None, data=None, error=str(e), metadata={ "execution_id": f"error_{datetime.now().strftime('%Y%m%d_%H%M%S')}", "generation_method": "llm", "use_rlhf": self.use_rlhf, "attempt": self.attempts_made } ) def _apply_udfs_to_result(self, result: Dict[str, Any], udfs: List[str], feedback_context: Dict[str, Any]) -> Dict[str, Any]: """Apply UDFs to the generation result""" try: # Execute each UDF and apply modifications for udf_code in udfs: try: # Create a local namespace for UDF execution local_vars = {} exec(udf_code, {"datetime": datetime}, local_vars) # Find the UDF function (it will be the last defined function) udf_func = None for var_name, var_value in local_vars.items(): if callable(var_value) and var_name.startswith('apply_user_feedback'): udf_func = var_value break if udf_func: # Apply the UDF to the result data result["result"] = udf_func(result["result"], feedback_context.get('feedback_type', 'general')) except Exception as e: logger.warning(f"Failed to apply UDF: {e}") continue return result except Exception as e: logger.error(f"Error applying UDFs: {e}") return result def refine(self, previous_result: GenerationResult, feedback: List[str]) -> GenerationResult: """Refine LLM notes generation based on feedback""" logger.info(f"Refining LLM notes generation with feedback: {feedback}") # For LLM generation, we can try different approaches: # 1. Switch to RLHF if not already using it # 2. Retry with different parameters # 3. Use fallback models if not self.use_rlhf and "quality" in str(feedback).lower(): # If quality issues and not using RLHF, try RLHF logger.info("Switching to RLHF for better quality") original_rlhf = self.use_rlhf self.use_rlhf = True result = self.generate(previous_result.data.get("file_path") if previous_result.data else None) self.use_rlhf = original_rlhf # Reset for future calls return result else: # Otherwise, just retry return self.generate(previous_result.data.get("file_path") if previous_result.data else None) class NotesValidator(BaseValidator): """Simplified validator for financial notes - passes user validation to interactive feedback system""" def validate(self, generation_result: GenerationResult) -> ValidationResult: """ Simplified validation - since users provide direct feedback through interactive sessions, this validator just does basic file existence checks and always passes validation to let users provide their own feedback. """ if not generation_result.success or not generation_result.output_path: return ValidationResult( is_valid=False, score=0.0, feedback=["Generation failed - no output produced"], suggestions=["Retry generation process"], metadata={"validation_type": "basic_file_check"} ) # Basic file existence check if not os.path.exists(generation_result.output_path): return ValidationResult( is_valid=False, score=0.0, feedback=["Output file does not exist"], suggestions=["Check file generation process"], metadata={"validation_type": "basic_file_check"} ) # Get file size for metadata file_size = os.path.getsize(generation_result.output_path) if os.path.exists(generation_result.output_path) else 0 # Always pass validation - let users provide feedback through interactive system return ValidationResult( is_valid=True, # Always pass - user feedback takes precedence score=1.0, # Perfect score - user will validate feedback=[], # No automatic feedback - user provides feedback suggestions=[], # No automatic suggestions - user provides direction metadata={ "validation_type": "user_feedback_based", "file_size": file_size, "automatic_validation_disabled": True, "reason": "User feedback through interactive sessions replaces automatic validation" } ) def get_validation_criteria(self) -> List[str]: """Return list of validation criteria""" return [ "Output file exists and is accessible", "File size is reasonable (>1KB)", "Metadata contains required fields", "For LLM generation: quality score meets threshold", "RLHF metadata present when RLHF is enabled", "No critical errors in generation process" ] class GeneratorValidatorPipeline: """Main pipeline that orchestrates Generator-Validator pattern""" def __init__(self, generator: BaseGenerator, validator: BaseValidator): self.generator = generator self.validator = validator self.generation_history = [] self.validation_history = [] def process(self, file_path: str, **kwargs) -> Tuple[GenerationResult, ValidationResult]: """Process file through generator-validator pipeline""" logger.info("Starting Generator-Validator pipeline") best_result = None best_validation = None for attempt in range(self.generator.max_attempts): logger.info(f"Attempt {attempt + 1}/{self.generator.max_attempts}") # Generate generation_result = self.generator.generate(file_path, **kwargs) self.generation_history.append(generation_result) # Validate validation_result = self.validator.validate(generation_result) self.validation_history.append(validation_result) logger.info(f"Generation success: {generation_result.success}, Validation score: {validation_result.score}") # Keep track of best result if best_result is None or (generation_result.success and validation_result.score > (best_validation.score if best_validation else 0)): best_result = generation_result best_validation = validation_result # If validation passes, return immediately if validation_result.is_valid: logger.info("Validation passed - returning result") return generation_result, validation_result # If not the last attempt, try to refine if attempt < self.generator.max_attempts - 1: logger.info(f"Validation failed - refining with feedback: {validation_result.feedback}") # Reset attempts counter for refinement original_attempts = self.generator.attempts_made self.generator.attempts_made = 0 # Reset for refinement generation_result = self.generator.refine(generation_result, validation_result.feedback) self.generator.attempts_made = original_attempts logger.info("All attempts completed - returning best result") return best_result, best_validation def get_processing_summary(self) -> Dict[str, Any]: """Get summary of the processing pipeline""" return { "total_attempts": len(self.generation_history), "successful_generations": sum(1 for g in self.generation_history if g.success), "validation_scores": [v.score for v in self.validation_history], "best_score": max([v.score for v in self.validation_history]) if self.validation_history else 0, "generation_methods": list(set([g.metadata.get("generation_method") for g in self.generation_history if g.metadata])), "validation_criteria": self.validator.get_validation_criteria() } def create_notes_pipeline(use_rlhf: bool = False, user_api_key: Optional[str] = None) -> GeneratorValidatorPipeline: """Factory function to create LLM-based pipeline for notes generation""" generator = LLMNotesGenerator(use_rlhf=use_rlhf, user_api_key=user_api_key) validator = NotesValidator() return GeneratorValidatorPipeline(generator, validator)