myspace / app.py
popaaln's picture
Update app.py
942cd6a verified
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from itertools import combinations
import re
def parse_labels(label_str):
    """Turn one raw ``label`` cell into a list of label strings.

    A cell that looks like a list literal (``"['A', 'B']"``) is split into
    its quoted items; any other value is returned as a one-element list.

    Args:
        label_str: Raw cell value. Usually a string, but missing values
            (``None``/NaN) and non-string scalars such as numbers are
            accepted as well.

    Returns:
        A list of label strings; empty for missing values and for ``"[]"``.
    """
    if not isinstance(label_str, str):
        if pd.isna(label_str):
            return []
        # A purely numeric column is parsed by pandas as int/float; without
        # this coercion the string methods below would raise AttributeError.
        label_str = str(label_str)

    # Stray padding must not hide a list literal nor defeat the exact
    # comparisons callers perform on the returned labels.
    text = label_str.strip()

    if text.startswith('[') and text.endswith(']'):
        # Each match is a (single-quoted, double-quoted) pair in which exactly
        # one group can be non-empty.
        quoted_items = re.findall(r"'([^']*)'|\"([^\"]*)\"", text)
        return [single or double for single, double in quoted_items]
    return [text]
def analyze_coverage(df, sources, omniscan_sets=1, selected_tasks=None):
    """Decide, per ASIN, whether every task is covered by the chosen sources.

    A task counts as covered when at least one parsed label from a selected
    source does not contain ``UNOBSERVABLE`` (case-insensitive). Omniscan is
    evaluated first, using only the first ``omniscan_sets`` rows of that
    ASIN/task (ordered by ``timestamp`` when the column exists); the
    remaining sources are then tried in the order given.

    Special rule: when ``ingredients-german`` is covered and
    ``iallergens-german`` is not, the allergen task is waived (treated as
    covered) if every label recorded for it - across all source rows of that
    ASIN, not only the selected sources - is exactly ``UNOBSERVABLE``.

    Args:
        df: Frame with ``asin``, ``task``, ``source_type`` and ``label``
            columns; ``timestamp`` is optional.
        sources: Source-type names to consider.
        omniscan_sets: Maximum number of omniscan rows used per ASIN/task.
        selected_tasks: Tasks to evaluate. When empty or ``None`` every task
            present in ``df`` is evaluated.

    Returns:
        A dict keyed by ASIN. Each value holds ``'covered'`` (True only when
        at least one task was evaluated and all evaluated tasks are
        covered), ``'task_coverage'`` (task -> bool) and
        ``'unobservable_labels'`` (labels collected from the tasks that
        remain uncovered).
    """
    if selected_tasks:
        df = df[df['task'].isin(selected_tasks)]
        tasks_to_process = selected_tasks
    else:
        tasks_to_process = df['task'].unique().tolist()

    # UI sliders may hand over a float; DataFrame.head needs an integer.
    capture_limit = int(omniscan_sets)
    use_omniscan = 'omniscan' in sources
    other_sources = [source for source in sources if source != 'omniscan']

    results = {}
    for asin in df['asin'].unique():
        asin_data = df[df['asin'] == asin]

        task_coverage = {}
        task_unobservable = {}
        for task in tasks_to_process:
            task_data = asin_data[asin_data['task'] == task]
            if task_data.empty:
                continue
            covered, unobservable = _evaluate_task(
                task_data, use_omniscan, other_sources, capture_limit
            )
            task_coverage[task] = covered
            task_unobservable[task] = unobservable

        # German ingredients/allergens waiver (see docstring).
        if (task_coverage.get('ingredients-german')
                and task_coverage.get('iallergens-german') is False):
            allergen_rows = asin_data[asin_data['task'] == 'iallergens-german']
            allergen_labels = [
                label
                for raw in allergen_rows['label']
                for label in parse_labels(raw)
            ]
            # Only the plain "UNOBSERVABLE" label qualifies; any other
            # unobservable variant keeps the task uncovered.
            if allergen_labels and all(
                label.upper() == 'UNOBSERVABLE' for label in allergen_labels
            ):
                task_coverage['iallergens-german'] = True

        # Derive the verdict after the waiver so that an uncovered third
        # task still keeps the ASIN uncovered.
        asin_covered = bool(task_coverage) and all(task_coverage.values())

        # Report labels only for tasks that are still uncovered, so waived
        # labels are not counted as outstanding issues.
        unobservable_labels = []
        for task, covered in task_coverage.items():
            if not covered:
                unobservable_labels.extend(task_unobservable[task])

        results[asin] = {
            'covered': asin_covered,
            'task_coverage': task_coverage,
            'unobservable_labels': unobservable_labels,
        }
    return results


