JinrikiHelper / src /text_processor.py
TNOT's picture
完成了单音频或选择文件夹 → Silero VAD 切片 → Whisper 转录 → 生成 .lab → MFA 强制对齐 → 简单导出,已可用
9361148
# -*- coding: utf-8 -*-
"""
文本处理模块
将中文文本转换为拼音,供 MFA 对齐使用
"""
import os
import re
import logging
from pathlib import Path
from typing import Optional, Callable, List, Tuple
logger = logging.getLogger(__name__)
# ==================== 单字转拼音/罗马音 ====================
def char_to_pinyin(char: str, language: str = "chinese") -> Optional[str]:
    """
    Convert one character to its pinyin / romaji reading.

    Args:
        char: Exactly one character.
        language: Language selector. "chinese", "zh" and "mandarin" route
            to the Chinese converter; "japanese", "ja" and "jp" route to
            the Japanese converter; any other value treats the input as a
            plain letter.

    Returns:
        The reading as a string, or None when ``char`` is empty, is not
        exactly one character long, or cannot be converted.
    """
    # Guard: only single characters are handled here.
    if not char or len(char) != 1:
        return None

    if language in ("chinese", "zh", "mandarin"):
        return _chinese_char_to_pinyin(char)
    if language in ("japanese", "ja", "jp"):
        return _japanese_char_to_romaji(char)

    # Any other language: alphabetic characters are returned lowercased,
    # everything else is not convertible.
    return char.lower() if char.isalpha() else None
def _chinese_char_to_pinyin(char: str) -> Optional[str]:
"""中文单字转拼音"""
try:
from pypinyin import pinyin, Style
# 数字转中文读法
digit_map = {
'0': 'ling', '1': 'yi', '2': 'er', '3': 'san', '4': 'si',
'5': 'wu', '6': 'liu', '7': 'qi', '8': 'ba', '9': 'jiu',
'0': 'ling', '1': 'yi', '2': 'er', '3': 'san', '4': 'si',
'5': 'wu', '6': 'liu', '7': 'qi', '8': 'ba', '9': 'jiu',
}
if char in digit_map:
return digit_map[char]
# 英文字母按中文读法
letter_map = {
'a': 'ei', 'b': 'bi', 'c': 'xi', 'd': 'di', 'e': 'yi',
'f': 'ai fu', 'g': 'ji', 'h': 'ai qi', 'i': 'ai', 'j': 'jie',
'k': 'kai', 'l': 'ai lu', 'm': 'ai mu', 'n': 'en', 'o': 'ou',
'p': 'pi', 'q': 'kiu', 'r': 'a', 's': 'ai si', 't': 'ti',
'u': 'you', 'v': 'wei', 'w': 'da bu liu', 'x': 'ai ke si',
'y': 'wai', 'z': 'zei',
}
lower_char = char.lower()
if lower_char in letter_map:
# 返回第一个音节
return letter_map[lower_char].split()[0]
# 汉字转拼音
result = pinyin(char, style=Style.NORMAL, heteronym=False)
if result and result[0] and result[0][0]:
return result[0][0].strip()
return None
except ImportError:
logger.error("pypinyin 未安装")
return None
def _japanese_char_to_romaji(char: str) -> Optional[str]:
"""日文单字转罗马音"""
try:
import pykakasi
# 数字转日文读法
digit_map = {
'0': 'zero', '1': 'ichi', '2': 'ni', '3': 'san', '4': 'yon',
'5': 'go', '6': 'roku', '7': 'nana', '8': 'hachi', '9': 'kyuu',
}
if char in digit_map:
return digit_map[char]
kks = pykakasi.kakasi()
result = kks.convert(char)
if result and result[0]:
romaji = result[0].get('hepburn', result[0].get('orig', ''))
return romaji if romaji else None
return None
except ImportError:
logger.error("pykakasi 未安装")
return None
def is_valid_char(char: str, language: str = "chinese") -> bool:
    """
    Report whether a single character is accepted for conversion.

    Digits and ASCII letters are accepted for every language. Beyond
    that, acceptance depends on fixed code-point ranges per language.

    Args:
        char: Exactly one character.
        language: Language selector ("chinese"/"zh"/"mandarin" or
            "japanese"/"ja"/"jp"); other values accept only digits and
            ASCII letters.

    Returns:
        True when the character is accepted, False otherwise (including
        when ``char`` is empty or not exactly one character long).
    """
    if not char or len(char) != 1:
        return False

    # Language-independent cases: digits and ASCII letters.
    if char.isdigit() or (char.isalpha() and char.isascii()):
        return True

    if language in ("chinese", "zh", "mandarin"):
        accepted_ranges = (
            ('\u4e00', '\u9fff'),
            ('\u3400', '\u4dbf'),
        )
    elif language in ("japanese", "ja", "jp"):
        accepted_ranges = (
            ('\u3040', '\u309f'),  # hiragana
            ('\u30a0', '\u30ff'),  # katakana
            ('\u4e00', '\u9fff'),  # kanji
        )
    else:
        return False

    return any(low <= char <= high for low, high in accepted_ranges)
