import json
import os
import time
from typing import Any, Dict, Optional

import altair as alt
import matplotlib.pyplot as plt
import pandas as pd
import requests
import streamlit as st
# Startup marker written to stdout; also shows whether BACKEND_URL is set.
print("BOOT OK. BACKEND_URL env:", os.getenv("BACKEND_URL"))

# Page-level settings for the Streamlit app.
# The page icon literal had been damaged by an encoding round-trip
# (UTF-8 bytes decoded as ISO-8859-7); it is restored to the clapper-board emoji.
st.set_page_config(
    page_title="Multimodal Video Sentiment Dashboard",
    page_icon="🎬",
    layout="wide",
)

# Default values (in seconds) offered by the sidebar inputs that control job polling.
DEFAULT_POLL_INTERVAL_S = 2
DEFAULT_TIMEOUT_S = 10 * 60
| |
|
| |
|
| | |
| | |
| | |
def get_backend_url() -> str:
    """Return the backend base URL read from the ``BACKEND_URL`` environment variable.

    Surrounding whitespace and any trailing slashes are removed. An unset or
    empty variable yields an empty string.
    """
    raw_value = os.getenv("BACKEND_URL", "")
    if not raw_value:
        return ""
    trimmed = raw_value.strip()
    return trimmed.rstrip("/")
| |
|
| |
|
| |
|
def safe_request_json(method: str, url: str, **kwargs) -> Dict[str, Any]:
    """Send an HTTP request and return the decoded JSON body.

    Wraps ``requests.request`` so that every failure surfaces as a
    ``RuntimeError`` carrying a user-friendly message.

    Args:
        method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        url: URL to call.
        **kwargs: Forwarded to ``requests.request``. ``timeout`` defaults to
            60 seconds when not supplied.

    Returns:
        The parsed JSON response body.

    Raises:
        RuntimeError: If the request itself fails, the status code is not
            2xx, or the response body is not valid JSON. The underlying
            exception, when there is one, is chained as ``__cause__``.
    """
    timeout = kwargs.pop("timeout", 60)
    try:
        resp = requests.request(method, url, timeout=timeout, **kwargs)
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Gagal menghubungi backend. Detail: {e}") from e

    if not 200 <= resp.status_code < 300:
        # Prefer a structured error message from the backend; fall back to
        # the first 500 characters of the raw body when there is none.
        try:
            payload = resp.json()
        except (ValueError, requests.exceptions.RequestException):
            payload = None
        if isinstance(payload, dict):
            detail = payload.get("detail") or payload.get("error") or payload
        else:
            detail = resp.text[:500]
        raise RuntimeError(f"Backend error (HTTP {resp.status_code}): {detail}")

    try:
        return resp.json()
    except (ValueError, requests.exceptions.RequestException) as e:
        raise RuntimeError("Response backend bukan JSON yang valid.") from e
| |
|
| |
|
def check_health(backend_url: str) -> bool:
    """Query ``{backend_url}/health`` and report whether the reply has a truthy ``ok`` field.

    Uses a 20-second timeout. Errors raised by ``safe_request_json``
    propagate to the caller.
    """
    health_endpoint = f"{backend_url}/health"
    reply = safe_request_json("GET", health_endpoint, timeout=20)
    ok_flag = reply.get("ok", False)
    return bool(ok_flag)
| |
|
| |
|
def upload_video(backend_url: str, file_bytes: bytes, filename: str) -> str:
    """Upload a video to ``{backend_url}/jobs/upload`` and return the new job id.

    The file is sent as the multipart field ``file`` with a 120-second timeout.

    Raises:
        RuntimeError: If the backend reply carries no ``job_id`` (errors from
            ``safe_request_json`` propagate as well).
    """
    upload_endpoint = f"{backend_url}/jobs/upload"
    multipart_fields = {"file": (filename, file_bytes)}
    reply = safe_request_json("POST", upload_endpoint, files=multipart_fields, timeout=120)

    job_id = reply.get("job_id")
    if job_id:
        return job_id
    raise RuntimeError("Upload sukses, tapi backend tidak mengembalikan job_id.")
| |
|
| |
|
def get_job(backend_url: str, job_id: str) -> Dict[str, Any]:
    """Fetch the JSON document for job ``job_id`` from ``{backend_url}/jobs/{job_id}``.

    Uses a 30-second timeout. Errors raised by ``safe_request_json``
    propagate to the caller.
    """
    job_endpoint = f"{backend_url}/jobs/{job_id}"
    job_document = safe_request_json("GET", job_endpoint, timeout=30)
    return job_document
