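"""Streamlit app that summarizes PDFs, YouTube videos, and websites with Groq.

YouTube transcripts are fetched with layered fallbacks (direct transcript API,
an external transcript service, yt-dlp + Groq audio transcription, and manual
paste/upload) so the app keeps working on hosted runtimes where YouTube blocks
datacenter IPs.
"""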
import os
import tempfile
from io import BytesIO
from urllib.parse import quote_plus, urlparse
from xml.etree import ElementTree as ET
from zipfile import ZipFile

import requests
import streamlit as st
import validators
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from langchain_classic.chains.summarize import load_summarize_chain
from langchain_community.document_loaders import UnstructuredURLLoader, YoutubeLoader
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pypdf import PdfReader
from requests import RequestException
from requests.adapters import HTTPAdapter
from requests.exceptions import SSLError
from urllib3.util.retry import Retry
from youtube_transcript_api import YouTubeTranscriptApi
load_dotenv()

APP_VERSION = "2026-04-23-hf-youtube-fallbacks-1"
SAMPLE_YOUTUBE_URL = "https://youtu.be/ocBh08fjIfU"
LANGUAGE_OPTIONS = ["Original", "English", "Arabic", "French", "Bahasa Malay"]
LANGUAGE_CODE_MAP = {
    "English": "en",
    "Arabic": "ar",
    "French": "fr",
    "Bahasa Malay": "ms",
}
LANGUAGE_LABEL_MAP = {
    "English": "English",
    "Arabic": "Arabic",
    "French": "French",
    "Bahasa Malay": "Bahasa Melayu",
}
YOUTUBE_PROXY_ENV_VARS = (
    "YOUTUBE_HTTP_PROXY",
    "YOUTUBE_HTTPS_PROXY",
    "HTTP_PROXY",
    "HTTPS_PROXY",
)
YOUTUBE_AUDIO_EXTENSIONS = (".m4a", ".mp3", ".mp4", ".mpeg", ".mpga", ".ogg", ".wav", ".webm")
def _is_youtube_url(url: str) -> bool:
    # Match the host exactly or as a subdomain; a bare substring check would
    # also accept unrelated hosts such as "notyoutube.com".
    host = urlparse(url).netloc.lower()
    return host in {"youtube.com", "youtu.be"} or host.endswith((".youtube.com", ".youtu.be"))
st.set_page_config(page_title="Summarize Text From PDF, YouTube, Website", page_icon="📝")
st.title("📝 Summarize Text From PDF, YouTube, Website")
# st.subheader("Summarize URL")
st.markdown(
    """
    <style>
    .source-section-label {
        font-size: 1rem;
        font-weight: 600;
        margin-top: 0.35rem;
        margin-bottom: 0.3rem;
    }
    </style>
    """,
    unsafe_allow_html=True,
)
groq_api_key = os.getenv("GROQ_API_KEY", "")

if "url_input" not in st.session_state:
    st.session_state.url_input = ""
if "summary_word_limit" not in st.session_state:
    st.session_state.summary_word_limit = 400
if "youtube_transcript_text" not in st.session_state:
    st.session_state.youtube_transcript_text = ""
if "youtube_transcript_name" not in st.session_state:
    st.session_state.youtube_transcript_name = "youtube_transcript.txt"
if "youtube_transcript_source_url" not in st.session_state:
    st.session_state.youtube_transcript_source_url = ""
if "youtube_transcript_language_label" not in st.session_state:
    st.session_state.youtube_transcript_language_label = "Original"
if "youtube_transcript_source_mode" not in st.session_state:
    st.session_state.youtube_transcript_source_mode = ""

summary_language = "Original"
transcript_language = "Original"
with st.sidebar:
    st.header("Options")
    st.caption(f"App version: `{APP_VERSION}`")
    input_source_mode = st.radio(
        "Content source",
        options=["URL", "Upload documents", "Both"],
        index=0,
        help="Choose which source the app should use for summarization.",
    )
    summary_word_limit = st.slider(
        "Summary word limit",
        min_value=100,
        max_value=1500,
        step=50,
        key="summary_word_limit",
        help="Increase or decrease the target length of the summary.",
    )
    # summary_language = st.selectbox(
    #     "Summary language",
    #     options=LANGUAGE_OPTIONS,
    #     index=0,
    #     help="Choose the language for the generated summary. `Original` keeps the source language when possible.",
    # )
    # transcript_language = st.selectbox(
    #     "Transcript language",
    #     options=LANGUAGE_OPTIONS,
    #     index=0,
    #     help="Choose the language used for YouTube transcript fetching/export. `Original` keeps the available source transcript language.",
    # )
    selected_chain_type = st.radio(
        "Summarization method",
        options=["auto", "stuff", "map_reduce", "refine"],
        index=0,
        help="`auto` picks the best method based on content size and will upgrade if a simpler method is not a good fit.",
    )
    st.caption(
        "`stuff` is fastest for short content, `map_reduce` is safer for long content, "
        "and `refine` is useful when building a summary progressively across chunks."
    )
    if os.getenv("SPACE_ID"):
        if any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS):
            st.info("Hugging Face Space detected. YouTube proxy configuration is present.")
        else:
            st.warning(
                "Hugging Face Space detected. YouTube transcript loading may fail without "
                "a proxy because YouTube often blocks datacenter IPs."
            )
    st.caption(f"Sample YouTube URL: `{SAMPLE_YOUTUBE_URL}`")
    if st.button("Use sample YouTube URL"):
        st.session_state.url_input = SAMPLE_YOUTUBE_URL
generic_url = ""
uploaded_files = []
youtube_source_mode = "Auto"
manual_transcript_text = ""
manual_transcript_file = None

if input_source_mode in {"URL", "Both"}:
    st.markdown('<div class="source-section-label">Summarize URL</div>', unsafe_allow_html=True)
    generic_url = st.text_input(
        "URL",
        key="url_input",
        label_visibility="collapsed",
        placeholder=f"Paste a YouTube or website URL, or try {SAMPLE_YOUTUBE_URL}",
        help="Enter the full YouTube or website URL you want to summarize.",
    )

if input_source_mode in {"Upload documents", "Both"}:
    st.markdown('<div class="source-section-label">Upload documents</div>', unsafe_allow_html=True)
    uploaded_files = st.file_uploader(
        "Upload documents",
        type=["pdf", "txt", "md", "csv", "docx"],
        accept_multiple_files=True,
        label_visibility="collapsed",
        help="Upload one or more documents. Supported formats: PDF, TXT, MD, CSV, DOCX.",
    )
    if uploaded_files:
        st.caption(
            "Uploaded files: " + ", ".join(uploaded_file.name for uploaded_file in uploaded_files)
        )

if input_source_mode in {"URL", "Both"} and generic_url.strip() and _is_youtube_url(generic_url):
    st.markdown('<div class="source-section-label">YouTube Fallback Options</div>', unsafe_allow_html=True)
    youtube_source_mode = st.radio(
        "YouTube transcript source",
        options=[
            "Auto",
            "Direct transcript",
            "External transcript API",
            "Audio transcription (yt-dlp + Groq)",
            "Manual transcript",
        ],
        index=0,
        help=(
            "`Auto` tries direct transcript first, then external API, then yt-dlp + Groq audio transcription. "
            "`Manual transcript` lets you paste or upload transcript text."
        ),
    )
    if youtube_source_mode == "Manual transcript":
        manual_transcript_text = st.text_area(
            "Paste transcript",
            height=220,
            placeholder="Paste the YouTube transcript here if direct fetching is blocked.",
        )
        manual_transcript_file = st.file_uploader(
            "Upload transcript file",
            type=["txt", "md", "csv", "srt", "vtt"],
            help="Upload a transcript file to summarize when direct YouTube access is blocked.",
        )
    else:
        configured_modes = []
        if any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS):
            configured_modes.append("direct transcript via proxy")
        if os.getenv("YOUTUBE_TRANSCRIPT_API_URL"):
            configured_modes.append("external transcript API")
        configured_modes.append("audio transcription via yt-dlp + Groq")
        st.caption("Available fallbacks: " + ", ".join(configured_modes) + ".")
llm = ChatGroq(model="llama-3.1-8b-instant", groq_api_key=groq_api_key)

REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Referer": "https://www.google.com/",
}
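
# --- Prompt helpers ---
# Build language instructions and per-chain PromptTemplates from the sidebar
# settings so every summarization method targets the same word limit and
# output language.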
def _summary_language_instruction(selected_language: str) -> str:
    if selected_language == "Original":
        return (
            "Write the summary in the original language of the source content. "
            "If the source is mixed-language, use the dominant language."
        )
    return f"Write the summary in {LANGUAGE_LABEL_MAP[selected_language]}."


def _translation_language_instruction(selected_language: str) -> str:
    if selected_language == "Original":
        return "Keep the text in its original language."
    return f"Translate the text into {LANGUAGE_LABEL_MAP[selected_language]}."
def _get_summary_prompts(word_limit: int, selected_language: str) -> dict[str, PromptTemplate]:
    language_instruction = _summary_language_instruction(selected_language)
    stuff_prompt = PromptTemplate(
        template=(
            f"Provide a clear summary of the following content in about {word_limit} words.\n"
            "Focus on the main ideas, important details, and conclusions.\n"
            f"{language_instruction}\n"
            "Content:\n{text}"
        ),
        input_variables=["text"],
    )
    map_prompt = PromptTemplate(
        template=(
            "Write a concise summary of the following section.\n"
            f"{language_instruction}\n"
            "Content:\n{text}"
        ),
        input_variables=["text"],
    )
    combine_prompt = PromptTemplate(
        template=(
            f"Combine the following partial summaries into a final summary in about {word_limit} words.\n"
            "Keep the result coherent, non-repetitive, and focused on the most important points.\n"
            f"{language_instruction}\n"
            "Partial summaries:\n{text}"
        ),
        input_variables=["text"],
    )
    refine_question_prompt = PromptTemplate(
        template=(
            f"Provide an initial summary of the following content in about {word_limit} words.\n"
            f"{language_instruction}\n"
            "Content:\n{text}"
        ),
        input_variables=["text"],
    )
    refine_prompt = PromptTemplate(
        template=(
            "We already have an existing summary:\n{existing_answer}\n\n"
            "Refine it using the additional content below.\n"
            f"Keep the final summary close to {word_limit} words, avoid repetition, and preserve the most important details.\n"
            f"{language_instruction}\n"
            "Additional content:\n{text}"
        ),
        input_variables=["existing_answer", "text"],
    )
    return {
        "stuff": stuff_prompt,
        "map": map_prompt,
        "combine": combine_prompt,
        "refine_question": refine_question_prompt,
        "refine": refine_prompt,
    }
def _extract_summary_text(result) -> str:
    if isinstance(result, dict):
        return result.get("output_text") or result.get("text") or str(result)
    return str(result)
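
# Translation fallback: when a requested language has no native transcript,
# split the documents into ~2500-character chunks and run each through a
# "stuff" chain whose prompt asks the LLM to translate rather than summarize.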
def _translate_documents_with_llm(docs: list[Document], target_language: str) -> list[Document]:
    if target_language == "Original":
        return docs
    translation_prompt = PromptTemplate(
        template=(
            f"{_translation_language_instruction(target_language)}\n"
            "Preserve the meaning faithfully. Do not summarize. Return only the translated text.\n"
            "Text:\n{text}"
        ),
        input_variables=["text"],
    )
    translation_chain = load_summarize_chain(
        llm,
        chain_type="stuff",
        prompt=translation_prompt,
    )
    splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=200)
    translated_docs: list[Document] = []
    for doc in docs:
        chunks = splitter.split_documents([doc])
        translated_chunks = []
        for chunk in chunks:
            translated_text = _extract_summary_text(
                translation_chain.invoke({"input_documents": [chunk]})
            )
            translated_chunks.append(translated_text.strip())
        translated_docs.append(
            Document(
                page_content="\n\n".join(part for part in translated_chunks if part),
                metadata={
                    **doc.metadata,
                    "translated_to": target_language,
                },
            )
        )
    return translated_docs
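
# HTTP plumbing for youtube-transcript-api: a requests.Session with a browser
# User-Agent, bounded retries on transient status codes, and an optional custom
# CA bundle (`YOUTUBE_CA_BUNDLE`) for proxied deployments.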
def _build_youtube_http_client() -> requests.Session:
    session = requests.Session()
    session.headers.update(
        {
            "User-Agent": (
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "*/*",
        }
    )
    retry_config = Retry(
        total=3,
        connect=3,
        read=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_config)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    if os.getenv("YOUTUBE_CA_BUNDLE"):
        session.verify = os.getenv("YOUTUBE_CA_BUNDLE")
    return session


def _build_youtube_transcript_api() -> YouTubeTranscriptApi:
    return YouTubeTranscriptApi(http_client=_build_youtube_http_client())
def _looks_like_youtube_ssl_failure(error: Exception) -> bool:
    error_text = str(error)
    ssl_markers = (
        "HTTPSConnectionPool",
        "SSLError",
        "UNEXPECTED_EOF_WHILE_READING",
        "EOF occurred in violation of protocol",
        "Max retries exceeded with url",
    )
    return isinstance(error, (SSLError, RequestException)) or any(
        marker in error_text for marker in ssl_markers
    )


def _format_youtube_transcript_error(error: Exception) -> str:
    if _looks_like_youtube_ssl_failure(error):
        proxy_hint = (
            " Configure `YOUTUBE_HTTP_PROXY` / `YOUTUBE_HTTPS_PROXY` "
            "or standard `HTTP_PROXY` / `HTTPS_PROXY` in the Space secrets."
            if not any(os.getenv(var_name) for var_name in YOUTUBE_PROXY_ENV_VARS)
            else " Check that the configured outbound proxy is reachable from the Space."
        )
        return (
            "[HF-YT-SSL-001] The deployment could not establish a stable HTTPS connection to YouTube. "
            "This is common on cloud-hosted runtimes such as Hugging Face Spaces because "
            "YouTube often blocks or interrupts traffic from datacenter IPs."
            f"{proxy_hint}"
        )
    return str(error)
def _resolve_transcript(video_id: str, selected_language: str):
    api = _build_youtube_transcript_api()
    try:
        transcript_list = api.list(video_id)
    except Exception as exc:
        raise RuntimeError(_format_youtube_transcript_error(exc)) from exc
    available_transcripts = list(transcript_list)
    if not available_transcripts:
        raise ValueError("No transcript is available for this video.")
    if selected_language == "Original":
        return available_transcripts[0], "Original"
    target_language_code = LANGUAGE_CODE_MAP[selected_language]
    try:
        return transcript_list.find_transcript([target_language_code]), selected_language
    except Exception:
        for base_transcript in available_transcripts:
            if not base_transcript.is_translatable:
                continue
            try:
                return base_transcript.translate(target_language_code), selected_language
            except Exception:
                continue
    available_languages = ", ".join(
        sorted(
            {
                f"{transcript.language} ({transcript.language_code})"
                for transcript in available_transcripts
            }
        )
    )
    raise ValueError(
        f"Could not provide transcript in {selected_language}. "
        f"Available transcript languages: {available_languages}"
    )
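
# Direct transcript loader: resolve a transcript (native or YouTube-translated),
# fetch it, and wrap the text in a single Document. If the requested language
# is unavailable, fall back to the original transcript and translate it with
# the LLM.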
def _load_youtube_documents(url: str, selected_language: str) -> list[Document]:
    video_id = YoutubeLoader.extract_video_id(url)
    should_translate_with_llm = False
    try:
        transcript, transcript_language_label = _resolve_transcript(video_id, selected_language)
    except ValueError:
        if selected_language == "Original":
            raise
        transcript, transcript_language_label = _resolve_transcript(video_id, "Original")
        should_translate_with_llm = True
    try:
        fetched_transcript = transcript.fetch()
    except Exception as exc:
        raise RuntimeError(_format_youtube_transcript_error(exc)) from exc
    transcript_text = " ".join(
        snippet.text.strip() for snippet in fetched_transcript if snippet.text.strip()
    )
    if not transcript_text:
        raise ValueError("No transcript text could be extracted from this video.")
    docs = [
        Document(
            page_content=transcript_text,
            metadata={
                "source": url,
                "video_id": video_id,
                "language": fetched_transcript.language,
                "language_code": fetched_transcript.language_code,
                "is_generated": fetched_transcript.is_generated,
                "transcript_language_label": transcript_language_label,
            },
        )
    ]
    if should_translate_with_llm:
        docs = _translate_documents_with_llm(docs, selected_language)
        for doc in docs:
            doc.metadata["transcript_language_label"] = f"{selected_language} (LLM translated)"
    return docs
def _make_transcript_filename(url: str) -> str:
    video_id = YoutubeLoader.extract_video_id(url)
    return f"youtube_transcript_{video_id}.txt"


def _reset_youtube_transcript_state() -> None:
    st.session_state.youtube_transcript_text = ""
    st.session_state.youtube_transcript_name = "youtube_transcript.txt"
    st.session_state.youtube_transcript_source_url = ""
    st.session_state.youtube_transcript_language_label = "Original"
    st.session_state.youtube_transcript_source_mode = ""


def _store_youtube_transcript(url: str, docs: list[Document]) -> None:
    st.session_state.youtube_transcript_text = "\n\n".join(
        doc.page_content for doc in docs if doc.page_content.strip()
    )
    st.session_state.youtube_transcript_name = _make_transcript_filename(url)
    st.session_state.youtube_transcript_source_url = url
    st.session_state.youtube_transcript_language_label = docs[0].metadata.get(
        "transcript_language_label",
        docs[0].metadata.get("language", "Original"),
    )
    st.session_state.youtube_transcript_source_mode = docs[0].metadata.get(
        "transcript_source_mode",
        "Direct transcript",
    )


def _normalize_transcript_text(raw_text: str) -> str:
    lines = [line.strip() for line in raw_text.splitlines()]
    return "\n".join(line for line in lines if line)
def _read_uploaded_text_file(uploaded_file) -> str:
    return uploaded_file.getvalue().decode("utf-8", errors="ignore").strip()


def _build_transcript_documents(
    url: str,
    transcript_text: str,
    language_label: str,
    source_mode: str,
) -> list[Document]:
    normalized_text = _normalize_transcript_text(transcript_text)
    if not normalized_text:
        raise ValueError("Transcript text is empty.")
    return [
        Document(
            page_content=normalized_text,
            metadata={
                "source": url,
                "video_id": YoutubeLoader.extract_video_id(url),
                "transcript_language_label": language_label,
                "transcript_source_mode": source_mode,
            },
        )
    ]


def _load_manual_transcript_documents(
    url: str,
    selected_language: str,
    transcript_text: str,
    transcript_file,
) -> list[Document]:
    combined_parts = []
    if transcript_text.strip():
        combined_parts.append(transcript_text.strip())
    if transcript_file is not None:
        combined_parts.append(_read_uploaded_text_file(transcript_file))
    combined_text = "\n\n".join(part for part in combined_parts if part.strip())
    if not combined_text.strip():
        raise ValueError("Please paste a transcript or upload a transcript file.")
    docs = _build_transcript_documents(
        url,
        combined_text,
        "Original",
        "Manual transcript",
    )
    if selected_language != "Original":
        docs = _translate_documents_with_llm(docs, selected_language)
        for doc in docs:
            doc.metadata["transcript_language_label"] = f"{selected_language} (LLM translated)"
    return docs
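
# External API responses vary by provider, so this walks the payload
# recursively: plain strings, lists of segments, and dicts keyed by common
# field names ("text", "transcript", "segments", ...) are all accepted.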
def _extract_transcript_text_from_payload(payload) -> str:
    if isinstance(payload, str):
        return payload.strip()
    if isinstance(payload, list):
        text_parts = []
        for item in payload:
            extracted = _extract_transcript_text_from_payload(item)
            if extracted:
                text_parts.append(extracted)
        return "\n".join(text_parts)
    if isinstance(payload, dict):
        for key in ("text", "transcript", "content", "full_text", "body"):
            value = payload.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        for key in ("data", "result", "results", "transcription", "response"):
            if key in payload:
                extracted = _extract_transcript_text_from_payload(payload[key])
                if extracted:
                    return extracted
        for key in ("segments", "items", "captions", "chunks", "utterances"):
            value = payload.get(key)
            if isinstance(value, list):
                extracted = _extract_transcript_text_from_payload(value)
                if extracted:
                    return extracted
    return ""
def _load_youtube_documents_via_external_api(url: str, selected_language: str) -> list[Document]:
    api_url = os.getenv("YOUTUBE_TRANSCRIPT_API_URL", "").strip()
    if not api_url:
        raise ValueError(
            "External transcript API is not configured. Set `YOUTUBE_TRANSCRIPT_API_URL` in Space secrets."
        )
    video_id = YoutubeLoader.extract_video_id(url)
    language_code = LANGUAGE_CODE_MAP.get(selected_language, "")
    formatted_url = api_url.format(
        video_id=video_id,
        url=quote_plus(url),
        language_code=language_code,
    )
    method = os.getenv("YOUTUBE_TRANSCRIPT_API_METHOD", "GET").strip().upper()
    timeout_seconds = int(os.getenv("YOUTUBE_TRANSCRIPT_API_TIMEOUT", "45"))
    api_key = os.getenv("YOUTUBE_TRANSCRIPT_API_KEY", "").strip()
    api_key_header = os.getenv("YOUTUBE_TRANSCRIPT_API_KEY_HEADER", "Authorization").strip()
    headers = {"Accept": "application/json"}
    if api_key:
        if api_key_header.lower() == "authorization":
            headers[api_key_header] = f"Bearer {api_key}"
        else:
            headers[api_key_header] = api_key
    payload = {
        "video_id": video_id,
        "url": url,
        "language": language_code or None,
    }
    if method == "POST":
        response = requests.post(formatted_url, json=payload, headers=headers, timeout=timeout_seconds)
    else:
        response = requests.get(formatted_url, params=payload, headers=headers, timeout=timeout_seconds)
    response.raise_for_status()
    try:
        parsed_payload = response.json()
    except ValueError:
        parsed_payload = response.text
    transcript_text = _extract_transcript_text_from_payload(parsed_payload)
    if not transcript_text:
        raise ValueError("External transcript API response did not contain usable transcript text.")
    return _build_transcript_documents(
        url,
        transcript_text,
        selected_language,
        "External transcript API",
    )
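
# Audio fallback: download the best available audio stream with yt-dlp into a
# temporary directory, keep the largest matching file, and persist a copy that
# outlives the TemporaryDirectory so Groq can read it afterwards.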
def _download_youtube_audio(url: str, video_id: str) -> str:
    try:
        import yt_dlp
    except ImportError as exc:
        raise RuntimeError("`yt-dlp` is not installed in this Space build.") from exc
    with tempfile.TemporaryDirectory() as temp_dir:
        output_template = os.path.join(temp_dir, f"{video_id}.%(ext)s")
        ydl_opts = {
            "format": "bestaudio[ext=m4a]/bestaudio[ext=webm]/bestaudio/best",
            "outtmpl": output_template,
            "quiet": True,
            "no_warnings": True,
            "noprogress": True,
            "skip_download": False,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)
        audio_files = [
            os.path.join(temp_dir, file_name)
            for file_name in os.listdir(temp_dir)
            if os.path.splitext(file_name)[1].lower() in YOUTUBE_AUDIO_EXTENSIONS
        ]
        if not audio_files:
            raise RuntimeError("yt-dlp did not produce a supported audio file for transcription.")
        source_path = max(audio_files, key=os.path.getsize)
        persisted_path = os.path.join(tempfile.gettempdir(), os.path.basename(source_path))
        with open(source_path, "rb") as source_file, open(persisted_path, "wb") as target_file:
            target_file.write(source_file.read())
        return persisted_path
def _transcribe_audio_with_groq(audio_path: str, selected_language: str) -> str:
    if not groq_api_key.strip():
        raise ValueError("`GROQ_API_KEY` is required for audio transcription fallback.")
    model_name = os.getenv("GROQ_AUDIO_TRANSCRIPTION_MODEL", "whisper-large-v3-turbo")
    payload = {
        "model": model_name,
        "response_format": "json",
        "temperature": "0",
    }
    if selected_language != "Original":
        payload["language"] = LANGUAGE_CODE_MAP[selected_language]
    with open(audio_path, "rb") as audio_file:
        response = requests.post(
            "https://api.groq.com/openai/v1/audio/transcriptions",
            headers={"Authorization": f"Bearer {groq_api_key}"},
            data=payload,
            files={"file": (os.path.basename(audio_path), audio_file)},
            timeout=300,
        )
    response.raise_for_status()
    transcript_text = response.json().get("text", "").strip()
    if not transcript_text:
        raise ValueError("Groq audio transcription returned empty text.")
    return transcript_text
def _load_youtube_documents_via_audio_transcription(url: str, selected_language: str) -> list[Document]:
    video_id = YoutubeLoader.extract_video_id(url)
    audio_path = _download_youtube_audio(url, video_id)
    try:
        transcript_text = _transcribe_audio_with_groq(audio_path, selected_language)
    finally:
        if os.path.exists(audio_path):
            os.remove(audio_path)
    return _build_transcript_documents(
        url,
        transcript_text,
        selected_language,
        "Audio transcription (yt-dlp + Groq)",
    )
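
# Fallback orchestrator: run the strategies allowed by `source_mode` in order
# (direct transcript -> external API -> audio transcription) and return the
# first success; in Auto mode any pasted/uploaded transcript is the last resort.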
def _load_youtube_documents_with_fallbacks(
    url: str,
    selected_language: str,
    source_mode: str,
    transcript_text: str,
    transcript_file,
) -> list[Document]:
    if source_mode == "Manual transcript":
        return _load_manual_transcript_documents(url, selected_language, transcript_text, transcript_file)
    strategies = []
    if source_mode in {"Auto", "Direct transcript"}:
        strategies.append(("Direct transcript", lambda: _load_youtube_documents(url, selected_language)))
    if source_mode in {"Auto", "External transcript API"}:
        strategies.append(
            ("External transcript API", lambda: _load_youtube_documents_via_external_api(url, selected_language))
        )
    if source_mode in {"Auto", "Audio transcription (yt-dlp + Groq)"}:
        strategies.append(
            (
                "Audio transcription (yt-dlp + Groq)",
                lambda: _load_youtube_documents_via_audio_transcription(url, selected_language),
            )
        )
    failures = []
    for strategy_name, loader in strategies:
        try:
            return loader()
        except Exception as exc:
            failures.append(f"{strategy_name}: {exc}")
    if source_mode == "Auto" and (transcript_text.strip() or transcript_file is not None):
        return _load_manual_transcript_documents(url, selected_language, transcript_text, transcript_file)
    if not failures:
        raise ValueError("No YouTube transcript strategy is available for the selected mode.")
    raise RuntimeError("All YouTube transcript strategies failed.\n" + "\n".join(failures))
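
# Website extraction: try UnstructuredURLLoader first; if it errors or yields
# too little text, fall back to a raw requests fetch plus BeautifulSoup,
# preferring <main>/<article> regions and the meta description.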
def _has_meaningful_content(docs: list[Document], min_chars: int = 300) -> bool:
    combined_text = " ".join(doc.page_content.strip() for doc in docs if doc.page_content.strip())
    return len(combined_text) >= min_chars


def _extract_text_from_html(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "noscript", "svg"]):
        tag.decompose()
    meta_description = ""
    meta_tag = soup.find("meta", attrs={"name": "description"})
    if meta_tag and meta_tag.get("content"):
        meta_description = meta_tag["content"].strip()
    main_candidates = soup.select("main, article, [role='main'], .content, .article-body")
    text_parts = []
    for candidate in main_candidates:
        candidate_text = " ".join(candidate.stripped_strings)
        if len(candidate_text) > 200:
            text_parts.append(candidate_text)
    if not text_parts:
        body_text = " ".join(soup.stripped_strings)
        if body_text:
            text_parts.append(body_text)
    if meta_description:
        text_parts.insert(0, meta_description)
    return "\n\n".join(dict.fromkeys(part for part in text_parts if part))
def _load_web_documents(url: str) -> list[Document]:
    try:
        loader = UnstructuredURLLoader(
            urls=[url],
            ssl_verify=False,
            headers=REQUEST_HEADERS,
        )
        docs = loader.load()
        if _has_meaningful_content(docs):
            return docs
    except Exception as loader_error:
        last_error = loader_error
    else:
        last_error = ValueError("Primary URL loader returned too little readable content.")
    session = requests.Session()
    # dict.fromkeys dedupes while preserving order, so a URL without a
    # trailing slash is not fetched twice.
    for candidate_url in dict.fromkeys([url, url.rstrip("/")]):
        if not candidate_url:
            continue
        try:
            response = session.get(
                candidate_url,
                headers=REQUEST_HEADERS,
                timeout=20,
                verify=False,
                allow_redirects=True,
            )
            response.encoding = response.encoding or response.apparent_encoding or "utf-8"
            if not response.text.strip():
                continue
            text = _extract_text_from_html(response.text)
            if not text or len(text) < 300:
                continue
            soup = BeautifulSoup(response.text, "html.parser")
            title = soup.title.string.strip() if soup.title and soup.title.string else candidate_url
            st.info("Primary URL loader failed or returned too little content. Used HTML fallback extraction instead.")
            return [
                Document(
                    page_content=text,
                    metadata={
                        "source": candidate_url,
                        "title": title,
                        "http_status": response.status_code,
                    },
                )
            ]
        except RequestException as request_error:
            last_error = request_error
    raise ValueError(
        f"Could not load readable text from the URL. Last loader error: {last_error}"
    )
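
# Uploaded-file parsing: PDFs via pypdf (one Document per page), plain-text
# formats decoded as UTF-8, and DOCX read directly from word/document.xml so
# no extra dependency is needed.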
def _load_uploaded_documents(files) -> list[Document]:
    docs: list[Document] = []
    for uploaded_file in files:
        file_name = uploaded_file.name
        extension = os.path.splitext(file_name)[1].lower()
        file_bytes = uploaded_file.getvalue()
        if extension == ".pdf":
            reader = PdfReader(BytesIO(file_bytes))
            pages = []
            for page_number, page in enumerate(reader.pages, start=1):
                page_text = (page.extract_text() or "").strip()
                if page_text:
                    pages.append(
                        Document(
                            page_content=page_text,
                            metadata={
                                "source": file_name,
                                "page": page_number,
                                "type": "uploaded_file",
                            },
                        )
                    )
            docs.extend(pages)
            continue
        if extension in {".txt", ".md", ".csv"}:
            text = file_bytes.decode("utf-8", errors="ignore").strip()
            if text:
                docs.append(
                    Document(
                        page_content=text,
                        metadata={"source": file_name, "type": "uploaded_file"},
                    )
                )
            continue
        if extension == ".docx":
            with ZipFile(BytesIO(file_bytes)) as docx_zip:
                document_xml = docx_zip.read("word/document.xml")
            root = ET.fromstring(document_xml)
            namespace = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
            paragraphs = []
            for paragraph in root.findall(".//w:p", namespace):
                texts = [
                    node.text
                    for node in paragraph.findall(".//w:t", namespace)
                    if node.text
                ]
                paragraph_text = "".join(texts).strip()
                if paragraph_text:
                    paragraphs.append(paragraph_text)
            text = "\n\n".join(paragraphs).strip()
            if text:
                docs.append(
                    Document(
                        page_content=text,
                        metadata={"source": file_name, "type": "uploaded_file"},
                    )
                )
            continue
        raise ValueError(f"Unsupported file type: {file_name}")
    return docs
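
# Chain construction and sizing: each LangChain summarize chain type gets the
# prompts built above, documents are re-split into 2000-character chunks, and
# the chunk count is capped per method to keep latency and token usage bounded.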
def _build_chain(selected_chain_type: str):
    prompts = _get_summary_prompts(summary_word_limit, summary_language)
    if selected_chain_type == "stuff":
        return load_summarize_chain(llm, chain_type="stuff", prompt=prompts["stuff"])
    if selected_chain_type == "map_reduce":
        return load_summarize_chain(
            llm,
            chain_type="map_reduce",
            map_prompt=prompts["map"],
            combine_prompt=prompts["combine"],
        )
    return load_summarize_chain(
        llm,
        chain_type="refine",
        question_prompt=prompts["refine_question"],
        refine_prompt=prompts["refine"],
    )


def _prepare_summary_documents(docs: list[Document], selected_chain_type: str) -> list[Document]:
    splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
    split_docs = splitter.split_documents(docs)
    if selected_chain_type == "stuff":
        return split_docs[:3]
    if selected_chain_type == "refine":
        return split_docs[:10]
    return split_docs[:8]
def _choose_effective_chain_type(requested_chain_type: str, docs: list[Document]) -> tuple[str, str | None]:
    splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
    split_docs = splitter.split_documents(docs)
    chunk_count = len(split_docs)
    total_chars = sum(len(doc.page_content) for doc in split_docs)
    if chunk_count <= 3 and total_chars <= 6000:
        recommended = "stuff"
    elif chunk_count <= 10:
        recommended = "refine"
    else:
        recommended = "map_reduce"
    if requested_chain_type == "auto":
        return recommended, f"Auto-selected `{recommended}` based on content size."
    if requested_chain_type == "stuff" and recommended != "stuff":
        return recommended, (
            f"Switched from `stuff` to `{recommended}` because the content is too large "
            "for a reliable single-pass summary."
        )
    if requested_chain_type == "refine" and chunk_count > 12:
        return "map_reduce", (
            "Switched from `refine` to `map_reduce` because the content is large enough "
            "that map-reduce is more reliable."
        )
    return requested_chain_type, None
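
# --- UI actions ---
# YouTube URLs get an inline player plus fetch/export controls for the
# transcript; the "Summarize content" button validates inputs, loads documents
# from the selected sources, picks a chain type, and renders the summary.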
if input_source_mode in {"URL", "Both"} and _is_youtube_url(generic_url):
    st.video(generic_url)
    transcript_col, export_col = st.columns(2)
    with transcript_col:
        if st.button("Fetch transcript"):
            if not generic_url.strip():
                st.error("Please enter a YouTube URL.")
            elif not validators.url(generic_url):
                st.error("Please enter a valid YouTube URL.")
            else:
                try:
                    with st.spinner("Loading transcript..."):
                        docs = _load_youtube_documents_with_fallbacks(
                            generic_url,
                            transcript_language,
                            youtube_source_mode,
                            manual_transcript_text,
                            manual_transcript_file,
                        )
                    if not docs:
                        st.error("No transcript could be extracted from the provided YouTube video.")
                    else:
                        _store_youtube_transcript(generic_url, docs)
                        st.success(
                            "Transcript ready for export in "
                            f"{st.session_state.youtube_transcript_language_label} "
                            f"via {st.session_state.youtube_transcript_source_mode}."
                        )
                except Exception as transcript_err:
                    st.error(f"Failed to load YouTube transcript: {transcript_err}")
    with export_col:
        if (
            st.session_state.youtube_transcript_text
            and st.session_state.youtube_transcript_source_url == generic_url
        ):
            st.caption(
                "Prepared transcript: "
                f"`{st.session_state.youtube_transcript_language_label}` via "
                f"`{st.session_state.youtube_transcript_source_mode}`"
            )
            st.download_button(
                "Export transcript",
                data=st.session_state.youtube_transcript_text,
                file_name=st.session_state.youtube_transcript_name,
                mime="text/plain",
            )
if st.button("Summarize content"):
    if not groq_api_key.strip():
        st.error("`GROQ_API_KEY` is not set. Add it to the environment or Space secrets before summarizing.")
    elif input_source_mode == "URL" and not generic_url.strip():
        st.error("Content source is `URL`, so please provide a URL.")
    elif input_source_mode == "Upload documents" and not uploaded_files:
        st.error("Content source is `Upload documents`, so please upload at least one file.")
    elif input_source_mode == "Both" and (not generic_url.strip() or not uploaded_files):
        st.error("Content source is `Both`, so please provide a URL and upload at least one file.")
    elif generic_url.strip() and not validators.url(generic_url):
        st.error("Please enter a valid URL when using the URL field.")
    else:
        try:
            with st.spinner("Summarizing content..."):
                docs: list[Document] = []
                if input_source_mode in {"URL", "Both"} and generic_url.strip():
                    if _is_youtube_url(generic_url):
                        try:
                            url_docs = _load_youtube_documents_with_fallbacks(
                                generic_url,
                                transcript_language,
                                youtube_source_mode,
                                manual_transcript_text,
                                manual_transcript_file,
                            )
                            _store_youtube_transcript(generic_url, url_docs)
                        except Exception as load_err:
                            st.error(f"Failed to load YouTube transcript: {load_err}")
                            st.stop()
                    else:
                        _reset_youtube_transcript_state()
                        try:
                            url_docs = _load_web_documents(generic_url)
                        except Exception as load_err:
                            st.error(f"Failed to fetch URL content: {load_err}")
                            st.stop()
                    docs.extend(url_docs)
                else:
                    _reset_youtube_transcript_state()
                if input_source_mode in {"Upload documents", "Both"} and uploaded_files:
                    try:
                        uploaded_docs = _load_uploaded_documents(uploaded_files)
                    except Exception as load_err:
                        st.error(f"Failed to read uploaded document(s): {load_err}")
                        st.stop()
                    docs.extend(uploaded_docs)
                if input_source_mode == "Both" and generic_url.strip() and uploaded_files:
                    st.info("Summarizing combined content from the URL and uploaded documents.")
                if not docs:
                    st.error("No content could be extracted from the selected source.")
                    st.stop()
                effective_chain_type, chain_message = _choose_effective_chain_type(
                    selected_chain_type,
                    docs,
                )
                if chain_message:
                    st.info(chain_message)
                docs_for_summary = _prepare_summary_documents(docs, effective_chain_type)
                chain = _build_chain(effective_chain_type)
                output_summary = _extract_summary_text(
                    chain.invoke({"input_documents": docs_for_summary})
                )
            st.success(output_summary)
        except Exception as e:
            st.error(f"Summarization failed: {e}")