Instructions to use happyme531/Qwen3-ASR-1.7B-RKLLM with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- RKLLM
How to use happyme531/Qwen3-ASR-1.7B-RKLLM with RKLLM:
# No code snippets available yet for this library.
# To use this model, check the repository files and the library's documentation.
# Want to help? PRs adding snippets are welcome at:
# https://github.com/huggingface/huggingface.js
- Notebooks
- Google Colab
- Kaggle
改了一个mic采集实时输出的
#2
by thomascatlee - opened
run_qwen3_asr_realtime.py
#!/usr/bin/env python3
"""
Real-time audio capture and speech recognition (Qwen3-ASR + RKNN/RKLLM), VAD-driven mode.

Adapted from run_qwen3_asr_e2e.py; captures audio from the microphone and
recognizes it as it arrives.

How it works: the microphone is monitored continuously and a VAD (voice
activity detector) cuts the stream into speech segments. Each completed
segment is sent straight through the pipeline: preprocessing -> mel -> encoder -> LLM.
Recognition results are streamed token by token, one utterance at a time.

Audio is captured with arecord (no extra Python dependency) and read through
a subprocess pipe.
"""
import argparse
import ctypes
import math
import os
import queue
import subprocess
import sys
import threading
import time
from pathlib import Path
import faulthandler
import numpy as np
from transformers import WhisperFeatureExtractor
# Dump a Python traceback if the process receives a fatal signal.
faulthandler.enable()
os.environ.setdefault("RKLLM_LOG_LEVEL", "0") # 0 = errors only, 1 = info, 2 = debug; 0 suppresses the statistics log
# Put the script's own directory on sys.path so the local modules imported below resolve.
REPO_ROOT = Path(__file__).resolve().parent
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
from rkllm_binding import ( # noqa: E402
LLMCallState,
RKLLMInferMode,
RKLLMInferParam,
RKLLMInput,
RKLLMInputType,
RKLLMResult,
RKLLMRuntime,
)
import ztu_somemodelruntime_ez_rknn_async as ort # noqa: E402
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DEFAULT_ENCODER_PATH = "rknn/audio_encoder.rknn"  # default for --encoder-model-path
DEFAULT_LLM_PATH = "rknn/language_model.rkllm"  # default for --llm-model-path
SAMPLE_RATE = 16000  # sampling rate (Hz) used for capture and feature extraction
# ---------------------------------------------------------------------------
# 工具函数
# ---------------------------------------------------------------------------
def now() -> float:
    """Return the current reading of the performance counter, in seconds."""
    timestamp = time.perf_counter()
    return timestamp
def preprocess_audio(waveform: np.ndarray, sample_rate: int = 16000) -> np.ndarray:
    """Condition raw microphone audio before feature extraction.

    The steps are applied to a float32 copy, so the caller's array is never
    modified:

    1. DC offset removal (subtract the mean).
    2. Pre-emphasis ``y[n] = x[n] - 0.97 * x[n-1]``; the first sample, which
       has no predecessor, is scaled by ``1 - 0.97``.
    3. A signal whose RMS is below 1e-6 is returned as is (no normalization).
    4. If the peak magnitude exceeds 0.95 the signal is scaled down to 0.95.

    Args:
        waveform: Audio samples; converted to float32.
        sample_rate: Not used by the current implementation; kept so existing
            callers that pass it keep working.

    Returns:
        The processed float32 samples. An empty input yields an empty array.
    """
    samples = np.asarray(waveform, dtype=np.float32).copy()
    if samples.size == 0:
        # Guard: the mean of an empty array is NaN and samples[0] below would
        # raise IndexError.
        return samples

    # 1. Remove DC offset.
    samples -= np.mean(samples)

    # 2. Pre-emphasis. The right-hand side is evaluated into a temporary
    #    before the in-place subtraction, so the overlapping slices are safe.
    preemphasis = 0.97
    samples[1:] -= preemphasis * samples[:-1]
    samples[0] *= (1.0 - preemphasis)

    # 3. Practically silent input: skip normalization and return unchanged.
    rms = float(np.sqrt(np.mean(samples ** 2)))
    if rms < 1e-6:
        return samples

    # 4. Peak limiting to avoid clipping.
    peak = np.max(np.abs(samples))
    if peak > 0.95:
        samples *= (0.95 / peak)
    return samples.astype(np.float32)
