Woziii committed on
Commit
ec37ecb
·
verified ·
1 Parent(s): 9023c36

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +74 -71
app.py CHANGED
@@ -8,13 +8,13 @@ from pydub import AudioSegment
8
  from transformers import pipeline
9
 
10
  # -------------------------------------------------
11
- # Configuration du modèle
12
  # -------------------------------------------------
13
  MODEL_NAME = "openai/whisper-large-v3"
14
  device = "cuda" if torch.cuda.is_available() else "cpu"
15
 
16
  pipe = pipeline(
17
- "automatic-speech-recognition",
18
  model=MODEL_NAME,
19
  device=device,
20
  model_kwargs={"low_cpu_mem_usage": True},
@@ -23,73 +23,70 @@ pipe = pipeline(
23
  TEMP_DIR = "./temp_audio"
24
  os.makedirs(TEMP_DIR, exist_ok=True)
25
 
26
- # -------------------------------------------------
27
- # Initialisation de l'état
28
- # -------------------------------------------------
29
  def init_metadata_state():
30
  return []
31
 
32
- # -------------------------------------------------
33
- # Étape 2 : Transcription avec Whisper
34
- # -------------------------------------------------
35
  def transcribe_audio(audio_path):
36
  if not audio_path:
37
- return "Aucun fichier audio fourni", [], None
38
-
39
- print("📌 Début de la transcription avec Whisper...")
40
 
41
- generate_kwargs = {
42
- "language": "french",
43
- "max_new_tokens": 448,
44
- "num_beams": 1,
45
- "condition_on_prev_tokens": False,
46
- "compression_ratio_threshold": 1.35,
47
- "temperature": (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
48
- "logprob_threshold": -1.0,
49
- "no_speech_threshold": 0.6,
50
- "return_timestamps": True,
51
- }
52
-
53
- result = pipe(audio_path, return_timestamps="word", generate_kwargs=generate_kwargs)
54
 
55
- print("📌 DEBUG - Timestamps reçus de Whisper:", result["chunks"])
 
56
 
57
- raw_transcription = " ".join([chunk["text"] for chunk in result["chunks"]])
58
- word_timestamps = [(chunk["text"], (chunk["timestamp"][0], chunk["timestamp"][1])) for chunk in result["chunks"]]
59
-
60
- return raw_transcription, word_timestamps, audio_path
61
-
62
- # -------------------------------------------------
63
- # Étape 5 : Validation des segments
64
- # -------------------------------------------------
65
- def validate_segments(audio_path, table_data, metadata_state, word_timestamps):
66
- if not audio_path or not word_timestamps:
67
- return [], metadata_state
68
-
69
- original_audio = AudioSegment.from_file(audio_path)
70
- segment_paths = []
71
- updated_metadata = []
72
 
 
 
 
73
  for i, row in enumerate(table_data):
74
  if not row or len(row) < 1 or not row[0].strip():
75
  continue
76
 
77
  text = row[0].strip()
78
  segment_id = f"seg_{i+1:02d}"
79
- matching_timestamps = [(start, end) for word, (start, end) in word_timestamps if word in text]
 
 
 
80
 
81
  if matching_timestamps:
82
  start_time, end_time = matching_timestamps[0]
83
  else:
84
- print(f"⚠️ Aucun timestamp trouvé pour '{text}', estimation en cours...")
85
- start_time = 0
86
- end_time = start_time + 1.0
87
 
88
- start_ms = int(float(start_time) * 1000)
89
- end_ms = int(float(end_time) * 1000)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
 
91
  segment_filename = f"{Path(audio_path).stem}_{segment_id}.wav"
92
  segment_path = os.path.join(TEMP_DIR, segment_filename)
 
93
  extract = original_audio[start_ms:end_ms]
94
  extract.export(segment_path, format="wav")
95
 
@@ -101,34 +98,30 @@ def validate_segments(audio_path, table_data, metadata_state, word_timestamps):
101
  "end_time": end_time,
102
  "id": segment_id,
103
  })
104
-
105
  return segment_paths, updated_metadata
106
 
107
- # -------------------------------------------------
108
- # Étape 8 : Génération du ZIP
109
- # -------------------------------------------------
110
  def generate_zip(metadata_state):
111
  if not metadata_state:
112
  return None
113
-
114
  zip_path = os.path.join(TEMP_DIR, "dataset.zip")
115
  if os.path.exists(zip_path):
116
  os.remove(zip_path)
117
-
118
  metadata_csv_path = os.path.join(TEMP_DIR, "metadata.csv")
119
  with open(metadata_csv_path, "w", encoding="utf-8") as f:
120
  f.write("audio_file|text|speaker_name|API\n")
121
  for seg in metadata_state:
122
- line = f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n"
123
- f.write(line)
124
-
125
  with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
126
  zf.write(metadata_csv_path, "metadata.csv")
127
  for seg in metadata_state:
128
  file_path = os.path.join(TEMP_DIR, seg["audio_file"])
129
  if os.path.exists(file_path):
130
  zf.write(file_path, seg["audio_file"])
131
-
132
  return zip_path
133
 
134
  # -------------------------------------------------
@@ -136,26 +129,36 @@ def generate_zip(metadata_state):
136
  # -------------------------------------------------
137
  with gr.Blocks() as demo:
138
  gr.Markdown("# Application de Découpe Audio")
139
-
140
  metadata_state = gr.State(init_metadata_state())
 
 
 
 
 
 
 
 
141
 
142
- with gr.Column():
143
- gr.Markdown("### 1. Téléversez un fichier audio")
144
- audio_input = gr.Audio(type="filepath", label="Fichier audio")
145
 
146
- raw_transcription = gr.Textbox(label="Transcription (Whisper)", interactive=False)
147
- table = gr.Dataframe(headers=["Texte"], datatype=["str"], row_count="dynamic")
148
- validate_button = gr.Button("Valider et générer les extraits")
149
 
150
- @gr.render(inputs=table)
151
- def render_audio_players(data):
152
- return [gr.Audio(label=f"Extrait {i+1}", interactive=False) for i in range(len(data))]
 
 
 
 
153
 
154
- generate_button = gr.Button("Générer le fichier ZIP")
155
- zip_file = gr.File(label="Télécharger le ZIP")
 
 
 
 
 
156
 
157
- audio_input.change(transcribe_audio, inputs=audio_input, outputs=[raw_transcription, table, audio_input])
158
- validate_button.click(validate_segments, inputs=[audio_input, table, metadata_state], outputs=[metadata_state])
159
  generate_button.click(generate_zip, inputs=metadata_state, outputs=zip_file)
160
 
161
- demo.queue().launch()
 
8
  from transformers import pipeline
9
 
10
  # -------------------------------------------------
11
+ # Configuration
12
  # -------------------------------------------------
13
  MODEL_NAME = "openai/whisper-large-v3"
14
  device = "cuda" if torch.cuda.is_available() else "cpu"
15
 
16
  pipe = pipeline(
17
+ task="automatic-speech-recognition",
18
  model=MODEL_NAME,
19
  device=device,
20
  model_kwargs={"low_cpu_mem_usage": True},
 
23
  TEMP_DIR = "./temp_audio"
24
  os.makedirs(TEMP_DIR, exist_ok=True)
25
 
 
 
 
26
def init_metadata_state():
    """Return a fresh, empty list used as the initial segment-metadata state."""
    fresh_state = []
    return fresh_state
28
 
 
 
 
29
def transcribe_audio(audio_path):
    """Transcribe *audio_path* with the module-level Whisper pipeline.

    Returns a 4-tuple ``(transcription, table_rows, audio_path, word_timestamps)``
    where ``table_rows`` is always an empty list (the UI table is reset) and
    ``word_timestamps`` is a list of ``(word_text, timestamp)`` pairs taken
    from the pipeline's word-level chunks. On missing input or empty model
    output, an error message is returned with empty/None placeholders.
    """
    if not audio_path:
        return "Aucun fichier audio fourni", [], None, []

    result = pipe(
        audio_path,
        return_timestamps="word",
        generate_kwargs={"language": "french"},
    )
    chunks = result.get("chunks", [])
    if not chunks:
        return "Erreur lors de la récupération des timestamps", [], None, []

    transcription = " ".join(chunk["text"] for chunk in chunks)
    timestamps = [(chunk["text"], chunk["timestamp"]) for chunk in chunks]
    return transcription, [], audio_path, timestamps
 
 
 
 
 
 
 
 
 
 
 
43
 
44
def preprocess_segments(table_data, word_timestamps):
    """Attach timing information and an id to each non-empty table row.

    For every row whose first cell is non-blank, the times of the FIRST
    entry in *word_timestamps* whose word text occurs inside the row text
    are used; rows with no matching word get ``None`` times. Returns a list
    of ``[text, start_time, end_time, segment_id]`` items, ids numbered
    from the original row positions (``seg_01``, ``seg_02``, ...).
    """
    segments = []
    for position, row in enumerate(table_data):
        if not row or len(row) < 1 or not row[0].strip():
            continue

        text = row[0].strip()
        seg_id = f"seg_{position + 1:02d}"

        start, end = None, None
        for word, (word_start, word_end) in word_timestamps:
            # Substring match against the row text; first hit wins.
            if word in text:
                start, end = word_start, word_end
                break

        segments.append([text, start, end, seg_id])

    return segments
66
+
67
+ def validate_segments(audio_path, table_data, metadata_state, word_timestamps):
68
+ if not audio_path or not word_timestamps:
69
+ return [], metadata_state
70
+
71
+ if os.path.exists(TEMP_DIR):
72
+ shutil.rmtree(TEMP_DIR)
73
+ os.makedirs(TEMP_DIR, exist_ok=True)
74
+
75
+ original_audio = AudioSegment.from_file(audio_path)
76
+ segment_paths = []
77
+ updated_metadata = []
78
+
79
+ for text, start_time, end_time, segment_id in table_data:
80
+ if start_time is None or end_time is None:
81
+ continue
82
+
83
+ start_ms, end_ms = int(float(start_time) * 1000), int(float(end_time) * 1000)
84
+ if start_ms < 0 or end_ms <= start_ms:
85
+ continue
86
 
87
  segment_filename = f"{Path(audio_path).stem}_{segment_id}.wav"
88
  segment_path = os.path.join(TEMP_DIR, segment_filename)
89
+
90
  extract = original_audio[start_ms:end_ms]
91
  extract.export(segment_path, format="wav")
92
 
 
98
  "end_time": end_time,
99
  "id": segment_id,
100
  })
