"""Gradio tool for analyzing ASIN label coverage across data sources.

Reads a CSV with columns ``asin``, ``task``, ``source_type``, ``label`` (and
optionally ``timestamp``), then reports how many ASINs are "covered" — i.e.
every selected task has at least one non-UNOBSERVABLE label from one of the
selected sources. Three analyses are exposed in the UI: overall coverage,
coverage per source combination (spider chart), and coverage gains per
number of omniscan captures.
"""

import re
from itertools import combinations

import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go


def parse_labels(label_str):
    """Parse one label cell into a list of label strings.

    Handles NaN (returns []), stringified Python lists like "['a', \"b\"]"
    (extracts the quoted items), and plain scalar labels (wrapped in a list).
    """
    if pd.isna(label_str):
        return []
    # Defensive: a numeric or other non-string cell would crash .startswith.
    if not isinstance(label_str, str):
        label_str = str(label_str)
    if label_str.startswith('[') and label_str.endswith(']'):
        # Stringified list: capture single- OR double-quoted items; exactly
        # one of the two groups matches per item.
        matches = re.findall(r"'([^']*)'|\"([^\"]*)\"", label_str)
        return [m[0] or m[1] for m in matches]
    return [label_str]


def analyze_coverage(df, sources, omniscan_sets=1, selected_tasks=None):
    """Compute per-ASIN coverage over the given sources and tasks.

    Args:
        df: DataFrame with asin/task/source_type/label (and optional
            timestamp) columns.
        sources: list of source_type values to consider.
        omniscan_sets: max number of (earliest-by-timestamp) omniscan
            captures to pool per task.
        selected_tasks: tasks to require; defaults to all tasks in ``df``.

    Returns:
        dict mapping asin -> {'covered': bool,
                              'task_coverage': {task: bool},
                              'unobservable_labels': [str, ...]}
        where unobservable_labels are collected only from uncovered tasks.
    """
    results = {}

    # Filter by selected tasks if provided
    if selected_tasks:
        df = df[df['task'].isin(selected_tasks)]
        tasks_to_process = selected_tasks
    else:
        tasks_to_process = df['task'].unique().tolist()

    for asin in df['asin'].unique():
        asin_data = df[df['asin'] == asin]

        # Check coverage for each task
        task_coverage = {}
        all_unobservable_labels = []

        for task in tasks_to_process:
            task_data = asin_data[asin_data['task'] == task]
            if task_data.empty:
                # ASIN has no rows for this task at all: neither covered
                # nor counted as uncovered (matches original behavior).
                continue

            task_covered = False
            task_unobservable = []

            # Handle omniscan combinations for this task: pool labels from
            # the earliest N captures and accept if any is observable.
            if 'omniscan' in sources and 'omniscan' in task_data['source_type'].values:
                omniscan_data = task_data[task_data['source_type'] == 'omniscan']

                # Sort by timestamp and take earliest N captures
                if 'timestamp' in omniscan_data.columns:
                    omniscan_data = omniscan_data.sort_values('timestamp')
                num_captures = min(omniscan_sets, len(omniscan_data))
                selected_captures = omniscan_data.head(num_captures)

                all_parsed = []
                for label in selected_captures['label']:
                    all_parsed.extend(parse_labels(label))

                non_unobservable = [l for l in all_parsed
                                    if 'UNOBSERVABLE' not in l.upper()]
                if non_unobservable:
                    task_covered = True
                else:
                    task_unobservable.extend(
                        [l for l in all_parsed if 'UNOBSERVABLE' in l.upper()])

            # Handle other sources for this task (first observable label wins)
            if not task_covered:
                for source in sources:
                    if source != 'omniscan':
                        source_data = task_data[task_data['source_type'] == source]
                        if not source_data.empty:
                            all_parsed = []
                            for label in source_data['label']:
                                all_parsed.extend(parse_labels(label))

                            non_unobservable = [l for l in all_parsed
                                                if 'UNOBSERVABLE' not in l.upper()]
                            if non_unobservable:
                                task_covered = True
                                break
                            else:
                                task_unobservable.extend(
                                    [l for l in all_parsed
                                     if 'UNOBSERVABLE' in l.upper()])

            task_coverage[task] = task_covered
            if not task_covered:
                all_unobservable_labels.extend(task_unobservable)

        # ASIN is covered only if ALL tasks are covered
        asin_covered = all(task_coverage.values()) if task_coverage else False

        # Custom rule for German ingredients/allergens
        if ('ingredients-german' in tasks_to_process and
                'iallergens-german' in tasks_to_process and
                'ingredients-german' in task_coverage and
                'iallergens-german' in task_coverage):
            # If ingredients-german is covered but iallergens-german is not
            if (task_coverage['ingredients-german'] and
                    not task_coverage['iallergens-german']):
                # Check if iallergens-german failed only due to "UNOBSERVABLE"
                # (not other unobservable types). NOTE(review): this inspects
                # ALL rows for the task, not just the selected sources —
                # presumably intentional; confirm with the data owner.
                iallergens_data = asin_data[asin_data['task'] == 'iallergens-german']
                if not iallergens_data.empty:
                    all_iallergens_labels = []
                    for label in iallergens_data['label']:
                        all_iallergens_labels.extend(parse_labels(label))

                    # Check if all unobservable labels are exactly "UNOBSERVABLE"
                    if (all_iallergens_labels and
                            all(label.upper() == 'UNOBSERVABLE'
                                for label in all_iallergens_labels)):
                        asin_covered = True
                        task_coverage['iallergens-german'] = True

        results[asin] = {
            'covered': asin_covered,
            'task_coverage': task_coverage,
            'unobservable_labels': all_unobservable_labels,
        }

    return results


