demo / page_modules /statistics.py
VeuReu's picture
Upload 6 files
dea50db verified
"""UI logic for the "Estadístiques" page."""
from __future__ import annotations
import re
from pathlib import Path
from typing import Dict, List, Tuple
import altair as alt
import pandas as pd
import streamlit as st
import yaml
from databases import (
get_audiodescription_history,
get_feedback_rows_for_video_version,
get_feedback_video_stats,
get_videos_from_audiodescriptions,
)
# Column names of the six evaluation scores, in display order.
_SCORE_KEYS: Tuple[str, ...] = tuple(f"score_{i}" for i in range(1, 7))


def render_statistics_page() -> None:
    """Render the "Estadístiques" (statistics) page.

    The page is drawn top to bottom in three sections:

    1. A table of feedback scores aggregated per video, with selectors for
       the aggregation mode and the sort column.
    2. For a user-selected video, audio-description version and edition: a
       pie chart of how time is split between spoken audio description
       ("AD+"), unused audio-description time ("AD-") and each character,
       followed by a timeline of the same categories.
    3. Box plots of the individual feedback scores for that video/version.

    Each section stops the page early (after an informational message where
    the original showed one) when the data it needs is missing.
    """
    st.header("Estadístiques")

    label_map = _load_score_labels()
    _render_aggregate_table(label_map)

    st.markdown("---")
    st.subheader("Distribució temporal de l'audiodescripció i personatges")

    selection = _render_ad_time_charts()
    if selection is None:
        return
    video_name, version = selection
    _render_feedback_boxplots(video_name, version, label_map)


