|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import plotly.express as px |
|
|
import plotly.graph_objects as go |
|
|
from itertools import combinations |
|
|
import re |
|
|
|
|
|
def parse_labels(label_str):
    """Normalize one raw ``label`` cell into a list of label strings.

    Args:
        label_str: A single cell value. May be missing (``None``/NaN), a
            plain label such as ``"UNOBSERVABLE"``, a stringified list such
            as ``"['a', 'b']"`` or ``'["a"]'``, or a non-string scalar.

    Returns:
        list[str]: ``[]`` for missing or blank cells; the quoted items for a
        bracketed list; otherwise a one-element list holding the stripped
        text of the cell.
    """
    if pd.isna(label_str):
        return []

    # Non-string scalars (e.g. a label column pandas parsed as numbers) have
    # no ``startswith``; coerce them instead of raising AttributeError.
    text = label_str if isinstance(label_str, str) else str(label_str)

    # Surrounding whitespace would otherwise defeat both the bracket test
    # below and exact comparisons performed by callers.
    text = text.strip()
    if not text:
        # A blank cell carries no label, same as a missing one.
        return []

    if text.startswith('[') and text.endswith(']'):
        # Each match is a (single-quoted, double-quoted) pair in which only
        # one side is non-empty.
        matches = re.findall(r"'([^']*)'|\"([^\"]*)\"", text)
        return [single or double for single, double in matches]

    return [text]
|
|
|
|
|
def _split_by_observability(raw_labels):
    """Parse raw label cells and split them into (observable, unobservable).

    A parsed label is unobservable when it contains ``UNOBSERVABLE``
    (case-insensitive); every other label counts as an observable value.
    """
    observable = []
    unobservable = []
    for raw in raw_labels:
        for label in parse_labels(raw):
            if 'UNOBSERVABLE' in label.upper():
                unobservable.append(label)
            else:
                observable.append(label)
    return observable, unobservable


def analyze_coverage(df, sources, omniscan_sets=1, selected_tasks=None):
    """Compute per-ASIN task coverage for a set of data sources.

    A task is covered for an ASIN when at least one selected source yields a
    label that is not an ``UNOBSERVABLE`` marker. Omniscan is evaluated
    first, using only the first ``omniscan_sets`` rows (ordered by
    ``timestamp`` when that column exists); the remaining sources are then
    tried in the order given. An ASIN is covered when every task it has data
    for is covered.

    Args:
        df: DataFrame with ``asin``, ``task``, ``source_type`` and ``label``
            columns; ``timestamp`` is optional.
        sources: Source names to consider (e.g. ``'omniscan'``, ``'pics'``).
        omniscan_sets: Maximum number of omniscan rows used per (ASIN, task).
        selected_tasks: Tasks to evaluate. When empty or ``None``, every task
            present in ``df`` is evaluated.

    Returns:
        dict: ``{asin: {'covered': bool, 'task_coverage': {task: bool},
        'unobservable_labels': [str, ...]}}``. ``unobservable_labels`` holds
        the unobservable labels of the tasks that ended up uncovered.
    """
    results = {}

    if selected_tasks:
        df = df[df['task'].isin(selected_tasks)]
        tasks_to_process = list(selected_tasks)
    else:
        tasks_to_process = df['task'].unique().tolist()

    use_omniscan = 'omniscan' in sources
    other_sources = [source for source in sources if source != 'omniscan']
    # UI sliders may deliver floats; ``head`` needs an int, and a negative
    # value would silently mean "all but the last N rows".
    capture_limit = max(int(omniscan_sets), 0)
    has_timestamp = 'timestamp' in df.columns

    for asin in df['asin'].unique():
        asin_data = df[df['asin'] == asin]

        task_coverage = {}
        # Kept per task so that a task excused by the override below can
        # withdraw its labels again.
        unobservable_by_task = {}

        for task in tasks_to_process:
            task_data = asin_data[asin_data['task'] == task]
            if task_data.empty:
                continue

            task_covered = False
            task_unobservable = []

            if use_omniscan:
                omniscan_data = task_data[task_data['source_type'] == 'omniscan']
                if not omniscan_data.empty:
                    if has_timestamp:
                        # Stable sort keeps file order for equal timestamps,
                        # so the selected captures are deterministic.
                        omniscan_data = omniscan_data.sort_values('timestamp', kind='stable')
                    observable, unobservable = _split_by_observability(
                        omniscan_data['label'].head(capture_limit)
                    )
                    if observable:
                        task_covered = True
                    else:
                        task_unobservable.extend(unobservable)

            if not task_covered:
                for source in other_sources:
                    source_data = task_data[task_data['source_type'] == source]
                    if source_data.empty:
                        continue
                    observable, unobservable = _split_by_observability(source_data['label'])
                    if observable:
                        task_covered = True
                        break
                    task_unobservable.extend(unobservable)

            task_coverage[task] = task_covered
            if not task_covered:
                unobservable_by_task[task] = task_unobservable

        # Override: when 'ingredients-german' is covered and every
        # 'iallergens-german' label (from any source) is exactly
        # UNOBSERVABLE, the allergen task is treated as covered.
        if (task_coverage.get('ingredients-german')
                and 'iallergens-german' in task_coverage
                and not task_coverage['iallergens-german']):
            iallergens_data = asin_data[asin_data['task'] == 'iallergens-german']
            allergen_labels = [
                label
                for raw in iallergens_data['label']
                for label in parse_labels(raw)
            ]
            if allergen_labels and all(
                label.upper() == 'UNOBSERVABLE' for label in allergen_labels
            ):
                task_coverage['iallergens-german'] = True
                unobservable_by_task.pop('iallergens-german', None)

        # Derived after the override: excusing the allergen task must not
        # hide an unrelated task that is still uncovered.
        asin_covered = bool(task_coverage) and all(task_coverage.values())

        results[asin] = {
            'covered': asin_covered,
            'task_coverage': task_coverage,
            'unobservable_labels': [
                label
                for labels in unobservable_by_task.values()
                for label in labels
            ],
        }

    return results
