staraks committed on
Commit
cbbc496
·
verified ·
1 Parent(s): a2d60a6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +196 -115
app.py CHANGED
@@ -1,15 +1,20 @@
1
- # @title Default title text
 
 
2
  from docx import Document
3
  import os
4
  import whisper
5
  import gradio as gr
6
  import pyzipper
7
  import glob
8
- import shutil # Import shutil for removing directories
 
 
9
 
10
- # Load default model
11
  model_cache = {}
12
 
 
13
  def save_as_word(text, filename="merged_transcripts.docx"):
14
  """Saves the given text as a Word document."""
15
  document = Document()
@@ -18,155 +23,231 @@ def save_as_word(text, filename="merged_transcripts.docx"):
18
  return filename
19
 
20
 
21
- def transcribe_multiple(audio_list, model_name, advanced, merge_checkbox, zip_file=None, zip_password=None):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  log_outputs = []
23
  transcript_outputs_list = []
24
  word_file_path = None
25
  extracted_audio_paths = []
26
  temp_extract_dir = "/tmp/extracted_audio"
27
 
28
- # Clean up previous extractions
 
 
 
29
  if os.path.exists(temp_extract_dir):
30
  try:
31
- shutil.rmtree(temp_extract_dir) # Use shutil.rmtree for removing the directory and its contents
32
  log_outputs.append(f"Cleaned up previous temporary directory: {temp_extract_dir}")
33
  except OSError as e:
34
  log_outputs.append(f"Warning: Could not clean up previous temporary directory {temp_extract_dir}: {e}")
35
 
36
-
37
  if zip_file:
38
- log_outputs.append(f"Processing zip file: {zip_file.name}")
 
39
  try:
40
- with pyzipper.ZipFile(zip_file.name, 'r') as zf:
41
  if zip_password:
42
  try:
43
  zf.setpassword(zip_password.encode())
44
  except RuntimeError:
45
- log_outputs.append("Error: Incorrect password for the zip file.")
46
- # Return immediately on password error
47
- return "\n\n".join(log_outputs), "", None
48
 
49
- # Create the extraction directory if it doesn't exist
50
  os.makedirs(temp_extract_dir, exist_ok=True)
51
-
52
- audio_extensions = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.dat', '.dct'] # Added .dct extension
53
  extracted_count = 0
54
  for file_info in zf.infolist():
55
  if not file_info.is_dir() and os.path.splitext(file_info.filename)[1].lower() in audio_extensions:
56
- try:
57
- extracted_path = zf.extract(file_info, path=temp_extract_dir)
58
- extracted_audio_paths.append(extracted_path)
59
- log_outputs.append(f"Extracted: {file_info.filename}")
60
- extracted_count += 1
61
- except Exception as e:
62
- log_outputs.append(f"Error extracting {file_info.filename}: {e}")
63
-
 
 
 
 
64
 
65
  if extracted_count == 0:
66
- log_outputs.append("No supported audio files found in the zip archive.")
67
- # If zip was provided but no audio found, return here
68
- if not audio_list:
69
- # Clean up the newly created empty directory before returning
70
- if os.path.exists(temp_extract_dir):
71
- try:
72
- os.rmdir(temp_extract_dir)
73
- log_outputs.append(f"Removed empty temporary directory: {temp_extract_dir}")
74
- except OSError as e:
75
- log_outputs.append(f"Warning: Could not remove empty temporary directory {temp_extract_dir}: {e}")
76
- return "\n\n".join(log_outputs), "", None
77
-
78
 
79
  except pyzipper.BadZipFile:
80
- log_outputs.append(f"Error: Invalid zip file format.")
81
- # Clean up any partial extractions before returning
82
  if os.path.exists(temp_extract_dir):
83
  try:
84
  shutil.rmtree(temp_extract_dir)
85
  log_outputs.append(f"Cleaned up partial temporary directory: {temp_extract_dir}")
86
  except OSError as e:
87
  log_outputs.append(f"Warning: Could not clean up partial temporary directory {temp_extract_dir}: {e}")
