"""Gradio front end for the Second Brain assistant.

Wraps an agent in a small web UI that shows the answer, the tools the agent
called, the raw response for debugging, and a searchable table of analysed
conversations read from MongoDB.
"""

import html
import json
import re
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Union

import gradio as gr
import pandas as pd
from pymongo import MongoClient
from smolagents import ToolCallingAgent

from second_brain_online.config import settings

# Column order of the conversations table; shared by every code path that
# builds the DataFrame so that an empty result still carries its headers.
_CONVERSATION_COLUMNS = [
    "Conversation ID",
    "Customer Info",
    "Summary",
    "Key Findings",
    "Follow-up Email",
]

# ``$project`` stage shared by the "all conversations" and "conversations for
# these sources" aggregation pipelines.
_CONVERSATION_PROJECTION = {
    "conversation_id": "$metadata.properties.conversation_id",
    "user_id": "$metadata.properties.user_id",
    "icp_region": "$metadata.properties.icp_region",
    "icp_country": "$metadata.properties.icp_country",
    "team_size": "$metadata.properties.team_size",
    "summary": "$conversation_analysis.aggregated_contextual_summary",
    "key_findings": "$conversation_analysis.aggregated_marketing_insights.key_findings",
    "follow_up_email": "$conversation_analysis.follow_up_email",
}

# (label, projected key) pairs folded into the "Customer Info" column.
_CUSTOMER_FIELDS = (
    ("User", "user_id"),
    ("Region", "icp_region"),
    ("Country", "icp_country"),
    ("Team Size", "team_size"),
)

_MAX_FINDINGS_SHOWN = 3
_SUMMARY_MAX_CHARS = 200
_EMAIL_MAX_CHARS = 150

_CUSTOM_CSS = """
.source-card {
    border: 1px solid #e0e0e0;
    border-radius: 8px;
    padding: 12px;
    margin: 8px 0;
    background-color: #f8f9fa;
}
.source-title {
    font-weight: bold;
    color: #2c3e50;
    margin-bottom: 4px;
}
.source-date {
    font-size: 0.9em;
    color: #6c757d;
    margin-bottom: 8px;
}
.answer-section {
    background-color: #ffffff;
    border: 1px solid #dee2e6;
    border-radius: 8px;
    padding: 16px;
    margin-bottom: 16px;
}
.tool-usage {
    background-color: #e3f2fd;
    border-left: 4px solid #2196f3;
    padding: 8px 12px;
    margin: 8px 0;
    border-radius: 4px;
    font-size: 0.9em;
}
"""

# --- Patterns for the plain-text "Doc N:" sources format -------------------
_DOC_HEADER = re.compile(r'Doc\s+(\d+):\s*([^\n]+)')
_DOC_MARKER = re.compile(r'Doc\s+\d+:')
_TITLE_WITH_DATE = re.compile(r'(.+?)\s*\(([^)]+)\)')
_DOCUMENT_ID = re.compile(r'Document ID:\s*([a-zA-Z0-9]+)')
_SUMMARY_LINE = re.compile(r'Summary:\s*([^\n]+)')
_FINDINGS_SECTION = re.compile(r'Key Findings:\s*(.+?)(?=\n\nDoc\s+\d+:|$)', re.DOTALL)
_FINDING_LINE = re.compile(r'-\s*\[([^\]]+)\]\s*([^\n]+)')

# --- Patterns for the agent's final answer ---------------------------------
# The string body accepts backslash escapes so answers containing \" still match.
_JSON_ANSWER = re.compile(r'\{"answer":\s*"((?:[^"\\]|\\.)+)"\s*\}', re.DOTALL)
_FINAL_ANSWER = re.compile(r'Final answer:\s*(.+?)(?=\n\n|\Z)', re.DOTALL)
_SOURCES_MARKER = re.compile(r'📚\s*Sources:?', re.IGNORECASE)

