# Chaptive / src / pages / _home_page.py
# Last commit by: Jing997
# Commit message: change app name
# Commit: 497bf08
from __future__ import annotations
from typing import Any, Dict, List, Tuple
import streamlit as st
from utils.helpers import (
ask_question,
fetch_bookmarks,
fetch_job_status,
fetch_quiz,
fetch_summary,
fetch_youtube_metadata_v3,
get_image_base64,
get_remote_image_url,
list_cached_videos,
ping_health,
resolve_image_source,
semantic_search,
seconds_to_timestamp,
start_ingestion,
youtube_timestamp_link,
)
from utils.settings import APPROVED_CHANNELS
# Word limit interpolated into the per-bookmark summarisation prompt.
BOOKMARK_SUMMARY_WORDS = 100
def _init_session_state() -> None:
    """Seed ``st.session_state`` with the keys this page reads.

    Keys that already exist are left untouched. The cached-video list is
    loaded through ``list_cached_videos()`` whenever it is currently empty.
    """
    wizard_defaults = {
        "step_done": 0,
        "auto_open": {1: True, 2: False, 3: False, 4: False, 5: False},
        "selected_video_id": "",
        "selected_video_title": "",
        "selected_channel_name": "",
        "job_id_by_video": {},
        "recent_job_status": None,
        "cached_videos": [],
        "bookmarks_by_video": {},
        "bookmark_summaries_by_video": {},
        "summary_by_video": {},
        "quiz_by_video": {},
        "search_results_by_video": {},
        "qa_history_by_video": {},
        "step1_tab_choice": "Insert YouTube URL",
    }
    chat_defaults = {
        "chat_history": [],
        "chat_video_id": "",
    }
    for name, initial in {**wizard_defaults, **chat_defaults}.items():
        st.session_state.setdefault(name, initial)

    # An empty list is falsy, so a fresh session always triggers one load.
    if not st.session_state.get("cached_videos"):
        st.session_state["cached_videos"] = list_cached_videos()
def _is_step_open(step: int) -> bool:
    """Return whether ``step`` is flagged to open by default."""
    auto_open_flags = st.session_state.get("auto_open", {})
    flag = auto_open_flags.get(step, False)
    return bool(flag)
def _advance_to_step(step: int) -> None:
    """Mark ``step`` as completed and queue the next step's expander to open.

    ``step_done`` only ever increases. For steps below 5 the auto-open flag
    moves from ``step`` to ``step + 1`` and an expander override is set.
    """
    state = st.session_state
    state["step_done"] = max(state["step_done"], step)
    if step >= 5:
        # Nothing follows the last step.
        return

    following = step + 1
    auto_open_flags = state["auto_open"]
    auto_open_flags[step] = False
    auto_open_flags[following] = True
    _set_expander_override(_step_expander_key(following), True)
def _expander_overrides() -> Dict[str, bool]:
    """Return the session-level override mapping, creating it when absent."""
    state = st.session_state
    return state.setdefault("_expander_overrides", dict())
def _set_expander_override(key: str, expanded: bool) -> None:
    """Store an expanded/collapsed override for the expander named ``key``.

    The expander's "initialized" marker is also dropped from session state.
    """
    pending = _expander_overrides()
    pending[key] = expanded
    init_flag = f"_expander_initialized_{key}"
    st.session_state.pop(init_flag, None)
def _expander_container(key: str, label: str, default_open: bool):
    """Create an ``st.expander`` whose state is only forced when needed.

    A pending override for ``key`` is consumed and wins. Otherwise
    ``default_open`` is applied only if the expander has not been marked as
    initialized. In every other case ``expanded`` is not passed at all.
    """
    pending = _expander_overrides()
    init_flag = f"_expander_initialized_{key}"

    requested = None
    if key in pending:
        # One-shot: the override is removed as it is applied.
        requested = pending.pop(key)
    elif not st.session_state.get(init_flag):
        requested = default_open

    expander_kwargs = {} if requested is None else {"expanded": requested}
    container = st.expander(label, **expander_kwargs)
    st.session_state[init_flag] = True
    return container
def _step_expander_key(step: int) -> str:
    """Return the expander key used for the given step number."""
    return "step_{}".format(step)
def _step_expander(step: int, label: str):
    """Return the expander container for a numbered step."""
    expander_key = _step_expander_key(step)
    initially_open = _is_step_open(step)
    return _expander_container(expander_key, label, initially_open)
def _named_expander(key: str, label: str, default_open: bool = False):
    """Return an expander container identified by an arbitrary ``key``."""
    return _expander_container(key=key, label=label, default_open=default_open)
def _keep_step_expander_open(step: int) -> None:
    """Set an "expanded" override for the given step's expander."""
    expander_key = _step_expander_key(step)
    _set_expander_override(expander_key, True)
