import subprocess import logging import string from pathlib import Path from collections import OrderedDict from nltk.tokenize import TweetTokenizer from typing import List, Dict, Optional import re # Constants SUPPORTED_LANGUAGES = {'eu', 'es'} SUPPORTED_SYMBOLS = {'sampa', 'ipa'} SAMPA_TO_IPA = OrderedDict([ ("p", "p"), ("b", "b"), ("t", "t"), ("c", "c"), ("d", "d"), ("k", "k"), ("g", "ɡ"), ("tS", "tʃ"), ("ts", "ts"), ("ts`", "tʂ"), ("gj", "ɟ"), ("jj", "ʝ"), ("f", "f"), ("B", "β"), ("T", "θ"), ("D", "ð"), ("s", "s"), ("s`", "ʂ"), ("S", "ʃ"), ("x", "x"), ("G", "ɣ"), ("m", "m"), ("n", "n"), ("J", "ɲ"), ("l", "l"), ("L", "ʎ"), ("r", "ɾ"), ("rr", "r"), ("j", "j"), ("w", "w"), ("i", "i"), ("'i", "'i"), ("e", "e"), ("'e", "'e"), ("a", "a"), ("'a", "'a"), ("o", "o"), ("'o", "'o"), ("u", "u"), ("'u", "'u"), ("y", "y"), ("Z", "ʒ"), ("h", "h"), ("ph", "pʰ"), ("kh", "kʰ"), ("th", "tʰ") ]) MULTICHAR_TO_SINGLECHAR = { "tʃ": "C", "ts": "V", "tʂ": "P", "'i": "I", "'e": "E", "'a": "A", "'o": "O", "'u": "U", "pʰ": "H", "kʰ": "K", "tʰ": "T" } class PhonemizerError(Exception): """Custom exception for Phonemizer errors.""" pass class Phonemizer: def __init__(self, language: str = "eu", symbol: str = "sampa", path_modulo1y2: str = "modulo1y2/modulo1y2", path_dicts: str = "dict") -> None: """Initialize the Phonemizer with the given language and symbol.""" if language not in SUPPORTED_LANGUAGES: raise PhonemizerError(f"Unsupported language: {language}") if symbol not in SUPPORTED_SYMBOLS: raise PhonemizerError(f"Unsupported symbol type: {symbol}") self.language = language self.symbol = symbol self.path_modulo1y2 = Path(path_modulo1y2) self.path_dicts = Path(path_dicts) self.logger = logging.getLogger(__name__) # Initialize SAMPA to IPA dictionary self._sampa_to_ipa_dict = SAMPA_TO_IPA # Initialize word splitter regex self._word_splitter = re.compile(r'\w+|[^\w\s]', re.UNICODE) self._validate_paths() def normalize(self, text: str) -> str: """Normalize the given text using an external command.""" try: command = self._build_normalization_command() process = subprocess.Popen( command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='ISO-8859-15', shell=True ) stdout, stderr = process.communicate(input=text) if process.returncode != 0: # Filter out the SetDur warning from the error message filtered_stderr = '\n'.join(line for line in stderr.split('\n') if 'Warning: argument not used SetDur' not in line) if filtered_stderr.strip(): # Only raise error if there are other errors error_msg = f"Normalization failed: {filtered_stderr}" self.logger.error(error_msg) raise PhonemizerError(error_msg) return stdout.strip() except Exception as e: error_msg = f"Error during normalization: {str(e)}" self.logger.error(error_msg) return text def getPhonemes(self, text: str, separate_phonemes: bool = False, use_single_char: bool = False) -> str: """Extract phonemes from the given text. Args: text (str): The input text to convert to phonemes separate_phonemes (bool): If True, keeps spaces between phonemes. If False, produces compact phoneme strings. Defaults to False. use_single_char (bool): When `symbol` is "ipa" and True, collapse multichar IPA sequences into mapped single characters (uses `_transform_multichar_phonemes`). Defaults to False. Returns: str: The phoneme sequence with words separated by " | " """ try: # Pre-process text to handle dots consistently # Replace multiple dots with a single dot to avoid issues with ellipsis text = re.sub(r'\.{2,}', '.', text) # Process input line-by-line so we preserve original newlines lines = text.split('\n') per_line_outputs = [] for line in lines: # If the input line is empty, preserve empty line if not line.strip(): per_line_outputs.append('') continue command = self._build_phoneme_extraction_command() proc = subprocess.Popen( command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='ISO-8859-15', shell=True ) stdout, stderr = proc.communicate(input=line) if proc.returncode != 0: error_msg = f"Phoneme extraction failed: {stderr}" self.logger.error(error_msg) raise PhonemizerError(error_msg) # Replace any internal newlines in tool output with sentinel (shouldn't normally occur for single line) stdout_line = stdout.replace('\n', ' | _ | ') # Split into words and handle each separately for this line word_phonemes = stdout_line.split(" | ") result_phonemes = [] cleaned_phonemes = [] for phoneme_seq in word_phonemes: if not phoneme_seq.strip(): continue if phoneme_seq.strip() == "_": continue cleaned_phonemes.append(phoneme_seq.strip()) # Tokenize the original line into words/punctuation words = self._word_splitter.findall(line) # Count non-punctuation words non_punct_words = [w for w in words if w not in string.punctuation] # Ensure we have enough phonemes for all non-punctuation words if len(cleaned_phonemes) < len(non_punct_words): while len(cleaned_phonemes) < len(non_punct_words): if cleaned_phonemes: cleaned_phonemes.append(cleaned_phonemes[-1]) else: cleaned_phonemes.append("a") # Process words and phonemes together for this line phoneme_idx = 0 word_idx = 0 line_result = [] while word_idx < len(words): word = words[word_idx] if word in string.punctuation: line_result.append(word) word_idx += 1 continue # Regular word processing if phoneme_idx < len(cleaned_phonemes): phonemes = cleaned_phonemes[phoneme_idx].split() if self.symbol == "sampa": if separate_phonemes: processed_phonemes = " ".join(p for p in phonemes if p != "-") else: processed_phonemes = "".join(p for p in phonemes if p != "-") else: ipa_phonemes = [self._sampa_to_ipa_dict.get(p, p) for p in phonemes if p != "-"] if separate_phonemes: processed_phonemes = " ".join(ipa_phonemes) else: # Start with spaced IPA tokens to allow matching multichar tokens processed_phonemes = " ".join(ipa_phonemes) if use_single_char: processed_phonemes = self._transform_multichar_phonemes(processed_phonemes) # Remove spaces for compact form processed_phonemes = processed_phonemes.replace(" ", "") line_result.append(processed_phonemes) phoneme_idx += 1 word_idx += 1 else: # No phoneme left for this word: skip it word_idx += 1 # If there are leftover phonemes, append them while phoneme_idx < len(cleaned_phonemes): phonemes = cleaned_phonemes[phoneme_idx].split() if self.symbol == "sampa": processed_phonemes = " ".join(p for p in phonemes if p != "-") else: ipa_phonemes = [self._sampa_to_ipa_dict.get(p, p) for p in phonemes if p != "-"] if separate_phonemes: processed_phonemes = " ".join(ipa_phonemes) else: processed_phonemes = " ".join(ipa_phonemes) if use_single_char: processed_phonemes = self._transform_multichar_phonemes(processed_phonemes) processed_phonemes = processed_phonemes.replace(" ", "") line_result.append(processed_phonemes) phoneme_idx += 1 # Format final output for this line using spacing rules out_parts = [] # Keep a parallel map to the original words so we can decide sentence splits orig_map = [] for idx, token in enumerate(line_result): is_punct = token in string.punctuation if not is_punct: normalized = re.sub(r"\s+", " ", token.strip()) out_parts.append(normalized) # Map this output token to the corresponding original word (if available) if idx < len(words): orig_map.append(words[idx]) else: orig_map.append(None) else: out_parts.append(token) if idx < len(words): orig_map.append(words[idx]) else: orig_map.append(None) final_line = "" for i, tok in enumerate(out_parts): if i == 0: final_line += tok continue prev = out_parts[i-1] if tok in string.punctuation: final_line = final_line.rstrip(' ') final_line += (' ' if separate_phonemes else ' ') + tok # Preserve input line boundaries: do NOT insert newlines mid-line. # Always add the standard separator after punctuation. if i < len(out_parts) - 1: final_line += (' ' if separate_phonemes else ' ') else: if prev in string.punctuation: final_line += tok else: sep = ' ' if separate_phonemes else ' ' final_line += sep + tok # If a sentence-ending punctuation is followed by a capital letter, # split into separate lines (keeps numeric periods like "1980. urtean" intact). # This turns "... ? Ni ..." into two lines at the sentence boundary. split_line = re.sub(r"(?<=[\?\!\.])\s+(?=[A-ZÁÉÍÓÚÜÑ])", "\n", final_line) per_line_outputs.append(split_line) return "\n".join(per_line_outputs) except Exception as e: error_msg = f"Error in phoneme extraction: {str(e)}" self.logger.error(error_msg) return "" def _build_normalization_command(self) -> str: """Build the command string for normalization.""" modulo_path = self._get_file_path() / self.path_modulo1y2 dict_path = self._get_file_path() / self.path_dicts dict_file = f"{self.language}_dicc" return f'{modulo_path} -TxtMode=Word -Lang={self.language} -HDic={dict_path/dict_file}' def _build_phoneme_extraction_command(self) -> str: """Build the command string for phoneme extraction.""" modulo_path = self._get_file_path() / self.path_modulo1y2 dict_path = self._get_file_path() / self.path_dicts dict_file = f"{self.language}_dicc" return f'{modulo_path} -Lang={self.language} -HDic={dict_path/dict_file}' def _get_file_path(self) -> Path: return Path(__file__).parent def _validate_paths(self) -> None: """Validate paths with enhanced error reporting.""" try: if not self.path_modulo1y2.exists(): raise PhonemizerError(f"Modulo1y2 executable not found at: {self.path_modulo1y2}") if not self.path_dicts.exists(): raise PhonemizerError(f"Dictionary directory not found at: {self.path_dicts}") # Check for both possible dictionary files dict_file = self.path_dicts / f"{self.language}_dicc" if not dict_file.exists(): # Try with .dic extension as fallback dict_file_alt = self.path_dicts / f"{self.language}_dicc.dic" if not dict_file_alt.exists(): raise PhonemizerError(f"Dictionary file not found at either {dict_file} or {dict_file_alt}") except Exception as e: self.logger.error(f"Path validation error: {str(e)}") raise def _transform_multichar_phonemes(self, phoneme_sequence: str) -> str: """ Transform multicharacter IPA phonemes to single characters using the MULTICHAR_TO_SINGLECHAR mapping. Args: phoneme_sequence (str): A string containing phonemes separated by spaces Returns: str: The transformed phoneme sequence with multicharacter phonemes replaced by single characters """ # Split the sequence into individual phonemes phonemes = phoneme_sequence.split() transformed_phonemes = [] for phoneme in phonemes: # Check if the phoneme exists in our mapping if phoneme in MULTICHAR_TO_SINGLECHAR: transformed_phonemes.append(MULTICHAR_TO_SINGLECHAR[phoneme]) else: transformed_phonemes.append(phoneme) return " ".join(transformed_phonemes)