Spaces:
Sleeping
Sleeping
| """ | |
| app.py — Gradio UI for BERTopic Agentic Thematic Analysis | |
| """ | |
| import gradio as gr | |
| import pandas as pd | |
| from agent import run_agent | |
def format_chat_history(history):
    """Return ``history`` unchanged.

    Retained only for backwards compatibility: the event handlers perform
    their own chat-history normalization, so this is a pass-through.
    """
    return history
def send_message(user_message, chat_history, file_path, thread_id):
    """Forward a user message to the agent and return the updated UI state.

    Args:
        user_message: Text typed by the user. ``None``, empty, or
            whitespace-only input is ignored.
        chat_history: Current chatbot value. Either a list of
            ``{"role": ..., "content": ...}`` dicts or a list of
            ``(user, assistant)`` pairs; may be empty or ``None``.
        file_path: Path of the uploaded CSV, forwarded to the agent.
        thread_id: Conversation thread identifier, forwarded to the agent.

    Returns:
        A 4-tuple ``(chat, textbox_value, review_table, phase_html)``.
        For ignored input the chat is returned as received, the textbox is
        cleared, and the last two slots are ``gr.update()`` placeholders.
    """
    # Guard against None as well as blank text; the original called
    # ``.strip()`` unconditionally and raised AttributeError on None.
    if not user_message or not user_message.strip():
        return chat_history, "", gr.update(), gr.update()

    def _to_agent_history(hist):
        """Collapse role/content dicts into ``(user, assistant)`` pairs."""
        if not hist:
            return []
        if not isinstance(hist[0], dict):
            # Not in dict form: hand it to the agent as-is.
            return hist
        pairs = []
        i = 0
        while i < len(hist) - 1:
            first, second = hist[i], hist[i + 1]
            if (first.get("role", "") in ("user", "human")
                    and second.get("role", "") in ("assistant", "ai")):
                pairs.append((first.get("content", ""), second.get("content", "")))
                i += 2
            else:
                # Not a user -> assistant pair; slide forward one message.
                i += 1
        return pairs

    def _to_gradio_history(pairs):
        """Expand ``(user, assistant)`` pairs into role/content dicts."""
        messages = []
        for pair in pairs:
            if isinstance(pair, (list, tuple)) and len(pair) >= 2:
                messages.append({"role": "user", "content": pair[0]})
                messages.append({"role": "assistant", "content": pair[1]})
        return messages

    agent_chat_history = _to_agent_history(chat_history)
    context = {"file_path": file_path, "thread_id": thread_id}
    response, review_data, phase_html = run_agent(
        user_message, context, agent_chat_history
    )

    # Keep the existing dict-style history verbatim (including any unpaired
    # messages); otherwise rebuild it from the pair form.
    if isinstance(chat_history, list) and chat_history and isinstance(chat_history[0], dict):
        new_chat = chat_history.copy()
    else:
        new_chat = _to_gradio_history(agent_chat_history)
    new_chat.append({"role": "user", "content": user_message})
    new_chat.append({"role": "assistant", "content": response})

    if review_data:
        review_df = pd.DataFrame(review_data)
    else:
        # No review rows from the agent: show an empty table with the
        # expected column headers.
        review_df = pd.DataFrame(
            columns=["#", "Topic Label", "Top Evidence", "Sentences", "Papers",
                     "Approve", "Rename To", "Reasoning"]
        )
    return new_chat, "", review_df, phase_html