def _refresh_cached_videos_list() -> None:
    """Reload the cached-video list into session state.

    If ``list_cached_videos`` exposes a ``clear`` attribute it is called
    first, so the subsequent call is made after clearing.
    """
    try:
        clear_cache = list_cached_videos.clear
    except AttributeError:
        # Helper has no ``clear`` attribute; nothing to invalidate.
        pass
    else:
        clear_cache()
    st.session_state["cached_videos"] = list_cached_videos()
def _render_sidebar_stepper() -> None:
    """Render the progress checklist in the sidebar.

    A step is shown as done when ``step_done`` in session state is at least
    that step's number. The status icons were mis-encoded (mojibake) and are
    restored to the intended emoji here.
    """
    sidebar_labels = [
        "Insert YouTube Lecture URL",
        "Browse bookmarks",
        "Summary & Quizzes",
        "Q&A",
        "Smart Search (Optional)",
    ]
    with st.sidebar:
        st.markdown("### Progress")
        for step, label in enumerate(sidebar_labels, start=1):
            status = "✅" if st.session_state["step_done"] >= step else "⬜️"
            st.write(f"{status} Step {step}: {label}")
def _render_api_status() -> None:
    """Show a one-line caption describing backend health.

    ``ping_health()`` is called once. A ``RuntimeError`` from it is shown as
    the caption text instead of propagating. The status icons were
    mis-encoded (mojibake) and are restored to the intended emoji.
    """
    try:
        health = ping_health()
    except RuntimeError as exc:
        st.caption(f"❌ {exc}")
        return

    if health.get("status") == "ok":
        st.caption("✅ API online")
    else:
        st.caption("❌ API offline")
def _run_status_popover() -> None:
    """Render a popover explaining the Run and Refresh Job Status buttons."""
    help_lines = [
        "- Checks whether the lecture artifacts already exist in S3",
        "- Queues ingestion via the FastAPI backend if missing",
        "- `Refresh job status` polls `/videos/process/{job_id}` until the artifacts are ready.",
    ]
    with st.popover("What does Run and Job Refresh Status do?"):
        st.markdown("\n".join(help_lines))
def _render_whitelist_content() -> None:
    """Render the approved channels as a four-column grid of linked icons.

    Each cell shows the channel icon and the channel name, both linking to
    the channel page. The icon is embedded as a base64 data URI when
    ``get_image_base64`` returns a value, otherwise
    ``get_remote_image_url`` supplies the image source.
    """
    # (icon file name, channel URL, display name)
    channels = [
        ("MIT OpenCourseWare.png", "https://www.youtube.com/@mitocw", "MIT OpenCourseWare"),
        ("Stanford Online.png", "https://www.youtube.com/@stanfordonline", "Stanford Online"),
        ("Harvard.png", "https://www.youtube.com/@harvard", "Harvard"),
        ("freeCodeCamp.png", "https://www.youtube.com/@freecodecamp", "freeCodeCamp.org"),
        ("TEDx.png", "https://www.youtube.com/@TEDx", "TEDx Talks"),
        ("SciShow.png", "https://www.youtube.com/@SciShow", "SciShow"),
        ("Udacity.png", "https://www.youtube.com/@Udacity", "Udacity"),
        ("ProgrammingWithMosh.png", "https://www.youtube.com/@programmingwithmosh", "Programming with Mosh"),
        ("Computerphile.png", "https://www.youtube.com/@Computerphile", "Computerphile"),
        ("edX.png", "https://www.youtube.com/@edXOnline", "edX"),
        ("TED.png", "https://www.youtube.com/@TED", "TED"),
        ("YaleCourses.png", "https://www.youtube.com/@YaleCourses", "YaleCourses"),
        ("Veritasium.png", "https://www.youtube.com/@veritasium", "Veritasium"),
        ("3blue1brown.png", "https://www.youtube.com/@3blue1brown", "3blue1brown"),
        ("CrashCourse.png", "https://www.youtube.com/@crashcourse", "CrashCourse"),
        ("KhanAcademy.png", "https://www.youtube.com/@khanacademy", "Khan Academy"),
        ("minutephysics.png", "https://www.youtube.com/@minutephysics", "minutephysics"),
        ("numberphile.png", "https://www.youtube.com/@numberphile", "numberphile"),
        ("TEDEd.png", "https://www.youtube.com/@TEDEd", "TED-Ed"),
        ("coursera.png", "https://www.youtube.com/@coursera", "Coursera"),
    ]
    columns_per_row = 4
    icon_style = "display:block;margin:auto;width:48px;height:48px;object-fit:contain;"

    for offset in range(0, len(channels), columns_per_row):
        row_columns = st.columns(columns_per_row)
        row_channels = channels[offset:offset + columns_per_row]
        # zip stops at the shorter sequence, so a short final row leaves
        # its trailing columns empty.
        for column, (icon_file, channel_url, channel_name) in zip(row_columns, row_channels):
            encoded_icon = get_image_base64(icon_file)
            if encoded_icon:
                icon_src = f"data:image/png;base64,{encoded_icon}"
            else:
                icon_src = get_remote_image_url(icon_file)
            with column:
                st.markdown(
                    f"\n<a href='{channel_url}' target='_blank'>\n"
                    f"<img src='{icon_src}' style='{icon_style}' alt='{channel_name}'/>\n"
                    "</a>\n",
                    unsafe_allow_html=True,
                )
                st.markdown(
                    f"<div style='text-align:center;'><a href='{channel_url}' target='_blank'>{channel_name}</a></div>",
                    unsafe_allow_html=True,
                )