| |
|
| |
|
def label_badge(label_text: str) -> str:
    """Return *label_text* decorated with a status emoji.

    ``POSITIVE`` and ``NEGATIVE`` are matched case-insensitively and returned
    in upper case, prefixed with a check mark or a cross mark respectively.
    Any other text is returned unchanged.

    The emoji literals had been damaged by an encoding round-trip (UTF-8
    bytes decoded as ISO-8859-7), which also split one literal across two
    lines; they are restored here.
    """
    normalized = label_text.upper()
    if normalized == "POSITIVE":
        return "✅ POSITIVE"
    if normalized == "NEGATIVE":
        return "❌ NEGATIVE"
    return label_text
| |
|
| |
|
def _count_clip_labels(summary: Dict[str, Any], clips: Any) -> tuple:
    """Return ``(positive, negative)`` clip counts, preferring the backend summary."""
    pos_clips = summary.get("positive_clips")
    neg_clips = summary.get("negative_clips")
    if pos_clips is None or neg_clips is None:
        # Summary incomplete: recount from the per-clip labels
        # (label 1 is shown as POSITIVE, label 0 as NEGATIVE).
        if isinstance(clips, list) and clips:
            pos_clips = sum(1 for c in clips if c.get("label") == 1)
            neg_clips = sum(1 for c in clips if c.get("label") == 0)
        else:
            pos_clips, neg_clips = 0, 0
    return pos_clips, neg_clips


def _resolve_headline_label(pos_clips: Any, neg_clips: Any, video_level: Dict[str, Any]) -> tuple:
    """Return ``(label_text, tie_note)`` using a majority vote with a video-level tie-break."""
    if pos_clips > neg_clips:
        return "POSITIVE", None
    if neg_clips > pos_clips:
        return "NEGATIVE", None

    # Equal counts: fall back to the video-level label when the backend sent one.
    tie_note = "Jumlah klip seimbang; label ditentukan dari rata-rata probabilitas (mean)."
    video_label = video_level.get("label")
    if video_label == 1:
        return "POSITIVE", tie_note
    if video_label == 0:
        return "NEGATIVE", tie_note
    # No usable video-level label: the headline stays "TIE", so the note
    # (which says the label came from the mean) would be misleading.
    return "TIE", None


def _render_headline(label_text: str, tie_note: Optional[str], pos_clips: Any, neg_clips: Any, total_clips: Any) -> None:
    """Render the large coloured label, the optional tie note and the clip-count metrics."""
    colors = {"POSITIVE": "#16a34a", "NEGATIVE": "#dc2626"}
    color = colors.get(label_text, "#6b7280")

    # NOTE(review): the original nesting was lost with the indentation; the
    # label and the metrics are assumed to sit inside the bordered container
    # and the divider outside it — confirm.
    with st.container(border=True):
        # Built without leading whitespace so Markdown can never treat the
        # HTML as an indented code block.
        headline_html = (
            '<div style="text-align:center; padding: 10px 0;">'
            f'<div style="font-size: 38px; font-weight: 800; color: {color};">{label_text}</div>'
            "</div>"
        )
        st.markdown(headline_html, unsafe_allow_html=True)

        # The note used to be computed but never displayed.
        if tie_note:
            st.caption(tie_note)

        m1, m2, m3 = st.columns(3)
        m1.metric("Positive Clips", value=str(pos_clips))
        m2.metric("Negative Clips", value=str(neg_clips))
        m3.metric("Total Clips", value=str(total_clips))

    st.divider()


def _render_by_minute_chart(charts: Dict[str, Any]) -> None:
    """Render the per-minute mean-probability bar chart and its raw data table."""
    st.markdown("### Probabilitas per menit")
    by_minute = charts.get("by_minute") or []
    if not by_minute:
        st.info("Tidak ada data charts.by_minute.")
        return

    dfm = pd.DataFrame(by_minute)
    # Guarantee the columns the chart encodes, even if the backend omitted some.
    for col in ("minute", "range", "prob_mean", "n_clips"):
        if col not in dfm.columns:
            dfm[col] = None

    bar = (
        alt.Chart(dfm)
        .mark_bar()
        .encode(
            x=alt.X("minute:O", title="Menit"),
            y=alt.Y("prob_mean:Q", title="Rata-rata prob_pos", scale=alt.Scale(domain=[0, 1])),
            tooltip=[
                alt.Tooltip("range:N", title="Range menit"),
                alt.Tooltip("prob_mean:Q", title="prob_mean", format=".3f"),
                alt.Tooltip("n_clips:Q", title="n_clips"),
            ],
        )
        .properties(height=320)
    )
    st.altair_chart(bar, use_container_width=True)

    with st.expander("Lihat data by_minute"):
        st.dataframe(dfm, use_container_width=True)


