"""YouTube competitor-channel analyzer (global person detection).

A Gradio app that tracks competitor YouTube channels, detects which famous
person each new video is about (via Gemini 2.5 Flash, tags, thumbnail OCR and
face recognition), clusters cross-channel trends, and renders HTML dashboards.

All persistent state lives in a local SQLite database (``competitor_data.db``).
API keys are supplied by the user at runtime through the UI, never read from
environment variables.
"""

import base64
import json
import re
import sqlite3
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import gradio as gr
import pandas as pd
import requests
import google.generativeai as genai
from googleapiclient.discovery import build

# Do NOT auto-load API keys from environment; keys should be provided by the
# user via the UI. Keep module-level handles here so other functions can
# reference the configured clients after the user provides keys.
YOUTUBE_API_KEY: Optional[str] = None
GEMINI_API_KEY: Optional[str] = None
model = None     # google.generativeai.GenerativeModel once configured
youtube = None   # googleapiclient YouTube Data API v3 client once configured


def set_api_keys(youtube_key: Optional[str], gemini_key: Optional[str]) -> tuple[str, str, str]:
    """Apply API keys provided by the user at runtime.

    Configures the Gemini client and the YouTube Data API client so the rest
    of the app uses the provided keys instead of environment variables.

    Args:
        youtube_key: YouTube Data API v3 key, or None/empty to skip.
        gemini_key: Google Generative AI key, or None/empty to skip.

    Returns:
        (status_message, applied_youtube_key, applied_gemini_key) — the keys
        are echoed back so the UI can store them in hidden state.
    """
    global YOUTUBE_API_KEY, GEMINI_API_KEY, model, youtube

    messages = []

    # Configure Gemini (Generative AI).
    if gemini_key:
        try:
            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel('gemini-2.5-flash')
            GEMINI_API_KEY = gemini_key
            messages.append("Gemini API key applied successfully.")
        except Exception as e:
            messages.append(f"Failed to apply Gemini API key: {e}")

    # Configure YouTube Data API.
    if youtube_key:
        try:
            youtube = build('youtube', 'v3', developerKey=youtube_key)
            YOUTUBE_API_KEY = youtube_key
            messages.append("YouTube API key applied successfully.")
        except Exception as e:
            messages.append(f"Failed to apply YouTube API key: {e}")

    if not messages:
        # Nothing was supplied; return empty keys so state stays cleared.
        return "No API keys provided.", "", ""

    # Return status plus the applied keys so the UI can store them in state.
    return "\n".join(messages), YOUTUBE_API_KEY or "", GEMINI_API_KEY or ""


