import concurrent.futures
import os
import re
import tempfile
import uuid
import xml.etree.ElementTree as ET

import gradio as gr
import requests
import torch
import torchaudio
from transformers import pipeline

# Load Telegram credentials from env vars
TELEGRAM_TOKEN = os.environ.get('TELEGRAM_TOKEN')
TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID')

if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
    raise ValueError("TELEGRAM_TOKEN and TELEGRAM_CHAT_ID must be set as environment variables in HF Space settings.")

# Global cache for pipelines to avoid reloading models
pipelines = {}

# List of available Whisper models (from smallest/fastest to largest/most accurate)
MODEL_OPTIONS = [
    "openai/whisper-tiny",      # ~39M params, fastest but least accurate
    "openai/whisper-base",      # ~74M params, good balance
    "openai/whisper-small",     # ~244M params, better accuracy
    "openai/whisper-medium",    # ~769M params, high accuracy
    "openai/whisper-large",     # ~1550M params, very high accuracy
    "openai/whisper-large-v3",  # ~1550M params, latest with improvements
]

# Shared HTTP settings for every outbound request made by this module.
HTTP_HEADERS = {"User-Agent": "Mozilla/5.0 (compatible; PodcastTranscriber/1.0)"}
REQUEST_TIMEOUT_S = 30  # without a timeout a stalled server would hang a task forever
DOWNLOAD_CHUNK_BYTES = 1024 * 1024

# Telegram rejects sendMessage texts longer than 4096 characters; stay a bit
# below that so multi-unit characters cannot push a part over the limit.
TELEGRAM_MAX_MESSAGE_CHARS = 4000

# Long-form decoding settings handed to the ASR pipeline.
CHUNK_LENGTH_S = 30
STRIDE_LENGTH_S = 5
MAX_NEW_TOKENS = 128  # NOTE(review): may truncate very dense 30 s chunks — confirm

# One long-lived worker: tasks really run in the background (the click handler
# returns immediately) and are processed one at a time, so the cached pipelines
# are never loaded or invoked from two threads at once.
_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
    max_workers=1, thread_name_prefix="transcribe"
)


class TranscriptionError(Exception):
    """Expected task failure whose message is reported to the user verbatim."""


# Function to get or load a pipeline for a given model
def get_pipeline(model_id):
    """Return the cached ASR pipeline for ``model_id``, loading it on first use."""
    if model_id not in pipelines:
        print(f"Loading model: {model_id}...")  # Log for debugging in Spaces
        pipelines[model_id] = pipeline(
            "automatic-speech-recognition",
            model=model_id,
            device="cuda" if torch.cuda.is_available() else "cpu",  # Use GPU if available
        )
    return pipelines[model_id]


def _split_message(text, limit=TELEGRAM_MAX_MESSAGE_CHARS):
    """Split ``text`` into parts of at most ``limit`` characters.

    Cuts at the last newline (else the last space) that fits, and only falls
    back to a hard cut when a single run has no whitespace at all.
    """
    parts = []
    remaining = text
    while len(remaining) > limit:
        cut = remaining.rfind("\n", 0, limit + 1)
        if cut <= 0:
            cut = remaining.rfind(" ", 0, limit + 1)
        if cut <= 0:
            cut = limit
        parts.append(remaining[:cut])
        remaining = remaining[cut:].lstrip("\n ")
    if remaining:
        parts.append(remaining)
    # Keep the original "attempt to send whatever was given" behaviour.
    return parts or [text]


def _post_telegram_message(text, parse_mode):
    """POST one message part to the Telegram Bot API; raise on HTTP failure."""
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {"chat_id": TELEGRAM_CHAT_ID, "text": text}
    if parse_mode:
        payload["parse_mode"] = parse_mode
    response = requests.post(url, json=payload, timeout=REQUEST_TIMEOUT_S)
    response.raise_for_status()


# Function to send message to Telegram
def send_to_telegram(message):
    """Send ``message`` to the configured Telegram chat.

    The text is split into parts that fit Telegram's message-size limit. Each
    part is first sent with ``parse_mode="Markdown"``; if Telegram answers
    400 (typically unbalanced ``*``/``_`` in free-form transcript text) the
    part is re-sent as plain text so the content is not lost.

    Returns:
        True if every part was delivered, False otherwise. Never raises.
    """
    all_sent = True
    for part in _split_message(message):
        try:
            try:
                _post_telegram_message(part, "Markdown")
            except requests.HTTPError as err:
                if err.response is None or err.response.status_code != 400:
                    raise
                _post_telegram_message(part, None)
        except Exception as e:  # top-level boundary: report, never crash the task
            # requests embeds the URL (and therefore the bot token) in its
            # error text; keep the secret out of the Space logs.
            safe_error = str(e).replace(TELEGRAM_TOKEN, "<TELEGRAM_TOKEN>")
            print(f"Telegram send error: {safe_error}")
            all_sent = False
    return all_sent


