import io
import time
import urllib.request

import requests
from pypdf import PdfReader
from requests.adapters import HTTPAdapter
from urllib3.util import Retry


def _extract_pdf_text(content: bytes) -> str:
    """Return the text of every non-blank page in *content*, separated by blank lines."""
    reader = PdfReader(io.BytesIO(content))
    page_texts = (page.extract_text() or "" for page in reader.pages)
    return "\n\n".join(text for text in page_texts if text.strip()).strip()


def _build_session(max_retries: int, backoff_sec: float) -> requests.Session:
    """Create a session whose HTTP and HTTPS adapters share one retry policy."""
    retry_cfg = Retry(
        total=max_retries,
        connect=max_retries,
        read=max_retries,
        backoff_factor=backoff_sec,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET", "HEAD"]),
    )
    adapter = HTTPAdapter(max_retries=retry_cfg)
    session = requests.Session()
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def fetch_pdf_text(
    pdf_url: str,
    timeout: int = 60,
    max_retries: int = 3,
    backoff_sec: float = 1.5,
) -> str:
    """Download the PDF at *pdf_url* and return its extracted text.

    Each attempt first fetches the document with ``requests`` (browser-like
    headers, adapter-level retries). If that download or the PDF parsing
    fails for any reason, the same attempt retries the download with
    ``urllib``. Between failed attempts the function sleeps for
    ``backoff_sec * 2 ** attempt`` seconds.

    Args:
        pdf_url: Location of the PDF document.
        timeout: Base timeout in seconds. The connect timeout is a third of
            it, clamped to 10-30 seconds; the read timeout is
            ``max(timeout, 60)``, so values below 60 only affect connecting.
        max_retries: Number of attempts made by the outer loop; also used as
            the retry budget of the ``requests`` adapter. With a value of 0
            or less no request is made and the function raises immediately.
        backoff_sec: Base delay for the sleep between attempts and the
            backoff factor of the adapter-level retry policy.

    Returns:
        The text of all non-blank pages joined by blank lines, stripped of
        leading and trailing whitespace. May be empty if no page yields text.

    Raises:
        RuntimeError: If every attempt fails. The last underlying exception
            is attached as ``__cause__``.
    """
    connect_timeout = min(max(int(timeout / 3), 10), 30)
    read_timeout = max(timeout, 60)
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept": "application/pdf,*/*;q=0.8",
    }
    last_exc: Exception | None = None

    # NOTE(review): the mounted Retry policy already retries inside every
    # session.get() call, so together with the loop below the total number
    # of requests is multiplied; confirm this layering is intended.
    # The context manager closes the session (and its connection pool) on
    # every exit path, including the early returns.
    with _build_session(max_retries, backoff_sec) as session:
        for attempt in range(max_retries):
            try:
                response = session.get(
                    pdf_url,
                    headers=headers,
                    timeout=(connect_timeout, read_timeout),
                )
                response.raise_for_status()
                return _extract_pdf_text(response.content)
            except Exception as exc:
                last_exc = exc
                # Fallback path: some hosts behave better with urllib defaults.
                # NOTE(review): urlopen also accepts non-HTTP schemes such as
                # ``file:``; validate pdf_url upstream if it can be untrusted.
                # Running the fallback inside this handler keeps the primary
                # error reachable via the fallback error's ``__context__``.
                try:
                    request = urllib.request.Request(
                        pdf_url,
                        headers={"User-Agent": headers["User-Agent"]},
                    )
                    with urllib.request.urlopen(
                        request, timeout=read_timeout
                    ) as resp:
                        content = resp.read()
                    return _extract_pdf_text(content)
                except Exception as fallback_exc:
                    last_exc = fallback_exc

            # Exponential backoff, skipped after the final attempt.
            if attempt < max_retries - 1:
                time.sleep(backoff_sec * (2 ** attempt))

    raise RuntimeError(
        f"Failed to fetch PDF after {max_retries} attempts: {last_exc}"
    ) from last_exc


def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 150) -> list[str]:
    """Split *text* into fixed-size character chunks that overlap.

    Runs of whitespace (including newlines) are collapsed to single spaces
    before chunking. Consecutive chunks start ``chunk_size - overlap``
    characters apart; if that difference is less than 1, a step of 1 is used
    so the loop always advances.

    Args:
        text: Input text.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks.

    Returns:
        The chunks in order; an empty list when *text* is empty or contains
        only whitespace.

    Raises:
        ValueError: If *chunk_size* is not positive and *text* is non-blank.
    """
    if not text.strip():
        return []
    if chunk_size <= 0:
        # A non-positive size would yield empty or negative-index slices.
        raise ValueError(
            f"chunk_size must be a positive integer, got {chunk_size!r}"
        )

    clean_text = " ".join(text.split())
    total = len(clean_text)
    step = max(chunk_size - overlap, 1)

    chunks = []
    for start in range(0, total, step):
        chunks.append(clean_text[start:start + chunk_size])
        # Stop once a chunk reaches the end, so no trailing chunk is emitted
        # that is fully contained in the previous one.
        if start + chunk_size >= total:
            break
    return chunks