from __future__ import annotations

import html
from typing import Any, Dict, List, Tuple

import streamlit as st

from utils.helpers import (
    ask_question,
    fetch_bookmarks,
    fetch_job_status,
    fetch_quiz,
    fetch_summary,
    fetch_youtube_metadata_v3,
    get_image_base64,
    get_remote_image_url,
    list_cached_videos,
    ping_health,
    resolve_image_source,
    semantic_search,
    seconds_to_timestamp,
    start_ingestion,
    youtube_timestamp_link,
)
from utils.settings import APPROVED_CHANNELS
|
|
# Word budget quoted in the prompt that asks for each bookmark's summary.
BOOKMARK_SUMMARY_WORDS = 100
|
|
|
|
def _init_session_state() -> None:
    """Seed ``st.session_state`` with the keys this page reads.

    Keys that already exist are left untouched. While the stored cached-video
    list is empty it is (re)loaded through ``list_cached_videos()``.
    """
    state = st.session_state
    initial_values = {
        "step_done": 0,
        "auto_open": {1: True, 2: False, 3: False, 4: False, 5: False},
        "selected_video_id": "",
        "selected_video_title": "",
        "selected_channel_name": "",
        "job_id_by_video": {},
        "recent_job_status": None,
        "cached_videos": [],
        "bookmarks_by_video": {},
        "bookmark_summaries_by_video": {},
        "summary_by_video": {},
        "quiz_by_video": {},
        "search_results_by_video": {},
        "qa_history_by_video": {},
        "step1_tab_choice": "Insert YouTube URL",
        "chat_history": [],
        "chat_video_id": "",
    }
    for name, initial in initial_values.items():
        state.setdefault(name, initial)
    if not state.get("cached_videos"):
        state["cached_videos"] = list_cached_videos()
|
|
|
|
def _is_step_open(step: int) -> bool:
    """Return the ``auto_open`` flag stored for ``step`` (False when absent)."""
    open_flags = st.session_state.get("auto_open", {})
    return bool(open_flags.get(step, False))
|
|
|
|
def _advance_to_step(step: int) -> None:
    """Record ``step`` as completed and queue the next step to open.

    ``step_done`` only ever grows. For steps below 5 the ``auto_open`` flags
    are flipped and an expander override is registered for ``step + 1``.
    """
    state = st.session_state
    state["step_done"] = max(state["step_done"], step)
    if step >= 5:
        return
    following = step + 1
    open_flags = state["auto_open"]
    open_flags[step] = False
    open_flags[following] = True
    _set_expander_override(_step_expander_key(following), True)
|
|
|
|
def _expander_overrides() -> Dict[str, bool]:
    """Return the session-scoped override mapping, creating it if missing."""
    state = st.session_state
    return state.setdefault("_expander_overrides", {})
|
|
|
|
def _set_expander_override(key: str, expanded: bool) -> None:
    """Register a pending ``expanded`` value for the expander named ``key``.

    The expander's "initialized" marker is dropped from session state as well.
    """
    pending = _expander_overrides()
    pending[key] = expanded
    marker = f"_expander_initialized_{key}"
    st.session_state.pop(marker, None)
|
|
|
|
def _expander_container(key: str, label: str, default_open: bool):
    """Create an ``st.expander`` whose open state is driven by session state.

    A pending override for ``key`` wins and is consumed. Otherwise
    ``default_open`` is used, but only the first time this key is rendered.
    In every other case the expander is created without an ``expanded``
    argument. The key is then marked as initialized.
    """
    pending = _expander_overrides()
    marker = f"_expander_initialized_{key}"

    if key in pending:
        expanded = pending.pop(key)
    elif st.session_state.get(marker):
        expanded = None
    else:
        expanded = default_open

    if expanded is not None:
        box = st.expander(label, expanded=expanded)
    else:
        box = st.expander(label)
    st.session_state[marker] = True
    return box
|
|
|
|
| def _step_expander_key(step: int) -> str: |
| return f"step_{step}" |
|
|
|
|
def _step_expander(step: int, label: str):
    """Return the managed expander for ``step``, defaulting to its auto-open flag."""
    expander_key = _step_expander_key(step)
    opens_by_default = _is_step_open(step)
    return _expander_container(expander_key, label, opens_by_default)
|
|
|
|
def _named_expander(key: str, label: str, default_open: bool = False):
    """Return a managed expander identified by an arbitrary ``key``."""
    box = _expander_container(key, label, default_open)
    return box
|
|
|
|
def _keep_step_expander_open(step: int) -> None:
    """Register an "expanded" override for the expander of ``step``."""
    expander_key = _step_expander_key(step)
    _set_expander_override(expander_key, True)