|
|
|
|
|
def create_analysis(csv_file, marketing, omniscan, pics, detailed_page, omniscan_sets, task_checkboxes):
    """Build the overall coverage pie chart and headline statistic.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when nothing has been uploaded.
        marketing, omniscan, pics, detailed_page: Whether each source is
            switched on. A source is only used when it also occurs in the
            CSV's ``source_type`` column.
        omniscan_sets: Maximum number of omniscan rows used per (ASIN, task).
        task_checkboxes: Selected task names.

    Returns:
        tuple: ``(figure, markdown)``. ``figure`` is ``None`` and
        ``markdown`` is a user-facing message when the inputs cannot be
        analyzed.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"

    df = pd.read_csv(csv_file.name)

    selected_tasks = list(task_checkboxes) if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Report a malformed upload as a message instead of a KeyError.
    missing_columns = [
        column for column in ('asin', 'task', 'source_type', 'label')
        if column not in df.columns
    ]
    if missing_columns:
        return None, f"CSV is missing required column(s): {', '.join(missing_columns)}"

    available_sources = set(df['source_type'].dropna().unique())
    requested = (
        ('marketing', marketing),
        ('omniscan', omniscan),
        ('pics', pics),
        ('detailed_page', detailed_page),
    )
    sources = [
        name for name, enabled in requested
        if enabled and name in available_sources
    ]
    if not sources:
        return None, "Please select at least one available source"

    results = analyze_coverage(df, sources, omniscan_sets, selected_tasks)

    total_asins = len(results)
    if total_asins == 0:
        # Nothing to chart; a pie labelled "All Covered" would be misleading.
        return None, "No ASINs found for the selected tasks"

    covered_asins = sum(1 for outcome in results.values() if outcome['covered'])
    uncovered_asins = total_asins - covered_asins
    asin_coverage_rate = covered_asins / total_asins
    uncovered_rate = uncovered_asins / total_asins

    # Only uncovered ASINs contribute to the breakdown of unobservable labels.
    all_unobservable = [
        label
        for outcome in results.values()
        if not outcome['covered']
        for label in outcome['unobservable_labels']
    ]

    if all_unobservable:
        unobservable_counts = pd.Series(all_unobservable).value_counts()
        fig = px.pie(
            values=unobservable_counts.values,
            names=unobservable_counts.index,
            title=f"Unobservable Issues from {uncovered_asins} Uncovered ASINs ({uncovered_rate:.1%} of total)",
        )
    elif uncovered_asins == 0:
        fig = px.pie(values=[1], names=['All Covered'],
                     title=f"ASIN Coverage: {asin_coverage_rate:.1%}")
    else:
        # ASINs can be uncovered without any unobservable label (e.g. the
        # selected sources have no rows for a task); show the real split
        # rather than claiming everything is covered.
        fig = px.pie(
            values=[covered_asins, uncovered_asins],
            names=['Covered', 'Uncovered (no labels found)'],
            title=f"ASIN Coverage: {asin_coverage_rate:.1%}",
        )

    # NOTE(review): the leading 'π' looks like a mis-encoded emoji; it is
    # left untouched here — confirm the intended glyph.
    stats = f"## π **ASIN Coverage: {covered_asins}/{total_asins} ASINs ({asin_coverage_rate:.1%})**"
    return fig, stats
|
|
|
|
|
def create_source_coverage_analysis(csv_file, marketing, omniscan, pics, detailed_page, task_checkboxes):
    """Compare ASIN coverage across every combination of the chosen sources.

    Coverage is computed for each non-empty combination of the selected
    sources (singles first, then pairs, and so on), always with one omniscan
    capture, and drawn as a polar ("spider") chart.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when nothing has been uploaded.
        marketing, omniscan, pics, detailed_page: Whether each source is
            switched on. A source is only used when it also occurs in the
            CSV's ``source_type`` column.
        task_checkboxes: Selected task names.

    Returns:
        tuple: ``(figure, markdown)``. ``figure`` is ``None`` and
        ``markdown`` is a user-facing message when the inputs cannot be
        analyzed.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"

    df = pd.read_csv(csv_file.name)

    selected_tasks = list(task_checkboxes) if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Report a malformed upload as a message instead of a KeyError.
    missing_columns = [
        column for column in ('asin', 'task', 'source_type', 'label')
        if column not in df.columns
    ]
    if missing_columns:
        return None, f"CSV is missing required column(s): {', '.join(missing_columns)}"

    available_sources = set(df['source_type'].dropna().unique())
    requested = (
        ('marketing', marketing),
        ('omniscan', omniscan),
        ('pics', pics),
        ('detailed_page', detailed_page),
    )
    selected_sources = [
        name for name, enabled in requested
        if enabled and name in available_sources
    ]
    if not selected_sources:
        return None, "Please select at least one available source"

    # One entry per combination: (tuple of source names, covered ASIN count).
    coverage_data = []
    total_asins = 0
    for size in range(1, len(selected_sources) + 1):
        for combo in combinations(selected_sources, size):
            results = analyze_coverage(df, list(combo), 1, selected_tasks)
            # Same for every combination: the ASINs that have rows for the
            # selected tasks. Using it as the denominator keeps percentages
            # consistent with the task-filtered coverage counts.
            total_asins = len(results)
            covered_asins = sum(1 for outcome in results.values() if outcome['covered'])
            coverage_data.append((combo, covered_asins))

    if total_asins == 0:
        return None, "No ASINs found for the selected tasks"

    # '<br>' makes plotly break the axis label across lines.
    labels = ["<br>".join(combo) for combo, _ in coverage_data]
    values = [count for _, count in coverage_data]
    text_labels = [f"{value} ({value/total_asins*100:.1f}%)" for value in values]

    # Avoid a degenerate [0, 0] radial range when nothing is covered.
    radial_max = max(values) * 1.1
    if radial_max <= 0:
        radial_max = 1

    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=values,
        theta=labels,
        fill='toself',
        name='ASIN Coverage',
        line_color='rgb(0, 123, 255)',
        fillcolor='rgba(0, 123, 255, 0.3)',
        text=text_labels,
        textposition='top right',
        mode='markers+text+lines'
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=False,
                range=[0, radial_max]
            )
        ),
        title='ASIN Coverage by Source Combination (Spider Chart)',
        height=600,
        showlegend=True
    )

    # The statistics go inside a fenced code block, where '<br>' would be
    # printed literally, so combinations are joined with ' + ' there.
    stat_names = [" + ".join(combo) for combo, _ in coverage_data]
    name_width = max(30, max(len(name) for name in stat_names))
    stat_lines = [
        f"{name:<{name_width}}: {count} ASINs\n"
        for name, count in zip(stat_names, values)
    ]
    # NOTE(review): the leading 'π' looks like a mis-encoded emoji; it is
    # left untouched here — confirm the intended glyph.
    stats_text = "## π **Source Coverage Statistics**\n```\n" + "".join(stat_lines) + "```"

    return fig, stats_text
