# -*- coding: utf-8 -*-
"""
Text processing module.

Converts Chinese text to pinyin (and Japanese text to romaji) for use by
MFA alignment.
"""

import functools
import os
import re
import logging
from pathlib import Path
from typing import Optional, Callable, List, Tuple

logger = logging.getLogger(__name__)

# Accepted spellings of each supported language code (matched case-sensitively).
_CHINESE_LANGS = ("chinese", "zh", "mandarin")
_JAPANESE_LANGS = ("japanese", "ja", "jp")


def _digit_table(readings):
    """Map the ASCII and the full-width form of each digit to its reading.

    ``readings`` is indexed by digit value (0-9). Full-width digits
    (U+FF10..U+FF19) are included because ``is_valid_char`` accepts any
    ``str.isdigit()`` character, so they can reach the converters.
    """
    table = {}
    for value, reading in enumerate(readings):
        table[chr(ord('0') + value)] = reading
        table[chr(0xFF10 + value)] = reading
    return table


# Digit readings, Mandarin and Japanese.
_CHINESE_DIGIT_READINGS = _digit_table(
    ('ling', 'yi', 'er', 'san', 'si', 'wu', 'liu', 'qi', 'ba', 'jiu')
)
_JAPANESE_DIGIT_READINGS = _digit_table(
    ('zero', 'ichi', 'ni', 'san', 'yon', 'go', 'roku', 'nana', 'hachi', 'kyuu')
)

# Latin letters spelled out the way they are read in Chinese; a reading may
# consist of several space-separated syllables.
_CHINESE_LETTER_READINGS = {
    'a': 'ei', 'b': 'bi', 'c': 'xi', 'd': 'di', 'e': 'yi',
    'f': 'ai fu', 'g': 'ji', 'h': 'ai qi', 'i': 'ai', 'j': 'jie',
    'k': 'kai', 'l': 'ai lu', 'm': 'ai mu', 'n': 'en', 'o': 'ou',
    'p': 'pi', 'q': 'kiu', 'r': 'a', 's': 'ai si', 't': 'ti',
    'u': 'you', 'v': 'wei', 'w': 'da bu liu', 'x': 'ai ke si',
    'y': 'wai', 'z': 'zei',
}


@functools.lru_cache(maxsize=1)
def _get_kakasi():
    """Return a shared pykakasi converter, creating it on first use.

    Raises:
        ImportError: if pykakasi is not installed. Exceptions are not cached
            by ``lru_cache``, so a later call retries the import.
    """
    import pykakasi

    return pykakasi.kakasi()


# ==================== Single character -> pinyin / romaji ====================

def char_to_pinyin(char: str, language: str = "chinese") -> Optional[str]:
    """
    Convert a single character to pinyin / romaji.

    Args:
        char: A single character.
        language: Language (chinese/japanese); any other value is treated
            as plain alphabetic text.

    Returns:
        The pinyin / romaji string, or None when the character cannot be
        converted (including when ``char`` is not exactly one character).
    """
    if not char or len(char) != 1:
        return None

    if language in _CHINESE_LANGS:
        return _chinese_char_to_pinyin(char)
    if language in _JAPANESE_LANGS:
        return _japanese_char_to_romaji(char)

    # Other languages: letters are returned lower-cased, everything else
    # is not convertible.
    return char.lower() if char.isalpha() else None


def _chinese_char_to_pinyin(char: str) -> Optional[str]:
    """Convert a single character to pinyin using Chinese readings.

    Digits and Latin letters are resolved from built-in tables and therefore
    work even when pypinyin is not installed; everything else is delegated to
    pypinyin. Returns None if pypinyin is needed but missing, or if no
    non-empty reading is produced.
    """
    # Digits are read the Chinese way.
    reading = _CHINESE_DIGIT_READINGS.get(char)
    if reading is not None:
        return reading

    # Latin letters are read the Chinese way; only the first syllable of a
    # multi-syllable reading is returned.
    spelled = _CHINESE_LETTER_READINGS.get(char.lower())
    if spelled is not None:
        return spelled.split()[0]

    # Import lazily so the table lookups above do not depend on pypinyin.
    try:
        from pypinyin import pinyin, Style
    except ImportError:
        logger.error("pypinyin 未安装")
        return None

    # Tone-less pinyin, first reading only.
    result = pinyin(char, style=Style.NORMAL, heteronym=False)
    if result and result[0] and result[0][0]:
        # A whitespace-only result strips to '' -> report as unconvertible.
        return result[0][0].strip() or None
    return None


def _japanese_char_to_romaji(char: str) -> Optional[str]:
    """Convert a single character to romaji using Japanese readings.

    Digits are resolved from a built-in table and therefore work even when
    pykakasi is not installed; everything else is delegated to pykakasi.
    Returns None if pykakasi is needed but missing, or if no non-empty
    reading is produced.
    """
    # Digits are read the Japanese way.
    reading = _JAPANESE_DIGIT_READINGS.get(char)
    if reading is not None:
        return reading

    try:
        kks = _get_kakasi()
    except ImportError:
        logger.error("pykakasi 未安装")
        return None

    result = kks.convert(char)
    if result and result[0]:
        # Prefer the Hepburn romanization, fall back to the original text.
        romaji = result[0].get('hepburn', result[0].get('orig', ''))
        return romaji or None
    return None