88
- return "\n\n".join(log_outputs), "", None
89
- except FileNotFoundError:
90
- log_outputs.append(f"Error: Zip file not found.")
91
- return "\n\n".join(log_outputs), "", None
92
  except Exception as e:
93
- log_outputs.append(f"An unexpected error occurred during zip processing: {e}")
94
- # Clean up any partial extractions before returning
95
- if os.path.exists(temp_extract_dir):
96
- try:
97
- shutil.rmtree(temp_extract_dir)
98
- log_outputs.append(f"Cleaned up partial temporary directory: {temp_extract_dir}")
99
- except OSError as e:
100
- log_outputs.append(f"Warning: Could not clean up partial temporary directory {temp_extract_dir}: {e}")
101
- return "\n\n".join(log_outputs), "", None
102
-
103
- all_audio_files = []
104
- if audio_list:
105
- all_audio_files.extend(audio_list)
 
 
 
 
 
 
106
  if extracted_audio_paths:
107
- for path in extracted_audio_paths:
108
- if os.path.exists(path):
109
- all_audio_files.append(gr.File(path))
110
-
111
-
112
- if not all_audio_files:
113
- log_outputs.append("No audio files provided for transcription.")
114
- # Clean up the temporary directory if it was created but no audio was found
115
- if os.path.exists(temp_extract_dir):
116
- try:
117
- shutil.rmtree(temp_extract_dir)
118
- log_outputs.append(f"Cleaned up temporary directory: {temp_extract_dir}")
119
- except OSError as e:
120
- log_outputs.append(f"Warning: Could not clean up temporary directory {temp_extract_dir}: {e}")
121
- return "\n\n".join(log_outputs), "", None
122
-
123
-
124
- for audio in all_audio_files:
125
- # Load model (cache for reuse)
126
- if model_name not in model_cache:
127
- log_outputs.append(f"Loading model: {model_name}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  try:
129
- model_cache[model_name] = whisper.load_model(model_name)
 
 
 
 
 
130
  except Exception as e:
131
- log_outputs.append(f"Error loading model {model_name}: {e}")
132
- # If model loading fails for the first file, stop processing
133
- # Clean up extracted files before returning
134
- if os.path.exists(temp_extract_dir):
135
- try:
136
- shutil.rmtree(temp_extract_dir)
137
- log_outputs.append(f"Cleaned up temporary directory after model loading error: {temp_extract_dir}")
138
- except OSError as e:
139
- log_outputs.append(f"Warning: Could not clean up temporary directory {temp_extract_dir}: {e}")
140
- return "\n\n".join(log_outputs), "", None
141
 
 
 
 
 
142
 
143
- model = model_cache[model_name]
 
144
 
145
- # Transcribe
146
- try:
147
- log_outputs.append(f"Transcribing: {os.path.basename(audio.name)}")
148
- result = model.transcribe(audio.name)
149
- transcript = result["text"]
150
 
151
- # Save transcript
152
- base = os.path.splitext(os.path.basename(audio.name))[0]
153
- # Save transcripts in the /tmp directory
154
- save_path = os.path.join("/tmp", f"{base}-transcript.txt")
155
 
156
- with open(save_path, "w", encoding="utf-8") as f:
157
- f.write(transcript)
 
 
 
 
 
 
 
 
 
158
 
159
- log = f"File: {os.path.basename(audio.name)}\nSaved to: {save_path}"
160
- log_outputs.append(log)
161
- transcript_outputs_list.append(f"Transcript for {os.path.basename(audio.name)}:\n{transcript}")
162
 
163
  except Exception as e:
164
- log_outputs.append(f"Error processing {os.path.basename(audio.name)}: {e}")
165
- transcript_outputs_list.append(f"Could not transcribe {os.path.basename(audio.name)} due to an error.")
 
 
 
166
 
 
167
  combined_transcript_string = "\n\n---\n\n".join(transcript_outputs_list)
168
 
169
- if merge_checkbox and combined_transcript_string.strip(): # Only create word file if merging and there is content
170
  try:
171
  word_filename = save_as_word(combined_transcript_string)
172
  log_outputs.append(f"Merged transcript saved to: {word_filename}")
@@ -174,22 +255,21 @@ def transcribe_multiple(audio_list, model_name, advanced, merge_checkbox, zip_fi
174
  except Exception as e:
175
  log_outputs.append(f"Error saving merged transcript to Word file: {e}")
176
 
177
-
178
- # Clean up extracted files after processing
179
  if os.path.exists(temp_extract_dir):
180
  try:
181
  shutil.rmtree(temp_extract_dir)
182
  log_outputs.append(f"Cleaned up temporary directory: {temp_extract_dir}")
183
  except OSError as e:
184
- log_outputs.append(f"Warning: Could not clean up temporary directory {temp_extract_extract_dir}: {e}")
185
-
186
 
187
- return "\n\n".join(log_outputs), combined_transcript_string, word_file_path
 
188
 
189
 
190
  # Gradio UI
191
  with gr.Blocks() as demo:
192
- gr.Markdown("## Whisper Transcription Tool (Multiple Files)")
193
 
194
  with gr.Row():
195
  model_dropdown = gr.Dropdown(
@@ -210,10 +290,11 @@ with gr.Blocks() as demo:
210
 
211
  log_output = gr.Textbox(label="Log Output", lines=10)
212
  transcript_output = gr.Textbox(label="Transcripts", lines=20)
213
- word_file_output = gr.File(label="Download Merged Transcript (.docx)", visible=False)
 
214
 
215
  def update_file_visibility(merge_checked):
216
- return gr.File(visible=merge_checked)
217
 
218
  merge_checkbox.change(
219
  update_file_visibility,
@@ -222,11 +303,11 @@ with gr.Blocks() as demo:
222
  api_name="update_file_visibility"
223
  )
224
 
225
-
226
  transcribe_btn.click(
227
  transcribe_multiple,
228
  inputs=[audio_input, model_dropdown, advanced_checkbox, merge_checkbox, zip_input, zip_password_input],
229
- outputs=[log_output, transcript_output, word_file_output]
230
  )
231
 
232
- demo.launch()
 
 
1
+ # Whisper Transcription Tool with .dct support and progress updates
2
+ # Drop-in replacement for your app.py. Paste into your Hugging Face Space.
3
+
4
  from docx import Document
5
  import os
6
  import whisper
7
  import gradio as gr
8
  import pyzipper
9
  import glob
10
+ import shutil
11
+ import tempfile
12
+ from pydub import AudioSegment
13
 
14
+ # Load default model cache
15
  model_cache = {}
16
 
17
+
18
  def save_as_word(text, filename="merged_transcripts.docx"):
19
  """Saves the given text as a Word document."""
20
  document = Document()
 
23
  return filename
24
 
25
 
26
def convert_to_wav_if_needed(input_path):
    """Return a path to a WAV version of ``input_path``.

    A path whose name already ends in ``.wav`` (case-insensitive) is returned
    unchanged. Any other file is decoded with pydub and exported to a fresh
    temporary ``.wav`` file.

    Args:
        input_path: Path of the audio file to inspect or convert.

    Returns:
        ``input_path`` itself for WAV input, otherwise the path of a newly
        created temporary WAV file. This function does not delete that file
        on success, so the caller should remove it when finished.

    Raises:
        Exception: Whatever pydub raises when decoding or exporting fails.
            The temporary file is removed before the error propagates.
    """
    if input_path.lower().endswith('.wav'):
        return input_path

    # Reserve a unique output name. The handle is closed immediately so the
    # exporter can open the path itself; delete=False keeps the file on disk.
    tmp_wav = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
    tmp_wav.close()
    try:
        # pydub will use ffmpeg under the hood
        AudioSegment.from_file(input_path).export(tmp_wav.name, format='wav')
    except Exception:
        # Conversion failed: drop the placeholder file (best effort), then
        # re-raise the original error with its traceback intact.
        try:
            os.unlink(tmp_wav.name)
        except OSError:
            pass
        raise
    return tmp_wav.name
49
+
50
+
51
+ def transcribe_multiple(file_paths, model_name, advanced, merge_checkbox, zip_file=None, zip_password=None):
52
+ """
53
+ Generator function for Gradio that yields progress updates.
54
+ Outputs: (log_text, transcripts_text, word_file_path_or_None, percent_int)
55
+ """
56
+ # initial state
57
  log_outputs = []
58
  transcript_outputs_list = []
59
  word_file_path = None
60
  extracted_audio_paths = []
61
  temp_extract_dir = "/tmp/extracted_audio"
62
 
63
+ # yield initial empty state (so UI shows up immediately)
64
+ yield "", "", None, 0
65
+
66
+ # cleanup any previous temp dir
67
  if os.path.exists(temp_extract_dir):
68
  try:
69
+ shutil.rmtree(temp_extract_dir)
70
  log_outputs.append(f"Cleaned up previous temporary directory: {temp_extract_dir}")
71
  except OSError as e:
72
  log_outputs.append(f"Warning: Could not clean up previous temporary directory {temp_extract_dir}: {e}")
73
 
74
+ # If a zip is provided, extract supported audio files
75
  if zip_file:
76
+ log_outputs.append(f"Processing zip file: {zip_file}")
77
+ yield "\n\n".join(log_outputs), "", None, 2
78
  try:
79
+ with pyzipper.ZipFile(zip_file, 'r') as zf:
80
  if zip_password:
81
  try:
82
  zf.setpassword(zip_password.encode())
83
  except RuntimeError:
84
+ log_outputs.append("Error: Incorrect password for the zip file.")
85
+ yield "\n\n".join(log_outputs), "", None, 100
86
+ return
87
 
 
88
  os.makedirs(temp_extract_dir, exist_ok=True)
89
+ audio_extensions = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.dat', '.dct']
 
90
  extracted_count = 0
91
  for file_info in zf.infolist():
92
  if not file_info.is_dir() and os.path.splitext(file_info.filename)[1].lower() in audio_extensions:
93
+ try:
94
+ # extract() returns the path it wrote, but that value is discarded; rebuild the expected path from the member name
95
+ zf.extract(file_info, path=temp_extract_dir)
96
+ extracted_path = os.path.join(temp_extract_dir, file_info.filename)
97
+ # Normalize the path (zip members may live in subfolders); it is only kept if the file exists
98
+ extracted_path = os.path.normpath(extracted_path)
99
+ if os.path.exists(extracted_path):
100
+ extracted_audio_paths.append(extracted_path)
101
+ log_outputs.append(f"Extracted: {file_info.filename}")
102
+ extracted_count += 1
103
+ except Exception as e:
104
+ log_outputs.append(f"Error extracting {file_info.filename}: {e}")
105
 
106
  if extracted_count == 0:
107
+ log_outputs.append("No supported audio files found in the zip archive.")
108
+ # cleanup empty dir
109
+ try:
110
+ shutil.rmtree(temp_extract_dir)
111
+ log_outputs.append(f"Removed empty temporary directory: {temp_extract_dir}")
112
+ except Exception as e:
113
+ log_outputs.append(f"Warning: Could not remove temporary directory {temp_extract_dir}: {e}")
114
+ yield "\n\n".join(log_outputs), "", None, 100
115
+ return
 
 
 
116
 
117
  except pyzipper.BadZipFile:
118
+ log_outputs.append("Error: Invalid zip file format.")
 
119
  if os.path.exists(temp_extract_dir):
120
  try:
121
  shutil.rmtree(temp_extract_dir)
122
  log_outputs.append(f"Cleaned up partial temporary directory: {temp_extract_dir}")
123
  except OSError as e:
124
  log_outputs.append(f"Warning: Could not clean up partial temporary directory {temp_extract_dir}: {e}")
125
+ yield "\n\n".join(log_outputs), "", None, 100
126
+ return
 
 
127
  except Exception as e:
128
+ log_outputs.append(f"An unexpected error occurred during zip processing: {e}")
129
+ if os.path.exists(temp_extract_dir):
130
+ try:
131
+ shutil.rmtree(temp_extract_dir)
132
+ log_outputs.append(f"Cleaned up partial temporary directory: {temp_extract_dir}")
133
+ except OSError as e:
134
+ log_outputs.append(f"Warning: Could not clean up partial temporary directory {temp_extract_dir}: {e}")
135
+ yield "\n\n".join(log_outputs), "", None, 100
136
+ return
137
+
138
+ # Build list of input file paths (strings)
139
+ all_audio_paths = []
140
+ if file_paths:
141
+ # file_paths from Gradio with type="filepath" come as list of paths
142
+ if isinstance(file_paths, (list, tuple)):
143
+ all_audio_paths.extend(file_paths)
144
+ else:
145
+ all_audio_paths.append(file_paths)
146
+
147
  if extracted_audio_paths:
148
+ all_audio_paths.extend(extracted_audio_paths)
149
+
150
+ if not all_audio_paths:
151
+ log_outputs.append("No audio files provided for transcription.")
152
+ # cleanup
153
+ if os.path.exists(temp_extract_dir):
154
+ try:
155
+ shutil.rmtree(temp_extract_dir)
156
+ log_outputs.append(f"Cleaned up temporary directory: {temp_extract_dir}")
157
+ except OSError as e:
158
+ log_outputs.append(f"Warning: Could not clean up temporary directory {temp_extract_dir}: {e}")
159
+ yield "\n\n".join(log_outputs), "", None, 100
160
+ return
161
+
162
+ total_files = len(all_audio_paths)
163
+ processed = 0
164
+
165
+ # Load model once (cache)
166
+ if model_name not in model_cache:
167
+ log_outputs.append(f"Loading model: {model_name}")
168
+ yield "\n\n".join(log_outputs), "", None, 3
169
+ try:
170
+ model_cache[model_name] = whisper.load_model(model_name)
171
+ except Exception as e:
172
+ log_outputs.append(f"Error loading model {model_name}: {e}")
173
+ # cleanup
174
+ if os.path.exists(temp_extract_dir):
175
+ try:
176
+ shutil.rmtree(temp_extract_dir)
177
+ log_outputs.append(f"Cleaned up temporary directory after model loading error: {temp_extract_dir}")
178
+ except OSError as e:
179
+ log_outputs.append(f"Warning: Could not clean up temporary directory {temp_extract_dir}: {e}")
180
+ yield "\n\n".join(log_outputs), "", None, 100
181
+ return
182
+
183
+ model = model_cache[model_name]
184
+
185
+ # Process files one by one and yield progress
186
+ for idx, path in enumerate(all_audio_paths):
187
+ basename = os.path.basename(path)
188
+ try:
189
+ log_outputs.append(f"Starting processing: {basename}")
190
+ yield "\n\n".join(log_outputs), "\n\n".join(transcript_outputs_list), None, int(5 + 90 * (processed / total_files))
191
+
192
+ # If file is .dct or other non-wav, convert
193
  try:
194
+ wav_path = convert_to_wav_if_needed(path)
195
+ if wav_path != path:
196
+ log_outputs.append(f"Converted {basename} -> WAV")
197
+ else:
198
+ log_outputs.append(f"Using WAV file: {basename}")
199
+ yield "\n\n".join(log_outputs), "\n\n".join(transcript_outputs_list), None, int(5 + 90 * (processed / total_files))
200
  except Exception as e:
201
+ log_outputs.append(f"Conversion failed for {basename}: {e}")
202
+ transcript_outputs_list.append(f"Could not convert {basename}: {e}")
203
+ processed += 1
204
+ yield "\n\n".join(log_outputs), "\n\n".join(transcript_outputs_list), None, int(5 + 90 * (processed / total_files))
205
+ continue
 
 
 
 
 
206
 
207
+ # Transcribe using Whisper model
208
+ try:
209
+ log_outputs.append(f"Transcribing: {basename}")
210
+ yield "\n\n".join(log_outputs), "\n\n".join(transcript_outputs_list), None, int(10 + 80 * (processed / total_files))
211
 
212
+ result = model.transcribe(wav_path)
213
+ transcript = result.get("text", "")
214
 
215
+ # Save transcript to /tmp
216
+ base = os.path.splitext(basename)[0]
217
+ save_path = os.path.join('/tmp', f"{base}-transcript.txt")
218
+ with open(save_path, 'w', encoding='utf-8') as f:
219
+ f.write(transcript)
220
 
221
+ log_outputs.append(f"File processed: {basename} -> {save_path}")
222
+ transcript_outputs_list.append(f"Transcript for {basename}:\n{transcript}")
 
 
223
 
224
+ except Exception as e:
225
+ log_outputs.append(f"Error processing {basename}: {e}")
226
+ transcript_outputs_list.append(f"Could not transcribe {basename} due to an error: {e}")
227
+
228
+ finally:
229
+ # remove temporary wav if we created one
230
+ if wav_path != path and os.path.exists(wav_path):
231
+ try:
232
+ os.unlink(wav_path)
233
+ except Exception:
234
+ pass
235
 
236
+ processed += 1
237
+ percent = int(5 + 90 * (processed / total_files))
238
+ yield "\n\n".join(log_outputs), "\n\n".join(transcript_outputs_list), None, percent
239
 
240
  except Exception as e:
241
+ log_outputs.append(f"Unexpected error with {basename}: {e}")
242
+ transcript_outputs_list.append(f"Unexpected error with {basename}: {e}")
243
+ processed += 1
244
+ percent = int(5 + 90 * (processed / total_files))
245
+ yield "\n\n".join(log_outputs), "\n\n".join(transcript_outputs_list), None, percent
246
 
247
+ # After all files processed, possibly save merged Word file
248
  combined_transcript_string = "\n\n---\n\n".join(transcript_outputs_list)
249
 
250
+ if merge_checkbox and combined_transcript_string.strip():
251
  try:
252
  word_filename = save_as_word(combined_transcript_string)
253
  log_outputs.append(f"Merged transcript saved to: {word_filename}")
 
255
  except Exception as e:
256
  log_outputs.append(f"Error saving merged transcript to Word file: {e}")
257
 
258
+ # cleanup extracted files
 
259
  if os.path.exists(temp_extract_dir):
260
  try:
261
  shutil.rmtree(temp_extract_dir)
262
  log_outputs.append(f"Cleaned up temporary directory: {temp_extract_dir}")
263
  except OSError as e:
264
+ log_outputs.append(f"Warning: Could not clean up temporary temporary directory {temp_extract_dir}: {e}")
 
265
 
266
+ # final yield at 100%
267
+ yield "\n\n".join(log_outputs), combined_transcript_string, word_file_path, 100
268
 
269
 
270
  # Gradio UI
271
  with gr.Blocks() as demo:
272
+ gr.Markdown("## Whisper Transcription Tool (Multiple Files) — .dct support + progress")
273
 
274
  with gr.Row():
275
  model_dropdown = gr.Dropdown(
 
290
 
291
  log_output = gr.Textbox(label="Log Output", lines=10)
292
  transcript_output = gr.Textbox(label="Transcripts", lines=20)
293
+ word_file_output = gr.File(label="Download Merged Transcript (.docx)")
294
+ progress_num = gr.Number(value=0, label="Progress (%)")
295
 
296
  def update_file_visibility(merge_checked):
297
+ return gr.update(visible=merge_checked)
298
 
299
  merge_checkbox.change(
300
  update_file_visibility,
 
303
  api_name="update_file_visibility"
304
  )
305
 
 
306
  transcribe_btn.click(
307
  transcribe_multiple,
308
  inputs=[audio_input, model_dropdown, advanced_checkbox, merge_checkbox, zip_input, zip_password_input],
309
+ outputs=[log_output, transcript_output, word_file_output, progress_num]
310
  )
311
 
312
+
313
+ demo.launch()