# Author: Gintarė Zokaitytė
# Commit: Cache validation logic fix
# Revision: c4ef01c
import re
import os
import pickle
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
import requests
# Overall word-count goal for the whole corpus.
GOAL_WORDS = 2_200_000
# Per-category word-count goal (half of the overall goal).
CATEGORY_GOAL = 1_100_000
# Label Studio project IDs belonging to our team; all other projects are "Others".
OUR_TEAM_PROJECT_IDS = {29, 30, 31, 32, 33, 37}
# States counted as fully annotated ("ready") work.
ANNOTATED_STATES = ["Acceptable", "No Rating"]
# States that count toward the word-count goal (ready + fixable-with-entities).
GOAL_STATES = ["Acceptable", "No Rating", "ReqAttn (entities)"]
# Fixed chart colors keyed by anonymized member label (as produced by anonymize()).
TEAM_COLORS = {
    "A.K. (22)": "#0066cc",
    "J.Š. (23)": "#00cccc",
    "J.Š. (24)": "#00cc00",
    "G.Z. (25)": "#ff9900",
    "L.M. (26)": "#9933ff",
    "M.M. (27)": "#cc0000",
}
# Cache file location (persists between runs)
CACHE_FILE = Path(".cache.pkl")
st.set_page_config(page_title="Annotation Progress", page_icon="📊", layout="wide")
def fetch_project_data(proj, url, headers, page_size=100):
    """Fetch all tasks of one Label Studio project (runs in a worker thread).

    Args:
        proj: Project dict from the /api/projects listing (requires "id";
            "title" and "task_number" are optional).
        url: Base Label Studio URL, without a trailing slash.
        headers: Request headers, e.g. {"Authorization": "Token ..."}.
        page_size: Tasks requested per page; also used to detect the last
            page when the API returns a bare list.

    Returns:
        Tuple ``(project_id, total_task_count, submitted_task_count, rows)``
        where ``rows`` is a list of flat dicts ready for a DataFrame.
    """
    pid = proj["id"]
    name = proj.get("title", f"Project {pid}")
    task_count = proj.get("task_number", 0)
    group = "Our Team" if pid in OUR_TEAM_PROJECT_IDS else "Others"
    rows = []
    submitted_count = 0  # tasks with at least one non-cancelled annotation
    page = 1
    while True:
        resp = requests.get(
            f"{url}/api/projects/{pid}/tasks",
            headers=headers,
            params={"page": page, "page_size": page_size},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        # The endpoint may return a bare list or a {"tasks": [...], "next": ...} dict.
        tasks = data if isinstance(data, list) else data.get("tasks", [])
        if not tasks:
            break
        for task in tasks:
            task_data = task.get("data", {})
            # Prefer an explicit word count; fall back to whitespace tokenization.
            words = task_data.get("words") or len(task_data.get("text", "").split())
            category = task_data.get("category")
            annots = [a for a in task.get("annotations", []) if not a.get("was_cancelled")]
            if not annots:
                rows.append(
                    {
                        "project_id": pid,
                        "project": name,
                        "project_group": group,
                        "date": None,
                        "state": "Not Annotated",
                        "words": int(words),
                        "category": category,
                    }
                )
                continue
            # Task has annotations - count as submitted
            submitted_count += 1
            ann = annots[0]  # only the first active annotation is inspected
            date = ann.get("created_at", "")[:10] or None
            # Extract the "text_rating" choice, if the annotator set one.
            rating = None
            for item in ann.get("result", []):
                if item.get("type") == "choices" and item.get("from_name") == "text_rating":
                    rating = item.get("value", {}).get("choices", [None])[0]
                    break
            has_entities = any(i.get("type") == "labels" for i in ann.get("result", []))
            if rating is None:
                state = "No Rating"
            elif rating == "Requires Attention":
                state = f"ReqAttn ({'entities' if has_entities else 'empty'})"
            elif rating == "Unacceptable":
                state = f"Unacceptable ({'entities' if has_entities else 'empty'})"
            else:
                state = "Acceptable"
            rows.append(
                {"project_id": pid, "project": name, "project_group": group, "date": date, "state": state, "words": int(words), "category": category}
            )
        # Stop at the last page: a short list page, or a dict response with no "next" link.
        if isinstance(data, list) and len(data) < page_size:
            break
        if isinstance(data, dict) and not data.get("next"):
            break
        page += 1
    return pid, task_count, submitted_count, rows
@st.cache_data(ttl=300)
def load_data(projects_hash):
    """Load annotation data from Label Studio with a per-project disk cache.

    Args:
        projects_hash: Hash of project states (task / submission counts),
            passed in solely so Streamlit's cache is invalidated whenever
            anything changes server-side.

    Returns:
        DataFrame with one row per task plus helper boolean columns
        ``is_annotated`` and ``is_goal_state``.
    """
    # Credentials: Streamlit secrets first, environment variables as fallback.
    try:
        url = st.secrets.get("LABEL_STUDIO_URL", os.getenv("LABEL_STUDIO_URL", "")).rstrip("/")
        key = st.secrets.get("LABEL_STUDIO_API_KEY", os.getenv("LABEL_STUDIO_API_KEY", ""))
    except (KeyError, FileNotFoundError, AttributeError):
        url = os.getenv("LABEL_STUDIO_URL", "").rstrip("/")
        key = os.getenv("LABEL_STUDIO_API_KEY", "")
    if not url or not key:
        st.error("Missing credentials. Set LABEL_STUDIO_URL and LABEL_STUDIO_API_KEY.")
        st.stop()
    headers = {"Authorization": f"Token {key}"}
    # Fetch the project listing.
    resp = requests.get(f"{url}/api/projects", headers=headers, timeout=30)
    resp.raise_for_status()
    projects = resp.json().get("results", [])
    # Load the on-disk cache. NOTE(review): pickle is acceptable only because
    # this file is produced locally by this app; never load untrusted pickles.
    cache = {}
    if CACHE_FILE.exists():
        try:
            with open(CACHE_FILE, "rb") as f:
                cache = pickle.load(f)
        except Exception:
            cache = {}  # corrupt or unreadable cache: refetch everything
    # Decide per project whether the cached rows are still valid. The cache is
    # reused only when BOTH the total task count and the submitted (annotated)
    # task count are unchanged; otherwise the project is refetched.
    projects_to_fetch = []
    all_rows = []
    for proj in projects:
        pid = proj["id"]
        task_count = proj.get("task_number", 0)
        # Submitted task count as reported by the Label Studio API.
        api_submitted_count = proj.get("num_tasks_with_annotations", 0)
        cached = cache.get(f"project_{pid}")
        use_cache = (
            cached is not None
            and cached.get("task_count") == task_count
            and cached.get("submitted_count") == api_submitted_count
        )
        if use_cache:
            all_rows.extend(cached["rows"])
        else:
            projects_to_fetch.append(proj)
    # Fetch stale/new projects in parallel.
    if projects_to_fetch:
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(fetch_project_data, proj, url, headers) for proj in projects_to_fetch]
            progress = st.progress(0, text=f"Loading {len(projects_to_fetch)} projects...")
            for i, future in enumerate(futures):
                pid, task_count, submitted_count, rows = future.result()
                all_rows.extend(rows)
                cache[f"project_{pid}"] = {"task_count": task_count, "submitted_count": submitted_count, "rows": rows}
                progress.progress((i + 1) / len(futures), text=f"Loaded {i + 1}/{len(futures)} projects")
            progress.empty()
    # Persist the cache; failure is non-fatal (read-only FS etc.).
    try:
        with open(CACHE_FILE, "wb") as f:
            pickle.dump(cache, f)
    except Exception:
        pass
    # Build the DataFrame. Guard the no-data case: pd.DataFrame([]) has no
    # columns, so the column accesses below would raise KeyError.
    df = pd.DataFrame(all_rows)
    if df.empty:
        df = pd.DataFrame(columns=["project_id", "project", "project_group", "date", "state", "words", "category"])
    df["words"] = df["words"].astype(int)
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["is_annotated"] = df["state"].isin(ANNOTATED_STATES)
    df["is_goal_state"] = df["state"].isin(GOAL_STATES)
    return df