# --- Patterns for tagged search results in tool observations ---------------
# NOTE(review): the tag names below are reconstructed. The group count and
# order follow the names the matches are unpacked into (id, title, date,
# contextual summary, marketing insights, content) — confirm against the
# retriever tool's actual output format.
_FULL_DOCUMENT = re.compile(
    r'<document id="(\d+)">\s*'
    r'<title>(.*?)</title>\s*'
    r'<date>(.*?)</date>\s*'
    r'<contextual_summary>(.*?)</contextual_summary>\s*'
    r'<marketing_insights>(.*?)</marketing_insights>\s*'
    r'<content>(.*?)</content>',
    re.DOTALL,
)
_SIMPLE_DOCUMENT = re.compile(r'<document id="(\d+)">\s*<title>(.*?)</title>', re.DOTALL)
_DATED_DOCUMENT = re.compile(
    r'<document id="(\d+)">\s*<title>(.*?)</title>\s*<date>(.*?)</date>',
    re.DOTALL,
)
_KEY_FINDINGS_TAG = re.compile(r'<key_findings>(.*?)</key_findings>', re.DOTALL)
_QUOTES_TAG = re.compile(r'<quotes>(.*?)</quotes>', re.DOTALL)
_CONVERSATION_ID_IN_TEXT = re.compile(r'conversation[_\s]*id[:\s]*(\d+)', re.IGNORECASE)
_CONVERSATION_ID_IN_TITLE = re.compile(r'conversation[_\s]*(\d+)', re.IGNORECASE)

# --- Patterns for tool names mentioned in a plain-text transcript ----------
_TOOL_NAME_PATTERNS = (
    re.compile(r'🛠️ Used tool (\w+)'),
    re.compile(r"Calling tool:\s*'([^']+)'"),
    re.compile(r'Calling tool:\s*"([^"]+)"'),
    re.compile(r'Calling tool:\s*([a-zA-Z_][a-zA-Z0-9_]*)'),
)


def _esc(value: Any) -> str:
    """Return ``value`` as text that is safe to interpolate into HTML."""
    return html.escape(str(value), quote=True)


def _dash_lines(block: str) -> List[str]:
    """Return the stripped lines of ``block`` that start with a dash."""
    stripped = (line.strip() for line in block.split('\n'))
    return [line for line in stripped if line.startswith('-')]


def _truncate(text: str, limit: int) -> str:
    """Cut ``text`` to ``limit`` characters, appending ``...`` when shortened."""
    return text[:limit] + "..." if len(text) > limit else text