class YouTubeCompetitorAnalyzer:
    """Core engine: SQLite persistence, person detection, trend clustering."""

    # Single source of truth for the SQLite file used by every method.
    DB_PATH = 'competitor_data.db'

    # Loose "looks like a person's name" pattern, 2-15 chars, covering CJK,
    # Hiragana/Katakana, Hangul, Latin (incl. accented) plus middle dots.
    # Shared by title/description/tag extraction to avoid drift.
    _GLOBAL_NAME_PATTERN = re.compile(
        r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF'
        r'\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F'
        r'\s\u30FB\u00B7\u2022]{2,15}$'
    )

    def __init__(self):
        self.init_database()

    def _connect(self) -> sqlite3.Connection:
        """Open a connection to the analyzer database."""
        return sqlite3.connect(self.DB_PATH)

    def init_database(self):
        """Create the channels / videos / trends tables if they don't exist."""
        conn = self._connect()
        cursor = conn.cursor()

        # Channel table (includes last_updated_at for incremental fetching).
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS channels (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                channel_id TEXT UNIQUE,
                channel_name TEXT,
                channel_icon_url TEXT,
                subscriber_count INTEGER,
                added_date TEXT,
                last_updated_at TEXT
            )
        ''')

        # Video data table (description and tags included for person detection).
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS videos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id TEXT UNIQUE,
                channel_id TEXT,
                title TEXT,
                description TEXT,
                tags TEXT,
                published_at TEXT,
                view_count INTEGER,
                thumbnail_url TEXT,
                detected_person TEXT,
                detection_source TEXT,
                importance_level TEXT,
                created_at TEXT
            )
        ''')

        # Trend clusters table.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS trends (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                person_name TEXT,
                video_ids TEXT,
                trend_date TEXT,
                is_active BOOLEAN
            )
        ''')

        conn.commit()
        conn.close()

    def _clean_gemini_name(self, result: str, extra_chars: str = '') -> Optional[str]:
        """Normalize a Gemini reply into a person name, or None.

        Strips decorative brackets/newlines (plus any ``extra_chars``) and
        validates the remainder against the global name pattern.
        """
        if not result or result.lower() in ['なし', 'none', '該当なし', '不明']:
            return None
        cleaned = re.sub(r'[「」『』【】|' + extra_chars + r'\n\r\t]', '', result).strip()
        return cleaned if self._GLOBAL_NAME_PATTERN.match(cleaned) else None

    def extract_person_from_title_with_gemini(self, title: str) -> Optional[str]:
        """Extract a famous person's name from the title via Gemini (priority 1)."""
        if not model:
            return None
        try:
            prompt = f"""
Please extract a single famous person's name (historical or contemporary) from this YouTube title.

Title: "{title}"

Target: Globally well-known individuals (no restriction on nationality, era, or field)
- People from any country or region worldwide
- From ancient to modern times
- Any field: politics, business, philosophy, literature, science, arts, religion, sports, etc.
- Real historical or contemporary figures

Criteria:
- Widely known at a general-knowledge level
- Frequently mentioned in books, education, or media
- Identifiable as a specific real person by proper name

Response format:
- If a matching person exists: return the person's name only (in Japanese)
- If none: return "なし"
- If multiple apply: return the single most relevant person

Note: Do not restrict by nationality, era, or field. Consider notable people worldwide.

Examples:
"The secret of innovation by Steve Jobs" -> Steve Jobs
"Learning leadership from Confucius" -> Confucius
"Introduction to Einstein's theory of relativity" -> Einstein
"Konosuke Matsushita on business philosophy" -> Konosuke Matsushita
"General success tips" -> none
"""
            response = model.generate_content(prompt)
            return self._clean_gemini_name(response.text.strip())
        except Exception as e:
            print(f"Gemini global title parsing error: {e}")
            return None

    def extract_person_from_description_with_gemini(self, description: str) -> Optional[str]:
        """Extract a famous person's name from the description via Gemini (priority 2)."""
        if not description or len(description.strip()) < 10 or not model:
            return None
        try:
            # If the description is too long, limit to the first 500 characters.
            desc_excerpt = description[:500] if len(description) > 500 else description
            prompt = f"""
Please extract a single famous person's name (historical or contemporary) from this YouTube video's description.

Description excerpt: "{desc_excerpt}"

Target: Globally well-known individuals (no restriction on nationality, era, or field)

Criteria:
- Widely known at a general-knowledge level
- Frequently mentioned in books, education, or media
- Identifiable as a specific real person by proper name

Response format:
- If a matching person exists: return the person's name only (in Japanese)
- If none: return "なし"
- If multiple apply: return the single most relevant person
- Hashtags (e.g., #SteveJobs, #Confucius) should also be considered

Note: Do not restrict by nationality, era, or field. Consider notable people worldwide.
"""
            response = model.generate_content(prompt)
            # '#' is additionally stripped because hashtags are valid answers here.
            return self._clean_gemini_name(response.text.strip(), extra_chars='#')
        except Exception as e:
            print(f"Gemini global description parsing error: {e}")
            return None

    def extract_person_from_tags(self, tags: List[str]) -> Optional[str]:
        """Extract a person's name from tags (global, priority 3)."""
        if not tags:
            return None

        # Overly generic words that match the name pattern but are not people.
        exclude_words = [
            '動画', '投稿', '更新', '配信', '人生', '経営', '仕事', '成功', '失敗',
            'video', 'life', 'business', 'success', 'leadership', 'philosophy',
            'motivation', 'inspiration', 'education', 'training', 'coach'
        ]
        exclude_lower = [word.lower() for word in exclude_words]

        # Return the first tag that looks like a person's name.
        for tag in tags:
            if self._GLOBAL_NAME_PATTERN.match(tag):
                if tag not in exclude_words and tag.lower() not in exclude_lower:
                    return tag
        return None

    def _analyze_thumbnail_json(self, thumbnail_url: str, prompt: str) -> Optional[dict]:
        """Download a thumbnail, send it to Gemini with *prompt*, parse JSON reply.

        Returns the decoded JSON dict, or None on any failure.
        """
        response = requests.get(thumbnail_url, timeout=10)
        image_data = base64.b64encode(response.content).decode()
        image_part = {"mime_type": "image/jpeg", "data": image_data}
        result_text = model.generate_content([prompt, image_part]).text
        # Gemini often wraps JSON in a fenced code block; unwrap it.
        json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL)
        if json_match:
            result_text = json_match.group(1)
        try:
            return json.loads(result_text)
        except json.JSONDecodeError:
            return None

    def analyze_thumbnail_ocr(self, thumbnail_url: str) -> Optional[str]:
        """Thumbnail OCR analysis (priority 4): read names out of thumbnail text."""
        if not model:
            return None
        try:
            prompt = """
Extract text from this YouTube thumbnail image.
Pay special attention to names of famous individuals (worldwide, historical or modern).

Reply in JSON using the following format:
{
    "detected_text": "All text read by OCR",
    "person_names": ["List of extracted person names"]
}
"""
            result = self._analyze_thumbnail_json(thumbnail_url, prompt)
            if result is None:
                return None
            person_names = result.get('person_names', [])
            return person_names[0] if person_names else None
        except Exception as e:
            print(f"Thumbnail OCR analysis error: {e}")
            return None

    def analyze_thumbnail_face_recognition(self, thumbnail_url: str) -> Optional[str]:
        """Thumbnail face recognition (priority 5): identify the pictured person."""
        if not model:
            return None
        try:
            prompt = """
Identify the person shown in this image.
Consider famous people worldwide, including historical figures, philosophers, business leaders, writers, and scientists.
Only return a person's name if you can identify them with confidence. If unknown, return null.

Respond in JSON:
{
    "person_name": "Identified person name or null"
}
"""
            result = self._analyze_thumbnail_json(thumbnail_url, prompt)
            return result.get('person_name') if result is not None else None
        except Exception as e:
            print(f"Face recognition analysis error: {e}")
            return None

    def extract_person_comprehensive(self, video_data: Dict) -> tuple[Optional[str], str]:
        """Comprehensive person extraction (Gemini prioritized, global system).

        Tries each detector in priority order and returns
        ``(person_name_or_None, detection_source_label)``.
        """
        title = video_data.get('title', '')
        description = video_data.get('description', '')
        tags = video_data.get('tags', [])
        thumbnail_url = video_data.get('thumbnail_url', '')

        # Priority 1: Gemini title analysis (global, highest priority).
        person = self.extract_person_from_title_with_gemini(title)
        if person:
            return person, "Gemini-GlobalTitle"

        # Priority 2: Gemini description analysis (global).
        person = self.extract_person_from_description_with_gemini(description)
        if person:
            return person, "Gemini-GlobalDescription"

        # Priority 3: Tag analysis (global).
        person = self.extract_person_from_tags(tags)
        if person:
            return person, "GlobalTag"

        # Priority 4: Thumbnail OCR.
        person = self.analyze_thumbnail_ocr(thumbnail_url)
        if person:
            return person, "ThumbnailOCR"

        # Priority 5: Face recognition.
        person = self.analyze_thumbnail_face_recognition(thumbnail_url)
        if person:
            return person, "FaceRecognition"

        return None, "Not detected"

    def add_channel(self, channel_id: str) -> str:
        """Register a channel by ID, fetching its metadata from the API.

        Re-adding an existing channel replaces its row (which also clears
        last_updated_at, forcing a fresh 7-day fetch on next update).
        """
        if not youtube:
            return "YouTube API key is not set."
        try:
            # Retrieve channel info.
            response = youtube.channels().list(
                part='snippet,statistics',
                id=channel_id
            ).execute()

            if not response['items']:
                return f"ID: {channel_id} - Channel not found"

            channel_info = response['items'][0]
            channel_name = channel_info['snippet']['title']
            channel_icon = channel_info['snippet']['thumbnails']['default']['url']
            subscriber_count = int(channel_info['statistics'].get('subscriberCount', 0))

            conn = self._connect()
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO channels
                (channel_id, channel_name, channel_icon_url, subscriber_count, added_date)
                VALUES (?, ?, ?, ?, ?)
            ''', (channel_id, channel_name, channel_icon, subscriber_count,
                  datetime.now().isoformat()))
            conn.commit()
            conn.close()

            return f"Channel '{channel_name}' added"
        except Exception as e:
            return f"ID: {channel_id} - Error: {str(e)}"

    def delete_channel(self, channel_id: str) -> str:
        """Delete a channel and all of its stored videos."""
        try:
            conn = self._connect()
            cursor = conn.cursor()

            # Retrieve channel name for the confirmation message.
            cursor.execute('SELECT channel_name FROM channels WHERE channel_id = ?',
                           (channel_id,))
            result = cursor.fetchone()
            if not result:
                conn.close()
                return "Channel not found"

            channel_name = result[0]

            # Delete channel and related video data.
            cursor.execute('DELETE FROM videos WHERE channel_id = ?', (channel_id,))
            cursor.execute('DELETE FROM channels WHERE channel_id = ?', (channel_id,))

            conn.commit()
            conn.close()
            return f"Channel '{channel_name}' deleted"
        except Exception as e:
            return f"Deletion error: {str(e)}"

    def update_channel_name(self, channel_id: str, new_name: str) -> str:
        """Rename a registered channel (local display name only)."""
        try:
            conn = self._connect()
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE channels SET channel_name = ? WHERE channel_id = ?
            ''', (new_name, channel_id))

            if cursor.rowcount == 0:
                conn.close()
                return "Channel not found"

            conn.commit()
            conn.close()
            return f"Channel name updated to '{new_name}'"
        except Exception as e:
            return f"Update error: {str(e)}"

    def get_channels(self) -> List[Dict]:
        """Return registered channels, newest first."""
        conn = self._connect()
        cursor = conn.cursor()
        cursor.execute('''
            SELECT channel_id, channel_name, channel_icon_url,
                   subscriber_count, added_date, last_updated_at
            FROM channels ORDER BY added_date DESC
        ''')
        channels = [
            {
                'id': row[0],
                'name': row[1],
                'icon_url': row[2],
                'subscriber_count': row[3],
                'added_date': row[4],
                'last_updated_at': row[5],
            }
            for row in cursor.fetchall()
        ]
        conn.close()
        return channels

    def fetch_videos_from_channel(self, channel_id: str,
                                  since_date: Optional[str] = None) -> List[Dict]:
        """Fetch videos published on *channel_id* since *since_date* (ISO string).

        Defaults to the past 7 days when no date is given. Returns a list of
        dicts with id/title/description/tags/published_at/view_count/thumbnail.
        """
        if not youtube:
            return []
        try:
            # Default window: past 7 days.
            if not since_date:
                since_date_dt = datetime.now(timezone.utc) - timedelta(days=7)
            else:
                since_date_dt = datetime.fromisoformat(since_date)
                # BUG FIX: a naive datetime would serialize without a timezone
                # designator, which the YouTube API rejects (publishedAfter
                # must be RFC3339). Assume stored timestamps are UTC.
                if since_date_dt.tzinfo is None:
                    since_date_dt = since_date_dt.replace(tzinfo=timezone.utc)

            # Convert to the RFC3339 'Z' form the YouTube API expects.
            published_after = since_date_dt.isoformat().replace('+00:00', 'Z')

            response = youtube.search().list(
                part='snippet',
                channelId=channel_id,
                maxResults=50,
                order='date',
                publishedAfter=published_after,
                type='video'
            ).execute()

            videos = []
            video_ids = [item['id']['videoId'] for item in response['items']]

            # Fetch per-video details (view count, description, tags, …) in one call.
            if video_ids:
                video_details = youtube.videos().list(
                    part='statistics,snippet',
                    id=','.join(video_ids)
                ).execute()

                for item in video_details['items']:
                    videos.append({
                        'video_id': item['id'],
                        'title': item['snippet']['title'],
                        'description': item['snippet'].get('description', ''),
                        'tags': item['snippet'].get('tags', []),
                        'published_at': item['snippet']['publishedAt'],
                        'view_count': int(item['statistics'].get('viewCount', 0)),
                        'thumbnail_url': item['snippet']['thumbnails']['high']['url'],
                    })
            return videos
        except Exception as e:
            print(f"Video fetch error: {e}")
            return []

    def determine_importance(self, video_data: Dict) -> str:
        """Classify a video as Critical / Important / Normal.

        Critical: >=10k views within 24h of publishing.
        Important: >=10k views within 48h.
        """
        published_at_str = video_data['published_at']
        # Add 'Z' when timezone information is missing so parsing yields UTC.
        if 'Z' not in published_at_str and '+' not in published_at_str:
            published_at_str += 'Z'

        published_at = datetime.fromisoformat(published_at_str.replace('Z', '+00:00'))
        now_utc = datetime.now(published_at.tzinfo)
        hours_since_published = (now_utc - published_at).total_seconds() / 3600
        view_count = video_data['view_count']

        if hours_since_published <= 24 and view_count >= 10000:
            return "Critical"
        elif hours_since_published <= 48 and view_count >= 10000:
            return "Important"
        else:
            return "Normal"

    def detect_trends(self) -> List[Dict]:
        """Detect trending clusters: same person covered by >=2 videos on >=2
        channels within the past 2 days."""
        conn = self._connect()
        cursor = conn.cursor()

        # BUG FIX: cutoff is formatted as a UTC '…Z' string to match the
        # stored published_at format (string comparison in SQL); the original
        # compared naive local time against UTC timestamps.
        two_days_ago = (datetime.now(timezone.utc) - timedelta(days=2)) \
            .strftime('%Y-%m-%dT%H:%M:%SZ')

        cursor.execute('''
            SELECT detected_person,
                   COUNT(*) as video_count,
                   GROUP_CONCAT(video_id) as video_ids,
                   GROUP_CONCAT(DISTINCT channel_id) as channels,
                   GROUP_CONCAT(detection_source) as sources
            FROM videos
            WHERE detected_person IS NOT NULL
              AND detected_person != ''
              AND published_at > ?
            GROUP BY detected_person
            HAVING COUNT(*) >= 2 AND COUNT(DISTINCT channel_id) >= 2
            ORDER BY video_count DESC
        ''', (two_days_ago,))

        trends = []
        for row in cursor.fetchall():
            person_name, count, video_ids, channels, sources = row
            unique_channels = len(set(channels.split(',')))
            trends.append({
                'person_name': person_name,
                'video_count': count,
                'unique_channels': unique_channels,
                'video_ids': video_ids.split(','),
                'detection_sources': sources.split(','),
            })

        conn.close()
        return trends

    def update_all_data(self) -> str:
        """Incrementally fetch new videos for every channel and analyze them."""
        channels = self.get_channels()
        total_new_videos = 0

        conn = self._connect()
        cursor = conn.cursor()

        for channel in channels:
            channel_id = channel['id']
            last_update = channel['last_updated_at']

            # Only fetch videos published after the channel's last_updated_at
            # (falls back to the 7-day default when None).
            videos = self.fetch_videos_from_channel(channel_id, since_date=last_update)

            for video in videos:
                # Comprehensive person-name extraction (may hit Gemini several times).
                detected_person, detection_source = self.extract_person_comprehensive(video)

                # Importance classification by freshness + view count.
                importance = self.determine_importance(video)

                cursor.execute('''
                    INSERT OR IGNORE INTO videos
                    (video_id, channel_id, title, description, tags, published_at,
                     view_count, thumbnail_url, detected_person, detection_source,
                     importance_level, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    video['video_id'],
                    channel_id,
                    video['title'],
                    video['description'],
                    ','.join(video['tags']) if video['tags'] else '',
                    video['published_at'],
                    video['view_count'],
                    video['thumbnail_url'],
                    detected_person,
                    detection_source,
                    importance,
                    datetime.now(timezone.utc).isoformat()
                ))
                # rowcount is 1 only when the row was actually inserted
                # (0 when INSERT OR IGNORE skipped a duplicate).
                if cursor.rowcount > 0:
                    total_new_videos += 1

            # Record this channel's fetch time so the next run is incremental.
            cursor.execute('''
                UPDATE channels SET last_updated_at = ? WHERE channel_id = ?
            ''', (datetime.now(timezone.utc).isoformat(), channel_id))

        conn.commit()
        conn.close()
        return f"Update complete: added {total_new_videos} new videos"

    def get_recent_videos_by_timerange(self, hours: int, limit: int = 50) -> List[Dict]:
        """Return videos published within the last *hours* hours, most viewed first.

        The cutoff is computed in JST (the app's display timezone) and the
        resulting rows carry both the raw UTC string and a JST datetime.
        """
        conn = self._connect()
        cursor = conn.cursor()

        # Cutoff computed in JST (UTC+9), then normalized to UTC for storage
        # comparison. Formatted as '…Z' to match the stored published_at form.
        jst = timezone(timedelta(hours=9))
        cutoff_time_jst = datetime.now(jst) - timedelta(hours=hours)
        cutoff_time_utc = cutoff_time_jst.astimezone(timezone.utc)
        cutoff_str = cutoff_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

        cursor.execute('''
            SELECT v.video_id, v.title, v.published_at, v.view_count,
                   v.thumbnail_url, v.detected_person, v.detection_source,
                   v.importance_level, c.channel_name, c.channel_icon_url,
                   v.channel_id
            FROM videos v
            JOIN channels c ON v.channel_id = c.channel_id
            WHERE v.published_at > ?
            ORDER BY v.view_count DESC
            LIMIT ?
        ''', (cutoff_str, limit))

        videos = []
        for row in cursor.fetchall():
            (video_id, title, published_at, view_count, thumbnail_url,
             detected_person, detection_source, importance_level,
             channel_name, channel_icon_url, channel_id) = row

            # Convert the stored UTC timestamp to JST for display.
            published_at_utc = datetime.fromisoformat(published_at.replace('Z', '+00:00'))
            published_at_jst = published_at_utc.astimezone(jst)

            videos.append({
                'video_id': video_id,
                'title': title,
                'published_at': published_at,
                'published_at_jst': published_at_jst,
                'view_count': view_count,
                'thumbnail_url': thumbnail_url,
                'detected_person': detected_person or 'Not detected',
                'detection_source': detection_source or '-',
                'importance_level': importance_level or 'Normal',
                'channel_name': channel_name,
                'channel_icon_url': channel_icon_url,
                'channel_id': channel_id,
            })

        conn.close()
        return videos

    def generate_recent_videos_html(self, hours: int, limit: int = 50) -> str:
        """Render the recent-videos list as a self-contained HTML fragment."""
        videos = self.get_recent_videos_by_timerange(hours, limit)
        time_range_text = f"Past {hours} hours"

        html = f"""
        <div style="font-family: sans-serif; color: #fff;">
          <h2>📺 Recent Videos ({time_range_text} · Sorted by views)</h2>
          <p>📊 Showing {len(videos)} videos (up to {limit}) | 🕐 Times shown in JST | 📈 Sorted by views</p>
        """

        if videos:
            for video in videos:
                # Format publish time in Japan time for display.
                published_jst = video['published_at_jst'].strftime('%m/%d %H:%M')
                video_url = f"https://www.youtube.com/watch?v={video['video_id']}"
                channel_url = f"https://www.youtube.com/channel/{video['channel_id']}"

                html += f"""
          <div style="display: flex; gap: 12px; margin-bottom: 12px; border: 1px solid #333; padding: 8px; border-radius: 6px;">
            <a href="{video_url}" target="_blank"><img src="{video['thumbnail_url']}" alt="thumbnail" width="160"></a>
            <div>
              <div><a href="{video_url}" target="_blank" style="color: #fff; font-weight: bold;">{video['title']}</a></div>
              <div><a href="{channel_url}" target="_blank" style="color: #ccc;"><img src="{video['channel_icon_url']}" alt="channel" width="20" style="vertical-align: middle;"> {video['channel_name']}</a></div>
              <div>📅 {published_jst} (JST) | 👀 {video['view_count']:,} views | 👤 {video['detected_person']} <span class="detection-badge">{video['detection_source']}</span> <span class="importance-badge">{video['importance_level']}</span></div>
            </div>
          </div>
                """
        else:
            html += f"""
          <div style="text-align: center; padding: 24px;">
            <p>📭 No videos were posted in the {time_range_text}.</p>
            <p>Try updating data or expanding the time range.</p>
          </div>
            """

        html += """
        </div>
        """
        return html

    def generate_dashboard(self) -> str:
        """Render the main dashboard (trend clusters + recent videos) as HTML."""
        trends = self.detect_trends()

        conn = self._connect()
        cursor = conn.cursor()

        # Recent week's videos ordered by importance then views.
        # Cutoff formatted as UTC '…Z' to match the stored published_at strings.
        week_ago = (datetime.now(timezone.utc) - timedelta(days=7)) \
            .strftime('%Y-%m-%dT%H:%M:%SZ')
        cursor.execute('''
            SELECT v.title, v.published_at, v.view_count, v.detected_person,
                   v.importance_level, c.channel_name, v.thumbnail_url,
                   v.video_id, v.channel_id, v.detection_source
            FROM videos v
            JOIN channels c ON v.channel_id = c.channel_id
            WHERE v.published_at > ?
            ORDER BY
                CASE v.importance_level
                    WHEN 'Critical' THEN 1
                    WHEN 'Important' THEN 2
                    ELSE 3
                END,
                v.view_count DESC
        ''', (week_ago,))
        videos = cursor.fetchall()
        conn.close()

        html = """
        <div style="font-family: sans-serif; color: #fff;">
          <h1>🌍 YouTube Competitor Analysis Dashboard (Global)</h1>
          <h2>🔥 Currently Trending Clusters</h2>
        """

        if trends:
            for trend in trends:
                detection_sources = list(set(trend['detection_sources']))
                sources_text = ', '.join(detection_sources)
                html += f"""
          <div style="border: 1px solid #333; padding: 8px; margin-bottom: 8px; border-radius: 6px;">
            <div>👤 <strong>{trend['person_name']}</strong></div>
            <div>📺 {trend['video_count']} videos posted across {trend['unique_channels']} channels</div>
            <div>🔍 Detection methods: {sources_text}</div>
          </div>
                """
        else:
            html += """
          <p>There are currently no trending clusters.</p>
            """

        html += """
          <h2>📺 Recent Videos (Global person detection)</h2>
        """

        # Show only the top 20 videos.
        for video in videos[:20]:
            (title, published_at, view_count, detected_person, importance,
             channel_name, thumbnail_url, video_id, channel_id,
             detection_source) = video

            published_date = datetime.fromisoformat(
                published_at.replace('Z', '+00:00')).strftime('%m/%d %H:%M')
            person_display = detected_person if detected_person else "Unknown"
            video_url = f"https://www.youtube.com/watch?v={video_id}"
            channel_url = f"https://www.youtube.com/channel/{channel_id}"

            html += f"""
          <div style="display: flex; gap: 12px; margin-bottom: 12px; border: 1px solid #333; padding: 8px; border-radius: 6px;">
            <a href="{video_url}" target="_blank"><img src="{thumbnail_url}" alt="thumbnail" width="160"></a>
            <div>
              <div><a href="{video_url}" target="_blank" style="color: #fff; font-weight: bold;">{title}</a></div>
              <div>📺 <a href="{channel_url}" target="_blank" style="color: #ccc;">{channel_name}</a> | 📅 {published_date} | 👀 {view_count:,} views | 👤 {person_display} <span class="detection-badge">{detection_source}</span> <span class="importance-badge">{importance}</span></div>
            </div>
          </div>
            """

        html += """
        </div>
        """
        return html

    def generate_channel_management_html(self) -> str:
        """Render the registered-channel list as an HTML fragment."""
        channels = self.get_channels()

        html = """
        <div style="font-family: sans-serif; color: #fff;">
          <h2>🌍 Registered Channels (Global person detection)</h2>
        """

        if channels:
            for channel in channels:
                added_date = datetime.fromisoformat(channel['added_date']).strftime('%Y/%m/%d')
                subscriber_text = (f"{channel['subscriber_count']:,} subscribers"
                                   if channel['subscriber_count'] else "Private")
                html += f"""
          <div style="display: flex; gap: 12px; margin-bottom: 8px; border: 1px solid #333; padding: 8px; border-radius: 6px;">
            <img src="{channel['icon_url']}" alt="icon" width="48">
            <div>
              <div><strong>{channel['name']}</strong></div>
              <div>👥 Subscribers: {subscriber_text} | 📅 Added: {added_date}</div>
              <div>🆔 {channel['id']}</div>
            </div>
          </div>
                """
        else:
            html += """
          <p>No channels registered.</p>
            """

        html += """
        </div>
        """
        return html


