junaidbashir392's picture
Update app.py
32a19cc verified
import gradio as gr
import sqlite3
import json
import requests
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional
import google.generativeai as genai
from googleapiclient.discovery import build
import pandas as pd
import re
from collections import defaultdict
import base64
# API keys are deliberately NOT auto-loaded from the environment; the user supplies them through the UI.
# These module-level slots start out empty and are filled in by set_api_keys(), so that
# the other functions in this module can reference the keys and clients afterwards.
YOUTUBE_API_KEY: Optional[str] = None  # last YouTube Data API key applied, or None
GEMINI_API_KEY: Optional[str] = None  # last Gemini API key applied, or None
model = None  # Gemini GenerativeModel once a Gemini key has been applied; None until then
youtube = None  # YouTube Data API client once a YouTube key has been applied; None until then
def set_api_keys(youtube_key: Optional[str], gemini_key: Optional[str]) -> tuple[str, str, str]:
    """Apply API keys provided by the user at runtime.

    Configures the Gemini client and the YouTube Data API client so the rest
    of the app uses the provided keys instead of environment variables.

    Args:
        youtube_key: YouTube Data API key, or None/blank to leave it untouched.
        gemini_key: Gemini API key, or None/blank to leave it untouched.

    Returns:
        A ``(status, youtube_key, gemini_key)`` tuple. ``status`` is one
        message per key that was processed, joined by newlines; the two key
        slots hold the keys currently applied (empty string when none), so
        the UI can store them in state.
    """
    global YOUTUBE_API_KEY, GEMINI_API_KEY, model, youtube

    # Keys pasted into a text box often carry stray whitespace; a
    # whitespace-only value must count as "not provided" rather than being
    # handed to the clients as if it were a real key.
    gemini_key = (gemini_key or "").strip()
    youtube_key = (youtube_key or "").strip()

    if not gemini_key and not youtube_key:
        # Return status and empty keys
        return "No API keys provided.", "", ""

    messages = []

    # Configure Gemini (Generative AI)
    if gemini_key:
        try:
            genai.configure(api_key=gemini_key)
            model = genai.GenerativeModel('gemini-2.5-flash')
            GEMINI_API_KEY = gemini_key
            messages.append("Gemini API key applied successfully.")
        except Exception as e:
            messages.append(f"Failed to apply Gemini API key: {e}")

    # Configure YouTube Data API
    if youtube_key:
        try:
            youtube = build('youtube', 'v3', developerKey=youtube_key)
            YOUTUBE_API_KEY = youtube_key
            messages.append("YouTube API key applied successfully.")
        except Exception as e:
            messages.append(f"Failed to apply YouTube API key: {e}")

    # Return status plus the applied keys so the UI can store them in state
    return "\n".join(messages), YOUTUBE_API_KEY or "", GEMINI_API_KEY or ""
class YouTubeCompetitorAnalyzer:
def __init__(self) -> None:
    """Set up the analyzer by running init_database()."""
    self.init_database()
def init_database(self):
    """Create the SQLite tables used by the analyzer if they do not exist.

    Opens ``competitor_data.db`` in the current working directory and issues
    ``CREATE TABLE IF NOT EXISTS`` for the ``channels``, ``videos`` and
    ``trends`` tables, so calling this repeatedly is harmless. The connection
    is always closed, even when a statement fails.
    """
    schema_statements = (
        # Channel table (includes the last_updated_at column)
        '''
        CREATE TABLE IF NOT EXISTS channels (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            channel_id TEXT UNIQUE,
            channel_name TEXT,
            channel_icon_url TEXT,
            subscriber_count INTEGER,
            added_date TEXT,
            last_updated_at TEXT
        )
        ''',
        # Video data table (includes description and tags)
        '''
        CREATE TABLE IF NOT EXISTS videos (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            video_id TEXT UNIQUE,
            channel_id TEXT,
            title TEXT,
            description TEXT,
            tags TEXT,
            published_at TEXT,
            view_count INTEGER,
            thumbnail_url TEXT,
            detected_person TEXT,
            detection_source TEXT,
            importance_level TEXT,
            created_at TEXT
        )
        ''',
        # Trend clusters table
        '''
        CREATE TABLE IF NOT EXISTS trends (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            person_name TEXT,
            video_ids TEXT,
            trend_date TEXT,
            is_active BOOLEAN
        )
        ''',
    )

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Release the file handle even if one of the statements raised.
        conn.close()
def extract_person_from_title_with_gemini(self, title: str) -> Optional[str]:
    """Extract a person's name from the title using Gemini (global, highest priority).

    Args:
        title: The YouTube video title to analyze.

    Returns:
        The cleaned name reported by Gemini when it passes the name-shape
        check, otherwise None. None is also returned when the title is
        blank, no Gemini model is configured, or the request fails.
    """
    # Check the title first: a blank title cannot contain a name, so there is
    # no point spending an API call on it.
    if not title or not title.strip() or not model:
        return None
    try:
        prompt = f"""
Please extract a single famous person's name (historical or contemporary) from this YouTube title.
Title: "{title}"
Target:
Globally well-known individuals (no restriction on nationality, era, or field)
- People from any country or region worldwide
- From ancient to modern times
- Any field: politics, business, philosophy, literature, science, arts, religion, sports, etc.
- Real historical or contemporary figures
Criteria:
- Widely known at a general-knowledge level
- Frequently mentioned in books, education, or media
- Identifiable as a specific real person by proper name
Response format:
- If a matching person exists: return the person's name only (in Japanese)
- If none: return "なし"
- If multiple apply: return the single most relevant person
Note: Do not restrict by nationality, era, or field. Consider notable people worldwide.
Examples:
"The secret of innovation by Steve Jobs" -> Steve Jobs
"Learning leadership from Confucius" -> Confucius
"Introduction to Einstein's theory of relativity" -> Einstein
"Konosuke Matsushita on business philosophy" -> Konosuke Matsushita
"General success tips" -> none
"""
        response = model.generate_content(prompt)
        raw_answer = (response.text or "").strip()
        # Strip brackets and control characters BEFORE testing for the
        # "no person" sentinels; otherwise a wrapped reply such as 「なし」
        # survives the sentinel check and is then accepted as a name.
        candidate = re.sub(r'[「」『』【】|\n\r\t]', '', raw_answer).strip()
        if not candidate or candidate.lower() in ('なし', 'none', '該当なし', '不明'):
            return None
        # Check global name pattern (2-15 chars: supports Japanese, English, Chinese, etc.)
        # NOTE(review): the 15-character cap rejects longer romanized names,
        # including this prompt's own "Konosuke Matsushita" example - confirm
        # whether that limit is intended.
        global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$'
        if re.match(global_name_pattern, candidate):
            return candidate
        return None
    except Exception as e:
        # Best effort: any API or parsing failure just means "no detection".
        print(f"Gemini global title parsing error: {e}")
        return None