def submit_review(review_df, chat_history, file_path, thread_id):
    """Send the edited review table back to the agent.

    The table is serialized as JSON records and sent to the agent as a
    message prefixed with ``[REVIEW_TABLE_SUBMITTED]``.

    Args:
        review_df: The review table as a pandas DataFrame. ``None`` is
            treated as an empty table.
        chat_history: Current chatbot value. Either a list of
            ``{"role": ..., "content": ...}`` dicts or a list of
            ``(user, assistant)`` pairs; may be empty or ``None``.
        file_path: Path of the uploaded CSV, forwarded to the agent.
        thread_id: Conversation thread identifier, forwarded to the agent.

    Returns:
        A 3-tuple ``(chat, review_table, phase_html)``. The table is the
        agent's new review data when it returns any, otherwise the
        submitted table unchanged.
    """

    def _to_agent_history(hist):
        """Collapse role/content dicts into ``(user, assistant)`` pairs."""
        if not hist:
            return []
        if not isinstance(hist[0], dict):
            # Not in dict form: hand it to the agent as-is.
            return hist
        pairs = []
        i = 0
        while i < len(hist) - 1:
            first, second = hist[i], hist[i + 1]
            if (first.get("role", "") in ("user", "human")
                    and second.get("role", "") in ("assistant", "ai")):
                pairs.append((first.get("content", ""), second.get("content", "")))
                i += 2
            else:
                # Not a user -> assistant pair; slide forward one message.
                i += 1
        return pairs

    def _to_gradio_history(pairs):
        """Expand ``(user, assistant)`` pairs into role/content dicts."""
        messages = []
        for pair in (pairs or []):
            if isinstance(pair, (list, tuple)) and len(pair) >= 2:
                messages.append({"role": "user", "content": pair[0]})
                messages.append({"role": "assistant", "content": pair[1]})
        return messages

    # A missing table serializes as an empty record list instead of raising
    # AttributeError on ``None.to_json``.
    table_json = review_df.to_json(orient="records") if review_df is not None else "[]"
    review_message = f"[REVIEW_TABLE_SUBMITTED]\n{table_json}"
    context = {"file_path": file_path, "thread_id": thread_id}

    agent_chat_history = _to_agent_history(chat_history)
    response, new_review_data, phase_html = run_agent(
        review_message, context, agent_chat_history
    )

    # Keep the existing dict-style history verbatim; otherwise rebuild it
    # from the pair form.
    if isinstance(chat_history, list) and chat_history and isinstance(chat_history[0], dict):
        new_chat = chat_history.copy()
    else:
        new_chat = _to_gradio_history(agent_chat_history)
    # The raw JSON payload is not shown in the chat; a short marker is.
    new_chat.append({"role": "user", "content": "(Review table submitted)"})
    new_chat.append({"role": "assistant", "content": response})

    new_df = pd.DataFrame(new_review_data) if new_review_data else review_df
    return new_chat, new_df, phase_html
def get_download_files():
    """Collect output files available for download.

    Returns:
        A list of paths matching ``outputs/*.csv``, ``outputs/*.json`` and
        ``outputs/*.txt`` (grouped in that extension order and sorted by
        path within each group), or ``None`` when nothing matches.
    """
    import glob

    files = []
    for pattern in ("outputs/*.csv", "outputs/*.json", "outputs/*.txt"):
        # glob returns matches in arbitrary order; sort so the list shown
        # to the user is stable between refreshes.
        files.extend(sorted(glob.glob(pattern)))
    return files or None
# NOTE(review): this section was reconstructed from a paste that lost all
# indentation; the container nesting below (Group/Row membership) is inferred
# from component semantics — confirm against the original layout.
# NOTE(review): several UI strings contain mis-encoded characters (probably
# emoji); they are reproduced as found and should be restored from the
# original file.

# Initial markup for the phase tracker bar (P1 through P6, including P5.5).
_INITIAL_PHASE_HTML = """
<div style='padding:10px;background:#f0f4ff;border-radius:8px;font-family:sans-serif'>
<b>Phase Progress:</b>
<span style='margin-left:12px'>β¬ P1</span>
<span style='margin-left:8px'>β¬ P2</span>
<span style='margin-left:8px'>β¬ P3</span>
<span style='margin-left:8px'>β¬ P4</span>
<span style='margin-left:8px'>β¬ P5</span>
<span style='margin-left:8px'>β¬ P5.5</span>
<span style='margin-left:8px'>β¬ P6</span>
</div>
"""

# (header, datatype) for each review-table column, in display order.
_REVIEW_COLUMNS = (
    ("#", "number"),
    ("Topic Label", "str"),
    ("Top Evidence", "str"),
    ("Sentences", "number"),
    ("Papers", "number"),
    ("Approve", "bool"),
    ("Rename To", "str"),
    ("Reasoning", "str"),
)

# Dropdown label -> HTML file that load_chart reads from disk.
_CHART_FILES = {
    "Topic Distribution": "outputs/chart_distribution.html",
    "Similarity Heatmap": "outputs/chart_heatmap.html",
    "Top Keywords per Topic": "outputs/chart_keywords.html",
    "Abstract vs Title Comparison": "outputs/chart_comparison.html",
}


def handle_upload(filepath):
    """Return the upload status text and the path to keep in state."""
    if filepath is not None:
        return f"β File loaded: {filepath}", filepath
    return "No file uploaded.", None