def _remove_quietly(path):
    """Delete ``path`` if it exists, ignoring filesystem errors."""
    try:
        os.unlink(path)
    except OSError:
        pass


def _save_response_to_temp(response, suffix=".mp3"):
    """Stream ``response`` into a new temp file; return ``(path, bytes_written)``.

    The partially written file is removed if streaming fails.
    """
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    written = 0
    try:
        with tmp_file:
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                if chunk:
                    tmp_file.write(chunk)
                    written += len(chunk)
    except Exception:
        _remove_quietly(tmp_file.name)
        raise
    return tmp_file.name, written


# Function to fetch MP3 from Google Drive shareable link
def fetch_from_google_drive(drive_link):
    """Download the file behind a Google Drive share link to a temp ``.mp3``.

    Args:
        drive_link: URL containing ``/d/<FILE_ID>``.

    Returns:
        ``(temp_path, status_message)`` on success, ``(None, error_message)``
        on failure. Never raises; the caller owns (and must delete) the file.
    """
    match = re.search(r'/d/([a-zA-Z0-9_-]+)', drive_link)
    if not match:
        return None, "Invalid Google Drive link. Use a shareable link like https://drive.google.com/file/d/FILE_ID/view."

    file_id = match.group(1)
    download_url = f"https://drive.google.com/uc?export=download&id={file_id}"

    try:
        response = requests.get(
            download_url,
            headers=HTTP_HEADERS,
            stream=True,
            allow_redirects=True,
            timeout=REQUEST_TIMEOUT_S,
        )
        if "confirm" in response.url:
            confirm_match = re.search(r'confirm=([0-9A-Za-z_-]+)', response.url)
            if confirm_match:
                response.close()  # release the first connection before retrying
                confirm_token = confirm_match.group(1)
                download_url = f"https://drive.google.com/uc?export=download&confirm={confirm_token}&id={file_id}"
                response = requests.get(
                    download_url,
                    headers=HTTP_HEADERS,
                    stream=True,
                    timeout=REQUEST_TIMEOUT_S,
                )

        with response:
            response.raise_for_status()
            # An HTML body is an interstitial/permission page, not the audio;
            # saving it would only fail later with a confusing decode error.
            content_type = response.headers.get("content-type", "")
            if content_type.lower().startswith("text/html"):
                return None, "Error fetching from Drive: Google returned a web page instead of the file (Ensure the file is shared publicly or with 'Anyone with the link')"
            temp_path, downloaded = _save_response_to_temp(response)

        size_mb = downloaded / (1024 * 1024)
        return temp_path, f"Downloaded from Drive: {size_mb:.1f} MB"
    except Exception as e:
        return None, f"Error fetching from Drive: {str(e)} (Ensure the file is shared publicly or with 'Anyone with the link')"


def _find_episode_mp3_url(feed_xml, episode_id):
    """Return the enclosure URL of the feed item matching ``episode_id``, or None.

    An item matches when ``episode_id`` occurs in its ``<guid>`` text or equals
    its ``<itunes:episode>`` text.
    """
    # NOTE(review): the feed is remote, untrusted XML parsed with the stdlib
    # parser — consider a hardened parser if that is a concern.
    root = ET.fromstring(feed_xml)
    ns = {'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd'}
    for item in root.findall('.//item'):
        guid = item.find('guid')
        # <guid/> can be empty, in which case .text is None.
        guid_text = (guid.text or "") if guid is not None else ""
        episode_elem = item.find('itunes:episode', ns)
        matches = episode_id in guid_text or (
            episode_elem is not None and episode_elem.text == episode_id
        )
        if not matches:
            continue
        enclosure = item.find('enclosure')
        if enclosure is not None and enclosure.get('url'):
            return enclosure.get('url')
    return None


