import logging
import re
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class Preprocessor:
    def __init__(self):
        """Initialize preprocessor without external dependencies."""
        pass

    def clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        if not text:
            return ""
        # Collapse runs of whitespace into single spaces
        text = text.strip()
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters but keep basic punctuation
        text = re.sub(r'[^\w\s.,!?;:()\-]', '', text)
        return text.strip()

    def extract_sentences(self, text: str) -> List[str]:
        """Extract sentences from text (simplified version without NLTK)."""
        if not text:
            return []
        # Simple sentence splitting on terminal punctuation
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def tokenize(self, text: str) -> List[str]:
        """Tokenize text into lowercase words (simplified version)."""
        if not text:
            return []
        # Simple word tokenization
        return re.findall(r'\b\w+\b', text.lower())

    def preprocess_passages(self, passages: List[str]) -> List[Dict[str, Any]]:
        """Preprocess a list of passages."""
        processed = []
        for i, passage in enumerate(passages):
            if not passage:
                continue
            cleaned = self.clean_text(passage)
            sentences = self.extract_sentences(cleaned)
            tokens = self.tokenize(cleaned)
            processed.append({
                'id': i,
                'text': cleaned,
                'sentences': sentences,
                'tokens': tokens,
                'length': len(tokens),
            })
        return processed

    def preprocess_qa_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Preprocess QA data, automatically converting dict/list fields to strings."""
        processed = []

        def to_str(val):
            if isinstance(val, dict):
                # Concatenate all values
                return " ".join(to_str(v) for v in val.values())
            if isinstance(val, list):
                return " ".join(to_str(v) for v in val)
            if val is None:
                return ""
            return str(val)

        for item in data:
            if not isinstance(item, dict):
                continue
            question = to_str(item.get('question', ''))
            answer = to_str(item.get('answer', ''))
            context = to_str(item.get('context', ''))
            processed.append({
                'question': self.clean_text(question),
                'answer': self.clean_text(answer),
                'context': self.clean_text(context),
                'question_tokens': self.tokenize(question),
                'answer_tokens': self.tokenize(answer),
                'context_tokens': self.tokenize(context),
            })
        return processed

    def create_chunks(self, text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
        """Create overlapping text chunks (sizes measured in tokens)."""
        if not text:
            return []
        # Guard against a zero or negative step, which would make range() raise ValueError
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")
        tokens = self.tokenize(text)
        chunks = []
        step = chunk_size - overlap
        for i in range(0, len(tokens), step):
            chunks.append(' '.join(tokens[i:i + chunk_size]))
        return chunks
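
# A minimal usage sketch for running this module directly. The sample strings
# and QA record below are illustrative placeholders, not from any real dataset.
if __name__ == "__main__":
    pre = Preprocessor()

    # Passage preprocessing: cleaning, sentence splitting, tokenization
    passages = [
        "The Eiffel Tower is in Paris.  It was completed in 1889!",
        "Python is a programming language... created by Guido van Rossum.",
    ]
    for p in pre.preprocess_passages(passages):
        print(p['id'], p['length'], p['sentences'])

    # QA preprocessing: nested dict/list answer fields are flattened to strings
    qa = [{'question': 'Where is the Eiffel Tower?',
           'answer': {'text': 'Paris', 'aliases': ['Paris, France']}}]
    print(pre.preprocess_qa_data(qa)[0]['answer'])  # -> "Paris Paris, France"

    # Chunking: 10 tokens, window of 8 with 2-token overlap -> two chunks
    chunks = pre.create_chunks(
        "one two three four five six seven eight nine ten",
        chunk_size=8, overlap=2,
    )
    print(chunks)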