def _load_score_labels() -> Dict[str, str]:
    """Return a ``score_N -> human label`` map read from ``config.yaml``.

    The file is looked up one directory above this module's directory and
    its ``labels`` mapping is used. A missing file, a missing/invalid
    ``labels`` section or an empty label all fall back to the raw
    ``score_N`` key.
    """
    cfg_path = Path(__file__).resolve().parent.parent / "config.yaml"
    try:
        with cfg_path.open("r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
    except FileNotFoundError:
        cfg = {}

    # Guard against a config whose top level or "labels" entry is not a mapping.
    labels_cfg = cfg.get("labels") if isinstance(cfg, dict) else None
    if not isinstance(labels_cfg, dict):
        labels_cfg = {}
    return {key: labels_cfg.get(key) or key for key in _SCORE_KEYS}


def _render_aggregate_table(label_map: Dict[str, str]) -> None:
    """Draw the aggregation/sort selectors and the per-video score table.

    Calls ``st.stop()`` when there is no feedback at all, which ends the
    current Streamlit script run (nothing after this section is rendered).
    """
    col1, col2 = st.columns(2)

    with col1:
        mode_label = st.selectbox(
            "Mode d'agregació",
            ["mitjana", "mediana", "inicial", "actual"],
            help=(
                "mitjana: mitjana de totes les valoracions; "
                "mediana: valor central; "
                "inicial: primer registre en el temps; "
                "actual: darrer registre en el temps."
            ),
        )

    # Sort selector: option text shown to the user -> DataFrame column.
    order_options = {"nom": "video_name"}
    order_options.update({label_map[key]: key for key in _SCORE_KEYS})

    with col2:
        order_label = st.selectbox(
            "Ordenar per",
            list(order_options),
            help=(
                "Indica el camp pel qual s'ordenen els vídeos a la taula: "
                "nom del vídeo o alguna de les sis característiques d'avaluació."
            ),
        )

    stats = get_feedback_video_stats(agg=mode_label)
    if not stats:
        st.caption("Encara no hi ha valoracions a demo/temp/feedback.db.")
        # Kept from the original: halts the whole script run, not just this section.
        st.stop()

    df = pd.DataFrame(stats)

    # Names sort A-Z; scores sort highest first. Missing values go last.
    order_key = order_options[order_label]
    ascending = order_key == "video_name"
    df = df.sort_values(order_key, ascending=ascending, na_position="last")

    score_cols = list(_SCORE_KEYS)
    df_display = df[["video_name", "n", *score_cols]].copy()
    # Scores are shown rounded to whole units.
    df_display[score_cols] = df_display[score_cols].round(0)

    st.subheader("Taula agregada per vídeo")
    st.dataframe(
        df_display.rename(columns=label_map),
        use_container_width=True,
        hide_index=True,
    )


def _render_ad_time_charts() -> Tuple[str, str] | None:
    """Draw the video/version/edition selectors, the pie chart and the timeline.

    Returns:
        ``(video_name, version)`` of the current selection when both charts
        were drawn, or ``None`` when rendering stopped early (no accessible
        video, no stored audio description, empty SRT text, or nothing
        computable from it).
    """
    session_id = st.session_state.get("session_id", "")
    user = st.session_state.get("user")
    role = user.get("role") if user and isinstance(user, dict) else None

    accessible_rows = get_videos_from_audiodescriptions(session_id=session_id or None, role=role)
    if not accessible_rows:
        st.info("No hi ha cap vídeo amb audiodescripció disponible a audiodescriptions.db.")
        return None

    video_row = st.selectbox(
        "Selecciona un vídeo per analitzar l'audiodescripció:",
        accessible_rows,
        format_func=lambda r: r["video_name"],
    )
    sha1 = video_row["sha1sum"]
    selected_video_name = video_row["video_name"]

    version = st.selectbox("Versió d'audiodescripció:", ["Salamandra", "MoE"])

    # The radio key includes video and version so each pair keeps its own choice.
    hist_key_suffix = f"{sha1}_{version or 'none'}"
    hist_choice = st.radio(
        "Edició d'audiodescripció a analitzar",
        ["Original", "HITL OK", "HITL Test"],
        index=1,
        key=f"stats_ad_hist_choice_{hist_key_suffix}",
        horizontal=True,
    )

    rows = get_audiodescription_history(sha1, version)
    if not rows:
        st.info("No s'ha trobat cap registre d'audiodescripció per a aquest vídeo i versió.")
        return None

    # Only the last returned row is analysed.
    # NOTE(review): presumably the most recent record - confirm the ordering
    # guaranteed by get_audiodescription_history.
    row = rows[-1]
    src_une = {"Original": "une_ad", "HITL OK": "ok_une_ad"}.get(hist_choice, "test_une_ad")
    srt_text = row[src_une] if src_une in row.keys() and row[src_une] else ""
    if not srt_text:
        st.info("No hi ha contingut UNE-153010 per a l'opció seleccionada.")
        return None

    # ---- Pie chart: share of time per category -------------------------
    ad_plus_time, ad_minus_time, character_times = _compute_time_distribution_from_srt(srt_text)
    slices = [
        (name, seconds)
        for name, seconds in [
            ("AD+", ad_plus_time),
            ("AD-", ad_minus_time),
            *character_times.items(),
        ]
        if seconds > 0
    ]
    if not slices:
        st.info("No s'ha pogut calcular cap distribució temporal a partir del SRT proporcionat.")
        return None

    labels: List[str] = [name for name, _ in slices]
    # Every kept slice is strictly positive, so the total is non-zero.
    total_time = sum(seconds for _, seconds in slices)

    pie_rows = []
    for name, seconds in slices:
        pct = seconds / total_time * 100.0
        pie_rows.append(
            {
                "label": name,
                "seconds": seconds,
                "percent": pct,
                "legend": f"{name} ({seconds:.1f} s, {pct:.1f}%)",
            }
        )
    pie_df = pd.DataFrame(pie_rows)

    pie_chart = (
        alt.Chart(pie_df)
        .mark_arc()
        .encode(
            theta=alt.Theta("seconds", stack=True),
            color=alt.Color("label", legend=alt.Legend(title="Categoria")),
            tooltip=[
                alt.Tooltip("label", title="Categoria"),
                alt.Tooltip("seconds", title="Temps (s)", format=".1f"),
                alt.Tooltip("percent", title="Percentatge", format=".1f"),
            ],
        )
    )
    st.altair_chart(pie_chart, use_container_width=True)

    # ---- Timeline: where each category occurs, as % of total duration --
    total_duration, segments = _compute_timeline_segments_from_srt(srt_text)
    if total_duration <= 0 or not segments:
        return None

    # Y axis order: audio-description rows first, then characters in pie order.
    ad_rows = ("AD-", "AD+")
    y_order: List[str] = [name for name in ad_rows if name in labels]
    y_order += [name for name in labels if name not in ad_rows]

    seg_df = pd.DataFrame(
        [
            {
                "category": cat,
                "start_pct": (start_s / total_duration) * 100.0,
                "end_pct": (end_s / total_duration) * 100.0,
            }
            for start_s, end_s, cat in segments
            if cat in y_order and end_s > start_s
        ]
    )
    if seg_df.empty:
        return None

    timeline_chart = (
        alt.Chart(seg_df)
        .mark_bar(size=8)
        .encode(
            x=alt.X(
                "start_pct",
                title="Percentatge de durada del vídeo (%)",
                scale=alt.Scale(domain=[0, 100]),
            ),
            x2="end_pct",
            y=alt.Y("category", title="", scale=alt.Scale(domain=y_order)),
            color=alt.Color("category", legend=None),
        )
    )
    st.altair_chart(timeline_chart, use_container_width=True)

    return selected_video_name, version


def _render_feedback_boxplots(video_name: str, version: str, label_map: Dict[str, str]) -> None:
    """Draw one box plot per score for the feedback of a video/version.

    Nothing is drawn when there is no feedback; the section heading is
    drawn but no chart when every score value is missing.
    """
    rows_fb = get_feedback_rows_for_video_version(video_name, version)
    if not rows_fb:
        return
    df_fb = pd.DataFrame([dict(r) for r in rows_fb])
    if df_fb.empty:
        return

    st.markdown("---")
    st.subheader("Distribució de les valoracions per a aquest vídeo i versió")

    # Long format (one row per individual score value) so Altair can facet by score.
    long_rows: List[Dict[str, float | str]] = []
    for col in _SCORE_KEYS:
        if col not in df_fb.columns:
            continue
        score_label = label_map.get(col, col)
        long_rows.extend(
            {"score_name": col, "score_label": score_label, "value": float(val)}
            for val in df_fb[col].dropna().tolist()
        )
    if not long_rows:
        return
    long_df = pd.DataFrame(long_rows)

    box_chart = (
        alt.Chart(long_df)
        .mark_boxplot()
        .encode(
            y=alt.Y("value", title="Score (0-100)", scale=alt.Scale(domain=[0, 100])),
            # X only separates the facets; its labels and ticks are hidden.
            x=alt.X("score_label:N", axis=alt.Axis(title=None, labels=False, ticks=False)),
        )
        .properties(width=130, height=150)
        # A wrapped facet (facet= plus columns=) gives the intended ~2x3 grid.
        # The previous column= facet ignores any `columns` setting and put
        # all six plots in a single row.
        .facet(
            facet=alt.Facet("score_label:N", title=None, header=alt.Header(labelAngle=0)),
            columns=3,
        )
        .resolve_scale(y="shared")
    )
    st.altair_chart(box_chart, use_container_width=True)
_SRT_TS = re.compile(
r"(?P<h1>\d{2}):(?P<m1>\d{2}):(?P<s1>\d{2}),(?P<ms1>\d{3})\s*-->\s*"
r"(?P<h2>\d{2}):(?P<m2>\d{2}):(?P<s2>\d{2}),(?P<ms2>\d{3})"
)
def _ts_to_seconds(h: str, m: str, s: str, ms: str) -> float:
return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000.0
def _split_srt_blocks(srt_text: str) -> List[str]:
text = srt_text.replace("\r\n", "\n").replace("\r", "\n")
return [b.strip() for b in re.split(r"\n\s*\n", text) if b.strip()]
def _parse_block_time_and_lines(block: str) -> Tuple[Tuple[float, float] | None, List[str]]:
    """Extract the time range and the text lines of one SRT block.

    The timing line is expected on the second line (after the numeric cue
    index). Blocks that omit the index and start directly with the timing
    line are accepted as well instead of being dropped.

    Args:
        block: One SRT block with "\\n" line endings.

    Returns:
        ``((start_seconds, end_seconds), text_lines)`` where ``text_lines``
        are the stripped, non-empty lines after the timing line; or
        ``(None, [])`` when no timing line is found.
    """
    lines = block.split("\n")

    # Try the standard position first so every block that parsed before
    # still parses exactly the same way; then fall back to the first line.
    match = None
    ts_index = -1
    for candidate in (1, 0):
        if candidate >= len(lines):
            continue
        match = _SRT_TS.match(lines[candidate].strip())
        if match:
            ts_index = candidate
            break
    if match is None:
        return None, []

    start_s = _ts_to_seconds(
        match.group("h1"), match.group("m1"), match.group("s1"), match.group("ms1")
    )
    end_s = _ts_to_seconds(
        match.group("h2"), match.group("m2"), match.group("s2"), match.group("ms2")
    )
    text_lines = [line.strip() for line in lines[ts_index + 1:] if line.strip()]
    return (start_s, end_s), text_lines
def _compute_time_distribution_from_srt(
    srt_text: str, words_per_second: float = 2.5
) -> Tuple[float, float, Dict[str, float]]:
    """Total up how an SRT's time is split between AD and characters.

    Each block with a valid, positive-length time range is classified by
    its first text line:

    * Starts with "(AD)": audio description. The text of every "(AD)" line
      in the block is joined and its word count divided by
      ``words_per_second`` to estimate speaking time. The estimate, capped
      at the block duration, is added to "AD+"; the remainder of the block
      is added to "AD-".
    * Looks like "Name: ...", where the name starts with an upper-case
      letter (accented capitals included) and runs up to 41 characters
      before the colon: the whole block duration is added to that name.
    * Anything else is ignored.

    Args:
        srt_text: Full SRT text.
        words_per_second: Assumed speaking rate for audio description.

    Returns:
        ``(ad_plus_seconds, ad_minus_seconds, {character_name: seconds})``.

    Raises:
        ValueError: If ``words_per_second`` is not positive.
    """
    if words_per_second <= 0:
        raise ValueError("words_per_second must be positive")

    ad_tag = "(AD)"
    # Compiled once; accented capitals are listed explicitly so names such
    # as "Àngel" or "Óscar" are recognised as speakers.
    speaker_re = re.compile(r"^([A-ZÁÉÍÓÚÀÈÌÒÙÇÜÑ][^:]{0,40}):")

    ad_plus = 0.0
    ad_minus = 0.0
    character_times: Dict[str, float] = {}

    for block in _split_srt_blocks(srt_text):
        time_info, text_lines = _parse_block_time_and_lines(block)
        if time_info is None or not text_lines:
            continue

        start_s, end_s = time_info
        duration = end_s - start_s
        if duration <= 0:
            continue

        first_line = text_lines[0]
        if first_line.startswith(ad_tag):
            ad_text = " ".join(
                line[len(ad_tag):].lstrip() for line in text_lines if line.startswith(ad_tag)
            )
            num_words = len(re.findall(r"\w+", ad_text))
            spoken_time = min(num_words / words_per_second, duration)
            ad_plus += spoken_time
            ad_minus += duration - spoken_time
            continue

        speaker = speaker_re.match(first_line)
        if speaker:
            name = speaker.group(1).strip()
            character_times[name] = character_times.get(name, 0.0) + duration

    return ad_plus, ad_minus, character_times
def _compute_timeline_segments_from_srt(
    srt_text: str, words_per_second: float = 2.5
) -> Tuple[float, List[Tuple[float, float, str]]]:
    """Build timeline segments (start, end, category) from SRT text.

    Each block with a valid, positive-length time range is classified by
    its first text line:

    * Starts with "(AD)": the text of every "(AD)" line in the block is
      joined and its word count divided by ``words_per_second`` to estimate
      speaking time, capped at the block duration. That opening portion
      becomes an "AD+" segment and whatever is left becomes an "AD-"
      segment.
    * Looks like "Name: ...", where the name starts with an upper-case
      letter (accented capitals included) and runs up to 41 characters
      before the colon: one segment labelled with that name.
    * Anything else yields no segment.

    Args:
        srt_text: Full SRT text.
        words_per_second: Assumed speaking rate for audio description.

    Returns:
        ``(total_duration, segments)``. ``total_duration`` is the largest
        end time among all valid blocks, including blocks that yield no
        segment. ``segments`` is a list of
        ``(start_seconds, end_seconds, category)`` in block order.

    Raises:
        ValueError: If ``words_per_second`` is not positive.
    """
    if words_per_second <= 0:
        raise ValueError("words_per_second must be positive")

    ad_tag = "(AD)"
    # Compiled once; accented capitals are listed explicitly so names such
    # as "Àngel" or "Óscar" are recognised as speakers.
    speaker_re = re.compile(r"^([A-ZÁÉÍÓÚÀÈÌÒÙÇÜÑ][^:]{0,40}):")

    total_duration = 0.0
    segments: List[Tuple[float, float, str]] = []

    for block in _split_srt_blocks(srt_text):
        time_info, text_lines = _parse_block_time_and_lines(block)
        if time_info is None or not text_lines:
            continue

        start_s, end_s = time_info
        duration = end_s - start_s
        if duration <= 0:
            continue

        total_duration = max(total_duration, end_s)

        first_line = text_lines[0]
        if first_line.startswith(ad_tag):
            ad_text = " ".join(
                line[len(ad_tag):].lstrip() for line in text_lines if line.startswith(ad_tag)
            )
            num_words = len(re.findall(r"\w+", ad_text))
            spoken_time = min(num_words / words_per_second, duration)
            if spoken_time > 0:
                segments.append((start_s, start_s + spoken_time, "AD+"))
            if duration - spoken_time > 0:
                segments.append((start_s + spoken_time, end_s, "AD-"))
            continue

        speaker = speaker_re.match(first_line)
        if speaker:
            segments.append((start_s, end_s, speaker.group(1).strip()))

    return total_duration, segments