daihui.zhang committed
Commit 9694503 · 1 Parent(s): 22453ce

fix bug of queue lock

Files changed (1)
  1. transcribe/serve.py +53 -43
transcribe/serve.py CHANGED
@@ -37,7 +37,8 @@ class WhisperTranscriptionService:
         # Audio processing related
         self.sample_rate = config.SAMPLE_RATE
 
-        self.lock = threading.Lock()
+        self.frame_lock = threading.Lock()
+        self.segment_lock = threading.Lock()
         # Text separator, chosen according to the language
         self.text_separator = get_text_separator(language)
         self.loop = asyncio.get_event_loop()
@@ -72,61 +73,70 @@ class WhisperTranscriptionService:
     def _read_frame_processing_loop(self) -> None:
         """Fetch audio frames from the queue and merge them into the buffer."""
         while not self._stop.is_set():
-            try:
-                frame_np = self.frame_queue.get(timeout=0.1)
-                frame_np, speech_status = self._apply_voice_activity_detection(frame_np)
+            frame_np = self.frame_queue.get()
+            frame_np, speech_status = self._apply_voice_activity_detection(frame_np)
 
-                if frame_np is None:
-                    continue
-                # logger.critical(f"frame np:{frame_np.shape}, {speech_status}")
-                with self.lock:
-                    self.frames_np = np.append(self.frames_np, frame_np)
-
-                    # Timestamp of when the audio started, used to check the minimum sentence-break duration
-                    if speech_status == "START" and self.frames_np_start_timestamp is None:
-                        self.frames_np_start_timestamp = time.time()
+            if frame_np is None:
+                continue
 
-                    # Maximum buffered-audio duration; force a sentence break once exceeded
-                    if len(self.frames_np) >= self.sample_rate * config.MAX_SPEECH_DURATION_S:
+            with self.frame_lock:
+                self.frames_np = np.append(self.frames_np, frame_np)
+
+            # Timestamp of when the audio started, used to check the minimum sentence-break duration
+            if speech_status == "START" and self.frames_np_start_timestamp is None:
+                self.frames_np_start_timestamp = time.time()
+
+            # Maximum buffered-audio duration; force a sentence break once exceeded
+            if len(self.frames_np) >= self.sample_rate * config.MAX_SPEECH_DURATION_S:
+                audio_array=self.frames_np.copy()
+                with self.segment_lock:
+                    self.full_segments_queue.appendleft(audio_array) # merge audio chunks once the ~3 s duration requirement is met
+                self.frames_np_start_timestamp = time.time()
+
+                with self.frame_lock:
+                    self.frames_np = np.array([], dtype=np.float32)
+
+            # On an end-of-speech signal, consolidate the current buffer
+            # START -- END -- START -- END is the usual pattern
+            # START -- END -- END: an END chunk that carries audio is usually a short sound broken within 4096 samples
+            if speech_status == "END" and len(self.frames_np) > 0 and self.frames_np_start_timestamp:
+                time_diff = time.time() - self.frames_np_start_timestamp
+                if time_diff >= config.FRAME_SCOPE_TIME_THRESHOLD:
+                    with self.frame_lock:
                         audio_array=self.frames_np.copy()
-                        self.full_segments_queue.appendleft(audio_array) # merge audio chunks once the ~3 s duration requirement is met
-                        self.frames_np_start_timestamp = time.time()
                         self.frames_np = np.array([], dtype=np.float32)
 
-                    # On an end-of-speech signal, consolidate the current buffer
-                    # START -- END -- START -- END is the usual pattern
-                    # START -- END -- END: an END chunk that carries audio is usually a short sound broken within 4096 samples
-                    elif speech_status == "END" and len(self.frames_np) > 0 and self.frames_np_start_timestamp:
-                        time_diff = time.time() - self.frames_np_start_timestamp
-                        if time_diff >= config.FRAME_SCOPE_TIME_THRESHOLD:
-                            audio_array=self.frames_np.copy()
-                            self.full_segments_queue.appendleft(audio_array) # merge audio chunks once the ~3 s duration requirement is met
-                            self.frames_np_start_timestamp = None
-                            self.frames_np = np.array([], dtype=np.float32)
-                        else:
-                            logger.debug(f"🥳 Time gap since the previous sentence: {time_diff:.2f}s, keeping it in the buffer")
-
-            except queue.Empty:
-                pass
+                    with self.segment_lock:
+                        self.full_segments_queue.appendleft(audio_array) # merge audio chunks once the ~3 s duration requirement is met
+                        logger.debug(f"🥳 Added a full sentence to the queue")
+                    self.frames_np_start_timestamp = None
+
+                else:
+                    logger.debug(f"🥳 Time gap since the previous sentence: {time_diff:.2f}s, keeping it in the buffer")
+
 
     def _transcription_processing_loop(self) -> None:
         """Main transcription processing loop."""
         frame_epoch = 1
 
         while not self._stop.is_set():
-
-            if len(self.frames_np) ==0:
-                time.sleep(0.1)
-                continue
-
-            with self.lock:
-                if len(self.full_segments_queue) > 0:
-                    audio_buffer = self.full_segments_queue.pop()
-                    partial = False
-                else:
+            time.sleep(0.1)
+
+            with self.segment_lock:
+                segment_length = len(self.full_segments_queue)
+                if segment_length > 0:
+                    audio_buffer = self.full_segments_queue.pop()
+                    partial = False
+                else:
+                    with self.frame_lock:
+                        if len(self.frames_np) ==0:
+                            continue
                     audio_buffer = self.frames_np[:int(frame_epoch * 1.5 * self.sample_rate)].copy()  # take 1.5s * epoch of audio length
                     partial = True
 
+            logger.debug(f"full_segments_queue size: {segment_length}")
+            logger.debug(f"audio buffer size: {len(self.frames_np) / self.sample_rate:.2f}s")
+
             if len(audio_buffer) < int(self.sample_rate):
                 # Add a small buffer (e.g., 10ms worth of samples) to be safe
                 padding_samples = int(self.sample_rate * 0.01) # e.g., 160 samples for 10ms at 16kHz
@@ -137,7 +147,7 @@ class WhisperTranscriptionService:
                 silence_audio[-copy_length:] = audio_buffer[-copy_length:] # Copy from the end of audio_buffer
                 audio_buffer = silence_audio
 
-            logger.debug(f"audio buffer size: {len(audio_buffer) / self.sample_rate:.2f}s")
+
             meta_item = self._transcribe_audio(audio_buffer)
             segments = meta_item.segments
             logger.debug(f"Segments: {segments}")