| """
|
| AI Research Paper Helper - Text Processor
|
| Handles text cleaning, section segmentation, and chunking.
|
| """
|
|
|
| import re
|
| from typing import List, Dict, Optional
|
| from dataclasses import dataclass
|
| import logging
|
|
|
| from config import settings
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class TextChunk:
    """Represents a chunk of text with metadata.

    Produced by TextProcessor.chunk_text; the character offsets refer to
    positions in the cleaned (whitespace-normalized) text the chunk was
    cut from.
    """
    id: str  # unique id of the form "{prefix}_{index}", e.g. "chunk_0_1"
    text: str  # the chunk's cleaned text
    section: Optional[str]  # originating section title, if known
    start_idx: int  # character offset of the chunk start in the cleaned text
    end_idx: int  # character offset just past the chunk end
    token_count: int  # number of words in the chunk (words, not model tokens)
|
|
|
|
|
@dataclass
class Section:
    """Represents a document section, as built by
    TextProcessor.extract_sections."""
    title: str  # raw heading line (or supplied 'title' value)
    content: str  # cleaned section body text
    section_type: str  # canonical key from TextProcessor.SECTION_PATTERNS, or 'other'
    level: int  # heading level; defaults to 2 when not supplied
|
|
|
|
|
| class TextProcessor:
|
| """Handles all text processing operations."""
|
|
|
|
|
| SECTION_PATTERNS = {
|
| 'abstract': r'^abstract\s*$',
|
| 'introduction': r'^(1\.?\s*)?introduction\s*$',
|
| 'related_work': r'^(2\.?\s*)?(related\s+work|background|literature\s+review)\s*$',
|
| 'methods': r'^(3\.?\s*)?(method(s|ology)?|approach|model)\s*$',
|
| 'experiments': r'^(4\.?\s*)?(experiment(s)?|evaluation|results)\s*$',
|
| 'results': r'^(5\.?\s*)?(result(s)?|finding(s)?)\s*$',
|
| 'discussion': r'^(6\.?\s*)?discussion\s*$',
|
| 'conclusion': r'^(7\.?\s*)?(conclusion(s)?|summary)\s*$',
|
| 'references': r'^references?\s*$',
|
| 'appendix': r'^appendix\s*'
|
| }
|
|
|
| def __init__(self):
|
| self.chunk_size = settings.chunk_size
|
| self.chunk_overlap = settings.chunk_overlap
|
|
|
| def clean_text(self, text: str) -> str:
|
| """Clean and normalize text."""
|
| if not text:
|
| return ""
|
|
|
|
|
| text = re.sub(r'\s+', ' ', text)
|
|
|
|
|
| text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
|
|
|
|
|
| text = text.replace('"', '"').replace('"', '"')
|
| text = text.replace(''', "'").replace(''', "'")
|
|
|
|
|
| text = re.sub(r' {2,}', ' ', text)
|
|
|
| return text.strip()
|
|
|
| def extract_sections(self, content: str, sections_data: List[Dict] = None) -> List[Section]:
|
| """Extract sections from document content."""
|
| if sections_data:
|
|
|
| return [
|
| Section(
|
| title=s.get('title', ''),
|
| content=self.clean_text(s.get('content', '')),
|
| section_type=self._classify_section(s.get('title', '')),
|
| level=s.get('level', 2)
|
| )
|
| for s in sections_data
|
| ]
|
|
|
|
|
| sections = []
|
| lines = content.split('\n')
|
| current_section = None
|
| current_content = []
|
|
|
| for line in lines:
|
| line = line.strip()
|
| if not line:
|
| continue
|
|
|
|
|
| section_type = self._classify_section(line)
|
| if section_type != 'other' and len(line) < 100:
|
|
|
| if current_section is not None:
|
| sections.append(Section(
|
| title=current_section,
|
| content=self.clean_text(' '.join(current_content)),
|
| section_type=self._classify_section(current_section),
|
| level=2
|
| ))
|
|
|
| current_section = line
|
| current_content = []
|
| else:
|
| current_content.append(line)
|
|
|
|
|
| if current_section is not None:
|
| sections.append(Section(
|
| title=current_section,
|
| content=self.clean_text(' '.join(current_content)),
|
| section_type=self._classify_section(current_section),
|
| level=2
|
| ))
|
|
|
| return sections
|
|
|
| def _classify_section(self, title: str) -> str:
|
| """Classify a section based on its title."""
|
| title_lower = title.lower().strip()
|
|
|
| for section_type, pattern in self.SECTION_PATTERNS.items():
|
| if re.match(pattern, title_lower, re.IGNORECASE):
|
| return section_type
|
|
|
| return 'other'
|
|
|
| def chunk_text(
|
| self,
|
| text: str,
|
| section: Optional[str] = None,
|
| chunk_id_prefix: str = "chunk"
|
| ) -> List[TextChunk]:
|
| """
|
| Split text into overlapping chunks.
|
|
|
| Uses word-based chunking to respect word boundaries.
|
| Overlap ensures context is preserved across chunks.
|
| """
|
| if not text:
|
| return []
|
|
|
| text = self.clean_text(text)
|
| words = text.split()
|
|
|
| if not words:
|
| return []
|
|
|
| chunks = []
|
| chunk_idx = 0
|
| word_idx = 0
|
|
|
|
|
| words_per_chunk = self.chunk_size
|
| overlap_words = self.chunk_overlap
|
|
|
| while word_idx < len(words):
|
|
|
| end_idx = min(word_idx + words_per_chunk, len(words))
|
| chunk_words = words[word_idx:end_idx]
|
| chunk_text = ' '.join(chunk_words)
|
|
|
|
|
| start_char = len(' '.join(words[:word_idx])) + (1 if word_idx > 0 else 0)
|
| end_char = start_char + len(chunk_text)
|
|
|
| chunks.append(TextChunk(
|
| id=f"{chunk_id_prefix}_{chunk_idx}",
|
| text=chunk_text,
|
| section=section,
|
| start_idx=start_char,
|
| end_idx=end_char,
|
| token_count=len(chunk_words)
|
| ))
|
|
|
| chunk_idx += 1
|
| word_idx = end_idx - overlap_words
|
|
|
|
|
| if word_idx >= len(words) or end_idx >= len(words):
|
| break
|
| if word_idx <= end_idx - words_per_chunk:
|
| word_idx = end_idx
|
|
|
| return chunks
|
|
|
| def chunk_document(
|
| self,
|
| content: str,
|
| abstract: Optional[str] = None,
|
| sections: List[Dict] = None
|
| ) -> List[TextChunk]:
|
| """
|
| Chunk an entire document, respecting section boundaries.
|
|
|
| Strategy:
|
| 1. Abstract gets its own chunk(s)
|
| 2. Each section is chunked separately
|
| 3. If no sections, chunk the entire content
|
| """
|
| all_chunks = []
|
| chunk_counter = 0
|
|
|
|
|
| if abstract:
|
| abstract_chunks = self.chunk_text(
|
| abstract,
|
| section="abstract",
|
| chunk_id_prefix=f"chunk_{chunk_counter}"
|
| )
|
| all_chunks.extend(abstract_chunks)
|
| chunk_counter += len(abstract_chunks)
|
|
|
|
|
| if sections:
|
| parsed_sections = self.extract_sections(content, sections)
|
|
|
| for section in parsed_sections:
|
| if section.section_type == 'references':
|
| continue
|
|
|
| section_chunks = self.chunk_text(
|
| section.content,
|
| section=section.title,
|
| chunk_id_prefix=f"chunk_{chunk_counter}"
|
| )
|
| all_chunks.extend(section_chunks)
|
| chunk_counter += len(section_chunks)
|
| else:
|
|
|
| content_chunks = self.chunk_text(
|
| content,
|
| section=None,
|
| chunk_id_prefix=f"chunk_{chunk_counter}"
|
| )
|
| all_chunks.extend(content_chunks)
|
|
|
| return all_chunks
|
|
|
| def extract_sentences(self, text: str) -> List[str]:
|
| """Extract sentences from text."""
|
|
|
| sentences = re.split(r'(?<=[.!?])\s+', text)
|
| return [s.strip() for s in sentences if s.strip()]
|
|
|
| def truncate_to_limit(self, text: str, max_words: int = 500) -> str:
|
| """Truncate text to a maximum number of words."""
|
| words = text.split()
|
| if len(words) <= max_words:
|
| return text
|
| return ' '.join(words[:max_words]) + '...'
|
|
|
|
|
|
|
# Lazily-created process-wide TextProcessor instance.
_text_processor = None


def get_text_processor() -> TextProcessor:
    """Return the singleton TextProcessor, constructing it on first use."""
    global _text_processor
    if _text_processor is not None:
        return _text_processor
    _text_processor = TextProcessor()
    return _text_processor
|
|
|