# Application singleton used by all Gradio callbacks.
analyzer = YouTubeCompetitorAnalyzer()

# Strong global dark CSS to force dark mode even if hosting injects light styles.
# Uses high-specificity selectors and !important to override Hugging Face Spaces' theme.
DARK_THEME_CSS = """
:root, html, body, .gradio-container {
    background-color: #000000 !important;
    color: #ffffff !important;
    color-scheme: dark !important;
}
.gradio-container, .gradio-container * {
    background-color: transparent !important;
    color: #ffffff !important;
    border-color: #333333 !important;
}
/* Inputs, buttons and textareas */
button, .gr-button, input, textarea, select, .gradio-textbox, .gradio-file, .gradio-dropdown, .gradio-button {
    background-color: #0b0b0b !important;
    color: #ffffff !important;
    border: 1px solid #333333 !important;
}
input::placeholder, textarea::placeholder {
    color: #bfbfbf !important;
}
.gradio-markdown, .gradio-html, .gradio-label, .gradio-textbox, .gradio-output {
    color: #ffffff !important;
}
/* Ensure components that Gradio or Spaces might wrap still show dark backgrounds */
.gradio-container .container, .gradio-container .section, .gradio-container .card {
    background-color: #000000 !important;
    color: #ffffff !important;
}
/* Give high contrast to borders and badges */
.importance-badge, .detection-badge {
    color: #ffffff !important;
}
"""


