# hanjunjung
# [upload]
# 6b8f0f2
#!/usr/bin/env python3
"""
YouTube Comment Analyzer - Complete Self-contained Hugging Face Spaces App
MCP Hackathon 2025 - Track 1 Submission
All dependencies included in single file for security and simplicity
"""
import gradio as gr
import asyncio
import os
import logging
import re
from typing import Dict, Any, Optional, Tuple, List
from pathlib import Path
import sys
from collections import Counter
from datetime import datetime
import traceback
import warnings
import anthropic
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import isodate
from dataclasses import dataclass
# Suppress warnings
warnings.filterwarnings('ignore')
# Chart libraries
import plotly.express as px
import pandas as pd
# WordCloud and visualization libraries
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
plt.ioff()
from wordcloud import WordCloud
import base64
import io
# Configure logging
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by every class in this file
logger = logging.getLogger(__name__)
# Startup banner: report whether the required API keys are present.
# (Korean: 있음 = present, 없음 = missing; the keys are read again at client init.)
print("=" * 60)
print("🎬 YouTube Comment Analyzer - MCP Hackathon 2025")
print("=" * 60)
print(f"🔑 YouTube API Key: {'✅ 있음' if os.getenv('YOUTUBE_API_KEY') else '❌ 없음'}")
print(f"🔑 Anthropic API Key: {'✅ 있음' if os.getenv('ANTHROPIC_API_KEY') else '❌ 없음'}")
# ============================================================================
# YouTube API Models
# ============================================================================
@dataclass
class YouTubeChannel:
    """Snapshot of a YouTube channel as returned by the Data API v3."""
    id: str                                  # channel ID ("UC..." format)
    title: str
    description: str
    subscriber_count: Optional[int] = None   # may be 0/None when the channel hides it
    video_count: Optional[int] = None
    view_count: Optional[int] = None
    thumbnail_url: Optional[str] = None      # 'default'-size thumbnail URL
    custom_url: Optional[str] = None         # e.g. an @handle; not every channel has one
@dataclass
class YouTubeVideo:
    """Snapshot of a single video (snippet + statistics + contentDetails)."""
    id: str
    title: str
    description: str
    channel_id: str
    channel_title: str
    published_at: datetime                   # timezone-aware (parsed from RFC3339)
    duration: Optional[str] = None           # human-readable "H:MM:SS"/"M:SS", or "N/A"
    view_count: Optional[int] = None
    like_count: Optional[int] = None
    comment_count: Optional[int] = None
    thumbnail_url: Optional[str] = None      # 'high'-size thumbnail URL
    tags: Optional[List[str]] = None         # normalized to [] in __post_init__

    def __post_init__(self):
        # Avoid a shared mutable default: give each instance its own empty list.
        if self.tags is None:
            self.tags = []
@dataclass
class YouTubeComment:
    """A single top-level comment from a commentThreads response."""
    id: str
    text: str                                # textDisplay (may contain HTML markup)
    author_name: str
    author_channel_id: Optional[str]         # absent for some authors
    like_count: int
    published_at: datetime
    updated_at: Optional[datetime] = None    # set only when the comment was edited
    reply_count: int = 0                     # number of replies in the thread
# ============================================================================
# YouTube API Client
# ============================================================================
class YouTubeAPIError(Exception):
    """Base error for all YouTube Data API failures raised by this module."""
class QuotaExceededError(YouTubeAPIError):
    """Raised when the daily YouTube API quota has been exhausted."""
class YouTubeClient:
    """Thin wrapper around the YouTube Data API v3.

    Tracks an approximate quota counter (``quota_used``: search = 100 units,
    most list calls = 1 unit) and converts googleapiclient ``HttpError``s
    into this module's YouTubeAPIError / QuotaExceededError hierarchy.
    """

    def __init__(self):
        """Build the API service object.

        Raises:
            ValueError: if YOUTUBE_API_KEY is not set in the environment.
        """
        api_key = os.getenv('YOUTUBE_API_KEY')
        if not api_key:
            raise ValueError("YOUTUBE_API_KEY not found in environment variables")
        self.youtube = build('youtube', 'v3', developerKey=api_key)
        self.quota_used = 0  # running estimate of quota units consumed

    def _handle_api_error(self, error: HttpError):
        """Translate an HttpError into the module's error types.

        Always raises (QuotaExceededError or YouTubeAPIError); never
        returns normally.
        """
        if error.resp.status == 403:
            # A 403 may signal quota exhaustion rather than a permission problem.
            error_details = error.error_details
            if any('quotaExceeded' in str(detail) for detail in error_details):
                raise QuotaExceededError("YouTube API daily quota exceeded.")
        logger.error(f"YouTube API error: {error}")
        raise YouTubeAPIError(f"YouTube API call failed: {error}")

    def search_channels(self, query: str, max_results: int = 5) -> List[YouTubeChannel]:
        """Search channels by free-text query, with statistics attached.

        Costs roughly 101 quota units (search = 100, channels.list = 1).

        Raises:
            QuotaExceededError: daily quota exhausted.
            YouTubeAPIError: any other API failure.
        """
        try:
            logger.info(f"Channel search: {query}")
            # Execute search (100 units)
            search_response = self.youtube.search().list(
                q=query,
                part='snippet',
                type='channel',
                maxResults=max_results,
                regionCode='US'
            ).execute()
            self.quota_used += 100
            channels = []
            if search_response.get('items'):
                # Fetch details for all hits in a single channels.list call (1 unit).
                channel_ids = [item['id']['channelId'] for item in search_response['items']]
                channels_response = self.youtube.channels().list(
                    part='snippet,statistics',
                    id=','.join(channel_ids)
                ).execute()
                self.quota_used += 1
                for item in channels_response['items']:
                    snippet = item['snippet']
                    statistics = item.get('statistics', {})
                    channels.append(YouTubeChannel(
                        id=item['id'],
                        title=snippet['title'],
                        description=snippet['description'],
                        subscriber_count=int(statistics.get('subscriberCount', 0)),
                        video_count=int(statistics.get('videoCount', 0)),
                        view_count=int(statistics.get('viewCount', 0)),
                        thumbnail_url=snippet['thumbnails']['default']['url'],
                        custom_url=snippet.get('customUrl')
                    ))
            return channels
        except HttpError as e:
            self._handle_api_error(e)
        except Exception as e:
            logger.error(f"Channel search error: {e}")
            raise YouTubeAPIError(f"Channel search failed: {e}")

    def get_channel_videos(self, channel_id: str, max_results: int = 10,
                           sort_by: str = 'recent') -> List[YouTubeVideo]:
        """Return the channel's most recent uploads with full details.

        ``sort_by`` is kept for interface compatibility but is currently
        unused: results always follow the uploads playlist (newest first).

        Raises:
            YouTubeAPIError: channel not found or any API failure.
        """
        try:
            # Resolve the channel's "uploads" playlist ID (1 unit).
            channel_response = self.youtube.channels().list(
                part='contentDetails',
                id=channel_id
            ).execute()
            self.quota_used += 1
            if not channel_response['items']:
                raise YouTubeAPIError(f"Channel not found: {channel_id}")
            uploads_playlist_id = channel_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
            # Fetch one page of most recent uploads (1 unit).
            playlist_response = self.youtube.playlistItems().list(
                part='snippet',
                playlistId=uploads_playlist_id,
                maxResults=max_results
            ).execute()
            self.quota_used += 1
            if not playlist_response['items']:
                return []
            video_ids = [item['snippet']['resourceId']['videoId'] for item in playlist_response['items']]
            return self._get_video_details(video_ids)
        except HttpError as e:
            self._handle_api_error(e)
        except Exception as e:
            logger.error(f"Video retrieval error: {e}")
            raise YouTubeAPIError(f"Video retrieval failed: {e}")

    def _get_video_details(self, video_ids: List[str]) -> List[YouTubeVideo]:
        """Fetch snippet/statistics/contentDetails for up to 50 video IDs (1 unit)."""
        videos_response = self.youtube.videos().list(
            part='snippet,statistics,contentDetails',
            id=','.join(video_ids)
        ).execute()
        self.quota_used += 1
        videos = []
        for item in videos_response['items']:
            snippet = item['snippet']
            statistics = item.get('statistics', {})
            content_details = item['contentDetails']
            # Convert ISO 8601 duration (e.g. "PT1M30S") to "H:MM:SS" / "M:SS".
            try:
                duration_iso = content_details.get('duration', 'PT0S')
                duration_seconds = int(isodate.parse_duration(duration_iso).total_seconds())
                hours = duration_seconds // 3600
                minutes = (duration_seconds % 3600) // 60
                seconds = duration_seconds % 60
                if hours > 0:
                    duration_str = f"{hours}:{minutes:02d}:{seconds:02d}"
                else:
                    duration_str = f"{minutes}:{seconds:02d}"
            except Exception:
                # FIX: was a bare `except:` which also swallowed KeyboardInterrupt.
                duration_str = "N/A"
            videos.append(YouTubeVideo(
                id=item['id'],
                title=snippet['title'],
                description=snippet['description'],
                channel_id=snippet['channelId'],
                channel_title=snippet['channelTitle'],
                published_at=datetime.fromisoformat(snippet['publishedAt'].replace('Z', '+00:00')),
                duration=duration_str,
                view_count=int(statistics.get('viewCount', 0)),
                like_count=int(statistics.get('likeCount', 0)),
                comment_count=int(statistics.get('commentCount', 0)),
                thumbnail_url=snippet['thumbnails']['high']['url'],
                tags=snippet.get('tags', [])
            ))
        return videos

    def get_video_comments(self, video_id: str, max_results: int = 1000,
                           order: str = "relevance") -> List[YouTubeComment]:
        """Collect up to ``max_results`` top-level comments, paging 100 at a time.

        Returns [] (instead of raising) when comments are disabled for the video.
        """
        try:
            comments = []
            next_page_token = None
            collected = 0
            while collected < max_results:
                batch_size = min(100, max_results - collected)  # API page cap is 100
                comment_response = self.youtube.commentThreads().list(
                    part="snippet",
                    videoId=video_id,
                    maxResults=batch_size,
                    order=order,
                    pageToken=next_page_token
                ).execute()
                self.quota_used += 1
                # Defensive: treat a missing 'items' key as an empty page.
                for item in comment_response.get('items', []):
                    comment_snippet = item['snippet']['topLevelComment']['snippet']
                    comments.append(YouTubeComment(
                        id=item['snippet']['topLevelComment']['id'],
                        text=comment_snippet['textDisplay'],
                        author_name=comment_snippet['authorDisplayName'],
                        author_channel_id=comment_snippet.get('authorChannelId', {}).get('value'),
                        like_count=comment_snippet['likeCount'],
                        published_at=datetime.fromisoformat(comment_snippet['publishedAt'].replace('Z', '+00:00')),
                        updated_at=datetime.fromisoformat(comment_snippet['updatedAt'].replace('Z', '+00:00')) if comment_snippet.get('updatedAt') else None,
                        reply_count=item['snippet']['totalReplyCount']
                    ))
                collected = len(comments)
                next_page_token = comment_response.get('nextPageToken')
                if not next_page_token:
                    break
            return comments
        except HttpError as e:
            if e.resp.status == 403 and 'commentsDisabled' in str(e):
                logger.warning(f"Comments disabled for video: {video_id}")
                return []
            self._handle_api_error(e)
        except Exception as e:
            logger.error(f"Comment retrieval error: {e}")
            raise YouTubeAPIError(f"Comment retrieval failed: {e}")
