File size: 71,403 Bytes
828f04e 1c116cd aaa0e51 5448d17 aaa0e51 828f04e 5448d17 828f04e aaa0e51 5448d17 aaa0e51 5448d17 1c116cd 5448d17 828f04e aaa0e51 5448d17 1c116cd 828f04e aaa0e51 5448d17 1c116cd 828f04e aaa0e51 1c116cd aaa0e51 1c116cd aaa0e51 828f04e 5448d17 1c116cd 5448d17 1c116cd 5448d17 1c116cd 5448d17 1c116cd 5448d17 1c116cd 5448d17 828f04e 5448d17 aaa0e51 5448d17 1c116cd 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 aaa0e51 1c116cd aaa0e51 1c116cd aaa0e51 828f04e aaa0e51 5448d17 aaa0e51 828f04e 5448d17 828f04e 5448d17 1c116cd 5448d17 828f04e 5448d17 828f04e 5448d17 828f04e 5448d17 828f04e 5448d17 828f04e 5448d17 828f04e 5448d17 1c116cd 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 828f04e 5448d17 aaa0e51 5448d17 aaa0e51 828f04e 5448d17 828f04e 5448d17 aaa0e51 5448d17 828f04e 5448d17 828f04e 5448d17 aaa0e51 5448d17 828f04e 5448d17 aaa0e51 5448d17 aaa0e51 828f04e 5448d17 828f04e 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 828f04e 5448d17 828f04e 1c116cd 5448d17 828f04e 5448d17 828f04e 5448d17 aaa0e51 5448d17 aaa0e51 5448d17 828f04e 5448d17 aaa0e51 5448d17 828f04e 5448d17 828f04e 5448d17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 |
# chain_of_thought_wrapper.py
import re
import torch
import logging
from transformers import (
PreTrainedModel,
AutoTokenizer,
GenerationConfig,
GenerationMixin,
# Keep AutoModelForCausalLM for example usage block, but not used in main wrapper logic
# We rely on AutoModel now
# AutoModelForCausalLM, # Removed as AutoModel is more general
# ADDED: AutoProcessor and AutoModel for multimodal handling
AutoProcessor,
AutoModel,
AutoConfig, # Needed for checking model config
# Import specific model classes if AutoModel isn't sufficient for a specific type
# from transformers import LlamaForCausalLM # Example
# from transformers import LlavaForConditionalGeneration # Example multimodal model class
)
from transformers.utils import is_accelerate_available, is_bitsandbytes_available
from typing import Optional, List, Tuple, Dict, Union, Any # Added Any
import gc # Import garbage collector for cleanup
import time # Import time for potential timing/logging (unused in final code, but good practice)
from collections import Counter # Needed for voting
from PIL import Image # Needed for handling image data
import io # Needed for handling image bytes
import os # Needed for path handling
# ─── NEW: memory imports ─────────────────────────────────────────
# Assuming these custom classes are provided and handle text-based data
# Ensure these files (Enhanced_MemoryEngine.py, etc.) are in the same directory
try:
    from Enhanced_MemoryEngine import MemoryEngine
    from NeuroMemoryProcessor import NeuroMemoryProcessor
    from AGIEnhancer import AGIEnhancer
    from FullAGI_ExpansionModule import NeoSentientCore
    from SimulatedSelfAssessment import SimulatedSelfAssessment
    AGI_IMPORTS_SUCCESS = True
    # Re-acquire the module logger in case an imported module ran basicConfig.
    logger = logging.getLogger(__name__)
    logger.info("AGI helper modules imported successfully.")
except ImportError as e:
    AGI_IMPORTS_SUCCESS = False
    logger = logging.getLogger(__name__)
    logger.error(f"Failed to import AGI helper modules. AGI features will be disabled: {e}")

    # Inert stand-ins so the rest of the module can reference these names
    # unconditionally: construction accepts anything, and every attribute
    # lookup resolves to a callable that swallows its arguments.
    class MemoryEngine:
        """No-op replacement used when the real MemoryEngine is unavailable."""
        def __init__(self, *args, **kwargs):
            pass

        def __getattr__(self, name):
            def _noop(*args, **kwargs):
                return None
            return _noop

    class NeuroMemoryProcessor:
        """No-op replacement used when the real NeuroMemoryProcessor is unavailable."""
        def __init__(self, *args, **kwargs):
            pass

        def __getattr__(self, name):
            def _noop(*args, **kwargs):
                return None
            return _noop

    class AGIEnhancer:
        """No-op replacement used when the real AGIEnhancer is unavailable."""
        def __init__(self, *args, **kwargs):
            pass

        def __getattr__(self, name):
            def _noop(*args, **kwargs):
                return None
            return _noop

    class NeoSentientCore:
        """No-op replacement used when the real NeoSentientCore is unavailable."""
        def __init__(self, *args, **kwargs):
            pass

        def __getattr__(self, name):
            def _noop(*args, **kwargs):
                return None
            return _noop

    class SimulatedSelfAssessment:
        """No-op replacement whose methods all return a default state summary."""
        def __init__(self, *args, **kwargs):
            pass

        def __getattr__(self, name):
            def _stub(*args, **kwargs):
                return {"state_summary": "Simulated self-assessment module not available."}
            return _stub
# --- Logging Setup for Wrapper ---
# Configure root logging only if nothing else has done so yet (imported
# modules may already have called basicConfig), then give this module its
# own DEBUG-level stream handler that does not propagate to the root.
if not logging.root.handlers:
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
if not logger.handlers:
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(stream_handler)
    logger.propagate = False
    # DEBUG by default so the wrapper's detailed trace logging is visible.
    logger.setLevel(logging.DEBUG)
# --- Default Configuration Values ---
# Sensible starting points for wrapper behavior.
DEFAULT_MAX_LENGTH = 2048          # combined input+output token budget; large enough for long CoT
DEFAULT_REASONING_LIMIT = 15       # conceptual cap on extracted steps (not enforced by parsing)
DEFAULT_CONSISTENCY_ROUNDS = 5     # default chain count for self-consistency voting
DEFAULT_FINAL_ANSWER_TAG = "Final Answer:"  # marker the model emits before its final answer

# --- Regex Pattern for Parsing Steps ---
# Matches common step markers ("Step N:", "N.", "N)", "N-") case-insensitively
# and captures the text that follows the marker.
DEFAULT_STEP_PATTERN = re.compile(
    r"^(?:Step\s*\d+[:.)-]\s*|\d+[:.)-]\s*)(.*)",
    re.IGNORECASE,
)

