ustwo-api / src /common /phone_simulator.py
asdfasdfqrqwer's picture
Deploy from GitHub 2026-04-23T03:56:31Z
c857b85
Raw
History Blame Contribute Delete
4.57 kB
"""
์ „ํ™” ํ†ตํ™” ํ’ˆ์งˆ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ (PSTN)
๊นจ๋—ํ•œ ๋…น์Œ ์˜ค๋””์˜ค๋ฅผ ์ „ํ™” ํ†ตํ™” ํ’ˆ์งˆ๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ
AI Hub ๋“ฑ์˜ ์ŠคํŠœ๋””์˜ค ๋…น์Œ ๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šต์šฉ ํ†ตํ™” ๋ฐ์ดํ„ฐ๋กœ ์ „์ฒ˜๋ฆฌํ•œ๋‹ค.
3๋‹จ๊ณ„ ์ฒ˜๋ฆฌ:
1. ๋ฐด๋“œํŒจ์Šค ํ•„ํ„ฐ๋ง (300โ€“3400 Hz) โ€” ITU-T G.712
2. 8 kHz ๋‹ค์šด์ƒ˜ํ”Œ๋ง (anti-aliasing ํฌํ•จ)
3. G.711 ๋น„์„ ํ˜• ์–‘์žํ™” (A-law / ฮผ-law companding)
"""
import audioop
import random
from enum import Enum
import numpy as np
import scipy.signal as signal
class CompandingType(str, Enum):
    """Companding law selector for the G.711 quantisation stage.

    Members are also plain strings, so they compare equal to their values
    (for example ``CompandingType.ALAW == "alaw"``).
    """

    # A-law: PSTN standard in Korea, Europe and much of Asia
    ALAW = "alaw"
    # mu-law: PSTN standard in North America
    ULAW = "ulaw"
    # Choose between the two laws at random (adds variety to training data)
    RANDOM = "random"