def get_projects_hash():
    """Return an MD5 digest summarizing every project's annotation state.

    The digest changes whenever any project's id, total task count, or
    submitted-task count changes, so passing it to ``load_data`` invalidates
    Streamlit's cache exactly when the server-side data moved.

    Returns:
        Hex digest string, or ``"no-credentials"`` when the Label Studio
        URL/API key are not configured.
    """
    import hashlib  # local import mirrors the original; module top level stays unchanged
    try:
        url = st.secrets.get("LABEL_STUDIO_URL", os.getenv("LABEL_STUDIO_URL", "")).rstrip("/")
        key = st.secrets.get("LABEL_STUDIO_API_KEY", os.getenv("LABEL_STUDIO_API_KEY", ""))
    except (KeyError, FileNotFoundError, AttributeError):
        url = os.getenv("LABEL_STUDIO_URL", "").rstrip("/")
        key = os.getenv("LABEL_STUDIO_API_KEY", "")
    if not url or not key:
        return "no-credentials"
    headers = {"Authorization": f"Token {key}"}
    resp = requests.get(f"{url}/api/projects", headers=headers, timeout=30)
    resp.raise_for_status()
    projects = resp.json().get("results", [])
    # Join once instead of repeated += (avoids quadratic string building).
    # MD5 is fine here: the hash is a cache key, not a security boundary.
    state_string = "".join(
        f"{p['id']}:{p.get('task_number', 0)}:{p.get('num_tasks_with_annotations', 0)};"
        for p in projects
    )
    return hashlib.md5(state_string.encode()).hexdigest()