|
|
|
|
def _refresh_cached_videos_list() -> None:
    """Reload the cached-video list into session state.

    When ``list_cached_videos`` exposes a ``clear`` attribute it is called
    first, before the list is fetched again.
    """
    supports_clear = hasattr(list_cached_videos, "clear")
    if supports_clear:
        list_cached_videos.clear()
    fresh_listing = list_cached_videos()
    st.session_state["cached_videos"] = fresh_listing
|
|
|
|
def _render_sidebar_stepper() -> None:
    """Render the five-step progress checklist in the sidebar.

    A step is shown as done when ``st.session_state["step_done"]`` is at
    least its 1-based position in the list below.
    """
    sidebar_labels = (
        "Insert YouTube Lecture URL",
        "Browse bookmarks",
        "Summary & Quizzes",
        "Q&A",
        "Smart Search (Optional)",
    )
    steps_done = st.session_state["step_done"]
    with st.sidebar:
        st.markdown("### Progress")
        for step, label in enumerate(sidebar_labels, start=1):
            # The status glyphs were mis-encoded in the source (the "done"
            # literal was even split across two lines); restored here.
            status = "✅" if steps_done >= step else "⬜️"
            st.write(f"{status} Step {step}: {label}")
|
|
|
|
def _render_api_status() -> None:
    """Show a one-line caption describing whether the backend API responds.

    ``ping_health()`` is called once; a ``RuntimeError`` from it is rendered
    as the caption text instead of propagating.
    """
    try:
        health = ping_health()
    except RuntimeError as exc:
        # NOTE(review): the failure glyph was mis-encoded in the source;
        # "❌" is the assumed original -- confirm.
        st.caption(f"❌ {exc}")
        return
    # Treat an empty/None payload as "offline" rather than raising on .get().
    status = health.get("status") if health else None
    if status == "ok":
        st.caption("✅ API online")
    else:
        st.caption("❌ API offline")
|
|
|
|
def _run_status_popover() -> None:
    """Render the help popover describing the Run / Refresh Job Status buttons."""
    bullet_points = [
        "- Checks whether the lecture artifacts already exist in S3",
        "- Queues ingestion via the FastAPI backend if missing",
        "- `Refresh job status` polls `/videos/process/{job_id}` until the artifacts are ready.",
    ]
    help_text = "\n".join(bullet_points)
    with st.popover("What does Run and Job Refresh Status do?"):
        st.markdown(help_text)
|
|
|
|
def _render_whitelist_content() -> None:
    """Render the approved-channel logos as a four-column grid of links.

    Each cell shows the channel image (inline base64 when
    ``get_image_base64`` returns data, otherwise the URL from
    ``get_remote_image_url``) followed by a text link to the channel.
    """
    # (image file name, channel URL, display name / alt text)
    channels = [
        ("MIT OpenCourseWare.png", "https://www.youtube.com/@mitocw", "MIT OpenCourseWare"),
        ("Stanford Online.png", "https://www.youtube.com/@stanfordonline", "Stanford Online"),
        ("Harvard.png", "https://www.youtube.com/@harvard", "Harvard"),
        ("freeCodeCamp.png", "https://www.youtube.com/@freecodecamp", "freeCodeCamp.org"),
        ("TEDx.png", "https://www.youtube.com/@TEDx", "TEDx Talks"),
        ("SciShow.png", "https://www.youtube.com/@SciShow", "SciShow"),
        ("Udacity.png", "https://www.youtube.com/@Udacity", "Udacity"),
        ("ProgrammingWithMosh.png", "https://www.youtube.com/@programmingwithmosh", "Programming with Mosh"),
        ("Computerphile.png", "https://www.youtube.com/@Computerphile", "Computerphile"),
        ("edX.png", "https://www.youtube.com/@edXOnline", "edX"),
        ("TED.png", "https://www.youtube.com/@TED", "TED"),
        ("YaleCourses.png", "https://www.youtube.com/@YaleCourses", "YaleCourses"),
        ("Veritasium.png", "https://www.youtube.com/@veritasium", "Veritasium"),
        ("3blue1brown.png", "https://www.youtube.com/@3blue1brown", "3blue1brown"),
        ("CrashCourse.png", "https://www.youtube.com/@crashcourse", "CrashCourse"),
        ("KhanAcademy.png", "https://www.youtube.com/@khanacademy", "Khan Academy"),
        ("minutephysics.png", "https://www.youtube.com/@minutephysics", "minutephysics"),
        ("numberphile.png", "https://www.youtube.com/@numberphile", "numberphile"),
        ("TEDEd.png", "https://www.youtube.com/@TEDEd", "TED-Ed"),
        ("coursera.png", "https://www.youtube.com/@coursera", "Coursera"),
    ]
    per_row = 4
    for offset in range(0, len(channels), per_row):
        row_columns = st.columns(per_row)
        row_channels = channels[offset:offset + per_row]
        # zip() stops at the shorter sequence, so a short last row simply
        # leaves its trailing columns empty.
        for column, (image_file, channel_url, display_name) in zip(row_columns, row_channels):
            encoded = get_image_base64(image_file)
            if encoded:
                image_src = f"data:image/png;base64,{encoded}"
            else:
                image_src = get_remote_image_url(image_file)
            logo_markup = (
                "\n"
                f"<a href='{channel_url}' target='_blank'>\n"
                f"<img src='{image_src}' style='display:block;margin:auto;width:48px;height:48px;object-fit:contain;' alt='{display_name}'/>\n"
                "</a>\n"
            )
            caption_markup = (
                f"<div style='text-align:center;'><a href='{channel_url}' target='_blank'>{display_name}</a></div>"
            )
            with column:
                st.markdown(logo_markup, unsafe_allow_html=True)
                st.markdown(caption_markup, unsafe_allow_html=True)