# --- Common Artifact Cleanup Regex ---
# Patterns stripped from model output before parsing; these cover structural
# tags some models emit that are not part of the reasoning or answer.
ARTIFACT_PATTERNS = [
    re.compile(r"<init>.*?</init>", re.DOTALL),                  # e.g. DeepSeek R1 init tags
    re.compile(r"<final_output>.*?</final_output>", re.DOTALL),  # e.g. DeepSeek R1 final-output tags
]
# --- Self-Consistency Voting (Defined here, but used by the GUI) ---
# Keep the normalize_answer function here as it's a utility
# Hoisted module-private helpers so they are built/compiled once instead of
# on every call to normalize_answer.
_TRAILING_PUNCT_RE = re.compile(r'[.,!?;:]+$')
_PREAMBLE_RE = re.compile(r'^\s*(?:the answer is|result|output)\s*[:\-]?\s*', re.IGNORECASE)
_LEADING_ARTICLE_RE = re.compile(r'^\s*(a|an|the)\s+', re.IGNORECASE)
_WHITESPACE_RE = re.compile(r'\s+')
_NUM_WORD_MAP = {
    'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
    'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
    'ten': '10', 'eleven': '11', 'twelve': '12', 'thirteen': '13',
    'fourteen': '14', 'fifteen': '15', 'sixteen': '16', 'seventeen': '17',
    'eighteen': '18', 'nineteen': '19', 'twenty': '20', 'thirty': '30',
    'forty': '40', 'fifty': '50', 'sixty': '60', 'seventy': '70',
    'eighty': '80', 'ninety': '90', 'hundred': '100', 'thousand': '1000',
    'million': '1000000', 'billion': '1000000000'
}


def normalize_answer(answer: str) -> str:
    """Normalize a string answer for robust comparison during voting.

    Steps applied, in order:
    - Lowercase and strip surrounding whitespace.
    - Remove trailing punctuation (``. , ! ? ; :``).
    - Remove leading preambles ("the answer is", "result", "output") with an
      optional ``:`` or ``-`` separator.
    - Remove a leading article ("a", "an", "the").
    - Replace standalone number words with digits ("two" -> "2"); compound
      forms like "twenty-two" are replaced word-by-word, not parsed.
    - Collapse internal runs of whitespace to single spaces.

    Args:
        answer: The candidate answer string. Non-string input yields "".

    Returns:
        The normalized string (possibly empty).
    """
    if not isinstance(answer, str):
        return ""  # gracefully handle non-string inputs
    normalized = answer.lower().strip()
    normalized = _TRAILING_PUNCT_RE.sub('', normalized).strip()
    normalized = _PREAMBLE_RE.sub('', normalized).strip()
    normalized = _LEADING_ARTICLE_RE.sub('', normalized).strip()
    # Word-by-word number substitution; splitting also drops empty tokens.
    normalized = " ".join(_NUM_WORD_MAP.get(word, word) for word in normalized.split())
    # Collapse any remaining internal whitespace and strip once more.
    normalized = _WHITESPACE_RE.sub(' ', normalized).strip()
    return normalized
# NOTE: This voting function is for the EXAMPLE USAGE BLOCK only and is NOT
# directly used by the ChainOfThoughtWrapper.generate method.
# It's included here for completeness if the user wanted to test the wrapper
# standalone, but the GUI implements its own voting logic using normalize_answer.
# Removed this function as it's explicitly not used by the wrapper itself and the GUI has its own.
# def perform_self_consistency_voting(...)
# --- ChainOfThoughtWrapper Class (Multimodal Enabled) ---
class ChainOfThoughtWrapper:
"""
ChainOfThoughtWrapper: Orchestrates model generation with CoT prompting
and interacts with AGI helper modules.
Supports multimodal input (image + text) for compatible models
loaded with Hugging Face's AutoModel and AutoProcessor.
"""
    def __init__(
        self,
        model: Union[PreTrainedModel, GenerationMixin, AutoModel, Any],  # loaded HF model (AutoModel accepted)
        processor: Union[AutoTokenizer, AutoProcessor, Any],  # tokenizer OR multimodal processor
        device: Union[str, torch.device],
        cot_instruction: str = "Analyze this step by step to find the answer.",
        reasoning_header: str = "Reasoning:",
        step_prefix: str = "Step",  # e.g. "Step 1: " — the model is expected to continue this
        final_answer_tag: str = DEFAULT_FINAL_ANSWER_TAG,  # marker preceding the final answer
        max_length: int = DEFAULT_MAX_LENGTH  # combined budget for prompt + generated tokens
    ):
        """Initialize the ChainOfThoughtWrapper.

        Stores the model/processor/device, resolves a tokenizer from the
        processor, ensures a usable ``pad_token_id`` for batched generation,
        compiles the answer-extraction regex, and (when available) constructs
        the optional AGI helper modules.

        Args:
            model: The loaded Hugging Face model (must support ``.generate``).
            processor: The loaded processor — either a plain tokenizer or a
                multimodal processor that bundles a tokenizer.
            device: The device the model is on.
            cot_instruction: Core instruction phrase prepended for CoT prompting.
            reasoning_header: Header text emitted before the reasoning steps.
            step_prefix: Prefix used to seed the first reasoning step.
            final_answer_tag: String marker expected before the final answer.
            max_length: Maximum combined length of input prompt and output.
        """
        logger.debug("ChainOfThoughtWrapper __init__ started.")
        self.model = model
        self.processor = processor  # may be AutoProcessor or AutoTokenizer
        self.device = device
        self.cot_instruction = cot_instruction
        self.reasoning_header = reasoning_header
        self.step_prefix = step_prefix
        self.final_answer_tag = final_answer_tag
        self.max_length = max_length
        self._artifact_patterns = ARTIFACT_PATTERNS  # cleanup regexes applied to raw output
        self.reasoning_steps_limit = DEFAULT_REASONING_LIMIT  # conceptual parsing limit
        # Multimodal capability is inferred from the processor: a bundled,
        # non-None image_processor means the pair can accept image input.
        self.multimodal_capable = hasattr(self.processor, 'image_processor') and self.processor.image_processor is not None
        logger.info(f"Wrapper initialized on {self.device}. Multimodal capability detected: {self.multimodal_capable}")
        # Resolve a tokenizer either way: a multimodal processor exposes
        # .tokenizer; a plain tokenizer falls through as its own default.
        self.tokenizer = getattr(self.processor, 'tokenizer', self.processor)
        if self.tokenizer is None:
            logger.error("Processor does not contain a tokenizer.")
            # Not treated as fatal here; later tokenize/decode calls will fail loudly.
        # Ensure a pad_token_id exists for batched generation. Preference
        # order: existing pad -> eos as pad -> newly added [PAD] token.
        if self.tokenizer and self.tokenizer.pad_token_id is None:
            if hasattr(self.tokenizer, 'eos_token_id') and self.tokenizer.eos_token_id is not None:
                self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
                logger.warning("Tokenizer pad_token_id is None, using eos_token_id (%s) as pad_token_id for batching.", self.tokenizer.eos_token_id)
            else:
                # Neither pad nor eos exists: last resort is adding a [PAD] token.
                logger.warning("Tokenizer pad_token_id and eos_token_id are both None. Attempting to add a [PAD] token.")
                try:
                    # Only add [PAD] when the vocab is inspectable and lacks it.
                    if hasattr(self.tokenizer, 'vocab') and '[PAD]' not in self.tokenizer.vocab:
                        self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
                        # NOTE: the model's embeddings must be resized elsewhere
                        # (the GUI's loader attempts this) — it cannot happen here.
                        logger.warning("Added new [PAD] token to tokenizer. Model embeddings may need resizing.")
                    elif not hasattr(self.tokenizer, 'vocab'):
                        logger.warning("Tokenizer does not have a vocabulary attribute. Cannot check for or add [PAD] token.")
                    else:
                        logger.info("[PAD] token already exists in tokenizer vocabulary.")
                    # Whether [PAD] was just added or pre-existing, resolve its id.
                    if self.tokenizer.pad_token_id is None and hasattr(self.tokenizer, 'convert_tokens_to_ids'):
                        self.tokenizer.pad_token_id = self.tokenizer.convert_tokens_to_ids('[PAD]')
                        logger.info("Set pad_token_id to ID of [PAD] token (%s).", self.tokenizer.pad_token_id)
                    elif self.tokenizer.pad_token_id is None:
                        logger.warning("Cannot set pad_token_id as convert_tokens_to_ids method is missing.")
                except Exception as e:
                    logger.error(f"Failed to add [PAD] token or set pad_token_id: {e}")
                    self.tokenizer.pad_token_id = None  # keep it None rather than half-set
                    logger.warning("Failed to set pad_token_id. Batch generation might fail.")
        elif self.tokenizer:
            logger.debug("Tokenizer has pad_token_id: %s", self.tokenizer.pad_token_id)
        else:
            logger.warning("No tokenizer available to check or set pad_token_id.")
        # Compile the final-answer extraction regex from the configured tag.
        # re.escape guards special characters; DOTALL lets the answer span newlines.
        self.final_answer_pattern = re.compile(
            re.escape(final_answer_tag) + r"\s*(.*)", re.IGNORECASE | re.DOTALL
        )
        self._step_pattern = DEFAULT_STEP_PATTERN  # shared precompiled step matcher
        logger.debug("Final answer pattern compiled: %s", self.final_answer_pattern.pattern)
        logger.debug("Step pattern: %s", self._step_pattern.pattern)
        # --- Initialize AGI Helper Modules ---
        # Each helper is optional and initialized independently: a failure in
        # one leaves the others usable and the attribute set to None.
        self.memory_engine = None
        self.neuro_processor = None
        self.agi_enhancer = None
        self.neo_sentient_core = None
        self.self_assessment_module = None
        if AGI_IMPORTS_SUCCESS:
            try:
                self.memory_engine = MemoryEngine()
                logger.info("MemoryEngine initialized.")
            except Exception as e:
                self.memory_engine = None
                logger.error(f"Failed to initialize MemoryEngine: {e}")
            try:
                self.neuro_processor = NeuroMemoryProcessor()
                logger.info("NeuroMemoryProcessor initialized.")
            except Exception as e:
                self.neuro_processor = None
                logger.error(f"Failed to initialize NeuroMemoryProcessor: {e}")
            try:
                self.agi_enhancer = AGIEnhancer()
                logger.info("AGIEnhancer initialized.")
            except Exception as e:
                self.agi_enhancer = None
                logger.error(f"Failed to initialize AGIEnhancer: {e}")
            try:
                self.neo_sentient_core = NeoSentientCore(name="NeoAGI")
                logger.info("NeoSentientCore initialized.")
            except Exception as e:
                self.neo_sentient_core = None
                logger.error(f"Failed to initialize NeoSentientCore: {e}")
            try:
                self.self_assessment_module = SimulatedSelfAssessment()
                logger.info("SimulatedSelfAssessment initialized.")
            except Exception as e:
                self.self_assessment_module = None
                logger.error(f"Failed to initialize SimulatedSelfAssessment: {e}")
        else:
            logger.warning("AGI helper modules were not imported, AGI features will not be available.")
        logger.debug("ChainOfThoughtWrapper __init__ finished.")