# ============================================================================
# Claude Analyzer
# ============================================================================
class ClaudeAnalyzer:
    """Runs collected comments through the Anthropic Messages API in batches.

    Construction raises ValueError when ANTHROPIC_API_KEY is missing; the
    analysis methods never raise — failures come back as
    ``{"success": False, "error": ..., "data": None}``.
    """
    def __init__(self):
        api_key = os.getenv('ANTHROPIC_API_KEY')
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")
        self.client = anthropic.Anthropic(api_key=api_key)
        # Haiku keeps per-request cost low for bulk comment analysis.
        self.model = "claude-3-haiku-20240307"
        self.max_tokens = 4000

    async def analyze_comments(
        self,
        comments: List[str],
        video_title: str = "Unknown Video",
        channel_name: str = "Unknown Channel"
    ) -> Dict[str, Any]:
        """Analyze comments for a single video.

        Returns ``{"success": True, "data": <markdown>, "metadata": {...}}``
        or ``{"success": False, "error": <msg>, "data": None}``.
        """
        try:
            # Simple batch processing: 50 comments per model call.
            batch_size = 50
            all_analyses = []
            for i in range(0, len(comments), batch_size):
                batch_comments = comments[i:i + batch_size]
                comments_text = "\n".join([f"Comment {j+1}: {comment}"
                                           for j, comment in enumerate(batch_comments)])
                # The prompt body is flush-left so no stray indentation is sent
                # to the model as part of the instructions.
                prompt = f"""
You are a professional YouTube Creator Consultant. Analyze the viewer comments for the video "{video_title}" from channel "{channel_name}" and provide practical insights to help the creator grow their channel and increase revenue.
Comments to analyze:
{comments_text}
Please analyze from the following creator-focused perspectives:
## 🎯 Content Performance Analysis
1. **Success Factors of This Video**
- Specific parts viewers particularly enjoyed
- Elements mentioned as entertaining or engaging
- Points that drove actions like "subscribe", "like", "turn on notifications"
2. **Content Structure Feedback**
- Viewer reactions to video length
- Opinions on intro/outro segments
- Reactions to editing style, subtitles, background music
## 💡 Improvement Opportunities
3. **Immediate Improvement Points**
- Technical improvements needed (audio, video quality)
- Areas lacking sufficient explanation
- Content viewers want to see more of
4. **Next Video Ideas**
- Topics or series requested by viewers
- Follow-up content suggestions related to this video
## 📈 Channel Growth Insights
5. **Subscriber Conversion Analysis**
- Elements that influenced subscription decisions
- Warning signs of potential viewer churn
6. **Community Engagement**
- Factors that increase comment participation
- Viewer interaction patterns
Please provide specific, actionable recommendations with sentiment analysis including specific percentages (e.g., "60% positive, 25% neutral, 15% negative").
"""
                # NOTE(review): this SDK call is synchronous and blocks the event
                # loop while the request is in flight — consider asyncio.to_thread
                # if UI responsiveness during analysis matters.
                response = self.client.messages.create(
                    model=self.model,
                    max_tokens=self.max_tokens,
                    messages=[{"role": "user", "content": prompt}]
                )
                all_analyses.append(response.content[0].text)
                await asyncio.sleep(0.2)  # Rate limiting between batches
            # Combine all batch analyses into a single report.
            final_analysis = "\n\n".join(all_analyses)
            return {
                "success": True,
                "data": final_analysis,
                "metadata": {
                    "total_comments": len(comments),
                    "video_title": video_title,
                    "channel_name": channel_name
                }
            }
        except Exception as e:
            logger.error(f"Analysis error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "data": None
            }

    async def analyze_multi_video_comments(
        self,
        comments: List[str],
        videos_info: List[Dict],
        channel_name: str = "Unknown Channel"
    ) -> Dict[str, Any]:
        """Analyze comments pooled from multiple videos of one channel.

        ``videos_info`` dicts must carry 'title' and
        'collected_comments_count'; 'view_count' is optional.
        Same success/error envelope as :meth:`analyze_comments`.
        """
        try:
            # Prepare a numbered video-list context for the prompt.
            videos_context = ""
            for i, video_info in enumerate(videos_info, 1):
                videos_context += f"{i}. '{video_info['title']}' (Views: {video_info.get('view_count', 'N/A')}, Comments: {video_info['collected_comments_count']})\n"
            # Process comments in larger batches than single-video mode.
            batch_size = 75
            all_analyses = []
            for i in range(0, len(comments), batch_size):
                batch_comments = comments[i:i + batch_size]
                comments_text = "\n".join([f"Comment {j+1}: {comment}"
                                           for j, comment in enumerate(batch_comments)])
                prompt = f"""
You are a professional YouTube Channel Strategy Consultant. Analyze comments collected from multiple videos of channel "{channel_name}" and provide strategic insights for channel growth and revenue increase.
📺 **Videos Analyzed:**
{videos_context}
💬 **Collected Comments:**
{comments_text}
Please analyze from the following channel strategy perspectives:
## 🎯 Channel Identity & Branding Analysis
1. **Core Channel Values & Identity**
- How viewers categorize this channel
- Unique characteristics or differentiation points
- Creator personal brand perception
2. **Brand Consistency & Recognition**
- Consistency in tone and manner across videos
- Content style expectations from viewers
## 📊 Audience Analysis & Targeting
3. **Viewer Demographics Analysis**
- Main viewer characteristics (inferred from comment tone)
- Characteristics of loyal core fanbase
- Differences between new and existing subscribers
4. **Community Culture & Engagement**
- Comment participation patterns and communication styles
- Level of fanbase culture development
## 🚀 Content Strategy & Growth Opportunities
5. **Content Performance Patterns**
- Best-performing content types
- High-engagement elements
6. **Expansion Possibilities & New Content Opportunities**
- New content directions requested by viewers
- Potential for collaboration opportunities
## 💰 Monetization & Business Opportunities
7. **Commercial Potential Assessment**
- Viewer acceptance of sponsorships
- Potential products or services for commercialization
8. **Competitive Analysis & Market Positioning**
- Differentiation factors vs competing channels
- Market position and growth potential
Please provide specific, actionable insights with sentiment analysis including percentages.
"""
                # NOTE(review): blocking call inside async method (see analyze_comments).
                response = self.client.messages.create(
                    model=self.model,
                    max_tokens=self.max_tokens,
                    messages=[{"role": "user", "content": prompt}]
                )
                all_analyses.append(response.content[0].text)
                await asyncio.sleep(0.2)
            # Combine all batch analyses into a single report.
            final_analysis = "\n\n".join(all_analyses)
            return {
                "success": True,
                "data": final_analysis,
                "metadata": {
                    "total_comments": len(comments),
                    "videos_count": len(videos_info),
                    "channel_name": channel_name
                }
            }
        except Exception as e:
            logger.error(f"Multi-video analysis error: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "data": None
            }
# ============================================================================
# Main App Class
# ============================================================================
class ComprehensiveYouTubeAnalyzer:
def __init__(self):
self.logger = logger
self.youtube_client = None
self.claude_analyzer = None
# Analysis data storage
self._current_comments = []
self._current_video_title = ""
self._current_channel_name = ""
self._multi_comments = []
self._multi_videos_info = []
self._multi_channel_info = {}
self._shorts_comments = []
self._shorts_videos_info = []
self._shorts_channel_info = {}
self._latest_analysis = None
# Initialize clients
self._init_clients()
    def _init_clients(self):
        """Initialize API clients.

        On any failure both clients are left as None so the UI degrades
        gracefully; every user action re-checks via _check_clients().
        """
        try:
            # Check for API keys
            youtube_key = os.getenv('YOUTUBE_API_KEY')
            anthropic_key = os.getenv('ANTHROPIC_API_KEY')
            # Console feedback (Korean: 있음 = present, 없음 = missing)
            print(f"🔑 YouTube API Key: {'✅ 있음' if youtube_key else '❌ 없음'}")
            print(f"🔑 Anthropic API Key: {'✅ 있음' if anthropic_key else '❌ 없음'}")
            if not youtube_key:
                raise ValueError("YOUTUBE_API_KEY not found in environment variables")
            if not anthropic_key:
                raise ValueError("ANTHROPIC_API_KEY not found in environment variables")
            # Initialize clients (each validates its own key again)
            self.youtube_client = YouTubeClient()
            self.claude_analyzer = ClaudeAnalyzer()
            print("✅ 모든 클라이언트 초기화 성공")  # "all clients initialized successfully"
            self.logger.info("✅ All clients initialized successfully")
        except Exception as e:
            self.logger.error(f"❌ Client initialization error: {str(e)}")
            print(f"❌ 초기화 오류: {str(e)}")  # "initialization error"
            # Set clients to None for graceful degradation
            self.youtube_client = None
            self.claude_analyzer = None
