"""
Telephone call quality simulation (PSTN).

Converts clean recorded audio into telephone call quality so that studio
recordings (e.g. from AI Hub) can be preprocessed into call-like training data.

Three-stage processing:
  1. Band-pass filtering (300-3400 Hz) - ITU-T G.712
  2. Downsampling to 8 kHz (including anti-aliasing)
  3. G.711 non-linear quantization (A-law / μ-law companding)
"""
|
|
| import audioop |
| import random |
| from enum import Enum |
|
|
| import numpy as np |
| import scipy.signal as signal |
|
|
|
|
class CompandingType(str, Enum):
    """Selectable G.711 companding law.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so ``CompandingType.ALAW == "alaw"`` is true and
    ``CompandingType("alaw")`` returns the member.
    """

    # A-law companding.
    ALAW = "alaw"
    # μ-law companding.
    ULAW = "ulaw"
    # Choose between A-law and μ-law at random each time companding is applied.
    RANDOM = "random"
|
|
|
|
class PhoneSimulator:
    """Convert clean recorded audio into telephone (PSTN) call quality.

    :meth:`process` applies three stages in order:

    1. Band-pass filtering to ``PSTN_LOW_FREQ``-``PSTN_HIGH_FREQ`` Hz.
    2. Resampling to ``PSTN_SAMPLE_RATE`` Hz.
    3. A G.711 A-law / μ-law encode -> decode round trip.

    NOTE: stage 3 uses the standard-library ``audioop`` module, which is
    deprecated since Python 3.11 and removed in Python 3.13 (PEP 594). On
    newer interpreters a replacement providing the same ``lin2alaw`` /
    ``alaw2lin`` / ``lin2ulaw`` / ``ulaw2lin`` functions is required.
    """

    PSTN_LOW_FREQ = 300.0  # lower pass-band edge, Hz
    PSTN_HIGH_FREQ = 3400.0  # upper pass-band edge, Hz
    PSTN_SAMPLE_RATE = 8000  # output sample rate, Hz
    FILTER_ORDER = 5  # Butterworth order handed to scipy.signal.butter

    def __init__(self, companding: CompandingType = CompandingType.RANDOM):
        """Create a simulator.

        Args:
            companding: Quantization law. With ``RANDOM`` either A-law or
                μ-law is picked at random every time companding is applied.
                Plain strings (``"alaw"``, ``"ulaw"``, ``"random"``) are
                accepted as well.

        Raises:
            ValueError: If ``companding`` is not a valid ``CompandingType``
                value.
        """
        # Normalise through the enum so an unknown value fails loudly here
        # instead of silently falling through to the μ-law branch later.
        self.companding = CompandingType(companding)

    def process(self, audio: np.ndarray, sr: int) -> tuple[np.ndarray, int]:
        """Apply the telephone call quality simulation.

        Args:
            audio: Mono (1-D) array of samples, expected as float32 in the
                range -1.0 to 1.0.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            ``(processed_audio, PSTN_SAMPLE_RATE)`` where ``processed_audio``
            is a float32 array. Empty input yields an empty float32 array.

        Raises:
            ValueError: If ``audio`` is not one-dimensional, or ``sr`` is not
                positive, or ``sr`` is too low for the pass band.
        """
        audio = np.asarray(audio)
        if audio.ndim != 1:
            raise ValueError(f"Mono audio expected, got shape {audio.shape}")
        if sr <= 0:
            raise ValueError(f"Sample rate must be positive, got {sr}")
        if audio.size == 0:
            # Nothing to filter or resample; the SciPy routines reject
            # zero-length input, so short-circuit with an empty result.
            return np.zeros(0, dtype=np.float32), self.PSTN_SAMPLE_RATE

        # Stage 1: limit the bandwidth to the telephone voice band.
        audio = self._bandpass_filter(audio, sr)
        # Stage 2: bring the signal to the PSTN sample rate.
        audio = self._downsample(audio, sr)
        # Stage 3: add G.711 quantization noise.
        audio = self._compand(audio)

        return audio, self.PSTN_SAMPLE_RATE

    def _bandpass_filter(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Band-limit ``audio`` to the telephone voice band (ITU-T G.712).

        Removes content below 300 Hz (hum noise) and above 3400 Hz
        (sibilance) with a Butterworth band-pass of order ``FILTER_ORDER``,
        applied forward and backward so the result has zero phase shift.

        Args:
            audio: 1-D sample array.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            The filtered signal as float32, same length as ``audio``.

        Raises:
            ValueError: If ``sr`` is so low that the lower band edge is not
                below the (clamped) upper band edge.
        """
        sample_count = audio.shape[0]
        if sample_count == 0:
            return audio.astype(np.float32)

        nyq = sr / 2.0
        low = self.PSTN_LOW_FREQ / nyq
        high = self.PSTN_HIGH_FREQ / nyq

        # scipy.signal.butter needs normalised edges strictly inside (0, 1).
        if high >= 1.0:
            high = 0.99
        if low <= 0.0:
            low = 0.01
        if low >= high:
            raise ValueError(
                f"Sample rate {sr} Hz is too low for a "
                f"{self.PSTN_LOW_FREQ}-{self.PSTN_HIGH_FREQ} Hz band-pass"
            )

        # Second-order sections stay numerically stable where the (b, a)
        # polynomial form of a high-order band-pass loses precision, e.g. at
        # 44.1/48 kHz where the lower edge sits very close to 0.
        sos = signal.butter(
            self.FILTER_ORDER, [low, high], btype="band", output="sos"
        )

        # sosfiltfilt requires len(x) > padlen. This is an upper bound on its
        # default padlen; for shorter clips shrink the padding instead of
        # letting the call raise.
        default_padlen_bound = 3 * (2 * sos.shape[0] + 1)
        if sample_count > default_padlen_bound:
            filtered = signal.sosfiltfilt(sos, audio)
        else:
            filtered = signal.sosfiltfilt(sos, audio, padlen=sample_count - 1)
        return filtered.astype(np.float32)

    def _downsample(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """Resample ``audio`` from ``sr`` to ``PSTN_SAMPLE_RATE``.

        ``scipy.signal.resample_poly`` applies an anti-aliasing filter
        internally, so no separate low-pass step is needed. Despite the name,
        inputs below the target rate are resampled upwards.

        Args:
            audio: 1-D sample array.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            ``audio`` unchanged when ``sr`` already equals the target rate,
            otherwise the resampled signal as float32.
        """
        target_rate = self.PSTN_SAMPLE_RATE
        if sr == target_rate:
            return audio

        # Reduce the rate ratio to lowest terms for the polyphase resampler.
        divisor = np.gcd(sr, target_rate)
        up = target_rate // divisor
        down = sr // divisor
        return signal.resample_poly(audio, up, down).astype(np.float32)

    def _compand(self, audio: np.ndarray) -> np.ndarray:
        """Run a G.711 A-law / μ-law encode -> decode round trip.

        Going 16-bit -> 8-bit compressed -> 16-bit introduces non-linear
        quantization noise, which gives the characteristic "rough" timbre of
        a telephone line.

        Args:
            audio: 1-D float sample array, nominally in -1.0 to 1.0.

        Returns:
            The companded signal as float32, scaled back by 1/32767.
        """
        if self.companding == CompandingType.RANDOM:
            method = random.choice([CompandingType.ALAW, CompandingType.ULAW])
        else:
            method = self.companding

        # Scale to 16-bit PCM; clip first so out-of-range samples saturate
        # instead of wrapping around in the int16 cast.
        pcm16 = np.clip(audio * 32767, -32768, 32767).astype(np.int16)
        raw_bytes = pcm16.tobytes()

        # The sample width argument 2 means 16-bit linear samples.
        # NOTE: audioop is removed in Python 3.13 (PEP 594).
        if method == CompandingType.ALAW:
            decoded = audioop.alaw2lin(audioop.lin2alaw(raw_bytes, 2), 2)
        else:
            decoded = audioop.ulaw2lin(audioop.lin2ulaw(raw_bytes, 2), 2)

        restored = np.frombuffer(decoded, dtype=np.int16).astype(np.float32)
        return restored / 32767.0
|
|