Chaptive / src /utils /helpers.py
Jing997's picture
fix request metadata issue
f8ea563
"""Helper utilities for the Streamlit front-end."""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import time
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, Iterable, Optional
import requests
import streamlit as st
from botocore.exceptions import ClientError
import boto3
from utils.settings import HF_IMG_BASE_URL
# Timeout passed to `requests` for backend API and YouTube Data API calls.
DEFAULT_TIMEOUT = 30
# Names of the environment variables looked up through _get_secret_or_env().
_URL_ENV_KEY = "CHAPTLY_API_URL"
_API_KEY_ENV_KEY = "CHAPTLY_API_KEY"
_HMAC_SECRET_ENV_KEY = "CHAPTLY_API_SECRET"
_S3_BUCKET_ENV_KEY = "CHAPTLY_S3_BUCKET"
_AWS_REGION_ENV_KEY = "AWS_REGION"
_YOUTUBE_API_KEY_ENV = "YOUTUBE_V3_DATA_API_KEY"
# Request header names that carry the HMAC signature and the timestamp it covers.
_SIG_HEADER = "X-Signature"
_TS_HEADER = "X-Timestamp"
# Bucket used when CHAPTLY_S3_BUCKET is unset or blank.
DEFAULT_S3_BUCKET = "chaptly-rag"
# Region used when AWS_REGION is unset.
DEFAULT_AWS_REGION = "ap-southeast-1"
# Timeout passed to `requests` when downloading remote image assets.
IMAGE_REQUEST_TIMEOUT = 10
# Local image folder: "img" under parents[2] of this file's resolved path.
_IMG_DIR = Path(__file__).resolve().parents[2] / "img"
# Endpoint queried by fetch_youtube_metadata_v3().
YOUTUBE_API_URL = "https://www.googleapis.com/youtube/v3/videos"
@dataclass(frozen=True)
class APIConfig:
    """Immutable bundle of the settings needed to call the backend API."""

    # Root URL that request paths are appended to.
    base_url: str
    # Value sent in the X-API-Key request header.
    api_key: str
    # Shared secret used to HMAC-sign each request.
    hmac_secret: str
def _get_secret_or_env(key: str, default: str = "") -> str:
    """Return environment variable ``key`` with surrounding whitespace removed.

    ``default`` is used (and likewise stripped) when the variable is not set.
    """
    raw_value = os.environ.get(key, default)
    return raw_value.strip()
@st.cache_resource(show_spinner=False)
def _get_s3_client():
    """Build the boto3 S3 client; Streamlit's resource cache reuses the instance."""
    configured_region = _get_secret_or_env(_AWS_REGION_ENV_KEY, DEFAULT_AWS_REGION)
    # A blank region setting is passed to boto3 as None rather than "".
    region_name = configured_region if configured_region else None
    return boto3.client("s3", region_name=region_name)
def _get_s3_bucket_name() -> str:
    """Return the configured S3 bucket, or DEFAULT_S3_BUCKET when unset or blank."""
    configured_bucket = _get_secret_or_env(_S3_BUCKET_ENV_KEY, DEFAULT_S3_BUCKET)
    if not configured_bucket:
        return DEFAULT_S3_BUCKET
    return configured_bucket
@st.cache_data(show_spinner=False)
def get_api_config() -> APIConfig:
    """Read the API URL, API key and HMAC secret from the environment.

    The result is memoised by ``st.cache_data``. Trailing slashes are removed
    from the base URL so request paths can be appended directly.
    """
    return APIConfig(
        base_url=_get_secret_or_env(_URL_ENV_KEY).rstrip("/"),
        api_key=_get_secret_or_env(_API_KEY_ENV_KEY),
        hmac_secret=_get_secret_or_env(_HMAC_SECRET_ENV_KEY),
    )