def _evaluate_task(task_data, use_omniscan, other_sources, capture_limit):
    """Return ``(covered, unobservable_labels)`` for one ASIN/task slice."""
    unobservable = []

    if use_omniscan:
        omniscan_rows = task_data[task_data['source_type'] == 'omniscan']
        if not omniscan_rows.empty:
            # Earliest captures first when timestamps are available.
            if 'timestamp' in omniscan_rows.columns:
                omniscan_rows = omniscan_rows.sort_values('timestamp')
            observable, hidden = _split_labels(
                omniscan_rows.head(capture_limit)['label']
            )
            if observable:
                return True, []
            unobservable.extend(hidden)

    for source in other_sources:
        source_rows = task_data[task_data['source_type'] == source]
        if source_rows.empty:
            continue
        observable, hidden = _split_labels(source_rows['label'])
        if observable:
            return True, []
        unobservable.extend(hidden)

    return False, unobservable


def _split_labels(raw_labels):
    """Parse raw label cells into ``(observable, unobservable)`` label lists."""
    observable = []
    unobservable = []
    for raw in raw_labels:
        for label in parse_labels(raw):
            if 'UNOBSERVABLE' in label.upper():
                unobservable.append(label)
            else:
                observable.append(label)
    return observable, unobservable
def create_analysis(csv_file, marketing, omniscan, pics, detailed_page, omniscan_sets, task_checkboxes):
    """Run the main coverage analysis and build its pie chart and summary.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when nothing has been uploaded.
        marketing, omniscan, pics, detailed_page: Whether each source is
            ticked in the UI. A ticked source is used only if it occurs in
            the file's ``source_type`` column.
        omniscan_sets: Maximum number of omniscan rows per ASIN/task.
        task_checkboxes: Names of the selected tasks.

    Returns:
        ``(figure, markdown)``. ``figure`` is ``None`` and ``markdown`` holds
        a user-facing message when the inputs are incomplete.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"
    df = pd.read_csv(csv_file.name)

    # Report a malformed file as a message instead of a KeyError traceback.
    missing_columns = [
        column for column in ('asin', 'task', 'source_type', 'label')
        if column not in df.columns
    ]
    if missing_columns:
        return None, f"CSV is missing required column(s): {', '.join(missing_columns)}"

    selected_tasks = task_checkboxes if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Keep only the ticked sources that actually occur in the file.
    available_sources = set(df['source_type'].unique())
    requested = (
        ('marketing', marketing),
        ('omniscan', omniscan),
        ('pics', pics),
        ('detailed_page', detailed_page),
    )
    sources = [
        name for name, ticked in requested
        if ticked and name in available_sources
    ]
    if not sources:
        return None, "Please select at least one available source"

    results = analyze_coverage(df, sources, omniscan_sets, selected_tasks)

    total_asins = len(results)
    covered_asins = sum(1 for result in results.values() if result['covered'])
    uncovered_asins = total_asins - covered_asins
    asin_coverage_rate = covered_asins / total_asins if total_asins > 0 else 0
    uncovered_rate = uncovered_asins / total_asins if total_asins > 0 else 0

    # Only uncovered ASINs contribute to the issue breakdown.
    all_unobservable = [
        label
        for result in results.values()
        if not result['covered']
        for label in result['unobservable_labels']
    ]

    if all_unobservable:
        unobservable_counts = pd.Series(all_unobservable).value_counts()
        fig = px.pie(values=unobservable_counts.values, names=unobservable_counts.index,
                     title=f"Unobservable Issues from {uncovered_asins} Uncovered ASINs ({uncovered_rate:.1%} of total)")
    elif total_asins > 0 and uncovered_asins == 0:
        fig = px.pie(values=[1], names=['All Covered'],
                     title=f"ASIN Coverage: {asin_coverage_rate:.1%}")
    else:
        # ASINs can be uncovered without any unobservable label (e.g. no
        # rows for the selected sources); do not label that "All Covered".
        fig = px.pie(values=[covered_asins, uncovered_asins], names=['Covered', 'Uncovered'],
                     title=f"ASIN Coverage: {asin_coverage_rate:.1%}")

    stats = f"## 📊 **ASIN Coverage: {covered_asins}/{total_asins} ASINs ({asin_coverage_rate:.1%})**"
    return fig, stats
def create_source_coverage_analysis(csv_file, marketing, omniscan, pics, detailed_page, task_checkboxes):
    """Compare ASIN coverage for every combination of the selected sources.

    Each non-empty combination of the ticked (and available) sources is run
    through ``analyze_coverage`` with a single omniscan capture, and the
    covered-ASIN counts are drawn on a radar chart.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when nothing has been uploaded.
        marketing, omniscan, pics, detailed_page: Whether each source is
            ticked in the UI.
        task_checkboxes: Names of the selected tasks.

    Returns:
        ``(figure, markdown)``. ``figure`` is ``None`` and ``markdown`` holds
        a user-facing message when the inputs are incomplete.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"
    df = pd.read_csv(csv_file.name)

    selected_tasks = task_checkboxes if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    # Keep only the ticked sources that actually occur in the file.
    available_sources = set(df['source_type'].unique())
    requested = (
        ('marketing', marketing),
        ('omniscan', omniscan),
        ('pics', pics),
        ('detailed_page', detailed_page),
    )
    selected_sources = [
        name for name, ticked in requested
        if ticked and name in available_sources
    ]
    if not selected_sources:
        return None, "Please select at least one available source"

    # Singles first, then pairs, then larger combinations.
    coverage_data = []
    total_asins = 0
    for size in range(1, len(selected_sources) + 1):
        for combo in combinations(selected_sources, size):
            results = analyze_coverage(df, list(combo), 1, selected_tasks)
            # Same denominator as the main analysis: ASINs that have rows for
            # the selected tasks (identical for every combination).
            total_asins = len(results)
            covered_asins = sum(1 for res in results.values() if res['covered'])
            coverage_data.append((combo, covered_asins))

    # "<br>" breaks long axis labels onto several lines in the chart.
    labels = ["<br>".join(combo) for combo, _ in coverage_data]
    values = [count for _, count in coverage_data]
    text_labels = [
        f"{value} ({(value / total_asins * 100) if total_asins else 0.0:.1f}%)"
        for value in values
    ]

    # Avoid a zero-width radial axis when nothing is covered.
    peak = max(values)
    radial_range = [0, peak * 1.1] if peak else [0, 1]

    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=values,
        theta=labels,
        fill='toself',
        name='ASIN Coverage',
        line_color='rgb(0, 123, 255)',
        fillcolor='rgba(0, 123, 255, 0.3)',
        text=text_labels,
        textposition='top right',
        mode='markers+text+lines'
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=False,  # Hide radial axis values
                range=radial_range
            )
        ),
        title='ASIN Coverage by Source Combination (Spider Chart)',
        height=600,
        showlegend=True
    )

    # The summary is a fenced code block, where "<br>" would be shown
    # literally, so combinations are joined with " + " instead.
    text_rows = [(" + ".join(combo), count) for combo, count in coverage_data]
    width = max(30, max(len(label) for label, _ in text_rows))
    body = "".join(f"{label:<{width}}: {count} ASINs\n" for label, count in text_rows)
    stats_text = "## 📊 **Source Coverage Statistics**\n```\n" + body + "```"
    return fig, stats_text
