Spaces:
Sleeping
Sleeping
| """ | |
| Generator-Validator Pattern Implementation for Financial Notes | |
| Implements formal Generator-Validator pattern with iterative refinement | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
@dataclass
class ValidationResult:
    """Result of validation process.

    Constructed with keyword arguments throughout this module
    (e.g. in NotesValidator.validate), which requires the generated
    ``@dataclass`` __init__ — without the decorator the class body is
    annotations only and instantiation raises TypeError.
    """
    is_valid: bool              # overall pass/fail verdict
    score: float                # validation quality score (0.0 - 1.0 in this module)
    feedback: List[str]         # human-readable findings for the generator to act on
    suggestions: List[str]      # actionable remediation hints
    metadata: Dict[str, Any]    # free-form context, e.g. {"validation_type": ...}
@dataclass
class GenerationResult:
    """Result of generation process.

    Constructed with keyword arguments throughout this module
    (e.g. in LLMNotesGenerator.generate); the ``@dataclass`` decorator is
    required to synthesize the __init__ used by those call sites.
    """
    success: bool                       # whether generation produced usable output
    output_path: Optional[str]          # path to the generated .xlsx, or None on failure
    data: Optional[Dict[str, Any]]      # raw workflow result payload, or None on failure
    error: Optional[str]                # error message when success is False
    metadata: Dict[str, Any]            # execution context (execution_id, method, attempt, ...)
@dataclass
class FeedbackData:
    """User feedback for iterative improvement.

    Instances are rebuilt from JSON via ``FeedbackData(**feedback_dict)`` in
    InteractiveFeedbackManager._load_sessions, so the ``@dataclass`` __init__
    (with the trailing optional defaults) is part of the persistence contract.
    """
    session_id: str                             # owning InteractiveSession id
    feedback_text: str                          # raw user feedback
    feedback_type: str                          # 'text', 'numeric', 'formula', 'suggestion'
    iteration_number: int                       # 1-based feedback round within the session
    timestamp: datetime                         # when the feedback was recorded
    changes_description: Optional[str] = None   # optional summary of applied changes
    udf_function: Optional[str] = None          # generated UDF source code, if any
    udf_version: Optional[str] = None           # version tag, e.g. "udf_v1_20240101_120000"
@dataclass
class InteractiveSession:
    """Session data for interactive feedback loop.

    Constructed with keyword arguments in InteractiveFeedbackManager
    (create_session, _load_sessions via ``InteractiveSession(**session_data)``),
    which requires the ``@dataclass``-generated __init__.
    """
    session_id: str                         # UUID4 string identifying the session
    original_file_path: str                 # input file this session refines
    current_iteration: int                  # number of feedback rounds applied so far
    feedback_history: List[FeedbackData]    # chronological feedback entries
    archived_udfs: List[str]                # source of every UDF generated in this session
    final_udf: Optional[str]                # UDF chosen at approval time, else None
    status: str                             # 'active', 'approved', 'cancelled'
    created_at: datetime                    # session creation time
    last_updated: datetime                  # last mutation time
class InteractiveFeedbackManager:
    """Manages interactive feedback sessions and UDF generation.

    Sessions are persisted as JSON in ``sessions_file``. Each piece of user
    feedback is compiled into the *source code* of a Python UDF
    (``apply_user_feedback_vN``) that can later be exec'd against the
    generated notes data structure.

    Fixes over the previous revision:
      * ``_save_sessions`` no longer crashes when ``sessions_file`` has no
        directory component (``os.makedirs('')`` raises FileNotFoundError).
      * User text is embedded into generated UDF source with ``!r`` so
        quotes/newlines in feedback cannot produce syntactically broken or
        injectable code; previously ``append(feedback_text)`` even emitted an
        undefined bare name into the generated function.
    """

    def __init__(self, sessions_file: str = "data/interactive_sessions.json"):
        self.sessions_file = sessions_file
        self.sessions: Dict[str, InteractiveSession] = {}
        self._load_sessions()

    def _load_sessions(self):
        """Load existing sessions from file, reviving datetimes and FeedbackData."""
        if os.path.exists(self.sessions_file):
            try:
                with open(self.sessions_file, 'r') as f:
                    data = json.load(f)
                for session_id, session_data in data.items():
                    # Convert datetime strings back to datetime objects
                    session_data['created_at'] = datetime.fromisoformat(session_data['created_at'])
                    session_data['last_updated'] = datetime.fromisoformat(session_data['last_updated'])
                    # Convert feedback history dictionaries back to FeedbackData objects
                    feedback_objects = []
                    for feedback_dict in session_data['feedback_history']:
                        feedback_dict['timestamp'] = datetime.fromisoformat(feedback_dict['timestamp'])
                        feedback_objects.append(FeedbackData(**feedback_dict))
                    session_data['feedback_history'] = feedback_objects
                    self.sessions[session_id] = InteractiveSession(**session_data)
            except Exception as e:
                # Best-effort load: a corrupt store resets to empty rather than crashing.
                logger.error(f"Failed to load sessions: {e}")
                self.sessions = {}

    def _save_sessions(self):
        """Persist all sessions to ``sessions_file`` as JSON."""
        # dirname() is '' for a bare filename; os.makedirs('') would raise.
        directory = os.path.dirname(self.sessions_file)
        if directory:
            os.makedirs(directory, exist_ok=True)
        try:
            data = {}
            for session_id, session in self.sessions.items():
                # Handle case where session might be a dict instead of InteractiveSession object
                if isinstance(session, dict):
                    session_dict = session
                else:
                    session_dict = {
                        'session_id': session.session_id,
                        'original_file_path': session.original_file_path,
                        'current_iteration': session.current_iteration,
                        'feedback_history': [
                            {
                                'session_id': f.session_id,
                                'feedback_text': f.feedback_text,
                                'feedback_type': f.feedback_type,
                                'iteration_number': f.iteration_number,
                                'timestamp': f.timestamp.isoformat(),
                                'changes_description': f.changes_description,
                                'udf_function': f.udf_function,
                                'udf_version': f.udf_version
                            } for f in session.feedback_history
                        ],
                        'archived_udfs': session.archived_udfs,
                        'final_udf': session.final_udf,
                        'status': session.status,
                        'created_at': session.created_at.isoformat(),
                        'last_updated': session.last_updated.isoformat()
                    }
                data[session_id] = session_dict
            with open(self.sessions_file, 'w') as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save sessions: {e}")

    def create_session(self, file_path: str) -> str:
        """Create a new interactive feedback session and return its id."""
        session_id = str(uuid.uuid4())
        session = InteractiveSession(
            session_id=session_id,
            original_file_path=file_path,
            current_iteration=0,
            feedback_history=[],
            archived_udfs=[],
            final_udf=None,
            status='active',
            created_at=datetime.now(),
            last_updated=datetime.now()
        )
        self.sessions[session_id] = session
        self._save_sessions()
        return session_id

    def add_feedback(self, session_id: str, feedback_text: str, feedback_type: str) -> Optional[str]:
        """Record user feedback, generate a UDF from it, and return the UDF version tag.

        Returns None when the session id is unknown.
        """
        if session_id not in self.sessions:
            return None
        session = self.sessions[session_id]
        session.current_iteration += 1
        # Generate UDF source based on the feedback content
        udf_function = self._generate_udf_from_feedback(feedback_text, feedback_type, session.current_iteration)
        udf_version = f"udf_v{session.current_iteration}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        feedback_data = FeedbackData(
            session_id=session_id,
            feedback_text=feedback_text,
            feedback_type=feedback_type,
            iteration_number=session.current_iteration,
            timestamp=datetime.now(),
            udf_function=udf_function,
            udf_version=udf_version
        )
        session.feedback_history.append(feedback_data)
        session.archived_udfs.append(udf_function)
        session.last_updated = datetime.now()
        self._save_sessions()
        return udf_version

    def approve_session(self, session_id: str) -> bool:
        """Approve the session, promoting the most recent UDF to final."""
        if session_id not in self.sessions:
            return False
        session = self.sessions[session_id]
        if session.feedback_history:
            # The latest generated UDF becomes the final one
            session.final_udf = session.feedback_history[-1].udf_function
        session.status = 'approved'
        session.last_updated = datetime.now()
        self._save_sessions()
        return True

    def get_session(self, session_id: str) -> Optional[InteractiveSession]:
        """Get session by ID, or None if unknown."""
        return self.sessions.get(session_id)

    def _generate_udf_from_feedback(self, feedback_text: str, feedback_type: str, iteration: int) -> str:
        """Dispatch UDF generation by feedback type ('formula'/'text'/'suggestion'/other)."""
        if feedback_type == 'formula':
            return self._generate_formula_udf(feedback_text, iteration)
        elif feedback_type == 'text':
            return self._generate_text_udf(feedback_text, iteration)
        elif feedback_type == 'suggestion':
            return self._generate_suggestion_udf(feedback_text, iteration)
        else:
            # 'numeric' and any unrecognized type fall through to the general UDF
            return self._generate_general_udf(feedback_text, feedback_type, iteration)

    def _generate_text_udf(self, feedback_text: str, iteration: int) -> str:
        """Generate UDF source for free-text feedback.

        feedback_text is embedded via ``!r`` so quotes/newlines in user input
        cannot break (or inject into) the generated source.
        """
        return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='text'):
    """
    UDF generated from text feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    import re
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        # Extract target note number
        feedback_lower = {feedback_text!r}.lower()
        note_match = re.search(r'note\\s*(\\d+)', feedback_lower)
        target_note = note_match.group(1) if note_match else None
        for note in notes_data['notes']:
            note_num = note.get('metadata', {{}}).get('note_number', '')
            if not target_note or note_num == target_note:
                # Add text feedback to assumptions or create user_notes field
                if 'assumptions' in note:
                    note['assumptions'] += {(" [User Note: " + feedback_text + "]")!r}
                else:
                    note['user_notes'] = note.get('user_notes', [])
                    note['user_notes'].append({feedback_text!r})
    return notes_data
