import os import html import json import re import xml.etree.ElementTree as ET from urllib.parse import parse_qs, urlparse import certifi import httpx import streamlit as st import validators from langchain_community.document_loaders import UnstructuredURLLoader from langchain_core.documents import Document from langchain_core.prompts import PromptTemplate from langchain_groq import ChatGroq from requests.exceptions import RequestException from youtube_transcript_api import YouTubeTranscriptApi from yt_dlp import YoutubeDL os.environ.setdefault("SSL_CERT_FILE", certifi.where()) os.environ.setdefault("REQUESTS_CA_BUNDLE", certifi.where()) # Streamlit App st.set_page_config(page_title="SnapSummaryAI — YouTube & Web Summarizer", page_icon="🦜") st.title("🔗📝 SnapSummaryAI — YouTube & Web Summarizer") st.subheader("Summarize URL") # Sidebar with st.sidebar: groq_api_key = st.text_input("Groq API Key", value="", type="password") youtube_debug = st.checkbox("Debug YouTube Reachability", value=False) generic_url = st.text_input("URL", label_visibility="collapsed") prompt_template = """ Provide a summary of the following content in 300 words: Content: {text} """ prompt = PromptTemplate(template=prompt_template, input_variables=["text"]) LANGUAGE_PREFERENCE = ["en", "en-US", "en-orig", "en-GB"] SUBTITLE_EXT_PREFERENCE = ["json3", "srv3", "vtt", "ttml", "srv1", "srv2"] def is_youtube_url(url: str) -> bool: return "youtube.com" in url or "youtu.be" in url def extract_video_id(url: str) -> str | None: parsed_url = urlparse(url) host = (parsed_url.hostname or "").lower() if host in {"youtu.be", "www.youtu.be"}: return parsed_url.path.strip("/") or None if "youtube.com" in host: if parsed_url.path == "/watch": return parse_qs(parsed_url.query).get("v", [None])[0] if parsed_url.path.startswith("/shorts/") or parsed_url.path.startswith("/embed/"): path_parts = [part for part in parsed_url.path.split("/") if part] if len(path_parts) >= 2: return path_parts[1] return None def build_web_loader(url: str) -> UnstructuredURLLoader: return UnstructuredURLLoader( urls=[url], ssl_verify=False, headers={ "User-Agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5_1) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/116.0.0.0 Safari/537.36" ) }, ) def normalize_text(text: str) -> str: return re.sub(r"\s+", " ", html.unescape(text or "")).strip() def debug_youtube_reachability(video_id: str) -> None: try: response = httpx.get(f"https://www.youtube.com/watch?v={video_id}", timeout=10) st.info(f"YouTube reachable: {response.status_code}") except Exception as exc: st.warning(f"YouTube blocked or unreachable: {exc}") def parse_json3_subtitle(payload: str) -> str: try: data = json.loads(payload) except json.JSONDecodeError: return "" parts = [] for event in data.get("events", []): for segment in event.get("segs", []): piece = segment.get("utf8", "") if piece: parts.append(piece.replace("\n", " ")) return normalize_text(" ".join(parts)) def parse_xml_subtitle(payload: str) -> str: try: root = ET.fromstring(payload) except ET.ParseError: return "" chunks = [] for node in root.iter(): tag = node.tag.lower() if isinstance(node.tag, str) else "" if tag.endswith("text") or tag.endswith("p"): text_value = "".join(node.itertext()) if text_value: chunks.append(text_value.replace("\n", " ")) return normalize_text(" ".join(chunks)) def parse_vtt_subtitle(payload: str) -> str: lines = [] for raw_line in payload.splitlines(): line = raw_line.strip().lstrip("\ufeff") if not line or line.upper().startswith("WEBVTT"): continue if "-->" in line: continue if re.match(r"^\d+$", line): continue if line.startswith(("NOTE", "STYLE", "REGION")): continue lines.append(line) return normalize_text(" ".join(lines)) def fetch_subtitle_track(track_url: str, ext: str) -> str: response = httpx.get(track_url, timeout=15) response.raise_for_status() payload = response.text ext_lower = (ext or "").lower() if ext_lower == "json3": return parse_json3_subtitle(payload) if ext_lower in {"srv3", "ttml", "xml"}: return parse_xml_subtitle(payload) if ext_lower in {"vtt", "srv1", "srv2"}: return parse_vtt_subtitle(payload) for parser in (parse_json3_subtitle, parse_xml_subtitle, parse_vtt_subtitle): text = parser(payload) if text: return text return "" def extract_ydlp_subtitles(video_info: dict) -> str: ext_rank = {ext: idx for idx, ext in enumerate(SUBTITLE_EXT_PREFERENCE)} candidates = [] subtitle_maps = [ video_info.get("subtitles", {}) or {}, video_info.get("automatic_captions", {}) or {}, ] for subtitle_map in subtitle_maps: for lang, entries in subtitle_map.items(): language_rank = ( LANGUAGE_PREFERENCE.index(lang) if lang in LANGUAGE_PREFERENCE else len(LANGUAGE_PREFERENCE) ) if not isinstance(entries, list): continue for entry in entries: track_url = entry.get("url") ext = (entry.get("ext") or "").lower() if not track_url: continue candidates.append( ( language_rank, ext_rank.get(ext, len(SUBTITLE_EXT_PREFERENCE)), track_url, ext, ) ) candidates.sort(key=lambda item: (item[0], item[1])) for _, _, track_url, ext in candidates: try: transcript_text = fetch_subtitle_track(track_url, ext) if transcript_text: return transcript_text except Exception: continue return "" def fetch_transcript_with_proxy(video_id: str) -> str: proxy_url = os.getenv("YOUTUBE_PROXY_URL", "").strip() proxy_username = os.getenv("WEBSHARE_PROXY_USERNAME", "").strip() proxy_password = os.getenv("WEBSHARE_PROXY_PASSWORD", "").strip() if proxy_username and proxy_password: try: from youtube_transcript_api.proxies import WebshareProxyConfig transcript_api = YouTubeTranscriptApi( proxies=WebshareProxyConfig( proxy_username=proxy_username, proxy_password=proxy_password, ) ) transcript_data = transcript_api.fetch(video_id) transcript_text = normalize_text( " ".join( item.text if hasattr(item, "text") else item.get("text", "") for item in transcript_data ) ) if transcript_text: return transcript_text except Exception: pass proxy_kwargs = {} if proxy_url: proxy_kwargs["proxies"] = {"https": proxy_url, "http": proxy_url} attempts = [ {"languages": ["en", "en-US"], **proxy_kwargs}, {"languages": ["en"], **proxy_kwargs}, {"languages": ["en", "en-US"]}, ] for kwargs in attempts: call_kwargs = dict(kwargs) try: transcript = YouTubeTranscriptApi.get_transcript(video_id, **call_kwargs) except TypeError: call_kwargs.pop("proxies", None) try: transcript = YouTubeTranscriptApi.get_transcript(video_id, **call_kwargs) except Exception: continue except Exception: continue transcript_text = normalize_text(" ".join(item.get("text", "") for item in transcript)) if transcript_text: return transcript_text return "" def load_youtube_docs(url: str, debug_reachability: bool = False) -> list[Document]: video_id = extract_video_id(url) if not video_id: raise ValueError("Could not parse YouTube video id from URL.") if debug_reachability: debug_youtube_reachability(video_id) try: ydl_options = { "quiet": True, "skip_download": True, "noplaylist": True, "socket_timeout": 20, "writesubtitles": True, "writeautomaticsub": True, "subtitleslangs": ["en", "en-US"], "subtitlesformat": "json3", } with YoutubeDL(ydl_options) as ydl: video_info = ydl.extract_info(url, download=False) except Exception as exc: transcript_text = fetch_transcript_with_proxy(video_id) if transcript_text: return [ Document( page_content=transcript_text, metadata={"source": url, "kind": "transcript_proxy"}, ) ] raise RuntimeError(f"YouTube fetch failed: {exc}") from exc transcript_text = extract_ydlp_subtitles(video_info) if transcript_text: return [ Document( page_content=transcript_text, metadata={"source": url, "kind": "transcript"}, ) ] transcript_text = fetch_transcript_with_proxy(video_id) if transcript_text: return [ Document( page_content=transcript_text, metadata={"source": url, "kind": "transcript_proxy"}, ) ] st.warning("No subtitles found. Falling back to video metadata.") title = (video_info.get("title") or "").strip() description = (video_info.get("description") or "").strip() fallback_text = f"Title: {title}\n\nDescription:\n{description}".strip() if fallback_text.replace("Title:", "").replace("Description:", "").strip(): return [ Document( page_content=fallback_text, metadata={"source": url, "kind": "metadata"}, ) ] raise RuntimeError("No text could be extracted from this YouTube URL.") if st.button("Summarize the Content from YT or Website"): # Validate inputs if not groq_api_key.strip() or not generic_url.strip(): st.error("Please provide the API key and URL to get started.") elif not validators.url(generic_url): st.error("Please enter a valid URL. It may be a YouTube video URL or website URL.") else: llm = ChatGroq( model="openai/gpt-oss-120b", groq_api_key=groq_api_key ) with st.spinner("Waiting..."): # Load data is_youtube = is_youtube_url(generic_url) try: if is_youtube: docs = load_youtube_docs(generic_url, debug_reachability=youtube_debug) else: docs = build_web_loader(generic_url).load() except RequestException: st.error("Network error while loading the URL. Please try again.") st.stop() except Exception as exc: if is_youtube: st.error( "Could not extract YouTube transcript or metadata in this environment. " f"Details: {exc}" ) else: st.error(f"Could not load content from this URL. Details: {exc}") st.stop() if not docs: st.error("No content could be extracted from this URL.") st.stop() combined_text = "\n\n".join(doc.page_content for doc in docs if doc.page_content).strip() if not combined_text: st.error("Extracted content is empty after processing.") st.stop() final_prompt = prompt.format(text=combined_text) try: response = llm.invoke(final_prompt) output_summary = response.content if hasattr(response, "content") else str(response) st.write(output_summary) except Exception as exc: st.error(f"Failed to summarize content. Details: {exc}")