Spaces:
Sleeping
Sleeping
import concurrent.futures
import os
import re
import tempfile
import threading
import uuid
import xml.etree.ElementTree as ET

import gradio as gr
import requests
import torch
import torchaudio
from transformers import pipeline
# Telegram credentials come from HF Space secrets (Settings -> Variables and
# secrets). Fail fast at import time so a misconfigured Space shows a clear
# error instead of failing later inside a background task.
TELEGRAM_TOKEN = os.environ.get('TELEGRAM_TOKEN')
TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID')
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
    raise ValueError("TELEGRAM_TOKEN and TELEGRAM_CHAT_ID must be set as environment variables in HF Space settings.")
# Global cache of loaded ASR pipelines, keyed by model id, so repeated tasks
# do not reload multi-hundred-MB checkpoints. Populated lazily by get_pipeline().
pipelines = {}
# Available Whisper checkpoints, ordered smallest/fastest -> largest/most accurate.
MODEL_OPTIONS = [
    "openai/whisper-tiny",      # ~39M params, fastest but least accurate
    "openai/whisper-base",      # ~74M params, good balance
    "openai/whisper-small",     # ~244M params, better accuracy
    "openai/whisper-medium",    # ~769M params, high accuracy
    "openai/whisper-large",     # ~1550M params, very high accuracy
    "openai/whisper-large-v3",  # ~1550M params, latest with improvements
]
# Lazily construct (and memoize) the ASR pipeline for a given model id.
def get_pipeline(model_id):
    """Return the cached automatic-speech-recognition pipeline for *model_id*,
    loading and caching it on first use."""
    cached = pipelines.get(model_id)
    if cached is None:
        print(f"Loading model: {model_id}...")  # Log for debugging in Spaces
        device = "cuda" if torch.cuda.is_available() else "cpu"  # Use GPU if available
        cached = pipeline(
            "automatic-speech-recognition",
            model=model_id,
            device=device,
        )
        pipelines[model_id] = cached
    return cached
