|
|
|
|
|
""" |
|
|
YouTube Comment Analyzer - Complete Self-contained Hugging Face Spaces App |
|
|
MCP Hackathon 2025 - Track 1 Submission |
|
|
All dependencies included in single file for security and simplicity |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import asyncio |
|
|
import os |
|
|
import logging |
|
|
import re |
|
|
from typing import Dict, Any, Optional, Tuple, List |
|
|
from pathlib import Path |
|
|
import sys |
|
|
from collections import Counter |
|
|
from datetime import datetime |
|
|
import traceback |
|
|
import warnings |
|
|
import anthropic |
|
|
from googleapiclient.discovery import build |
|
|
from googleapiclient.errors import HttpError |
|
|
import isodate |
|
|
from dataclasses import dataclass |
|
|
|
|
|
|
|
|
warnings.filterwarnings('ignore') |
|
|
|
|
|
|
|
|
import plotly.express as px |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
import matplotlib |
|
|
matplotlib.use('Agg') |
|
|
import matplotlib.pyplot as plt |
|
|
plt.ioff() |
|
|
from wordcloud import WordCloud |
|
|
import base64 |
|
|
import io |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
print("=" * 60) |
|
|
print("🎬 YouTube Comment Analyzer - MCP Hackathon 2025") |
|
|
print("=" * 60) |
|
|
print(f"🔑 YouTube API Key: {'✅ 있음' if os.getenv('YOUTUBE_API_KEY') else '❌ 없음'}") |
|
|
print(f"🔑 Anthropic API Key: {'✅ 있음' if os.getenv('ANTHROPIC_API_KEY') else '❌ 없음'}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
class YouTubeChannel: |
|
|
id: str |
|
|
title: str |
|
|
description: str |
|
|
subscriber_count: Optional[int] = None |
|
|
video_count: Optional[int] = None |
|
|
view_count: Optional[int] = None |
|
|
thumbnail_url: Optional[str] = None |
|
|
custom_url: Optional[str] = None |
|
|
|
|
|
@dataclass |
|
|
class YouTubeVideo: |
|
|
id: str |
|
|
title: str |
|
|
description: str |
|
|
channel_id: str |
|
|
channel_title: str |
|
|
published_at: datetime |
|
|
duration: Optional[str] = None |
|
|
view_count: Optional[int] = None |
|
|
like_count: Optional[int] = None |
|
|
comment_count: Optional[int] = None |
|
|
thumbnail_url: Optional[str] = None |
|
|
tags: List[str] = None |
|
|
|
|
|
def __post_init__(self): |
|
|
if self.tags is None: |
|
|
self.tags = [] |
|
|
|
|
|
@dataclass |
|
|
class YouTubeComment: |
|
|
id: str |
|
|
text: str |
|
|
author_name: str |
|
|
author_channel_id: Optional[str] |
|
|
like_count: int |
|
|
published_at: datetime |
|
|
updated_at: Optional[datetime] = None |
|
|
reply_count: int = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class YouTubeAPIError(Exception): |
|
|
pass |
|
|
|
|
|
class QuotaExceededError(YouTubeAPIError): |
|
|
pass |
|
|
|
|
|
class YouTubeClient: |
|
|
def __init__(self): |
|
|
api_key = os.getenv('YOUTUBE_API_KEY') |
|
|
if not api_key: |
|
|
raise ValueError("YOUTUBE_API_KEY not found in environment variables") |
|
|
|
|
|
self.youtube = build('youtube', 'v3', developerKey=api_key) |
|
|
self.quota_used = 0 |
|
|
|
|
|
def _handle_api_error(self, error: HttpError): |
|
|
"""Handle API errors""" |
|
|
if error.resp.status == 403: |
|
|
error_details = error.error_details |
|
|
if any('quotaExceeded' in str(detail) for detail in error_details): |
|
|
raise QuotaExceededError("YouTube API daily quota exceeded.") |
|
|
|
|
|
logger.error(f"YouTube API error: {error}") |
|
|
raise YouTubeAPIError(f"YouTube API call failed: {error}") |
|
|
|
|
|
def search_channels(self, query: str, max_results: int = 5) -> List[YouTubeChannel]: |
|
|
"""Search channels""" |
|
|
try: |
|
|
logger.info(f"Channel search: {query}") |
|
|
|
|
|
|
|
|
search_response = self.youtube.search().list( |
|
|
q=query, |
|
|
part='snippet', |
|
|
type='channel', |
|
|
maxResults=max_results, |
|
|
regionCode='US' |
|
|
).execute() |
|
|
|
|
|
self.quota_used += 100 |
|
|
|
|
|
channels = [] |
|
|
if search_response.get('items'): |
|
|
|
|
|
channel_ids = [item['id']['channelId'] for item in search_response['items']] |
|
|
|
|
|
channels_response = self.youtube.channels().list( |
|
|
part='snippet,statistics', |
|
|
id=','.join(channel_ids) |
|
|
).execute() |
|
|
|
|
|
self.quota_used += 1 |
|
|
|
|
|
for item in channels_response['items']: |
|
|
snippet = item['snippet'] |
|
|
statistics = item.get('statistics', {}) |
|
|
|
|
|
channel = YouTubeChannel( |
|
|
id=item['id'], |
|
|
title=snippet['title'], |
|
|
description=snippet['description'], |
|
|
subscriber_count=int(statistics.get('subscriberCount', 0)), |
|
|
video_count=int(statistics.get('videoCount', 0)), |
|
|
view_count=int(statistics.get('viewCount', 0)), |
|
|
thumbnail_url=snippet['thumbnails']['default']['url'], |
|
|
custom_url=snippet.get('customUrl') |
|
|
) |
|
|
channels.append(channel) |
|
|
|
|
|
return channels |
|
|
|
|
|
except HttpError as e: |
|
|
self._handle_api_error(e) |
|
|
except Exception as e: |
|
|
logger.error(f"Channel search error: {e}") |
|
|
raise YouTubeAPIError(f"Channel search failed: {e}") |
|
|
|
|
|
def get_channel_videos(self, channel_id: str, max_results: int = 10, |
|
|
sort_by: str = 'recent') -> List[YouTubeVideo]: |
|
|
"""Get channel video list""" |
|
|
try: |
|
|
|
|
|
channel_response = self.youtube.channels().list( |
|
|
part='contentDetails', |
|
|
id=channel_id |
|
|
).execute() |
|
|
|
|
|
self.quota_used += 1 |
|
|
|
|
|
if not channel_response['items']: |
|
|
raise YouTubeAPIError(f"Channel not found: {channel_id}") |
|
|
|
|
|
uploads_playlist_id = channel_response['items'][0]['contentDetails']['relatedPlaylists']['uploads'] |
|
|
|
|
|
|
|
|
playlist_response = self.youtube.playlistItems().list( |
|
|
part='snippet', |
|
|
playlistId=uploads_playlist_id, |
|
|
maxResults=max_results |
|
|
).execute() |
|
|
|
|
|
self.quota_used += 1 |
|
|
|
|
|
if not playlist_response['items']: |
|
|
return [] |
|
|
|
|
|
video_ids = [item['snippet']['resourceId']['videoId'] for item in playlist_response['items']] |
|
|
return self._get_video_details(video_ids) |
|
|
|
|
|
except HttpError as e: |
|
|
self._handle_api_error(e) |
|
|
except Exception as e: |
|
|
logger.error(f"Video retrieval error: {e}") |
|
|
raise YouTubeAPIError(f"Video retrieval failed: {e}") |
|
|
|
|
|
def _get_video_details(self, video_ids: List[str]) -> List[YouTubeVideo]: |
|
|
"""Get video details""" |
|
|
videos_response = self.youtube.videos().list( |
|
|
part='snippet,statistics,contentDetails', |
|
|
id=','.join(video_ids) |
|
|
).execute() |
|
|
|
|
|
self.quota_used += 1 |
|
|
|
|
|
videos = [] |
|
|
for item in videos_response['items']: |
|
|
snippet = item['snippet'] |
|
|
statistics = item.get('statistics', {}) |
|
|
content_details = item['contentDetails'] |
|
|
|
|
|
|
|
|
try: |
|
|
duration_iso = content_details.get('duration', 'PT0S') |
|
|
duration_seconds = int(isodate.parse_duration(duration_iso).total_seconds()) |
|
|
|
|
|
hours = duration_seconds // 3600 |
|
|
minutes = (duration_seconds % 3600) // 60 |
|
|
seconds = duration_seconds % 60 |
|
|
|
|
|
if hours > 0: |
|
|
duration_str = f"{hours}:{minutes:02d}:{seconds:02d}" |
|
|
else: |
|
|
duration_str = f"{minutes}:{seconds:02d}" |
|
|
except: |
|
|
duration_str = "N/A" |
|
|
|
|
|
video = YouTubeVideo( |
|
|
id=item['id'], |
|
|
title=snippet['title'], |
|
|
description=snippet['description'], |
|
|
channel_id=snippet['channelId'], |
|
|
channel_title=snippet['channelTitle'], |
|
|
published_at=datetime.fromisoformat(snippet['publishedAt'].replace('Z', '+00:00')), |
|
|
duration=duration_str, |
|
|
view_count=int(statistics.get('viewCount', 0)), |
|
|
like_count=int(statistics.get('likeCount', 0)), |
|
|
comment_count=int(statistics.get('commentCount', 0)), |
|
|
thumbnail_url=snippet['thumbnails']['high']['url'], |
|
|
tags=snippet.get('tags', []) |
|
|
) |
|
|
videos.append(video) |
|
|
|
|
|
return videos |
|
|
|
|
|
def get_video_comments(self, video_id: str, max_results: int = 1000, |
|
|
order: str = "relevance") -> List[YouTubeComment]: |
|
|
"""Get comments""" |
|
|
try: |
|
|
comments = [] |
|
|
next_page_token = None |
|
|
collected = 0 |
|
|
|
|
|
while collected < max_results: |
|
|
batch_size = min(100, max_results - collected) |
|
|
|
|
|
comment_response = self.youtube.commentThreads().list( |
|
|
part="snippet", |
|
|
videoId=video_id, |
|
|
maxResults=batch_size, |
|
|
order=order, |
|
|
pageToken=next_page_token |
|
|
).execute() |
|
|
|
|
|
self.quota_used += 1 |
|
|
|
|
|
for item in comment_response['items']: |
|
|
comment_snippet = item['snippet']['topLevelComment']['snippet'] |
|
|
|
|
|
comment = YouTubeComment( |
|
|
id=item['snippet']['topLevelComment']['id'], |
|
|
text=comment_snippet['textDisplay'], |
|
|
author_name=comment_snippet['authorDisplayName'], |
|
|
author_channel_id=comment_snippet.get('authorChannelId', {}).get('value'), |
|
|
like_count=comment_snippet['likeCount'], |
|
|
published_at=datetime.fromisoformat(comment_snippet['publishedAt'].replace('Z', '+00:00')), |
|
|
updated_at=datetime.fromisoformat(comment_snippet['updatedAt'].replace('Z', '+00:00')) if comment_snippet.get('updatedAt') else None, |
|
|
reply_count=item['snippet']['totalReplyCount'] |
|
|
) |
|
|
comments.append(comment) |
|
|
|
|
|
collected = len(comments) |
|
|
next_page_token = comment_response.get('nextPageToken') |
|
|
|
|
|
if not next_page_token: |
|
|
break |
|
|
|
|
|
return comments |
|
|
|
|
|
except HttpError as e: |
|
|
if e.resp.status == 403 and 'commentsDisabled' in str(e): |
|
|
logger.warning(f"Comments disabled for video: {video_id}") |
|
|
return [] |
|
|
self._handle_api_error(e) |
|
|
except Exception as e: |
|
|
logger.error(f"Comment retrieval error: {e}") |
|
|
raise YouTubeAPIError(f"Comment retrieval failed: {e}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ClaudeAnalyzer: |
|
|
def __init__(self): |
|
|
api_key = os.getenv('ANTHROPIC_API_KEY') |
|
|
if not api_key: |
|
|
raise ValueError("ANTHROPIC_API_KEY not found in environment variables") |
|
|
|
|
|
self.client = anthropic.Anthropic(api_key=api_key) |
|
|
self.model = "claude-3-haiku-20240307" |
|
|
self.max_tokens = 4000 |
|
|
|
|
|
async def analyze_comments( |
|
|
self, |
|
|
comments: List[str], |
|
|
video_title: str = "Unknown Video", |
|
|
channel_name: str = "Unknown Channel" |
|
|
) -> Dict[str, Any]: |
|
|
"""Analyze comments for single video""" |
|
|
try: |
|
|
|
|
|
batch_size = 50 |
|
|
all_analyses = [] |
|
|
|
|
|
for i in range(0, len(comments), batch_size): |
|
|
batch_comments = comments[i:i + batch_size] |
|
|
|
|
|
comments_text = "\n".join([f"Comment {j+1}: {comment}" |
|
|
for j, comment in enumerate(batch_comments)]) |
|
|
|
|
|
prompt = f""" |
|
|
You are a professional YouTube Creator Consultant. Analyze the viewer comments for the video "{video_title}" from channel "{channel_name}" and provide practical insights to help the creator grow their channel and increase revenue. |
|
|
|
|
|
Comments to analyze: |
|
|
{comments_text} |
|
|
|
|
|
Please analyze from the following creator-focused perspectives: |
|
|
|
|
|
## 🎯 Content Performance Analysis |
|
|
1. **Success Factors of This Video** |
|
|
- Specific parts viewers particularly enjoyed |
|
|
- Elements mentioned as entertaining or engaging |
|
|
- Points that drove actions like "subscribe", "like", "turn on notifications" |
|
|
|
|
|
2. **Content Structure Feedback** |
|
|
- Viewer reactions to video length |
|
|
- Opinions on intro/outro segments |
|
|
- Reactions to editing style, subtitles, background music |
|
|
|
|
|
## 💡 Improvement Opportunities |
|
|
3. **Immediate Improvement Points** |
|
|
- Technical improvements needed (audio, video quality) |
|
|
- Areas lacking sufficient explanation |
|
|
- Content viewers want to see more of |
|
|
|
|
|
4. **Next Video Ideas** |
|
|
- Topics or series requested by viewers |
|
|
- Follow-up content suggestions related to this video |
|
|
|
|
|
## 📈 Channel Growth Insights |
|
|
5. **Subscriber Conversion Analysis** |
|
|
- Elements that influenced subscription decisions |
|
|
- Warning signs of potential viewer churn |
|
|
|
|
|
6. **Community Engagement** |
|
|
- Factors that increase comment participation |
|
|
- Viewer interaction patterns |
|
|
|
|
|
Please provide specific, actionable recommendations with sentiment analysis including specific percentages (e.g., "60% positive, 25% neutral, 15% negative"). |
|
|
""" |
|
|
|
|
|
response = self.client.messages.create( |
|
|
model=self.model, |
|
|
max_tokens=self.max_tokens, |
|
|
messages=[{"role": "user", "content": prompt}] |
|
|
) |
|
|
|
|
|
all_analyses.append(response.content[0].text) |
|
|
await asyncio.sleep(0.2) |
|
|
|
|
|
|
|
|
final_analysis = "\n\n".join(all_analyses) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"data": final_analysis, |
|
|
"metadata": { |
|
|
"total_comments": len(comments), |
|
|
"video_title": video_title, |
|
|
"channel_name": channel_name |
|
|
} |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Analysis error: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": str(e), |
|
|
"data": None |
|
|
} |
|
|
|
|
|
async def analyze_multi_video_comments( |
|
|
self, |
|
|
comments: List[str], |
|
|
videos_info: List[Dict], |
|
|
channel_name: str = "Unknown Channel" |
|
|
) -> Dict[str, Any]: |
|
|
"""Analyze comments for multiple videos""" |
|
|
try: |
|
|
|
|
|
videos_context = "" |
|
|
for i, video_info in enumerate(videos_info, 1): |
|
|
videos_context += f"{i}. '{video_info['title']}' (Views: {video_info.get('view_count', 'N/A')}, Comments: {video_info['collected_comments_count']})\n" |
|
|
|
|
|
|
|
|
batch_size = 75 |
|
|
all_analyses = [] |
|
|
|
|
|
for i in range(0, len(comments), batch_size): |
|
|
batch_comments = comments[i:i + batch_size] |
|
|
|
|
|
comments_text = "\n".join([f"Comment {j+1}: {comment}" |
|
|
for j, comment in enumerate(batch_comments)]) |
|
|
|
|
|
prompt = f""" |
|
|
You are a professional YouTube Channel Strategy Consultant. Analyze comments collected from multiple videos of channel "{channel_name}" and provide strategic insights for channel growth and revenue increase. |
|
|
|
|
|
📺 **Videos Analyzed:** |
|
|
{videos_context} |
|
|
|
|
|
💬 **Collected Comments:** |
|
|
{comments_text} |
|
|
|
|
|
Please analyze from the following channel strategy perspectives: |
|
|
|
|
|
## 🎯 Channel Identity & Branding Analysis |
|
|
1. **Core Channel Values & Identity** |
|
|
- How viewers categorize this channel |
|
|
- Unique characteristics or differentiation points |
|
|
- Creator personal brand perception |
|
|
|
|
|
2. **Brand Consistency & Recognition** |
|
|
- Consistency in tone and manner across videos |
|
|
- Content style expectations from viewers |
|
|
|
|
|
## 📊 Audience Analysis & Targeting |
|
|
3. **Viewer Demographics Analysis** |
|
|
- Main viewer characteristics (inferred from comment tone) |
|
|
- Characteristics of loyal core fanbase |
|
|
- Differences between new and existing subscribers |
|
|
|
|
|
4. **Community Culture & Engagement** |
|
|
- Comment participation patterns and communication styles |
|
|
- Level of fanbase culture development |
|
|
|
|
|
## 🚀 Content Strategy & Growth Opportunities |
|
|
5. **Content Performance Patterns** |
|
|
- Best-performing content types |
|
|
- High-engagement elements |
|
|
|
|
|
6. **Expansion Possibilities & New Content Opportunities** |
|
|
- New content directions requested by viewers |
|
|
- Potential for collaboration opportunities |
|
|
|
|
|
## 💰 Monetization & Business Opportunities |
|
|
7. **Commercial Potential Assessment** |
|
|
- Viewer acceptance of sponsorships |
|
|
- Potential products or services for commercialization |
|
|
|
|
|
8. **Competitive Analysis & Market Positioning** |
|
|
- Differentiation factors vs competing channels |
|
|
- Market position and growth potential |
|
|
|
|
|
Please provide specific, actionable insights with sentiment analysis including percentages. |
|
|
""" |
|
|
|
|
|
response = self.client.messages.create( |
|
|
model=self.model, |
|
|
max_tokens=self.max_tokens, |
|
|
messages=[{"role": "user", "content": prompt}] |
|
|
) |
|
|
|
|
|
all_analyses.append(response.content[0].text) |
|
|
await asyncio.sleep(0.2) |
|
|
|
|
|
|
|
|
final_analysis = "\n\n".join(all_analyses) |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"data": final_analysis, |
|
|
"metadata": { |
|
|
"total_comments": len(comments), |
|
|
"videos_count": len(videos_info), |
|
|
"channel_name": channel_name |
|
|
} |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Multi-video analysis error: {str(e)}") |
|
|
return { |
|
|
"success": False, |
|
|
"error": str(e), |
|
|
"data": None |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ComprehensiveYouTubeAnalyzer: |
|
|
def __init__(self): |
|
|
self.logger = logger |
|
|
self.youtube_client = None |
|
|
self.claude_analyzer = None |
|
|
|
|
|
|
|
|
self._current_comments = [] |
|
|
self._current_video_title = "" |
|
|
self._current_channel_name = "" |
|
|
|
|
|
self._multi_comments = [] |
|
|
self._multi_videos_info = [] |
|
|
self._multi_channel_info = {} |
|
|
|
|
|
self._shorts_comments = [] |
|
|
self._shorts_videos_info = [] |
|
|
self._shorts_channel_info = {} |
|
|
|
|
|
self._latest_analysis = None |
|
|
|
|
|
|
|
|
self._init_clients() |
|
|
|
|
|
def _init_clients(self): |
|
|
"""Initialize API clients""" |
|
|
try: |
|
|
|
|
|
youtube_key = os.getenv('YOUTUBE_API_KEY') |
|
|
anthropic_key = os.getenv('ANTHROPIC_API_KEY') |
|
|
|
|
|
print(f"🔑 YouTube API Key: {'✅ 있음' if youtube_key else '❌ 없음'}") |
|
|
print(f"🔑 Anthropic API Key: {'✅ 있음' if anthropic_key else '❌ 없음'}") |
|
|
|
|
|
if not youtube_key: |
|
|
raise ValueError("YOUTUBE_API_KEY not found in environment variables") |
|
|
if not anthropic_key: |
|
|
raise ValueError("ANTHROPIC_API_KEY not found in environment variables") |
|
|
|
|
|
|
|
|
self.youtube_client = YouTubeClient() |
|
|
self.claude_analyzer = ClaudeAnalyzer() |
|
|
|
|
|
print("✅ 모든 클라이언트 초기화 성공") |
|
|
self.logger.info("✅ All clients initialized successfully") |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"❌ Client initialization error: {str(e)}") |
|
|
print(f"❌ 초기화 오류: {str(e)}") |
|
|
|
|
|
self.youtube_client = None |
|
|
self.claude_analyzer = None |
|
|
|
|
|
def _check_clients(self): |
|
|
"""Check if clients are properly initialized""" |
|
|
if not self.youtube_client: |
|
|
return "❌ **YouTube API client not initialized**\n\nPlease check if YOUTUBE_API_KEY is properly set in the environment." |
|
|
if not self.claude_analyzer: |
|
|
return "❌ **Claude AI client not initialized**\n\nPlease check if ANTHROPIC_API_KEY is properly set in the environment." |
|
|
return None |
|
|
|
|
|
def _extract_channel_id(self, selected_channel: str) -> Optional[str]: |
|
|
"""Extract channel ID from dropdown selection""" |
|
|
if not selected_channel: |
|
|
return None |
|
|
match = re.search(r'\(([^)]+)\)$', selected_channel) |
|
|
return match.group(1) if match else None |
|
|
|
|
|
def _extract_video_id(self, selected_video: str) -> Optional[str]: |
|
|
"""Extract video ID from dropdown selection""" |
|
|
if not selected_video: |
|
|
return None |
|
|
match = re.search(r'\(([^)]+)\)$', selected_video) |
|
|
return match.group(1) if match else None |
|
|
|
|
|
def search_channels(self, creator_name: str): |
|
|
"""Search for YouTube channels - supports both name and channel ID""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check, gr.update(choices=[]) |
|
|
|
|
|
if not creator_name.strip(): |
|
|
return "❌ Please enter a creator name or channel ID.", gr.update(choices=[]) |
|
|
|
|
|
try: |
|
|
self.logger.info(f"Searching for channels: {creator_name}") |
|
|
|
|
|
|
|
|
channel_id_pattern = r'^UC[a-zA-Z0-9_-]{22}$' |
|
|
|
|
|
if re.match(channel_id_pattern, creator_name.strip()): |
|
|
|
|
|
channel_id = creator_name.strip() |
|
|
channels_response = self.youtube_client.youtube.channels().list( |
|
|
part='snippet,statistics', |
|
|
id=channel_id |
|
|
).execute() |
|
|
|
|
|
if channels_response.get('items'): |
|
|
channel_item = channels_response['items'][0] |
|
|
snippet = channel_item['snippet'] |
|
|
statistics = channel_item.get('statistics', {}) |
|
|
|
|
|
channel = YouTubeChannel( |
|
|
id=channel_item['id'], |
|
|
title=snippet['title'], |
|
|
description=snippet['description'], |
|
|
subscriber_count=int(statistics.get('subscriberCount', 0)), |
|
|
video_count=int(statistics.get('videoCount', 0)), |
|
|
view_count=int(statistics.get('viewCount', 0)), |
|
|
thumbnail_url=snippet['thumbnails']['default']['url'], |
|
|
custom_url=snippet.get('customUrl') |
|
|
) |
|
|
channels = [channel] |
|
|
else: |
|
|
return f"❌ Channel not found with ID '{channel_id}'.", gr.update(choices=[]) |
|
|
else: |
|
|
|
|
|
channels = self.youtube_client.search_channels(creator_name, max_results=5) |
|
|
|
|
|
if channels: |
|
|
result_text = f"✅ **Found {len(channels)} channels for '{creator_name}'**\n\n" |
|
|
options = [] |
|
|
|
|
|
for i, channel in enumerate(channels, 1): |
|
|
result_text += f"**{i}. {channel.title}**\n" |
|
|
result_text += f"- Subscribers: {channel.subscriber_count or 'Hidden'}\n" |
|
|
result_text += f"- Videos: {channel.video_count or 'N/A'}\n\n" |
|
|
|
|
|
options.append(f"{channel.title} ({channel.id})") |
|
|
|
|
|
return result_text, gr.update(choices=options, value=options[0] if options else None) |
|
|
else: |
|
|
return f"❌ No channels found for '{creator_name}'", gr.update(choices=[]) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Search error: {str(e)}") |
|
|
return f"❌ Search error: {str(e)}", gr.update(choices=[]) |
|
|
|
|
|
def get_videos(self, selected_channel: str, max_videos: int): |
|
|
"""Get video list from selected channel""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check, gr.update(choices=[]) |
|
|
|
|
|
channel_id = self._extract_channel_id(selected_channel) |
|
|
if not channel_id: |
|
|
return "❌ Please select a channel first.", gr.update(choices=[]) |
|
|
|
|
|
try: |
|
|
self.logger.info(f"Getting videos for channel: {channel_id}") |
|
|
videos = self.youtube_client.get_channel_videos(channel_id, max_results=max_videos) |
|
|
|
|
|
if videos: |
|
|
result_text = f"✅ **Found {len(videos)} recent videos**\n\n" |
|
|
options = [] |
|
|
|
|
|
for i, video in enumerate(videos, 1): |
|
|
result_text += f"**{i}. {video.title}**\n" |
|
|
result_text += f"- Views: {video.view_count or 'N/A'}\n" |
|
|
result_text += f"- Comments: {video.comment_count or 'N/A'}\n" |
|
|
result_text += f"- Duration: {video.duration or 'N/A'}\n\n" |
|
|
|
|
|
options.append(f"{video.title} ({video.id})") |
|
|
|
|
|
return result_text, gr.update(choices=options, value=options[0] if options else None) |
|
|
else: |
|
|
return "❌ No videos found for this channel.", gr.update(choices=[]) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Video retrieval error: {str(e)}") |
|
|
return f"❌ Error retrieving videos: {str(e)}", gr.update(choices=[]) |
|
|
|
|
|
def collect_single_comments(self, selected_video: str, max_comments: int): |
|
|
"""Collect comments for single video analysis""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check |
|
|
|
|
|
video_id = self._extract_video_id(selected_video) |
|
|
if not video_id: |
|
|
return "❌ Please select a video first." |
|
|
|
|
|
try: |
|
|
self.logger.info(f"Collecting comments for video: {video_id}") |
|
|
comments = self.youtube_client.get_video_comments( |
|
|
video_id, |
|
|
max_results=max_comments, |
|
|
order="relevance" |
|
|
) |
|
|
|
|
|
if comments: |
|
|
self._current_comments = [comment.text for comment in comments] |
|
|
self._current_video_title = selected_video.split(' (')[0] |
|
|
|
|
|
result_text = f"✅ **Successfully collected {len(comments)} comments**\n\n" |
|
|
result_text += "**Comment Preview:**\n\n" |
|
|
|
|
|
for i, comment in enumerate(comments[:3], 1): |
|
|
preview_text = comment.text[:100] + "..." if len(comment.text) > 100 else comment.text |
|
|
result_text += f"{i}. **{comment.author_name}**: {preview_text}\n\n" |
|
|
|
|
|
result_text += f"📊 **Ready for analysis!** Click 'Claude AI Analysis' button below." |
|
|
return result_text |
|
|
else: |
|
|
return "❌ No comments found. This video might have comments disabled." |
|
|
|
|
|
except Exception as e: |
|
|
if "commentsDisabled" in str(e): |
|
|
return "❌ **Comments are disabled** for this video. Please select another video." |
|
|
return f"❌ Error collecting comments: {str(e)}" |
|
|
|
|
|
def collect_multi_comments(self, selected_channel: str, num_videos: int, comments_per_video: int): |
|
|
"""Collect multi-video comments""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check |
|
|
|
|
|
channel_id = self._extract_channel_id(selected_channel) |
|
|
if not channel_id: |
|
|
return "❌ Please select a channel." |
|
|
|
|
|
try: |
|
|
self._multi_channel_info = { |
|
|
'name': selected_channel.split(' (')[0], |
|
|
'id': channel_id |
|
|
} |
|
|
|
|
|
videos = self.youtube_client.get_channel_videos(channel_id, max_results=num_videos) |
|
|
|
|
|
if not videos: |
|
|
return "❌ No videos found." |
|
|
|
|
|
return self._process_video_comments(videos, comments_per_video, "multi", "Multi-Video") |
|
|
|
|
|
except Exception as e: |
|
|
return f"❌ Multi-comment collection error: {str(e)}" |
|
|
|
|
|
def collect_shorts_comments(self, selected_channel: str, num_shorts: int, comments_per_short: int): |
|
|
"""Collect YouTube Shorts comments (fallback to regular videos)""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check |
|
|
|
|
|
channel_id = self._extract_channel_id(selected_channel) |
|
|
if not channel_id: |
|
|
return "❌ Please select a channel." |
|
|
|
|
|
try: |
|
|
self._shorts_channel_info = { |
|
|
'name': selected_channel.split(' (')[0], |
|
|
'id': channel_id |
|
|
} |
|
|
|
|
|
|
|
|
videos = self.youtube_client.get_channel_videos(channel_id, max_results=num_shorts * 2) |
|
|
|
|
|
|
|
|
shorts_candidates = [] |
|
|
for video in videos: |
|
|
if video.duration and ':' in video.duration: |
|
|
duration_parts = video.duration.split(':') |
|
|
if len(duration_parts) == 2: |
|
|
minutes = int(duration_parts[0]) |
|
|
if minutes <= 1: |
|
|
shorts_candidates.append(video) |
|
|
if len(shorts_candidates) >= num_shorts: |
|
|
break |
|
|
|
|
|
if not shorts_candidates: |
|
|
|
|
|
shorts_candidates = videos[:num_shorts] |
|
|
|
|
|
if not shorts_candidates: |
|
|
return "❌ No suitable content found for Shorts analysis." |
|
|
|
|
|
return self._process_video_comments(shorts_candidates, comments_per_short, "shorts", "Shorts") |
|
|
|
|
|
except Exception as e: |
|
|
return f"❌ Shorts comment collection error: {str(e)}" |
|
|
|
|
|
def _process_video_comments(self, videos: List, comments_per_video: int, content_type: str, display_name: str): |
|
|
"""Generic method to process video comments""" |
|
|
channel_info = self._multi_channel_info if content_type == "multi" else self._shorts_channel_info |
|
|
|
|
|
result_text = f"✅ **{display_name} comment collection started**\n\n" |
|
|
result_text += f"Channel: **{channel_info['name']}**\n" |
|
|
result_text += f"Target {display_name}: **{len(videos)} videos**\n" |
|
|
result_text += f"Comments per Video: **Max {comments_per_video}** (Most Popular)\n\n" |
|
|
|
|
|
all_comments = [] |
|
|
videos_info = [] |
|
|
successful_videos = 0 |
|
|
|
|
|
for i, video in enumerate(videos, 1): |
|
|
try: |
|
|
title_preview = video.title[:40] if content_type == "shorts" else video.title[:50] |
|
|
result_text += f"**{i}/{len(videos)}** Processing: {title_preview}...\n" |
|
|
|
|
|
comments = self.youtube_client.get_video_comments( |
|
|
video.id, max_results=comments_per_video, order="relevance" |
|
|
) |
|
|
|
|
|
if comments: |
|
|
comment_texts = [comment.text for comment in comments] |
|
|
all_comments.extend(comment_texts) |
|
|
|
|
|
video_info = { |
|
|
'title': video.title, |
|
|
'id': video.id, |
|
|
'collected_comments_count': len(comments), |
|
|
'actual_comment_count': video.comment_count, |
|
|
'view_count': video.view_count or 0, |
|
|
'like_count': video.like_count or 0, |
|
|
'published_at': video.published_at, |
|
|
'duration': video.duration |
|
|
} |
|
|
|
|
|
if content_type == "shorts": |
|
|
video_info['is_shorts'] = True |
|
|
|
|
|
videos_info.append(video_info) |
|
|
successful_videos += 1 |
|
|
|
|
|
duration_info = f" (Duration: {video.duration})" if content_type == "shorts" else "" |
|
|
result_text += f" ✅ {len(comments)} comments collected{duration_info}\n" |
|
|
else: |
|
|
result_text += f" ⚠️ No comments (disabled or restricted)\n" |
|
|
|
|
|
except Exception as e: |
|
|
result_text += f" ❌ Error: {str(e)[:50]}...\n" |
|
|
continue |
|
|
|
|
|
|
|
|
if content_type == "multi": |
|
|
self._multi_comments = all_comments |
|
|
self._multi_videos_info = videos_info |
|
|
else: |
|
|
self._shorts_comments = all_comments |
|
|
self._shorts_videos_info = videos_info |
|
|
|
|
|
result_text += f"\n📊 **{display_name} Collection Complete!**\n" |
|
|
result_text += f"- Successful Videos: **{successful_videos}**\n" |
|
|
result_text += f"- Total Comments: **{len(all_comments)}**\n" |
|
|
result_text += f"- Sort Order: **Most Popular**\n" |
|
|
result_text += f"- Average Comments per Video: **{len(all_comments) // max(successful_videos, 1)}**\n\n" |
|
|
|
|
|
button_text = "Multi-Video Analysis" if content_type == "multi" else "Shorts Analysis" |
|
|
result_text += f"🤖 **Click '{button_text}' button below.**" |
|
|
|
|
|
return result_text |
|
|
|
|
|
|
|
|
def analyze_single_comments(self): |
|
|
"""Single video comment analysis""" |
|
|
return asyncio.run(self._analyze_single_comments_async()) |
|
|
|
|
|
def analyze_multi_comments(self): |
|
|
"""Multi-video comment analysis""" |
|
|
return asyncio.run(self._analyze_multi_comments_async()) |
|
|
|
|
|
def analyze_shorts_comments(self): |
|
|
"""Shorts comment analysis""" |
|
|
return asyncio.run(self._analyze_shorts_comments_async()) |
|
|
|
|
|
async def _analyze_single_comments_async(self): |
|
|
"""Single video analysis implementation""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check, None, None, None, None |
|
|
|
|
|
if not self._current_comments: |
|
|
return "❌ No comments to analyze. Please collect comments first.", None, None, None, None |
|
|
|
|
|
try: |
|
|
self.logger.info(f"Starting analysis of {len(self._current_comments)} comments") |
|
|
|
|
|
result = await self.claude_analyzer.analyze_comments( |
|
|
comments=self._current_comments, |
|
|
video_title=self._current_video_title, |
|
|
channel_name=self._current_channel_name or "Selected Channel" |
|
|
) |
|
|
|
|
|
if result.get("success", False): |
|
|
self._latest_analysis = { |
|
|
'type': 'single_video', |
|
|
'video_title': self._current_video_title, |
|
|
'comments_count': len(self._current_comments), |
|
|
'analysis_text': result['data'], |
|
|
'timestamp': datetime.now() |
|
|
} |
|
|
|
|
|
|
|
|
charts = self._create_single_video_charts() |
|
|
wordcloud_html = self._create_wordcloud(self._current_comments, "Video Keywords") |
|
|
|
|
|
formatted_result = f"""## 🎯 Single Video Analysis Results |
|
|
|
|
|
### 📊 Analysis Overview |
|
|
- **Video**: {self._current_video_title} |
|
|
- **Comments Analyzed**: {len(self._current_comments)} |
|
|
- **Analysis Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |
|
|
|
|
|
--- |
|
|
|
|
|
{result['data']} |
|
|
|
|
|
--- |
|
|
|
|
|
### 📈 Generated Visualizations |
|
|
Charts and WordCloud are displayed in the sections below. |
|
|
""" |
|
|
|
|
|
return ( |
|
|
formatted_result, |
|
|
charts.get('sentiment'), |
|
|
charts.get('participation'), |
|
|
charts.get('reactions'), |
|
|
wordcloud_html |
|
|
) |
|
|
else: |
|
|
error_msg = f"❌ Analysis failed: {result.get('error', 'Unknown error')}" |
|
|
return error_msg, None, None, None, None |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Analysis error: {str(e)}") |
|
|
error_msg = f"❌ Analysis error: {str(e)}" |
|
|
return error_msg, None, None, None, None |
|
|
|
|
|
async def _analyze_multi_comments_async(self): |
|
|
"""Multi-video analysis implementation""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check, None, None, None, None, None |
|
|
|
|
|
if not self._multi_comments: |
|
|
return "❌ No comments to analyze. Please collect multi-video comments first.", None, None, None, None, None |
|
|
|
|
|
try: |
|
|
result = await self.claude_analyzer.analyze_multi_video_comments( |
|
|
comments=self._multi_comments, |
|
|
videos_info=self._multi_videos_info, |
|
|
channel_name=self._multi_channel_info['name'] |
|
|
) |
|
|
|
|
|
if result.get("success", False): |
|
|
self._latest_analysis = { |
|
|
'type': 'multi_video', |
|
|
'channel_name': self._multi_channel_info['name'], |
|
|
'videos_count': len(self._multi_videos_info), |
|
|
'total_comments': len(self._multi_comments), |
|
|
'videos_info': self._multi_videos_info, |
|
|
'analysis_text': result['data'], |
|
|
'timestamp': datetime.now() |
|
|
} |
|
|
|
|
|
charts = self._create_multi_video_charts() |
|
|
wordcloud_html = self._create_wordcloud(self._multi_comments, "Channel Keywords") |
|
|
|
|
|
formatted_result = f"""## 🎯 Multi-Video Analysis Results |
|
|
|
|
|
### 📊 Analysis Overview |
|
|
- **Channel**: {self._multi_channel_info['name']} |
|
|
- **Videos Analyzed**: {len(self._multi_videos_info)} |
|
|
- **Total Comments**: {len(self._multi_comments)} |
|
|
- **Analysis Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |
|
|
|
|
|
### 🎬 Video List |
|
|
""" |
|
|
for i, video_info in enumerate(self._multi_videos_info, 1): |
|
|
formatted_result += f"{i}. **{video_info['title']}** ({video_info['collected_comments_count']} comments)\n" |
|
|
|
|
|
formatted_result += f""" |
|
|
|
|
|
--- |
|
|
|
|
|
{result['data']} |
|
|
|
|
|
--- |
|
|
|
|
|
### 📈 Comprehensive Channel Analytics |
|
|
Charts and WordCloud generated based on multi-video analysis results. |
|
|
""" |
|
|
|
|
|
return ( |
|
|
formatted_result, |
|
|
charts.get('sentiment'), |
|
|
charts.get('participation'), |
|
|
charts.get('video_interest_trends'), |
|
|
charts.get('competitive'), |
|
|
wordcloud_html |
|
|
) |
|
|
else: |
|
|
error_msg = f"❌ Analysis failed: {result.get('error', 'Unknown error')}" |
|
|
return error_msg, None, None, None, None, None |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Multi-video analysis error: {str(e)}") |
|
|
error_msg = f"❌ Analysis error: {str(e)}" |
|
|
return error_msg, None, None, None, None, None |
|
|
|
|
|
async def _analyze_shorts_comments_async(self): |
|
|
"""Shorts analysis implementation""" |
|
|
error_check = self._check_clients() |
|
|
if error_check: |
|
|
return error_check, None, None, None, None, None |
|
|
|
|
|
if not self._shorts_comments: |
|
|
return "❌ No Shorts comments to analyze. Please collect Shorts comments first.", None, None, None, None, None |
|
|
|
|
|
try: |
|
|
|
|
|
result = await self.claude_analyzer.analyze_multi_video_comments( |
|
|
comments=self._shorts_comments, |
|
|
videos_info=self._shorts_videos_info, |
|
|
channel_name=self._shorts_channel_info['name'] |
|
|
) |
|
|
|
|
|
if result.get("success", False): |
|
|
self._latest_analysis = { |
|
|
'type': 'shorts_analysis', |
|
|
'channel_name': self._shorts_channel_info['name'], |
|
|
'shorts_count': len(self._shorts_videos_info), |
|
|
'total_comments': len(self._shorts_comments), |
|
|
'shorts_info': self._shorts_videos_info, |
|
|
'analysis_text': result['data'], |
|
|
'timestamp': datetime.now() |
|
|
} |
|
|
|
|
|
|
|
|
shorts_characteristics = self._analyze_shorts_characteristics() |
|
|
|
|
|
charts = self._create_shorts_charts() |
|
|
wordcloud_html = self._create_wordcloud(self._shorts_comments, "Shorts Keywords") |
|
|
|
|
|
|
|
|
comparison_text = "" |
|
|
if self._multi_comments: |
|
|
comparison_text = f"\n\n---\n\n## 🔄 Format Comparison\n\n{self._compare_shorts_vs_regular()}" |
|
|
|
|
|
formatted_result = f"""## 🎬 YouTube Shorts Analysis Results |
|
|
|
|
|
### 📊 Analysis Overview |
|
|
- **Channel**: {self._shorts_channel_info['name']} |
|
|
- **Shorts Analyzed**: {len(self._shorts_videos_info)} |
|
|
- **Total Comments**: {len(self._shorts_comments)} |
|
|
- **Analysis Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |
|
|
|
|
|
### 🎬 Shorts List |
|
|
""" |
|
|
for i, short_info in enumerate(self._shorts_videos_info, 1): |
|
|
formatted_result += f"{i}. **{short_info['title']}** ({short_info['collected_comments_count']} comments, {short_info.get('duration', 'N/A')})\n" |
|
|
|
|
|
formatted_result += f""" |
|
|
|
|
|
--- |
|
|
|
|
|
### 🤖 Claude AI Analysis |
|
|
{result['data']} |
|
|
|
|
|
--- |
|
|
|
|
|
{shorts_characteristics} |
|
|
|
|
|
{comparison_text} |
|
|
|
|
|
--- |
|
|
|
|
|
### 📈 Shorts Analytics |
|
|
Charts and WordCloud generated based on Shorts analysis results. |
|
|
""" |
|
|
|
|
|
return ( |
|
|
formatted_result, |
|
|
charts.get('optimization_score'), |
|
|
charts.get('engagement_types'), |
|
|
charts.get('participation'), |
|
|
charts.get('sentiment'), |
|
|
wordcloud_html |
|
|
) |
|
|
else: |
|
|
error_msg = f"❌ Shorts analysis failed: {result.get('error', 'Unknown error')}" |
|
|
return error_msg, None, None, None, None, None |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Shorts analysis error: {str(e)}") |
|
|
error_msg = f"❌ Analysis error: {str(e)}" |
|
|
return error_msg, None, None, None, None, None |
|
|
|
|
|
def _analyze_shorts_characteristics(self) -> str: |
|
|
"""Basic Shorts characteristics analysis""" |
|
|
try: |
|
|
if not self._shorts_comments: |
|
|
return "❌ No Shorts data available for characteristics analysis." |
|
|
|
|
|
|
|
|
result_text = "## 🎬 Shorts Characteristics Analysis\n\n" |
|
|
|
|
|
|
|
|
avg_length = sum(len(comment) for comment in self._shorts_comments) / len(self._shorts_comments) |
|
|
short_comments = sum(1 for comment in self._shorts_comments if len(comment) <= 30) |
|
|
emoji_comments = sum(1 for comment in self._shorts_comments if re.search(r'[😀-🙏]', comment)) |
|
|
|
|
|
short_ratio = (short_comments / len(self._shorts_comments)) * 100 |
|
|
emoji_ratio = (emoji_comments / len(self._shorts_comments)) * 100 |
|
|
|
|
|
|
|
|
optimization_score = (short_ratio * 0.4) + (emoji_ratio * 0.3) + min((50 / max(avg_length, 1)) * 30, 30) |
|
|
|
|
|
result_text += f"**Shorts Optimization Score**: {optimization_score:.1f}/100\n\n" |
|
|
result_text += f"### 📝 Comment Patterns\n" |
|
|
result_text += f"- Average Length: {avg_length:.1f} characters\n" |
|
|
result_text += f"- Short Comments: {short_ratio:.1f}%\n" |
|
|
result_text += f"- Emoji Usage: {emoji_ratio:.1f}%\n\n" |
|
|
|
|
|
if optimization_score >= 70: |
|
|
result_text += "🎬 **Highly optimized for Shorts format** - Content shows strong Shorts characteristics\n" |
|
|
elif optimization_score >= 40: |
|
|
result_text += "📱 **Moderately suitable for Shorts** - Good potential with optimization\n" |
|
|
else: |
|
|
result_text += "📺 **Better suited for regular videos** - Consider adapting content for Shorts format\n" |
|
|
|
|
|
return result_text |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Shorts characteristics analysis error: {str(e)}") |
|
|
return f"❌ Error analyzing Shorts characteristics: {str(e)}" |
|
|
|
|
|
def _compare_shorts_vs_regular(self) -> str: |
|
|
"""Basic comparison between Shorts and regular videos""" |
|
|
try: |
|
|
if not self._multi_comments or not self._shorts_comments: |
|
|
return "❌ Need both regular video and Shorts data for comparison." |
|
|
|
|
|
result_text = "## 📊 Shorts vs Regular Videos Comparison\n\n" |
|
|
|
|
|
|
|
|
regular_avg_length = sum(len(comment) for comment in self._multi_comments) / len(self._multi_comments) |
|
|
shorts_avg_length = sum(len(comment) for comment in self._shorts_comments) / len(self._shorts_comments) |
|
|
|
|
|
regular_emoji = sum(1 for comment in self._multi_comments if re.search(r'[😀-🙏]', comment)) |
|
|
shorts_emoji = sum(1 for comment in self._shorts_comments if re.search(r'[😀-🙏]', comment)) |
|
|
|
|
|
regular_emoji_ratio = (regular_emoji / len(self._multi_comments)) * 100 |
|
|
shorts_emoji_ratio = (shorts_emoji / len(self._shorts_comments)) * 100 |
|
|
|
|
|
result_text += "### 👥 Audience Behavior\n" |
|
|
result_text += f"- Regular Video Comments: {regular_avg_length:.1f} chars avg\n" |
|
|
result_text += f"- Shorts Comments: {shorts_avg_length:.1f} chars avg\n" |
|
|
result_text += f"- Regular Emoji Usage: {regular_emoji_ratio:.1f}%\n" |
|
|
result_text += f"- Shorts Emoji Usage: {shorts_emoji_ratio:.1f}%\n\n" |
|
|
|
|
|
|
|
|
result_text += "### 💡 Recommendations\n" |
|
|
if shorts_avg_length < regular_avg_length: |
|
|
result_text += "- Shorts generate more concise, immediate reactions\n" |
|
|
if shorts_emoji_ratio > regular_emoji_ratio: |
|
|
result_text += "- Shorts audience uses more visual expressions (emojis)\n" |
|
|
|
|
|
return result_text |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Comparison analysis error: {str(e)}") |
|
|
return f"❌ Error comparing formats: {str(e)}" |
|
|
|
|
|
|
|
|
def _create_single_video_charts(self) -> Dict: |
|
|
"""Create charts for single video analysis""" |
|
|
charts = {} |
|
|
|
|
|
try: |
|
|
|
|
|
sentiment_data = self._extract_sentiment_from_analysis() |
|
|
charts['sentiment'] = px.pie( |
|
|
values=list(sentiment_data.values()), |
|
|
names=list(sentiment_data.keys()), |
|
|
title="💭 Sentiment Distribution", |
|
|
color_discrete_sequence=['#28a745', '#dc3545', '#6c757d'] |
|
|
) |
|
|
|
|
|
|
|
|
if self._current_comments: |
|
|
participation_analysis = self._analyze_viewer_participation(self._current_comments) |
|
|
if participation_analysis: |
|
|
participation_types = list(participation_analysis.keys()) |
|
|
participation_counts = list(participation_analysis.values()) |
|
|
|
|
|
charts['participation'] = px.bar( |
|
|
x=participation_counts, |
|
|
y=participation_types, |
|
|
orientation='h', |
|
|
title="🎯 Viewer Participation Analysis", |
|
|
labels={'x': 'Comment Count', 'y': 'Participation Type'}, |
|
|
color=participation_counts, |
|
|
color_continuous_scale='viridis' |
|
|
) |
|
|
charts['participation'].update_layout(yaxis={'categoryorder': 'total ascending'}) |
|
|
|
|
|
|
|
|
reaction_analysis = self._analyze_viewer_reactions(self._current_comments) |
|
|
if reaction_analysis: |
|
|
reaction_types = list(reaction_analysis.keys()) |
|
|
reaction_counts = list(reaction_analysis.values()) |
|
|
|
|
|
charts['reactions'] = px.bar( |
|
|
x=reaction_counts, |
|
|
y=reaction_types, |
|
|
orientation='h', |
|
|
title="🎭 Viewer Response Types", |
|
|
labels={'x': 'Mention Count', 'y': 'Response Type'}, |
|
|
color=reaction_counts, |
|
|
color_continuous_scale='Blues' |
|
|
) |
|
|
charts['reactions'].update_layout(yaxis={'categoryorder': 'total ascending'}) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Chart creation error: {str(e)}") |
|
|
|
|
|
return charts |
|
|
|
|
|
def _create_multi_video_charts(self) -> Dict: |
|
|
"""Create charts for multi-video analysis""" |
|
|
charts = {} |
|
|
|
|
|
try: |
|
|
|
|
|
sentiment_data = self._extract_sentiment_from_analysis() |
|
|
charts['sentiment'] = px.pie( |
|
|
values=list(sentiment_data.values()), |
|
|
names=list(sentiment_data.keys()), |
|
|
title="📊 Channel Sentiment Analysis", |
|
|
color_discrete_sequence=['#2E8B57', '#DC143C', '#4682B4'] |
|
|
) |
|
|
|
|
|
|
|
|
if self._multi_comments: |
|
|
participation_analysis = self._analyze_viewer_participation(self._multi_comments) |
|
|
if participation_analysis: |
|
|
participation_types = list(participation_analysis.keys()) |
|
|
participation_counts = list(participation_analysis.values()) |
|
|
|
|
|
charts['participation'] = px.bar( |
|
|
x=participation_counts, |
|
|
y=participation_types, |
|
|
orientation='h', |
|
|
title="🎯 Channel Participation Analysis", |
|
|
labels={'x': 'Comment Count', 'y': 'Participation Type'}, |
|
|
color=participation_counts, |
|
|
color_continuous_scale='plasma' |
|
|
) |
|
|
charts['participation'].update_layout(yaxis={'categoryorder': 'total ascending'}) |
|
|
|
|
|
|
|
|
if self._multi_videos_info: |
|
|
video_titles = [info['title'][:30] + '...' if len(info['title']) > 30 else info['title'] |
|
|
for info in self._multi_videos_info] |
|
|
total_comment_counts = [info.get('actual_comment_count', 0) for info in self._multi_videos_info] |
|
|
|
|
|
charts['video_interest_trends'] = px.bar( |
|
|
x=video_titles, |
|
|
y=total_comment_counts, |
|
|
title="📊 Viewer Interest Trends by Video", |
|
|
labels={'x': 'Video', 'y': 'Total Comments'}, |
|
|
color=total_comment_counts, |
|
|
color_continuous_scale='viridis' |
|
|
) |
|
|
charts['video_interest_trends'].update_layout(xaxis_tickangle=45) |
|
|
|
|
|
|
|
|
if self._multi_comments: |
|
|
competitive_analysis = self._analyze_competitive_advantage(self._multi_comments) |
|
|
if competitive_analysis: |
|
|
comp_types = list(competitive_analysis.keys()) |
|
|
comp_counts = list(competitive_analysis.values()) |
|
|
|
|
|
charts['competitive'] = px.pie( |
|
|
values=comp_counts, |
|
|
names=comp_types, |
|
|
title="🏆 Channel Competitive Edge", |
|
|
color_discrete_sequence=['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#ff99cc'] |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Multi-video chart creation error: {str(e)}") |
|
|
|
|
|
return charts |
|
|
|
|
|
def _create_shorts_charts(self) -> Dict: |
|
|
"""Create charts for Shorts analysis""" |
|
|
charts = {} |
|
|
|
|
|
try: |
|
|
if self._shorts_videos_info and self._shorts_comments: |
|
|
|
|
|
optimization_analysis = self._analyze_shorts_optimization() |
|
|
if optimization_analysis: |
|
|
categories = list(optimization_analysis.keys()) |
|
|
scores = list(optimization_analysis.values()) |
|
|
|
|
|
charts['optimization_score'] = px.bar( |
|
|
x=categories, |
|
|
y=scores, |
|
|
title="⚡ Shorts Optimization Score", |
|
|
labels={'x': 'Optimization Factor', 'y': 'Score (0-100)'}, |
|
|
color=scores, |
|
|
color_continuous_scale='RdYlGn' |
|
|
) |
|
|
charts['optimization_score'].update_layout(xaxis_tickangle=45) |
|
|
|
|
|
|
|
|
engagement_types = self._analyze_engagement_types(self._shorts_comments) |
|
|
if engagement_types: |
|
|
engagement_labels = list(engagement_types.keys()) |
|
|
engagement_counts = list(engagement_types.values()) |
|
|
|
|
|
charts['engagement_types'] = px.pie( |
|
|
values=engagement_counts, |
|
|
names=engagement_labels, |
|
|
title="🎯 Viewer Engagement Types", |
|
|
color_discrete_sequence=['#ff6b6b', '#4ecdc4', '#45b7d1', '#96ceb4', '#feca57'] |
|
|
) |
|
|
|
|
|
|
|
|
participation_analysis = self._analyze_viewer_participation(self._shorts_comments) |
|
|
if participation_analysis: |
|
|
participation_types = list(participation_analysis.keys()) |
|
|
participation_counts = list(participation_analysis.values()) |
|
|
|
|
|
charts['participation'] = px.bar( |
|
|
x=participation_counts, |
|
|
y=participation_types, |
|
|
orientation='h', |
|
|
title="🎯 Shorts Participation Analysis", |
|
|
labels={'x': 'Comment Count', 'y': 'Participation Type'}, |
|
|
color=participation_counts, |
|
|
color_continuous_scale='viridis' |
|
|
) |
|
|
charts['participation'].update_layout(yaxis={'categoryorder': 'total ascending'}) |
|
|
|
|
|
|
|
|
sentiment_data = self._extract_sentiment_from_analysis() |
|
|
charts['sentiment'] = px.pie( |
|
|
values=list(sentiment_data.values()), |
|
|
names=list(sentiment_data.keys()), |
|
|
title="📊 Shorts Sentiment Analysis", |
|
|
color_discrete_sequence=['#2E8B57', '#DC143C', '#4682B4'] |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Shorts chart creation error: {str(e)}") |
|
|
|
|
|
return charts |
|
|
|
|
|
|
|
|
def _analyze_viewer_participation(self, comments: List[str]) -> Dict[str, int]: |
|
|
"""Analyze viewer participation patterns""" |
|
|
if not comments: |
|
|
return {} |
|
|
|
|
|
participation_patterns = { |
|
|
'🎉 Enthusiastic Reactions': ['대박', '완전', '미쳤다', 'amazing', 'incredible', 'awesome', '짱', '🔥', '💯'], |
|
|
'💬 Detailed Feedback': ['생각', '느낌', '의견', 'think', 'feel', 'opinion', '추천', 'recommend'], |
|
|
'❓ Questions & Curiosity': ['궁금', '질문', '뭐야', '어떻게', 'what', 'how', 'why', '?'], |
|
|
'🔔 Engagement Actions': ['구독', '좋아요', '알림', 'subscribe', 'like', 'notification', '팔로우'], |
|
|
'💡 Content Requests': ['해주세요', '만들어', '다음', '또', 'please', 'more', 'next'] |
|
|
} |
|
|
|
|
|
participation_counts = {category: 0 for category in participation_patterns.keys()} |
|
|
|
|
|
for comment in comments: |
|
|
comment_lower = comment.lower() |
|
|
for category, keywords in participation_patterns.items(): |
|
|
if any(keyword in comment_lower for keyword in keywords): |
|
|
participation_counts[category] += 1 |
|
|
|
|
|
return {k: v for k, v in participation_counts.items() if v > 0} |
|
|
|
|
|
def _analyze_viewer_reactions(self, comments: List[str]) -> Dict[str, int]: |
|
|
"""Analyze viewer reaction types""" |
|
|
if not comments: |
|
|
return {} |
|
|
|
|
|
reaction_patterns = { |
|
|
'👍 Praise/Positive': ['좋', '최고', '대박', 'good', 'great', 'awesome', '👍', '❤️', '재미'], |
|
|
'❓ Questions/Curiosity': ['궁금', '질문', '뭐야', 'what', 'how', 'why', '?'], |
|
|
'📞 Subscribe/Engagement': ['구독', '좋아요', '알림', 'subscribe', 'like', 'notification'], |
|
|
'💡 Suggestions/Requests': ['해주세요', '만들어', '다음에', 'please', '요청', '추천'] |
|
|
} |
|
|
|
|
|
reaction_counts = {category: 0 for category in reaction_patterns.keys()} |
|
|
|
|
|
for comment in comments: |
|
|
comment_lower = comment.lower() |
|
|
for category, keywords in reaction_patterns.items(): |
|
|
if any(keyword in comment_lower for keyword in keywords): |
|
|
reaction_counts[category] += 1 |
|
|
|
|
|
return {k: v for k, v in reaction_counts.items() if v > 0} |
|
|
|
|
|
def _analyze_competitive_advantage(self, comments: List[str]) -> Dict[str, int]: |
|
|
"""Analyze competitive advantage indicators""" |
|
|
if not comments: |
|
|
return {} |
|
|
|
|
|
advantage_indicators = { |
|
|
'🏆 Direct Comparisons': 0, |
|
|
'✨ Uniqueness Claims': 0, |
|
|
'🔄 Channel Switching': 0, |
|
|
'📢 Recommendation Intent': 0 |
|
|
} |
|
|
|
|
|
comparison_keywords = ['다른', '비교', 'compared', 'unlike', 'different', 'better than'] |
|
|
uniqueness_keywords = ['유일', '처음', '독특', 'unique', 'first time', 'only', 'special'] |
|
|
switching_keywords = ['구독취소', '갈아탔다', 'unsubscribed', 'switched', '이제여기만'] |
|
|
recommendation_keywords = ['추천', '공유', '알려', 'recommend', 'share', 'tell others'] |
|
|
|
|
|
for comment in comments: |
|
|
comment_lower = comment.lower() |
|
|
|
|
|
if any(keyword in comment_lower for keyword in comparison_keywords): |
|
|
advantage_indicators['🏆 Direct Comparisons'] += 1 |
|
|
if any(keyword in comment_lower for keyword in uniqueness_keywords): |
|
|
advantage_indicators['✨ Uniqueness Claims'] += 1 |
|
|
if any(keyword in comment_lower for keyword in switching_keywords): |
|
|
advantage_indicators['🔄 Channel Switching'] += 1 |
|
|
if any(keyword in comment_lower for keyword in recommendation_keywords): |
|
|
advantage_indicators['📢 Recommendation Intent'] += 1 |
|
|
|
|
|
return {k: v for k, v in advantage_indicators.items() if v > 0} |
|
|
|
|
|
def _analyze_shorts_optimization(self) -> Dict[str, float]: |
|
|
"""Analyze Shorts optimization factors""" |
|
|
if not self._shorts_comments: |
|
|
return {} |
|
|
|
|
|
comments = self._shorts_comments |
|
|
|
|
|
|
|
|
instant_reactions = ['와', '헐', '대박', 'wow', 'omg', '미쳤다'] |
|
|
instant_count = sum(1 for comment in comments |
|
|
if any(reaction in comment.lower() for reaction in instant_reactions)) |
|
|
|
|
|
short_comments = sum(1 for comment in comments if len(comment) <= 30) |
|
|
emoji_comments = sum(1 for comment in comments if re.search(r'[😀-🙏]', comment)) |
|
|
|
|
|
loop_keywords = ['다시', '또', 'again', 'replay', '반복'] |
|
|
loop_count = sum(1 for comment in comments |
|
|
if any(keyword in comment.lower() for keyword in loop_keywords)) |
|
|
|
|
|
algo_keywords = ['추천', '떴다', 'recommended', 'fyp'] |
|
|
algo_count = sum(1 for comment in comments |
|
|
if any(keyword in comment.lower() for keyword in algo_keywords)) |
|
|
|
|
|
return { |
|
|
'Instant Reactions': (instant_count / len(comments)) * 100, |
|
|
'Conciseness': (short_comments / len(comments)) * 100, |
|
|
'Visual Reactions': (emoji_comments / len(comments)) * 100, |
|
|
'Retention': (loop_count / len(comments)) * 100, |
|
|
'Algorithm Response': (algo_count / len(comments)) * 100 |
|
|
} |
|
|
|
|
|
def _analyze_engagement_types(self, comments: List[str]) -> Dict[str, int]: |
|
|
"""Analyze engagement types for Shorts""" |
|
|
if not comments: |
|
|
return {} |
|
|
|
|
|
engagement_types = { |
|
|
'⚡ Instant Reactions': 0, |
|
|
'💬 Opinion Expression': 0, |
|
|
'❓ Questions/Curiosity': 0, |
|
|
'🔄 Repeat Viewing': 0, |
|
|
'📢 Sharing/Recommendation': 0 |
|
|
} |
|
|
|
|
|
for comment in comments: |
|
|
comment_lower = comment.lower() |
|
|
comment_length = len(comment) |
|
|
|
|
|
|
|
|
instant_keywords = ['와', '헐', '대박', 'wow', 'omg', '👍', '🔥'] |
|
|
if (comment_length <= 20 and |
|
|
any(keyword in comment_lower for keyword in instant_keywords)): |
|
|
engagement_types['⚡ Instant Reactions'] += 1 |
|
|
elif ('?' in comment or '궁금' in comment_lower or |
|
|
any(q in comment_lower for q in ['what', 'how', 'why', '뭐야'])): |
|
|
engagement_types['❓ Questions/Curiosity'] += 1 |
|
|
elif any(keyword in comment_lower for keyword in ['다시', '또', 'again', 'replay']): |
|
|
engagement_types['🔄 Repeat Viewing'] += 1 |
|
|
elif any(keyword in comment_lower for keyword in ['공유', '보내', 'share', '추천']): |
|
|
engagement_types['📢 Sharing/Recommendation'] += 1 |
|
|
else: |
|
|
engagement_types['💬 Opinion Expression'] += 1 |
|
|
|
|
|
return {k: v for k, v in engagement_types.items() if v > 0} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_sentiment_from_analysis(self) -> Dict[str, float]: |
|
|
"""Extract sentiment from analysis text with improved parsing""" |
|
|
|
|
|
if self._latest_analysis: |
|
|
analysis_text = self._latest_analysis['analysis_text'] |
|
|
|
|
|
|
|
|
sentiment_patterns = [ |
|
|
|
|
|
r'(\d+(?:\.\d+)?)%?\s*positive.*?(\d+(?:\.\d+)?)%?\s*neutral.*?(\d+(?:\.\d+)?)%?\s*negative', |
|
|
r'(\d+(?:\.\d+)?)%?\s*positive.*?(\d+(?:\.\d+)?)%?\s*negative.*?(\d+(?:\.\d+)?)%?\s*neutral', |
|
|
|
|
|
r'positive.*?(\d+(?:\.\d+)?)%.*?negative.*?(\d+(?:\.\d+)?)%.*?neutral.*?(\d+(?:\.\d+)?)%', |
|
|
r'positive.*?(\d+(?:\.\d+)?)%.*?neutral.*?(\d+(?:\.\d+)?)%.*?negative.*?(\d+(?:\.\d+)?)%', |
|
|
|
|
|
r'positive.*?(\d+(?:\.\d+)?)%', |
|
|
r'negative.*?(\d+(?:\.\d+)?)%', |
|
|
r'neutral.*?(\d+(?:\.\d+)?)%' |
|
|
] |
|
|
|
|
|
|
|
|
for pattern in sentiment_patterns[:4]: |
|
|
match = re.search(pattern, analysis_text, re.IGNORECASE) |
|
|
if match: |
|
|
groups = match.groups() |
|
|
if len(groups) == 3: |
|
|
|
|
|
if 'positive.*?neutral.*?negative' in pattern: |
|
|
pos, neu, neg = float(groups[0]), float(groups[1]), float(groups[2]) |
|
|
elif 'positive.*?negative.*?neutral' in pattern: |
|
|
pos, neg, neu = float(groups[0]), float(groups[1]), float(groups[2]) |
|
|
else: |
|
|
pos, neg, neu = float(groups[0]), float(groups[1]), float(groups[2]) |
|
|
|
|
|
|
|
|
total = pos + neg + neu |
|
|
if 80 <= total <= 120: |
|
|
|
|
|
if total > 0: |
|
|
return { |
|
|
'Positive': (pos / total) * 100, |
|
|
'Negative': (neg / total) * 100, |
|
|
'Neutral': (neu / total) * 100 |
|
|
} |
|
|
|
|
|
|
|
|
individual_sentiments = {} |
|
|
pos_match = re.search(r'positive.*?(\d+(?:\.\d+)?)%?', analysis_text, re.IGNORECASE) |
|
|
neg_match = re.search(r'negative.*?(\d+(?:\.\d+)?)%?', analysis_text, re.IGNORECASE) |
|
|
neu_match = re.search(r'neutral.*?(\d+(?:\.\d+)?)%?', analysis_text, re.IGNORECASE) |
|
|
|
|
|
if pos_match: |
|
|
individual_sentiments['Positive'] = float(pos_match.group(1)) |
|
|
if neg_match: |
|
|
individual_sentiments['Negative'] = float(neg_match.group(1)) |
|
|
if neu_match: |
|
|
individual_sentiments['Neutral'] = float(neu_match.group(1)) |
|
|
|
|
|
|
|
|
if len(individual_sentiments) == 3: |
|
|
total = sum(individual_sentiments.values()) |
|
|
if total > 0: |
|
|
return {k: (v / total) * 100 for k, v in individual_sentiments.items()} |
|
|
|
|
|
|
|
|
return self._estimate_sentiment_from_comments() |
|
|
|
|
|
|
|
|
def _estimate_sentiment_from_comments(self) -> Dict[str, float]: |
|
|
"""Estimate sentiment from comment content""" |
|
|
comments = (self._current_comments or self._multi_comments or |
|
|
self._shorts_comments or []) |
|
|
|
|
|
if not comments: |
|
|
return {'Positive': 60, 'Negative': 20, 'Neutral': 20} |
|
|
|
|
|
positive_keywords = [ |
|
|
|
|
|
'good', 'great', 'awesome', 'amazing', 'excellent', 'love', 'like', 'best', |
|
|
'wonderful', 'perfect', 'brilliant', 'fantastic', 'superb', 'incredible', |
|
|
'outstanding', 'impressive', 'nice', 'cool', 'sweet', 'epic', 'fire', |
|
|
|
|
|
'좋', '멋지', '재미', '최고', '대박', '훌륭', '감사', '구독', '짱', '완전', |
|
|
'굿', '쩐다', '지린다', '개좋', '꿀잼', '핵잼', '존잼', '레전드', '갓', |
|
|
|
|
|
'👍', '❤️', '😍', '🔥', '💯', '🎉', '😊', '😂', '👏', '🙌', '✨', '⭐' |
|
|
] |
|
|
|
|
|
negative_keywords = [ |
|
|
|
|
|
'bad', 'terrible', 'awful', 'hate', 'worst', 'boring', 'stupid', 'disappointed', |
|
|
'annoying', 'useless', 'waste', 'fail', 'sucks', 'disgusting', 'horrible', |
|
|
'trash', 'garbage', 'lame', 'weak', 'cringe', |
|
|
|
|
|
'별로', '싫', '안좋', '실망', '지루', '노잼', '재미없', '별거없', '구려', |
|
|
'망작', '쓰레기', '최악', '짜증', '화나', '어이없', '헛소리', |
|
|
|
|
|
'👎', '😞', '😠', '😡', '🤮', '💩', '😤', '🙄', '😒' |
|
|
] |
|
|
|
|
|
sample_size = min(100, len(comments)) |
|
|
sample_comments = comments[:sample_size] |
|
|
|
|
|
positive_count = sum(1 for comment in sample_comments |
|
|
if any(kw in comment.lower() for kw in positive_keywords)) |
|
|
negative_count = sum(1 for comment in sample_comments |
|
|
if any(kw in comment.lower() for kw in negative_keywords)) |
|
|
neutral_count = sample_size - positive_count - negative_count |
|
|
|
|
|
return { |
|
|
'Positive': max(positive_count, 0), |
|
|
'Negative': max(negative_count, 0), |
|
|
'Neutral': max(neutral_count, 0) |
|
|
} |
|
|
|
|
|
def _create_wordcloud(self, comments: List[str], title: str = "Word Cloud") -> str: |
|
|
"""Create WordCloud visualization with enhanced Korean font support and better filtering""" |
|
|
try: |
|
|
if not comments: |
|
|
return "<p>No data available for WordCloud</p>" |
|
|
|
|
|
|
|
|
all_text = ' '.join(comments) |
|
|
english_words = re.findall(r'[a-zA-Z]{3,}', all_text) |
|
|
korean_words = re.findall(r'[가-힣]{2,}', all_text) |
|
|
all_words = english_words + korean_words |
|
|
|
|
|
|
|
|
enhanced_stopwords = { |
|
|
|
|
|
'the', 'and', 'this', 'that', 'with', 'have', 'will', 'from', 'they', 'been', |
|
|
'are', 'was', 'but', 'not', 'you', 'all', 'can', 'her', 'his', 'she', 'for', |
|
|
|
|
|
|
|
|
'video', 'like', 'subscribe', 'comment', 'watch', 'channel', 'youtube', |
|
|
|
|
|
|
|
|
'이거', '그거', '진짜', '정말', '너무', '완전', '그냥', '좀', '약간', |
|
|
'영상', '댓글', '구독', '좋아요', '채널', '유튜브', |
|
|
|
|
|
|
|
|
'www', 'http', 'https', 'amp', 'com', 'net', 'org', 'html', 'htm', |
|
|
'href', 'link', 'url', 'src', 'img', 'div', 'span', 'class', 'style', |
|
|
'nbsp', 'quot', 'amp', 'lt', 'gt', 'script', 'meta', 'head', 'body', |
|
|
'title', 'alt', 'width', 'height', 'border', 'target', 'blank', |
|
|
|
|
|
|
|
|
'ntoxymm', 'zhd', 'uckszu', 'pbkr', 'dzesm', 'yaya', 'yes', 'jineer', |
|
|
'robak', 'xds', 'iew', 'yes', 'yeah', 'ugh', 'hmm', |
|
|
|
|
|
|
|
|
'bit', 'ly', 'tinyurl', 'shortlink', 'goo', 'gl', 'yt', 'youtu', 'be', |
|
|
'redirect', 'click', 'here', 'more', 'info', 'details' |
|
|
} |
|
|
|
|
|
|
|
|
filtered_words = [] |
|
|
for word in all_words: |
|
|
word_lower = word.lower() |
|
|
|
|
|
if word_lower in enhanced_stopwords: |
|
|
continue |
|
|
|
|
|
if len(word) >= 2 and not re.match(r'^[a-zA-Z가-힣]+$', word): |
|
|
continue |
|
|
|
|
|
if len(word) < 2: |
|
|
continue |
|
|
|
|
|
if len(set(word.lower())) <= 2 and len(word) > 3: |
|
|
continue |
|
|
|
|
|
filtered_words.append(word_lower) |
|
|
|
|
|
if not filtered_words: |
|
|
return "<p>No meaningful keywords available for WordCloud</p>" |
|
|
|
|
|
word_freq = Counter(filtered_words) |
|
|
|
|
|
|
|
|
font_path = None |
|
|
korean_font_candidates = [ |
|
|
'Nanum-Bold.ttf', |
|
|
'./Nanum-Bold.ttf', |
|
|
'/usr/share/fonts/truetype/nanum/NanumGothic.ttf', |
|
|
'/System/Library/Fonts/AppleGothic.ttf', |
|
|
'/Windows/Fonts/malgun.ttf' |
|
|
] |
|
|
|
|
|
for candidate in korean_font_candidates: |
|
|
if Path(candidate).exists(): |
|
|
font_path = candidate |
|
|
logger.info(f"Using Korean font: {candidate}") |
|
|
break |
|
|
|
|
|
try: |
|
|
|
|
|
if font_path: |
|
|
wordcloud = WordCloud( |
|
|
font_path=font_path, |
|
|
width=900, |
|
|
height=500, |
|
|
background_color='white', |
|
|
max_words=100, |
|
|
colormap='viridis', |
|
|
relative_scaling=0.4, |
|
|
min_font_size=12, |
|
|
prefer_horizontal=0.7, |
|
|
max_font_size=80, |
|
|
collocations=False |
|
|
).generate_from_frequencies(word_freq) |
|
|
else: |
|
|
|
|
|
english_only = [word for word in filtered_words if re.match(r'^[a-zA-Z]+$', word)] |
|
|
if english_only: |
|
|
english_freq = Counter(english_only) |
|
|
wordcloud = WordCloud( |
|
|
width=900, |
|
|
height=500, |
|
|
background_color='white', |
|
|
max_words=80, |
|
|
colormap='viridis', |
|
|
relative_scaling=0.4, |
|
|
min_font_size=12, |
|
|
collocations=False |
|
|
).generate_from_frequencies(english_freq) |
|
|
else: |
|
|
return "<p>No English keywords available for WordCloud</p>" |
|
|
|
|
|
except Exception as wc_error: |
|
|
logger.warning(f"WordCloud generation failed: {wc_error}") |
|
|
|
|
|
return f"<p>WordCloud generation failed: {str(wc_error)}</p>" |
|
|
|
|
|
|
|
|
plt.figure(figsize=(12, 6)) |
|
|
plt.imshow(wordcloud, interpolation='bilinear') |
|
|
plt.axis('off') |
|
|
|
|
|
|
|
|
buffer = io.BytesIO() |
|
|
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=150, |
|
|
facecolor='white', edgecolor='none') |
|
|
plt.close() |
|
|
buffer.seek(0) |
|
|
|
|
|
img_base64 = base64.b64encode(buffer.getvalue()).decode() |
|
|
|
|
|
return f'<img src="data:image/png;base64,{img_base64}" style="max-width: 100%; height: auto;" alt="{title} WordCloud">' |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"WordCloud generation error: {str(e)}") |
|
|
return f"<p>WordCloud generation failed: {str(e)}</p>" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_comprehensive_interface(): |
|
|
"""Create the comprehensive Gradio interface""" |
|
|
|
|
|
app = ComprehensiveYouTubeAnalyzer() |
|
|
|
|
|
|
|
|
css = """ |
|
|
.gradio-container { |
|
|
max-width: 1400px !important; |
|
|
margin: auto !important; |
|
|
} |
|
|
""" |
|
|
|
|
|
with gr.Blocks( |
|
|
title="🎬 YouTube Comment Analyzer - MCP Hackathon 2025", |
|
|
theme=gr.themes.Soft(), |
|
|
css=css |
|
|
) as demo: |
|
|
|
|
|
gr.Markdown(""" |
|
|
# 🎬 YouTube Comment Analyzer |
|
|
## MCP Hackathon 2025 - Track 1 Submission |
|
|
|
|
|
**AI-powered comment sentiment analysis with comprehensive creator insights** |
|
|
|
|
|
🔧 **MCP Server Implementation** | 📊 **Creator Intelligence** | 🎯 **Growth Optimization** |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Row(): |
|
|
gr.Markdown(f""" |
|
|
### 🔧 System Status |
|
|
- **YouTube API**: {'✅ Ready' if app.youtube_client else '❌ Not Available'} |
|
|
- **Claude AI**: {'✅ Ready' if app.claude_analyzer else '❌ Not Available'} |
|
|
- **MCP Server**: ✅ Implemented with 4 tools |
|
|
- **Analysis Modes**: 🎬 Single Video | 📊 Multi-Video | ⚡ Shorts *(located in Multi-Video tab)* |
|
|
""") |
|
|
|
|
|
with gr.Tab("🎬 Single Video Analysis"): |
|
|
|
|
|
gr.Markdown("### 1️⃣ Search for Channel") |
|
|
with gr.Row(): |
|
|
creator_input_single = gr.Textbox( |
|
|
label="Creator Name or Channel ID", |
|
|
placeholder="Enter channel name (e.g., MrBeast) or Channel ID (e.g., UC-lHJZR3Gqxm24_Vd_AJ5Yw)", |
|
|
scale=3 |
|
|
) |
|
|
search_btn_single = gr.Button("🔍 Search", variant="primary", scale=1) |
|
|
|
|
|
search_result_single = gr.Markdown() |
|
|
channel_dropdown_single = gr.Dropdown(label="📺 Select Channel", choices=[]) |
|
|
|
|
|
gr.Markdown("### 2️⃣ Get Videos") |
|
|
with gr.Row(): |
|
|
max_videos_single = gr.Slider(5, 50, 10, step=5, label="Max Videos to Retrieve") |
|
|
get_videos_btn_single = gr.Button("📹 Get Videos", variant="secondary") |
|
|
|
|
|
videos_result_single = gr.Markdown() |
|
|
video_dropdown_single = gr.Dropdown(label="🎬 Select Video", choices=[]) |
|
|
|
|
|
gr.Markdown("### 3️⃣ Collect & Analyze Comments") |
|
|
max_comments_single = gr.Slider(50, 1000, 200, step=50, label="Max Comments to Analyze") |
|
|
|
|
|
with gr.Row(): |
|
|
collect_btn_single = gr.Button("💬 Collect Comments", variant="secondary") |
|
|
analyze_btn_single = gr.Button("🤖 Analyze with Claude AI", variant="primary", size="lg") |
|
|
|
|
|
comments_result_single = gr.Markdown() |
|
|
analysis_result_single = gr.Markdown() |
|
|
|
|
|
gr.Markdown("### 📈 Visual Analytics") |
|
|
with gr.Row(): |
|
|
sentiment_chart_single = gr.Plot(label="Sentiment Distribution") |
|
|
participation_chart_single = gr.Plot(label="Viewer Participation") |
|
|
|
|
|
reaction_chart_single = gr.Plot(label="Response Types") |
|
|
|
|
|
gr.Markdown("### 🔤 WordCloud") |
|
|
wordcloud_display_single = gr.HTML(label="Keywords WordCloud") |
|
|
|
|
|
with gr.Tab("📊 Multi-Video Analysis"): |
|
|
|
|
|
with gr.Row(): |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("## 📹 Regular Videos Analysis") |
|
|
|
|
|
with gr.Row(): |
|
|
creator_input_multi = gr.Textbox( |
|
|
label="Creator Name or Channel ID", |
|
|
placeholder="Enter channel name or Channel ID" |
|
|
) |
|
|
search_btn_multi = gr.Button("🔍 Search", variant="primary") |
|
|
|
|
|
search_result_multi = gr.Markdown() |
|
|
channel_dropdown_multi = gr.Dropdown(label="📺 Select Channel", choices=[]) |
|
|
|
|
|
gr.Markdown("**Comment Sort Order**: Most Popular (Developer Configured)") |
|
|
|
|
|
with gr.Row(): |
|
|
num_videos_multi = gr.Slider(3, 20, 5, step=1, label="Videos") |
|
|
comments_per_video_multi = gr.Slider(50, 500, 100, step=50, label="Comments/Video") |
|
|
|
|
|
collect_btn_multi = gr.Button("📊 Collect Comments", variant="secondary") |
|
|
comments_result_multi = gr.Markdown() |
|
|
|
|
|
analyze_btn_multi = gr.Button("🎯 Multi-Video Analysis", variant="primary") |
|
|
|
|
|
analysis_result_multi = gr.Markdown() |
|
|
|
|
|
gr.Markdown("### 📈 Channel Analytics") |
|
|
sentiment_chart_multi = gr.Plot(label="Channel Sentiment") |
|
|
participation_chart_multi = gr.Plot(label="Channel Participation") |
|
|
interest_chart_multi = gr.Plot(label="Video Interest Trends") |
|
|
competitive_chart_multi = gr.Plot(label="Competitive Edge") |
|
|
|
|
|
gr.Markdown("### 🔤 Channel WordCloud") |
|
|
wordcloud_display_multi = gr.HTML(label="Channel Keywords WordCloud") |
|
|
|
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("## ⚡ YouTube Shorts Analysis") |
|
|
|
|
|
with gr.Row(): |
|
|
creator_input_shorts = gr.Textbox( |
|
|
label="Creator Name or Channel ID", |
|
|
placeholder="Enter channel name or Channel ID" |
|
|
) |
|
|
search_btn_shorts = gr.Button("🔍 Search", variant="primary") |
|
|
|
|
|
search_result_shorts = gr.Markdown() |
|
|
channel_dropdown_shorts = gr.Dropdown(label="📺 Select Channel", choices=[]) |
|
|
|
|
|
gr.Markdown("**Comment Sort Order**: Most Popular (Developer Configured)") |
|
|
|
|
|
with gr.Row(): |
|
|
num_shorts = gr.Slider(3, 15, 5, step=1, label="Shorts") |
|
|
comments_per_short = gr.Slider(25, 300, 50, step=25, label="Comments/Short") |
|
|
|
|
|
collect_btn_shorts = gr.Button("⚡ Collect Comments", variant="secondary") |
|
|
comments_result_shorts = gr.Markdown() |
|
|
|
|
|
analyze_btn_shorts = gr.Button("🎭 Shorts Analysis", variant="primary") |
|
|
|
|
|
analysis_result_shorts = gr.Markdown() |
|
|
|
|
|
gr.Markdown("### 📈 Shorts Analytics") |
|
|
optimization_chart_shorts = gr.Plot(label="Optimization Score") |
|
|
engagement_chart_shorts = gr.Plot(label="Engagement Types") |
|
|
participation_chart_shorts = gr.Plot(label="Participation Analysis") |
|
|
sentiment_chart_shorts = gr.Plot(label="Sentiment Analysis") |
|
|
|
|
|
gr.Markdown("### 🔤 Shorts WordCloud") |
|
|
wordcloud_display_shorts = gr.HTML(label="Shorts Keywords WordCloud") |
|
|
|
|
|
with gr.Tab("ℹ️ About & MCP Integration"): |
|
|
gr.Markdown(""" |
|
|
## 🎯 Project Overview |
|
|
|
|
|
This comprehensive YouTube Comment Analyzer demonstrates **Model Context Protocol (MCP)** implementation for advanced content creator intelligence. Our platform transforms raw comment data into strategic business insights across three specialized analysis scenarios. |
|
|
|
|
|
### 🔧 MCP Server Implementation |
|
|
- **4 Integrated MCP Tools**: Complete workflow from channel discovery to AI insights |
|
|
- **Claude Desktop Compatible**: Seamless integration with MCP protocol |
|
|
- **Real-time Processing**: Streaming analysis with comprehensive visual feedback |
|
|
|
|
|
### 🎯 Three Analysis Scenarios |
|
|
|
|
|
#### 🎬 **Single Video Deep Dive** |
|
|
- Detailed performance breakdown for individual videos |
|
|
- Specific feedback analysis and improvement recommendations |
|
|
- Subscription conversion factor identification |
|
|
- Technical and creative enhancement suggestions |
|
|
|
|
|
#### 📊 **Multi-Video Channel Strategy** |
|
|
- Cross-video pattern recognition and trend analysis |
|
|
- Brand consistency evaluation and audience segmentation |
|
|
- Content mix optimization and growth strategy development |
|
|
- Long-term channel development planning |
|
|
|
|
|
#### ⚡ **YouTube Shorts Optimization** *(Located in Multi-Video Tab)* |
|
|
- Short-form content performance metrics and viral potential assessment |
|
|
- Instant reaction pattern analysis and engagement optimization |
|
|
- Format-specific recommendations and competitive positioning |
|
|
- Shorts vs Regular video comparative analysis |
|
|
|
|
|
### 🤖 AI-Powered Creator Intelligence |
|
|
|
|
|
#### **Advanced Analytics Capabilities** |
|
|
- **Sentiment Classification**: Multi-dimensional emotional analysis beyond basic positive/negative |
|
|
- **Behavioral Insights**: Deep audience psychology and engagement pattern recognition |
|
|
- **Competitive Intelligence**: Market positioning and differentiation analysis |
|
|
- **Monetization Optimization**: Revenue growth opportunities and brand partnership insights |
|
|
|
|
|
#### **Visual Intelligence Dashboard** |
|
|
- **Real-time Charts**: Interactive sentiment, participation, and performance analytics |
|
|
- **WordCloud Generation**: Keyword extraction with multilingual support and enhanced filtering |
|
|
- **Trend Visualization**: Engagement patterns and audience behavior mapping |
|
|
- **Comparative Analytics**: Cross-format and competitive benchmarking |
|
|
|
|
|
### 🚀 Business Impact & Creator Value |
|
|
|
|
|
#### **Immediate Actionable Results** |
|
|
- **Content Optimization**: Specific technical and creative improvements |
|
|
- **Audience Development**: Demographic insights and retention strategies |
|
|
- **Growth Acceleration**: Data-driven subscriber acquisition tactics |
|
|
- **Risk Management**: Early warning systems for potential issues |
|
|
|
|
|
#### **Revenue Optimization** |
|
|
- **Brand Partnership Matching**: Sponsor suitability analysis |
|
|
- **Product Development**: Market demand identification from viewer requests |
|
|
- **Monetization Strategy**: Multi-channel revenue stream optimization |
|
|
- **ROI Measurement**: Performance tracking and business outcome correlation |
|
|
|
|
|
### 🏆 MCP Hackathon Innovation |
|
|
|
|
|
This project showcases the transformative potential of MCP protocol in creating practical, business-focused AI tools. By combining YouTube's vast content ecosystem with Claude's analytical capabilities through standardized MCP integration, we've developed a solution that directly addresses real-world creator challenges and drives measurable business outcomes. |
|
|
|
|
|
### 🔬 Technical Excellence |
|
|
- **Scalable Architecture**: Modular design supporting future enhancements |
|
|
- **API Optimization**: Efficient resource usage with comprehensive error handling |
|
|
- **Security Implementation**: Secure credential management and data protection |
|
|
- **Multi-format Support**: Adaptive analysis for diverse content types |
|
|
- **Enhanced Filtering**: Advanced WordCloud generation with HTML tag removal and Korean font support |
|
|
|
|
|
**Built for Hugging Face MCP Hackathon 2025 - Track 1: MCP Server Implementation** |
|
|
|
|
|
Demonstrating the future of AI-powered creator tools through practical MCP integration. |
|
|
""") |
|
|
|
|
|
|
|
|
search_btn_single.click( |
|
|
app.search_channels, |
|
|
inputs=[creator_input_single], |
|
|
outputs=[search_result_single, channel_dropdown_single] |
|
|
) |
|
|
|
|
|
get_videos_btn_single.click( |
|
|
app.get_videos, |
|
|
inputs=[channel_dropdown_single, max_videos_single], |
|
|
outputs=[videos_result_single, video_dropdown_single] |
|
|
) |
|
|
|
|
|
collect_btn_single.click( |
|
|
app.collect_single_comments, |
|
|
inputs=[video_dropdown_single, max_comments_single], |
|
|
outputs=[comments_result_single] |
|
|
) |
|
|
|
|
|
analyze_btn_single.click( |
|
|
app.analyze_single_comments, |
|
|
outputs=[ |
|
|
analysis_result_single, |
|
|
sentiment_chart_single, |
|
|
participation_chart_single, |
|
|
reaction_chart_single, |
|
|
wordcloud_display_single |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
search_btn_multi.click( |
|
|
app.search_channels, |
|
|
inputs=[creator_input_multi], |
|
|
outputs=[search_result_multi, channel_dropdown_multi] |
|
|
) |
|
|
|
|
|
collect_btn_multi.click( |
|
|
app.collect_multi_comments, |
|
|
inputs=[channel_dropdown_multi, num_videos_multi, comments_per_video_multi], |
|
|
outputs=[comments_result_multi] |
|
|
) |
|
|
|
|
|
analyze_btn_multi.click( |
|
|
app.analyze_multi_comments, |
|
|
outputs=[ |
|
|
analysis_result_multi, |
|
|
sentiment_chart_multi, |
|
|
participation_chart_multi, |
|
|
interest_chart_multi, |
|
|
competitive_chart_multi, |
|
|
wordcloud_display_multi |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
search_btn_shorts.click( |
|
|
app.search_channels, |
|
|
inputs=[creator_input_shorts], |
|
|
outputs=[search_result_shorts, channel_dropdown_shorts] |
|
|
) |
|
|
|
|
|
collect_btn_shorts.click( |
|
|
app.collect_shorts_comments, |
|
|
inputs=[channel_dropdown_shorts, num_shorts, comments_per_short], |
|
|
outputs=[comments_result_shorts] |
|
|
) |
|
|
|
|
|
analyze_btn_shorts.click( |
|
|
app.analyze_shorts_comments, |
|
|
outputs=[ |
|
|
analysis_result_shorts, |
|
|
optimization_chart_shorts, |
|
|
engagement_chart_shorts, |
|
|
participation_chart_shorts, |
|
|
sentiment_chart_shorts, |
|
|
wordcloud_display_shorts |
|
|
] |
|
|
) |
|
|
|
|
|
return demo |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
print("🚀 Launching YouTube Comment Analyzer...") |
|
|
demo = create_comprehensive_interface() |
|
|
demo.launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
share=False |
|
|
) |