import gradio as gr import sqlite3 import json import requests from datetime import datetime, timedelta, timezone from typing import List, Dict, Any, Optional import google.generativeai as genai from googleapiclient.discovery import build import pandas as pd import re from collections import defaultdict import base64 # Do NOT auto-load API keys from environment; keys should be provided by the user via the UI. # Keep variables here so other functions can reference them after the user provides keys. YOUTUBE_API_KEY: Optional[str] = None GEMINI_API_KEY: Optional[str] = None model = None youtube = None def set_api_keys(youtube_key: Optional[str], gemini_key: Optional[str]) -> tuple[str, str, str]: """Apply API keys provided by the user at runtime. This will configure the Gemini client and the YouTube Data API client so the rest of the app uses the provided keys instead of environment vars. """ global YOUTUBE_API_KEY, GEMINI_API_KEY, model, youtube messages = [] # Configure Gemini (Generative AI) if gemini_key: try: genai.configure(api_key=gemini_key) model = genai.GenerativeModel('gemini-2.5-flash') GEMINI_API_KEY = gemini_key messages.append("Gemini API key applied successfully.") except Exception as e: messages.append(f"Failed to apply Gemini API key: {e}") # Configure YouTube Data API if youtube_key: try: youtube = build('youtube', 'v3', developerKey=youtube_key) YOUTUBE_API_KEY = youtube_key messages.append("YouTube API key applied successfully.") except Exception as e: messages.append(f"Failed to apply YouTube API key: {e}") if not messages: # Return status and empty keys return "No API keys provided.", "", "" # Return status plus the applied keys so the UI can store them in state return "\n".join(messages), YOUTUBE_API_KEY or "", GEMINI_API_KEY or "" class YouTubeCompetitorAnalyzer: def __init__(self): self.init_database() def init_database(self): """Initialize the database""" conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() # Channel table (added last_updated_at column) cursor.execute(''' CREATE TABLE IF NOT EXISTS channels ( id INTEGER PRIMARY KEY AUTOINCREMENT, channel_id TEXT UNIQUE, channel_name TEXT, channel_icon_url TEXT, subscriber_count INTEGER, added_date TEXT, last_updated_at TEXT ) ''') # Video data table (added description and tags) cursor.execute(''' CREATE TABLE IF NOT EXISTS videos ( id INTEGER PRIMARY KEY AUTOINCREMENT, video_id TEXT UNIQUE, channel_id TEXT, title TEXT, description TEXT, tags TEXT, published_at TEXT, view_count INTEGER, thumbnail_url TEXT, detected_person TEXT, detection_source TEXT, importance_level TEXT, created_at TEXT ) ''') # Trend clusters table cursor.execute(''' CREATE TABLE IF NOT EXISTS trends ( id INTEGER PRIMARY KEY AUTOINCREMENT, person_name TEXT, video_ids TEXT, trend_date TEXT, is_active BOOLEAN ) ''') conn.commit() conn.close() def extract_person_from_title_with_gemini(self, title: str) -> Optional[str]: """Extract a person's name from the title using Gemini (global, highest priority)""" if not model: return None try: prompt = f""" Please extract a single famous person's name (historical or contemporary) from this YouTube title. Title: "{title}" Target: Globally well-known individuals (no restriction on nationality, era, or field) - People from any country or region worldwide - From ancient to modern times - Any field: politics, business, philosophy, literature, science, arts, religion, sports, etc. - Real historical or contemporary figures Criteria: - Widely known at a general-knowledge level - Frequently mentioned in books, education, or media - Identifiable as a specific real person by proper name Response format: - If a matching person exists: return the person's name only (in Japanese) - If none: return "なし" - If multiple apply: return the single most relevant person Note: Do not restrict by nationality, era, or field. Consider notable people worldwide. Examples: "The secret of innovation by Steve Jobs" -> Steve Jobs "Learning leadership from Confucius" -> Confucius "Introduction to Einstein's theory of relativity" -> Einstein "Konosuke Matsushita on business philosophy" -> Konosuke Matsushita "General success tips" -> none """ response = model.generate_content(prompt) result = response.text.strip() # If result is "なし" or empty, return None if not result or result.lower() in ['なし', 'none', '該当なし', '不明']: return None # Remove line breaks and extra characters to isolate the person's name clean_result = re.sub(r'[「」『』【】|\n\r\t]', '', result).strip() # Check global name pattern (2-15 chars: supports Japanese, English, Chinese, etc.) global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$' if re.match(global_name_pattern, clean_result): return clean_result else: return None except Exception as e: print(f"Gemini global title parsing error: {e}") return None def extract_person_from_description_with_gemini(self, description: str) -> Optional[str]: if not description or len(description.strip()) < 10 or not model: return None try: # If the description is too long, limit to the first 500 characters desc_excerpt = description[:500] if len(description) > 500 else description prompt = f""" Please extract a single famous person's name (historical or contemporary) from this YouTube video's description. Description excerpt: "{desc_excerpt}" Target: Globally well-known individuals (no restriction on nationality, era, or field) Criteria: - Widely known at a general-knowledge level - Frequently mentioned in books, education, or media - Identifiable as a specific real person by proper name Response format: - If a matching person exists: return the person's name only (in Japanese) - If none: return "なし" - If multiple apply: return the single most relevant person - Hashtags (e.g., #SteveJobs, #Confucius) should also be considered Note: Do not restrict by nationality, era, or field. Consider notable people worldwide. """ response = model.generate_content(prompt) result = response.text.strip() if not result or result.lower() in ['なし', 'none', '該当なし', '不明']: return None clean_result = re.sub(r'[「」『』【】|#\n\r\t]', '', result).strip() # Check global name pattern global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$' if re.match(global_name_pattern, clean_result): return clean_result else: return None except Exception as e: print(f"Gemini global description parsing error: {e}") return None def extract_person_from_tags(self, tags: List[str]) -> Optional[str]: """Extract a person's name from tags (global, priority 3)""" if not tags: return None # Define global name pattern (supports Japanese, English, Chinese, Korean, etc.) global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$' # タグから人物名らしきものを探す for tag in tags: if re.match(global_name_pattern, tag): # Exclude overly generic words (global support) exclude_words = [ '動画', '投稿', '更新', '配信', '人生', '経営', '仕事', '成功', '失敗', 'video', 'life', 'business', 'success', 'leadership', 'philosophy', 'motivation', 'inspiration', 'education', 'training', 'coach' ] if tag not in exclude_words and tag.lower() not in [word.lower() for word in exclude_words]: return tag return None def analyze_thumbnail_ocr(self, thumbnail_url: str) -> Optional[str]: """Thumbnail OCR analysis (priority 4)""" if not model: return None try: response = requests.get(thumbnail_url, timeout=10) image_data = base64.b64encode(response.content).decode() prompt = """ Extract text from this YouTube thumbnail image. Pay special attention to names of famous individuals (worldwide, historical or modern). Reply in JSON using the following format: { "detected_text": "All text read by OCR", "person_names": ["List of extracted person names"] } """ image_part = { "mime_type": "image/jpeg", "data": image_data } response = model.generate_content([prompt, image_part]) result_text = response.text # Extract JSON json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL) if json_match: result_text = json_match.group(1) try: result = json.loads(result_text) person_names = result.get('person_names', []) return person_names[0] if person_names else None except json.JSONDecodeError: return None except Exception as e: print(f"Thumbnail OCR analysis error: {e}") return None def analyze_thumbnail_face_recognition(self, thumbnail_url: str) -> Optional[str]: """Thumbnail face recognition (priority 5)""" if not model: return None try: response = requests.get(thumbnail_url, timeout=10) image_data = base64.b64encode(response.content).decode() prompt = """ Identify the person shown in this image. Consider famous people worldwide, including historical figures, philosophers, business leaders, writers, and scientists. Only return a person's name if you can identify them with confidence. If unknown, return null. Respond in JSON: { "person_name": "Identified person name or null" } """ image_part = { "mime_type": "image/jpeg", "data": image_data } response = model.generate_content([prompt, image_part]) result_text = response.text json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL) if json_match: result_text = json_match.group(1) try: result = json.loads(result_text) return result.get('person_name') except json.JSONDecodeError: return None except Exception as e: print(f"Face recognition analysis error: {e}") return None def extract_person_comprehensive(self, video_data: Dict) -> tuple[Optional[str], str]: """Comprehensive person extraction (Gemini prioritized, global system)""" title = video_data.get('title', '') description = video_data.get('description', '') tags = video_data.get('tags', []) thumbnail_url = video_data.get('thumbnail_url', '') # Priority 1: Gemini title analysis (global, highest priority) person = self.extract_person_from_title_with_gemini(title) if person: return person, "Gemini-GlobalTitle" # Priority 2: Gemini description analysis (global) person = self.extract_person_from_description_with_gemini(description) if person: return person, "Gemini-GlobalDescription" # Priority 3: Tag analysis (global) person = self.extract_person_from_tags(tags) if person: return person, "GlobalTag" # Priority 4: Thumbnail OCR person = self.analyze_thumbnail_ocr(thumbnail_url) if person: return person, "ThumbnailOCR" # Priority 5: Face recognition person = self.analyze_thumbnail_face_recognition(thumbnail_url) if person: return person, "FaceRecognition" return None, "Not detected" def add_channel(self, channel_id: str) -> str: """Add a channel""" if not youtube: return "YouTube API key is not set." try: # Retrieve channel info response = youtube.channels().list( part='snippet,statistics', id=channel_id ).execute() if not response['items']: return f"ID: {channel_id} - Channel not found" channel_info = response['items'][0] channel_name = channel_info['snippet']['title'] channel_icon = channel_info['snippet']['thumbnails']['default']['url'] subscriber_count = int(channel_info['statistics'].get('subscriberCount', 0)) conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO channels (channel_id, channel_name, channel_icon_url, subscriber_count, added_date) VALUES (?, ?, ?, ?, ?) ''', (channel_id, channel_name, channel_icon, subscriber_count, datetime.now().isoformat())) conn.commit() conn.close() return f"Channel '{channel_name}' added" except Exception as e: return f"ID: {channel_id} - Error: {str(e)}" def delete_channel(self, channel_id: str) -> str: """Delete a channel""" try: conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() # Retrieve channel name cursor.execute('SELECT channel_name FROM channels WHERE channel_id = ?', (channel_id,)) result = cursor.fetchone() if not result: conn.close() return "Channel not found" channel_name = result[0] # Delete channel and related video data cursor.execute('DELETE FROM videos WHERE channel_id = ?', (channel_id,)) cursor.execute('DELETE FROM channels WHERE channel_id = ?', (channel_id,)) conn.commit() conn.close() return f"Channel '{channel_name}' deleted" except Exception as e: return f"Deletion error: {str(e)}" def update_channel_name(self, channel_id: str, new_name: str) -> str: """Update channel name""" try: conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() cursor.execute(''' UPDATE channels SET channel_name = ? WHERE channel_id = ? ''', (new_name, channel_id)) if cursor.rowcount == 0: conn.close() return "Channel not found" conn.commit() conn.close() return f"Channel name updated to '{new_name}'" except Exception as e: return f"Update error: {str(e)}" def get_channels(self) -> List[Dict]: """Get list of registered channels""" conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() cursor.execute(''' SELECT channel_id, channel_name, channel_icon_url, subscriber_count, added_date, last_updated_at FROM channels ORDER BY added_date DESC ''') channels = [] for row in cursor.fetchall(): channels.append({ 'id': row[0], 'name': row[1], 'icon_url': row[2], 'subscriber_count': row[3], 'added_date': row[4], 'last_updated_at': row[5] }) conn.close() return channels def fetch_videos_from_channel(self, channel_id: str, since_date: Optional[str] = None) -> List[Dict]: """Fetch videos from a channel since the specified date""" if not youtube: return [] try: # since_dateがなければ、過去7日間に設定 if not since_date: since_date_dt = datetime.now(timezone.utc) - timedelta(days=7) else: since_date_dt = datetime.fromisoformat(since_date) # YouTube APIのフォーマットに変換 published_after = since_date_dt.isoformat().replace('+00:00', 'Z') response = youtube.search().list( part='snippet', channelId=channel_id, maxResults=50, order='date', publishedAfter=published_after, type='video' ).execute() videos = [] video_ids = [item['id']['videoId'] for item in response['items']] # 動画の詳細情報(再生回数、概要欄、タグなど)を取得 if video_ids: video_details = youtube.videos().list( part='statistics,snippet', id=','.join(video_ids) ).execute() for item in video_details['items']: videos.append({ 'video_id': item['id'], 'title': item['snippet']['title'], 'description': item['snippet'].get('description', ''), 'tags': item['snippet'].get('tags', []), 'published_at': item['snippet']['publishedAt'], 'view_count': int(item['statistics'].get('viewCount', 0)), 'thumbnail_url': item['snippet']['thumbnails']['high']['url'] }) return videos except Exception as e: print(f"Video fetch error: {e}") return [] def determine_importance(self, video_data: Dict) -> str: """Determine importance level""" published_at_str = video_data['published_at'] # Add 'Z' when timezone information is missing if 'Z' not in published_at_str and '+' not in published_at_str: published_at_str += 'Z' published_at = datetime.fromisoformat(published_at_str.replace('Z', '+00:00')) now_utc = datetime.now(published_at.tzinfo) hours_since_published = (now_utc - published_at).total_seconds() / 3600 view_count = video_data['view_count'] if hours_since_published <= 24 and view_count >= 10000: return "Critical" elif hours_since_published <= 48 and view_count >= 10000: return "Important" else: return "Normal" def detect_trends(self) -> List[Dict]: """Detect trending clusters""" conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() # 過去2日以内の動画で人物が特定されたものを取得 two_days_ago = (datetime.now() - timedelta(days=2)).isoformat() cursor.execute(''' SELECT detected_person, COUNT(*) as video_count, GROUP_CONCAT(video_id) as video_ids, GROUP_CONCAT(DISTINCT channel_id) as channels, GROUP_CONCAT(detection_source) as sources FROM videos WHERE detected_person IS NOT NULL AND detected_person != '' AND published_at > ? GROUP BY detected_person HAVING COUNT(*) >= 2 AND COUNT(DISTINCT channel_id) >= 2 ORDER BY video_count DESC ''', (two_days_ago,)) trends = [] for row in cursor.fetchall(): person_name, count, video_ids, channels, sources = row unique_channels = len(set(channels.split(','))) trends.append({ 'person_name': person_name, 'video_count': count, 'unique_channels': unique_channels, 'video_ids': video_ids.split(','), 'detection_sources': sources.split(',') }) conn.close() return trends def update_all_data(self) -> str: """Update data for all channels""" channels = self.get_channels() total_new_videos = 0 conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() for channel in channels: channel_id = channel['id'] last_update = channel['last_updated_at'] # Only fetch videos published after the channel's last_updated_at videos = self.fetch_videos_from_channel(channel_id, since_date=last_update) for video in videos: # 包括的な人物名抽出 detected_person, detection_source = self.extract_person_comprehensive(video) # 重要度を判定 importance = self.determine_importance(video) # データベースに保存 cursor.execute(''' INSERT OR IGNORE INTO videos (video_id, channel_id, title, description, tags, published_at, view_count, thumbnail_url, detected_person, detection_source, importance_level, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( video['video_id'], channel_id, video['title'], video['description'], ','.join(video['tags']) if video['tags'] else '', video['published_at'], video['view_count'], video['thumbnail_url'], detected_person, detection_source, importance, datetime.now(timezone.utc).isoformat() )) if cursor.rowcount > 0: total_new_videos += 1 # Update this channel's last_updated_at to now cursor.execute(''' UPDATE channels SET last_updated_at = ? WHERE channel_id = ? ''', (datetime.now(timezone.utc).isoformat(), channel_id)) conn.commit() conn.close() return f"Update complete: added {total_new_videos} new videos" def get_recent_videos_by_timerange(self, hours: int, limit: int = 50) -> List[Dict]: """Get videos within the specified time range sorted by view count (JST-based)""" conn = sqlite3.connect('competitor_data.db') cursor = conn.cursor() # 日本時間(JST = UTC+9)で指定時間前の日時を計算 jst = timezone(timedelta(hours=9)) cutoff_time_jst = datetime.now(jst) - timedelta(hours=hours) cutoff_time_utc = cutoff_time_jst.astimezone(timezone.utc) cursor.execute(''' SELECT v.video_id, v.title, v.published_at, v.view_count, v.thumbnail_url, v.detected_person, v.detection_source, v.importance_level, c.channel_name, c.channel_icon_url, v.channel_id FROM videos v JOIN channels c ON v.channel_id = c.channel_id WHERE v.published_at > ? ORDER BY v.view_count DESC LIMIT ? ''', (cutoff_time_utc.isoformat(), limit)) videos = [] for row in cursor.fetchall(): video_id, title, published_at, view_count, thumbnail_url, detected_person, \ detection_source, importance_level, channel_name, channel_icon_url, channel_id = row # UTC時間をJSTに変換 published_at_utc = datetime.fromisoformat(published_at.replace('Z', '+00:00')) published_at_jst = published_at_utc.astimezone(jst) videos.append({ 'video_id': video_id, 'title': title, 'published_at': published_at, 'published_at_jst': published_at_jst, 'view_count': view_count, 'thumbnail_url': thumbnail_url, 'detected_person': detected_person or 'Not detected', 'detection_source': detection_source or '-', 'importance_level': importance_level or 'Normal', 'channel_name': channel_name, 'channel_icon_url': channel_icon_url, 'channel_id': channel_id }) conn.close() return videos def generate_recent_videos_html(self, hours: int, limit: int = 50) -> str: """Generate HTML for the recent videos list""" videos = self.get_recent_videos_by_timerange(hours, limit) # Build a description for the time range time_range_text = f"Past {hours} hours" html = f"""
There are currently no trending clusters.
" html += """No channels registered.
" html += """