'''

    def _generate_suggestion_udf(self, feedback_text: str, iteration: int) -> str:
        """Generate UDF source for suggestion feedback."""
        return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='suggestion'):
    """
    UDF generated from suggestion feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    import re
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        # Extract target note number
        feedback_lower = {feedback_text!r}.lower()
        note_match = re.search(r'note\\s*(\\d+)', feedback_lower)
        target_note = note_match.group(1) if note_match else None
        for note in notes_data['notes']:
            note_num = note.get('metadata', {{}}).get('note_number', '')
            if not target_note or note_num == target_note:
                # Apply suggestions
                note['user_suggestions'] = note.get('user_suggestions', [])
                note['user_suggestions'].append({feedback_text!r})
                # Parse common suggestions
                if 'add' in feedback_lower and 'breakdown' in feedback_lower:
                    note['enhanced_breakdown'] = True
                elif 'more detail' in feedback_lower:
                    note['detail_level'] = 'enhanced'
    return notes_data
'''

    def _generate_general_udf(self, feedback_text: str, feedback_type: str, iteration: int) -> str:
        """Generate a general UDF for other feedback types (e.g. 'numeric')."""
        return f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='{feedback_type}'):
    """
    UDF generated from {feedback_type} feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        for note in notes_data['notes']:
            # Apply general feedback
            note['user_feedback'] = note.get('user_feedback', [])
            note['user_feedback'].append({{
                'type': '{feedback_type}',
                'text': {feedback_text!r},
                'iteration': {iteration}
            }})
    return notes_data
'''

    def _generate_formula_udf(self, feedback_text: str, iteration: int) -> str:
        """Generate UDF source for formula feedback.

        Tries three patterns in order ("... = A - B", "total = A - B",
        "A - B = total"); the first match wins. When no pattern matches, a
        fallback UDF is produced that just annotates 'total' cells.
        """
        import re
        patterns = (
            r'=\s*([^-\n]+)\s*-\s*([^\n]+)',           # "... = A - B" (flexible, full operand names)
            r'total\s*=\s*(.+?)\s*-\s*(.+?)(?:\s|$)',  # "total = A - B"
            r'(.+?)\s*-\s*(.+?)\s*=\s*total',          # "A - B = total"
        )
        formula_match = None
        for pattern in patterns:
            formula_match = re.search(pattern, feedback_text, re.IGNORECASE)
            if formula_match:
                break
        if formula_match:
            operand1 = formula_match.group(1).strip()
            operand2 = formula_match.group(2).strip()
            udf_code = f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='formula'):
    """
    UDF generated from formula feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    """
    import re
    # Apply formula modifications to JSON structure
    if notes_data and isinstance(notes_data, dict) and 'notes' in notes_data:
        # Extract note number and formula from feedback
        feedback_lower = {feedback_text!r}.lower()
        note_match = re.search(r'note\\s*(\\d+)', feedback_lower)
        target_note = note_match.group(1) if note_match else None
        # Parse formula operators
        operand1, operand2 = {operand1!r}, {operand2!r}
        for note in notes_data['notes']:
            note_num = note.get('metadata', {{}}).get('note_number', '')
            if not target_note or note_num == target_note:
                if 'structure' in note:
                    for item in note['structure']:
                        if 'subcategories' in item:
                            vals = {{}}
                            for sub in item['subcategories']:
                                label = sub.get('label', '').lower()
                                if operand1.lower() in label:
                                    try:
                                        vals[operand1] = float(sub.get('value', 0))
                                    except (TypeError, ValueError):
                                        vals[operand1] = 0
                                elif operand2.lower() in label:
                                    try:
                                        vals[operand2] = float(sub.get('value', 0))
                                    except (TypeError, ValueError):
                                        vals[operand2] = 0
                            if len(vals) == 2:
                                result = vals[operand1] - vals[operand2]
                                item['total'] = str(result)
                                print(f"Applied formula in note {{note_num}}: {{vals[operand1]}} - {{vals[operand2]}} = {{result}}")
    return notes_data
'''
        else:
            # Fallback for unrecognized formula patterns
            udf_code = f'''def apply_user_feedback_v{iteration}(notes_data, feedback_type='formula'):
    """
    UDF generated from formula feedback iteration {iteration}
    Original Feedback: {feedback_text}
    Generated: {datetime.now().isoformat()}
    Note: Could not parse formula pattern, applying general enhancement
    """
    import pandas as pd
    # Apply general formula-related enhancements
    if notes_data and isinstance(notes_data, dict):
        for sheet_name, df in notes_data.items():
            if isinstance(df, pd.DataFrame):
                df_copy = df.copy()
                # Add formula indicators to relevant cells
                if len(df.columns) >= 1:
                    for idx in df_copy.index:
                        if pd.notna(df_copy.iloc[idx, 0]):
                            cell_value = str(df_copy.iloc[idx, 0])
                            if 'total' in cell_value.lower():
                                df_copy.iloc[idx, 0] = cell_value + ' (Calculated field)'
                notes_data[sheet_name] = df_copy
    return notes_data
'''
        return udf_code
class BaseGenerator(ABC):
    """Abstract base class for financial statement generators.

    The previous revision inherited ABC but never applied ``@abstractmethod``,
    so "abstract" methods silently returned None instead of forcing
    subclasses to implement them. Both methods are now genuinely abstract;
    the only concrete subclass in this module (LLMNotesGenerator) already
    implements both.
    """

    def __init__(self, max_attempts: int = 3):
        # max_attempts bounds the generate/validate loop in the pipeline;
        # attempts_made counts calls to generate() on this instance.
        self.max_attempts = max_attempts
        self.attempts_made = 0

    @abstractmethod
    def generate(self, file_path: str, **kwargs) -> GenerationResult:
        """Generate financial statement from input file."""

    @abstractmethod
    def refine(self, previous_result: GenerationResult, feedback: List[str]) -> GenerationResult:
        """Refine generation based on validation feedback."""
class BaseValidator(ABC):
    """Abstract base class for financial statement validators.

    ``@abstractmethod`` was missing in the previous revision, so the class
    was silently instantiable with no-op methods. Both methods are now
    abstract; NotesValidator already implements both.
    """

    @abstractmethod
    def validate(self, generation_result: GenerationResult) -> ValidationResult:
        """Validate the generated financial statement."""

    @abstractmethod
    def get_validation_criteria(self) -> List[str]:
        """Return list of validation criteria."""
class LLMNotesGenerator(BaseGenerator):
    """Generator for AI-powered financial notes.

    Delegates the actual generation to one of two project workflows
    (RLHF or plain LangGraph), imported lazily inside generate() so the
    heavy dependencies are only loaded when used.
    """

    def __init__(self, max_attempts: int = 3, use_rlhf: bool = False, user_api_key: Optional[str] = None):
        super().__init__(max_attempts)
        self.use_rlhf = use_rlhf            # when True, route through the RLHF workflow
        self.user_api_key = user_api_key    # forwarded to the workflow; semantics defined there

    def generate(self, file_path: str, **kwargs) -> GenerationResult:
        """Generate notes using AI/LLM approach with feedback integration.

        kwargs may carry 'feedback_context' ({'session_id', 'udfs', ...});
        it is forwarded to the non-RLHF workflow and reflected in metadata.
        Never raises: failures are reported as GenerationResult(success=False).
        """
        try:
            self.attempts_made += 1
            execution_id = f"notes_llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.attempts_made}"
            # Check for feedback context
            feedback_context = kwargs.get('feedback_context', {})
            session_id = feedback_context.get('session_id')
            udfs_to_apply = feedback_context.get('udfs', [])
            # Choose workflow based on RLHF preference
            if self.use_rlhf:
                from agents.rlhf_workflows import run_rlhf_workflow
                # NOTE(review): feedback_context is NOT forwarded on the RLHF
                # path — confirm whether that is intentional.
                result = run_rlhf_workflow(file_path, "notes-llm", user_api_key=self.user_api_key)
            else:
                from agents.langgraph import run_workflow
                result = run_workflow(file_path, "notes-llm", feedback_context=feedback_context, user_api_key=self.user_api_key)
            if result["status"] == "success":
                # UDFs are now applied in generate_llm_notes function before Excel conversion
                return GenerationResult(
                    success=True,
                    output_path=result["result"]["output_xlsx_path"],
                    data=result["result"],
                    error=None,
                    metadata={
                        "execution_id": execution_id,
                        "generation_method": "llm",
                        "use_rlhf": self.use_rlhf,
                        "attempt": self.attempts_made,
                        "rlhf_metadata": result["result"].get("rlhf_metadata", {}),
                        "feedback_applied": bool(udfs_to_apply),
                        "udfs_applied_count": len(udfs_to_apply),
                        "session_id": session_id
                    }
                )
            else:
                return GenerationResult(
                    success=False,
                    output_path=None,
                    data=None,
                    error=result.get("error", "Unknown error"),
                    metadata={
                        "execution_id": execution_id,
                        "generation_method": "llm",
                        "use_rlhf": self.use_rlhf,
                        "attempt": self.attempts_made
                    }
                )
        except Exception as e:
            # Broad catch is deliberate: generate() must return a result, not raise.
            logger.error(f"LLM Notes generation failed: {e}")
            return GenerationResult(
                success=False,
                output_path=None,
                data=None,
                error=str(e),
                metadata={
                    "execution_id": f"error_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                    "generation_method": "llm",
                    "use_rlhf": self.use_rlhf,
                    "attempt": self.attempts_made
                }
            )

    def _apply_udfs_to_result(self, result: Dict[str, Any], udfs: List[str], feedback_context: Dict[str, Any]) -> Dict[str, Any]:
        """Apply UDFs (Python source strings) to the generation result in sequence.

        NOTE(review): apparently superseded — generate() says UDFs are now
        applied inside generate_llm_notes; no caller of this method is
        visible in this file. SECURITY: exec() runs whatever source is in
        `udfs`; only feed it UDFs produced by InteractiveFeedbackManager,
        never untrusted input.
        """
        try:
            # Execute each UDF and apply modifications
            for udf_code in udfs:
                try:
                    # Create a local namespace for UDF execution
                    local_vars = {}
                    exec(udf_code, {"datetime": datetime}, local_vars)
                    # Find the UDF function (it will be the last defined function)
                    udf_func = None
                    for var_name, var_value in local_vars.items():
                        if callable(var_value) and var_name.startswith('apply_user_feedback'):
                            udf_func = var_value
                            break
                    if udf_func:
                        # Apply the UDF to the result data
                        result["result"] = udf_func(result["result"], feedback_context.get('feedback_type', 'general'))
                except Exception as e:
                    # One bad UDF must not abort the rest — best effort.
                    logger.warning(f"Failed to apply UDF: {e}")
                    continue
            return result
        except Exception as e:
            logger.error(f"Error applying UDFs: {e}")
            return result

    def refine(self, previous_result: GenerationResult, feedback: List[str]) -> GenerationResult:
        """Refine LLM notes generation based on feedback.

        Strategy: if quality was flagged and RLHF is off, retry once through
        RLHF (restoring the flag afterwards); otherwise plain retry.
        """
        logger.info(f"Refining LLM notes generation with feedback: {feedback}")
        # For LLM generation, we can try different approaches:
        # 1. Switch to RLHF if not already using it
        # 2. Retry with different parameters
        # 3. Use fallback models
        if not self.use_rlhf and "quality" in str(feedback).lower():
            # If quality issues and not using RLHF, try RLHF
            logger.info("Switching to RLHF for better quality")
            original_rlhf = self.use_rlhf
            self.use_rlhf = True
            # NOTE(review): assumes previous_result.data carries 'file_path';
            # when absent, generate(None) will fail and report the error.
            result = self.generate(previous_result.data.get("file_path") if previous_result.data else None)
            self.use_rlhf = original_rlhf  # Reset for future calls
            return result
        else:
            # Otherwise, just retry
            return self.generate(previous_result.data.get("file_path") if previous_result.data else None)
class NotesValidator(BaseValidator):
    """Simplified validator for financial notes.

    Real validation happens through the interactive feedback sessions, so
    this class only performs basic sanity checks (generation succeeded, the
    output file exists) and otherwise always passes, deferring judgment to
    the user.
    """

    def validate(self, generation_result: GenerationResult) -> ValidationResult:
        """Run the minimal file-level checks and pass everything else.

        Returns a failing ValidationResult only when generation produced no
        output or the output file is missing; otherwise returns a perfect
        score so the user's own feedback drives any further iteration.
        """
        def _failure(reason: str, suggestion: str) -> ValidationResult:
            # Shared shape for both early-exit failure cases.
            return ValidationResult(
                is_valid=False,
                score=0.0,
                feedback=[reason],
                suggestions=[suggestion],
                metadata={"validation_type": "basic_file_check"},
            )

        output_path = generation_result.output_path
        if not generation_result.success or not output_path:
            return _failure("Generation failed - no output produced", "Retry generation process")

        if not os.path.exists(output_path):
            return _failure("Output file does not exist", "Check file generation process")

        # Record the file size for downstream reporting.
        file_size = os.path.getsize(output_path) if os.path.exists(output_path) else 0

        # Always pass: user feedback through interactive sessions takes precedence.
        return ValidationResult(
            is_valid=True,
            score=1.0,
            feedback=[],
            suggestions=[],
            metadata={
                "validation_type": "user_feedback_based",
                "file_size": file_size,
                "automatic_validation_disabled": True,
                "reason": "User feedback through interactive sessions replaces automatic validation",
            },
        )

    def get_validation_criteria(self) -> List[str]:
        """Return the (informational) list of validation criteria."""
        criteria = [
            "Output file exists and is accessible",
            "File size is reasonable (>1KB)",
            "Metadata contains required fields",
            "For LLM generation: quality score meets threshold",
            "RLHF metadata present when RLHF is enabled",
            "No critical errors in generation process",
        ]
        return criteria
class GeneratorValidatorPipeline:
    """Main pipeline that orchestrates Generator-Validator pattern.

    Repeatedly generates and validates (up to generator.max_attempts),
    returning as soon as validation passes, otherwise the best-scoring
    attempt. Keeps full generation/validation histories for reporting.
    """

    def __init__(self, generator: BaseGenerator, validator: BaseValidator):
        self.generator = generator
        self.validator = validator
        self.generation_history = []   # every GenerationResult, in order
        self.validation_history = []   # matching ValidationResult per attempt

    def process(self, file_path: str, **kwargs) -> Tuple[GenerationResult, ValidationResult]:
        """Process file through generator-validator pipeline.

        Returns (generation_result, validation_result) for the first attempt
        that validates, else the best-scoring attempt after max_attempts.
        """
        logger.info("Starting Generator-Validator pipeline")
        best_result = None
        best_validation = None
        for attempt in range(self.generator.max_attempts):
            logger.info(f"Attempt {attempt + 1}/{self.generator.max_attempts}")
            # Generate
            generation_result = self.generator.generate(file_path, **kwargs)
            self.generation_history.append(generation_result)
            # Validate
            validation_result = self.validator.validate(generation_result)
            self.validation_history.append(validation_result)
            logger.info(f"Generation success: {generation_result.success}, Validation score: {validation_result.score}")
            # Keep track of best result
            if best_result is None or (generation_result.success and validation_result.score > (best_validation.score if best_validation else 0)):
                best_result = generation_result
                best_validation = validation_result
            # If validation passes, return immediately
            if validation_result.is_valid:
                logger.info("Validation passed - returning result")
                return generation_result, validation_result
            # If not the last attempt, try to refine
            if attempt < self.generator.max_attempts - 1:
                logger.info(f"Validation failed - refining with feedback: {validation_result.feedback}")
                # Reset attempts counter for refinement
                original_attempts = self.generator.attempts_made
                self.generator.attempts_made = 0  # Reset for refinement
                # NOTE(review): the refined result assigned here is neither
                # validated nor recorded — the next loop iteration calls
                # generate() again and overwrites it. Confirm whether refine()
                # is meant only for its side effects (e.g. toggling RLHF).
                generation_result = self.generator.refine(generation_result, validation_result.feedback)
                self.generator.attempts_made = original_attempts
        logger.info("All attempts completed - returning best result")
        return best_result, best_validation

    def get_processing_summary(self) -> Dict[str, Any]:
        """Get summary of the processing pipeline (attempts, scores, methods)."""
        return {
            "total_attempts": len(self.generation_history),
            "successful_generations": sum(1 for g in self.generation_history if g.success),
            "validation_scores": [v.score for v in self.validation_history],
            "best_score": max([v.score for v in self.validation_history]) if self.validation_history else 0,
            "generation_methods": list(set([g.metadata.get("generation_method") for g in self.generation_history if g.metadata])),
            "validation_criteria": self.validator.get_validation_criteria()
        }
def create_notes_pipeline(use_rlhf: bool = False, user_api_key: Optional[str] = None) -> GeneratorValidatorPipeline:
    """Build a notes-generation pipeline wiring an LLM generator to the validator.

    use_rlhf and user_api_key are forwarded to the LLMNotesGenerator.
    """
    return GeneratorValidatorPipeline(
        LLMNotesGenerator(use_rlhf=use_rlhf, user_api_key=user_api_key),
        NotesValidator(),
    )