def is_valid_char(char: str, language: str = "chinese") -> bool:
    """
    Tell whether a character is a valid, convertible character.

    Args:
        char: A single character.
        language: Language.

    Returns:
        True if the character is a digit, an ASCII letter, or falls in the
        script ranges accepted for ``language``; False otherwise (including
        when ``char`` is not exactly one character).
    """
    if not char or len(char) != 1:
        return False

    # Digits are always valid.
    if char.isdigit():
        return True

    # ASCII letters are always valid.
    if char.isalpha() and char.isascii():
        return True

    if language in _CHINESE_LANGS:
        # CJK Unified Ideographs and Extension A.
        return '\u4e00' <= char <= '\u9fff' or '\u3400' <= char <= '\u4dbf'

    if language in _JAPANESE_LANGS:
        return (
            '\u3040' <= char <= '\u309f'      # hiragana
            or '\u30a0' <= char <= '\u30ff'   # katakana
            or '\u4e00' <= char <= '\u9fff'   # kanji
        )

    return False


def chinese_to_pinyin(text: str) -> str:
    """
    Convert Chinese text to pinyin (space separated).

    Args:
        text: Chinese text.

    Returns:
        The pinyin string, items separated by single spaces.

    Raises:
        ImportError: if pypinyin is not installed (logged, then re-raised).
    """
    try:
        from pypinyin import pinyin, Style
    except ImportError:
        logger.error("pypinyin 未安装,请运行: pip install pypinyin")
        raise

    # Tone-less pinyin, first reading only.
    result = pinyin(text, style=Style.NORMAL, heteronym=False)

    # Flatten, dropping empty and whitespace-only items.
    stripped = (item[0].strip() for item in result if item and item[0])
    return ' '.join(py for py in stripped if py)


def japanese_to_romaji(text: str) -> str:
    """
    Convert Japanese text to romaji.

    Args:
        text: Japanese text.

    Returns:
        The romaji string, items separated by single spaces.

    Raises:
        ImportError: if pykakasi is not installed (logged, then re-raised).
    """
    try:
        kks = _get_kakasi()
    except ImportError:
        logger.error("pykakasi 未安装,请运行: pip install pykakasi")
        raise

    # Prefer the Hepburn romanization, fall back to the original text, and
    # drop items that yield nothing.
    pieces = (
        item.get('hepburn', item.get('orig', ''))
        for item in kks.convert(text)
    )
    return ' '.join(piece for piece in pieces if piece)


def process_lab_file(
    lab_path: str,
    language: str = "chinese",
    output_path: Optional[str] = None
) -> Tuple[bool, str]:
    """
    Process one .lab file, converting its text to pinyin / romaji.

    Args:
        lab_path: Path of the .lab file.
        language: Language (chinese/japanese); for any other value the text
            is kept as is, with whitespace runs collapsed to single spaces.
        output_path: Output path; defaults to overwriting the original file.

    Returns:
        (success flag, converted text or error message). Any exception is
        logged and reported through the return value instead of propagating.
    """
    try:
        with open(lab_path, 'r', encoding='utf-8') as f:
            text = f.read().strip()

        if not text:
            return False, "空文件"

        # Pick the converter for the language.
        if language in _CHINESE_LANGS:
            converted = chinese_to_pinyin(text)
        elif language in _JAPANESE_LANGS:
            converted = japanese_to_romaji(text)
        else:
            # English or other languages: keep the words, normalize spacing.
            converted = ' '.join(text.split())

        # Write the result.
        output = output_path or lab_path
        with open(output, 'w', encoding='utf-8') as f:
            f.write(converted)

        return True, converted

    except Exception as e:
        # Boundary handler: callers get the failure as a (False, message) pair.
        logger.error(f"处理 {lab_path} 失败: {e}")
        return False, str(e)


def process_lab_directory(
    input_dir: str,
    language: str = "chinese",
    output_dir: Optional[str] = None,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Tuple[bool, str, int]:
    """
    Process every .lab file directly inside a directory.

    Args:
        input_dir: Input directory.
        language: Language.
        output_dir: Output directory (created if missing); defaults to
            overwriting the original files.
        progress_callback: Optional callback that receives each progress
            message.

    Returns:
        (success flag, message, number of files converted successfully).
        The flag is False only when no .lab file is found or an unexpected
        exception occurs; per-file failures are reported through the
        progress messages and the count.
    """
    def log(msg: str):
        logger.info(msg)
        if progress_callback:
            progress_callback(msg)

    try:
        # Sorted so that the processing order and the [i/N] progress output
        # are deterministic regardless of filesystem enumeration order.
        lab_files = sorted(Path(input_dir).glob('*.lab'))
        if not lab_files:
            return False, "未找到 .lab 文件", 0

        total = len(lab_files)
        log(f"找到 {total} 个 .lab 文件")

        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        success_count = 0
        for index, lab_path in enumerate(lab_files, start=1):
            output_path = None
            if output_dir:
                output_path = os.path.join(output_dir, lab_path.name)

            success, result = process_lab_file(
                str(lab_path), language, output_path
            )

            if success:
                success_count += 1
                log(f"[{index}/{total}] {lab_path.name} -> {result[:30]}...")
            else:
                log(f"[{index}/{total}] {lab_path.name} 失败: {result}")

        return True, f"处理完成: {success_count}/{total}", success_count

    except Exception as e:
        logger.error(f"批量处理失败: {e}", exc_info=True)
        return False, str(e), 0