import os import tempfile from io import BytesIO from urllib.parse import quote_plus, urlparse from xml.etree import ElementTree as ET from zipfile import ZipFile import requests import streamlit as st import validators from bs4 import BeautifulSoup from dotenv import load_dotenv from langchain_classic.chains.summarize import load_summarize_chain from langchain_community.document_loaders import UnstructuredURLLoader, YoutubeLoader from langchain_core.documents import Document from langchain_core.prompts import PromptTemplate from langchain_groq import ChatGroq from langchain_text_splitters import RecursiveCharacterTextSplitter from pypdf import PdfReader from requests.adapters import HTTPAdapter from requests import RequestException from requests.exceptions import SSLError from urllib3.util.retry import Retry from youtube_transcript_api import YouTubeTranscriptApi load_dotenv() APP_VERSION = "2026-04-23-hf-youtube-fallbacks-1" SAMPLE_YOUTUBE_URL = "https://youtu.be/ocBh08fjIfU" LANGUAGE_OPTIONS = ["Original", "English", "Arabic", "French", "Bahasa Malay"] LANGUAGE_CODE_MAP = { "English": "en", "Arabic": "ar", "French": "fr", "Bahasa Malay": "ms", } LANGUAGE_LABEL_MAP = { "English": "English", "Arabic": "Arabic", "French": "French", "Bahasa Malay": "Bahasa Melayu", } YOUTUBE_PROXY_ENV_VARS = ( "YOUTUBE_HTTP_PROXY", "YOUTUBE_HTTPS_PROXY", "HTTP_PROXY", "HTTPS_PROXY", ) YOUTUBE_AUDIO_EXTENSIONS = (".m4a", ".mp3", ".mp4", ".mpeg", ".mpga", ".ogg", ".wav", ".webm") def _is_youtube_url(url: str) -> bool: host = urlparse(url).netloc.lower() return "youtube.com" in host or "youtu.be" in host st.set_page_config(page_title="Summarize Text From PDF, YouTube, Website", page_icon="📝") st.title("📝 Summarize Text From PDF, YouTube, Website") # st.subheader("Summarize URL") st.markdown( """ """, unsafe_allow_html=True, ) groq_api_key = os.getenv("GROQ_API_KEY", "") if "url_input" not in st.session_state: st.session_state.url_input = "" if "summary_word_limit" not in st.session_state: st.session_state.summary_word_limit = 400 if "youtube_transcript_text" not in st.session_state: st.session_state.youtube_transcript_text = "" if "youtube_transcript_name" not in st.session_state: st.session_state.youtube_transcript_name = "youtube_transcript.txt" if "youtube_transcript_source_url" not in st.session_state: st.session_state.youtube_transcript_source_url = "" if "youtube_transcript_language_label" not in st.session_state: st.session_state.youtube_transcript_language_label = "Original" if "youtube_transcript_source_mode" not in st.session_state: st.session_state.youtube_transcript_source_mode = "" summary_language = "Original" transcript_language = "Original" with st.sidebar: st.header("Options") st.caption(f"App version: `{APP_VERSION}`") input_source_mode = st.radio( "Content source", options=["URL", "Upload documents", "Both"], index=0, help="Choose which source the app should use for summarization.", ) summary_word_limit = st.slider( "Summary word limit", min_value=100, max_value=1500, step=50, key="summary_word_limit", help="Increase or decrease the target length of the summary.", ) # summary_language = st.selectbox( # "Summary language", # options=LANGUAGE_OPTIONS, # index=0, # help="Choose the language for the generated summary. `Original` keeps the source language when possible.", # ) # transcript_language = st.selectbox( # "Transcript language", # options=LANGUAGE_OPTIONS, # index=0, # help="Choose the language used for YouTube transcript fetching/export. `Original` keeps the available source transcript language.", # ) selected_chain_type = st.radio( "Summarization method", options=["auto", "stuff", "map_reduce", "refine"], index=0, help="`auto` picks the best method based on content size and will upgrade if a simpler method is not a good fit.", ) st.caption( "`stuff` is fastest for short content, `map_reduce` is safer for long content, " "and `refine` is useful when building a summary progressively across chunks." ) if os.getenv("SPACE_ID"): if any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS): st.info("Hugging Face Space detected. YouTube proxy configuration is present.") else: st.warning( "Hugging Face Space detected. YouTube transcript loading may fail without " "a proxy because YouTube often blocks datacenter IPs." ) st.caption(f"Sample YouTube URL: `{SAMPLE_YOUTUBE_URL}`") if st.button("Use sample YouTube URL"): st.session_state.url_input = SAMPLE_YOUTUBE_URL generic_url = "" uploaded_files = [] youtube_source_mode = "Auto" manual_transcript_text = "" manual_transcript_file = None if input_source_mode in {"URL", "Both"}: st.markdown('
Summarize URL
', unsafe_allow_html=True) generic_url = st.text_input( "URL", key="url_input", label_visibility="collapsed", placeholder=f"Paste a YouTube or website URL, or try {SAMPLE_YOUTUBE_URL}", help="Enter the full YouTube or website URL you want to summarize.", ) if input_source_mode in {"Upload documents", "Both"}: st.markdown('
Upload documents
', unsafe_allow_html=True) uploaded_files = st.file_uploader( "Upload documents", type=["pdf", "txt", "md", "csv", "docx"], accept_multiple_files=True, label_visibility="collapsed", help="Upload one or more documents. Supported formats: PDF, TXT, MD, CSV, DOCX.", ) if uploaded_files: st.caption( "Uploaded files: " + ", ".join(uploaded_file.name for uploaded_file in uploaded_files) ) if input_source_mode in {"URL", "Both"} and generic_url.strip() and _is_youtube_url(generic_url): st.markdown('
YouTube Fallback Options
', unsafe_allow_html=True) youtube_source_mode = st.radio( "YouTube transcript source", options=[ "Auto", "Direct transcript", "External transcript API", "Audio transcription (yt-dlp + Groq)", "Manual transcript", ], index=0, help=( "`Auto` tries direct transcript first, then external API, then yt-dlp + Groq audio transcription. " "`Manual transcript` lets you paste or upload transcript text." ), ) if youtube_source_mode == "Manual transcript": manual_transcript_text = st.text_area( "Paste transcript", height=220, placeholder="Paste the YouTube transcript here if direct fetching is blocked.", ) manual_transcript_file = st.file_uploader( "Upload transcript file", type=["txt", "md", "csv", "srt", "vtt"], help="Upload a transcript file to summarize when direct YouTube access is blocked.", ) else: configured_modes = [] if any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS): configured_modes.append("direct transcript via proxy") if os.getenv("YOUTUBE_TRANSCRIPT_API_URL"): configured_modes.append("external transcript API") configured_modes.append("audio transcription via yt-dlp + Groq") st.caption("Available fallbacks: " + ", ".join(configured_modes) + ".") llm = ChatGroq(model="llama-3.1-8b-instant", groq_api_key=groq_api_key) REQUEST_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36", "Accept-Language": "en-US,en;q=0.9", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Referer": "https://www.google.com/", } def _summary_language_instruction(selected_language: str) -> str: if selected_language == "Original": return "Write the summary in the original language of the source content. If the source is mixed-language, use the dominant language." return f"Write the summary in {LANGUAGE_LABEL_MAP[selected_language]}." def _translation_language_instruction(selected_language: str) -> str: if selected_language == "Original": return "Keep the text in its original language." return f"Translate the text into {LANGUAGE_LABEL_MAP[selected_language]}." def _get_summary_prompts(word_limit: int, selected_language: str) -> dict[str, PromptTemplate]: language_instruction = _summary_language_instruction(selected_language) stuff_prompt = PromptTemplate( template=( f"Provide a clear summary of the following content in about {word_limit} words.\n" "Focus on the main ideas, important details, and conclusions.\n" f"{language_instruction}\n" "Content:\n{text}" ), input_variables=["text"], ) map_prompt = PromptTemplate( template=( "Write a concise summary of the following section.\n" f"{language_instruction}\n" "Content:\n{text}" ), input_variables=["text"], ) combine_prompt = PromptTemplate( template=( f"Combine the following partial summaries into a final summary in about {word_limit} words.\n" "Keep the result coherent, non-repetitive, and focused on the most important points.\n" f"{language_instruction}\n" "Partial summaries:\n{text}" ), input_variables=["text"], ) refine_question_prompt = PromptTemplate( template=( f"Provide an initial summary of the following content in about {word_limit} words.\n" f"{language_instruction}\n" "Content:\n{text}" ), input_variables=["text"], ) refine_prompt = PromptTemplate( template=( f"We already have an existing summary:\n{{existing_answer}}\n\n" "Refine it using the additional content below.\n" f"Keep the final summary close to {word_limit} words, avoid repetition, and preserve the most important details.\n" f"{language_instruction}\n" "Additional content:\n{text}" ), input_variables=["existing_answer", "text"], ) return { "stuff": stuff_prompt, "map": map_prompt, "combine": combine_prompt, "refine_question": refine_question_prompt, "refine": refine_prompt, } def _extract_summary_text(result) -> str: if isinstance(result, dict): return result.get("output_text") or result.get("text") or str(result) return str(result) def _translate_documents_with_llm(docs: list[Document], target_language: str) -> list[Document]: if target_language == "Original": return docs translation_prompt = PromptTemplate( template=( f"{_translation_language_instruction(target_language)}\n" "Preserve the meaning faithfully. Do not summarize. Return only the translated text.\n" "Text:\n{text}" ), input_variables=["text"], ) translation_chain = load_summarize_chain( llm, chain_type="stuff", prompt=translation_prompt, ) splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=200) translated_docs: list[Document] = [] for doc in docs: chunks = splitter.split_documents([doc]) translated_chunks = [] for chunk in chunks: translated_text = _extract_summary_text( translation_chain.invoke({"input_documents": [chunk]}) ) translated_chunks.append(translated_text.strip()) translated_docs.append( Document( page_content="\n\n".join(part for part in translated_chunks if part), metadata={ **doc.metadata, "translated_to": target_language, }, ) ) return translated_docs def _build_youtube_http_client() -> requests.Session: session = requests.Session() session.headers.update( { "User-Agent": ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" ), "Accept-Language": "en-US,en;q=0.9", "Accept": "*/*", } ) retry_config = Retry( total=3, connect=3, read=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET"], raise_on_status=False, ) adapter = HTTPAdapter(max_retries=retry_config) session.mount("https://", adapter) session.mount("http://", adapter) if os.getenv("YOUTUBE_CA_BUNDLE"): session.verify = os.getenv("YOUTUBE_CA_BUNDLE") return session def _build_youtube_transcript_api() -> YouTubeTranscriptApi: return YouTubeTranscriptApi(http_client=_build_youtube_http_client()) def _looks_like_youtube_ssl_failure(error: Exception) -> bool: error_text = str(error) ssl_markers = ( "HTTPSConnectionPool", "SSLError", "UNEXPECTED_EOF_WHILE_READING", "EOF occurred in violation of protocol", "Max retries exceeded with url", ) return isinstance(error, (SSLError, RequestException)) or any( marker in error_text for marker in ssl_markers ) def _format_youtube_transcript_error(error: Exception) -> str: if _looks_like_youtube_ssl_failure(error): proxy_hint = ( " Configure `YOUTUBE_HTTP_PROXY` / `YOUTUBE_HTTPS_PROXY` " "or standard `HTTP_PROXY` / `HTTPS_PROXY` in the Space secrets." if not any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS) else " Check that the configured outbound proxy is reachable from the Space." ) return ( "[HF-YT-SSL-001] The deployment could not establish a stable HTTPS connection to YouTube. " "This is common on cloud-hosted runtimes such as Hugging Face Spaces because " "YouTube often blocks or interrupts traffic from datacenter IPs." f"{proxy_hint}" ) return str(error) def _resolve_transcript(video_id: str, selected_language: str): api = _build_youtube_transcript_api() try: transcript_list = api.list(video_id) except Exception as exc: raise RuntimeError(_format_youtube_transcript_error(exc)) from exc available_transcripts = list(transcript_list) if selected_language == "Original": if not available_transcripts: raise ValueError("No transcript is available for this video.") return available_transcripts[0], "Original" if not available_transcripts: raise ValueError("No transcript is available for this video.") target_language_code = LANGUAGE_CODE_MAP[selected_language] try: return transcript_list.find_transcript([target_language_code]), selected_language except Exception: for base_transcript in available_transcripts: if not base_transcript.is_translatable: continue try: return base_transcript.translate(target_language_code), selected_language except Exception: continue available_languages = ", ".join( sorted( { f"{transcript.language} ({transcript.language_code})" for transcript in available_transcripts } ) ) raise ValueError( f"Could not provide transcript in {selected_language}. " f"Available transcript languages: {available_languages}" ) def _load_youtube_documents(url: str, selected_language: str) -> list[Document]: video_id = YoutubeLoader.extract_video_id(url) should_translate_with_llm = False try: transcript, transcript_language_label = _resolve_transcript(video_id, selected_language) except ValueError: if selected_language == "Original": raise transcript, transcript_language_label = _resolve_transcript(video_id, "Original") should_translate_with_llm = True try: fetched_transcript = transcript.fetch() except Exception as exc: raise RuntimeError(_format_youtube_transcript_error(exc)) from exc transcript_text = " ".join(snippet.text.strip() for snippet in fetched_transcript if snippet.text.strip()) if not transcript_text: raise ValueError("No transcript text could be extracted from this video.") docs = [ Document( page_content=transcript_text, metadata={ "source": url, "video_id": video_id, "language": fetched_transcript.language, "language_code": fetched_transcript.language_code, "is_generated": fetched_transcript.is_generated, "transcript_language_label": transcript_language_label, }, ) ] if should_translate_with_llm: docs = _translate_documents_with_llm(docs, selected_language) for doc in docs: doc.metadata["transcript_language_label"] = f"{selected_language} (LLM translated)" return docs def _make_transcript_filename(url: str) -> str: video_id = YoutubeLoader.extract_video_id(url) return f"youtube_transcript_{video_id}.txt" def _reset_youtube_transcript_state() -> None: st.session_state.youtube_transcript_text = "" st.session_state.youtube_transcript_name = "youtube_transcript.txt" st.session_state.youtube_transcript_source_url = "" st.session_state.youtube_transcript_language_label = "Original" st.session_state.youtube_transcript_source_mode = "" def _store_youtube_transcript(url: str, docs: list[Document]) -> None: st.session_state.youtube_transcript_text = "\n\n".join( doc.page_content for doc in docs if doc.page_content.strip() ) st.session_state.youtube_transcript_name = _make_transcript_filename(url) st.session_state.youtube_transcript_source_url = url st.session_state.youtube_transcript_language_label = docs[0].metadata.get( "transcript_language_label", docs[0].metadata.get("language", "Original"), ) st.session_state.youtube_transcript_source_mode = docs[0].metadata.get( "transcript_source_mode", "Direct transcript", ) def _normalize_transcript_text(raw_text: str) -> str: lines = [line.strip() for line in raw_text.splitlines()] return "\n".join(line for line in lines if line) def _read_uploaded_text_file(uploaded_file) -> str: return uploaded_file.getvalue().decode("utf-8", errors="ignore").strip() def _build_transcript_documents( url: str, transcript_text: str, language_label: str, source_mode: str, ) -> list[Document]: normalized_text = _normalize_transcript_text(transcript_text) if not normalized_text: raise ValueError("Transcript text is empty.") return [ Document( page_content=normalized_text, metadata={ "source": url, "video_id": YoutubeLoader.extract_video_id(url), "transcript_language_label": language_label, "transcript_source_mode": source_mode, }, ) ] def _load_manual_transcript_documents( url: str, selected_language: str, transcript_text: str, transcript_file, ) -> list[Document]: combined_parts = [] if transcript_text.strip(): combined_parts.append(transcript_text.strip()) if transcript_file is not None: combined_parts.append(_read_uploaded_text_file(transcript_file)) combined_text = "\n\n".join(part for part in combined_parts if part.strip()) if not combined_text.strip(): raise ValueError("Please paste a transcript or upload a transcript file.") docs = _build_transcript_documents( url, combined_text, "Original", "Manual transcript", ) if selected_language != "Original": docs = _translate_documents_with_llm(docs, selected_language) for doc in docs: doc.metadata["transcript_language_label"] = f"{selected_language} (LLM translated)" return docs def _extract_transcript_text_from_payload(payload) -> str: if isinstance(payload, str): return payload.strip() if isinstance(payload, list): text_parts = [] for item in payload: extracted = _extract_transcript_text_from_payload(item) if extracted: text_parts.append(extracted) return "\n".join(part for part in text_parts if part) if isinstance(payload, dict): for key in ("text", "transcript", "content", "full_text", "body"): value = payload.get(key) if isinstance(value, str) and value.strip(): return value.strip() for key in ("data", "result", "results", "transcription", "response"): if key in payload: extracted = _extract_transcript_text_from_payload(payload[key]) if extracted: return extracted for key in ("segments", "items", "captions", "chunks", "utterances"): value = payload.get(key) if isinstance(value, list): extracted = _extract_transcript_text_from_payload(value) if extracted: return extracted return "" def _load_youtube_documents_via_external_api(url: str, selected_language: str) -> list[Document]: api_url = os.getenv("YOUTUBE_TRANSCRIPT_API_URL", "").strip() if not api_url: raise ValueError( "External transcript API is not configured. Set `YOUTUBE_TRANSCRIPT_API_URL` in Space secrets." ) video_id = YoutubeLoader.extract_video_id(url) language_code = LANGUAGE_CODE_MAP.get(selected_language, "") formatted_url = api_url.format( video_id=video_id, url=quote_plus(url), language_code=language_code, ) method = os.getenv("YOUTUBE_TRANSCRIPT_API_METHOD", "GET").strip().upper() timeout_seconds = int(os.getenv("YOUTUBE_TRANSCRIPT_API_TIMEOUT", "45")) api_key = os.getenv("YOUTUBE_TRANSCRIPT_API_KEY", "").strip() api_key_header = os.getenv("YOUTUBE_TRANSCRIPT_API_KEY_HEADER", "Authorization").strip() headers = {"Accept": "application/json"} if api_key: if api_key_header.lower() == "authorization": headers[api_key_header] = f"Bearer {api_key}" else: headers[api_key_header] = api_key payload = { "video_id": video_id, "url": url, "language": language_code or None, } if method == "POST": response = requests.post(formatted_url, json=payload, headers=headers, timeout=timeout_seconds) else: response = requests.get(formatted_url, params=payload, headers=headers, timeout=timeout_seconds) response.raise_for_status() try: parsed_payload = response.json() except ValueError: parsed_payload = response.text transcript_text = _extract_transcript_text_from_payload(parsed_payload) if not transcript_text: raise ValueError("External transcript API response did not contain usable transcript text.") docs = _build_transcript_documents( url, transcript_text, selected_language if selected_language != "Original" else "Original", "External transcript API", ) if selected_language != "Original": for doc in docs: doc.metadata["transcript_language_label"] = selected_language return docs def _download_youtube_audio(url: str, video_id: str) -> str: try: import yt_dlp except ImportError as exc: raise RuntimeError("`yt-dlp` is not installed in this Space build.") from exc with tempfile.TemporaryDirectory() as temp_dir: output_template = os.path.join(temp_dir, f"{video_id}.%(ext)s") ydl_opts = { "format": "bestaudio[ext=m4a]/bestaudio[ext=webm]/bestaudio/best", "outtmpl": output_template, "quiet": True, "no_warnings": True, "noprogress": True, "skip_download": False, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.extract_info(url, download=True) audio_files = [ os.path.join(temp_dir, file_name) for file_name in os.listdir(temp_dir) if os.path.splitext(file_name)[1].lower() in YOUTUBE_AUDIO_EXTENSIONS ] if not audio_files: raise RuntimeError("yt-dlp did not produce a supported audio file for transcription.") source_path = max(audio_files, key=os.path.getsize) persisted_path = os.path.join(tempfile.gettempdir(), os.path.basename(source_path)) with open(source_path, "rb") as source_file, open(persisted_path, "wb") as target_file: target_file.write(source_file.read()) return persisted_path def _transcribe_audio_with_groq(audio_path: str, selected_language: str) -> str: if not groq_api_key.strip(): raise ValueError("`GROQ_API_KEY` is required for audio transcription fallback.") model_name = os.getenv("GROQ_AUDIO_TRANSCRIPTION_MODEL", "whisper-large-v3-turbo") payload = { "model": model_name, "response_format": "json", "temperature": "0", } if selected_language != "Original": payload["language"] = LANGUAGE_CODE_MAP[selected_language] with open(audio_path, "rb") as audio_file: response = requests.post( "https://api.groq.com/openai/v1/audio/transcriptions", headers={"Authorization": f"Bearer {groq_api_key}"}, data=payload, files={"file": (os.path.basename(audio_path), audio_file)}, timeout=300, ) response.raise_for_status() transcript_text = response.json().get("text", "").strip() if not transcript_text: raise ValueError("Groq audio transcription returned empty text.") return transcript_text def _load_youtube_documents_via_audio_transcription(url: str, selected_language: str) -> list[Document]: video_id = YoutubeLoader.extract_video_id(url) audio_path = _download_youtube_audio(url, video_id) try: transcript_text = _transcribe_audio_with_groq(audio_path, selected_language) finally: if os.path.exists(audio_path): os.remove(audio_path) return _build_transcript_documents( url, transcript_text, selected_language if selected_language != "Original" else "Original", "Audio transcription (yt-dlp + Groq)", ) def _load_youtube_documents_with_fallbacks( url: str, selected_language: str, source_mode: str, transcript_text: str, transcript_file, ) -> list[Document]: if source_mode == "Manual transcript": return _load_manual_transcript_documents(url, selected_language, transcript_text, transcript_file) strategies = [] if source_mode in {"Auto", "Direct transcript"}: strategies.append(("Direct transcript", lambda: _load_youtube_documents(url, selected_language))) if source_mode in {"Auto", "External transcript API"}: strategies.append( ("External transcript API", lambda: _load_youtube_documents_via_external_api(url, selected_language)) ) if source_mode in {"Auto", "Audio transcription (yt-dlp + Groq)"}: strategies.append( ( "Audio transcription (yt-dlp + Groq)", lambda: _load_youtube_documents_via_audio_transcription(url, selected_language), ) ) failures = [] for strategy_name, loader in strategies: try: return loader() except Exception as exc: failures.append(f"{strategy_name}: {exc}") if source_mode == "Auto" and (transcript_text.strip() or transcript_file is not None): return _load_manual_transcript_documents(url, selected_language, transcript_text, transcript_file) if not failures: raise ValueError("No YouTube transcript strategy is available for the selected mode.") raise RuntimeError("All YouTube transcript strategies failed.\n" + "\n".join(failures)) def _has_meaningful_content(docs: list[Document], min_chars: int = 300) -> bool: combined_text = " ".join(doc.page_content.strip() for doc in docs if doc.page_content.strip()) return len(combined_text) >= min_chars def _extract_text_from_html(html: str) -> str: soup = BeautifulSoup(html, "html.parser") for tag in soup(["script", "style", "noscript", "svg"]): tag.decompose() meta_description = "" meta_tag = soup.find("meta", attrs={"name": "description"}) if meta_tag and meta_tag.get("content"): meta_description = meta_tag["content"].strip() main_candidates = soup.select("main, article, [role='main'], .content, .article-body") text_parts = [] for candidate in main_candidates: candidate_text = " ".join(candidate.stripped_strings) if len(candidate_text) > 200: text_parts.append(candidate_text) if not text_parts: body_text = " ".join(soup.stripped_strings) if body_text: text_parts.append(body_text) if meta_description: text_parts.insert(0, meta_description) return "\n\n".join(dict.fromkeys(part for part in text_parts if part)) def _load_web_documents(url: str) -> list[Document]: try: loader = UnstructuredURLLoader( urls=[url], ssl_verify=False, headers=REQUEST_HEADERS, ) docs = loader.load() if _has_meaningful_content(docs): return docs except Exception as loader_error: last_error = loader_error else: last_error = ValueError("Primary URL loader returned too little readable content.") session = requests.Session() for candidate_url in [url, url.rstrip("/")]: if not candidate_url: continue try: response = session.get( candidate_url, headers=REQUEST_HEADERS, timeout=20, verify=False, allow_redirects=True, ) response.encoding = response.encoding or response.apparent_encoding or "utf-8" if not response.text.strip(): continue text = _extract_text_from_html(response.text) if not text or len(text) < 300: continue soup = BeautifulSoup(response.text, "html.parser") title = soup.title.string.strip() if soup.title and soup.title.string else candidate_url st.info("Primary URL loader failed or returned too little content. Used HTML fallback extraction instead.") return [ Document( page_content=text, metadata={ "source": candidate_url, "title": title, "http_status": response.status_code, }, ) ] except RequestException as request_error: last_error = request_error raise ValueError( f"Could not load readable text from the URL. Last loader error: {last_error}" ) def _load_uploaded_documents(files) -> list[Document]: docs: list[Document] = [] for uploaded_file in files: file_name = uploaded_file.name extension = os.path.splitext(file_name)[1].lower() file_bytes = uploaded_file.getvalue() if extension == ".pdf": reader = PdfReader(BytesIO(file_bytes)) pages = [] for page_number, page in enumerate(reader.pages, start=1): page_text = (page.extract_text() or "").strip() if page_text: pages.append( Document( page_content=page_text, metadata={ "source": file_name, "page": page_number, "type": "uploaded_file", }, ) ) docs.extend(pages) continue if extension in {".txt", ".md", ".csv"}: text = file_bytes.decode("utf-8", errors="ignore").strip() if text: docs.append( Document( page_content=text, metadata={"source": file_name, "type": "uploaded_file"}, ) ) continue if extension == ".docx": with ZipFile(BytesIO(file_bytes)) as docx_zip: document_xml = docx_zip.read("word/document.xml") root = ET.fromstring(document_xml) namespace = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"} paragraphs = [] for paragraph in root.findall(".//w:p", namespace): texts = [ node.text for node in paragraph.findall(".//w:t", namespace) if node.text ] paragraph_text = "".join(texts).strip() if paragraph_text: paragraphs.append(paragraph_text) text = "\n\n".join(paragraphs).strip() if text: docs.append( Document( page_content=text, metadata={"source": file_name, "type": "uploaded_file"}, ) ) continue raise ValueError(f"Unsupported file type: {file_name}") return docs def _build_chain(selected_chain_type: str): prompts = _get_summary_prompts(summary_word_limit, summary_language) if selected_chain_type == "stuff": return load_summarize_chain(llm, chain_type="stuff", prompt=prompts["stuff"]) if selected_chain_type == "map_reduce": return load_summarize_chain( llm, chain_type="map_reduce", map_prompt=prompts["map"], combine_prompt=prompts["combine"], ) return load_summarize_chain( llm, chain_type="refine", question_prompt=prompts["refine_question"], refine_prompt=prompts["refine"], ) def _prepare_summary_documents(docs: list[Document], selected_chain_type: str) -> list[Document]: splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200) split_docs = splitter.split_documents(docs) if selected_chain_type == "stuff": return split_docs[:3] if selected_chain_type == "refine": return split_docs[:10] return split_docs[:8] def _choose_effective_chain_type(requested_chain_type: str, docs: list[Document]) -> tuple[str, str | None]: splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200) split_docs = splitter.split_documents(docs) chunk_count = len(split_docs) total_chars = sum(len(doc.page_content) for doc in split_docs) if chunk_count <= 3 and total_chars <= 6000: recommended = "stuff" elif chunk_count <= 10: recommended = "refine" else: recommended = "map_reduce" if requested_chain_type == "auto": return recommended, f"Auto-selected `{recommended}` based on content size." if requested_chain_type == "stuff" and recommended != "stuff": return recommended, f"Switched from `stuff` to `{recommended}` because the content is too large for a reliable single-pass summary." if requested_chain_type == "refine" and chunk_count > 12: return "map_reduce", "Switched from `refine` to `map_reduce` because the content is large enough that map-reduce is more reliable." return requested_chain_type, None if input_source_mode in {"URL", "Both"} and _is_youtube_url(generic_url): st.video(generic_url) transcript_col, export_col = st.columns(2) with transcript_col: if st.button("Fetch transcript"): if not generic_url.strip(): st.error("Please enter a YouTube URL.") elif not validators.url(generic_url): st.error("Please enter a valid YouTube URL.") else: try: with st.spinner("Loading transcript..."): docs = _load_youtube_documents_with_fallbacks( generic_url, transcript_language, youtube_source_mode, manual_transcript_text, manual_transcript_file, ) if not docs: st.error("No transcript could be extracted from the provided YouTube video.") else: _store_youtube_transcript(generic_url, docs) st.success( "Transcript ready for export in " f"{st.session_state.youtube_transcript_language_label} " f"via {st.session_state.youtube_transcript_source_mode}." ) except Exception as transcript_err: st.error(f"Failed to load YouTube transcript: {transcript_err}") with export_col: if ( st.session_state.youtube_transcript_text and st.session_state.youtube_transcript_source_url == generic_url ): st.caption( "Prepared transcript: " f"`{st.session_state.youtube_transcript_language_label}` via " f"`{st.session_state.youtube_transcript_source_mode}`" ) st.download_button( "Export transcript", data=st.session_state.youtube_transcript_text, file_name=st.session_state.youtube_transcript_name, mime="text/plain", ) if st.button("Summarize content"): if not groq_api_key.strip(): st.error("Please provide the information to get started") elif input_source_mode == "URL" and not generic_url.strip(): st.error("Content source is `URL`, so please provide a URL.") elif input_source_mode == "Upload documents" and not uploaded_files: st.error("Content source is `Upload documents`, so please upload at least one file.") elif input_source_mode == "Both" and (not generic_url.strip() or not uploaded_files): st.error("Content source is `Both`, so please provide a URL and upload at least one file.") elif generic_url.strip() and not validators.url(generic_url): st.error("Please enter a valid URL when using the URL field.") else: try: with st.spinner("waiting ...."): docs: list[Document] = [] if input_source_mode in {"URL", "Both"} and generic_url.strip(): if _is_youtube_url(generic_url): try: url_docs = _load_youtube_documents_with_fallbacks( generic_url, transcript_language, youtube_source_mode, manual_transcript_text, manual_transcript_file, ) _store_youtube_transcript(generic_url, url_docs) except Exception as load_err: st.error(f"Failed to load YouTube transcript: {load_err}") st.stop() else: _reset_youtube_transcript_state() try: url_docs = _load_web_documents(generic_url) except Exception as load_err: st.error(f"Failed to fetch URL content: {load_err}") st.stop() docs.extend(url_docs) else: _reset_youtube_transcript_state() if input_source_mode in {"Upload documents", "Both"} and uploaded_files: try: uploaded_docs = _load_uploaded_documents(uploaded_files) except Exception as load_err: st.error(f"Failed to read uploaded document(s): {load_err}") st.stop() docs.extend(uploaded_docs) if input_source_mode == "Both" and generic_url.strip() and uploaded_files: st.info("Summarizing combined content from the URL and uploaded documents.") if not docs: st.error("No content could be extracted from the selected source.") st.stop() effective_chain_type, chain_message = _choose_effective_chain_type( selected_chain_type, docs, ) if chain_message: st.info(chain_message) docs_for_summary = _prepare_summary_documents(docs, effective_chain_type) chain = _build_chain(effective_chain_type) output_summary = _extract_summary_text( chain.invoke({"input_documents": docs_for_summary}) ) st.success(output_summary) except Exception as e: st.error(f"Summarization failed: {e}")