| import gradio as gr |
| import sqlite3 |
| import json |
| import requests |
| from datetime import datetime, timedelta, timezone |
| from typing import List, Dict, Any, Optional |
| import google.generativeai as genai |
| from googleapiclient.discovery import build |
| import pandas as pd |
| import re |
| from collections import defaultdict |
| import base64 |
|
|
| |
| |
# Credentials supplied by the user at runtime. Both stay None until
# set_api_keys() stores a key.
YOUTUBE_API_KEY: Optional[str] = None
GEMINI_API_KEY: Optional[str] = None

# API client handles, also assigned by set_api_keys(): `model` receives the
# Gemini GenerativeModel and `youtube` the YouTube Data API v3 client.
model = youtube = None
|
|
|
|
def set_api_keys(youtube_key: Optional[str], gemini_key: Optional[str]) -> tuple[str, str, str]:
    """Apply API keys provided by the user at runtime.

    Configures the Gemini client and builds the YouTube Data API client so
    the rest of the app uses the supplied keys. Each key is handled
    independently: a failure for one does not prevent the other from being
    applied, and a missing or blank key leaves its client untouched.

    Args:
        youtube_key: YouTube Data API key, or None/blank to skip it.
        gemini_key: Gemini API key, or None/blank to skip it.

    Returns:
        A ``(status_message, youtube_key, gemini_key)`` tuple. The message
        holds one line per processed key; the two key fields are the keys
        currently stored in the module globals (``""`` when unset). When
        neither key is provided the result is
        ``("No API keys provided.", "", "")``.
    """
    global YOUTUBE_API_KEY, GEMINI_API_KEY, model, youtube

    # Keys are typically pasted into a text box; without stripping, stray
    # whitespace or a trailing newline would be stored as part of the key and
    # a whitespace-only value would count as "provided".
    gemini_key = (gemini_key or "").strip()
    youtube_key = (youtube_key or "").strip()

    if not gemini_key and not youtube_key:
        return "No API keys provided.", "", ""

    messages = []

    if gemini_key:
        try:
            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel('gemini-2.5-flash')
            GEMINI_API_KEY = gemini_key
            messages.append("Gemini API key applied successfully.")
        except Exception as e:
            messages.append(f"Failed to apply Gemini API key: {e}")

    if youtube_key:
        try:
            youtube = build('youtube', 'v3', developerKey=youtube_key)
            YOUTUBE_API_KEY = youtube_key
            messages.append("YouTube API key applied successfully.")
        except Exception as e:
            messages.append(f"Failed to apply YouTube API key: {e}")

    return "\n".join(messages), YOUTUBE_API_KEY or "", GEMINI_API_KEY or ""
