Spaces:
Running
Running
| import os | |
| import re | |
| import math | |
| import shutil | |
| import tempfile | |
| from datetime import timedelta | |
| from pydub import AudioSegment | |
| from pydub.utils import which | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| import gradio as gr | |
# === CONFIG ===
# Length, in minutes, of each audio slice; also used to compute the
# timestamp offset of every slice after transcription.
chunk_duration_min = 9

# Scratch directory that receives the exported chunk files.
# NOTE(review): this is one fixed relative path shared by every request;
# overlapping requests would presumably write into (and delete) the same
# directory -- confirm whether concurrent use is expected.
chunk_dir = "temp_chunks"

# Tell pydub which ffmpeg executable to use for decoding/encoding.
AudioSegment.converter = which("ffmpeg")

# Pull settings from a .env file (if any), then build the API client from
# the OPENAI_API_KEY environment variable.
load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=api_key)
def split_audio_to_chunks(audio_file_path):
    """Cut an audio file into fixed-length MP3 chunks on disk.

    The audio is converted to mono at 16 kHz, sliced into consecutive
    pieces of ``chunk_duration_min`` minutes (the last piece may be
    shorter), and each piece is exported to ``chunk_dir`` as
    ``chunk_<n>.mp3`` at a 32k bitrate, numbered from 1.

    Args:
        audio_file_path: Path of the audio file to split.

    Returns:
        The list of exported chunk paths, in playback order.
    """
    os.makedirs(chunk_dir, exist_ok=True)

    prepared = (
        AudioSegment.from_file(audio_file_path)
        .set_channels(1)
        .set_frame_rate(16000)
    )

    # Both values are in milliseconds, the unit the segment is sliced by.
    slice_ms = chunk_duration_min * 60 * 1000
    total_ms = len(prepared)
    chunk_count = math.ceil(total_ms / slice_ms)

    exported_paths = []
    for index in range(chunk_count):
        begin = index * slice_ms
        finish = min(total_ms, begin + slice_ms)
        target = os.path.join(chunk_dir, f"chunk_{index + 1}.mp3")
        prepared[begin:finish].export(target, format="mp3", bitrate="32k")
        exported_paths.append(target)
    return exported_paths
# One SRT timing line: "HH:MM:SS,mmm --> HH:MM:SS,mmm" (surrounding and
# inner whitespace is tolerated). Anything else is treated as plain text.
_SRT_TIMING_LINE = re.compile(
    r"\s*(\d+):(\d+):(\d+),(\d+)\s+-->\s+(\d+):(\d+):(\d+),(\d+)\s*"
)


