import re
import os
import logging

# Optional dependency: used only by wetext_normalize; absence is tolerated.
try:
    import wetext
except ImportError:
    wetext = None

logger = logging.getLogger(__name__)


class TextCleaner:
    """Utilities for cleaning raw text (e.g. e-books, scraped pages) before TTS.

    All methods are stateless; `clean_text` orchestrates them based on an
    options dict, and `save_cleaned_text` persists the result to disk.
    """

    @staticmethod
    def remove_urls(text):
        """Remove URLs from text."""
        # NOTE: the character classes are permissive ($-_ is a range); this is a
        # heuristic matcher, not a strict RFC 3986 parser.
        return re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)

    @staticmethod
    def remove_html(text):
        """Remove HTML tags from text."""
        # Non-greedy so adjacent tags like <b>x</b> are stripped individually.
        clean = re.compile('<.*?>')
        return re.sub(clean, '', text)

    @staticmethod
    def filter_ads(text):
        """Remove lines containing common ad keywords (English and Chinese)."""
        ad_keywords = [
            "subscribe", "click here", "follow us", "donate", "patreon",
            "copyright", "all rights reserved", "visit our website",
            "关注", "订阅", "点赞", "投币", "收藏", "转发", "公众号", "微信", "微博"
        ]
        lines = text.split('\n')
        cleaned_lines = []
        for line in lines:
            # Case-insensitive match for the ASCII keywords; CJK keywords are
            # unaffected by lower().
            if not any(keyword in line.lower() for keyword in ad_keywords):
                cleaned_lines.append(line)
        return '\n'.join(cleaned_lines)

    @staticmethod
    def fix_encoding(text):
        """Fix common encoding issues.

        NOTE(review): this round-trip is a no-op for any valid str; real
        mojibake repair would need a library such as ftfy. Kept as a safe
        best-effort pass-through.
        """
        try:
            return text.encode('utf-8', 'ignore').decode('utf-8')
        except Exception:
            return text

    @staticmethod
    def tidy_whitespace(text):
        """Normalize whitespace and merge broken CJK lines.

        - Collapses runs of spaces to a single space.
        - Collapses blank-line runs to a single paragraph break.
        - Removes line breaks between CJK characters (common in e-book text
          where paragraphs were hard-wrapped).
        """
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # CJK ranges: common ideographs, CJK symbols/punctuation, fullwidth forms.
        cjk_range = r'[\u4e00-\u9fa5\u3000-\u303f\uff00-\uffef]'
        pattern = f'(?<={cjk_range})\\s*\\n\\s*(?={cjk_range})'
        text = re.sub(pattern, '', text)
        return text.strip()

    @staticmethod
    def remove_gutenberg(text):
        """Remove Project Gutenberg boilerplate header and footer.

        Heuristic: keep only the lines between the last "*** START OF" marker
        and the first "*** END OF" marker; if no markers exist, the text is
        returned unchanged.
        """
        lines = text.split('\n')
        start_idx = 0
        end_idx = len(lines)
        for i, line in enumerate(lines):
            if "*** START OF" in line or "***START OF" in line:
                start_idx = i + 1
            if "*** END OF" in line or "***END OF" in line:
                end_idx = i
                break
        return '\n'.join(lines[start_idx:end_idx])

    @staticmethod
    def remove_markdown(text):
        """Strip markdown formatting, keeping the human-readable text."""
        # Fenced code blocks first so their contents don't trip later rules.
        text = re.sub(r'```.*?```', '', text, flags=re.DOTALL)
        # Inline code: keep the content, drop the backticks.
        text = re.sub(r'`([^`]+)`', r'\1', text)
        # Bold before italic so ** pairs aren't half-consumed by the * rule.
        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
        text = re.sub(r'__(.+?)__', r'\1', text)
        text = re.sub(r'\*(.+?)\*', r'\1', text)
        text = re.sub(r'_(.+?)_', r'\1', text)
        # Strikethrough.
        text = re.sub(r'~~(.+?)~~', r'\1', text)
        # ATX headers (# .. ######).
        text = re.sub(r'^#{1,6}\s+', '', text, flags=re.MULTILINE)
        # Links and images: keep the link text / alt text.
        text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
        text = re.sub(r'!\[([^\]]*)\]\([^\)]+\)', r'\1', text)
        # Blockquote markers.
        text = re.sub(r'^>\s+', '', text, flags=re.MULTILINE)
        # Horizontal rules (---, ***, ___).
        text = re.sub(r'^[\-\*_]{3,}\s*$', '', text, flags=re.MULTILINE)
        # Bullet and numbered list markers.
        text = re.sub(r'^\s*[\-\*\+]\s+', '', text, flags=re.MULTILINE)
        text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
        return text

    @staticmethod
    def remove_special_chars(text):
        """Remove symbols that TTS engines tend to read aloud literally.

        Letters, digits, whitespace and common punctuation are preserved.
        """
        text = re.sub(r'[@#$%^&*+=|\\<>{}\[\]~`]', '', text)
        # Collapse 3+ repeats of the same punctuation mark (e.g. "!!!" -> "!").
        text = re.sub(r'([!?.,;:\-])\1{2,}', r'\1', text)
        return text

    @staticmethod
    def wetext_normalize(text):
        """Normalize text with the optional `wetext` library if installed.

        NOTE(review): the concrete wetext API call is not wired up yet, so
        this is currently a pass-through even when the library is present.
        TODO: confirm the entry point (e.g. wetext.Normalizer().normalize)
        and invoke it here.
        """
        if wetext:
            try:
                pass
            except Exception as e:
                logger.error(f"WeText normalization failed: {e}")
        return text

    @classmethod
    def clean_text(cls, text, options):
        """Run the enabled cleaning steps over `text`.

        Args:
            text: the input string; falsy input is returned unchanged.
            options: dict of {option_name: bool}; each recognized key enables
                the correspondingly named cleaning step. Steps run in a fixed
                order (structural strippers first, whitespace tidy last).

        Returns:
            The cleaned string.
        """
        if not text:
            return text
        logger.info("Starting text cleaning...")
        original_len = len(text)
        if options.get('remove_gutenberg', False):
            text = cls.remove_gutenberg(text)
        if options.get('remove_html', False):
            text = cls.remove_html(text)
        if options.get('remove_markdown', False):
            text = cls.remove_markdown(text)
        if options.get('remove_urls', False):
            text = cls.remove_urls(text)
        if options.get('filter_ads', False):
            text = cls.filter_ads(text)
        if options.get('fix_encoding', False):
            text = cls.fix_encoding(text)
        if options.get('remove_special_chars', False):
            text = cls.remove_special_chars(text)
        if options.get('wetext_normalization', False):
            text = cls.wetext_normalize(text)
        if options.get('tidy_whitespace', False):
            text = cls.tidy_whitespace(text)
        logger.info(f"Text cleaning complete. Length: {original_len} -> {len(text)}")
        return text

    @staticmethod
    def save_cleaned_text(text, original_filename="output"):
        """Save cleaned text to cleaned_txt/<basename>_cleaned.txt.

        Overwrites an existing file of the same name. Returns the path written,
        or None if writing failed.
        """
        output_dir = "cleaned_txt"
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        # Accept either a bare name or a full path; use the stem only.
        base_name = os.path.splitext(os.path.basename(original_filename))[0]
        filename = f"{base_name}_cleaned.txt"
        filepath = os.path.join(output_dir, filename)
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(text)
            logger.info(f"Cleaned text saved to {filepath}")
            return filepath
        except Exception as e:
            logger.error(f"Failed to save cleaned text: {e}")
            return None