Spaces:
Runtime error
Runtime error
import base64
import os
from datetime import datetime, timedelta, timezone

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
from googleapiclient.discovery import build
from huggingface_hub import InferenceClient  # uses the Hugging Face Hub API
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
# YouTube Data API key. Prefer supplying it through the YOUTUBE_API_KEY
# environment variable; the literal below is only a fallback placeholder
# (enter your own key here if you do not use the environment variable).
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY", "AIza")
def create_client(model_name):
    """Build a Hugging Face ``InferenceClient`` for *model_name*.

    The access token is taken from the ``HF_TOKEN`` environment variable;
    when the variable is unset, ``None`` is passed as the token.
    """
    return InferenceClient(model=model_name, token=os.getenv("HF_TOKEN"))


# Module-wide inference client used for summarising title clusters.
client = create_client("CohereForAI/c4ai-command-r-plus")
def get_video_stats(video_id):
    """Fetch title, channel, publish time and counters for one video.

    Args:
        video_id: YouTube video id to look up.

    Returns:
        A dict with the video id, title, publish time, channel id, view
        count, like count and comment count. Counters missing from the API
        response are reported as 0.

    Raises:
        IndexError: if the API returns no item for *video_id* (the original
            code raised a bare ``IndexError`` from ``items[0]`` here; the
            type is kept, the message is now descriptive).
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    video_response = youtube.videos().list(
        part="snippet,statistics",
        id=video_id,
    ).execute()

    items = video_response.get("items") or []
    if not items:
        raise IndexError(f"No YouTube video found for id {video_id!r}")

    video = items[0]
    snippet = video["snippet"]
    statistics = video.get("statistics", {})

    # NOTE(review): the key strings below look like mis-decoded Korean
    # labels; they are kept byte-for-byte because other functions in this
    # file index the result by exactly these keys.
    return {
        "λμμ ID": video_id,
        "μ λͺ©": snippet["title"],
        "κ²μ μκ°": snippet["publishedAt"],
        "μ±λ ID": snippet["channelId"],
        "μ‘°νμ": int(statistics.get("viewCount", 0)),
        "μ’μμ μ": int(statistics.get("likeCount", 0)),
        "λκΈ μ": int(statistics.get("commentCount", 0)),
    }
def get_channel_stats(channel_id):
    """Return the subscriber count of a channel, or 0 when unavailable.

    Args:
        channel_id: YouTube channel id to look up.

    Returns:
        The subscriber count as an int. 0 is returned when the API returns
        no channel for the id, or when the response carries no
        ``subscriberCount`` field (the original code raised ``KeyError`` in
        that case).
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    channel_response = youtube.channels().list(
        part="statistics",
        id=channel_id,
    ).execute()

    # ``items`` may be missing or empty when nothing matches the id.
    items = channel_response.get("items") or []
    if not items:
        return 0

    statistics = items[0].get("statistics", {})
    return int(statistics.get("subscriberCount", 0))
def get_video_data(query, max_results, published_after, published_before):
    """Search YouTube and collect per-video statistics into a DataFrame.

    Args:
        query: Search terms.
        max_results: Maximum number of videos to return; coerced to int.
        published_after: Lower bound of the publish time, passed to the
            search request as ``publishedAfter``.
        published_before: Upper bound of the publish time, passed to the
            search request as ``publishedBefore``.

    Returns:
        A ``pandas.DataFrame`` with one row per video: the fields produced
        by ``get_video_stats`` plus the channel's subscriber count. The
        frame is empty (no columns) when the search yields nothing.
    """
    # UI number widgets may deliver a float; slicing and maxResults need int.
    max_results = int(max_results)
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)

    video_ids = []
    next_page_token = None
    while len(video_ids) < max_results:
        remaining = max_results - len(video_ids)
        search_response = youtube.search().list(
            q=query,
            type="video",
            part="id",
            # Ask only for what is still needed, capped at 50 per page.
            maxResults=min(50, remaining),
            pageToken=next_page_token,
            order="viewCount",
            publishedAfter=published_after,
            publishedBefore=published_before,
        ).execute()

        items = search_response.get("items", [])
        video_ids.extend(item["id"]["videoId"] for item in items)
        next_page_token = search_response.get("nextPageToken")
        # Stop on the last page, and on an empty page so we never spin.
        if not items or not next_page_token:
            break
    video_ids = video_ids[:max_results]

    # Several videos often share a channel; look each channel up only once.
    subscribers_by_channel = {}
    video_stats = []
    for video_id in video_ids:
        stats = get_video_stats(video_id)
        channel_id = stats["μ±λ ID"]
        if channel_id not in subscribers_by_channel:
            subscribers_by_channel[channel_id] = get_channel_stats(channel_id)
        stats["ꡬλ μ μ"] = subscribers_by_channel[channel_id]
        video_stats.append(stats)

    return pd.DataFrame(video_stats)