def _whitelist_popover() -> None:
    """Render the approved-channel grid inside a popover.

    The check-mark in the label was mis-encoded (mojibake) and is restored.
    """
    with st.popover("Whitelisted YouTube Channels ✅"):
        _render_whitelist_content()
def _fetch_youtube_metadata(video_url: str) -> Dict[str, Any] | None:
if not video_url:
return None
try:
metadata = fetch_youtube_metadata_v3(video_url)
if metadata:
return metadata
except RuntimeError as exc:
st.warning(str(exc))
return None
return None
def _render_whitelist_expander() -> None:
    """Render the approved-channel grid inside a collapsed expander.

    This function previously carried a verbatim copy of the channel table
    and grid-rendering loop from ``_render_whitelist_content``. It now
    delegates to that function so the two cannot drift apart. The
    check-mark in the label was mis-encoded (mojibake) and is restored.
    """
    with st.expander("Whitelisted YouTube Channels ✅", expanded=False):
        _render_whitelist_content()
def _render_step_one() -> None:
    """Render Step 1: choose a lecture by URL or from the cached list.

    The "Insert YouTube URL" tab shows a URL field, a metadata preview, the
    Run / Refresh Job Status buttons, the most recent job status and two
    help popovers. The "Use Cached Video" tab lists cached lectures.

    Fixes relative to the previous version:
    * ``st.pills`` now receives a default, so the first render shows the
      URL tab instead of falling through to the cached-video branch with
      no pill selected. A deselected pill falls back to the last stored
      choice instead of storing ``None``.
    * Title and channel name come from remote metadata and are
      HTML-escaped before being placed in raw HTML.
    * The metadata preview is skipped when the URL field is empty, so a
      stale preview is not shown together with ``st.video("")``.
    * A ``None`` job message no longer breaks the ``in`` test.
    * Mis-encoded (mojibake) dashes in user-facing strings are restored.
    """
    import html  # local import: only used to escape untrusted metadata

    with _step_expander(1, "Step 1 — Load YouTube Lecture"):
        tab_options = ["Insert YouTube URL", "Use Cached Video"]
        default_tab = st.session_state.get("step1_tab_choice")
        if default_tab not in tab_options:
            default_tab = tab_options[0]

        tab_choice = st.pills(
            options=tab_options,
            default=tab_options[0],
            format_func=None,
            label="",
            help=None,
            key="step1_tab_pill",
            disabled=False,
            label_visibility="collapsed",
        )
        if tab_choice not in tab_options:
            # Single-select pills return None when the user deselects.
            tab_choice = default_tab
        st.session_state["step1_tab_choice"] = tab_choice

        if tab_choice == tab_options[0]:
            video_url = st.text_input(
                "YouTube URL",
                value=st.session_state.get("step1_video_url", ""),
                placeholder="https://www.youtube.com/watch?v=GOgA8JGUiwI",
                key="step1_video_url_input",
            )

            # Metadata is cached per URL so reruns do not refetch it.
            metadata = st.session_state.get("step1_metadata")
            if video_url and video_url != st.session_state.get("step1_metadata_url"):
                metadata = _fetch_youtube_metadata(video_url)
                st.session_state["step1_metadata"] = metadata
                st.session_state["step1_metadata_url"] = video_url

            if metadata and video_url:
                title = html.escape(str(metadata.get("title", "Untitled lecture")))
                author = html.escape(str(metadata.get("author_name", "Unknown")))
                st.markdown(
                    f"<span style='font-size:1.3rem;color:#F63366;font-weight:600;'>{title}</span>",
                    unsafe_allow_html=True,
                )
                st.markdown(
                    f"<span style='font-size:1.3rem;color:#666;font-weight:500;'>Channel: {author}</span>",
                    unsafe_allow_html=True,
                )
                st.video(video_url)
            elif video_url:
                st.warning("Could not fetch metadata for this URL.")

            run_col, refresh_col = st.columns([3, 1])
            with run_col:
                if st.button("Run", type="primary", disabled=not bool(video_url), key="run_btn_step1"):
                    with st.spinner("Running ingestion..."):
                        _keep_step_expander_open(1)
                        if video_url:
                            _handle_run(video_url, metadata)
            with refresh_col:
                if st.button(
                    "Refresh Job Status",
                    disabled=not bool(video_url),
                    key="refresh_job_status_btn",
                ):
                    _keep_step_expander_open(1)
                    _handle_refresh_job_status()

            status = st.session_state.get("recent_job_status")
            if status:
                state_raw = status.get("state", "unknown") or "unknown"
                state = state_raw.lower()
                message = status.get("message") or ""
                job_id = status.get("job_id")
                processing_states = {"queued", "pending", "running", "processing", "started", "submitted"}

                if state == "failed" and "not in the approved list" in message:
                    st.error(f"Job {job_id} — failed: YouTube Channel not in the approved list.")
                elif state == "failed":
                    st.error(f"Job {job_id} — failed: {message}")
                else:
                    st.info(f"Job {job_id} — {state_raw}: {message}")
                    if state in processing_states:
                        st.info(
                            "Processing may take some time to ingest the YouTube video, "
                            "transcribe it, and convert it into embeddings."
                        )

            col_pop_left, col_pop_right = st.columns(2)
            with col_pop_left:
                _whitelist_popover()
            with col_pop_right:
                _run_status_popover()
        else:
            st.markdown("#### Use an existing cached lecture")
            _render_cached_videos()
            _whitelist_popover()