class CustomGradioUI:
    """Custom Gradio UI for better formatting of agent responses with source attribution."""

    def __init__(self, agent: Union[ToolCallingAgent, Any]):
        """Initialize the UI with either a ToolCallingAgent or AgentWrapper.

        Args:
            agent: Either a raw ToolCallingAgent or an AgentWrapper that wraps it.
                Only ``agent.run(query)`` and, optionally, its step log are used.
        """
        self.agent = agent
        self.mongodb_client = None
        self.database = None
        self.conversation_collection = None
        self.setup_mongodb()
        self.setup_ui()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    def setup_mongodb(self):
        """Connect to MongoDB and select the conversations collection.

        On any failure the three Mongo attributes are reset to ``None`` so the
        rest of the UI degrades to an empty conversations table instead of
        raising.
        """
        try:
            self.mongodb_client = MongoClient(settings.MONGODB_URI)
            # MongoClient connects lazily; ping so that an unreachable server
            # is detected here rather than on the first query.
            self.mongodb_client.admin.command("ping")
            self.database = self.mongodb_client[settings.MONGODB_DATABASE_NAME]
            self.conversation_collection = self.database["test_conversation_documents"]
            print("✅ MongoDB connection established successfully")
        except Exception as e:  # best-effort: the UI must still start without a DB
            print(f"❌ Failed to connect to MongoDB: {e}")
            if self.mongodb_client is not None:
                try:
                    self.mongodb_client.close()
                except Exception:
                    pass  # nothing useful to do if closing a dead client fails
            self.mongodb_client = None
            self.database = None
            self.conversation_collection = None

    def setup_ui(self):
        """Build the Gradio Blocks layout and wire its event handlers.

        The result is stored on ``self.interface``; the individual components
        are stored as attributes so the handlers can reference them.
        """
        # NOTE(review): the exact nesting of rows/columns/accordions is
        # reconstructed from statement order — confirm against the intended layout.
        with gr.Blocks(
            title="Second Brain AI Assistant",
            theme=gr.themes.Soft(),
            css=_CUSTOM_CSS,
        ) as self.interface:
            gr.Markdown("# 🧠 Second Brain AI Assistant")
            gr.Markdown(
                "Ask questions about your documents and get AI-powered insights "
                "with source attribution."
            )

            self.query_input = gr.Textbox(
                label="Ask a question",
                placeholder="What pricing objections were raised in the meetings?",
                lines=2,
            )
            self.submit_btn = gr.Button("Ask", variant="primary", size="lg")

            with gr.Row():
                with gr.Column():
                    self.answer_output = gr.HTML(label="Answer")

                    with gr.Accordion("📊 Conversations", open=False):
                        with gr.Row():
                            self.conversation_search = gr.Textbox(
                                label="Search Conversations",
                                placeholder=(
                                    "Search by conversation ID, customer info, "
                                    "summary, or key findings..."
                                ),
                                scale=4,
                            )
                            self.clear_search_btn = gr.Button("Clear", scale=1)
                        self.conversation_table = gr.Dataframe(
                            headers=list(_CONVERSATION_COLUMNS),
                            datatype=["str", "str", "str", "str", "str"],
                            interactive=False,
                            label="Available Conversations",
                            wrap=True,
                            max_height=400,
                            value=self.load_conversations(),
                        )

                    with gr.Accordion("📚 Sources", open=False):
                        self.sources_output = gr.HTML(label="Sources")

                    with gr.Accordion("🛠️ Tools Used", open=False):
                        self.tools_output = gr.HTML(label="Tools Used")

                    with gr.Accordion("🔍 Debug: Raw Response", open=False):
                        self.debug_output = gr.Textbox(
                            label="Raw Agent Response",
                            lines=10,
                            max_lines=20,
                            interactive=False,
                        )

            # Clicking the button and pressing Enter in the textbox run the
            # same handler with the same outputs.
            query_outputs = [
                self.answer_output,
                self.sources_output,
                self.tools_output,
                self.debug_output,
                self.conversation_table,
            ]
            for register in (self.submit_btn.click, self.query_input.submit):
                register(
                    fn=self.process_query,
                    inputs=[self.query_input],
                    outputs=query_outputs,
                    show_progress="full",
                )

            # Conversation search handlers
            self.conversation_search.change(
                fn=self.filter_conversations,
                inputs=[self.conversation_search],
                outputs=[self.conversation_table],
            )
            self.clear_search_btn.click(
                fn=self.clear_conversation_search,
                inputs=[],
                outputs=[self.conversation_search, self.conversation_table],
            )

    # ------------------------------------------------------------------
    # Query handling
    # ------------------------------------------------------------------
    def process_query(self, query: str, progress=gr.Progress()) -> Tuple[str, str, str, str, pd.DataFrame]:
        """Run the agent on ``query`` and return the five UI outputs.

        Args:
            query: The user's question. Empty or whitespace-only input clears
                the outputs.
            progress: Gradio progress tracker injected by the framework.

        Returns:
            ``(answer_html, sources_html, tools_html, debug_text, conversations)``.
            ``sources_html`` is always empty because the sources are shown as
            part of the answer text. On failure ``answer_html`` holds an error
            message and ``debug_text`` the exception text.
        """
        if not query or not query.strip():
            return "", "", "", "", self.load_conversations()

        try:
            progress(0, desc="🔍 Starting query processing...")
            # None requests an indeterminate indicator for the long agent run.
            progress(None, desc="🔍 Searching knowledge base and retrieving documents...")
            result = self.agent.run(query)
            progress(0.8, desc="✨ Displaying results...")

            result_str = str(result)
            banner = "=" * 80
            print(f"\n{banner}\nDEBUG: RAW AGENT RESULT\n{banner}")
            print(f"Type: {type(result)}")
            print(f"Is string? {isinstance(result, str)}")
            print(f"Has 📚 Sources? {'📚 Sources' in result_str}")
            print(f"Full Content:\n{result_str}")
            print(banner)

            tools_used = self._tool_names_from_steps(self._agent_steps())
            answer_html = self._format_raw_answer(result_str)
            tools_html = self.format_tools(tools_used)

            # All conversations are shown; sources are not parsed for filtering.
            progress(0.95, desc="📊 Loading conversations...")
            all_conversations = self.load_conversations()

            progress(1.0, desc="✅ Complete!")
            return answer_html, "", tools_html, result_str, all_conversations
        except Exception as e:  # UI boundary: report the failure instead of crashing
            # NOTE(review): wrapper markup reconstructed — confirm styling.
            error_msg = (
                "<div class='answer-section'>"
                f"<p style='color: #c0392b;'>Error: {_esc(e)}</p>"
                "</div>"
            )
            return error_msg, "", "", str(e), self.load_conversations()

    def _agent_steps(self) -> List[Any]:
        """Return the agent's recorded steps, or an empty list when unavailable."""
        steps = getattr(self.agent, "logs", None)
        if not steps:
            # presumably newer smolagents releases expose the step log as
            # ``agent.memory.steps`` instead of ``agent.logs`` — TODO confirm
            steps = getattr(getattr(self.agent, "memory", None), "steps", None)
        return list(steps) if steps else []

    @staticmethod
    def _tool_names_from_steps(steps: List[Any]) -> List[str]:
        """Collect unique tool names from ``steps`` in first-seen order."""
        names = []
        for step in steps:
            for call in getattr(step, "tool_calls", None) or []:
                name = getattr(call, "name", None)
                if name is not None:
                    names.append(name)
        return list(dict.fromkeys(names))

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    def _parse_sources_from_text(self, sources_text: str) -> List[Dict]:
        """Parse sources from the formatted text output.

        Expected format::

            Doc 1: Title (Date)
            Source: Type | Document ID: ID | URL | User ID
            Summary: ...
            Key Findings:
            - [Type/Impact] Finding

        Returns:
            One dict per ``Doc N:`` header with keys ``id``, ``title``,
            ``date``, ``summary``, ``key_findings`` and ``quotes`` (always
            empty for this format). Fields that are not found are empty.
        """
        sources = []
        for header in _DOC_HEADER.finditer(sources_text):
            title_line = header.group(2).strip()

            # The body of this entry runs up to the next "Doc N:" marker.
            body_start = header.end()
            following = _DOC_MARKER.search(sources_text, body_start)
            body_end = following.start() if following else len(sources_text)
            body = sources_text[body_start:body_end]

            titled = _TITLE_WITH_DATE.match(title_line)
            if titled:
                title, date = titled.group(1).strip(), titled.group(2).strip()
            else:
                title, date = title_line, ""

            id_match = _DOCUMENT_ID.search(body)
            summary_match = _SUMMARY_LINE.search(body)
            findings_match = _FINDINGS_SECTION.search(body)
            findings_text = findings_match.group(1) if findings_match else ""

            sources.append({
                "id": id_match.group(1) if id_match else "",
                "title": title,
                "date": date,
                "summary": summary_match.group(1).strip() if summary_match else "",
                "key_findings": [
                    f"[{kind}] {text.strip()}"
                    for kind, text in _FINDING_LINE.findall(findings_text)
                ],
                "quotes": [],  # not used by this format
            })
        return sources

    def parse_agent_response(
        self, result: Any, agent_logs: Optional[List] = None
    ) -> Tuple[str, List[Dict], List[str]]:
        """Parse the agent response to extract answer, sources, and tools used.

        Args:
            result: The agent's return value; it is converted with ``str``.
            agent_logs: Optional list of agent steps. When given, tools and
                sources are read from the steps' ``tool_calls`` and
                ``observations``; otherwise they are scraped from the result text.

        Returns:
            ``(answer, sources, tools_used)`` with duplicate sources and
            duplicate tool names removed, both in first-seen order.
        """
        result_str = str(result)
        answer = self._extract_answer_text(result_str)
        sources: List[Dict] = []
        tools_used: List[str] = []

        # Split a trailing "📚 Sources" section off the answer and parse it.
        pieces = _SOURCES_MARKER.split(answer, maxsplit=1)
        if len(pieces) == 2:
            answer = pieces[0].strip()
            sources = self._parse_sources_from_text(pieces[1].strip())

        if agent_logs:
            for step in agent_logs:
                for call in getattr(step, "tool_calls", None) or []:
                    if hasattr(call, "name"):
                        tools_used.append(call.name)
                observations = getattr(step, "observations", None)
                if observations:
                    sources.extend(self._sources_from_observations(observations))
        else:
            # No step log: scrape the transcript text instead.
            for pattern in _TOOL_NAME_PATTERNS:
                tools_used.extend(pattern.findall(result_str))
            for doc_id, doc_title, doc_date in _DATED_DOCUMENT.findall(result_str):
                sources.append(self._source(doc_id, doc_title.strip(), doc_date.strip()))

        return answer, self._dedupe_sources(sources), list(dict.fromkeys(tools_used))

    @staticmethod
    def _source(
        doc_id: Any,
        title: str,
        date: str = "",
        summary: str = "",
        key_findings: Optional[List[str]] = None,
        quotes: Optional[List[str]] = None,
    ) -> Dict:
        """Build a source dict with the full, uniform set of keys."""
        return {
            "id": doc_id,
            "title": title,
            "date": date,
            "summary": summary,
            "key_findings": key_findings or [],
            "quotes": quotes or [],
        }

    @staticmethod
    def _extract_answer_text(result_str: str) -> str:
        """Pull the answer out of ``result_str``.

        Tries, in order: a ``{"answer": "..."}`` JSON object, a
        ``Final answer:`` section (which may itself hold that JSON object),
        and finally the whole string.
        """

        def unescape(body: str) -> str:
            # Decode as a JSON string so every escape is handled;
            # strict=False tolerates raw newlines inside the value.
            try:
                return json.loads(f'"{body}"', strict=False)
            except json.JSONDecodeError:
                return body.replace('\\n', '\n').replace('\\"', '"')

        json_match = _JSON_ANSWER.search(result_str)
        if json_match:
            return unescape(json_match.group(1))

        final_match = _FINAL_ANSWER.search(result_str)
        if not final_match:
            return result_str

        answer = final_match.group(1).strip()
        nested = _JSON_ANSWER.search(answer)
        return unescape(nested.group(1)) if nested else answer

    def _sources_from_observations(self, observations: Any) -> List[Dict]:
        """Extract source dicts from one step's tool observations."""
        text = observations if isinstance(observations, str) else str(observations)
        print(f"DEBUG: Processing observations: {text[:500]}...")

        found = []
        full_matches = _FULL_DOCUMENT.findall(text)
        print(f"DEBUG: Found {len(full_matches)} document matches with full pattern")
        for doc_id, title, date, summary, insights, _content in full_matches:
            findings_match = _KEY_FINDINGS_TAG.search(insights)
            quotes_match = _QUOTES_TAG.search(insights)
            found.append(self._source(
                doc_id,
                title.strip(),
                date.strip(),
                summary.strip(),
                _dash_lines(findings_match.group(1)) if findings_match else [],
                _dash_lines(quotes_match.group(1)) if quotes_match else [],
            ))
        if full_matches:
            return found

        # Fallbacks, used only when no complete document block matched.
        print("DEBUG: Trying fallback document patterns...")
        simple_matches = _SIMPLE_DOCUMENT.findall(text)
        print(f"DEBUG: Found {len(simple_matches)} simple document matches")
        for doc_id, title in simple_matches:
            found.append(self._source(doc_id, title.strip()))

        conv_ids = _CONVERSATION_ID_IN_TEXT.findall(text)
        print(f"DEBUG: Found {len(conv_ids)} conversation ID matches: {conv_ids}")
        for conv_id in conv_ids:
            found.append(self._source(conv_id, f"Conversation {conv_id}"))
        return found

    @staticmethod
    def _dedupe_sources(sources: List[Dict]) -> List[Dict]:
        """Drop repeated sources, keeping the first occurrence of each."""
        unique, seen = [], set()
        for source in sources:
            # An empty id must not act as a key, otherwise every id-less
            # source would collapse into one; fall back to title + date.
            key = source.get("id") or f"{source.get('title', '')}_{source.get('date', '')}"
            if key not in seen:
                seen.add(key)
                unique.append(source)
        return unique

    # ------------------------------------------------------------------
    # HTML formatting
    # ------------------------------------------------------------------
    # NOTE(review): the wrapper elements used by the formatters below are
    # reconstructed from the CSS classes declared for the interface
    # (answer-section, source-card, source-title, source-date, tool-usage) —
    # confirm against the intended design.

    def _format_raw_answer(self, answer: str) -> str:
        """Format the raw answer with basic HTML structure without parsing.

        The text is HTML-escaped, ``**bold**`` becomes ``<strong>``, newlines
        become ``<br>`` and runs of three or more breaks collapse to two.
        """
        if not answer:
            return "<div class='answer-section'><p>No answer provided.</p></div>"

        # Escape first: the answer is model output and may echo document text.
        rendered = html.escape(answer, quote=False)
        rendered = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', rendered)
        rendered = rendered.replace('\n', '<br>')
        rendered = re.sub(r'(<br>){3,}', '<br><br>', rendered)
        return f'<div class="answer-section">\n{rendered}\n</div>'

    def format_answer(self, answer: str) -> str:
        """Format the answer with proper HTML structure.

        Unwraps a ``{"answer": ...}`` JSON payload, removes inline
        ``(Document: ...)`` references, normalises whitespace and list
        markers, then renders the text as escaped HTML under a heading.
        """
        if not answer:
            return "<div class='answer-section'><p>No answer provided.</p></div>"

        candidate = answer.strip()
        if candidate.startswith('{"answer":') and candidate.endswith('}'):
            try:
                payload = json.loads(candidate)
            except json.JSONDecodeError:
                payload = None  # not valid JSON after all; keep the raw text
            if isinstance(payload, dict) and 'answer' in payload:
                answer = str(payload['answer'])

        # Remove source references from the answer text for cleaner display.
        answer = re.sub(r'\(Document:[^)]+\)', '', answer)

        # Collapse runs of spaces/tabs but preserve intentional line breaks.
        answer = re.sub(r'[ \t]+', ' ', answer)
        answer = re.sub(r' *\n *', '\n', answer)

        # Give numbered items a blank line before them, keeping the numeral.
        answer = re.sub(r'\n\s*(\d+)\.\s*', r'\n\n\1. ', answer)
        answer = re.sub(r'\n\s*•\s*', '\n• ', answer)
        answer = re.sub(r'\n\s*-\s*', '\n• ', answer)

        rendered = html.escape(answer, quote=False)
        rendered = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', rendered)
        rendered = rendered.replace('\n', '<br>')
        rendered = re.sub(r'(<br>){3,}', '<br><br>', rendered)
        return (
            '<div class="answer-section">\n'
            '<h3>📝 Answer</h3>\n'
            f'<div>{rendered}</div>\n'
            '</div>'
        )

    def format_sources(self, sources: List[Dict]) -> str:
        """Format the sources with rich information including key findings and quotes.

        Each source is rendered as a card with its title, date, optional id,
        summary, key findings and quotes. All values are HTML-escaped.
        """
        if not sources:
            return "<div class='source-card'><p>No sources found.</p></div>"

        def bullet_list(heading: str, items: List[str]) -> str:
            # Items arrive as "- text" lines; drop the leading dash marker.
            rows = "".join(
                f"<li>{_esc(str(item).lstrip('- ').strip())}</li>" for item in items
            )
            return f"<p><strong>{heading}</strong></p><ul>{rows}</ul>"

        parts = ["<div>"]
        for index, source in enumerate(sources, 1):
            doc_id = source.get("id", "")
            id_suffix = f" | ID: {_esc(doc_id)}" if doc_id else ""
            parts.append('<div class="source-card">')
            parts.append(
                f'<div class="source-title">{index}. {_esc(source.get("title", "Unknown"))}</div>'
            )
            parts.append(
                f'<div class="source-date">📅 {_esc(source.get("date", "Unknown"))}{id_suffix}</div>'
            )
            summary = source.get("summary", "")
            if summary:
                parts.append(f"<p><strong>Summary:</strong> {_esc(summary)}</p>")
            key_findings = source.get("key_findings", [])
            if key_findings:
                parts.append(bullet_list("Key Findings:", key_findings))
            quotes = source.get("quotes", [])
            if quotes:
                parts.append(bullet_list("Key Quotes:", quotes))
            parts.append("</div>")
        parts.append("</div>")
        return "".join(parts)

    def format_tools(self, tools_used: List[str]) -> str:
        """Format the tools used with proper HTML structure.

        Tool names are shown title-cased with underscores replaced by spaces.
        """
        if not tools_used:
            return "<div class='tool-usage'><p>No tools used.</p></div>"

        entries = "".join(
            f'<div class="tool-usage">🔧 {_esc(str(tool).replace("_", " ").title())}</div>'
            for tool in tools_used
        )
        return f"<div>{entries}</div>"

    # ------------------------------------------------------------------
    # Conversations table
    # ------------------------------------------------------------------
    @staticmethod
    def _empty_conversations() -> pd.DataFrame:
        """Return a conversations table with headers and no rows."""
        return pd.DataFrame(columns=_CONVERSATION_COLUMNS)

    @staticmethod
    def _conversation_row(doc: Dict) -> Dict[str, Any]:
        """Convert one projected MongoDB document into a table row."""

        def text_or(key: str, default: str) -> str:
            value = doc.get(key)
            return default if value is None else str(value)

        conversation_id = doc.get("conversation_id")

        customer_parts = [
            f"{label}: {doc[key]}"
            for label, key in _CUSTOMER_FIELDS
            if doc.get(key) is not None and doc.get(key) != "N/A"
        ]
        customer_info = "\n".join(customer_parts) or "No customer info available"

        findings = doc.get("key_findings")
        if findings and isinstance(findings, list):
            # presumably each finding is a dict with a 'finding' key; plain
            # values are shown as-is rather than failing the whole table
            shown = [
                f"• {item.get('finding', '') if isinstance(item, dict) else item}"
                for item in findings[:_MAX_FINDINGS_SHOWN]
            ]
            findings_text = "\n".join(shown)
            hidden = len(findings) - _MAX_FINDINGS_SHOWN
            if hidden > 0:
                findings_text += f"\n... and {hidden} more"
        else:
            findings_text = "No key findings available"

        return {
            "Conversation ID": "Unknown" if conversation_id is None else conversation_id,
            "Customer Info": customer_info,
            "Summary": _truncate(text_or("summary", "No summary available"), _SUMMARY_MAX_CHARS),
            "Key Findings": findings_text,
            "Follow-up Email": _truncate(
                text_or("follow_up_email", "No follow-up email available"), _EMAIL_MAX_CHARS
            ),
        }

    def _conversations_frame(self, pipeline: List[Dict]) -> pd.DataFrame:
        """Run ``pipeline`` on the conversations collection and tabulate the result."""
        docs = self.conversation_collection.aggregate(pipeline)
        rows = [self._conversation_row(doc) for doc in docs]
        # Passing columns keeps the headers when there are no rows.
        return pd.DataFrame(rows, columns=_CONVERSATION_COLUMNS)

    def load_conversations(self, limit: int = 50) -> pd.DataFrame:
        """Load conversations from MongoDB and format for display.

        Args:
            limit: Maximum number of documents to read.

        Returns:
            A DataFrame with the conversation columns. It is empty (headers
            only) when MongoDB is unavailable or the query fails.
        """
        if self.conversation_collection is None:
            return self._empty_conversations()

        try:
            return self._conversations_frame([
                {"$match": {"conversation_analysis": {"$exists": True}}},
                {"$limit": limit},
                {"$project": _CONVERSATION_PROJECTION},
            ])
        except Exception as e:  # best-effort: show an empty table on DB errors
            print(f"Error loading conversations: {e}")
            return self._empty_conversations()

    def _lookup_conversation_id(self, doc_id: str) -> Optional[str]:
        """Resolve a source document id to a conversation id via ``rag_conversations``.

        Returns ``None`` when no matching document or conversation id exists,
        or when the lookup itself fails.
        """
        try:
            rag_collection = self.database["rag_conversations"]
            doc = None
            # Purely numeric ids are first tried as an integer _id.
            if doc_id.isdigit():
                doc = rag_collection.find_one({"_id": int(doc_id)})
            if not doc:
                doc = rag_collection.find_one({"properties.conversation_id": doc_id})

            properties = doc.get("properties") if doc else None
            conv_id = properties.get("conversation_id") if isinstance(properties, dict) else None
            if conv_id:
                print(f"DEBUG: Found conversation ID from RAG query: {conv_id}")
                return str(conv_id)
            print(f"DEBUG: No conversation ID found for doc_id: {doc_id}")
        except Exception as e:  # a failed lookup must not abort the whole filter
            print(f"DEBUG: Error querying RAG collection for doc_id {doc_id}: {e}")
        return None

    def filter_conversations_by_sources(self, sources: List[Dict]) -> pd.DataFrame:
        """Filter conversations to show only those used in the current query.

        A conversation id is taken from the source title when it contains
        ``conversation <digits>``; otherwise the source id is looked up in the
        ``rag_conversations`` collection. Falls back to all conversations
        when nothing can be resolved or on error.
        """
        if not sources or self.conversation_collection is None:
            return self.load_conversations()

        try:
            conversation_ids = set()
            print(f"DEBUG: Filtering conversations based on {len(sources)} sources")
            for source in sources:
                print(f"DEBUG: Processing source: {source}")

                title_match = _CONVERSATION_ID_IN_TITLE.search(source.get("title") or "")
                if title_match:
                    conversation_ids.add(title_match.group(1))
                    print(f"DEBUG: Found conversation ID from title: {title_match.group(1)}")
                    continue

                doc_id = source.get("id")
                if doc_id:
                    resolved = self._lookup_conversation_id(str(doc_id))
                    if resolved:
                        conversation_ids.add(resolved)

            print(
                f"DEBUG: Found {len(conversation_ids)} unique conversation IDs: "
                f"{conversation_ids}"
            )
            if not conversation_ids:
                print("DEBUG: No conversation IDs found, returning all conversations")
                return self.load_conversations()

            frame = self._conversations_frame([
                {"$match": {
                    "conversation_analysis": {"$exists": True},
                    "metadata.properties.conversation_id": {"$in": list(conversation_ids)},
                }},
                {"$project": _CONVERSATION_PROJECTION},
            ])
            print(f"DEBUG: Returning {len(frame)} filtered conversations")
            return frame
        except Exception as e:  # best-effort: fall back to the unfiltered table
            print(f"Error filtering conversations: {e}")
            traceback.print_exc()
            return self.load_conversations()

    def filter_conversations(self, search_term: str) -> pd.DataFrame:
        """Filter conversations based on search term.

        The match is a case-insensitive substring test against every
        displayed column. Because it runs on the formatted table, only the
        truncated summary and email text are searched.
        """
        if not search_term or not search_term.strip():
            return self.load_conversations()

        try:
            # Load a larger window than the default view so the search
            # reaches beyond the first page.
            frame = self.load_conversations(limit=1000)
            if frame.empty:
                return frame

            needle = search_term.lower().strip()
            lowered = frame[_CONVERSATION_COLUMNS].astype(str).apply(
                lambda column: column.str.lower()
            )
            hits = lowered.apply(
                lambda column: column.str.contains(needle, regex=False)
            ).any(axis=1)
            return frame[hits].reset_index(drop=True)
        except Exception as e:  # best-effort: show everything if filtering fails
            print(f"Error filtering conversations: {e}")
            return self.load_conversations()

    def clear_conversation_search(self) -> Tuple[str, pd.DataFrame]:
        """Clear the search and show all conversations."""
        return "", self.load_conversations()

    def reset_ui_state(self) -> Tuple[str, str, str, str, pd.DataFrame]:
        """Reset the UI state to show all conversations and clear outputs."""
        return "", "", "", "", self.load_conversations()

    def launch(self, **kwargs):
        """Launch the Gradio interface, forwarding ``kwargs`` to ``Blocks.launch``."""
        return self.interface.launch(**kwargs)