def _render_label_pie(charts: Dict[str, Any], pos_clips: Any, neg_clips: Any) -> None:
    """Render the positive/negative clip distribution as a pie chart."""
    st.markdown("### Distribusi label clip")
    pie = charts.get("pie") or {}
    pos = int(pie.get("positive", pos_clips) or 0)
    neg = int(pie.get("negative", neg_clips) or 0)

    if (pos + neg) == 0:
        st.info("Tidak ada data charts.pie.")
    else:
        fig, ax = plt.subplots()
        try:
            ax.pie([pos, neg], labels=["Positive", "Negative"], autopct="%1.1f%%", startangle=90)
            ax.axis("equal")
            st.pyplot(fig, clear_figure=True)
        finally:
            # pyplot keeps every figure alive until it is closed; without
            # this, figures pile up across Streamlit reruns.
            plt.close(fig)

    st.caption(f"Positive: **{pos}** | Negative: **{neg}**")


def _render_clip_table(clips: Any) -> None:
    """Render the per-clip detail table, sorted by clip index."""
    dfc = pd.DataFrame(clips)

    for col in ("idx", "start_s", "end_s", "prob_pos", "label", "logit"):
        if col not in dfc.columns:
            dfc[col] = None

    dfc["label_text"] = dfc["label"].apply(
        lambda x: "POSITIVE" if x == 1 else ("NEGATIVE" if x == 0 else None)
    )

    if "mask_stats" in dfc.columns:
        # Flatten the per-modality validity flags into their own columns.
        for key in ("text_valid", "audio_valid", "visual_valid"):
            dfc[key] = dfc["mask_stats"].apply(
                lambda d, k=key: d.get(k) if isinstance(d, dict) else None
            )

    for col in ("start_s", "end_s", "prob_pos", "logit"):
        dfc[col] = pd.to_numeric(dfc[col], errors="coerce")

    wanted = [
        "idx", "start_s", "end_s", "prob_pos", "label_text",
        "text_valid", "audio_valid", "visual_valid", "asr_text",
    ]
    show_cols = [c for c in wanted if c in dfc.columns]
    st.dataframe(dfc[show_cols].sort_values("idx"), use_container_width=True, height=420)


def render_dashboard(result: Dict[str, Any]) -> None:
    """Render the analysis result of one job as a Streamlit dashboard.

    Sections, in order: headline label with clip counts, per-minute
    probability chart next to the label pie chart, per-clip detail table, and
    a button that downloads the raw result as JSON.

    The headline label is the majority of positive vs. negative clips; on a
    tie, the ``video_level`` label (1 or 0) decides, and if that is missing
    the headline reads ``TIE``.

    Args:
        result: Result document of a finished job. The keys read here are
            ``summary``, ``charts``, ``clips`` and ``video_level``; missing or
            empty values are treated as empty.

    Note:
        When there are no clips, a warning is shown and the function returns
        before the download button is rendered (unchanged from the original).
    """
    summary = result.get("summary") or {}
    charts = result.get("charts") or {}
    clips = result.get("clips") or []
    video_level = result.get("video_level") or {}

    pos_clips, neg_clips = _count_clip_labels(summary, clips)

    total_clips = summary.get("total_clips")
    if total_clips is None:
        total_clips = int(pos_clips) + int(neg_clips)

    label_text, tie_note = _resolve_headline_label(pos_clips, neg_clips, video_level)

    st.markdown("## Hasil")
    _render_headline(label_text, tie_note, pos_clips, neg_clips, total_clips)

    left, right = st.columns([2, 1])
    with left:
        _render_by_minute_chart(charts)
    with right:
        _render_label_pie(charts, pos_clips, neg_clips)

    st.divider()

    st.markdown("### Detail per clip")
    if not clips:
        st.warning("Tidak ada data clips.")
        return

    _render_clip_table(clips)

    st.download_button(
        label="⬇️ Download result JSON",
        # json.dumps keeps full float precision; the previous
        # pd.Series(...).to_json rounded floats to pandas' default precision.
        data=json.dumps(result, indent=2, ensure_ascii=False),
        file_name="result.json",
        mime="application/json",
    )
| |
|
| |
|
| |
|
| | |
| | |
| | |
# Main page flow: configuration sidebar, video upload, job polling, dashboard.
#
# String literals containing emoji or arrows had been damaged by an encoding
# round-trip (UTF-8 bytes decoded as ISO-8859-7), in places splitting a literal
# across two lines; they are restored below.