def create_analysis(csv_file, marketing, omniscan, pics, detailed_page,
                    omniscan_sets, task_checkboxes):
    """Run the main coverage analysis and build the pie chart + stats text.

    Returns a (plotly figure or None, markdown string) pair for Gradio.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"

    df = pd.read_csv(csv_file.name)

    # Get selected tasks
    selected_tasks = task_checkboxes if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Get available sources
    available_sources = df['source_type'].unique()

    # Build selected sources list (only sources both checked and present)
    sources = []
    if marketing and 'marketing' in available_sources:
        sources.append('marketing')
    if omniscan and 'omniscan' in available_sources:
        sources.append('omniscan')
    if pics and 'pics' in available_sources:
        sources.append('pics')
    if detailed_page and 'detailed_page' in available_sources:
        sources.append('detailed_page')

    if not sources:
        return None, "Please select at least one available source"

    # Analyze coverage
    results = analyze_coverage(df, sources, omniscan_sets, selected_tasks)

    # Calculate coverage statistics
    total_asins = len(results)
    covered_asins = sum(1 for r in results.values() if r['covered'])
    uncovered_asins = total_asins - covered_asins
    asin_coverage_rate = covered_asins / total_asins if total_asins > 0 else 0
    uncovered_rate = uncovered_asins / total_asins if total_asins > 0 else 0

    # Collect unobservable labels only from uncovered ASINs
    all_unobservable = []
    for result in results.values():
        if not result['covered']:
            all_unobservable.extend(result['unobservable_labels'])

    # Create pie chart for unobservable issues
    if all_unobservable:
        unobservable_counts = pd.Series(all_unobservable).value_counts()
        fig = px.pie(values=unobservable_counts.values,
                     names=unobservable_counts.index,
                     title=f"Unobservable Issues from {uncovered_asins} Uncovered ASINs ({uncovered_rate:.1%} of total)")
    else:
        fig = px.pie(values=[1], names=['All Covered'],
                     title=f"ASIN Coverage: {asin_coverage_rate:.1%}")

    stats = f"## 📊 **ASIN Coverage: {covered_asins}/{total_asins} ASINs ({asin_coverage_rate:.1%})**"
    return fig, stats


def create_source_coverage_analysis(csv_file, marketing, omniscan, pics,
                                    detailed_page, task_checkboxes):
    """Compare coverage of every combination of the selected sources.

    Builds a spider/radar chart (one spoke per source combination, using a
    single omniscan capture) plus a markdown stats table.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"

    df = pd.read_csv(csv_file.name)

    # Get selected tasks
    selected_tasks = task_checkboxes if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Get available sources
    available_sources = df['source_type'].unique()

    # Build selected sources list
    selected_sources = []
    if marketing and 'marketing' in available_sources:
        selected_sources.append('marketing')
    if omniscan and 'omniscan' in available_sources:
        selected_sources.append('omniscan')
    if pics and 'pics' in available_sources:
        selected_sources.append('pics')
    if detailed_page and 'detailed_page' in available_sources:
        selected_sources.append('detailed_page')

    if not selected_sources:
        return None, "Please select at least one available source"

    # Calculate coverage for all combinations using the same logic as main analysis
    coverage_data = []

    # Single sources
    for source in selected_sources:
        results = analyze_coverage(df, [source], 1, selected_tasks)
        covered_asins = sum(1 for r in results.values() if r['covered'])
        coverage_data.append((source, covered_asins))

    # Pairs (newline in the label keeps long spoke names readable on the chart)
    for combo in combinations(selected_sources, 2):
        results = analyze_coverage(df, list(combo), 1, selected_tasks)
        covered_asins = sum(1 for r in results.values() if r['covered'])
        coverage_data.append((f"{combo[0]}\n{combo[1]}", covered_asins))

    # All combinations of 3 or more
    if len(selected_sources) >= 3:
        for r in range(3, len(selected_sources) + 1):
            for combo in combinations(selected_sources, r):
                results = analyze_coverage(df, list(combo), 1, selected_tasks)
                covered_asins = sum(1 for res in results.values() if res['covered'])
                coverage_data.append(("\n".join(combo), covered_asins))

    # Create spider/radar chart
    labels, values = zip(*coverage_data)

    # Calculate total ASINs for percentage calculation
    total_asins = len(df['asin'].unique())

    # Create text labels with value and percentage
    text_labels = [f"{value} ({value/total_asins*100:.1f}%)" for value in values]

    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=values,
        theta=labels,
        fill='toself',
        name='ASIN Coverage',
        line_color='rgb(0, 123, 255)',
        fillcolor='rgba(0, 123, 255, 0.3)',
        text=text_labels,
        textposition='top right',
        mode='markers+text+lines'
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=False,  # Hide radial axis values
                range=[0, max(values) * 1.1] if values else [0, 100]
            )
        ),
        title='ASIN Coverage by Source Combination (Spider Chart)',
        height=600,
        showlegend=True
    )

    # Create statistics text
    stats_text = "## 📊 **Source Coverage Statistics**\n```\n"
    for label, value in coverage_data:
        stats_text += f"{label:<30}: {value} ASINs\n"
    stats_text += "```"

    return fig, stats_text


