"""Helper utilities for the Streamlit front-end.""" from __future__ import annotations import base64 import hashlib import hmac import json import os import time from dataclasses import dataclass from functools import lru_cache from pathlib import Path from typing import Any, Dict, Iterable, Optional import requests import streamlit as st from botocore.exceptions import ClientError import boto3 from utils.settings import HF_IMG_BASE_URL DEFAULT_TIMEOUT = 30 _URL_ENV_KEY = "CHAPTLY_API_URL" _API_KEY_ENV_KEY = "CHAPTLY_API_KEY" _HMAC_SECRET_ENV_KEY = "CHAPTLY_API_SECRET" _S3_BUCKET_ENV_KEY = "CHAPTLY_S3_BUCKET" _AWS_REGION_ENV_KEY = "AWS_REGION" _YOUTUBE_API_KEY_ENV = "YOUTUBE_V3_DATA_API_KEY" _SIG_HEADER = "X-Signature" _TS_HEADER = "X-Timestamp" DEFAULT_S3_BUCKET = "chaptly-rag" DEFAULT_AWS_REGION = "ap-southeast-1" IMAGE_REQUEST_TIMEOUT = 10 _IMG_DIR = Path(__file__).resolve().parents[2] / "img" YOUTUBE_API_URL = "https://www.googleapis.com/youtube/v3/videos" @dataclass(frozen=True) class APIConfig: base_url: str api_key: str hmac_secret: str def _get_secret_or_env(key: str, default: str = "") -> str: return os.environ.get(key, default).strip() @st.cache_resource(show_spinner=False) def _get_s3_client(): region = _get_secret_or_env(_AWS_REGION_ENV_KEY, DEFAULT_AWS_REGION) or None return boto3.client("s3", region_name=region) def _get_s3_bucket_name() -> str: return _get_secret_or_env(_S3_BUCKET_ENV_KEY, DEFAULT_S3_BUCKET) or DEFAULT_S3_BUCKET @st.cache_data(show_spinner=False) def get_api_config() -> APIConfig: """Return the API URL/key pair, cached for the session.""" base = _get_secret_or_env(_URL_ENV_KEY) api_key = _get_secret_or_env(_API_KEY_ENV_KEY) secret = _get_secret_or_env(_HMAC_SECRET_ENV_KEY) return APIConfig(base_url=base.rstrip("/"), api_key=api_key, hmac_secret=secret) def _request( method: str, path: str, *, params: Optional[Dict[str, Any]] = None, json_payload: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: config = get_api_config() if not config.base_url: raise RuntimeError("CHAPTLY_API_URL is not configured. Set it in Streamlit secrets or environment.") if not config.api_key: raise RuntimeError("CHAPTLY_API_KEY is not configured. Set it in Streamlit secrets or environment.") if not config.hmac_secret: raise RuntimeError("CHAPTLY_API_SECRET is not configured. Set it in Streamlit secrets or environment.") url = f"{config.base_url}{path}" body_bytes = _serialize_json(json_payload) if json_payload is not None else b"" if json_payload is not None: headers = {"Content-Type": "application/json"} else: headers = {} headers["X-API-Key"] = config.api_key _timestamp, _signature = _sign_request(method, path, body_bytes, config.hmac_secret) headers[_TS_HEADER] = _timestamp headers[_SIG_HEADER] = _signature try: resp = requests.request( method, url, headers=headers, params=params, data=body_bytes if body_bytes else None, timeout=DEFAULT_TIMEOUT, ) except requests.RequestException as exc: raise RuntimeError(f"Network error contacting API: {exc}") from exc if resp.status_code >= 400: error_detail: Any try: error_detail = resp.json() except ValueError: error_detail = resp.text message = _extract_error_message(error_detail) raise RuntimeError(f"API error ({resp.status_code}): {message}") try: return resp.json() except ValueError as exc: raise RuntimeError("API returned invalid JSON") from exc def _serialize_json(payload: Dict[str, Any]) -> bytes: return json.dumps(payload, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8") def _sign_request(method: str, path: str, body: bytes, secret: str) -> tuple[str, str]: timestamp = str(int(time.time())) body_hash = hashlib.sha256(body).hexdigest() canonical = "\n".join([ method.upper(), path, body_hash, timestamp, ]) signature = hmac.new(secret.encode("utf-8"), canonical.encode("utf-8"), hashlib.sha256).hexdigest() return timestamp, signature def _extract_error_message(error_detail: Any) -> str: if isinstance(error_detail, dict): if "detail" in error_detail and isinstance(error_detail["detail"], str): return error_detail["detail"] if "message" in error_detail and isinstance(error_detail["message"], str): return error_detail["message"] return str(error_detail) def ping_health() -> Dict[str, Any]: return _request("GET", "/health") def start_ingestion(video_url: str) -> Dict[str, Any]: return _request("POST", "/videos/process", params={"url": video_url}) def fetch_job_status(job_id: str) -> Dict[str, Any]: return _request("GET", f"/videos/process/{job_id}") def fetch_bookmarks(video_id: str, *, max_sections: int, min_sections: int) -> Iterable[Dict[str, Any]]: payload = _request( "GET", f"/videos/{video_id}/bookmarks", params={"max_sections": max_sections, "min_sections": min_sections}, ) return payload def ask_question(video_id: str, query: str, limit: int = 4) -> Dict[str, Any]: return _request( "POST", f"/videos/{video_id}/qa", json_payload={"query": query, "limit": limit}, ) def fetch_summary(video_id: str, max_words: int = 150) -> Dict[str, Any]: return _request("GET", f"/videos/{video_id}/summary", params={"max_words": max_words}) def fetch_quiz(video_id: str, num_questions: int = 3, style: str = "mixed") -> Iterable[Dict[str, Any]]: return _request( "GET", f"/videos/{video_id}/quiz", params={"num_questions": num_questions, "style": style}, ) def semantic_search(query: str, video_id: str, limit: int = 4) -> Iterable[Dict[str, Any]]: return _request( "POST", "/search", json_payload={"query": query, "video_id": video_id, "limit": limit}, ) def seconds_to_timestamp(total_seconds: int) -> str: minutes, seconds = divmod(max(total_seconds, 0), 60) return f"{minutes:02d}:{seconds:02d}" def youtube_timestamp_link(video_id: str, timestamp_seconds: Optional[int]) -> str: seconds = int(timestamp_seconds or 0) return f"https://www.youtube.com/watch?v={video_id}&t={seconds}s" def render_error(message: str) -> None: st.error(message) def render_success(message: str) -> None: st.success(message) def _build_remote_image_url(filename: str) -> str: sanitized = filename.lstrip("/") return f"{HF_IMG_BASE_URL.rstrip('/')}/{sanitized}" def get_remote_image_url(filename: str) -> str: """Public helper to retrieve the remote URL that serves the given asset.""" return _build_remote_image_url(filename) def resolve_image_source(filename: str) -> str: """Return a local path if available, otherwise fall back to the remote asset URL.""" path = _IMG_DIR / filename if path.exists(): return str(path) return _build_remote_image_url(filename) def _load_remote_image_bytes(filename: str) -> Optional[bytes]: url = _build_remote_image_url(filename) try: resp = requests.get(url, timeout=IMAGE_REQUEST_TIMEOUT) if resp.status_code == 200: return resp.content except requests.RequestException: return None return None @lru_cache(maxsize=128) def get_image_base64(filename: str) -> Optional[str]: """Return a base64-encoded representation of an image from disk or remote storage.""" path = _IMG_DIR / filename if path.exists(): try: return base64.b64encode(path.read_bytes()).decode("utf-8") except OSError: pass data = _load_remote_image_bytes(filename) if data is None: return None return base64.b64encode(data).decode("utf-8") def _object_exists(key: str) -> bool: s3 = _get_s3_client() bucket = _get_s3_bucket_name() try: s3.head_object(Bucket=bucket, Key=key) return True except ClientError as exc: if exc.response.get("Error", {}).get("Code") in {"404", "NoSuchKey"}: return False raise RuntimeError(f"Unable to read S3 object {key}: {exc}") from exc def is_video_cached(video_id: str) -> bool: if not video_id: return False video_prefix = video_id.strip().strip("/") chunks_key = f"{video_prefix}/chunks.json" embeddings_key = f"{video_prefix}/embeddings.npy" return _object_exists(chunks_key) and _object_exists(embeddings_key) def _load_video_metadata(video_id: str) -> Dict[str, Any]: s3 = _get_s3_client() bucket = _get_s3_bucket_name() artifacts_key = f"{video_id}/artifacts.json" try: obj = s3.get_object(Bucket=bucket, Key=artifacts_key) data = json.loads(obj["Body"].read()) return { "video_id": video_id, "title": data.get("title") or video_id, "channel_name": data.get("channel") or data.get("channel_name", ""), "thumbnail_url": data.get("thumbnail_url"), "url": data.get("url") or f"https://www.youtube.com/watch?v={video_id}", } except ClientError: return { "video_id": video_id, "title": video_id, "channel_name": "", "thumbnail_url": None, "url": f"https://www.youtube.com/watch?v={video_id}", } @st.cache_data(show_spinner=False, ttl=300) def list_cached_videos() -> list[Dict[str, Any]]: s3 = _get_s3_client() bucket = _get_s3_bucket_name() paginator = s3.get_paginator("list_objects_v2") items: list[Dict[str, Any]] = [] for page in paginator.paginate(Bucket=bucket, Delimiter="/"): for prefix in page.get("CommonPrefixes", []): video_id = prefix.get("Prefix", "").rstrip("/") if not video_id or video_id == "lambda": continue if is_video_cached(video_id): metadata = _load_video_metadata(video_id) items.append(metadata) return sorted(items, key=lambda item: item.get("title", "")) def extract_youtube_video_id(url: str) -> Optional[str]: """Best-effort extraction of the YouTube video ID from a URL.""" if not url: return None url = url.strip() if "v=" in url: return url.split("v=")[1].split("&")[0] if "youtu.be" in url: return url.rsplit("/", 1)[-1].split("?")[0] return None def fetch_youtube_metadata_v3(video_url: str) -> Optional[Dict[str, Any]]: """Fetch video metadata using the YouTube Data API v3.""" video_id = extract_youtube_video_id(video_url) if not video_id: return None api_key = _get_secret_or_env(_YOUTUBE_API_KEY_ENV) if not api_key: raise RuntimeError("YOUTUBE_V3_DATA_API_KEY is not configured in this environment.") params = { "part": "snippet", "id": video_id, "key": api_key, } try: response = requests.get(YOUTUBE_API_URL, params=params, timeout=DEFAULT_TIMEOUT) response.raise_for_status() payload = response.json() except requests.RequestException as exc: raise RuntimeError(f"Unable to contact YouTube Data API: {exc}") from exc items = payload.get("items") or [] if not items: return None snippet = items[0].get("snippet", {}) return { "title": snippet.get("title"), "author_name": snippet.get("channelTitle"), "thumbnail_url": (snippet.get("thumbnails", {}).get("high", {}) or snippet.get("thumbnails", {}).get("default", {})).get("url"), "video_id": video_id, }