# YouTube_Creator_MetaData / gemini_helper.py
# Source commit e775565 (@woai): "Major code cleanup and internationalization -
# Remove Russian comments/strings, translate UI to English, clean linter errors,
# remove hardcoded tokens, delete test files. Ready for production deployment"
import os
from google import genai
from google.genai import types
from google.api_core import retry
from dotenv import load_dotenv
from typing import List, Dict, Any, Optional
import traceback
# Pull environment variables from a local .env file, if one exists.
load_dotenv()

# Gemini API key read from the environment; None/empty disables the client.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
print(f"GEMINI_API_KEY is set: {'Yes' if GEMINI_API_KEY else 'No'}")

# Module-level Gemini client; remains None when no API key is configured.
client = None
if GEMINI_API_KEY:
    try:
        client = genai.Client(api_key=GEMINI_API_KEY)
        print("Gemini client successfully initialized")

        def is_retriable(e):
            # Retry only transient API failures: rate limit (429) and
            # service unavailable (503).
            return (isinstance(e, Exception) and
                    (hasattr(e, 'code') and e.code in {429, 503}))

        # Wrap the async generate_content call with exponential backoff.
        # NOTE(review): google.api_core.retry.Retry targets sync callables;
        # confirm it wraps the async method as intended (AsyncRetry may be
        # the better fit).
        if hasattr(client.aio.models, 'generate_content'):
            _unwrapped_generate = client.aio.models.generate_content
            client.aio.models.generate_content = retry.Retry(
                predicate=is_retriable,
                initial=1.0,      # initial delay, seconds
                maximum=60.0,     # maximum delay, seconds
                multiplier=2.0,   # backoff multiplier
                deadline=300.0,   # total timeout, seconds
            )(_unwrapped_generate)
            print("Retry logic configured for Gemini API")
    except Exception as e:
        print(f"Error initializing Gemini client: {str(e)}")
        traceback.print_exc()
else:
    print("WARNING: Gemini API key not configured. LLM timecode generation functions will be unavailable.")

# Default Gemini model
DEFAULT_MODEL = "gemini-2.0-flash-001"
# Alternative models if main one doesn't work
ALTERNATIVE_MODELS = ["gemini-1.5-flash-001"]
def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: Optional[int] = None) -> str:
    """Format transcript entries as "[MM:SS] text" lines for the LLM prompt.

    Args:
        transcript_entries: Transcript entries, either dicts with
            "start"/"duration"/"text" keys or FetchedTranscriptSnippet-like
            objects exposing .start/.duration/.text attributes.
        video_duration_seconds: Total video length in seconds. When None it
            is estimated from the last transcript entry plus a small buffer.

    Returns:
        One line per (possibly sampled) entry, newline-terminated.
    """
    # Estimate total duration from the final entry when not supplied.
    if video_duration_seconds is None and transcript_entries:
        last_entry = transcript_entries[-1]
        if hasattr(last_entry, 'start'):  # FetchedTranscriptSnippet object
            max_time = last_entry.start + last_entry.duration
        elif isinstance(last_entry, dict):  # dict format
            max_time = last_entry.get("start", 0) + last_entry.get("duration", 0)
        else:
            max_time = 0
        video_duration_seconds = int(max_time) + 10  # small buffer

    # Down-sample long transcripts so the prompt still covers the full video.
    if video_duration_seconds and video_duration_seconds > 3600:  # > 60 minutes
        sampled_entries = transcript_entries[::3]
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
    elif video_duration_seconds and video_duration_seconds > 1800:  # > 30 minutes
        sampled_entries = transcript_entries[::2]
        print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
    else:
        sampled_entries = transcript_entries

    # Build lines in a list and join once (avoids quadratic str +=).
    lines = []
    for entry in sampled_entries:
        if hasattr(entry, 'start'):  # FetchedTranscriptSnippet object
            start_time = entry.start
            text = entry.text
        elif isinstance(entry, dict):  # dict format
            start_time = entry.get("start", 0)
            text = entry.get("text", "")
        else:
            continue  # skip entries of unknown shape
        # Drop entries whose timestamp exceeds the reported video duration.
        if video_duration_seconds and start_time > video_duration_seconds:
            continue
        time_str = format_time_hms(start_time)
        lines.append(f"[{time_str}] {text}\n")
    return "".join(lines)