def load_chart(chart_name):
    """Return HTML for the selected chart, read from its file on disk.

    The file's full HTML document is embedded in an ``<iframe>`` through the
    ``srcdoc`` attribute; a grey placeholder paragraph is returned when the
    file does not exist.
    """
    import os
    import html as _html

    chart_path = _CHART_FILES.get(chart_name, "")
    if not os.path.exists(chart_path):
        return "<p style='color:grey'>Chart not yet generated. Complete the relevant phase first.</p>"

    with open(chart_path, "r", encoding="utf-8") as fh:
        document = fh.read()
    # The document lives inside an attribute value, so it is HTML-escaped
    # (quotes included).
    escaped = _html.escape(document, quote=True)
    return f'<iframe srcdoc="{escaped}" style="border:0; width:100%; height:700px;"></iframe>'


with gr.Blocks(title="BERTopic Agentic Thematic Analysis") as demo:
    # Per-session state: agent thread id and the uploaded CSV path.
    thread_id_state = gr.State("thread-001")
    uploaded_path_state = gr.State(None)

    gr.Markdown(
        "# π¬ BERTopic Agentic Thematic Analysis\n"
        "Upload your Scopus CSV and follow the agent through Braun & Clarke's 6 phases."
    )
    phase_bar = gr.HTML(value=_INITIAL_PHASE_HTML, label="Phase Tracker")

    # ---- Section 1: CSV upload ----
    with gr.Group():
        gr.Markdown("## π Section 1: Upload Scopus CSV")
        csv_upload = gr.File(
            label="Upload Scopus CSV",
            file_types=[".csv"],
            type="filepath",
        )
        upload_status = gr.Textbox(label="Upload Status", interactive=False)
        csv_upload.change(
            fn=handle_upload,
            inputs=[csv_upload],
            outputs=[upload_status, uploaded_path_state],
        )

    # ---- Section 2: agent chat ----
    with gr.Group():
        gr.Markdown("## π¬ Section 2: Agent Chat")
        gr.Markdown(
            "_Start with:_ **'Start Phase 1'** to begin familiarisation, "
            "then follow the agent's instructions phase by phase."
        )
        chatbot = gr.Chatbot(height=420, label="Agent Conversation")
        with gr.Row():
            user_input = gr.Textbox(
                placeholder="Type your message or command here...",
                label="Your Message",
                scale=5,
            )
            send_btn = gr.Button("Send βΆ", variant="primary", scale=1)

    # ---- Section 3: review table, charts, downloads ----
    with gr.Group():
        gr.Markdown("## π Section 3: Results")

        gr.Markdown("### ποΈ Topic Review Table")
        gr.Markdown(
            "Edit the **Approve** (True/False), **Rename To**, and **Reasoning** columns, "
            "then click **Submit Review** to proceed."
        )
        review_table = gr.Dataframe(
            headers=[header for header, _ in _REVIEW_COLUMNS],
            datatype=[dtype for _, dtype in _REVIEW_COLUMNS],
            interactive=True,
            label="Review Table",
            wrap=True,
            row_count=(5, "dynamic"),
            column_count=(8, "fixed"),
        )
        submit_review_btn = gr.Button("β Submit Review", variant="secondary")

        gr.Markdown("### π Topic Charts")
        with gr.Row():
            chart_selector = gr.Dropdown(
                choices=list(_CHART_FILES),
                label="Select Chart",
                value="Topic Distribution",
            )
        chart_display = gr.HTML(label="Chart")
        chart_selector.change(fn=load_chart, inputs=[chart_selector], outputs=[chart_display])

        gr.Markdown("### π₯ Download Outputs")
        download_btn = gr.Button("π Refresh Download List")
        download_files = gr.File(label="Available Output Files", file_count="multiple")
        download_btn.click(fn=get_download_files, inputs=[], outputs=[download_files])

    # The Send button and pressing Enter in the textbox run the same handler.
    for register in (send_btn.click, user_input.submit):
        register(
            fn=send_message,
            inputs=[user_input, chatbot, uploaded_path_state, thread_id_state],
            outputs=[chatbot, user_input, review_table, phase_bar],
        )

    submit_review_btn.click(
        fn=submit_review,
        inputs=[review_table, chatbot, uploaded_path_state, thread_id_state],
        outputs=[chatbot, review_table, phase_bar],
    )
if __name__ == "__main__":
    # Launch settings for running this file directly as a script.
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "theme": gr.themes.Soft(),
    }
    demo.launch(**launch_options)