|
|
"""UI logic for the "Validació" page."""
|
|
|
|
|
|
from __future__ import annotations

import io
import os
import shutil
import sys
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict

import requests
import streamlit as st

from databases import (
    get_accessible_videos_with_sha1,
    get_audiodescription,
    get_audiodescription_history,
    update_audiodescription_text,
    update_audiodescription_info_ad,
    log_action,
    update_video_status,
    get_video_owner_by_sha1,
    get_videos_by_status,
    insert_action,
)
from persistent_data_gate import _load_data_origin
|
|
|
|
|
|
|
|
|
def _log(msg: str) -> None:
    """Write ``msg`` to stderr prefixed with a ``[YYYY-MM-DD HH:MM:SS]`` timestamp.

    The timestamp is the naive local time returned by ``datetime.now()``.
    A newline is appended and the stream is flushed right away so the line
    is not held back by buffering. (The original docstring states this helper
    is kept consistent with the one in ``auth.py``.)

    Args:
        msg: Text to log; it is interpolated as-is after the timestamp.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    sys.stderr.write(f"[{ts}] {msg}\n")
    sys.stderr.flush()
|
|
|
|
|
|
|
|
|
def render_validation_page(
    compliance_client,
    runtime_videos: Path,
    permissions: Dict[str, bool],
    username: str,
) -> None:
    """Render the Streamlit "Validació" page.

    The page has two tabs:

    * **Videos** - lists uploaded videos awaiting validation (from the local
      ``temp/pending_videos`` folder when the data origin is ``"internal"``,
      otherwise from ``st.session_state["api_client"]``) and lets the
      validator accept or reject the selected one.
    * **Audio descriptions** - lists videos whose status is ``"UNE-pending"``,
      lets the validator edit the UNE SRT and the free narration of the
      selected version, and accept them.

    Args:
        compliance_client: Object exposing ``record_validator_decision(...)``
            and ``notify_user_video_approved(...)``; only those two methods
            are used here.
        runtime_videos: Not used by this function; kept for interface
            compatibility with callers.
        permissions: Permission flags; the page is only shown when
            ``permissions["validar"]`` is truthy.
        username: Validator name, used in compliance records and action logs.

    Returns:
        None. All output is produced through Streamlit calls.
    """
    if not permissions.get("validar", False):
        st.warning("⚠️ No tens permisos per accedir a aquesta secció de validació.")
        st.stop()

    st.header("🔍 Validació de Vídeos")

    tab_videos, tab_ads = st.tabs(["📹 Validar Vídeos", "🎬 Validar Audiodescripcions"])

    base_dir = Path(__file__).resolve().parent.parent
    data_origin = _load_data_origin(base_dir)

    # Optional feature flag read from <base_dir>/config.yaml:
    #   validation.user_sms_enabled -> notify the video owner by SMS on approval.
    config_path = base_dir / "config.yaml"
    user_sms_enabled = False
    try:
        if config_path.exists():
            import yaml

            with config_path.open("r", encoding="utf-8") as f:
                cfg = yaml.safe_load(f) or {}
            validation_cfg = cfg.get("validation", {}) or {}
            user_sms_enabled = bool(validation_cfg.get("user_sms_enabled", False))
    except Exception as e_cfg:
        # A broken config must not take the page down; fall back to "disabled".
        _log(f"[validation] Error llegint config.yaml: {e_cfg}")
        user_sms_enabled = False

    session_id = st.session_state.get("session_id")
    # NOTE(review): the result is not used anywhere below; the call is kept
    # only because its side effects (if any) are not visible from this file.
    accessible_rows = (  # noqa: F841
        get_accessible_videos_with_sha1(session_id) if data_origin == "internal" else []
    )

    base_media_dir = base_dir / "temp" / "media"
    pending_root = base_dir / "temp" / "pending_videos"

    # ------------------------------------------------------------------
    # Tab 1: uploaded videos pending validation
    # ------------------------------------------------------------------
    with tab_videos:
        st.subheader("📹 Validar Vídeos Pujats")

        video_folders = []

        col_refresh_list, _ = st.columns([1, 3])
        with col_refresh_list:
            if st.button("🔄 Actualitzar llista de vídeos pendents", key="refresh_pending_videos_list"):
                st.rerun()

        if data_origin == "internal":
            # Internal mode: every sub-folder of pending_root is named after
            # the video's sha1 and holds the uploaded file(s).
            if pending_root.exists() and pending_root.is_dir():
                for folder in sorted(pending_root.iterdir()):
                    if not folder.is_dir():
                        continue
                    sha1 = folder.name
                    video_files = _list_video_files(folder)
                    if not video_files:
                        continue
                    mod_time = folder.stat().st_mtime
                    fecha = datetime.fromtimestamp(mod_time).strftime("%Y-%m-%d %H:%M")
                    video_folders.append(
                        {
                            "sha1sum": sha1,
                            "video_name": sha1,
                            "path": str(folder),
                            "created_at": fecha,
                            "video_files": video_files,
                        }
                    )
        else:
            # External mode: ask the backend API for the pending list.
            api_client = st.session_state.get("api_client")
            if api_client is not None:
                try:
                    resp = api_client.list_pending_videos()
                    _log(f"[pending_videos] list_pending_videos raw resp type= {type(resp)}")
                    _log(f"[pending_videos] list_pending_videos raw resp content= {repr(resp)}")
                except Exception as e_list:
                    _log(f"[pending_videos] Error cridant list_pending_videos: {e_list}")
                    resp = {"error": "exception"}

                pending_list = _extract_pending_list(resp)

                _log(f"[pending_videos] parsed pending_list length= {len(pending_list)}")
                if pending_list:
                    _log(f"[pending_videos] first items: {pending_list[:3]}")

                for item in pending_list:
                    if not isinstance(item, dict):
                        # Malformed entry; skip instead of crashing the page.
                        continue
                    sha1 = item.get("sha1") or item.get("video_hash") or item.get("id")
                    if not sha1:
                        continue
                    # The id is used as a folder name below, so force a string.
                    sha1 = str(sha1)
                    video_name = item.get("latest_video") or sha1

                    folder = pending_root / sha1
                    video_files = _list_video_files(folder)
                    created_at = item.get("created_at") or datetime.now(timezone.utc).strftime(
                        "%Y-%m-%d %H:%M"
                    )
                    video_folders.append(
                        {
                            "sha1sum": sha1,
                            "video_name": video_name,
                            "path": str(folder),
                            "created_at": created_at,
                            "video_files": video_files,
                        }
                    )

        if not video_folders:
            st.info("📝 No hi ha vídeos pujats pendents de validació.")
        else:
            opciones_video = [f"{video['video_name']} - {video['created_at']}" for video in video_folders]
            seleccion = st.selectbox(
                "Selecciona un vídeo per validar:",
                opciones_video,
                index=0,
            )

            if seleccion:
                indice = opciones_video.index(seleccion)
                video_seleccionat = video_folders[indice]

                col1, col2 = st.columns([2, 1])

                with col1:
                    st.markdown("### 📹 Informació del Vídeo")
                    st.markdown(f"**Nom:** {video_seleccionat['video_name']}")
                    st.markdown(f"**Data:** {video_seleccionat['created_at']}")
                    st.markdown(f"**Arxius:** {len(video_seleccionat['video_files'])} vídeos trobats")

                    # External mode: fetch the video on demand when there is
                    # no local copy yet, and cache it under pending_root.
                    if data_origin == "external" and not video_seleccionat["video_files"]:
                        api_client = st.session_state.get("api_client")
                        if api_client is not None:
                            try:
                                resp = api_client.download_pending_video(video_seleccionat["sha1sum"])
                            except Exception as e_dl:
                                _log(f"[pending_videos] Error cridant download_pending_video: {e_dl}")
                                resp = {"error": "exception"}

                            video_bytes = resp.get("video_bytes") if isinstance(resp, dict) else None
                            if video_bytes:
                                local_folder = pending_root / video_seleccionat["sha1sum"]
                                local_folder.mkdir(parents=True, exist_ok=True)
                                local_path = local_folder / "video.mp4"
                                with local_path.open("wb") as f:
                                    f.write(video_bytes)
                                video_seleccionat["video_files"] = [local_path]

                    if video_seleccionat["video_files"]:
                        video_path = str(video_seleccionat["video_files"][0])
                        st.markdown("**Vídeo principal:**")
                        st.video(video_path)
                    else:
                        st.warning("⚠️ No s'han trobat arxius de vídeo.")

                with col2:
                    st.markdown("### 🔍 Accions de Validació")

                    col_btn1, col_btn2 = st.columns(2)

                    with col_btn1:
                        if st.button("✅ Acceptar", type="primary", key=f"accept_video_{video_seleccionat['sha1sum']}"):
                            success = compliance_client.record_validator_decision(
                                document_id=f"video_{video_seleccionat['video_name']}",
                                validator_email=f"{username}@veureu.local",
                                decision="acceptat",
                                comments=f"Vídeo validat per {username}",
                            )

                            phone = st.session_state.get("phone_number") or ""

                            try:
                                log_action(
                                    session=session_id or "",
                                    user=username or "",
                                    phone=phone,
                                    action="input-OK",
                                    sha1sum=video_seleccionat["sha1sum"] or "",
                                )
                            except Exception as e_log:
                                _log(f"[VIDEO validation] Error registrant acció input-OK: {e_log}")

                            if success:
                                st.success("✅ Vídeo acceptat, registrat al servei de compliance i marcat com input-OK")
                            else:
                                st.error("❌ Error registrant el veredicte al servei de compliance")

                            # Move the video from the pending area to the media area.
                            sha1 = video_seleccionat["sha1sum"]
                            local_pending_dir = pending_root / sha1
                            local_media_dir = base_media_dir / sha1
                            try:
                                local_media_dir.mkdir(parents=True, exist_ok=True)
                                # Prefer the canonical video.mp4; otherwise take the
                                # first video file so deleting the pending folder
                                # below never discards the only copy.
                                src = local_pending_dir / "video.mp4"
                                if not src.exists():
                                    candidates = _list_video_files(local_pending_dir)
                                    src = candidates[0] if candidates else None
                                if src is not None:
                                    dst = local_media_dir / f"video{src.suffix.lower()}"
                                    shutil.copy2(src, dst)
                                # Reached only if the copy (when there was one) succeeded.
                                if local_pending_dir.exists():
                                    shutil.rmtree(local_pending_dir)
                            except Exception as e_move:
                                _log(f"[VIDEO validation] Error movent el vídeo a media: {e_move}")

                            try:
                                update_video_status(sha1, "input-OK")
                            except Exception as e_stat:
                                _log(f"[VIDEO validation] Error actualitzant status a input-OK: {e_stat}")

                            if user_sms_enabled:
                                try:
                                    owner_phone = get_video_owner_by_sha1(sha1)
                                except Exception as e_owner:
                                    _log(f"[VIDEO USER SMS] Error obtenint el telèfon del propietari: {e_owner}")
                                    owner_phone = ""

                                if owner_phone:
                                    try:
                                        msg = "Su vídeo ha sido aprobado. Puede entrar en la aplicación y subirlo de nuevo para generar la audiodescripción"
                                        sms_ok = compliance_client.notify_user_video_approved(
                                            phone=owner_phone,
                                            message=msg,
                                            sha1sum=sha1,
                                        )
                                        if sms_ok:
                                            # Uses the module-level log_action. Importing it
                                            # here again would make the name function-local
                                            # and break the earlier calls with
                                            # UnboundLocalError.
                                            try:
                                                log_action(
                                                    session=session_id or "",
                                                    user=username or "",
                                                    phone=owner_phone,
                                                    action="SMS sent to user for video approval",
                                                    sha1sum=sha1 or "",
                                                )
                                            except Exception as e_log:
                                                _log(f"[VIDEO USER SMS] Error registrant acció SMS: {e_log}")
                                    except Exception as e_sms:
                                        _log(f"[VIDEO USER SMS] Error enviant SMS a l'usuari: {e_sms}")

                    with col_btn2:
                        # NOTE(review): rejecting only records the verdict; the video
                        # status and the pending files are left untouched.
                        if st.button("❌ Rebutjar", type="secondary", key=f"reject_video_{video_seleccionat['sha1sum']}"):
                            success = compliance_client.record_validator_decision(
                                document_id=f"video_{video_seleccionat['video_name']}",
                                validator_email=f"{username}@veureu.local",
                                decision="rebutjat",
                                comments=f"Vídeo rebutjat per {username}",
                            )
                            if success:
                                st.success("✅ Vídeo rebutjat i registrat al servei de compliance")
                            else:
                                st.error("❌ Error registrant el veredicte")

    # ------------------------------------------------------------------
    # Tab 2: audio descriptions pending UNE validation
    # ------------------------------------------------------------------
    with tab_ads:
        st.subheader("🎬 Validar Audiodescripcions")

        pending_videos = get_videos_by_status("UNE-pending")

        if not pending_videos:
            st.info("📝 No hi ha audiodescripcions pendents de validació UNE.")
        else:
            options = [f"{v['video_name']} ({v['sha1sum']})" for v in pending_videos]
            seleccion_ad = st.selectbox(
                "Selecciona un vídeo per validar la seva audiodescripció:",
                options,
                index=0,
            )

            if seleccion_ad:
                idx = options.index(seleccion_ad)
                sel = pending_videos[idx]
                sha1 = sel["sha1sum"]
                video_name = sel["video_name"]

                # Query each version once and keep the rows, so the latest row
                # of the chosen version is guaranteed to exist below.
                history_by_version = {}
                for v_name in ("Salamandra", "MoE"):
                    rows = get_audiodescription_history(sha1, v_name)
                    if rows:
                        history_by_version[v_name] = rows
                available_versions = list(history_by_version)

                if not available_versions:
                    st.error("No s'ha trobat cap audiodescripció per a aquest vídeo a la base de dades.")
                else:
                    selected_version = st.selectbox(
                        "Selecciona la versió a validar:",
                        available_versions,
                        index=0,
                        key=f"ad_version_{sha1}",
                    )

                    # The last history row is the one presented for editing.
                    row_ad = history_by_version[selected_version][-1]
                    current_une = row_ad.get("une_ad") or ""
                    current_free = row_ad.get("free_ad") or ""

                    col1, col2 = st.columns([2, 1])

                    with col1:
                        st.markdown("### 🎬 Informació de l'Audiodescripció")
                        st.markdown(f"**Vídeo:** {video_name}")
                        st.markdown(f"**SHA1:** {sha1}")
                        st.markdown(f"**Versió:** {selected_version}")

                        video_file = _find_media_video(base_media_dir / sha1)
                        if video_file is not None:
                            st.video(str(video_file))
                        else:
                            st.warning("⚠️ No s'ha trobat el fitxer de vídeo original.")

                        st.markdown("#### 📝 Audiodescripció UNE-153010 (SRT)")
                        new_une = st.text_area(
                            "Text SRT UNE-153010",
                            value=current_une,
                            height=220,
                            key=f"une_editor_{sha1}_{selected_version}",
                        )

                        st.markdown("#### 🗒️ Narració lliure")
                        new_free = st.text_area(
                            "Text narració lliure",
                            value=current_free,
                            height=220,
                            key=f"free_editor_{sha1}_{selected_version}",
                        )

                    with col2:
                        st.markdown("### 🔍 Accions de Validació")

                        if st.button("✅ Acceptar", type="primary", key=f"accept_ad_{sha1}_{selected_version}"):
                            try:
                                # 1) Compliance record (best effort).
                                try:
                                    success = compliance_client.record_validator_decision(
                                        document_id=f"ad_{video_name}",
                                        validator_email=f"{username}@veureu.local",
                                        decision="acceptat",
                                        comments=f"Audiodescripció UNE validada per {username}",
                                    )
                                    if not success:
                                        st.warning("⚠️ No s'ha pogut registrar el veredicte al servei de compliance.")
                                except Exception as e_comp:
                                    _log(f"[UNE validation] Error amb compliance: {e_comp}")

                                # 2) Persist the validated texts. A failure here aborts
                                #    the whole acceptance (handled by the outer except).
                                update_audiodescription_text(
                                    sha1sum=sha1,
                                    version=selected_version,
                                    ok_une_ad=new_une,
                                    ok_free_ad=new_free,
                                )

                                # 3) Status change (best effort).
                                try:
                                    update_video_status(sha1, "UNE-OK")
                                except Exception as e_stat:
                                    _log(f"[UNE validation] Error actualitzant status a UNE-OK: {e_stat}")

                                # 4) Action record (best effort).
                                try:
                                    insert_action(
                                        session=session_id or "",
                                        user=username or "",
                                        phone=st.session_state.get("phone_number") or "",
                                        action="UNE-OK",
                                        sha1sum=sha1,
                                    )
                                except Exception as e_act:
                                    _log(f"[UNE validation] Error registrant acció UNE-OK: {e_act}")

                                # 5) Optional SMS to the video owner.
                                if user_sms_enabled:
                                    try:
                                        owner_phone = get_video_owner_by_sha1(sha1)
                                    except Exception as e_owner:
                                        _log(f"[UNE USER SMS] Error obtenint el telèfon del propietari: {e_owner}")
                                        owner_phone = ""

                                    if owner_phone:
                                        try:
                                            msg = (
                                                "La seva audiodescripció ha estat validada segons la norma UNE-153020. "
                                                "Pots tornar a l'aplicació per revisar-la i descarregar-la."
                                            )
                                            compliance_client.notify_user_video_approved(
                                                phone=owner_phone,
                                                message=msg,
                                                sha1sum=sha1,
                                            )
                                        except Exception as e_sms:
                                            _log(f"[UNE USER SMS] Error enviant SMS a l'usuari: {e_sms}")

                                # 6) Regenerate audio/video assets from the validated SRT
                                #    through the TTS service, when configured.
                                try:
                                    if new_une.strip():
                                        video_path = _find_media_video(base_media_dir / sha1)
                                        if video_path is not None:
                                            hitl_ok_dir = video_path.parent / "HITL OK"
                                            hitl_ok_dir.mkdir(parents=True, exist_ok=True)

                                            tts_url = os.getenv("API_TTS_URL", "").strip()
                                            if tts_url:
                                                _generate_hitl_ok_assets(tts_url, new_une, video_path, hitl_ok_dir)
                                            else:
                                                _log("[UNE TTS] API_TTS_URL no configurada; s'omet la generació de free_ad.mp3/une_ad.mp4 (HITL OK)")
                                except Exception as e_tts:
                                    _log(f"[UNE TTS] Error generant assets HITL OK: {e_tts}")

                                # 7) Audit log entry (best effort).
                                try:
                                    log_action(
                                        session=session_id or "",
                                        user=username or "",
                                        phone=st.session_state.get("phone_number") or "",
                                        action="validate_ad_une_153020",
                                        sha1sum=sha1,
                                    )
                                except Exception as e_log:
                                    _log(f"[UNE validation] Error registrant acció validate_ad_une_153020: {e_log}")
                            except Exception as e_val:
                                st.error(f"❌ Error durant la validació de l'audiodescripció: {e_val}")
                            else:
                                # Kept outside the try so the rerun control-flow
                                # signal can never be mistaken for a validation error.
                                st.success("✅ Audiodescripció UNE-153010 validada i desada (HITL OK).")
                                st.rerun()

    st.markdown("---")
    st.markdown("### ℹ️ Informació del Procés de Validació")
    st.markdown(
        "\n"
        "- **Tots els veredictes** es registren al servei de compliance per garantir la traçabilitat\n"
        "- **Cada validació** inclou veredicte, nom del vídeo i validador responsable\n"
        "- **Els registres** compleixen amb la normativa AI Act i GDPR\n"
    )


def _list_video_files(folder: Path) -> list:
    """Return the ``*.mp4``, ``*.avi`` and ``*.mov`` files directly inside ``folder``.

    Files are grouped in that extension order. A missing or non-directory
    ``folder`` yields an empty list.
    """
    if not folder.is_dir():
        return []
    found = []
    for pattern in ("*.mp4", "*.avi", "*.mov"):
        found.extend(folder.glob(pattern))
    return found


def _extract_pending_list(resp) -> list:
    """Pull the list of pending items out of a ``list_pending_videos`` response.

    Accepts either a bare list, or a dict without a truthy ``"error"`` entry
    that carries the list under ``"videos"``, ``"items"`` or ``"results"``
    (checked in that order). Anything else yields an empty list.
    """
    if isinstance(resp, list):
        return resp
    if isinstance(resp, dict) and not resp.get("error"):
        for key in ("videos", "items", "results"):
            value = resp.get(key)
            if isinstance(value, list):
                return value
    return []


def _find_media_video(video_dir: Path) -> Path | None:
    """Return the first existing ``video.mp4``/``video.avi``/``video.mov`` in ``video_dir``, else ``None``."""
    for name in ("video.mp4", "video.avi", "video.mov"):
        candidate = video_dir / name
        if candidate.exists():
            return candidate
    return None


def _generate_hitl_ok_assets(tts_url: str, srt_text: str, video_path: Path, out_dir: Path) -> None:
    """POST the SRT and the video to ``<tts_url>/tts/srt`` and unpack the returned ZIP.

    From the ZIP, a member whose name ends in ``ad_master.mp3`` is written to
    ``out_dir / "free_ad.mp3"`` and one ending in ``video_con_ad.mp4`` to
    ``out_dir / "une_ad.mp4"``; other members are ignored.

    Raises:
        Whatever ``requests`` / ``zipfile`` / file I/O raise; the caller logs it.
    """
    with tempfile.TemporaryDirectory(prefix="tts_hitl_ok_") as td:
        srt_tmp = Path(td) / "ad_ok.srt"
        srt_tmp.write_text(srt_text, encoding="utf-8")

        data = {
            "voice": "central/grau",
            "ad_format": "mp3",
            "include_final_mp4": "1",
        }

        # Open both uploads in a context manager so the handles are closed even
        # if the request fails (and the temp dir can be removed afterwards).
        with srt_tmp.open("rb") as srt_fh, video_path.open("rb") as video_fh:
            files = {
                "srt": ("ad_ok.srt", srt_fh, "text/plain"),
                "video": (video_path.name, video_fh, "video/mp4"),
            }
            resp = requests.post(
                f"{tts_url.rstrip('/')}/tts/srt",
                files=files,
                data=data,
                timeout=300,
            )
        resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for member in zf.infolist():
            lower = member.filename.lower()
            if lower.endswith("ad_master.mp3"):
                target = out_dir / "free_ad.mp3"
            elif lower.endswith("video_con_ad.mp4"):
                target = out_dir / "une_ad.mp4"
            else:
                continue
            with zf.open(member) as member_fh, target.open("wb") as out_fh:
                shutil.copyfileobj(member_fh, out_fh)
|
|
|
|