def _check_clients(self):
"""Check if clients are properly initialized"""
if not self.youtube_client:
return "❌ **YouTube API client not initialized**\n\nPlease check if YOUTUBE_API_KEY is properly set in the environment."
if not self.claude_analyzer:
return "❌ **Claude AI client not initialized**\n\nPlease check if ANTHROPIC_API_KEY is properly set in the environment."
return None
def _extract_channel_id(self, selected_channel: str) -> Optional[str]:
"""Extract channel ID from dropdown selection"""
if not selected_channel:
return None
match = re.search(r'\(([^)]+)\)$', selected_channel)
return match.group(1) if match else None
def _extract_video_id(self, selected_video: str) -> Optional[str]:
"""Extract video ID from dropdown selection"""
if not selected_video:
return None
match = re.search(r'\(([^)]+)\)$', selected_video)
return match.group(1) if match else None
    def search_channels(self, creator_name: str):
        """Search for YouTube channels — accepts a creator name or a raw channel ID.

        Returns (markdown_summary, gr.update(choices=...)) for the channel dropdown.
        """
        error_check = self._check_clients()
        if error_check:
            return error_check, gr.update(choices=[])
        if not creator_name.strip():
            return "❌ Please enter a creator name or channel ID.", gr.update(choices=[])
        try:
            self.logger.info(f"Searching for channels: {creator_name}")
            # Channel IDs are always 'UC' followed by 22 URL-safe characters.
            channel_id_pattern = r'^UC[a-zA-Z0-9_-]{22}$'
            if re.match(channel_id_pattern, creator_name.strip()):
                # Direct channel ID lookup (much cheaper than a search call)
                channel_id = creator_name.strip()
                channels_response = self.youtube_client.youtube.channels().list(
                    part='snippet,statistics',
                    id=channel_id
                ).execute()
                if channels_response.get('items'):
                    channel_item = channels_response['items'][0]
                    snippet = channel_item['snippet']
                    statistics = channel_item.get('statistics', {})
                    channel = YouTubeChannel(
                        id=channel_item['id'],
                        title=snippet['title'],
                        description=snippet['description'],
                        subscriber_count=int(statistics.get('subscriberCount', 0)),
                        video_count=int(statistics.get('videoCount', 0)),
                        view_count=int(statistics.get('viewCount', 0)),
                        thumbnail_url=snippet['thumbnails']['default']['url'],
                        custom_url=snippet.get('customUrl')
                    )
                    channels = [channel]
                else:
                    return f"❌ Channel not found with ID '{channel_id}'.", gr.update(choices=[])
            else:
                # Regular free-text search by name
                channels = self.youtube_client.search_channels(creator_name, max_results=5)
            if channels:
                result_text = f"✅ **Found {len(channels)} channels for '{creator_name}'**\n\n"
                options = []
                for i, channel in enumerate(channels, 1):
                    result_text += f"**{i}. {channel.title}**\n"
                    result_text += f"- Subscribers: {channel.subscriber_count or 'Hidden'}\n"
                    result_text += f"- Videos: {channel.video_count or 'N/A'}\n\n"
                    # Dropdown entries embed the ID so _extract_channel_id can recover it.
                    options.append(f"{channel.title} ({channel.id})")
                return result_text, gr.update(choices=options, value=options[0] if options else None)
            else:
                return f"❌ No channels found for '{creator_name}'", gr.update(choices=[])
        except Exception as e:
            self.logger.error(f"Search error: {str(e)}")
            return f"❌ Search error: {str(e)}", gr.update(choices=[])
    def get_videos(self, selected_channel: str, max_videos: int):
        """Get the recent-video list for the channel chosen in the dropdown.

        Returns (markdown_summary, gr.update(choices=...)) for the video dropdown.
        """
        error_check = self._check_clients()
        if error_check:
            return error_check, gr.update(choices=[])
        channel_id = self._extract_channel_id(selected_channel)
        if not channel_id:
            return "❌ Please select a channel first.", gr.update(choices=[])
        try:
            self.logger.info(f"Getting videos for channel: {channel_id}")
            videos = self.youtube_client.get_channel_videos(channel_id, max_results=max_videos)
            if videos:
                result_text = f"✅ **Found {len(videos)} recent videos**\n\n"
                options = []
                for i, video in enumerate(videos, 1):
                    result_text += f"**{i}. {video.title}**\n"
                    result_text += f"- Views: {video.view_count or 'N/A'}\n"
                    result_text += f"- Comments: {video.comment_count or 'N/A'}\n"
                    result_text += f"- Duration: {video.duration or 'N/A'}\n\n"
                    # Dropdown entries embed the ID so _extract_video_id can recover it.
                    options.append(f"{video.title} ({video.id})")
                return result_text, gr.update(choices=options, value=options[0] if options else None)
            else:
                return "❌ No videos found for this channel.", gr.update(choices=[])
        except Exception as e:
            self.logger.error(f"Video retrieval error: {str(e)}")
            return f"❌ Error retrieving videos: {str(e)}", gr.update(choices=[])
    def collect_single_comments(self, selected_video: str, max_comments: int):
        """Collect comments for single-video analysis into self._current_comments.

        Returns a markdown status string with a short comment preview.
        """
        error_check = self._check_clients()
        if error_check:
            return error_check
        video_id = self._extract_video_id(selected_video)
        if not video_id:
            return "❌ Please select a video first."
        try:
            self.logger.info(f"Collecting comments for video: {video_id}")
            comments = self.youtube_client.get_video_comments(
                video_id,
                max_results=max_comments,
                order="relevance"
            )
            if comments:
                # Stash raw text + title for the later analysis step.
                self._current_comments = [comment.text for comment in comments]
                # NOTE(review): this truncates at the first ' (' — titles containing
                # ' (' lose their tail; acceptable for display purposes.
                self._current_video_title = selected_video.split(' (')[0]
                result_text = f"✅ **Successfully collected {len(comments)} comments**\n\n"
                result_text += "**Comment Preview:**\n\n"
                for i, comment in enumerate(comments[:3], 1):
                    preview_text = comment.text[:100] + "..." if len(comment.text) > 100 else comment.text
                    result_text += f"{i}. **{comment.author_name}**: {preview_text}\n\n"
                result_text += f"📊 **Ready for analysis!** Click 'Claude AI Analysis' button below."
                return result_text
            else:
                return "❌ No comments found. This video might have comments disabled."
        except Exception as e:
            if "commentsDisabled" in str(e):
                return "❌ **Comments are disabled** for this video. Please select another video."
            return f"❌ Error collecting comments: {str(e)}"
def collect_multi_comments(self, selected_channel: str, num_videos: int, comments_per_video: int):
"""Collect multi-video comments"""
error_check = self._check_clients()
if error_check:
return error_check
channel_id = self._extract_channel_id(selected_channel)
if not channel_id:
return "❌ Please select a channel."
try:
self._multi_channel_info = {
'name': selected_channel.split(' (')[0],
'id': channel_id
}
videos = self.youtube_client.get_channel_videos(channel_id, max_results=num_videos)
if not videos:
return "❌ No videos found."
return self._process_video_comments(videos, comments_per_video, "multi", "Multi-Video")
except Exception as e:
return f"❌ Multi-comment collection error: {str(e)}"
def collect_shorts_comments(self, selected_channel: str, num_shorts: int, comments_per_short: int):
"""Collect YouTube Shorts comments (fallback to regular videos)"""
error_check = self._check_clients()
if error_check:
return error_check
channel_id = self._extract_channel_id(selected_channel)
if not channel_id:
return "❌ Please select a channel."
try:
self._shorts_channel_info = {
'name': selected_channel.split(' (')[0],
'id': channel_id
}
# Get regular videos and filter for shorts-like content
videos = self.youtube_client.get_channel_videos(channel_id, max_results=num_shorts * 2)
# Filter for short-duration videos (approximate Shorts detection)
shorts_candidates = []
for video in videos:
if video.duration and ':' in video.duration:
duration_parts = video.duration.split(':')
if len(duration_parts) == 2: # MM:SS format
minutes = int(duration_parts[0])
if minutes <= 1: # 1 minute or less
shorts_candidates.append(video)
if len(shorts_candidates) >= num_shorts:
break
if not shorts_candidates:
# Fallback to recent videos
shorts_candidates = videos[:num_shorts]
if not shorts_candidates:
return "❌ No suitable content found for Shorts analysis."
return self._process_video_comments(shorts_candidates, comments_per_short, "shorts", "Shorts")
except Exception as e:
return f"❌ Shorts comment collection error: {str(e)}"
    def _process_video_comments(self, videos: List, comments_per_video: int, content_type: str, display_name: str):
        """Shared comment-collection loop for multi-video and Shorts modes.

        content_type is "multi" or "shorts" and decides which instance caches
        (_multi_* vs _shorts_*) receive the results. Returns a markdown
        progress/summary string; per-video failures are reported inline and
        do not abort the loop.
        """
        channel_info = self._multi_channel_info if content_type == "multi" else self._shorts_channel_info
        result_text = f"✅ **{display_name} comment collection started**\n\n"
        result_text += f"Channel: **{channel_info['name']}**\n"
        result_text += f"Target {display_name}: **{len(videos)} videos**\n"
        result_text += f"Comments per Video: **Max {comments_per_video}** (Most Popular)\n\n"
        all_comments = []
        videos_info = []
        successful_videos = 0
        for i, video in enumerate(videos, 1):
            try:
                # Shorts titles are truncated a bit more aggressively for display.
                title_preview = video.title[:40] if content_type == "shorts" else video.title[:50]
                result_text += f"**{i}/{len(videos)}** Processing: {title_preview}...\n"
                comments = self.youtube_client.get_video_comments(
                    video.id, max_results=comments_per_video, order="relevance"
                )
                if comments:
                    comment_texts = [comment.text for comment in comments]
                    all_comments.extend(comment_texts)
                    # Per-video metadata consumed later by the Claude prompts/charts.
                    video_info = {
                        'title': video.title,
                        'id': video.id,
                        'collected_comments_count': len(comments),
                        'actual_comment_count': video.comment_count,
                        'view_count': video.view_count or 0,
                        'like_count': video.like_count or 0,
                        'published_at': video.published_at,
                        'duration': video.duration
                    }
                    if content_type == "shorts":
                        video_info['is_shorts'] = True
                    videos_info.append(video_info)
                    successful_videos += 1
                    duration_info = f" (Duration: {video.duration})" if content_type == "shorts" else ""
                    result_text += f" ✅ {len(comments)} comments collected{duration_info}\n"
                else:
                    result_text += f" ⚠️ No comments (disabled or restricted)\n"
            except Exception as e:
                # Keep going: one failing video must not abort the whole batch.
                result_text += f" ❌ Error: {str(e)[:50]}...\n"
                continue
        # Store results in the mode-specific caches
        if content_type == "multi":
            self._multi_comments = all_comments
            self._multi_videos_info = videos_info
        else:  # shorts
            self._shorts_comments = all_comments
            self._shorts_videos_info = videos_info
        result_text += f"\n📊 **{display_name} Collection Complete!**\n"
        result_text += f"- Successful Videos: **{successful_videos}**\n"
        result_text += f"- Total Comments: **{len(all_comments)}**\n"
        result_text += f"- Sort Order: **Most Popular**\n"
        result_text += f"- Average Comments per Video: **{len(all_comments) // max(successful_videos, 1)}**\n\n"
        button_text = "Multi-Video Analysis" if content_type == "multi" else "Shorts Analysis"
        result_text += f"🤖 **Click '{button_text}' button below.**"
        return result_text