def _handle_run(video_url: str, metadata: Dict[str, Any] | None) -> None:
    """Start ingestion for ``video_url`` and record the resulting job.

    A warning is shown and nothing happens when the URL or metadata is
    missing. A ``RuntimeError`` from ``start_ingestion`` is shown as an
    error followed by the approved-channel hint. On success the video
    becomes the current selection, its job status is fetched once, and the
    wizard advances past step 1.
    """
    if not (video_url and metadata):
        st.warning("Provide a valid YouTube URL and ensure metadata loads before running.")
        return

    try:
        ingestion = start_ingestion(video_url)
    except RuntimeError as exc:
        st.error(str(exc))
        _render_whitelist_hint()
        return

    video_id = ingestion.get("video_id")
    job_id = ingestion.get("job_id")
    if not (video_id and job_id):
        st.warning("Backend did not return a job_id/video_id. Check logs.")
        return

    state = st.session_state
    state["job_id_by_video"][video_id] = job_id
    state["selected_video_id"] = video_id
    state["selected_video_title"] = metadata.get("title", video_id)
    state["selected_channel_name"] = metadata.get("author_name", "")

    # Query the job status right away rather than reporting "queued".
    try:
        latest_status = fetch_job_status(job_id)
    except RuntimeError as exc:
        latest_status = {"job_id": job_id, "state": "failed", "message": str(exc)}
    state["recent_job_status"] = latest_status
    _advance_to_step(1)
def _handle_refresh_job_status() -> None:
    """Fetch and display the job status for the currently selected video.

    Informs the user when no video is selected or no job is recorded for
    it. On success the status is stored in session state and the
    cached-video list is reloaded. A ``RuntimeError`` is shown as an error.
    The dash in the status line was mis-encoded (mojibake) and is restored.
    """
    video_id = st.session_state.get("selected_video_id")
    if not video_id:
        st.info("Select or ingest a video first.")
        return

    job_id = st.session_state["job_id_by_video"].get(video_id)
    if not job_id:
        st.warning("No job associated with this video yet.")
        return

    try:
        status = fetch_job_status(job_id)
        st.session_state["recent_job_status"] = status
        _refresh_cached_videos_list()
        # Deliberately no step advance or auto_open change here, so the
        # expander state is preserved.
        st.info(f"State: {status.get('state')} — {status.get('message', '')}")
    except RuntimeError as exc:
        st.error(str(exc))
def _render_whitelist_hint() -> None:
    """Warn that the channel is not approved and list the approved ones."""
    st.warning("Channel not currently approved. Approved channels include:")
    approved_names = ", ".join(APPROVED_CHANNELS)
    st.write(approved_names)