# Function to send a message to the configured Telegram chat
def send_to_telegram(message):
    """Deliver *message* to the Telegram chat configured via env secrets.

    Args:
        message: Text to send; delivered with Markdown parse mode.

    Returns:
        True on success, False on any failure. Errors are printed, never
        raised, so a failed notification cannot crash a background task.

    NOTE(review): Telegram caps a single message at 4096 characters; long
    transcripts may be rejected by the API — consider splitting. Verify
    against the Bot API docs.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
        "parse_mode": "Markdown"
    }
    try:
        # Fix: timeout added — an unbounded post() could hang a worker
        # thread forever if the Telegram API stalls.
        response = requests.post(url, json=payload, timeout=30)
        response.raise_for_status()
        return True
    except Exception as e:
        print(f"Telegram send error: {e}")
        return False
# Function to fetch an MP3 from a Google Drive shareable link
def fetch_from_google_drive(drive_link):
    """Download a publicly shared Google Drive file to a local temp .mp3.

    Args:
        drive_link: Shareable link of the form
            https://drive.google.com/file/d/FILE_ID/view...

    Returns:
        (temp_path, status_message) on success, or (None, error_message) when
        the link is malformed or the download fails. The caller owns the temp
        file and is responsible for deleting it.
    """
    match = re.search(r'/d/([a-zA-Z0-9_-]+)', drive_link)
    if not match:
        return None, "Invalid Google Drive link. Use a shareable link like https://drive.google.com/file/d/FILE_ID/view."
    file_id = match.group(1)
    download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
    headers = {"User-Agent": "Mozilla/5.0 (compatible; PodcastTranscriber/1.0)"}
    try:
        # Fix: (connect, read) timeouts so a stalled download cannot hang the
        # worker thread forever.
        response = requests.get(download_url, headers=headers, stream=True,
                                allow_redirects=True, timeout=(30, 300))
        # Large files trigger Google's "can't scan for viruses" interstitial;
        # retry with the confirm token when it appears in the redirect URL.
        if "confirm" in response.url:
            confirm_match = re.search(r'confirm=([0-9A-Za-z_-]+)', response.url)
            if confirm_match:
                confirm_token = confirm_match.group(1)
                download_url = f"https://drive.google.com/uc?export=download&confirm={confirm_token}&id={file_id}"
                response.close()  # Fix: release the superseded streamed connection
                response = requests.get(download_url, headers=headers, stream=True,
                                        timeout=(30, 300))
        response.raise_for_status()
        downloaded = 0
        chunk_size = 1024 * 1024  # stream in 1 MiB chunks to bound memory use
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    tmp_file.write(chunk)
                    downloaded += len(chunk)
            temp_path = tmp_file.name
        size_mb = downloaded / (1024 * 1024)
        return temp_path, f"Downloaded from Drive: {size_mb:.1f} MB"
    except Exception as e:
        return None, f"Error fetching from Drive: {str(e)} (Ensure the file is shared publicly or with 'Anyone with the link')"
# Background transcription task: runs off the request thread and reports
# everything (progress errors and the final transcript) via Telegram.
def background_transcribe(task_id, audio_input, model_id, language, return_timestamps, podcast_url, drive_link):
    """Resolve an audio source, transcribe it with Whisper, send result to Telegram.

    Exactly one source is used, in priority order: drive_link (Google Drive),
    podcast_url (Apple Podcasts episode URL), then audio_input (local path
    from the Gradio upload widget).

    Args:
        task_id: Short id included in every Telegram status message.
        audio_input: Local audio file path, or None.
        model_id: Hugging Face Whisper model id (see MODEL_OPTIONS).
        language: Spoken-language hint forwarded to Whisper generation.
        return_timestamps: When True, transcript lines carry [start - end] tags.
        podcast_url: Apple Podcasts episode URL, or None.
        drive_link: Google Drive shareable link, or None.

    Returns:
        None. Success and failure are both reported through send_to_telegram.
    """
    audio_file = None
    status_msg = f"Task {task_id}: Starting..."
    try:
        # --- Resolve the audio source to a local file path -------------------
        if drive_link:
            audio_file, msg = fetch_from_google_drive(drive_link)
            if not audio_file:
                send_to_telegram(f"Task {task_id} failed: {msg}")
                return
            status_msg += f"\n{msg}"
        elif podcast_url:
            # Apple Podcasts URLs look like .../podcast/NAME/idPODCAST?i=EPISODE
            podcast_match = re.search(r'id(\d+)', podcast_url)
            if not podcast_match:
                send_to_telegram(f"Task {task_id} failed: Invalid URL: No podcast ID.")
                return
            podcast_id = podcast_match.group(1)
            episode_match = re.search(r'i=(\d+)', podcast_url)
            if not episode_match:
                send_to_telegram(f"Task {task_id} failed: Invalid URL: No episode ID.")
                return
            episode_id = episode_match.group(1)
            headers = {"User-Agent": "Mozilla/5.0 (compatible; PodcastTranscriber/1.0)"}
            # Resolve the podcast's RSS feed via the iTunes lookup API.
            # Fix: timeouts added on all three requests so a stalled server
            # cannot hang this worker forever.
            api_url = f"https://itunes.apple.com/lookup?id={podcast_id}&entity=podcast"
            api_response = requests.get(api_url, headers=headers, timeout=30)
            api_response.raise_for_status()
            data = api_response.json()
            if data['resultCount'] == 0:
                send_to_telegram(f"Task {task_id} failed: Podcast not found.")
                return
            feed_url = data['results'][0]['feedUrl']
            rss_response = requests.get(feed_url, headers=headers, timeout=30)
            rss_response.raise_for_status()
            root = ET.fromstring(rss_response.content)
            ns = {'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd'}
            # Match the episode either by its <guid> containing the episode id
            # or by an exact <itunes:episode> number.
            mp3_url = None
            for item in root.findall('.//item'):
                episode_guid = item.find('guid')
                # Fix: guard episode_guid.text — a <guid/> with no text made
                # the `in` test raise TypeError on some feeds.
                if episode_guid is not None and episode_guid.text and episode_id in episode_guid.text:
                    enclosure = item.find('enclosure')
                    if enclosure is not None:
                        mp3_url = enclosure.get('url')
                        break
                episode_elem = item.find('itunes:episode', ns)
                if episode_elem is not None and episode_elem.text == episode_id:
                    enclosure = item.find('enclosure')
                    if enclosure is not None:
                        mp3_url = enclosure.get('url')
                        break
            if not mp3_url:
                send_to_telegram(f"Task {task_id} failed: Episode not found.")
                return
            mp3_response = requests.get(mp3_url, headers=headers, stream=True, timeout=(30, 300))
            mp3_response.raise_for_status()
            downloaded = 0
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                for chunk in mp3_response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        tmp_file.write(chunk)
                        downloaded += len(chunk)
                audio_file = tmp_file.name
            size_mb = downloaded / (1024 * 1024)
            status_msg += f"\nDownloaded from podcast: {size_mb:.1f} MB"
        else:
            if audio_input is None:
                send_to_telegram(f"Task {task_id} failed: No audio provided.")
                return
            audio_file = audio_input
        # --- Load audio and downmix multichannel input to mono ---------------
        waveform, sample_rate = torchaudio.load(audio_file)
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        num_samples = waveform.shape[1]
        duration = num_samples / sample_rate
        status_msg += f"\nAudio duration: {duration / 60:.1f} minutes"
        # --- Chunked transcription -------------------------------------------
        pipe = get_pipeline(model_id)
        generate_kwargs = {"task": "transcribe", "language": language}
        chunk_length_s = 30   # Whisper's native 30 s window
        stride_length_s = 5   # overlap on each side so words aren't cut mid-chunk
        chunk_samples = int(chunk_length_s * sample_rate)
        stride_samples = int(stride_length_s * sample_rate)
        chunks = []
        offsets = []
        start = 0
        while start < num_samples:
            end = min(start + chunk_samples, num_samples)
            chunks.append(waveform[:, start:end])
            offsets.append(start / sample_rate)
            # Advance by the window minus both overlaps so neighbours share audio.
            # NOTE(review): the overlapping text is never deduplicated below, so
            # plain-text output can repeat words at chunk boundaries.
            start += chunk_samples - 2 * stride_samples
        full_text = ""
        all_chunk_outputs = []
        for chunk, offset in zip(chunks, offsets):
            # Fix: the ASR pipeline requires a 1-D numpy array under the "raw"
            # (or "array") key; the previous {"waveform": <2-D torch tensor>}
            # dict is rejected by transformers with a ValueError.
            output = pipe(
                {"raw": chunk.squeeze(0).numpy(), "sampling_rate": sample_rate},
                max_new_tokens=128,
                generate_kwargs=generate_kwargs,
                return_timestamps=return_timestamps,
                batch_size=1
            )
            if return_timestamps and "chunks" in output:
                # Shift per-segment timestamps by this chunk's offset in the file.
                for ch in output["chunks"]:
                    ts = list(ch["timestamp"])
                    if ts[0] is not None:
                        ts[0] += offset
                    if ts[1] is not None:
                        ts[1] += offset
                    all_chunk_outputs.append({"text": ch["text"], "timestamp": tuple(ts)})
            else:
                full_text += output["text"] + " "
        # --- Format and deliver ----------------------------------------------
        if return_timestamps:
            formatted = []
            for ch in all_chunk_outputs:
                ts_start = f"{ch['timestamp'][0]:.2f}s" if ch['timestamp'][0] is not None else "0.00s"
                ts_end = f"{ch['timestamp'][1]:.2f}s" if ch['timestamp'][1] is not None else "?.?s"
                formatted.append(f"[{ts_start} - {ts_end}] {ch['text']}")
            transcript = "\n".join(formatted)
        else:
            transcript = full_text.strip()
        success = send_to_telegram(f"**Task {task_id} Complete!**\n\nTranscript:\n{transcript}")
        if not success:
            print(f"Failed to send task {task_id} to Telegram.")
    except Exception as e:
        send_to_telegram(f"Task {task_id} failed: {str(e)}")
    finally:
        # Fix: cleanup moved to finally so temp downloads are removed even when
        # transcription fails partway (previously they leaked on error).
        if audio_file and os.path.exists(audio_file):
            os.unlink(audio_file)
# Starter function for an uploaded file
def start_transcribe_upload(audio_input, model_id, language, timestamps_checkbox):
    """Launch a background transcription of an uploaded audio file.

    Returns immediately with a status string; the transcript is delivered to
    Telegram by background_transcribe when it finishes.
    """
    task_id = str(uuid.uuid4())[:8]
    # Fix: `with ThreadPoolExecutor() as ex: ex.submit(...)` blocked here until
    # the task completed (the context manager's exit calls shutdown(wait=True)),
    # so the "background" task actually froze the UI request. A plain Thread
    # returns immediately.
    threading.Thread(
        target=background_transcribe,
        args=(task_id, audio_input, model_id, language, timestamps_checkbox, None, None),
    ).start()
    return f"Task {task_id} started! Transcript will be sent to your Telegram bot when complete. You can close the browser."
# Starter for an Apple Podcasts episode
def start_transcribe_podcast(podcast_input, model_id, language, timestamps_checkbox):
    """Launch a background transcription of an Apple Podcasts episode URL.

    Returns immediately with a status string; the transcript is delivered to
    Telegram by background_transcribe when it finishes.
    """
    task_id = str(uuid.uuid4())[:8]
    # Fix: the previous ThreadPoolExecutor context manager joined its worker on
    # exit, blocking this call until transcription finished. A plain Thread
    # makes the task genuinely asynchronous.
    threading.Thread(
        target=background_transcribe,
        args=(task_id, None, model_id, language, timestamps_checkbox, podcast_input, None),
    ).start()
    return f"Task {task_id} started! Transcript will be sent to your Telegram bot when complete. You can close the browser."
# Starter for a Google Drive link
def start_transcribe_drive(drive_input, model_id, language, timestamps_checkbox):
    """Launch a background transcription of a Google Drive shareable link.

    Returns immediately with a status string; the transcript is delivered to
    Telegram by background_transcribe when it finishes.
    """
    task_id = str(uuid.uuid4())[:8]
    # Fix: the previous ThreadPoolExecutor context manager joined its worker on
    # exit, blocking this call until transcription finished. A plain Thread
    # makes the task genuinely asynchronous.
    threading.Thread(
        target=background_transcribe,
        args=(task_id, None, model_id, language, timestamps_checkbox, None, drive_input),
    ).start()
    return f"Task {task_id} started! Transcript will be sent to your Telegram bot when complete. You can close the browser."
# Create the Gradio app with a colorful, responsive theme.
# NOTE(review): label/button strings below contain mojibake (e.g. "π€") from a
# past encoding round-trip of emoji; preserved byte-for-byte here since they
# are runtime strings — fix the encoding at the source if desired.
theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="purple",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"]
)
# Declarative UI: inputs column + status column, three action buttons, and
# click handlers wiring each button to its start_* dispatcher defined above.
with gr.Blocks(theme=theme, title="MP3 to Text Transcriber") as demo:
    gr.Markdown(
        """
