| | import gradio as gr |
| | import asyncio |
| | from asyncio import Semaphore |
| | from pathlib import Path |
| | import os |
| | import tempfile |
| | import traceback |
| | import re |
| | import pandas as pd |
| | from dataclasses import dataclass |
| | from typing import Dict, AsyncGenerator, Tuple, Any, List |
| |
|
| | |
| | |
| | from google import genai |
| | from youtube_transcript_api import YouTubeTranscriptApi |
| |
|
| | |
| | from pydub import AudioSegment |
| | from pydub.exceptions import CouldntDecodeError |
| |
|
| |
|
| | |
| | PROMPT_KEYS = ["titles_and_thumbnails", "description", "previews", "clips", "timestamps"] |
| | PROMPT_DISPLAY_NAMES = { |
| | "titles_and_thumbnails": "Titles and Thumbnails", |
| | "description": "Twitter Description", |
| | "previews": "Preview Clips", |
| | "clips": "Twitter Clips", |
| | "timestamps": "Timestamps" |
| | } |
| | |
| | AUDIO_CHUNK_DURATION_MS = 30 * 60 * 1000 |
| | |
| | MAX_CONCURRENT_TRANSCRIPTIONS = 3 |
| | MAX_CONCURRENT_GENERATIONS = 4 |
| |
|
| | |
| | |
| | @dataclass |
| | class ContentRequest: |
| | prompt_key: str |
| |
|
| | class ContentGenerator: |
| | def __init__(self): |
| | self.current_prompts = self._load_default_prompts() |
| | self.client: genai.Client | None = None |
| |
|
| | def _load_default_prompts(self) -> Dict[str, str]: |
| | |
| | prompts = {} |
| | timestamp_examples, title_examples, description_examples, clip_examples = "", "", "", "" |
| | try: |
| | data_dir = Path("data") |
| | if data_dir.is_dir(): |
| | try: timestamps_df = pd.read_csv(data_dir / "Timestamps.csv"); timestamp_examples = "\n\n".join(timestamps_df['Timestamps'].dropna().tolist()) |
| | except Exception as e: print(f"Warning: Loading Timestamps.csv failed: {e}") |
| | try: titles_df = pd.read_csv(data_dir / "Titles & Thumbnails.csv"); title_examples = "\n".join([f'Title: "{r.Titles}"\nThumbnail: "{r.Thumbnail}"' for _, r in titles_df.iterrows() if pd.notna(r.Titles) and pd.notna(r.Thumbnail)]) |
| | except Exception as e: print(f"Warning: Loading Titles & Thumbnails.csv failed: {e}") |
| | try: descriptions_df = pd.read_csv(data_dir / "Viral Episode Descriptions.csv"); description_examples = "\n".join([f'Tweet: "{r["Tweet Text"]}"' for _, r in descriptions_df.iterrows() if pd.notna(r["Tweet Text"])]) |
| | except Exception as e: print(f"Warning: Loading Viral Episode Descriptions.csv failed: {e}") |
| | try: clips_df = pd.read_csv(data_dir / "Viral Twitter Clips.csv"); clip_examples = "\n\n".join([f'Tweet Text: "{r["Tweet Text"]}"\nClip Transcript: "{r["Clip Transcript"]}"' for _, r in clips_df.iterrows() if pd.notna(r["Tweet Text"]) and pd.notna(r["Clip Transcript"])]) |
| | except Exception as e: print(f"Warning: Loading Viral Twitter Clips.csv failed: {e}") |
| | else: print("Warning: 'data' directory not found.") |
| | except Exception as e: print(f"Warning: Error accessing 'data' directory: {e}") |
| |
|
| | prompts_dir = Path("prompts") |
| | if not prompts_dir.is_dir(): |
| | print("Error: 'prompts' directory not found.") |
| | return {key: f"ERROR: Prompt directory missing." for key in PROMPT_KEYS} |
| | for key in PROMPT_KEYS: |
| | try: |
| | prompt = (prompts_dir / f"{key}.txt").read_text(encoding='utf-8') |
| | if key == "timestamps": prompt = prompt.replace("{timestamps_examples}", timestamp_examples) |
| | elif key == "titles_and_thumbnails": prompt = prompt.replace("{title_examples}", title_examples) |
| | elif key == "description": prompt = prompt.replace("{description_examples}", description_examples) |
| | elif key == "clips": prompt = prompt.replace("{clip_examples}", clip_examples) |
| | prompts[key] = prompt |
| | except Exception as e: |
| | print(f"Warning: Loading prompt prompts/{key}.txt failed: {e}") |
| | prompts[key] = f"Generate {key} based on the transcript. Do not use markdown formatting." |
| | for key in PROMPT_KEYS: prompts.setdefault(key, f"Generate {key} based on the transcript. Do not use markdown formatting.") |
| | return prompts |
| |
|
| | async def generate_content(self, request: ContentRequest, transcript: str) -> str: |
| | |
| | if not self.client: return "ERROR_CONFIGURATION: Gemini Client not initialized." |
| | if not transcript: return "ERROR_INTERNAL: Empty transcript provided for content generation." |
| | try: |
| | system_prompt = self.current_prompts.get(request.prompt_key) |
| | if not system_prompt: return f"ERROR_INTERNAL: System prompt for '{request.prompt_key}' missing." |
| | contents_for_api = [system_prompt, transcript] |
| | |
| | model_name = "gemini-2.5-pro-preview-03-25" |
| | response = await asyncio.to_thread( |
| | self.client.models.generate_content, model=model_name, contents=contents_for_api |
| | ) |
| | if not response: return f"ERROR_API: No response received for {request.prompt_key}." |
| | try: |
| | if response.text: |
| | try: |
| | if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason: |
| | reason = response.prompt_feedback.block_reason.name; return f"ERROR_BLOCKED: Blocked by API. Reason: {reason}" |
| | except AttributeError: pass |
| | return str(response.text.strip()) |
| | else: |
| | if response.candidates and response.candidates[0].content and response.candidates[0].content.parts: |
| | full_text = "".join(part.text for part in response.candidates[0].content.parts if hasattr(part, 'text')).strip() |
| | if full_text: |
| | print(f"Warning: Used fallback text extraction via candidates for {request.prompt_key}") |
| | return str(full_text) |
| | return f"ERROR_NO_TEXT: Could not extract text from response for {request.prompt_key}." |
| | except (ValueError, AttributeError) as e: |
| | print(f"Error accessing response text/feedback for {request.prompt_key} (potentially blocked): {e}") |
| | reason = "Unknown" |
| | try: |
| | if hasattr(response, 'prompt_feedback') and response.prompt_feedback.block_reason: reason = response.prompt_feedback.block_reason.name |
| | except AttributeError: pass |
| | return f"ERROR_BLOCKED: Content generation failed (possibly blocked). Reason: {reason}" |
| | except Exception as e: |
| | print(f"Error generating content for {request.prompt_key}: {traceback.format_exc()}") |
| | error_str = str(e).lower() |
| | |
| | if "rate limit exceeded" in error_str or "quota exceeded" in error_str or "429" in error_str: |
| | return f"ERROR_RATE_LIMIT: API limit likely exceeded. Details: {str(e)}" |
| | elif "permission denied" in error_str or "api key not valid" in error_str: return f"ERROR_PERMISSION_DENIED: API Error (Permission Denied?). Check Key. Details: {str(e)}" |
| | |
| | elif "model" in error_str and "not found" in error_str: return f"ERROR_MODEL_NOT_FOUND: Model name likely incorrect. Details: {str(e)}" |
| | else: return f"ERROR_API_GENERAL: API Error during generation. Details: {str(e)}" |
| |
|
| | def update_prompts(self, *values): |
| | |
| | updated_keys = [] |
| | for key, value in zip(PROMPT_KEYS, values): |
| | if isinstance(value, str): self.current_prompts[key] = value; updated_keys.append(key) |
| | return f"Prompts updated: {', '.join(updated_keys)}" if updated_keys else "No prompts updated." |
| |
|
| | |
| | def extract_video_id(url: str) -> str | None: |
| | patterns = [r"(?:v=|\/)([0-9A-Za-z_-]{11}).*", r"youtu\.be\/([0-9A-Za-z_-]{11})"] |
| | for pattern in patterns: |
| | match = re.search(pattern, url); |
| | if match: return match.group(1) |
| | return None |
| |
|
| | def get_transcript(video_id: str) -> str: |
| | if not video_id: raise ValueError("Invalid Video ID") |
| | try: |
| | t_list = YouTubeTranscriptApi.list_transcripts(video_id) |
| | transcript = t_list.find_transcript(['en', 'en-US']) |
| | fetched = transcript.fetch() |
| | if not fetched: raise ValueError("Fetched transcript empty") |
| | return " ".join(entry.get("text", "") for entry in fetched).strip() |
| | except Exception as e: |
| | return f"ERROR_TRANSCRIPT_FETCH: Failed for ID '{video_id}'. Reason: {e}" |
| |
|
| | |
| | class TranscriptProcessor: |
| | def __init__(self): |
| | self.generator = ContentGenerator() |
| |
|
| | |
| | def _get_youtube_transcript(self, url: str) -> str: |
| | |
| | print(f"Extracting Video ID from: {url}") |
| | video_id = extract_video_id(url) |
| | if not video_id: raise ValueError(f"Invalid YouTube URL/ID: {url}") |
| | print(f"Video ID: {video_id}. Fetching transcript...") |
| | try: |
| | transcript = get_transcript(video_id) |
| | if transcript.startswith("ERROR_TRANSCRIPT_FETCH"): raise Exception(transcript) |
| | if not transcript: raise ValueError(f"Empty transcript for ID: {video_id}") |
| | print(f"Transcript fetched (length: {len(transcript)}).") |
| | return transcript |
| | except Exception as e: print(f"Error fetching YouTube transcript: {e}"); raise Exception(f"Failed to get YouTube transcript: {str(e)}") |
| |
|
| |
|
| | |
| | async def _transcribe_chunk(self, client: genai.Client, chunk_path: Path, chunk_index: int, total_chunks: int, semaphore: Semaphore) -> str: |
| | """Transcribes a single audio chunk using Gemini API, respecting the semaphore.""" |
| | |
| | async with semaphore: |
| | print(f"Semaphore acquired for chunk {chunk_index + 1}/{total_chunks}. Processing...") |
| | gemini_audio_file_ref = None |
| | try: |
| | print(f"Uploading chunk {chunk_index + 1}/{total_chunks}: {chunk_path.name}") |
| | gemini_audio_file_ref = await asyncio.to_thread(client.files.upload, file=chunk_path) |
| | print(f"Chunk {chunk_index + 1} uploaded. File Ref: {gemini_audio_file_ref.name}") |
| |
|
| | prompt_for_transcription = "Transcribe the following audio file accurately." |
| | contents = [prompt_for_transcription, gemini_audio_file_ref] |
| | |
| | model_name = "gemini-2.5-pro-preview-03-25" |
| |
|
| | print(f"Requesting transcription for chunk {chunk_index + 1}...") |
| | |
| | transcription_response = await asyncio.to_thread( |
| | client.models.generate_content, model=model_name, contents=contents |
| | ) |
| | print(f"Transcription response received for chunk {chunk_index + 1}.") |
| |
|
| | |
| | transcript_piece = "" |
| | try: |
| | if transcription_response.text: |
| | transcript_piece = transcription_response.text.strip() |
| | elif transcription_response.candidates and transcription_response.candidates[0].content and transcription_response.candidates[0].content.parts: |
| | transcript_piece = "".join(part.text for part in transcription_response.candidates[0].content.parts if hasattr(part, 'text')).strip() |
| |
|
| | if not transcript_piece and hasattr(transcription_response, 'prompt_feedback') and transcription_response.prompt_feedback.block_reason: |
| | reason = transcription_response.prompt_feedback.block_reason.name |
| | print(f"Warning: Transcription blocked for chunk {chunk_index + 1}. Reason: {reason}") |
| | return f"[CHUNK_ERROR: Blocked - {reason}]" |
| |
|
| | print(f"Chunk {chunk_index + 1} transcript length: {len(transcript_piece)}") |
| | return str(transcript_piece) |
| |
|
| | except (ValueError, AttributeError, Exception) as extraction_err: |
| | print(f"Error extracting transcript for chunk {chunk_index + 1}: {extraction_err}. Response: {transcription_response}") |
| | return f"[CHUNK_ERROR: Extraction Failed - {str(extraction_err)}]" |
| |
|
| | except Exception as e: |
| | print(f"Error processing chunk {chunk_index + 1} (within semaphore): {traceback.format_exc()}") |
| | error_str = str(e).lower() |
| | |
| | if "rate limit exceeded" in error_str or "quota exceeded" in error_str or "429" in error_str: |
| | return f"[CHUNK_ERROR: API Rate Limit Exceeded - {str(e)}]" |
| | elif "permission denied" in error_str or "api key not valid" in error_str: |
| | return f"[CHUNK_ERROR: API Permission Denied - {str(e)}]" |
| | elif "file size" in error_str: |
| | return f"[CHUNK_ERROR: File Size Limit Exceeded - {str(e)}]" |
| | else: |
| | return f"[CHUNK_ERROR: General API/Processing Error - {str(e)}]" |
| | finally: |
| | |
| | if gemini_audio_file_ref: |
| | |
| | asyncio.create_task(self.delete_uploaded_file(client, gemini_audio_file_ref.name, f"chunk {chunk_index + 1} cleanup")) |
| | if chunk_path.exists(): |
| | try: |
| | os.remove(chunk_path) |
| | except OSError as e: |
| | print(f"Warning: Could not delete local temp chunk file {chunk_path}: {e}") |
| | print(f"Semaphore released for chunk {chunk_index + 1}/{total_chunks}.") |
| | |
| |
|
| |
|
| | async def process_transcript(self, client: genai.Client, audio_file: Any) -> AsyncGenerator[Tuple[str, Any], None]: |
| | """ |
| | Processes audio with larger chunks and controlled concurrency using Semaphores. |
| | """ |
| | if AudioSegment is None: |
| | yield "error", "Audio processing library (pydub) not loaded. Cannot proceed." |
| | return |
| | if not client: |
| | yield "error", "Gemini Client object was not provided." |
| | return |
| | self.generator.client = client |
| | if not audio_file: |
| | yield "error", "No audio file provided." |
| | return |
| |
|
| | audio_path_str = getattr(audio_file, 'name', None) |
| | if not audio_path_str: |
| | yield "error", "Invalid audio file object." |
| | return |
| | original_audio_path = Path(audio_path_str) |
| | if not original_audio_path.exists(): |
| | yield "error", f"Audio file not found: {original_audio_path}" |
| | return |
| |
|
| | |
| | transcription_semaphore = Semaphore(MAX_CONCURRENT_TRANSCRIPTIONS) |
| | generation_semaphore = Semaphore(MAX_CONCURRENT_GENERATIONS) |
| |
|
| | try: |
| | yield "status", f"Loading audio file: {original_audio_path.name}..." |
| | print(f"Loading audio file with pydub: {original_audio_path}") |
| | try: |
| | file_format = original_audio_path.suffix.lower().replace('.', '') |
| | audio = AudioSegment.from_file(original_audio_path, format=file_format if file_format else None) |
| | except CouldntDecodeError as decode_error: |
| | print(f"pydub decode error: {decode_error}. Make sure ffmpeg is installed.") |
| | yield "error", f"Failed to load/decode audio file: {original_audio_path.name}. Ensure valid format and ffmpeg." |
| | return |
| | except Exception as load_err: |
| | print(f"Error loading audio with pydub: {traceback.format_exc()}") |
| | yield "error", f"Error loading audio file {original_audio_path.name}: {load_err}" |
| | return |
| |
|
| | duration_ms = len(audio) |
| | |
| | total_chunks = (duration_ms + AUDIO_CHUNK_DURATION_MS - 1) // AUDIO_CHUNK_DURATION_MS |
| | print(f"Audio loaded. Duration: {duration_ms / 1000:.2f}s. Splitting into {total_chunks} x {AUDIO_CHUNK_DURATION_MS / 60000:.1f}min chunks.") |
| | yield "status", f"Audio loaded ({duration_ms / 1000:.2f}s). Transcribing in {total_chunks} chunks (max {MAX_CONCURRENT_TRANSCRIPTIONS} concurrent)..." |
| |
|
| | transcript_pieces = [""] * total_chunks |
| | transcription_tasks = [] |
| |
|
| | |
| | for i in range(total_chunks): |
| | start_ms = i * AUDIO_CHUNK_DURATION_MS |
| | end_ms = min((i + 1) * AUDIO_CHUNK_DURATION_MS, duration_ms) |
| | chunk = audio[start_ms:end_ms] |
| |
|
| | with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_chunk_file: |
| | chunk_path = Path(temp_chunk_file.name) |
| | try: |
| | chunk.export(chunk_path, format="wav") |
| | except Exception as export_err: |
| | print(f"Error exporting chunk {i+1}: {traceback.format_exc()}") |
| | yield "error", f"Failed to create temporary audio chunk file: {export_err}" |
| | if chunk_path.exists(): os.remove(chunk_path) |
| | return |
| |
|
| | |
| | task = asyncio.create_task(self._transcribe_chunk(client, chunk_path, i, total_chunks, transcription_semaphore)) |
| | |
| | transcription_tasks.append((i, task)) |
| |
|
| | |
| | processed_chunks = 0 |
| | |
| | |
| | |
| |
|
| | |
| | temp_results = {} |
| | for index, task in transcription_tasks: |
| | try: |
| | result = await task |
| | temp_results[index] = result |
| | processed_chunks += 1 |
| | yield "status", f"Transcribed chunk {processed_chunks}/{total_chunks}..." |
| | |
| | if isinstance(result, str) and ("[CHUNK_ERROR: API Rate Limit Exceeded" in result or \ |
| | "[CHUNK_ERROR: API Permission Denied" in result or \ |
| | "[CHUNK_ERROR: API Quota Exceeded" in result): |
| | print(f"Critical API error in chunk {index + 1}, stopping transcription. Error: {result}") |
| | yield "error", f"Transcription stopped. Critical API error in chunk {index + 1}: {result.split('-', 1)[-1].strip()}" |
| | |
| | for j, other_task in transcription_tasks: |
| | if not other_task.done(): |
| | other_task.cancel() |
| | return |
| | except asyncio.CancelledError: |
| | print(f"Transcription task for chunk {index + 1} was cancelled.") |
| | temp_results[index] = "[CHUNK_ERROR: Cancelled]" |
| | |
| | if processed_chunks < total_chunks: |
| | yield "error", "Transcription process was cancelled." |
| | return |
| | except Exception as e: |
| | print(f"Error waiting for transcription task {index + 1}: {traceback.format_exc()}") |
| | temp_results[index] = f"[CHUNK_ERROR: Task Processing Failed - {str(e)}]" |
| | processed_chunks += 1 |
| |
|
| | |
| | transcript_pieces = [temp_results.get(i, "[CHUNK_ERROR: Missing Result]") for i in range(total_chunks)] |
| | full_transcript = " ".join(transcript_pieces).strip() |
| |
|
| | |
| | if not full_transcript or full_transcript.isspace() or all(s.startswith("[CHUNK_ERROR") for s in transcript_pieces if s): |
| | error_summary = " ".join(p for p in transcript_pieces if p.startswith("[CHUNK_ERROR")) |
| | print(f"Transcription failed or resulted in only errors. Summary: {error_summary}") |
| | yield "error", f"Failed to transcribe audio or all chunks failed. Errors: {error_summary[:200]}" |
| | return |
| |
|
| | print(f"Full transcript concatenated (length: {len(full_transcript)}).") |
| | yield "status", "Transcription complete. Generating content..." |
| |
|
| | |
| | generation_tasks = [] |
| | for key in PROMPT_KEYS: |
| | |
| | task = asyncio.create_task(self._generate_single_item(key, full_transcript, generation_semaphore)) |
| | generation_tasks.append(task) |
| |
|
| | generated_items = 0 |
| | total_items = len(PROMPT_KEYS) |
| | |
| | for future in asyncio.as_completed(generation_tasks): |
| | try: |
| | key, result = await future |
| | yield "progress", (key, result) |
| | generated_items += 1 |
| | |
| | yield "status", f"Generating content ({key} done, {generated_items}/{total_items} total)..." |
| | except asyncio.CancelledError: |
| | |
| | print("Content generation task was cancelled.") |
| | yield "error", "Content generation cancelled." |
| | return |
| | except Exception as e: |
| | print(f"Error processing completed generation task: {traceback.format_exc()}") |
| | yield "status", f"Error during content generation phase: {str(e)}" |
| | |
| | |
| | |
| |
|
| | yield "status", "All content generation tasks complete." |
| |
|
| | except FileNotFoundError as e: |
| | yield "error", f"File Error: {str(e)}" |
| | return |
| | except Exception as e: |
| | print(f"Error during transcription setup/chunking phase: {traceback.format_exc()}") |
| | yield "error", f"System Error during transcription setup: {str(e)}" |
| | return |
| |
|
| |
|
| | async def delete_uploaded_file(self, client: genai.Client, file_name: str, context: str): |
| | |
| | if not client or not file_name: |
| | |
| | return |
| | try: |
| | |
| | await asyncio.to_thread(client.files.delete, name=file_name) |
| | print(f"Successfully cleaned up Gemini file: {file_name} ({context})") |
| | except Exception as cleanup_err: |
| | if "not found" in str(cleanup_err).lower() or "404" in str(cleanup_err): |
| | pass |
| | |
| | else: |
| | print(f"Warning: Failed Gemini file cleanup for {file_name} ({context}): {cleanup_err}") |
| |
|
| |
|
| | |
| | async def _generate_single_item(self, key: str, transcript: str, semaphore: Semaphore) -> Tuple[str, str]: |
| | """Helper to generate one piece of content, respecting the semaphore.""" |
| | |
| | async with semaphore: |
| | print(f"Semaphore acquired for generating: {key}. Calling API...") |
| | result = await self.generator.generate_content(ContentRequest(key), transcript) |
| | print(f"Finished generation task for: {key}. Semaphore released.") |
| | |
| | return key, result |
| |
|
| | def update_prompts(self, *values) -> str: |
| | |
| | return self.generator.update_prompts(*values) |
| |
|
| |
|
| | |
| |
|
| | def create_interface(): |
| | """Create the Gradio interface (UI definition identical to last version).""" |
| | processor = TranscriptProcessor() |
| |
|
| | key_titles = "titles_and_thumbnails" |
| | key_desc = "description" |
| | key_previews = "previews" |
| | key_clips = "clips" |
| | key_timestamps = "timestamps" |
| | display_titles = PROMPT_DISPLAY_NAMES[key_titles] |
| | display_desc = PROMPT_DISPLAY_NAMES[key_desc] |
| | display_previews = PROMPT_DISPLAY_NAMES[key_previews] |
| | display_clips = PROMPT_DISPLAY_NAMES[key_clips] |
| | display_timestamps = PROMPT_DISPLAY_NAMES[key_timestamps] |
| |
|
| | with gr.Blocks(title="Gemini Podcast Content Generator") as app: |
| | gr.Markdown( |
| | """ |
| | # Gemini Podcast Content Generator |
| | Generate social media content from podcast audio using Gemini. |
| | Enter your Google API key below and upload an audio file. |
| | Audio will be processed in larger (~30min) chunks with controlled concurrency. |
| | """ |
| | ) |
| |
|
| | with gr.Tab("Generate Content"): |
| | google_api_key_input = gr.Textbox( |
| | label="Google API Key", type="password", |
| | placeholder="Enter your Google API Key here", |
| | info="Your GCP account needs to have billing enabled to use the 2.5 pro model." |
| | ) |
| | input_audio = gr.File( |
| | label="Upload Audio File", file_count="single", |
| | file_types=["audio", ".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac"] |
| | ) |
| | submit_btn = gr.Button("Generate with Gemini", variant="huggingface") |
| |
|
| | gr.Markdown("### Processing Status") |
| | output_status = gr.Textbox(label="Current Status", value="Idle.", interactive=False, lines=1, max_lines=5) |
| |
|
| | gr.Markdown(f"### {display_titles}") |
| | output_titles = gr.Textbox(label="", value="...", interactive=False, lines=3, max_lines=10) |
| |
|
| | gr.Markdown(f"### {display_desc}") |
| | output_desc = gr.Textbox(label="", value="...", interactive=False, lines=3, max_lines=10) |
| |
|
| | gr.Markdown(f"### {display_previews}") |
| | output_previews = gr.Textbox(label="", value="...", interactive=False, lines=3, max_lines=10) |
| |
|
| | gr.Markdown(f"### {display_clips}") |
| | output_clips = gr.Textbox(label="", value="...", interactive=False, lines=3, max_lines=10) |
| |
|
| | gr.Markdown(f"### {display_timestamps}") |
| | output_timestamps = gr.Textbox(label="", value="...", interactive=False, lines=3, max_lines=10) |
| |
|
| | outputs_list = [ |
| | output_status, |
| | output_titles, output_desc, output_previews, |
| | output_clips, output_timestamps |
| | ] |
| | results_component_map = { |
| | key_titles: output_titles, key_desc: output_desc, key_previews: output_previews, |
| | key_clips: output_clips, key_timestamps: output_timestamps |
| | } |
| |
|
| | |
| | async def process_wrapper(google_key, audio_file_obj, progress=gr.Progress(track_tqdm=True)): |
| | print("Started Processing...") |
| | initial_updates = { |
| | output_status: gr.update(value="Initiating..."), |
| | output_titles: gr.update(value="β³ Pending..."), |
| | output_desc: gr.update(value="β³ Pending..."), |
| | output_previews: gr.update(value="β³ Pending..."), |
| | output_clips: gr.update(value="β³ Pending..."), |
| | output_timestamps: gr.update(value="β³ Pending..."), |
| | } |
| | yield initial_updates |
| |
|
| | if not google_key: |
| | yield {output_status: gr.update(value="π Error: Missing Google API Key.")} |
| | return |
| | if not audio_file_obj: |
| | yield {output_status: gr.update(value="π Error: No audio file uploaded.")} |
| | return |
| |
|
| | masked_key = f"{'*'*(len(google_key)-4)}{google_key[-4:]}" if len(google_key) > 4 else "****" |
| | print(f"Using Google Key: {masked_key}") |
| | print(f"Audio file: Name='{getattr(audio_file_obj, 'name', 'N/A')}'") |
| | client: genai.Client | None = None |
| | try: |
| | yield {output_status: gr.update(value="β³ Initializing Gemini Client...")} |
| | client = await asyncio.to_thread(genai.Client, api_key=google_key) |
| | print("Gemini Client initialized successfully.") |
| | yield {output_status: gr.update(value="β
Client Initialized.")} |
| | except Exception as e: |
| | error_msg = f"π Error: Failed Client Initialization: {e}" |
| | print(f"Client Init Error: {traceback.format_exc()}") |
| | yield {output_status: gr.update(value=error_msg)} |
| | return |
| |
|
| | updates_to_yield = {} |
| | try: |
| | |
| | async for update_type, data in processor.process_transcript(client, audio_file_obj): |
| | updates_to_yield = {} |
| | if update_type == "status": |
| | updates_to_yield[output_status] = gr.update(value=f"β³ {data}") |
| | elif update_type == "progress": |
| | key, result = data |
| | component_to_update = results_component_map.get(key) |
| | if component_to_update: |
| | ui_result = "" |
| | if isinstance(result, str) and result.startswith("ERROR_"): |
| | |
| | if result.startswith("ERROR_RATE_LIMIT"): |
| | ui_result = f"β Error (Rate Limit):\n{result.split(':', 1)[-1].strip()}" |
| | else: |
| | try: |
| | error_type, error_detail = result.split(':', 1) |
| | error_type_display = error_type.replace('ERROR_', '').replace('_', ' ').title() |
| | ui_result = f"β Error ({error_type_display}):\n{error_detail.strip()}" |
| | except ValueError: |
| | ui_result = f"β Error:\n{result}" |
| | else: |
| | ui_result = str(result) |
| | updates_to_yield[component_to_update] = gr.update(value=ui_result) |
| | else: |
| | print(f"Warning: No UI component mapped for result key '{key}'") |
| | elif update_type == "error": |
| | error_message = f"π Processing Error: {data}" |
| | updates_to_yield[output_status] = gr.update(value=error_message) |
| | yield updates_to_yield |
| | return |
| |
|
| | if updates_to_yield: |
| | yield updates_to_yield |
| |
|
| | final_success_update = {output_status: gr.update(value="β
Processing Complete.")} |
| | final_success_update.update(updates_to_yield) |
| | yield final_success_update |
| | print("Process wrapper finished successfully.") |
| |
|
| | except Exception as e: |
| | print(f"Error in process_wrapper async loop: {traceback.format_exc()}") |
| | error_msg = f"π Unexpected wrapper error: {e}" |
| | yield {output_status: gr.update(value=error_msg)} |
| |
|
| | submit_btn.click( |
| | fn=process_wrapper, |
| | inputs=[google_api_key_input, input_audio], |
| | outputs=outputs_list |
| | ) |
| |
|
| | with gr.Tab("Customize Prompts"): |
| | |
| | gr.Markdown("## Customize Generation Prompts") |
| | prompt_inputs = [] |
| | default_prompts = processor.generator.current_prompts |
| | for key in PROMPT_KEYS: |
| | display_name = PROMPT_DISPLAY_NAMES.get(key, key.replace('_', ' ').title()) |
| | default_value = default_prompts.get(key, "") |
| | prompt_inputs.append(gr.Textbox(label=f"{display_name} Prompt", lines=10, value=default_value or "")) |
| |
|
| | status_prompt_tab = gr.Textbox(label="Status", interactive=False) |
| | update_btn = gr.Button("Update Session Prompts") |
| | update_btn.click(fn=processor.update_prompts, inputs=prompt_inputs, outputs=[status_prompt_tab]) |
| |
|
| | reset_btn = gr.Button("Reset to Default Prompts") |
| | def reset_prompts_ui(): |
| | try: |
| | defaults = processor.generator._load_default_prompts() |
| | if any(isinstance(v, str) and v.startswith("ERROR:") for v in defaults.values()): raise ValueError("Failed to load one or more default prompts.") |
| | processor.generator.current_prompts = defaults |
| | updates = {status_prompt_tab: gr.update(value="Prompts reset to defaults!")} |
| | for i, key in enumerate(PROMPT_KEYS): |
| | updates[prompt_inputs[i]] = gr.update(value=defaults.get(key, "")) |
| | return updates |
| | except Exception as e: |
| | print(f"Error during prompt reset: {e}") |
| | return {status_prompt_tab: gr.update(value=f"Error resetting prompts: {str(e)}")} |
| |
|
| | reset_btn.click( |
| | fn=reset_prompts_ui, |
| | inputs=None, |
| | outputs=[status_prompt_tab] + prompt_inputs |
| | ) |
| |
|
| | return app |
| |
|
| | |
| | if __name__ == "__main__": |
| | if AudioSegment is None: |
| | print("\nFATAL ERROR: pydub is required but could not be imported.") |
| | print("Please install it ('pip install pydub') and ensure ffmpeg is available.") |
| | print("Application cannot start correctly.") |
| | exit(1) |
| |
|
| | Path("prompts").mkdir(exist_ok=True) |
| | Path("data").mkdir(exist_ok=True) |
| | _prompt_dir = Path("prompts") |
| | for key in PROMPT_KEYS: |
| | prompt_file = _prompt_dir / f"{key}.txt" |
| | if not prompt_file.exists(): |
| | |
| | default_content = f"This is the default placeholder prompt for {PROMPT_DISPLAY_NAMES[key]}. Process the transcript provided. Important: Generate the response as plain text only. Do not use any Markdown formatting (no '#', '*', '_', list formatting, bolding, etc.)." |
| | if key == "titles_and_thumbnails": default_content += "\n\nExamples:\n{title_examples}" |
| | elif key == "description": default_content += "\n\nExamples:\n{description_examples}" |
| | elif key == "clips": default_content += "\n\nExamples:\n{clip_examples}" |
| | elif key == "timestamps": default_content += "\n\nExamples:\n{timestamps_examples}" |
| | prompt_file.write_text(default_content, encoding='utf-8') |
| | print(f"Created dummy prompt file: {prompt_file}") |
| |
|
| | print("Starting Gradio application...") |
| | app = create_interface() |
| | app.launch() |
| | print("Gradio application stopped.") |