# Analysis methods
def analyze_single_comments(self):
"""Single video comment analysis"""
return asyncio.run(self._analyze_single_comments_async())
def analyze_multi_comments(self):
"""Multi-video comment analysis"""
return asyncio.run(self._analyze_multi_comments_async())
def analyze_shorts_comments(self):
"""Shorts comment analysis"""
return asyncio.run(self._analyze_shorts_comments_async())
    async def _analyze_single_comments_async(self):
        """Run Claude analysis over the collected single-video comments.

        Returns a 5-tuple: (markdown_text, sentiment_chart, participation_chart,
        reactions_chart, wordcloud_html); the chart slots are None on failure.
        """
        error_check = self._check_clients()
        if error_check:
            return error_check, None, None, None, None
        if not self._current_comments:
            return "❌ No comments to analyze. Please collect comments first.", None, None, None, None
        try:
            self.logger.info(f"Starting analysis of {len(self._current_comments)} comments")
            result = await self.claude_analyzer.analyze_comments(
                comments=self._current_comments,
                video_title=self._current_video_title,
                channel_name=self._current_channel_name or "Selected Channel"
            )
            if result.get("success", False):
                # Cache the finished analysis for later export/comparison features.
                self._latest_analysis = {
                    'type': 'single_video',
                    'video_title': self._current_video_title,
                    'comments_count': len(self._current_comments),
                    'analysis_text': result['data'],
                    'timestamp': datetime.now()
                }
                # Generate visualizations (helpers defined elsewhere in this file).
                charts = self._create_single_video_charts()
                wordcloud_html = self._create_wordcloud(self._current_comments, "Video Keywords")
                # Report body is deliberately flush-left inside the f-string.
                formatted_result = f"""## 🎯 Single Video Analysis Results
### 📊 Analysis Overview
- **Video**: {self._current_video_title}
- **Comments Analyzed**: {len(self._current_comments)}
- **Analysis Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
{result['data']}
---
### 📈 Generated Visualizations
Charts and WordCloud are displayed in the sections below.
"""
                return (
                    formatted_result,
                    charts.get('sentiment'),
                    charts.get('participation'),
                    charts.get('reactions'),
                    wordcloud_html
                )
            else:
                error_msg = f"❌ Analysis failed: {result.get('error', 'Unknown error')}"
                return error_msg, None, None, None, None
        except Exception as e:
            self.logger.error(f"Analysis error: {str(e)}")
            error_msg = f"❌ Analysis error: {str(e)}"
            return error_msg, None, None, None, None
    async def _analyze_multi_comments_async(self):
        """Run Claude analysis over comments pooled from several videos.

        Returns a 6-tuple: (markdown_text, sentiment_chart, participation_chart,
        video_interest_trends_chart, competitive_chart, wordcloud_html);
        the chart slots are None on failure.
        """
        error_check = self._check_clients()
        if error_check:
            return error_check, None, None, None, None, None
        if not self._multi_comments:
            return "❌ No comments to analyze. Please collect multi-video comments first.", None, None, None, None, None
        try:
            result = await self.claude_analyzer.analyze_multi_video_comments(
                comments=self._multi_comments,
                videos_info=self._multi_videos_info,
                channel_name=self._multi_channel_info['name']
            )
            if result.get("success", False):
                # Cache the finished analysis for later export/comparison features.
                self._latest_analysis = {
                    'type': 'multi_video',
                    'channel_name': self._multi_channel_info['name'],
                    'videos_count': len(self._multi_videos_info),
                    'total_comments': len(self._multi_comments),
                    'videos_info': self._multi_videos_info,
                    'analysis_text': result['data'],
                    'timestamp': datetime.now()
                }
                # Visualization helpers defined elsewhere in this file.
                charts = self._create_multi_video_charts()
                wordcloud_html = self._create_wordcloud(self._multi_comments, "Channel Keywords")
                # Report body is deliberately flush-left inside the f-string.
                formatted_result = f"""## 🎯 Multi-Video Analysis Results
### 📊 Analysis Overview
- **Channel**: {self._multi_channel_info['name']}
- **Videos Analyzed**: {len(self._multi_videos_info)}
- **Total Comments**: {len(self._multi_comments)}
- **Analysis Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
### 🎬 Video List
"""
                for i, video_info in enumerate(self._multi_videos_info, 1):
                    formatted_result += f"{i}. **{video_info['title']}** ({video_info['collected_comments_count']} comments)\n"
                formatted_result += f"""
---
{result['data']}
---
### 📈 Comprehensive Channel Analytics
Charts and WordCloud generated based on multi-video analysis results.
"""
                return (
                    formatted_result,
                    charts.get('sentiment'),
                    charts.get('participation'),
                    charts.get('video_interest_trends'),
                    charts.get('competitive'),
                    wordcloud_html
                )
            else:
                error_msg = f"❌ Analysis failed: {result.get('error', 'Unknown error')}"
                return error_msg, None, None, None, None, None
        except Exception as e:
            self.logger.error(f"Multi-video analysis error: {str(e)}")
            error_msg = f"❌ Analysis error: {str(e)}"
            return error_msg, None, None, None, None, None
    async def _analyze_shorts_comments_async(self):
        """Run Claude analysis plus local heuristics over collected Shorts comments.

        Returns a 6-tuple: (markdown_text, optimization_score_chart,
        engagement_types_chart, participation_chart, sentiment_chart,
        wordcloud_html); the chart slots are None on failure.
        """
        error_check = self._check_clients()
        if error_check:
            return error_check, None, None, None, None, None
        if not self._shorts_comments:
            return "❌ No Shorts comments to analyze. Please collect Shorts comments first.", None, None, None, None, None
        try:
            # Basic Claude analysis (reuses the multi-video prompt for Shorts).
            result = await self.claude_analyzer.analyze_multi_video_comments(
                comments=self._shorts_comments,
                videos_info=self._shorts_videos_info,
                channel_name=self._shorts_channel_info['name']
            )
            if result.get("success", False):
                # Cache the finished analysis for later export/comparison features.
                self._latest_analysis = {
                    'type': 'shorts_analysis',
                    'channel_name': self._shorts_channel_info['name'],
                    'shorts_count': len(self._shorts_videos_info),
                    'total_comments': len(self._shorts_comments),
                    'shorts_info': self._shorts_videos_info,
                    'analysis_text': result['data'],
                    'timestamp': datetime.now()
                }
                # Shorts-specific heuristic analysis (local, no API calls)
                shorts_characteristics = self._analyze_shorts_characteristics()
                # Visualization helpers defined elsewhere in this file.
                charts = self._create_shorts_charts()
                wordcloud_html = self._create_wordcloud(self._shorts_comments, "Shorts Keywords")
                # Compare against regular videos only when that data was collected too.
                comparison_text = ""
                if self._multi_comments:
                    comparison_text = f"\n\n---\n\n## 🔄 Format Comparison\n\n{self._compare_shorts_vs_regular()}"
                # Report body is deliberately flush-left inside the f-string.
                formatted_result = f"""## 🎬 YouTube Shorts Analysis Results
### 📊 Analysis Overview
- **Channel**: {self._shorts_channel_info['name']}
- **Shorts Analyzed**: {len(self._shorts_videos_info)}
- **Total Comments**: {len(self._shorts_comments)}
- **Analysis Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
### 🎬 Shorts List
"""
                for i, short_info in enumerate(self._shorts_videos_info, 1):
                    formatted_result += f"{i}. **{short_info['title']}** ({short_info['collected_comments_count']} comments, {short_info.get('duration', 'N/A')})\n"
                formatted_result += f"""
---
### 🤖 Claude AI Analysis
{result['data']}
---
{shorts_characteristics}
{comparison_text}
---
### 📈 Shorts Analytics
Charts and WordCloud generated based on Shorts analysis results.
"""
                return (
                    formatted_result,
                    charts.get('optimization_score'),
                    charts.get('engagement_types'),
                    charts.get('participation'),
                    charts.get('sentiment'),
                    wordcloud_html
                )
            else:
                error_msg = f"❌ Shorts analysis failed: {result.get('error', 'Unknown error')}"
                return error_msg, None, None, None, None, None
        except Exception as e:
            self.logger.error(f"Shorts analysis error: {str(e)}")
            error_msg = f"❌ Analysis error: {str(e)}"
            return error_msg, None, None, None, None, None
