# NOTE(review): the lines below were residue of a GitHub commit header pasted
# into the source file (author, message, short hash); preserved as a comment so
# the file stays valid Python.
# Commit e775565 (@woai): "Major code cleanup and internationalization -
# Remove Russian comments/strings, translate UI to English, clean linter
# errors, remove hardcoded tokens, delete test files. Ready for production
# deployment"
| import os | |
| from google import genai | |
| from google.genai import types | |
| from google.api_core import retry | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Any, Optional | |
| import traceback | |
# Load environment variables from a local .env file
load_dotenv()

# Gemini API key; must be supplied via the environment or .env
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
print(f"GEMINI_API_KEY is set: {'Yes' if GEMINI_API_KEY else 'No'}")

# Initialize the Gemini client (left as None when no key is configured;
# callers check for this and return an error dict).
client = None
if GEMINI_API_KEY:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
        print("Gemini client successfully initialized")

        def is_retriable(e: Exception) -> bool:
            """Retry only on rate-limit (429) and service-unavailable (503) errors."""
            return (isinstance(e, Exception) and
                    (hasattr(e, 'code') and e.code in {429, 503}))

        # Wrap the async generate_content method with retry logic.
        # BUGFIX: generate_content on client.aio is a coroutine function, so
        # the synchronous retry.Retry wrapper would return the un-awaited
        # coroutine as a "successful" result — exceptions would only surface
        # at await time, outside the retry loop, and 429/503 were never
        # retried. AsyncRetry awaits the call inside its retry loop.
        if hasattr(client.aio.models, 'generate_content'):
            from google.api_core.retry_async import AsyncRetry
            original_method = client.aio.models.generate_content
            client.aio.models.generate_content = AsyncRetry(
                predicate=is_retriable,
                initial=1.0,     # Initial delay in seconds
                maximum=60.0,    # Maximum delay in seconds
                multiplier=2.0,  # Backoff multiplier
                deadline=300.0   # Total timeout in seconds
            )(original_method)
            print("Retry logic configured for Gemini API")
    except Exception as e:
        print(f"Error initializing Gemini client: {str(e)}")
        traceback.print_exc()
else:
    print("WARNING: Gemini API key not configured. LLM timecode generation functions will be unavailable.")

# Default Gemini model
DEFAULT_MODEL = "gemini-2.0-flash-001"
# Alternative models tried in order if the main one fails
ALTERNATIVE_MODELS = ["gemini-1.5-flash-001"]
def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: Optional[int] = None) -> str:
    """Render transcript entries as '[MM:SS] text' lines for the LLM prompt.

    Long videos are down-sampled (every 2nd entry over 30 minutes, every 3rd
    over 60 minutes) so the prompt stays compact while still covering the
    whole timeline. Entries starting past the video duration are dropped.
    Entries may be plain dicts or FetchedTranscriptSnippet objects.
    """
    # Infer the video duration from the final entry when not supplied.
    if video_duration_seconds is None and transcript_entries:
        tail = transcript_entries[-1]
        if hasattr(tail, 'start'):  # FetchedTranscriptSnippet object
            end_time = tail.start + tail.duration
        elif isinstance(tail, dict):  # Dict format
            end_time = tail.get("start", 0) + tail.get("duration", 0)
        else:
            end_time = 0
        video_duration_seconds = int(end_time) + 10  # small safety buffer

    # Pick a sampling stride based on total duration.
    stride = 1
    if video_duration_seconds and video_duration_seconds > 3600:  # > 60 minutes
        stride = 3
    elif video_duration_seconds and video_duration_seconds > 1800:  # > 30 minutes
        stride = 2
    sampled_entries = transcript_entries[::stride] if stride > 1 else transcript_entries
    if stride > 1:
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")

    pieces = []
    for entry in sampled_entries:
        if hasattr(entry, 'start'):  # FetchedTranscriptSnippet object
            start_time, text = entry.start, entry.text
        elif isinstance(entry, dict):  # Dict format
            start_time, text = entry.get("start", 0), entry.get("text", "")
        else:
            continue  # skip entries of unknown shape
        # Drop entries claiming to start after the video ends.
        if video_duration_seconds and start_time > video_duration_seconds:
            continue
        pieces.append(f"[{format_time_hms(start_time)}] {text}\n")
    return "".join(pieces)
