emmajeed committed on
Commit
fa9aec9
·
verified ·
1 Parent(s): df8ba4d

Update transcribe_core.py

Browse files
Files changed (1) hide show
  1. transcribe_core.py +40 -188
transcribe_core.py CHANGED
@@ -16,6 +16,9 @@ import zipfile
16
  import time
17
  from ai_providers import TranscriptionProvider
18
 
 
 
 
19
 
20
  def format_timestamp(seconds: float) -> str:
21
  """Convert seconds to ffmpeg time format (HH:MM:SS.xxx)."""
@@ -25,7 +28,6 @@ def format_timestamp(seconds: float) -> str:
25
  secs = seconds % 60
26
  return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"
27
 
28
-
29
  def check_memory_usage() -> bool:
30
  """Check current memory usage and print warning if too high."""
31
  process = psutil.Process()
@@ -35,194 +37,85 @@ def check_memory_usage() -> bool:
35
  return False
36
  return True
37
 
38
-
39
  def clean_partial_chunks(base_file_path: str) -> None:
40
  """Clean up any existing partial chunks before starting."""
41
  try:
42
  base_name = os.path.splitext(os.path.basename(base_file_path))[0]
43
- output_folder = os.path.dirname(base_file_path)
44
- pattern = f"{base_name}_part*"
45
 
46
- print(f"Cleaning up any existing chunks matching: {pattern}")
47
- for file in os.listdir(output_folder):
48
  if file.startswith(f"{base_name}_part") and file.endswith(".mp3"):
49
- file_path = os.path.join(output_folder, file)
50
  try:
51
  os.remove(file_path)
52
- print(f"Removed existing chunk: {file}")
53
  except Exception as e:
54
  print(f"Warning: Could not remove {file}: {e}")
55
  except Exception as e:
56
  print(f"Warning: Error during cleanup: {e}")
57
 
58
-
59
  def chunk_audio_file(audio_file_path: str, chunk_duration_minutes: int = 25, overlap_seconds: int = 5) -> List[str]:
60
  """Chunks an audio file into smaller parts using ffmpeg streaming."""
61
  chunked_files = []
62
  try:
63
- # Clean up any existing chunks first
64
  clean_partial_chunks(audio_file_path)
65
-
66
- # Get audio duration
67
- print("\nAnalyzing audio file duration...")
68
  duration = get_audio_duration(audio_file_path)
69
- if duration is None:
70
- print("Error: Could not determine audio file duration.")
71
- return chunked_files
72
-
73
  chunk_length = chunk_duration_minutes * 60
74
- overlap = overlap_seconds
75
  start_time = 0
76
  chunk_index = 1
77
-
78
  base_name = os.path.splitext(os.path.basename(audio_file_path))[0]
79
  output_folder = os.path.dirname(audio_file_path)
80
 
81
- total_chunks = int((duration - overlap) / (chunk_length - overlap)) + 1
82
- print(f"\nChunking audio file: {audio_file_path}")
83
- print(f"Total duration: {format_timestamp(duration)}")
84
- print(f"Chunk duration: {chunk_duration_minutes} minutes, Overlap: {overlap_seconds} seconds")
85
- print(f"Estimated number of chunks: {total_chunks}\n")
86
-
87
  while start_time < duration:
88
  if not check_memory_usage():
89
- print("Memory usage too high, waiting before continuing...")
90
  time.sleep(5)
91
  continue
92
 
93
- # Calculate end time for current chunk
94
  end_time = min(start_time + chunk_length, duration)
95
-
96
- # Make sure we don't create a tiny final chunk
97
- if end_time - start_time < 30: # If chunk would be less than 30 seconds
98
- if chunk_index > 1: # If not the first chunk
99
- break # Skip creating this small final chunk
100
- end_time = duration # If it's the first chunk, include all audio
101
 
102
  chunk_file_name = f"{base_name}_part{chunk_index}.mp3"
103
  chunk_file_path = os.path.join(output_folder, chunk_file_name)
104
 
105
- print(f"Creating chunk {chunk_index}/{total_chunks}: {chunk_file_name}")
106
- print(f" Time range: {format_timestamp(start_time)} to {format_timestamp(end_time)}")
107
-
108
  try:
109
- # Use ffmpeg to extract chunk
110
- if os.path.exists(chunk_file_path):
111
- os.remove(chunk_file_path)
112
-
113
  stream = ffmpeg.input(audio_file_path, ss=start_time, t=end_time-start_time)
114
  stream = ffmpeg.output(stream, chunk_file_path, acodec='libmp3lame', loglevel='error')
115
- ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, overwrite_output=True)
116
 
117
  if os.path.exists(chunk_file_path):
118
- chunk_size = os.path.getsize(chunk_file_path) / (1024 * 1024)
119
- print(f" ✓ Saved chunk: {chunk_file_path} ({chunk_size:.2f}MB)")
120
  chunked_files.append(chunk_file_path)
121
  chunk_index += 1
122
- else:
123
- print(f" ✗ Error: Chunk file was not created")
124
- break
125
-
126
  except ffmpeg.Error as e:
127
- print(f" ✗ Error processing chunk: {e.stderr.decode() if e.stderr else str(e)}")
128
  break
129
 
130
- # Update start time for next chunk, considering overlap
131
- if end_time == duration: # If this was the last chunk
132
  break
133
- start_time = end_time - overlap
134
-
135
- # Force garbage collection after each chunk
136
  gc.collect()
137
 
138
- created_chunks = chunk_index - 1
139
- print(f"\nAudio file chunking completed:")
140
- print(f"- Created {created_chunks} out of {total_chunks} expected chunks")
141
- print(f"- Final chunk duration: {format_timestamp(end_time - start_time)}")
142
-
143
  except Exception as e:
144
  print(f"Error during audio chunking: {e}")
145
-
146
  return chunked_files
147
 
148
-
149
  def get_audio_duration(file_path: str) -> float:
150
  """Get the duration of an audio file using ffmpeg."""
151
- try:
152
- probe = ffmpeg.probe(file_path)
153
- duration = float(probe['format']['duration'])
154
- return duration
155
- except Exception as e:
156
- raise Exception(f"Error getting audio duration: {e}")
157
-
158
 
159
  def generate_transcription(audio_file_path: str, provider: TranscriptionProvider) -> str:
160
- """
161
- Generate transcription using the configured AI provider.
162
-
163
- Args:
164
- audio_file_path: Path to audio file
165
- provider: TranscriptionProvider instance (Gemini or HuggingFace)
166
-
167
- Returns:
168
- Transcription text (with timestamps/speakers for Gemini, plain text for HF)
169
- """
170
- try:
171
- return provider.transcribe(audio_file_path)
172
- except Exception as e:
173
- raise Exception(f"Error during transcription: {e}")
174
-
175
 
176
  def generate_summary(transcription_text: str, provider: TranscriptionProvider) -> str:
177
- """
178
- Generate a concise 2-3 sentence summary using the configured provider.
179
-
180
- Args:
181
- transcription_text: Full transcription
182
- provider: TranscriptionProvider instance
183
-
184
- Returns:
185
- Summary text
186
- """
187
- try:
188
- return provider.generate_summary(transcription_text)
189
- except Exception as e:
190
- return f"Error generating summary: {e}"
191
-
192
 
193
  def generate_key_ideas(transcription_text: str, provider: TranscriptionProvider) -> List[Dict[str, str]]:
194
- """
195
- Identify 3-5 key ideas from the transcription using the configured provider.
196
-
197
- Args:
198
- transcription_text: Full transcription
199
- provider: TranscriptionProvider instance
200
-
201
- Returns:
202
- List of {idea, description} dictionaries
203
- """
204
- try:
205
- return provider.generate_key_ideas(transcription_text)
206
- except Exception as e:
207
- return [{'idea': 'Error generating key ideas', 'description': str(e)}]
208
-
209
 
210
  def create_transcript_markdown(audio_filename: str, transcription: str, summary: str, key_ideas: List[Dict[str, str]]) -> str:
