import logging
import re
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)


class OutputFormatter:
    """
    Formats model responses for better presentation and usability.

    A response is first whitespace-normalised and then handed to a
    specialization-specific post-processor (code fencing, LaTeX wrapping,
    term emphasis, or a generic paragraph/list formatter).  All formatters
    are regex heuristics: they are deliberately conservative and leave text
    alone when it is already formatted (fenced code, existing math spans,
    already-emphasised terms).
    """

    # ---- whitespace -----------------------------------------------------
    _EXCESS_NEWLINES_RE = re.compile(r'\n{3,}')
    # A run of spaces is only collapsed when it follows a visible character
    # on the same line, so leading indentation (significant for code)
    # survives the clean-up.
    _INNER_SPACES_RE = re.compile(r'([^\n ]) {2,}')

    # ---- code -----------------------------------------------------------
    # Existing fenced blocks; an unterminated fence runs to the end of text.
    _FENCED_BLOCK_RE = re.compile(r'(```.*?(?:```|\Z))', re.DOTALL)
    # Group 1 is the code itself, without the newline that precedes it.
    # Alternative 1: a keyword line followed by indented lines (blank lines
    # inside the body are allowed) and an optional closing brace line.
    # Alternative 2: a single-line SQL statement terminated by ';'.
    _CODE_BLOCK_RE = re.compile(
        r'(?:^|\n)('
        r'(?:def |class |import |function |public |private |var |let |const '
        r'|if |for |while )'
        r'.+(?:(?:\n[ \t]*)*\n[ \t]+\S.*)+'
        r'(?:\n\}[;,]?[ \t]*(?=\n|$))?'
        r'|(?:SELECT |INSERT |UPDATE |DELETE |CREATE |ALTER |DROP ).+;(?=\n|$)'
        r')'
    )
    # Checked in order; unambiguous signatures come first so that generic
    # tokens such as "int " or "class " cannot shadow them.
    _LANGUAGE_SIGNATURES = (
        ("c++", re.compile(
            r'#include|\bint main\b|std::|(?<!\.)\b\w*(?:printf|scanf)\b')),
        ("sql", re.compile(
            r'^[ \t]*(?:SELECT |INSERT |UPDATE |DELETE |CREATE TABLE|ALTER TABLE)',
            re.MULTILINE)),
        ("java", re.compile(
            r'^[ \t]*(?:public|private|protected) ', re.MULTILINE)),
        ("python", re.compile(
            r'\bdef |\bclass |\bimport |if __name__ ==|\bprint\(')),
        ("javascript", re.compile(
            r'\bfunction |\bvar |\bconst |\blet |=> |\bdocument\.')),
        ("java", re.compile(
            r'\bpublic |\bprivate |\bclass .+ \{|\bvoid |\bString |\bint |\bboolean')),
        ("sql", re.compile(
            r'SELECT |INSERT |UPDATE |DELETE |CREATE TABLE|ALTER TABLE')),
    )

    # ---- math -----------------------------------------------------------
    _EQUATION_BEGIN = r'\begin{equation}'
    _EQUATION_END = r'\end{equation}'
    # Spans that are already math: $$...$$, $...$ and equation environments.
    _MATH_SPAN_RE = re.compile(
        r'(\$\$.+?\$\$|\$[^$\n]+\$|\\begin\{equation\}.+?\\end\{equation\})',
        re.DOTALL,
    )
    # Bare LaTeX fragments that should be wrapped in $...$.  One alternation
    # applied in a single pass so that, e.g., x_1 inside \frac{x_1}{2} is
    # not wrapped a second time.
    _INLINE_MATH_RE = re.compile(
        r'\\frac\{.+?\}\{.+?\}'
        r'|\\(?:sum|int|lim)_(?:\{[^{}]*\}|\w)(?:\^(?:\{[^{}]*\}|\w))?'
        r'|(?<![\w\\])[a-zA-Z]_[0-9]+(?!\w)'
        r'|(?<![\w\\])[a-zA-Z]\\in\b'
    )

    # ---- technical terms -----------------------------------------------
    _TECHNICAL_TERMS = (
        "neural network", "machine learning", "deep learning", "algorithm",
        "regression", "classification", "clustering", "backpropagation",
        "gradient descent", "optimization", "hyperparameter",
    )
    # The look-arounds skip terms that already sit inside */_ emphasis.
    _TECHNICAL_TERM_RE = re.compile(
        r'(?<![*_])\b('
        + '|'.join(re.escape(term) for term in _TECHNICAL_TERMS)
        + r')\b(?![*_])',
        re.IGNORECASE,
    )

    # ---- default formatter ---------------------------------------------
    _SENTENCE_BREAK_RE = re.compile(r'(\w\.)\s+([A-Z])')
    # An inline list marker: 1-3 digits and a dot, not glued to a word and
    # not already at the start of a line.  Four-digit numbers (years) are
    # intentionally excluded.
    _INLINE_LIST_MARKER_RE = re.compile(r'(?<=\S)[ \t]*(?<!\w)(\d{1,3}\.)\s+')

    def __init__(self):
        """
        Initialize the OutputFormatter.

        Builds the specialization -> post-processor dispatch table used by
        ``format_response``; the ``"default"`` entry is the fallback.
        """
        self.post_processors: Dict[str, Callable[[str], str]] = {
            "programming_software_dev": self._format_code,
            "mbpp": self._format_code,
            "machine_learning_ai_data_science": self._format_technical_content,
            "mathematics": self._format_equations,
            "default": self._default_formatter,
        }
        logger.info("OutputFormatter initialized")

    def format_response(self, response: str, specialization: Optional[str] = None) -> str:
        """
        Format the model response based on specialization.

        Args:
            response: The raw response from the model. Falsy values yield
                an empty string; other non-string values are converted
                with ``str()`` instead of raising.
            specialization: The specialization area (optional). Unknown or
                missing values fall back to the default formatter.

        Returns:
            Formatted response
        """
        if not response:
            return ""
        if not isinstance(response, str):
            response = str(response)

        cleaned = self._clean_whitespace(response)
        processor = self.post_processors.get(
            specialization, self.post_processors["default"]
        )
        return processor(cleaned)

    def _clean_whitespace(self, text: str) -> str:
        """
        Clean up excessive whitespace.

        Collapses three or more consecutive newlines to a single blank line
        and runs of spaces inside a line to one space, then strips the
        ends.  Leading indentation of a line is preserved so that code in
        the response keeps its structure.
        """
        text = self._EXCESS_NEWLINES_RE.sub('\n\n', text)
        text = self._INNER_SPACES_RE.sub(r'\1 ', text)
        return text.strip()

    def _format_code(self, text: str) -> str:
        """
        Format code blocks with proper syntax highlighting markers.

        Unfenced code (a keyword line followed by an indented body, or a
        single-line SQL statement ending in ``;``) is wrapped in a Markdown
        fence tagged with the detected language.  Text that is already
        inside a ``` fence is passed through untouched.
        """
        def add_code_markers(match):
            code_block = match.group(1)
            lang = self._detect_language(code_block)
            return f"\n```{lang}\n{code_block}\n```\n"

        # re.split with one capturing group alternates between text outside
        # fences (even indexes) and the fenced blocks themselves (odd).
        segments = self._FENCED_BLOCK_RE.split(text)
        for index in range(0, len(segments), 2):
            segments[index] = self._CODE_BLOCK_RE.sub(
                add_code_markers, segments[index]
            )
        return "".join(segments)

    def _detect_language(self, code_block: str) -> str:
        """
        Attempt to detect the programming language from a code block.

        Returns one of ``"python"``, ``"javascript"``, ``"java"``,
        ``"c++"``, ``"sql"``, or ``""`` when nothing matches.  Signatures
        are tried in priority order, most specific first.
        """
        for language, signature in self._LANGUAGE_SIGNATURES:
            if signature.search(code_block):
                return language
        return ""

    def _format_equations(self, text: str) -> str:
        """
        Format mathematical equations with LaTeX markers if needed.

        Bare ``\\frac{..}{..}``, ``\\sum_..``/``\\int_..``/``\\lim_..``
        (including their sub/superscripts), single-letter subscripts such
        as ``x_1`` and ``x\\in`` are wrapped in ``$...$``.  Existing
        ``$...$`` / ``$$...$$`` spans are left untouched, and
        ``\\begin{equation}...\\end{equation}`` becomes ``$$...$$``.
        """
        def wrap_inline(match):
            return f"${match.group(0)}$"

        parts = self._MATH_SPAN_RE.split(text)
        for index, part in enumerate(parts):
            if index % 2:
                # Already math: only convert equation environments.
                if part.startswith(self._EQUATION_BEGIN):
                    inner = part[len(self._EQUATION_BEGIN):-len(self._EQUATION_END)]
                    parts[index] = f"$${inner}$$"
            else:
                parts[index] = self._INLINE_MATH_RE.sub(wrap_inline, part)
        return "".join(parts)

    def _format_technical_content(self, text: str) -> str:
        """
        Format technical content with proper highlighting of terms and concepts.

        Each known technical term is wrapped in ``*...*`` (Markdown
        emphasis).  Matching is case-insensitive and keeps the original
        casing; terms already adjacent to ``*`` or ``_`` are skipped, so
        the method is safe to apply twice.
        """
        return self._TECHNICAL_TERM_RE.sub(r'*\1*', text)

    def _default_formatter(self, text: str) -> str:
        """
        Default formatter that applies general improvements.

        Inserts a paragraph break after a sentence that is followed by a
        capitalised word, then moves inline numbered-list markers
        (``1.``, ``2.`` ...) onto their own lines.  Markers that already
        start a line, multi-digit markers and sentence-final years are left
        alone.
        """
        # Order matters: the list pass swallows the paragraph break that the
        # sentence pass inserts right after a marker such as "1.".
        text = self._SENTENCE_BREAK_RE.sub(r'\1\n\n\2', text)
        text = self._INLINE_LIST_MARKER_RE.sub(r'\n\1 ', text)
        return text

    def format_structured_output(self, data: Dict[str, Any]) -> str:
        """
        Format structured data outputs (like JSON) into readable text.

        Args:
            data: Dictionary containing structured data. The ``"response"``
                entry, if present, is formatted with the default formatter;
                every other entry whose key does not start with ``_`` is
                rendered as a ``**Title Cased Key**: value`` line after a
                ``---`` separator.

        Returns:
            Formatted string representation (``str(data)`` when ``data``
            is not a dictionary)
        """
        if not isinstance(data, dict):
            return str(data)

        formatted_parts = []
        if "response" in data:
            formatted_parts.append(self.format_response(data["response"]))

        # Keys are stringified so that non-string keys cannot raise.
        metadata = {
            str(key): value
            for key, value in data.items()
            if key != "response" and not str(key).startswith("_")
        }

        if metadata:
            formatted_parts.append("\n\n---\n")
            formatted_parts.extend(
                f"**{key.replace('_', ' ').title()}**: {value}"
                for key, value in metadata.items()
            )

        return "\n".join(formatted_parts)