def _download_podcast_episode(podcast_url):
    """Resolve an Apple Podcasts episode URL and download its audio.

    Returns:
        ``(temp_path, bytes_downloaded)``; the caller must delete the file.

    Raises:
        TranscriptionError: the URL is malformed or the episode cannot be found.
        requests.RequestException: a network call failed.
    """
    podcast_match = re.search(r'id(\d+)', podcast_url)
    if not podcast_match:
        raise TranscriptionError("Invalid URL: No podcast ID.")
    podcast_id = podcast_match.group(1)

    episode_match = re.search(r'i=(\d+)', podcast_url)
    if not episode_match:
        raise TranscriptionError("Invalid URL: No episode ID.")
    episode_id = episode_match.group(1)

    api_url = f"https://itunes.apple.com/lookup?id={podcast_id}&entity=podcast"
    api_response = requests.get(api_url, headers=HTTP_HEADERS, timeout=REQUEST_TIMEOUT_S)
    api_response.raise_for_status()
    data = api_response.json()
    if data['resultCount'] == 0:
        raise TranscriptionError("Podcast not found.")
    feed_url = data['results'][0].get('feedUrl')
    if not feed_url:
        raise TranscriptionError("Podcast not found.")

    rss_response = requests.get(feed_url, headers=HTTP_HEADERS, timeout=REQUEST_TIMEOUT_S)
    rss_response.raise_for_status()
    mp3_url = _find_episode_mp3_url(rss_response.content, episode_id)
    if not mp3_url:
        raise TranscriptionError("Episode not found.")

    with requests.get(
        mp3_url, headers=HTTP_HEADERS, stream=True, timeout=REQUEST_TIMEOUT_S
    ) as mp3_response:
        mp3_response.raise_for_status()
        return _save_response_to_temp(mp3_response)


def _format_segment(segment):
    """Render one timestamped pipeline chunk as ``[start - end] text``."""
    start, end = segment["timestamp"]
    start_label = f"{start:.2f}s" if start is not None else "0.00s"
    end_label = f"{end:.2f}s" if end is not None else "?.?s"
    return f"[{start_label} - {end_label}] {segment['text']}"


def _transcribe_file(audio_file, model_id, language, return_timestamps):
    """Transcribe ``audio_file`` and return the transcript text.

    The audio is down-mixed to mono and handed to the pipeline in one call;
    the pipeline's own ``chunk_length_s``/``stride_length_s`` handling splits
    long audio and merges the overlapping windows, so boundary text is not
    duplicated and timestamps are already relative to the whole file.
    """
    waveform, sample_rate = torchaudio.load(audio_file)
    num_samples = waveform.shape[-1]
    if num_samples == 0:
        raise TranscriptionError("Audio file contains no samples.")
    print(f"Audio duration: {num_samples / sample_rate / 60:.1f} minutes")

    # torchaudio yields (channels, samples); the pipeline needs a 1-D array.
    mono = waveform.mean(dim=0)

    pipe = get_pipeline(model_id)
    output = pipe(
        {"raw": mono.numpy(), "sampling_rate": sample_rate},
        chunk_length_s=CHUNK_LENGTH_S,
        stride_length_s=STRIDE_LENGTH_S,
        batch_size=1,
        return_timestamps=return_timestamps,
        generate_kwargs={
            "task": "transcribe",
            "language": language,
            "max_new_tokens": MAX_NEW_TOKENS,
        },
    )

    if return_timestamps and "chunks" in output:
        return "\n".join(_format_segment(segment) for segment in output["chunks"])
    return output["text"].strip()


# Background transcription task
def background_transcribe(task_id, audio_input, model_id, language, return_timestamps, podcast_url, drive_link):
    """Fetch the audio, transcribe it and deliver the result via Telegram.

    The source is chosen in priority order: ``drive_link``, then
    ``podcast_url``, then the uploaded ``audio_input`` path. Any failure is
    reported to Telegram as ``Task <id> failed: <reason>``; nothing is raised.

    Files downloaded by this task are always deleted afterwards. An uploaded
    file is left alone: it belongs to Gradio, and removing it would break a
    second run on the same upload.
    """
    temp_audio = None  # path created by this task, removed in ``finally``
    try:
        if drive_link:
            temp_audio, msg = fetch_from_google_drive(drive_link)
            if not temp_audio:
                raise TranscriptionError(msg)
            print(f"Task {task_id}: {msg}")
            audio_file = temp_audio
        elif podcast_url:
            temp_audio, downloaded = _download_podcast_episode(podcast_url)
            size_mb = downloaded / (1024 * 1024)
            print(f"Task {task_id}: Downloaded from podcast: {size_mb:.1f} MB")
            audio_file = temp_audio
        else:
            if audio_input is None:
                raise TranscriptionError("No audio provided.")
            audio_file = audio_input

        transcript = _transcribe_file(audio_file, model_id, language, return_timestamps)

        success = send_to_telegram(f"**Task {task_id} Complete!**\n\nTranscript:\n{transcript}")
        if not success:
            print(f"Failed to send task {task_id} to Telegram.")
    except Exception as e:  # task boundary: surface every failure to the user
        send_to_telegram(f"Task {task_id} failed: {str(e)}")
    finally:
        if temp_audio:
            _remove_quietly(temp_audio)


def _start_task(audio_input, model_id, language, return_timestamps, podcast_url, drive_link):
    """Queue a transcription task on the shared executor and return its status text."""
    task_id = str(uuid.uuid4())[:8]
    # Submit without waiting: a per-call ``with ThreadPoolExecutor()`` would
    # block on exit until the whole transcription had finished.
    _EXECUTOR.submit(
        background_transcribe,
        task_id,
        audio_input,
        model_id,
        language,
        return_timestamps,
        podcast_url,
        drive_link,
    )
    return f"Task {task_id} started! Transcript will be sent to your Telegram bot when complete. You can close the browser."