# Session-state key under which the last finished result is kept, so the
# dashboard survives reruns triggered by other widgets (for example clicking
# the download button).
RESULT_STATE_KEY = "last_result"

st.title("🎬 Multimodal Sentiment Analysis Video")

backend_url = get_backend_url()

with st.sidebar:
    st.header("⚙️ Konfigurasi")
    st.write("Backend URL diambil dari **HF Secret** `BACKEND_URL`.")
    st.code(backend_url if backend_url else "(BACKEND_URL belum di-set)", language="text")

    poll_interval = st.number_input(
        "Polling interval (detik)",
        min_value=1,
        max_value=10,
        value=DEFAULT_POLL_INTERVAL_S,
    )
    timeout_s = st.number_input(
        "Timeout (detik)",
        min_value=30,
        max_value=3600,
        value=DEFAULT_TIMEOUT_S,
        step=30,
    )

    st.divider()
    st.subheader("Cek koneksi backend")
    # NOTE(review): the original button icon could not be recovered from the
    # damaged literal; the magnifying glass is a best guess — confirm.
    if st.button("🔎 Health check"):
        if not backend_url:
            st.error("BACKEND_URL belum di-set di Secrets.")
        else:
            try:
                ok = check_health(backend_url)
                if ok:
                    st.success("Backend OK ✅")
                else:
                    st.error("Backend merespons tapi ok=false.")
            except Exception as e:
                st.error(str(e))

# Nothing below can work without a backend URL.
if not backend_url:
    st.error("BACKEND_URL belum di-set. Tambahkan di HF Space → Settings → Repository secrets → BACKEND_URL.")
    st.stop()

st.markdown("### Upload Video")
uploaded = st.file_uploader("Pilih file video", type=["mp4", "mov", "mkv", "webm"])

if uploaded is not None:
    st.video(uploaded)

# NOTE(review): the original button icon could not be recovered from the
# damaged literal; the rocket is a best guess — confirm.
analyze = st.button("🚀 Analisis", type="primary", disabled=(uploaded is None))

if analyze:
    # A new analysis replaces whatever result was kept from a previous run.
    if RESULT_STATE_KEY in st.session_state:
        del st.session_state[RESULT_STATE_KEY]

    # Step 1: make sure the backend answers before uploading a large file.
    # Only reachability is checked here; the returned flag is ignored.
    try:
        _ = check_health(backend_url)
    except Exception as e:
        st.error(f"Backend tidak reachable: {e}")
        st.stop()

    # Step 2: upload the video and obtain a job id.
    try:
        with st.spinner("Mengupload video ke backend..."):
            file_bytes = uploaded.getvalue()
            job_id = upload_video(backend_url, file_bytes, uploaded.name)
        st.success(f"Upload sukses. job_id = {job_id}")
    except Exception as e:
        st.error(f"Upload gagal: {e}")
        st.stop()

    # Step 3: poll the job until it is done, has failed, or the timeout expires.
    status_box = st.empty()
    progress = st.progress(0)
    t0 = time.time()

    last_status: Optional[str] = None
    payload: Optional[Dict[str, Any]] = None

    while True:
        elapsed = time.time() - t0
        if elapsed > float(timeout_s):
            st.error(f"Timeout: job belum selesai setelah {int(timeout_s)} detik.")
            st.stop()

        try:
            payload = get_job(backend_url, job_id)
        except Exception as e:
            st.error(f"Gagal polling job: {e}")
            st.stop()

        status = payload.get("status")
        if status != last_status:
            status_box.info(f"Status job: **{status}**")
            last_status = status

        # The backend reports no real progress, so the bar tracks elapsed
        # time against the timeout and is capped below 100% until done.
        progress_value = min(0.95, elapsed / float(timeout_s))
        progress.progress(progress_value)

        if status == "done":
            progress.progress(1.0)
            st.success("Selesai ✅")
            result = payload.get("result")
            if not isinstance(result, dict):
                st.error("Job done tapi result kosong / format tidak valid.")
                st.stop()
            st.session_state[RESULT_STATE_KEY] = result
            render_dashboard(result)
            break

        if status == "failed":
            progress.progress(1.0)
            err = payload.get("error") or "Unknown error"
            st.error(f"Job failed ❌\n\n{err}")
            st.stop()

        time.sleep(float(poll_interval))
elif RESULT_STATE_KEY in st.session_state:
    # Rerun caused by some other widget: show the last result again instead
    # of letting the dashboard disappear.
    render_dashboard(st.session_state[RESULT_STATE_KEY])
| |
|