def extract_person_from_description_with_gemini(self, description: str) -> Optional[str]:
    """Extract a person's name from the video description using Gemini.

    Only the first 500 characters of the description are sent to the model.

    Args:
        description: The video description text.

    Returns:
        The cleaned name reported by Gemini when it passes the name-shape
        check, otherwise None. None is also returned when the description is
        missing or shorter than 10 characters after stripping, no Gemini
        model is configured, or the request fails.
    """
    if not description or len(description.strip()) < 10 or not model:
        return None
    try:
        # Slicing already copes with strings shorter than the limit.
        desc_excerpt = description[:500]
        prompt = f"""
Please extract a single famous person's name (historical or contemporary) from this YouTube video's description.
Description excerpt: "{desc_excerpt}"
Target:
Globally well-known individuals (no restriction on nationality, era, or field)
Criteria:
- Widely known at a general-knowledge level
- Frequently mentioned in books, education, or media
- Identifiable as a specific real person by proper name
Response format:
- If a matching person exists: return the person's name only (in Japanese)
- If none: return "なし"
- If multiple apply: return the single most relevant person
- Hashtags (e.g., #SteveJobs, #Confucius) should also be considered
Note: Do not restrict by nationality, era, or field. Consider notable people worldwide.
"""
        response = model.generate_content(prompt)
        raw_answer = (response.text or "").strip()
        # Clean first, then test the "no person" sentinels, so that replies
        # like 「なし」 or #none are not mistaken for a name.
        candidate = re.sub(r'[「」『』【】|#\n\r\t]', '', raw_answer).strip()
        if not candidate or candidate.lower() in ('なし', 'none', '該当なし', '不明'):
            return None
        # Check global name pattern
        global_name_pattern = r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$'
        if re.match(global_name_pattern, candidate):
            return candidate
        return None
    except Exception as e:
        # Best effort: any API or parsing failure just means "no detection".
        print(f"Gemini global description parsing error: {e}")
        return None
def extract_person_from_tags(self, tags: List[str]) -> Optional[str]:
    """Extract a person's name from tags (global, priority 3).

    Returns the first tag that looks like a name: 2-15 characters drawn from
    the Japanese, Chinese, Korean and Latin ranges (plus spaces and middle
    dots) and not one of the generic topic words listed below.

    Args:
        tags: The video's tag strings; may be empty or None.

    Returns:
        The first matching tag with surrounding whitespace removed, or None.
    """
    if not tags:
        return None

    # Define global name pattern (supports Japanese, English, Chinese, Korean, etc.)
    global_name_pattern = re.compile(
        r'^[\u4E00-\u9FAF\u3040-\u309F\u30A0-\u30FF\uAC00-\uD7AF\u0041-\u005A\u0061-\u007A\u00C0-\u017F\u0100-\u024F\s\u30FB\u00B7\u2022]{2,15}$'
    )
    # Overly generic words that must never be reported as a person. Built
    # once, outside the loop, and compared case-insensitively.
    excluded_words = frozenset(
        word.lower()
        for word in (
            '動画', '投稿', '更新', '配信', '人生', '経営', '仕事', '成功', '失敗',
            'video', 'life', 'business', 'success', 'leadership', 'philosophy',
            'motivation', 'inspiration', 'education', 'training', 'coach',
        )
    )

    # Look for something that resembles a person's name among the tags.
    for tag in tags:
        if not isinstance(tag, str):
            continue
        # The pattern allows whitespace, so an all-blank tag would otherwise
        # match and be returned as a "name"; strip before testing.
        candidate = tag.strip()
        if not candidate:
            continue
        if global_name_pattern.match(candidate) and candidate.lower() not in excluded_words:
            return candidate
    return None
def analyze_thumbnail_ocr(self, thumbnail_url: str) -> Optional[str]:
    """Thumbnail OCR analysis (priority 4).

    Downloads the thumbnail, asks Gemini to read its text and list any person
    names as JSON, and returns the first usable name.

    Args:
        thumbnail_url: URL of the thumbnail image.

    Returns:
        The first non-blank name in the model's ``person_names`` list, or
        None when the URL is empty, no Gemini model is configured, the
        download or request fails, or the reply is not the expected JSON.
    """
    # An empty URL can only fail inside requests; skip the round trip.
    if not thumbnail_url or not model:
        return None
    try:
        http_response = requests.get(thumbnail_url, timeout=10)
        # Do not forward an HTTP error page to the model as if it were an image.
        http_response.raise_for_status()
        image_data = base64.b64encode(http_response.content).decode()
        prompt = """
Extract text from this YouTube thumbnail image.
Pay special attention to names of famous individuals (worldwide, historical or modern).
Reply in JSON using the following format:
{
"detected_text": "All text read by OCR",
"person_names": ["List of extracted person names"]
}
"""
        image_part = {
            "mime_type": "image/jpeg",
            "data": image_data
        }
        response = model.generate_content([prompt, image_part])
        result_text = response.text
        # Extract JSON: the model may wrap its answer in a fenced json block.
        json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL)
        if json_match:
            result_text = json_match.group(1)
        try:
            result = json.loads(result_text)
        except json.JSONDecodeError:
            return None
        # The reply is model-generated, so verify its shape before indexing.
        if not isinstance(result, dict):
            return None
        person_names = result.get('person_names') or []
        if not isinstance(person_names, list):
            return None
        for name in person_names:
            if isinstance(name, str) and name.strip():
                return name.strip()
        return None
    except Exception as e:
        # Best effort: any download or API failure just means "no detection".
        print(f"Thumbnail OCR analysis error: {e}")
        return None
def analyze_thumbnail_face_recognition(self, thumbnail_url: str) -> Optional[str]:
    """Thumbnail face recognition (priority 5).

    Downloads the thumbnail and asks Gemini to identify the person shown,
    expecting a JSON reply with a ``person_name`` field.

    Args:
        thumbnail_url: URL of the thumbnail image.

    Returns:
        The identified name, or None when the URL is empty, no Gemini model
        is configured, the download or request fails, the reply is not the
        expected JSON, or the model reports that it does not know.
    """
    # An empty URL can only fail inside requests; skip the round trip.
    if not thumbnail_url or not model:
        return None
    try:
        http_response = requests.get(thumbnail_url, timeout=10)
        # Do not forward an HTTP error page to the model as if it were an image.
        http_response.raise_for_status()
        image_data = base64.b64encode(http_response.content).decode()
        prompt = """
Identify the person shown in this image.
Consider famous people worldwide, including historical figures, philosophers, business leaders, writers, and scientists.
Only return a person's name if you can identify them with confidence.
If unknown, return null.
Respond in JSON:
{
"person_name": "Identified person name or null"
}
"""
        image_part = {
            "mime_type": "image/jpeg",
            "data": image_data
        }
        response = model.generate_content([prompt, image_part])
        result_text = response.text
        # The model may wrap its answer in a fenced json block.
        json_match = re.search(r'```json\n(.*?)\n```', result_text, re.DOTALL)
        if json_match:
            result_text = json_match.group(1)
        try:
            result = json.loads(result_text)
        except json.JSONDecodeError:
            return None
        if not isinstance(result, dict):
            return None
        person_name = result.get('person_name')
        if not isinstance(person_name, str):
            return None
        person_name = person_name.strip()
        # The prompt's template quotes "... or null", so the model may answer
        # with the *string* "null" rather than JSON null; treat that (and
        # similar placeholders) as "not identified".
        if not person_name or person_name.lower() in ('null', 'none', 'unknown'):
            return None
        return person_name
    except Exception as e:
        # Best effort: any download or API failure just means "no detection".
        print(f"Face recognition analysis error: {e}")
        return None
def extract_person_comprehensive(self, video_data: Dict) -> tuple[Optional[str], str]:
    """Run every person detector in priority order and report the first hit.

    The detectors are tried in this order: Gemini on the title, Gemini on the
    description, tag matching, thumbnail OCR, thumbnail face recognition.
    Later detectors are only invoked when all earlier ones found nothing.

    Args:
        video_data: Video record; the keys ``title``, ``description``,
            ``tags`` and ``thumbnail_url`` are read, each optional.

    Returns:
        ``(person, source)`` where ``source`` names the detector that
        produced the hit, or ``(None, "Not detected")`` when none did.
    """
    thumbnail = video_data.get('thumbnail_url', '')
    # (detector, its input, label reported on success) in priority order.
    detectors = (
        (self.extract_person_from_title_with_gemini, video_data.get('title', ''), "Gemini-GlobalTitle"),
        (self.extract_person_from_description_with_gemini, video_data.get('description', ''), "Gemini-GlobalDescription"),
        (self.extract_person_from_tags, video_data.get('tags', []), "GlobalTag"),
        (self.analyze_thumbnail_ocr, thumbnail, "ThumbnailOCR"),
        (self.analyze_thumbnail_face_recognition, thumbnail, "FaceRecognition"),
    )
    for detect, payload, source_label in detectors:
        found = detect(payload)
        if found:
            return found, source_label
    return None, "Not detected"