def _analyze_shorts_characteristics(self) -> str:
"""Basic Shorts characteristics analysis"""
try:
if not self._shorts_comments:
return "❌ No Shorts data available for characteristics analysis."
# Simple characteristics analysis
result_text = "## 🎬 Shorts Characteristics Analysis\n\n"
# Calculate basic metrics
avg_length = sum(len(comment) for comment in self._shorts_comments) / len(self._shorts_comments)
short_comments = sum(1 for comment in self._shorts_comments if len(comment) <= 30)
emoji_comments = sum(1 for comment in self._shorts_comments if re.search(r'[😀-🙏]', comment))
short_ratio = (short_comments / len(self._shorts_comments)) * 100
emoji_ratio = (emoji_comments / len(self._shorts_comments)) * 100
# Calculate optimization score
optimization_score = (short_ratio * 0.4) + (emoji_ratio * 0.3) + min((50 / max(avg_length, 1)) * 30, 30)
result_text += f"**Shorts Optimization Score**: {optimization_score:.1f}/100\n\n"
result_text += f"### 📝 Comment Patterns\n"
result_text += f"- Average Length: {avg_length:.1f} characters\n"
result_text += f"- Short Comments: {short_ratio:.1f}%\n"
result_text += f"- Emoji Usage: {emoji_ratio:.1f}%\n\n"
if optimization_score >= 70:
result_text += "🎬 **Highly optimized for Shorts format** - Content shows strong Shorts characteristics\n"
elif optimization_score >= 40:
result_text += "📱 **Moderately suitable for Shorts** - Good potential with optimization\n"
else:
result_text += "📺 **Better suited for regular videos** - Consider adapting content for Shorts format\n"
return result_text
except Exception as e:
self.logger.error(f"Shorts characteristics analysis error: {str(e)}")
return f"❌ Error analyzing Shorts characteristics: {str(e)}"
def _compare_shorts_vs_regular(self) -> str:
"""Basic comparison between Shorts and regular videos"""
try:
if not self._multi_comments or not self._shorts_comments:
return "❌ Need both regular video and Shorts data for comparison."
result_text = "## 📊 Shorts vs Regular Videos Comparison\n\n"
# Calculate averages
regular_avg_length = sum(len(comment) for comment in self._multi_comments) / len(self._multi_comments)
shorts_avg_length = sum(len(comment) for comment in self._shorts_comments) / len(self._shorts_comments)
regular_emoji = sum(1 for comment in self._multi_comments if re.search(r'[😀-🙏]', comment))
shorts_emoji = sum(1 for comment in self._shorts_comments if re.search(r'[😀-🙏]', comment))
regular_emoji_ratio = (regular_emoji / len(self._multi_comments)) * 100
shorts_emoji_ratio = (shorts_emoji / len(self._shorts_comments)) * 100
result_text += "### 👥 Audience Behavior\n"
result_text += f"- Regular Video Comments: {regular_avg_length:.1f} chars avg\n"
result_text += f"- Shorts Comments: {shorts_avg_length:.1f} chars avg\n"
result_text += f"- Regular Emoji Usage: {regular_emoji_ratio:.1f}%\n"
result_text += f"- Shorts Emoji Usage: {shorts_emoji_ratio:.1f}%\n\n"
# Recommendations
result_text += "### 💡 Recommendations\n"
if shorts_avg_length < regular_avg_length:
result_text += "- Shorts generate more concise, immediate reactions\n"
if shorts_emoji_ratio > regular_emoji_ratio:
result_text += "- Shorts audience uses more visual expressions (emojis)\n"
return result_text
except Exception as e:
self.logger.error(f"Comparison analysis error: {str(e)}")
return f"❌ Error comparing formats: {str(e)}"
# Chart creation methods
def _create_single_video_charts(self) -> Dict:
    """Build the Plotly figures shown for a single-video analysis.

    Produces up to three charts: a sentiment pie, a viewer-participation
    horizontal bar chart, and a viewer-reaction horizontal bar chart.

    Returns:
        Mapping of chart name ('sentiment', 'participation', 'reactions')
        to a Plotly figure; keys are omitted when their data is unavailable.
    """
    figures = {}
    try:
        # Sentiment pie is always attempted; the extractor has its own fallbacks.
        sentiment = self._extract_sentiment_from_analysis()
        figures['sentiment'] = px.pie(
            values=list(sentiment.values()),
            names=list(sentiment.keys()),
            title="💭 Sentiment Distribution",
            color_discrete_sequence=['#28a745', '#dc3545', '#6c757d']
        )
        if self._current_comments:
            participation = self._analyze_viewer_participation(self._current_comments)
            if participation:
                counts = list(participation.values())
                figures['participation'] = px.bar(
                    x=counts,
                    y=list(participation.keys()),
                    orientation='h',
                    title="🎯 Viewer Participation Analysis",
                    labels={'x': 'Comment Count', 'y': 'Participation Type'},
                    color=counts,
                    color_continuous_scale='viridis'
                )
                figures['participation'].update_layout(yaxis={'categoryorder': 'total ascending'})
            reactions = self._analyze_viewer_reactions(self._current_comments)
            if reactions:
                counts = list(reactions.values())
                figures['reactions'] = px.bar(
                    x=counts,
                    y=list(reactions.keys()),
                    orientation='h',
                    title="🎭 Viewer Response Types",
                    labels={'x': 'Mention Count', 'y': 'Response Type'},
                    color=counts,
                    color_continuous_scale='Blues'
                )
                figures['reactions'].update_layout(yaxis={'categoryorder': 'total ascending'})
    except Exception as e:
        self.logger.error(f"Chart creation error: {str(e)}")
    return figures
def _create_multi_video_charts(self) -> Dict:
    """Build the Plotly figures shown for a multi-video (channel) analysis.

    Produces up to four charts: channel sentiment pie, channel participation
    bars, per-video interest bars, and a competitive-edge pie.

    Returns:
        Mapping of chart name to a Plotly figure; keys are omitted when
        their underlying data is unavailable.
    """
    figures = {}
    try:
        sentiment = self._extract_sentiment_from_analysis()
        figures['sentiment'] = px.pie(
            values=list(sentiment.values()),
            names=list(sentiment.keys()),
            title="📊 Channel Sentiment Analysis",
            color_discrete_sequence=['#2E8B57', '#DC143C', '#4682B4']
        )
        if self._multi_comments:
            participation = self._analyze_viewer_participation(self._multi_comments)
            if participation:
                counts = list(participation.values())
                figures['participation'] = px.bar(
                    x=counts,
                    y=list(participation.keys()),
                    orientation='h',
                    title="🎯 Channel Participation Analysis",
                    labels={'x': 'Comment Count', 'y': 'Participation Type'},
                    color=counts,
                    color_continuous_scale='plasma'
                )
                figures['participation'].update_layout(yaxis={'categoryorder': 'total ascending'})
        if self._multi_videos_info:
            # Truncate long titles so axis labels stay readable.
            axis_labels = [info['title'][:30] + '...' if len(info['title']) > 30 else info['title']
                           for info in self._multi_videos_info]
            totals = [info.get('actual_comment_count', 0) for info in self._multi_videos_info]
            figures['video_interest_trends'] = px.bar(
                x=axis_labels,
                y=totals,
                title="📊 Viewer Interest Trends by Video",
                labels={'x': 'Video', 'y': 'Total Comments'},
                color=totals,
                color_continuous_scale='viridis'
            )
            figures['video_interest_trends'].update_layout(xaxis_tickangle=45)
        if self._multi_comments:
            competitive = self._analyze_competitive_advantage(self._multi_comments)
            if competitive:
                figures['competitive'] = px.pie(
                    values=list(competitive.values()),
                    names=list(competitive.keys()),
                    title="🏆 Channel Competitive Edge",
                    color_discrete_sequence=['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#ff99cc']
                )
    except Exception as e:
        self.logger.error(f"Multi-video chart creation error: {str(e)}")
    return figures
def _create_shorts_charts(self) -> Dict:
    """Build the Plotly figures shown for a Shorts analysis.

    Produces up to four charts: optimization-score bars, engagement-type
    pie, participation bars, and a sentiment pie. All charts require both
    Shorts metadata and collected Shorts comments.

    Returns:
        Mapping of chart name to a Plotly figure; empty when Shorts data
        is missing or chart creation fails.
    """
    figures = {}
    try:
        if self._shorts_videos_info and self._shorts_comments:
            optimization = self._analyze_shorts_optimization()
            if optimization:
                scores = list(optimization.values())
                figures['optimization_score'] = px.bar(
                    x=list(optimization.keys()),
                    y=scores,
                    title="⚡ Shorts Optimization Score",
                    labels={'x': 'Optimization Factor', 'y': 'Score (0-100)'},
                    color=scores,
                    color_continuous_scale='RdYlGn'
                )
                figures['optimization_score'].update_layout(xaxis_tickangle=45)
            engagement = self._analyze_engagement_types(self._shorts_comments)
            if engagement:
                figures['engagement_types'] = px.pie(
                    values=list(engagement.values()),
                    names=list(engagement.keys()),
                    title="🎯 Viewer Engagement Types",
                    color_discrete_sequence=['#ff6b6b', '#4ecdc4', '#45b7d1', '#96ceb4', '#feca57']
                )
            participation = self._analyze_viewer_participation(self._shorts_comments)
            if participation:
                counts = list(participation.values())
                figures['participation'] = px.bar(
                    x=counts,
                    y=list(participation.keys()),
                    orientation='h',
                    title="🎯 Shorts Participation Analysis",
                    labels={'x': 'Comment Count', 'y': 'Participation Type'},
                    color=counts,
                    color_continuous_scale='viridis'
                )
                figures['participation'].update_layout(yaxis={'categoryorder': 'total ascending'})
            sentiment = self._extract_sentiment_from_analysis()
            figures['sentiment'] = px.pie(
                values=list(sentiment.values()),
                names=list(sentiment.keys()),
                title="📊 Shorts Sentiment Analysis",
                color_discrete_sequence=['#2E8B57', '#DC143C', '#4682B4']
            )
    except Exception as e:
        self.logger.error(f"Shorts chart creation error: {str(e)}")
    return figures