# ---------------------------------------------------------------------------
# 流式文本收集器 (回调)
# ---------------------------------------------------------------------------
class StreamingTextCollector:
    """Callable handed to the RKLLM runtime that gathers generated text.

    Each invocation receives a result pointer, a user-data pointer and a call
    state. Text chunks delivered in the normal state are stored in ``parts``
    and, when ``stream_output`` is set, echoed to stdout immediately.
    """

    def __init__(self, stream_output: bool = True):
        self.stream_output = stream_output  # echo chunks to stdout as they arrive
        self.parts: list[str] = []  # decoded chunks, in arrival order
        self.error = False  # set once the runtime reports an error state

    def __call__(self, result_ptr, userdata_ptr, state_enum):
        """Handle one callback from the runtime; always returns 0."""
        state = LLMCallState(state_enum)
        result: RKLLMResult = result_ptr.contents
        echo = self.stream_output

        if state == LLMCallState.RKLLM_RUN_NORMAL and result.text:
            # Undecodable bytes are dropped rather than raising.
            piece = result.text.decode("utf-8", errors="ignore")
            self.parts.append(piece)
            if echo:
                print(piece, end="", flush=True)
            return 0

        if state == LLMCallState.RKLLM_RUN_FINISH and echo:
            # Terminate the streamed line.
            print(flush=True)
        elif state == LLMCallState.RKLLM_RUN_ERROR:
            self.error = True
            if echo:
                print("\n[识别错误]", flush=True)
        return 0

    def reset(self):
        """Forget collected chunks and clear the error flag."""
        self.parts.clear()
        self.error = False

    @property
    def text(self) -> str:
        """All collected chunks joined into one string."""
        return "".join(self.parts)
# ---------------------------------------------------------------------------
# 特征提取
# ---------------------------------------------------------------------------
def configure_feature_extractor_for_audio(
    feature_extractor: WhisperFeatureExtractor, waveform: np.ndarray
) -> None:
    """Enlarge the extractor's window, in place, so the whole waveform fits.

    The required length is the waveform duration rounded up to whole seconds
    (at least 1). The extractor is only ever grown: if its ``chunk_length``
    already covers the audio nothing is changed. Otherwise ``chunk_length``,
    ``n_samples`` and ``nb_max_frames`` are updated consistently.
    """
    rate = feature_extractor.sampling_rate
    seconds_needed = max(1, math.ceil(waveform.shape[0] / float(rate)))
    if seconds_needed > feature_extractor.chunk_length:
        feature_extractor.chunk_length = seconds_needed
        feature_extractor.n_samples = int(seconds_needed * rate)
        feature_extractor.nb_max_frames = (
            feature_extractor.n_samples // feature_extractor.hop_length
        )
def extract_mel_features(
    feature_extractor: WhisperFeatureExtractor, waveform: np.ndarray
) -> tuple[np.ndarray, int]:
    """Compute mel features for a waveform of arbitrary length.

    A 2-D waveform is averaged over its last axis first. The extractor's
    window is enlarged when the audio does not fit.

    Returns:
        ``(input_features, feature_len)``: the first item of the extractor's
        ``input_features`` output as float32, and the sum of the first
        attention-mask row (the number of valid frames).
    """
    mono = waveform.mean(axis=-1) if waveform.ndim == 2 else waveform
    mono = np.asarray(mono, dtype=np.float32)
    configure_feature_extractor_for_audio(feature_extractor, mono)

    encoded = feature_extractor(
        mono,
        sampling_rate=SAMPLE_RATE,
        return_attention_mask=True,
        return_tensors="np",
    )
    mel = encoded["input_features"][0].astype(np.float32)
    valid_frames = int(encoded["attention_mask"][0].sum())
    return mel, valid_frames
def split_mel_features(
    input_features: np.ndarray, feature_len: int, chunk_frames: int
) -> list[tuple[np.ndarray, int]]:
    """Cut the first ``feature_len`` frames into fixed-width, zero-padded chunks.

    Args:
        input_features: 2-D array; axis 0 is kept whole, axis 1 is split.
        feature_len: Number of leading frames (axis 1) that hold valid data.
        chunk_frames: Width of every chunk; must be positive.

    Returns:
        A list of ``(chunk, valid_len)`` pairs. Every ``chunk`` is a float32
        array of shape ``(input_features.shape[0], chunk_frames)``; only its
        first ``valid_len`` columns carry data, the rest are zeros. The list
        is empty when ``feature_len`` is not positive.

    Raises:
        ValueError: If ``chunk_frames`` is not positive (a zero width would
            otherwise never advance through the input).
    """
    if chunk_frames <= 0:
        raise ValueError(f"chunk_frames must be positive, got {chunk_frames}")

    n_rows = input_features.shape[0]
    chunks = []
    for start in range(0, feature_len, chunk_frames):
        cur_len = min(chunk_frames, feature_len - start)
        chunk = np.zeros((n_rows, chunk_frames), dtype=np.float32)
        chunk[:, :cur_len] = input_features[:, start: start + cur_len]
        chunks.append((chunk, cur_len))
    return chunks