def add_channel(self, channel_id: str) -> str:
    """Add a channel, or refresh its stored details if already registered.

    Looks the channel up through the YouTube Data API and stores its name,
    icon URL and subscriber count in the ``channels`` table.

    Args:
        channel_id: The YouTube channel ID to register.

    Returns:
        A human-readable status message. Failures are reported in the
        message rather than raised.
    """
    if not youtube:
        return "YouTube API key is not set."
    # IDs pasted from a browser often carry stray whitespace.
    channel_id = (channel_id or "").strip()
    try:
        # Retrieve channel info
        response = youtube.channels().list(
            part='snippet,statistics',
            id=channel_id
        ).execute()
        items = response.get('items') or []
        if not items:
            return f"ID: {channel_id} - Channel not found"
        channel_info = items[0]
        channel_name = channel_info['snippet']['title']
        channel_icon = channel_info['snippet']['thumbnails']['default']['url']
        subscriber_count = int(channel_info['statistics'].get('subscriberCount', 0))

        conn = sqlite3.connect('competitor_data.db')
        try:
            cursor = conn.cursor()
            # Update in place when the channel already exists. INSERT OR
            # REPLACE would delete the old row first, which resets
            # last_updated_at to NULL (forcing a full re-fetch window on the
            # next update) and assigns a new row id.
            cursor.execute('''
                UPDATE channels
                SET channel_name = ?, channel_icon_url = ?, subscriber_count = ?
                WHERE channel_id = ?
            ''', (channel_name, channel_icon, subscriber_count, channel_id))
            if cursor.rowcount == 0:
                cursor.execute('''
                    INSERT INTO channels
                    (channel_id, channel_name, channel_icon_url, subscriber_count, added_date)
                    VALUES (?, ?, ?, ?, ?)
                ''', (channel_id, channel_name, channel_icon, subscriber_count, datetime.now().isoformat()))
            conn.commit()
        finally:
            # Always release the database handle, including on errors.
            conn.close()
        return f"Channel '{channel_name}' added"
    except Exception as e:
        return f"ID: {channel_id} - Error: {str(e)}"
def delete_channel(self, channel_id: str) -> str:
    """Delete a channel together with its stored videos.

    Args:
        channel_id: The YouTube channel ID to remove.

    Returns:
        ``"Channel '<name>' deleted"`` on success, ``"Channel not found"``
        when the ID is not registered, or ``"Deletion error: ..."`` if the
        database operation fails.
    """
    try:
        conn = sqlite3.connect('competitor_data.db')
        try:
            cursor = conn.cursor()
            # Retrieve channel name (needed for the confirmation message)
            cursor.execute('SELECT channel_name FROM channels WHERE channel_id = ?', (channel_id,))
            row = cursor.fetchone()
            if not row:
                return "Channel not found"
            channel_name = row[0]
            # Delete channel and related video data
            cursor.execute('DELETE FROM videos WHERE channel_id = ?', (channel_id,))
            cursor.execute('DELETE FROM channels WHERE channel_id = ?', (channel_id,))
            conn.commit()
        finally:
            # Close on every path, including when a statement raises.
            conn.close()
        return f"Channel '{channel_name}' deleted"
    except Exception as e:
        return f"Deletion error: {str(e)}"
def update_channel_name(self, channel_id: str, new_name: str) -> str:
    """Update the stored display name of a channel.

    Args:
        channel_id: The YouTube channel ID whose name should change.
        new_name: The name to store.

    Returns:
        ``"Channel name updated to '<name>'"`` on success, ``"Channel not
        found"`` when no row matches, or ``"Update error: ..."`` if the
        database operation fails.
    """
    try:
        conn = sqlite3.connect('competitor_data.db')
        try:
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE channels
                SET channel_name = ?
                WHERE channel_id = ?
            ''', (new_name, channel_id))
            # rowcount is 0 when no channel has this ID; nothing to commit.
            if cursor.rowcount == 0:
                return "Channel not found"
            conn.commit()
        finally:
            # Close on every path, including when a statement raises.
            conn.close()
        return f"Channel name updated to '{new_name}'"
    except Exception as e:
        return f"Update error: {str(e)}"
def get_channels(self) -> List[Dict]:
    """Get the list of registered channels, most recently added first.

    Returns:
        One dict per channel with the keys ``id``, ``name``, ``icon_url``,
        ``subscriber_count``, ``added_date`` and ``last_updated_at``.
    """
    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT channel_id, channel_name, channel_icon_url, subscriber_count, added_date, last_updated_at
            FROM channels
            ORDER BY added_date DESC
        ''')
        rows = cursor.fetchall()
    finally:
        # Release the handle even if the query fails (e.g. missing table).
        conn.close()

    # Output keys, in the same order as the selected columns.
    keys = ('id', 'name', 'icon_url', 'subscriber_count', 'added_date', 'last_updated_at')
    return [dict(zip(keys, row)) for row in rows]
def fetch_videos_from_channel(self, channel_id: str, since_date: Optional[str] = None) -> List[Dict]:
    """Fetch videos from a channel published since the specified date.

    Runs one search request (up to 50 results, newest first) and one
    follow-up details request for the statistics, description and tags.
    Only that first page of search results is requested.

    Args:
        channel_id: The YouTube channel ID to query.
        since_date: ISO-8601 timestamp; only videos published after it are
            returned. A value without timezone information is treated as
            UTC. Defaults to seven days before now.

    Returns:
        A list of dicts with the keys ``video_id``, ``title``,
        ``description``, ``tags``, ``published_at``, ``view_count`` and
        ``thumbnail_url``. An empty list is returned when no YouTube client
        is configured or when any request fails.
    """
    if not youtube:
        return []
    try:
        # Default to the past 7 days when no since_date is given.
        if not since_date:
            since_date_dt = datetime.now(timezone.utc) - timedelta(days=7)
        else:
            # Accept a trailing "Z" on every Python version.
            since_date_dt = datetime.fromisoformat(since_date.replace('Z', '+00:00'))
            if since_date_dt.tzinfo is None:
                # A naive value would otherwise be sent without any timezone
                # designator, which is not a valid RFC 3339 timestamp.
                since_date_dt = since_date_dt.replace(tzinfo=timezone.utc)
        # Convert to the format the YouTube API expects: UTC, whole seconds,
        # "Z" suffix. Dropping the fraction only moves the cutoff earlier.
        published_after = since_date_dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

        response = youtube.search().list(
            part='snippet',
            channelId=channel_id,
            maxResults=50,
            order='date',
            publishedAfter=published_after,
            type='video'
        ).execute()

        videos = []
        video_ids = [item['id']['videoId'] for item in response.get('items', [])]
        # Fetch per-video details (view count, description, tags, ...).
        if video_ids:
            video_details = youtube.videos().list(
                part='statistics,snippet',
                id=','.join(video_ids)
            ).execute()
            for item in video_details.get('items', []):
                snippet = item['snippet']
                thumbnails = snippet.get('thumbnails', {})
                # Prefer the "high" rendition but fall back to smaller ones,
                # so a single video without it does not abort the batch.
                thumbnail_url = next(
                    (
                        thumbnails[size]['url']
                        for size in ('high', 'medium', 'default')
                        if thumbnails.get(size, {}).get('url')
                    ),
                    ''
                )
                videos.append({
                    'video_id': item['id'],
                    'title': snippet['title'],
                    'description': snippet.get('description', ''),
                    'tags': snippet.get('tags', []),
                    'published_at': snippet['publishedAt'],
                    'view_count': int(item.get('statistics', {}).get('viewCount', 0)),
                    'thumbnail_url': thumbnail_url
                })
        return videos
    except Exception as e:
        # Best effort: callers receive an empty list on any API failure.
        print(f"Video fetch error: {e}")
        return []
