# YouTube Comment Thread Analyzer — Gradio app (HuggingFace Space).
import os

import gradio as gr
import networkx as nx
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
def process_file(file):
    """Load an uploaded CSV/XLSX of YouTube comments and build analysis figures.

    Returns a 6-tuple: (status message, messages-per-day figure,
    top-authors figure, comment-timeline figure, network HTML string,
    path to the saved network HTML file). On any validation failure the
    five non-status slots are None.
    """
    # gr.File.change also fires with None when the user clears the file.
    if file is None:
        return "No file uploaded", None, None, None, None, None
    # Gradio may hand us a tempfile wrapper (with .name) or a plain path string.
    path = file if isinstance(file, str) else file.name
    file_ext = os.path.splitext(path)[1].lower()
    if file_ext == '.csv':
        df = pd.read_csv(path)
    elif file_ext in ['.xls', '.xlsx']:
        df = pd.read_excel(path)
    else:
        return "Unsupported file format", None, None, None, None, None

    required_cols = ['comment_id', 'text', 'like_count', 'author_name',
                     'author_channel_id', 'published_at', 'parent_id']
    missing = [col for col in required_cols if col not in df.columns]
    if missing:
        return f"Missing columns: {', '.join(missing)}", None, None, None, None, None

    # Drop rows whose timestamp cannot be parsed.
    df['date'] = pd.to_datetime(df['published_at'], errors='coerce')
    df = df.dropna(subset=['date'])
    df['date_only'] = df['date'].dt.date

    # Messages per day
    messages_per_day = df.groupby("date_only").size().reset_index(name="count")
    fig1 = px.line(messages_per_day, x="date_only", y="count", title="Messages per Day")

    # Top authors
    top_authors = df['author_name'].value_counts().nlargest(20).reset_index()
    top_authors.columns = ['author', 'count']
    fig2 = px.bar(top_authors, x='author', y='count', title="Top 20 Authors", text='count')
    fig2.update_layout(xaxis_tickangle=-45)

    # Timeline of comments; coerce like_count so non-numeric/missing values
    # plot as 0 instead of breaking the y-axis.
    y_data = pd.to_numeric(df['like_count'], errors='coerce').fillna(0)
    hover_cols = ['author_name', 'text', 'like_count']
    fig3 = px.scatter(
        df,
        x='date',
        y=y_data,
        hover_data=hover_cols,
        title="Comments Over Time (Likes)",
        labels={'like_count': 'Like Count', 'date': 'Date'}
    )
    fig3.update_traces(marker=dict(size=6, opacity=0.7))
    fig3.update_layout(yaxis=dict(title='Like Count'))

    # Persist for the keyword-search feature (read back by search_keyword).
    df.to_csv("latest_data.csv", index=False)

    # Build the thread-network HTML and save it permanently.
    network_html_content, network_path = build_network_html_plotly(df)
    return "Success", fig1, fig2, fig3, network_html_content, network_path
def build_network_html_plotly(df):
    """Render the comment/reply thread graph as a Plotly figure.

    Saves the figure to network.html and returns (html string, file path).
    """
    graph = nx.DiGraph()
    for _, row in df.iterrows():
        author = str(row['author_name'])
        cid = str(row['comment_id'])
        parent = row['parent_id']
        graph.add_node(cid, label=author)
        graph.add_edge(author, cid)
        if pd.notna(parent):
            graph.add_edge(str(parent), cid)

    # Fixed seed keeps the layout deterministic across uploads.
    layout = nx.spring_layout(graph, seed=42)

    edge_xs, edge_ys = [], []
    for src, dst in graph.edges():
        sx, sy = layout[src]
        dx, dy = layout[dst]
        edge_xs.extend((sx, dx, None))  # None breaks the polyline between edges
        edge_ys.extend((sy, dy, None))
    edge_trace = go.Scatter(
        x=edge_xs, y=edge_ys,
        line=dict(width=0.5, color='#888'),
        hoverinfo='none',
        mode='lines')

    nodes = list(graph.nodes())
    node_xs = [layout[n][0] for n in nodes]
    node_ys = [layout[n][1] for n in nodes]
    # Author nodes have no 'label' attribute; fall back to the node id itself.
    labels = [graph.nodes[n].get('label', n) for n in nodes]
    node_trace = go.Scatter(
        x=node_xs, y=node_ys,
        mode='markers+text',
        text=labels,
        textposition="top center",
        hoverinfo='text',
        marker=dict(
            showscale=False,
            color='LightSkyBlue',
            size=10,
            line_width=2))

    fig = go.Figure(
        data=[edge_trace, node_trace],
        layout=go.Layout(
            title='Comment Thread Network',
            showlegend=False,
            hovermode='closest',
            margin=dict(b=20, l=5, r=5, t=40),
            xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
            yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)))

    # Save permanently, then read the markup back for inline display.
    html_path = "network.html"
    fig.write_html(html_path)
    with open(html_path, "r", encoding="utf-8") as f:
        html_content = f.read()
    return html_content, html_path
def search_keyword(keyword):
    """Case-insensitive literal substring search over the last uploaded dataset.

    Reads latest_data.csv (written by process_file) and returns up to 100
    matching rows with columns [date, author_name, like_count, text].
    Returns an empty frame when no dataset has been uploaded yet or the
    stored file lacks a 'text' column.
    """
    empty = pd.DataFrame(columns=['date', 'author_name', 'like_count', 'text'])
    if not os.path.exists("latest_data.csv"):
        return empty
    df = pd.read_csv("latest_data.csv")
    if 'text' not in df.columns:
        return empty
    # regex=False: treat the keyword as a literal string, so user input like
    # "c++" or "(" cannot raise re.error or match unexpectedly.
    mask = df['text'].astype(str).str.contains(str(keyword), case=False, na=False, regex=False)
    # .copy() so the 'date' assignment below does not hit SettingWithCopyWarning.
    result = df.loc[mask, ['published_at', 'author_name', 'like_count', 'text']].copy()
    result['date'] = pd.to_datetime(result['published_at'], errors='coerce')
    result = result.dropna(subset=['date'])
    return result[['date', 'author_name', 'like_count', 'text']].head(100)
# ---- Gradio interface ----
with gr.Blocks() as demo:
    gr.Markdown("## π§ YouTube Comment Thread Analyzer with Timeline + Search")

    upload_box = gr.File(label="π Upload CSV or XLSX (YouTube API v3 format)")
    status_box = gr.Textbox(label="β Status")
    daily_plot = gr.Plot(label="π Messages per Day")
    authors_plot = gr.Plot(label="π€ Top 20 Authors")
    timeline_plot = gr.Plot(label="π Comment Timeline")
    thread_network = gr.HTML(label="π§΅ Thread Network")
    network_file = gr.File(label="β¬οΈ Download Network HTML", interactive=False)

    with gr.Row():
        query_box = gr.Textbox(label="π Search Keyword in Comments")
        run_search = gr.Button("Search")
    results_table = gr.Dataframe(
        headers=["date", "author_name", "like_count", "text"],
        label="π Search Results",
    )

    # Re-run the full analysis whenever a new file is uploaded (or cleared).
    upload_box.change(
        fn=process_file,
        inputs=upload_box,
        outputs=[status_box, daily_plot, authors_plot, timeline_plot,
                 thread_network, network_file],
    )
    run_search.click(fn=search_keyword, inputs=query_box, outputs=results_table)

if __name__ == "__main__":
    demo.launch()