def download_csv(df, filename):
    """Return an HTML anchor that downloads *df* as ``<filename>.csv``.

    The CSV (without the index column) is embedded in the link as a
    base64 ``data:`` URI, so no file is written on the server.

    Args:
        df: ``pandas.DataFrame`` to export.
        filename: Base name (without extension) offered for the download.

    Returns:
        The ``<a ...>...</a>`` markup as a string.
    """
    csv_text = df.to_csv(index=False)
    b64 = base64.b64encode(csv_text.encode("utf-8")).decode("ascii")
    # "text/csv" is the registered media type ("file/csv" is not one); the
    # charset is stated explicitly because the payload is UTF-8 encoded.
    href = f'<a href="data:text/csv;charset=utf-8;base64,{b64}" download="{filename}.csv">λ€μ΄λ‘λ {filename} CSV</a>'
    return href
def visualize_video_ranking(video_stats_df):
    """Add a views-per-subscriber score column and chart it.

    The score column is added to *video_stats_df* in place.

    Args:
        video_stats_df: DataFrame holding at least the video id, view count
            and subscriber count columns produced by ``get_video_data``.

    Returns:
        A tuple ``(video_stats_df, fig, csv_download_link)``: the frame with
        the added score column, a Plotly bar chart of the score per video,
        and the HTML download link produced by ``download_csv``.
    """
    views = video_stats_df["μ‘°νμ"]
    subscribers = video_stats_df["ꡬλ μ μ"]
    # A subscriber count of 0 would make the ratio inf; mask zeros so the
    # score becomes NaN instead.
    video_stats_df["νμ± μ§μ"] = views / subscribers.where(subscribers != 0)

    csv_download_link = download_csv(video_stats_df, "video_stats")

    fig = px.bar(
        video_stats_df,
        x="λμμ ID",
        y="νμ± μ§μ",
        color="μ‘°νμ",
        labels={"λμμ ID": "λμμ ID", "νμ± μ§μ": "νμ± μ§μ"},
        title="λμμ νμ± μ§μ",
    )
    fig.update_layout(height=500, width=500)
    return video_stats_df, fig, csv_download_link
def analyze_titles(video_stats_df, n_clusters=5):
    """Cluster video titles with TF-IDF + k-means and summarise each cluster.

    A cluster-label column is added to *video_stats_df* in place.

    Args:
        video_stats_df: DataFrame with a title column, as produced by
            ``get_video_data``.
        n_clusters: Requested number of clusters; coerced to int and clamped
            to the range ``1..number of titles`` because k-means cannot form
            more clusters than there are samples.

    Returns:
        A DataFrame with one row per cluster: the cluster index and the
        summary text returned by ``summarize_cluster``. It has zero rows
        when *video_stats_df* has no rows.
    """
    if video_stats_df.empty:
        # Nothing to cluster (the frame may not even have a title column).
        return pd.DataFrame({'ν΄λ¬μ€ν°': [], 'μμ½': []})

    titles = video_stats_df['μ λͺ©'].tolist()
    n_clusters = max(1, min(int(n_clusters), len(titles)))

    tfidf_matrix = TfidfVectorizer().fit_transform(titles)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(tfidf_matrix)
    video_stats_df["ν΄λ¬μ€ν°"] = kmeans.labels_

    cluster_summaries = []
    for cluster_index in range(n_clusters):
        in_cluster = video_stats_df["ν΄λ¬μ€ν°"] == cluster_index
        cluster_text = ' '.join(video_stats_df.loc[in_cluster, 'μ λͺ©'].tolist())
        cluster_summaries.append(summarize_cluster(cluster_text, cluster_index))

    return pd.DataFrame({'ν΄λ¬μ€ν°': range(n_clusters), 'μμ½': cluster_summaries})