def determine_importance(self, video_data: Dict) -> str:
    """Determine the importance level of a video from its age and views.

    Args:
        video_data: Video record; ``published_at`` (ISO-8601 string) and
            ``view_count`` (int) are read. A timestamp without timezone
            information is treated as UTC.

    Returns:
        ``"Critical"`` for at least 10,000 views within 24 hours of
        publication, ``"Important"`` for at least 10,000 views within 48
        hours, otherwise ``"Normal"``.
    """
    published_at_str = video_data['published_at']
    # Normalise a trailing "Z" so fromisoformat accepts it on every Python
    # version.
    if published_at_str.endswith(('Z', 'z')):
        published_at_str = published_at_str[:-1] + '+00:00'
    published_at = datetime.fromisoformat(published_at_str)
    # Decide "has a timezone" from the parsed value rather than by searching
    # the text for "+": a negative offset such as -05:00 contains no "+" and
    # must not be mistaken for a naive timestamp.
    if published_at.tzinfo is None:
        published_at = published_at.replace(tzinfo=timezone.utc)

    now_utc = datetime.now(timezone.utc)
    hours_since_published = (now_utc - published_at).total_seconds() / 3600
    view_count = video_data['view_count']

    if view_count >= 10000:
        if hours_since_published <= 24:
            return "Critical"
        if hours_since_published <= 48:
            return "Important"
    return "Normal"
def detect_trends(self) -> List[Dict]:
    """Detect trending clusters.

    A cluster is a detected person who appears in at least two stored videos
    from at least two different channels, all published within the last two
    days.

    Returns:
        One dict per cluster, ordered by video count (descending), with the
        keys ``person_name``, ``video_count``, ``unique_channels``,
        ``video_ids`` and ``detection_sources``.
    """
    # Cutoff for "within the past 2 days". It is computed in UTC and written
    # with a "Z" suffix because it is compared as text against published_at.
    # NOTE(review): assumes published_at is stored in the YouTube API's UTC
    # "...Z" form; a naive local-time cutoff would be skewed by the host's
    # UTC offset - confirm against the stored data.
    two_days_ago = (datetime.now(timezone.utc) - timedelta(days=2)).strftime('%Y-%m-%dT%H:%M:%SZ')

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT detected_person, COUNT(*) as video_count,
                   GROUP_CONCAT(video_id) as video_ids,
                   GROUP_CONCAT(DISTINCT channel_id) as channels,
                   GROUP_CONCAT(detection_source) as sources
            FROM videos
            WHERE detected_person IS NOT NULL
              AND detected_person != ''
              AND published_at > ?
            GROUP BY detected_person
            HAVING COUNT(*) >= 2
               AND COUNT(DISTINCT channel_id) >= 2
            ORDER BY video_count DESC
        ''', (two_days_ago,))
        rows = cursor.fetchall()
    finally:
        # Release the handle even if the query fails.
        conn.close()

    trends = []
    for person_name, count, video_ids, channels, sources in rows:
        trends.append({
            'person_name': person_name,
            'video_count': count,
            'unique_channels': len(set(channels.split(','))),
            'video_ids': video_ids.split(','),
            # GROUP_CONCAT skips NULLs, so this can be NULL for a whole group.
            'detection_sources': sources.split(',') if sources else []
        })
    return trends
def update_all_data(self) -> str:
    """Update data for all registered channels.

    For every channel, fetches the videos published since the channel's
    ``last_updated_at``, runs person detection and importance scoring on the
    ones not stored yet, inserts them, and then advances the channel's
    ``last_updated_at``.

    Returns:
        ``"Update complete: added N new videos"``.
    """
    channels = self.get_channels()
    total_new_videos = 0

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        for channel in channels:
            channel_id = channel['id']
            last_update = channel['last_updated_at']

            # Take the watermark BEFORE querying: stamping "now" after the
            # (slow) fetch and detection would skip any video published
            # while this channel was being processed.
            fetch_started_at = datetime.now(timezone.utc).isoformat()

            # Only fetch videos published after the channel's last_updated_at
            videos = self.fetch_videos_from_channel(channel_id, since_date=last_update)
            # NOTE(review): fetch_videos_from_channel returns [] on failure
            # too, so a failed fetch still advances the watermark below.

            for video in videos:
                # Skip videos that are already stored before running the
                # expensive detection; INSERT OR IGNORE would drop them anyway.
                cursor.execute('SELECT 1 FROM videos WHERE video_id = ?', (video['video_id'],))
                if cursor.fetchone():
                    continue

                # Comprehensive person-name extraction
                detected_person, detection_source = self.extract_person_comprehensive(video)
                # Determine the importance level
                importance = self.determine_importance(video)

                # Save to the database
                cursor.execute('''
                    INSERT OR IGNORE INTO videos
                    (video_id, channel_id, title, description, tags, published_at, view_count,
                     thumbnail_url, detected_person, detection_source, importance_level, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    video['video_id'],
                    channel_id,
                    video['title'],
                    video['description'],
                    ','.join(video['tags']) if video['tags'] else '',
                    video['published_at'],
                    video['view_count'],
                    video['thumbnail_url'],
                    detected_person,
                    detection_source,
                    importance,
                    datetime.now(timezone.utc).isoformat()
                ))
                if cursor.rowcount > 0:
                    total_new_videos += 1

            # Advance this channel's last_updated_at to the fetch start time
            cursor.execute('''
                UPDATE channels
                SET last_updated_at = ?
                WHERE channel_id = ?
            ''', (fetch_started_at, channel_id))
            # Commit per channel so a failure on a later channel does not
            # discard the work already done for earlier ones.
            conn.commit()
    finally:
        # Release the handle even if a channel fails part-way through.
        conn.close()

    return f"Update complete: added {total_new_videos} new videos"