|
|
|
|
| class YouTubeCompetitorAnalyzer: |
| def __init__(self): |
| self.init_database() |
| |
def init_database(self):
    """Create the SQLite tables used by the analyzer if they are missing.

    Opens ``competitor_data.db`` (relative to the current working directory)
    and issues ``CREATE TABLE IF NOT EXISTS`` for the ``channels``,
    ``videos`` and ``trends`` tables, so existing tables and rows are left
    untouched.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS channels (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT UNIQUE,
            channel_name TEXT,
            channel_icon_url TEXT,
            subscriber_count INTEGER,
            added_date TEXT,
            last_updated_at TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS videos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            video_id TEXT UNIQUE,
            channel_id TEXT,
            title TEXT,
            description TEXT,
            tags TEXT,
            published_at TEXT,
            view_count INTEGER,
            thumbnail_url TEXT,
            detected_person TEXT,
            detection_source TEXT,
            importance_level TEXT,
            created_at TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS trends (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            person_name TEXT,
            video_ids TEXT,
            trend_date TEXT,
            is_active BOOLEAN
        )
        ''',
    )

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Release the database handle even when a statement fails.
        conn.close()
| |
def extract_person_from_title_with_gemini(self, title: str) -> Optional[str]:
    """Extract a person's name from the title using Gemini (global, highest priority).

    Sends the title to the module-level Gemini ``model`` and validates the
    reply against a whitelist of name characters.

    Args:
        title: The YouTube video title to analyze.

    Returns:
        The cleaned name (2-15 characters drawn from CJK ideographs, kana,
        Hangul, Latin letters, whitespace and middle dots), or None when no
        model is configured, the model reports no match, the reply does not
        look like a name, or the request fails (the error is printed).
    """
    if not model:
        return None
    try:
        prompt = f"""
        Please extract a single famous person's name (historical or contemporary) from this YouTube title.

        Title: "{title}"

        Target:
        Globally well-known individuals (no restriction on nationality, era, or field)
        - People from any country or region worldwide
        - From ancient to modern times
        - Any field: politics, business, philosophy, literature, science, arts, religion, sports, etc.
        - Real historical or contemporary figures

        Criteria:
        - Widely known at a general-knowledge level
        - Frequently mentioned in books, education, or media
        - Identifiable as a specific real person by proper name

        Response format:
        - If a matching person exists: return the person's name only (in Japanese)
        - If none: return "なし"
        - If multiple apply: return the single most relevant person

        Note: Do not restrict by nationality, era, or field. Consider notable people worldwide.

        Examples:
        "The secret of innovation by Steve Jobs" -> Steve Jobs
        "Learning leadership from Confucius" -> Confucius
        "Introduction to Einstein's theory of relativity" -> Einstein
        "Konosuke Matsushita on business philosophy" -> Konosuke Matsushita
        "General success tips" -> none
        """

        response = model.generate_content(prompt)

        # Strip decorative brackets BEFORE testing for the "no match"
        # sentinels; otherwise a reply such as 「なし」 slips past the check
        # and is returned as if it were a person's name.
        clean_result = re.sub(r'[「」『』【】|\n\r\t]', '', response.text).strip()
        if not clean_result or clean_result.lower() in ['なし', 'none', '該当なし', '不明']:
            return None

        # Whitelist of characters a name may contain; anything else (digits,
        # punctuation, full sentences) means the reply is not a bare name.
        global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$'
        if re.match(global_name_pattern, clean_result):
            return clean_result
        return None

    except Exception as e:
        print(f"Gemini global title parsing error: {e}")
        return None
def extract_person_from_description_with_gemini(self, description: str) -> Optional[str]:
    """Extract a person's name from the video description using Gemini.

    Only the first 500 characters of the description are sent to the
    module-level Gemini ``model``; the reply is validated against a
    whitelist of name characters.

    Args:
        description: The video description text.

    Returns:
        The cleaned name, or None when the description is empty or shorter
        than 10 characters after stripping, no model is configured, the model
        reports no match, the reply does not look like a name, or the request
        fails (the error is printed).
    """
    if not description or len(description.strip()) < 10 or not model:
        return None

    try:
        # Slicing already copes with strings shorter than the limit.
        desc_excerpt = description[:500]

        prompt = f"""
        Please extract a single famous person's name (historical or contemporary) from this YouTube video's description.

        Description excerpt: "{desc_excerpt}"

        Target:
        Globally well-known individuals (no restriction on nationality, era, or field)

        Criteria:
        - Widely known at a general-knowledge level
        - Frequently mentioned in books, education, or media
        - Identifiable as a specific real person by proper name

        Response format:
        - If a matching person exists: return the person's name only (in Japanese)
        - If none: return "なし"
        - If multiple apply: return the single most relevant person
        - Hashtags (e.g., #SteveJobs, #Confucius) should also be considered

        Note: Do not restrict by nationality, era, or field. Consider notable people worldwide.
        """

        response = model.generate_content(prompt)

        # Strip brackets and hashtag markers BEFORE testing for the
        # "no match" sentinels; otherwise a reply such as 「なし」 or #なし
        # slips past the check and is returned as a person's name.
        clean_result = re.sub(r'[「」『』【】|#\n\r\t]', '', response.text).strip()
        if not clean_result or clean_result.lower() in ['なし', 'none', '該当なし', '不明']:
            return None

        # Whitelist of characters a name may contain; anything else means
        # the reply is not a bare name.
        global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$'
        if re.match(global_name_pattern, clean_result):
            return clean_result
        return None

    except Exception as e:
        print(f"Gemini global description parsing error: {e}")
        return None
| |
def extract_person_from_tags(self, tags: List[str]) -> Optional[str]:
    """Extract a person's name from tags (global, priority 3).

    Returns the first tag that is made up solely of name characters (CJK
    ideographs, kana, Hangul, Latin letters, whitespace, middle dots), is
    2-15 characters long, is not blank, and is not one of the generic
    excluded words (compared case-insensitively).

    Args:
        tags: The video's tag strings; may be empty or None.

    Returns:
        The matching tag unchanged, or None when no tag qualifies.
    """
    if not tags:
        return None

    # Compiled and built once per call instead of once per tag.
    global_name_pattern = re.compile(
        r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$'
    )
    exclude_words = frozenset(
        word.lower()
        for word in (
            '動画', '投稿', '更新', '配信', '人生', '経営', '仕事', '成功', '失敗',
            'video', 'life', 'business', 'success', 'leadership', 'philosophy',
            'motivation', 'inspiration', 'education', 'training', 'coach',
        )
    )

    for tag in tags:
        # The pattern allows whitespace, so a whitespace-only tag would
        # otherwise be reported as a person's name.
        if not tag.strip():
            continue
        if global_name_pattern.match(tag) and tag.lower() not in exclude_words:
            return tag

    return None
| |
def analyze_thumbnail_ocr(self, thumbnail_url: str) -> Optional[str]:
    """Thumbnail OCR analysis (priority 4).

    Downloads the thumbnail, sends it to the module-level Gemini ``model``
    with an OCR prompt, and reads the first entry of ``person_names`` from
    the JSON reply.

    Args:
        thumbnail_url: URL of the thumbnail image.

    Returns:
        The first detected person name, or None when the URL is empty, no
        model is configured, the download or model call fails (the error is
        printed), the reply is not valid JSON, or no usable name is listed.
    """
    if not thumbnail_url or not model:
        return None
    try:
        response = requests.get(thumbnail_url, timeout=10)
        # Without this, the body of a 404/500 error page would be sent to
        # Gemini as if it were image bytes.
        response.raise_for_status()
        image_data = base64.b64encode(response.content).decode()

        prompt = """
        Extract text from this YouTube thumbnail image.
        Pay special attention to names of famous individuals (worldwide, historical or modern).

        Reply in JSON using the following format:
        {
        "detected_text": "All text read by OCR",
        "person_names": ["List of extracted person names"]
        }
        """

        image_part = {
            "mime_type": "image/jpeg",
            "data": image_data
        }

        response = model.generate_content([prompt, image_part])
        result_text = response.text

        # The model often wraps its JSON in a ```json fenced block.
        json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL)
        if json_match:
            result_text = json_match.group(1)

        try:
            result = json.loads(result_text)
        except json.JSONDecodeError:
            return None

        # "person_names" may be missing, null or empty.
        person_names = result.get('person_names') or []
        first_name = person_names[0] if person_names else None
        if isinstance(first_name, str) and first_name.strip():
            return first_name
        return None

    except Exception as e:
        print(f"Thumbnail OCR analysis error: {e}")
        return None
| |
def analyze_thumbnail_face_recognition(self, thumbnail_url: str) -> Optional[str]:
    """Thumbnail face recognition (priority 5).

    Downloads the thumbnail, asks the module-level Gemini ``model`` to
    identify the person shown, and reads ``person_name`` from the JSON reply.

    Args:
        thumbnail_url: URL of the thumbnail image.

    Returns:
        The identified person's name, or None when the URL is empty, no model
        is configured, the download or model call fails (the error is
        printed), the reply is not valid JSON, or the reply carries no usable
        name (JSON null, blank, or a literal "null"/"none"/"unknown").
    """
    if not thumbnail_url or not model:
        return None
    try:
        response = requests.get(thumbnail_url, timeout=10)
        # Without this, the body of a 404/500 error page would be sent to
        # Gemini as if it were image bytes.
        response.raise_for_status()
        image_data = base64.b64encode(response.content).decode()

        prompt = """
        Identify the person shown in this image.
        Consider famous people worldwide, including historical figures, philosophers, business leaders, writers, and scientists.

        Only return a person's name if you can identify them with confidence.
        If unknown, return null.

        Respond in JSON:
        {
        "person_name": "Identified person name or null"
        }
        """

        image_part = {
            "mime_type": "image/jpeg",
            "data": image_data
        }

        response = model.generate_content([prompt, image_part])
        result_text = response.text

        # The model often wraps its JSON in a ```json fenced block.
        json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL)
        if json_match:
            result_text = json_match.group(1)

        try:
            result = json.loads(result_text)
        except json.JSONDecodeError:
            return None

        person_name = result.get('person_name')
        if not isinstance(person_name, str):
            return None
        person_name = person_name.strip()
        # The prompt's template shows "... or null" inside quotes, so the
        # model may answer with the string "null" rather than JSON null.
        if not person_name or person_name.lower() in ('null', 'none', 'unknown'):
            return None
        return person_name

    except Exception as e:
        print(f"Face recognition analysis error: {e}")
        return None
| |
def extract_person_comprehensive(self, video_data: Dict) -> tuple[Optional[str], str]:
    """Run every person detector in priority order and stop at the first hit.

    The order is: title (Gemini), description (Gemini), tags, thumbnail OCR,
    thumbnail face recognition. Missing keys in ``video_data`` default to an
    empty string (or an empty list for ``tags``).

    Args:
        video_data: Mapping that may carry ``title``, ``description``,
            ``tags`` and ``thumbnail_url``.

    Returns:
        ``(person_name, source_label)`` for the first detector that yields a
        truthy name, or ``(None, "Not detected")`` when none does.
    """
    thumbnail = video_data.get('thumbnail_url', '')

    # Each entry pairs a source label with a zero-argument callable so that
    # later (more expensive) detectors only run when earlier ones find nothing.
    detectors = (
        ("Gemini-GlobalTitle",
         lambda: self.extract_person_from_title_with_gemini(video_data.get('title', ''))),
        ("Gemini-GlobalDescription",
         lambda: self.extract_person_from_description_with_gemini(video_data.get('description', ''))),
        ("GlobalTag",
         lambda: self.extract_person_from_tags(video_data.get('tags', []))),
        ("ThumbnailOCR",
         lambda: self.analyze_thumbnail_ocr(thumbnail)),
        ("FaceRecognition",
         lambda: self.analyze_thumbnail_face_recognition(thumbnail)),
    )

    for source_label, detect in detectors:
        found = detect()
        if found:
            return found, source_label

    return None, "Not detected"
| |
def add_channel(self, channel_id: str) -> str:
    """Add a channel.

    Looks the channel up through the module-level YouTube client and stores
    its name, icon URL, subscriber count and the current local time as
    ``added_date``. An existing row with the same ``channel_id`` is replaced.

    Args:
        channel_id: The YouTube channel id; surrounding whitespace is ignored.

    Returns:
        A human-readable status message: success, "Channel not found", a
        missing-API-key notice, or the error text when the lookup or the
        database write fails.
    """
    if not youtube:
        return "YouTube API key is not set."
    channel_id = channel_id.strip()
    try:
        response = youtube.channels().list(
            part='snippet,statistics',
            id=channel_id
        ).execute()

        # The response carries no "items" key at all when nothing matches,
        # so indexing it directly would raise KeyError instead of reaching
        # the "not found" branch.
        items = response.get('items')
        if not items:
            return f"ID: {channel_id} - Channel not found"

        channel_info = items[0]
        snippet = channel_info['snippet']
        channel_name = snippet['title']
        # Thumbnails and statistics are read defensively so that a channel
        # lacking either can still be registered.
        channel_icon = snippet.get('thumbnails', {}).get('default', {}).get('url', '')
        subscriber_count = int(channel_info.get('statistics', {}).get('subscriberCount', 0))

        conn = sqlite3.connect('competitor_data.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO channels
                (channel_id, channel_name, channel_icon_url, subscriber_count, added_date)
                VALUES (?, ?, ?, ?, ?)
            ''', (channel_id, channel_name, channel_icon, subscriber_count, datetime.now().isoformat()))
            conn.commit()
        finally:
            # Close the handle even when the insert fails.
            conn.close()

        return f"Channel '{channel_name}' added"

    except Exception as e:
        return f"ID: {channel_id} - Error: {str(e)}"
| |
def delete_channel(self, channel_id: str) -> str:
    """Delete a channel.

    Removes the channel row and every video row stored for it.

    Args:
        channel_id: The YouTube channel id to remove.

    Returns:
        A status message: the deleted channel's name on success,
        "Channel not found" when no such row exists, or
        "Deletion error: ..." when the database operation fails.
    """
    try:
        conn = sqlite3.connect('competitor_data.db')
        try:
            cursor = conn.cursor()

            # Look the name up first so it can be reported after deletion.
            cursor.execute('SELECT channel_name FROM channels WHERE channel_id = ?', (channel_id,))
            row = cursor.fetchone()
            if not row:
                return "Channel not found"

            cursor.execute('DELETE FROM videos WHERE channel_id = ?', (channel_id,))
            cursor.execute('DELETE FROM channels WHERE channel_id = ?', (channel_id,))
            conn.commit()
        finally:
            # Runs on every path, so a failing statement no longer leaks
            # the connection.
            conn.close()

        return f"Channel '{row[0]}' deleted"

    except Exception as e:
        return f"Deletion error: {str(e)}"
| |
def update_channel_name(self, channel_id: str, new_name: str) -> str:
    """Update channel name.

    Args:
        channel_id: The YouTube channel id whose stored name is changed.
        new_name: The name to store.

    Returns:
        A status message: confirmation on success, "Channel not found" when
        no row matches, or "Update error: ..." when the database operation
        fails.
    """
    try:
        conn = sqlite3.connect('competitor_data.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE channels
                SET channel_name = ?
                WHERE channel_id = ?
            ''', (new_name, channel_id))

            # rowcount is the number of rows the UPDATE touched.
            if cursor.rowcount == 0:
                return "Channel not found"

            conn.commit()
        finally:
            # Runs on every path, so a failing statement no longer leaks
            # the connection.
            conn.close()

        return f"Channel name updated to '{new_name}'"

    except Exception as e:
        return f"Update error: {str(e)}"
| |
def get_channels(self) -> List[Dict]:
    """Get list of registered channels.

    Returns:
        One dict per channel, ordered by ``added_date`` descending, with the
        keys ``id`` (the YouTube channel id), ``name``, ``icon_url``,
        ``subscriber_count``, ``added_date`` and ``last_updated_at``.

    Raises:
        sqlite3.Error: If the query fails (for example, the table is missing).
    """
    conn = sqlite3.connect('competitor_data.db')
    try:
        rows = conn.execute('''
            SELECT channel_id, channel_name, channel_icon_url, subscriber_count, added_date, last_updated_at
            FROM channels
            ORDER BY added_date DESC
        ''').fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()

    # Key order mirrors the SELECT column order above.
    keys = ('id', 'name', 'icon_url', 'subscriber_count', 'added_date', 'last_updated_at')
    return [dict(zip(keys, row)) for row in rows]
| |
def fetch_videos_from_channel(self, channel_id: str, since_date: Optional[str] = None) -> List[Dict]:
    """Fetch videos from a channel since the specified date.

    Uses the module-level YouTube client: one ``search().list`` call for the
    newest video ids, then one ``videos().list`` call for their details.

    NOTE: only the first result page (at most 50 videos) is read.

    Args:
        channel_id: The YouTube channel id to query.
        since_date: ISO-8601 timestamp; only videos published after it are
            requested. A value without a UTC offset is treated as UTC.
            Defaults to seven days before now.

    Returns:
        A list of dicts with the keys ``video_id``, ``title``,
        ``description``, ``tags``, ``published_at``, ``view_count`` and
        ``thumbnail_url``. Empty when no client is configured, nothing
        matches, or the request fails (the error is printed).
    """
    if not youtube:
        return []
    try:
        if since_date:
            # A trailing "Z" is rewritten because older Python versions
            # cannot parse it in fromisoformat().
            since_date_dt = datetime.fromisoformat(since_date.replace('Z', '+00:00'))
            if since_date_dt.tzinfo is None:
                # Without a zone designator the API rejects the parameter.
                since_date_dt = since_date_dt.replace(tzinfo=timezone.utc)
        else:
            since_date_dt = datetime.now(timezone.utc) - timedelta(days=7)

        # Always emit whole-second UTC with a "Z" suffix, whatever offset or
        # precision the input carried.
        published_after = since_date_dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

        response = youtube.search().list(
            part='snippet',
            channelId=channel_id,
            maxResults=50,
            order='date',
            publishedAfter=published_after,
            type='video'
        ).execute()

        video_ids = [item['id']['videoId'] for item in response.get('items', [])]
        if not video_ids:
            return []

        video_details = youtube.videos().list(
            part='statistics,snippet',
            id=','.join(video_ids)
        ).execute()

        videos = []
        for item in video_details.get('items', []):
            snippet = item['snippet']

            # Fall back through smaller sizes so one video without a "high"
            # thumbnail cannot discard the whole batch.
            thumbnails = snippet.get('thumbnails', {})
            thumbnail_url = ''
            for size in ('high', 'medium', 'default'):
                candidate = thumbnails.get(size, {}).get('url')
                if candidate:
                    thumbnail_url = candidate
                    break

            videos.append({
                'video_id': item['id'],
                'title': snippet['title'],
                'description': snippet.get('description', ''),
                'tags': snippet.get('tags', []),
                'published_at': snippet['publishedAt'],
                # Statistics are read defensively; a missing count becomes 0.
                'view_count': int(item.get('statistics', {}).get('viewCount', 0)),
                'thumbnail_url': thumbnail_url
            })

        return videos

    except Exception as e:
        print(f"Video fetch error: {e}")
        return []
| |
def determine_importance(self, video_data: Dict) -> str:
    """Determine importance level.

    Args:
        video_data: Mapping with ``published_at`` (ISO-8601 string; a value
            without a UTC offset is treated as UTC) and ``view_count``.

    Returns:
        "Critical" for at least 10,000 views within 24 hours of publishing,
        "Important" for at least 10,000 views within 48 hours, otherwise
        "Normal".

    Raises:
        KeyError: If a required key is missing.
        ValueError: If ``published_at`` is not a parseable ISO-8601 string.
    """
    # Parse first and inspect tzinfo afterwards. Guessing "naive" from the
    # absence of "Z"/"+" in the text misreads negative offsets such as
    # "-05:00" and produces an unparseable string.
    published_at = datetime.fromisoformat(video_data['published_at'].replace('Z', '+00:00'))
    if published_at.tzinfo is None:
        published_at = published_at.replace(tzinfo=timezone.utc)

    hours_since_published = (datetime.now(timezone.utc) - published_at).total_seconds() / 3600
    view_count = video_data['view_count']

    if view_count >= 10000:
        if hours_since_published <= 24:
            return "Critical"
        if hours_since_published <= 48:
            return "Important"
    return "Normal"
| |
def detect_trends(self) -> List[Dict]:
    """Detect trending clusters.

    A trend is a detected person who appears in at least two stored videos
    from at least two different channels, all published within the last two
    days.

    Returns:
        One dict per trend, most videos first, with the keys
        ``person_name``, ``video_count``, ``unique_channels``, ``video_ids``
        (list) and ``detection_sources`` (list, one entry per video).

    Raises:
        sqlite3.Error: If the query fails.
    """
    # published_at is stored as a UTC "...Z" string and compared as text, so
    # the cutoff must be UTC in the same format. A naive local-time cutoff
    # shifts the window by the machine's UTC offset.
    two_days_ago = (datetime.now(timezone.utc) - timedelta(days=2)).strftime('%Y-%m-%dT%H:%M:%SZ')

    conn = sqlite3.connect('competitor_data.db')
    try:
        rows = conn.execute('''
            SELECT detected_person, COUNT(*) as video_count,
            GROUP_CONCAT(video_id) as video_ids,
            GROUP_CONCAT(DISTINCT channel_id) as channels,
            GROUP_CONCAT(detection_source) as sources
            FROM videos
            WHERE detected_person IS NOT NULL
            AND detected_person != ''
            AND published_at > ?
            GROUP BY detected_person
            HAVING COUNT(*) >= 2
            AND COUNT(DISTINCT channel_id) >= 2
            ORDER BY video_count DESC
        ''', (two_days_ago,)).fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()

    trends = []
    for person_name, count, video_ids, channels, sources in rows:
        trends.append({
            'person_name': person_name,
            'video_count': count,
            'unique_channels': len(set(channels.split(','))),
            'video_ids': video_ids.split(','),
            # GROUP_CONCAT skips NULLs, so the column can come back as None.
            'detection_sources': sources.split(',') if sources else []
        })

    return trends
| |
def update_all_data(self) -> str:
    """Update data for all channels.

    For every registered channel, fetches the videos published since its
    ``last_updated_at``, runs person detection and importance scoring on the
    ones not stored yet, inserts them, and advances ``last_updated_at``.
    Progress is committed after each channel.

    Returns:
        A status message with the number of newly inserted videos, or a
        missing-API-key notice when no YouTube client is configured.

    Raises:
        sqlite3.Error: If a database operation fails.
    """
    if not youtube:
        # fetch_videos_from_channel() returns [] without a client. Carrying
        # on would stamp every channel as freshly updated and silently skip
        # that time window on the next real update.
        return "YouTube API key is not set."

    channels = self.get_channels()
    total_new_videos = 0

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()

        for channel in channels:
            channel_id = channel['id']

            # Taken BEFORE the fetch: videos published while this channel is
            # being processed then fall inside the next run's window.
            fetch_started_at = datetime.now(timezone.utc).isoformat()

            videos = self.fetch_videos_from_channel(channel_id, since_date=channel['last_updated_at'])

            for video in videos:
                # Already-stored videos would be dropped by INSERT OR IGNORE
                # anyway; skipping them early avoids the model calls.
                cursor.execute('SELECT 1 FROM videos WHERE video_id = ?', (video['video_id'],))
                if cursor.fetchone():
                    continue

                detected_person, detection_source = self.extract_person_comprehensive(video)
                importance = self.determine_importance(video)

                cursor.execute('''
                    INSERT OR IGNORE INTO videos
                    (video_id, channel_id, title, description, tags, published_at, view_count,
                    thumbnail_url, detected_person, detection_source, importance_level, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    video['video_id'],
                    channel_id,
                    video['title'],
                    video['description'],
                    ','.join(video['tags']) if video['tags'] else '',
                    video['published_at'],
                    video['view_count'],
                    video['thumbnail_url'],
                    detected_person,
                    detection_source,
                    importance,
                    datetime.now(timezone.utc).isoformat()
                ))

                if cursor.rowcount > 0:
                    total_new_videos += 1

            cursor.execute('''
                UPDATE channels
                SET last_updated_at = ?
                WHERE channel_id = ?
            ''', (fetch_started_at, channel_id))

            # Commit per channel so a failure later in the run does not
            # discard channels that were already processed.
            conn.commit()
    finally:
        conn.close()

    return f"Update complete: added {total_new_videos} new videos"
| |
def get_recent_videos_by_timerange(self, hours: int, limit: int = 50) -> List[Dict]:
    """Get videos within the specified time range sorted by view count (JST-based).

    Args:
        hours: Size of the look-back window, counted back from now.
        limit: Maximum number of videos to return.

    Returns:
        Dicts ordered by ``view_count`` descending with the keys
        ``video_id``, ``title``, ``published_at`` (as stored),
        ``published_at_jst`` (timezone-aware datetime in UTC+9),
        ``view_count``, ``thumbnail_url``, ``detected_person`` (default
        'Not detected'), ``detection_source`` (default '-'),
        ``importance_level`` (default 'Normal'), ``channel_name``,
        ``channel_icon_url`` and ``channel_id``.

    Raises:
        sqlite3.Error: If the query fails.
    """
    jst = timezone(timedelta(hours=9))

    # published_at is stored as a UTC "...Z" string and compared as text, so
    # the cutoff is rendered in that same format.
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).strftime('%Y-%m-%dT%H:%M:%SZ')

    conn = sqlite3.connect('competitor_data.db')
    try:
        rows = conn.execute('''
            SELECT v.video_id, v.title, v.published_at, v.view_count, v.thumbnail_url,
            v.detected_person, v.detection_source, v.importance_level,
            c.channel_name, c.channel_icon_url, v.channel_id
            FROM videos v
            JOIN channels c ON v.channel_id = c.channel_id
            WHERE v.published_at > ?
            ORDER BY v.view_count DESC
            LIMIT ?
        ''', (cutoff, limit)).fetchall()
    finally:
        # Close the handle even when the query raises.
        conn.close()

    videos = []
    for (video_id, title, published_at, view_count, thumbnail_url, detected_person,
         detection_source, importance_level, channel_name, channel_icon_url, channel_id) in rows:

        published_at_utc = datetime.fromisoformat(published_at.replace('Z', '+00:00'))
        if published_at_utc.tzinfo is None:
            # astimezone() would read a naive value as machine-local time.
            published_at_utc = published_at_utc.replace(tzinfo=timezone.utc)

        videos.append({
            'video_id': video_id,
            'title': title,
            'published_at': published_at,
            'published_at_jst': published_at_utc.astimezone(jst),
            'view_count': view_count,
            'thumbnail_url': thumbnail_url,
            'detected_person': detected_person or 'Not detected',
            'detection_source': detection_source or '-',
            'importance_level': importance_level or 'Normal',
            'channel_name': channel_name,
            'channel_icon_url': channel_icon_url,
            'channel_id': channel_id
        })

    return videos
| |
| def generate_recent_videos_html(self, hours: int, limit: int = 50) -> str: |
| """Generate HTML for the recent videos list""" |
| videos = self.get_recent_videos_by_timerange(hours, limit) |
| |
| |
| time_range_text = f"Past {hours} hours" |
| |
| html = f""" |
| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>Recent Videos - {time_range_text}</title> |
| <style> |
| body {{ font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 20px; background-color: #000000 !important; color: #ffffff !important; }} |
| .container {{ max-width: 1200px; margin: 0 auto; background-color: transparent !important; }} |
| h1 {{ color: #ffffff !important; text-align: center; margin-bottom: 30px; }} |
| .stats-info {{ background: rgba(255,255,255,0.03) !important; padding: 15px !important; border-radius: 8px !important; margin-bottom: 20px !important; text-align: center !important; font-size: 16px !important; color: #bcdffb !important; }} |
| .video-item {{ display: flex !important; align-items: flex-start !important; background: #0b0b0b !important; margin-bottom: 15px !important; padding: 15px !important; border-radius: 8px !important; box-shadow: 0 2px 6px rgba(0,0,0,0.6) !important; border-left: 4px solid #222 !important; }} |
| .video-item.Critical {{ border-left-color: #b71c1c !important; background-color: #120202 !important; }} |
| .video-item.Important {{ border-left-color: #bf360c !important; background-color: #241100 !important; }} |
| .thumbnail {{ width: 160px !important; height: 90px !important; object-fit: cover !important; margin-right: 15px !important; border-radius: 4px !important; flex-shrink: 0 !important; }} |
| .video-info {{ flex: 1 !important; }} |
| .video-title {{ font-weight: bold !important; margin-bottom: 8px !important; font-size: 16px !important; line-height: 1.4 !important; color: #ffffff !important; }} |
| .video-meta {{ color: #cfcfcf !important; font-size: 14px !important; line-height: 1.6 !important; margin-bottom: 5px !important; }} |
| .channel-info {{ display: flex !important; align-items: center !important; margin-bottom: 8px !important; }} |
| .channel-icon {{ width: 24px !important; height: 24px !important; border-radius: 50% !important; margin-right: 8px !important; }} |
| .stats {{ color: #64b5f6 !important; font-weight: bold !important; }} |
| .importance-badge {{ display: inline-block !important; padding: 2px 8px !important; border-radius: 12px !important; font-size: 12px !important; font-weight: bold !important; margin-left: 10px !important; }} |
| .importance-badge.Critical {{ background: #b71c1c !important; color: white !important; }} |
| .importance-badge.Important {{ background: #bf360c !important; color: white !important; }} |
| .importance-badge.Normal {{ background: #2e7d32 !important; color: white !important; }} |
| .detection-badge {{ display: inline-block !important; padding: 2px 6px !important; border-radius: 8px !important; font-size: 11px !important; background: #1976d2 !important; color: white !important; margin-left: 5px !important; }} |
| .video-links {{ margin-top: 8px !important; }} |
| .video-links a {{ display: inline-block !important; margin-right: 15px !important; color: #90caf9 !important; text-decoration: none !important; font-size: 14px !important; padding: 4px 8px !important; border: 1px solid #263238 !important; border-radius: 4px !important; transition: background-color 0.3s !important; }} |
| .video-links a:hover {{ background-color: rgba(227,242,253,0.04) !important; }} |
| .no-videos {{ text-align: center !important; padding: 40px !important; color: #bdbdbd !important; font-size: 16px !important; }} |
| </style> |
| </head> |
| <body> |
| <div class="container"> |
| <h1>📺 Recent Videos ({time_range_text} · Sorted by views)</h1> |
| |
| <div class="stats-info"> |
| 📊 Showing {len(videos)} videos (up to {limit}) | 🕐 Times shown in JST | 📈 Sorted by views |
| </div> |
| """ |
| |
| if videos: |
| for video in videos: |
| |
| published_jst = video['published_at_jst'].strftime('%m/%d %H:%M') |
| |
| |
| video_url = f"https://www.youtube.com/watch?v={video['video_id']}" |
| channel_url = f"https://www.youtube.com/channel/{video['channel_id']}" |
| |
| html += f""" |
| <div class="video-item {video['importance_level']}"> |
| <img src="{video['thumbnail_url']}" alt="thumbnail" class="thumbnail"> |
| <div class="video-info"> |
| <div class="video-title">{video['title']}</div> |
| <div class="channel-info"> |
| <img src="{video['channel_icon_url']}" alt="channel" class="channel-icon"> |
| <span>{video['channel_name']}</span> |
| </div> |
| <div class="video-meta"> |
| 📅 {published_jst} (JST) | |
| <span class="stats">👀 {video['view_count']:,} views</span> | |
| 👤 {video['detected_person']} |
| <span class="detection-badge">{video['detection_source']}</span> |
| <span class="importance-badge {video['importance_level']}">{video['importance_level']}</span> |
| </div> |
| <div class="video-links"> |
| <a href="{video_url}" target="_blank">🎬 Watch video</a> |
| <a href="{channel_url}" target="_blank">📺 Channel</a> |
| </div> |
| </div> |
| </div> |
| """ |
| else: |
| html += f""" |
| <div class="no-videos"> |
| 📭 No videos were posted in the {time_range_text}.<br> |
| Try updating data or expanding the time range. |
| </div> |
| """ |
| |
| html += """ |
| </div> |
| </body> |
| </html> |
| """ |
| |
| return html |
| |
| def generate_dashboard(self) -> str: |
| """Generate the HTML dashboard""" |
| trends = self.detect_trends() |
| |
| conn = sqlite3.connect('competitor_data.db') |
| cursor = conn.cursor() |
| |
| |
| cursor.execute(''' |
| SELECT v.title, v.published_at, v.view_count, v.detected_person, |
| v.importance_level, c.channel_name, v.thumbnail_url, v.video_id, |
| v.channel_id, v.detection_source |
| FROM videos v |
| JOIN channels c ON v.channel_id = c.channel_id |
| WHERE v.published_at > ? |
| ORDER BY |
| CASE v.importance_level |
| WHEN 'Critical' THEN 1 |
| WHEN 'Important' THEN 2 |
| ELSE 3 |
| END, |
| v.view_count DESC |
| ''', ((datetime.now() - timedelta(days=7)).isoformat(),)) |
| |
| videos = cursor.fetchall() |
| conn.close() |
| |
| |
| html = """ |
| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <title>YouTube Competitor Analysis Dashboard (Global)</title> |
| <style> |
| body {{ font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 20px; background-color: #000000 !important; color: #ffffff !important; }} |
| .container {{ max-width: 1200px; margin: 0 auto; background-color: transparent !important; }} |
| h1 {{ color: #ffffff !important; text-align: center; margin-bottom: 30px; }} |
| h2 {{ border-bottom: 2px solid rgba(255,255,255,0.06) !important; padding-bottom: 10px !important; }} |
| .section {{ background: #070707 !important; margin-bottom: 30px !important; padding: 20px !important; border-radius: 8px !important; box-shadow: 0 2px 6px rgba(0,0,0,0.6) !important; }} |
| .trend-item {{ border: 1px solid #1f1f1f !important; margin-bottom: 15px !important; padding: 15px !important; border-radius: 5px !important; background: #0b0b0b !important; }} |
| .trend-title {{ font-size: 18px !important; font-weight: bold !important; color: #ff8a80 !important; margin-bottom: 10px !important; }} |
| .trend-meta {{ font-size: 14px !important; color: #bdbdbd !important; margin-bottom: 5px !important; }} |
| .video-item {{ display: flex !important; align-items: flex-start !important; margin-bottom: 15px !important; padding: 10px !important; border-left: 4px solid #222 !important; }} |
| .video-item.Critical {{ border-left-color: #b71c1c !important; background-color: #120202 !important; }} |
| .video-item.Important {{ border-left-color: #bf360c !important; background-color: #241100 !important; }} |
| .thumbnail {{ width: 160px !important; height: 90px !important; object-fit: cover !important; margin-right: 15px !important; border-radius: 4px !important; flex-shrink: 0 !important; }} |
| .video-info {{ flex: 1 !important; }} |
| .video-title {{ font-weight: bold !important; margin-bottom: 5px !important; font-size: 16px !important; color: #ffffff !important; }} |
| .video-meta {{ color: #cfcfcf !important; font-size: 14px !important; line-height: 1.6 !important; }} |
| .importance-badge {{ display: inline-block !important; padding: 2px 8px !important; border-radius: 12px !important; font-size: 12px !important; font-weight: bold !important; margin-left: 10px !important; }} |
| .importance-badge.Critical {{ background: #b71c1c !important; color: white !important; }} |
| .importance-badge.Important {{ background: #bf360c !important; color: white !important; }} |
| .importance-badge.Normal {{ background: #2e7d32 !important; color: white !important; }} |
| .detection-badge {{ display: inline-block !important; padding: 2px 6px !important; border-radius: 8px !important; font-size: 11px !important; background: #1976d2 !important; color: white !important; margin-left: 5px !important; }} |
| .stats {{ color: #64b5f6 !important; font-weight: bold !important; }} |
| .video-links {{ margin-top: 8px !important; }} |
| .video-links a {{ display: inline-block !important; margin-right: 15px !important; color: #90caf9 !important; text-decoration: none !important; font-size: 14px !important; padding: 4px 8px !important; border: 1px solid #263238 !important; border-radius: 4px !important; transition: background-color 0.3s !important; }} |
| .video-links a:hover {{ background-color: rgba(227,242,253,0.04) !important; }} |
| </style> |
| </head> |
| <body> |
| <div class="container"> |
| <h1>🌍 YouTube Competitor Analysis Dashboard (Global)</h1> |
| |
| <div class="section"> |
| <h2>🔥 Currently Trending Clusters</h2> |
| """ |
| |
| if trends: |
| for trend in trends: |
| detection_sources = list(set(trend['detection_sources'])) |
| sources_text = ', '.join(detection_sources) |
| |
| html += f""" |
| <div class="trend-item"> |
| <div class="trend-title">👤 {trend['person_name']}</div> |
| <div class="trend-meta">📺 <strong>{trend['video_count']}</strong> videos posted across <strong>{trend['unique_channels']}</strong> channels</div> |
| <div class="trend-meta">🔍 Detection methods: {sources_text}</div> |
| </div> |
| """ |
| else: |
| html += "<p>There are currently no trending clusters.</p>" |
| |
| html += """ |
| </div> |
| |
| <div class="section"> |
| <h2>📺 Recent Videos (Global person detection)</h2> |
| """ |
| |
| for video in videos[:20]: |
| (title, published_at, view_count, detected_person, importance, |
| channel_name, thumbnail_url, video_id, channel_id, detection_source) = video |
| |
| published_date = datetime.fromisoformat(published_at.replace('Z', '+00:00')).strftime('%m/%d %H:%M') |
| person_display = detected_person if detected_person else "Unknown" |
| |
| |
| video_url = f"https://www.youtube.com/watch?v={video_id}" |
| channel_url = f"https://www.youtube.com/channel/{channel_id}" |
| |
| html += f""" |
| <div class="video-item {importance}"> |
| <img src="{thumbnail_url}" alt="thumbnail" class="thumbnail"> |
| <div class="video-info"> |
| <div class="video-title">{title}</div> |
| <div class="video-meta"> |
| 📺 {channel_name} | 📅 {published_date} | |
| <span class="stats">👀 {view_count:,} views</span> | |
| 👤 {person_display} |
| <span class="detection-badge">{detection_source}</span> |
| <span class="importance-badge {importance}">{importance}</span> |
| </div> |
| <div class="video-links"> |
| <a href="{video_url}" target="_blank">🎬 Watch video</a> |
| <a href="{channel_url}" target="_blank">📺 Channel</a> |
| </div> |
| </div> |
| </div> |
| """ |
| |
| html += """ |
| </div> |
| </div> |
| </body> |
| </html> |
| """ |
| |
| return html |
| |
def generate_channel_management_html(self) -> str:
    """Render the registered-channel list as a standalone HTML page.

    Reads the channels via ``self.get_channels()`` and emits one card per
    channel showing its icon, name, subscriber count, added date and ID,
    plus Edit/Delete buttons. The buttons only open browser dialogs that
    direct the user to the Gradio controls; they do not modify any data.

    Returns:
        The complete HTML document as a string.
    """
    # Function-scope import of the single helper needed for escaping.
    from html import escape

    channels = self.get_channels()

    # This is a plain string (not an f-string), so CSS braces are written
    # singly; doubled braces would be emitted literally and break the rules.
    parts = ["""
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Channel Management (Global)</title>
    <style>
        body { font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 20px; background-color: #000000 !important; color: #ffffff !important; }
        .container { max-width: 800px; margin: 0 auto; background-color: transparent !important; }
        h2 { border-bottom: 2px solid rgba(255,255,255,0.06) !important; padding-bottom: 10px !important; }
        .channel-item { display: flex !important; align-items: center !important; background: #0b0b0b !important; margin-bottom: 15px !important; padding: 15px !important; border-radius: 8px !important; box-shadow: 0 2px 6px rgba(0,0,0,0.6) !important; }
        .channel-icon { width: 48px !important; height: 48px !important; border-radius: 50% !important; margin-right: 15px !important; object-fit: cover !important; }
        .channel-info { flex: 1 !important; }
        .channel-name { font-weight: bold !important; font-size: 16px !important; margin-bottom: 5px !important; color: #ffffff !important; }
        .channel-meta { color: #bdbdbd !important; font-size: 14px !important; }
        .channel-actions { display: flex !important; gap: 10px !important; }
        .btn { padding: 6px 12px !important; border: none !important; border-radius: 4px !important; cursor: pointer !important; font-size: 12px !important; text-decoration: none !important; display: inline-block !important; transition: opacity 0.3s !important; }
        .btn-edit { background: #1976d2 !important; color: white !important; border: 1px solid #263238 !important; }
        .btn-delete { background: #b71c1c !important; color: white !important; border: 1px solid #331111 !important; }
        .btn:hover { opacity: 0.9 !important; }
    </style>
</head>
<body>
    <div class="container">
        <h2>🌍 Registered Channels (Global person detection)</h2>
"""]

    if channels:
        for channel in channels:
            raw_added = channel['added_date']
            try:
                added_date = datetime.fromisoformat(raw_added).strftime('%Y/%m/%d')
            except (TypeError, ValueError):
                # Show whatever is stored rather than failing the whole page.
                added_date = escape(str(raw_added)) if raw_added else "Unknown"

            subscriber_count = channel['subscriber_count']
            subscriber_text = f"{subscriber_count:,} subscribers" if subscriber_count else "Private"

            # Channel fields are interpolated into markup, so escape them for
            # HTML text / attribute context.
            channel_id = escape(str(channel['id']))
            name = escape(str(channel['name']))
            icon_url = escape(str(channel['icon_url'] or ''))

            # For the onclick handlers, encode each value as a JS string
            # literal first (json.dumps), then escape that literal for the
            # surrounding double-quoted HTML attribute.
            js_id = escape(json.dumps(str(channel['id'])))
            js_name = escape(json.dumps(str(channel['name'])))

            parts.append(f"""
        <div class="channel-item">
            <img src="{icon_url}" alt="icon" class="channel-icon">
            <div class="channel-info">
                <div class="channel-name">{name}</div>
                <div class="channel-meta">
                    👥 Subscribers: {subscriber_text} | 📅 Added: {added_date}
                </div>
                <div class="channel-meta">
                    🆔 {channel_id}
                </div>
            </div>
            <div class="channel-actions">
                <button class="btn btn-edit" onclick="editChannel({js_id}, {js_name})">✏️ Edit</button>
                <button class="btn btn-delete" onclick="deleteChannel({js_id})">🗑️ Delete</button>
            </div>
        </div>
""")
    else:
        parts.append("<p>No channels registered.</p>")

    # Plain string again: the JS braces are single, and "\\n" yields the
    # two-character escape \n inside the JavaScript string literals.
    parts.append("""
        <script>
            function editChannel(channelId, currentName) {
                const newName = prompt('Enter new channel name:', currentName);
                if (newName && newName !== currentName) {
                    alert('Channel name updates must be performed from the Gradio interface.');
                }
            }

            function deleteChannel(channelId) {
                if (confirm('Are you sure you want to delete this channel?\\nRelated video data will also be removed.')) {
                    alert('Channel deletion must be performed from the Gradio interface.\\nChannel ID: ' + channelId);
                }
            }
        </script>
    </div>
</body>
</html>
""")

    return "".join(parts)
|
|
| |
# Shared analyzer instance used by the interface functions below.
# Constructing it runs YouTubeCompetitorAnalyzer.__init__, which calls
# init_database(), so the database setup happens at import time.
analyzer = YouTubeCompetitorAnalyzer()


# Stylesheet passed to gr.Blocks(css=...) below. It forces a black background
# and white text on the Gradio container, form controls, and the badge classes
# used by the generated HTML; every rule is marked !important.
DARK_THEME_CSS = """
:root, html, body, .gradio-container {
    background-color: #000000 !important;
    color: #ffffff !important;
    color-scheme: dark !important;
}
.gradio-container, .gradio-container * {
    background-color: transparent !important;
    color: #ffffff !important;
    border-color: #333333 !important;
}
/* Inputs, buttons and textareas */
button, .gr-button, input, textarea, select, .gradio-textbox, .gradio-file, .gradio-dropdown, .gradio-button {
    background-color: #0b0b0b !important;
    color: #ffffff !important;
    border: 1px solid #333333 !important;
}
input::placeholder, textarea::placeholder {
    color: #bfbfbf !important;
}
.gradio-markdown, .gradio-html, .gradio-label, .gradio-textbox, .gradio-output {
    color: #ffffff !important;
}
/* Ensure components that Gradio or Spaces might wrap still show dark backgrounds */
.gradio-container .container, .gradio-container .section, .gradio-container .card {
    background-color: #000000 !important;
    color: #ffffff !important;
}
/* Give high contrast to borders and badges */
.importance-badge, .detection-badge {
    color: #ffffff !important;
}
"""
|
|
| |
def add_channel_interface(channel_ids_text):
    """Register every channel ID listed in the text, one ID per line.

    Blank lines and surrounding whitespace are ignored. Each remaining ID is
    passed to ``analyzer.add_channel`` and the per-channel result messages are
    returned joined by newlines. A prompt message is returned instead when
    the input is empty or contains no usable ID.
    """
    if not channel_ids_text:
        return "Please enter channel ID"

    # Trim each line first, then drop the ones that end up empty.
    trimmed_lines = (line.strip() for line in channel_ids_text.split('\n'))
    channel_ids = [line for line in trimmed_lines if line]
    if not channel_ids:
        return "No valid channel IDs provided."

    outcome_messages = [analyzer.add_channel(cid) for cid in channel_ids]
    return "\n".join(outcome_messages)
|
|
def delete_channel_interface(channel_id):
    """Delete the channel with the given ID via the shared analyzer.

    The ID is stripped before validation, so empty, ``None`` and
    whitespace-only input all return the prompt message instead of being
    forwarded as an empty ID.

    Returns:
        The prompt message for missing input, otherwise whatever
        ``analyzer.delete_channel`` returns.
    """
    cleaned_id = (channel_id or "").strip()
    if not cleaned_id:
        return "Please enter a channel ID to delete"
    return analyzer.delete_channel(cleaned_id)
|
|
def update_channel_name_interface(channel_id, new_name):
    """Rename a channel via the shared analyzer.

    Both values are stripped before validation, so a whitespace-only ID or
    name is rejected with the prompt message rather than being stored as an
    empty string.

    Returns:
        The prompt message when either value is missing, otherwise whatever
        ``analyzer.update_channel_name`` returns.
    """
    cleaned_id = (channel_id or "").strip()
    cleaned_name = (new_name or "").strip()
    if not cleaned_id or not cleaned_name:
        return "Please enter channel ID and new name"
    return analyzer.update_channel_name(cleaned_id, cleaned_name)
|
|
def update_data_interface():
    """Run the analyzer's full data update and return its result for display."""
    update_summary = analyzer.update_all_data()
    return update_summary
|
|
def show_dashboard():
    """Return the dashboard markup produced by the shared analyzer."""
    dashboard_markup = analyzer.generate_dashboard()
    return dashboard_markup
|
|
def show_channel_management():
    """Return the registered-channel list markup from the shared analyzer."""
    channel_list_markup = analyzer.generate_channel_management_html()
    return channel_list_markup
|
|
def show_recent_videos_interface(hours_selection, limit_selection):
    """Convert the dropdown labels to numbers and render the recent videos.

    Labels of the form "<N> hours" (6, 12, 24, 48) and "<N> items"
    (20, 50, 100, 200) are recognized; anything else falls back to
    24 hours and 50 items respectively.
    """
    hours_by_label = {f"{value} hours": value for value in (6, 12, 24, 48)}
    limit_by_label = {f"{value} items": value for value in (20, 50, 100, 200)}

    return analyzer.generate_recent_videos_html(
        hours_by_label.get(hours_selection, 24),
        limit_by_label.get(limit_selection, 50),
    )
|
|
| |
# Gradio UI definition: four tabs (Dashboard, Recent Videos, Data Update,
# Channel Management), each wiring its buttons to the interface functions
# defined above.
# NOTE(review): the nesting of the `with` contexts below should be confirmed
# against the intended layout.
with gr.Blocks(title="YouTube Competitor Analysis (Global)", theme=gr.themes.Monochrome(), css=DARK_THEME_CSS) as app:
    gr.Markdown("# 🌍 YouTube Competitor Analysis App (Global)")
    gr.Markdown("Analyze competitor channel uploads and detect global clustered trends using **Gemini 2.5 Flash**.")

    with gr.Tab("📊 Dashboard"):
        gr.Markdown("## Global Analysis Dashboard")

        refresh_btn = gr.Button("📈 Refresh Dashboard", variant="secondary")
        dashboard_html = gr.HTML()

        # Manual refresh of the dashboard HTML.
        refresh_btn.click(show_dashboard, inputs=[], outputs=[dashboard_html])

        # Also render the dashboard once when the app is loaded.
        app.load(show_dashboard, inputs=[], outputs=[dashboard_html])

    with gr.Tab("📺 Recent Videos"):
        gr.Markdown("## 📺 Recent Videos (select time range and max items)")
        gr.Markdown("""
### Features
- ⏰ Time range selection: choose between 6 to 48 hours
- 📊 Sorted by view count: show the highest-view videos first
- 🕐 Times displayed in JST (UTC+9)
- 🔢 Max items: limit display between 20 and 200
- 🌍 Global detection: detect notable people worldwide
        """)

        # The choice labels must match the keys that
        # show_recent_videos_interface maps to numbers.
        with gr.Row():
            hours_dropdown = gr.Dropdown(
                choices=["6 hours", "12 hours", "24 hours", "48 hours"],
                value="24 hours",
                label="⏰ Time Range"
            )
            limit_dropdown = gr.Dropdown(
                choices=["20 items", "50 items", "100 items", "200 items"],
                value="50 items",
                label="🔢 Max items"
            )

        update_recent_btn = gr.Button("🔄 Update Recent Videos", variant="primary", size="lg")
        recent_videos_html = gr.HTML()

        update_recent_btn.click(
            show_recent_videos_interface,
            inputs=[hours_dropdown, limit_dropdown],
            outputs=[recent_videos_html]
        )

        # Initial render on app load, using fixed State values equal to the
        # dropdowns' default selections.
        app.load(
            show_recent_videos_interface,
            inputs=[gr.State("24 hours"), gr.State("50 items")],
            outputs=[recent_videos_html]
        )

    with gr.Tab("🔄 Data Update"):
        gr.Markdown("## 🌍 Global AI High-Precision Data Update System")
        gr.Markdown("""
### 🎯 Person extraction using Gemini 2.5 Flash (priority order)
1. **🤖 Gemini title analysis** - high-precision extraction of notable people from titles
2. **🤖 Gemini description analysis** - detect people via hashtags and description
3. **🏷️ Tag analysis** - identify multilingual person names from tags
4. **🖼️ Thumbnail OCR** - read text from thumbnails
5. **👤 Face recognition** - identify persons via face recognition

**🌍 Global support**: detects people regardless of nationality, era, or field
        """)

        # API key entry; password-type textboxes mask the typed value.
        with gr.Row():
            youtube_key_input = gr.Textbox(label="YouTube API Key", placeholder="Enter YouTube API key", type="password")
            gemini_key_input = gr.Textbox(label="Gemini API Key", placeholder="Enter Gemini API key", type="password")

        apply_keys_btn = gr.Button("Apply API Keys", variant="secondary")
        api_keys_result = gr.Textbox(label="API Key Status", interactive=False)

        # State slots that receive the second and third return values of
        # set_api_keys (the applied keys). Nothing in this UI definition
        # reads them back.
        youtube_key_state = gr.State("")
        gemini_key_state = gr.State("")

        # set_api_keys returns (status message, YouTube key, Gemini key).
        apply_keys_btn.click(
            set_api_keys,
            inputs=[youtube_key_input, gemini_key_input],
            outputs=[api_keys_result, youtube_key_state, gemini_key_state]
        )

        update_btn = gr.Button("🌍 Start Global Data Update", variant="primary", size="lg")
        update_result = gr.Textbox(label="Update Result", interactive=False)

        update_btn.click(update_data_interface, inputs=[], outputs=[update_result])

    with gr.Tab("📺 Channel Management"):
        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("## Add Channels")

                # Multi-line input: add_channel_interface splits it on newlines.
                channel_input = gr.TextArea(
                    label="Channel ID (one per line)",
                    placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx\nUCyyyyyyyyyyyyyyyyyyyyyyyy",
                    info="Enter multiple YouTube channel IDs separated by newlines"
                )
                add_btn = gr.Button("Add Channels", variant="primary")
                add_result = gr.Textbox(label="Result", interactive=False)

                add_btn.click(add_channel_interface, inputs=[channel_input], outputs=[add_result])

            with gr.Column(scale=2):
                gr.Markdown("## Delete Channel")
                delete_channel_input = gr.Textbox(
                    label="Channel ID to delete",
                    placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx"
                )
                delete_btn = gr.Button("Delete Channel", variant="stop")
                delete_result = gr.Textbox(label="Delete Result", interactive=False)

                delete_btn.click(delete_channel_interface, inputs=[delete_channel_input], outputs=[delete_result])

        with gr.Row():
            with gr.Column():
                gr.Markdown("## Edit Channel Name")
                edit_channel_id = gr.Textbox(label="Channel ID to edit", placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx")
                new_channel_name = gr.Textbox(label="New channel name", placeholder="Enter new name")
                update_name_btn = gr.Button("Update Name", variant="secondary")
                update_name_result = gr.Textbox(label="Update Result", interactive=False)

                update_name_btn.click(
                    update_channel_name_interface,
                    inputs=[edit_channel_id, new_channel_name],
                    outputs=[update_name_result]
                )

        # The list is only re-rendered by this button or on app load; the
        # add/delete/rename handlers above do not output to it.
        gr.Markdown("## Registered Channels")
        channel_list_html = gr.HTML()
        refresh_channels_btn = gr.Button("Refresh list", variant="secondary")
        refresh_channels_btn.click(show_channel_management, inputs=[], outputs=[channel_list_html])

        # Populate the channel list once when the app is loaded.
        app.load(show_channel_management, inputs=[], outputs=[channel_list_html])


# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    app.launch()