import os
import tempfile
from io import BytesIO
from urllib.parse import quote_plus, urlparse
from xml.etree import ElementTree as ET
from zipfile import ZipFile
import requests
import streamlit as st
import validators
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from langchain_classic.chains.summarize import load_summarize_chain
from langchain_community.document_loaders import UnstructuredURLLoader, YoutubeLoader
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pypdf import PdfReader
from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException, SSLError
from urllib3.util.retry import Retry
from youtube_transcript_api import YouTubeTranscriptApi
load_dotenv()
APP_VERSION = "2026-04-23-hf-youtube-fallbacks-1"
SAMPLE_YOUTUBE_URL = "https://youtu.be/ocBh08fjIfU"
LANGUAGE_OPTIONS = ["Original", "English", "Arabic", "French", "Bahasa Malay"]
LANGUAGE_CODE_MAP = {
"English": "en",
"Arabic": "ar",
"French": "fr",
"Bahasa Malay": "ms",
}
LANGUAGE_LABEL_MAP = {
"English": "English",
"Arabic": "Arabic",
"French": "French",
"Bahasa Malay": "Bahasa Melayu",
}
YOUTUBE_PROXY_ENV_VARS = (
"YOUTUBE_HTTP_PROXY",
"YOUTUBE_HTTPS_PROXY",
"HTTP_PROXY",
"HTTPS_PROXY",
)
YOUTUBE_AUDIO_EXTENSIONS = (".m4a", ".mp3", ".mp4", ".mpeg", ".mpga", ".ogg", ".wav", ".webm")
def _is_youtube_url(url: str) -> bool:
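    """Return True when the URL's host looks like a YouTube domain (youtube.com or youtu.be)."""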
host = urlparse(url).netloc.lower()
return "youtube.com" in host or "youtu.be" in host
st.set_page_config(page_title="Summarize Text From PDF, YouTube, Website", page_icon="📝")
st.title("📝 Summarize Text From PDF, YouTube, Website")
# st.subheader("Summarize URL")
groq_api_key = os.getenv("GROQ_API_KEY", "")
if "url_input" not in st.session_state:
st.session_state.url_input = ""
if "summary_word_limit" not in st.session_state:
st.session_state.summary_word_limit = 400
if "youtube_transcript_text" not in st.session_state:
st.session_state.youtube_transcript_text = ""
if "youtube_transcript_name" not in st.session_state:
st.session_state.youtube_transcript_name = "youtube_transcript.txt"
if "youtube_transcript_source_url" not in st.session_state:
st.session_state.youtube_transcript_source_url = ""
if "youtube_transcript_language_label" not in st.session_state:
st.session_state.youtube_transcript_language_label = "Original"
if "youtube_transcript_source_mode" not in st.session_state:
st.session_state.youtube_transcript_source_mode = ""
summary_language = "Original"
transcript_language = "Original"
with st.sidebar:
st.header("Options")
st.caption(f"App version: `{APP_VERSION}`")
input_source_mode = st.radio(
"Content source",
options=["URL", "Upload documents", "Both"],
index=0,
help="Choose which source the app should use for summarization.",
)
summary_word_limit = st.slider(
"Summary word limit",
min_value=100,
max_value=1500,
step=50,
key="summary_word_limit",
help="Increase or decrease the target length of the summary.",
)
# summary_language = st.selectbox(
# "Summary language",
# options=LANGUAGE_OPTIONS,
# index=0,
# help="Choose the language for the generated summary. `Original` keeps the source language when possible.",
# )
# transcript_language = st.selectbox(
# "Transcript language",
# options=LANGUAGE_OPTIONS,
# index=0,
# help="Choose the language used for YouTube transcript fetching/export. `Original` keeps the available source transcript language.",
# )
selected_chain_type = st.radio(
"Summarization method",
options=["auto", "stuff", "map_reduce", "refine"],
index=0,
help="`auto` picks the best method based on content size and will upgrade if a simpler method is not a good fit.",
)
st.caption(
"`stuff` is fastest for short content, `map_reduce` is safer for long content, "
"and `refine` is useful when building a summary progressively across chunks."
)
if os.getenv("SPACE_ID"):
if any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS):
st.info("Hugging Face Space detected. YouTube proxy configuration is present.")
else:
st.warning(
"Hugging Face Space detected. YouTube transcript loading may fail without "
"a proxy because YouTube often blocks datacenter IPs."
)
st.caption(f"Sample YouTube URL: `{SAMPLE_YOUTUBE_URL}`")
if st.button("Use sample YouTube URL"):
st.session_state.url_input = SAMPLE_YOUTUBE_URL
generic_url = ""
uploaded_files = []
youtube_source_mode = "Auto"
manual_transcript_text = ""
manual_transcript_file = None
if input_source_mode in {"URL", "Both"}:
    st.markdown("<h3>Summarize URL</h3>", unsafe_allow_html=True)
generic_url = st.text_input(
"URL",
key="url_input",
label_visibility="collapsed",
placeholder=f"Paste a YouTube or website URL, or try {SAMPLE_YOUTUBE_URL}",
help="Enter the full YouTube or website URL you want to summarize.",
)
if input_source_mode in {"Upload documents", "Both"}:
    st.markdown("<h3>Upload documents</h3>", unsafe_allow_html=True)
uploaded_files = st.file_uploader(
"Upload documents",
type=["pdf", "txt", "md", "csv", "docx"],
accept_multiple_files=True,
label_visibility="collapsed",
help="Upload one or more documents. Supported formats: PDF, TXT, MD, CSV, DOCX.",
)
if uploaded_files:
st.caption(
"Uploaded files: " + ", ".join(uploaded_file.name for uploaded_file in uploaded_files)
)
if input_source_mode in {"URL", "Both"} and generic_url.strip() and _is_youtube_url(generic_url):
    st.markdown("<h3>YouTube Fallback Options</h3>", unsafe_allow_html=True)
youtube_source_mode = st.radio(
"YouTube transcript source",
options=[
"Auto",
"Direct transcript",
"External transcript API",
"Audio transcription (yt-dlp + Groq)",
"Manual transcript",
],
index=0,
help=(
"`Auto` tries direct transcript first, then external API, then yt-dlp + Groq audio transcription. "
"`Manual transcript` lets you paste or upload transcript text."
),
)
if youtube_source_mode == "Manual transcript":
manual_transcript_text = st.text_area(
"Paste transcript",
height=220,
placeholder="Paste the YouTube transcript here if direct fetching is blocked.",
)
manual_transcript_file = st.file_uploader(
"Upload transcript file",
type=["txt", "md", "csv", "srt", "vtt"],
help="Upload a transcript file to summarize when direct YouTube access is blocked.",
)
else:
configured_modes = []
if any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS):
configured_modes.append("direct transcript via proxy")
if os.getenv("YOUTUBE_TRANSCRIPT_API_URL"):
configured_modes.append("external transcript API")
configured_modes.append("audio transcription via yt-dlp + Groq")
st.caption("Available fallbacks: " + ", ".join(configured_modes) + ".")
llm = ChatGroq(model="llama-3.1-8b-instant", groq_api_key=groq_api_key)
REQUEST_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Referer": "https://www.google.com/",
}
def _summary_language_instruction(selected_language: str) -> str:
if selected_language == "Original":
return "Write the summary in the original language of the source content. If the source is mixed-language, use the dominant language."
return f"Write the summary in {LANGUAGE_LABEL_MAP[selected_language]}."
def _translation_language_instruction(selected_language: str) -> str:
if selected_language == "Original":
return "Keep the text in its original language."
return f"Translate the text into {LANGUAGE_LABEL_MAP[selected_language]}."
def _get_summary_prompts(word_limit: int, selected_language: str) -> dict[str, PromptTemplate]:
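    """Build the prompt templates used by the `stuff`, `map_reduce`, and `refine`
    summarization chains, parameterized by target word count and output language."""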
language_instruction = _summary_language_instruction(selected_language)
stuff_prompt = PromptTemplate(
template=(
f"Provide a clear summary of the following content in about {word_limit} words.\n"
"Focus on the main ideas, important details, and conclusions.\n"
f"{language_instruction}\n"
"Content:\n{text}"
),
input_variables=["text"],
)
map_prompt = PromptTemplate(
template=(
"Write a concise summary of the following section.\n"
f"{language_instruction}\n"
"Content:\n{text}"
),
input_variables=["text"],
)
combine_prompt = PromptTemplate(
template=(
f"Combine the following partial summaries into a final summary in about {word_limit} words.\n"
"Keep the result coherent, non-repetitive, and focused on the most important points.\n"
f"{language_instruction}\n"
"Partial summaries:\n{text}"
),
input_variables=["text"],
)
refine_question_prompt = PromptTemplate(
template=(
f"Provide an initial summary of the following content in about {word_limit} words.\n"
f"{language_instruction}\n"
"Content:\n{text}"
),
input_variables=["text"],
)
refine_prompt = PromptTemplate(
template=(
f"We already have an existing summary:\n{{existing_answer}}\n\n"
"Refine it using the additional content below.\n"
f"Keep the final summary close to {word_limit} words, avoid repetition, and preserve the most important details.\n"
f"{language_instruction}\n"
"Additional content:\n{text}"
),
input_variables=["existing_answer", "text"],
)
return {
"stuff": stuff_prompt,
"map": map_prompt,
"combine": combine_prompt,
"refine_question": refine_question_prompt,
"refine": refine_prompt,
}
def _extract_summary_text(result) -> str:
if isinstance(result, dict):
return result.get("output_text") or result.get("text") or str(result)
return str(result)
def _translate_documents_with_llm(docs: list[Document], target_language: str) -> list[Document]:
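    """Translate documents into the target language with the LLM, splitting each
    document into ~2500-character chunks so every chunk fits in a single model call."""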
if target_language == "Original":
return docs
translation_prompt = PromptTemplate(
template=(
f"{_translation_language_instruction(target_language)}\n"
"Preserve the meaning faithfully. Do not summarize. Return only the translated text.\n"
"Text:\n{text}"
),
input_variables=["text"],
)
translation_chain = load_summarize_chain(
llm,
chain_type="stuff",
prompt=translation_prompt,
)
splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=200)
translated_docs: list[Document] = []
for doc in docs:
chunks = splitter.split_documents([doc])
translated_chunks = []
for chunk in chunks:
translated_text = _extract_summary_text(
translation_chain.invoke({"input_documents": [chunk]})
)
translated_chunks.append(translated_text.strip())
translated_docs.append(
Document(
page_content="\n\n".join(part for part in translated_chunks if part),
metadata={
**doc.metadata,
"translated_to": target_language,
},
)
)
return translated_docs
def _build_youtube_http_client() -> requests.Session:
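    """Create a requests session with browser-like headers, retries with backoff on
    transient HTTP errors, and optional proxy / CA-bundle overrides for YouTube."""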
session = requests.Session()
session.headers.update(
{
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
),
"Accept-Language": "en-US,en;q=0.9",
"Accept": "*/*",
}
)
retry_config = Retry(
total=3,
connect=3,
read=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"],
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry_config)
session.mount("https://", adapter)
session.mount("http://", adapter)
if os.getenv("YOUTUBE_CA_BUNDLE"):
session.verify = os.getenv("YOUTUBE_CA_BUNDLE")
return session
def _build_youtube_transcript_api() -> YouTubeTranscriptApi:
return YouTubeTranscriptApi(http_client=_build_youtube_http_client())
def _looks_like_youtube_ssl_failure(error: Exception) -> bool:
error_text = str(error)
ssl_markers = (
"HTTPSConnectionPool",
"SSLError",
"UNEXPECTED_EOF_WHILE_READING",
"EOF occurred in violation of protocol",
"Max retries exceeded with url",
)
return isinstance(error, (SSLError, RequestException)) or any(
marker in error_text for marker in ssl_markers
)
def _format_youtube_transcript_error(error: Exception) -> str:
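    """Translate low-level transport/SSL failures into an actionable message with a
    proxy hint for Hugging Face Spaces; other errors pass through unchanged."""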
if _looks_like_youtube_ssl_failure(error):
proxy_hint = (
" Configure `YOUTUBE_HTTP_PROXY` / `YOUTUBE_HTTPS_PROXY` "
"or standard `HTTP_PROXY` / `HTTPS_PROXY` in the Space secrets."
if not any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS)
else " Check that the configured outbound proxy is reachable from the Space."
)
return (
"[HF-YT-SSL-001] The deployment could not establish a stable HTTPS connection to YouTube. "
"This is common on cloud-hosted runtimes such as Hugging Face Spaces because "
"YouTube often blocks or interrupts traffic from datacenter IPs."
f"{proxy_hint}"
)
return str(error)
def _resolve_transcript(video_id: str, selected_language: str):
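    """Pick a transcript for the video: the first available one for `Original`;
    otherwise an exact language match, then any translatable transcript, and
    finally raise listing the languages that are actually available."""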
api = _build_youtube_transcript_api()
try:
transcript_list = api.list(video_id)
except Exception as exc:
raise RuntimeError(_format_youtube_transcript_error(exc)) from exc
    available_transcripts = list(transcript_list)
    if not available_transcripts:
        raise ValueError("No transcript is available for this video.")
    if selected_language == "Original":
        return available_transcripts[0], "Original"
target_language_code = LANGUAGE_CODE_MAP[selected_language]
try:
return transcript_list.find_transcript([target_language_code]), selected_language
except Exception:
for base_transcript in available_transcripts:
if not base_transcript.is_translatable:
continue
try:
return base_transcript.translate(target_language_code), selected_language
except Exception:
continue
available_languages = ", ".join(
sorted(
{
f"{transcript.language} ({transcript.language_code})"
for transcript in available_transcripts
}
)
)
raise ValueError(
f"Could not provide transcript in {selected_language}. "
f"Available transcript languages: {available_languages}"
)
def _load_youtube_documents(url: str, selected_language: str) -> list[Document]:
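    """Fetch the transcript directly via youtube-transcript-api and wrap it in a
    Document; if the requested language is unavailable, fall back to the original
    language and translate with the LLM."""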
video_id = YoutubeLoader.extract_video_id(url)
should_translate_with_llm = False
try:
transcript, transcript_language_label = _resolve_transcript(video_id, selected_language)
except ValueError:
if selected_language == "Original":
raise
transcript, transcript_language_label = _resolve_transcript(video_id, "Original")
should_translate_with_llm = True
try:
fetched_transcript = transcript.fetch()
except Exception as exc:
raise RuntimeError(_format_youtube_transcript_error(exc)) from exc
transcript_text = " ".join(snippet.text.strip() for snippet in fetched_transcript if snippet.text.strip())
if not transcript_text:
raise ValueError("No transcript text could be extracted from this video.")
docs = [
Document(
page_content=transcript_text,
metadata={
"source": url,
"video_id": video_id,
"language": fetched_transcript.language,
"language_code": fetched_transcript.language_code,
"is_generated": fetched_transcript.is_generated,
"transcript_language_label": transcript_language_label,
},
)
]
if should_translate_with_llm:
docs = _translate_documents_with_llm(docs, selected_language)
for doc in docs:
doc.metadata["transcript_language_label"] = f"{selected_language} (LLM translated)"
return docs
def _make_transcript_filename(url: str) -> str:
video_id = YoutubeLoader.extract_video_id(url)
return f"youtube_transcript_{video_id}.txt"
def _reset_youtube_transcript_state() -> None:
st.session_state.youtube_transcript_text = ""
st.session_state.youtube_transcript_name = "youtube_transcript.txt"
st.session_state.youtube_transcript_source_url = ""
st.session_state.youtube_transcript_language_label = "Original"
st.session_state.youtube_transcript_source_mode = ""
def _store_youtube_transcript(url: str, docs: list[Document]) -> None:
st.session_state.youtube_transcript_text = "\n\n".join(
doc.page_content for doc in docs if doc.page_content.strip()
)
st.session_state.youtube_transcript_name = _make_transcript_filename(url)
st.session_state.youtube_transcript_source_url = url
st.session_state.youtube_transcript_language_label = docs[0].metadata.get(
"transcript_language_label",
docs[0].metadata.get("language", "Original"),
)
st.session_state.youtube_transcript_source_mode = docs[0].metadata.get(
"transcript_source_mode",
"Direct transcript",
)
def _normalize_transcript_text(raw_text: str) -> str:
lines = [line.strip() for line in raw_text.splitlines()]
return "\n".join(line for line in lines if line)
def _read_uploaded_text_file(uploaded_file) -> str:
return uploaded_file.getvalue().decode("utf-8", errors="ignore").strip()
def _build_transcript_documents(
url: str,
transcript_text: str,
language_label: str,
source_mode: str,
) -> list[Document]:
normalized_text = _normalize_transcript_text(transcript_text)
if not normalized_text:
raise ValueError("Transcript text is empty.")
return [
Document(
page_content=normalized_text,
metadata={
"source": url,
"video_id": YoutubeLoader.extract_video_id(url),
"transcript_language_label": language_label,
"transcript_source_mode": source_mode,
},
)
]
def _load_manual_transcript_documents(
url: str,
selected_language: str,
transcript_text: str,
transcript_file,
) -> list[Document]:
combined_parts = []
if transcript_text.strip():
combined_parts.append(transcript_text.strip())
if transcript_file is not None:
combined_parts.append(_read_uploaded_text_file(transcript_file))
combined_text = "\n\n".join(part for part in combined_parts if part.strip())
if not combined_text.strip():
raise ValueError("Please paste a transcript or upload a transcript file.")
docs = _build_transcript_documents(
url,
combined_text,
"Original",
"Manual transcript",
)
if selected_language != "Original":
docs = _translate_documents_with_llm(docs, selected_language)
for doc in docs:
doc.metadata["transcript_language_label"] = f"{selected_language} (LLM translated)"
return docs
def _extract_transcript_text_from_payload(payload) -> str:
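    """Best-effort extraction of transcript text from an arbitrary API payload:
    strings are returned as-is, lists are joined, and dicts are probed recursively
    for common text, wrapper, and segment-list keys."""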
if isinstance(payload, str):
return payload.strip()
if isinstance(payload, list):
text_parts = []
for item in payload:
extracted = _extract_transcript_text_from_payload(item)
if extracted:
text_parts.append(extracted)
return "\n".join(part for part in text_parts if part)
if isinstance(payload, dict):
for key in ("text", "transcript", "content", "full_text", "body"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
for key in ("data", "result", "results", "transcription", "response"):
if key in payload:
extracted = _extract_transcript_text_from_payload(payload[key])
if extracted:
return extracted
for key in ("segments", "items", "captions", "chunks", "utterances"):
value = payload.get(key)
if isinstance(value, list):
extracted = _extract_transcript_text_from_payload(value)
if extracted:
return extracted
return ""
def _load_youtube_documents_via_external_api(url: str, selected_language: str) -> list[Document]:
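    """Fetch a transcript from a user-configured HTTP API.

    `YOUTUBE_TRANSCRIPT_API_URL` may contain `{video_id}`, `{url}`, and
    `{language_code}` placeholders, e.g. (illustrative template only)
    `https://transcripts.example.invalid/api?video={video_id}&lang={language_code}`.
    """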
api_url = os.getenv("YOUTUBE_TRANSCRIPT_API_URL", "").strip()
if not api_url:
raise ValueError(
"External transcript API is not configured. Set `YOUTUBE_TRANSCRIPT_API_URL` in Space secrets."
)
video_id = YoutubeLoader.extract_video_id(url)
language_code = LANGUAGE_CODE_MAP.get(selected_language, "")
formatted_url = api_url.format(
video_id=video_id,
url=quote_plus(url),
language_code=language_code,
)
method = os.getenv("YOUTUBE_TRANSCRIPT_API_METHOD", "GET").strip().upper()
timeout_seconds = int(os.getenv("YOUTUBE_TRANSCRIPT_API_TIMEOUT", "45"))
api_key = os.getenv("YOUTUBE_TRANSCRIPT_API_KEY", "").strip()
api_key_header = os.getenv("YOUTUBE_TRANSCRIPT_API_KEY_HEADER", "Authorization").strip()
headers = {"Accept": "application/json"}
if api_key:
if api_key_header.lower() == "authorization":
headers[api_key_header] = f"Bearer {api_key}"
else:
headers[api_key_header] = api_key
payload = {
"video_id": video_id,
"url": url,
"language": language_code or None,
}
if method == "POST":
response = requests.post(formatted_url, json=payload, headers=headers, timeout=timeout_seconds)
else:
response = requests.get(formatted_url, params=payload, headers=headers, timeout=timeout_seconds)
response.raise_for_status()
try:
parsed_payload = response.json()
except ValueError:
parsed_payload = response.text
transcript_text = _extract_transcript_text_from_payload(parsed_payload)
if not transcript_text:
raise ValueError("External transcript API response did not contain usable transcript text.")
docs = _build_transcript_documents(
url,
transcript_text,
        selected_language,
"External transcript API",
)
if selected_language != "Original":
for doc in docs:
doc.metadata["transcript_language_label"] = selected_language
return docs
def _download_youtube_audio(url: str, video_id: str) -> str:
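    """Download the best available audio stream with yt-dlp, then copy the file out
    of the temporary download directory so it survives the context manager's cleanup."""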
try:
import yt_dlp
except ImportError as exc:
raise RuntimeError("`yt-dlp` is not installed in this Space build.") from exc
with tempfile.TemporaryDirectory() as temp_dir:
output_template = os.path.join(temp_dir, f"{video_id}.%(ext)s")
ydl_opts = {
"format": "bestaudio[ext=m4a]/bestaudio[ext=webm]/bestaudio/best",
"outtmpl": output_template,
"quiet": True,
"no_warnings": True,
"noprogress": True,
"skip_download": False,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.extract_info(url, download=True)
audio_files = [
os.path.join(temp_dir, file_name)
for file_name in os.listdir(temp_dir)
if os.path.splitext(file_name)[1].lower() in YOUTUBE_AUDIO_EXTENSIONS
]
if not audio_files:
raise RuntimeError("yt-dlp did not produce a supported audio file for transcription.")
source_path = max(audio_files, key=os.path.getsize)
persisted_path = os.path.join(tempfile.gettempdir(), os.path.basename(source_path))
with open(source_path, "rb") as source_file, open(persisted_path, "wb") as target_file:
target_file.write(source_file.read())
return persisted_path
def _transcribe_audio_with_groq(audio_path: str, selected_language: str) -> str:
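    """Transcribe the audio file with Groq's OpenAI-compatible `/audio/transcriptions`
    endpoint (Whisper model, configurable via `GROQ_AUDIO_TRANSCRIPTION_MODEL`)."""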
if not groq_api_key.strip():
raise ValueError("`GROQ_API_KEY` is required for audio transcription fallback.")
model_name = os.getenv("GROQ_AUDIO_TRANSCRIPTION_MODEL", "whisper-large-v3-turbo")
payload = {
"model": model_name,
"response_format": "json",
"temperature": "0",
}
if selected_language != "Original":
payload["language"] = LANGUAGE_CODE_MAP[selected_language]
with open(audio_path, "rb") as audio_file:
response = requests.post(
"https://api.groq.com/openai/v1/audio/transcriptions",
headers={"Authorization": f"Bearer {groq_api_key}"},
data=payload,
files={"file": (os.path.basename(audio_path), audio_file)},
timeout=300,
)
response.raise_for_status()
transcript_text = response.json().get("text", "").strip()
if not transcript_text:
raise ValueError("Groq audio transcription returned empty text.")
return transcript_text
def _load_youtube_documents_via_audio_transcription(url: str, selected_language: str) -> list[Document]:
video_id = YoutubeLoader.extract_video_id(url)
audio_path = _download_youtube_audio(url, video_id)
try:
transcript_text = _transcribe_audio_with_groq(audio_path, selected_language)
finally:
if os.path.exists(audio_path):
os.remove(audio_path)
return _build_transcript_documents(
url,
transcript_text,
        selected_language,
"Audio transcription (yt-dlp + Groq)",
)
def _load_youtube_documents_with_fallbacks(
url: str,
selected_language: str,
source_mode: str,
transcript_text: str,
transcript_file,
) -> list[Document]:
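    """Try transcript strategies in order (direct, external API, audio transcription)
    according to the selected source mode; in `Auto` mode, fall back to any manually
    supplied transcript before reporting all accumulated failures."""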
if source_mode == "Manual transcript":
return _load_manual_transcript_documents(url, selected_language, transcript_text, transcript_file)
strategies = []
if source_mode in {"Auto", "Direct transcript"}:
strategies.append(("Direct transcript", lambda: _load_youtube_documents(url, selected_language)))
if source_mode in {"Auto", "External transcript API"}:
strategies.append(
("External transcript API", lambda: _load_youtube_documents_via_external_api(url, selected_language))
)
if source_mode in {"Auto", "Audio transcription (yt-dlp + Groq)"}:
strategies.append(
(
"Audio transcription (yt-dlp + Groq)",
lambda: _load_youtube_documents_via_audio_transcription(url, selected_language),
)
)
failures = []
for strategy_name, loader in strategies:
try:
return loader()
except Exception as exc:
failures.append(f"{strategy_name}: {exc}")
if source_mode == "Auto" and (transcript_text.strip() or transcript_file is not None):
return _load_manual_transcript_documents(url, selected_language, transcript_text, transcript_file)
if not failures:
raise ValueError("No YouTube transcript strategy is available for the selected mode.")
raise RuntimeError("All YouTube transcript strategies failed.\n" + "\n".join(failures))
def _has_meaningful_content(docs: list[Document], min_chars: int = 300) -> bool:
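    """Heuristic: loaded docs count as usable only if they total at least `min_chars` characters."""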
combined_text = " ".join(doc.page_content.strip() for doc in docs if doc.page_content.strip())
return len(combined_text) >= min_chars
def _extract_text_from_html(html: str) -> str:
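    """Extract readable text from raw HTML: strip script/style noise, prefer
    main/article containers, fall back to the whole body, and prepend the meta
    description when present."""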
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style", "noscript", "svg"]):
tag.decompose()
meta_description = ""
meta_tag = soup.find("meta", attrs={"name": "description"})
if meta_tag and meta_tag.get("content"):
meta_description = meta_tag["content"].strip()
main_candidates = soup.select("main, article, [role='main'], .content, .article-body")
text_parts = []
for candidate in main_candidates:
candidate_text = " ".join(candidate.stripped_strings)
if len(candidate_text) > 200:
text_parts.append(candidate_text)
if not text_parts:
body_text = " ".join(soup.stripped_strings)
if body_text:
text_parts.append(body_text)
if meta_description:
text_parts.insert(0, meta_description)
return "\n\n".join(dict.fromkeys(part for part in text_parts if part))
def _load_web_documents(url: str) -> list[Document]:
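    """Load a web page as Documents, trying UnstructuredURLLoader first and falling
    back to a plain requests + BeautifulSoup extraction when the primary loader
    fails or returns too little readable text."""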
try:
loader = UnstructuredURLLoader(
urls=[url],
ssl_verify=False,
headers=REQUEST_HEADERS,
)
docs = loader.load()
if _has_meaningful_content(docs):
return docs
except Exception as loader_error:
last_error = loader_error
else:
last_error = ValueError("Primary URL loader returned too little readable content.")
session = requests.Session()
    # dict.fromkeys drops the duplicate candidate when the URL has no trailing slash
    for candidate_url in dict.fromkeys((url, url.rstrip("/"))):
if not candidate_url:
continue
try:
response = session.get(
candidate_url,
headers=REQUEST_HEADERS,
timeout=20,
verify=False,
allow_redirects=True,
)
response.encoding = response.encoding or response.apparent_encoding or "utf-8"
if not response.text.strip():
continue
text = _extract_text_from_html(response.text)
if not text or len(text) < 300:
continue
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string.strip() if soup.title and soup.title.string else candidate_url
st.info("Primary URL loader failed or returned too little content. Used HTML fallback extraction instead.")
return [
Document(
page_content=text,
metadata={
"source": candidate_url,
"title": title,
"http_status": response.status_code,
},
)
]
except RequestException as request_error:
last_error = request_error
raise ValueError(
f"Could not load readable text from the URL. Last loader error: {last_error}"
)
def _load_uploaded_documents(files) -> list[Document]:
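    """Convert uploaded files into Documents: PDFs page by page via pypdf, plain-text
    formats by UTF-8 decoding, and .docx by reading word/document.xml straight out of
    the zip container."""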
docs: list[Document] = []
for uploaded_file in files:
file_name = uploaded_file.name
extension = os.path.splitext(file_name)[1].lower()
file_bytes = uploaded_file.getvalue()
if extension == ".pdf":
reader = PdfReader(BytesIO(file_bytes))
pages = []
for page_number, page in enumerate(reader.pages, start=1):
page_text = (page.extract_text() or "").strip()
if page_text:
pages.append(
Document(
page_content=page_text,
metadata={
"source": file_name,
"page": page_number,
"type": "uploaded_file",
},
)
)
docs.extend(pages)
continue
if extension in {".txt", ".md", ".csv"}:
text = file_bytes.decode("utf-8", errors="ignore").strip()
if text:
docs.append(
Document(
page_content=text,
metadata={"source": file_name, "type": "uploaded_file"},
)
)
continue
if extension == ".docx":
with ZipFile(BytesIO(file_bytes)) as docx_zip:
document_xml = docx_zip.read("word/document.xml")
root = ET.fromstring(document_xml)
namespace = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
paragraphs = []
for paragraph in root.findall(".//w:p", namespace):
texts = [
node.text
for node in paragraph.findall(".//w:t", namespace)
if node.text
]
paragraph_text = "".join(texts).strip()
if paragraph_text:
paragraphs.append(paragraph_text)
text = "\n\n".join(paragraphs).strip()
if text:
docs.append(
Document(
page_content=text,
metadata={"source": file_name, "type": "uploaded_file"},
)
)
continue
raise ValueError(f"Unsupported file type: {file_name}")
return docs
def _build_chain(selected_chain_type: str):
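    """Instantiate the LangChain summarize chain for the chosen type, wiring in the
    word-limit- and language-aware prompts."""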
prompts = _get_summary_prompts(summary_word_limit, summary_language)
if selected_chain_type == "stuff":
return load_summarize_chain(llm, chain_type="stuff", prompt=prompts["stuff"])
if selected_chain_type == "map_reduce":
return load_summarize_chain(
llm,
chain_type="map_reduce",
map_prompt=prompts["map"],
combine_prompt=prompts["combine"],
)
return load_summarize_chain(
llm,
chain_type="refine",
question_prompt=prompts["refine_question"],
refine_prompt=prompts["refine"],
)
def _prepare_summary_documents(docs: list[Document], selected_chain_type: str) -> list[Document]:
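    """Split documents into 2000-character chunks and cap how many are summarized,
    with a tighter cap for `stuff` so a single-pass prompt stays within context."""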
splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
split_docs = splitter.split_documents(docs)
if selected_chain_type == "stuff":
return split_docs[:3]
if selected_chain_type == "refine":
return split_docs[:10]
return split_docs[:8]
def _choose_effective_chain_type(requested_chain_type: str, docs: list[Document]) -> tuple[str, str | None]:
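    """Recommend a chain type from content size (small -> stuff, medium -> refine,
    large -> map_reduce), override unsafe user choices, and return the effective
    type plus an optional message explaining any switch."""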
splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
split_docs = splitter.split_documents(docs)
chunk_count = len(split_docs)
total_chars = sum(len(doc.page_content) for doc in split_docs)
if chunk_count <= 3 and total_chars <= 6000:
recommended = "stuff"
elif chunk_count <= 10:
recommended = "refine"
else:
recommended = "map_reduce"
if requested_chain_type == "auto":
return recommended, f"Auto-selected `{recommended}` based on content size."
if requested_chain_type == "stuff" and recommended != "stuff":
return recommended, f"Switched from `stuff` to `{recommended}` because the content is too large for a reliable single-pass summary."
if requested_chain_type == "refine" and chunk_count > 12:
return "map_reduce", "Switched from `refine` to `map_reduce` because the content is large enough that map-reduce is more reliable."
return requested_chain_type, None
if input_source_mode in {"URL", "Both"} and _is_youtube_url(generic_url):
st.video(generic_url)
transcript_col, export_col = st.columns(2)
with transcript_col:
if st.button("Fetch transcript"):
if not generic_url.strip():
st.error("Please enter a YouTube URL.")
elif not validators.url(generic_url):
st.error("Please enter a valid YouTube URL.")
else:
try:
with st.spinner("Loading transcript..."):
docs = _load_youtube_documents_with_fallbacks(
generic_url,
transcript_language,
youtube_source_mode,
manual_transcript_text,
manual_transcript_file,
)
if not docs:
st.error("No transcript could be extracted from the provided YouTube video.")
else:
_store_youtube_transcript(generic_url, docs)
st.success(
"Transcript ready for export in "
f"{st.session_state.youtube_transcript_language_label} "
f"via {st.session_state.youtube_transcript_source_mode}."
)
except Exception as transcript_err:
st.error(f"Failed to load YouTube transcript: {transcript_err}")
with export_col:
if (
st.session_state.youtube_transcript_text
and st.session_state.youtube_transcript_source_url == generic_url
):
st.caption(
"Prepared transcript: "
f"`{st.session_state.youtube_transcript_language_label}` via "
f"`{st.session_state.youtube_transcript_source_mode}`"
)
st.download_button(
"Export transcript",
data=st.session_state.youtube_transcript_text,
file_name=st.session_state.youtube_transcript_name,
mime="text/plain",
)
if st.button("Summarize content"):
if not groq_api_key.strip():
st.error("Please provide the information to get started")
elif input_source_mode == "URL" and not generic_url.strip():
st.error("Content source is `URL`, so please provide a URL.")
elif input_source_mode == "Upload documents" and not uploaded_files:
st.error("Content source is `Upload documents`, so please upload at least one file.")
elif input_source_mode == "Both" and (not generic_url.strip() or not uploaded_files):
st.error("Content source is `Both`, so please provide a URL and upload at least one file.")
elif generic_url.strip() and not validators.url(generic_url):
st.error("Please enter a valid URL when using the URL field.")
else:
try:
with st.spinner("waiting ...."):
docs: list[Document] = []
if input_source_mode in {"URL", "Both"} and generic_url.strip():
if _is_youtube_url(generic_url):
try:
url_docs = _load_youtube_documents_with_fallbacks(
generic_url,
transcript_language,
youtube_source_mode,
manual_transcript_text,
manual_transcript_file,
)
_store_youtube_transcript(generic_url, url_docs)
except Exception as load_err:
st.error(f"Failed to load YouTube transcript: {load_err}")
st.stop()
else:
_reset_youtube_transcript_state()
try:
url_docs = _load_web_documents(generic_url)
except Exception as load_err:
st.error(f"Failed to fetch URL content: {load_err}")
st.stop()
docs.extend(url_docs)
else:
_reset_youtube_transcript_state()
if input_source_mode in {"Upload documents", "Both"} and uploaded_files:
try:
uploaded_docs = _load_uploaded_documents(uploaded_files)
except Exception as load_err:
st.error(f"Failed to read uploaded document(s): {load_err}")
st.stop()
docs.extend(uploaded_docs)
if input_source_mode == "Both" and generic_url.strip() and uploaded_files:
st.info("Summarizing combined content from the URL and uploaded documents.")
if not docs:
st.error("No content could be extracted from the selected source.")
st.stop()
effective_chain_type, chain_message = _choose_effective_chain_type(
selected_chain_type,
docs,
)
if chain_message:
st.info(chain_message)
docs_for_summary = _prepare_summary_documents(docs, effective_chain_type)
chain = _build_chain(effective_chain_type)
output_summary = _extract_summary_text(
chain.invoke({"input_documents": docs_for_summary})
)
st.success(output_summary)
except Exception as e:
st.error(f"Summarization failed: {e}")