101
+
102
  return segment_paths, updated_metadata
103
 
 
 
 
104
def generate_zip(metadata_state):
    """Package all validated segments into ``TEMP_DIR/dataset.zip``.

    Writes a pipe-delimited ``metadata.csv`` (one row per segment) and adds
    it, together with every segment wav file that exists on disk, to a fresh
    zip archive. Returns the archive path, or ``None`` when the state is
    empty.
    """
    if not metadata_state:
        return None

    zip_path = os.path.join(TEMP_DIR, "dataset.zip")
    # Start from a clean archive; ZipFile "w" would truncate anyway, but an
    # explicit remove also clears a stale file if zipping fails below.
    if os.path.exists(zip_path):
        os.remove(zip_path)

    csv_path = os.path.join(TEMP_DIR, "metadata.csv")
    csv_lines = ["audio_file|text|speaker_name|API\n"]
    csv_lines += [
        f"{seg['audio_file']}|{seg['text']}|projectname|/API_PHONETIC/\n"
        for seg in metadata_state
    ]
    with open(csv_path, "w", encoding="utf-8") as handle:
        handle.writelines(csv_lines)

    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.write(csv_path, "metadata.csv")
        for seg in metadata_state:
            wav_path = os.path.join(TEMP_DIR, seg["audio_file"])
            # Skip entries whose audio file was never produced.
            if os.path.exists(wav_path):
                archive.write(wav_path, seg["audio_file"])

    return zip_path
