Grinding committed on
Commit
a617e93
·
verified ·
1 Parent(s): f16db0b

Update app/processing.py

Browse files
Files changed (1) hide show
  1. app/processing.py +35 -39
app/processing.py CHANGED
@@ -65,6 +65,8 @@ async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
65
  with io.BytesIO() as chunk_bytes:
66
  audio_chunk.export(chunk_bytes, format="wav")
67
  chunk_bytes.seek(0)
 
 
68
 
69
  transcription = await asyncio.to_thread(
70
  groq_client.audio.transcriptions.create,
@@ -73,7 +75,6 @@ async def transcribe_chunk(chunk_index: int, audio_chunk: AudioSegment):
73
  response_format="text"
74
  )
75
  logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
76
- # Return index along with text for sorting after parallel processing
77
  return (chunk_index, transcription)
78
  except Exception as e:
79
  logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
@@ -87,58 +88,57 @@ async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
87
  try:
88
  logger.info(f"Starting pipeline for task {task_id} with file {file_path}")
89
 
90
- # Make chunk duration configurable via environment variable, default to 120 seconds
91
- CHUNK_DURATION_S = int(os.getenv("CHUNK_DURATION_S", 120))
 
 
92
 
93
- sr = librosa.get_samplerate(str(file_path))
94
- target_sr = 16000 # Resample to 16kHz for Whisper compatibility and smaller file size
 
 
 
 
 
 
 
 
 
 
 
95
 
96
- stream = librosa.stream(
97
- str(file_path),
98
- block_length=int(sr * CHUNK_DURATION_S),
99
- frame_length=4096,
100
- hop_length=1024
101
- )
102
-
103
  transcription_tasks = []
104
- for i, y_chunk in enumerate(stream):
105
- logger.info(f"Queuing audio segment {i+1} for transcription...")
 
 
106
 
107
- # Ensure y_chunk is 2D
108
- if y_chunk.ndim == 1:
109
- y_chunk = y_chunk.reshape(-1, 1)
110
-
111
- # Mix to mono if multi-channel
112
- if y_chunk.shape[1] > 1:
113
- y_chunk = np.mean(y_chunk, axis=1, keepdims=True)
114
 
115
  # Resample to target_sr
116
- if sr != target_sr:
117
- y_chunk = librosa.resample(y_chunk, orig_sr=sr, target_sr=target_sr, axis=0)
118
-
119
- current_sr = target_sr
120
 
121
  pcm_chunk = (y_chunk * 32767).astype(np.int16)
122
 
123
- channels = y_chunk.shape[1] # Should be 1
124
-
125
  audio_segment = AudioSegment(
126
  pcm_chunk.tobytes(),
127
- frame_rate=current_sr,
128
- sample_width=pcm_chunk.dtype.itemsize,
129
- channels=channels
130
  )
 
131
  transcription_tasks.append(transcribe_chunk(i, audio_segment))
132
 
133
- # Clean up memory explicitly
134
- del pcm_chunk, y_chunk, audio_segment
135
  gc.collect()
136
 
137
  # Run all transcription tasks in parallel
138
  logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
139
  transcription_results = await asyncio.gather(*transcription_tasks)
140
 
141
- # Sort results by their original index and join with newlines
142
  transcription_results.sort(key=lambda x: x[0])
143
  full_transcript = "\n".join([text for index, text in transcription_results])
144
 
@@ -150,22 +150,18 @@ async def run_pipeline(task_id: str, file_path: Path, tasks_db: dict):
150
 