def chinese_to_pinyin(text: str) -> str:
    """
    Convert Chinese text to toneless pinyin joined by single spaces.

    Args:
        text: The text to convert.

    Returns:
        The pinyin tokens joined with single spaces.

    Raises:
        ImportError: If ``pypinyin`` is not installed (logged, then
            re-raised).
    """
    try:
        from pypinyin import pinyin, Style

        # pinyin() yields one list per token; take the first entry of each.
        token_groups = pinyin(text, style=Style.NORMAL, heteronym=False)
        stripped_tokens = (
            group[0].strip() for group in token_groups if group and group[0]
        )
        # Drop tokens that are empty after stripping.
        return ' '.join(token for token in stripped_tokens if token)
    except ImportError:
        logger.error("pypinyin 未安装,请运行: pip install pypinyin")
        raise
def japanese_to_romaji(text: str) -> str:
    """
    Convert Japanese text to romaji joined by single spaces.

    Args:
        text: The text to convert.

    Returns:
        The romaji segments joined with single spaces.

    Raises:
        ImportError: If ``pykakasi`` is not installed (logged, then
            re-raised).
    """
    try:
        import pykakasi

        converter = pykakasi.kakasi()
        # Prefer each segment's 'hepburn' field; fall back to 'orig'.
        readings = (
            segment.get('hepburn', segment.get('orig', ''))
            for segment in converter.convert(text)
        )
        # Segments with an empty reading are skipped.
        return ' '.join(reading for reading in readings if reading)
    except ImportError:
        logger.error("pykakasi 未安装,请运行: pip install pykakasi")
        raise
def process_lab_file(
    lab_path: str,
    language: str = "chinese",
    output_path: Optional[str] = None
) -> Tuple[bool, str]:
    """
    Convert the text of one .lab file to pinyin / romaji and write it out.

    Args:
        lab_path: Path of the .lab file to read (UTF-8).
        language: "chinese"/"zh"/"mandarin" converts to pinyin,
            "japanese"/"ja"/"jp" converts to romaji; any other value only
            collapses whitespace runs to single spaces.
        output_path: Where to write the result. When not given, the input
            file is overwritten.

    Returns:
        ``(True, converted_text)`` on success, or ``(False, message)`` when
        the file is empty or any exception occurs (the exception is logged
        and its text returned as the message).
    """
    try:
        with open(lab_path, 'r', encoding='utf-8') as source:
            raw_text = source.read().strip()

        # Nothing to convert: report it without touching any file.
        if not raw_text:
            return False, "空文件"

        if language in ("chinese", "zh", "mandarin"):
            converted = chinese_to_pinyin(raw_text)
        elif language in ("japanese", "ja", "jp"):
            converted = japanese_to_romaji(raw_text)
        else:
            # Other languages: keep the words, normalise the whitespace.
            converted = ' '.join(raw_text.split())

        # Fall back to overwriting the input when no output path is given.
        target = output_path if output_path else lab_path
        with open(target, 'w', encoding='utf-8') as sink:
            sink.write(converted)
    except Exception as e:
        logger.error(f"处理 {lab_path} 失败: {e}")
        return False, str(e)

    return True, converted
def process_lab_directory(
    input_dir: str,
    language: str = "chinese",
    output_dir: Optional[str] = None,
    progress_callback: Optional[Callable[[str], None]] = None
) -> Tuple[bool, str, int]:
    """
    Convert every ``*.lab`` file directly inside a directory.

    Args:
        input_dir: Directory to scan (non-recursive) for .lab files.
        language: Language selector passed through to ``process_lab_file``.
        output_dir: Directory for the converted files; created if missing.
            When not given, each input file is overwritten.
        progress_callback: Optional callable that receives every progress
            message in addition to it being logged.

    Returns:
        ``(ok, message, processed_count)``. ``ok`` is False when no .lab
        file is found or an unexpected exception occurs; otherwise True,
        with ``processed_count`` being the number of files converted
        successfully.
    """
    def report(msg: str):
        # Send each message to the logger and, if present, to the callback.
        logger.info(msg)
        if progress_callback:
            progress_callback(msg)

    try:
        lab_files = list(Path(input_dir).glob('*.lab'))
        if not lab_files:
            return False, "未找到 .lab 文件", 0

        report(f"找到 {len(lab_files)} 个 .lab 文件")

        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        success_count = 0
        for i, lab_path in enumerate(lab_files):
            # Mirror the file name into output_dir, or overwrite in place.
            destination = (
                os.path.join(output_dir, lab_path.name) if output_dir else None
            )
            success, result = process_lab_file(str(lab_path), language, destination)

            if not success:
                report(f"[{i+1}/{len(lab_files)}] {lab_path.name} 失败: {result}")
                continue

            success_count += 1
            report(f"[{i+1}/{len(lab_files)}] {lab_path.name} -> {result[:30]}...")

        return True, f"处理完成: {success_count}/{len(lab_files)}", success_count
    except Exception as e:
        logger.error(f"批量处理失败: {e}", exc_info=True)
        return False, str(e), 0