def format_time_hms(seconds: float) -> str:
    """Render a second count as "HH:MM:SS", or "MM:SS" for videos under an hour."""
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    hours, minutes, secs = int(hours), int(minutes), int(secs)
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: str = None, video_duration_minutes: int = None, timecode_count: str = None, interval_text: str = None) -> str:
    """Build the LLM prompt asking for timestamps covering the whole video.

    The description language is forced to Ukrainian or Russian when the
    transcript language code indicates so; otherwise the model is told to
    match the transcript's own language.
    """
    lang = (language or "").lower()
    if lang.startswith('uk') or lang.startswith('ua'):
        target_language = "Ukrainian"
    elif lang.startswith('ru'):
        target_language = "Russian"
    else:
        target_language = "the same language as the video transcript"
    # NOTE(review): example_description (and the video_title, format_type,
    # interval_text parameters) are not referenced in the prompt text below.
    example_description = "Discussion of main principles"
    prompt = f"""
You are a YouTube assistant. Analyze the FULL TRANSCRIPT below and identify all major topic shifts or sections.
Your task:
- Generate timestamps that cover the ENTIRE {video_duration_minutes}-minute video
- Each timestamp must be paired with a precise time from the transcript
- Timestamps must reflect the actual content flow throughout the video
Format requirements:
- Plain text output ONLY
- Each line format: MM:SS Topic description (or HH:MM:SS for longer videos)
- Use {target_language} for descriptions (3-6 words each)
- Start with early timestamp (first few minutes)
- End with late timestamp (last 10-15 minutes of video)
- NO explanations, NO numbering, NO extra text
CRITICAL: The transcript below spans {video_duration_minutes} minutes. You MUST create timestamps that span from beginning to end, not just the first portion.
Full transcript to analyze:
{transcript}
Generate {timecode_count} timestamps covering the complete {video_duration_minutes}-minute duration:
"""
    return prompt
def _detect_transcript_language(transcript_entries: List[Dict[str, Any]]) -> str:
    """Heuristically detect the transcript language from its first 10 entries."""
    text_sample_parts = []
    for entry in transcript_entries[:10]:
        if hasattr(entry, 'text'):  # FetchedTranscriptSnippet object
            text_sample_parts.append(entry.text)
        elif isinstance(entry, dict):  # dict format
            text_sample_parts.append(entry.get("text", ""))
    text_sample = " ".join(text_sample_parts).lower()
    # Letters unique to Ukrainian (absent from the Russian alphabet).
    if any(char in set("ґєії") for char in text_sample):
        print("Detected transcript language: Ukrainian")
        return "uk"
    # Any generic Cyrillic letter in а..я -> assume Russian.
    if any(ord('а') <= ord(char) <= ord('я') for char in text_sample):
        print("Detected transcript language: Russian")
        return "ru"
    print("Detected transcript language: English (or other)")
    return "en"


def _entry_end_time(entry) -> float:
    """Return start + duration for a transcript entry of either supported shape."""
    if hasattr(entry, 'start'):  # FetchedTranscriptSnippet object
        return entry.start + entry.duration
    if isinstance(entry, dict):  # dict format
        return entry.get("start", 0) + entry.get("duration", 0)
    return 0


def _timecode_budget(video_duration_minutes: Optional[int]):
    """Map video length to (suggested count range, hard cap) for timecodes."""
    if not video_duration_minutes:
        return "10-15", 20
    if video_duration_minutes <= 30:
        return "8-12", 15
    if video_duration_minutes <= 60:
        return "12-18", 20
    if video_duration_minutes <= 120:
        return "18-25", 30
    return "25-35", 40


# Descriptions containing any of these are considered generic filler.
_GENERIC_PHRASES = (
    "video start", "video end", "start of video", "end of video",
    "beginning", "conclusion", "intro", "outro",
)


def _filter_timecodes(timecodes: List[str]) -> List[str]:
    """Drop generic intro/outro-style lines, keeping content timecodes."""
    kept = []
    for tc in timecodes:
        parts = tc.split(" ", 1)  # "MM:SS description" -> [time, description]
        if len(parts) > 1 and any(
            phrase in parts[1].lower() for phrase in _GENERIC_PHRASES
        ):
            continue
        kept.append(tc)
    return kept