def _request(
    method: str,
    path: str,
    *,
    params: Optional[Dict[str, Any]] = None,
    json_payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Send a signed request to the backend API and return the decoded JSON body.

    Args:
        method: HTTP verb.
        path: Path appended to the configured base URL; also part of the signature.
        params: Optional query-string parameters.
        json_payload: Optional JSON body; serialised canonically before signing.

    Raises:
        RuntimeError: If a required setting is missing, the network call fails,
            the API answers with status >= 400, or the response is not JSON.
    """
    config = get_api_config()

    # Check the settings in a fixed order and fail on the first one missing.
    required_settings = (
        (config.base_url, "CHAPTLY_API_URL"),
        (config.api_key, "CHAPTLY_API_KEY"),
        (config.hmac_secret, "CHAPTLY_API_SECRET"),
    )
    for setting_value, env_name in required_settings:
        if not setting_value:
            raise RuntimeError(f"{env_name} is not configured. Set it in Streamlit secrets or environment.")

    has_body = json_payload is not None
    body_bytes = _serialize_json(json_payload) if has_body else b""

    headers = {"Content-Type": "application/json"} if has_body else {}
    headers["X-API-Key"] = config.api_key
    # The signature covers the exact bytes that are sent as the body.
    timestamp, signature = _sign_request(method, path, body_bytes, config.hmac_secret)
    headers[_TS_HEADER] = timestamp
    headers[_SIG_HEADER] = signature

    try:
        resp = requests.request(
            method,
            f"{config.base_url}{path}",
            headers=headers,
            params=params,
            data=body_bytes or None,
            timeout=DEFAULT_TIMEOUT,
        )
    except requests.RequestException as exc:
        raise RuntimeError(f"Network error contacting API: {exc}") from exc

    if resp.status_code >= 400:
        # Prefer a structured error body; fall back to the raw text.
        try:
            error_detail: Any = resp.json()
        except ValueError:
            error_detail = resp.text
        raise RuntimeError(f"API error ({resp.status_code}): {_extract_error_message(error_detail)}")

    try:
        return resp.json()
    except ValueError as exc:
        raise RuntimeError("API returned invalid JSON") from exc
def _serialize_json(payload: Dict[str, Any]) -> bytes:
    """Encode ``payload`` as compact, key-sorted JSON in UTF-8.

    Sorted keys and fixed separators make the bytes reproducible for signing.
    """
    canonical_text = json.dumps(
        payload,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return canonical_text.encode("utf-8")
def _sign_request(method: str, path: str, body: bytes, secret: str) -> tuple[str, str]:
    """Compute the timestamp and HMAC-SHA256 signature for a request.

    The signed string is the upper-cased method, the path, the SHA-256 hex
    digest of the body and the timestamp, joined by newlines.

    Returns:
        ``(timestamp, signature)`` where the timestamp is the current Unix time
        in whole seconds as a string and the signature is a hex digest.
    """
    timestamp = str(int(time.time()))
    body_digest = hashlib.sha256(body).hexdigest()
    canonical = f"{method.upper()}\n{path}\n{body_digest}\n{timestamp}"
    mac = hmac.new(secret.encode("utf-8"), canonical.encode("utf-8"), hashlib.sha256)
    return timestamp, mac.hexdigest()
def _extract_error_message(error_detail: Any) -> str:
    """Pick a readable message out of an API error body.

    For a dict, the first string found under ``"detail"`` then ``"message"``
    is returned; anything else is rendered with ``str()``.
    """
    if isinstance(error_detail, dict):
        for field_name in ("detail", "message"):
            candidate = error_detail.get(field_name)
            if isinstance(candidate, str):
                return candidate
    return str(error_detail)
def ping_health() -> Dict[str, Any]:
    """GET ``/health`` and return the decoded response."""
    health_payload = _request("GET", "/health")
    return health_payload
def start_ingestion(video_url: str) -> Dict[str, Any]:
    """POST to ``/videos/process`` with ``video_url`` as the ``url`` query parameter."""
    query = {"url": video_url}
    return _request("POST", "/videos/process", params=query)
def fetch_job_status(job_id: str) -> Dict[str, Any]:
    """GET ``/videos/process/<job_id>`` and return the decoded response."""
    status_path = f"/videos/process/{job_id}"
    return _request("GET", status_path)
def fetch_bookmarks(video_id: str, *, max_sections: int, min_sections: int) -> Iterable[Dict[str, Any]]:
    """GET ``/videos/<video_id>/bookmarks`` with the section-count bounds as query parameters."""
    section_bounds = {"max_sections": max_sections, "min_sections": min_sections}
    return _request("GET", f"/videos/{video_id}/bookmarks", params=section_bounds)
def ask_question(video_id: str, query: str, limit: int = 4) -> Dict[str, Any]:
    """POST ``query`` and ``limit`` as a JSON body to ``/videos/<video_id>/qa``."""
    question_body = {"query": query, "limit": limit}
    return _request("POST", f"/videos/{video_id}/qa", json_payload=question_body)
def fetch_summary(video_id: str, max_words: int = 150) -> Dict[str, Any]:
    """GET ``/videos/<video_id>/summary`` with ``max_words`` as a query parameter."""
    summary_path = f"/videos/{video_id}/summary"
    query = {"max_words": max_words}
    return _request("GET", summary_path, params=query)
def fetch_quiz(video_id: str, num_questions: int = 3, style: str = "mixed") -> Iterable[Dict[str, Any]]:
    """GET ``/videos/<video_id>/quiz`` with the question count and style as query parameters."""
    quiz_options = {"num_questions": num_questions, "style": style}
    return _request("GET", f"/videos/{video_id}/quiz", params=quiz_options)
def semantic_search(query: str, video_id: str, limit: int = 4) -> Iterable[Dict[str, Any]]:
    """POST ``query``, ``video_id`` and ``limit`` as a JSON body to ``/search``."""
    search_body = {"query": query, "video_id": video_id, "limit": limit}
    return _request("POST", "/search", json_payload=search_body)
def seconds_to_timestamp(total_seconds: int) -> str:
    """Format a duration in seconds as zero-padded ``MM:SS``.

    Args:
        total_seconds: Offset in seconds. Floats are truncated towards zero,
            ``None`` counts as zero, and negative values are clamped to zero.

    Returns:
        ``"MM:SS"``; the minutes field keeps growing past 59 (3661 -> "61:01").
    """
    # Coerce first: the ``:02d`` format rejects floats and ``max`` rejects None.
    whole_seconds = max(int(total_seconds or 0), 0)
    minutes, seconds = divmod(whole_seconds, 60)
    return f"{minutes:02d}:{seconds:02d}"
def youtube_timestamp_link(video_id: str, timestamp_seconds: Optional[int]) -> str:
    """Build a YouTube watch URL that starts at the given offset.

    A missing or zero ``timestamp_seconds`` yields ``t=0s``; other values are
    converted with ``int()``.
    """
    if timestamp_seconds:
        offset = int(timestamp_seconds)
    else:
        offset = 0
    return f"https://www.youtube.com/watch?v={video_id}&t={offset}s"
def render_error(message: str) -> None:
    """Show ``message`` in the Streamlit app via ``st.error``."""
    st.error(message)
    return None
def render_success(message: str) -> None:
    """Show ``message`` in the Streamlit app via ``st.success``."""
    st.success(message)
    return None
def _build_remote_image_url(filename: str) -> str:
    """Join HF_IMG_BASE_URL and ``filename`` with exactly one slash between them."""
    relative_name = filename.lstrip("/")
    base_url = HF_IMG_BASE_URL.rstrip("/")
    return base_url + "/" + relative_name
def get_remote_image_url(filename: str) -> str:
    """Return the remote URL for the image asset ``filename`` (public wrapper)."""
    remote_url = _build_remote_image_url(filename)
    return remote_url
def resolve_image_source(filename: str) -> str:
    """Return the local image path as a string if the file exists, else its remote URL."""
    local_candidate = _IMG_DIR / filename
    if not local_candidate.exists():
        return _build_remote_image_url(filename)
    return str(local_candidate)
def _load_remote_image_bytes(filename: str) -> Optional[bytes]:
    """Download the remote copy of ``filename``.

    Returns the response body on HTTP 200, and ``None`` for any other status
    or when ``requests`` raises.
    """
    image_url = _build_remote_image_url(filename)
    try:
        response = requests.get(image_url, timeout=IMAGE_REQUEST_TIMEOUT)
        payload = response.content if response.status_code == 200 else None
    except requests.RequestException:
        # Best effort: a failed download is reported as "no image".
        payload = None
    return payload
@lru_cache(maxsize=128)
def get_image_base64(filename: str) -> Optional[str]:
    """Return the image ``filename`` as a base64 string, or ``None`` if unavailable.

    The local image directory is tried first; if the file is missing or cannot
    be read, the remote copy is downloaded instead.
    """
    # NOTE(review): lru_cache also memoises a None result, so a failed lookup
    # is not retried for the same filename until the entry is evicted.
    local_file = _IMG_DIR / filename
    raw_bytes: Optional[bytes] = None
    if local_file.exists():
        try:
            raw_bytes = local_file.read_bytes()
        except OSError:
            # Unreadable local file: fall through to the remote copy.
            raw_bytes = None
    if raw_bytes is None:
        raw_bytes = _load_remote_image_bytes(filename)
    if raw_bytes is None:
        return None
    return base64.b64encode(raw_bytes).decode("utf-8")
def _object_exists(key: str) -> bool:
    """Report whether ``key`` exists in the configured S3 bucket.

    Raises:
        RuntimeError: If ``head_object`` fails with an error code other than
            ``"404"`` or ``"NoSuchKey"``.
    """
    client = _get_s3_client()
    bucket_name = _get_s3_bucket_name()
    try:
        client.head_object(Bucket=bucket_name, Key=key)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code not in {"404", "NoSuchKey"}:
            raise RuntimeError(f"Unable to read S3 object {key}: {exc}") from exc
        return False
    return True
def is_video_cached(video_id: str) -> bool:
    """Report whether both cached artifacts for ``video_id`` exist in S3.

    A video counts as cached only when ``<id>/chunks.json`` and
    ``<id>/embeddings.npy`` are both present.

    Args:
        video_id: Video identifier; surrounding whitespace and slashes are ignored.

    Returns:
        ``False`` for an empty id, including one that is only whitespace or
        slashes; otherwise whether both objects exist.
    """
    if not video_id:
        return False
    video_prefix = video_id.strip().strip("/")
    # Re-check after normalising so "  " or "/" never probes "/chunks.json".
    if not video_prefix:
        return False
    required_objects = ("chunks.json", "embeddings.npy")
    return all(_object_exists(f"{video_prefix}/{name}") for name in required_objects)
def _load_video_metadata(video_id: str) -> Dict[str, Any]:
    """Load display metadata for a cached video from ``<video_id>/artifacts.json``.

    Lookup is best effort. If the object cannot be fetched, is not valid JSON,
    or is not a JSON object, placeholder metadata derived from ``video_id`` is
    returned, so one bad artifact cannot break the whole video listing.

    Returns:
        A dict with keys ``video_id``, ``title``, ``channel_name``,
        ``thumbnail_url`` and ``url``.
    """
    default_url = f"https://www.youtube.com/watch?v={video_id}"
    metadata: Dict[str, Any] = {
        "video_id": video_id,
        "title": video_id,
        "channel_name": "",
        "thumbnail_url": None,
        "url": default_url,
    }
    s3 = _get_s3_client()
    bucket = _get_s3_bucket_name()
    artifacts_key = f"{video_id}/artifacts.json"
    try:
        obj = s3.get_object(Bucket=bucket, Key=artifacts_key)
        data = json.loads(obj["Body"].read())
    except (ClientError, ValueError):
        # ValueError covers malformed JSON and undecodable bytes.
        return metadata
    if not isinstance(data, dict):
        return metadata
    metadata["title"] = data.get("title") or video_id
    metadata["channel_name"] = data.get("channel") or data.get("channel_name", "")
    metadata["thumbnail_url"] = data.get("thumbnail_url")
    metadata["url"] = data.get("url") or default_url
    return metadata
@st.cache_data(show_spinner=False, ttl=300)
def list_cached_videos() -> list[Dict[str, Any]]:
    """List metadata for every fully cached video in the bucket, sorted by title.

    Top-level prefixes of the bucket are treated as video ids; the ``lambda``
    prefix is skipped. The result is memoised by ``st.cache_data`` with
    ``ttl=300``.
    """
    client = _get_s3_client()
    bucket_name = _get_s3_bucket_name()
    pages = client.get_paginator("list_objects_v2").paginate(Bucket=bucket_name, Delimiter="/")
    # Lazily walk every common prefix so pages are fetched only as needed.
    candidate_ids = (
        entry.get("Prefix", "").rstrip("/")
        for page in pages
        for entry in page.get("CommonPrefixes", [])
    )
    videos: list[Dict[str, Any]] = [
        _load_video_metadata(candidate)
        for candidate in candidate_ids
        if candidate and candidate != "lambda" and is_video_cached(candidate)
    ]
    videos.sort(key=lambda item: item.get("title", ""))
    return videos
def extract_youtube_video_id(url: str) -> Optional[str]:
    """Best-effort extraction of the YouTube video ID from a URL.

    Recognised forms, checked in this order:

    * a ``v`` query parameter (``.../watch?v=ID``), parsed as a real query
      string so parameters such as ``rv=`` or a ``#fragment`` cannot leak in;
    * ``youtu.be/ID`` short links, with or without a trailing slash or query;
    * ``youtube.com/shorts/ID``, ``/embed/ID`` and ``/live/ID`` paths;
    * as a last resort, the original substring heuristics for inputs that are
      not well-formed URLs.

    Returns:
        The video ID, or ``None`` when no non-empty ID can be found.
    """
    from urllib.parse import parse_qs, urlsplit

    if not url:
        return None
    url = url.strip()

    try:
        # Prefix "//" so scheme-less input such as "youtu.be/ID" still parses a host.
        parts = urlsplit(url if "://" in url else f"//{url}")
        host = (parts.hostname or "").lower()
    except ValueError:
        parts = None
        host = ""

    if parts is not None:
        for candidate in parse_qs(parts.query).get("v", []):
            if candidate:
                return candidate
        segments = [segment for segment in parts.path.split("/") if segment]
        if host == "youtu.be" or host.endswith(".youtu.be"):
            if segments:
                return segments[0]
        elif host == "youtube.com" or host.endswith(".youtube.com"):
            if len(segments) >= 2 and segments[0] in {"shorts", "embed", "live"}:
                return segments[1]

    # Legacy fallbacks for text that does not parse as a URL.
    if "v=" in url:
        legacy_id = url.split("v=", 1)[1]
        for separator in ("&", "#"):
            legacy_id = legacy_id.split(separator, 1)[0]
        return legacy_id or None
    if "youtu.be" in url:
        return url.rsplit("/", 1)[-1].split("?")[0] or None
    return None
def fetch_youtube_metadata_v3(video_url: str) -> Optional[Dict[str, Any]]:
    """Fetch video metadata using the YouTube Data API v3.

    Args:
        video_url: A YouTube URL from which the video ID can be extracted.

    Returns:
        A dict with ``title``, ``author_name``, ``thumbnail_url`` and
        ``video_id``, or ``None`` when no ID can be extracted or the API
        returns no item for it.

    Raises:
        RuntimeError: If the API key is not configured, the request fails, or
            the response is not valid JSON.
    """
    from urllib.parse import quote_plus

    video_id = extract_youtube_video_id(video_url)
    if not video_id:
        return None
    api_key = _get_secret_or_env(_YOUTUBE_API_KEY_ENV)
    if not api_key:
        raise RuntimeError("YOUTUBE_V3_DATA_API_KEY is not configured in this environment.")
    params = {
        "part": "snippet",
        "id": video_id,
        "key": api_key,
    }
    try:
        response = requests.get(YOUTUBE_API_URL, params=params, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()
    except requests.RequestException as exc:
        # requests puts the full request URL, including ``key=<api key>``, in
        # its error text; redact it so the secret never reaches the message.
        safe_detail = str(exc)
        for secret_form in (api_key, quote_plus(api_key)):
            safe_detail = safe_detail.replace(secret_form, "***")
        raise RuntimeError(f"Unable to contact YouTube Data API: {safe_detail}") from exc
    try:
        payload = response.json()
    except ValueError as exc:
        raise RuntimeError("YouTube Data API returned invalid JSON") from exc

    items = (payload.get("items") if isinstance(payload, dict) else None) or []
    if not items:
        return None
    snippet = items[0].get("snippet") or {}
    # ``or {}`` guards against an explicit null for "thumbnails".
    thumbnails = snippet.get("thumbnails") or {}
    best_thumbnail = thumbnails.get("high") or thumbnails.get("default") or {}
    return {
        "title": snippet.get("title"),
        "author_name": snippet.get("channelTitle"),
        "thumbnail_url": best_thumbnail.get("url"),
        "video_id": video_id,
    }