import os import re import unicodedata from typing import List, Optional, Dict, Any, Tuple, Literal import logging import json import sys import csv import yaml # PyYAMLが必要: pip install PyYAML import jaconv # jaconvが必要: pip install jaconv import threading import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext # --- 依存ライブラリのチェックとインポート --- try: from janome.tokenizer import Tokenizer, Token from janome.analyzer import Analyzer from janome.charfilter import UnicodeNormalizeCharFilter # from janome.tokenfilter import POSKeepFilter, CompoundNounFilter # 必要なら使う JANOME_AVAILABLE = True except ImportError: JANOME_AVAILABLE = False try: import pyopenjtalk PYOPENJTALK_AVAILABLE = True except ImportError: PYOPENJTALK_AVAILABLE = False except Exception as e: # pyopenjtalkはImportError以外も発生しうる PYOPENJTALK_AVAILABLE = False print(f"警告: pyopenjtalkのインポート中にエラーが発生しました: {e}", file=sys.stderr) # --- GUI機能が利用可能か --- # ENABLE_GUI = True # tkinterのimport成功時にTrueになる想定 try: # tkinterは標準ライブラリだが、最小環境でない場合もある import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext ENABLE_GUI = True except ImportError: ENABLE_GUI = False # ロギングの設定 (GUI表示用にレベルはINFO推奨) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # --- 例外クラス --- class TextProcessingError(Exception): pass class FileProcessingError(TextProcessingError): pass class InitializationError(TextProcessingError): pass class NormalizationError(TextProcessingError): pass # --- テキスト正規化クラス --- class TextNormalizer: def __init__(self, rule_filepath: Optional[str] = None): self.rules = self._load_rules(rule_filepath) logger.info(f"正規化ルールをロードしました (ファイル: {rule_filepath or 'デフォルト'})") self.number_converter = self._create_number_converter() def _load_rules(self, filepath: Optional[str]) -> Dict[str, Any]: # デフォルトルール (必要最低限) default_rules = { 'unicode_normalize': 'NFKC', 'remove_whitespace': True, 'replace_symbols': {'&': ' アンド ', '%': ' パーセント '}, 'remove_symbols': r'[「」『』【】[]()<>‘’“”・※→←↑↓*#〜]', 'punctuation': {'remove': False, 'replacement': ' ', 'target': '、。?!'}, 'number_conversion': {'enabled': True, 'target': r'\d+([\.,]\d+)?'}, 'alphabet_conversion': {'enabled': True, 'rule': 'spellout', 'dictionary': {'AI': 'エーアイ', 'USB': 'ユーエスビー'}}, 'custom_replacements_pre': [], 'custom_replacements_post': [], } if filepath and os.path.exists(filepath): try: with open(filepath, 'r', encoding='utf-8') as f: loaded_rules = yaml.safe_load(f) # より安全なマージ (ネストした辞書も考慮するなら再帰的なマージが必要) for key, value in loaded_rules.items(): if isinstance(value, dict) and isinstance(default_rules.get(key), dict): default_rules[key].update(value) else: default_rules[key] = value logger.info(f"カスタム正規化ルール '{filepath}' をロードしました。") return default_rules except Exception as e: logger.warning(f"正規化ルールファイル '{filepath}' の読み込みに失敗: {e}。デフォルトルールを使用。") return default_rules else: if filepath: logger.warning(f"正規化ルールファイル '{filepath}' が見つかりません。デフォルトルールを使用。") else: logger.info("正規化ルールファイル未指定。デフォルトルールを使用。") return default_rules def _create_number_converter(self): # 数字 -> 日本語読み 変換クラス (要実装) class SimpleNumConverter: def convert(self, num_str: str) -> str: # ここに詳細な変換ロジックを実装する # 例: 123 -> ひゃくにじゅうさん, 10.5 -> じゅってんご return num_str # プレースホルダー return SimpleNumConverter() def normalize(self, text: str) -> str: if not isinstance(text, str): return "" try: # 1. Unicode正規化 norm_type = self.rules.get('unicode_normalize') if norm_type: text = unicodedata.normalize(norm_type, text) # 2. 前処理カスタム置換 for rule in self.rules.get('custom_replacements_pre', []): text = re.sub(rule['pattern'], rule['replacement'], text) # 3. 空白正規化 (早期に行う方が後続処理が楽な場合も) if self.rules.get('remove_whitespace', True): text = re.sub(r'\s+', ' ', text).strip() # 4. 数字変換 (ルールの target に基づいて検索・置換) num_conf = self.rules.get('number_conversion', {}) if num_conf.get('enabled'): pattern = num_conf.get('target', r'\d+([\.,]\d+)?') try: text = re.sub(pattern, lambda m: self.number_converter.convert(m.group(0)), text) except Exception as e_num: logger.warning(f"数字変換エラー: {e_num}") # 5. アルファベット変換 (辞書 + ルール) alpha_conf = self.rules.get('alphabet_conversion', {}) if alpha_conf.get('enabled'): for word, reading in alpha_conf.get('dictionary', {}).items(): # 大文字小文字区別など考慮 text = text.replace(word, f' {reading} ') if alpha_conf.get('rule') == 'spellout': # ここにアルファベットを1文字ずつ読む処理 (A->エー) を実装 pass # 6. 記号読み置換 for symbol, reading in self.rules.get('replace_symbols', {}).items(): text = text.replace(symbol, reading) # 7. 記号削除 remove_pattern = self.rules.get('remove_symbols') if remove_pattern: text = re.sub(remove_pattern, '', text) # 8. 句読点処理 punct_conf = self.rules.get('punctuation', {}) target_punct = punct_conf.get('target', '、。?!') punct_pattern = f'[{re.escape(target_punct)}]' if punct_conf.get('remove', True): text = re.sub(punct_pattern, '', text) else: replacement = punct_conf.get('replacement', '') text = re.sub(punct_pattern, replacement, text) # 9. 後処理カスタム置換 for rule in self.rules.get('custom_replacements_post', []): text = re.sub(rule['pattern'], rule['replacement'], text) # 10. 最終空白処理 if self.rules.get('remove_whitespace', True): text = re.sub(r'\s+', ' ', text).strip() return text except Exception as e: raise NormalizationError(f"正規化失敗 ({text[:20]}...): {e}") # --- 読み・アクセント推定クラス --- class PronunciationEstimator: def __init__( self, engine: Literal['janome', 'pyopenjtalk'] = 'pyopenjtalk', janome_udic_path: Optional[str] = None, jtalk_dic_path: Optional[str] = None, jtalk_user_dic_path: Optional[str] = None ): self.engine = engine self.jtalk_user_dic_path = jtalk_user_dic_path # ユーザー辞書パスを保持 self.unknown_words = set() self.janome_analyzer = None # Janomeインスタンス用 self.jtalk_opts = [] # pyopenjtalkオプション用 if self.engine == 'janome': if not JANOME_AVAILABLE: raise InitializationError("Janome利用不可") try: logger.info(f"Janome初期化 (ユーザー辞書: {janome_udic_path})") char_filters = [UnicodeNormalizeCharFilter()] # ユーザー辞書はTokenizerに渡す tokenizer_kwargs = {"udic": janome_udic_path, "udic_enc": "utf8"} if janome_udic_path else {} self.janome_analyzer = Analyzer( char_filters=char_filters, tokenizer=Tokenizer(**tokenizer_kwargs), token_filters=[] # 必要なら追加 ) logger.info("Janome初期化完了") except Exception as e: raise InitializationError(f"Janome初期化失敗: {e}") elif self.engine == 'pyopenjtalk': if not PYOPENJTALK_AVAILABLE: raise InitializationError("pyopenjtalk利用不可") try: logger.info("pyopenjtalk初期化...") if jtalk_dic_path: logger.info(f"jtalkシステム辞書パス設定: {jtalk_dic_path}") os.environ['PYOPENJTALK_DICT_PATH'] = jtalk_dic_path # pyopenjtalkのユーザー辞書指定はrun_frontendでは直接できないため注意喚起 if self.jtalk_user_dic_path: logger.warning(f"jtalkユーザー辞書 '{self.jtalk_user_dic_path}' はrun_frontendでは直接使用されません。システム辞書への統合等を検討してください。g2pには影響する可能性があります。") # g2p に渡すオプションなど、より詳細な制御が必要なら実装 _ = pyopenjtalk.g2p('テスト') # 動作確認 logger.info("pyopenjtalk初期化完了") except Exception as e: raise InitializationError(f"pyopenjtalk初期化失敗: {e}") else: raise InitializationError(f"無効なエンジン: {self.engine}") def _get_janome_reading(self, text: str) -> Tuple[str, List[Dict[str, Any]]]: tokens_details = [] katakana_parts = [] try: tokens = self.janome_analyzer.analyze(text) for token in tokens: surf = token.surface pos = token.part_of_speech # 詳細な品詞 reading = token.reading if token.reading != '*' else None pron = token.pronunciation if hasattr(token, 'pronunciation') and token.pronunciation != '*' else reading reading_used = reading or pron # 読み or 発音 if reading_used: katakana_parts.append(reading_used) else: katakana_parts.append(jaconv.hira2kata(surf)) # 表層形をカタカナ化 if not re.fullmatch(r'[ぁ-んァ-ンー\s]+', surf) and '記号' not in pos: self.unknown_words.add(surf) tokens_details.append({ "surface": surf, "pos": pos, "reading": reading, "pronunciation": pron }) return "".join(katakana_parts), tokens_details except Exception as e: logger.error(f"Janome読み推定エラー: {e}", exc_info=True) return "[READING ERROR]", [] def _get_jtalk_pronunciation(self, text: str) -> Tuple[str, List[Dict[str, Any]], List[str]]: katakana_reading = "[READING ERROR]" tokens_details = [] full_context_labels = [] try: # 読みはg2pで取得するのが確実 # ユーザー辞書はg2pには影響するはず (-u オプション?) 要調査 g2p_opts = ['-u', self.jtalk_user_dic_path] if self.jtalk_user_dic_path and os.path.exists(self.jtalk_user_dic_path) else [] # katakana_reading = pyopenjtalk.g2p(text, kana=True, opts=g2p_opts) # opts引数は存在しない? katakana_reading = pyopenjtalk.g2p(text, kana=True) # 現状 opts なしで実行 # LABデータはrun_frontendで取得 # run_frontendにはユーザー辞書オプションがない full_context_labels = pyopenjtalk.run_frontend(text, self.jtalk_opts) # 簡易的なトークン情報(本来はMeCabの結果をパースすべき) tokens_details = [{"surface": text, "reading": katakana_reading}] # 未知語判定(読みが表層形と同じでカタカナのみでない場合) if katakana_reading == text and not re.fullmatch(r'[ァ-ンヴー\s]+', text): self.unknown_words.add(text) return katakana_reading, tokens_details, full_context_labels except Exception as e: logger.error(f"pyopenjtalk読み推定エラー: {e}", exc_info=True) return "[READING ERROR]", [], [] def get_pronunciation(self, text: str, output_format: Literal['hiragana', 'katakana'] = 'katakana') -> Tuple[str, List[Dict[str, Any]], List[str]]: reading = "[READING ERROR]" tokens_details = [] lab_data = [] if not text: return "", [], [] if self.engine == 'janome': reading, tokens_details = self._get_janome_reading(text) elif self.engine == 'pyopenjtalk': reading, tokens_details, lab_data = self._get_jtalk_pronunciation(text) # 出力形式変換 if output_format == 'hiragana' and "[ERROR]" not in reading: try: reading = jaconv.kata2hira(reading) except Exception: logger.warning("ひらがな変換失敗", exc_info=True) return reading, tokens_details, lab_data # --- ファイルIO & 出力関数 --- def read_text_with_bom_removal(filepath: str, encoding: str = 'utf-8') -> List[str]: try: with open(filepath, 'rb') as f: raw_data = f.read() if raw_data.startswith(b'\xef\xbb\xbf'): text = raw_data[3:].decode(encoding, errors='replace') elif raw_data.startswith((b'\xff\xfe', b'\xfe\xff')): text = raw_data.decode('utf-16', errors='replace') else: text = raw_data.decode(encoding, errors='replace') return text.splitlines() except FileNotFoundError: raise FileProcessingError(f"ファイルが見つかりません: {filepath}") except Exception as e: raise FileProcessingError(f"ファイル読み込みエラー ({filepath}): {e}") def output_corpus_data(output_filepath_base: str, data: List[Dict[str, Any]], output_formats: List[Literal['tsv', 'jsonl']]) -> None: output_successful = [] if not data: logger.warning(f"{output_filepath_base}*: 出力データなし"); return # TSV Output if 'tsv' in output_formats: fpath = f"{output_filepath_base}.tsv" try: header = ["id", "text", "reading"] # 基本 # 必要なら他のキーも追加 with open(fpath, 'w', encoding='utf-8', newline='') as f: writer = csv.DictWriter(f, fieldnames=header, delimiter='\t', quoting=csv.QUOTE_MINIMAL, extrasaction='ignore') writer.writeheader() writer.writerows(data) logger.info(f"TSV出力完了: {fpath}") output_successful.append('tsv') except Exception as e: logger.error(f"TSV出力失敗 ({fpath}): {e}") # JSONL Output if 'jsonl' in output_formats: fpath = f"{output_filepath_base}.jsonl" try: with open(fpath, 'w', encoding='utf-8') as f: for item in data: item_copy = {k: v for k, v in item.items() if k != 'lab_data'} # LABは除く f.write(json.dumps(item_copy, ensure_ascii=False) + '\n') logger.info(f"JSONL出力完了: {fpath}") output_successful.append('jsonl') except Exception as e: logger.error(f"JSONL出力失敗 ({fpath}): {e}") if not output_successful and output_formats: raise FileProcessingError("指定形式での出力に成功せず") def output_lab_data(output_filepath_base: str, all_lab_data: Dict[str, List[str]]) -> None: lab_dir = f"{output_filepath_base}_lab" try: os.makedirs(lab_dir, exist_ok=True) count = 0 for file_id, lab_lines in all_lab_data.items(): if lab_lines: fpath = os.path.join(lab_dir, f"{file_id}.lab") with open(fpath, 'w', encoding='utf-8') as f: f.write("\n".join(lab_lines)) count += 1 logger.info(f"LABデータ出力完了: {count}ファイル -> {lab_dir}") except Exception as e: logger.error(f"LABデータ出力失敗: {e}") def output_comparison_data(output_filepath_base: str, comparison_data: List[Dict[str, str]]) -> None: """オプション: 元テキスト、正規化テキスト、読みの比較データをTSVで出力""" fpath = f"{output_filepath_base}_comparison.tsv" try: if not comparison_data: return header = ["id", "original", "normalized", "reading"] with open(fpath, 'w', encoding='utf-8', errors='replace', newline='') as tsvfile: writer = csv.DictWriter(tsvfile, fieldnames=header, delimiter='\t', quoting=csv.QUOTE_MINIMAL) writer.writeheader() writer.writerows(comparison_data) logger.info(f"比較データ出力完了: {fpath}") except Exception as e: logger.error(f"比較データ出力失敗 ({fpath}): {e}") # --- ファイル処理関数 --- def process_file( filepath: str, output_folder: str, normalizer: TextNormalizer, pron_estimator: PronunciationEstimator, output_formats: List[Literal['tsv', 'jsonl']], lab_output_enabled: bool, reading_output_format: Literal['hiragana', 'katakana'], encoding: str, output_comparison: bool = False, ) -> Tuple[int, int]: """ファイルを処理し、コーパスデータを出力""" filename = os.path.basename(filepath) output_base = os.path.join(output_folder, os.path.splitext(filename)[0]) processed_data: List[Dict[str, Any]] = [] comparison_items: List[Dict[str, str]] = [] all_lab_data: Dict[str, List[str]] = {} success_count = 0 error_count = 0 try: original_lines = read_text_with_bom_removal(filepath, encoding) logger.debug(f"'{filename}'読み込み完了、{len(original_lines)}行") for i, line in enumerate(original_lines): line_id = f"{os.path.splitext(filename)[0]}_{i:04d}" original_line = line.strip() if not original_line: continue normalized_line = "[NORM ERROR]" reading = "[READING ERROR]" lab_data = [] line_success = False try: normalized_line = normalizer.normalize(original_line) if not normalized_line: logger.warning(f"{line_id}: 正規化後空"); continue reading, _, lab_data = pron_estimator.get_pronunciation(normalized_line, reading_output_format) if "[ERROR]" not in reading: line_success = True except NormalizationError as e_norm: logger.error(f"{line_id}: 正規化エラー: {e_norm}") except Exception as e_line: logger.error(f"{line_id}: 行処理エラー: {e_line}", exc_info=True) # 結果格納 if line_success: success_count += 1 else: error_count += 1 corpus_item = {"id": line_id, "text": normalized_line, "reading": reading} processed_data.append(corpus_item) if lab_output_enabled and lab_data: all_lab_data[line_id] = lab_data if output_comparison: comparison_items.append({"id": line_id, "original": original_line, "normalized": normalized_line, "reading": reading}) # ファイル単位出力 if processed_data: output_corpus_data(output_base, processed_data, output_formats) if lab_output_enabled and all_lab_data: output_lab_data(output_base, all_lab_data) if output_comparison and comparison_items: output_comparison_data(output_base, comparison_items) logger.debug(f"'{filename}'処理完了 S:{success_count}, E:{error_count}") return success_count, error_count except FileProcessingError as e_fp: raise e_fp # 再throw except Exception as e_f: raise FileProcessingError(f"ファイル処理中エラー ({filepath}): {e_f}") # --- GUI アプリケーションクラス --- class TextProcessorGUI(tk.Tk): def __init__(self): super().__init__() # --- ウィンドウ設定 --- self.title("音声合成コーパス前処理ツール") self.geometry("850x700") style = ttk.Style(self) try: style.theme_use('clam') # or 'alt', 'default', 'classic' except tk.TclError: pass # テーマが存在しない場合 # --- 変数定義 --- self.input_files_var = tk.StringVar(value="") self.output_folder_var = tk.StringVar(value="") self.norm_rules_var = tk.StringVar(value="normalization_rules.yaml") # デフォルトファイル名 self.janome_udic_var = tk.StringVar(value="") self.jtalk_dic_var = tk.StringVar(value="") self.jtalk_user_dic_var = tk.StringVar(value="") # 利用可能なエンジンのみ選択肢に含める engine_choices = [eng for eng in ['janome', 'pyopenjtalk'] if globals()[f"{eng.upper()}_AVAILABLE"]] default_engine = 'pyopenjtalk' if PYOPENJTALK_AVAILABLE else ('janome' if JANOME_AVAILABLE else '') self.engine_var = tk.StringVar(value=default_engine) self.output_format_vars = {"tsv": tk.BooleanVar(value=False), "jsonl": tk.BooleanVar(value=True)} self.reading_format_var = tk.StringVar(value="katakana") self.output_lab_var = tk.BooleanVar(value=False) self.output_comp_var = tk.BooleanVar(value=False) self.encoding_var = tk.StringVar(value="utf-8") self.processing_active = False # 処理中フラグ # --- ウィジェット作成 & ロギング設定 --- self._create_widgets() self._setup_logging() # --- エンジン利用不可の場合の警告 --- if not PYOPENJTALK_AVAILABLE: logger.warning("pyopenjtalkが見つからないか初期化に失敗しました。pyopenjtalkエンジンは選択できません。") if not JANOME_AVAILABLE: logger.warning("Janomeが見つかりません。Janomeエンジンは選択できません。") if not engine_choices: messagebox.showerror("致命的エラー", "利用可能な読み推定エンジン(Janome/pyopenjtalk)がありません。\nライブラリのインストールを確認してください。") self.destroy() # アプリケーション終了 def _create_widgets(self): main_frame = ttk.Frame(self, padding="10") main_frame.pack(expand=True, fill=tk.BOTH) main_frame.columnconfigure(1, weight=1) # 2列目が伸びるように row_idx = 0 # --- 入出力パス --- io_frame = ttk.LabelFrame(main_frame, text=" 入出力 ", padding="10") io_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=(0,5)) io_frame.columnconfigure(1, weight=1) ttk.Label(io_frame, text="入力ファイル:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2) ttk.Entry(io_frame, textvariable=self.input_files_var).grid(row=0, column=1, sticky="ew", padx=5, pady=2) ttk.Button(io_frame, text="選択...", width=8, command=self._select_input_files).grid(row=0, column=2, padx=(0,5), pady=2) ttk.Label(io_frame, text="出力フォルダ:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2) ttk.Entry(io_frame, textvariable=self.output_folder_var).grid(row=1, column=1, sticky="ew", padx=5, pady=2) ttk.Button(io_frame, text="選択...", width=8, command=self._select_output_folder).grid(row=1, column=2, padx=(0,5), pady=2) row_idx += 1 # --- 設定ファイル --- config_frame = ttk.LabelFrame(main_frame, text=" 設定ファイル ", padding="10") config_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=5) config_frame.columnconfigure(1, weight=1) ttk.Label(config_frame, text="正規化ルール:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2) ttk.Entry(config_frame, textvariable=self.norm_rules_var).grid(row=0, column=1, sticky="ew", padx=5, pady=2) ttk.Button(config_frame, text="選択...", width=8, command=self._select_norm_rules).grid(row=0, column=2, padx=(0,5), pady=2) row_idx += 1 # --- エンジンと辞書 --- engine_frame = ttk.LabelFrame(main_frame, text=" 読み推定エンジンと辞書 ", padding="10") engine_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=5) engine_frame.columnconfigure(1, weight=1) engine_choices = [eng for eng in ['janome', 'pyopenjtalk'] if globals()[f"{eng.upper()}_AVAILABLE"]] ttk.Label(engine_frame, text="エンジン:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2) engine_combo = ttk.Combobox(engine_frame, textvariable=self.engine_var, values=engine_choices, state="readonly", width=15) engine_combo.grid(row=0, column=1, sticky=tk.W, padx=5, pady=2) ttk.Label(engine_frame, text="Janomeユーザー辞書:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2) ttk.Entry(engine_frame, textvariable=self.janome_udic_var).grid(row=1, column=1, sticky="ew", padx=5, pady=2) ttk.Button(engine_frame, text="選択...", width=8, command=lambda: self._select_file(self.janome_udic_var, "Janome辞書", [("CSV", "*.csv")])).grid(row=1, column=2, padx=(0,5), pady=2) ttk.Label(engine_frame, text="JTalkシステム辞書:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=2) ttk.Entry(engine_frame, textvariable=self.jtalk_dic_var).grid(row=2, column=1, sticky="ew", padx=5, pady=2) ttk.Button(engine_frame, text="選択...", width=8, command=self._select_jtalk_dic).grid(row=2, column=2, padx=(0,5), pady=2) ttk.Label(engine_frame, text="JTalkユーザー辞書:").grid(row=3, column=0, sticky=tk.W, padx=5, pady=2) ttk.Entry(engine_frame, textvariable=self.jtalk_user_dic_var).grid(row=3, column=1, sticky="ew", padx=5, pady=2) ttk.Button(engine_frame, text="選択...", width=8, command=lambda: self._select_file(self.jtalk_user_dic_var, "JTalkユーザー辞書", [("CSV/DIC", "*.csv *.dic")])).grid(row=3, column=2, padx=(0,5), pady=2) row_idx += 1 # --- 出力オプション --- output_frame = ttk.LabelFrame(main_frame, text=" 出力オプション ", padding="10") output_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", padx=5, pady=5) output_frame.columnconfigure(1, weight=1) # 右側のスペースを確保 # コーパス形式 (横並び) cf_frame = ttk.Frame(output_frame) cf_frame.grid(row=0, column=0, columnspan=4, sticky=tk.W, pady=2) ttk.Label(cf_frame, text="コーパス形式:").pack(side=tk.LEFT, padx=(5,10)) ttk.Checkbutton(cf_frame, text="TSV", variable=self.output_format_vars["tsv"]).pack(side=tk.LEFT, padx=5) ttk.Checkbutton(cf_frame, text="JSONL", variable=self.output_format_vars["jsonl"]).pack(side=tk.LEFT, padx=5) # 読みの形式 rf_frame = ttk.Frame(output_frame) rf_frame.grid(row=1, column=0, columnspan=4, sticky=tk.W, pady=2) ttk.Label(rf_frame, text="読みの形式:").pack(side=tk.LEFT, padx=(5,10)) reading_combo = ttk.Combobox(rf_frame, textvariable=self.reading_format_var, values=['katakana', 'hiragana'], state="readonly", width=10) reading_combo.pack(side=tk.LEFT, padx=5) # その他チェックボックス (横並び) oc_frame = ttk.Frame(output_frame) oc_frame.grid(row=2, column=0, columnspan=4, sticky=tk.W, pady=2) ttk.Checkbutton(oc_frame, text="LAB出力 (JTalk)", variable=self.output_lab_var).pack(side=tk.LEFT, padx=5) ttk.Checkbutton(oc_frame, text="比較データ出力", variable=self.output_comp_var).pack(side=tk.LEFT, padx=5) # エンコーディング enc_frame = ttk.Frame(output_frame) enc_frame.grid(row=3, column=0, columnspan=4, sticky=tk.W, pady=2) ttk.Label(enc_frame, text="入力エンコーディング:").pack(side=tk.LEFT, padx=(5, 10)) encoding_entry = ttk.Entry(enc_frame, textvariable=self.encoding_var, width=12) encoding_entry.pack(side=tk.LEFT, padx=5) row_idx += 1 # --- 実行ボタン --- self.run_button = ttk.Button(main_frame, text="処理実行", command=self._start_processing, style="Accent.TButton") # スタイル適用例 self.run_button.grid(row=row_idx, column=0, columnspan=3, pady=15) row_idx += 1 # --- ログ表示 --- log_frame = ttk.LabelFrame(main_frame, text=" ログ ", padding="10") log_frame.grid(row=row_idx, column=0, columnspan=3, sticky="nsew", padx=5, pady=(5,0)) main_frame.rowconfigure(row_idx, weight=1) # ログエリアが伸縮するように log_frame.grid_rowconfigure(0, weight=1) log_frame.grid_columnconfigure(0, weight=1) self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, height=10, state=tk.DISABLED, bd=0, relief=tk.FLAT) # ボーダー調整 self.log_text.grid(row=0, column=0, sticky="nsew") # --- ファイル/フォルダ選択メソッド --- def _select_input_files(self): fpaths = filedialog.askopenfilenames(title="入力ファイル選択 (複数可)", filetypes=[("テキストファイル", "*.txt"), ("全ファイル", "*.*")]) if fpaths: self.input_files_var.set(";".join(fpaths)) # 区切り文字で結合 def _select_output_folder(self): fpath = filedialog.askdirectory(title="出力フォルダ選択") if fpath: self.output_folder_var.set(fpath) def _select_file(self, var, title, filetypes): fpath = filedialog.askopenfilename(title=title, filetypes=filetypes) if fpath: var.set(fpath) def _select_norm_rules(self): self._select_file(self.norm_rules_var, "正規化ルール選択", [("YAML", "*.yaml *.yml"), ("全ファイル", "*.*")]) def _select_jtalk_dic(self): fpath = filedialog.askdirectory(title="JTalkシステム辞書フォルダ選択 (例: .../open_jtalk_dic_utf_8-1.11)") if fpath: self.jtalk_dic_var.set(fpath) # --- ロギング設定 --- def _setup_logging(self): class TextHandler(logging.Handler): def __init__(self, text_widget): logging.Handler.__init__(self) self.text_widget = text_widget self.queue = [] # メッセージキュー self.processing = False def schedule_update(self): # 複数メッセージをまとめて更新 if not self.processing: self.processing = True self.text_widget.after(50, self._update_widget) # 50ms後に更新 def _update_widget(self): self.processing = False if not self.text_widget.winfo_exists(): return self.text_widget.configure(state='normal') while self.queue: record = self.queue.pop(0) msg = self.format(record) self.text_widget.insert(tk.END, msg + '\n') self.text_widget.configure(state='disabled') self.text_widget.yview(tk.END) def emit(self, record): self.queue.append(record) self.schedule_update() gui_handler = TextHandler(self.log_text) gui_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S')) logging.getLogger().addHandler(gui_handler) logging.getLogger().setLevel(logging.INFO) # INFOレベル以上をGUIに表示 # --- 処理実行メソッド --- def _start_processing(self): if self.processing_active: logger.warning("処理中"); return # --- 設定値取得 & バリデーション --- self.params = {} # パラメータを格納する辞書 try: self.params['input_files'] = [f.strip() for f in self.input_files_var.get().split(';') if f.strip()] self.params['output_folder'] = self.output_folder_var.get() self.params['norm_rules'] = self.norm_rules_var.get() self.params['engine'] = self.engine_var.get() self.params['janome_udic'] = self.janome_udic_var.get() or None self.params['jtalk_dic'] = self.jtalk_dic_var.get() or None self.params['jtalk_user_dic'] = self.jtalk_user_dic_var.get() or None self.params['output_format'] = [fmt for fmt, var in self.output_format_vars.items() if var.get()] self.params['reading_format'] = self.reading_format_var.get() self.params['output_lab'] = self.output_lab_var.get() self.params['output_comparison'] = self.output_comp_var.get() self.params['encoding'] = self.encoding_var.get().strip() or 'utf-8' if not self.params['input_files']: raise ValueError("入力ファイル未選択") if not self.params['output_folder']: raise ValueError("出力フォルダ未選択") if not self.params['output_format']: raise ValueError("出力コーパス形式未選択") if self.params['output_lab'] and self.params['engine'] != 'pyopenjtalk': raise ValueError("LAB出力はpyopenjtalkエンジン選択時のみ可能") if not os.path.exists(self.params['norm_rules']): logger.warning(f"正規化ルールファイル '{self.params['norm_rules']}' が存在しません。デフォルトルールを使用します。") # デフォルトを使用する場合、ファイルパスはNoneとして扱う方が良いかも # self.params['norm_rules'] = None # またはそのまま存在しないパスを渡す except ValueError as ve: messagebox.showerror("入力エラー", str(ve)); return except Exception as e: messagebox.showerror("設定エラー", f"設定値の取得中にエラー: {e}"); return # 処理開始 self.processing_active = True self.run_button.config(state=tk.DISABLED, text="処理中...") self.log_text.configure(state='normal'); self.log_text.delete('1.0', tk.END); self.log_text.configure(state='disabled') logger.info("処理を開始します...") # 別スレッドで実行 self.processing_thread = threading.Thread(target=self._run_processing_logic, daemon=True) self.processing_thread.start() self.after(100, self._check_thread) # スレッド監視開始 def _check_thread(self): if self.processing_thread.is_alive(): self.after(100, self._check_thread) # スレッド終了後の処理は _run_processing_logic の最後で after(0, ...) を使う def _run_processing_logic(self): """実際の処理 (別スレッドで実行)""" exit_code = 1 final_message = "処理中にエラーが発生しました。" message_type = "error" p = self.params # 短縮名 try: # --- ログ表示 (主要パラメータ) --- logger.info(f"入力ファイル数: {len(p['input_files'])}") logger.info(f"出力フォルダ: {p['output_folder']}") logger.info(f"正規化ルール: {p['norm_rules'] if os.path.exists(p['norm_rules']) else 'デフォルト'}") logger.info(f"エンジン: {p['engine']}") logger.info(f"出力形式: {p['output_format']}") logger.info(f"LAB出力: {p['output_lab']}") # --- 初期化 --- normalizer = TextNormalizer(p['norm_rules']) pron_estimator = PronunciationEstimator( engine=p['engine'], janome_udic_path=p['janome_udic'], jtalk_dic_path=p['jtalk_dic'], jtalk_user_dic_path=p['jtalk_user_dic'] ) # --- ファイル処理ループ --- total_success_lines, total_error_lines, file_error_count, processed_files = 0, 0, 0, 0 for i, filepath in enumerate(p['input_files']): logger.info(f"--- 処理中 ({i+1}/{len(p['input_files'])}): {os.path.basename(filepath)} ---") try: norm_filepath = os.path.normpath(filepath) if not os.path.isfile(norm_filepath): logger.warning(f"スキップ (非ファイル): {norm_filepath}"); file_error_count += 1; continue s, e = process_file( # process_fileは (成功数, エラー数) を返す想定 norm_filepath, p['output_folder'], normalizer, pron_estimator, p['output_format'], p['output_lab'], p['reading_format'], p['encoding'], p['output_comparison'] ) total_success_lines += s; total_error_lines += e if e > 0: file_error_count += 1 processed_files += 1 except FileProcessingError as e_fp: logger.error(f"{os.path.basename(filepath)} ファイルエラー: {e_fp}"); file_error_count += 1 except Exception as e_f: logger.error(f"{os.path.basename(filepath)} 予期せぬエラー: {e_f}", exc_info=True); file_error_count += 1 # logger.info(f"--- 処理完了 ({i+1}/{len(p['input_files'])}): {os.path.basename(filepath)} ---") # ログが冗長ならコメントアウト # --- 未知語出力 --- if pron_estimator.unknown_words: unknown_file = os.path.join(p['output_folder'], "unknown_words.txt") try: with open(unknown_file, 'w', encoding='utf-8') as f: f.write("\n".join(sorted(list(pron_estimator.unknown_words)))) logger.warning(f"未知語リスト出力: {unknown_file}") except Exception as e_unk: logger.error(f"未知語リスト出力失敗: {e_unk}") # --- 最終結果判定 --- logger.info("=" * 30 + " 処理結果 " + "=" * 30) logger.info(f"処理ファイル数: {processed_files} / {len(p['input_files'])}") logger.info(f"エラー発生ファイル数: {file_error_count}") logger.info(f"総処理行数: {total_success_lines + total_error_lines}") logger.info(f" 正常処理行数: {total_success_lines}") logger.info(f" エラー発生行数: {total_error_lines}") logger.info("=" * 68) if file_error_count > 0 or total_error_lines > 0: final_message = "処理完了(エラーあり)。詳細はログを確認してください。" message_type = "warning" exit_code = 1 else: final_message = "すべての処理が正常に完了しました。" message_type = "info" exit_code = 0 except InitializationError as e_init: final_message = f"初期化エラー: {e_init}"; message_type = "error" except Exception as e_main: final_message = f"予期せぬエラー: {e_main}"; message_type = "error"; logger.error("予期せぬエラー", exc_info=True) # --- GUI要素の更新 (メインスレッドで実行) --- def update_gui(): self.processing_active = False if self.winfo_exists(): # ウィンドウが閉じられていないか確認 self.run_button.config(state=tk.NORMAL, text="処理実行") if message_type == "info": messagebox.showinfo("完了", final_message) elif message_type == "warning": messagebox.showwarning("完了(一部エラー)", final_message) else: messagebox.showerror("エラー", final_message) self.after(0, update_gui) # スレッド終了 # --- アプリケーション起動 --- if __name__ == "__main__": if not ENABLE_GUI: print("エラー: GUI表示に必要な tkinter が見つかりません。", file=sys.stderr) sys.exit(1) # 利用可能なエンジンがない場合のチェックはGUIクラスの __init__ で行う app = TextProcessorGUI() app.mainloop()