def _evenly_sample(items: List[str], max_count: int) -> List[str]:
    """Pick up to max_count items evenly spread, always keeping the last one."""
    if len(items) <= max_count:
        return items
    print(f"Too many timecodes ({len(items)}), reducing to {max_count}")
    step = len(items) / max_count
    indices = [int(i * step) for i in range(max_count)]
    if indices[-1] != len(items) - 1:
        indices[-1] = len(items) - 1
    return [items[i] for i in indices]


async def generate_timecodes_with_gemini(
    transcript_entries: List[Dict[str, Any]],
    video_title: str,
    format_type: str = "youtube",
    model_name: Optional[str] = None,
    language: Optional[str] = None
) -> Dict[str, Any]:
    """
    Generates timecodes using Gemini based on transcript.

    Args:
        transcript_entries: List of transcript entries (dicts or
            FetchedTranscriptSnippet-like objects).
        video_title: Video title.
        format_type: Timecode format (youtube, markdown).
        model_name: Gemini model name (defaults to DEFAULT_MODEL).
        language: Transcript language (detected heuristically if not given).

    Returns:
        Dict with "timecodes", "format", "model", "video_title",
        "detected_language", "video_duration_minutes" on success, or a dict
        with a single "error" key on failure.
    """
    if not GEMINI_API_KEY or client is None:
        return {
            "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to .env file"
        }
    try:
        print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")
        detected_language = language or _detect_transcript_language(transcript_entries)

        # Duration from the last transcript entry (0 for empty transcript).
        video_duration_seconds = _entry_end_time(transcript_entries[-1]) if transcript_entries else 0
        if transcript_entries:
            video_duration_minutes = int(video_duration_seconds / 60)
            print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")
        else:
            video_duration_minutes = None

        # Single source of truth for the count range and cap (the original
        # code computed max_timecodes twice; the first pass was dead code).
        timecode_count, max_timecodes = _timecode_budget(video_duration_minutes)

        formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)

        # Suggested spacing between timestamps, fed into the prompt.
        if video_duration_minutes and timecode_count:
            first = timecode_count.split('-')[0]
            target_count = int(first) if first.isdigit() else 20
            interval_minutes = video_duration_minutes // target_count
            interval_text = f"approximately every {interval_minutes}-{interval_minutes + 2} minutes"
        else:
            interval_text = "evenly throughout the video"

        prompt = get_timecode_prompt(
            video_title,
            formatted_transcript,
            format_type,
            detected_language,
            video_duration_minutes,
            timecode_count,
            interval_text
        )
        print(f"Prompt prepared, length: {len(prompt)} characters")

        # Preferred model first, then the remaining alternatives.
        preferred = model_name or DEFAULT_MODEL
        models_to_try = [preferred] + [m for m in ALTERNATIVE_MODELS if m != preferred]
        last_error = None
        for current_model in models_to_try:
            try:
                print(f"Making request to Gemini API with model {current_model}...")
                response = await client.aio.models.generate_content(
                    model=current_model,
                    contents=prompt,
                    config=types.GenerateContentConfig(
                        temperature=0.2,  # low temperature for more deterministic results
                        max_output_tokens=2048,  # enough for a timecode list
                    )
                )
                print(f"Response received: {type(response)}")
                timecodes_text = response.text
                print(f"Response text length: {len(timecodes_text)}")

                # One timecode per non-empty line; drop filler, cap the count.
                timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]
                filtered_timecodes = _filter_timecodes(timecodes)
                final_timecodes = _evenly_sample(filtered_timecodes, max_timecodes)
                print(f"Final timecodes count after processing: {len(final_timecodes)}")
                return {
                    "timecodes": final_timecodes,
                    "format": format_type,
                    "model": current_model,
                    "video_title": video_title,
                    "detected_language": detected_language,
                    "video_duration_minutes": video_duration_minutes
                }
            except Exception as api_error:
                print(f"Error with model {current_model}: {str(api_error)}")
                traceback.print_exc()
                last_error = api_error
                continue

        # Every model in the list failed.
        return {
            "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
        }
    except Exception as e:
        print(f"General error: {str(e)}")
        traceback.print_exc()
        return {
            "error": f"Error generating timecodes with Gemini: {str(e)}"
        }