|
|
import os |
|
|
import re |
|
|
import unicodedata |
|
|
from typing import List, Optional, Dict, Any, Tuple, Literal |
|
|
import logging |
|
|
import json |
|
|
import sys |
|
|
import csv |
|
|
import yaml |
|
|
import jaconv |
|
|
import threading |
|
|
import tkinter as tk |
|
|
from tkinter import ttk, filedialog, messagebox, scrolledtext |
|
|
|
|
|
|
|
|
try: |
|
|
from janome.tokenizer import Tokenizer, Token |
|
|
from janome.analyzer import Analyzer |
|
|
from janome.charfilter import UnicodeNormalizeCharFilter |
|
|
|
|
|
JANOME_AVAILABLE = True |
|
|
except ImportError: |
|
|
JANOME_AVAILABLE = False |
|
|
|
|
|
try: |
|
|
import pyopenjtalk |
|
|
PYOPENJTALK_AVAILABLE = True |
|
|
except ImportError: |
|
|
PYOPENJTALK_AVAILABLE = False |
|
|
except Exception as e: |
|
|
PYOPENJTALK_AVAILABLE = False |
|
|
print(f"警告: pyopenjtalkのインポート中にエラーが発生しました: {e}", file=sys.stderr) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
import tkinter as tk |
|
|
from tkinter import ttk, filedialog, messagebox, scrolledtext |
|
|
ENABLE_GUI = True |
|
|
except ImportError: |
|
|
ENABLE_GUI = False |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class TextProcessingError(Exception): pass |
|
|
class FileProcessingError(TextProcessingError): pass |
|
|
class InitializationError(TextProcessingError): pass |
|
|
class NormalizationError(TextProcessingError): pass |
|
|
|
|
|
|
|
|
class TextNormalizer: |
|
|
def __init__(self, rule_filepath: Optional[str] = None): |
|
|
self.rules = self._load_rules(rule_filepath) |
|
|
logger.info(f"正規化ルールをロードしました (ファイル: {rule_filepath or 'デフォルト'})") |
|
|
self.number_converter = self._create_number_converter() |
|
|
|
|
|
def _load_rules(self, filepath: Optional[str]) -> Dict[str, Any]: |
|
|
|
|
|
default_rules = { |
|
|
'unicode_normalize': 'NFKC', |
|
|
'remove_whitespace': True, |
|
|
'replace_symbols': {'&': ' アンド ', '%': ' パーセント '}, |
|
|
'remove_symbols': r'[「」『』【】[]()<>‘’“”・※→←↑↓*#〜]', |
|
|
'punctuation': {'remove': False, 'replacement': ' <pau> ', 'target': '、。?!'}, |
|
|
'number_conversion': {'enabled': True, 'target': r'\d+([\.,]\d+)?'}, |
|
|
'alphabet_conversion': {'enabled': True, 'rule': 'spellout', 'dictionary': {'AI': 'エーアイ', 'USB': 'ユーエスビー'}}, |
|
|
'custom_replacements_pre': [], |
|
|
'custom_replacements_post': [], |
|
|
} |
|
|
if filepath and os.path.exists(filepath): |
|
|
try: |
|
|
with open(filepath, 'r', encoding='utf-8') as f: |
|
|
loaded_rules = yaml.safe_load(f) |
|
|
|
|
|
for key, value in loaded_rules.items(): |
|
|
if isinstance(value, dict) and isinstance(default_rules.get(key), dict): |
|
|
default_rules[key].update(value) |
|
|
else: |
|
|
default_rules[key] = value |
|
|
logger.info(f"カスタム正規化ルール '{filepath}' をロードしました。") |
|
|
return default_rules |
|
|
except Exception as e: |
|
|
logger.warning(f"正規化ルールファイル '{filepath}' の読み込みに失敗: {e}。デフォルトルールを使用。") |
|
|
return default_rules |
|
|
else: |
|
|
if filepath: logger.warning(f"正規化ルールファイル '{filepath}' が見つかりません。デフォルトルールを使用。") |
|
|
else: logger.info("正規化ルールファイル未指定。デフォルトルールを使用。") |
|
|
return default_rules |
|
|
|
|
|
def _create_number_converter(self): |
|
|
|
|
|
class SimpleNumConverter: |
|
|
def convert(self, num_str: str) -> str: |
|
|
|
|
|
|
|
|
return num_str |
|
|
return SimpleNumConverter() |
|
|
|
|
|
def normalize(self, text: str) -> str: |
|
|
if not isinstance(text, str): return "" |
|
|
try: |
|
|
|
|
|
norm_type = self.rules.get('unicode_normalize') |
|
|
if norm_type: text = unicodedata.normalize(norm_type, text) |
|
|
|
|
|
|
|
|
for rule in self.rules.get('custom_replacements_pre', []): |
|
|
text = re.sub(rule['pattern'], rule['replacement'], text) |
|
|
|
|
|
|
|
|
if self.rules.get('remove_whitespace', True): |
|
|
text = re.sub(r'\s+', ' ', text).strip() |
|
|
|
|
|
|
|
|
num_conf = self.rules.get('number_conversion', {}) |
|
|
if num_conf.get('enabled'): |
|
|
pattern = num_conf.get('target', r'\d+([\.,]\d+)?') |
|
|
try: |
|
|
text = re.sub(pattern, lambda m: self.number_converter.convert(m.group(0)), text) |
|
|
except Exception as e_num: logger.warning(f"数字変換エラー: {e_num}") |
|
|
|
|
|
|
|
|
alpha_conf = self.rules.get('alphabet_conversion', {}) |
|
|
if alpha_conf.get('enabled'): |
|
|
for word, reading in alpha_conf.get('dictionary', {}).items(): |
|
|
|
|
|
text = text.replace(word, f' {reading} ') |
|
|
if alpha_conf.get('rule') == 'spellout': |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
for symbol, reading in self.rules.get('replace_symbols', {}).items(): |
|
|
text = text.replace(symbol, reading) |
|
|
|
|
|
|
|
|
remove_pattern = self.rules.get('remove_symbols') |
|
|
if remove_pattern: |
|
|
text = re.sub(remove_pattern, '', text) |
|
|
|
|
|
|
|
|
punct_conf = self.rules.get('punctuation', {}) |
|
|
target_punct = punct_conf.get('target', '、。?!') |
|
|
punct_pattern = f'[{re.escape(target_punct)}]' |
|
|
if punct_conf.get('remove', True): |
|
|
text = re.sub(punct_pattern, '', text) |
|
|
else: |
|
|
replacement = punct_conf.get('replacement', '<pau>') |
|
|
text = re.sub(punct_pattern, replacement, text) |
|
|
|
|
|
|
|
|
for rule in self.rules.get('custom_replacements_post', []): |
|
|
text = re.sub(rule['pattern'], rule['replacement'], text) |
|
|
|
|
|
|
|
|
if self.rules.get('remove_whitespace', True): |
|
|
text = re.sub(r'\s+', ' ', text).strip() |
|
|
|
|
|
return text |
|
|
except Exception as e: |
|
|
raise NormalizationError(f"正規化失敗 ({text[:20]}...): {e}") |
|
|
|
|
|
|
|
|
|
|
|
class PronunciationEstimator: |
|
|
def __init__( |
|
|
self, |
|
|
engine: Literal['janome', 'pyopenjtalk'] = 'pyopenjtalk', |
|
|
janome_udic_path: Optional[str] = None, |
|
|
jtalk_dic_path: Optional[str] = None, |
|
|
jtalk_user_dic_path: Optional[str] = None |
|
|
): |
|
|
self.engine = engine |
|
|
self.jtalk_user_dic_path = jtalk_user_dic_path |
|
|
self.unknown_words = set() |
|
|
self.janome_analyzer = None |
|
|
self.jtalk_opts = [] |
|
|
|
|
|
if self.engine == 'janome': |
|
|
if not JANOME_AVAILABLE: raise InitializationError("Janome利用不可") |
|
|
try: |
|
|
logger.info(f"Janome初期化 (ユーザー辞書: {janome_udic_path})") |
|
|
char_filters = [UnicodeNormalizeCharFilter()] |
|
|
|
|
|
tokenizer_kwargs = {"udic": janome_udic_path, "udic_enc": "utf8"} if janome_udic_path else {} |
|
|
self.janome_analyzer = Analyzer( |
|
|
char_filters=char_filters, |
|
|
tokenizer=Tokenizer(**tokenizer_kwargs), |
|
|
token_filters=[] |
|
|
) |
|
|
logger.info("Janome初期化完了") |
|
|
except Exception as e: raise InitializationError(f"Janome初期化失敗: {e}") |
|
|
|
|
|
elif self.engine == 'pyopenjtalk': |
|
|
if not PYOPENJTALK_AVAILABLE: raise InitializationError("pyopenjtalk利用不可") |
|
|
try: |
|
|
logger.info("pyopenjtalk初期化...") |
|
|
if jtalk_dic_path: |
|
|
logger.info(f"jtalkシステム辞書パス設定: {jtalk_dic_path}") |
|
|
os.environ['PYOPENJTALK_DICT_PATH'] = jtalk_dic_path |
|
|
|
|
|
if self.jtalk_user_dic_path: |
|
|
logger.warning(f"jtalkユーザー辞書 '{self.jtalk_user_dic_path}' はrun_frontendでは直接使用されません。システム辞書への統合等を検討してください。g2pには影響する可能性があります。") |
|
|
|
|
|
|
|
|
_ = pyopenjtalk.g2p('テスト') |
|
|
logger.info("pyopenjtalk初期化完了") |
|
|
except Exception as e: raise InitializationError(f"pyopenjtalk初期化失敗: {e}") |
|
|
else: |
|
|
raise InitializationError(f"無効なエンジン: {self.engine}") |
|
|
|
|
|
def _get_janome_reading(self, text: str) -> Tuple[str, List[Dict[str, Any]]]: |
|
|
tokens_details = [] |
|
|
katakana_parts = [] |
|
|
try: |
|
|
tokens = self.janome_analyzer.analyze(text) |
|
|
for token in tokens: |
|
|
surf = token.surface |
|
|
pos = token.part_of_speech |
|
|
reading = token.reading if token.reading != '*' else None |
|
|
pron = token.pronunciation if hasattr(token, 'pronunciation') and token.pronunciation != '*' else reading |
|
|
|
|
|
reading_used = reading or pron |
|
|
if reading_used: |
|
|
katakana_parts.append(reading_used) |
|
|
else: |
|
|
katakana_parts.append(jaconv.hira2kata(surf)) |
|
|
if not re.fullmatch(r'[ぁ-んァ-ンー\s]+', surf) and '記号' not in pos: |
|
|
self.unknown_words.add(surf) |
|
|
|
|
|
tokens_details.append({ |
|
|
"surface": surf, "pos": pos, "reading": reading, "pronunciation": pron |
|
|
}) |
|
|
return "".join(katakana_parts), tokens_details |
|
|
except Exception as e: |
|
|
logger.error(f"Janome読み推定エラー: {e}", exc_info=True) |
|
|
return "[READING ERROR]", [] |
|
|
|
|
|
def _get_jtalk_pronunciation(self, text: str) -> Tuple[str, List[Dict[str, Any]], List[str]]: |
|
|
katakana_reading = "[READING ERROR]" |
|
|
tokens_details = [] |
|
|
full_context_labels = [] |
|
|
try: |
|
|
|
|
|
|
|
|
g2p_opts = ['-u', self.jtalk_user_dic_path] if self.jtalk_user_dic_path and os.path.exists(self.jtalk_user_dic_path) else [] |
|
|
|
|
|
katakana_reading = pyopenjtalk.g2p(text, kana=True) |
|
|
|
|
|
|
|
|
|
|
|
full_context_labels = pyopenjtalk.run_frontend(text, self.jtalk_opts) |
|
|
|
|
|
|
|
|
tokens_details = [{"surface": text, "reading": katakana_reading}] |
|
|
|
|
|
|
|
|
if katakana_reading == text and not re.fullmatch(r'[ァ-ンヴー\s]+', text): |
|
|
self.unknown_words.add(text) |
|
|
|
|
|
return katakana_reading, tokens_details, full_context_labels |
|
|
except Exception as e: |
|
|
logger.error(f"pyopenjtalk読み推定エラー: {e}", exc_info=True) |
|
|
return "[READING ERROR]", [], [] |
|
|
|
|
|
def get_pronunciation(self, text: str, output_format: Literal['hiragana', 'katakana'] = 'katakana') -> Tuple[str, List[Dict[str, Any]], List[str]]: |
|
|
reading = "[READING ERROR]" |
|
|
tokens_details = [] |
|
|
lab_data = [] |
|
|
|
|
|
if not text: return "", [], [] |
|
|
|
|
|
if self.engine == 'janome': |
|
|
reading, tokens_details = self._get_janome_reading(text) |
|
|
elif self.engine == 'pyopenjtalk': |
|
|
reading, tokens_details, lab_data = self._get_jtalk_pronunciation(text) |
|
|
|
|
|
|
|
|
if output_format == 'hiragana' and "[ERROR]" not in reading: |
|
|
try: reading = jaconv.kata2hira(reading) |
|
|
except Exception: logger.warning("ひらがな変換失敗", exc_info=True) |
|
|
|
|
|
return reading, tokens_details, lab_data |
|
|
|
|
|
|
|
|
|
|
|
def read_text_with_bom_removal(filepath: str, encoding: str = 'utf-8') -> List[str]: |
|
|
try: |
|
|
with open(filepath, 'rb') as f: raw_data = f.read() |
|
|
if raw_data.startswith(b'\xef\xbb\xbf'): text = raw_data[3:].decode(encoding, errors='replace') |
|
|
elif raw_data.startswith((b'\xff\xfe', b'\xfe\xff')): text = raw_data.decode('utf-16', errors='replace') |
|
|
else: text = raw_data.decode(encoding, errors='replace') |
|
|
return text.splitlines() |
|
|
except FileNotFoundError: raise FileProcessingError(f"ファイルが見つかりません: {filepath}") |
|
|
except Exception as e: raise FileProcessingError(f"ファイル読み込みエラー ({filepath}): {e}") |
|
|
|
|
|
def output_corpus_data(output_filepath_base: str, data: List[Dict[str, Any]], output_formats: List[Literal['tsv', 'jsonl']]) -> None: |
|
|
output_successful = [] |
|
|
if not data: logger.warning(f"{output_filepath_base}*: 出力データなし"); return |
|
|
|
|
|
|
|
|
if 'tsv' in output_formats: |
|
|
fpath = f"{output_filepath_base}.tsv" |
|
|
try: |
|
|
header = ["id", "text", "reading"] |
|
|
|
|
|
with open(fpath, 'w', encoding='utf-8', newline='') as f: |
|
|
writer = csv.DictWriter(f, fieldnames=header, delimiter='\t', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore') |
|
|
writer.writeheader() |
|
|
writer.writerows(data) |
|
|
logger.info(f"TSV出力完了: {fpath}") |
|
|
output_successful.append('tsv') |
|
|
except Exception as e: logger.error(f"TSV出力失敗 ({fpath}): {e}") |
|
|
|
|
|
|
|
|
if 'jsonl' in output_formats: |
|
|
fpath = f"{output_filepath_base}.jsonl" |
|
|
try: |
|
|
with open(fpath, 'w', encoding='utf-8') as f: |
|
|
for item in data: |
|
|
item_copy = {k: v for k, v in item.items() if k != 'lab_data'} |
|
|
f.write(json.dumps(item_copy, ensure_ascii=False) + '\n') |
|
|
logger.info(f"JSONL出力完了: {fpath}") |
|
|
output_successful.append('jsonl') |
|
|
except Exception as e: logger.error(f"JSONL出力失敗 ({fpath}): {e}") |
|
|
|
|
|
if not output_successful and output_formats: |
|
|
raise FileProcessingError("指定形式での出力に成功せず") |
|
|
|
|
|
def output_lab_data(output_filepath_base: str, all_lab_data: Dict[str, List[str]]) -> None: |
|
|
lab_dir = f"{output_filepath_base}_lab" |
|
|
try: |
|
|
os.makedirs(lab_dir, exist_ok=True) |
|
|
count = 0 |
|
|
for file_id, lab_lines in all_lab_data.items(): |
|
|
if lab_lines: |
|
|
fpath = os.path.join(lab_dir, f"{file_id}.lab") |
|
|
with open(fpath, 'w', encoding='utf-8') as f: |
|
|
f.write("\n".join(lab_lines)) |
|
|
count += 1 |
|
|
logger.info(f"LABデータ出力完了: {count}ファイル -> {lab_dir}") |
|
|
except Exception as e: logger.error(f"LABデータ出力失敗: {e}") |
|
|
|
|
|
def output_comparison_data(output_filepath_base: str, comparison_data: List[Dict[str, str]]) -> None: |
|
|
"""オプション: 元テキスト、正規化テキスト、読みの比較データをTSVで出力""" |
|
|
fpath = f"{output_filepath_base}_comparison.tsv" |
|
|
try: |
|
|
if not comparison_data: return |
|
|
header = ["id", "original", "normalized", "reading"] |
|
|
with open(fpath, 'w', encoding='utf-8', errors='replace', newline='') as tsvfile: |
|
|
writer = csv.DictWriter(tsvfile, fieldnames=header, delimiter='\t', quoting=csv.QUOTE_MINIMAL) |
|
|
writer.writeheader() |
|
|
writer.writerows(comparison_data) |
|
|
logger.info(f"比較データ出力完了: {fpath}") |
|
|
except Exception as e: |
|
|
logger.error(f"比較データ出力失敗 ({fpath}): {e}") |
|
|
|
|
|
|
|
|
|
|
|
def process_file( |
|
|
filepath: str, output_folder: str, normalizer: TextNormalizer, pron_estimator: PronunciationEstimator, |
|
|
output_formats: List[Literal['tsv', 'jsonl']], lab_output_enabled: bool, reading_output_format: Literal['hiragana', 'katakana'], |
|
|
encoding: str, output_comparison: bool = False, |
|
|
) -> Tuple[int, int]: |
|
|
"""ファイルを処理し、コーパスデータを出力""" |
|
|
filename = os.path.basename(filepath) |
|
|
output_base = os.path.join(output_folder, os.path.splitext(filename)[0]) |
|
|
processed_data: List[Dict[str, Any]] = [] |
|
|
comparison_items: List[Dict[str, str]] = [] |
|
|
all_lab_data: Dict[str, List[str]] = {} |
|
|
success_count = 0 |
|
|
error_count = 0 |
|
|
|
|
|
try: |
|
|
original_lines = read_text_with_bom_removal(filepath, encoding) |
|
|
logger.debug(f"'{filename}'読み込み完了、{len(original_lines)}行") |
|
|
|
|
|
for i, line in enumerate(original_lines): |
|
|
line_id = f"{os.path.splitext(filename)[0]}_{i:04d}" |
|
|
original_line = line.strip() |
|
|
if not original_line: continue |
|
|
|
|
|
normalized_line = "[NORM ERROR]" |
|
|
reading = "[READING ERROR]" |
|
|
lab_data = [] |
|
|
line_success = False |
|
|
|
|
|
try: |
|
|
normalized_line = normalizer.normalize(original_line) |
|
|
if not normalized_line: logger.warning(f"{line_id}: 正規化後空"); continue |
|
|
|
|
|
reading, _, lab_data = pron_estimator.get_pronunciation(normalized_line, reading_output_format) |
|
|
if "[ERROR]" not in reading: line_success = True |
|
|
|
|
|
except NormalizationError as e_norm: logger.error(f"{line_id}: 正規化エラー: {e_norm}") |
|
|
except Exception as e_line: logger.error(f"{line_id}: 行処理エラー: {e_line}", exc_info=True) |
|
|
|
|
|
|
|
|
if line_success: success_count += 1 |
|
|
else: error_count += 1 |
|
|
|
|
|
corpus_item = {"id": line_id, "text": normalized_line, "reading": reading} |
|
|
processed_data.append(corpus_item) |
|
|
if lab_output_enabled and lab_data: all_lab_data[line_id] = lab_data |
|
|
if output_comparison: |
|
|
comparison_items.append({"id": line_id, "original": original_line, "normalized": normalized_line, "reading": reading}) |
|
|
|
|
|
|
|
|
if processed_data: output_corpus_data(output_base, processed_data, output_formats) |
|
|
if lab_output_enabled and all_lab_data: output_lab_data(output_base, all_lab_data) |
|
|
if output_comparison and comparison_items: output_comparison_data(output_base, comparison_items) |
|
|
|
|
|
logger.debug(f"'{filename}'処理完了 S:{success_count}, E:{error_count}") |
|
|
return success_count, error_count |
|
|
|
|
|
except FileProcessingError as e_fp: raise e_fp |
|
|
except Exception as e_f: raise FileProcessingError(f"ファイル処理中エラー ({filepath}): {e_f}") |
|
|
|
|
|
|
|
|
|
|
|
class TextProcessorGUI(tk.Tk): |
|
|
def __init__(self): |
|
|
super().__init__() |
|
|
|
|
|
self.title("音声合成コーパス前処理ツール") |
|
|
self.geometry("850x700") |
|
|
style = ttk.Style(self) |
|
|
try: style.theme_use('clam') |
|
|
except tk.TclError: pass |
|
|
|
|
|
|
|
|
self.input_files_var = tk.StringVar(value="") |
|
|
self.output_folder_var = tk.StringVar(value="") |
|
|
self.norm_rules_var = tk.StringVar(value="normalization_rules.yaml") |
|
|
self.janome_udic_var = tk.StringVar(value="") |
|
|
self.jtalk_dic_var = tk.StringVar(value="") |
|
|
self.jtalk_user_dic_var = tk.StringVar(value="") |
|
|
|
|
|
engine_choices = [eng for eng in ['janome', 'pyopenjtalk'] if globals()[f"{eng.upper()}_AVAILABLE"]] |
|
|
default_engine = 'pyopenjtalk' if PYOPENJTALK_AVAILABLE else ('janome' if JANOME_AVAILABLE else '') |
|
|
self.engine_var = tk.StringVar(value=default_engine) |
|
|
self.output_format_vars = {"tsv": tk.BooleanVar(value=False), "jsonl": tk.BooleanVar(value=True)} |
|
|
self.reading_format_var = tk.StringVar(value="katakana") |
|
|
self.output_lab_var = tk.BooleanVar(value=False) |
|
|
self.output_comp_var = tk.BooleanVar(value=False) |
|
|
self.encoding_var = tk.StringVar(value="utf-8") |
|
|
self.processing_active = False |
|
|
|
|
|
|
|
|
self._create_widgets() |
|
|
self._setup_logging() |
|
|
|
|
|
|
|
|
if not PYOPENJTALK_AVAILABLE: |
|
|
logger.warning("pyopenjtalkが見つからないか初期化に失敗しました。pyopenjtalkエンジンは選択できません。") |
|
|
if not JANOME_AVAILABLE: |
|
|
logger.warning("Janomeが見つかりません。Janomeエンジンは選択できません。") |
|
|
if not engine_choices: |
|
|
messagebox.showerror("致命的エラー", "利用可能な読み推定エンジン(Janome/pyopenjtalk)がありません。\nライブラリのインストールを確認してください。") |
|
|
self.destroy() |
|
|
|
|
|
def _create_widgets(self): |
|
|
main_frame = ttk.Frame(self, padding="10") |
|
|
main_frame.pack(expand=True, fill=tk.BOTH) |
|
|
main_frame.columnconfigure(1, weight=1) |
|
|
|
|
|
row_idx = 0 |
|
|
|
|
|
|
|
|
io_frame = ttk.LabelFrame(main_frame, text=" 入出力 ", padding="10") |
|
|
io_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=(0,5)) |
|
|
io_frame.columnconfigure(1, weight=1) |
|
|
ttk.Label(io_frame, text="入力ファイル:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
ttk.Entry(io_frame, textvariable=self.input_files_var).grid(row=0, column=1, sticky="ew", padx=5, pady=2) |
|
|
ttk.Button(io_frame, text="選択...", width=8, command=self._select_input_files).grid(row=0, column=2, padx=(0,5), pady=2) |
|
|
ttk.Label(io_frame, text="出力フォルダ:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
ttk.Entry(io_frame, textvariable=self.output_folder_var).grid(row=1, column=1, sticky="ew", padx=5, pady=2) |
|
|
ttk.Button(io_frame, text="選択...", width=8, command=self._select_output_folder).grid(row=1, column=2, padx=(0,5), pady=2) |
|
|
row_idx += 1 |
|
|
|
|
|
|
|
|
config_frame = ttk.LabelFrame(main_frame, text=" 設定ファイル ", padding="10") |
|
|
config_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=5) |
|
|
config_frame.columnconfigure(1, weight=1) |
|
|
ttk.Label(config_frame, text="正規化ルール:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
ttk.Entry(config_frame, textvariable=self.norm_rules_var).grid(row=0, column=1, sticky="ew", padx=5, pady=2) |
|
|
ttk.Button(config_frame, text="選択...", width=8, command=self._select_norm_rules).grid(row=0, column=2, padx=(0,5), pady=2) |
|
|
row_idx += 1 |
|
|
|
|
|
|
|
|
engine_frame = ttk.LabelFrame(main_frame, text=" 読み推定エンジンと辞書 ", padding="10") |
|
|
engine_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=5) |
|
|
engine_frame.columnconfigure(1, weight=1) |
|
|
engine_choices = [eng for eng in ['janome', 'pyopenjtalk'] if globals()[f"{eng.upper()}_AVAILABLE"]] |
|
|
ttk.Label(engine_frame, text="エンジン:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
engine_combo = ttk.Combobox(engine_frame, textvariable=self.engine_var, values=engine_choices, state="readonly", width=15) |
|
|
engine_combo.grid(row=0, column=1, sticky=tk.W, padx=5, pady=2) |
|
|
|
|
|
ttk.Label(engine_frame, text="Janomeユーザー辞書:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
ttk.Entry(engine_frame, textvariable=self.janome_udic_var).grid(row=1, column=1, sticky="ew", padx=5, pady=2) |
|
|
ttk.Button(engine_frame, text="選択...", width=8, command=lambda: self._select_file(self.janome_udic_var, "Janome辞書", [("CSV", "*.csv")])).grid(row=1, column=2, padx=(0,5), pady=2) |
|
|
|
|
|
ttk.Label(engine_frame, text="JTalkシステム辞書:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
ttk.Entry(engine_frame, textvariable=self.jtalk_dic_var).grid(row=2, column=1, sticky="ew", padx=5, pady=2) |
|
|
ttk.Button(engine_frame, text="選択...", width=8, command=self._select_jtalk_dic).grid(row=2, column=2, padx=(0,5), pady=2) |
|
|
|
|
|
ttk.Label(engine_frame, text="JTalkユーザー辞書:").grid(row=3, column=0, sticky=tk.W, padx=5, pady=2) |
|
|
ttk.Entry(engine_frame, textvariable=self.jtalk_user_dic_var).grid(row=3, column=1, sticky="ew", padx=5, pady=2) |
|
|
ttk.Button(engine_frame, text="選択...", width=8, command=lambda: self._select_file(self.jtalk_user_dic_var, "JTalkユーザー辞書", [("CSV/DIC", "*.csv *.dic")])).grid(row=3, column=2, padx=(0,5), pady=2) |
|
|
row_idx += 1 |
|
|
|
|
|
|
|
|
output_frame = ttk.LabelFrame(main_frame, text=" 出力オプション ", padding="10") |
|
|
output_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=5) |
|
|
output_frame.columnconfigure(1, weight=1) |
|
|
|
|
|
|
|
|
cf_frame = ttk.Frame(output_frame) |
|
|
cf_frame.grid(row=0, column=0, columnspan=4, sticky=tk.W, pady=2) |
|
|
ttk.Label(cf_frame, text="コーパス形式:").pack(side=tk.LEFT, padx=(5,10)) |
|
|
ttk.Checkbutton(cf_frame, text="TSV", variable=self.output_format_vars["tsv"]).pack(side=tk.LEFT, padx=5) |
|
|
ttk.Checkbutton(cf_frame, text="JSONL", variable=self.output_format_vars["jsonl"]).pack(side=tk.LEFT, padx=5) |
|
|
|
|
|
|
|
|
rf_frame = ttk.Frame(output_frame) |
|
|
rf_frame.grid(row=1, column=0, columnspan=4, sticky=tk.W, pady=2) |
|
|
ttk.Label(rf_frame, text="読みの形式:").pack(side=tk.LEFT, padx=(5,10)) |
|
|
reading_combo = ttk.Combobox(rf_frame, textvariable=self.reading_format_var, values=['katakana', 'hiragana'], state="readonly", width=10) |
|
|
reading_combo.pack(side=tk.LEFT, padx=5) |
|
|
|
|
|
|
|
|
oc_frame = ttk.Frame(output_frame) |
|
|
oc_frame.grid(row=2, column=0, columnspan=4, sticky=tk.W, pady=2) |
|
|
ttk.Checkbutton(oc_frame, text="LAB出力 (JTalk)", variable=self.output_lab_var).pack(side=tk.LEFT, padx=5) |
|
|
ttk.Checkbutton(oc_frame, text="比較データ出力", variable=self.output_comp_var).pack(side=tk.LEFT, padx=5) |
|
|
|
|
|
|
|
|
enc_frame = ttk.Frame(output_frame) |
|
|
enc_frame.grid(row=3, column=0, columnspan=4, sticky=tk.W, pady=2) |
|
|
ttk.Label(enc_frame, text="入力エンコーディング:").pack(side=tk.LEFT, padx=(5, 10)) |
|
|
encoding_entry = ttk.Entry(enc_frame, textvariable=self.encoding_var, width=12) |
|
|
encoding_entry.pack(side=tk.LEFT, padx=5) |
|
|
row_idx += 1 |
|
|
|
|
|
|
|
|
self.run_button = ttk.Button(main_frame, text="処理実行", command=self._start_processing, style="Accent.TButton") |
|
|
self.run_button.grid(row=row_idx, column=0, columnspan=3, pady=15) |
|
|
row_idx += 1 |
|
|
|
|
|
|
|
|
log_frame = ttk.LabelFrame(main_frame, text=" ログ ", padding="10") |
|
|
log_frame.grid(row=row_idx, column=0, columnspan=3, sticky="nsew", padx=5, pady=(5,0)) |
|
|
main_frame.rowconfigure(row_idx, weight=1) |
|
|
log_frame.grid_rowconfigure(0, weight=1) |
|
|
log_frame.grid_columnconfigure(0, weight=1) |
|
|
self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=10, state=tk.DISABLED, bd=0, relief=tk.FLAT) |
|
|
self.log_text.grid(row=0, column=0, sticky="nsew") |
|
|
|
|
|
|
|
|
def _select_input_files(self): |
|
|
fpaths = filedialog.askopenfilenames(title="入力ファイル選択 (複数可)", filetypes=[("テキストファイル", "*.txt"), ("全ファイル", "*.*")]) |
|
|
if fpaths: self.input_files_var.set(";".join(fpaths)) |
|
|
|
|
|
def _select_output_folder(self): |
|
|
fpath = filedialog.askdirectory(title="出力フォルダ選択") |
|
|
if fpath: self.output_folder_var.set(fpath) |
|
|
|
|
|
def _select_file(self, var, title, filetypes): |
|
|
fpath = filedialog.askopenfilename(title=title, filetypes=filetypes) |
|
|
if fpath: var.set(fpath) |
|
|
|
|
|
def _select_norm_rules(self): |
|
|
self._select_file(self.norm_rules_var, "正規化ルール選択", [("YAML", "*.yaml *.yml"), ("全ファイル", "*.*")]) |
|
|
|
|
|
def _select_jtalk_dic(self): |
|
|
fpath = filedialog.askdirectory(title="JTalkシステム辞書フォルダ選択 (例: .../open_jtalk_dic_utf_8-1.11)") |
|
|
if fpath: self.jtalk_dic_var.set(fpath) |
|
|
|
|
|
|
|
|
def _setup_logging(self): |
|
|
class TextHandler(logging.Handler): |
|
|
def __init__(self, text_widget): |
|
|
logging.Handler.__init__(self) |
|
|
self.text_widget = text_widget |
|
|
self.queue = [] |
|
|
self.processing = False |
|
|
|
|
|
def schedule_update(self): |
|
|
|
|
|
if not self.processing: |
|
|
self.processing = True |
|
|
self.text_widget.after(50, self._update_widget) |
|
|
|
|
|
def _update_widget(self): |
|
|
self.processing = False |
|
|
if not self.text_widget.winfo_exists(): return |
|
|
|
|
|
self.text_widget.configure(state='normal') |
|
|
while self.queue: |
|
|
record = self.queue.pop(0) |
|
|
msg = self.format(record) |
|
|
self.text_widget.insert(tk.END, msg + '\n') |
|
|
self.text_widget.configure(state='disabled') |
|
|
self.text_widget.yview(tk.END) |
|
|
|
|
|
def emit(self, record): |
|
|
self.queue.append(record) |
|
|
self.schedule_update() |
|
|
|
|
|
gui_handler = TextHandler(self.log_text) |
|
|
gui_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S')) |
|
|
logging.getLogger().addHandler(gui_handler) |
|
|
logging.getLogger().setLevel(logging.INFO) |
|
|
|
|
|
|
|
|
def _start_processing(self): |
|
|
if self.processing_active: logger.warning("処理中"); return |
|
|
|
|
|
|
|
|
self.params = {} |
|
|
try: |
|
|
self.params['input_files'] = [f.strip() for f in self.input_files_var.get().split(';') if f.strip()] |
|
|
self.params['output_folder'] = self.output_folder_var.get() |
|
|
self.params['norm_rules'] = self.norm_rules_var.get() |
|
|
self.params['engine'] = self.engine_var.get() |
|
|
self.params['janome_udic'] = self.janome_udic_var.get() or None |
|
|
self.params['jtalk_dic'] = self.jtalk_dic_var.get() or None |
|
|
self.params['jtalk_user_dic'] = self.jtalk_user_dic_var.get() or None |
|
|
self.params['output_format'] = [fmt for fmt, var in self.output_format_vars.items() if var.get()] |
|
|
self.params['reading_format'] = self.reading_format_var.get() |
|
|
self.params['output_lab'] = self.output_lab_var.get() |
|
|
self.params['output_comparison'] = self.output_comp_var.get() |
|
|
self.params['encoding'] = self.encoding_var.get().strip() or 'utf-8' |
|
|
|
|
|
if not self.params['input_files']: raise ValueError("入力ファイル未選択") |
|
|
if not self.params['output_folder']: raise ValueError("出力フォルダ未選択") |
|
|
if not self.params['output_format']: raise ValueError("出力コーパス形式未選択") |
|
|
if self.params['output_lab'] and self.params['engine'] != 'pyopenjtalk': |
|
|
raise ValueError("LAB出力はpyopenjtalkエンジン選択時のみ可能") |
|
|
if not os.path.exists(self.params['norm_rules']): |
|
|
logger.warning(f"正規化ルールファイル '{self.params['norm_rules']}' が存在しません。デフォルトルールを使用します。") |
|
|
|
|
|
|
|
|
|
|
|
except ValueError as ve: |
|
|
messagebox.showerror("入力エラー", str(ve)); return |
|
|
except Exception as e: |
|
|
messagebox.showerror("設定エラー", f"設定値の取得中にエラー: {e}"); return |
|
|
|
|
|
|
|
|
self.processing_active = True |
|
|
self.run_button.config(state=tk.DISABLED, text="処理中...") |
|
|
self.log_text.configure(state='normal'); self.log_text.delete('1.0', tk.END); self.log_text.configure(state='disabled') |
|
|
logger.info("処理を開始します...") |
|
|
|
|
|
|
|
|
self.processing_thread = threading.Thread(target=self._run_processing_logic, daemon=True) |
|
|
self.processing_thread.start() |
|
|
self.after(100, self._check_thread) |
|
|
|
|
|
def _check_thread(self): |
|
|
if self.processing_thread.is_alive(): |
|
|
self.after(100, self._check_thread) |
|
|
|
|
|
|
|
|
def _run_processing_logic(self): |
|
|
"""実際の処理 (別スレッドで実行)""" |
|
|
exit_code = 1 |
|
|
final_message = "処理中にエラーが発生しました。" |
|
|
message_type = "error" |
|
|
p = self.params |
|
|
|
|
|
try: |
|
|
|
|
|
logger.info(f"入力ファイル数: {len(p['input_files'])}") |
|
|
logger.info(f"出力フォルダ: {p['output_folder']}") |
|
|
logger.info(f"正規化ルール: {p['norm_rules'] if os.path.exists(p['norm_rules']) else 'デフォルト'}") |
|
|
logger.info(f"エンジン: {p['engine']}") |
|
|
logger.info(f"出力形式: {p['output_format']}") |
|
|
logger.info(f"LAB出力: {p['output_lab']}") |
|
|
|
|
|
|
|
|
normalizer = TextNormalizer(p['norm_rules']) |
|
|
pron_estimator = PronunciationEstimator( |
|
|
engine=p['engine'], janome_udic_path=p['janome_udic'], |
|
|
jtalk_dic_path=p['jtalk_dic'], jtalk_user_dic_path=p['jtalk_user_dic'] |
|
|
) |
|
|
|
|
|
|
|
|
total_success_lines, total_error_lines, file_error_count, processed_files = 0, 0, 0, 0 |
|
|
for i, filepath in enumerate(p['input_files']): |
|
|
logger.info(f"--- 処理中 ({i+1}/{len(p['input_files'])}): {os.path.basename(filepath)} ---") |
|
|
try: |
|
|
norm_filepath = os.path.normpath(filepath) |
|
|
if not os.path.isfile(norm_filepath): |
|
|
logger.warning(f"スキップ (非ファイル): {norm_filepath}"); file_error_count += 1; continue |
|
|
s, e = process_file( |
|
|
norm_filepath, p['output_folder'], normalizer, pron_estimator, |
|
|
p['output_format'], p['output_lab'], p['reading_format'], |
|
|
p['encoding'], p['output_comparison'] |
|
|
) |
|
|
total_success_lines += s; total_error_lines += e |
|
|
if e > 0: file_error_count += 1 |
|
|
processed_files += 1 |
|
|
except FileProcessingError as e_fp: logger.error(f"{os.path.basename(filepath)} ファイルエラー: {e_fp}"); file_error_count += 1 |
|
|
except Exception as e_f: logger.error(f"{os.path.basename(filepath)} 予期せぬエラー: {e_f}", exc_info=True); file_error_count += 1 |
|
|
|
|
|
|
|
|
|
|
|
if pron_estimator.unknown_words: |
|
|
unknown_file = os.path.join(p['output_folder'], "unknown_words.txt") |
|
|
try: |
|
|
with open(unknown_file, 'w', encoding='utf-8') as f: |
|
|
f.write("\n".join(sorted(list(pron_estimator.unknown_words)))) |
|
|
logger.warning(f"未知語リスト出力: {unknown_file}") |
|
|
except Exception as e_unk: logger.error(f"未知語リスト出力失敗: {e_unk}") |
|
|
|
|
|
|
|
|
logger.info("=" * 30 + " 処理結果 " + "=" * 30) |
|
|
logger.info(f"処理ファイル数: {processed_files} / {len(p['input_files'])}") |
|
|
logger.info(f"エラー発生ファイル数: {file_error_count}") |
|
|
logger.info(f"総処理行数: {total_success_lines + total_error_lines}") |
|
|
logger.info(f" 正常処理行数: {total_success_lines}") |
|
|
logger.info(f" エラー発生行数: {total_error_lines}") |
|
|
logger.info("=" * 68) |
|
|
|
|
|
if file_error_count > 0 or total_error_lines > 0: |
|
|
final_message = "処理完了(エラーあり)。詳細はログを確認してください。" |
|
|
message_type = "warning" |
|
|
exit_code = 1 |
|
|
else: |
|
|
final_message = "すべての処理が正常に完了しました。" |
|
|
message_type = "info" |
|
|
exit_code = 0 |
|
|
|
|
|
except InitializationError as e_init: final_message = f"初期化エラー: {e_init}"; message_type = "error" |
|
|
except Exception as e_main: final_message = f"予期せぬエラー: {e_main}"; message_type = "error"; logger.error("予期せぬエラー", exc_info=True) |
|
|
|
|
|
|
|
|
def update_gui(): |
|
|
self.processing_active = False |
|
|
if self.winfo_exists(): |
|
|
self.run_button.config(state=tk.NORMAL, text="処理実行") |
|
|
if message_type == "info": messagebox.showinfo("完了", final_message) |
|
|
elif message_type == "warning": messagebox.showwarning("完了(一部エラー)", final_message) |
|
|
else: messagebox.showerror("エラー", final_message) |
|
|
self.after(0, update_gui) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
if not ENABLE_GUI: |
|
|
print("エラー: GUI表示に必要な tkinter が見つかりません。", file=sys.stderr) |
|
|
sys.exit(1) |
|
|
|
|
|
|
|
|
app = TextProcessorGUI() |
|
|
app.mainloop() |