def create_omniscan_capture_analysis(csv_file, task_checkboxes):
    """Chart how coverage changes as more omniscan captures are allowed.

    Coverage is computed with omniscan as the only source for 1, 2, ...
    captures, up to the largest number of omniscan rows any single
    ASIN/task has among the selected tasks (at most 10).

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when nothing has been uploaded.
        task_checkboxes: Names of the selected tasks.

    Returns:
        ``(figure, markdown)``. ``figure`` is ``None`` and ``markdown`` holds
        a user-facing message when the inputs are incomplete.
    """
    if csv_file is None:
        return None, "Please upload a CSV file"
    df = pd.read_csv(csv_file.name)

    selected_tasks = task_checkboxes if task_checkboxes else []
    if not selected_tasks:
        return None, "Please select at least one task"

    if 'omniscan' not in df['source_type'].values:
        return None, "No omniscan data found in the dataset"

    # analyze_coverage applies the capture limit per ASIN *and* task, so the
    # useful upper bound is the largest per-ASIN/task row count, not the
    # per-ASIN total across all tasks.
    omniscan_rows = df[(df['source_type'] == 'omniscan') & df['task'].isin(selected_tasks)]
    captures_per_task = omniscan_rows.groupby(['asin', 'task']).size()
    if captures_per_task.empty:
        return None, "No omniscan data found for the selected tasks"
    max_captures = min(int(captures_per_task.max()), 10)  # Limit to 10 captures max

    capture_data = []
    for num_captures in range(1, max_captures + 1):
        results = analyze_coverage(df, ['omniscan'], num_captures, selected_tasks)
        covered_asins = sum(1 for r in results.values() if r['covered'])
        total_asins = len(results)
        coverage_pct = (covered_asins / total_asins * 100) if total_asins > 0 else 0
        capture_data.append((num_captures, covered_asins, coverage_pct))

    capture_counts = [row[0] for row in capture_data]
    asin_counts = [row[1] for row in capture_data]
    percentages = [row[2] for row in capture_data]

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=capture_counts,
        y=percentages,
        mode='lines+markers',
        name='Coverage %',
        line=dict(color='rgb(0, 123, 255)', width=3),
        marker=dict(size=8),
        text=[f"{count} ASINs ({pct:.1f}%)" for count, pct in zip(asin_counts, percentages)],
        textposition='top center'
    ))
    fig.update_layout(
        title='Coverage Gains by Number of Omniscan Captures',
        xaxis_title='Number of Omniscan Captures',
        yaxis_title='Coverage Percentage (%)',
        height=500,
        showlegend=False
    )

    # Gains are reported relative to the single-capture baseline.
    baseline_pct = capture_data[0][2]
    stat_lines = [
        f"{num} capture(s): {count:3d} ASINs ({pct:5.1f}%) [+{pct - baseline_pct:4.1f}% gain]\n"
        for num, count, pct in capture_data
    ]
    stats_text = "## 📈 **Omniscan Capture Analysis**\n```\n" + "".join(stat_lines) + "```"
    return fig, stats_text