# Analysis helper methods
def _analyze_viewer_participation(self, comments: List[str]) -> Dict[str, int]:
"""Analyze viewer participation patterns"""
if not comments:
return {}
participation_patterns = {
'🎉 Enthusiastic Reactions': ['대박', '완전', '미쳤다', 'amazing', 'incredible', 'awesome', '짱', '🔥', '💯'],
'💬 Detailed Feedback': ['생각', '느낌', '의견', 'think', 'feel', 'opinion', '추천', 'recommend'],
'❓ Questions & Curiosity': ['궁금', '질문', '뭐야', '어떻게', 'what', 'how', 'why', '?'],
'🔔 Engagement Actions': ['구독', '좋아요', '알림', 'subscribe', 'like', 'notification', '팔로우'],
'💡 Content Requests': ['해주세요', '만들어', '다음', '또', 'please', 'more', 'next']
}
participation_counts = {category: 0 for category in participation_patterns.keys()}
for comment in comments:
comment_lower = comment.lower()
for category, keywords in participation_patterns.items():
if any(keyword in comment_lower for keyword in keywords):
participation_counts[category] += 1
return {k: v for k, v in participation_counts.items() if v > 0}
def _analyze_viewer_reactions(self, comments: List[str]) -> Dict[str, int]:
"""Analyze viewer reaction types"""
if not comments:
return {}
reaction_patterns = {
'👍 Praise/Positive': ['좋', '최고', '대박', 'good', 'great', 'awesome', '👍', '❤️', '재미'],
'❓ Questions/Curiosity': ['궁금', '질문', '뭐야', 'what', 'how', 'why', '?'],
'📞 Subscribe/Engagement': ['구독', '좋아요', '알림', 'subscribe', 'like', 'notification'],
'💡 Suggestions/Requests': ['해주세요', '만들어', '다음에', 'please', '요청', '추천']
}
reaction_counts = {category: 0 for category in reaction_patterns.keys()}
for comment in comments:
comment_lower = comment.lower()
for category, keywords in reaction_patterns.items():
if any(keyword in comment_lower for keyword in keywords):
reaction_counts[category] += 1
return {k: v for k, v in reaction_counts.items() if v > 0}
def _analyze_competitive_advantage(self, comments: List[str]) -> Dict[str, int]:
"""Analyze competitive advantage indicators"""
if not comments:
return {}
advantage_indicators = {
'🏆 Direct Comparisons': 0,
'✨ Uniqueness Claims': 0,
'🔄 Channel Switching': 0,
'📢 Recommendation Intent': 0
}
comparison_keywords = ['다른', '비교', 'compared', 'unlike', 'different', 'better than']
uniqueness_keywords = ['유일', '처음', '독특', 'unique', 'first time', 'only', 'special']
switching_keywords = ['구독취소', '갈아탔다', 'unsubscribed', 'switched', '이제여기만']
recommendation_keywords = ['추천', '공유', '알려', 'recommend', 'share', 'tell others']
for comment in comments:
comment_lower = comment.lower()
if any(keyword in comment_lower for keyword in comparison_keywords):
advantage_indicators['🏆 Direct Comparisons'] += 1
if any(keyword in comment_lower for keyword in uniqueness_keywords):
advantage_indicators['✨ Uniqueness Claims'] += 1
if any(keyword in comment_lower for keyword in switching_keywords):
advantage_indicators['🔄 Channel Switching'] += 1
if any(keyword in comment_lower for keyword in recommendation_keywords):
advantage_indicators['📢 Recommendation Intent'] += 1
return {k: v for k, v in advantage_indicators.items() if v > 0}
def _analyze_shorts_optimization(self) -> Dict[str, float]:
"""Analyze Shorts optimization factors"""
if not self._shorts_comments:
return {}
comments = self._shorts_comments
# Calculate metrics
instant_reactions = ['와', '헐', '대박', 'wow', 'omg', '미쳤다']
instant_count = sum(1 for comment in comments
if any(reaction in comment.lower() for reaction in instant_reactions))
short_comments = sum(1 for comment in comments if len(comment) <= 30)
emoji_comments = sum(1 for comment in comments if re.search(r'[😀-🙏]', comment))
loop_keywords = ['다시', '또', 'again', 'replay', '반복']
loop_count = sum(1 for comment in comments
if any(keyword in comment.lower() for keyword in loop_keywords))
algo_keywords = ['추천', '떴다', 'recommended', 'fyp']
algo_count = sum(1 for comment in comments
if any(keyword in comment.lower() for keyword in algo_keywords))
return {
'Instant Reactions': (instant_count / len(comments)) * 100,
'Conciseness': (short_comments / len(comments)) * 100,
'Visual Reactions': (emoji_comments / len(comments)) * 100,
'Retention': (loop_count / len(comments)) * 100,
'Algorithm Response': (algo_count / len(comments)) * 100
}
def _analyze_engagement_types(self, comments: List[str]) -> Dict[str, int]:
"""Analyze engagement types for Shorts"""
if not comments:
return {}
engagement_types = {
'⚡ Instant Reactions': 0,
'💬 Opinion Expression': 0,
'❓ Questions/Curiosity': 0,
'🔄 Repeat Viewing': 0,
'📢 Sharing/Recommendation': 0
}
for comment in comments:
comment_lower = comment.lower()
comment_length = len(comment)
# Categorize based on content and length
instant_keywords = ['와', '헐', '대박', 'wow', 'omg', '👍', '🔥']
if (comment_length <= 20 and
any(keyword in comment_lower for keyword in instant_keywords)):
engagement_types['⚡ Instant Reactions'] += 1
elif ('?' in comment or '궁금' in comment_lower or
any(q in comment_lower for q in ['what', 'how', 'why', '뭐야'])):
engagement_types['❓ Questions/Curiosity'] += 1
elif any(keyword in comment_lower for keyword in ['다시', '또', 'again', 'replay']):
engagement_types['🔄 Repeat Viewing'] += 1
elif any(keyword in comment_lower for keyword in ['공유', '보내', 'share', '추천']):
engagement_types['📢 Sharing/Recommendation'] += 1
else:
engagement_types['💬 Opinion Expression'] += 1
return {k: v for k, v in engagement_types.items() if v > 0}
# def _extract_sentiment_from_analysis(self) -> Dict[str, float]:
# """Extract sentiment from analysis text or estimate from comments"""
# if self._latest_analysis:
# analysis_text = self._latest_analysis['analysis_text']
# # Try to extract sentiment percentages from analysis
# patterns = {
# 'Positive': [r'positive.*?(\d+(?:\.\d+)?)%?', r'(\d+(?:\.\d+)?)%?.*?positive'],
# 'Negative': [r'negative.*?(\d+(?:\.\d+)?)%?', r'(\d+(?:\.\d+)?)%?.*?negative'],
# 'Neutral': [r'neutral.*?(\d+(?:\.\d+)?)%?', r'(\d+(?:\.\d+)?)%?.*?neutral']
# }
# sentiment_values = {}
# for sentiment, pattern_list in patterns.items():
# for pattern in pattern_list:
# match = re.search(pattern, analysis_text, re.IGNORECASE)
# if match:
# sentiment_values[sentiment] = float(match.group(1))
# break
# if sentiment_values and sum(sentiment_values.values()) > 0:
# return sentiment_values
# # Fallback to estimation
# return self._estimate_sentiment_from_comments()
def _extract_sentiment_from_analysis(self) -> Dict[str, float]:
"""Extract sentiment from analysis text with improved parsing"""
# Claude 분석 결과에서 sentiment 추출 시도
if self._latest_analysis:
analysis_text = self._latest_analysis['analysis_text']
# 다양한 패턴으로 sentiment 퍼센트 찾기
sentiment_patterns = [
# "60% positive, 25% neutral, 15% negative" 형태
r'(\d+(?:\.\d+)?)%?\s*positive.*?(\d+(?:\.\d+)?)%?\s*neutral.*?(\d+(?:\.\d+)?)%?\s*negative',
r'(\d+(?:\.\d+)?)%?\s*positive.*?(\d+(?:\.\d+)?)%?\s*negative.*?(\d+(?:\.\d+)?)%?\s*neutral',
# "Positive: 60%, Negative: 15%, Neutral: 25%" 형태
r'positive.*?(\d+(?:\.\d+)?)%.*?negative.*?(\d+(?:\.\d+)?)%.*?neutral.*?(\d+(?:\.\d+)?)%',
r'positive.*?(\d+(?:\.\d+)?)%.*?neutral.*?(\d+(?:\.\d+)?)%.*?negative.*?(\d+(?:\.\d+)?)%',
# 개별 매칭
r'positive.*?(\d+(?:\.\d+)?)%',
r'negative.*?(\d+(?:\.\d+)?)%',
r'neutral.*?(\d+(?:\.\d+)?)%'
]
# 첫 번째 완전한 패턴 시도
for pattern in sentiment_patterns[:4]:
match = re.search(pattern, analysis_text, re.IGNORECASE)
if match:
groups = match.groups()
if len(groups) == 3:
# 패턴에 따라 순서 결정
if 'positive.*?neutral.*?negative' in pattern:
pos, neu, neg = float(groups[0]), float(groups[1]), float(groups[2])
elif 'positive.*?negative.*?neutral' in pattern:
pos, neg, neu = float(groups[0]), float(groups[1]), float(groups[2])
else:
pos, neg, neu = float(groups[0]), float(groups[1]), float(groups[2])
# 유효성 검사
total = pos + neg + neu
if 80 <= total <= 120: # 대략적으로 100% 근처
# 정규화
if total > 0:
return {
'Positive': (pos / total) * 100,
'Negative': (neg / total) * 100,
'Neutral': (neu / total) * 100
}
# 개별 매칭 시도
individual_sentiments = {}
pos_match = re.search(r'positive.*?(\d+(?:\.\d+)?)%?', analysis_text, re.IGNORECASE)
neg_match = re.search(r'negative.*?(\d+(?:\.\d+)?)%?', analysis_text, re.IGNORECASE)
neu_match = re.search(r'neutral.*?(\d+(?:\.\d+)?)%?', analysis_text, re.IGNORECASE)
if pos_match:
individual_sentiments['Positive'] = float(pos_match.group(1))
if neg_match:
individual_sentiments['Negative'] = float(neg_match.group(1))
if neu_match:
individual_sentiments['Neutral'] = float(neu_match.group(1))
# 3개 모두 찾았으면 사용
if len(individual_sentiments) == 3:
total = sum(individual_sentiments.values())
if total > 0:
return {k: (v / total) * 100 for k, v in individual_sentiments.items()}
# Claude에서 추출 실패시 댓글 기반 추정
return self._estimate_sentiment_from_comments()
def _estimate_sentiment_from_comments(self) -> Dict[str, float]:
"""Estimate sentiment from comment content"""
comments = (self._current_comments or self._multi_comments or
self._shorts_comments or [])
if not comments:
return {'Positive': 60, 'Negative': 20, 'Neutral': 20}
positive_keywords = [
# 영어
'good', 'great', 'awesome', 'amazing', 'excellent', 'love', 'like', 'best',
'wonderful', 'perfect', 'brilliant', 'fantastic', 'superb', 'incredible',
'outstanding', 'impressive', 'nice', 'cool', 'sweet', 'epic', 'fire',
# 한글
'좋', '멋지', '재미', '최고', '대박', '훌륭', '감사', '구독', '짱', '완전',
'굿', '쩐다', '지린다', '개좋', '꿀잼', '핵잼', '존잼', '레전드', '갓',
# 이모지
'👍', '❤️', '😍', '🔥', '💯', '🎉', '😊', '😂', '👏', '🙌', '✨', '⭐'
]
negative_keywords = [
# 영어
'bad', 'terrible', 'awful', 'hate', 'worst', 'boring', 'stupid', 'disappointed',
'annoying', 'useless', 'waste', 'fail', 'sucks', 'disgusting', 'horrible',
'trash', 'garbage', 'lame', 'weak', 'cringe',
# 한글
'별로', '싫', '안좋', '실망', '지루', '노잼', '재미없', '별거없', '구려',
'망작', '쓰레기', '최악', '짜증', '화나', '어이없', '헛소리',
# 이모지
'👎', '😞', '😠', '😡', '🤮', '💩', '😤', '🙄', '😒'
]
sample_size = min(100, len(comments))
sample_comments = comments[:sample_size]
positive_count = sum(1 for comment in sample_comments
if any(kw in comment.lower() for kw in positive_keywords))
negative_count = sum(1 for comment in sample_comments
if any(kw in comment.lower() for kw in negative_keywords))
neutral_count = sample_size - positive_count - negative_count
return {
'Positive': max(positive_count, 0),
'Negative': max(negative_count, 0),
'Neutral': max(neutral_count, 0)
}
def _create_wordcloud(self, comments: List[str], title: str = "Word Cloud") -> str:
    """Render a word cloud from comment text and return it as inline HTML.

    Tokenizes English (3+ letters) and Korean (2+ syllables) words, drops
    stopwords/URL fragments/gibberish, renders a WordCloud figure with
    matplotlib, and embeds the PNG as a base64 <img> tag so the Gradio UI
    can display it directly.

    Args:
        comments: Raw comment strings to tokenize.
        title: Used only in the <img> alt text.

    Returns:
        An HTML <img> snippet on success, or a short <p> notice/error
        snippet when there is no usable data or generation fails.
    """
    try:
        if not comments:
            return "<p>No data available for WordCloud</p>"
        # Extract keywords with improved filtering: ASCII words of 3+ letters
        # and Korean runs of 2+ syllables.
        all_text = ' '.join(comments)
        english_words = re.findall(r'[a-zA-Z]{3,}', all_text)
        korean_words = re.findall(r'[가-힣]{2,}', all_text)
        all_words = english_words + korean_words
        # Enhanced stopwords list including HTML tags and technical terms
        enhanced_stopwords = {
            # English common words
            'the', 'and', 'this', 'that', 'with', 'have', 'will', 'from', 'they', 'been',
            'are', 'was', 'but', 'not', 'you', 'all', 'can', 'her', 'his', 'she', 'for',
            # YouTube/video related
            'video', 'like', 'subscribe', 'comment', 'watch', 'channel', 'youtube',
            # Korean common words
            '이거', '그거', '진짜', '정말', '너무', '완전', '그냥', '좀', '약간',
            '영상', '댓글', '구독', '좋아요', '채널', '유튜브',
            # Web/HTML related (enhanced for your issue)
            'www', 'http', 'https', 'amp', 'com', 'net', 'org', 'html', 'htm',
            'href', 'link', 'url', 'src', 'img', 'div', 'span', 'class', 'style',
            'nbsp', 'quot', 'amp', 'lt', 'gt', 'script', 'meta', 'head', 'body',
            'title', 'alt', 'width', 'height', 'border', 'target', 'blank',
            # Technical/encoding terms from your WordCloud
            'ntoxymm', 'zhd', 'uckszu', 'pbkr', 'dzesm', 'yaya', 'yes', 'jineer',
            'robak', 'xds', 'iew', 'yes', 'yeah', 'ugh', 'hmm',
            # Additional filtering
            'bit', 'ly', 'tinyurl', 'shortlink', 'goo', 'gl', 'yt', 'youtu', 'be',
            'redirect', 'click', 'here', 'more', 'info', 'details'
        }
        # More aggressive filtering
        filtered_words = []
        for word in all_words:
            word_lower = word.lower()
            # Skip if in stopwords
            if word_lower in enhanced_stopwords:
                continue
            # Skip if looks like HTML/technical gibberish
            # NOTE(review): tokens from the two regexes above are always pure
            # ASCII letters or pure Korean, so this guard never fires.
            if len(word) >= 2 and not re.match(r'^[a-zA-Z가-힣]+$', word):
                continue
            # Skip very short words unless they're meaningful
            if len(word) < 2:
                continue
            # Skip if it's mostly repeated characters (e.g. "aaaa", "kkkk")
            if len(set(word.lower())) <= 2 and len(word) > 3:
                continue
            filtered_words.append(word_lower)
        if not filtered_words:
            return "<p>No meaningful keywords available for WordCloud</p>"
        word_freq = Counter(filtered_words)
        # Try to use Korean font if available; first match wins.
        font_path = None
        korean_font_candidates = [
            'Nanum-Bold.ttf',  # Your uploaded font
            './Nanum-Bold.ttf',
            '/usr/share/fonts/truetype/nanum/NanumGothic.ttf',
            '/System/Library/Fonts/AppleGothic.ttf',
            '/Windows/Fonts/malgun.ttf'
        ]
        for candidate in korean_font_candidates:
            if Path(candidate).exists():
                font_path = candidate
                logger.info(f"Using Korean font: {candidate}")
                break
        try:
            # Generate WordCloud with Korean font support
            if font_path:
                wordcloud = WordCloud(
                    font_path=font_path,
                    width=900,
                    height=500,
                    background_color='white',
                    max_words=100,
                    colormap='viridis',
                    relative_scaling=0.4,
                    min_font_size=12,
                    prefer_horizontal=0.7,
                    max_font_size=80,
                    collocations=False
                ).generate_from_frequencies(word_freq)
            else:
                # Fallback to English-only WordCloud (default fonts cannot
                # render Hangul glyphs, so Korean tokens are dropped here).
                english_only = [word for word in filtered_words if re.match(r'^[a-zA-Z]+$', word)]
                if english_only:
                    english_freq = Counter(english_only)
                    wordcloud = WordCloud(
                        width=900,
                        height=500,
                        background_color='white',
                        max_words=80,
                        colormap='viridis',
                        relative_scaling=0.4,
                        min_font_size=12,
                        collocations=False
                    ).generate_from_frequencies(english_freq)
                else:
                    return "<p>No English keywords available for WordCloud</p>"
        except Exception as wc_error:
            # NOTE(review): this branch uses the module-level logger while the
            # outer handler uses self.logger — confirm which is intended.
            logger.warning(f"WordCloud generation failed: {wc_error}")
            # Final fallback
            return f"<p>WordCloud generation failed: {str(wc_error)}</p>"
        # Create image without displaying (Agg backend, interactive mode off)
        plt.figure(figsize=(12, 6))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        # Convert to base64 efficiently
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png', bbox_inches='tight', dpi=150,
                    facecolor='white', edgecolor='none')
        plt.close()  # Important: close to prevent display
        buffer.seek(0)
        img_base64 = base64.b64encode(buffer.getvalue()).decode()
        return f'<img src="data:image/png;base64,{img_base64}" style="max-width: 100%; height: auto;" alt="{title} WordCloud">'
    except Exception as e:
        self.logger.error(f"WordCloud generation error: {str(e)}")
        return f"<p>WordCloud generation failed: {str(e)}</p>"
