|
|
import re |
|
|
from typing import List, Dict, Tuple |
|
|
from tqdm import tqdm |
|
|
from utils.text_utils import TextUtils |
|
|
from config import Config |
|
|
|
|
|
|
|
|
class TextProcessor:
    """Large-scale text processor.

    Utilities for chunking long documents (by characters or tokens),
    extracting dialogue, splitting chapters, gathering statistics,
    cleaning text and sampling excerpts.
    """

    def __init__(self):
        # Shared helper used throughout: sentence splitting, token counting
        # and language detection (provided by utils.text_utils.TextUtils).
        self.text_utils = TextUtils()
|
|
|
|
|
def chunk_text(self, text: str, chunk_size: int = None, |
|
|
overlap: int = None) -> List[Dict]: |
|
|
"""将长文本分块,保持语义完整性 |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
chunk_size: 每块的最大字符数 |
|
|
overlap: 块之间的重叠字符数 |
|
|
|
|
|
Returns: |
|
|
分块结果列表,每个元素包含 text, start, end, chunk_id |
|
|
""" |
|
|
chunk_size = chunk_size or Config.MAX_CHUNK_SIZE |
|
|
overlap = overlap or Config.CHUNK_OVERLAP |
|
|
|
|
|
|
|
|
paragraphs = text.split('\n\n') |
|
|
|
|
|
chunks = [] |
|
|
current_chunk = "" |
|
|
current_start = 0 |
|
|
total_processed = 0 |
|
|
|
|
|
print(f"开始分块处理 (块大小: {chunk_size}, 重叠: {overlap})...") |
|
|
|
|
|
for para in tqdm(paragraphs, desc="分块进度"): |
|
|
para = para.strip() |
|
|
if not para: |
|
|
continue |
|
|
|
|
|
|
|
|
if len(current_chunk) + len(para) + 2 > chunk_size: |
|
|
if current_chunk: |
|
|
|
|
|
chunks.append({ |
|
|
'text': current_chunk.strip(), |
|
|
'start': current_start, |
|
|
'end': current_start + len(current_chunk), |
|
|
'chunk_id': len(chunks) |
|
|
}) |
|
|
|
|
|
|
|
|
if len(current_chunk) > overlap: |
|
|
|
|
|
overlap_text = current_chunk[-overlap:] |
|
|
|
|
|
sentences = self.text_utils.split_into_sentences(overlap_text) |
|
|
if sentences: |
|
|
overlap_text = sentences[-1] if len(sentences) == 1 else ' '.join(sentences[-2:]) |
|
|
else: |
|
|
overlap_text = current_chunk |
|
|
|
|
|
|
|
|
total_processed += len(current_chunk) - len(overlap_text) |
|
|
current_start = total_processed |
|
|
|
|
|
|
|
|
current_chunk = overlap_text + "\n\n" + para |
|
|
else: |
|
|
|
|
|
current_chunk = para |
|
|
current_start = total_processed |
|
|
else: |
|
|
|
|
|
if current_chunk: |
|
|
current_chunk += "\n\n" + para |
|
|
else: |
|
|
current_chunk = para |
|
|
|
|
|
|
|
|
if current_chunk: |
|
|
chunks.append({ |
|
|
'text': current_chunk.strip(), |
|
|
'start': current_start, |
|
|
'end': current_start + len(current_chunk), |
|
|
'chunk_id': len(chunks) |
|
|
}) |
|
|
|
|
|
print(f"✓ 文本分块完成: 总共 {len(chunks)} 块") |
|
|
return chunks |
|
|
|
|
|
    def chunk_text_by_tokens(self, text: str, max_tokens: int = 1500,
                             overlap_tokens: int = 150) -> List[Dict]:
        """Chunk text by token count (more precise than chars, but slower).

        Sentences are packed greedily into chunks of at most ``max_tokens``
        tokens; up to ``overlap_tokens`` worth of trailing sentences are
        carried into the next chunk as context overlap.

        Args:
            text: Input text.
            max_tokens: Maximum tokens per chunk.
            overlap_tokens: Tokens of overlap between consecutive chunks.

        Returns:
            List of dicts with keys ``text``, ``start``, ``end``,
            ``chunk_id``, ``token_count``. NOTE(review): ``start``/``end``
            are approximate character offsets — the arithmetic below ignores
            the single join spaces, so offsets can drift slightly on long
            inputs; confirm before relying on them exactly.
        """
        sentences = self.text_utils.split_into_sentences(text)

        chunks = []
        current_chunk = []      # sentences accumulated for the open chunk
        current_tokens = 0      # running token count of current_chunk
        current_start = 0       # approximate char offset of the open chunk

        print(f"按 token 分块处理 (最大: {max_tokens} tokens)...")

        for sentence in tqdm(sentences, desc="处理句子"):
            sentence_tokens = self.text_utils.count_tokens(sentence)

            # Adding this sentence would overflow: close the current chunk
            # (a single over-budget sentence still starts its own chunk).
            if current_tokens + sentence_tokens > max_tokens and current_chunk:
                chunk_text = ' '.join(current_chunk)
                chunks.append({
                    'text': chunk_text,
                    'start': current_start,
                    'end': current_start + len(chunk_text),
                    'chunk_id': len(chunks),
                    'token_count': current_tokens
                })

                # Walk backwards collecting trailing sentences until the
                # overlap token budget is exhausted; they seed the next chunk.
                overlap_chunk = []
                overlap_tokens_count = 0
                for s in reversed(current_chunk):
                    s_tokens = self.text_utils.count_tokens(s)
                    if overlap_tokens_count + s_tokens <= overlap_tokens:
                        overlap_chunk.insert(0, s)
                        overlap_tokens_count += s_tokens
                    else:
                        break

                current_chunk = overlap_chunk + [sentence]
                current_tokens = overlap_tokens_count + sentence_tokens
                # Advance by the non-overlapping prefix of the emitted chunk.
                current_start += len(chunk_text) - len(' '.join(overlap_chunk))
            else:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens

        # Flush the final partial chunk.
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append({
                'text': chunk_text,
                'start': current_start,
                'end': current_start + len(chunk_text),
                'chunk_id': len(chunks),
                'token_count': current_tokens
            })

        print(f"✓ Token 分块完成: 总共 {len(chunks)} 块")
        return chunks
