import json import re from typing import Any, Dict, List, Tuple, Optional from datetime import datetime from pathlib import Path import gradio as gr import pandas as pd from second_brain_online.config import settings class CustomerProfileUI: """Gradio UI for displaying customer profile analyses with search functionality.""" def __init__(self, data_dir: str = None): # Default to the offline data directory if data_dir is None: # Go up from second-brain-online to second-brain-offline/data/customer_analyses data_dir = Path(__file__).parent.parent.parent.parent.parent.parent / "second-brain-offline" / "data" / "customer_analyses" self.data_dir = Path(data_dir) self.analyses_cache = None self.load_analyses_from_disk() self.setup_ui() def load_analyses_from_disk(self): """Load all customer analyses from JSON files on disk.""" try: # Check if the all-in-one file exists all_file = self.data_dir / "customer_analyses_all.json" if all_file.exists(): print(f"đ Loading analyses from: {all_file}") with open(all_file, 'r') as f: self.analyses_cache = json.load(f) print(f"â Loaded {len(self.analyses_cache)} analyses from disk") else: # Load individual files print(f"đ Loading analyses from directory: {self.data_dir}") self.analyses_cache = [] for json_file in self.data_dir.glob("customer_analysis_*.json"): try: with open(json_file, 'r') as f: analysis = json.load(f) self.analyses_cache.append(analysis) except Exception as e: print(f"â ī¸ Failed to load {json_file}: {e}") print(f"â Loaded {len(self.analyses_cache)} analyses from {len(list(self.data_dir.glob('customer_analysis_*.json')))} files") except Exception as e: print(f"â Failed to load analyses from disk: {e}") self.analyses_cache = [] def format_text_for_table(self, text_list, max_items=3): """Format a list of text items for better table display with proper bullet point spacing.""" if not text_list: return "No items" if isinstance(text_list, str): return text_list if len(text_list) == 0: return "No items" elif len(text_list) == 1: return f"âĸ {text_list[0]}" else: # Format bullet points with proper spacing formatted_items = [] items_to_show = min(len(text_list), max_items) for i in range(items_to_show): formatted_items.append(f"âĸ {text_list[i]}") # Add indicator for remaining items if any if len(text_list) > max_items: remaining = len(text_list) - max_items formatted_items.append(f"âĸ ... and {remaining} more") return "\n".join(formatted_items) def setup_ui(self): """Setup the Gradio interface for customer profile analyses.""" with gr.Blocks( title="Customer Profile Analysis Dashboard", theme=gr.themes.Soft(), css=""" .customer-card { border: 1px solid #e0e0e0; border-radius: 8px; padding: 16px; margin: 8px 0; background-color: #f8f9fa; } .customer-title { font-weight: bold; color: #2c3e50; margin-bottom: 8px; font-size: 1.1em; } .customer-meta { font-size: 0.9em; color: #6c757d; margin-bottom: 12px; } .key-changes { background-color: #fff3cd; border-left: 4px solid #ffc107; padding: 8px 12px; margin: 8px 0; border-radius: 4px; font-size: 0.9em; } .recommendations { background-color: #d1ecf1; border-left: 4px solid #17a2b8; padding: 8px 12px; margin: 8px 0; border-radius: 4px; font-size: 0.9em; } .email-strategy { background-color: #d4edda; border-left: 4px solid #28a745; padding: 8px 12px; margin: 8px 0; border-radius: 4px; font-size: 0.9em; } .follow-up-email { background-color: #e2e3e5; border-left: 4px solid #6c757d; padding: 8px 12px; margin: 8px 0; border-radius: 4px; font-size: 0.9em; } .search-highlight { background-color: #fff3cd; padding: 2px 4px; border-radius: 3px; } .dataframe { font-size: 0.9em; line-height: 1.4; } .dataframe td { padding: 8px 6px; vertical-align: top; word-wrap: break-word; white-space: pre-wrap; } .dataframe th { padding: 8px 6px; font-weight: bold; background-color: #f8f9fa; } """ ) as self.interface: gr.Markdown("# đ Customer Profile Analysis Dashboard") gr.Markdown("View and search through customer profile analyses with AI-generated insights and follow-up emails.") # Statistics section at the top with gr.Row(): with gr.Column(scale=1): gr.Markdown("### đ Dashboard Statistics") self.stats_view = gr.HTML( value=self.generate_statistics(), label="Analysis Statistics", show_label=False ) # Filter buttons with gr.Row(): gr.Markdown("### đ Quick Filters") with gr.Row(): self.strategy_dropdown = gr.Dropdown( choices=self.get_strategy_choices(), label="Email Strategy", value=None, multiselect=False, scale=2 ) self.priority_dropdown = gr.Dropdown( choices=self.get_priority_choices(), label="Priority Level", value=None, multiselect=False, scale=2 ) self.clear_filters_btn = gr.Button("đ Clear All Filters", scale=1, variant="secondary") gr.Markdown("---") # Separator line # Search functionality with gr.Row(): self.search_input = gr.Textbox( label="Search Customer Analyses", placeholder="Search by company name, customer ID, key changes, recommendations, or email content...", scale=4 ) self.clear_search_btn = gr.Button("Clear Search", scale=1) self.refresh_btn = gr.Button("Refresh Data", scale=1, variant="secondary") # Main data table self.customer_table = gr.Dataframe( headers=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"], datatype=["str", "str", "str", "str", "str", "str"], interactive=False, label="Customer Profile Analyses", wrap=True, max_height=600, column_widths=["10%", "20%", "12%", "25%", "25%", "8%"], value=self.load_customer_analyses() ) # Detailed view accordion with gr.Accordion("đ Detailed Analysis View", open=False): self.detailed_view = gr.HTML( value="Select a row from the table above to view detailed analysis", label="Detailed Analysis" ) # Event handlers self.search_input.change( fn=self.filter_customer_analyses, inputs=[self.search_input], outputs=[self.customer_table] ) self.clear_search_btn.click( fn=self.clear_search, inputs=[], outputs=[self.search_input, self.customer_table] ) self.refresh_btn.click( fn=self.refresh_data, inputs=[], outputs=[self.customer_table, self.stats_view] ) self.strategy_dropdown.change( fn=self.filter_by_strategy, inputs=[self.strategy_dropdown], outputs=[self.customer_table] ) self.priority_dropdown.change( fn=self.filter_by_priority, inputs=[self.priority_dropdown], outputs=[self.customer_table] ) self.clear_filters_btn.click( fn=self.clear_filters, inputs=[], outputs=[self.strategy_dropdown, self.priority_dropdown, self.customer_table] ) self.customer_table.select( fn=self.show_detailed_analysis, inputs=[self.customer_table], outputs=[self.detailed_view] ) def get_strategy_choices(self): """Get unique email strategy choices for dropdown.""" if not self.analyses_cache: return [] strategies = set() for doc in self.analyses_cache: strategy = doc.get("email_strategy", {}).get("email_type", "Unknown") strategies.add(strategy) return sorted(list(strategies)) def get_priority_choices(self): """Get unique priority choices for dropdown.""" if not self.analyses_cache: return [] priorities = set() for doc in self.analyses_cache: priority = doc.get("email_strategy", {}).get("priority", "Unknown") priorities.add(priority) return sorted(list(priorities)) def filter_by_strategy(self, strategy): """Filter analyses by email strategy.""" if not strategy: return self.load_customer_analyses() if not self.analyses_cache: return pd.DataFrame(columns=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"]) try: filtered_docs = [] for doc in self.analyses_cache: doc_strategy = doc.get("email_strategy", {}).get("email_type", "Unknown") if doc_strategy == strategy: filtered_docs.append(doc) return self.format_analyses_for_table(filtered_docs) except Exception as e: print(f"â Error filtering by strategy: {e}") return self.load_customer_analyses() def filter_by_priority(self, priority): """Filter analyses by priority level.""" if not priority: return self.load_customer_analyses() if not self.analyses_cache: return pd.DataFrame(columns=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"]) try: filtered_docs = [] for doc in self.analyses_cache: doc_priority = doc.get("email_strategy", {}).get("priority", "Unknown") if doc_priority == priority: filtered_docs.append(doc) return self.format_analyses_for_table(filtered_docs) except Exception as e: print(f"â Error filtering by priority: {e}") return self.load_customer_analyses() def clear_filters(self): """Clear all filters and reload data.""" return None, None, self.load_customer_analyses() def format_analyses_for_table(self, docs): """Format a list of documents for table display.""" data = [] for doc in docs: customer_id = str(doc.get("customer_id", "Unknown")) company_name = doc.get("company_name", "Unknown Company") analysis_date = doc.get("analysis_date", "Unknown Date") # Format key changes with proper bullet points key_changes = doc.get("key_changes", []) key_changes_text = self.format_text_for_table(key_changes, max_items=3) # Format recommendations with proper bullet points recommendations = doc.get("recommendations", []) recommendations_text = self.format_text_for_table(recommendations, max_items=3) # Format email strategy email_strategy = doc.get("email_strategy", {}) if isinstance(email_strategy, dict): strategy_type = email_strategy.get("email_type", "Unknown") priority = email_strategy.get("priority", "Unknown") email_strategy_text = f"{strategy_type.replace('_', ' ').title()}\n({priority.title()})" else: email_strategy_text = str(email_strategy)[:60] + "..." if len(str(email_strategy)) > 60 else str(email_strategy) data.append([ customer_id, company_name, analysis_date, key_changes_text, recommendations_text, email_strategy_text ]) return pd.DataFrame(data, columns=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"]) def load_customer_analyses(self, limit: int = 100) -> pd.DataFrame: """Load customer analyses from disk and format for display.""" if not self.analyses_cache: return pd.DataFrame(columns=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"]) try: return self.format_analyses_for_table(self.analyses_cache[:limit]) except Exception as e: print(f"â Error loading customer analyses: {e}") return pd.DataFrame(columns=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"]) def filter_customer_analyses(self, search_term: str) -> pd.DataFrame: """Filter customer analyses based on search term.""" if not search_term.strip(): return self.load_customer_analyses() if not self.analyses_cache: return pd.DataFrame(columns=["Customer ID", "Company Name", "Analysis Date", "Key Changes", "Recommendations", "Email Strategy"]) try: # Filter analyses based on search term (case-insensitive) search_lower = search_term.lower() filtered_docs = [] for doc in self.analyses_cache: # Search in various fields if (search_lower in str(doc.get("customer_id", "")).lower() or search_lower in doc.get("company_name", "").lower() or any(search_lower in change.lower() for change in doc.get("key_changes", [])) or any(search_lower in rec.lower() for rec in doc.get("recommendations", [])) or search_lower in str(doc.get("email_strategy", {}).get("key_messaging", "")).lower() or search_lower in str(doc.get("follow_up_email", {}).get("subject", "")).lower() or search_lower in str(doc.get("follow_up_email", {}).get("body", "")).lower()): filtered_docs.append(doc) return self.format_analyses_for_table(filtered_docs[:100]) # Limit to 100 results except Exception as e: print(f"â Error filtering customer analyses: {e}") return self.load_customer_analyses() def clear_search(self): """Clear search input and reload all data.""" return "", self.load_customer_analyses() def refresh_data(self): """Refresh the data from disk.""" self.load_analyses_from_disk() return self.load_customer_analyses(), self.generate_statistics() def show_detailed_analysis(self, table_data, evt: gr.SelectData): """Show detailed analysis for selected row.""" try: if evt.index[0] >= len(table_data): return "Please select a valid row from the table." # Get the row data using iloc for proper pandas indexing row_data = table_data.iloc[evt.index[0]] customer_id = str(row_data.iloc[0]) # Customer ID is the first column # Find document in cache by customer_id if not self.analyses_cache: return "No analyses loaded from disk." doc = None for analysis in self.analyses_cache: if str(analysis.get("customer_id")) == customer_id: doc = analysis break if not doc: return f"No detailed data found for customer {customer_id}" # Format detailed analysis html = self.format_detailed_analysis(doc) return html except Exception as e: return f"Error loading detailed analysis: {str(e)}" def format_detailed_analysis(self, doc: dict) -> str: """Format detailed analysis as HTML.""" customer_id = doc.get("customer_id", "Unknown") company_name = doc.get("company_name", "Unknown Company") analysis_date = doc.get("analysis_date", "Unknown Date") # Format key changes key_changes = doc.get("key_changes", []) key_changes_html = "" if isinstance(key_changes, list): for i, change in enumerate(key_changes, 1): key_changes_html += f"