"""UI logic for the "Estadístiques" page.""" from __future__ import annotations import re from pathlib import Path from typing import Dict, List, Tuple import altair as alt import pandas as pd import streamlit as st import yaml from databases import ( get_audiodescription_history, get_feedback_rows_for_video_version, get_feedback_video_stats, get_videos_from_audiodescriptions, ) def render_statistics_page() -> None: st.header("Estadístiques") col1, col2 = st.columns(2) with col1: mode_label = st.selectbox( "Mode d'agregació", ["mitjana", "mediana", "inicial", "actual"], help=( "mitjana: mitjana de totes les valoracions; " "mediana: valor central; " "inicial: primer registre en el temps; " "actual: darrer registre en el temps." ), ) # Etiquetes humanes per als sis ítems (a partir de config.yaml -> labels) cfg_path = Path(__file__).resolve().parent.parent / "config.yaml" try: with cfg_path.open("r", encoding="utf-8") as f: cfg = yaml.safe_load(f) or {} except FileNotFoundError: cfg = {} labels_cfg = cfg.get("labels", {}) or {} raw_labels = [ labels_cfg.get("score_1", "score_1"), labels_cfg.get("score_2", "score_2"), labels_cfg.get("score_3", "score_3"), labels_cfg.get("score_4", "score_4"), labels_cfg.get("score_5", "score_5"), labels_cfg.get("score_6", "score_6"), ] label_map = {f"score_{i+1}": raw_labels[i] for i in range(6)} order_options = {"nom": "video_name"} for i in range(6): key = f"score_{i+1}" human = raw_labels[i] order_options[human] = key with col2: order_label = st.selectbox( "Ordenar per", list(order_options.keys()), help=( "Indica el camp pel qual s'ordenen els vídeos a la taula: " "nom del vídeo o alguna de les sis característiques d'avaluació." ), ) stats = get_feedback_video_stats(agg=mode_label) if not stats: st.caption("Encara no hi ha valoracions a demo/temp/feedback.db.") st.stop() df = pd.DataFrame(stats) # Ordenació segons el selector order_key = order_options[order_label] ascending = order_key == "video_name" df = df.sort_values(order_key, ascending=ascending, na_position="last") # Preparar taula per mostrar: seleccionar columnes i arrodonir valors numèrics display_cols = [ "video_name", "n", "score_1", "score_2", "score_3", "score_4", "score_5", "score_6", ] df_display = df[display_cols].copy() # Arrodonir scores a la unitat (0 decimals) score_cols = [c for c in display_cols if c.startswith("score_")] df_display[score_cols] = df_display[score_cols].round(0) st.subheader("Taula agregada per vídeo") st.dataframe( df_display.rename(columns=label_map), use_container_width=True, hide_index=True, ) st.markdown("---") st.subheader("Distribució temporal de l'audiodescripció i personatges") session_id = st.session_state.get("session_id", "") role = None if st.session_state.get("user") and isinstance(st.session_state.get("user"), dict): role = st.session_state["user"].get("role") accessible_rows = get_videos_from_audiodescriptions(session_id=session_id or None, role=role) if not accessible_rows: st.info("No hi ha cap vídeo amb audiodescripció disponible a audiodescriptions.db.") return video_row = st.selectbox( "Selecciona un vídeo per analitzar l'audiodescripció:", accessible_rows, format_func=lambda r: r["video_name"], ) sha1 = video_row["sha1sum"] selected_video_name = video_row["video_name"] version_options = ["Salamandra", "MoE"] version = st.selectbox("Versió d'audiodescripció:", version_options) hist_options = ["Original", "HITL OK", "HITL Test"] hist_key_suffix = f"{sha1}_{version or 'none'}" hist_choice = st.radio( "Edició d'audiodescripció a analitzar", hist_options, index=1, key=f"stats_ad_hist_choice_{hist_key_suffix}", horizontal=True, ) rows = get_audiodescription_history(sha1, version) if not rows: st.info("No s'ha trobat cap registre d'audiodescripció per a aquest vídeo i versió.") return row = rows[-1] if hist_choice == "Original": src_une = "une_ad" elif hist_choice == "HITL OK": src_une = "ok_une_ad" else: src_une = "test_une_ad" srt_text = row[src_une] if src_une in row.keys() and row[src_une] else "" if not srt_text: st.info("No hi ha contingut UNE-153010 per a l'opció seleccionada.") return ad_plus_time, ad_minus_time, character_times = _compute_time_distribution_from_srt(srt_text) labels: List[str] = [] values: List[float] = [] if ad_plus_time > 0: labels.append("AD+") values.append(ad_plus_time) if ad_minus_time > 0: labels.append("AD-") values.append(ad_minus_time) for name, t in character_times.items(): if t > 0: labels.append(name) values.append(t) if not labels: st.info("No s'ha pogut calcular cap distribució temporal a partir del SRT proporcionat.") return total_time = sum(values) legend_labels: List[str] = [] label_seconds: List[float] = [] label_pct: List[float] = [] for label, val in zip(labels, values): pct = (val / total_time * 100.0) if total_time > 0 else 0.0 legend_labels.append(f"{label} ({val:.1f} s, {pct:.1f}%)") label_seconds.append(val) label_pct.append(pct) pie_df = pd.DataFrame( { "label": labels, "seconds": label_seconds, "percent": label_pct, "legend": legend_labels, } ) pie_chart = ( alt.Chart(pie_df) .mark_arc() .encode( theta=alt.Theta("seconds", stack=True), color=alt.Color("label", legend=alt.Legend(title="Categoria")), tooltip=[ alt.Tooltip("label", title="Categoria"), alt.Tooltip("seconds", title="Temps (s)", format=".1f"), alt.Tooltip("percent", title="Percentatge", format=".1f"), ], ) ) st.altair_chart(pie_chart, use_container_width=True) total_duration, segments = _compute_timeline_segments_from_srt(srt_text) if total_duration <= 0 or not segments: return y_order: List[str] = [] for base in ["AD-", "AD+"]: if base in labels and base not in y_order: y_order.append(base) for label in labels: if label in {"AD-", "AD+"}: continue if label not in y_order: y_order.append(label) seg_df = pd.DataFrame( [ { "category": cat, "start_pct": (start_s / total_duration) * 100.0, "end_pct": (end_s / total_duration) * 100.0, } for start_s, end_s, cat in segments if cat in y_order and end_s > start_s ] ) if seg_df.empty: return # Ordenació explícita de categories per l'eix Y category_scale = alt.Scale(domain=y_order) timeline_chart = ( alt.Chart(seg_df) .mark_bar(size=8) .encode( x=alt.X("start_pct", title="Percentatge de durada del vídeo (%)", scale=alt.Scale(domain=[0, 100])), x2="end_pct", y=alt.Y("category", title="", scale=category_scale), color=alt.Color("category", legend=None), ) ) st.altair_chart(timeline_chart, use_container_width=True) rows_fb = get_feedback_rows_for_video_version(selected_video_name, version) if not rows_fb: return df_fb = pd.DataFrame([dict(r) for r in rows_fb]) if df_fb.empty: return st.markdown("---") st.subheader("Distribució de les valoracions per a aquest vídeo i versió") score_cols = ["score_1", "score_2", "score_3", "score_4", "score_5", "score_6"] # Convertir a format llarg per poder fer un facet amb Altair long_df_rows: List[Dict[str, float | str]] = [] for col in score_cols: if col not in df_fb.columns: continue for val in df_fb[col].dropna().tolist(): long_df_rows.append( { "score_name": col, "score_label": label_map.get(col, col), "value": float(val), } ) if not long_df_rows: return long_df = pd.DataFrame(long_df_rows) box_chart = ( alt.Chart(long_df) .mark_boxplot() .encode( y=alt.Y("value", title="Score (0-100)", scale=alt.Scale(domain=[0, 100])), # Eix X només s'utilitza per separar facetes; amaguem etiquetes i ticks x=alt.X("score_label:N", axis=alt.Axis(title=None, labels=False, ticks=False)), ) .properties(width=130, height=150) .facet( column=alt.Column("score_label:N", title=None, header=alt.Header(labelAngle=0)), ) .resolve_scale(y="shared") ) # Configurar 2 files x 3 columnes aprox. mitjançant amplada i wrapping box_chart = box_chart.configure_facet(columns=3) st.altair_chart(box_chart, use_container_width=True) _SRT_TS = re.compile( r"(?P