|
|
|
|
def _whitelist_popover() -> None:
    """Render the approved-channel grid inside a popover."""
    # The trailing check mark was mis-encoded in the source, which split the
    # label literal across two lines; restored here.
    with st.popover("Whitelisted YouTube Channels ✅"):
        _render_whitelist_content()
|
|
|
|
def _fetch_youtube_metadata(video_url: str) -> Dict[str, Any] | None:
    """Look up metadata for ``video_url`` via ``fetch_youtube_metadata_v3``.

    Returns the metadata when the lookup yields a truthy value. Returns
    ``None`` for an empty URL, an empty result, or a ``RuntimeError`` (which
    is surfaced to the user as a warning).
    """
    if video_url:
        try:
            found = fetch_youtube_metadata_v3(video_url)
        except RuntimeError as exc:
            st.warning(str(exc))
        else:
            if found:
                return found
    return None
|
|
|
|
def _render_whitelist_expander() -> None:
    """Render the approved-channel grid inside a collapsed expander.

    The grid itself is produced by ``_render_whitelist_content`` so the
    channel list lives in one place; this function previously carried a
    verbatim copy of that table and rendering loop.
    """
    # The trailing check mark was mis-encoded in the source, which split the
    # label literal across two lines; restored here.
    with st.expander("Whitelisted YouTube Channels ✅", expanded=False):
        _render_whitelist_content()
|
|
|
|
def _render_recent_job_status() -> None:
    """Show the latest ingestion job status stored in session state, if any."""
    status = st.session_state.get("recent_job_status")
    if not status:
        return
    state_raw = status.get("state", "unknown") or "unknown"
    state = state_raw.lower()
    # ``message`` may be present but None; the substring test needs a str.
    message = status.get("message") or ""
    job_id = status.get("job_id")
    if state == "failed":
        if "not in the approved list" in message:
            st.error(f"Job {job_id} — failed: YouTube Channel not in the approved list.")
        else:
            st.error(f"Job {job_id} — failed: {message}")
        return
    st.info(f"Job {job_id} — {state_raw}: {message}")
    processing_states = {"queued", "pending", "running", "processing", "started", "submitted"}
    if state in processing_states:
        st.info("Processing may take some time to ingest the YouTube video, transcribe it, and convert it into embeddings.")


def _render_step_one_url_tab() -> None:
    """Render the URL input, metadata preview and Run/Refresh buttons."""
    video_url = (
        st.text_input(
            "YouTube URL",
            value=st.session_state.get("step1_video_url", ""),
            placeholder="https://www.youtube.com/watch?v=GOgA8JGUiwI",
            key="step1_video_url_input",
        )
        or ""
    ).strip()

    # Only trust cached metadata while it belongs to the URL currently in the
    # box; otherwise clearing the field would keep showing the old preview
    # and hand an empty URL to st.video().
    metadata = None
    if video_url:
        if video_url != st.session_state.get("step1_metadata_url"):
            st.session_state["step1_metadata"] = _fetch_youtube_metadata(video_url)
            st.session_state["step1_metadata_url"] = video_url
        metadata = st.session_state.get("step1_metadata")

    if metadata:
        # Title and channel come from YouTube, so escape them before placing
        # them inside raw HTML.
        title = html.escape(str(metadata.get("title", "Untitled lecture")))
        channel = html.escape(str(metadata.get("author_name", "Unknown")))
        st.markdown(f"<span style='font-size:1.3rem;color:#F63366;font-weight:600;'>{title}</span>", unsafe_allow_html=True)
        st.markdown(f"<span style='font-size:1.3rem;color:#666;font-weight:500;'>Channel: {channel}</span>", unsafe_allow_html=True)
        st.video(video_url)
    elif video_url:
        st.warning("Could not fetch metadata for this URL.")

    run_col, refresh_col = st.columns([3, 1])
    with run_col:
        if st.button("Run", type="primary", disabled=not video_url, key="run_btn_step1"):
            with st.spinner("Running ingestion..."):
                _keep_step_expander_open(1)
                _handle_run(video_url, metadata)
    with refresh_col:
        if st.button("Refresh Job Status", disabled=not video_url, key="refresh_job_status_btn"):
            _keep_step_expander_open(1)
            _handle_refresh_job_status()