def _render_cached_videos() -> None:
    """Render the scrollable list of cached lectures with a "use" button.

    Display data (title, channel, thumbnail, URL) is prepared once per
    distinct cached list and kept in session state. Missing titles or
    channel names are filled in from ``_fetch_youtube_metadata``.

    Fixes relative to the previous version:
    * Title and channel name are HTML-escaped before being placed in raw
      HTML; the unescaped values are still what gets stored on selection.
    * The mis-encoded (mojibake) arrow in the link text is restored.
    """
    import html  # local import: only used to escape untrusted metadata

    if st.button("Refresh list", key="refresh_cache_list", use_container_width=True):
        _refresh_cached_videos_list()

    cached = st.session_state.get("cached_videos") or []
    if not cached:
        st.info("No cached videos detected yet.")
        return

    # Fingerprint of the cached list; display data is rebuilt on change.
    signature = [
        (
            video.get("video_id"),
            video.get("title"),
            video.get("channel_name"),
            video.get("thumbnail_url"),
        )
        for video in cached
    ]
    if st.session_state.get("_cached_videos_signature") != signature:
        processed_videos: List[Dict[str, Any]] = []
        for video in cached:
            video_id = video.get("video_id")
            video_title = video.get("title")
            channel_name = video.get("channel_name")
            video_url = video.get("url") or f"https://www.youtube.com/watch?v={video_id}"
            if not video_title or not channel_name:
                metadata = _fetch_youtube_metadata(video_url)
                if metadata:
                    video_title = metadata.get("title", video_id)
                    channel_name = metadata.get("author_name", "")
            processed_videos.append(
                {
                    "video_id": video_id,
                    "video_title": video_title or video_id,
                    "channel_name": channel_name or "",
                    "thumbnail_url": video.get("thumbnail_url"),
                    "video_url": video_url,
                }
            )
        st.session_state["_cached_videos_signature"] = signature
        st.session_state["_cached_videos_processed"] = processed_videos

    processed_videos = st.session_state.get("_cached_videos_processed", [])
    with st.container(height=400):
        if processed_videos:
            for video in processed_videos:
                cols = st.columns([1, 3, 1])
                video_id = video.get("video_id")
                video_title = video.get("video_title")
                channel_name = video.get("channel_name")
                video_url = video.get("video_url")

                # Use YouTube's default thumbnail when none is stored.
                thumbnail = video.get("thumbnail_url") or f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg"
                cols[0].image(thumbnail, use_container_width=True)

                safe_title = html.escape(str(video_title))
                safe_channel = html.escape(str(channel_name))
                cols[1].markdown(
                    f"<span style='font-size:1.1rem;font-weight:600;color:#F63366;'>Video: {safe_title}</span>",
                    unsafe_allow_html=True,
                )
                cols[1].markdown(
                    f"<span style='font-size:1.1rem;font-weight:500;color:#666;'>Channel: {safe_channel}</span>",
                    unsafe_allow_html=True,
                )
                cols[1].markdown(f"[Watch on YouTube ↗]({video_url})")

                if cols[2].button("Use this video", key=f"use_{video_id}"):
                    st.session_state["selected_video_id"] = video_id
                    st.session_state["selected_video_title"] = video_title
                    st.session_state["selected_channel_name"] = channel_name
                    _advance_to_step(1)
        else:
            st.markdown(
                "<div style='height:100%;display:flex;align-items:center;justify-content:center;'>"
                "<span style='color:#888;'>No cached videos detected yet.</span></div>",
                unsafe_allow_html=True,
            )
def _select_video(label: str, key: str) -> str:
    """Render the video selectbox and return the chosen video id.

    The currently selected video, if any, is listed first, followed by
    every other cached video. The chosen id is written back to session
    state together with the title and channel from the cached entry.

    ``label`` is accepted for call-site compatibility but not used: the
    widget label is always "Video Title".

    Fixes relative to the previous version:
    * The selected entry's title/channel always fall back to the video id
      and an empty string, so the option can no longer read "None (None)"
      when the video is missing from the cache or metadata cannot be
      fetched.
    * Metadata is no longer fetched for cached entries that are skipped
      (the already-listed selection, or entries without an id).

    Returns:
        The selected video id, or ``""`` when there is nothing to select.
    """
    cached = st.session_state.get("cached_videos") or []
    options: List[Tuple[str, str]] = []
    selected_id = st.session_state.get("selected_video_id", "")
    selected_label = st.session_state.get("selected_video_title")
    selected_channel = st.session_state.get("selected_channel_name")

    if selected_id:
        if not selected_label or not selected_channel:
            # Only a video present in the cache has a URL to look up.
            video_url = None
            for video in cached:
                if video.get("video_id") == selected_id:
                    video_url = video.get("url") or f"https://www.youtube.com/watch?v={selected_id}"
                    break
            metadata = _fetch_youtube_metadata(video_url) if video_url else None
            if metadata:
                selected_label = metadata.get("title", selected_id)
                selected_channel = metadata.get("author_name", "")
                st.session_state["selected_video_title"] = selected_label
                st.session_state["selected_channel_name"] = selected_channel
        selected_label = selected_label or selected_id
        selected_channel = selected_channel or ""
        options.append((selected_id, f"{selected_label} ({selected_channel})"))

    for video in cached:
        video_id = video.get("video_id")
        if not video_id or video_id == selected_id:
            # Skip before any metadata lookup: this entry is not listed.
            continue
        video_title = video.get("title")
        channel_name = video.get("channel_name")
        if not video_title or not channel_name:
            video_url = video.get("url") or f"https://www.youtube.com/watch?v={video_id}"
            metadata = _fetch_youtube_metadata(video_url)
            if metadata:
                video_title = metadata.get("title", video_id)
                channel_name = metadata.get("author_name", "")
        video_title = video_title or video_id
        channel_name = channel_name or ""
        options.append((video_id, f"{video_title} ({channel_name})"))

    if not options:
        options.append(("", "No cached videos yet"))

    # Always use 'Video Title' as the field name.
    choice = st.selectbox("Video Title", options, key=key, format_func=lambda item: item[1])
    video_id = choice[0] if isinstance(choice, tuple) else choice
    if video_id:
        st.session_state["selected_video_id"] = video_id
        # Sync title and channel with the cached entry for the chosen id.
        for video in cached:
            if video.get("video_id") == video_id:
                st.session_state["selected_video_title"] = video.get("title", video_id)
                st.session_state["selected_channel_name"] = video.get("channel_name", "")
                break
    return video_id
