import io import yaml import inspect import asyncio import base64 from abc import ABC, abstractmethod from pydantic import Field from typing import Union, Optional, Type, Callable, List, Any, Dict from ..core.parser import Parser from .model_configs import LLMConfig from ..core.module_utils import ( parse_json_from_text, get_type_name, parse_xml_from_text, parse_data_from_text ) PARSER_VALID_MODE = ["str", "json", "xml", "title", "custom"] class LLMOutputParser(Parser): """A basic parser for LLM-generated content. This parser stores the raw text generated by an LLM in the `.content` attribute and provides methods to extract structured data from this text using different parsing strategies. Attributes: content: The raw text generated by the LLM. """ content: str = Field(default=None, exclude=True, description="the text generated by LLM") @classmethod def get_attrs(cls, return_type: bool = False) -> List[Union[str, tuple]]: """Returns the attributes of the LLMOutputParser class. Excludes ["class_name", "content"] by default. Args: return_type: Whether to return the type of the attributes along with their names. Returns: If `return_type` is True, returns a list of tuples where each tuple contains the attribute name and its type. Otherwise, returns a list of attribute names. """ attrs = [] exclude_attrs = ["class_name", "content"] for field, field_info in cls.model_fields.items(): if field not in exclude_attrs: if return_type: field_type = get_type_name(field_info.annotation) attrs.append((field, field_type)) else: attrs.append(field) return attrs @classmethod def get_attr_descriptions(cls) -> dict: """Returns the attributes and their descriptions. Returns: A dictionary mapping attribute names to their descriptions. """ attrs = cls.get_attrs() results = {} for field_name, field_info in cls.model_fields.items(): if field_name not in attrs: continue field_desc = field_info.description if field_info.description is not None else "None" results[field_name] = field_desc return results @classmethod def get_content_data(cls, content: str, parse_mode: str = "json", parse_func: Optional[Callable] = None, **kwargs) -> dict: """Parses LLM-generated content into a dictionary. This method takes content from an LLM response and converts it to a structured dictionary based on the specified parsing mode. Args: content: The content to parse. parse_mode: The mode to parse the content. Must be one of: - 'str': Assigns the raw text content to all attributes of the parser. - 'json': Extracts and parses JSON objects from LLM output. It will return a dictionary parsed from the first valid JSON string. - 'xml': Parses content using XML tags. It will return a dictionary parsed from the XML tags. - 'title': Parses content with Markdown-style headings. - 'custom': Uses custom parsing logic. Requires providing `parse_func` parameter as a custom parsing function. parse_func: The function to parse the content, only valid when parse_mode is 'custom'. **kwargs (Any): Additional arguments passed to the parsing function. Returns: The parsed content as a dictionary. Raises: ValueError: If parse_mode is invalid or if parse_func is not provided when parse_mode is 'custom'. """ attrs = cls.get_attrs() if len(attrs) <= 0: return {} if parse_mode == "str": parse_func = cls._parse_str_content elif parse_mode == "json": parse_func = cls._parse_json_content elif parse_mode == "xml": parse_func = cls._parse_xml_content elif parse_mode == "title": parse_func = cls._parse_title_content elif parse_mode == "custom": if parse_func is None: raise ValueError("`parse_func` must be provided when `parse_mode` is 'custom'.") # obtain the function inputs signature = inspect.signature(parse_func) if "content" not in signature.parameters: raise ValueError("`parse_func` must have an input argument `content`.") func_args = {} func_args["content"] = content for param_name, param in signature.parameters.items(): if param_name == "content": continue # Already set if param_name in kwargs: func_args[param_name] = kwargs[param_name] data = parse_func(**func_args) if not isinstance(data, dict): raise ValueError(f"The output of `parse_func` must be a dictionary, but found {type(data)}.") return data else: raise ValueError(f"Invalid value '{parse_mode}' detected for `parse_mode`. Available choices: {PARSER_VALID_MODE}") data = parse_func(content=content, **kwargs) return data @classmethod def _parse_str_content(cls, content: str, **kwargs) -> dict: """Parses content by setting all attributes to the raw content. Args: content: The content to parse. **kwargs: Additional arguments (not used). Returns: A dictionary mapping all attributes to the raw content. """ # if `parse_mode=str` in llm.generate(), it will set all the defined parameters to `content` attrs = cls.get_attrs() return {attr: content for attr in attrs} @classmethod def _parse_json_content(cls, content: str, **kwargs) -> dict: """Parses content by extracting and parsing a JSON object. If the content contains multiple JSON objects, only the first one will be used. Args: content: The content containing a JSON object. **kwargs: Additional arguments (not used). Returns: The parsed JSON as a dictionary. Raises: ValueError: If the content doesn't contain a valid JSON object. """ extracted_json_list = parse_json_from_text(content) if len(extracted_json_list) > 0: json_str = extracted_json_list[0] # only use the first JSON try: data = yaml.safe_load(json_str) if not isinstance(data, dict): if isinstance(data, list): # LLM returns a list of JSON strings, without specifying the attribute name attrs = cls.get_attrs() if len(attrs) == 1: # if there is only one attribute, use it as the attribute name return {attrs[0]: data} else: # if there are multiple attributes, raise an error raise ValueError("The generated content is a list of JSON strings, but the attribute name for the list is not specified. You should instruct the LLM to specify the attribute name for the list.") else: raise ValueError(f"The generated content is not a valid JSON string:\n{json_str}") except Exception: raise ValueError(f"The generated content is not a valid JSON string:\n{json_str}") else: raise ValueError(f"The following generated content does not contain JSON string!\n{content}") return data @classmethod def _parse_xml_content(cls, content: str, **kwargs) -> dict: """Parses content by extracting values from XML tags. Each attribute of the parser is expected to be enclosed in XML tags with the attribute name as the tag name. Args: content: The content containing XML tags. **kwargs: Additional arguments (not used). Returns: A dictionary mapping attributes to their extracted values. Raises: ValueError: If the content is missing expected XML tags or if the extracted values can't be converted to the expected types. """ attrs_with_types: List[tuple] = cls.get_attrs(return_type=True) data = {} for attr, attr_type in attrs_with_types: attr_raw_value_list = parse_xml_from_text(text=content, label=attr) if len(attr_raw_value_list) > 0: attr_raw_value = attr_raw_value_list[0] try: attr_value = parse_data_from_text(text=attr_raw_value, datatype=attr_type) except Exception: raise ValueError(f"Cannot parse text: {attr_raw_value} into {attr_type} data!") else: raise ValueError(f"The following generated content does not contain xml label <{attr}>xxx!\n{content}") data[attr] = attr_value return data @classmethod def _parse_title_content(cls, content: str, title_format: str = "## {title}", **kwargs) -> dict: """Parses content with markdown-style titles. Extracts sections from content that are divided by titles following the specified format described in `title_format`. The default format is "## {title}". For example: ``` ## title1 content1 ## title2 content2 ``` This content will be parsed into: ``` { "title1": "content1", "title2": "content2" } ``` Args: content: The content with title-divided sections. title_format: The format of the titles, default is "## {title}". **kwargs: Additional arguments (not used). Returns: A dictionary mapping title names to their section contents. """ attrs: List[str] = cls.get_attrs() if not attrs: return {} output_titles = [title_format.format(title=attr) for attr in attrs] def is_output_title(text: str): for title in output_titles: if text.strip().lower().startswith(title.lower()): return True, title return False, None data = {} current_output_name: str = None current_output_content: list = None for line in content.split("\n"): is_title, title = is_output_title(line) if is_title: if current_output_name is not None and current_output_content is not None: data[current_output_name] = "\n".join(current_output_content) current_output_content = [] current_output_name = title.replace("#", "").strip() output_titles.remove(title) else: if current_output_content is not None: current_output_content.append(line) if current_output_name is not None and current_output_content is not None: data[current_output_name] = "\n".join(current_output_content) return data @classmethod def parse(cls, content: str, parse_mode: str = "json", parse_func: Optional[Callable] = None, **kwargs) -> "LLMOutputParser": """Parses LLM-generated text into a structured parser instance. This is the main method for creating parser instances from LLM output. Args: content: The text generated by the LLM. parse_mode: The mode to parse the content, must be one of: - 'str': Assigns the raw text content to all attributes of the parser. - 'json': Extracts and parses JSON objects from LLM output. Uses the first valid JSON string to create an instance of LLMOutputParser. - 'xml': Parses content using XML tags. Uses the XML tags to create an instance of LLMOutputParser. - 'title': Parses content with Markdown-style headings. Uses the Markdown-style headings to create an instance of LLMOutputParser. The default title format is "## {title}", you can change it by providing `title_format` parameter, which should be a string that contains `{title}` placeholder. - 'custom': Uses custom parsing logic. Requires providing `parse_func` parameter as a custom parsing function. The `parse_func` must have a parameter named `content` and return a dictionary where the keys are the attribute names and the values are the parsed data. parse_func: The function to parse the content, only valid when `parse_mode` is 'custom'. **kwargs (Any): Additional arguments passed to parsing functions, such as: - `title_format` for `parse_mode="title"`. Returns: An instance of LLMOutputParser containing the parsed data. Raises: ValueError: If parse_mode is invalid or if content is not a string. """ if parse_mode not in PARSER_VALID_MODE: raise ValueError(f"'{parse_mode}' is an invalid value for `parse_mode`. Available choices: {PARSER_VALID_MODE}.") if not isinstance(content, str): raise ValueError(f"The input to {cls.__name__}.parse should be a str, but found {type(content)}.") data = cls.get_content_data(content=content, parse_mode=parse_mode, parse_func=parse_func, **kwargs) data.update({"content": content}) parser = cls.from_dict(data, **kwargs) # parser.content = content return parser def __str__(self) -> str: """ Returns a string representation of the parser. """ return self.to_str() def to_str(self, **kwargs) -> str: """ Converts the parser to a string. """ return self.content def get_structured_data(self) -> dict: """Extracts structured data from the parser. Returns: A dictionary containing only the defined attributes and their values, excluding metadata like class_name. """ attrs = type(self).get_attrs() data = self.to_dict(ignore=["class_name"]) # structured_data = {attr: data[attr] for attr in attrs} structured_data = {key: value for key, value in data.items() if key in attrs} return structured_data def _is_multimodal_content(content: Any) -> bool: """Check if content contains multimodal objects (TextChunk, ImageChunk, etc.).""" try: from ..rag.schema import TextChunk, ImageChunk # Handle different content formats if isinstance(content, list): return any(isinstance(item, (TextChunk, ImageChunk)) for item in content) elif isinstance(content, (TextChunk, ImageChunk)): return True return False except ImportError: return False def _process_multimodal_content(content: List[Any], model_type: str = "openai") -> List[Dict[str, Any]]: """Convert multimodal content (TextChunk, ImageChunk) to model-specific message format.""" try: from ..rag.schema import TextChunk, ImageChunk except ImportError: raise ImportError("Cannot import TextChunk/ImageChunk from rag.schema for multimodal processing") processed_content = [] for item in content: if isinstance(item, TextChunk): processed_content.append({ "type": "text", "text": item.text }) elif isinstance(item, ImageChunk): if model_type.lower() in ["openai", "openrouter", "litellm"]: # OpenAI-style format image_data = _get_image_data_url(item) processed_content.append({ "type": "image_url", "image_url": {"url": image_data} }) else: # For other models, include image path/data as appropriate processed_content.append({ "type": "image", "image_path": item.image_path, "image_mimetype": item.image_mimetype }) else: # Handle other types (strings, etc.) if isinstance(item, str): processed_content.append({ "type": "text", "text": item }) else: # Convert to string as fallback processed_content.append({ "type": "text", "text": str(item) }) return processed_content def _get_image_data_url(image_chunk) -> str: """Convert ImageChunk to data URL format for model consumption.""" try: # Load image using the chunk's lazy loading image = image_chunk.get_image() if image is None: raise ValueError(f"Could not load image from path: {image_chunk.image_path}") # Convert to base64 buffer = io.BytesIO() # Determine format from mimetype or default to PNG format_name = "PNG" if image_chunk.image_mimetype: format_name = image_chunk.image_mimetype.split('/')[-1].upper() if format_name not in ['PNG', 'JPEG', 'JPG', 'GIF', 'WEBP']: format_name = "PNG" image.save(buffer, format=format_name) image_data = base64.b64encode(buffer.getvalue()).decode('utf-8') # Create data URL mime_type = image_chunk.image_mimetype or f"image/{format_name.lower()}" return f"data:{mime_type};base64,{image_data}" except Exception as e: raise RuntimeError(f"Failed to convert image to data URL: {str(e)}") class BaseLLM(ABC): """Abstract base class for Large Language Model implementations. This class defines the interface that all LLM implementations must follow, providing methods for generating text, formatting messages, and parsing output. Attributes: config: Configuration for the LLM. kwargs: Additional keyword arguments provided during initialization. """ def __init__(self, config: LLMConfig, **kwargs): """Initializes the LLM with configuration. Args: config: Configuration object for the LLM. **kwargs (Any): Additional keyword arguments. """ self.config = config self.kwargs = kwargs self.init_model() @abstractmethod def init_model(self): """Initializes the underlying model. This method should be implemented by subclasses to set up the actual LLM. """ pass def __deepcopy__(self, memo) -> "BaseLLM": """Handles deep copying of the LLM instance. Returns the same instance when deepcopy is called, as LLM instances often cannot be meaningfully deep-copied. Args: memo (Dict[int, Any]): Memo dictionary used by the deepcopy process. Returns: The same LLM instance. """ # return the same instance when deepcopy memo[id(self)] = self return self @abstractmethod def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]: """Converts input prompts into the chat format compatible with different LLMs. Args: prompts: A list of user prompts that need to be converted. system_messages: An optional list of system messages that provide instructions or context to the model. Returns: A list of message lists, where each inner list contains messages in the chat format required by LLMs. """ pass @abstractmethod def single_generate(self, messages: List[dict], **kwargs) -> str: """Generates LLM output for a single set of messages. Args: messages: The input messages to the LLM in chat format. **kwargs (Any): Additional keyword arguments for generation settings. Returns: The generated output text from the LLM. """ pass @abstractmethod def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]: """Generates outputs for a batch of message sets. Args: batch_messages: A list of message lists, where each inner list contains messages for a single generation. **kwargs (Any): Additional keyword arguments for generation settings. Returns: A list of generated outputs from the LLM, one for each input message set. """ pass async def single_generate_async(self, messages: List[dict], **kwargs) -> str: """Asynchronously generates LLM output for a single set of messages. This default implementation wraps the synchronous method in an async executor. Subclasses should override this for true async implementation if supported. Args: messages: The input messages to the LLM in chat format. **kwargs (Any): Additional keyword arguments for generation settings. Returns: The generated output text from the LLM. """ # Default implementation for backward compatibility loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, self.single_generate, messages, **kwargs) return result async def batch_generate_async(self, batch_messages: List[List[dict]], **kwargs) -> List[str]: """Asynchronously generates outputs for a batch of message sets. This default implementation runs each generation as a separate async task. Subclasses should override this for more efficient async batching if supported. Args: batch_messages: A list of message lists, where each inner list contains messages for a single generation. **kwargs (Any): Additional keyword arguments for generation settings. Returns: A list of generated outputs from the LLM, one for each input message set. """ # Default implementation for backward compatibility tasks = [self.single_generate_async(messages, **kwargs) for messages in batch_messages] return await asyncio.gather(*tasks) def parse_generated_text(self, text: str, parser: Optional[Type[LLMOutputParser]]=None, parse_mode: Optional[str] = "json", parse_func: Optional[Callable] = None, **kwargs) -> LLMOutputParser: """Parses generated text into a structured output using a parser. Args: text: The text generated by the LLM. parser: An LLMOutputParser class to use for parsing. If None, the default LLMOutputParser is used. parse_mode: The mode to use for parsing, must be the `parse_mode` supported by the `parser`. **kwargs (Any): Additional arguments passed to the parser. Returns: An LLMOutputParser instance containing the parsed data. """ if not parser: parser = LLMOutputParser return parser.parse(text, parse_mode=parse_mode, parse_func=parse_func) def parse_generated_texts(self, texts: List[str], parser: Optional[Type[LLMOutputParser]]=None, parse_mode: Optional[str] = "json", parse_func: Optional[Callable] = None, **kwargs) -> List[LLMOutputParser]: """Parses multiple generated texts into structured outputs. Args: texts: A list of texts generated by the LLM. parser: An LLMOutputParser class to use for parsing. parse_mode: The mode to use for parsing, must be the `parse_mode` supported by the `parser`. **kwargs (Any): Additional arguments passed to the parser. Returns: A list of LLMOutputParser instances containing the parsed data. """ parsed_results = [self.parse_generated_text(text=text, parser=parser, parse_mode=parse_mode, parse_func=parse_func, **kwargs) for text in texts] return parsed_results def _prepare_messages( self, prompt: Optional[Union[str, List[str]]] = None, system_message: Optional[Union[str, List[str]]] = None, messages: Optional[Union[List[dict],List[List[dict]]]] = None, ) -> tuple[List[List[dict]], bool]: """Prepares and validates input messages for generation. This internal method handles the various input formats (prompt strings, system messages, or pre-formatted message dictionaries) and converts them to a consistent format for generation. Args: prompt: Input prompt(s) to the LLM. system_message: System message(s) for the LLM. messages: Chat message(s) for the LLM, already in the required format. Returns: A tuple containing: - prepared_messages: List of message lists ready for generation - is_single_generate: Boolean indicating if this is a single generation (vs. batch) Raises: ValueError: If neither prompt nor messages is provided, or if both are provided. TypeError: If the inputs have inconsistent types or formats. """ if not (prompt or messages): raise ValueError("Either 'prompt' or 'messages' must be provided.") if prompt and messages: raise ValueError("Both 'prompt' and 'messages' are provided. Please provide only one of them.") single_generate = False if messages is not None: if not messages: # empty messages return [], False if isinstance(messages[0], dict): single_generate = True messages = [messages] # Process multimodal content in messages processed_messages = self._process_messages_for_multimodal(messages) return processed_messages, single_generate if isinstance(prompt, str): single_generate = True prompt = [prompt] if system_message: if not isinstance(system_message, str): raise TypeError(f"'system_message' should be a string when passing a single prompt, but found {type(system_message)}.") system_message = [system_message] elif isinstance(prompt, list) and all(isinstance(p, str) for p in prompt): single_generate = False if not prompt: # empty prompt return [], False if system_message: if not isinstance(system_message, list) or len(prompt) != len(system_message): raise ValueError(f"'system_message' should be a list of string when passing multiple prompts and the number of prompts ({len(prompt)}) must match the number of system messages ({len(system_message)}).") else: raise ValueError(f"'prompt' must be a str or List[str], but found {type(prompt)}.") prepared_messages = self.formulate_messages(prompts=prompt, system_messages=system_message) return prepared_messages, single_generate def _process_messages_for_multimodal(self, messages: List[List[dict]]) -> List[List[dict]]: """Process messages to handle multimodal content (TextChunk, ImageChunk).""" processed_messages = [] for message_list in messages: processed_message_list = [] for message in message_list: processed_message = message.copy() content = message.get("content") # Check if content contains multimodal objects if _is_multimodal_content(content): # Get model type for proper formatting llm_type = getattr(self.config, 'llm_type', 'openai') # Map LLM types to processing types if llm_type.lower() in ["openaillm", "openai"]: model_type = "openai" elif llm_type.lower() in ["litellm"]: model_type = "litellm" elif llm_type.lower() in ["openrouter"]: model_type = "openrouter" else: model_type = "openai" # Default to OpenAI format from ..core.logging import logger logger.debug(f"Processing multimodal content: llm_type={llm_type}, model_type={model_type}") # Convert multimodal content to appropriate format if isinstance(content, list): processed_message["content"] = _process_multimodal_content(content, model_type) else: # Single chunk - wrap in list for processing processed_message["content"] = _process_multimodal_content([content], model_type) processed_message_list.append(processed_message) processed_messages.append(processed_message_list) return processed_messages def generate( self, prompt: Optional[Union[str, List[str]]] = None, system_message: Optional[Union[str, List[str]]] = None, messages: Optional[Union[List[dict],List[List[dict]]]] = None, parser: Optional[Type[LLMOutputParser]] = None, parse_mode: Optional[str] = "json", parse_func: Optional[Callable] = None, **kwargs ) -> Union[LLMOutputParser, List[LLMOutputParser]]: """Generates LLM output(s) and parses the result(s). This is the main method for generating text with the LLM. It handles both single and batch generation, and automatically parses the outputs. Args: prompt: Input prompt(s) to the LLM. system_message: System message(s) for the LLM. messages: Chat message(s) for the LLM, already in the required format (either `prompt` or `messages` must be provided). parser: Parser class to use for processing the output. parse_mode: The mode to use for parsing, must be the `parse_mode` supported by the `parser`. **kwargs (Any): Additional generation configuration parameters. Returns: For single generation: An LLMOutputParser instance. For batch generation: A list of LLMOutputParser instances. Raises: ValueError: If the input format is invalid. Note: Either prompt or messages must be provided. If both or neither is provided, an error will be raised. """ prepared_messages, single_generate = self._prepare_messages(prompt, system_message, messages) if not prepared_messages: # Handle empty messages case return [] generated_texts = self.batch_generate(batch_messages=prepared_messages, **kwargs) parsed_outputs = self.parse_generated_texts(texts=generated_texts, parser=parser, parse_mode=parse_mode, parse_func=parse_func, **kwargs) return parsed_outputs[0] if single_generate else parsed_outputs async def async_generate( self, prompt: Optional[Union[str, List[str]]] = None, system_message: Optional[Union[str, List[str]]] = None, messages: Optional[Union[List[dict],List[List[dict]]]] = None, parser: Optional[Type[LLMOutputParser]] = None, parse_mode: Optional[str] = "json", parse_func: Optional[Callable] = None, **kwargs ) -> Union[LLMOutputParser, List[LLMOutputParser]]: """Asynchronously generates LLM output(s) and parses the result(s). This is the async version of the generate method. It works identically but performs the generation asynchronously. """ prepared_messages, single_generate = self._prepare_messages(prompt, system_message, messages) if not prepared_messages: # Handle empty messages case return [] generated_texts = await self.batch_generate_async(batch_messages=prepared_messages, **kwargs) parsed_outputs = self.parse_generated_texts(texts=generated_texts, parser=parser, parse_mode=parse_mode, parse_func=parse_func, **kwargs) return parsed_outputs[0] if single_generate else parsed_outputs __all__ = ["LLMConfig", "BaseLLM", "LLMOutputParser"]