def anonymize(name):
    """Turn a project title like '26 [Name Lastname]' into initials: 'N.L. (26)'.

    'Others' passes through unchanged, as does any title that does not match
    the expected pattern or whose bracketed part has fewer than two words.
    """
    if name == "Others":
        return "Others"
    m = re.match(r"(\d+)\s+\[(.+?)\]", name)
    if m is None:
        return name
    num, full_name = m.group(1), m.group(2)
    words = full_name.split()
    if len(words) < 2:
        return name
    initials = f"{words[0][0]}.{words[-1][0]}."
    return f"{initials} ({num})"
st.title("📊 Annotation Progress Dashboard")
st.markdown("---")
# Load data. get_projects_hash() busts load_data's Streamlit cache whenever
# any project's task/submission counts changed server-side.
with st.spinner("Loading..."):
    projects_hash = get_projects_hash()
    df = load_data(projects_hash)
# Overview metrics: words in goal states vs. the 2.2M corpus target.
total = df[df["is_goal_state"]]["words"].sum()
remaining = GOAL_WORDS - total
progress = total / GOAL_WORDS * 100
col1, col2 = st.columns(2)
col1.metric("Progress toward 2.2M", f"{total:,}", f"{progress:.1f}%")
col2.metric("Remaining", f"{remaining:,}", f"{100 - progress:.1f}%")
st.markdown("---")
# Tabs
tab1, tab2 = st.tabs(["📊 Weekly Stats", "⏱️ Pacing"])
# ============== TAB 1: Weekly Stats ==============
with tab1:
    st.caption("Goal states (Acceptable + No Rating + ReqAttn with entities)")
    # Weeks before this date are collapsed into a single "Before ..." summary row.
    cutoff_date = pd.Timestamp("2025-12-22")
    # Filter data - use GOAL_STATES to match progress metrics
    df_week = df[df["is_goal_state"] & df["date"].notna()].copy()
    # Monday of each row's week (pandas dayofweek: Monday == 0).
    df_week["week_start"] = df_week["date"] - pd.to_timedelta(df_week["date"].dt.dayofweek, unit="d")
    # Our-team projects get anonymized initials; everything else pools into "Others".
    df_week["member"] = df_week.apply(lambda r: anonymize(r["project"]) if r["project_group"] == "Our Team" else "Others", axis=1)
    # Weekly word totals per member (rows: week start, columns: member).
    weekly_all = df_week.pivot_table(index="week_start", columns="member", values="words", aggfunc="sum", fill_value=0).astype(int)
    # Split into before and after cutoff
    weekly_before = weekly_all[weekly_all.index < cutoff_date]
    weekly_after = weekly_all[weekly_all.index >= cutoff_date]
    # Ensure consistent columns across both halves (fill missing members with 0).
    all_members = set(weekly_all.columns)
    if "Others" not in all_members:
        all_members.add("Others")
    for member in all_members:
        if member not in weekly_after.columns:
            weekly_after[member] = 0
        if member not in weekly_before.columns:
            weekly_before[member] = 0
    # Sort columns by total contribution (largest contributor first).
    totals = weekly_all.sum().sort_values(ascending=False)
    weekly_after = weekly_after[totals.index]
    weekly_after["Total"] = weekly_after.sum(axis=1)
    # Calculate "Before" summary row
    before_totals = weekly_before[totals.index].sum()
    before_totals["Total"] = before_totals.sum()
    # Format weekly data for display: "YYYY-MM-DD - YYYY-MM-DD" week labels.
    display = weekly_after.reset_index()
    display["Week"] = display["week_start"].dt.strftime("%Y-%m-%d") + " - " + (display["week_start"] + pd.Timedelta(days=6)).dt.strftime("%Y-%m-%d")
    display = display.drop("week_start", axis=1)
    display = display[["Week"] + list(totals.index) + ["Total"]]
    # Add "Before" row at the beginning
    before_row = pd.DataFrame([{"Week": f"Before {cutoff_date.strftime('%Y-%m-%d')}", **before_totals}])
    display = pd.concat([before_row, display], ignore_index=True)
    # Add TOTAL row at the end
    all_totals = weekly_all[totals.index].sum()
    all_totals["Total"] = all_totals.sum()
    total_row = pd.DataFrame([{"Week": "TOTAL", **all_totals}])
    display = pd.concat([display, total_row], ignore_index=True)
    # Format numbers with thousands separators (blank when missing).
    for col in display.columns:
        if col != "Week":
            display[col] = display[col].apply(lambda x: f"{int(x):,}" if pd.notna(x) else "")
    # Row styling: bold the TOTAL row, italicize the "Before" summary row.
    def style_row(row):
        if row["Week"] == "TOTAL":
            return ["font-weight: bold; background-color: #f0f0f0;"] * len(row)
        elif row["Week"].startswith("Before"):
            return ["font-style: italic; background-color: #f9f9f9;"] * len(row)
        return [""] * len(row)
    styled = display.style.apply(style_row, axis=1).set_properties(subset=["Total"], **{"font-weight": "bold"})
    st.dataframe(styled, hide_index=True, use_container_width=True)