def update_source_buttons(csv_file):
    """Refresh the source, slider and task controls after a CSV upload.

    Args:
        csv_file: Uploaded file object exposing the CSV path as ``.name``,
            or ``None`` when the upload was cleared.

    Returns:
        Six component updates, in order: the marketing, omniscan, PICS and
        detailed-page checkboxes, the omniscan slider, and the task checkbox
        group. Without a file every control is made non-interactive.
    """
    if csv_file is None:
        return (gr.Checkbox(interactive=False), gr.Checkbox(interactive=False),
                gr.Checkbox(interactive=False), gr.Checkbox(interactive=False),
                gr.Slider(interactive=False), gr.CheckboxGroup(choices=[], interactive=False))

    df = pd.read_csv(csv_file.name)
    available_sources = set(df['source_type'].dropna().unique())
    # Blank task cells load as NaN, which cannot be sorted together with
    # strings and is not a selectable task anyway.
    available_tasks = sorted(df['task'].dropna().unique().tolist())

    omniscan_available = 'omniscan' in available_sources

    # The capture limit is applied per ASIN and task during analysis, so the
    # slider's upper bound is the largest per-ASIN/task omniscan row count.
    max_omniscan = 1
    if omniscan_available:
        captures_per_task = df[df['source_type'] == 'omniscan'].groupby(['asin', 'task']).size()
        if not captures_per_task.empty:
            # Plain int rather than a numpy integer for the component config.
            max_omniscan = int(captures_per_task.max())

    return (gr.Checkbox(interactive='marketing' in available_sources, value=False),
            gr.Checkbox(interactive=omniscan_available, value=False),
            gr.Checkbox(interactive='pics' in available_sources, value=False),
            gr.Checkbox(interactive='detailed_page' in available_sources, value=False),
            gr.Slider(minimum=1, maximum=min(max_omniscan, 10), value=1, step=1, interactive=omniscan_available),
            gr.CheckboxGroup(choices=available_tasks, value=[], interactive=True))