# π€ MP3 to Text Transcription Tool
Upload an MP3, paste an Apple Podcasts URL, or provide a Google Drive shareable link to transcribe asynchronously.
Results are sent to your Telegram botβno need to wait in the browser!
(Bot token and chat ID are set as secrets in HF Space settings.)
""",
        elem_classes=["centered"]
    )
    with gr.Row(variant="panel", elem_classes=["max-w-4xl mx-auto"]):
        with gr.Column(scale=1):
            # Inputs (no Telegram fields anymore — credentials come from secrets)
            audio_input = gr.Audio(
                sources="upload",
                type="filepath",
                label="π Upload Audio File (MP3/WAV/etc.)",
                elem_classes=["w-full"]
            )
            podcast_input = gr.Textbox(
                label="π Apple Podcasts Episode URL (optional)",
                placeholder="e.g., https://podcasts.apple.com/us/podcast/.../id123?i=456",
                elem_classes=["w-full"]
            )
            drive_input = gr.Textbox(
                label="π Google Drive Shareable Link (optional)",
                placeholder="e.g., https://drive.google.com/file/d/ABC123/view?usp=sharing",
                elem_classes=["w-full"]
            )
            model_dropdown = gr.Dropdown(
                choices=MODEL_OPTIONS,
                value=MODEL_OPTIONS[1],  # default: whisper-base (speed/accuracy balance)
                label="π€ Select Whisper Model",
                info="Tiny: Fastest | Large-v3: Most accurate (slower on CPU)",
                elem_classes=["w-full"]
            )
            language_dropdown = gr.Dropdown(
                choices=["english", "french", "german", "spanish", "italian", "portuguese", "dutch", "russian", "swedish", "chinese", "japanese", "korean", "arabic", "hindi"],
                value="english",
                label="π Language (for better accuracy)",
                elem_classes=["w-full"]
            )
            timestamps_checkbox = gr.Checkbox(
                label="β° Include Timestamps?",
                value=False,
                info="Adds [start - end] tags to the transcript.",
                elem_classes=["w-full"]
            )
        with gr.Column(scale=1):
            # Status/result area updated by the button click handlers below.
            status_output = gr.Markdown("Ready to start task! π¬", elem_classes=["text-center"])
    # Buttons — one per input source
    with gr.Row(elem_classes=["w-full"]):
        transcribe_btn = gr.Button("π Start Transcribe Upload", variant="secondary", elem_classes=["flex-1"])
        podcast_btn = gr.Button("π‘ Start Podcast Transcribe", variant="primary", elem_classes=["flex-1"])
        drive_btn = gr.Button("π Start Drive Transcribe", variant="primary", elem_classes=["flex-1"])
    # Events (Telegram inputs removed; each handler returns a status string)
    transcribe_btn.click(
        fn=start_transcribe_upload,
        inputs=[audio_input, model_dropdown, language_dropdown, timestamps_checkbox],
        outputs=status_output
    )
    podcast_btn.click(
        fn=start_transcribe_podcast,
        inputs=[podcast_input, model_dropdown, language_dropdown, timestamps_checkbox],
        outputs=status_output
    )
    drive_btn.click(
        fn=start_transcribe_drive,
        inputs=[drive_input, model_dropdown, language_dropdown, timestamps_checkbox],
        outputs=status_output
    )
if __name__ == "__main__":
    demo.launch()