def get_recent_videos_by_timerange(self, hours: int, limit: int = 50) -> List[Dict]:
    """Get videos published within the last ``hours`` hours, most viewed first.

    Args:
        hours: Size of the look-back window in hours.
        limit: Maximum number of videos to return.

    Returns:
        One dict per video with the keys ``video_id``, ``title``,
        ``published_at`` (as stored), ``published_at_jst`` (aware datetime in
        UTC+9), ``view_count``, ``thumbnail_url``, ``detected_person``,
        ``detection_source``, ``importance_level``, ``channel_name``,
        ``channel_icon_url`` and ``channel_id``. Missing detection fields are
        replaced by ``'Not detected'``, ``'-'`` and ``'Normal'``.
    """
    jst = timezone(timedelta(hours=9))
    # "N hours ago" is the same instant in every timezone, so compute it in
    # UTC directly. It is written in the "...Z" form because the comparison
    # below is textual.
    # NOTE(review): assumes published_at is stored in the YouTube API's UTC
    # "...Z" form - confirm against the stored data.
    cutoff_time_utc = datetime.now(timezone.utc) - timedelta(hours=hours)
    cutoff_text = cutoff_time_utc.strftime('%Y-%m-%dT%H:%M:%SZ')

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT v.video_id, v.title, v.published_at, v.view_count, v.thumbnail_url,
                   v.detected_person, v.detection_source, v.importance_level,
                   c.channel_name, c.channel_icon_url, v.channel_id
            FROM videos v
            JOIN channels c ON v.channel_id = c.channel_id
            WHERE v.published_at > ?
            ORDER BY v.view_count DESC
            LIMIT ?
        ''', (cutoff_text, limit))
        rows = cursor.fetchall()
    finally:
        # Release the handle even if the query fails.
        conn.close()

    videos = []
    for (video_id, title, published_at, view_count, thumbnail_url, detected_person,
         detection_source, importance_level, channel_name, channel_icon_url, channel_id) in rows:
        # Convert the stored UTC time to JST for display.
        published_at_utc = datetime.fromisoformat(published_at.replace('Z', '+00:00'))
        if published_at_utc.tzinfo is None:
            # Without this, astimezone() would read a naive value as the
            # host's local time.
            published_at_utc = published_at_utc.replace(tzinfo=timezone.utc)
        videos.append({
            'video_id': video_id,
            'title': title,
            'published_at': published_at,
            'published_at_jst': published_at_utc.astimezone(jst),
            'view_count': view_count,
            'thumbnail_url': thumbnail_url,
            'detected_person': detected_person or 'Not detected',
            'detection_source': detection_source or '-',
            'importance_level': importance_level or 'Normal',
            'channel_name': channel_name,
            'channel_icon_url': channel_icon_url,
            'channel_id': channel_id
        })
    return videos
def generate_recent_videos_html(self, hours: int, limit: int = 50) -> str:
    """Generate HTML for the recent videos list.

    Args:
        hours: Size of the look-back window in hours.
        limit: Maximum number of videos to list.

    Returns:
        A complete HTML document listing the videos returned by
        ``get_recent_videos_by_timerange(hours, limit)``, or a "no videos"
        notice when that list is empty. All video and channel text is
        HTML-escaped.
    """
    # Imported locally so the helper is available without touching the
    # module's import block.
    from html import escape

    videos = self.get_recent_videos_by_timerange(hours, limit)
    # Build a description for the time range
    time_range_text = f"Past {hours} hours"

    # Collect fragments and join once at the end.
    parts = [f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Recent Videos - {time_range_text}</title>
<style>
body {{ font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 20px; background-color: #000000 !important; color: #ffffff !important; }}
.container {{ max-width: 1200px; margin: 0 auto; background-color: transparent !important; }}
h1 {{ color: #ffffff !important; text-align: center; margin-bottom: 30px; }}
.stats-info {{ background: rgba(255,255,255,0.03) !important; padding: 15px !important; border-radius: 8px !important; margin-bottom: 20px !important; text-align: center !important; font-size: 16px !important; color: #bcdffb !important; }}
.video-item {{ display: flex !important; align-items: flex-start !important; background: #0b0b0b !important; margin-bottom: 15px !important; padding: 15px !important; border-radius: 8px !important; box-shadow: 0 2px 6px rgba(0,0,0,0.6) !important; border-left: 4px solid #222 !important; }}
.video-item.Critical {{ border-left-color: #b71c1c !important; background-color: #120202 !important; }}
.video-item.Important {{ border-left-color: #bf360c !important; background-color: #241100 !important; }}
.thumbnail {{ width: 160px !important; height: 90px !important; object-fit: cover !important; margin-right: 15px !important; border-radius: 4px !important; flex-shrink: 0 !important; }}
.video-info {{ flex: 1 !important; }}
.video-title {{ font-weight: bold !important; margin-bottom: 8px !important; font-size: 16px !important; line-height: 1.4 !important; color: #ffffff !important; }}
.video-meta {{ color: #cfcfcf !important; font-size: 14px !important; line-height: 1.6 !important; margin-bottom: 5px !important; }}
.channel-info {{ display: flex !important; align-items: center !important; margin-bottom: 8px !important; }}
.channel-icon {{ width: 24px !important; height: 24px !important; border-radius: 50% !important; margin-right: 8px !important; }}
.stats {{ color: #64b5f6 !important; font-weight: bold !important; }}
.importance-badge {{ display: inline-block !important; padding: 2px 8px !important; border-radius: 12px !important; font-size: 12px !important; font-weight: bold !important; margin-left: 10px !important; }}
.importance-badge.Critical {{ background: #b71c1c !important; color: white !important; }}
.importance-badge.Important {{ background: #bf360c !important; color: white !important; }}
.importance-badge.Normal {{ background: #2e7d32 !important; color: white !important; }}
.detection-badge {{ display: inline-block !important; padding: 2px 6px !important; border-radius: 8px !important; font-size: 11px !important; background: #1976d2 !important; color: white !important; margin-left: 5px !important; }}
.video-links {{ margin-top: 8px !important; }}
.video-links a {{ display: inline-block !important; margin-right: 15px !important; color: #90caf9 !important; text-decoration: none !important; font-size: 14px !important; padding: 4px 8px !important; border: 1px solid #263238 !important; border-radius: 4px !important; transition: background-color 0.3s !important; }}
.video-links a:hover {{ background-color: rgba(227,242,253,0.04) !important; }}
.no-videos {{ text-align: center !important; padding: 40px !important; color: #bdbdbd !important; font-size: 16px !important; }}
</style>
</head>
<body>
<div class="container">
<h1>📺 Recent Videos ({time_range_text} · Sorted by views)</h1>
<div class="stats-info">
📊 Showing {len(videos)} videos (up to {limit}) | 🕐 Times shown in JST | 📈 Sorted by views
</div>
"""]

    if videos:
        for video in videos:
            # Format the publication time in JST
            published_jst = video['published_at_jst'].strftime('%m/%d %H:%M')
            # Titles, channel names and detected names come from YouTube or
            # from model output, so escape everything placed into markup.
            title = escape(str(video['title']))
            channel_name = escape(str(video['channel_name']))
            person = escape(str(video['detected_person']))
            source = escape(str(video['detection_source']))
            importance = escape(str(video['importance_level']), quote=True)
            thumbnail_src = escape(str(video['thumbnail_url'] or ''), quote=True)
            icon_src = escape(str(video['channel_icon_url'] or ''), quote=True)
            # Construct video and channel URLs
            video_url = escape(f"https://www.youtube.com/watch?v={video['video_id']}", quote=True)
            channel_url = escape(f"https://www.youtube.com/channel/{video['channel_id']}", quote=True)
            # A missing view count must not break the thousands formatting.
            view_count = int(video['view_count'] or 0)
            parts.append(f"""
<div class="video-item {importance}">
<img src="{thumbnail_src}" alt="thumbnail" class="thumbnail">
<div class="video-info">
<div class="video-title">{title}</div>
<div class="channel-info">
<img src="{icon_src}" alt="channel" class="channel-icon">
<span>{channel_name}</span>
</div>
<div class="video-meta">
📅 {published_jst} (JST) |
<span class="stats">👀 {view_count:,} views</span> |
👤 {person}
<span class="detection-badge">{source}</span>
<span class="importance-badge {importance}">{importance}</span>
</div>
<div class="video-links">
<a href="{video_url}" target="_blank">🎬 Watch video</a>
<a href="{channel_url}" target="_blank">📺 Channel</a>
</div>
</div>
</div>
""")
    else:
        parts.append(f"""
<div class="no-videos">
📭 No videos were posted in the {time_range_text}.<br>
Try updating data or expanding the time range.
</div>
""")

    parts.append("""
</div>
</body>
</html>
""")
    return "".join(parts)
def generate_dashboard(self) -> str:
    """Generate the HTML dashboard.

    The page has two sections: the trending clusters returned by
    ``detect_trends()``, and the top 20 videos published in the last seven
    days, ordered by importance level (Critical, Important, then the rest)
    and by view count within each level.

    Returns:
        A complete HTML document. All video, channel and person text is
        HTML-escaped.
    """
    # Imported locally so the helper is available without touching the
    # module's import block.
    from html import escape

    trends = self.detect_trends()

    # The cutoff is compared as text against published_at, so build it in
    # UTC with a "Z" suffix.
    # NOTE(review): assumes published_at is stored in the YouTube API's UTC
    # "...Z" form - confirm against the stored data.
    week_ago = (datetime.now(timezone.utc) - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')

    conn = sqlite3.connect('competitor_data.db')
    try:
        cursor = conn.cursor()
        # Fetch videos ordered by importance level; only the top 20 are shown.
        cursor.execute('''
            SELECT v.title, v.published_at, v.view_count, v.detected_person,
                   v.importance_level, c.channel_name, v.thumbnail_url, v.video_id,
                   v.channel_id, v.detection_source
            FROM videos v
            JOIN channels c ON v.channel_id = c.channel_id
            WHERE v.published_at > ?
            ORDER BY
                CASE v.importance_level
                    WHEN 'Critical' THEN 1
                    WHEN 'Important' THEN 2
                    ELSE 3
                END,
                v.view_count DESC
            LIMIT 20
        ''', (week_ago,))
        videos = cursor.fetchall()
    finally:
        # Release the handle even if the query fails.
        conn.close()

    # Generate HTML. This head is a plain (non-f) string, so the CSS must use
    # single braces; doubled braces would be emitted literally and break
    # every rule in the stylesheet.
    parts = ["""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>YouTube Competitor Analysis Dashboard (Global)</title>
<style>
body { font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 20px; background-color: #000000 !important; color: #ffffff !important; }
.container { max-width: 1200px; margin: 0 auto; background-color: transparent !important; }
h1 { color: #ffffff !important; text-align: center; margin-bottom: 30px; }
h2 { border-bottom: 2px solid rgba(255,255,255,0.06) !important; padding-bottom: 10px !important; }
.section { background: #070707 !important; margin-bottom: 30px !important; padding: 20px !important; border-radius: 8px !important; box-shadow: 0 2px 6px rgba(0,0,0,0.6) !important; }
.trend-item { border: 1px solid #1f1f1f !important; margin-bottom: 15px !important; padding: 15px !important; border-radius: 5px !important; background: #0b0b0b !important; }
.trend-title { font-size: 18px !important; font-weight: bold !important; color: #ff8a80 !important; margin-bottom: 10px !important; }
.trend-meta { font-size: 14px !important; color: #bdbdbd !important; margin-bottom: 5px !important; }
.video-item { display: flex !important; align-items: flex-start !important; margin-bottom: 15px !important; padding: 10px !important; border-left: 4px solid #222 !important; }
.video-item.Critical { border-left-color: #b71c1c !important; background-color: #120202 !important; }
.video-item.Important { border-left-color: #bf360c !important; background-color: #241100 !important; }
.thumbnail { width: 160px !important; height: 90px !important; object-fit: cover !important; margin-right: 15px !important; border-radius: 4px !important; flex-shrink: 0 !important; }
.video-info { flex: 1 !important; }
.video-title { font-weight: bold !important; margin-bottom: 5px !important; font-size: 16px !important; color: #ffffff !important; }
.video-meta { color: #cfcfcf !important; font-size: 14px !important; line-height: 1.6 !important; }
.importance-badge { display: inline-block !important; padding: 2px 8px !important; border-radius: 12px !important; font-size: 12px !important; font-weight: bold !important; margin-left: 10px !important; }
.importance-badge.Critical { background: #b71c1c !important; color: white !important; }
.importance-badge.Important { background: #bf360c !important; color: white !important; }
.importance-badge.Normal { background: #2e7d32 !important; color: white !important; }
.detection-badge { display: inline-block !important; padding: 2px 6px !important; border-radius: 8px !important; font-size: 11px !important; background: #1976d2 !important; color: white !important; margin-left: 5px !important; }
.stats { color: #64b5f6 !important; font-weight: bold !important; }
.video-links { margin-top: 8px !important; }
.video-links a { display: inline-block !important; margin-right: 15px !important; color: #90caf9 !important; text-decoration: none !important; font-size: 14px !important; padding: 4px 8px !important; border: 1px solid #263238 !important; border-radius: 4px !important; transition: background-color 0.3s !important; }
.video-links a:hover { background-color: rgba(227,242,253,0.04) !important; }
</style>
</head>
<body>
<div class="container">
<h1>🌍 YouTube Competitor Analysis Dashboard (Global)</h1>
<div class="section">
<h2>🔥 Currently Trending Clusters</h2>
"""]

    if trends:
        for trend in trends:
            # De-duplicate the detection methods; sorted for a stable order.
            sources_text = escape(', '.join(sorted(set(trend['detection_sources']))))
            person_name = escape(str(trend['person_name']))
            parts.append(f"""
<div class="trend-item">
<div class="trend-title">👤 {person_name}</div>
<div class="trend-meta">📺 <strong>{trend['video_count']}</strong> videos posted across <strong>{trend['unique_channels']}</strong> channels</div>
<div class="trend-meta">🔍 Detection methods: {sources_text}</div>
</div>
""")
    else:
        parts.append("<p>There are currently no trending clusters.</p>")

    parts.append("""
</div>
<div class="section">
<h2>📺 Recent Videos (Global person detection)</h2>
""")

    for (title, published_at, view_count, detected_person, importance,
         channel_name, thumbnail_url, video_id, channel_id, detection_source) in videos:
        published_date = datetime.fromisoformat(published_at.replace('Z', '+00:00')).strftime('%m/%d %H:%M')
        # Titles, channel names and detected names come from YouTube or from
        # model output, so escape everything placed into markup. Columns that
        # may be NULL get a readable fallback instead of the text "None".
        title_text = escape(str(title))
        channel_text = escape(str(channel_name))
        person_display = escape(detected_person) if detected_person else "Unknown"
        source_text = escape(detection_source) if detection_source else "-"
        importance_text = escape(importance or 'Normal', quote=True)
        thumbnail_src = escape(str(thumbnail_url or ''), quote=True)
        # Build the video and channel URLs
        video_url = escape(f"https://www.youtube.com/watch?v={video_id}", quote=True)
        channel_url = escape(f"https://www.youtube.com/channel/{channel_id}", quote=True)
        # A NULL view count must not break the thousands formatting.
        views = int(view_count or 0)
        parts.append(f"""
<div class="video-item {importance_text}">
<img src="{thumbnail_src}" alt="thumbnail" class="thumbnail">
<div class="video-info">
<div class="video-title">{title_text}</div>
<div class="video-meta">
📺 {channel_text} | 📅 {published_date} |
<span class="stats">👀 {views:,} views</span> |
👤 {person_display}
<span class="detection-badge">{source_text}</span>
<span class="importance-badge {importance_text}">{importance_text}</span>
</div>
<div class="video-links">
<a href="{video_url}" target="_blank">🎬 Watch video</a>
<a href="{channel_url}" target="_blank">📺 Channel</a>
</div>
</div>
</div>
""")

    parts.append("""
</div>
</div>
</body>
</html>
""")
    return "".join(parts)
def generate_channel_management_html(self) -> str:
    """Generate HTML for channel management.

    Renders every channel returned by ``self.get_channels()`` as a card
    showing its icon, name, subscriber count, added date and ID, followed by
    a small script whose Edit/Delete buttons only tell the user to perform
    the action from the Gradio interface.

    Each channel is read as a mapping with the keys ``id``, ``name``,
    ``icon_url``, ``subscriber_count`` and ``added_date`` (an ISO-format
    date string).

    Returns:
        A complete HTML document as a string. When no channels are
        registered, the list is replaced by a short notice.
    """
    # Imported locally so the method does not depend on a file-level import.
    from html import escape

    channels = self.get_channels()
    # The page header is a plain (non-f) string, so CSS braces are written
    # singly; doubled braces would be emitted literally and invalidate every
    # style rule.
    parts = ["""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Channel Management (Global)</title>
<style>
body { font-family: 'Helvetica Neue', Arial, sans-serif; margin: 0; padding: 20px; background-color: #000000 !important; color: #ffffff !important; }
.container { max-width: 800px; margin: 0 auto; background-color: transparent !important; }
h2 { border-bottom: 2px solid rgba(255,255,255,0.06) !important; padding-bottom: 10px !important; }
.channel-item { display: flex !important; align-items: center !important; background: #0b0b0b !important; margin-bottom: 15px !important; padding: 15px !important; border-radius: 8px !important; box-shadow: 0 2px 6px rgba(0,0,0,0.6) !important; }
.channel-icon { width: 48px !important; height: 48px !important; border-radius: 50% !important; margin-right: 15px !important; object-fit: cover !important; }
.channel-info { flex: 1 !important; }
.channel-name { font-weight: bold !important; font-size: 16px !important; margin-bottom: 5px !important; color: #ffffff !important; }
.channel-meta { color: #bdbdbd !important; font-size: 14px !important; }
.channel-actions { display: flex !important; gap: 10px !important; }
.btn { padding: 6px 12px !important; border: none !important; border-radius: 4px !important; cursor: pointer !important; font-size: 12px !important; text-decoration: none !important; display: inline-block !important; transition: opacity 0.3s !important; }
.btn-edit { background: #1976d2 !important; color: white !important; border: 1px solid #263238 !important; }
.btn-delete { background: #b71c1c !important; color: white !important; border: 1px solid #331111 !important; }
.btn:hover { opacity: 0.9 !important; }
</style>
</head>
<body>
<div class="container">
<h2>🌍 Registered Channels (Global person detection)</h2>
"""]
    if channels:
        for channel in channels:
            added_date = datetime.fromisoformat(channel['added_date']).strftime('%Y/%m/%d')
            subscribers = channel['subscriber_count']
            subscriber_text = f"{subscribers:,} subscribers" if subscribers else "Private"

            channel_id = str(channel['id'])
            channel_name = str(channel['name'])
            # Channel metadata is external text: escape it for HTML
            # text/attribute context so markup characters cannot break the page.
            id_html = escape(channel_id)
            name_html = escape(channel_name)
            icon_html = escape(str(channel['icon_url']))
            # For the inline handlers, encode as a JS string literal first and
            # then escape that literal for the surrounding HTML attribute.
            id_js = escape(json.dumps(channel_id))
            name_js = escape(json.dumps(channel_name))

            parts.append(f"""
<div class="channel-item">
<img src="{icon_html}" alt="icon" class="channel-icon">
<div class="channel-info">
<div class="channel-name">{name_html}</div>
<div class="channel-meta">
👥 Subscribers: {subscriber_text} | 📅 Added: {added_date}
</div>
<div class="channel-meta">
🆔 {id_html}
</div>
</div>
<div class="channel-actions">
<button class="btn btn-edit" onclick="editChannel({id_js}, {name_js})">✏️ Edit</button>
<button class="btn btn-delete" onclick="deleteChannel({id_js})">🗑️ Delete</button>
</div>
</div>
""")
    else:
        parts.append("<p>No channels registered.</p>")
    # The handlers below never modify data; they only redirect the user to
    # the Gradio controls.
    parts.append("""
<script>
function editChannel(channelId, currentName) {
const newName = prompt('Enter new channel name:', currentName);
if (newName && newName !== currentName) {
alert('Channel name updates must be performed from the Gradio interface.');
}
}
function deleteChannel(channelId) {
if (confirm('Are you sure you want to delete this channel?\\nRelated video data will also be removed.')) {
alert('Channel deletion must be performed from the Gradio interface.\\nChannel ID: ' + channelId);
}
}
</script>
</div>
</body>
</html>
""")
    return "".join(parts)
# Create the application-wide analyzer instance used by the interface callbacks.
# Constructing it runs init_database(), which opens 'competitor_data.db'.
analyzer = YouTubeCompetitorAnalyzer()
# Strong global dark CSS to force dark mode even if hosting injects light styles.
# Uses high-specificity selectors and !important to override Hugging Face Spaces' theme.
# Passed to gr.Blocks(css=...) when the app is built.
# NOTE(review): `.gradio-container *` is also !important and carries class-level
# specificity, so it presumably outranks the bare element selectors (button, input,
# textarea, select) in the inputs rule — confirm the #0b0b0b background really applies.
DARK_THEME_CSS = """
:root, html, body, .gradio-container {
background-color: #000000 !important;
color: #ffffff !important;
color-scheme: dark !important;
}
.gradio-container, .gradio-container * {
background-color: transparent !important;
color: #ffffff !important;
border-color: #333333 !important;
}
/* Inputs, buttons and textareas */
button, .gr-button, input, textarea, select, .gradio-textbox, .gradio-file, .gradio-dropdown, .gradio-button {
background-color: #0b0b0b !important;
color: #ffffff !important;
border: 1px solid #333333 !important;
}
input::placeholder, textarea::placeholder {
color: #bfbfbf !important;
}
.gradio-markdown, .gradio-html, .gradio-label, .gradio-textbox, .gradio-output {
color: #ffffff !important;
}
/* Ensure components that Gradio or Spaces might wrap still show dark backgrounds */
.gradio-container .container, .gradio-container .section, .gradio-container .card {
background-color: #000000 !important;
color: #ffffff !important;
}
/* Give high contrast to borders and badges */
.importance-badge, .detection-badge {
color: #ffffff !important;
}
"""
# Gradio interface callbacks
def add_channel_interface(channel_ids_text):
    """Add every channel ID listed in a newline-separated block of text.

    Surrounding whitespace is trimmed from each line and blank lines are
    ignored. The remaining IDs are passed to ``analyzer.add_channel`` in
    input order, and the per-channel result messages are returned joined by
    newlines. A prompt message is returned instead when nothing usable was
    entered.
    """
    if not channel_ids_text:
        return "Please enter channel ID"

    # Trim each line first, then drop the ones that end up empty.
    trimmed_lines = (line.strip() for line in channel_ids_text.split('\n'))
    wanted_ids = [line for line in trimmed_lines if line]
    if not wanted_ids:
        return "No valid channel IDs provided."

    # One result message per channel, reported in the order entered.
    return "\n".join(analyzer.add_channel(one_id) for one_id in wanted_ids)
def delete_channel_interface(channel_id):
    """Delete the channel with the given ID and return the status message.

    The ID is trimmed before validation, so an empty, missing or
    whitespace-only value yields the prompt message instead of sending an
    empty ID to ``analyzer.delete_channel``.
    """
    cleaned_id = (channel_id or "").strip()
    if not cleaned_id:
        return "Please enter a channel ID to delete"
    return analyzer.delete_channel(cleaned_id)
def update_channel_name_interface(channel_id, new_name):
    """Rename a channel and return the status message.

    Both values are trimmed before validation, so a whitespace-only ID or
    name is rejected with the prompt message rather than being passed to
    ``analyzer.update_channel_name`` as an empty string.
    """
    cleaned_id = (channel_id or "").strip()
    cleaned_name = (new_name or "").strip()
    if not cleaned_id or not cleaned_name:
        return "Please enter channel ID and new name"
    return analyzer.update_channel_name(cleaned_id, cleaned_name)
def update_data_interface():
    """Run ``analyzer.update_all_data()`` and hand its result back to the UI."""
    outcome = analyzer.update_all_data()
    return outcome
def show_dashboard():
    """Return the dashboard markup produced by ``analyzer.generate_dashboard()``."""
    dashboard_markup = analyzer.generate_dashboard()
    return dashboard_markup
def show_channel_management():
    """Return the registered-channel list markup from the shared analyzer."""
    channel_markup = analyzer.generate_channel_management_html()
    return channel_markup
def show_recent_videos_interface(hours_selection, limit_selection):
    """Translate the dropdown labels into numbers and render recent videos.

    ``hours_selection`` is a label such as "24 hours" and ``limit_selection``
    a label such as "50 items". Labels that are not recognised fall back to
    24 hours and 50 items respectively. The resolved values are passed to
    ``analyzer.generate_recent_videos_html``.
    """
    # Label -> number lookups for the choices offered by the two dropdowns.
    hour_choices = {f"{hours} hours": hours for hours in (6, 12, 24, 48)}
    item_choices = {f"{count} items": count for count in (20, 50, 100, 200)}

    window_hours = hour_choices.get(hours_selection, 24)
    max_items = item_choices.get(limit_selection, 50)
    return analyzer.generate_recent_videos_html(window_hours, max_items)
# Build the Gradio app: one Blocks container with four tabs (dashboard, recent
# videos, data update, channel management), themed with DARK_THEME_CSS.
with gr.Blocks(title="YouTube Competitor Analysis (Global)", theme=gr.themes.Monochrome(), css=DARK_THEME_CSS) as app:
gr.Markdown("# 🌍 YouTube Competitor Analysis App (Global)")
gr.Markdown("Analyze competitor channel uploads and detect global clustered trends using **Gemini 2.5 Flash**.")
# --- Dashboard tab: HTML dashboard with a manual refresh button ---
with gr.Tab("📊 Dashboard"):
gr.Markdown("## Global Analysis Dashboard")
refresh_btn = gr.Button("📈 Refresh Dashboard", variant="secondary")
dashboard_html = gr.HTML()
refresh_btn.click(show_dashboard, inputs=[], outputs=[dashboard_html])
# Initial load: render the dashboard once when the page opens
app.load(show_dashboard, inputs=[], outputs=[dashboard_html])
# --- Recent Videos tab: list filtered by time range and item limit ---
with gr.Tab("📺 Recent Videos"):
gr.Markdown("## 📺 Recent Videos (select time range and max items)")
gr.Markdown("""
### Features
- ⏰ Time range selection: choose between 6 to 48 hours
- 📊 Sorted by view count: show the highest-view videos first
- 🕐 Times displayed in JST (UTC+9)
- 🔢 Max items: limit display between 20 and 200
- 🌍 Global detection: detect notable people worldwide
""")
with gr.Row():
# Choice labels must match the keys mapped in show_recent_videos_interface
hours_dropdown = gr.Dropdown(
choices=["6 hours", "12 hours", "24 hours", "48 hours"],
value="24 hours",
label="⏰ Time Range"
)
limit_dropdown = gr.Dropdown(
choices=["20 items", "50 items", "100 items", "200 items"],
value="50 items",
label="🔢 Max items"
)
update_recent_btn = gr.Button("🔄 Update Recent Videos", variant="primary", size="lg")
recent_videos_html = gr.HTML()
update_recent_btn.click(
show_recent_videos_interface,
inputs=[hours_dropdown, limit_dropdown],
outputs=[recent_videos_html]
)
# Initial load (24 hours, 50 items): fixed gr.State values supply the
# defaults instead of reading the dropdowns
app.load(
show_recent_videos_interface,
inputs=[gr.State("24 hours"), gr.State("50 items")],
outputs=[recent_videos_html]
)
# --- Data Update tab: API key entry and the full data refresh ---
with gr.Tab("🔄 Data Update"):
gr.Markdown("## 🌍 Global AI High-Precision Data Update System")
gr.Markdown("""
### 🎯 Person extraction using Gemini 2.5 Flash (priority order)
1. **🤖 Gemini title analysis** - high-precision extraction of notable people from titles
2. **🤖 Gemini description analysis** - detect people via hashtags and description
3. **🏷️ Tag analysis** - identify multilingual person names from tags
4. **🖼️ Thumbnail OCR** - read text from thumbnails
5. **👤 Face recognition** - identify persons via face recognition
**🌍 Global support**: detects people regardless of nationality, era, or field
""")
# API key inputs (allow user to provide keys at runtime instead of env vars)
with gr.Row():
youtube_key_input = gr.Textbox(label="YouTube API Key", placeholder="Enter YouTube API key", type="password")
gemini_key_input = gr.Textbox(label="Gemini API Key", placeholder="Enter Gemini API key", type="password")
apply_keys_btn = gr.Button("Apply API Keys", variant="secondary")
api_keys_result = gr.Textbox(label="API Key Status", interactive=False)
# Keep applied keys in hidden Gradio state so other callbacks can reference them if needed
youtube_key_state = gr.State("")
gemini_key_state = gr.State("")
# set_api_keys now returns (status, youtube_key, gemini_key)
# NOTE(review): set_api_keys also writes the keys and clients to module-level
# globals, so applied keys presumably affect every session of this process — confirm.
apply_keys_btn.click(
set_api_keys,
inputs=[youtube_key_input, gemini_key_input],
outputs=[api_keys_result, youtube_key_state, gemini_key_state]
)
update_btn = gr.Button("🌍 Start Global Data Update", variant="primary", size="lg")
update_result = gr.Textbox(label="Update Result", interactive=False)
update_btn.click(update_data_interface, inputs=[], outputs=[update_result])
# --- Channel Management tab: add, delete, rename and list channels ---
with gr.Tab("📺 Channel Management"):
with gr.Row():
with gr.Column(scale=2):
gr.Markdown("## Add Channels")
# Textbox switched to multiline to support multiple lines
channel_input = gr.TextArea(
label="Channel ID (one per line)",
placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx\nUCyyyyyyyyyyyyyyyyyyyyyyyy",
info="Enter multiple YouTube channel IDs separated by newlines"
)
add_btn = gr.Button("Add Channels", variant="primary")
add_result = gr.Textbox(label="Result", interactive=False)
add_btn.click(add_channel_interface, inputs=[channel_input], outputs=[add_result])
with gr.Column(scale=2):
gr.Markdown("## Delete Channel")
delete_channel_input = gr.Textbox(
label="Channel ID to delete",
placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx"
)
delete_btn = gr.Button("Delete Channel", variant="stop")
delete_result = gr.Textbox(label="Delete Result", interactive=False)
delete_btn.click(delete_channel_interface, inputs=[delete_channel_input], outputs=[delete_result])
with gr.Row():
with gr.Column():
gr.Markdown("## Edit Channel Name")
edit_channel_id = gr.Textbox(label="Channel ID to edit", placeholder="UCxxxxxxxxxxxxxxxxxxxxxxxx")
new_channel_name = gr.Textbox(label="New channel name", placeholder="Enter new name")
update_name_btn = gr.Button("Update Name", variant="secondary")
update_name_result = gr.Textbox(label="Update Result", interactive=False)
update_name_btn.click(
update_channel_name_interface,
inputs=[edit_channel_id, new_channel_name],
outputs=[update_name_result]
)
gr.Markdown("## Registered Channels")
channel_list_html = gr.HTML()
refresh_channels_btn = gr.Button("Refresh list", variant="secondary")
# The list is re-rendered only by this button or the page load below; the
# add/delete/rename handlers above write to their own result boxes only.
refresh_channels_btn.click(show_channel_management, inputs=[], outputs=[channel_list_html])
# Initial load: render the channel list once when the page opens
app.load(show_channel_management, inputs=[], outputs=[channel_list_html])
# Start the Gradio server only when this file is run as a script, not on import.
if __name__ == "__main__":
app.launch()