# Starter function for uploaded file
def start_transcribe_upload(audio_input, model_id, language, timestamps_checkbox):
    """Queue transcription of an uploaded audio file; return the status text."""
    return _start_task(audio_input, model_id, language, timestamps_checkbox, None, None)


# Starter for podcast
def start_transcribe_podcast(podcast_input, model_id, language, timestamps_checkbox):
    """Queue transcription of an Apple Podcasts episode URL; return the status text."""
    return _start_task(None, model_id, language, timestamps_checkbox, podcast_input, None)


# Starter for Drive
def start_transcribe_drive(drive_input, model_id, language, timestamps_checkbox):
    """Queue transcription of a Google Drive share link; return the status text."""
    return _start_task(None, model_id, language, timestamps_checkbox, None, drive_input)
# Create the Gradio app with a colorful, responsive theme
theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="purple",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"],
)

with gr.Blocks(theme=theme, title="MP3 to Text Transcriber") as demo:
    # The heading must sit on its own line, otherwise the whole introduction
    # is rendered as part of the level-1 heading.
    gr.Markdown(
        """
        # 🎤 MP3 to Text Transcription Tool

        Upload an MP3, paste an Apple Podcasts URL, or provide a Google Drive shareable link to transcribe asynchronously.
        Results are sent to your Telegram bot—no need to wait in the browser!
        (Bot token and chat ID are set as secrets in HF Space settings.)
        """,
        elem_classes=["centered"],
    )

    with gr.Row(variant="panel", elem_classes=["max-w-4xl mx-auto"]):
        with gr.Column(scale=1):
            # Inputs (no Telegram fields anymore)
            audio_input = gr.Audio(
                sources="upload",
                type="filepath",
                label="📁 Upload Audio File (MP3/WAV/etc.)",
                elem_classes=["w-full"],
            )
            podcast_input = gr.Textbox(
                label="🔗 Apple Podcasts Episode URL (optional)",
                placeholder="e.g., https://podcasts.apple.com/us/podcast/.../id123?i=456",
                elem_classes=["w-full"],
            )
            drive_input = gr.Textbox(
                label="📂 Google Drive Shareable Link (optional)",
                placeholder="e.g., https://drive.google.com/file/d/ABC123/view?usp=sharing",
                elem_classes=["w-full"],
            )
            model_dropdown = gr.Dropdown(
                choices=MODEL_OPTIONS,
                value=MODEL_OPTIONS[1],
                label="🤖 Select Whisper Model",
                info="Tiny: Fastest | Large-v3: Most accurate (slower on CPU)",
                elem_classes=["w-full"],
            )
            language_dropdown = gr.Dropdown(
                choices=[
                    "english", "french", "german", "spanish", "italian",
                    "portuguese", "dutch", "russian", "swedish", "chinese",
                    "japanese", "korean", "arabic", "hindi",
                ],
                value="english",
                label="🌍 Language (for better accuracy)",
                elem_classes=["w-full"],
            )
            timestamps_checkbox = gr.Checkbox(
                label="⏰ Include Timestamps?",
                value=False,
                info="Adds [start - end] tags to the transcript.",
                elem_classes=["w-full"],
            )

        with gr.Column(scale=1):
            # Shows the "Task <id> started!" text returned by the starters.
            status_output = gr.Markdown("Ready to start task! 💬", elem_classes=["text-center"])

    # Buttons
    with gr.Row(elem_classes=["w-full"]):
        transcribe_btn = gr.Button("🚀 Start Transcribe Upload", variant="secondary", elem_classes=["flex-1"])
        podcast_btn = gr.Button("📡 Start Podcast Transcribe", variant="primary", elem_classes=["flex-1"])
        drive_btn = gr.Button("📂 Start Drive Transcribe", variant="primary", elem_classes=["flex-1"])

    # Events (removed Telegram inputs): each button feeds its own source
    # field plus the shared model/language/timestamp settings to its starter.
    transcribe_btn.click(
        fn=start_transcribe_upload,
        inputs=[audio_input, model_dropdown, language_dropdown, timestamps_checkbox],
        outputs=status_output,
    )
    podcast_btn.click(
        fn=start_transcribe_podcast,
        inputs=[podcast_input, model_dropdown, language_dropdown, timestamps_checkbox],
        outputs=status_output,
    )
    drive_btn.click(
        fn=start_transcribe_drive,
        inputs=[drive_input, model_dropdown, language_dropdown, timestamps_checkbox],
        outputs=status_output,
    )

if __name__ == "__main__":
    demo.launch()