211
- """
212
- Create a formatted markdown file with YAML frontmatter.
213
-
214
- Args:
215
- audio_filename: Name of the audio file
216
- transcription: Full transcription text
217
- summary: Summary text
218
- key_ideas: List of key ideas
219
-
220
- Returns:
221
- Formatted markdown content
222
- """
223
  base_name = os.path.splitext(audio_filename)[0]
224
-
225
- # Build YAML frontmatter
226
  yaml_metadata = {
227
  'title': base_name,
228
  'audio_file': audio_filename,
@@ -231,99 +124,58 @@ def create_transcript_markdown(audio_filename: str, transcription: str, summary:
231
  'key_ideas': key_ideas,
232
  'note_id': str(uuid.uuid4())
233
  }
234
-
235
  yaml_frontmatter = "---\n" + yaml.dump(yaml_metadata, sort_keys=False, indent=2, allow_unicode=True) + "---\n\n"
236
-
237
- # Build content sections
238
- content = yaml_frontmatter
239
-
240
- # Key ideas section
241
- content += "## Key Ideas\n\n"
242
- if key_ideas:
243
- for idea_item in key_ideas:
244
- if idea_item['description']:
245
- content += f"- **{idea_item['idea']}:** {idea_item['description']}\n"
246
- else:
247
- content += f"- **{idea_item['idea']}**\n"
248
- else:
249
- content += "*(No key ideas generated)*\n"
250
-
251
- content += "\n## Full Transcription\n\n"
252
- content += transcription
253
-
254
  return content
255
 
256
-
257
  def process_audio_file(audio_file_path: str, gemini_provider: TranscriptionProvider, openrouter_provider: TranscriptionProvider = None, progress_callback=None) -> Tuple[str, str]:
258
- # 1. SETUP ABSOLUTE PATH (Keep this)
259
- current_dir = os.path.dirname(os.path.abspath(__file__))
260
- output_dir = os.path.join(current_dir, "outputs")
261
- os.makedirs(output_dir, exist_ok=True)
262
 
263
  audio_filename = os.path.basename(audio_file_path)
264
  base_name = os.path.splitext(audio_filename)[0]
265
-
266
  file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)
267
 
268
  files_to_transcribe = []
269
  if file_size_mb > 30:
270
- if progress_callback:
271
- progress_callback("📦 Chunking large audio file...", 0.1)
272
- chunked_files = chunk_audio_file(audio_file_path)
273
- files_to_transcribe.extend(chunked_files)
274
  else:
275
  files_to_transcribe.append(audio_file_path)
276
 
277
  markdown_files = []
278
- total_files = len(files_to_transcribe)
279
-
280
  for idx, file_path in enumerate(files_to_transcribe, 1):
281
- file_name = os.path.basename(file_path)
282
-
283
- if progress_callback:
284
- progress = 0.2 + (0.6 * (idx - 1) / total_files)
285
- progress_callback(f"🎙️ Transcribing part {idx}/{total_files}...", progress)
286
 
287
  transcription = generate_transcription(file_path, gemini_provider)
288
-
289
  text_provider = openrouter_provider if openrouter_provider else gemini_provider
290
  summary = generate_summary(transcription, text_provider)
291
  key_ideas = generate_key_ideas(transcription, text_provider)
292
 
293
- markdown_content = create_transcript_markdown(file_name, transcription, summary, key_ideas)
294
 
295
- # 2. FIX: Use the absolute output_dir established at the top
296
- output_filename = os.path.splitext(file_name)[0] + ".md"
297
- markdown_path = os.path.join(output_dir, output_filename)
298
 
299
  with open(markdown_path, 'w', encoding='utf-8') as f:
300
  f.write(markdown_content)
301
-
302
  markdown_files.append(markdown_path)
303
 
304
- if "_part" in file_name:
305
- try:
306
- os.remove(file_path)
307
- except Exception as e:
308
- print(f"Warning: Could not delete chunk {file_name}: {e}")
309
-
310
  if len(markdown_files) == 1:
311
  return markdown_files[0], "False"
312
  else:
313
- if progress_callback:
314
- progress_callback("📦 Creating ZIP file...", 0.9)
315
-
316
- # 3. FIX: Use absolute zip path
317
- zip_filename = f"{base_name}_transcripts.zip"
318
- zip_path = os.path.join(output_dir, zip_filename)
319
-
320
  with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