def shift_srt_timestamps(srt_text, offset_seconds):
    """Shift every timing line of an SRT document by a fixed offset.

    Only lines that are entirely a ``start --> end`` timecode pair are
    rewritten; index lines, caption text (even text that happens to contain
    " --> ") and blank lines are passed through unchanged.

    Args:
        srt_text: SRT document as a single string.
        offset_seconds: Seconds to add to each timecode (int or float).
            May be negative; results that would fall below zero are
            clamped to ``00:00:00,000``.

    Returns:
        The SRT text with shifted timecodes, lines joined by "\n" (a
        trailing newline in the input is not preserved).
    """
    offset = timedelta(seconds=offset_seconds)
    one_ms = timedelta(milliseconds=1)

    def shift_timecode(h, m, s, ms):
        original = timedelta(
            hours=int(h), minutes=int(m), seconds=int(s), milliseconds=int(ms)
        )
        # Work in whole milliseconds so the h/m/s/ms split is exact, and
        # never emit a negative timecode.
        total_ms = max((original + offset) // one_ms, 0)
        total_seconds, millis = divmod(total_ms, 1000)
        total_minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"

    updated_lines = []
    for line in srt_text.splitlines():
        match = _SRT_TIMING_LINE.fullmatch(line)
        if match is None:
            updated_lines.append(line)
            continue
        fields = match.groups()
        new_start = shift_timecode(*fields[:4])
        new_end = shift_timecode(*fields[4:])
        updated_lines.append(f"{new_start} --> {new_end}")
    return "\n".join(updated_lines)
def transcribe_chunks(chunk_paths):
    """Transcribe each audio chunk to SRT and merge the results.

    Every chunk is sent to the ``whisper-1`` transcription endpoint with
    ``response_format="srt"``. The timecodes of chunk ``i`` (0-based) are
    shifted by ``i * chunk_duration_min`` minutes so they refer to the
    position in the full recording.

    Args:
        chunk_paths: Chunk file paths in playback order.

    Returns:
        All shifted SRT documents joined by a blank line.
    """
    seconds_per_chunk = chunk_duration_min * 60
    shifted_parts = []
    for position, path in enumerate(chunk_paths):
        with open(path, "rb") as handle:
            raw_srt = client.audio.transcriptions.create(
                model="whisper-1",
                file=handle,
                response_format="srt",
            )
        # Chunk-local times start at zero; move them to the global timeline.
        shifted_parts.append(
            shift_srt_timestamps(raw_srt, position * seconds_per_chunk)
        )
    return "\n\n".join(shifted_parts)
# A cue closes a paragraph when its text ends with ., ! or ?, optionally
# followed by a closing quote.
_SENTENCE_END = re.compile(r'[.!?]["\']?\Z')


def parse_srt_paragraphs(srt_str):
    """Group SRT cues into sentence-terminated paragraphs.

    Cues are accumulated until one ends with sentence-ending punctuation;
    the accumulated text then becomes one paragraph, stamped with the start
    time of its first cue. Text left over after the last cue is emitted as
    a final paragraph.

    Blocks with fewer than three lines (index, timing, text) and cues with
    empty text are ignored. Blocks may be separated by any run of blank
    lines, and CRLF line endings are accepted.

    Args:
        srt_str: SRT document as a single string.

    Returns:
        A list of ``(start_timestamp, paragraph_text)`` tuples, where the
        timestamp is the raw "HH:MM:SS,mmm" string from the timing line.
    """
    paragraphs = []
    pending_parts = []
    pending_timestamp = ""

    for block in re.split(r"\n\s*\n", srt_str.strip()):
        lines = [part.strip() for part in block.strip().split("\n")]
        if len(lines) < 3:
            continue

        text = " ".join(lines[2:]).strip()
        if not text:
            continue

        if not pending_parts:
            # First cue of a new paragraph supplies its timestamp.
            pending_timestamp = lines[1].split(" --> ")[0].strip()
        pending_parts.append(text)

        # Close the paragraph at the cue that finishes a sentence, so the
        # next sentence's opening words never leak into this paragraph.
        if _SENTENCE_END.search(text):
            paragraphs.append((pending_timestamp, " ".join(pending_parts)))
            pending_parts = []
            pending_timestamp = ""

    # Flush any remaining text at the end
    if pending_parts:
        paragraphs.append((pending_timestamp, " ".join(pending_parts)))
    return paragraphs
def process_audio(audio_path):
    """Transcribe an uploaded audio file into timestamped paragraphs.

    The audio is split into chunks, each chunk is transcribed, and the
    merged SRT is grouped into paragraphs. Each paragraph is labelled with
    its start time in whole seconds (milliseconds are dropped).

    Args:
        audio_path: Path of the uploaded audio file, as supplied by the
            Gradio audio input.

    Returns:
        A ``(display_text, output_txt_path)`` tuple: Markdown text for the
        UI, and the path of a UTF-8 ``.txt`` file containing one
        ``"<seconds> <paragraph>"`` line per paragraph.

    Raises:
        gr.Error: If no audio file was provided.
    """
    if not audio_path:
        # Fail with a readable message instead of an opaque decoder error.
        raise gr.Error("Please upload an audio file before submitting.")

    def timestamp_to_seconds(ts):
        h, m, s_ms = ts.split(":")
        s, _ms = s_ms.split(",")
        return int(h) * 3600 + int(m) * 60 + int(s)  # integer seconds only

    try:
        chunk_paths = split_audio_to_chunks(audio_path)
        merged_srt = transcribe_chunks(chunk_paths)
    finally:
        # The chunk files are only needed while transcribing; always remove
        # them, even when splitting or transcription fails.
        shutil.rmtree(chunk_dir, ignore_errors=True)

    entries = [
        (timestamp_to_seconds(ts), para)
        for ts, para in parse_srt_paragraphs(merged_srt)
    ]
    output_lines = [f"{seconds} {para}" for seconds, para in entries]
    # NOTE(review): the "β" separator looks like a mis-encoded dash; it is
    # kept exactly as found -- confirm the intended character.
    display_text = "".join(
        f"**{seconds}s** β {para}\n\n" for seconds, para in entries
    )

    # Write through the temp-file handle itself so no descriptor is left
    # open; delete=False keeps the file available for download.
    with tempfile.NamedTemporaryFile(
        "w", encoding="utf-8", suffix=".txt", delete=False
    ) as out_file:
        out_file.write("\n".join(output_lines))
        output_txt_path = out_file.name

    return display_text, output_txt_path
# === Gradio Interface ===
# One audio upload in; Markdown transcript plus a downloadable text file out.
# NOTE(review): the leading symbols in the labels and title below look like
# mis-encoded emoji; they are kept exactly as found -- confirm the intended
# characters.
demo = gr.Interface(
    title="π Audio Timestamp Generator",
    fn=process_audio,
    inputs=gr.Audio(label="π§ Upload MP3 Audio", type="filepath"),
    outputs=[
        gr.Markdown(label="π Timestamped Transcript"),
        gr.File(label="π₯ Download TXT File"),
    ],
    # description="Upload an MP3 file. The tool splits the audio into chunks, transcribes them with Whisper, and returns a paragraph-wise timestamped transcript (timestamps in integer seconds).",
)

if __name__ == "__main__":
    # Start the web UI only when this file is run as a script.
    demo.launch(ssr_mode=False)