|
|
|
|
|
def extract_dialogues(self, text: str) -> List[Dict]: |
|
|
"""提取对话片段 |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
|
|
|
Returns: |
|
|
对话列表,每个元素包含 content, attribution, position |
|
|
""" |
|
|
|
|
|
language = self.text_utils.detect_language(text) |
|
|
|
|
|
dialogues = [] |
|
|
|
|
|
if language == "zh": |
|
|
|
|
|
patterns = [ |
|
|
(r'"([^"]+)"[,,]?\s*([^说道讲告诉问答叫喊]*(?:说|道|讲|告诉|问|答|叫|喊))', 'chinese_quote'), |
|
|
(r'「([^」]+)」[,,]?\s*([^说道讲]*(?:说|道|讲))', 'chinese_bracket'), |
|
|
(r'"([^"]+)"', 'simple_quote'), |
|
|
] |
|
|
else: |
|
|
|
|
|
patterns = [ |
|
|
(r'"([^"]+)",?\s+([A-Z][a-z]+\s+(?:said|asked|replied|shouted|whispered|muttered|exclaimed))', 'english_quote_said'), |
|
|
(r'"([^"]+)"', 'simple_quote'), |
|
|
(r"'([^']+)',?\s+([A-Z][a-z]+\s+said)", 'english_single_quote'), |
|
|
] |
|
|
|
|
|
for pattern, pattern_type in patterns: |
|
|
matches = re.finditer(pattern, text, re.IGNORECASE) |
|
|
for match in matches: |
|
|
dialogue = { |
|
|
'content': match.group(1).strip(), |
|
|
'attribution': match.group(2).strip() if len(match.groups()) > 1 else '', |
|
|
'position': match.start(), |
|
|
'type': pattern_type |
|
|
} |
|
|
|
|
|
if len(dialogue['content']) > 5: |
|
|
dialogues.append(dialogue) |
|
|
|
|
|
|
|
|
dialogues.sort(key=lambda x: x['position']) |
|
|
|
|
|
return dialogues |
|
|
|
|
|
def split_by_chapters(self, text: str) -> List[Dict]: |
|
|
"""按章节分割文本 |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
|
|
|
Returns: |
|
|
章节列表,每个元素包含 title, content, chapter_num |
|
|
""" |
|
|
|
|
|
chapter_patterns = [ |
|
|
r'Chapter\s+(\d+)[:\s]*([^\n]*)', |
|
|
r'第([一二三四五六七八九十百千零\d]+)章[:\s]*([^\n]*)', |
|
|
r'CHAPTER\s+([IVXLCDM]+)[:\s]*([^\n]*)', |
|
|
] |
|
|
|
|
|
chapters = [] |
|
|
last_pos = 0 |
|
|
|
|
|
for pattern in chapter_patterns: |
|
|
matches = list(re.finditer(pattern, text, re.IGNORECASE | re.MULTILINE)) |
|
|
|
|
|
if matches: |
|
|
for i, match in enumerate(matches): |
|
|
start = match.start() |
|
|
end = matches[i + 1].start() if i + 1 < len(matches) else len(text) |
|
|
|
|
|
chapters.append({ |
|
|
'chapter_num': match.group(1), |
|
|
'title': match.group(2).strip() if len(match.groups()) > 1 else '', |
|
|
'content': text[start:end].strip(), |
|
|
'start': start, |
|
|
'end': end |
|
|
}) |
|
|
break |
|
|
|
|
|
|
|
|
if not chapters: |
|
|
chapters.append({ |
|
|
'chapter_num': '1', |
|
|
'title': 'Full Text', |
|
|
'content': text, |
|
|
'start': 0, |
|
|
'end': len(text) |
|
|
}) |
|
|
|
|
|
return chapters |
|
|
|
|
|
def get_statistics(self, text: str) -> Dict: |
|
|
"""获取文本统计信息 |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
|
|
|
Returns: |
|
|
统计信息字典 |
|
|
""" |
|
|
|
|
|
total_length = len(text) |
|
|
total_tokens = self.text_utils.count_tokens(text) |
|
|
|
|
|
|
|
|
paragraphs = [p for p in text.split('\n\n') if p.strip()] |
|
|
paragraph_count = len(paragraphs) |
|
|
|
|
|
|
|
|
sentences = self.text_utils.split_into_sentences(text) |
|
|
sentence_count = len(sentences) |
|
|
|
|
|
|
|
|
words = re.findall(r'\b\w+\b', text) |
|
|
word_count = len(words) |
|
|
|
|
|
|
|
|
language = self.text_utils.detect_language(text) |
|
|
|
|
|
|
|
|
dialogues = self.extract_dialogues(text[:10000]) |
|
|
dialogue_count = len(dialogues) |
|
|
|
|
|
|
|
|
chapters = self.split_by_chapters(text) |
|
|
chapter_count = len(chapters) |
|
|
|
|
|
return { |
|
|
'total_length': total_length, |
|
|
'total_tokens': total_tokens, |
|
|
'paragraphs': paragraph_count, |
|
|
'sentences': sentence_count, |
|
|
'words': word_count, |
|
|
'language': language, |
|
|
'dialogues': dialogue_count, |
|
|
'chapters': chapter_count, |
|
|
'avg_paragraph_length': total_length // paragraph_count if paragraph_count > 0 else 0, |
|
|
'avg_sentence_length': total_length // sentence_count if sentence_count > 0 else 0, |
|
|
} |
|
|
|
|
|
def clean_text(self, text: str, |
|
|
remove_extra_whitespace: bool = True, |
|
|
normalize_quotes: bool = True) -> str: |
|
|
"""清理文本 |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
remove_extra_whitespace: 是否移除多余空白 |
|
|
normalize_quotes: 是否标准化引号 |
|
|
|
|
|
Returns: |
|
|
清理后的文本 |
|
|
""" |
|
|
cleaned = text |
|
|
|
|
|
|
|
|
if remove_extra_whitespace: |
|
|
|
|
|
cleaned = '\n'.join(line.strip() for line in cleaned.split('\n')) |
|
|
|
|
|
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) |
|
|
|
|
|
cleaned = cleaned.replace('\t', ' ') |
|
|
|
|
|
cleaned = re.sub(r' {2,}', ' ', cleaned) |
|
|
|
|
|
|
|
|
if normalize_quotes: |
|
|
|
|
|
cleaned = cleaned.replace('『', '"').replace('』', '"') |
|
|
cleaned = cleaned.replace('「', '"').replace('」', '"') |
|
|
|
|
|
cleaned = cleaned.replace('"', '"').replace('"', '"') |
|
|
cleaned = cleaned.replace(''', "'").replace(''', "'") |
|
|
|
|
|
return cleaned |
|
|
|
|
|
def extract_metadata(self, text: str) -> Dict: |
|
|
"""提取文本元数据(标题、作者等) |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
|
|
|
Returns: |
|
|
元数据字典 |
|
|
""" |
|
|
metadata = { |
|
|
'title': None, |
|
|
'author': None, |
|
|
'year': None, |
|
|
} |
|
|
|
|
|
|
|
|
lines = text.split('\n')[:20] |
|
|
|
|
|
for line in lines: |
|
|
line = line.strip() |
|
|
|
|
|
|
|
|
if not metadata['title'] and len(line) > 5 and len(line) < 100: |
|
|
|
|
|
if line.isupper() or line.istitle(): |
|
|
metadata['title'] = line |
|
|
|
|
|
|
|
|
author_patterns = [ |
|
|
r'by\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)', |
|
|
r'作者[::]\s*(.+)', |
|
|
r'Author[:\s]+(.+)', |
|
|
] |
|
|
|
|
|
for pattern in author_patterns: |
|
|
match = re.search(pattern, line, re.IGNORECASE) |
|
|
if match: |
|
|
metadata['author'] = match.group(1).strip() |
|
|
break |
|
|
|
|
|
|
|
|
year_match = re.search(r'\b(19|20)\d{2}\b', line) |
|
|
if year_match: |
|
|
metadata['year'] = year_match.group(0) |
|
|
|
|
|
return metadata |
|
|
|
|
|
def sample_text(self, text: str, sample_size: int = 1000, |
|
|
strategy: str = 'random') -> str: |
|
|
"""从文本中采样 |
|
|
|
|
|
Args: |
|
|
text: 输入文本 |
|
|
sample_size: 采样大小(字符数) |
|
|
strategy: 采样策略 ('start', 'random', 'distributed') |
|
|
|
|
|
Returns: |
|
|
采样的文本 |
|
|
""" |
|
|
if len(text) <= sample_size: |
|
|
return text |
|
|
|
|
|
if strategy == 'start': |
|
|
|
|
|
return text[:sample_size] |
|
|
|
|
|
elif strategy == 'random': |
|
|
|
|
|
import random |
|
|
start = random.randint(0, len(text) - sample_size) |
|
|
return text[start:start + sample_size] |
|
|
|
|
|
elif strategy == 'distributed': |
|
|
|
|
|
num_samples = 3 |
|
|
sample_per_part = sample_size // num_samples |
|
|
samples = [] |
|
|
|
|
|
for i in range(num_samples): |
|
|
start = (len(text) // num_samples) * i |
|
|
end = min(start + sample_per_part, len(text)) |
|
|
samples.append(text[start:end]) |
|
|
|
|
|
return '\n...\n'.join(samples) |
|
|
|
|
|
else: |
|
|
return text[:sample_size] |