|
|
"""Helper utilities for the Streamlit front-end.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import base64 |
|
|
import hashlib |
|
|
import hmac |
|
|
import json |
|
|
import os |
|
|
import time |
|
|
from dataclasses import dataclass |
|
|
from functools import lru_cache |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Iterable, Optional |
|
|
|
|
|
import requests |
|
|
import streamlit as st |
|
|
from botocore.exceptions import ClientError |
|
|
import boto3 |
|
|
|
|
|
from utils.settings import HF_IMG_BASE_URL |
|
|
|
|
|
# --- Timeouts passed to ``requests`` (seconds) --------------------------------
DEFAULT_TIMEOUT = 30
IMAGE_REQUEST_TIMEOUT = 10

# --- Names of the environment variables this module reads ---------------------
_URL_ENV_KEY = "CHAPTLY_API_URL"
_API_KEY_ENV_KEY = "CHAPTLY_API_KEY"
_HMAC_SECRET_ENV_KEY = "CHAPTLY_API_SECRET"
_S3_BUCKET_ENV_KEY = "CHAPTLY_S3_BUCKET"
_AWS_REGION_ENV_KEY = "AWS_REGION"
_YOUTUBE_API_KEY_ENV = "YOUTUBE_V3_DATA_API_KEY"

# --- Request headers that carry the HMAC signature and its timestamp ----------
_SIG_HEADER = "X-Signature"
_TS_HEADER = "X-Timestamp"

# --- Fallbacks used when the matching environment variable is unset -----------
DEFAULT_S3_BUCKET = "chaptly-rag"
DEFAULT_AWS_REGION = "ap-southeast-1"

# --- Asset and third-party locations ------------------------------------------
# Local image directory: the ``img`` folder two levels above this file's directory.
_IMG_DIR = Path(__file__).resolve().parents[2].joinpath("img")
YOUTUBE_API_URL = "https://www.googleapis.com/youtube/v3/videos"
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class APIConfig:
    """Immutable bundle of the settings needed to call the backend API."""

    # Root URL that request paths are appended to.
    base_url: str
    # Value sent in the ``X-API-Key`` request header.
    api_key: str
    # Shared secret used as the HMAC key when signing requests.
    hmac_secret: str
|
|
|
|
|
|
|
|
def _get_secret_or_env(key: str, default: str = "") -> str:
    """Return the environment variable ``key`` with surrounding whitespace removed.

    Only ``os.environ`` is consulted. When ``key`` is not set, ``default`` is
    used instead (and is stripped as well).
    """
    raw_value = os.environ.get(key)
    if raw_value is None:
        raw_value = default
    return raw_value.strip()
|
|
|
|
|
|
|
|
@st.cache_resource(show_spinner=False)
def _get_s3_client():
    """Create the boto3 S3 client (wrapped in Streamlit's resource cache).

    The region comes from the ``AWS_REGION`` environment variable, falling
    back to ``DEFAULT_AWS_REGION`` when it is unset.
    """
    configured_region = _get_secret_or_env(_AWS_REGION_ENV_KEY, DEFAULT_AWS_REGION)
    # A blank value is passed on as ``None`` rather than an empty region name.
    region_name = configured_region if configured_region else None
    return boto3.client("s3", region_name=region_name)
|
|
|
|
|
|
|
|
def _get_s3_bucket_name() -> str:
    """Return the configured S3 bucket name, or ``DEFAULT_S3_BUCKET`` if blank/unset."""
    configured_bucket = _get_secret_or_env(_S3_BUCKET_ENV_KEY, DEFAULT_S3_BUCKET)
    if configured_bucket:
        return configured_bucket
    # The variable was set but empty (or whitespace only).
    return DEFAULT_S3_BUCKET
|
|
|
|
|
|
|
|
@st.cache_data(show_spinner=False)
def get_api_config() -> APIConfig:
    """Build the API configuration from the environment.

    The result is wrapped in Streamlit's ``cache_data``, so later calls reuse
    the first value instead of re-reading the environment. Missing settings
    come back as empty strings; validation is left to the caller.
    """
    return APIConfig(
        # Trailing slashes are dropped so paths can be appended directly.
        base_url=_get_secret_or_env(_URL_ENV_KEY).rstrip("/"),
        api_key=_get_secret_or_env(_API_KEY_ENV_KEY),
        hmac_secret=_get_secret_or_env(_HMAC_SECRET_ENV_KEY),
    )
|
|
|
|
|
|
|
|
def _request(
    method: str,
    path: str,
    *,
    params: Optional[Dict[str, Any]] = None,
    json_payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Send one signed request to the backend API and return the decoded JSON.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended verbatim to the configured base URL.
        params: Optional query-string parameters.
        json_payload: Optional JSON body; when given it is serialized with
            ``_serialize_json`` and sent with a JSON content type.

    Returns:
        The parsed JSON body of a successful (status < 400) response.

    Raises:
        RuntimeError: If a required setting is missing, the request fails at
            the network level, the API answers with status >= 400, or the
            success response is not valid JSON.
    """
    config = get_api_config()

    # Checked in this order so the first missing setting is the one reported.
    required_settings = (
        ("CHAPTLY_API_URL", config.base_url),
        ("CHAPTLY_API_KEY", config.api_key),
        ("CHAPTLY_API_SECRET", config.hmac_secret),
    )
    for setting_name, setting_value in required_settings:
        if not setting_value:
            raise RuntimeError(f"{setting_name} is not configured. Set it in Streamlit secrets or environment.")

    has_body = json_payload is not None
    body_bytes = _serialize_json(json_payload) if has_body else b""

    # The signature covers the exact bytes that are sent as the body.
    timestamp, signature = _sign_request(method, path, body_bytes, config.hmac_secret)
    headers = {"Content-Type": "application/json"} if has_body else {}
    headers["X-API-Key"] = config.api_key
    headers[_TS_HEADER] = timestamp
    headers[_SIG_HEADER] = signature

    try:
        resp = requests.request(
            method,
            f"{config.base_url}{path}",
            headers=headers,
            params=params,
            data=body_bytes or None,
            timeout=DEFAULT_TIMEOUT,
        )
    except requests.RequestException as exc:
        raise RuntimeError(f"Network error contacting API: {exc}") from exc

    if resp.status_code >= 400:
        # Prefer a structured error body; fall back to the raw text.
        try:
            error_detail: Any = resp.json()
        except ValueError:
            error_detail = resp.text
        raise RuntimeError(f"API error ({resp.status_code}): {_extract_error_message(error_detail)}")

    try:
        return resp.json()
    except ValueError as exc:
        raise RuntimeError("API returned invalid JSON") from exc
|
|
|
|
|
|
|
|
def _serialize_json(payload: Dict[str, Any]) -> bytes:
    """Encode ``payload`` as compact, key-sorted JSON in UTF-8.

    Keys are sorted and separators carry no whitespace, so equal payloads
    always serialize to the same bytes. Non-ASCII characters are emitted
    as-is rather than as ``\\u`` escapes.
    """
    compact_text = json.dumps(
        payload,
        sort_keys=True,
        ensure_ascii=False,
        separators=(",", ":"),
    )
    return compact_text.encode("utf-8")
|
|
|
|
|
|
|
|
def _sign_request(method: str, path: str, body: bytes, secret: str) -> tuple[str, str]:
    """Compute the timestamp and HMAC-SHA256 signature for a request.

    The signed text is four newline-separated fields: the upper-cased method,
    the path, the SHA-256 hex digest of the body, and the current Unix time
    in whole seconds.

    Returns:
        ``(timestamp, signature)`` where both are strings and the signature
        is a lowercase hex digest.
    """
    timestamp = str(int(time.time()))
    body_digest = hashlib.sha256(body).hexdigest()
    canonical = "\n".join((method.upper(), path, body_digest, timestamp))
    mac = hmac.new(secret.encode("utf-8"), canonical.encode("utf-8"), hashlib.sha256)
    return timestamp, mac.hexdigest()
|
|
|
|
|
|
|
|
def _extract_error_message(error_detail: Any) -> str:
    """Pick a human-readable message out of an API error body.

    For a dict, the first of ``"detail"`` / ``"message"`` whose value is a
    string wins. Anything else is returned as ``str(error_detail)``.
    """
    if isinstance(error_detail, dict):
        for field_name in ("detail", "message"):
            candidate = error_detail.get(field_name)
            if isinstance(candidate, str):
                return candidate
    return str(error_detail)
|
|
|
|
|
|
|
|
def ping_health() -> Dict[str, Any]:
    """Issue ``GET /health`` against the API and return the decoded response."""
    health_payload = _request("GET", "/health")
    return health_payload
|
|
|
|
|
|
|
|
def start_ingestion(video_url: str) -> Dict[str, Any]:
    """POST to ``/videos/process`` with ``video_url`` as the ``url`` query parameter."""
    query = {"url": video_url}
    return _request("POST", "/videos/process", params=query)
|
|
|
|
|
|
|
|
def fetch_job_status(job_id: str) -> Dict[str, Any]:
    """GET ``/videos/process/{job_id}`` and return the decoded response."""
    endpoint = f"/videos/process/{job_id}"
    return _request("GET", endpoint)
|
|
|
|
|
|
|
|
def fetch_bookmarks(video_id: str, *, max_sections: int, min_sections: int) -> Iterable[Dict[str, Any]]:
    """GET ``/videos/{video_id}/bookmarks`` with the given section bounds.

    ``max_sections`` and ``min_sections`` are forwarded as query parameters;
    the decoded response is returned unchanged.
    """
    endpoint = f"/videos/{video_id}/bookmarks"
    query = {"max_sections": max_sections, "min_sections": min_sections}
    return _request("GET", endpoint, params=query)
|
|
|
|
|
|
|
|
def ask_question(video_id: str, query: str, limit: int = 4) -> Dict[str, Any]:
    """POST ``query`` and ``limit`` as a JSON body to ``/videos/{video_id}/qa``."""
    endpoint = f"/videos/{video_id}/qa"
    body = {"query": query, "limit": limit}
    return _request("POST", endpoint, json_payload=body)
|
|
|
|
|
|
|
|
def fetch_summary(video_id: str, max_words: int = 150) -> Dict[str, Any]:
    """GET ``/videos/{video_id}/summary``, passing ``max_words`` as a query parameter."""
    endpoint = f"/videos/{video_id}/summary"
    query = {"max_words": max_words}
    return _request("GET", endpoint, params=query)
|
|
|
|
|
|
|
|
def fetch_quiz(video_id: str, num_questions: int = 3, style: str = "mixed") -> Iterable[Dict[str, Any]]:
    """GET ``/videos/{video_id}/quiz`` with ``num_questions`` and ``style`` as query parameters."""
    endpoint = f"/videos/{video_id}/quiz"
    query = {"num_questions": num_questions, "style": style}
    return _request("GET", endpoint, params=query)
|
|
|
|
|
|
|
|
def semantic_search(query: str, video_id: str, limit: int = 4) -> Iterable[Dict[str, Any]]:
    """POST a JSON body of ``query``, ``video_id`` and ``limit`` to ``/search``."""
    body = {"query": query, "video_id": video_id, "limit": limit}
    return _request("POST", "/search", json_payload=body)
|
|
|
|
|
|
|
|
def seconds_to_timestamp(total_seconds: int) -> str:
    """Format a duration in seconds as a zero-padded ``MM:SS`` string.

    Minutes are not rolled over into hours, so 3725 seconds renders as
    ``"62:05"``.

    Args:
        total_seconds: Duration in seconds. Negative values are clamped to
            zero. ``None`` is treated as zero and fractional values are
            truncated, matching how ``youtube_timestamp_link`` coerces its
            timestamp.

    Returns:
        The formatted ``MM:SS`` string.
    """
    # Coerce first: the ``:02d`` format below rejects floats, and ``max``
    # cannot compare ``None`` with an int.
    whole_seconds = max(int(total_seconds or 0), 0)
    minutes, seconds = divmod(whole_seconds, 60)
    return f"{minutes:02d}:{seconds:02d}"
|
|
|
|
|
|
|
|
def youtube_timestamp_link(video_id: str, timestamp_seconds: Optional[int]) -> str:
    """Build a YouTube watch URL that starts playback at the given offset.

    A missing or zero ``timestamp_seconds`` yields ``t=0s``; other values are
    converted with ``int()``.
    """
    offset = int(timestamp_seconds) if timestamp_seconds else 0
    watch_page = "https://www.youtube.com/watch"
    return f"{watch_page}?v={video_id}&t={offset}s"
|
|
|
|
|
|
|
|
def render_error(message: str) -> None:
    """Display ``message`` through Streamlit's ``st.error``."""
    st.error(message)
|
|
|
|
|
|
|
|
def render_success(message: str) -> None:
    """Display ``message`` through Streamlit's ``st.success``."""
    st.success(message)
|
|
|
|
|
|
|
|
def _build_remote_image_url(filename: str) -> str:
    """Join ``HF_IMG_BASE_URL`` and ``filename`` with exactly one slash between them.

    Trailing slashes on the base and leading slashes on ``filename`` are
    removed before joining.
    """
    base_url = HF_IMG_BASE_URL.rstrip("/")
    relative_name = filename.lstrip("/")
    return "/".join((base_url, relative_name))
|
|
|
|
|
|
|
|
def get_remote_image_url(filename: str) -> str:
    """Return the remote URL for ``filename`` (public wrapper around the private builder)."""
    remote_url = _build_remote_image_url(filename)
    return remote_url
|
|
|
|
|
|
|
|
def resolve_image_source(filename: str) -> str:
    """Return the local file path for ``filename`` if it exists, else its remote URL."""
    local_candidate = _IMG_DIR / filename
    return str(local_candidate) if local_candidate.exists() else _build_remote_image_url(filename)
|
|
|
|
|
|
|
|
def _load_remote_image_bytes(filename: str) -> Optional[bytes]:
    """Download ``filename`` from the remote image host.

    Returns:
        The response body when the server answers with status 200, otherwise
        ``None``. Any ``requests.RequestException`` is also reported as
        ``None`` rather than raised.
    """
    image_url = _build_remote_image_url(filename)
    try:
        resp = requests.get(image_url, timeout=IMAGE_REQUEST_TIMEOUT)
        payload = resp.content if resp.status_code == 200 else None
    except requests.RequestException:
        payload = None
    return payload
|
|
|
|
|
|
|
|
@lru_cache(maxsize=128)
def get_image_base64(filename: str) -> Optional[str]:
    """Return ``filename``'s contents as a base64 string, or ``None`` if unavailable.

    The local image directory is tried first; if the file is absent or cannot
    be read, the remote host is tried. Results are memoized per filename by
    ``lru_cache`` — including ``None`` results.
    """
    raw_bytes: Optional[bytes] = None

    local_path = _IMG_DIR / filename
    if local_path.exists():
        try:
            raw_bytes = local_path.read_bytes()
        except OSError:
            # Unreadable local copy: fall through to the remote download.
            raw_bytes = None

    if raw_bytes is None:
        raw_bytes = _load_remote_image_bytes(filename)

    if raw_bytes is None:
        return None
    return base64.b64encode(raw_bytes).decode("utf-8")
|
|
|
|
|
|
|
|
def _object_exists(key: str) -> bool:
    """Report whether ``key`` exists in the configured S3 bucket.

    Returns:
        ``True`` when ``head_object`` succeeds, ``False`` when it fails with
        error code ``"404"`` or ``"NoSuchKey"``.

    Raises:
        RuntimeError: For any other ``ClientError`` from S3.
    """
    client = _get_s3_client()
    bucket_name = _get_s3_bucket_name()
    missing_codes = {"404", "NoSuchKey"}
    try:
        client.head_object(Bucket=bucket_name, Key=key)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code in missing_codes:
            return False
        raise RuntimeError(f"Unable to read S3 object {key}: {exc}") from exc
    return True
|
|
|
|
|
|
|
|
def is_video_cached(video_id: str) -> bool:
    """Report whether both cached artifacts for ``video_id`` exist in S3.

    A video counts as cached only when ``<id>/chunks.json`` and
    ``<id>/embeddings.npy`` are both present.

    Args:
        video_id: Video identifier; surrounding whitespace and slashes are
            stripped before it is used as the key prefix.

    Returns:
        ``True`` if both objects exist. ``False`` if either is missing, or if
        ``video_id`` is empty once normalized.

    Raises:
        RuntimeError: Propagated from ``_object_exists`` on unexpected S3 errors.
    """
    if not video_id:
        return False
    video_prefix = video_id.strip().strip("/")
    # Inputs such as "   " or "/" normalize to nothing; without this guard
    # the lookup would probe the meaningless keys "/chunks.json" etc.
    if not video_prefix:
        return False
    required_keys = (
        f"{video_prefix}/chunks.json",
        f"{video_prefix}/embeddings.npy",
    )
    # ``all`` stops at the first missing object, like the chained ``and``.
    return all(_object_exists(key) for key in required_keys)
|
|
|
|
|
|
|
|
def _load_video_metadata(video_id: str) -> Dict[str, Any]:
    """Load display metadata for a video from ``<video_id>/artifacts.json`` in S3.

    Args:
        video_id: Key prefix of the video in the bucket.

    Returns:
        A dict with ``video_id``, ``title``, ``channel_name``,
        ``thumbnail_url`` and ``url``. If the object cannot be fetched, is not
        valid JSON, or does not hold a JSON object, placeholder values derived
        from ``video_id`` are returned instead.
    """
    watch_url = f"https://www.youtube.com/watch?v={video_id}"
    fallback: Dict[str, Any] = {
        "video_id": video_id,
        "title": video_id,
        "channel_name": "",
        "thumbnail_url": None,
        "url": watch_url,
    }

    s3 = _get_s3_client()
    bucket = _get_s3_bucket_name()
    try:
        obj = s3.get_object(Bucket=bucket, Key=f"{video_id}/artifacts.json")
        data = json.loads(obj["Body"].read())
    except (ClientError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # single corrupt artifacts file no longer fails the caller outright.
        return fallback

    if not isinstance(data, dict):
        # Valid JSON of the wrong shape (list, string, ...) has no fields to read.
        return fallback

    return {
        "video_id": video_id,
        "title": data.get("title") or video_id,
        "channel_name": data.get("channel") or data.get("channel_name", ""),
        "thumbnail_url": data.get("thumbnail_url"),
        "url": data.get("url") or watch_url,
    }
|
|
|
|
|
|
|
|
@st.cache_data(show_spinner=False, ttl=300)
def list_cached_videos() -> list[Dict[str, Any]]:
    """List metadata for every fully cached video in the bucket, sorted by title.

    Top-level prefixes of the bucket are treated as video ids. The ``lambda``
    prefix is skipped, as is any prefix for which ``is_video_cached`` is
    false. The result is cached by Streamlit with ``ttl=300``.
    """
    s3 = _get_s3_client()
    bucket = _get_s3_bucket_name()
    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Delimiter="/")

    # Lazily yield one candidate id per top-level "folder" in the bucket.
    candidate_ids = (
        entry.get("Prefix", "").rstrip("/")
        for page in pages
        for entry in page.get("CommonPrefixes", [])
    )
    videos: list[Dict[str, Any]] = [
        _load_video_metadata(video_id)
        for video_id in candidate_ids
        if video_id and video_id != "lambda" and is_video_cached(video_id)
    ]
    videos.sort(key=lambda item: item.get("title", ""))
    return videos
|
|
|
|
|
|
|
|
def extract_youtube_video_id(url: str) -> Optional[str]:
    """Best-effort extraction of the YouTube video ID from a URL.

    Recognized forms:

    * any URL carrying a ``v`` query parameter (``.../watch?v=ID``),
    * ``youtu.be/ID`` short links,
    * ``youtube.com/shorts/ID``, ``/embed/ID``, ``/live/ID`` and ``/v/ID``.

    URLs without a scheme (``youtu.be/ID``) are accepted.

    Args:
        url: The URL text, possibly with surrounding whitespace.

    Returns:
        The video ID, or ``None`` when no non-empty ID can be found.
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import parse_qs, urlparse

    if not url:
        return None
    candidate = url.strip()
    if not candidate:
        return None

    try:
        parsed = urlparse(candidate)
        if not parsed.netloc:
            # No scheme given: re-parse so the host lands in ``netloc``
            # instead of being mistaken for part of the path.
            parsed = urlparse(f"https://{candidate}")
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. unbalanced brackets).
        return None

    # Parse the query properly: a substring search for "v=" would also match
    # parameters like "rev=" and would keep a trailing "#fragment".
    query_ids = parse_qs(parsed.query).get("v")
    if query_ids:
        return query_ids[0]

    segments = [part for part in parsed.path.split("/") if part]

    if host == "youtu.be" or host.endswith(".youtu.be"):
        return segments[0] if segments else None

    is_youtube_host = any(
        host == domain or host.endswith(f".{domain}")
        for domain in ("youtube.com", "youtube-nocookie.com")
    )
    if is_youtube_host and len(segments) >= 2 and segments[0] in {"embed", "shorts", "live", "v"}:
        return segments[1]

    return None
|
|
|
|
|
|
|
|
def fetch_youtube_metadata_v3(video_url: str) -> Optional[Dict[str, Any]]:
    """Fetch video metadata using the YouTube Data API v3.

    Args:
        video_url: A YouTube URL from which the video ID is extracted.

    Returns:
        A dict with ``title``, ``author_name``, ``thumbnail_url`` and
        ``video_id``; or ``None`` when no video ID can be extracted or the
        API returns no items for it.

    Raises:
        RuntimeError: If ``YOUTUBE_V3_DATA_API_KEY`` is not configured, the
            request fails, the API answers with an error status, or the
            response body is not valid JSON.
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import quote_plus

    video_id = extract_youtube_video_id(video_url)
    if not video_id:
        return None

    api_key = _get_secret_or_env(_YOUTUBE_API_KEY_ENV)
    if not api_key:
        raise RuntimeError("YOUTUBE_V3_DATA_API_KEY is not configured in this environment.")

    params = {
        "part": "snippet",
        "id": video_id,
        "key": api_key,
    }
    try:
        response = requests.get(YOUTUBE_API_URL, params=params, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # requests puts the request URL -- including the ``key`` query
        # parameter -- into many of its exception messages. Scrub the key
        # (raw and URL-encoded) so it does not end up in the raised message.
        # NOTE: the chained cause (``from exc``) still holds the original text.
        safe_detail = str(exc)
        for secret_form in {api_key, quote_plus(api_key)}:
            safe_detail = safe_detail.replace(secret_form, "***")
        raise RuntimeError(f"Unable to contact YouTube Data API: {safe_detail}") from exc

    items = payload.get("items") or []
    if not items:
        return None

    snippet = items[0].get("snippet") or {}
    thumbnails = snippet.get("thumbnails") or {}
    # Prefer the "high" thumbnail, falling back to "default".
    thumbnail = thumbnails.get("high") or thumbnails.get("default") or {}
    return {
        "title": snippet.get("title"),
        "author_name": snippet.get("channelTitle"),
        "thumbnail_url": thumbnail.get("url"),
        "video_id": video_id,
    }
|
|
|