# ---------------------------------------------------------------------------
# Gradio interface callbacks
# ---------------------------------------------------------------------------

def add_channel_interface(channel_ids_text):
    """Add one or more channels (one channel ID per input line)."""
    if not channel_ids_text:
        return "Please enter channel ID"

    # Split on newlines, strip whitespace, ignore blank lines.
    channel_ids = [cid.strip() for cid in channel_ids_text.split('\n') if cid.strip()]
    if not channel_ids:
        return "No valid channel IDs provided."

    # Process each channel ID in order, collecting per-ID results.
    results = [analyzer.add_channel(channel_id) for channel_id in channel_ids]
    return "\n".join(results)


def delete_channel_interface(channel_id):
    """Delete a single channel by ID."""
    if not channel_id:
        return "Please enter a channel ID to delete"
    return analyzer.delete_channel(channel_id.strip())


def update_channel_name_interface(channel_id, new_name):
    """Rename a channel's local display name."""
    if not channel_id or not new_name:
        return "Please enter channel ID and new name"
    return analyzer.update_channel_name(channel_id.strip(), new_name.strip())


def update_data_interface():
    """Run the incremental data update for all channels."""
    return analyzer.update_all_data()


def show_dashboard():
    """Render the dashboard HTML."""
    return analyzer.generate_dashboard()