def _render_step_one() -> None:
    """Render step 1: load a lecture from a YouTube URL or from the cache.

    The active tab is chosen with ``st.pills`` and mirrored into
    ``st.session_state["step1_tab_choice"]``.
    """
    # NOTE(review): the separators in this label and in the job-status
    # messages were mis-encoded in the source; an em dash is assumed.
    with _step_expander(1, "Step 1 — Load YouTube Lecture"):
        tab_options = ["Insert YouTube URL", "Use Cached Video"]
        tab_choice = st.pills(
            label="Video source",  # hidden, but widgets should not get an empty label
            options=tab_options,
            default=tab_options[0],
            key="step1_tab_pill",
            label_visibility="collapsed",
        )
        # st.pills yields None when the user deselects the active pill; fall
        # back to the previous choice instead of silently switching tabs.
        if tab_choice not in tab_options:
            previous = st.session_state.get("step1_tab_choice")
            tab_choice = previous if previous in tab_options else tab_options[0]
        st.session_state["step1_tab_choice"] = tab_choice

        if tab_choice == tab_options[0]:
            _render_step_one_url_tab()
            _render_recent_job_status()
            col_pop_left, col_pop_right = st.columns(2)
            with col_pop_left:
                _whitelist_popover()
            with col_pop_right:
                _run_status_popover()
        else:
            st.markdown("#### Use an existing cached lecture")
            _render_cached_videos()
            _whitelist_popover()
|
|
|
|
def _handle_run(video_url: str, metadata: Dict[str, Any] | None) -> None:
    """Start ingestion for ``video_url`` and record the resulting job.

    Warns and returns when the URL or metadata is missing. A ``RuntimeError``
    from ``start_ingestion`` is shown together with the whitelist hint. On
    success the job and selected video are stored in session state, the job
    status is fetched once, and step 1 is marked complete.
    """
    if not (video_url and metadata):
        st.warning("Provide a valid YouTube URL and ensure metadata loads before running.")
        return

    try:
        response = start_ingestion(video_url)
    except RuntimeError as exc:
        st.error(str(exc))
        _render_whitelist_hint()
        return

    video_id, job_id = response.get("video_id"), response.get("job_id")
    if not (video_id and job_id):
        st.warning("Backend did not return a job_id/video_id. Check logs.")
        return

    state = st.session_state
    state["job_id_by_video"][video_id] = job_id
    state["selected_video_id"] = video_id
    state["selected_video_title"] = metadata.get("title", video_id)
    state["selected_channel_name"] = metadata.get("author_name", "")

    try:
        latest_status = fetch_job_status(job_id)
    except RuntimeError as exc:
        # A failed status lookup is stored as a synthetic "failed" status.
        latest_status = {"job_id": job_id, "state": "failed", "message": str(exc)}
    state["recent_job_status"] = latest_status
    _advance_to_step(1)
|
|
|
|
def _handle_refresh_job_status() -> None:
    """Re-fetch the ingestion job status for the currently selected video.

    Informs the user when no video is selected or no job is recorded for it.
    On success the status is stored, the cached-video list is refreshed and
    the state is displayed; a ``RuntimeError`` is shown as an error.
    """
    video_id = st.session_state.get("selected_video_id")
    if not video_id:
        st.info("Select or ingest a video first.")
        return
    job_id = st.session_state["job_id_by_video"].get(video_id)
    if not job_id:
        st.warning("No job associated with this video yet.")
        return
    try:
        status = fetch_job_status(job_id)
        st.session_state["recent_job_status"] = status
        _refresh_cached_videos_list()
        # NOTE(review): the separator was mis-encoded in the source; an em
        # dash is assumed.
        st.info(f"State: {status.get('state')} — {status.get('message', '')}")
    except RuntimeError as exc:
        st.error(str(exc))
|
|
|
|
def _render_whitelist_hint() -> None:
    """Warn that the channel is not approved and list ``APPROVED_CHANNELS``."""
    approved_listing = ", ".join(APPROVED_CHANNELS)
    st.warning("Channel not currently approved. Approved channels include:")
    st.write(approved_listing)