def _render_step_two() -> None:
    """Render Step 2: fetch and display bookmarks for the selected video.

    Pressing the button fetches bookmarks, generates a short summary per
    bookmark, stores both in session state and advances the wizard. Stored
    bookmarks for the selected video are listed with timestamp links, the
    summary and, where present, the sources behind it.

    Fixes relative to the previous version: mis-encoded (mojibake) dashes
    and arrows in user-facing strings are restored, and each summary entry
    is looked up once instead of being indexed twice.
    """
    with _step_expander(2, "Step 2 — Browse bookmarks"):
        video_id = _select_video("Video", key="bookmarks_video_select")
        cols = st.columns(2)
        min_sections = cols[0].slider("Min sections", 1, 10, 3)
        max_sections = cols[1].slider("Max sections", 3, 15, 8)

        if st.button("Browse bookmarks", key="browse_bookmarks_btn"):
            with st.spinner("Fetching bookmarks..."):
                if not video_id:
                    st.warning("Select a video first.")
                    return
                try:
                    bookmarks = list(
                        fetch_bookmarks(video_id, max_sections=max_sections, min_sections=min_sections)
                    )
                    st.session_state["bookmarks_by_video"][video_id] = bookmarks
                    st.session_state["bookmark_summaries_by_video"][video_id] = _summarize_bookmarks(
                        video_id, bookmarks
                    )
                    _advance_to_step(2)
                except RuntimeError as exc:
                    st.error(str(exc))

        _bookmarks = st.session_state["bookmarks_by_video"].get(video_id) or []
        _summaries = st.session_state["bookmark_summaries_by_video"].get(video_id) or []
        if not _bookmarks:
            return

        st.markdown("#### Bookmarks")
        for idx, bookmark in enumerate(_bookmarks):
            timestamp = int(bookmark.get("timestamp_seconds") or 0)
            ts_str = bookmark.get("timestamp") or seconds_to_timestamp(timestamp)
            link = youtube_timestamp_link(video_id, timestamp)
            title = bookmark.get("title", f"Section {idx + 1}")
            st.markdown(f"**{idx + 1}. {title}** — `{ts_str}` [Watch ↗]({link})")

            # Summaries are positional; fall back to the bookmark's own
            # description when there is no entry at this index.
            entry = _summaries[idx] if idx < len(_summaries) else None
            if entry is not None:
                summary = entry.get("summary", "")
                sources = entry.get("sources", [])
            else:
                summary = bookmark.get("description", "")
                sources = []
            st.write(summary)

            if sources:
                with st.expander("Sources", expanded=False):
                    for source in sources:
                        ts = int(source.get("timestamp_seconds") or 0)
                        ts_str_source = source.get("timestamp") or seconds_to_timestamp(ts)
                        link_source = youtube_timestamp_link(source.get("video_id", video_id), ts)
                        st.write(f"[{ts_str_source}] {source.get('text', '')}")
                        st.markdown(f"[Open clip ↗]({link_source})")
            st.divider()
