import html
import json
import re
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import pandas as pd

from second_brain_online.config import settings


class CustomerProfileUI:
    """Gradio UI for displaying customer profile analyses with search functionality.

    Analyses are read from JSON files in ``data_dir`` into ``analyses_cache``
    (a list of dicts) and rendered as a table, a statistics panel and a
    per-customer detail view.

    NOTE(review): the HTML markup produced by ``format_detailed_analysis`` and
    ``generate_statistics`` relies on the CSS classes declared in
    ``CUSTOM_CSS``; confirm the visual layout in a browser after changes.
    """

    # Column headers of the main table, in display order.
    TABLE_COLUMNS = [
        "Customer ID",
        "Company Name",
        "Analysis Date",
        "Key Changes",
        "Recommendations",
        "Email Strategy",
    ]

    # Maximum number of rows returned by a free-text search.
    MAX_SEARCH_RESULTS = 100

    # (high, medium, anything-else) colours for the detail view and the stats panel.
    _DETAIL_LEVEL_COLORS = ("#dc2626", "#f59e0b", "#10b981")
    _STATS_LEVEL_COLORS = ("#ff6b6b", "#feca57", "#48dbfb")

    CUSTOM_CSS = """
    .customer-card {
        border: 1px solid #e0e0e0;
        border-radius: 8px;
        padding: 16px;
        margin: 8px 0;
        background-color: #f8f9fa;
    }
    .customer-title {
        font-weight: bold;
        color: #2c3e50;
        margin-bottom: 8px;
        font-size: 1.1em;
    }
    .customer-meta {
        font-size: 0.9em;
        color: #6c757d;
        margin-bottom: 12px;
    }
    .key-changes {
        background-color: #fff3cd;
        border-left: 4px solid #ffc107;
        padding: 8px 12px;
        margin: 8px 0;
        border-radius: 4px;
        font-size: 0.9em;
    }
    .recommendations {
        background-color: #d1ecf1;
        border-left: 4px solid #17a2b8;
        padding: 8px 12px;
        margin: 8px 0;
        border-radius: 4px;
        font-size: 0.9em;
    }
    .email-strategy {
        background-color: #d4edda;
        border-left: 4px solid #28a745;
        padding: 8px 12px;
        margin: 8px 0;
        border-radius: 4px;
        font-size: 0.9em;
    }
    .follow-up-email {
        background-color: #e2e3e5;
        border-left: 4px solid #6c757d;
        padding: 8px 12px;
        margin: 8px 0;
        border-radius: 4px;
        font-size: 0.9em;
    }
    .search-highlight {
        background-color: #fff3cd;
        padding: 2px 4px;
        border-radius: 3px;
    }
    .dataframe {
        font-size: 0.9em;
        line-height: 1.4;
    }
    .dataframe td {
        padding: 8px 6px;
        vertical-align: top;
        word-wrap: break-word;
        white-space: pre-wrap;
    }
    .dataframe th {
        padding: 8px 6px;
        font-weight: bold;
        background-color: #f8f9fa;
    }
    """

    def __init__(self, data_dir: Optional[str] = None):
        """Load the analyses from disk and build the Gradio interface.

        Args:
            data_dir: Directory containing the analysis JSON files. When
                omitted, a ``second-brain-offline/data/customer_analyses``
                directory six levels above this file is used.
        """
        if data_dir is None:
            # Go up from second-brain-online to second-brain-offline/data/customer_analyses
            # NOTE(review): the number of ``.parent`` hops depends on where this
            # module lives in the repository -- confirm if the file is moved.
            data_dir = (
                Path(__file__).parent.parent.parent.parent.parent.parent
                / "second-brain-offline"
                / "data"
                / "customer_analyses"
            )

        self.data_dir = Path(data_dir)
        self.analyses_cache = None
        self.load_analyses_from_disk()
        self.setup_ui()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _esc(value: Any) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        return html.escape(str(value), quote=True)

    @staticmethod
    def _humanize(label: Any) -> str:
        """Turn an identifier such as ``follow_up`` into ``Follow Up``."""
        return str(label).replace("_", " ").title()

    @staticmethod
    def _as_items(value: Any) -> List[Any]:
        """Normalise a field that should be a list into an actual list.

        A missing/empty value becomes ``[]``; a scalar (e.g. a single string)
        becomes a one-element list so it is never iterated character by
        character.
        """
        if value is None or value == "":
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _strategy_field(doc: Any, field: str) -> str:
        """Read ``doc["email_strategy"][field]`` as a string, or ``"Unknown"``.

        Tolerates a missing or non-dict ``email_strategy`` and ``None``/empty
        values so that one malformed document cannot break a whole view.
        """
        strategy = doc.get("email_strategy") if isinstance(doc, dict) else None
        if not isinstance(strategy, dict):
            return "Unknown"
        value = strategy.get(field)
        if value is None or value == "":
            return "Unknown"
        return str(value)

    @staticmethod
    def _level_color(level: Any, colors: Tuple[str, str, str]) -> str:
        """Pick the colour for a high / medium / other level (case-insensitive)."""
        high, medium, other = colors
        normalized = str(level).lower()
        if normalized == "high":
            return high
        if normalized == "medium":
            return medium
        return other

    def _empty_table(self) -> pd.DataFrame:
        """Return an empty table that still carries the expected headers."""
        return pd.DataFrame(columns=self.TABLE_COLUMNS)

    def _distinct_strategy_values(self, field: str) -> List[str]:
        """Return the sorted distinct values of an ``email_strategy`` field."""
        if not self.analyses_cache:
            return []
        return sorted({self._strategy_field(doc, field) for doc in self.analyses_cache})

    def _filter_by_strategy_field(self, field: str, wanted: Any) -> pd.DataFrame:
        """Return the table rows whose ``email_strategy[field]`` equals ``wanted``."""
        if not wanted:
            return self.load_customer_analyses()
        if not self.analyses_cache:
            return self._empty_table()
        matching = [
            doc for doc in self.analyses_cache
            if self._strategy_field(doc, field) == str(wanted)
        ]
        return self.format_analyses_for_table(matching)

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------

    def load_analyses_from_disk(self):
        """Load all customer analyses from JSON files on disk.

        Prefers the combined ``customer_analyses_all.json`` file; otherwise
        reads every ``customer_analysis_*.json`` file in ``data_dir``. Files
        that cannot be read or parsed are reported and skipped. On return
        ``analyses_cache`` is always a list of dicts (possibly empty).
        """
        combined_file = self.data_dir / "customer_analyses_all.json"
        try:
            if combined_file.exists():
                print(f"📂 Loading analyses from: {combined_file}")
                with open(combined_file, "r", encoding="utf-8") as f:
                    payload = json.load(f)
                if not isinstance(payload, list):
                    print(
                        f"⚠️ Expected a JSON list in {combined_file}, "
                        f"got {type(payload).__name__}; ignoring it"
                    )
                    payload = []
                # Every consumer calls ``.get`` on the entries, so keep dicts only.
                self.analyses_cache = [entry for entry in payload if isinstance(entry, dict)]
                skipped = len(payload) - len(self.analyses_cache)
                if skipped:
                    print(f"⚠️ Skipped {skipped} entries that are not JSON objects")
                print(f"✅ Loaded {len(self.analyses_cache)} analyses from disk")
                return

            print(f"📂 Loading analyses from directory: {self.data_dir}")
            # Glob once (sorted for a stable row order) and reuse it for the count.
            json_files = sorted(self.data_dir.glob("customer_analysis_*.json"))
            loaded = []
            for json_file in json_files:
                try:
                    with open(json_file, "r", encoding="utf-8") as f:
                        analysis = json.load(f)
                except (OSError, ValueError) as e:
                    # ValueError covers both invalid JSON and undecodable bytes.
                    print(f"⚠️ Failed to load {json_file}: {e}")
                    continue
                if isinstance(analysis, dict):
                    loaded.append(analysis)
                else:
                    print(f"⚠️ Failed to load {json_file}: top-level value is not a JSON object")
            self.analyses_cache = loaded
            print(f"✅ Loaded {len(loaded)} analyses from {len(json_files)} files")
        except (OSError, ValueError) as e:
            print(f"❌ Failed to load analyses from disk: {e}")
            self.analyses_cache = []

    # ------------------------------------------------------------------
    # Table formatting
    # ------------------------------------------------------------------

    def format_text_for_table(self, text_list, max_items=3):
        """Format a list of text items as bullet points for a table cell.

        Args:
            text_list: List of items, a plain string, or an empty/missing value.
            max_items: Maximum number of bullets shown before the remainder is
                summarised as ``... and N more``.

        Returns:
            ``"No items"`` for an empty value, the string itself for a string,
            otherwise one ``• item`` line per displayed item.
        """
        if not text_list:
            return "No items"
        if isinstance(text_list, str):
            return text_list
        if not isinstance(text_list, (list, tuple)):
            # A scalar or mapping cannot be sliced; show its text form instead.
            return str(text_list)
        if len(text_list) == 1:
            return f"• {text_list[0]}"

        visible = max(max_items, 0)
        lines = [f"• {item}" for item in text_list[:visible]]
        hidden = len(text_list) - visible
        if hidden > 0:
            lines.append(f"• ... and {hidden} more")
        return "\n".join(lines)

    def format_analyses_for_table(self, docs):
        """Format a list of documents for table display.

        Args:
            docs: Iterable of analysis dicts; non-dict entries are skipped.

        Returns:
            A DataFrame with one row per document and ``TABLE_COLUMNS`` as headers.
        """
        rows = []
        for doc in docs:
            if not isinstance(doc, dict):
                continue

            email_strategy = doc.get("email_strategy", {})
            if isinstance(email_strategy, dict):
                strategy_label = self._humanize(self._strategy_field(doc, "email_type"))
                priority_label = self._humanize(self._strategy_field(doc, "priority"))
                email_strategy_text = f"{strategy_label}\n({priority_label})"
            else:
                # Free-form strategy: show it as text, truncated to keep the cell small.
                raw = str(email_strategy)
                email_strategy_text = raw[:60] + "..." if len(raw) > 60 else raw

            rows.append([
                str(doc.get("customer_id", "Unknown")),
                doc.get("company_name", "Unknown Company"),
                doc.get("analysis_date", "Unknown Date"),
                self.format_text_for_table(doc.get("key_changes", []), max_items=3),
                self.format_text_for_table(doc.get("recommendations", []), max_items=3),
                email_strategy_text,
            ])

        return pd.DataFrame(rows, columns=self.TABLE_COLUMNS)

    def load_customer_analyses(self, limit: int = 100) -> pd.DataFrame:
        """Return the first ``limit`` cached analyses formatted for the table."""
        if not self.analyses_cache:
            return self._empty_table()
        try:
            return self.format_analyses_for_table(self.analyses_cache[:limit])
        except Exception as e:  # UI boundary: never let the table fail to render
            print(f"❌ Error loading customer analyses: {e}")
            return self._empty_table()

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------

    def setup_ui(self):
        """Setup the Gradio interface for customer profile analyses."""
        with gr.Blocks(
            title="Customer Profile Analysis Dashboard",
            theme=gr.themes.Soft(),
            css=self.CUSTOM_CSS,
        ) as self.interface:
            gr.Markdown("# 📊 Customer Profile Analysis Dashboard")
            gr.Markdown(
                "View and search through customer profile analyses with "
                "AI-generated insights and follow-up emails."
            )

            # Statistics section at the top
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### 📈 Dashboard Statistics")
                    self.stats_view = gr.HTML(
                        value=self.generate_statistics(),
                        label="Analysis Statistics",
                        show_label=False,
                    )

            # Filter buttons
            with gr.Row():
                gr.Markdown("### 🔍 Quick Filters")

            with gr.Row():
                self.strategy_dropdown = gr.Dropdown(
                    choices=self.get_strategy_choices(),
                    label="Email Strategy",
                    value=None,
                    multiselect=False,
                    scale=2,
                )
                self.priority_dropdown = gr.Dropdown(
                    choices=self.get_priority_choices(),
                    label="Priority Level",
                    value=None,
                    multiselect=False,
                    scale=2,
                )
                self.clear_filters_btn = gr.Button(
                    "🔄 Clear All Filters", scale=1, variant="secondary"
                )

            gr.Markdown("---")  # Separator line

            # Search functionality
            with gr.Row():
                self.search_input = gr.Textbox(
                    label="Search Customer Analyses",
                    placeholder="Search by company name, customer ID, key changes, recommendations, or email content...",
                    scale=4,
                )
                self.clear_search_btn = gr.Button("Clear Search", scale=1)
                self.refresh_btn = gr.Button("Refresh Data", scale=1, variant="secondary")

            # Main data table
            self.customer_table = gr.Dataframe(
                headers=list(self.TABLE_COLUMNS),
                datatype=["str", "str", "str", "str", "str", "str"],
                interactive=False,
                label="Customer Profile Analyses",
                wrap=True,
                max_height=600,
                column_widths=["10%", "20%", "12%", "25%", "25%", "8%"],
                value=self.load_customer_analyses(),
            )

            # Detailed view accordion
            with gr.Accordion("📋 Detailed Analysis View", open=False):
                self.detailed_view = gr.HTML(
                    value="Select a row from the table above to view detailed analysis",
                    label="Detailed Analysis",
                )

            # Event handlers
            self.search_input.change(
                fn=self.filter_customer_analyses,
                inputs=[self.search_input],
                outputs=[self.customer_table],
            )
            self.clear_search_btn.click(
                fn=self.clear_search,
                inputs=[],
                outputs=[self.search_input, self.customer_table],
            )
            self.refresh_btn.click(
                fn=self.refresh_data,
                inputs=[],
                outputs=[self.customer_table, self.stats_view],
            )
            self.strategy_dropdown.change(
                fn=self.filter_by_strategy,
                inputs=[self.strategy_dropdown],
                outputs=[self.customer_table],
            )
            self.priority_dropdown.change(
                fn=self.filter_by_priority,
                inputs=[self.priority_dropdown],
                outputs=[self.customer_table],
            )
            self.clear_filters_btn.click(
                fn=self.clear_filters,
                inputs=[],
                outputs=[self.strategy_dropdown, self.priority_dropdown, self.customer_table],
            )
            self.customer_table.select(
                fn=self.show_detailed_analysis,
                inputs=[self.customer_table],
                outputs=[self.detailed_view],
            )

    # ------------------------------------------------------------------
    # Filters and search
    # ------------------------------------------------------------------

    def get_strategy_choices(self):
        """Get unique email strategy choices for dropdown."""
        return self._distinct_strategy_values("email_type")

    def get_priority_choices(self):
        """Get unique priority choices for dropdown."""
        return self._distinct_strategy_values("priority")

    def filter_by_strategy(self, strategy):
        """Filter analyses by email strategy; an empty selection shows everything."""
        return self._filter_by_strategy_field("email_type", strategy)

    def filter_by_priority(self, priority):
        """Filter analyses by priority level; an empty selection shows everything."""
        return self._filter_by_strategy_field("priority", priority)

    def clear_filters(self):
        """Clear all filters and reload data.

        Returns:
            ``(None, None, table)`` for the two dropdowns and the table.
        """
        return None, None, self.load_customer_analyses()

    def _searchable_texts(self, doc: Dict[str, Any]) -> List[str]:
        """Collect the lower-cased text fields of ``doc`` that search looks at."""
        email_strategy = doc.get("email_strategy")
        follow_up_email = doc.get("follow_up_email")
        texts = [doc.get("customer_id", ""), doc.get("company_name", "")]
        texts.extend(self._as_items(doc.get("key_changes")))
        texts.extend(self._as_items(doc.get("recommendations")))
        if isinstance(email_strategy, dict):
            texts.append(email_strategy.get("key_messaging", ""))
        if isinstance(follow_up_email, dict):
            texts.append(follow_up_email.get("subject", ""))
            texts.append(follow_up_email.get("body", ""))
        # Coerce to str so numeric ids or null fields cannot raise.
        return [str(text).lower() for text in texts if text is not None]

    def filter_customer_analyses(self, search_term: str) -> pd.DataFrame:
        """Filter customer analyses based on search term.

        The match is a case-insensitive substring test against the customer
        id, company name, key changes, recommendations, the strategy's key
        messaging and the follow-up email subject and body.

        Args:
            search_term: Text typed by the user; blank or ``None`` shows the
                default (unfiltered) table.

        Returns:
            A DataFrame with at most ``MAX_SEARCH_RESULTS`` matching rows.
        """
        if not search_term or not search_term.strip():
            return self.load_customer_analyses()
        if not self.analyses_cache:
            return self._empty_table()

        needle = search_term.strip().lower()
        matching = [
            doc for doc in self.analyses_cache
            if isinstance(doc, dict)
            and any(needle in text for text in self._searchable_texts(doc))
        ]
        return self.format_analyses_for_table(matching[: self.MAX_SEARCH_RESULTS])

    def clear_search(self):
        """Clear search input and reload all data."""
        return "", self.load_customer_analyses()

    def refresh_data(self):
        """Refresh the data from disk and return the new table and statistics."""
        self.load_analyses_from_disk()
        return self.load_customer_analyses(), self.generate_statistics()

    # ------------------------------------------------------------------
    # Detail view
    # ------------------------------------------------------------------

    def show_detailed_analysis(self, table_data, evt: gr.SelectData):
        """Show detailed analysis for selected row.

        Args:
            table_data: The DataFrame currently displayed in the table.
            evt: Gradio selection event; ``evt.index`` carries the selected
                cell position, row first.

        Returns:
            HTML for the selected analysis, or a short message when the
            selection cannot be resolved.
        """
        try:
            index = evt.index
            row_index = index[0] if isinstance(index, (list, tuple)) else index
            if row_index >= len(table_data):
                return "Please select a valid row from the table."

            # Get the row data using iloc for proper pandas indexing
            row = table_data.iloc[row_index]
            customer_id = str(row.iloc[0])  # Customer ID is the first column
            analysis_date = str(row.iloc[2]) if len(row) > 2 else None

            if not self.analyses_cache:
                return "No analyses loaded from disk."

            # Compare with the same default the table uses, so rows displayed
            # as "Unknown" can still be resolved.
            candidates = [
                doc for doc in self.analyses_cache
                if isinstance(doc, dict)
                and str(doc.get("customer_id", "Unknown")) == customer_id
            ]
            if not candidates:
                return f"No detailed data found for customer {self._esc(customer_id)}"

            # A customer may have several analyses; prefer the one whose date
            # matches the selected row and fall back to the first otherwise.
            selected = next(
                (
                    doc for doc in candidates
                    if str(doc.get("analysis_date", "Unknown Date")) == analysis_date
                ),
                candidates[0],
            )
            return self.format_detailed_analysis(selected)
        except Exception as e:  # UI boundary: report instead of breaking the page
            return f"Error loading detailed analysis: {self._esc(e)}"

    def _bullet_items_html(self, value: Any) -> str:
        """Render a list-like field as escaped ``<li>`` elements."""
        return "".join(f"<li>{self._esc(item)}</li>" for item in self._as_items(value))

    def _strategy_html(self, email_strategy: Any) -> str:
        """Render the email strategy section of the detail view."""
        esc = self._esc
        if not isinstance(email_strategy, dict):
            return f"<p>{esc(email_strategy)}</p>"

        strategy_type = self._humanize(email_strategy.get("email_type") or "Unknown")
        priority = self._humanize(email_strategy.get("priority") or "Unknown")
        key_messaging = email_strategy.get("key_messaging", "No messaging provided")
        call_to_action = email_strategy.get("call_to_action", "No call to action")
        return (
            f"<p><strong>Type:</strong> {esc(strategy_type)}</p>"
            f"<p><strong>Priority:</strong> {esc(priority)}</p>"
            f"<p><strong>Key Messaging:</strong> {esc(key_messaging)}</p>"
            f"<p><strong>Call to Action:</strong> {esc(call_to_action)}</p>"
        )

    def _finding_html(self, finding: Any) -> str:
        """Render one key finding as a colour-coded ``<li>``."""
        esc = self._esc
        if isinstance(finding, dict):
            finding_text = finding.get("finding") or ""
            impact = finding.get("impact") or ""
            insight_type = finding.get("insight_type") or ""
        else:
            # A bare string finding has no type/impact metadata.
            finding_text, impact, insight_type = finding, "", ""

        # Color code by impact
        impact_color = self._level_color(impact, self._DETAIL_LEVEL_COLORS)
        return (
            f'<li><span style="color: {impact_color}; font-weight: bold;">'
            f"{esc(insight_type)}/{esc(impact)}</span> {esc(finding_text)}</li>"
        )

    def _conversation_insights_html(self, conversation_insights: Any) -> str:
        """Render up to five conversations with up to three findings each."""
        esc = self._esc
        conversations = [
            conv for conv in self._as_items(conversation_insights) if isinstance(conv, dict)
        ]
        if not conversations:
            return (
                '<p class="customer-meta">'
                "ℹ️ No conversation insights available for this customer</p>"
            )

        parts = ["<h5>💬 Recent Conversation Insights</h5>"]
        for position, conv in enumerate(conversations[:5], 1):  # Show up to 5 conversations
            title = esc(conv.get("title", "Unknown"))
            source = esc(conv.get("source", "Unknown"))
            datetime_str = esc(conv.get("datetime", "Unknown"))
            summary = esc(conv.get("summary", "No summary available"))
            parts.append(
                '<div class="customer-card">'
                f'<div class="customer-title">{position}. {title}</div>'
                f'<div class="customer-meta">{source} | {datetime_str}</div>'
                f"<p><strong>Summary:</strong> {summary}</p>"
            )
            key_findings = self._as_items(conv.get("key_findings"))
            if key_findings:
                parts.append("<p><strong>Key Findings:</strong></p><ul>")
                # Show top 3 findings
                parts.extend(self._finding_html(finding) for finding in key_findings[:3])
                parts.append("</ul>")
            parts.append("</div>")
        return "".join(parts)

    def _follow_up_email_html(self, follow_up_email: Any) -> str:
        """Render the follow-up email section of the detail view."""
        esc = self._esc
        if not isinstance(follow_up_email, dict):
            return f"<p>{esc(follow_up_email)}</p>"

        subject = follow_up_email.get("subject", "No Subject")
        body = follow_up_email.get("body", "No body content")
        call_to_action = follow_up_email.get("call_to_action", "No call to action")
        priority = str(follow_up_email.get("priority") or "Unknown")

        # Color code priority
        priority_color = self._level_color(priority, self._DETAIL_LEVEL_COLORS)
        return (
            f"<p><strong>Subject:</strong> {esc(subject)}</p>"
            f'<p><strong>Priority:</strong> <span style="color: {priority_color}; '
            f'font-weight: bold;">{esc(priority.upper())}</span></p>'
            "<p><strong>Body:</strong></p>"
            # pre-wrap keeps the line breaks of the plain-text email body.
            f'<div style="white-space: pre-wrap;">{esc(body)}</div>'
            f"<p><strong>Call to Action:</strong> {esc(call_to_action)}</p>"
        )

    def format_detailed_analysis(self, doc: dict) -> str:
        """Format detailed analysis as HTML.

        Every value taken from ``doc`` is HTML-escaped before it is embedded,
        because the documents come from files on disk and may contain markup.

        Args:
            doc: One analysis document.

        Returns:
            An HTML fragment with the header, key changes, recommendations,
            conversation insights, email strategy and follow-up email.
        """
        esc = self._esc
        customer_id = esc(doc.get("customer_id", "Unknown"))
        company_name = esc(doc.get("company_name", "Unknown Company"))
        analysis_date = esc(doc.get("analysis_date", "Unknown Date"))

        key_changes_html = self._bullet_items_html(doc.get("key_changes", []))
        recommendations_html = self._bullet_items_html(doc.get("recommendations", []))
        insights_html = self._conversation_insights_html(doc.get("conversation_insights", []))
        strategy_html = self._strategy_html(doc.get("email_strategy", {}))
        email_html = self._follow_up_email_html(doc.get("follow_up_email", {}))

        detail_html = f"""
        <div class="customer-card">
            <div class="customer-title">{company_name} (ID: {customer_id})</div>
            <div class="customer-meta">Analysis Date: {analysis_date}</div>

            <h4>🔍 Key Changes</h4>
            <div class="key-changes"><ul>{key_changes_html}</ul></div>

            <h4>💡 Recommendations</h4>
            <div class="recommendations"><ul>{recommendations_html}</ul></div>

            <h4>💬 Conversation Insights</h4>
            <div>{insights_html}</div>

            <h4>📧 Email Strategy</h4>
            <div class="email-strategy">{strategy_html}</div>

            <h4>📨 Follow-up Email</h4>
            <div class="follow-up-email">{email_html}</div>
        </div>
        """
        return detail_html

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    def _stat_row_html(self, label: str, count: int, total: int, color: Optional[str] = None) -> str:
        """Render one ``label count (percentage)`` line of the statistics panel."""
        percentage = (count / total * 100) if total > 0 else 0
        style = f' style="color: {color}; font-weight: bold;"' if color else ""
        return f"<div><span{style}>{self._esc(label)}</span> {count} ({percentage:.1f}%)</div>"

    def generate_statistics(self) -> str:
        """Generate statistics about the customer analyses.

        Returns:
            An HTML fragment with the total number of analyses and the
            distribution of email strategies and priority levels (most
            frequent first), or a plain message when nothing is loaded.
        """
        if not self.analyses_cache:
            return "No analyses loaded from disk."

        total_count = len(self.analyses_cache)
        strategy_counts = Counter(
            self._strategy_field(doc, "email_type") for doc in self.analyses_cache
        )
        priority_counts = Counter(
            self._strategy_field(doc, "priority") for doc in self.analyses_cache
        )

        # most_common() orders by count, ties keep first-seen order.
        strategy_rows = "".join(
            self._stat_row_html(self._humanize(name), count, total_count)
            for name, count in strategy_counts.most_common()
        )
        priority_rows = "".join(
            self._stat_row_html(
                self._humanize(name),
                count,
                total_count,
                # Color code based on priority
                color=self._level_color(name, self._STATS_LEVEL_COLORS),
            )
            for name, count in priority_counts.most_common()
        )

        return f"""
        <div style="display: flex; gap: 16px; flex-wrap: wrap;">
            <div class="customer-card" style="flex: 1; min-width: 180px;">
                <div class="customer-title">📊 Total Accounts</div>
                <div style="font-size: 2em; font-weight: bold;">{total_count}</div>
            </div>
            <div class="customer-card" style="flex: 1; min-width: 180px;">
                <div class="customer-title">📧 Email Strategies</div>
                {strategy_rows}
            </div>
            <div class="customer-card" style="flex: 1; min-width: 180px;">
                <div class="customer-title">⚡ Priority Levels</div>
                {priority_rows}
            </div>
        </div>
        """

    def launch(self, **kwargs):
        """Launch the Gradio interface, forwarding ``kwargs`` to ``Blocks.launch``."""
        return self.interface.launch(**kwargs)