def create_omniscan_capture_analysis(csv_file, task_checkboxes):
    """Chart coverage gained by allowing more omniscan captures per ASIN.

    Runs the coverage analysis with omniscan only, for 1..min(max,10)
    captures, and plots coverage percentage vs. capture count.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"

    df = pd.read_csv(csv_file.name)

    # Get selected tasks
    selected_tasks = task_checkboxes if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Check if omniscan data exists
    if 'omniscan' not in df['source_type'].values:
        return None, "No omniscan data found in the dataset"

    # Get max omniscan captures available
    max_captures = df[df['source_type'] == 'omniscan'].groupby('asin').size().max()

    # Analyze coverage for different numbers of omniscan captures
    capture_data = []
    for num_captures in range(1, min(max_captures + 1, 11)):  # Limit to 10 captures max
        results = analyze_coverage(df, ['omniscan'], num_captures, selected_tasks)
        covered_asins = sum(1 for r in results.values() if r['covered'])
        total_asins = len(results)
        coverage_pct = (covered_asins / total_asins * 100) if total_asins > 0 else 0
        capture_data.append((num_captures, covered_asins, coverage_pct))

    # Create line chart
    captures, counts, percentages = zip(*capture_data)

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=captures,
        y=percentages,
        mode='lines+markers',
        name='Coverage %',
        line=dict(color='rgb(0, 123, 255)', width=3),
        marker=dict(size=8),
        text=[f"{count} ASINs ({pct:.1f}%)" for count, pct in zip(counts, percentages)],
        textposition='top center'
    ))
    fig.update_layout(
        title='Coverage Gains by Number of Omniscan Captures',
        xaxis_title='Number of Omniscan Captures',
        yaxis_title='Coverage Percentage (%)',
        height=500,
        showlegend=False
    )

    # Create statistics text. NOTE: loop variable renamed from `captures`,
    # which the original reused and thereby shadowed the x-axis tuple above.
    stats_text = "## 📈 **Omniscan Capture Analysis**\n```\n"
    for n_caps, count, pct in capture_data:
        # Gain relative to the single-capture baseline.
        gain = pct - capture_data[0][2] if n_caps > 1 else 0
        stats_text += f"{n_caps} capture(s): {count:3d} ASINs ({pct:5.1f}%) [+{gain:4.1f}% gain]\n"
    stats_text += "```"

    return fig, stats_text


def update_source_buttons(csv_file):
    """Enable UI controls based on which sources/tasks exist in the CSV.

    Returns updated components for the four source checkboxes, the omniscan
    slider, and the task checkbox group (all disabled when no file).
    """
    if csv_file is None:
        return (gr.Checkbox(interactive=False),
                gr.Checkbox(interactive=False),
                gr.Checkbox(interactive=False),
                gr.Checkbox(interactive=False),
                gr.Slider(interactive=False),
                gr.CheckboxGroup(choices=[], interactive=False))

    df = pd.read_csv(csv_file.name)
    available_sources = df['source_type'].unique()
    available_tasks = sorted(df['task'].unique().tolist())

    marketing_available = 'marketing' in available_sources
    omniscan_available = 'omniscan' in available_sources
    pics_available = 'pics' in available_sources
    detailed_page_available = 'detailed_page' in available_sources

    # Get max omniscan sets for slider
    max_omniscan = 1
    if omniscan_available:
        max_omniscan = df[df['source_type'] == 'omniscan'].groupby('asin').size().max()

    return (gr.Checkbox(interactive=marketing_available, value=False),
            gr.Checkbox(interactive=omniscan_available, value=False),
            gr.Checkbox(interactive=pics_available, value=False),
            gr.Checkbox(interactive=detailed_page_available, value=False),
            gr.Slider(minimum=1, maximum=min(max_omniscan, 10),
                      value=1, step=1, interactive=omniscan_available),
            gr.CheckboxGroup(choices=available_tasks, value=[], interactive=True))


# ---------------------------------------------------------------------------
# Gradio UI wiring (built at import time; launch is left to the caller).
# ---------------------------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Omniscan Multi-Capture Multi-Source Analysis Tool")

    csv_input = gr.File(label="Upload CSV file", file_types=[".csv"])

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📊 Data Sources")
            marketing_cb = gr.Checkbox(label="Marketing", interactive=False)
            omniscan_cb = gr.Checkbox(label="Omniscan", interactive=False)
            pics_cb = gr.Checkbox(label="PICS", interactive=False)
            detailed_page_cb = gr.Checkbox(label="Detailed Page Text", interactive=False)

            gr.Markdown("### 🏷️ Task Selection")
            task_checkboxes = gr.CheckboxGroup(label="Select Tasks", choices=[],
                                               interactive=False)

            gr.Markdown("### ⚙️ Omniscan Settings")
            omniscan_sets = gr.Slider(label="Max Omniscan Image Sets",
                                      minimum=1, maximum=10, value=1, step=1,
                                      interactive=False)

        with gr.Column():
            analyze_btn = gr.Button("📈 Analyze Coverage")
            stats_output = gr.Markdown(label="Statistics")
            plot_output = gr.Plot()

            gr.Markdown("---")
            source_coverage_btn = gr.Button("🔍 Analyze Source Coverage")
            source_stats_output = gr.Markdown(label="Source Coverage Statistics")
            source_plot_output = gr.Plot()

            gr.Markdown("---")
            omniscan_capture_btn = gr.Button("📈 Analyze Omniscan Captures")
            omniscan_capture_stats_output = gr.Markdown(label="Omniscan Capture Statistics")
            omniscan_capture_plot_output = gr.Plot()

    # Update source availability when CSV is uploaded
    csv_input.change(
        update_source_buttons,
        inputs=csv_input,
        outputs=[marketing_cb, omniscan_cb, pics_cb, detailed_page_cb,
                 omniscan_sets, task_checkboxes]
    )

    # Run analysis
    analyze_btn.click(
        create_analysis,
        inputs=[csv_input, marketing_cb, omniscan_cb, pics_cb, detailed_page_cb,
                omniscan_sets, task_checkboxes],
        outputs=[plot_output, stats_output]
    )

    # Run source coverage analysis
    source_coverage_btn.click(
        create_source_coverage_analysis,
        inputs=[csv_input, marketing_cb, omniscan_cb, pics_cb, detailed_page_cb,
                task_checkboxes],
        outputs=[source_plot_output, source_stats_output]
    )

    # Run omniscan capture analysis
    omniscan_capture_btn.click(
        create_omniscan_capture_analysis,
        inputs=[csv_input, task_checkboxes],
        outputs=[omniscan_capture_plot_output, omniscan_capture_stats_output]
    )

#demo.launch()