\d{2}):(?P\d{2}):(?P\d{2}),(?P\d{3})\s*-->\s*" r"(?P

\d{2}):(?P\d{2}):(?P\d{2}),(?P\d{3})" ) def _ts_to_seconds(h: str, m: str, s: str, ms: str) -> float: return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000.0 def _split_srt_blocks(srt_text: str) -> List[str]: text = srt_text.replace("\r\n", "\n").replace("\r", "\n") return [b.strip() for b in re.split(r"\n\s*\n", text) if b.strip()] def _parse_block_time_and_lines(block: str) -> Tuple[Tuple[float, float] | None, List[str]]: lines = block.split("\n") if len(lines) < 2: return None, [] ts_line = lines[1].strip() m = _SRT_TS.match(ts_line) if not m: return None, [] start_s = _ts_to_seconds(m["h1"], m["m1"], m["s1"], m["ms1"]) end_s = _ts_to_seconds(m["h2"], m["m2"], m["s2"], m["ms2"]) return (start_s, end_s), [l.strip() for l in lines[2:] if l.strip()] def _compute_time_distribution_from_srt(srt_text: str) -> Tuple[float, float, Dict[str, float]]: ad_plus = 0.0 ad_minus = 0.0 character_times: Dict[str, float] = {} blocks = _split_srt_blocks(srt_text) for block in blocks: time_info, text_lines = _parse_block_time_and_lines(block) if not time_info or not text_lines: continue start_s, end_s = time_info duration = max(0.0, end_s - start_s) if duration <= 0: continue first_line = text_lines[0] if first_line.startswith("(AD)"): ad_text_parts: List[str] = [] for line in text_lines: if line.startswith("(AD)"): ad_text_parts.append(line[len("(AD)") :].lstrip()) ad_text = " ".join(ad_text_parts).strip() if ad_text: num_words = len(re.findall(r"\w+", ad_text, flags=re.UNICODE)) else: num_words = 0 spoken_time = num_words / 2.5 if num_words > 0 else 0.0 if spoken_time >= duration: ad_plus += duration else: ad_plus += spoken_time ad_minus += duration - spoken_time continue m_char = re.match(r"^([A-ZÁÉÍÓÚÀÈÌÒÙÇÜÑ][^:]{0,40}):", first_line) if m_char: name = m_char.group(1).strip() character_times[name] = character_times.get(name, 0.0) + duration return ad_plus, ad_minus, character_times def _compute_timeline_segments_from_srt(srt_text: str) -> Tuple[float, List[Tuple[float, float, str]]]: total_duration = 0.0 segments: List[Tuple[float, float, str]] = [] blocks = _split_srt_blocks(srt_text) for block in blocks: time_info, text_lines = _parse_block_time_and_lines(block) if not time_info or not text_lines: continue start_s, end_s = time_info duration = max(0.0, end_s - start_s) if duration <= 0: continue if end_s > total_duration: total_duration = end_s first_line = text_lines[0] if first_line.startswith("(AD)"): ad_text_parts: List[str] = [] for line in text_lines: if line.startswith("(AD)"): ad_text_parts.append(line[len("(AD)") :].lstrip()) ad_text = " ".join(ad_text_parts).strip() if ad_text: num_words = len(re.findall(r"\w+", ad_text, flags=re.UNICODE)) else: num_words = 0 spoken_time = num_words / 2.5 if num_words > 0 else 0.0 spoken_time = min(spoken_time, duration) if spoken_time > 0: segments.append((start_s, start_s + spoken_time, "AD+")) rest = duration - spoken_time if rest > 0: segments.append((start_s + spoken_time, end_s, "AD-")) continue m_char = re.match(r"^([A-ZÁÉÍÓÚÀÈÌÒÙÇÜÑ][^:]{0,40}):", first_line) if m_char: name = m_char.group(1).strip() segments.append((start_s, end_s, name)) return total_duration, segments