# UI definition: inputs on the left, three analysis panels on the right.
# The emoji in the labels below were mis-encoded (UTF-8 shown through a
# different codepage) and have been restored.
with gr.Blocks() as demo:
    gr.Markdown("# Omniscan Multi-Capture Multi-Source Analysis Tool")
    csv_input = gr.File(label="Upload CSV file", file_types=[".csv"])

    with gr.Row():
        with gr.Column():
            # Every control starts disabled; update_source_buttons enables
            # the applicable ones once a CSV has been uploaded.
            gr.Markdown("### 📊 Data Sources")
            marketing_cb = gr.Checkbox(label="Marketing", interactive=False)
            omniscan_cb = gr.Checkbox(label="Omniscan", interactive=False)
            pics_cb = gr.Checkbox(label="PICS", interactive=False)
            detailed_page_cb = gr.Checkbox(label="Detailed Page Text", interactive=False)

            gr.Markdown("### 🏷️ Task Selection")
            task_checkboxes = gr.CheckboxGroup(label="Select Tasks", choices=[], interactive=False)

            gr.Markdown("### ⚙️ Omniscan Settings")
            omniscan_sets = gr.Slider(label="Max Omniscan Image Sets", minimum=1, maximum=10,
                                      value=1, step=1, interactive=False)

        with gr.Column():
            analyze_btn = gr.Button("📈 Analyze Coverage")
            stats_output = gr.Markdown(label="Statistics")
            plot_output = gr.Plot()

            gr.Markdown("---")
            source_coverage_btn = gr.Button("🔍 Analyze Source Coverage")
            source_stats_output = gr.Markdown(label="Source Coverage Statistics")
            source_plot_output = gr.Plot()

            gr.Markdown("---")
            omniscan_capture_btn = gr.Button("📈 Analyze Omniscan Captures")
            omniscan_capture_stats_output = gr.Markdown(label="Omniscan Capture Statistics")
            omniscan_capture_plot_output = gr.Plot()

    # Update source availability when CSV is uploaded
    csv_input.change(
        update_source_buttons,
        inputs=csv_input,
        outputs=[marketing_cb, omniscan_cb, pics_cb, detailed_page_cb, omniscan_sets, task_checkboxes]
    )

    # Run analysis
    analyze_btn.click(
        create_analysis,
        inputs=[csv_input, marketing_cb, omniscan_cb, pics_cb, detailed_page_cb, omniscan_sets, task_checkboxes],
        outputs=[plot_output, stats_output]
    )

    # Run source coverage analysis
    source_coverage_btn.click(
        create_source_coverage_analysis,
        inputs=[csv_input, marketing_cb, omniscan_cb, pics_cb, detailed_page_cb, task_checkboxes],
        outputs=[source_plot_output, source_stats_output]
    )

    # Run omniscan capture analysis
    omniscan_capture_btn.click(
        create_omniscan_capture_analysis,
        inputs=[csv_input, task_checkboxes],
        outputs=[omniscan_capture_plot_output, omniscan_capture_stats_output]
    )

# NOTE(review): the launch call is left disabled exactly as found - confirm
# how this app is expected to be started before enabling it.
#demo.launch()