151
  summary_task = asyncio.to_thread(
152
  groq_client.chat.completions.create,
153
- model="qwen/qwen3-32b",
154
  messages=[{"role": "system", "content": SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": full_transcript}],
155
  temperature=0.2,
156
- reasoning_effort="default",
157
- reasoning_format="hidden",
158
  max_tokens=1024
159
  )
160
 
161
  action_item_task = asyncio.to_thread(
162
  groq_client.chat.completions.create,
163
- model="qwen/qwen3-32b",
164
  messages=[{"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT}, {"role": "user", "content": full_transcript}],
165
  temperature=0.1,
166
- reasoning_effort="default",
167
  max_tokens=1024,
168
- reasoning_format="hidden",
169
  response_format={"type": "json_object"}
170
  )
171
 
 
65
  with io.BytesIO() as chunk_bytes:
66
  audio_chunk.export(chunk_bytes, format="wav")
67
  chunk_bytes.seek(0)
68
+ chunk_size = chunk_bytes.getbuffer().nbytes
69
+ logger.info(f"Chunk {chunk_index + 1} size: {chunk_size / (1024 * 1024):.2f} MB")
70
 
71
  transcription = await asyncio.to_thread(
72
  groq_client.audio.transcriptions.create,
 
75
  response_format="text"
76
  )
77
  logger.info(f"Finished transcription for chunk {chunk_index + 1}.")
 
78
  return (chunk_index, transcription)
79
  except Exception as e:
80
  logger.error(f"Error transcribing chunk {chunk_index + 1}: {e}")
 
88
  try:
89
  logger.info(f"Starting pipeline for task {task_id} with file {file_path}")
90
 
91
+ # Get total duration
92
+ duration = librosa.get_duration(filename=str(file_path))
93
+ orig_sr = librosa.get_samplerate(str(file_path))
94
+ logger.info(f"Audio duration: {duration:.2f} seconds, sample rate: {orig_sr}")
95
 
96
+ target_sr = 16000
97
+ max_chunk_mb = 19.5
98
+ max_chunk_bytes = max_chunk_mb * 1024 * 1024
99
+ bytes_per_second = target_sr * 2 * 1 # 16-bit mono
100
+ max_chunk_duration = (max_chunk_bytes - 1000) / bytes_per_second # conservative
101
+
102
+ # Configurable base chunk duration, but cap at max
103
+ base_chunk_duration = int(os.getenv("CHUNK_DURATION_S", 300)) # default 5 minutes
104
+ chunk_duration = min(base_chunk_duration, max_chunk_duration)
105
+ logger.info(f"Using chunk duration: {chunk_duration:.2f} seconds")
106
+
107
+ num_chunks = int(np.ceil(duration / chunk_duration))
108
+ logger.info(f"Number of chunks: {num_chunks}")
109
 
 
 
 
 
 
 
 
110
  transcription_tasks = []
111
+ for i in range(num_chunks):
112
+ offset = i * chunk_duration
113
+ this_dur = min(chunk_duration, duration - offset)
114
+ logger.info(f"Loading audio chunk {i+1} (offset: {offset:.2f}s, duration: {this_dur:.2f}s)")
115
 
116
+ y_chunk, _ = librosa.load(str(file_path), sr=None, mono=True, offset=offset, duration=this_dur)
 
 
 
 
 
 
117
 
118
  # Resample to target_sr
119
+ if _ != target_sr:
120
+ y_chunk = librosa.resample(y_chunk, orig_sr=_, target_sr=target_sr)
 
 
121
 
122
  pcm_chunk = (y_chunk * 32767).astype(np.int16)
123
 
 
 
124
  audio_segment = AudioSegment(
125
  pcm_chunk.tobytes(),
126
+ frame_rate=target_sr,
127
+ sample_width=2,
128
+ channels=1
129
  )
130
+
131
  transcription_tasks.append(transcribe_chunk(i, audio_segment))
132
 
133
+ # Clean up memory
134
+ del y_chunk, pcm_chunk, audio_segment
135
  gc.collect()
136
 
137
  # Run all transcription tasks in parallel
138
  logger.info(f"Running {len(transcription_tasks)} transcription tasks in parallel...")
139
  transcription_results = await asyncio.gather(*transcription_tasks)
140
 
141
+ # Sort results by index and join
142
  transcription_results.sort(key=lambda x: x[0])
143
  full_transcript = "\n".join([text for index, text in transcription_results])
144
 
 
150
 
151
  summary_task = asyncio.to_thread(
152
  groq_client.chat.completions.create,
153
+ model="llama3-70b-8192",
154
  messages=[{"role": "system", "content": SUMMARIZATION_SYSTEM_PROMPT}, {"role": "user", "content": full_transcript}],
155
  temperature=0.2,
 
 
156
  max_tokens=1024
157
  )
158
 
159
  action_item_task = asyncio.to_thread(
160
  groq_client.chat.completions.create,
161
+ model="llama3-70b-8192",
162
  messages=[{"role": "system", "content": ACTION_ITEMS_SYSTEM_PROMPT}, {"role": "user", "content": full_transcript}],
163
  temperature=0.1,
 
164
  max_tokens=1024,
 
165
  response_format={"type": "json_object"}
166
  )
167