def summarize_cluster(cluster_text, cluster_num):
    """Ask the language model to summarise the titles of one cluster.

    Args:
        cluster_text: Titles of the cluster joined into one string.
        cluster_num: Index of the cluster. Not used in the prompt; kept so
            existing callers keep working.

    Returns:
        The generated summary with surrounding whitespace stripped.
    """
    prompt = f"λ€μ λμμμ λΆμνμ¬ μμ½νκ³ , 500μ μ΄λ΄λ‘ λμμμ νΉμ§ λ° μΈκΈ° μμΈμ μ€λͺ ν΄μ£ΌμΈμ: {cluster_text}"
    # InferenceClient exposes text_generation(); it has no generate() method.
    response = client.text_generation(prompt, max_new_tokens=512)
    # text_generation() returns a plain string by default and an object with
    # a ``generated_text`` attribute when details are requested; accept both.
    generated = getattr(response, "generated_text", response)
    return generated.strip()
def main(query, max_results, period, page, n_clusters=5):
    """Gradio entry point: run a search and render the selected page.

    Args:
        query: Search terms; nothing is fetched when empty.
        max_results: Maximum number of videos to analyse.
        period: Look-back window label chosen in the UI; unknown or missing
            values fall back to 30 days.
        page: ``"Video Ranking"`` or ``"Title Analysis"``.
        n_clusters: Number of title clusters for the title analysis page.

    Returns:
        Always a 3-tuple matching the interface outputs
        ``(dataframe, figure, html)``. For the ranking page all three are
        filled; for the title analysis page only the dataframe is; for an
        empty query or an unknown page all three are ``None`` (the original
        returned a bare ``None`` in those cases).
    """
    if not query or page not in ("Video Ranking", "Title Analysis"):
        # Bail out before spending any API quota.
        return None, None, None

    # Period setting (default: 1 month).
    lookback_days = {"1μ£ΌμΌ": 7, "1κ°μ": 30, "3κ°μ": 90}.get(period, 30)

    # Timezone-aware "now" replaces the deprecated datetime.utcnow(); the
    # timestamps are rendered as RFC 3339 UTC strings ending in "Z".
    now = datetime.now(timezone.utc)
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    published_before = now.strftime(timestamp_format)
    published_after = (now - timedelta(days=lookback_days)).strftime(timestamp_format)

    video_stats_df = get_video_data(query, max_results, published_after, published_before)

    if page == "Video Ranking":
        video_stats_df, fig, csv_download_link = visualize_video_ranking(video_stats_df)
        return video_stats_df, fig, csv_download_link

    cluster_summary_df = analyze_titles(video_stats_df, n_clusters)
    return cluster_summary_df, None, None
# Input widgets, in the positional order expected by ``main``:
# query, max_results, period, page, n_clusters.
_input_components = [
    gr.components.Textbox(label="κ²μ 쿼리"),
    gr.components.Number(label="μ΅λ κ²°κ³Ό μ", value=5, precision=0, minimum=1, maximum=1000),
    gr.components.Dropdown(["1μ£ΌμΌ", "1κ°μ", "3κ°μ"], label="κΈ°κ°"),
    gr.components.Dropdown(["Video Ranking", "Title Analysis"], label="νμ΄μ§"),
    gr.components.Number(label="ν΄λ¬μ€ν° μ", value=5, precision=0, minimum=2, maximum=10),
]

# Output widgets, matching the 3-tuple returned by ``main``:
# result table, chart, CSV download link.
_output_components = [
    gr.components.Dataframe(label="κ²°κ³Ό"),
    gr.components.Plot(label="κ·Έλν"),
    gr.components.HTML(label="CSV λ€μ΄λ‘λ λ§ν¬"),
]

# The web UI; ``live=False`` is passed through unchanged from the original.
iface = gr.Interface(
    fn=main,
    inputs=_input_components,
    outputs=_output_components,
    live=False,
    title="YouTube λΆμ λꡬ",
)

if __name__ == "__main__":
    iface.launch()