def show_channel_management():
    """Render the channel-management HTML."""
    return analyzer.generate_channel_management_html()


def show_recent_videos_interface(hours_selection, limit_selection):
    """Map dropdown labels to numbers and render the recent-videos list."""
    hours_map = {"6 hours": 6, "12 hours": 12, "24 hours": 24, "48 hours": 48}
    limit_map = {"20 items": 20, "50 items": 50, "100 items": 100, "200 items": 200}
    hours = hours_map.get(hours_selection, 24)
    limit = limit_map.get(limit_selection, 50)
    return analyzer.generate_recent_videos_html(hours, limit)


# ---------------------------------------------------------------------------
# Gradio app layout
# ---------------------------------------------------------------------------

with gr.Blocks(title="YouTube Competitor Analysis (Global)",
               theme=gr.themes.Monochrome(), css=DARK_THEME_CSS) as app:
    gr.Markdown("# 🌍 YouTube Competitor Analysis App (Global)")
    gr.Markdown("Analyze competitor channel uploads and detect global clustered trends using **Gemini 2.5 Flash**.")

    with gr.Tab("📊 Dashboard"):
        gr.Markdown("## Global Analysis Dashboard")
        refresh_btn = gr.Button("📈 Refresh Dashboard", variant="secondary")
        dashboard_html = gr.HTML()
        refresh_btn.click(show_dashboard, inputs=[], outputs=[dashboard_html])
        # Initial load.
        app.load(show_dashboard, inputs=[], outputs=[dashboard_html])

    with gr.Tab("📺 Recent Videos"):
        gr.Markdown("## 📺 Recent Videos (select time range and max items)")
        gr.Markdown("""
### Features
- ⏰ Time range selection: choose between 6 to 48 hours
- 📊 Sorted by view count: show the highest-view videos first
- 🕐 Times displayed in JST (UTC+9)
- 🔢 Max items: limit display between 20 and 200
- 🌍 Global detection: detect notable people worldwide
        """)

        with gr.Row():
            hours_dropdown = gr.Dropdown(
                choices=["6 hours", "12 hours", "24 hours", "48 hours"],
                value="24 hours",
                label="⏰ Time Range"
            )
            limit_dropdown = gr.Dropdown(
                choices=["20 items", "50 items", "100 items", "200 items"],
                value="50 items",
                label="🔢 Max items"
            )

        update_recent_btn = gr.Button("🔄 Update Recent Videos", variant="primary", size="lg")
        recent_videos_html = gr.HTML()

        update_recent_btn.click(
            show_recent_videos_interface,
            inputs=[hours_dropdown, limit_dropdown],
            outputs=[recent_videos_html]
        )
        # Initial load (24 hours, 50 items).
        app.load(
            show_recent_videos_interface,
            inputs=[gr.State("24 hours"), gr.State("50 items")],
            outputs=[recent_videos_html]
        )

    with gr.Tab("🔄 Data Update"):
        gr.Markdown("## 🌍 Global AI High-Precision Data Update System")
        gr.Markdown("""
### 🎯 Person extraction using Gemini 2.5 Flash (priority order)
1. **🤖 Gemini title analysis** - high-precision extraction of notable people from titles
2. **🤖 Gemini description analysis** - detect people via hashtags and description
3. **🏷️ Tag analysis** - identify multilingual person names from tags
4. **🖼️ Thumbnail OCR** - read text from thumbnails
5. **👤 Face recognition** - identify persons via face recognition

**🌍 Global support**: detects people regardless of nationality, era, or field
        """)

        # API key inputs (allow user to provide keys at runtime instead of env vars).
        with gr.Row():
            youtube_key_input = gr.Textbox(label="YouTube API Key",
                                           placeholder="Enter YouTube API key",
                                           type="password")
            gemini_key_input = gr.Textbox(label="Gemini API Key",
                                          placeholder="Enter Gemini API key",
                                          type="password")
        apply_keys_btn = gr.Button("Apply API Keys", variant="secondary")
        api_keys_result = gr.Textbox(label="API Key Status", interactive=False)

        # Keep applied keys in hidden Gradio state so other callbacks can
        # reference them if needed.
        youtube_key_state = gr.State("")
        gemini_key_state = gr.State("")

        # set_api_keys returns (status, youtube_key, gemini_key).
        apply_keys_btn.click(
            set_api_keys,
            inputs=[youtube_key_input, gemini_key_input],
            outputs=[api_keys_result, youtube_key_state, gemini_key_state]
        )

        update_btn = gr.Button("🌍 Start Global Data Update", variant="primary", size="lg")
        update_result = gr.Textbox(label="Update Result", interactive=False)
        update_btn.click(update_data_interface, inputs=[], outputs=[update_result])

    with gr.Tab("📺 Channel Management"):
        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("## Add Channels")
                # Multiline textbox so several IDs can be pasted at once.
                channel_input = gr.TextArea(
                    label="Channel ID (one per line)",
                    placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx\nUCyyyyyyyyyyyyyyyyyyyyyyyy",
                    info="Enter multiple YouTube channel IDs separated by newlines"
                )
                add_btn = gr.Button("Add Channels", variant="primary")
                add_result = gr.Textbox(label="Result", interactive=False)
                add_btn.click(add_channel_interface, inputs=[channel_input], outputs=[add_result])

            with gr.Column(scale=2):
                gr.Markdown("## Delete Channel")
                delete_channel_input = gr.Textbox(
                    label="Channel ID to delete",
                    placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx"
                )
                delete_btn = gr.Button("Delete Channel", variant="stop")
                delete_result = gr.Textbox(label="Delete Result", interactive=False)
                delete_btn.click(delete_channel_interface,
                                 inputs=[delete_channel_input],
                                 outputs=[delete_result])

        with gr.Row():
            with gr.Column():
                gr.Markdown("## Edit Channel Name")
                edit_channel_id = gr.Textbox(label="Channel ID to edit",
                                             placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx")
                new_channel_name = gr.Textbox(label="New channel name",
                                              placeholder="Enter new name")
                update_name_btn = gr.Button("Update Name", variant="secondary")
                update_name_result = gr.Textbox(label="Update Result", interactive=False)
                update_name_btn.click(
                    update_channel_name_interface,
                    inputs=[edit_channel_id, new_channel_name],
                    outputs=[update_name_result]
                )

        gr.Markdown("## Registered Channels")
        channel_list_html = gr.HTML()
        refresh_channels_btn = gr.Button("Refresh list", variant="secondary")
        refresh_channels_btn.click(show_channel_management, inputs=[],
                                   outputs=[channel_list_html])
        # Initial load.
        app.load(show_channel_management, inputs=[], outputs=[channel_list_html])


if __name__ == "__main__":
    app.launch()