# ============================================================================
# Gradio Interface
# ============================================================================
def create_comprehensive_interface():
    """Build and return the full Gradio Blocks UI for the analyzer.

    Wires one shared ComprehensiveYouTubeAnalyzer instance to three analysis
    workflows — Single Video, Multi-Video channel, and Shorts (the latter two
    side by side in the Multi-Video tab) — each with its own search/collect/
    analyze controls, Plotly chart panels, and an HTML WordCloud panel.

    NOTE: component creation order below defines the rendered layout, so the
    statements must not be reordered.

    Returns:
        The assembled ``gr.Blocks`` demo, ready for ``.launch()``.
    """
    # Single backend instance shared by every event handler below; the
    # collect/analyze handlers are bound methods of this object.
    app = ComprehensiveYouTubeAnalyzer()
    # Custom CSS: widen the app container and center it on the page.
    css = """
    .gradio-container {
        max-width: 1400px !important;
        margin: auto !important;
    }
    """
    with gr.Blocks(
        title="🎬 YouTube Comment Analyzer - MCP Hackathon 2025",
        theme=gr.themes.Soft(),
        css=css
    ) as demo:
        # Page header.
        gr.Markdown("""
        # 🎬 YouTube Comment Analyzer
        ## MCP Hackathon 2025 - Track 1 Submission
        **AI-powered comment sentiment analysis with comprehensive creator insights**
        🔧 **MCP Server Implementation** | 📊 **Creator Intelligence** | 🎯 **Growth Optimization**
        """)
        # System Status banner — reflects which API clients initialized at
        # startup, and clarifies that Shorts analysis lives in the
        # Multi-Video tab.
        with gr.Row():
            gr.Markdown(f"""
            ### 🔧 System Status
            - **YouTube API**: {'✅ Ready' if app.youtube_client else '❌ Not Available'}
            - **Claude AI**: {'✅ Ready' if app.claude_analyzer else '❌ Not Available'}
            - **MCP Server**: ✅ Implemented with 4 tools
            - **Analysis Modes**: 🎬 Single Video | 📊 Multi-Video | ⚡ Shorts *(located in Multi-Video tab)*
            """)
        # ---- Tab 1: single-video workflow (search → videos → comments) ----
        with gr.Tab("🎬 Single Video Analysis"):
            gr.Markdown("### 1️⃣ Search for Channel")
            with gr.Row():
                creator_input_single = gr.Textbox(
                    label="Creator Name or Channel ID",
                    placeholder="Enter channel name (e.g., MrBeast) or Channel ID (e.g., UC-lHJZR3Gqxm24_Vd_AJ5Yw)",
                    scale=3
                )
                search_btn_single = gr.Button("🔍 Search", variant="primary", scale=1)
            search_result_single = gr.Markdown()
            channel_dropdown_single = gr.Dropdown(label="📺 Select Channel", choices=[])
            gr.Markdown("### 2️⃣ Get Videos")
            with gr.Row():
                max_videos_single = gr.Slider(5, 50, 10, step=5, label="Max Videos to Retrieve")
                get_videos_btn_single = gr.Button("📹 Get Videos", variant="secondary")
            videos_result_single = gr.Markdown()
            video_dropdown_single = gr.Dropdown(label="🎬 Select Video", choices=[])
            gr.Markdown("### 3️⃣ Collect & Analyze Comments")
            max_comments_single = gr.Slider(50, 1000, 200, step=50, label="Max Comments to Analyze")
            with gr.Row():
                collect_btn_single = gr.Button("💬 Collect Comments", variant="secondary")
                analyze_btn_single = gr.Button("🤖 Analyze with Claude AI", variant="primary", size="lg")
            comments_result_single = gr.Markdown()
            analysis_result_single = gr.Markdown()
            # Chart outputs filled by the analyze handler.
            gr.Markdown("### 📈 Visual Analytics")
            with gr.Row():
                sentiment_chart_single = gr.Plot(label="Sentiment Distribution")
                participation_chart_single = gr.Plot(label="Viewer Participation")
                reaction_chart_single = gr.Plot(label="Response Types")
            # WordCloud arrives as a base64 <img> wrapped in HTML.
            gr.Markdown("### 🔤 WordCloud")
            wordcloud_display_single = gr.HTML(label="Keywords WordCloud")
        # ---- Tab 2: multi-video (left column) and Shorts (right column) ----
        with gr.Tab("📊 Multi-Video Analysis"):
            with gr.Row():
                # LEFT: Regular Videos (channel-level long-form analysis)
                with gr.Column(scale=1):
                    gr.Markdown("## 📹 Regular Videos Analysis")
                    with gr.Row():
                        creator_input_multi = gr.Textbox(
                            label="Creator Name or Channel ID",
                            placeholder="Enter channel name or Channel ID"
                        )
                        search_btn_multi = gr.Button("🔍 Search", variant="primary")
                    search_result_multi = gr.Markdown()
                    channel_dropdown_multi = gr.Dropdown(label="📺 Select Channel", choices=[])
                    gr.Markdown("**Comment Sort Order**: Most Popular (Developer Configured)")
                    with gr.Row():
                        num_videos_multi = gr.Slider(3, 20, 5, step=1, label="Videos")
                        comments_per_video_multi = gr.Slider(50, 500, 100, step=50, label="Comments/Video")
                    collect_btn_multi = gr.Button("📊 Collect Comments", variant="secondary")
                    comments_result_multi = gr.Markdown()
                    analyze_btn_multi = gr.Button("🎯 Multi-Video Analysis", variant="primary")
                    analysis_result_multi = gr.Markdown()
                    gr.Markdown("### 📈 Channel Analytics")
                    sentiment_chart_multi = gr.Plot(label="Channel Sentiment")
                    participation_chart_multi = gr.Plot(label="Channel Participation")
                    interest_chart_multi = gr.Plot(label="Video Interest Trends")
                    competitive_chart_multi = gr.Plot(label="Competitive Edge")
                    gr.Markdown("### 🔤 Channel WordCloud")
                    wordcloud_display_multi = gr.HTML(label="Channel Keywords WordCloud")
                # RIGHT: Shorts (short-form analysis mirrors the left column)
                with gr.Column(scale=1):
                    gr.Markdown("## ⚡ YouTube Shorts Analysis")
                    with gr.Row():
                        creator_input_shorts = gr.Textbox(
                            label="Creator Name or Channel ID",
                            placeholder="Enter channel name or Channel ID"
                        )
                        search_btn_shorts = gr.Button("🔍 Search", variant="primary")
                    search_result_shorts = gr.Markdown()
                    channel_dropdown_shorts = gr.Dropdown(label="📺 Select Channel", choices=[])
                    gr.Markdown("**Comment Sort Order**: Most Popular (Developer Configured)")
                    with gr.Row():
                        num_shorts = gr.Slider(3, 15, 5, step=1, label="Shorts")
                        comments_per_short = gr.Slider(25, 300, 50, step=25, label="Comments/Short")
                    collect_btn_shorts = gr.Button("⚡ Collect Comments", variant="secondary")
                    comments_result_shorts = gr.Markdown()
                    analyze_btn_shorts = gr.Button("🎭 Shorts Analysis", variant="primary")
                    analysis_result_shorts = gr.Markdown()
                    gr.Markdown("### 📈 Shorts Analytics")
                    optimization_chart_shorts = gr.Plot(label="Optimization Score")
                    engagement_chart_shorts = gr.Plot(label="Engagement Types")
                    participation_chart_shorts = gr.Plot(label="Participation Analysis")
                    sentiment_chart_shorts = gr.Plot(label="Sentiment Analysis")
                    gr.Markdown("### 🔤 Shorts WordCloud")
                    wordcloud_display_shorts = gr.HTML(label="Shorts Keywords WordCloud")
        # ---- Tab 3: static project description / hackathon write-up ----
        with gr.Tab("ℹ️ About & MCP Integration"):
            gr.Markdown("""
            ## 🎯 Project Overview
            This comprehensive YouTube Comment Analyzer demonstrates **Model Context Protocol (MCP)** implementation for advanced content creator intelligence. Our platform transforms raw comment data into strategic business insights across three specialized analysis scenarios.
            ### 🔧 MCP Server Implementation
            - **4 Integrated MCP Tools**: Complete workflow from channel discovery to AI insights
            - **Claude Desktop Compatible**: Seamless integration with MCP protocol
            - **Real-time Processing**: Streaming analysis with comprehensive visual feedback
            ### 🎯 Three Analysis Scenarios
            #### 🎬 **Single Video Deep Dive**
            - Detailed performance breakdown for individual videos
            - Specific feedback analysis and improvement recommendations
            - Subscription conversion factor identification
            - Technical and creative enhancement suggestions
            #### 📊 **Multi-Video Channel Strategy**
            - Cross-video pattern recognition and trend analysis
            - Brand consistency evaluation and audience segmentation
            - Content mix optimization and growth strategy development
            - Long-term channel development planning
            #### ⚡ **YouTube Shorts Optimization** *(Located in Multi-Video Tab)*
            - Short-form content performance metrics and viral potential assessment
            - Instant reaction pattern analysis and engagement optimization
            - Format-specific recommendations and competitive positioning
            - Shorts vs Regular video comparative analysis
            ### 🤖 AI-Powered Creator Intelligence
            #### **Advanced Analytics Capabilities**
            - **Sentiment Classification**: Multi-dimensional emotional analysis beyond basic positive/negative
            - **Behavioral Insights**: Deep audience psychology and engagement pattern recognition
            - **Competitive Intelligence**: Market positioning and differentiation analysis
            - **Monetization Optimization**: Revenue growth opportunities and brand partnership insights
            #### **Visual Intelligence Dashboard**
            - **Real-time Charts**: Interactive sentiment, participation, and performance analytics
            - **WordCloud Generation**: Keyword extraction with multilingual support and enhanced filtering
            - **Trend Visualization**: Engagement patterns and audience behavior mapping
            - **Comparative Analytics**: Cross-format and competitive benchmarking
            ### 🚀 Business Impact & Creator Value
            #### **Immediate Actionable Results**
            - **Content Optimization**: Specific technical and creative improvements
            - **Audience Development**: Demographic insights and retention strategies
            - **Growth Acceleration**: Data-driven subscriber acquisition tactics
            - **Risk Management**: Early warning systems for potential issues
            #### **Revenue Optimization**
            - **Brand Partnership Matching**: Sponsor suitability analysis
            - **Product Development**: Market demand identification from viewer requests
            - **Monetization Strategy**: Multi-channel revenue stream optimization
            - **ROI Measurement**: Performance tracking and business outcome correlation
            ### 🏆 MCP Hackathon Innovation
            This project showcases the transformative potential of MCP protocol in creating practical, business-focused AI tools. By combining YouTube's vast content ecosystem with Claude's analytical capabilities through standardized MCP integration, we've developed a solution that directly addresses real-world creator challenges and drives measurable business outcomes.
            ### 🔬 Technical Excellence
            - **Scalable Architecture**: Modular design supporting future enhancements
            - **API Optimization**: Efficient resource usage with comprehensive error handling
            - **Security Implementation**: Secure credential management and data protection
            - **Multi-format Support**: Adaptive analysis for diverse content types
            - **Enhanced Filtering**: Advanced WordCloud generation with HTML tag removal and Korean font support
            **Built for Hugging Face MCP Hackathon 2025 - Track 1: MCP Server Implementation**
            Demonstrating the future of AI-powered creator tools through practical MCP integration.
            """)
        # ---- Event wiring: Single Video tab ----
        search_btn_single.click(
            app.search_channels,
            inputs=[creator_input_single],
            outputs=[search_result_single, channel_dropdown_single]
        )
        get_videos_btn_single.click(
            app.get_videos,
            inputs=[channel_dropdown_single, max_videos_single],
            outputs=[videos_result_single, video_dropdown_single]
        )
        collect_btn_single.click(
            app.collect_single_comments,
            inputs=[video_dropdown_single, max_comments_single],
            outputs=[comments_result_single]
        )
        # No inputs wired: presumably operates on comments cached on `app`
        # by the collect step — TODO confirm against the handler body.
        analyze_btn_single.click(
            app.analyze_single_comments,
            outputs=[
                analysis_result_single,
                sentiment_chart_single,
                participation_chart_single,
                reaction_chart_single,
                wordcloud_display_single
            ]
        )
        # ---- Event wiring: Multi-Video column ----
        search_btn_multi.click(
            app.search_channels,
            inputs=[creator_input_multi],
            outputs=[search_result_multi, channel_dropdown_multi]
        )
        collect_btn_multi.click(
            app.collect_multi_comments,
            inputs=[channel_dropdown_multi, num_videos_multi, comments_per_video_multi],
            outputs=[comments_result_multi]
        )
        # Same pattern as the single-video analyzer: no inputs wired.
        analyze_btn_multi.click(
            app.analyze_multi_comments,
            outputs=[
                analysis_result_multi,
                sentiment_chart_multi,
                participation_chart_multi,
                interest_chart_multi,
                competitive_chart_multi,
                wordcloud_display_multi
            ]
        )
        # ---- Event wiring: Shorts column ----
        search_btn_shorts.click(
            app.search_channels,
            inputs=[creator_input_shorts],
            outputs=[search_result_shorts, channel_dropdown_shorts]
        )
        collect_btn_shorts.click(
            app.collect_shorts_comments,
            inputs=[channel_dropdown_shorts, num_shorts, comments_per_short],
            outputs=[comments_result_shorts]
        )
        # Same pattern: analysis reads previously collected Shorts comments.
        analyze_btn_shorts.click(
            app.analyze_shorts_comments,
            outputs=[
                analysis_result_shorts,
                optimization_chart_shorts,
                engagement_chart_shorts,
                participation_chart_shorts,
                sentiment_chart_shorts,
                wordcloud_display_shorts
            ]
        )
    return demo
# ============================================================================
# Launch App
# ============================================================================
if __name__ == "__main__":
    # Build the interface and serve it.  Options are grouped in a dict so the
    # deployment configuration is readable at a glance.
    print("🚀 Launching YouTube Comment Analyzer...")
    demo = create_comprehensive_interface()
    serve_config = {
        "server_name": "0.0.0.0",  # bind on all interfaces (required for Spaces)
        "server_port": 7860,       # standard Hugging Face Spaces port
        "share": False,            # no public Gradio tunnel
    }
    demo.launch(**serve_config)