def format_time_hms(seconds: float) -> str:
    """Format a (non-negative) second count as HH:MM:SS, or MM:SS under one hour."""
    total = int(seconds)
    hours = total // 3600
    minutes, secs = divmod(total % 3600, 60)
    if hours:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: Optional[str] = None, video_duration_minutes: Optional[int] = None, timecode_count: Optional[str] = None, interval_text: Optional[str] = None) -> str:
    """Build the prompt asking the LLM to generate timecodes for a video.

    Args:
        video_title: Title of the video, shown to the model for context.
        transcript: Transcript already formatted as '[MM:SS] text' lines.
        format_type: Output format hint (accepted for interface
            compatibility; the prompt currently always requests plain text).
        language: Language code of the transcript, if known.
        video_duration_minutes: Total video length in minutes, if known.
        timecode_count: Requested number of timestamps, e.g. "12-18".
        interval_text: Human-readable spacing hint, e.g.
            "approximately every 3-5 minutes".

    Returns:
        The complete prompt string.
    """
    # Choose the language the model should use for timestamp descriptions.
    if language and (language.lower().startswith('uk') or language.lower().startswith('ua')):
        target_language = "Ukrainian"
    elif language and language.lower().startswith('ru'):
        target_language = "Russian"
    else:
        target_language = "the same language as the video transcript"

    # Fall back to sensible defaults so the prompt never contains "None".
    # (The fallbacks match the defaults used by the caller.)
    count_text = timecode_count or "10-15"
    spacing_hint = interval_text or "evenly throughout the video"

    # FIX: interval_text and video_title were previously accepted but never
    # used; they are now included so the spacing hint actually reaches the
    # model. The unused local example_description was removed.
    prompt = f"""
You are a YouTube assistant. Analyze the FULL TRANSCRIPT below and identify all major topic shifts or sections.
Video title: "{video_title}"
Your task:
- Generate timestamps that cover the ENTIRE {video_duration_minutes}-minute video
- Each timestamp must be paired with a precise time from the transcript
- Timestamps must reflect the actual content flow throughout the video
Format requirements:
- Plain text output ONLY
- Each line format: MM:SS Topic description (or HH:MM:SS for longer videos)
- Use {target_language} for descriptions (3-6 words each)
- Space timestamps {spacing_hint}
- Start with early timestamp (first few minutes)
- End with late timestamp (last 10-15 minutes of video)
- NO explanations, NO numbering, NO extra text
CRITICAL: The transcript below spans {video_duration_minutes} minutes. You MUST create timestamps that span from beginning to end, not just the first portion.
Full transcript to analyze:
{transcript}
Generate {count_text} timestamps covering the complete {video_duration_minutes}-minute duration:
"""
    return prompt
def _detect_transcript_language(transcript_entries: List[Dict[str, Any]]) -> str:
    """Heuristically detect the transcript language from its first 10 segments.

    Returns 'uk' when Ukrainian-specific letters are found, 'ru' for other
    Cyrillic text, and 'en' otherwise.
    """
    # Handle both dict entries and FetchedTranscriptSnippet objects.
    sample_parts = []
    for entry in transcript_entries[:10]:
        if hasattr(entry, 'text'):  # FetchedTranscriptSnippet object
            sample_parts.append(entry.text)
        elif isinstance(entry, dict):  # Dict format
            sample_parts.append(entry.get("text", ""))
    sample = " ".join(sample_parts).lower()
    # Letters that exist in the Ukrainian alphabet but not in Russian.
    ukrainian_specific = set("ґєії")
    if any(char in ukrainian_specific for char in sample):
        print("Detected transcript language: Ukrainian")
        return "uk"
    # Any other Cyrillic text is treated as Russian.
    if any(ord('а') <= ord(char) <= ord('я') for char in sample):
        print("Detected transcript language: Russian")
        return "ru"
    print("Detected transcript language: English (or other)")
    return "en"


def _transcript_end_seconds(transcript_entries: List[Dict[str, Any]]) -> float:
    """Return the end time (start + duration) of the last entry, or 0."""
    if not transcript_entries:
        return 0
    last_entry = transcript_entries[-1]
    if hasattr(last_entry, 'start'):  # FetchedTranscriptSnippet object
        return last_entry.start + last_entry.duration
    if isinstance(last_entry, dict):  # Dict format
        return last_entry.get("start", 0) + last_entry.get("duration", 0)
    return 0


def _timecode_budget(video_duration_minutes: Optional[int]) -> tuple:
    """Map a video duration to (requested count range, hard cap on timecodes)."""
    if video_duration_minutes:
        if video_duration_minutes <= 30:
            return "8-12", 15
        if video_duration_minutes <= 60:
            return "12-18", 20
        if video_duration_minutes <= 120:
            return "18-25", 30
        return "25-35", 40
    # Unknown (or sub-minute) duration: use a middle-of-the-road budget.
    return "10-15", 20