class PhoneSimulator:
    """Convert clean recorded audio into telephone (PSTN) call quality.

    Processing runs in three stages:
      1. Band-pass filtering to the 300-3400 Hz voice band.
      2. Resampling to 8 kHz.
      3. G.711 companding (A-law or mu-law encode -> decode round trip).
    """

    # PSTN standard parameters
    PSTN_LOW_FREQ = 300.0  # Hz - lower voice-band edge (ITU-T G.712)
    PSTN_HIGH_FREQ = 3400.0  # Hz - upper voice-band edge (ITU-T G.712)
    PSTN_SAMPLE_RATE = 8000  # Hz - G.711 sample rate
    FILTER_ORDER = 5  # Butterworth filter order

    def __init__(self, companding: CompandingType = CompandingType.RANDOM):
        """
        Args:
            companding: Quantisation law. With RANDOM, A-law or mu-law is
                picked at random on every companding pass. Plain string
                values ("alaw", "ulaw", "random") are accepted as well.

        Raises:
            ValueError: If ``companding`` is not a known companding type.
        """
        # Going through the enum constructor rejects unknown values up front
        # instead of letting them silently fall through to mu-law later.
        self.companding = CompandingType(companding)

    def process(self, audio: np.ndarray, sr: int) -> tuple[np.ndarray, int]:
        """
        Apply the telephone call quality simulation.

        Args:
            audio: float32 mono numpy array (range: -1.0 ~ 1.0)
            sr: Sample rate of the input audio in Hz

        Returns:
            (processed audio, new sample rate = PSTN_SAMPLE_RATE)

        Raises:
            ValueError: If ``audio`` is not one-dimensional, if ``sr`` is not
                positive, or if ``sr`` is too low for the band-pass filter.
        """
        if audio.ndim != 1:
            raise ValueError(f"Mono audio expected, got shape {audio.shape}")
        if sr <= 0:
            raise ValueError(f"Sample rate must be positive, got {sr}")
        if audio.size == 0:
            # The zero-phase filter cannot run on an empty signal, and there
            # is nothing to transform anyway.
            return audio.astype(np.float32), self.PSTN_SAMPLE_RATE

        # Step 1: band-pass filtering (300-3400 Hz)
        audio = self._bandpass_filter(audio, sr)
        # Step 2: resample to 8 kHz
        audio = self._downsample(audio, sr)
        # Step 3: G.711 companding (encode -> decode round trip)
        audio = self._compand(audio)
        return audio, self.PSTN_SAMPLE_RATE

    def _bandpass_filter(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """
        Band-limit the signal to the PSTN voice band (ITU-T G.712).

        Removes content below 300 Hz (hum) and above 3400 Hz (sibilance)
        with a zero-phase Butterworth band-pass filter.

        Args:
            audio: Non-empty mono signal.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            The filtered signal as float32, same length as the input.

        Raises:
            ValueError: If ``sr`` is too low for the pass band to exist.
        """
        nyq = sr / 2.0
        low = self.PSTN_LOW_FREQ / nyq
        high = self.PSTN_HIGH_FREQ / nyq
        # butter() requires normalised band edges strictly inside (0, 1), so
        # an edge at or beyond Nyquist is pulled just inside the valid range.
        if high >= 1.0:
            high = 0.99
        if low <= 0.0:
            low = 0.01
        if low >= high:
            raise ValueError(
                f"Sample rate {sr} Hz is too low for a "
                f"{self.PSTN_LOW_FREQ:g}-{self.PSTN_HIGH_FREQ:g} Hz band-pass filter"
            )

        # Second-order sections keep this 2 * FILTER_ORDER pole band-pass
        # numerically stable; the (b, a) polynomial form is ill-conditioned
        # at this order, especially when the band is narrow relative to sr.
        sos = signal.butter(
            self.FILTER_ORDER, [low, high], btype="band", output="sos"
        )
        # Pad 3 * (2 * n_sections + 1) samples at each edge, capped below the
        # clip length so that very short clips can still be filtered.
        padlen = min(3 * (2 * sos.shape[0] + 1), audio.size - 1)
        return signal.sosfiltfilt(sos, audio, padlen=padlen).astype(np.float32)

    def _downsample(self, audio: np.ndarray, sr: int) -> np.ndarray:
        """
        Resample the signal to PSTN_SAMPLE_RATE.

        Uses scipy.signal.resample_poly, which applies an anti-aliasing
        low-pass filter as part of the rate conversion.

        Args:
            audio: Mono signal.
            sr: Current sample rate of ``audio`` in Hz.

        Returns:
            The input unchanged when it is already at PSTN_SAMPLE_RATE,
            otherwise the resampled signal as float32.
        """
        if sr == self.PSTN_SAMPLE_RATE:
            return audio
        # Reduce the rate ratio by its GCD to get the smallest integer
        # up/down factors for rational resampling.
        gcd = np.gcd(sr, self.PSTN_SAMPLE_RATE)
        up = self.PSTN_SAMPLE_RATE // gcd
        down = sr // gcd
        return signal.resample_poly(audio, up, down).astype(np.float32)

    def _compand(self, audio: np.ndarray) -> np.ndarray:
        """
        G.711 A-law / mu-law encode -> decode round trip.

        Compressing 16-bit samples to 8 bits and expanding them back adds
        the non-linear quantisation noise that gives telephone audio its
        characteristic rough timbre.

        NOTE: ``audioop`` is deprecated since Python 3.11 and removed in
        Python 3.13 (PEP 594); this stage needs a replacement there.

        Args:
            audio: Mono float signal, nominally in the range -1.0 ~ 1.0.

        Returns:
            The companded signal as float32.
        """
        # Decide which companding law to use. ``==`` (not ``is``) keeps plain
        # string values working, since CompandingType is a str enum.
        if self.companding == CompandingType.RANDOM:
            method = random.choice([CompandingType.ALAW, CompandingType.ULAW])
        else:
            method = self.companding

        # float -> 16-bit PCM; clipping guards against filter overshoot
        # beyond the int16 range before the cast.
        pcm16 = np.clip(audio * 32767, -32768, 32767).astype(np.int16)
        raw_bytes = pcm16.tobytes()

        # encode (16 bit -> 8 bit) -> decode (8 bit -> 16 bit) round trip
        if method == CompandingType.ALAW:
            compressed = audioop.lin2alaw(raw_bytes, 2)
            decompressed = audioop.alaw2lin(compressed, 2)
        else:
            compressed = audioop.lin2ulaw(raw_bytes, 2)
            decompressed = audioop.ulaw2lin(compressed, 2)

        # 16-bit PCM -> float32
        return np.frombuffer(decompressed, dtype=np.int16).astype(np.float32) / 32767.0