321
  for md_file in markdown_files:
322
- basename = os.path.basename(md_file)
323
- zipf.write(md_file, basename)
324
- try:
325
- os.remove(md_file)
326
- except Exception as e:
327
- print(f"Warning: Could not delete {md_file}: {e}")
328
-
329
  return zip_path, "True"
 
16
  import time
17
  from ai_providers import TranscriptionProvider
18
 
19
+ # Define absolute output directory relative to this file
20
+ CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
21
+ OUTPUT_DIR = os.path.join(CURRENT_DIR, "outputs")
22
 
23
  def format_timestamp(seconds: float) -> str:
24
  """Convert seconds to ffmpeg time format (HH:MM:SS.xxx)."""
 
28
  secs = seconds % 60
29
  return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"
30
 
 
31
  def check_memory_usage() -> bool:
32
  """Check current memory usage and print warning if too high."""
33
  process = psutil.Process()
 
37
  return False
38
  return True
39
 
 
40
  def clean_partial_chunks(base_file_path: str) -> None:
41
  """Clean up any existing partial chunks before starting."""
42
  try:
43
  base_name = os.path.splitext(os.path.basename(base_file_path))[0]
44
+ # Ensure we look in the same directory as the audio file for chunks
45
+ chunk_folder = os.path.dirname(base_file_path)
46
 
47
+ for file in os.listdir(chunk_folder):
 
48
  if file.startswith(f"{base_name}_part") and file.endswith(".mp3"):
49
+ file_path = os.path.join(chunk_folder, file)
50
  try:
51
  os.remove(file_path)
 
52
  except Exception as e:
53
  print(f"Warning: Could not remove {file}: {e}")
54
  except Exception as e:
55
  print(f"Warning: Error during cleanup: {e}")
56
 
 
57
  def chunk_audio_file(audio_file_path: str, chunk_duration_minutes: int = 25, overlap_seconds: int = 5) -> List[str]:
58
  """Chunks an audio file into smaller parts using ffmpeg streaming."""
59
  chunked_files = []
60
  try:
 
61
  clean_partial_chunks(audio_file_path)
 
 
 
62
  duration = get_audio_duration(audio_file_path)
63
+
 
 
 
64
  chunk_length = chunk_duration_minutes * 60
 
65
  start_time = 0
66
  chunk_index = 1
67
+
68
  base_name = os.path.splitext(os.path.basename(audio_file_path))[0]
69
  output_folder = os.path.dirname(audio_file_path)
70
 
 
 
 
 
 
 
71
  while start_time < duration:
72
  if not check_memory_usage():
 
73
  time.sleep(5)
74
  continue
75
 
 
76
  end_time = min(start_time + chunk_length, duration)
77
+ if end_time - start_time < 30 and chunk_index > 1:
78
+ break
 
 
 
 
79
 
80
  chunk_file_name = f"{base_name}_part{chunk_index}.mp3"
81
  chunk_file_path = os.path.join(output_folder, chunk_file_name)
82
 
 
 
 
83
  try:
 
 
 
 
84
  stream = ffmpeg.input(audio_file_path, ss=start_time, t=end_time-start_time)
85
  stream = ffmpeg.output(stream, chunk_file_path, acodec='libmp3lame', loglevel='error')
86
+ ffmpeg.run(stream, overwrite_output=True)
87
 
88
  if os.path.exists(chunk_file_path):
 
 
89
  chunked_files.append(chunk_file_path)
90
  chunk_index += 1
 
 
 
 
91
  except ffmpeg.Error as e:
 
92
  break
93
 
94
+ if end_time == duration:
 
95
  break
96
+ start_time = end_time - overlap_seconds
 
 
97
  gc.collect()
98
 
 
 
 
 
 
99
  except Exception as e:
100
  print(f"Error during audio chunking: {e}")
 
101
  return chunked_files
102
 
 
103
  def get_audio_duration(file_path: str) -> float:
104
  """Get the duration of an audio file using ffmpeg."""
