|
|
| import re
|
| import logging
|
| from typing import List, Tuple
|
| from datetime import datetime
|
| import os
|
| import unicodedata
|
| import nltk
|
|
|
|
|
| nltk.download('punkt', quiet=True)
|
| from nltk.tokenize import sent_tokenize
|
|
|
class SentenceAnalyzer:
    """Analyze text sentence-by-sentence for display and TTS preparation.

    Splits raw text into sentences with NLTK, classifies each sentence
    (exclamation, question, statement, ellipsis, quote, emphasis), and
    renders HTML fragments for interactive or legacy display. All
    activity is logged to a dated file under ``logs/`` and the console.
    """

    def __init__(self):
        """Set up logging and the classification lookup tables."""
        self._setup_logger()

        # Closed set of sentence categories, and the short flag code each
        # category maps to (flags are embedded as data attributes in the
        # legacy process_text() HTML output).
        self.SENTENCE_TYPES = ['exclamation', 'question', 'statement', 'ellipsis', 'quote', 'emphasis']
        self.FLAGS = {
            'exclamation': 'EXCL',
            'question': 'QUES',
            'statement': 'STMT',
            'ellipsis': 'ELIP',
            'quote': 'QUOT',
            'emphasis': 'EMPH'
        }

        self.logger.info("SentenceAnalyzer initialized successfully")

    def _setup_logger(self):
        """Set up logging configuration.

        DEBUG and above go to ``logs/sentence_analyzer_<YYYY-MM-DD>.log``
        (UTF-8); INFO and above also go to the console. Existing handlers
        on the named logger are cleared first so repeated instantiation
        does not duplicate output.

        Raises:
            Exception: re-raised after printing if the log directory,
                file, or handlers cannot be created.
        """
        try:
            os.makedirs('logs', exist_ok=True)

            # One log file per calendar day.
            current_date = datetime.now().strftime('%Y-%m-%d')
            log_file = f'logs/sentence_analyzer_{current_date}.log'

            self.logger = logging.getLogger('SentenceAnalyzer')
            self.logger.setLevel(logging.DEBUG)

            # Avoid duplicated handlers when more than one analyzer is created.
            if self.logger.handlers:
                self.logger.handlers.clear()

            file_handler = logging.FileHandler(log_file, encoding='utf-8')
            file_handler.setLevel(logging.DEBUG)

            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)

            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            console_handler.setFormatter(formatter)

            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)

            self.logger.debug("Logger set up successfully")

        except Exception as e:
            # Logging is not usable yet, so fall back to stdout before
            # propagating the failure to the caller.
            print(f"Error setting up logger: {str(e)}")
            raise

    def split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using NLTK's sentence tokenizer.

        The text is NFC-normalized and scrubbed of page markers, chapter
        headings, hyphenated line breaks, and redundant whitespace before
        tokenization.

        Args:
            text: Raw input text; may span multiple lines.

        Returns:
            A list of non-empty, stripped sentences ([] for empty input).
        """
        if not text:
            return []

        self.logger.debug("Starting sentence splitting")

        text = unicodedata.normalize('NFC', text)
        self.logger.debug("Normalized text using NFC")

        # Strip page markers and chapter headings (artifacts of e.g. PDF
        # extraction). '.' does not cross newlines, so only the heading's
        # own line is consumed.
        text = re.sub(r'Page \d+|Chapter \d+:.*', '', text)
        self.logger.debug("Removed page numbers and chapter titles")

        # Re-join words hyphenated across a line break ("exam-\nple" ->
        # "example"). The lookarounds require a word character on both
        # sides, so spaced hyphens such as "a - b" survive — the previous
        # blanket r'-\s+' substitution stripped those too, mangling
        # ordinary punctuation.
        text = re.sub(r'(?<=\w)-\s+(?=\w)', '', text)
        self.logger.debug("Replaced hyphenated line breaks")

        text = re.sub(r'[\r\n]+', ' ', text)
        self.logger.debug("Replaced multiple newlines with a space")

        text = re.sub(r'\s+', ' ', text).strip()
        self.logger.debug("Normalized whitespace")

        sentences = sent_tokenize(text)
        self.logger.debug("Split text into %d sentences using NLTK", len(sentences))

        # Drop any empty fragments the tokenizer produced.
        sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
        self.logger.info("Split text into %d sentences after cleanup", len(sentences))
        return sentences

    def analyze_sentence(self, sentence: str) -> Tuple[str, str, str]:
        """Analyze a sentence and return its type, color (handled by CSS), and flag.

        Classification precedence: quote > emphasis > exclamation >
        question > ellipsis > statement (the fallback). The color slot is
        always '' — styling is delegated to CSS classes — and is kept only
        for interface compatibility.

        Args:
            sentence: A single sentence; surrounding whitespace is ignored.

        Returns:
            Tuple of (sentence type, '' placeholder color, flag code).
        """
        if not sentence:
            return ('statement', '', self.FLAGS['statement'])

        sentence = sentence.strip()
        self.logger.debug("Analyzing sentence: '%s'", sentence)

        def has_complete_quote(text):
            # A sentence counts as fully quoted when it starts and ends
            # with a matching pair AND the open/close counts balance
            # (guards against an unbalanced trailing quote).
            quote_pairs = [
                ('"', '"'),
                ("'", "'"),
                ('“', '”'),
                ('‘', '’'),
                ('«', '»')
            ]
            text = text.strip()
            for open_quote, close_quote in quote_pairs:
                if text.startswith(open_quote) and text.endswith(close_quote):
                    if text.count(open_quote) == text.count(close_quote):
                        self.logger.debug("Sentence starts and ends with matching quotes: %s%s", open_quote, close_quote)
                        return True
            return False

        if has_complete_quote(sentence):
            sent_type = 'quote'
        elif re.search(r'\*[^*]+\*', sentence):
            # Markdown-style *emphasis* anywhere in the sentence.
            sent_type = 'emphasis'
        elif sentence.endswith(('!', '！')):
            # ASCII or fullwidth exclamation mark. The original tuple
            # listed '!' twice (an encoding casualty), so fullwidth
            # punctuation fell through to 'statement'.
            sent_type = 'exclamation'
        elif sentence.endswith(('?', '？')):
            # ASCII or fullwidth question mark (same fix as above).
            sent_type = 'question'
        elif sentence.endswith(('…', '...')):
            sent_type = 'ellipsis'
        else:
            sent_type = 'statement'
        self.logger.debug("Sentence classified as '%s'", sent_type)

        color = ''  # styling handled by CSS; kept for API compatibility
        self.logger.debug("Sentence type: %s, Flag: %s", sent_type, self.FLAGS[sent_type])
        return (sent_type, color, self.FLAGS[sent_type])

    def clean_sentence(self, sentence: str) -> str:
        """Remove special characters from the sentence that might confuse TTS models.

        Keeps word characters, whitespace, common punctuation, straight and
        curly quotes, guillemets, hyphens/dashes, and parentheses; strips
        everything else.

        Args:
            sentence: The sentence to sanitize.

        Returns:
            The sanitized sentence.
        """
        pattern = r'[^\w\s.,!?\'"“”‘’«»\-—()]'
        cleaned_sentence = re.sub(pattern, '', sentence)
        self.logger.debug("Cleaned sentence: '%s'", cleaned_sentence)
        return cleaned_sentence

    def process_text_interactive(self, text: str) -> str:
        """Process the text and return HTML-formatted output with interactive sentences.

        Each sentence becomes a ``.sentence-row`` div carrying its type as
        a CSS class, plus its 1-based number, the text, and a readable
        type label.

        Args:
            text: Raw input text.

        Returns:
            Concatenated HTML, '' for empty input, or an error ``<span>``
            if processing fails (no exception escapes).
        """
        self.logger.info("Starting interactive text processing")

        if not text:
            self.logger.warning("Empty text received")
            return ''

        try:
            text = unicodedata.normalize('NFC', text)
            self.logger.debug("Normalized text using NFC in interactive processing")

            sentences = self.split_into_sentences(text)
            formatted_output = []

            for index, sentence in enumerate(sentences, 1):
                # Only the type is used here; the color/flag slots matter
                # for the legacy process_text() output.
                sent_type, _, _ = self.analyze_sentence(sentence)

                formatted_sentence = f'''
                <div class="sentence-row {sent_type}">
                    <div class="sentence-number">{index}.</div>
                    <div class="sentence-content">
                        {sentence}
                    </div>
                    <div class="sentence-type">{sent_type.capitalize()}</div>
                </div>
                '''
                formatted_output.append(formatted_sentence)
                self.logger.info("Processed sentence %d/%d - Type: %s", index, len(sentences), sent_type)
                self.logger.debug("Formatted HTML for sentence %d: %s", index, formatted_sentence)

            result = ''.join(formatted_output)
            self.logger.info("Text processing completed successfully")
            return result

        except Exception as e:
            self.logger.error(f"Error processing text: {str(e)}", exc_info=True)
            return f'<span style="color: red;">Error processing text: {str(e)}</span>'

    def prepare_text_for_tts(self, sentences: List[str]) -> str:
        """Prepare the text for TTS by cleaning special characters from each sentence.

        Args:
            sentences: Sentences to sanitize and re-join.

        Returns:
            A single space-joined string of cleaned sentences.
        """
        cleaned_sentences = [self.clean_sentence(sentence) for sentence in sentences]
        tts_text = ' '.join(cleaned_sentences)
        self.logger.debug("Prepared text for TTS: '%s'", tts_text)
        return tts_text

    def process_text(self, text: str) -> str:
        """Legacy method for non-interactive processing. Kept for compatibility.

        Wraps each sentence in a ``<span>`` whose class is the sentence
        type, with the flag code in ``data-flag`` and the type in the
        ``title`` tooltip.

        Args:
            text: Raw input text.

        Returns:
            Space-joined HTML spans, "" for empty input, or an error
            ``<span>`` if processing fails (no exception escapes).
        """
        self.logger.info("Starting text processing (legacy method)")

        if not text:
            self.logger.warning("Empty text received")
            return ""

        try:
            text = unicodedata.normalize('NFC', text)
            self.logger.debug("Normalized text using NFC in legacy processing")

            sentences = self.split_into_sentences(text)
            formatted_output = []

            for index, sentence in enumerate(sentences, 1):
                sent_type, _, flag = self.analyze_sentence(sentence)

                formatted_sentence = (
                    f'<span class="{sent_type}" '
                    f'data-flag="{flag}" '
                    f'title="Sentence type: {sent_type}">'
                    f'{sentence}</span>'
                )
                formatted_output.append(formatted_sentence)
                self.logger.info("Processed sentence %d/%d - Type: %s", index, len(sentences), sent_type)
                self.logger.debug("Formatted HTML for sentence %d: %s", index, formatted_sentence)

            result = " ".join(formatted_output)
            self.logger.info("Text processing completed successfully")
            return result

        except Exception as e:
            self.logger.error(f"Error processing text: {str(e)}", exc_info=True)
            return f'<span style="color: red;">Error processing text: {str(e)}</span>'
|
|
|