def get_chunk_output_length_value(length: int) -> int:
    """Halve ``length`` three times in a row, rounding up each time.

    NOTE(review): presumably mirrors three stride-2 stages in the audio
    encoder; confirm against the model definition.
    """
    frames = int(length)
    for _ in range(3):
        frames = (frames + 1) // 2
    return frames
# ---------------------------------------------------------------------------
# 音频编码器 (RKNN)
# ---------------------------------------------------------------------------
def run_audio_encoder(
    session,
    input_features: np.ndarray,
    feature_len: int,
    chunk_frames: int,
) -> np.ndarray:
    """Encode mel features in fixed-size chunks and concatenate the valid outputs.

    The mel features are split into zero-padded chunks, stacked into a single
    batch and passed to ``session.run``. For every chunk only the leading
    valid rows of its output are kept. The valid length comes from the
    session's second output when it provides one, otherwise it is derived
    from the chunk's input length.

    Returns:
        The concatenated valid encoder outputs, or an empty ``(0, 2048)``
        float32 array when there is nothing to encode.

    Raises:
        RuntimeError: If the number of outputs or of valid lengths does not
            match the number of chunks.
    """
    chunks = split_mel_features(input_features, feature_len, chunk_frames)
    n_chunks = len(chunks)
    if n_chunks == 0:
        return np.zeros((0, 2048), dtype=np.float32)

    stacked = np.stack([mel for mel, _ in chunks], axis=0)
    batch_tensor = np.ascontiguousarray(stacked, dtype=np.float32)
    session_outputs = session.run(
        None,
        {"input_features": batch_tensor},
        run_options={"ztu_modelrt_dispatch_batch": True},
    )
    audio_features = np.asarray(session_outputs[0], dtype=np.float32)

    if len(session_outputs) < 2:
        # No lengths reported by the session: compute them from the inputs.
        valid_lens = np.asarray(
            [get_chunk_output_length_value(valid) for _, valid in chunks],
            dtype=np.int32,
        )
    else:
        valid_lens = np.asarray(session_outputs[1]).reshape(-1)
        if valid_lens.size == 1 and n_chunks > 1:
            # A single reported length is applied to every chunk.
            valid_lens = np.repeat(valid_lens, n_chunks)

    if audio_features.shape[0] != n_chunks:
        raise RuntimeError(
            f"Audio encoder batch mismatch: got {audio_features.shape[0]} outputs for {len(chunks)} inputs."
        )
    if valid_lens.size != n_chunks:
        raise RuntimeError(
            f"Audio encoder valid length mismatch: got {valid_lens.size} lengths for {len(chunks)} inputs."
        )

    pieces = [audio_features[row, : int(keep)] for row, keep in enumerate(valid_lens)]
    return np.concatenate(pieces, axis=0)
# ---------------------------------------------------------------------------
# LLM (RKLLM)
# ---------------------------------------------------------------------------
def build_chat_template(system_prompt: str, force_language) -> tuple[str, str, str]:
    """Build the three chat-template pieces: system block, user prefix, user postfix.

    When ``force_language`` is truthy the assistant turn is pre-seeded with
    ``language <name><asr_text>``; otherwise it starts empty. A falsy
    ``system_prompt`` produces an empty system block body.
    """
    system_block = f"<|im_start|>system\n{system_prompt or ''}<|im_end|>\n"
    user_prefix = "<|im_start|>user\n"
    seeded_reply = f"language {force_language}<asr_text>" if force_language else ""
    user_postfix = f"<|im_end|>\n<|im_start|>assistant\n{seeded_reply}"
    return system_block, user_prefix, user_postfix
def load_rkllm(
    llm_model_path: str,
    max_new_tokens: int,
    max_context_len: int,
    top_k: int,
    system_prompt: str,
    force_language,
    stream_output: bool,
):
    """Create and initialise the RKLLM runtime together with its text collector.

    The runtime's default parameters are overridden with the given model path
    and generation limits, the audio marker tokens are registered through the
    ``img_*`` fields, and the chat template is installed after ``init``.

    Returns:
        ``(rk_llm, collector)``: the initialised runtime and the
        ``StreamingTextCollector`` registered as its callback.
    """
    collector = StreamingTextCollector(stream_output=stream_output)
    runtime = RKLLMRuntime()

    cfg = runtime.create_default_param()
    cfg.model_path = llm_model_path.encode("utf-8")
    cfg.top_k = top_k
    cfg.max_new_tokens = max_new_tokens
    cfg.max_context_len = max_context_len
    cfg.skip_special_token = True
    # Audio embeddings travel through the runtime's image slots.
    cfg.img_start = b"<|audio_start|>"
    cfg.img_end = b"<|audio_end|>"
    cfg.img_content = b"<|audio_pad|>"
    cfg.extend_param.base_domain_id = 1
    runtime.init(cfg, collector)

    template = build_chat_template(
        system_prompt=system_prompt,
        force_language=force_language,
    )
    system_text, prompt_prefix, prompt_postfix = template
    runtime.set_chat_template(
        system_prompt=system_text,
        prompt_prefix=prompt_prefix,
        prompt_postfix=prompt_postfix,
    )
    return runtime, collector