|
|
|
|
|
def create_omniscan_capture_analysis(csv_file, task_checkboxes):
    """Chart how ASIN coverage changes as more omniscan captures are used.

    Coverage is computed with omniscan as the only source for 1, 2, ...
    captures, up to the largest number of omniscan rows any (ASIN, task)
    pair has among the selected tasks, capped at 10.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when nothing has been uploaded.
        task_checkboxes: Selected task names.

    Returns:
        tuple: ``(figure, markdown)``. ``figure`` is ``None`` and
        ``markdown`` is a user-facing message when the inputs cannot be
        analyzed.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"

    df = pd.read_csv(csv_file.name)

    selected_tasks = list(task_checkboxes) if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Report a malformed upload as a message instead of a KeyError.
    missing_columns = [
        column for column in ('asin', 'task', 'source_type', 'label')
        if column not in df.columns
    ]
    if missing_columns:
        return None, f"CSV is missing required column(s): {', '.join(missing_columns)}"

    if 'omniscan' not in df['source_type'].values:
        return None, "No omniscan data found in the dataset"

    # analyze_coverage takes the first N omniscan rows per (ASIN, task), so
    # the useful upper bound is the largest such group within the selected
    # tasks. Counting per ASIN across all tasks only adds redundant steps.
    omniscan_rows = df[(df['source_type'] == 'omniscan') & df['task'].isin(selected_tasks)]
    captures_per_group = omniscan_rows.groupby(['asin', 'task']).size()
    if captures_per_group.empty:
        return None, "No omniscan data found for the selected tasks"
    max_captures = int(captures_per_group.max())

    # One entry per step: (number of captures, covered ASINs, coverage %).
    capture_data = []
    for num_captures in range(1, min(max_captures, 10) + 1):
        results = analyze_coverage(df, ['omniscan'], num_captures, selected_tasks)
        covered_asins = sum(1 for outcome in results.values() if outcome['covered'])
        total_asins = len(results)
        coverage_pct = (covered_asins / total_asins * 100) if total_asins > 0 else 0
        capture_data.append((num_captures, covered_asins, coverage_pct))

    captures, counts, percentages = zip(*capture_data)

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=captures,
        y=percentages,
        # 'text' must be part of the mode, otherwise the point labels and
        # their textposition below are never drawn.
        mode='lines+markers+text',
        name='Coverage %',
        line=dict(color='rgb(0, 123, 255)', width=3),
        marker=dict(size=8),
        text=[f"{count} ASINs ({pct:.1f}%)" for count, pct in zip(counts, percentages)],
        textposition='top center'
    ))
    fig.update_layout(
        title='Coverage Gains by Number of Omniscan Captures',
        xaxis_title='Number of Omniscan Captures',
        yaxis_title='Coverage Percentage (%)',
        height=500,
        showlegend=False
    )

    # Gain is measured against the single-capture baseline.
    baseline_pct = capture_data[0][2]
    stat_lines = []
    for step, count, pct in capture_data:
        gain = pct - baseline_pct if step > 1 else 0
        stat_lines.append(
            f"{step} capture(s): {count:3d} ASINs ({pct:5.1f}%) [+{gain:4.1f}% gain]\n"
        )
    # NOTE(review): the leading 'π' looks like a mis-encoded emoji; it is
    # left untouched here — confirm the intended glyph.
    stats_text = "## π **Omniscan Capture Analysis**\n```\n" + "".join(stat_lines) + "```"

    return fig, stats_text
|
|
|
|
|
def update_source_buttons(csv_file):
    """Reconfigure the source, slider and task controls for an uploaded CSV.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when the upload was cleared.

    Returns:
        tuple: Six component updates, in order: the marketing, omniscan,
        pics and detailed-page checkboxes, the omniscan capture slider, and
        the task checkbox group. Each source checkbox is interactive only
        when that source occurs in the CSV's ``source_type`` column.
    """
    if csv_file is None:
        return (gr.Checkbox(interactive=False), gr.Checkbox(interactive=False),
                gr.Checkbox(interactive=False), gr.Checkbox(interactive=False),
                gr.Slider(interactive=False), gr.CheckboxGroup(choices=[], interactive=False))

    df = pd.read_csv(csv_file.name)
    available_sources = set(df['source_type'].dropna().unique())
    # Drop missing tasks: sorting NaN together with strings raises TypeError.
    available_tasks = sorted(df['task'].dropna().unique().tolist())

    marketing_available = 'marketing' in available_sources
    omniscan_available = 'omniscan' in available_sources
    pics_available = 'pics' in available_sources
    detailed_page_available = 'detailed_page' in available_sources

    max_omniscan = 1
    if omniscan_available:
        # analyze_coverage takes the first N omniscan rows per (ASIN, task),
        # so the slider's useful maximum is the largest such group. Counting
        # per ASIN across all tasks would offer values that change nothing.
        capture_counts = df[df['source_type'] == 'omniscan'].groupby(['asin', 'task']).size()
        if not capture_counts.empty:
            # Plain int keeps numpy scalars out of the component config.
            max_omniscan = int(capture_counts.max())

    return (gr.Checkbox(interactive=marketing_available, value=False),
            gr.Checkbox(interactive=omniscan_available, value=False),
            gr.Checkbox(interactive=pics_available, value=False),
            gr.Checkbox(interactive=detailed_page_available, value=False),
            gr.Slider(minimum=1, maximum=min(max_omniscan, 10), value=1, step=1, interactive=omniscan_available),
            gr.CheckboxGroup(choices=available_tasks, value=[], interactive=True))
|
|
|
|
|
# Page layout and event wiring for the analysis tool.
# NOTE(review): the original indentation was lost, so the nesting of the two
# columns below is reconstructed — confirm it against the intended layout.
# NOTE(review): 'π', 'π·οΈ' and 'βοΈ' in the labels look like mis-encoded
# emoji; they are reproduced unchanged.
with gr.Blocks() as demo:
    gr.Markdown("# Omniscan Multi-Capture Multi-Source Analysis Tool")

    csv_input = gr.File(label="Upload CSV file", file_types=[".csv"])

    with gr.Row():
        # Left column: every control starts disabled until a CSV is loaded.
        with gr.Column():
            gr.Markdown("### π Data Sources")
            marketing_cb, omniscan_cb, pics_cb, detailed_page_cb = [
                gr.Checkbox(label=caption, interactive=False)
                for caption in ("Marketing", "Omniscan", "PICS", "Detailed Page Text")
            ]

            gr.Markdown("### π·οΈ Task Selection")
            task_checkboxes = gr.CheckboxGroup(
                label="Select Tasks",
                choices=[],
                interactive=False,
            )

            gr.Markdown("### βοΈ Omniscan Settings")
            omniscan_sets = gr.Slider(
                label="Max Omniscan Image Sets",
                minimum=1,
                maximum=10,
                value=1,
                step=1,
                interactive=False,
            )

        # Right column: one button / statistics / plot group per analysis.
        with gr.Column():
            analyze_btn = gr.Button("π Analyze Coverage")
            stats_output = gr.Markdown(label="Statistics")
            plot_output = gr.Plot()

            gr.Markdown("---")
            source_coverage_btn = gr.Button("π Analyze Source Coverage")
            source_stats_output = gr.Markdown(label="Source Coverage Statistics")
            source_plot_output = gr.Plot()

            gr.Markdown("---")
            omniscan_capture_btn = gr.Button("π Analyze Omniscan Captures")
            omniscan_capture_stats_output = gr.Markdown(label="Omniscan Capture Statistics")
            omniscan_capture_plot_output = gr.Plot()

    # The four source checkboxes always travel together, in this order.
    _source_checkboxes = [marketing_cb, omniscan_cb, pics_cb, detailed_page_cb]

    # A new upload reconfigures the source checkboxes, slider and task list.
    csv_input.change(
        fn=update_source_buttons,
        inputs=csv_input,
        outputs=[*_source_checkboxes, omniscan_sets, task_checkboxes],
    )

    # Overall coverage.
    analyze_btn.click(
        fn=create_analysis,
        inputs=[csv_input, *_source_checkboxes, omniscan_sets, task_checkboxes],
        outputs=[plot_output, stats_output],
    )

    # Coverage per source combination.
    source_coverage_btn.click(
        fn=create_source_coverage_analysis,
        inputs=[csv_input, *_source_checkboxes, task_checkboxes],
        outputs=[source_plot_output, source_stats_output],
    )

    # Coverage as a function of the number of omniscan captures.
    omniscan_capture_btn.click(
        fn=create_omniscan_capture_analysis,
        inputs=[csv_input, task_checkboxes],
        outputs=[omniscan_capture_plot_output, omniscan_capture_stats_output],
    )
|
|
|
|
|
|