def _filter_and_trim_timecodes(timecodes: List[str], max_timecodes: int) -> List[str]:
    """Drop boilerplate intro/outro lines and evenly trim to max_timecodes."""
    boilerplate = [
        "video start", "video end", "start of video", "end of video",
        "beginning", "conclusion", "intro", "outro"
    ]
    filtered = []
    for tc in timecodes:
        # The description is everything after the first space (the time part).
        parts = tc.split(" ", 1)
        if len(parts) > 1 and any(phrase in parts[1].lower() for phrase in boilerplate):
            continue
        filtered.append(tc)
    if len(filtered) <= max_timecodes:
        return filtered
    print(f"Too many timecodes ({len(filtered)}), reducing to {max_timecodes}")
    # Pick evenly spaced indices, always keeping the final timecode so the
    # list still reaches the end of the video.
    step = len(filtered) / max_timecodes
    indices = [int(i * step) for i in range(max_timecodes)]
    if indices[-1] != len(filtered) - 1:
        indices[-1] = len(filtered) - 1
    return [filtered[i] for i in indices]


async def generate_timecodes_with_gemini(
    transcript_entries: List[Dict[str, Any]],
    video_title: str,
    format_type: str = "youtube",
    model_name: Optional[str] = None,
    language: Optional[str] = None
) -> Dict[str, Any]:
    """
    Generates timecodes using Gemini based on transcript.

    Args:
        transcript_entries: List of transcript entries (dicts or
            FetchedTranscriptSnippet objects).
        video_title: Video title.
        format_type: Timecode format (youtube, markdown).
        model_name: Gemini model name (defaults to DEFAULT_MODEL).
        language: Transcript language code, if known; detected otherwise.

    Returns:
        On success, a dict with keys "timecodes", "format", "model",
        "video_title", "detected_language" and "video_duration_minutes";
        on failure, a dict with a single "error" key.
    """
    if not GEMINI_API_KEY or client is None:
        return {
            "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to .env file"
        }
    try:
        print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")

        detected_language = language or _detect_transcript_language(transcript_entries)

        # Determine video duration (seconds and minutes) from the transcript.
        video_duration_seconds = _transcript_end_seconds(transcript_entries)
        if transcript_entries:
            video_duration_minutes = int(video_duration_seconds / 60)
            print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")
        else:
            video_duration_minutes = None

        # Single source of truth for the timecode budget. (A second,
        # contradictory duration->cap table that was always overwritten by
        # this one has been removed as dead code.)
        timecode_count, max_timecodes = _timecode_budget(video_duration_minutes)

        formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)

        # Recommended spacing between timestamps, passed into the prompt.
        if video_duration_minutes and timecode_count:
            lower_bound = timecode_count.split('-')[0]
            target_count = int(lower_bound) if lower_bound.isdigit() else 20
            # Clamp to >= 1 so short videos never get "every 0-2 minutes".
            interval_minutes = max(1, video_duration_minutes // target_count)
            interval_text = f"approximately every {interval_minutes}-{interval_minutes + 2} minutes"
        else:
            interval_text = "evenly throughout the video"

        prompt = get_timecode_prompt(
            video_title,
            formatted_transcript,
            format_type,
            detected_language,
            video_duration_minutes,
            timecode_count,
            interval_text
        )
        print(f"Prompt prepared, length: {len(prompt)} characters")

        # Try the requested model first, then the alternatives.
        preferred = model_name or DEFAULT_MODEL
        models_to_try = [preferred] + [m for m in ALTERNATIVE_MODELS if m != preferred]
        last_error = None
        for current_model in models_to_try:
            try:
                print(f"Making request to Gemini API with model {current_model}...")
                response = await client.aio.models.generate_content(
                    model=current_model,
                    contents=prompt,
                    config=types.GenerateContentConfig(
                        temperature=0.2,  # Low temperature for more deterministic results
                        max_output_tokens=2048,  # Enough for timecode list
                    )
                )
                print(f"Response received: {type(response)}")
                timecodes_text = response.text
                print(f"Response text length: {len(timecodes_text)}")
                # Split into non-empty, stripped lines, then post-process.
                timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]
                final_timecodes = _filter_and_trim_timecodes(timecodes, max_timecodes)
                print(f"Final timecodes count after processing: {len(final_timecodes)}")
                return {
                    "timecodes": final_timecodes,
                    "format": format_type,
                    "model": current_model,
                    "video_title": video_title,
                    "detected_language": detected_language,
                    "video_duration_minutes": video_duration_minutes
                }
            except Exception as api_error:
                print(f"Error with model {current_model}: {str(api_error)}")
                traceback.print_exc()
                last_error = api_error
                continue
        # All candidate models failed.
        return {
            "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
        }
    except Exception as e:
        print(f"General error: {str(e)}")
        traceback.print_exc()
        return {
            "error": f"Error generating timecodes with Gemini: {str(e)}"
        }