|
|
|
|
def _process_cached_videos(cached: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Normalise raw cached-video records into the rows the list renders."""
    rows: List[Dict[str, Any]] = []
    for video in cached:
        video_id = video.get("video_id")
        video_title = video.get("title")
        channel_name = video.get("channel_name")
        video_url = video.get("url") or f"https://www.youtube.com/watch?v={video_id}"
        if not video_title or not channel_name:
            # Fill gaps from YouTube metadata when the cache entry is sparse.
            metadata = _fetch_youtube_metadata(video_url)
            if metadata:
                video_title = metadata.get("title", video_id)
                channel_name = metadata.get("author_name", "")
        rows.append(
            {
                "video_id": video_id,
                "video_title": video_title or video_id,
                "channel_name": channel_name or "",
                "thumbnail_url": video.get("thumbnail_url"),
                "video_url": video_url,
            }
        )
    return rows


def _render_cached_videos() -> None:
    """List cached lectures with a "Use this video" button for each.

    The normalised rows are kept in session state and only rebuilt when the
    (id, title, channel, thumbnail) signature of the cached list changes, so
    metadata lookups are not repeated on every rerun.
    """
    if st.button("Refresh list", key="refresh_cache_list", use_container_width=True):
        _refresh_cached_videos_list()
    cached = st.session_state.get("cached_videos") or []
    if not cached:
        st.info("No cached videos detected yet.")
        return

    signature = [
        (
            video.get("video_id"),
            video.get("title"),
            video.get("channel_name"),
            video.get("thumbnail_url"),
        )
        for video in cached
    ]
    if st.session_state.get("_cached_videos_signature") != signature:
        st.session_state["_cached_videos_processed"] = _process_cached_videos(cached)
        st.session_state["_cached_videos_signature"] = signature

    processed_videos = st.session_state.get("_cached_videos_processed", [])
    with st.container(height=400):
        for video in processed_videos:
            cols = st.columns([1, 3, 1])
            video_id = video.get("video_id")
            video_title = video.get("video_title")
            channel_name = video.get("channel_name")
            video_url = video.get("video_url")
            # Fall back to YouTube's default thumbnail when none is cached.
            thumbnail = video.get("thumbnail_url") or f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg"
            cols[0].image(thumbnail, use_container_width=True)
            # Titles/channels originate outside the app; escape them before
            # embedding in raw HTML.
            safe_title = html.escape(str(video_title))
            safe_channel = html.escape(str(channel_name))
            cols[1].markdown(f"<span style='font-size:1.1rem;font-weight:600;color:#F63366;'>Video: {safe_title}</span>", unsafe_allow_html=True)
            cols[1].markdown(f"<span style='font-size:1.1rem;font-weight:500;color:#666;'>Channel: {safe_channel}</span>", unsafe_allow_html=True)
            # NOTE(review): the trailing glyph was mis-encoded in the source;
            # an "external link" arrow is assumed.
            cols[1].markdown(f"[Watch on YouTube ↗]({video_url})")
            if cols[2].button("Use this video", key=f"use_{video_id}"):
                st.session_state["selected_video_id"] = video_id
                st.session_state["selected_video_title"] = video_title
                st.session_state["selected_channel_name"] = channel_name
                _advance_to_step(1)
|
|
|
|
def _select_video(label: str, key: str) -> str:
    """Render the video selectbox and return the chosen video id.

    The currently selected video (if any) is listed first, followed by the
    other cached videos. Missing titles/channels are filled from YouTube
    metadata where possible, otherwise the video id / an empty channel is
    shown. The choice is written back to ``selected_video_id`` (plus title
    and channel) in session state.

    Args:
        label: Accepted for call-site compatibility; the widget label is the
            fixed text "Video Title".
        key: Streamlit widget key for the selectbox.

    Returns:
        The chosen video id, or ``""`` when there is nothing to choose.
    """
    state = st.session_state
    cached = state.get("cached_videos") or []
    selected_id = state.get("selected_video_id", "")
    selected_label = state.get("selected_video_title")
    selected_channel = state.get("selected_channel_name")

    options: List[Tuple[str, str]] = []
    # video_id -> (title, channel) as shown in the options, so the values
    # written back after selection match what the user saw.
    resolved: Dict[str, Tuple[str, str]] = {}

    if selected_id:
        if not selected_label or not selected_channel:
            video_url = next(
                (
                    video.get("url") or f"https://www.youtube.com/watch?v={selected_id}"
                    for video in cached
                    if video.get("video_id") == selected_id
                ),
                None,
            )
            metadata = _fetch_youtube_metadata(video_url) if video_url else None
            if metadata:
                selected_label = metadata.get("title", selected_id)
                selected_channel = metadata.get("author_name", "")
                state["selected_video_title"] = selected_label
                state["selected_channel_name"] = selected_channel
        # Always fall back so the option never renders as "None (None)".
        selected_label = selected_label or selected_id
        selected_channel = selected_channel or ""
        options.append((selected_id, f"{selected_label} ({selected_channel})"))
        resolved[selected_id] = (selected_label, selected_channel)

    for video in cached:
        video_id = video.get("video_id")
        # Skip unusable/duplicate entries before doing any metadata lookup.
        if not video_id or video_id in resolved:
            continue
        video_title = video.get("title")
        channel_name = video.get("channel_name")
        if not video_title or not channel_name:
            video_url = video.get("url") or f"https://www.youtube.com/watch?v={video_id}"
            metadata = _fetch_youtube_metadata(video_url)
            if metadata:
                video_title = metadata.get("title", video_id)
                channel_name = metadata.get("author_name", "")
        video_title = video_title or video_id
        channel_name = channel_name or ""
        options.append((video_id, f"{video_title} ({channel_name})"))
        resolved[video_id] = (video_title, channel_name)

    if not options:
        options.append(("", "No cached videos yet"))

    choice = st.selectbox("Video Title", options, key=key, format_func=lambda item: item[1])
    video_id = choice[0] if isinstance(choice, tuple) else choice
    if video_id:
        state["selected_video_id"] = video_id
        if video_id in resolved:
            # Use the resolved values rather than the raw cache record, whose
            # title/channel may be missing or None.
            state["selected_video_title"], state["selected_channel_name"] = resolved[video_id]
    return video_id
|
|
|
|
def _render_step_two() -> None:
    """Render step 2: fetch and display bookmarks for the selected video.

    Clicking the button fetches bookmarks, generates a summary per bookmark
    and stores both in session state keyed by video id; stored results are
    rendered on every run.
    """
    # NOTE(review): dashes and link arrows in this block were mis-encoded in
    # the source; an em dash and an "external link" arrow are assumed.
    with _step_expander(2, "Step 2 — Browse bookmarks"):
        video_id = _select_video("Video", key="bookmarks_video_select")
        cols = st.columns(2)
        min_sections = cols[0].slider("Min sections", 1, 10, 3)
        max_sections = cols[1].slider("Max sections", 3, 15, 8)
        if st.button("Browse bookmarks", key="browse_bookmarks_btn"):
            if not video_id:
                st.warning("Select a video first.")
                return
            if min_sections > max_sections:
                # The two sliders overlap (1-10 vs 3-15), so an inverted
                # range is possible; don't send it to the backend.
                st.warning("Min sections cannot be greater than max sections.")
            else:
                with st.spinner("Fetching bookmarks..."):
                    try:
                        bookmarks = list(
                            fetch_bookmarks(video_id, max_sections=max_sections, min_sections=min_sections)
                        )
                        st.session_state["bookmarks_by_video"][video_id] = bookmarks
                        st.session_state["bookmark_summaries_by_video"][video_id] = _summarize_bookmarks(
                            video_id, bookmarks
                        )
                        _advance_to_step(2)
                    except RuntimeError as exc:
                        st.error(str(exc))

        bookmarks = st.session_state["bookmarks_by_video"].get(video_id) or []
        summaries = st.session_state["bookmark_summaries_by_video"].get(video_id) or []
        if not bookmarks:
            return

        st.markdown("#### Bookmarks")
        for idx, bookmark in enumerate(bookmarks):
            timestamp = int(bookmark.get("timestamp_seconds") or 0)
            ts_str = bookmark.get("timestamp") or seconds_to_timestamp(timestamp)
            link = youtube_timestamp_link(video_id, timestamp)
            title = bookmark.get("title", f"Section {idx + 1}")
            st.markdown(f"**{idx + 1}. {title}** — `{ts_str}` [Watch ↗]({link})")

            if idx < len(summaries):
                # .get() so a summary entry without these keys cannot raise.
                summary = summaries[idx].get("summary", "")
                sources = summaries[idx].get("sources") or []
            else:
                summary = bookmark.get("description", "")
                sources = []
            st.write(summary)

            if sources:
                # NOTE(review): this expander sits inside the step expander;
                # confirm the pinned Streamlit version permits nesting.
                with st.expander("Sources", expanded=False):
                    for source in sources:
                        ts = int(source.get("timestamp_seconds") or 0)
                        ts_str_source = source.get("timestamp") or seconds_to_timestamp(ts)
                        link_source = youtube_timestamp_link(source.get("video_id", video_id), ts)
                        st.write(f"[{ts_str_source}] {source.get('text', '')}")
                        st.markdown(f"[Open clip ↗]({link_source})")
            st.divider()
|
|
|
|
def _summarize_bookmarks(video_id: str, bookmarks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Ask the backend for a short summary of each bookmark.

    One ``ask_question`` call is made per bookmark. When a call raises
    ``RuntimeError`` the bookmark's own description is used instead.

    Returns:
        A list parallel to ``bookmarks``; each item has ``"summary"`` and
        ``"sources"`` keys.
    """
    results: List[Dict[str, Any]] = []
    for position, bookmark in enumerate(bookmarks, start=1):
        bookmark_title = bookmark.get("title") or f"Section {position}"
        prompt = (
            f"Summarize the section of the lecture about '{bookmark_title}' in {BOOKMARK_SUMMARY_WORDS} words or fewer. Highlight the key ideas only."
        )
        try:
            response = ask_question(video_id, prompt)
        except RuntimeError:
            # ``or`` rather than a .get() default: the key may exist with an
            # empty/None value, which should still yield the placeholder.
            results.append({"summary": bookmark.get("description") or "Summary unavailable.", "sources": []})
        else:
            results.append(
                {
                    "summary": response.get("answer") or "",
                    "sources": response.get("sources") or [],
                }
            )
    return results
|
|
|
|
def _render_step_three() -> None:
    """Render step 3: generate and display a summary and a quiz.

    Results are stored in session state keyed by video id and shown in
    "Overall" and "Quiz" tabs whenever either one is available.
    """
    # Display label -> style value passed to ``fetch_quiz``.
    style_map = {
        "Mixed": "mixed",
        "Multiple Choice Questions (MCQ)": "mcq",
        "Open-ended Questions": "open",
    }
    # NOTE(review): the dashes in this block were mis-encoded in the source;
    # an em dash is assumed.
    with _step_expander(3, "Step 3 — Summary & quizzes"):
        video_id = _select_video("Video", key="summary_video_select")
        style_display = st.selectbox("Quiz style", list(style_map), key="quiz_style_display")
        num_questions = st.slider("Number of questions", 1, 10, 3, key="quiz_question_slider")
        max_words = st.slider("Summary length (words)", 200, 500, 200, key="summary_word_slider")

        if st.button("Summary & quizzes", key="summary_quiz_btn"):
            if not video_id:
                st.warning("Select a video first.")
                return
            with st.spinner("Generating summary and quiz..."):
                try:
                    st.session_state["summary_by_video"][video_id] = fetch_summary(video_id, max_words=max_words)
                    st.session_state["quiz_by_video"][video_id] = list(
                        fetch_quiz(video_id, num_questions=num_questions, style=style_map[style_display])
                    )
                    _advance_to_step(3)
                except RuntimeError as exc:
                    st.error(str(exc))

        summary_payload = st.session_state["summary_by_video"].get(video_id)
        quiz_items = st.session_state["quiz_by_video"].get(video_id)
        if not (summary_payload or quiz_items):
            return

        overall_tab, quiz_tab = st.tabs(["Overall", "Quiz"])
        with overall_tab:
            if summary_payload:
                # Guard against a payload whose "summary" is None.
                summary_text = summary_payload.get("summary") or ""
                st.write(summary_text)
                if summary_text.strip() == "Insufficient context.":
                    st.warning("If you see 'Insufficient context.', the lecture may not be fully processed yet—try re-running ingestion or checking the video's status.")
            else:
                st.info("Run the summary to populate this tab.")
        with quiz_tab:
            if quiz_items:
                for idx, item in enumerate(quiz_items, start=1):
                    st.markdown(f"**Q{idx}: {item.get('question')}**")
                    for choice in item.get("choices") or []:
                        st.write(f"- {choice}")
                    st.write(f"**Answer:** {item.get('answer')}")
                    if item.get("explanation"):
                        st.caption(item["explanation"])
            else:
                st.info("Generate a quiz to populate this tab.")
|
|
|
|
def _render_step_four() -> None:
    """Render the Smart Search section, which is workflow step 5.

    Despite the function name this section uses expander 5 and is listed
    fifth in the sidebar, so a successful search now completes step 5 (it
    previously advanced step 4, leaving the sidebar out of sync).
    """
    # NOTE(review): the dash and link arrow were mis-encoded in the source;
    # an em dash and an "external link" arrow are assumed.
    with _step_expander(5, "Step 5 — Smart Search (Optional)"):
        video_id = _select_video("Video", key="search_video_select")
        query = st.text_area(
            "Query",
            key="semantic_query_input",
            height=80,
            placeholder="Locate the key segments in the video that discusses the core subject of the video (please be as specific as possible).",
        )
        st.caption("Enter a natural language query to find relevant segments in the video. For example, you can ask for key topics, explanations, or specific concepts. Please be as specific as possible in your query.")
        limit = st.slider("Results", 1, 10, 4, key="semantic_limit_slider")
        st.caption("The number of top matching segments to display based on your query.")

        if st.button("Search", key="semantic_search_btn"):
            # Treat a whitespace-only query as empty.
            query = (query or "").strip()
            if not video_id or not query:
                st.warning("Provide a video and query.")
                return
            with st.spinner("Searching..."):
                try:
                    results = list(semantic_search(query, video_id, limit))
                    st.session_state["search_results_by_video"][video_id] = results
                    _advance_to_step(5)
                except RuntimeError as exc:
                    st.error(str(exc))

        results = st.session_state["search_results_by_video"].get(video_id) or []
        for item in results:
            ts = int(item.get("timestamp_seconds") or 0)
            ts_str = seconds_to_timestamp(ts)
            link = youtube_timestamp_link(video_id, ts)
            st.markdown(f"`{ts_str}` {item.get('text', '')}")
            st.markdown(f"[Open clip ↗]({link})")
|
|
|
def _render_step_five() -> None:
    """Render the Q&A section, which is workflow step 4.

    Despite the function name this section uses expander 4 and is listed
    fourth in the sidebar, so a successful answer now completes step 4 (it
    previously advanced step 5, which also ticked Smart Search as done).
    Each question/answer pair is appended to the per-video history.
    """
    # NOTE(review): the dash and link arrow were mis-encoded in the source;
    # an em dash and an "external link" arrow are assumed.
    with _step_expander(4, "Step 4 — Q&A"):
        video_id = _select_video("Video", key="qa_video_select")
        question = st.text_area(
            "Question",
            key="qa_question_input",
            height=80,
            placeholder="Ask a grounded question",
        )
        if st.button("Run", key="qa_generate_btn"):
            if not video_id or not question:
                st.warning("Provide a question and select a video.")
                return
            try:
                response = ask_question(video_id, question)
                history = st.session_state["qa_history_by_video"].setdefault(video_id, [])
                history.append(
                    {
                        "question": question,
                        "answer": response.get("answer", ""),
                        "sources": response.get("sources", []),
                    }
                )
                _advance_to_step(4)
            except RuntimeError as exc:
                st.error(str(exc))

        history = st.session_state["qa_history_by_video"].get(video_id) or []
        if not history:
            return

        st.markdown("#### Multi-turn Q&A History")
        for idx, item in enumerate(history, start=1):
            # The question is user input and the answer is model output; both
            # are escaped before being embedded in raw HTML. Newlines become
            # <br> so multi-paragraph text stays inside the card.
            safe_question = html.escape(str(item.get("question") or "")).replace("\n", "<br>")
            safe_answer = html.escape(str(item.get("answer") or "")).replace("\n", "<br>")
            st.markdown(
                "<div style='background:#FFF0F5;border-radius:10px;padding:14px 14px 10px 14px;margin-bottom:12px;border:1px solid #F63366;'>"
                f"<b style='color:#F63366'>Q{idx}:</b> <span style='color:#D72660;font-weight:500'>{safe_question}</span><br>"
                f"<b style='color:#F63366'>A{idx}:</b> <span style='color:#333'>{safe_answer}</span>"
                "</div>",
                unsafe_allow_html=True,
            )
            if item.get("sources"):
                # NOTE(review): this expander sits inside the step expander;
                # confirm the pinned Streamlit version permits nesting.
                with st.expander("Sources", expanded=False):
                    for source in item["sources"]:
                        ts = int(source.get("timestamp_seconds") or 0)
                        ts_str = source.get("timestamp") or seconds_to_timestamp(ts)
                        link = youtube_timestamp_link(source.get("video_id", video_id), ts)
                        st.write(f"[{ts_str}] {source.get('text', '')}")
                        st.markdown(f"[Open clip ↗]({link})")
|
|
|
|
def home_page() -> None:
    """Render the whole page: header, sidebar progress, API status and steps.

    Session state is initialised before any step is drawn. The Q&A section
    (``_render_step_five``) is drawn before Smart Search
    (``_render_step_four``) on purpose: their expanders are labelled step 4
    and step 5 respectively.
    """
    # NOTE(review): the logo glyph was mis-encoded in the source and cannot
    # be recovered exactly; "🎓" is a stand-in -- confirm the intended emoji.
    header_markup = (
        "\n"
        "<div style='display: flex; align-items: center;'>\n"
        "<span style='font-size: 2.5rem; margin-right: 0.5rem;'>🎓</span>\n"
        "<span style='font-size: 2.2rem; font-weight: bold;'>Chaptive AI</span>\n"
        "</div>\n"
    )
    st.markdown(header_markup, unsafe_allow_html=True)
    st.markdown(
        "<span style='font-size:1.3rem;color:#F63366;font-weight:500;'>Your AI-powered YouTube study companion</span>",
        unsafe_allow_html=True,
    )
    st.caption("Chaptive AI: Transcribe, Summarize, Generate Bookmarks, Quizzes and Q&A.")

    _init_session_state()
    _render_sidebar_stepper()
    _render_api_status()
    _render_step_one()
    _render_step_two()
    _render_step_three()
    _render_step_five()  # Q&A, shown as step 4
    _render_step_four()  # Smart Search, shown as step 5
|
|
|
|
if __name__ == "__main__":
    # NOTE(review): the page icon was mis-encoded in the source and cannot be
    # recovered exactly; "🎓" is a stand-in -- confirm the intended emoji.
    st.set_page_config(page_title="Chaptive AI", page_icon="🎓", layout="wide")
    home_page()