126
 
127
  # -------------------------------------------------
 
129
  # -------------------------------------------------
130
with gr.Blocks() as demo:
    gr.Markdown("# Application de Découpe Audio")

    # Cross-event state: validated segment metadata, and the paths of the
    # audio extracts produced by the last validation.
    metadata_state = gr.State(init_metadata_state())
    extracted_segments = gr.State([])

    audio_input = gr.Audio(type="filepath", label="Fichier audio")
    raw_transcription = gr.Textbox(label="Transcription", interactive=False)
    table = gr.Dataframe(headers=["Texte"], datatype=["str"], row_count=(1, "dynamic"), col_count=1)
    validate_button = gr.Button("Valider")
    generate_button = gr.Button("Générer ZIP")
    zip_file = gr.File(label="Télécharger le ZIP")

    # Word-level timestamps returned by Whisper, reused when matching
    # table rows back to audio positions.
    word_timestamps = gr.State()

    # New upload: transcribe, reset the table (transcribe_audio returns []
    # for it), and store the word timestamps.
    audio_input.change(transcribe_audio, inputs=audio_input, outputs=[raw_transcription, table, audio_input, word_timestamps])

    # Validate: first annotate each table row with timestamps
    # (preprocess_segments), then cut the audio (validate_segments).
    validate_button.click(
        fn=lambda table_data, word_timestamps, audio_path, metadata_state: validate_segments(
            audio_path, preprocess_segments(table_data, word_timestamps), metadata_state, word_timestamps
        ),
        inputs=[table, word_timestamps, audio_input, metadata_state],
        outputs=[extracted_segments, metadata_state],
    )

    # Re-rendered whenever extracted_segments changes: one player per extract.
    @gr.render(inputs=extracted_segments)
    def show_audio_excerpts(segments):
        if not segments:
            gr.Markdown("Aucun extrait généré.")
        else:
            for i, seg in enumerate(segments):
                gr.Audio(label=f"Extrait {i+1}", value=seg)

    generate_button.click(generate_zip, inputs=metadata_state, outputs=zip_file)

demo.queue().launch()