diff --git "a/chain_of_thought_wrapper.py" "b/chain_of_thought_wrapper.py" --- "a/chain_of_thought_wrapper.py" +++ "b/chain_of_thought_wrapper.py" @@ -8,27 +8,79 @@ from transformers import ( AutoTokenizer, GenerationConfig, GenerationMixin, - AutoModelForCausalLM # Needed for example usage + # Keep AutoModelForCausalLM for example usage block, but not used in main wrapper logic + # We rely on AutoModel now + # AutoModelForCausalLM, # Removed as AutoModel is more general + # ADDED: AutoProcessor and AutoModel for multimodal handling + AutoProcessor, + AutoModel, + AutoConfig, # Needed for checking model config + # Import specific model classes if AutoModel isn't sufficient for a specific type + # from transformers import LlamaForCausalLM # Example + # from transformers import LlavaForConditionalGeneration # Example multimodal model class ) from transformers.utils import is_accelerate_available, is_bitsandbytes_available -from typing import Optional, List, Tuple, Dict, Union, Any +from typing import Optional, List, Tuple, Dict, Union, Any # Added Any import gc # Import garbage collector for cleanup import time # Import time for potential timing/logging (unused in final code, but good practice) -from collections import Counter # Needed for example voting - -# --- Logging Setup --- +from collections import Counter # Needed for voting +from PIL import Image # Needed for handling image data +import io # Needed for handling image bytes +import os # Needed for path handling + + +# ─── NEW: memory imports ───────────────────────────────────────── +# Assuming these custom classes are provided and handle text-based data +# Ensure these files (Enhanced_MemoryEngine.py, etc.) are in the same directory +try: + from Enhanced_MemoryEngine import MemoryEngine # 📝🧠💾✨🔍 + from NeuroMemoryProcessor import NeuroMemoryProcessor # 📝⚙️🧬🔄 + from AGIEnhancer import AGIEnhancer # ✍️❤️‍🩹🧠 + from FullAGI_ExpansionModule import NeoSentientCore # 🤖💭✨ + # ADDED: Import the new Self Assessment module + from SimulatedSelfAssessment import SimulatedSelfAssessment # 📈📊🧠 + + AGI_IMPORTS_SUCCESS = True + logger = logging.getLogger(__name__) # Re-get logger after potential basicConfig in imported modules + logger.info("AGI helper modules imported successfully.") +except ImportError as e: + AGI_IMPORTS_SUCCESS = False + logger = logging.getLogger(__name__) # Re-get logger + logger.error(f"Failed to import AGI helper modules. 
AGI features will be disabled: {e}") + # Define dummy classes/objects or handle None checks later if imports fail + class MemoryEngine: # Dummy class to prevent NameError + def __init__(self, *args, **kwargs): pass + def __getattr__(self, name): return lambda *args, **kwargs: None # Mock methods + class NeuroMemoryProcessor: # Dummy class + def __init__(self, *args, **kwargs): pass + def __getattr__(self, name): return lambda *args, **kwargs: None + class AGIEnhancer: # Dummy class + def __init__(self, *args, **kwargs): pass + def __getattr__(self, name): return lambda *args, **kwargs: None + class NeoSentientCore: # Dummy class + def __init__(self, *args, **kwargs): pass + def __getattr__(self, name): return lambda *args, **kwargs: None + # ADDED: Dummy class for Self Assessment if import fails + class SimulatedSelfAssessment: # Dummy class + def __init__(self, *args, **kwargs): pass + def __getattr__(self, name): return lambda *args, **kwargs: {"state_summary": "Simulated self-assessment module not available."} # Mock method returning default summary + + +# --- Logging Setup for Wrapper --- # Configure logging for the module. This helps in debugging and understanding wrapper behavior. -# Set level to DEBUG temporarily to see the detailed logs added below -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +# Ensure this runs only if basicConfig hasn't been called by imported modules +if not logging.root.handlers: + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + logger = logging.getLogger(__name__) -# Ensure logger doesn't add handlers multiple times if the script is imported repeatedly -if not logger.handlers: +if not logger.handlers: # Check again in case imported modules added handlers handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) - # Avoid propagation to the root logger, preventing duplicate messages logger.propagate = False +logger.setLevel(logging.DEBUG) # Set default level to DEBUG for detailed wrapper logs + # --- Default Configuration Values --- # These defaults provide sensible starting points for the wrapper's behavior, @@ -36,8 +88,9 @@ if not logger.handlers: DEFAULT_MAX_LENGTH = 2048 # Increased default max length to accommodate longer CoT DEFAULT_REASONING_LIMIT = 15 # A conceptual limit for extracted steps (not strictly enforced by parsing logic) DEFAULT_CONSISTENCY_ROUNDS = 5 # Default number of chains for self-consistency, increased based on typical research -DEFAULT_COMPLEXITY_KEYWORDS = ["explain", "step by step", "plan", "analyze", "reasoning", "logic"] # Keywords (currently unused as CoT is always on) -DEFAULT_FINAL_ANSWER_TAG = "Final_Answer:" # Explicit tag to signal the final answer +# DEFAULT_COMPLEXITY_KEYWORDS = ["explain", "step by step", "plan", "analyze", "reasoning", "logic"] # Keywords (currently unused as CoT is always on) +DEFAULT_FINAL_ANSWER_TAG = "Final Answer:" # Explicit tag to signal the final answer, reverted to a more common default + # --- Regex Pattern for Parsing Steps --- # This pattern is used to identify and extract individual reasoning steps from @@ -58,870 +111,1048 @@ ARTIFACT_PATTERNS = [ # Add other specific artifact patterns here as needed for observed model outputs ] -class ChainOfThoughtWrapper: + +# --- Self-Consistency Voting (Defined here, but used by the GUI) --- +# Keep the 
normalize_answer function here as it's a utility +def normalize_answer(answer: str) -> str: """ - A robust Chain-of-Thought (CoT) wrapper for Hugging Face models. - - This wrapper enforces a Chain-of-Thought process by injecting a specific - template into the prompt. It handles model generation and parses the - output to extract reasoning steps and a final answer. It is designed - to generate multiple sequences for potential Self-Consistency voting - (voting logic is intended for the calling application, e.g., a GUI). - - It incorporates enhancements based on a detailed audit, focusing on - prompting, decoding, parsing robustness, cross-model compatibility, - reliability mitigation, and efficiency, while adhering to the "always-on CoT" - principle. - - Key Features: - - Forces CoT via a structured, adaptive prompt template. - - Parses structured reasoning steps and uses robust logic to find the final answer. - - Supports generating multiple chains for Self-Consistency analysis via GenerationConfig. - - Handles common cross-model compatibility issues (e.g., pad tokens, device placement). - - Merges user-provided GenerationConfig with sensible defaults. - - Includes basic cleanup for common model output artifacts. + Normalizes a string answer for robust comparison during voting. + - Converts to lowercase. + - Strips leading/trailing whitespace. + - Removes common punctuation and articles. + - Handles simple cases of number words (e.g., "two" -> "2"). + - Removes extra internal whitespace. """ + if not isinstance(answer, str): + return "" # Handle non-string inputs + + normalized = answer.lower().strip() + + # Remove common trailing characters like periods, commas, etc. + normalized = re.sub(r'[.,!?;:]+$', '', normalized).strip() + + # Remove common leading preambles (case-insensitive) + normalized = re.sub(r'^\s*(?:the answer is|result|output)\s*[:\-]?\s*', '', normalized, flags=re.IGNORECASE).strip() + + # Remove common articles (a, an, the) only if they appear at the start of the answer + normalized = re.sub(r'^\s*(a|an|the)\s+', '', normalized, flags=re.IGNORECASE).strip() + # Basic number word to digit conversion for common cases (can be expanded) + num_word_map = { + 'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4', + 'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', + 'ten': '10', 'eleven': '11', 'twelve': '12', 'thirteen': '13', + 'fourteen': '14', 'fifteen': '15', 'sixteen': '16', 'seventeen': '17', + 'eighteen': '18', 'nineteen': '19', 'twenty': '20', 'thirty': '30', + 'forty': '40', 'fifty': '50', 'sixty': '60', 'seventy': '70', + 'eighty': '80', 'ninety': '90', 'hundred': '100', 'thousand': '1000', + 'million': '1000000', 'billion': '1000000000' + } + # Simple word replacement - might fail on "twenty-two" or "one hundred". + # More robust parsing is complex. + words = normalized.split() + normalized_words = [num_word_map.get(word, word) for word in words] + normalized = " ".join(normalized_words) + + + # Remove extra whitespace within the string (replace multiple spaces with single) + normalized = re.sub(r'\s+', ' ', normalized).strip() + + # Remove trailing spaces from the very end again just in case + normalized = normalized.strip() + + + return normalized + +# NOTE: This voting function is for the EXAMPLE USAGE BLOCK only and is NOT +# directly used by the ChainOfThoughtWrapper.generate method. 
+# It's included here for completeness if the user wanted to test the wrapper +# standalone, but the GUI implements its own voting logic using normalize_answer. +# Removed this function as it's explicitly not used by the wrapper itself and the GUI has its own. +# def perform_self_consistency_voting(...) + + +# --- ChainOfThoughtWrapper Class (Multimodal Enabled) --- +class ChainOfThoughtWrapper: + """ + ChainOfThoughtWrapper: Orchestrates model generation with CoT prompting + and interacts with AGI helper modules. + + Supports multimodal input (image + text) for compatible models + loaded with Hugging Face's AutoModel and AutoProcessor. + """ def __init__( self, - model: Union[PreTrainedModel, GenerationMixin, Any], - tokenizer: AutoTokenizer, - generation_config: Optional[GenerationConfig] = None, - device: Optional[str] = None, - max_length: int = DEFAULT_MAX_LENGTH, - reasoning_steps_limit: int = DEFAULT_REASONING_LIMIT, - self_consistency_enabled: bool = False, # Control if multiple chains are generated - consistency_rounds: int = DEFAULT_CONSISTENCY_ROUNDS, - complexity_keywords: Optional[List[str]] = None, # Currently unused as CoT is always on - final_answer_tag: str = DEFAULT_FINAL_ANSWER_TAG, - # Optional prompt customization for advanced users - cot_instruction: str = "Let's analyze this problem logically, breaking it down step by step to reach the precise final answer.", - reasoning_header: str = "Reasoning Process:", - step_prefix: str = "Step ", # e.g., "Step 1: " - model will ideally continue this - # Optional reliability controls (simple, prompt-based) - emphasize_factual: bool = True, - allow_uncertainty_phrase: Optional[str] = "If information is insufficient or you are unsure, state that clearly.", - # Optional parsing flexibility - strip_artifact_patterns: List[re.Pattern] = ARTIFACT_PATTERNS, + model: Union[PreTrainedModel, GenerationMixin, AutoModel, Any], # Accept AutoModel + processor: Union[AutoTokenizer, AutoProcessor, Any], # Accept AutoProcessor (can be AutoTokenizer) + device: Union[str, torch.device], + # cot_template is less critical now as multimodal models often use specific chat templates + # We'll keep a basic CoT prompt component but rely on processor for formatting + cot_instruction: str = "Analyze this step by step to find the answer.", + reasoning_header: str = "Reasoning:", + step_prefix: str = "Step", # e.g., "Step 1: " - model will ideally continue this + final_answer_tag: str = DEFAULT_FINAL_ANSWER_TAG, # Explicit tag to signal the final answer + max_length: int = DEFAULT_MAX_LENGTH # Max length for tokenization (input + output) ): """ - Initializes the ChainOfThoughtWrapper with enhanced configurations. + Initializes the ChainOfThoughtWrapper. Args: - model (Union[PreTrainedModel, GenerationMixin, Any]): The language model. - Must have a .generate() method. - tokenizer (AutoTokenizer): The corresponding tokenizer. - generation_config (Optional[GenerationConfig]): A default generation configuration. - Values here can be overridden by generate() call. - device (Optional[str]): The device to load the model onto ('cpu' or 'cuda'). - Defaults to 'cuda' if available, otherwise 'cpu'. - max_length (int): The maximum total length of the input + generated sequence. - This should be large enough for the prompt, reasoning, and answer. - reasoning_steps_limit (int): Conceptual limit for parsed steps. Not strictly enforced by current parsing. - self_consistency_enabled (bool): If True, enable multi-chain generation for self-consistency. 
- consistency_rounds (int): The number of chains to generate if `self_consistency_enabled` is True. - Actual number of sequences is controlled by `num_return_sequences` - in the final `GenerationConfig`. - complexity_keywords (Optional[List[str]]): List of keywords (unused with always-on CoT). - final_answer_tag (str): The specific string marker expected before the final answer. + model (Union[PreTrainedModel, GenerationMixin, AutoModel, Any]): The loaded Hugging Face model. + processor (Union[AutoTokenizer, AutoProcessor, Any]): The loaded Hugging Face processor + (tokenizer or multimodal processor). + device (Union[str, torch.device]): The device the model is on. cot_instruction (str): The core instruction phrase for CoT. reasoning_header (str): The header text before the reasoning steps. step_prefix (str): The prefix for the first step. - emphasize_factual (bool): If True, add prompt text emphasizing factual reasoning. - allow_uncertainty_phrase (Optional[str]): If provided, add a phrase prompting model to state uncertainty. - strip_artifact_patterns (List[re.Pattern]): List of regex patterns to remove from model output before parsing. + final_answer_tag (str): The specific string marker expected before the final answer. + max_length (int): The maximum combined length of input prompt and generated tokens. """ - # --- Device Handling --- - # Determine and set the device. Log the chosen device. - self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") - logger.info("Initializing ChainOfThoughtWrapper on device: %s", self.device) - - # --- Model and Tokenizer Loading and Configuration --- - # Move the model to the specified device and set to evaluation mode. - # Includes error handling for device transfer. - try: - self.model = model.to(self.device) - self.model.eval() # Set model to evaluation mode (disables dropout, etc.) - logger.info("Model moved to %s and set to eval mode.", self.device) - except Exception as e: - logger.error("Failed to move model to device %s: %s", self.device, e) - raise # Re-raise the exception if device transfer fails - - self.tokenizer = tokenizer - - # Attempt to find the underlying Hugging Face model instance and its config. - # This helps reliably access attributes like `config.vocab_size`, `resize_token_embeddings`, etc. - self._hf_model_instance, self._hf_config = self._find_hf_model_and_config(self.model) - - # Handle models/tokenizers without a defined pad_token_id. - # This is crucial for batch generation (like `num_return_sequences`). - # If the tokenizer doesn't have a pad_token, try to use the eos_token. - # If neither exists, add a special token and resize embeddings. - # The wrapper's `resize_token_embeddings` method is called here if a new token is added. 
- if self.tokenizer.pad_token_id is None: - if self.tokenizer.eos_token_id is not None: + logger.debug("ChainOfThoughtWrapper __init__ started.") + self.model = model + self.processor = processor # Store the processor (can be AutoProcessor or AutoTokenizer) + self.device = device + self.cot_instruction = cot_instruction + self.reasoning_header = reasoning_header + self.step_prefix = step_prefix + self.final_answer_tag = final_answer_tag + self.max_length = max_length + self._artifact_patterns = ARTIFACT_PATTERNS # Use default artifact patterns + self.reasoning_steps_limit = DEFAULT_REASONING_LIMIT # Use default limit for parsing + + # Determine if the loaded processor has an image processor component -> Multimodal capability flag + # This is how we check if the loaded model/processor pair is multimodal capable for input + self.multimodal_capable = hasattr(self.processor, 'image_processor') and self.processor.image_processor is not None + logger.info(f"Wrapper initialized on {self.device}. Multimodal capability detected: {self.multimodal_capable}") + + # Ensure we have a tokenizer, whether the processor is multimodal or text-only + # If processor IS the tokenizer, getattr will return the processor itself. + # CORRECTED: Use getattr to get the tokenizer from the processor + self.tokenizer = getattr(self.processor, 'tokenizer', self.processor) + + if self.tokenizer is None: + logger.error("Processor does not contain a tokenizer.") + # Depending on model, this might be fatal. Proceed, but expect errors during tokenization/decoding. + + # Handle models/tokenizers without a defined pad_token_id for batch generation + # Only attempt this if a tokenizer was found + if self.tokenizer and self.tokenizer.pad_token_id is None: + if hasattr(self.tokenizer, 'eos_token_id') and self.tokenizer.eos_token_id is not None: self.tokenizer.pad_token_id = self.tokenizer.eos_token_id - logger.warning("Tokenizer pad_token_id is None, using eos_token_id (%s) as pad_token_id.", self.tokenizer.eos_token_id) + logger.warning("Tokenizer pad_token_id is None, using eos_token_id (%s) as pad_token_id for batching.", self.tokenizer.eos_token_id) else: # Fallback: Add a new pad token if neither exists - logger.warning("Tokenizer pad_token_id and eos_token_id are both None. Adding a [PAD] token.") - self.tokenizer.add_special_tokens({'pad_token': '[PAD]'}) - self.tokenizer.pad_token_id = self.tokenizer.convert_tokens_to_ids('[PAD]') - logger.info("Added new [PAD] token with ID %s.", self.tokenizer.pad_token_id) - # Resize model embeddings if we added a new token AND we found a base HF model instance - if self._hf_model_instance: - self.resize_token_embeddings(len(self.tokenizer)) # Call the instance method - logger.info("Resized model embeddings to accommodate new PAD token.") - else: - logger.warning("Could not resize model embeddings after adding PAD token; underlying HF model instance not found.") - logger.warning("Ensure the model can handle a larger vocabulary if batching is used.") - - # --- Configuration Attributes --- - self.max_length = max_length - self.reasoning_steps_limit = reasoning_steps_limit - # The actual number of sequences to generate is controlled by `num_return_sequences` in the final `GenerationConfig`. - # We store `consistency_rounds` to potentially inform this value. 
- self.self_consistency_enabled = self_consistency_enabled - self.consistency_rounds = max(1, consistency_rounds) if self_consistency_enabled else 1 - - # --- Prompt Template Components --- - self.complexity_keywords = complexity_keywords or list(DEFAULT_COMPLEXITY_KEYWORDS) # Store keywords (currently unused for logic) - self.final_answer_tag = final_answer_tag - self._cot_instruction = cot_instruction # Customizable CoT instruction - self._reasoning_header = reasoning_header # Customizable reasoning header - self._step_prefix = step_prefix # Customizable step prefix (e.g., "Step ") + logger.warning("Tokenizer pad_token_id and eos_token_id are both None. Attempting to add a [PAD] token.") + try: + # Check if the token already exists before adding + if hasattr(self.tokenizer, 'vocab') and '[PAD]' not in self.tokenizer.vocab: + self.tokenizer.add_special_tokens({'pad_token': '[PAD]'}) + # Note: Resizing embeddings should ideally happen on the model *after* adding the token. + # The GUI's loading function attempts this, but log if it's needed and might not happen here. + logger.warning("Added new [PAD] token to tokenizer. Model embeddings may need resizing.") + elif not hasattr(self.tokenizer, 'vocab'): + logger.warning("Tokenizer does not have a vocabulary attribute. Cannot check for or add [PAD] token.") + else: + logger.info("[PAD] token already exists in tokenizer vocabulary.") + + # After potentially adding the token, set pad_token_id if it's still None + if self.tokenizer.pad_token_id is None and hasattr(self.tokenizer, 'convert_tokens_to_ids'): + self.tokenizer.pad_token_id = self.tokenizer.convert_tokens_to_ids('[PAD]') + logger.info("Set pad_token_id to ID of [PAD] token (%s).", self.tokenizer.pad_token_id) + elif self.tokenizer.pad_token_id is None: + logger.warning("Cannot set pad_token_id as convert_tokens_to_ids method is missing.") + + + except Exception as e: + logger.error(f"Failed to add [PAD] token or set pad_token_id: {e}") + self.tokenizer.pad_token_id = None # Ensure it stays None if adding fails + logger.warning("Failed to set pad_token_id. Batch generation might fail.") + elif self.tokenizer: + logger.debug("Tokenizer has pad_token_id: %s", self.tokenizer.pad_token_id) + else: + logger.warning("No tokenizer available to check or set pad_token_id.") - # --- Reliability/Hallucination Mitigation Prompt Components --- - self._emphasize_factual = emphasize_factual - self._allow_uncertainty_phrase = allow_uncertainty_phrase - # --- Parsing Attributes and Compiled Regex --- # Compile regex pattern for final answer extraction based on the specified tag. # re.escape handles potential special characters in the tag. re.DOTALL matches newline. self.final_answer_pattern = re.compile( re.escape(final_answer_tag) + r"\s*(.*)", re.IGNORECASE | re.DOTALL ) self._step_pattern = DEFAULT_STEP_PATTERN # Use the default compiled step pattern - self._artifact_patterns = strip_artifact_patterns # Patterns for cleaning model output logger.debug("Final answer pattern compiled: %s", self.final_answer_pattern.pattern) logger.debug("Step pattern: %s", self._step_pattern.pattern) - # --- Base Generation Config Setup --- - # Create or copy the base GenerationConfig. This config holds the default - # generation parameters that will be used unless overridden during a generate() call. - # Use .from_dict(.to_dict()) for a clean copy if a config was provided. 
- if generation_config: - self.base_generation_config = GenerationConfig.from_dict(generation_config.to_dict()) - logger.info("Initialized with provided base GenerationConfig.") - else: - # Create a default GenerationConfig if none was provided. - # Incorporate parameters known to work well for CoT based on audit (temp, top_p, top_k). - # Ensure pad_token_id and eos_token_id are set from the tokenizer (or the fallback). - self.base_generation_config = GenerationConfig( - eos_token_id=self.tokenizer.eos_token_id, - pad_token_id=self.tokenizer.pad_token_id, - max_length=self.max_length, # Max total length - do_sample=True, # Always sample for diversity (essential for multi-chain) - temperature=0.7, # Balanced randomness - top_p=0.95, # Nucleus sampling - top_k=50, # Top-k sampling cutoff - num_return_sequences=1, # Default to 1 sequence (will be overridden by generate call if self-consistency is on) - # Add a mild repetition penalty, useful for longer CoT - repetition_penalty=1.1, # Discourage immediate repetition - no_repeat_ngram_size=0, # Default to no n-gram repetition prevention - ) - logger.info("Initialized with default base GenerationConfig.") - # Ensure the base config uses the determined pad_token_id - # This might be redundant if tokenizer already has it, but ensures consistency - self.base_generation_config.pad_token_id = self.tokenizer.pad_token_id - logger.debug("Base GenerationConfig pad_token_id set to %s.", self.base_generation_config.pad_token_id) + # --- Initialize AGI Helper Modules --- + # Instantiate your AGI components here, only if imports were successful + self.memory_engine = None + self.neuro_processor = None + self.agi_enhancer = None + self.neo_sentient_core = None + # ADDED: Initialize the Self Assessment module + self.self_assessment_module = None # Initialize the attribute + + if AGI_IMPORTS_SUCCESS: + try: + self.memory_engine = MemoryEngine() + logger.info("MemoryEngine initialized.") + except Exception as e: + self.memory_engine = None + logger.error(f"Failed to initialize MemoryEngine: {e}") + + try: + self.neuro_processor = NeuroMemoryProcessor() + logger.info("NeuroMemoryProcessor initialized.") + except Exception as e: + self.neuro_processor = None + logger.error(f"Failed to initialize NeuroMemoryProcessor: {e}") + + try: + self.agi_enhancer = AGIEnhancer() + logger.info("AGIEnhancer initialized.") + except Exception as e: + self.agi_enhancer = None + logger.error(f"Failed to initialize AGIEnhancer: {e}") + + try: + self.neo_sentient_core = NeoSentientCore(name="NeoAGI") + logger.info("NeoSentientCore initialized.") + except Exception as e: + self.neo_sentient_core = None + logger.error(f"Failed to initialize NeoSentientCore: {e}") + + # ADDED: Initialize the Self Assessment module instance + try: + self.self_assessment_module = SimulatedSelfAssessment() + logger.info("SimulatedSelfAssessment initialized.") + except Exception as e: + self.self_assessment_module = None + logger.error(f"Failed to initialize SimulatedSelfAssessment: {e}") - # Check if the underlying HF model (if found) supports returning scores, useful for CISC. - # We set this on the model's config if possible, as `generate` reads from there. 
- if self._hf_model_instance and hasattr(self._hf_model_instance.config, 'return_dict_in_generate'): - try: - # Set these attributes directly on the model's config object - self._hf_model_instance.config.return_dict_in_generate = True - self._hf_model_instance.config.output_scores = True # Also request scores - logger.debug("Set underlying HF model config to return dict in generate and output scores.") - except Exception as e: - logger.warning("Failed to set return_dict_in_generate/output_scores on HF model config: %s", e) else: - logger.debug("Underlying HF model instance or config does not support setting return_dict_in_generate/output_scores.") + logger.warning("AGI helper modules were not imported, AGI features will not be available.") - logger.info("ChainOfThoughtWrapper initialization complete.") - logger.debug("Final Base GenerationConfig: %s", self.base_generation_config.to_dict()) + logger.debug("ChainOfThoughtWrapper __init__ finished.") - def _find_hf_model_and_config(self, obj: Any) -> Tuple[Optional[PreTrainedModel], Optional[Any]]: + @torch.no_grad() # Ensure no gradients are calculated during inference + def generate( + self, + input_text: str, + image_data: Optional[List[bytes]] = None, # Accept list of image bytes + multimodal_model: bool = False, + generation_params: Optional[Dict[str, Any]] = None, + chat_history: Optional[List[Dict[str, str]]] = None + ) -> Tuple[Optional[List[Dict[str, str]]], Optional[str], Optional[str]]: """ - Recursively searches for an underlying Hugging Face PreTrainedModel - and its configuration within a potentially wrapped or custom object. - This helps in accessing standard HF attributes like `config` or - methods like `resize_token_embeddings`. + Generates a Chain-of-Thought response from the language model, optionally + handling multimodal input (text + image). Integrates AGI helper modules + (MemoryEngine, NeuroProcessor, AGIEnhancer, NeoSentientCore, SelfAssessment) + and includes conversation history in the prompt. Args: - obj (Any): The object to inspect (could be the model itself or a wrapper). + prompt (str): The user's input prompt (text part). + image (Optional[Image.Image]): The input image, if any. + multimodal_model (bool): True if the loaded model is multimodal. + generation_params (Optional[Dict[str, Any]]): Dictionary of generation parameters + chat_history (Optional[List[Dict[str, str]]]): A list of dictionaries + representing previous turns of the conversation. Each dict + is expected to have keys 'role' ('user' or 'assistant') + and 'content' (the message text). Returns: - Tuple[Optional[PreTrainedModel], Optional[Any]]: The found HF model instance and its config. - Returns (None, None) if not found. 
- """ - # Add a check to prevent infinite recursion - if getattr(obj, '_searching_hf_model', False): - logger.debug("Preventing infinite recursion in _find_hf_model_and_config for object type: %s", type(obj)) - return None, None - setattr(obj, '_searching_hf_model', True) - - logger.debug("Searching for HF model in object of type: %s", type(obj)) - # If the object is directly a PreTrainedModel and has a config - if isinstance(obj, PreTrainedModel): - logger.debug("Found HF PreTrainedModel directly.") - setattr(obj, '_searching_hf_model', False) # Reset flag - return obj, getattr(obj, 'config', None) # Return config if it exists - - # Check common attribute names where the base model might be stored - potential_attrs = ('model', 'base_model', 'transformer', '_original_model', 'module') # Added 'module' - for attr_name in potential_attrs: - m = getattr(obj, attr_name, None) - if m is not None: - logger.debug("Checking attribute '%s' of type %s", attr_name, type(m)) - # Recursively search within the attribute - found_model, found_config = self._find_hf_model_and_config(m) - if found_model or found_config: - setattr(obj, '_searching_hf_model', False) # Reset flag before returning - return found_model, found_config - - # If no PreTrainedModel found through attributes, check if the object itself has a 'config' attribute - if hasattr(obj, 'config'): - logger.debug("Found config attribute on object, but no PreTrainedModel instance.") - setattr(obj, '_searching_hf_model', False) # Reset flag - return None, obj.config # Return the config found - - logger.debug("No underlying HF PreTrainedModel instance or config found.") - setattr(obj, '_searching_hf_model', False) # Reset flag - return None, None - - - def _inject_cot(self, prompt: str) -> str: + Tuple[Optional[List[Dict[str, str]]], Optional[str], Optional[str]]: + A tuple containing: + 1. List of dictionaries representing the parsed CoT steps (or None). + 2. The extracted final answer string (or None). + 3. The raw body text of the model's response (or None). """ - Injects the structured Chain-of-Thought template into the user's prompt. - This template guides the model's response format. - Incorporates reliability prompts based on settings. 
+ logger.debug("Wrapper generate method called.") + # Added check for model generation compatibility at the start of generate + if self.model is None or self.processor is None or self.tokenizer is None or \ + not (hasattr(self.model, 'generate') and callable(getattr(self.model, 'generate', None)) or isinstance(self.model, GenerationMixin)): + logger.error("Model, Processor, Tokenizer not loaded or loaded model is not generation compatible.") + # Return an empty result dict to indicate failure, GUI will handle displaying error + return {"full_texts": [], "reasoning_steps": [], "final_answers": [], "generated_images": [], "generation_scores": None} + + + # Safely get generation parameters + params = generation_params if generation_params is not None else {} + effective_num_return_sequences = params.get("num_return_sequences", 1) + # Use default values if not provided in params + max_new_tokens = params.get("max_new_tokens", 512) + temperature = params.get("temperature", 0.7) + top_k = params.get("top_k", 50) + top_p = params.get("top_p", 1.0) + do_sample = params.get("do_sample", True) + repetition_penalty = params.get("repetition_penalty", 1.1) + no_repeat_ngram_size = params.get("no_repeat_ngram_size", 0) + + + logger.info(f"Generating {effective_num_return_sequences} sequence(s) with params: {params}") + if image_data: + logger.info(f"Received {len(image_data)} image(s). Wrapper multimodal capable: {self.multimodal_capable}") + + + # --- AGI Helper Module Interaction (Pre-Generation) --- + # Use NeoSentientCore and AGIEnhancer to add internal state to the prompt + # Adapt to include mention of image data if present + agi_pre_prompt_elements: List[str] = [] + if AGI_IMPORTS_SUCCESS and self.neo_sentient_core: + # Simulate perception of the input (text and image presence) + perception_detail = f"User input: '{input_text[:200]}{'...' if len(input_text) > 200 else ''}'" + if image_data: + perception_detail += f" (with {len(image_data)} image(s))" + try: + self.neo_sentient_core.perceive(perception_detail) + logger.debug("NeoSentientCore perceived input.") + except Exception as e: + logger.warning(f"NeoSentientCore perceive failed: {e}") - Args: - prompt (str): The original user prompt. - Returns: - str: The prompt with the CoT template appended. - """ - # Start with the cleaned original prompt - injected_prompt = f"{prompt.strip()}\n\n" + # Get elements from the AGI core to inject into the prompt + # Decide goal (conceptual) + try: + current_goal = self.neo_sentient_core.decide_goal() + if current_goal and isinstance(current_goal, str): agi_pre_prompt_elements.append(f"Intention: {current_goal.strip()}") + except Exception as e: + logger.warning(f"NeoSentientCore decide_goal failed: {e}") - # Add the core CoT instruction phrase - injected_prompt += self._cot_instruction + "\n" + # Get inner voice (conceptual) + try: + inner_monologue = self.neo_sentient_core.inner_voice() + if inner_monologue and isinstance(inner_monologue, str): agi_pre_prompt_elements.append(f"InnerVoice: {inner_monologue.strip()}") + except Exception as e: + logger.warning(f"NeoSentientCore inner_voice failed: {e}") - # Add reliability-focused instructions if enabled - if self._emphasize_factual: - injected_prompt += "Think through the problem step-by-step using only factual information and logical deduction. 
Do not assume any facts that are not given.\n" - if self._allow_uncertainty_phrase: - injected_prompt += self._allow_uncertainty_phrase + "\n" + # Get qualia token (conceptual emotion priming) + # Using curiosity as a default for exploration, could be more dynamic later + try: + qualia_token = self.neo_sentient_core.generate_qualia_token("curiosity") # Example + if qualia_token and isinstance(qualia_token, str): agi_pre_prompt_elements.insert(0, qualia_token.strip()) # Add qualia at the start + except Exception as e: + logger.warning(f"NeoSentientCore generate_qualia_token failed: {e}") - # Add the structured template for reasoning steps and final answer tag - injected_prompt += f"\n{self._reasoning_header}\n\n" - injected_prompt += f"{self._step_prefix}1: " # Explicitly start the first step to guide format consistency - logger.debug("Injected CoT template. Full prompt starts with: %s...", injected_prompt[:200].replace('\n', '\\n')) - return injected_prompt + if AGI_IMPORTS_SUCCESS and self.agi_enhancer: + # Log the experience with the AGIEnhancer + # Pass text and mention image presence + enhancer_experience_detail = f"User input: '{input_text[:200]}{'...' if len(input_text) > 200 else ''}'" + if image_data: + enhancer_experience_detail += f" (with {len(image_data)} image(s))" + try: + self.agi_enhancer.log_experience(enhancer_experience_detail) + logger.debug("AGIEnhancer logged experience.") + except Exception as e: + logger.warning(f"AGIEnhancer log_experience failed: {e}") - @torch.no_grad() # Disable gradient calculation during generation for efficiency - def generate( - self, - input_text: str, - generation_config: Optional[GenerationConfig] = None, # Optional override config for this call - num_return_sequences: Optional[int] = None, # Explicitly request N sequences - ) -> Dict[str, Any]: - """ - Generates text using the wrapped model with Chain-of-Thought injection. - Handles tokenization, prompt injection, generation, and parsing. - Efficiently generates multiple sequences using `num_return_sequences`. 
+ self_assessment_summary_text: Optional[str] = None # Use a descriptive name for the summary text + if AGI_IMPORTS_SUCCESS and self.self_assessment_module and \ + self.memory_engine and self.neuro_processor and self.neo_sentient_core: + try: + # Gather necessary data snapshots from other modules for the assessment + # These calls assume your other modules have methods like these + recent_reflections_snapshot = self.memory_engine.recall(include_long_term=True, include_working=True, limit=5) # Get some recent memories/reflections + top_biases_snapshot = self.neuro_processor.recall_biases(top_k=10) # Get top biases + synaptic_weights_snapshot = self.neuro_processor.recall_weights(top_k=10) # Get top weights + neo_state_snapshot = self.neo_sentient_core.get_state() # Get core state (emotions, intents, narrative) + current_emotions_snapshot = neo_state_snapshot.get("emotions", {}) # Extract emotions dict + intent_pool_snapshot = neo_state_snapshot.get("intent_pool", []) # Extract intents list + # Assuming AGIEnhancer or NeoSentientCore stores/calculates QRI if used + # You'll need to retrieve QRI data from where you store it if you want it in the assessment + qri_snapshot_data = None # Placeholder - set to actual QRI data if available + + # Call the assessment module's main method + assessment_result = self.self_assessment_module.perform_assessment( + recent_reflections=recent_reflections_snapshot, + top_biases=top_biases_snapshot, + synaptic_weights_snapshot=synaptic_weights_snapshot, + current_emotions=current_emotions_snapshot, + intent_pool=intent_pool_snapshot, + # Assuming MemoryEngine trace is accessible, or NeoSentientCore narrative memory + trace_summary=self.memory_engine.get_trace()[-10:] if self.memory_engine and len(self.memory_engine.get_trace()) > 0 else [], # Get recent trace summary + qri_snapshot=qri_snapshot_data # Pass QRI data here if retrieved + ) + # Extract the summary text generated by the assessment module + self_assessment_summary_text = assessment_result.get("state_summary", None) + logger.debug("Performed simulated self-assessment and retrieved summary for prompt.") + except Exception as e: + logger.error(f"Failed to perform simulated self-assessment: {e}") + # Provide a default error summary if assessment fails, so the prompt still has something + self_assessment_summary_text = "\n--- Simulated Self-Assessment Error ---\nInternal assessment module encountered an issue and cannot provide a state summary.\n---\n" - Args: - input_text (str): The user's input text/question. - generation_config (Optional[GenerationConfig]): Additional generation parameters - to override the base config for this call. - num_return_sequences (Optional[int]): Number of independent sequences (chains) to generate. - If None, uses the value from the merged generation config - (defaulting to 1 or `consistency_rounds` if enabled). - Returns: - Dict[str, Any]: A dictionary containing the generation results: - - 'sequences': The raw generated token IDs (list of tensors). - - 'full_texts': List of raw, cleaned text outputs (after stripping prompt/artifacts) for each chain. - - 'reasoning_steps': List of lists of extracted reasoning steps for each chain. - - 'final_answers': List of extracted final answer strings for each chain. - - 'generation_scores': Scores if requested and available (for CISC externally). 
- """ - logger.info("Received generate call with input text starting: '%s...'", input_text[:100]) + # Construct the full prompt including AGI elements, Self-Assessment summary, and CoT template components + # This text will be combined with images by the processor for multimodal models + agi_pre_prompt = "\n".join(agi_pre_prompt_elements) + "\n\n" if agi_pre_prompt_elements else "" - # 1) Inject the CoT prompt into the original input text - cot_prompt_text = self._inject_cot(input_text) + # ADDED: Include the self-assessment summary in the prompt if it was successfully generated + self_assessment_prompt_part = self_assessment_summary_text + "\n\n" if self_assessment_summary_text else "" + + + # Construct the core CoT prompt string for the text part of the input + # Include instructions, reasoning header, and step prefix to guide the model + cot_instruction_text = ( + f"{self.cot_instruction}\n\n" + # Optional: Add an instruction to the model about using the assessment summary + "Based on the provided 'Simulated Internal State Assessment', incorporate insights about your perceived internal state, coherence, and well-being into your response and reasoning process.\n\n" + ) + + + cot_prompt_core_text = ( + cot_instruction_text + + f"{self.reasoning_header}\n\n" + f"{self.step_prefix} 1: " # Explicitly start the first step + ) + + # Combine AGI pre-prompt, Self-Assessment summary, and the core CoT text prompt + history_prompt_part = "" + if chat_history: + logger.debug(f"Including {len(chat_history)} turns in conversation history prompt part.") + formatted_history_lines = [] + for turn in chat_history: + role = turn.get('role', 'unknown').capitalize() + + raw_content = turn.get('content', '') + if isinstance(raw_content, str): + content = raw_content.strip() + else: + content = str(raw_content).strip() + + if role and content: + formatted_history_lines.append(f"{role}: {content}") + # Join history lines with a separator, add a final separator + history_prompt_part = "\n".join(formatted_history_lines) + "\n\n---\n\n" if formatted_history_lines else "" + logger.debug(f"Formatted history prompt part:\n{history_prompt_part[:500]}...") # Log snippet + + + # Combine history, AGI pre-prompt, Self-Assessment summary, and the core CoT text prompt + # ADDED: Prepend history_prompt_part + full_text_prompt = history_prompt_part + agi_pre_prompt + self_assessment_prompt_part + cot_prompt_core_text + + + # --- Prepare Multimodal Input --- + input_tensors = {} # Dictionary to hold input tensors - # 2) Tokenize the full CoT prompt - # Ensure padding is handled correctly. Use return_tensors="pt" for PyTorch tensors. - # truncation=True ensures the input fits within max_length. - # max_length applies to the input sequence here. try: - encoded_input = self.tokenizer( - cot_prompt_text, - return_tensors="pt", - padding="longest", # Pad to the longest sequence in the batch (only 1 here, but good practice) - truncation=True, - max_length=self.max_length, # Truncate if the prompt itself is too long - ).to(self.device) - logger.debug("Input text tokenized. 
Input IDs shape: %s, on device: %s", encoded_input['input_ids'].shape, encoded_input['input_ids'].device) + # Use the processor to handle both text and image inputs + # This is the core change for multimodal input processing + # Multimodal models often require a specific format for messages (e.g., interleaved text/image) + # We'll create a simple message structure for the processor: [image(s)], text prompt + messages = [] + if image_data and self.multimodal_capable: + for img_bytes in image_data: + try: + img = Image.open(io.BytesIO(img_bytes)) + messages.append({"type": "image", "content": img}) # Use PIL Image object + except Exception as e: + logger.warning(f"Could not open image from bytes for processing: {e}. Skipping this image.") + # Decide if you want to continue without the image or raise an error + # For robustness, we'll just skip this image and log a warning + + # Append the text part of the prompt as a text message + # It's often beneficial to include the user's original text input as part of the prompt + # for the model to explicitly reference. + # Let's use a simple structure: User Query + [Image(s)] + CoT Guiding text + + # Revised message structure for processor: + processor_messages = [] + # Add user's original input text first + if input_text and input_text.strip(): + processor_messages.append({"type": "text", "content": f"User Input: {input_text.strip()}"}) + + # Add image messages *after* the initial text input if images are available and wrapper is multimodal + if image_data and self.multimodal_capable and messages: # Check if images were successfully loaded into `messages` list + processor_messages.extend(messages) + logger.debug(f"Prepared {len(messages)} image messages for processor.") + elif image_data and not self.multimodal_capable: + logger.warning("Image data provided but wrapper/model is text-only. Images will be ignored by the processor.") + + # Add the core CoT guiding text (AGI + template) as the final text message + # This guides the *output* format regardless of input modality + if full_text_prompt.strip(): + processor_messages.append({"type": "text", "content": full_text_prompt.strip()}) + elif not processor_messages: # If no text input, no images, and no CoT prompt text, add a default + logger.warning("No text or image content in messages. Adding a default text message.") + processor_messages.append({"type": "text", "content": "Please provide input."}) + # Note: An empty prompt might cause issues for some models. This is a safeguard. + + + # Log the structured messages for debugging + logger.debug(f"Messages prepared for processor: {processor_messages}") + + + # Use the processor to handle input, adapting based on chat template availability + tokenizer_for_template = getattr(self.processor, 'tokenizer', None) # Access tokenizer via processor + has_chat_template = tokenizer_for_template and hasattr(tokenizer_for_template, 'apply_chat_template') and tokenizer_for_template.chat_template + + if hasattr(self.processor, '__call__') and has_chat_template: + # Scenario 1: Processor is callable AND has a chat template + logger.debug("Processor is callable and has a chat template. 
Using processor's chat template to format messages.") + # apply_chat_template returns a string, so we then tokenize this string + # Use add_generation_prompt=True to ensure the template is completed for the model to generate + chat_prompt_text = tokenizer_for_template.apply_chat_template(processor_messages, tokenize=False, add_generation_prompt=True) + logger.debug(f"Chat template applied. Resulting text prompt: {chat_prompt_text[:200]}...") + + # Now tokenize the formatted text prompt + inputs = self.tokenizer( # Use the stored tokenizer from __init__ + chat_prompt_text, + return_tensors="pt", + padding="longest", + truncation=True, + max_length=self.max_length, + ).to(self.device) + + # Need to also process images separately if using chat template, as apply_chat_template is text-only + if image_data and self.multimodal_capable and messages: # Check if images were successfully loaded into `messages` list + image_processor_component = getattr(self.processor, 'image_processor', None) + if image_processor_component: + try: + # Extract PIL Images from the 'messages' list + pil_images = [msg["content"] for msg in messages if msg["type"] == "image" and isinstance(msg["content"], Image.Image)] + if pil_images: + image_inputs = image_processor_component( + pil_images, # Process list of images + return_tensors="pt" + ).to(self.device) + # Merge image inputs (pixel_values) with text inputs (input_ids, attention_mask) + inputs.update(image_inputs) + logger.debug(f"Image inputs processed separately and merged for chat template case. Keys now: {inputs.keys()}") + else: + logger.warning("No valid PIL images found in messages despite image_data for chat template case. Skipping image processing.") + + except Exception as image_process_e: + logger.error(f"Failed to process image inputs separately for chat template case: {image_process_e}. Generation might fail.") + # Continue with text inputs only, but log error + else: + logger.warning("Processor's image_processor component is missing despite multimodal capability flag for chat template case. Cannot process images.") + + + elif hasattr(self.processor, '__call__'): + # Scenario 2: Processor is callable but NO chat template. + # Attempt to pass concatenated text and separate image inputs to processor.__call__ + logger.debug("Processor is callable but no chat template. Concatenating text messages and processing images separately.") + + # Concatenate text content from all text messages + concatenated_text_input = "\n".join([msg["content"] for msg in processor_messages if msg["type"] == "text"]) + + if not concatenated_text_input.strip() and any(msg["type"] == "image" for msg in processor_messages): + # Handle case where there's only image input but no text input. + # Some multimodal models might still need a minimal text input like "". + logger.warning("No text content in messages, but images are present. Passing empty string as text input.") + concatenated_text_input = "" + elif not concatenated_text_input.strip(): + # Handle case with no text and no images + logger.warning("No text or image content in messages. Passing empty string as text input.") + concatenated_text_input = "" + + # Duplicate the concatenated text string for batching + text_input_for_processor = [concatenated_text_input] * effective_num_return_sequences + logger.debug(f"Concatenated text input for processor: '{concatenated_text_input[:200]}...' 
(duplicated {effective_num_return_sequences} times for batching)") + + # Process images separately if images are present + image_inputs = {} # Initialize empty image inputs + if image_data and self.multimodal_capable and messages: # Check if images were successfully loaded into `messages` list + image_processor_component = getattr(self.processor, 'image_processor', None) + if image_processor_component: + try: + # Extract PIL Images from the 'messages' list + pil_images = [msg["content"] for msg in messages if msg["type"] == "image" and isinstance(msg["content"], Image.Image)] + if pil_images: + # Process images once and add them. + # Note: For batching num_return_sequences > 1, the model's generate method + # is usually expected to handle the batching dimension for image inputs + # if the image processor outputs batched tensors. If this causes errors, + # model-specific handling might be needed here. + image_inputs = image_processor_component( + pil_images, # Process list of images + return_tensors="pt" + ).to(self.device) + logger.debug(f"Image inputs processed separately for callable processor without chat template. Keys now: {image_inputs.keys()}") + + else: + logger.warning("No valid PIL images found in messages despite image_data for callable processor without chat template. Skipping image processing.") + + except Exception as image_process_e: + logger.error(f"Failed to process image inputs separately for callable processor without chat template: {image_process_e}. Generation might fail.") + # Continue with text inputs only, but log error + else: + logger.warning("Processor's image_processor component is missing despite multimodal capability flag for callable processor without chat template. Cannot process images.") + + + # Pass the concatenated text (as a list for batching) and image inputs (if any) + # to the processor's __call__ method. + # Assuming the processor.__call__ signature handles this pattern. + inputs = self.processor( + text=text_input_for_processor, # Pass list of strings for batching + **image_inputs, # Unpack image inputs (e.g., pixel_values) + return_tensors="pt", + padding="longest", + truncation=True, + max_length=self.max_length, + ).to(self.device) + logger.debug("Input processed using processor.__call__ with concatenated text and separate image inputs.") + + + elif hasattr(self.processor, 'tokenizer'): # Fallback for text-only models loaded with AutoTokenizer + # Scenario 3: Processor is NOT callable, but HAS a tokenizer (text-only model) + logger.debug("Processor is text-only (using tokenizer). Processing text input only.") + # Use the stored tokenizer from __init__ to process only the combined text prompt + # Combine user input and CoT guiding text for text-only models + # Let's use a simple format: User Input + CoT Template Text + combined_text_for_tokenizer = f"User Input: {input_text.strip()}\n\n{full_text_prompt.strip()}" + + inputs = self.tokenizer( + combined_text_for_tokenizer, + return_tensors="pt", + padding="longest", + truncation=True, + max_length=self.max_length, + ).to(self.device) + logger.debug("Input processed using tokenizer directly.") + + + else: + # Safeguard: Should not happen if tokenizer check passes, but as a safeguard + raise TypeError("Loaded processor is neither callable nor contains a tokenizer attribute.") + + # ... (rest of input preparation block) ... 
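+            # A rough sketch of what `inputs` is expected to hold at this point, assuming a
+            # LLaVA-style AutoProcessor for the multimodal branches (the exact tensor keys are
+            # an assumption and vary by model family; nothing here enforces them):
+            #     inputs = processor(text=[prompt_text], images=[pil_image], return_tensors="pt")
+            #     list(inputs.keys())  # e.g. ['input_ids', 'attention_mask', 'pixel_values']
+            # For the text-only tokenizer branch the same mapping simply lacks 'pixel_values';
+            # the logging and length bookkeeping below only read 'input_ids' and, if present,
+            # 'pixel_values' from this mapping.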
+ # Prepare the input tensors dictionary for the model's generate method + input_tensors = inputs # 'inputs' is already a dictionary or object acting like one + + # Log the keys present in the input_tensors for debugging + logger.debug("Input tensors prepared for model.generate. Keys: %s", list(input_tensors.keys())) + if 'input_ids' in input_tensors: + logger.debug("Input IDs shape: %s, dtype: %s, on device: %s", input_tensors['input_ids'].shape, input_tensors['input_ids'].dtype, input_tensors['input_ids'].device) + if 'pixel_values' in input_tensors: + logger.debug("Pixel values shape: %s, dtype: %s, on device: %s", input_tensors['pixel_values'].shape, input_tensors['pixel_values'].dtype, input_tensors['pixel_values'].device) + + except Exception as e: - logger.error("Failed to tokenize input text: %s", e) + logger.error("Failed to prepare input tensors (tokenization/image processing): %s", e) # Attempt cleanup before raising if torch.cuda.is_available(): torch.cuda.empty_cache() gc.collect() - raise # Re-raise tokenization error - - - # 3) Build the final GenerationConfig for this specific call - # Start with the base config, then merge any provided overrides. - # Use .from_dict(.to_dict()) for safe merging. - cfg = GenerationConfig.from_dict(self.base_generation_config.to_dict()) - - if generation_config is not None: - logger.debug("Merging provided generation_config overrides...") - cfg.update(**generation_config.to_dict()) - logger.debug("Merged user-provided GenerationConfig.") - - # Explicitly set num_return_sequences for this call based on the argument. - # This overrides any num_return_sequences set in the base config or the provided override config. - if num_return_sequences is not None: - cfg.num_return_sequences = num_return_sequences - logger.debug("Using num_return_sequences from function argument: %s", cfg.num_return_sequences) - elif self.self_consistency_enabled: - # Fallback: If num_return_sequences argument is None, use consistency_rounds if self_consistency is enabled - cfg.num_return_sequences = self.consistency_rounds - logger.debug("num_return_sequences argument is None, using consistency_rounds (%s) because self_consistency is enabled.", cfg.num_return_sequences) - else: - # Fallback: If num_return_sequences argument is None and self_consistency is disabled, default to 1 - cfg.num_return_sequences = 1 - logger.debug("num_return_sequences argument is None and self_consistency disabled, defaulting to 1.") - - - # Ensure max_length in the config respects the wrapper's max_length setting - # max_length in generate() config is the *total* length (input + new tokens) - # max_new_tokens is the number of *new* tokens generated - # Prefer max_new_tokens if set, otherwise calculate from max_length - input_length = encoded_input['input_ids'].shape[1] - if cfg.max_new_tokens is None: - # If max_new_tokens is NOT set, ensure the total length does not exceed the wrapper's max_length - if cfg.max_length is not None: - # Only adjust cfg.max_length if it's set in the base/override config - cfg.max_length = min(self.max_length, cfg.max_length) - else: - # If neither max_new_tokens nor max_length were set in base/override, use wrapper's max_length - cfg.max_length = self.max_length - logger.debug("max_new_tokens not set in config. 
Using total max_length: %s (Input length: %s)", cfg.max_length, input_length) - else: - # If max_new_tokens IS set, the total length will be input_length + max_new_tokens - # We should check if this effective total length exceeds the wrapper's overall max_length - effective_total_length = input_length + cfg.max_new_tokens - if effective_total_length > self.max_length: - logger.warning("Effective total length (input %d + new %d = %d) exceeds wrapper max_length (%d). Adjusting max_new_tokens.", - input_length, cfg.max_new_tokens, effective_total_length, self.max_length) - # Adjust max_new_tokens down to respect the wrapper's limit - cfg.max_new_tokens = max(0, self.max_length - input_length) - logger.warning("Adjusted max_new_tokens to %d.", cfg.max_new_tokens) - - # Ensure pad_token_id and eos_token_id are correctly set in the final config - # Use tokenizer's IDs as the source of truth - cfg.pad_token_id = self.tokenizer.pad_token_id - cfg.eos_token_id = self.tokenizer.eos_token_id - - logger.debug("Final GenerationConfig for this call after resolving overrides and num_return_sequences: %s", cfg.to_dict()) - - # --- Debugging: Inspect inputs immediately before generation --- - # ADDED LOGGING HERE TO DIAGNOSE CUDA ERROR - logger.debug("-" * 30 + " Inputs to model.generate " + "-" * 30) - logger.debug(" Input Text Snippet: '%s...'", input_text[:100]) - logger.debug(" CoT Prompt Text Snippet: '%s...'", cot_prompt_text[:200].replace('\n', '\\n')) - logger.debug(" Input IDs shape: %s, dtype: %s, device: %s", encoded_input["input_ids"].shape, encoded_input["input_ids"].dtype, encoded_input["input_ids"].device) - if encoded_input.get("attention_mask", None) is not None: - logger.debug(" Attention Mask shape: %s, dtype: %s, device: %s", encoded_input["attention_mask"].shape, encoded_input["attention_mask"].dtype, encoded_input["attention_mask"].device) - # Log a snippet of the attention mask for inspection (only first batch item, first 20 tokens) - if encoded_input["attention_mask"].numel() > 0: - logger.debug(" Attention Mask snippet (first 20): %s", encoded_input["attention_mask"][0, :20].tolist()) - # Check if mask seems valid (contains only 0s and 1s) - might not catch all CUDA errors but helps debug - if not torch.all((encoded_input["attention_mask"] == 0) | (encoded_input["attention_mask"] == 1)): - logger.error("!!! Attention mask contains values other than 0 or 1 !!!") - else: - logger.warning("!!! No attention mask provided to model.generate !!!") - logger.debug(" GenerationConfig.pad_token_id: %s", cfg.pad_token_id) - logger.debug(" GenerationConfig.eos_token_id: %s", cfg.eos_token_id) - logger.debug(" GenerationConfig.num_return_sequences: %s", cfg.num_return_sequences) - logger.debug("-" * 30 + " End Inputs to model.generate " + "-" * 30) - # --- End Debugging --- + # Do not re-raise here, return empty lists and let the GUI handle the error + return {"full_texts": [], "reasoning_steps": [], "final_answers": [], "generated_images": [], "generation_scores": None} - # 4) Generate text using the model's generate method - # Pass input_ids and attention_mask. Pass the *final* GenerationConfig object. 
+        # --- Generate Response ---
+        generated_outputs = None
         try:
-            generation_output = self.model.generate(
-                input_ids=encoded_input["input_ids"],
-                attention_mask=encoded_input.get("attention_mask", None),
+            # Build the final GenerationConfig for this specific call.
+            # Start from an empty config, fill in pad/eos IDs from the tokenizer,
+            # then merge the parameters supplied by the GUI/caller.
+            cfg = GenerationConfig()  # Start with an empty config
+            if self.tokenizer:
+                # Safely get pad_token_id and eos_token_id, defaulting to None if not found
+                cfg.pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)
+                cfg.eos_token_id = getattr(self.tokenizer, 'eos_token_id', None)
+            else:
+                logger.warning("Tokenizer not available; GenerationConfig may lack pad/eos tokens.")
+
+            # Update the config with parameters from the GUI/caller.
+            if params:
+                # Drop keys that are not GenerationConfig parameters ('self_consistency_enabled',
+                # 'requested_chains') and the token IDs already taken from the tokenizer.
+                params_for_gen_config = {k: v for k, v in params.items() if k not in ['self_consistency_enabled', 'requested_chains', 'pad_token_id', 'eos_token_id']}
+                cfg.update(**params_for_gen_config)
+                logger.debug("Merged generation params into GenerationConfig.")
+
+            # Ensure the parameters required for batch generation are consistent.
+            cfg.num_return_sequences = effective_num_return_sequences
+            if cfg.num_return_sequences > 1 and not cfg.do_sample:
+                logger.warning("num_return_sequences > 1 but do_sample is False. Generated sequences will be identical.")
+            if cfg.do_sample and cfg.temperature == 0:
+                logger.warning("do_sample is True but temperature is 0. Generation will be deterministic.")
+
+            # Resolve max_new_tokens / max_length consistently.
+            # max_length is the *total* length (prompt + new tokens); max_new_tokens counts only new tokens.
+            # Safely get input_length, defaulting to 0 if input_ids is missing or empty.
+            input_ids_tensor = input_tensors.get('input_ids', torch.tensor([[]]))
+            input_length = input_ids_tensor.shape[-1] if input_ids_tensor.numel() > 0 else 0
+
+            if params and 'max_new_tokens' in params:
+                # Prefer an explicit max_new_tokens from the caller.
+                cfg.max_new_tokens = params['max_new_tokens']
+                # Keep max_length in sync, unless the caller already requested a larger total length.
+                if cfg.max_length is None or (input_length + cfg.max_new_tokens) < cfg.max_length:
+                    cfg.max_length = input_length + cfg.max_new_tokens if input_length + cfg.max_new_tokens > 0 else None
+                logger.debug("Using max_new_tokens from params: %s. Calculated total max_length: %s", cfg.max_new_tokens, cfg.max_length)
+            elif cfg.max_new_tokens is None:
+                # Neither params nor the default config set max_new_tokens: cap the total length
+                # at the wrapper's max_length and derive max_new_tokens from the remainder.
+                cfg.max_length = min(self.max_length, cfg.max_length if cfg.max_length is not None else self.max_length)
+                cfg.max_new_tokens = max(0, cfg.max_length - input_length)  # Never negative
+                logger.debug("max_new_tokens not set in params or default cfg. Using wrapper max_length: %s. Calculated max_new_tokens: %s", cfg.max_length, cfg.max_new_tokens)
+            else:
+                # max_new_tokens came from the default config: clamp it to the wrapper's max_length.
+                effective_total_length = input_length + cfg.max_new_tokens
+                if effective_total_length > self.max_length:
+                    logger.warning("Effective total length (%d) exceeds wrapper max_length (%d). Adjusting max_new_tokens.", effective_total_length, self.max_length)
+                    cfg.max_new_tokens = max(0, self.max_length - input_length)
+                    logger.warning("Adjusted max_new_tokens to %d.", cfg.max_new_tokens)
+                # In either case, keep cfg.max_length consistent with the resolved max_new_tokens.
+                cfg.max_length = input_length + cfg.max_new_tokens if input_length + cfg.max_new_tokens > 0 else None
+                logger.debug("Using max_new_tokens from default cfg: %s. Calculated total max_length: %s", cfg.max_new_tokens, cfg.max_length)
+
+            # Make sure max_length is populated whenever any tokens (prompt or new) exist.
+            if cfg.max_length is None and (input_length + (cfg.max_new_tokens or 0)) > 0:
+                cfg.max_length = input_length + (cfg.max_new_tokens or 0)
+
+            # Final check: if the budget collapsed to zero new tokens, try to allow at least one.
+            if cfg.max_new_tokens is not None and cfg.max_new_tokens <= 0:
+                logger.warning("Calculated max_new_tokens is 0 or less; generation would return only the prompt.")
+                if input_length < self.max_length and self.max_length > 0:
+                    cfg.max_new_tokens = 1
+                    # Re-calculate max_length to reflect the adjusted max_new_tokens
+                    cfg.max_length = input_length + cfg.max_new_tokens
+                    logger.warning("Setting max_new_tokens to 1 to attempt minimal generation.")
+                else:
+                    # If the input is already at max_length, or max_length is zero, no new tokens are possible.
+                    cfg.max_new_tokens = 0  # Explicitly 0
+                    logger.warning("Input length is already at max_length or max_length is zero. Cannot generate new tokens (max_new_tokens = 0).")
+
+            logger.debug("Final GenerationConfig for this call after resolving params: %s", cfg.to_dict())
+
+            # --- Call model.generate ---
+            # Pass the prepared input tensors (which may include pixel_values) and the generation config.
+            # The model's generate method will handle multimodal input if it supports it.
+            generated_outputs = self.model.generate(
+                **input_tensors, # Unpack the input tensors (input_ids, attention_mask, pixel_values, etc.)
                 generation_config=cfg, # Pass the fully configured GenerationConfig
-                # Request scores if supported by the model/config for potential CISC implementation externally
-                return_dict_in_generate=True, # Request dict output
-                output_scores=True, # Request scores
+                return_dict_in_generate=True, # Ensure we get a dictionary output
+                output_scores=True # Request scores (not used by parsing, but available to the caller)
             )
-            generated_sequences = generation_output.sequences
+            logger.info(f"Model generation complete. Generated {len(generated_outputs.sequences)} sequences.")
+
             # If scores were requested and returned, they are available in generation_output.scores
-            # These can be used by the caller for CISC voting.
- generation_scores = generation_output.scores if hasattr(generation_output, 'scores') else None - logger.info("Generation complete. Generated %d sequence(s).", len(generated_sequences)) - if generation_scores: + generation_scores = generated_outputs.scores if hasattr(generated_outputs, 'scores') else None + if generation_scores is not None: # Check explicitly for None logger.debug("Generation scores available (%d scores tensors).", len(generation_scores)) + except Exception as e: - logger.error("Model generation failed: %s", e) - # Log the exception details - import traceback - logger.error(traceback.format_exc()) # Log full traceback - - # Attempt cleanup even on failure - this *might* also trigger the CUDA error again, - # but it's the correct place to *try* to clean up GPU memory associated with the model. - if torch.cuda.is_available(): - try: - torch.cuda.empty_cache() - logger.debug("Attempted torch.cuda.empty_cache() after generation failure.") - except Exception as cache_e: - logger.error("Error during cuda empty_cache after generation failure: %s", cache_e) + logger.error("Failed during model generation: %s", e) + # Attempt cleanup before raising + if torch.cuda.is_available(): torch.cuda.empty_cache() gc.collect() - logger.debug("Attempted gc.collect() after generation failure.") + # Do not re-raise here, return empty lists and let the GUI handle the error + return {"full_texts": [], "reasoning_steps": [], "final_answers": [], "generated_images": [], "generation_scores": None} - raise # Re-raise generation error + # --- Process Generated Outputs --- + full_texts: List[str] = [] + reasoning_steps: List[List[str]] = [] # List of lists, one list of steps per sequence + final_answers: List[Optional[str]] = [] # List of final answers per sequence + # Placeholder for future generated images (multimodal output) + generated_images_list: List[Any] = [] # Will store image data if generated - # 5) Decode and Parse the generated sequences - # Ensure generated_sequences is a list or tensor before decoding - if not isinstance(generated_sequences, (list, torch.Tensor)) or len(generated_sequences) == 0: - logger.warning("No sequences generated. Returning empty results.") - return { - "sequences": [], - "full_texts": [], - "reasoning_steps": [], - "final_answers": [], - "generation_scores": None, - } - decoded_outputs = self.tokenizer.batch_decode(generated_sequences, skip_special_tokens=True) - logger.debug("Batch decoding complete.") - parsed_results = [self._parse(text, cot_prompt_text) for text in decoded_outputs] - logger.debug("Parsing complete for %d sequences.", len(parsed_results)) + if generated_outputs and hasattr(generated_outputs, 'sequences'): + # Decode the generated token sequences + # Need the tokenizer from the processor + if self.tokenizer is None: + logger.error("Tokenizer is missing. Cannot decode generated sequences.") + # Return empty lists but don't stop processing + else: + # Get the length of the input prompt's token IDs for prompt removal + # Safely get input_length, defaulting to 0 if input_ids is missing or empty + input_ids_tensor = input_tensors.get('input_ids', torch.tensor([[]])) + input_length = input_ids_tensor.shape[-1] if input_ids_tensor.numel() > 0 else 0 + logger.debug(f"Input token length determined for prompt removal during decoding: {input_length}") + + + for i, sequence in enumerate(generated_outputs.sequences): + # Decode the entire generated sequence back to text + # Need to handle potential prompt remnants in the output for causal models. 
+ # A common approach is to find the start of the generation (length of input_ids) + # and decode only from that point onwards. + + # Ensure sequence is a tensor before slicing and decoding + if isinstance(sequence, torch.Tensor): + # Decode only the newly generated tokens (after the input prompt) + # Use max(0, input_length) to handle cases where input_length might be negative or zero + # Ensure the slice is valid (sequence might be shorter than input_length in error cases) + start_index = max(0, input_length) + # Use skip_special_tokens=True to remove EOS, BOS, PAD tokens from output text + decoded_text = self.tokenizer.decode(sequence[start_index:], skip_special_tokens=True) + logger.debug(f"Decoded new tokens for sequence {i} (input length {input_length}, decoded from index {start_index}): {decoded_text[:200]}...") + else: + # If sequence is not a tensor, decode the whole thing and log a warning + logger.warning(f"Generated sequence {i} is not a tensor (type: {type(sequence)}). Decoding full sequence and hoping parsing handles it.") + # Decode the full sequence, including potential prompt if it's not handled correctly upstream + decoded_text = self.tokenizer.decode(sequence, skip_special_tokens=True) + logger.debug(f"Decoded full sequence {i}: {decoded_text[:200]}...") + + + # In a multimodal generation scenario, the output might *also* contain image tokens + # or encoded image data. Extracting those would require model-specific parsing. + # For now, we assume text output, potentially with text-encoded image info that parsing might ignore. + # Placeholder for future image extraction: + # extracted_image_data = self._extract_image_data_from_text(decoded_text) # Conceptual + + # Parse the decoded text for CoT steps and final answer + # Pass the original user text and the constructed CoT prompt text for parsing reference + steps, answer, full_output_text_cleaned = self._parse( + decoded_text, # The raw decoded output (just the new tokens part) + input_text, # Original user text input (for potential robust prompt removal in parse) + full_text_prompt # The constructed CoT prompt text (AGI + template) (for potential robust prompt removal in parse) + ) + + full_texts.append(full_output_text_cleaned) # Append the cleaned output body + reasoning_steps.append(steps) + final_answers.append(answer) + # Append placeholder or extracted image data + # generated_images_list.append(extracted_image_data if extracted_image_data is not None else None) - # Unpack the parsed results - all_steps = [result[0] for result in parsed_results] - all_final_answers = [result[1] for result in parsed_results] - full_generated_bodies = [result[2] for result in parsed_results] + else: + logger.warning("Model generation did not return sequences in expected format or returned no sequences.") + # Return empty lists + + + # --- AGI Helper Module Interaction (Post-Generation) --- + # Use NeoSentientCore and AGIEnhancer to process the generated output + # Process the output of the first generated chain as the main experience, if any were generated. + if AGI_IMPORTS_SUCCESS and full_texts: + # Use the first chain's full output text for AGI processing + main_output_text = full_texts[0] + + if self.memory_engine: + try: + # Observe the generated output (text) + # Pass text content. 
Image observation would need adapting MemoryEngine + self.memory_engine.observe(main_output_text) + logger.debug("MemoryEngine observed generated output (text).") + except Exception as e: + logger.warning(f"MemoryEngine observe failed: {e}") + + try: + # Save reasoning chains (example: save steps from the first chain) + if reasoning_steps and reasoning_steps[0]: + # Ensure steps list contains strings before saving + valid_steps = [step for step in reasoning_steps[0] if isinstance(step, str) and step.strip()] + if valid_steps: + self.memory_engine.save_reasoning_chain(1, valid_steps) # Save steps from the first chain + logger.debug("MemoryEngine saved reasoning chain (from first chain).") + else: + logger.debug("MemoryEngine skipping saving empty or invalid reasoning chain.") + except Exception as e: + logger.warning(f"MemoryEngine save_reasoning_chain failed: {e}") + + # Consider reflecting periodically - this logic should be managed externally or less frequently + # logger.debug("MemoryEngine reflection not called here.") + + + if self.neuro_processor: + try: + # Record the generation experience (text) + generation_experience_detail = f"Generated response (first chain): {main_output_text[:200]}{'...' if len(main_output_text) > 200 else ''}" + # Pass text content. Image experience would need adapting NeuroMemoryProcessor + self.neuro_processor.record_experience("generation", generation_experience_detail) + logger.debug("NeuroMemoryProcessor recorded generation experience (text).") + except Exception as e: + logger.warning(f"NeuroMemoryProcessor record_experience failed: {e}") + + # Update biases based on the output (example: process the text) + # Consider moving to scheduled task + # try: + # self.neuro_processor._evolve_cognitive_bias(main_output_text) # Direct call for simplicity + # logger.debug("NeuroProcessor evolved biases based on output.") + # except Exception as e: + # logger.warning(f"NeuroProcessor _evolve_cognitive_bias failed: {e}") + + + if self.agi_enhancer: + try: + # Log the generation experience (text) + enhancer_experience_detail = f"Generated response (first chain): {main_output_text[:200]}{'...' if len(main_output_text) > 200 else ''}" + # Pass text content. Image logging would need adapting AGIEnhancer + self.agi_enhancer.log_experience(enhancer_experience_detail) + logger.debug("AGIEnhancer logged experience.") + except Exception as e: + logger.warning(f"AGIEnhancer log_experience failed: {e}") + + # Engage in reflection periodically - this logic should be managed externally or less frequently + # logger.debug("AGIEnhancer reflection not called here post-gen.") + + # NeoSentientCore post-generation actions (perception of its own output is handled above) + if self.neo_sentient_core: + try: + # Simulate the core processing the generated output (text) + # Assuming NeoSentientCore has a process_output method that accepts text + if hasattr(self.neo_sentient_core, 'process_output'): + self.neo_sentient_core.process_output(main_output_text) + logger.debug("NeoSentientCore processed generated output (text).") + else: + logger.warning("NeoSentientCore does not have a 'process_output' method. 
Skipping output processing.") + + except Exception as e: + logger.warning(f"NeoSentientCore process_output failed: {e}") + + + + # Attempt cleanup after generation attempt (success or failure) + if torch.cuda.is_available(): + try: + torch.cuda.empty_cache() + logger.debug("GPU memory cache cleared after generation attempt.") + except Exception as cleanup_e: + logger.warning(f"Error during cuda empty_cache after generation attempt: {cleanup_e}") + pass # Suppress this warning unless in debug mode + gc.collect() + logger.debug("Garbage collection performed after generation attempt.") - # 6) Construct and return the results dictionary - # The actual self-consistency voting logic is handled by the caller, - # but the wrapper provides the necessary outputs (multiple chains and parsed answers). + + # Return the collected results return { - "sequences": generated_sequences, # Raw sequences (token IDs) - "full_texts": full_generated_bodies, # Cleaned generated text bodies - "reasoning_steps": all_steps, # Parsed reasoning steps for each chain - "final_answers": all_final_answers, # Parsed final answer for each chain - "generation_scores": generation_scores, # Scores if requested and available (for CISC) + "full_texts": full_texts, + "reasoning_steps": reasoning_steps, + "final_answers": final_answers, + "generation_scores": generation_scores, # Include scores (will be None if not requested/available) + # In a future multimodal version, generated_images might be included here + "generated_images": generated_images_list # Return the list (might be empty) } - def _parse(self, text: str, cot_prompt: str) -> Tuple[List[str], str, str]: + def _parse(self, text: str, user_input: str, cot_prompt_text: str) -> Tuple[List[str], Optional[str], str]: """ - Parses the generated text to extract reasoning steps and the final answer. - This is a robust parsing function that handles different formats, - artifacts, and provides fallback logic for finding the answer. - - Args: - text (str): The raw text output from the model for a single chain. - cot_prompt (str): The exact prompt text that was injected (used to remove it from the output). - - Returns: - Tuple[List[str], str, str]: A tuple containing: - - A list of extracted reasoning step strings. - - The extracted final answer string. - - The full body of the generated text (after removing the prompt and artifacts). + Parses one chain’s generated text into steps + final answer. + Handles artifact cleaning. Attempts to handle potential prompt remnants. + Returns: (steps_list, final_answer_string_or_None, cleaned_body_text) """ - logger.debug("Starting parsing for a single generated text chunk...") - - # 1) Remove the exact injected prompt from the beginning of the text. - # This isolates the model's generated continuation. - body = text - if text.startswith(cot_prompt): - body = text[len(cot_prompt):] # Remove the prefix - logger.debug("Removed exact CoT prompt (%d characters) from beginning.", len(cot_prompt)) - else: - logger.warning("Generated text does not start with the injected CoT prompt. Attempting to parse entire text after initial whitespace strip.") - body = text.lstrip() # Just strip leading whitespace if template wasn't followed + logger.debug("_parse method called.") + # Ensure input is a string + if not isinstance(text, str): + logger.warning(f"Attempted to parse non-string output: {type(text)}. 
Returning empty.") + return [], None, str(text) # Return empty lists/None and the stringified input + + body = text.strip() # Start with the raw decoded text and strip leading/trailing whitespace - # 2) Apply artifact cleanup patterns - logger.debug("Applying artifact cleanup patterns...") - original_body_len = len(body) - cleaned_body = body # Start with body after prompt removal + # 1) Clean up artifacts using compiled patterns for pattern in self._artifact_patterns: - cleaned_body = pattern.sub("", cleaned_body) - if len(cleaned_body) < original_body_len: - logger.debug("Artifact cleanup removed %d characters.", original_body_len - len(cleaned_body)) + body = pattern.sub("", body) + body = body.strip() + logger.debug(f"Text body after artifact cleanup: {body[:200]}...") + + # 2) Split into non‐empty lines for parsing + lines = [l.strip() for l in body.splitlines() if l.strip()] + logger.debug(f"Split into {len(lines)} non-empty lines.") + + # 3) Extract tagged answer if present + steps: List[str] = [] + final_answer: Optional[str] = None # Use Optional[str] + tagged = False + answer_line_index = -1 # Track line index of the answer tag + + # Search for the final answer tag *anywhere* in the lines + # Use the compiled pattern + for i, line in enumerate(lines): + m = self.final_answer_pattern.search(line) + if m: + final_answer = m.group(1).strip() + tagged = True + answer_line_index = i # Store the index + logger.debug(f"Found final answer tag on line {i}: '{final_answer[:100]}...'") + break # Stop searching once the tag is found + + # 4) Collect steps from the beginning up to the line containing the answer tag (if tagged) + # If not tagged, collect steps from all lines that match the step pattern. + step_lines = [] + if tagged and answer_line_index != -1: + # Collect steps from lines *before* the answer line index + step_lines = lines[:answer_line_index] + logger.debug(f"Collecting steps from lines before answer tag (up to line {answer_line_index}).") else: - logger.debug("No artifacts found matching patterns.") - - # Ensure body is stripped after cleanup - cleaned_body = cleaned_body.strip() - body_lines = [l.strip() for l in cleaned_body.splitlines() if l.strip()] # Split into non-empty, stripped lines - - steps = [] # List to store extracted steps - final_answer = "" # Variable to store the final answer - found_final_answer_tagged = False # Flag to track if the specific tag was found - - # 3) Extract Steps and Final Answer (Primary Method: Tagged Answer) - # Iterate through lines and apply regex patterns. - # Prioritize finding the explicit final answer tag. - logger.debug("Attempting to extract steps and final answer using explicit tag '%s'...", self.final_answer_tag) - for i, line in enumerate(body_lines): - # Check for the explicit final answer tag pattern first - final_answer_match = self.final_answer_pattern.search(line) - if final_answer_match: - final_answer = final_answer_match.group(1).strip() - logger.debug("Extracted final answer using explicit tag: '%s'", final_answer[:100]) - found_final_answer_tagged = True - # Once the tagged answer is found, we can stop processing lines for it - # We still iterate through ALL lines below to capture all steps BEFORE the tag. - # No break here because we need to collect steps that might appear after the tag was first encountered on a line. - # E.g., "Step 1: ... Final_Answer: X Step 2: ..." (unlikely but possible) - # The logic below ensures we capture steps *before* the final answer. 
- - - # Now, iterate through lines AGAIN to collect steps. - # This second pass ensures we collect steps even if the answer tag was found early. - # We stop collecting steps once we encounter the line that *contained* the final answer tag, - # or if we apply a step limit. - logger.debug("Collecting reasoning steps...") - for i, line in enumerate(body_lines): - # Stop collecting steps if we found the final answer tag on this line or a previous one - # And if we've reached or passed the line where the tag was found (if it was found) - # This requires knowing the index of the line where the tag was found. - # A simpler approach: just collect all lines matching step pattern UP TO the first line - # where the final answer tag was found. - final_answer_line_index = -1 - for idx, l in enumerate(body_lines): - if self.final_answer_pattern.search(l): - final_answer_line_index = idx - break # Found the first occurrence of the tag - - if final_answer_line_index != -1 and i >= final_answer_line_index: - logger.debug("Stopped collecting steps at line index %d because final answer tag was found on line %d.", i, final_answer_line_index) - break # Stop collecting steps once we reach the line with the answer tag - - # Check for reasoning step pattern - step_match = self._step_pattern.match(line) - if step_match: - step_text = step_match.group(1).strip() - if step_text: # Only add non-empty steps - steps.append(step_text) - # logger.debug("Extracted step: '%s'", steps[-1][:50]) # Too verbose usually - # Stop adding steps if we've reached a defined limit - if len(steps) >= self.reasoning_steps_limit: - logger.debug("Reached reasoning steps limit (%d). Stopping step extraction.", self.reasoning_steps_limit) - break # Stop collecting steps if limit is reached - - - # 4) Fallback for Final Answer (If Tag Still Not Found) - # If the explicit final answer tag was not found after both passes, apply fallback heuristics. - if not found_final_answer_tagged: - logger.debug("Explicit final answer tag not found. Applying fallback heuristics.") - - # Fallback: Assume the last non-step line is the answer. - # Iterate backwards through the processed lines to find the last line that doesn't look like a step. - # Using the 'body_lines' list after cleanup and stripping. - last_non_step_line = "" - for line in reversed(body_lines): # Iterate backwards through non-empty, stripped lines - if line and not self._step_pattern.match(line): - last_non_step_line = line.strip() - logger.debug("Fallback: Identified last non-step line: '%s'", last_non_step_line[:100]) - break # Found the last non-step line, stop searching backwards - - if last_non_step_line: - # Check if the last non-step line *contains* the final answer tag, - # even if it didn't *start* with it or wasn't the line where the tag was first found. - fa_match_fallback = self.final_answer_pattern.search(last_non_step_line) - if fa_match_fallback: - final_answer = fa_match_fallback.group(1).strip() - logger.debug("Fallback found tagged answer in last non-step line: '%s'", final_answer[:100]) - else: - # If no tag in the last non-step line, just use the line itself as the answer - final_answer = last_non_step_line - logger.debug("Fallback using last non-step line as answer: '%s'", final_answer[:100]) - else: - # If no non-empty or non-step lines were found, the final answer is empty - final_answer = "" - logger.debug("Fallback: No non-empty or non-step lines found in body. 
Final answer is empty.") - - # 5) Basic Post-Parsing Cleanup on Final Answer - # Remove any trailing punctuation from the final answer, unless it's part of specific symbols (like !?) - # This helps normalize answers for voting. - if final_answer: - # Remove common trailing characters like periods, commas, etc. + # If not tagged, consider all lines for steps + step_lines = lines + logger.debug("Final answer tag not found. Collecting steps from all lines matching step pattern.") + + + # Extract steps using the step pattern from the identified step lines + for line in step_lines: + m = self._step_pattern.match(line) + if m: + steps.append(m.group(1).strip()) + # Apply conceptual limit *during* collection if needed, though parsing is usually fast. + if self.reasoning_steps_limit > 0 and len(steps) >= self.reasoning_steps_limit: + logger.debug("Reached reasoning steps limit (%d). Stopping step collection.", self.reasoning_steps_limit) + break # Stop collecting steps if limit is reached + + logger.debug(f"Extracted {len(steps)} reasoning steps.") + + # 5) Fallback for final answer if no tagged answer was found + # If no tagged answer was found AND no final_answer was extracted (e.g., tag was empty), + # try to find the last non-step line as the answer. + if not tagged and (final_answer is None or not final_answer.strip()): # Only attempt if no valid tagged answer found + logger.debug("Attempting fallback for final answer...") + # Iterate backwards from the end + # Start from the last line, or just before the answer tag line if tag was found but empty + start_index_for_fallback = answer_line_index if tagged and answer_line_index != -1 else len(lines) -1 + for i in range(start_index_for_fallback, -1, -1): + line = lines[i] + # Check if the line is *not* a step line AND is not empty + if line.strip() and not self._step_pattern.match(line): + # Attempt to remove common answer prefixes from the fallback line + fallback_answer_attempt = re.sub( + r"^\s*(?:Answer|Result|Output|Final Answer)\s*[:\-]?\s*", + "", + line, # Use the original line for prefix removal attempt + flags=re.IGNORECASE + ).strip() + # If after removing prefixes, the line is not empty, use it as the fallback answer + if fallback_answer_attempt: + final_answer = fallback_answer_attempt + logger.debug("Fallback answer found: '%s'", final_answer[:100]) + break # Found the fallback answer + # If removing prefixes resulted in an empty string, maybe the original line is the answer? 
+ elif line.strip(): + final_answer = line.strip() + logger.debug("Using last non-empty, non-step line as fallback answer: '%s'", final_answer[:100]) + break # Found the fallback answer + + logger.debug(f"Final Answer (after fallback): '{final_answer[:100] if final_answer is not None else 'None'}'") + + # 6) Final cleanup on the extracted answer + # Remove trailing punctuation that might be part of the model's generation habit + if final_answer is not None: final_answer = re.sub(r'[.,;:]+$', '', final_answer).strip() - # Remove common leading "Answer: " or similar preambles if they weren't removed by tag matching - # This needs to be case-insensitive - final_answer = re.sub(r'^\s*(?:Answer|Result|Output|Final Answer)\s*[:\-]?\s*', '', final_answer, flags=re.IGNORECASE).strip() - logger.debug("Applied basic post-parsing cleanup to final answer: '%s'", final_answer[:100]) - - # Final check: Ensure steps list doesn't contain the final answer line or text - # This is a belt-and-suspenders approach as the logic above should prevent it, - # but safeguards against edge cases where the tag wasn't found but the line - # looked like a step *and* contained the answer. - if final_answer and steps: - # Remove any step that exactly matches the final answer after stripping - steps = [step for step in steps if step.strip() != final_answer.strip()] - # Also check if the final answer is contained *within* a step (less likely but possible) - steps = [step for step in steps if final_answer.strip() not in step.strip()] - - - logger.info("Parsing complete. Steps found: %d, Final Answer: '%s'", len(steps), final_answer[:100]) - - # Return the extracted steps, the final answer, and the cleaned generated body text - return steps, final_answer, cleaned_body # Return steps, final answer, and the cleaned body text - - - def resize_token_embeddings(self, new_size: int): - """ - Resizes the model's token embeddings to match a new vocabulary size, - useful after adding new tokens (like a custom PAD token) to the tokenizer. - This operation is crucial if the tokenizer size changes and the model - is used for generation or training. + logger.debug(f"Final Answer (after cleanup): '{final_answer[:100] if final_answer is not None else 'None'}'") - Only works if the underlying model object is a PreTrainedModel - or has a `resize_token_embeddings` method. - Args: - new_size (int): The new size of the vocabulary/embedding layer. - Should typically be `len(self.tokenizer)`. 
- """ - # Use the stored HF model instance found during initialization - hf_model_instance = self._hf_model_instance - - if hf_model_instance and hasattr(hf_model_instance, 'resize_token_embeddings'): - try: - old_size = hf_model_instance.get_input_embeddings().weight.size(0) - if new_size != old_size: - logger.info("Attempting to resize model token embeddings from %d to %d.", old_size, new_size) - # Ensure the model is on the correct device before resizing - hf_model_instance.to(self.device) - hf_model_instance.resize_token_embeddings(new_size) - logger.info("Successfully resized token embeddings.") - # Update model config's vocab size if available - if hasattr(hf_model_instance, 'config') and hasattr(hf_model_instance.config, 'vocab_size'): - hf_model_instance.config.vocab_size = new_size - logger.debug("Updated underlying model config vocab_size to %d.", new_size) - # Attempt garbage collection after a potentially memory-intensive operation - if torch.cuda.is_available(): torch.cuda.empty_cache() - gc.collect() - else: - logger.info("Embedding size is already %d, no resizing needed.", new_size) - except Exception as e: - logger.error("Failed to resize token embeddings: %s", e) - # Attempt cleanup even on failure - if torch.cuda.is_available(): torch.cuda.empty_cache() - gc.collect() - # Note: Not re-raising here by default, as a failure might not be critical - # depending on the user's intended use (e.g., if they don't use the new tokens for generation). - # Could be re-raised if this is deemed a critical error. - else: - logger.warning("Cannot resize token embeddings: The underlying model object does not have a 'resize_token_embeddings' method or HF model instance not found.") - - -# Example Usage (Illustrative) -if __name__ == "__main__": - print("--- ChainOfThoughtWrapper Example Usage ---") - print("This block demonstrates loading a small HF model and using the wrapper.") - print("Setting logging level to DEBUG to see detailed wrapper logs.") - logger.setLevel(logging.DEBUG) # Set logger to DEBUG for example - - # You would replace this with your actual model loading logic - try: - # Use a tiny, fast model for a quick test - # NOTE: distilgpt2 might still hit CUDA errors with num_return_sequences > 1 - # if there are underlying driver/CUDA/PyTorch compatibility issues or - # subtle model-specific padding bugs in HF transformers for this architecture. - # If this example still fails, try a different simple causal model like 'gpt2' or a small LLaMA variant. - model_id = "distilbert/distilgpt2" # A slightly larger but still fast GPT-2 variant - device = "cuda" if torch.cuda.is_available() else "cpu" - - logger.info(f"Attempting to load model {model_id} on {device}...") - - # Load tokenizer - tokenizer = AutoTokenizer.from_pretrained(model_id) - - # Ensure pad token is set for generation robustness (common requirement for GPT-like models) - # Handle this *before* loading the model if possible, or ensure embeddings are resized. - if tokenizer.pad_token_id is None: - if tokenizer.eos_token_id is not None: - tokenizer.pad_token_id = tokenizer.eos_token_id - logger.warning("Tokenizer pad_token_id is None, using eos_token_id (%s) as pad_token_id.", tokenizer.eos_token_id) - else: - # Add a pad token if neither eos nor pad exists. - # This *must* be done before loading the model or resizing embeddings. - logger.warning("Tokenizer pad_token_id and eos_token_id are both None. 
Adding a [PAD] token.") - tokenizer.add_special_tokens({'pad_token': '[PAD]'}) - tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids('[PAD]') - logger.info("Added new [PAD] token with ID %s.", tokenizer.pad_token_id) - # Note: Resizing embeddings will be handled by the wrapper during initialization - # if a compatible HF model instance is found. - - - # Load model - model = AutoModelForCausalLM.from_pretrained(model_id) - - - # Instantiate the wrapper - # Simulate parameters that would come from a GUI or config - # This GenerationConfig will override some defaults in the wrapper's base config for this call. - simulated_base_gen_config = GenerationConfig( - max_new_tokens=128, # Limit generated tokens - temperature=0.85, # Slightly higher temp for diversity in multiple chains - do_sample=True, # Crucial for sampling-based generation - # num_return_sequences is intentionally NOT set here; it's set by the wrapper based on generate() argument - pad_token_id=tokenizer.pad_token_id, # Pass pad_token_id explicitly - eos_token_id=tokenizer.eos_token_id, # Pass eos_token_id explicitly - # Add other parameters based on tuning recommendations if desired - repetition_penalty=1.1 # Apply repetition penalty - ) + logger.debug("Parsing complete. %d steps, Final Answer: '%s'", len(steps), final_answer[:100] if final_answer is not None else 'None') + # Return steps list, final answer string (or None), and cleaned body text + return steps, final_answer, body # Return the cleaned body text - # Instantiate the wrapper, enabling self-consistency flags in init - # These flags inform the wrapper's default behavior if generate() args are None - cot_wrapper = ChainOfThoughtWrapper( - model=model, - tokenizer=tokenizer, - generation_config=simulated_base_gen_config, # Pass overrides here if desired as base - device=device, - self_consistency_enabled=True, # Simulate SC enabled - consistency_rounds=5, # Simulate consistency rounds setting - final_answer_tag="Final Answer:", # Use a slightly different tag for demo - # Keep factual emphasis on for demo - emphasize_factual=True, - allow_uncertainty_phrase="If you cannot determine a definitive answer, state that.", - ) - # Prepare input prompt - # Use a prompt that encourages steps and a clear answer - prompt_text = "If a train travels at 60 mph for 2.5 hours, how far does it travel? Calculate step-by-step." 
- logger.info(f"Generating reasoning for prompt: '{prompt_text}'") - - # Generate outputs - # We explicitly pass num_return_sequences to the generate call (e.g., from GUI slider) - num_chains_to_generate = 3 # Simulate GUI setting num_chains slider to 3 - logger.info(f"Calling wrapper.generate() requesting {num_chains_to_generate} chains.") - - start_time = time.time() - outputs = cot_wrapper.generate( - input_text=prompt_text, - # No explicit generation_config override here; uses the base config initialized in the wrapper - # but you *could* pass overrides like: generation_config=GenerationConfig(temperature=1.0) - num_return_sequences=num_chains_to_generate, # Pass the desired number of sequences here - ) - end_time = time.time() - logger.info(f"Generation of {len(outputs.get('sequences', []))} sequences took {end_time - start_time:.2f} seconds.") - - - # --- Process Results (including simulated Self-Consistency voting logic) --- - print("\n" + "="*50) - print("--- Generation Results ---") - print("="*50) - - full_texts = outputs.get('full_texts', []) - reasoning_steps = outputs.get('reasoning_steps', []) - final_answers_raw = outputs.get('final_answers', []) # Raw answers from wrapper - - if not full_texts: - print("No chains were generated or parsed.") - else: - for i, (full_text, steps, final_answer_raw) in enumerate(zip(full_texts, reasoning_steps, final_answers_raw)): - print(f"\n--- Chain {i+1} ---") - print("Full Text (Cleaned):") - print(full_text) - print("\nReasoning Steps Parsed:") - if steps: - # Ensure steps is a list before iterating - steps = steps if isinstance(steps, list) else [] - for j, step in enumerate(steps): - # Ensure step is a string before printing - if isinstance(step, str) and step.strip(): - print(f" Step {j+1}: {step.strip()}") - elif not isinstance(step, str): - print(f" [Step {j+1} has invalid format]") - if not steps: # If steps list was empty after checks - print(" [No steps parsed]") - else: # If steps was None or not a list initially - print(" [No steps parsed]") - print("\nFinal Answer Parsed (Raw):") - # Ensure raw answer is a string before printing - display_raw_answer = final_answer_raw if isinstance(final_answer_raw, str) and final_answer_raw.strip() else "[No final answer parsed]" - print(f" '{display_raw_answer}'") - - - # --- Simulate Self-Consistency Voting (as would be done in GUI) --- - print("\n" + "="*50) - print("--- Simple Self-Consistency Voting Simulation ---") - print("="*50) - - if final_answers_raw: - # Perform the actual voting using the helper functions - consensus_answer, answer_distribution_dict = perform_self_consistency_voting(final_answers_raw) - answer_distribution = Counter(answer_distribution_dict) # Convert to Counter for display - - print(f"Raw Answers Submitted for Voting: {final_answers_raw}") - print(f"Normalized Answers for Voting: {list(answer_distribution_dict.keys())}") # Show unique normalized answers - print(f"Answer Counts: {dict(answer_distribution)}") - - if consensus_answer: - print(f"\nConsensus Answer: '{consensus_answer}'") - # Get count of the winning normalized answer - winner_count = answer_distribution.get(normalize_answer(consensus_answer), 0) - print(f"(Voted by {winner_count} chain(s) out of {len(final_answers_raw)})") - - # Optional: Check for ties (more sophisticated tie-breaking would go here in a real voter) - if len(answer_distribution) > 1 and answer_distribution.most_common(2)[0][1] == answer_distribution.most_common(2)[1][1]: - print("Note: There is a tie for the most common normalized 
answer.") - - else: - print("No valid final answers found for voting.") - else: - print("No final answers were parsed from any chain for voting.") - - - except Exception as e: - logger.error("An error occurred during the example usage: %s", e) - import traceback - traceback.print_exc() # Print detailed traceback for the example failure - - print("\n--- Example Usage End ---") - # Attempt final cleanup - if torch.cuda.is_available(): - try: - torch.cuda.empty_cache() - print("GPU memory cache cleared.") - except Exception as cleanup_e: - print(f"Error during final cuda empty_cache: {cleanup_e}") - gc.collect() - print("Garbage collected.") \ No newline at end of file + # Add placeholder for potential image data extraction from text output + # This method would be highly model-specific + # Multimodal output is not currently supported by this wrapper's parsing/extraction + # def _extract_image_data_from_text(self, text: str) -> Optional[Any]: + # """ + # Conceptual: Extracts encoded image data or image tokens from text output. + # Requires model-specific parsing logic. + # Returns image data or None. + # """ + # logger.debug("Attempting to extract image data from text output (not implemented).") + # return None \ No newline at end of file