def run_rkllm(
    rk_llm: RKLLMRuntime,
    audio_features: np.ndarray,
    keep_history: int = 0,
) -> None:
    """Run one generation pass with the audio embeddings as multimodal input.

    The embeddings are flattened into a contiguous float32 buffer whose
    pointer is handed to the runtime; one token is declared per row of
    ``audio_features``. Output is delivered through the callback registered
    at init time, so nothing is returned here.
    """
    token_count = audio_features.shape[0]
    # The named local keeps the buffer alive until rk_llm.run() returns.
    flattened = np.ascontiguousarray(audio_features.reshape(-1), dtype=np.float32)
    embed_ptr = flattened.ctypes.data_as(ctypes.POINTER(ctypes.c_float))

    rkllm_input = RKLLMInput()
    rkllm_input.role = b"user"
    rkllm_input.input_type = RKLLMInputType.RKLLM_INPUT_MULTIMODAL
    rkllm_input.multimodal_input.prompt = b"<image>"
    rkllm_input.multimodal_input.image_embed = embed_ptr
    rkllm_input.multimodal_input.n_image_tokens = token_count
    rkllm_input.multimodal_input.n_image = 1
    rkllm_input.multimodal_input.image_height = 1
    rkllm_input.multimodal_input.image_width = max(token_count, 1)

    infer_param = RKLLMInferParam()
    infer_param.mode = RKLLMInferMode.RKLLM_INFER_GENERATE
    infer_param.keep_history = keep_history

    rk_llm.run(rkllm_input, infer_param)
# ---------------------------------------------------------------------------
# 实时音频电平表 (终端可视化)
# ---------------------------------------------------------------------------
class AudioLevelMeter:
    """Single-line terminal meter showing the current input level.

    ``feed`` tracks the highest frame RMS seen since the last redraw;
    ``draw`` renders that value as a coloured bar, redrawing the same
    terminal line, and ``hide`` blanks the line again.
    """

    def __init__(self, bar_width: int = 40, update_interval: float = 0.1):
        self.bar_width = bar_width
        self.update_interval = update_interval  # minimum seconds between redraws
        self._last_draw = 0.0
        self._peak: float = 0.0
        self._visible = False

    def feed(self, samples: np.ndarray):
        """Account for one frame: remember its RMS if it is the highest so far."""
        level = float(np.sqrt(np.mean(samples.astype(np.float64) ** 2)))
        self._peak = max(self._peak, level)

    def draw(self, force: bool = False):
        """Redraw the meter unless the last redraw is too recent and ``force`` is unset."""
        current = time.monotonic()
        too_soon = (current - self._last_draw) < self.update_interval
        if too_soon and not force:
            return
        self._last_draw = current

        # Consume the tracked peak so the next interval starts from zero.
        level, self._peak = self._peak, 0.0
        width = self.bar_width

        if level < 0.001:
            # Practically silent: draw a flat baseline.
            bar = "▁" * width
        else:
            # An RMS of 0.1 or more fills the whole bar.
            filled = int(min(level / 0.1, 1.0) * width)
            bar = "█" * filled + "░" * (width - filled)

        # Green below 0.02, yellow below 0.2, red otherwise.
        if level < 0.02:
            colour = "\033[32m"
        elif level < 0.2:
            colour = "\033[33m"
        else:
            colour = "\033[31m"

        print(f"\r🎤 [{colour}{bar}\033[0m] {level:.4f} ", end="", flush=True)
        self._visible = True

    def hide(self):
        """Blank the meter line so that other output can use it."""
        if not self._visible:
            return
        print("\r" + " " * 90 + "\r", end="", flush=True)
        self._visible = False