105
+ probe = ffmpeg.probe(file_path)
106
+ return float(probe['format']['duration'])
 
 
 
 
 
107
 
108
  def generate_transcription(audio_file_path: str, provider: TranscriptionProvider) -> str:
109
+ return provider.transcribe(audio_file_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
 
111
  def generate_summary(transcription_text: str, provider: TranscriptionProvider) -> str:
112
+ return provider.generate_summary(transcription_text)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
 
114
  def generate_key_ideas(transcription_text: str, provider: TranscriptionProvider) -> List[Dict[str, str]]:
115
+ return provider.generate_key_ideas(transcription_text)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
 
117
  def create_transcript_markdown(audio_filename: str, transcription: str, summary: str, key_ideas: List[Dict[str, str]]) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
118
  base_name = os.path.splitext(audio_filename)[0]
 
 
119
  yaml_metadata = {
120
  'title': base_name,
121
  'audio_file': audio_filename,
 
124
  'key_ideas': key_ideas,
125
  'note_id': str(uuid.uuid4())
126
  }
 
127
  yaml_frontmatter = "---\n" + yaml.dump(yaml_metadata, sort_keys=False, indent=2, allow_unicode=True) + "---\n\n"
128
+ content = yaml_frontmatter + "## Key Ideas\n\n"
129
+ for idea_item in key_ideas:
130
+ content += f"- **{idea_item['idea']}:** {idea_item['description']}\n"
131
+ content += "\n## Full Transcription\n\n" + transcription
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  return content
133
 
 
134
  def process_audio_file(audio_file_path: str, gemini_provider: TranscriptionProvider, openrouter_provider: TranscriptionProvider = None, progress_callback=None) -> Tuple[str, str]:
135
+ # Ensure the absolute output directory exists
136
+ os.makedirs(OUTPUT_DIR, exist_ok=True)
 
 
137
 
138
  audio_filename = os.path.basename(audio_file_path)
139
  base_name = os.path.splitext(audio_filename)[0]
 
140
  file_size_mb = os.path.getsize(audio_file_path) / (1024 * 1024)
141
 
142
  files_to_transcribe = []
143
  if file_size_mb > 30:
144
+ if progress_callback: progress_callback("📦 Chunking file...", 0.1)
145
+ files_to_transcribe = chunk_audio_file(audio_file_path)
 
 
146
  else:
147
  files_to_transcribe.append(audio_file_path)
148
 
149
  markdown_files = []
 
 
150
  for idx, file_path in enumerate(files_to_transcribe, 1):
151
+ if progress_callback: progress_callback(f"🎙️ Transcribing {idx}/{len(files_to_transcribe)}...", 0.2 + (0.6 * idx/len(files_to_transcribe)))
 
 
 
 
152
 
153
  transcription = generate_transcription(file_path, gemini_provider)
 
154
  text_provider = openrouter_provider if openrouter_provider else gemini_provider
155
  summary = generate_summary(transcription, text_provider)
156
  key_ideas = generate_key_ideas(transcription, text_provider)
157
 
158
+ markdown_content = create_transcript_markdown(os.path.basename(file_path), transcription, summary, key_ideas)
159
 
160
+ # Use the global absolute OUTPUT_DIR
161
+ output_filename = os.path.splitext(os.path.basename(file_path))[0] + ".md"
162
+ markdown_path = os.path.join(OUTPUT_DIR, output_filename)
163
 
164
  with open(markdown_path, 'w', encoding='utf-8') as f:
165
  f.write(markdown_content)
 
166
  markdown_files.append(markdown_path)
167
 
168
+ if "_part" in file_path:
169
+ try: os.remove(file_path)
170
+ except: pass
171
+
 
 
172
  if len(markdown_files) == 1:
173
  return markdown_files[0], "False"
174
  else:
175
+ zip_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcripts.zip")
 
 
 
 
 
 
176
  with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
177
  for md_file in markdown_files:
178
+ zipf.write(md_file, os.path.basename(md_file))
179
+ try: os.remove(md_file)
180
+ except: pass
 
 
 
 
181
  return zip_path, "True"