def _summarize_bookmarks(video_id: str, bookmarks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return one ``{"summary", "sources"}`` dict per bookmark, in order.

    Each bookmark is summarised by sending a prompt to ``ask_question``.
    When that raises ``RuntimeError`` the bookmark's own description (or
    "Summary unavailable.") is used and the sources list is empty.
    """

    def _summarize_one(position: int, bookmark: Dict[str, Any]) -> Dict[str, Any]:
        section_title = bookmark.get("title") or f"Section {position}"
        prompt = (
            f"Summarize the section of the lecture about '{section_title}' "
            f"in {BOOKMARK_SUMMARY_WORDS} words or fewer. Highlight the key ideas only."
        )
        try:
            reply = ask_question(video_id, prompt)
            return {"summary": reply.get("answer", ""), "sources": reply.get("sources", [])}
        except RuntimeError:
            return {"summary": bookmark.get("description", "Summary unavailable."), "sources": []}

    return [
        _summarize_one(position, bookmark)
        for position, bookmark in enumerate(bookmarks, start=1)
    ]
def _render_step_three() -> None:
    """Render Step 3: generate and display the overall summary and a quiz.

    Pressing the button fetches a summary and a quiz for the selected
    video, stores both in session state and advances the wizard. Stored
    results are shown in "Overall" and "Quiz" tabs.

    Fixes relative to the previous version: mis-encoded (mojibake) dashes
    are restored, the unused ``bookmark_summaries`` lookup is removed, and
    a missing or ``None`` summary no longer breaks the ``.strip()`` check.
    """
    style_map = {
        "Mixed": "mixed",
        "Multiple Choice Questions (MCQ)": "mcq",
        "Open-ended Questions": "open",
    }

    with _step_expander(3, "Step 3 — Summary & quizzes"):
        video_id = _select_video("Video", key="summary_video_select")
        style_display = st.selectbox(
            "Quiz style",
            [
                "Mixed",
                "Multiple Choice Questions (MCQ)",
                "Open-ended Questions",
            ],
            key="quiz_style_display",
        )
        num_questions = st.slider("Number of questions", 1, 10, 3, key="quiz_question_slider")
        max_words = st.slider("Summary length (words)", 200, 500, 200, key="summary_word_slider")

        if st.button("Summary & quizzes", key="summary_quiz_btn"):
            with st.spinner("Generating summary and quiz..."):
                if not video_id:
                    st.warning("Select a video first.")
                    return
                try:
                    st.session_state["summary_by_video"][video_id] = fetch_summary(
                        video_id, max_words=max_words
                    )
                    st.session_state["quiz_by_video"][video_id] = list(
                        fetch_quiz(video_id, num_questions=num_questions, style=style_map[style_display])
                    )
                    _advance_to_step(3)
                except RuntimeError as exc:
                    st.error(str(exc))

        summary_payload = st.session_state["summary_by_video"].get(video_id)
        quiz_items = st.session_state["quiz_by_video"].get(video_id)
        if not (summary_payload or quiz_items):
            return

        overall_tab, quiz_tab = st.tabs(["Overall", "Quiz"])
        with overall_tab:
            if summary_payload:
                summary_text = summary_payload.get("summary") or ""
                st.write(summary_text)
                if summary_text.strip() == "Insufficient context.":
                    st.warning(
                        "If you see 'Insufficient context.', the lecture may not be fully "
                        "processed yet—try re-running ingestion or checking the video's status."
                    )
            else:
                st.info("Run the summary to populate this tab.")

        with quiz_tab:
            if quiz_items:
                for idx, item in enumerate(quiz_items, start=1):
                    st.markdown(f"**Q{idx}: {item.get('question')}**")
                    for choice in item.get("choices") or []:
                        st.write(f"- {choice}")
                    st.write(f"**Answer:** {item.get('answer')}")
                    if item.get("explanation"):
                        st.caption(item["explanation"])
            else:
                st.info("Generate a quiz to populate this tab.")
def _render_step_four() -> None:
    """Render the Smart Search section, which the UI labels as Step 5.

    NOTE: the function name says "four" but this renders Step 5. The name
    is kept so existing callers keep working.

    Pressing the button runs a semantic search over the selected video and
    stores the results in session state. Stored results are listed with
    timestamp links.

    Fixes relative to the previous version:
    * A successful search now records step 5 as done. It previously called
      ``_advance_to_step(4)``, which marked Q&A as done instead and never
      marked Smart Search itself.
    * The step-5 expander override that the old call set as a side effect
      is kept explicitly, so the expander behaves as before after a search.
    * Mis-encoded (mojibake) dashes and arrows are restored.
    """
    with _step_expander(5, "Step 5 — Smart Search (Optional)"):
        video_id = _select_video("Video", key="search_video_select")
        query = st.text_area(
            "Query",
            key="semantic_query_input",
            height=80,
            placeholder="Locate the key segments in the video that discusses the core subject of the video (please be as specific as possible).",
        )
        st.caption("Enter a natural language query to find relevant segments in the video. For example, you can ask for key topics, explanations, or specific concepts. Please be as specific as possible in your query.")
        limit = st.slider("Results", 1, 10, 4, key="semantic_limit_slider")
        st.caption("The number of top matching segments to display based on your query.")

        if st.button("Search", key="semantic_search_btn"):
            with st.spinner("Searching..."):
                if not video_id or not query:
                    st.warning("Provide a video and query.")
                    return
                try:
                    results = list(semantic_search(query, video_id, limit))
                    st.session_state["search_results_by_video"][video_id] = results
                    _advance_to_step(5)
                    _keep_step_expander_open(5)
                except RuntimeError as exc:
                    st.error(str(exc))

        results = st.session_state["search_results_by_video"].get(video_id) or []
        for item in results:
            ts = int(item.get("timestamp_seconds") or 0)
            ts_str = seconds_to_timestamp(ts)
            link = youtube_timestamp_link(video_id, ts)
            st.markdown(f"`{ts_str}` {item.get('text', '')}")
            st.markdown(f"[Open clip ↗]({link})")
def _render_step_five() -> None:
    """Render the Q&A section, which the UI labels as Step 4.

    NOTE: the function name says "five" but this renders Step 4. The name
    is kept so existing callers keep working.

    Pressing the button sends the question to ``ask_question`` and appends
    the exchange to the per-video history, which is listed below the form
    with the sources behind each answer.

    Fixes relative to the previous version:
    * A successful question now records step 4 as done. It previously
      called ``_advance_to_step(5)``, which also marked the optional Smart
      Search step as done.
    * Question and answer text is HTML-escaped before being placed in raw
      HTML, so ``<`` or ``&`` in the text cannot break or inject markup.
    * Mis-encoded (mojibake) dashes and arrows are restored.
    """
    import html  # local import: only used to escape user/model text

    with _step_expander(4, "Step 4 — Q&A"):
        video_id = _select_video("Video", key="qa_video_select")
        question = st.text_area(
            "Question",
            key="qa_question_input",
            height=80,
            placeholder="Ask a grounded question",
        )

        if st.button("Run", key="qa_generate_btn"):
            if not video_id or not question:
                st.warning("Provide a question and select a video.")
                return
            try:
                response = ask_question(video_id, question)
                history = st.session_state["qa_history_by_video"].setdefault(video_id, [])
                history.append(
                    {
                        "question": question,
                        "answer": response.get("answer", ""),
                        "sources": response.get("sources", []),
                    }
                )
                _advance_to_step(4)
            except RuntimeError as exc:
                st.error(str(exc))

        history = st.session_state["qa_history_by_video"].get(video_id) or []
        if not history:
            return

        st.markdown("#### Multi-turn Q&A History")
        for idx, item in enumerate(history, start=1):
            # str() keeps the old rendering of a missing value ("None").
            safe_question = html.escape(str(item.get("question")))
            safe_answer = html.escape(str(item.get("answer", "")))
            st.markdown(
                "<div style='background:#FFF0F5;border-radius:10px;padding:14px 14px 10px 14px;margin-bottom:12px;border:1px solid #F63366;'>"
                f"<b style='color:#F63366'>Q{idx}:</b> <span style='color:#D72660;font-weight:500'>{safe_question}</span><br>"
                f"<b style='color:#F63366'>A{idx}:</b> <span style='color:#333'>{safe_answer}</span>"
                "</div>",
                unsafe_allow_html=True,
            )
            if item.get("sources"):
                with st.expander("Sources", expanded=False):
                    for source in item["sources"]:
                        ts = int(source.get("timestamp_seconds") or 0)
                        ts_str = source.get("timestamp") or seconds_to_timestamp(ts)
                        link = youtube_timestamp_link(source.get("video_id", video_id), ts)
                        st.write(f"[{ts_str}] {source.get('text', '')}")
                        st.markdown(f"[Open clip ↗]({link})")
def home_page() -> None:
    """Render the whole home page: header, sidebar, API status and steps.

    The Q&A section (UI Step 4) is rendered by ``_render_step_five`` and
    Smart Search (UI Step 5) by ``_render_step_four``, so calling them in
    that order shows the steps in numeric order.

    The book emoji in the header was mis-encoded (mojibake) and is
    restored.
    """
    st.markdown(
        "\n"
        "<div style='display: flex; align-items: center;'>\n"
        "<span style='font-size: 2.5rem; margin-right: 0.5rem;'>📚</span>\n"
        "<span style='font-size: 2.2rem; font-weight: bold;'>Chaptive AI</span>\n"
        "</div>\n",
        unsafe_allow_html=True,
    )
    st.markdown(
        "<span style='font-size:1.3rem;color:#F63366;font-weight:500;'>Your AI-powered YouTube study companion</span>",
        unsafe_allow_html=True,
    )
    st.caption("Chaptive AI: Transcribe, Summarize, Generate Bookmarks, Quizzes and Q&A.")

    _init_session_state()
    _render_sidebar_stepper()
    _render_api_status()
    _render_step_one()
    _render_step_two()
    _render_step_three()
    _render_step_five()  # UI Step 4: Q&A
    _render_step_four()  # UI Step 5: Smart Search
if __name__ == "__main__":
    # Page config is set before the page renders any element. The page
    # icon was mis-encoded (mojibake) and is restored to the book emoji.
    st.set_page_config(page_title="Chaptive AI", page_icon="📚", layout="wide")
    home_page()