# ---------------------------------------------------------------------------
# VAD (语音活动检测)
# ---------------------------------------------------------------------------
class EnergyVAD:
"""基于能量的简单语音活动检测."""
def __init__(
self,
sample_rate: int = 16000,
frame_duration_ms: int = 30,
speech_threshold: float = 0.01,
silence_duration_sec: float = 1.0,
min_speech_duration_sec: float = 0.3,
):
self.sample_rate = sample_rate
self.frame_size = int(sample_rate * frame_duration_ms / 1000)
self.speech_threshold = speech_threshold
self.silence_frames = int(silence_duration_sec * 1000 / frame_duration_ms)
self.min_speech_samples = int(min_speech_duration_sec * sample_rate)
self.is_speech = False
self.speech_frame_count = 0
self.silence_frame_count = 0
self.buffer: list[np.ndarray] = []
def reset(self):
self.is_speech = False
self.speech_frame_count = 0
self.silence_frame_count = 0
self.buffer.clear()
def add_frame(self, frame: np.ndarray) -> tuple[bool, np.ndarray | None]:
"""
处理一个音频帧.
返回 (speech_ended, audio_segment).
"""
energy = float(np.sqrt(np.mean(frame ** 2)))
self.buffer.append(frame.copy())
if energy > self.speech_threshold:
self.speech_frame_count += 1
self.silence_frame_count = 0
if not self.is_speech and self.speech_frame_count >= 3:
self.is_speech = True
else:
self.silence_frame_count += 1
if self.is_speech:
self.speech_frame_count = 0
# 语音结束判定
if self.is_speech and self.silence_frame_count >= self.silence_frames:
segment = np.concatenate(self.buffer)
self.reset()
# 检查最小时长
if len(segment) < self.min_speech_samples:
return False, None
return True, segment
return False, None
def get_current_audio(self) -> np.ndarray | None:
"""获取当前缓冲区中的音频 (不重置状态)."""
if self.buffer:
return np.concatenate(self.buffer)
return None
# ---------------------------------------------------------------------------
# 音频采集 (arecord subprocess)
# ---------------------------------------------------------------------------
class AudioCapture:
"""基于 arecord 的实时音频采集 (通过 subprocess pipe 读取)."""
BYTES_PER_SAMPLE = 2 # S16_LE
dtype = np.int16
def __init__(
self,
sample_rate: int = 16000,
block_size: int = 480,
device: str | None = None,
channels: int = 2,
):
self.sample_rate = sample_rate
self.block_size = block_size
self.device = device
self.channels = channels
self.bytes_per_frame = block_size * self.BYTES_PER_SAMPLE * channels
self.audio_queue: queue.Queue = queue.Queue(maxsize=500)
self._proc: subprocess.Popen | None = None
self._thread: threading.Thread | None = None
self._running = False
def _reader_thread(self):
"""后台线程: 持续从 arecord stdout 读取 raw PCM 并写入队列."""
while self._running and self._proc and self._proc.stdout:
try:
raw = self._proc.stdout.read(self.bytes_per_frame)
except Exception:
break
if not raw:
break
if len(raw) < self.bytes_per_frame:
# 不足一帧, 丢弃
continue
# raw bytes → int16 → float32 [-1, 1]
samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
if self.channels > 1:
samples = samples.reshape(-1, self.channels).mean(axis=1)
try:
self.audio_queue.put_nowait(samples)
except queue.Full:
try:
self.audio_queue.get_nowait()
self.audio_queue.put_nowait(samples)
except queue.Empty:
pass
def start(self):
if self._running:
return
# 构造 arecord 命令
cmd = [
"arecord",
"-f", "S16_LE", # 16-bit little-endian
"-r", str(self.sample_rate),
"-c", str(self.channels),
"-t", "raw", # 输出原始 PCM
"-q", # 静默模式
]
if self.device is not None:
cmd.extend(["-D", self.device])
self._running = True
self._proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
self._thread = threading.Thread(target=self._reader_thread, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if self._thread and self._thread.is_alive():
self._thread.join(timeout=1.0)
if self._proc:
# 优雅终止 arecord
try:
self._proc.terminate()
self._proc.wait(timeout=1.0)
except Exception:
try:
self._proc.kill()
self._proc.wait(timeout=1.0)
except Exception:
pass
self._proc = None
self._thread = None
def read(self, timeout: float = 0.1) -> np.ndarray | None:
try:
return self.audio_queue.get(timeout=timeout)
except queue.Empty:
return None
def list_audio_devices():
    """Print the capture devices reported by ``arecord -l``, followed by a usage hint.

    If ``arecord`` is not installed, an installation hint is printed instead
    of the device list.
    """
    rule = "-" * 72
    print("可用的录音设备 (arecord -l):")
    print(rule)
    try:
        result = subprocess.run(
            ["arecord", "-l"],
            capture_output=True, text=True,
        )
    except FileNotFoundError:
        print("arecord 未找到。请安装 alsa-utils: apt install alsa-utils")
    else:
        has_listing = result.returncode == 0 and result.stdout.strip()
        print(result.stdout if has_listing else "(无输出)")
    print(rule)
    print("使用 --device 参数指定设备名, 如 --device 'hw:0,0' 或 --device 'plughw:1,0'")
# ---------------------------------------------------------------------------
# 处理单段语音
# ---------------------------------------------------------------------------
def process_speech_segment(
    segment: np.ndarray,
    feature_extractor: WhisperFeatureExtractor,
    encoder_session,
    rk_llm: RKLLMRuntime,
    collector: StreamingTextCollector,
    chunk_frames: int,
    all_transcripts: list[str],
    stream_output: bool,
):
    """Run the full ASR pipeline on one speech segment.

    Steps: mel feature extraction -> audio encoder -> KV-cache clear -> LLM
    decoding. A failure in any stage is reported on stdout and ends the
    processing of this segment; nothing is raised to the caller. A non-empty
    transcript is appended to ``all_transcripts``.

    Args:
        segment: Audio samples of the segment, sampled at ``SAMPLE_RATE``.
        feature_extractor: Mel feature extractor.
        encoder_session: Audio encoder session.
        rk_llm: Initialised RKLLM runtime.
        collector: Text collector registered as the runtime's callback.
        chunk_frames: Mel chunk width for the encoder.
        all_transcripts: Output list that receives the recognised text.
        stream_output: Whether tokens are echoed while they are generated.
    """
    t0 = now()
    duration_sec = len(segment) / SAMPLE_RATE
    if duration_sec <= 0:
        # Nothing to recognise; also keeps the RTF division below safe.
        return

    # 1. Mel features
    try:
        input_features, feature_len = extract_mel_features(feature_extractor, segment)
    except Exception as e:
        print(f"\n[特征提取失败] {e}", flush=True)
        return

    # 2. Audio encoder
    try:
        audio_features = run_audio_encoder(
            session=encoder_session,
            input_features=input_features,
            feature_len=feature_len,
            chunk_frames=chunk_frames,
        )
    except Exception as e:
        print(f"\n[编码器错误] {e}", flush=True)
        return
    if audio_features.shape[0] == 0:
        return

    # 3. Clear the KV cache and reset the collector. Clearing stays
    #    best-effort, but a failure is reported: a stale cache may leak
    #    context from the previous utterance into this one.
    try:
        rk_llm.clear_kv_cache(keep_system_prompt=True)
    except Exception as e:
        print(f"\n[警告] KV 缓存清除失败: {e}", flush=True)
    collector.reset()

    # 4. LLM decoding
    if stream_output:
        print(f"\n[语音 {duration_sec:.1f}s] ", end="", flush=True)
    try:
        run_rkllm(rk_llm=rk_llm, audio_features=audio_features)
    except Exception as e:
        print(f"\n[解码错误] {e}", flush=True)
        return
    elapsed = now() - t0

    if collector.error:
        if stream_output:
            print("[识别失败]", flush=True)
        return

    text = collector.text.strip()
    if text:
        all_transcripts.append(text)
        if not stream_output:
            print(f"\n[语音 {duration_sec:.1f}s] {text}", flush=True)
        print(f"(耗时 {elapsed:.2f}s, RTF={elapsed/duration_sec:.2f})", flush=True)
    else:
        print(f"\n[语音 {duration_sec:.1f}s] (无识别结果)", flush=True)
# ---------------------------------------------------------------------------
# 参数解析
# ---------------------------------------------------------------------------
def parse_args():
    """Parse the command-line options of the real-time recogniser.

    The options are declared as a table of ``(flag, add_argument keywords)``
    and registered in order, so ``--help`` lists them in table order.
    """
    option_table = [
        ("--model-path", dict(type=str, default=".",
                              help="原始 Qwen3-ASR 模型目录路径.")),
        ("--encoder-model-path", dict(type=str, default=DEFAULT_ENCODER_PATH,
                                      help="音频编码器模型路径 (.rknn).")),
        ("--llm-model-path", dict(type=str, default=DEFAULT_LLM_PATH,
                                  help="LLM 模型路径 (.rkllm).")),
        ("--chunk-frames", dict(type=int, default=100,
                                help="Mel 特征分块帧数.")),
        ("--max-new-tokens", dict(type=int, default=1024,
                                  help="最大生成 token 数.")),
        ("--max-context-len", dict(type=int, default=4096,
                                   help="最大上下文长度.")),
        ("--top-k", dict(type=int, default=5,
                         help="RKLLM Top-k 采样参数.")),
        ("--system-prompt", dict(type=str, default="",
                                 help="系统提示词.")),
        ("--force-language", dict(
            type=str, default="Chinese",
            help="强制语言, 例如 'Chinese' (默认: Chinese). 设为空字符串禁用.")),
        ("--device", dict(
            type=str, default=None,
            help="ALSA 录音设备名, 如 'hw:0,0' 或 'plughw:1,0' (使用 --list-devices 查看).")),
        ("--list-devices", dict(action="store_true",
                                help="列出音频输入设备并退出.")),
        ("--channels", dict(
            type=int, default=2,
            help="录音声道数 (默认: 2). 如果你的麦克风是立体声设备, 请设为 2.")),
        ("--block-size", dict(type=int, default=480,
                              help="音频采集块大小 (样本). 480=30ms@16kHz.")),
        ("--speech-threshold", dict(type=float, default=0.01,
                                    help="VAD 语音能量阈值 (0~1).")),
        ("--silence-duration", dict(type=float, default=1.0,
                                    help="静音持续秒数, 超过视为语音结束.")),
        ("--min-speech-duration", dict(type=float, default=0.3,
                                       help="最短语音秒数, 短于此的忽略.")),
        ("--no-stream", dict(action="store_true",
                             help="禁用流式输出 (仅在整句结束后显示).")),
        ("--save-text", dict(type=str, default=None,
                             help="保存识别文本到文件.")),
        ("--save-debug-audio", dict(
            type=str, default=None,
            help="调试: 保存采集的原始音频到指定路径 (.wav), 可用 e2e 脚本跑同一段音频对比效果.")),
    ]

    parser = argparse.ArgumentParser(
        description="实时音频采集与语音识别 (Qwen3-ASR RKNN/RKLLM)"
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
# ---------------------------------------------------------------------------
# 主函数
# ---------------------------------------------------------------------------
def _save_debug_audio(all_audio: np.ndarray, debugpath: Path) -> None:
    """Write the captured audio for debugging.

    Tries WAV via soundfile, then 16-bit WAV via scipy, and finally falls
    back to raw float32 PCM next to the requested path.
    """
    debugpath.parent.mkdir(parents=True, exist_ok=True)
    saved = False
    try:
        import soundfile as sf
        sf.write(str(debugpath), all_audio.astype(np.float32), SAMPLE_RATE)
        saved = True
    except ImportError:
        try:
            import scipy.io.wavfile as wavfile
            int_audio = (all_audio * 32767).clip(-32768, 32767).astype(np.int16)
            wavfile.write(str(debugpath), SAMPLE_RATE, int_audio)
            saved = True
        except ImportError:
            rawpath = debugpath.with_suffix(".raw")
            all_audio.astype(np.float32).tofile(str(rawpath))
            print(f"\n调试原始PCM已保存到: {rawpath.resolve()}")
            print(" (如需 .wav, 请安装 soundfile: pip install soundfile)")
    if saved:
        print(f"\n调试音频已保存到: {debugpath.resolve()}")
        print(" 可用以下命令对比效果:")
        print(f" python run_qwen3_asr_e2e.py --audio-path {debugpath}")


def main():
    """Entry point: load the models, then recognise microphone speech until Ctrl+C.

    Frames from the capture queue drive the level meter and the VAD; every
    completed speech segment is preprocessed and recognised. On exit any
    speech still in progress is recognised as well, a summary is printed,
    and the optional text / debug-audio files are written.
    """
    args = parse_args()
    if args.list_devices:
        list_audio_devices()
        return
    if args.block_size <= 0:
        raise SystemExit("--block-size 必须为正整数")
    stream_output = not args.no_stream

    # ---------- Load models ----------
    print("=" * 60)
    print("实时语音识别系统 (Qwen3-ASR + RKNN/RKLLM)")
    print("=" * 60)
    print("\n[1/3] 加载 Whisper 特征提取器 ...")
    t_load = now()
    feature_extractor = WhisperFeatureExtractor.from_pretrained(args.model_path)
    print("[2/3] 加载音频编码器 (RKNN) ...")
    encoder_session = ort.InferenceSession(
        args.encoder_model_path,
        provider_options=[{"schedule": [0, 1, 2]}],
    )
    print("[3/3] 加载语言模型 (RKLLM) ...")
    # An empty string means "do not force a language".
    lang = args.force_language if args.force_language else None
    rk_llm, collector = load_rkllm(
        llm_model_path=args.llm_model_path,
        max_new_tokens=args.max_new_tokens,
        max_context_len=args.max_context_len,
        top_k=args.top_k,
        system_prompt=args.system_prompt,
        force_language=lang,
        stream_output=stream_output,
    )
    load_time = now() - t_load
    print(f"模型加载完成 (耗时 {load_time:.1f}s)")

    # ---------- VAD ----------
    # Round instead of truncating (float error could turn 30 ms into 29 ms)
    # and never let the frame duration reach zero.
    frame_duration_ms = max(1, round(args.block_size * 1000 / SAMPLE_RATE))
    vad = EnergyVAD(
        sample_rate=SAMPLE_RATE,
        frame_duration_ms=frame_duration_ms,
        speech_threshold=args.speech_threshold,
        silence_duration_sec=args.silence_duration,
        min_speech_duration_sec=args.min_speech_duration,
    )

    # ---------- Audio capture ----------
    capture = AudioCapture(
        sample_rate=SAMPLE_RATE,
        block_size=args.block_size,
        device=args.device,
        channels=args.channels,
    )
    try:
        capture.start()
    except FileNotFoundError:
        print("arecord 未找到。请安装 alsa-utils: apt install alsa-utils")
        return

    print()
    print("=" * 60)
    print("🎤 实时流式识别已启动 (VAD 驱动模式)")
    print(" 按 Ctrl+C 退出。")
    print(" 说话 → 自动检测语音段 → mel → encoder → LLM → 输出识别文本")
    print(f" VAD: 能量阈值={args.speech_threshold}, "
          f"静音超{args.silence_duration}s判定结束, "
          f"最短语音{args.min_speech_duration}s")
    if lang:
        print(f" 语言: {lang}")
    print(" 音频预处理: DC去除 + 预加重 + 峰值归一化")
    print(f" chunk_frames={args.chunk_frames}, "
          f"max_new_tokens={args.max_new_tokens}")
    print("=" * 60)
    print()

    # Raw frames are retained only when --save-debug-audio asks for them;
    # otherwise memory would grow for as long as the program runs.
    keep_debug_audio = bool(args.save_debug_audio)
    audio_buffer: list[np.ndarray] = []
    all_transcripts: list[str] = []
    level_meter = AudioLevelMeter()
    print()  # blank line reserved for the level meter

    def recognise(samples: np.ndarray) -> None:
        """Preprocess one segment and run it through the ASR pipeline."""
        process_speech_segment(
            segment=preprocess_audio(samples, SAMPLE_RATE),
            feature_extractor=feature_extractor,
            encoder_session=encoder_session,
            rk_llm=rk_llm,
            collector=collector,
            chunk_frames=args.chunk_frames,
            all_transcripts=all_transcripts,
            stream_output=stream_output,
        )

    try:
        while True:
            frame = capture.read(timeout=0.05)
            if frame is None:
                # No data: stop if arecord has exited (wrong device, unplugged
                # microphone, ...) rather than spinning silently forever.
                proc = capture._proc
                if proc is not None and proc.poll() is not None:
                    level_meter.hide()
                    print(f"\n[错误] arecord 已退出 (返回码 {proc.returncode}), "
                          "请检查 --device / --channels 设置。", flush=True)
                    break
                level_meter.draw()
                continue

            if keep_debug_audio:
                audio_buffer.append(frame.copy())
            level_meter.feed(frame)
            level_meter.draw()

            # ---- VAD ----
            speech_ended, segment = vad.add_frame(frame)
            if speech_ended and segment is not None:
                level_meter.hide()
                dur = len(segment) / SAMPLE_RATE
                print(f"\n[🎤 检测到语音 {dur:.1f}s]", flush=True)
                recognise(segment)
    except KeyboardInterrupt:
        level_meter.hide()
        print("\n\n正在退出 ...")
    finally:
        level_meter.hide()
        capture.stop()

    # Recognise speech that was still in progress when capture stopped.
    remaining = vad.get_current_audio()
    if remaining is not None and len(remaining) >= int(SAMPLE_RATE * args.min_speech_duration):
        dur = len(remaining) / SAMPLE_RATE
        print(f"\n[处理残余语音 {dur:.1f}s ...]", flush=True)
        try:
            recognise(remaining)
        except Exception as e:
            print(f" [错误] {e}", flush=True)

    # Summary
    final_text = " ".join(all_transcripts)
    print()
    print("=" * 60)
    print("识别结束。")
    if all_transcripts:
        for i, t in enumerate(all_transcripts, 1):
            print(f" [{i}] {t}")
    if final_text:
        print(f" 合并: {final_text}")
    print("=" * 60)

    if args.save_text and final_text:
        savepath = Path(args.save_text)
        savepath.parent.mkdir(parents=True, exist_ok=True)
        savepath.write_text(final_text, encoding="utf-8")
        print(f"\n识别文本已保存到: {savepath.resolve()}")

    if args.save_debug_audio and audio_buffer:
        _save_debug_audio(np.concatenate(audio_buffer), Path(args.save_debug_audio))
# Run the recogniser only when this file is executed as a script.
if __name__ == "__main__":
    main()