@torch.no_grad() # Ensure no gradients are calculated during inference
def generate(
self,
input_text: str,
image_data: Optional[List[bytes]] = None, # Accept list of image bytes
multimodal_model: bool = False,
generation_params: Optional[Dict[str, Any]] = None,
chat_history: Optional[List[Dict[str, str]]] = None
) -> Tuple[Optional[List[Dict[str, str]]], Optional[str], Optional[str]]:
"""
Generates a Chain-of-Thought response from the language model, optionally
handling multimodal input (text + image). Integrates AGI helper modules
(MemoryEngine, NeuroProcessor, AGIEnhancer, NeoSentientCore, SelfAssessment)
and includes conversation history in the prompt.
Args:
prompt (str): The user's input prompt (text part).
image (Optional[Image.Image]): The input image, if any.
multimodal_model (bool): True if the loaded model is multimodal.
generation_params (Optional[Dict[str, Any]]): Dictionary of generation parameters
chat_history (Optional[List[Dict[str, str]]]): A list of dictionaries
representing previous turns of the conversation. Each dict
is expected to have keys 'role' ('user' or 'assistant')
and 'content' (the message text).
Returns:
Tuple[Optional[List[Dict[str, str]]], Optional[str], Optional[str]]:
A tuple containing:
1. List of dictionaries representing the parsed CoT steps (or None).
2. The extracted final answer string (or None).
3. The raw body text of the model's response (or None).
"""
logger.debug("Wrapper generate method called.")
# Added check for model generation compatibility at the start of generate
if self.model is None or self.processor is None or self.tokenizer is None or \
not (hasattr(self.model, 'generate') and callable(getattr(self.model, 'generate', None)) or isinstance(self.model, GenerationMixin)):
logger.error("Model, Processor, Tokenizer not loaded or loaded model is not generation compatible.")
# Return an empty result dict to indicate failure, GUI will handle displaying error
return {"full_texts": [], "reasoning_steps": [], "final_answers": [], "generated_images": [], "generation_scores": None}
# Safely get generation parameters
params = generation_params if generation_params is not None else {}
effective_num_return_sequences = params.get("num_return_sequences", 1)
# Use default values if not provided in params
max_new_tokens = params.get("max_new_tokens", 512)
temperature = params.get("temperature", 0.7)
top_k = params.get("top_k", 50)
top_p = params.get("top_p", 1.0)
do_sample = params.get("do_sample", True)
repetition_penalty = params.get("repetition_penalty", 1.1)
no_repeat_ngram_size = params.get("no_repeat_ngram_size", 0)
logger.info(f"Generating {effective_num_return_sequences} sequence(s) with params: {params}")
if image_data:
logger.info(f"Received {len(image_data)} image(s). Wrapper multimodal capable: {self.multimodal_capable}")
# --- AGI Helper Module Interaction (Pre-Generation) ---
# Use NeoSentientCore and AGIEnhancer to add internal state to the prompt
# Adapt to include mention of image data if present
agi_pre_prompt_elements: List[str] = []
if AGI_IMPORTS_SUCCESS and self.neo_sentient_core:
# Simulate perception of the input (text and image presence)
perception_detail = f"User input: '{input_text[:200]}{'...' if len(input_text) > 200 else ''}'"
if image_data:
perception_detail += f" (with {len(image_data)} image(s))"
try:
self.neo_sentient_core.perceive(perception_detail)
logger.debug("NeoSentientCore perceived input.")
except Exception as e:
logger.warning(f"NeoSentientCore perceive failed: {e}")
# Get elements from the AGI core to inject into the prompt
# Decide goal (conceptual)
try:
current_goal = self.neo_sentient_core.decide_goal()
if current_goal and isinstance(current_goal, str): agi_pre_prompt_elements.append(f"Intention: {current_goal.strip()}")
except Exception as e:
logger.warning(f"NeoSentientCore decide_goal failed: {e}")
# Get inner voice (conceptual)
try:
inner_monologue = self.neo_sentient_core.inner_voice()
if inner_monologue and isinstance(inner_monologue, str): agi_pre_prompt_elements.append(f"InnerVoice: {inner_monologue.strip()}")
except Exception as e:
logger.warning(f"NeoSentientCore inner_voice failed: {e}")
# Get qualia token (conceptual emotion priming)
# Using curiosity as a default for exploration, could be more dynamic later
try:
qualia_token = self.neo_sentient_core.generate_qualia_token("curiosity") # Example
if qualia_token and isinstance(qualia_token, str): agi_pre_prompt_elements.insert(0, qualia_token.strip()) # Add qualia at the start
except Exception as e:
logger.warning(f"NeoSentientCore generate_qualia_token failed: {e}")
if AGI_IMPORTS_SUCCESS and self.agi_enhancer:
# Log the experience with the AGIEnhancer
# Pass text and mention image presence
enhancer_experience_detail = f"User input: '{input_text[:200]}{'...' if len(input_text) > 200 else ''}'"
if image_data:
enhancer_experience_detail += f" (with {len(image_data)} image(s))"
try:
self.agi_enhancer.log_experience(enhancer_experience_detail)
logger.debug("AGIEnhancer logged experience.")
except Exception as e:
logger.warning(f"AGIEnhancer log_experience failed: {e}")
self_assessment_summary_text: Optional[str] = None # Use a descriptive name for the summary text
if AGI_IMPORTS_SUCCESS and self.self_assessment_module and \
self.memory_engine and self.neuro_processor and self.neo_sentient_core:
try:
# Gather necessary data snapshots from other modules for the assessment
# These calls assume your other modules have methods like these
recent_reflections_snapshot = self.memory_engine.recall(include_long_term=True, include_working=True, limit=5) # Get some recent memories/reflections
top_biases_snapshot = self.neuro_processor.recall_biases(top_k=10) # Get top biases
synaptic_weights_snapshot = self.neuro_processor.recall_weights(top_k=10) # Get top weights
neo_state_snapshot = self.neo_sentient_core.get_state() # Get core state (emotions, intents, narrative)
current_emotions_snapshot = neo_state_snapshot.get("emotions", {}) # Extract emotions dict
intent_pool_snapshot = neo_state_snapshot.get("intent_pool", []) # Extract intents list
# Assuming AGIEnhancer or NeoSentientCore stores/calculates QRI if used
# You'll need to retrieve QRI data from where you store it if you want it in the assessment
qri_snapshot_data = None # Placeholder - set to actual QRI data if available
# Call the assessment module's main method
assessment_result = self.self_assessment_module.perform_assessment(
recent_reflections=recent_reflections_snapshot,
top_biases=top_biases_snapshot,
synaptic_weights_snapshot=synaptic_weights_snapshot,
current_emotions=current_emotions_snapshot,
intent_pool=intent_pool_snapshot,
# Assuming MemoryEngine trace is accessible, or NeoSentientCore narrative memory
trace_summary=self.memory_engine.get_trace()[-10:] if self.memory_engine and len(self.memory_engine.get_trace()) > 0 else [], # Get recent trace summary
qri_snapshot=qri_snapshot_data # Pass QRI data here if retrieved
)
# Extract the summary text generated by the assessment module
self_assessment_summary_text = assessment_result.get("state_summary", None)
logger.debug("Performed simulated self-assessment and retrieved summary for prompt.")
except Exception as e:
logger.error(f"Failed to perform simulated self-assessment: {e}")
# Provide a default error summary if assessment fails, so the prompt still has something
self_assessment_summary_text = "\n--- Simulated Self-Assessment Error ---\nInternal assessment module encountered an issue and cannot provide a state summary.\n---\n"
# Construct the full prompt including AGI elements, Self-Assessment summary, and CoT template components
# This text will be combined with images by the processor for multimodal models
agi_pre_prompt = "\n".join(agi_pre_prompt_elements) + "\n\n" if agi_pre_prompt_elements else ""
# ADDED: Include the self-assessment summary in the prompt if it was successfully generated
self_assessment_prompt_part = self_assessment_summary_text + "\n\n" if self_assessment_summary_text else ""
# Construct the core CoT prompt string for the text part of the input
# Include instructions, reasoning header, and step prefix to guide the model
cot_instruction_text = (
f"{self.cot_instruction}\n\n"
# Optional: Add an instruction to the model about using the assessment summary
"Based on the provided 'Simulated Internal State Assessment', incorporate insights about your perceived internal state, coherence, and well-being into your response and reasoning process.\n\n"
)
cot_prompt_core_text = (
cot_instruction_text +
f"{self.reasoning_header}\n\n"
f"{self.step_prefix} 1: " # Explicitly start the first step
)
# Combine AGI pre-prompt, Self-Assessment summary, and the core CoT text prompt
history_prompt_part = ""
if chat_history:
logger.debug(f"Including {len(chat_history)} turns in conversation history prompt part.")
formatted_history_lines = []
for turn in chat_history:
role = turn.get('role', 'unknown').capitalize()
raw_content = turn.get('content', '')
if isinstance(raw_content, str):
content = raw_content.strip()
else:
content = str(raw_content).strip()
if role and content:
formatted_history_lines.append(f"{role}: {content}")
# Join history lines with a separator, add a final separator
history_prompt_part = "\n".join(formatted_history_lines) + "\n\n---\n\n" if formatted_history_lines else ""
logger.debug(f"Formatted history prompt part:\n{history_prompt_part[:500]}...") # Log snippet
# Combine history, AGI pre-prompt, Self-Assessment summary, and the core CoT text prompt
# ADDED: Prepend history_prompt_part
full_text_prompt = history_prompt_part + agi_pre_prompt + self_assessment_prompt_part + cot_prompt_core_text
# --- Prepare Multimodal Input ---
input_tensors = {} # Dictionary to hold input tensors
try:
# Use the processor to handle both text and image inputs
# This is the core change for multimodal input processing
# Multimodal models often require a specific format for messages (e.g., interleaved text/image)
# We'll create a simple message structure for the processor: [image(s)], text prompt
messages = []
if image_data and self.multimodal_capable:
for img_bytes in image_data:
try:
img = Image.open(io.BytesIO(img_bytes))
messages.append({"type": "image", "content": img}) # Use PIL Image object
except Exception as e:
logger.warning(f"Could not open image from bytes for processing: {e}. Skipping this image.")
# Decide if you want to continue without the image or raise an error
# For robustness, we'll just skip this image and log a warning
# Append the text part of the prompt as a text message
# It's often beneficial to include the user's original text input as part of the prompt
# for the model to explicitly reference.
# Let's use a simple structure: User Query + [Image(s)] + CoT Guiding text
# Revised message structure for processor:
processor_messages = []
# Add user's original input text first
if input_text and input_text.strip():
processor_messages.append({"type": "text", "content": f"User Input: {input_text.strip()}"})
# Add image messages *after* the initial text input if images are available and wrapper is multimodal
if image_data and self.multimodal_capable and messages: # Check if images were successfully loaded into `messages` list
processor_messages.extend(messages)
logger.debug(f"Prepared {len(messages)} image messages for processor.")
elif image_data and not self.multimodal_capable:
logger.warning("Image data provided but wrapper/model is text-only. Images will be ignored by the processor.")
# Add the core CoT guiding text (AGI + template) as the final text message
# This guides the *output* format regardless of input modality
if full_text_prompt.strip():
processor_messages.append({"type": "text", "content": full_text_prompt.strip()})
elif not processor_messages: # If no text input, no images, and no CoT prompt text, add a default
logger.warning("No text or image content in messages. Adding a default text message.")
processor_messages.append({"type": "text", "content": "Please provide input."})
# Note: An empty prompt might cause issues for some models. This is a safeguard.
# Log the structured messages for debugging
logger.debug(f"Messages prepared for processor: {processor_messages}")
# Use the processor to handle input, adapting based on chat template availability
tokenizer_for_template = getattr(self.processor, 'tokenizer', None) # Access tokenizer via processor
has_chat_template = tokenizer_for_template and hasattr(tokenizer_for_template, 'apply_chat_template') and tokenizer_for_template.chat_template
if hasattr(self.processor, '__call__') and has_chat_template:
# Scenario 1: Processor is callable AND has a chat template
logger.debug("Processor is callable and has a chat template. Using processor's chat template to format messages.")
# apply_chat_template returns a string, so we then tokenize this string
# Use add_generation_prompt=True to ensure the template is completed for the model to generate
chat_prompt_text = tokenizer_for_template.apply_chat_template(processor_messages, tokenize=False, add_generation_prompt=True)
logger.debug(f"Chat template applied. Resulting text prompt: {chat_prompt_text[:200]}...")
# Now tokenize the formatted text prompt
inputs = self.tokenizer( # Use the stored tokenizer from __init__
chat_prompt_text,
return_tensors="pt",
padding="longest",
truncation=True,
max_length=self.max_length,
).to(self.device)
# Need to also process images separately if using chat template, as apply_chat_template is text-only
if image_data and self.multimodal_capable and messages: # Check if images were successfully loaded into `messages` list
image_processor_component = getattr(self.processor, 'image_processor', None)
if image_processor_component:
try:
# Extract PIL Images from the 'messages' list
pil_images = [msg["content"] for msg in messages if msg["type"] == "image" and isinstance(msg["content"], Image.Image)]
if pil_images:
image_inputs = image_processor_component(
pil_images, # Process list of images
return_tensors="pt"
).to(self.device)
# Merge image inputs (pixel_values) with text inputs (input_ids, attention_mask)
inputs.update(image_inputs)
logger.debug(f"Image inputs processed separately and merged for chat template case. Keys now: {inputs.keys()}")
else:
logger.warning("No valid PIL images found in messages despite image_data for chat template case. Skipping image processing.")
except Exception as image_process_e:
logger.error(f"Failed to process image inputs separately for chat template case: {image_process_e}. Generation might fail.")
# Continue with text inputs only, but log error
else:
logger.warning("Processor's image_processor component is missing despite multimodal capability flag for chat template case. Cannot process images.")
elif hasattr(self.processor, '__call__'):
# Scenario 2: Processor is callable but NO chat template.
# Attempt to pass concatenated text and separate image inputs to processor.__call__
logger.debug("Processor is callable but no chat template. Concatenating text messages and processing images separately.")
# Concatenate text content from all text messages
concatenated_text_input = "\n".join([msg["content"] for msg in processor_messages if msg["type"] == "text"])
if not concatenated_text_input.strip() and any(msg["type"] == "image" for msg in processor_messages):
# Handle case where there's only image input but no text input.
# Some multimodal models might still need a minimal text input like "".
logger.warning("No text content in messages, but images are present. Passing empty string as text input.")
concatenated_text_input = ""
elif not concatenated_text_input.strip():
# Handle case with no text and no images
logger.warning("No text or image content in messages. Passing empty string as text input.")
concatenated_text_input = ""
# Duplicate the concatenated text string for batching
text_input_for_processor = [concatenated_text_input] * effective_num_return_sequences
logger.debug(f"Concatenated text input for processor: '{concatenated_text_input[:200]}...' (duplicated {effective_num_return_sequences} times for batching)")
# Process images separately if images are present
image_inputs = {} # Initialize empty image inputs
if image_data and self.multimodal_capable and messages: # Check if images were successfully loaded into `messages` list
image_processor_component = getattr(self.processor, 'image_processor', None)
if image_processor_component:
try:
# Extract PIL Images from the 'messages' list
pil_images = [msg["content"] for msg in messages if msg["type"] == "image" and isinstance(msg["content"], Image.Image)]
if pil_images:
# Process images once and add them.
# Note: For batching num_return_sequences > 1, the model's generate method
# is usually expected to handle the batching dimension for image inputs
# if the image processor outputs batched tensors. If this causes errors,
# model-specific handling might be needed here.
image_inputs = image_processor_component(
pil_images, # Process list of images
return_tensors="pt"
).to(self.device)
logger.debug(f"Image inputs processed separately for callable processor without chat template. Keys now: {image_inputs.keys()}")
else:
logger.warning("No valid PIL images found in messages despite image_data for callable processor without chat template. Skipping image processing.")
except Exception as image_process_e:
logger.error(f"Failed to process image inputs separately for callable processor without chat template: {image_process_e}. Generation might fail.")
# Continue with text inputs only, but log error
else:
logger.warning("Processor's image_processor component is missing despite multimodal capability flag for callable processor without chat template. Cannot process images.")
# Pass the concatenated text (as a list for batching) and image inputs (if any)
# to the processor's __call__ method.
# Assuming the processor.__call__ signature handles this pattern.
inputs = self.processor(
text=text_input_for_processor, # Pass list of strings for batching
**image_inputs, # Unpack image inputs (e.g., pixel_values)
return_tensors="pt",
padding="longest",
truncation=True,
max_length=self.max_length,
).to(self.device)
logger.debug("Input processed using processor.__call__ with concatenated text and separate image inputs.")
elif hasattr(self.processor, 'tokenizer'): # Fallback for text-only models loaded with AutoTokenizer
# Scenario 3: Processor is NOT callable, but HAS a tokenizer (text-only model)
logger.debug("Processor is text-only (using tokenizer). Processing text input only.")
# Use the stored tokenizer from __init__ to process only the combined text prompt
# Combine user input and CoT guiding text for text-only models
# Let's use a simple format: User Input + CoT Template Text
combined_text_for_tokenizer = f"User Input: {input_text.strip()}\n\n{full_text_prompt.strip()}"
inputs = self.tokenizer(
combined_text_for_tokenizer,
return_tensors="pt",
padding="longest",
truncation=True,
max_length=self.max_length,
).to(self.device)
logger.debug("Input processed using tokenizer directly.")
else:
# Safeguard: Should not happen if tokenizer check passes, but as a safeguard
raise TypeError("Loaded processor is neither callable nor contains a tokenizer attribute.")
# ... (rest of input preparation block) ...
# Prepare the input tensors dictionary for the model's generate method
input_tensors = inputs # 'inputs' is already a dictionary or object acting like one
# Log the keys present in the input_tensors for debugging
logger.debug("Input tensors prepared for model.generate. Keys: %s", list(input_tensors.keys()))
if 'input_ids' in input_tensors:
logger.debug("Input IDs shape: %s, dtype: %s, on device: %s", input_tensors['input_ids'].shape, input_tensors['input_ids'].dtype, input_tensors['input_ids'].device)
if 'pixel_values' in input_tensors:
logger.debug("Pixel values shape: %s, dtype: %s, on device: %s", input_tensors['pixel_values'].shape, input_tensors['pixel_values'].dtype, input_tensors['pixel_values'].device)
except Exception as e:
logger.error("Failed to prepare input tensors (tokenization/image processing): %s", e)
# Attempt cleanup before raising
if torch.cuda.is_available(): torch.cuda.empty_cache()
gc.collect()
# Do not re-raise here, return empty lists and let the GUI handle the error
return {"full_texts": [], "reasoning_steps": [], "final_answers": [], "generated_images": [], "generation_scores": None}
# --- Generate Response ---
generated_outputs = None
try:
# Build the final GenerationConfig for this specific call
# Start with a default, then update with provided params
# Ensure pad_token_id and eos_token_id are set from the tokenizer
cfg = GenerationConfig() # Start with an empty config
if self.tokenizer:
# Safely get pad_token_id and eos_token_id, defaulting to None if not found
cfg.pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)
cfg.eos_token_id = getattr(self.tokenizer, 'eos_token_id', None)
else:
logger.warning("Tokenizer not available, GenerationConfig may lack pad/eos tokens.")
# Update config with parameters from the GUI/caller
if params:
# Remove 'self_consistency_enabled' and 'requested_chains' as they are not GenerationConfig parameters
params_for_gen_config = {k: v for k, v in params.items() if k not in ['self_consistency_enabled', 'requested_chains', 'pad_token_id', 'eos_token_id']}
cfg.update(**params_for_gen_config)
logger.debug("Merged generation_params into GenerationConfig.")
# Ensure required parameters for batch generation are set
cfg.num_return_sequences = effective_num_return_sequences
if cfg.num_return_sequences > 1 and not cfg.do_sample:
logger.warning("num_return_sequences > 1 but do_sample is False. Generated sequences will be identical.")
if cfg.do_sample and cfg.temperature == 0:
logger.warning("do_sample is True but temperature is 0. Generation will be deterministic.")
# Ensure max_length or max_new_tokens is handled correctly
# Use max_new_tokens from params if available, otherwise calculate from max_length
# Safely get input_length, defaulting to 0 if input_ids is missing or empty
input_ids_tensor = input_tensors.get('input_ids', torch.tensor([[]]))
input_length = input_ids_tensor.shape[-1] if input_ids_tensor.numel() > 0 else 0
# Prioritize max_new_tokens from input params if provided, otherwise use max_length
if 'max_new_tokens' in params:
cfg.max_new_tokens = params['max_new_tokens']
# Ensure max_length is also set to reflect the potential total length constraint
# Only set cfg.max_length if it's not already explicitly set in params or if it's smaller
# This prevents overwriting a larger desired max_length from a user-provided config object
if cfg.max_length is None or (input_length + cfg.max_new_tokens) < cfg.max_length:
cfg.max_length = input_length + cfg.max_new_tokens if input_length + cfg.max_new_tokens > 0 else None
logger.debug("Using max_new_tokens from params: %s. Calculated total max_length: %s", cfg.max_new_tokens, cfg.max_length)
elif cfg.max_new_tokens is None:
# If max_new_tokens is NOT set in params or default cfg, ensure the total length
# does not exceed the wrapper's max_length limit. Use wrapper's default max_length.
cfg.max_length = min(self.max_length, cfg.max_length if cfg.max_length is not None else self.max_length)
# If max_length is set this way, max_new_tokens should effectively be the difference
cfg.max_new_tokens = max(0, cfg.max_length - input_length) # Ensure it's not negative
logger.debug("max_new_tokens not set in params or default cfg. Using wrapper max_length: %s. Calculated max_new_tokens: %s", cfg.max_length, cfg.max_new_tokens)
else:
# If max_new_tokens was set in default cfg but not params, validate against wrapper's max_length
effective_total_length = input_length + cfg.max_new_tokens
if effective_total_length > self.max_length:
logger.warning("Effective total length (%d) exceeds wrapper max_length (%d). Adjusting max_new_tokens.", effective_total_length, self.max_length)
cfg.max_new_tokens = max(0, self.max_length - input_length)
cfg.max_length = input_length + cfg.max_new_tokens if input_length + cfg.max_new_tokens > 0 else None
logger.warning("Adjusted max_new_tokens to %d.", cfg.max_new_tokens)
else:
# If max_new_tokens was set and is within limits, ensure cfg.max_length is also set correctly
cfg.max_length = input_length + cfg.max_new_tokens if input_length + cfg.max_new_tokens > 0 else None
logger.debug("Using max_new_tokens from default cfg: %s. Calculated total max_length: %s", cfg.max_new_tokens, cfg.max_length)
# Ensure max_length is not None unless input_length + max_new_tokens is 0 or less
if cfg.max_length is None and (input_length + (cfg.max_new_tokens if cfg.max_new_tokens is not None else 0)) > 0:
calculated_max_length = input_length + (cfg.max_new_tokens if cfg.max_new_tokens is not None else 0)
if calculated_max_length > 0:
cfg.max_length = calculated_max_length
else:
cfg.max_length = None # If calculation somehow results in <= 0
# Final check: if max_new_tokens became 0 or less, maybe generation isn't possible?
if cfg.max_new_tokens is not None and cfg.max_new_tokens <= 0:
logger.warning("Calculated max_new_tokens is 0 or less. Generation might return only prompt.")
# Set max_new_tokens to a small value like 1 to attempt at least one new token if possible
if input_length < self.max_length and self.max_length > 0:
cfg.max_new_tokens = 1
# Re-calculate max_length to reflect the adjusted max_new_tokens
cfg.max_length = input_length + cfg.max_new_tokens
logger.warning("Setting max_new_tokens to 1 to attempt minimal generation.")
else:
# If input already max_length or max_length is 0, cannot generate new tokens
cfg.max_new_tokens = 0 # Explicitly 0
logger.warning("Input length is already at max_length or max_length is zero. Cannot generate new tokens (max_new_tokens = 0).")
logger.debug("Final GenerationConfig for this call after resolving params: %s", cfg.to_dict())
# --- Call model.generate ---
# Pass the prepared input tensors (which may include pixel_values) and generation config
# The model's generate method will handle the multimodal input if supported
generated_outputs = self.model.generate(
**input_tensors, # Unpack the input tensors (input_ids, attention_mask, pixel_values, etc.)
generation_config=cfg, # Pass the fully configured GenerationConfig
return_dict_in_generate=True, # Ensure we get a dictionary output
output_scores=True # Request scores if needed (though not used in parsing currently)
)
logger.info(f"Model generation complete. Generated {len(generated_outputs.sequences)} sequences.")
# If scores were requested and returned, they are available in generation_output.scores
generation_scores = generated_outputs.scores if hasattr(generated_outputs, 'scores') else None
if generation_scores is not None: # Check explicitly for None
logger.debug("Generation scores available (%d scores tensors).", len(generation_scores))
except Exception as e:
logger.error("Failed during model generation: %s", e)
# Attempt cleanup before raising
if torch.cuda.is_available(): torch.cuda.empty_cache()
gc.collect()
# Do not re-raise here, return empty lists and let the GUI handle the error
return {"full_texts": [], "reasoning_steps": [], "final_answers": [], "generated_images": [], "generation_scores": None}
# --- Process Generated Outputs ---
full_texts: List[str] = []
reasoning_steps: List[List[str]] = [] # List of lists, one list of steps per sequence
final_answers: List[Optional[str]] = [] # List of final answers per sequence
# Placeholder for future generated images (multimodal output)
generated_images_list: List[Any] = [] # Will store image data if generated
if generated_outputs and hasattr(generated_outputs, 'sequences'):
# Decode the generated token sequences
# Need the tokenizer from the processor
if self.tokenizer is None:
logger.error("Tokenizer is missing. Cannot decode generated sequences.")
# Return empty lists but don't stop processing
else:
# Get the length of the input prompt's token IDs for prompt removal
# Safely get input_length, defaulting to 0 if input_ids is missing or empty
input_ids_tensor = input_tensors.get('input_ids', torch.tensor([[]]))
input_length = input_ids_tensor.shape[-1] if input_ids_tensor.numel() > 0 else 0
logger.debug(f"Input token length determined for prompt removal during decoding: {input_length}")
for i, sequence in enumerate(generated_outputs.sequences):
# Decode the entire generated sequence back to text
# Need to handle potential prompt remnants in the output for causal models.
# A common approach is to find the start of the generation (length of input_ids)
# and decode only from that point onwards.
# Ensure sequence is a tensor before slicing and decoding
if isinstance(sequence, torch.Tensor):
# Decode only the newly generated tokens (after the input prompt)
# Use max(0, input_length) to handle cases where input_length might be negative or zero
# Ensure the slice is valid (sequence might be shorter than input_length in error cases)
start_index = max(0, input_length)
# Use skip_special_tokens=True to remove EOS, BOS, PAD tokens from output text
decoded_text = self.tokenizer.decode(sequence[start_index:], skip_special_tokens=True)
logger.debug(f"Decoded new tokens for sequence {i} (input length {input_length}, decoded from index {start_index}): {decoded_text[:200]}...")
else:
# If sequence is not a tensor, decode the whole thing and log a warning
logger.warning(f"Generated sequence {i} is not a tensor (type: {type(sequence)}). Decoding full sequence and hoping parsing handles it.")
# Decode the full sequence, including potential prompt if it's not handled correctly upstream
decoded_text = self.tokenizer.decode(sequence, skip_special_tokens=True)
logger.debug(f"Decoded full sequence {i}: {decoded_text[:200]}...")
# In a multimodal generation scenario, the output might *also* contain image tokens
# or encoded image data. Extracting those would require model-specific parsing.
# For now, we assume text output, potentially with text-encoded image info that parsing might ignore.
# Placeholder for future image extraction:
# extracted_image_data = self._extract_image_data_from_text(decoded_text) # Conceptual
# Parse the decoded text for CoT steps and final answer
# Pass the original user text and the constructed CoT prompt text for parsing reference
steps, answer, full_output_text_cleaned = self._parse(
decoded_text, # The raw decoded output (just the new tokens part)
input_text, # Original user text input (for potential robust prompt removal in parse)
full_text_prompt # The constructed CoT prompt text (AGI + template) (for potential robust prompt removal in parse)
)
full_texts.append(full_output_text_cleaned) # Append the cleaned output body
reasoning_steps.append(steps)
final_answers.append(answer)
# Append placeholder or extracted image data
# generated_images_list.append(extracted_image_data if extracted_image_data is not None else None)
else:
logger.warning("Model generation did not return sequences in expected format or returned no sequences.")
# Return empty lists
# --- AGI Helper Module Interaction (Post-Generation) ---
# Use NeoSentientCore and AGIEnhancer to process the generated output
# Process the output of the first generated chain as the main experience, if any were generated.
if AGI_IMPORTS_SUCCESS and full_texts:
# Use the first chain's full output text for AGI processing
main_output_text = full_texts[0]
if self.memory_engine:
try:
# Observe the generated output (text)
# Pass text content. Image observation would need adapting MemoryEngine
self.memory_engine.observe(main_output_text)
logger.debug("MemoryEngine observed generated output (text).")
except Exception as e:
logger.warning(f"MemoryEngine observe failed: {e}")
try:
# Save reasoning chains (example: save steps from the first chain)
if reasoning_steps and reasoning_steps[0]:
# Ensure steps list contains strings before saving
valid_steps = [step for step in reasoning_steps[0] if isinstance(step, str) and step.strip()]
if valid_steps:
self.memory_engine.save_reasoning_chain(1, valid_steps) # Save steps from the first chain
logger.debug("MemoryEngine saved reasoning chain (from first chain).")
else:
logger.debug("MemoryEngine skipping saving empty or invalid reasoning chain.")
except Exception as e:
logger.warning(f"MemoryEngine save_reasoning_chain failed: {e}")
# Consider reflecting periodically - this logic should be managed externally or less frequently
# logger.debug("MemoryEngine reflection not called here.")
if self.neuro_processor:
try:
# Record the generation experience (text)
generation_experience_detail = f"Generated response (first chain): {main_output_text[:200]}{'...' if len(main_output_text) > 200 else ''}"
# Pass text content. Image experience would need adapting NeuroMemoryProcessor
self.neuro_processor.record_experience("generation", generation_experience_detail)
logger.debug("NeuroMemoryProcessor recorded generation experience (text).")
except Exception as e:
logger.warning(f"NeuroMemoryProcessor record_experience failed: {e}")
# Update biases based on the output (example: process the text)
# Consider moving to scheduled task
# try:
# self.neuro_processor._evolve_cognitive_bias(main_output_text) # Direct call for simplicity
# logger.debug("NeuroProcessor evolved biases based on output.")
# except Exception as e:
# logger.warning(f"NeuroProcessor _evolve_cognitive_bias failed: {e}")
if self.agi_enhancer:
try:
# Log the generation experience (text)
enhancer_experience_detail = f"Generated response (first chain): {main_output_text[:200]}{'...' if len(main_output_text) > 200 else ''}"
# Pass text content. Image logging would need adapting AGIEnhancer
self.agi_enhancer.log_experience(enhancer_experience_detail)
logger.debug("AGIEnhancer logged experience.")
except Exception as e:
logger.warning(f"AGIEnhancer log_experience failed: {e}")
# Engage in reflection periodically - this logic should be managed externally or less frequently
# logger.debug("AGIEnhancer reflection not called here post-gen.")
# NeoSentientCore post-generation actions (perception of its own output is handled above)
if self.neo_sentient_core:
try:
# Simulate the core processing the generated output (text)
# Assuming NeoSentientCore has a process_output method that accepts text
if hasattr(self.neo_sentient_core, 'process_output'):
self.neo_sentient_core.process_output(main_output_text)
logger.debug("NeoSentientCore processed generated output (text).")
else:
logger.warning("NeoSentientCore does not have a 'process_output' method. Skipping output processing.")
except Exception as e:
logger.warning(f"NeoSentientCore process_output failed: {e}")
# Attempt cleanup after generation attempt (success or failure)
if torch.cuda.is_available():
try:
torch.cuda.empty_cache()
logger.debug("GPU memory cache cleared after generation attempt.")
except Exception as cleanup_e:
logger.warning(f"Error during cuda empty_cache after generation attempt: {cleanup_e}")
pass # Suppress this warning unless in debug mode
gc.collect()
logger.debug("Garbage collection performed after generation attempt.")
# Return the collected results
return {
"full_texts": full_texts,
"reasoning_steps": reasoning_steps,
"final_answers": final_answers,
"generation_scores": generation_scores, # Include scores (will be None if not requested/available)
# In a future multimodal version, generated_images might be included here
"generated_images": generated_images_list # Return the list (might be empty)
}
def _parse(self, text: str, user_input: str, cot_prompt_text: str) -> Tuple[List[str], Optional[str], str]:
    """
    Parses one chain’s generated text into steps + final answer.

    Handles artifact cleaning. Attempts to handle potential prompt remnants.
    BUGFIX vs. previous revision: the last-line fallback for the final answer
    now also runs when the answer tag was found but carried no text (e.g. a
    bare "Final Answer:" line) — previously the `not tagged` guard made the
    tagged branch of the fallback start-index computation dead code — and the
    fallback scan starts *before* the tag line so the bare tag itself can
    never be mistaken for the answer.

    Args:
        text: Raw decoded model output (normally just the newly generated tokens).
        user_input: Original user text input (reserved for prompt-removal heuristics).
        cot_prompt_text: Constructed CoT prompt text (reserved for prompt-removal heuristics).

    Returns:
        Tuple of (steps_list, final_answer_string_or_None, cleaned_body_text).
    """
    logger.debug("_parse method called.")
    # Ensure input is a string; anything else is stringified and returned unparsed.
    if not isinstance(text, str):
        logger.warning(f"Attempted to parse non-string output: {type(text)}. Returning empty.")
        return [], None, str(text)  # Return empty lists/None and the stringified input
    body = text.strip()  # Start with the raw decoded text, stripped
    # 1) Clean up artifacts using the pre-compiled patterns.
    for pattern in self._artifact_patterns:
        body = pattern.sub("", body)
    body = body.strip()
    logger.debug(f"Text body after artifact cleanup: {body[:200]}...")
    # 2) Split into non-empty lines for parsing.
    lines = [l.strip() for l in body.splitlines() if l.strip()]
    logger.debug(f"Split into {len(lines)} non-empty lines.")
    # 3) Extract tagged answer if present — search anywhere in the lines.
    steps: List[str] = []
    final_answer: Optional[str] = None
    tagged = False
    answer_line_index = -1  # Line index of the answer tag, -1 if absent
    for i, line in enumerate(lines):
        m = self.final_answer_pattern.search(line)
        if m:
            final_answer = m.group(1).strip()
            tagged = True
            answer_line_index = i
            logger.debug(f"Found final answer tag on line {i}: '{final_answer[:100]}...'")
            break  # Stop searching once the tag is found
    # 4) Collect steps: everything before the answer tag if tagged,
    #    otherwise every line matching the step pattern.
    if tagged and answer_line_index != -1:
        step_lines = lines[:answer_line_index]
        logger.debug(f"Collecting steps from lines before answer tag (up to line {answer_line_index}).")
    else:
        step_lines = lines
        logger.debug("Final answer tag not found. Collecting steps from all lines matching step pattern.")
    for line in step_lines:
        m = self._step_pattern.match(line)
        if m:
            steps.append(m.group(1).strip())
            # Honor the conceptual step limit, if one is configured (> 0).
            if self.reasoning_steps_limit > 0 and len(steps) >= self.reasoning_steps_limit:
                logger.debug("Reached reasoning steps limit (%d). Stopping step collection.", self.reasoning_steps_limit)
                break  # Stop collecting steps if limit is reached
    logger.debug(f"Extracted {len(steps)} reasoning steps.")
    # 5) Fallback for the final answer when no *usable* tagged answer exists.
    # The old condition required `not tagged`, so an empty tagged answer never
    # reached this fallback even though the comments (and the start-index
    # computation below) intended it to.
    if final_answer is None or not final_answer.strip():
        logger.debug("Attempting fallback for final answer...")
        # Start just before the tag line when an (empty) tag was found, so the
        # bare tag line itself is never used as the answer; otherwise scan
        # backwards from the very last line.
        if tagged and answer_line_index != -1:
            start_index_for_fallback = answer_line_index - 1
        else:
            start_index_for_fallback = len(lines) - 1
        for i in range(start_index_for_fallback, -1, -1):
            line = lines[i]
            # A candidate must be non-empty and must NOT be a reasoning step.
            if line.strip() and not self._step_pattern.match(line):
                # Strip common answer prefixes from the candidate line.
                fallback_answer_attempt = re.sub(
                    r"^\s*(?:Answer|Result|Output|Final Answer)\s*[:\-]?\s*",
                    "",
                    line,  # Use the original line for prefix removal attempt
                    flags=re.IGNORECASE
                ).strip()
                if fallback_answer_attempt:
                    final_answer = fallback_answer_attempt
                    logger.debug("Fallback answer found: '%s'", final_answer[:100])
                    break  # Found the fallback answer
                elif line.strip():
                    # Prefix removal emptied the line; fall back to the raw line.
                    final_answer = line.strip()
                    logger.debug("Using last non-empty, non-step line as fallback answer: '%s'", final_answer[:100])
                    break  # Found the fallback answer
    logger.debug(f"Final Answer (after fallback): '{final_answer[:100] if final_answer is not None else 'None'}'")
    # 6) Final cleanup: strip trailing punctuation the model tends to append.
    if final_answer is not None:
        final_answer = re.sub(r'[.,;:]+$', '', final_answer).strip()
    logger.debug(f"Final Answer (after cleanup): '{final_answer[:100] if final_answer is not None else 'None'}'")
    logger.debug("Parsing complete. %d steps, Final Answer: '%s'", len(steps), final_answer[:100] if final_answer is not None else 'None')
    # Return steps list, final answer string (or None), and cleaned body text
    return steps, final_answer, body
# Add placeholder for potential image data extraction from text output
# This method would be highly model-specific
# Multimodal output is not currently supported by this wrapper's parsing/extraction
# def _extract_image_data_from_text(self, text: str) -> Optional[Any]:
# """
# Conceptual: Extracts encoded image data or image tokens from text output.
# Requires model-specific parsing logic.
# Returns image data or None.
# """
# logger.debug("Attempting to extract image data from text output (not implemented).")
# return None |