# ============== TAB 2: Pacing ==============
with tab2:
    st.subheader("Category Breakdown")
    st.caption("Requirement: 1.1M words from each category")
    # Split by status: Ready vs Needs Fixing
    df_ready = df[df["is_annotated"]]  # Acceptable + No Rating
    df_needs_fixing = df[df["state"] == "ReqAttn (entities)"]
    df_total = df[df["is_goal_state"]]  # NOTE(review): unused below — candidate for removal
    # Word totals per category and status.
    mok_ready = df_ready[df_ready["category"] == "mokslinis"]["words"].sum()
    mok_fixing = df_needs_fixing[df_needs_fixing["category"] == "mokslinis"]["words"].sum()
    mok_total = mok_ready + mok_fixing
    zin_ready = df_ready[df_ready["category"] == "ziniasklaida"]["words"].sum()
    zin_fixing = df_needs_fixing[df_needs_fixing["category"] == "ziniasklaida"]["words"].sum()
    zin_total = zin_ready + zin_fixing
    total_ready = mok_ready + zin_ready
    total_fixing = mok_fixing + zin_fixing
    total_all = total_ready + total_fixing
    # Summary table: per-category progress against CATEGORY_GOAL, overall against GOAL_WORDS.
    cat_df = pd.DataFrame(
        {
            "Category": ["mokslinis", "ziniasklaida", "TOTAL"],
            "Ready": [f"{mok_ready:,}", f"{zin_ready:,}", f"{total_ready:,}"],
            "Needs Fixing": [f"{mok_fixing:,}", f"{zin_fixing:,}", f"{total_fixing:,}"],
            "Total": [f"{mok_total:,}", f"{zin_total:,}", f"{total_all:,}"],
            "Goal": [f"{CATEGORY_GOAL:,}", f"{CATEGORY_GOAL:,}", f"{GOAL_WORDS:,}"],
            "Progress": [
                f"{mok_total / CATEGORY_GOAL * 100:.1f}%",
                f"{zin_total / CATEGORY_GOAL * 100:.1f}%",
                f"{total_all / GOAL_WORDS * 100:.1f}%",
            ],
        }
    )
    st.dataframe(cat_df, hide_index=True, use_container_width=True)
    st.markdown("---")
    st.header("Cumulative Progress & Projection")
    # Cumulative word counts per member over time (goal states with a date only).
    df_cum = df[df["is_goal_state"] & df["date"].notna()].copy()
    df_cum["member"] = df_cum.apply(lambda r: anonymize(r["project"]) if r["project_group"] == "Our Team" else "Others", axis=1)
    daily = df_cum.groupby(["date", "member"])["words"].sum().reset_index()
    pivot = daily.pivot_table(index="date", columns="member", values="words", fill_value=0)
    cumulative = pivot.sort_index().cumsum()
    cumulative["Total"] = cumulative.sum(axis=1)
    # Chart window starts just before the project kickoff.
    cumulative = cumulative[cumulative.index >= pd.Timestamp("2025-12-18")]
    # Projection calculation.
    # NOTE(review): assumes `cumulative` is non-empty after the date filter —
    # the index access below raises IndexError otherwise; confirm upstream data.
    last_date = cumulative.index[-1]
    current = cumulative["Total"].iloc[-1]
    # Calculate the daily rate from the last 14 days of data.
    lookback = cumulative[cumulative.index >= last_date - pd.Timedelta(days=14)]
    if len(lookback) >= 2:
        days = (last_date - lookback.index[0]).days or 1  # avoid division by zero
        rate = (current - lookback["Total"].iloc[0]) / days
        days_left = (GOAL_WORDS - current) / rate if rate > 0 else 0
        completion = last_date + pd.Timedelta(days=days_left)
        weekly_rate = rate * 7
    else:
        rate = completion = weekly_rate = None  # not enough data to project
    # Chart
    fig = go.Figure()
    # Goal lines: midpoint and final target.
    fig.add_hline(y=1_100_000, line_dash="dot", line_color="orange", annotation_text="Midpoint: 1.1M", annotation_position="top left")
    fig.add_hline(y=GOAL_WORDS, line_dash="dot", line_color="red", annotation_text="Goal: 2.2M", annotation_position="top left")
    # One line per team member, ordered by current cumulative total (descending).
    members = [c for c in cumulative.columns if c not in ["Total", "Others"]]
    members = sorted(members, key=lambda x: cumulative[x].iloc[-1], reverse=True)
    if "Others" in cumulative.columns:
        fig.add_trace(
            go.Scatter(
                x=cumulative.index,
                y=cumulative["Others"],
                name=f"Others: {cumulative['Others'].iloc[-1]:,.0f}",
                mode="lines",
                line=dict(width=2, color="#7f8c8d"),
            )
        )
    for m in members:
        color = TEAM_COLORS.get(m, "#34495e")
        fig.add_trace(
            go.Scatter(x=cumulative.index, y=cumulative[m], name=f"{m}: {cumulative[m].iloc[-1]:,.0f}", mode="lines", line=dict(width=2, color=color))
        )
    # Total line with a soft fill underneath.
    fig.add_trace(
        go.Scatter(
            x=cumulative.index,
            y=cumulative["Total"],
            name=f"Total: {cumulative['Total'].iloc[-1]:,.0f}",
            mode="lines",
            line=dict(width=3, color="#d4af37"),
            fill="tozeroy",
            fillcolor="rgba(212, 175, 55, 0.1)",
        )
    )
    # Projection: dotted extrapolation at the current rate plus a diamond at the goal.
    if completion:
        proj_dates = pd.date_range(last_date, completion, freq="D")
        proj_vals = current + rate * (proj_dates - last_date).days
        fig.add_trace(
            go.Scatter(
                x=proj_dates, y=proj_vals, name=f"Projection ({int(weekly_rate):,}/wk)", mode="lines", line=dict(width=3, color="#d4af37", dash="dot")
            )
        )
        fig.add_trace(
            go.Scatter(
                x=[completion],
                y=[GOAL_WORDS],
                mode="markers+text",
                marker=dict(size=14, color="#d4af37", symbol="diamond"),
                text=[completion.strftime("%b %d")],
                textposition="top center",
                showlegend=False,
            )
        )
        title = f"Cumulative Progress → Est. {completion.strftime('%B %d, %Y')}"
    else:
        title = "Cumulative Progress"
    fig.update_layout(title=title, xaxis_title="Date", yaxis_title="Cumulative Words", height=600, hovermode="x unified", template="plotly_white")
    fig.update_yaxes(tickformat=".2s")
    st.plotly_chart(fig, use_container_width=True)
    # Pacing metrics: only shown when a projection could be computed.
    if completion:
        st.markdown("### Pacing Estimates")
        c1, c2, c3 = st.columns(3)
        c1.metric("Per Week Rate", f"{int(weekly_rate):,} words")
        c2.metric("Weeks Remaining", f"{days_left / 7:.1f} weeks")
        c3.metric("Est. Completion", completion.strftime("%Y-%m-%d"))
# Footer: local (Vilnius) timestamp of this render.
st.markdown("---")
st.caption(f"Updated: {pd.Timestamp.now(tz='Europe/Vilnius').strftime('%Y-%m-%d %H:%M:%S')} | Auto-refresh: 5 min | Press 'R' to refresh")