from __future__ import annotations import hashlib import re from dataclasses import dataclass from io import BytesIO from pathlib import Path try: from pypdf import PdfReader except ImportError: PdfReader = None from .catalog import CHAPTER_CATALOG, clean_title, parse_chapter_number, pdf_metadata @dataclass class ChunkRecord: chunk_id: str text: str chapter_number: int chapter_name: str topic: str page_number: int source_file: str SUPPORTED_UPLOAD_EXTENSIONS = {".pdf", ".txt", ".md"} def get_pdf_reader(source: str | Path | BytesIO) -> object: if PdfReader is None: raise RuntimeError( "PDF support needs the `pypdf` package. Run `python3 -m pip install -r requirements.txt`." ) return PdfReader(source) def discover_pdfs(root_dir: Path) -> list[Path]: return sorted(path for path in root_dir.glob("*.pdf") if path.is_file()) def clean_text(text: str) -> str: text = text.replace("\x00", " ") text = re.sub(r"-\s*\n", "", text) text = re.sub(r"\s*\n\s*", "\n", text) text = re.sub(r"[ \t]+", " ", text) return text.strip() def candidate_topic(text: str, fallback: str) -> str: for line in text.splitlines(): line = re.sub(r"\s+", " ", line).strip(" .:-") line = re.sub(r"\d+$", "", line).strip(" .:-") if not line: continue if len(line) > 80: continue if re.fullmatch(r"[0-9. ]+", line): continue if line.lower().startswith("mathematics"): continue if any(char.isalpha() for char in line): return line.title() return fallback def split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]: if len(text) <= chunk_size: return [text] chunks: list[str] = [] start = 0 while start < len(text): end = min(start + chunk_size, len(text)) chunk = text[start:end] if end < len(text): split_at = chunk.rfind("\n") if split_at > chunk_size // 2: chunk = chunk[:split_at] end = start + split_at chunks.append(chunk.strip()) if end == len(text): break start = max(end - chunk_overlap, 0) return [chunk for chunk in chunks if chunk] def build_chunk_records( page_texts: list[tuple[int, str]], *, source_key: str, chapter_number: int, chapter_name: str, source_file: str, chunk_size: int, chunk_overlap: int, ) -> list[ChunkRecord]: all_chunks: list[ChunkRecord] = [] for page_number, raw_text in page_texts: page_text = clean_text(raw_text) if not page_text: continue page_chunks = split_text(page_text, chunk_size=chunk_size, chunk_overlap=chunk_overlap) for chunk_index, chunk_text in enumerate(page_chunks, start=1): topic = candidate_topic(chunk_text, fallback=chapter_name) all_chunks.append( ChunkRecord( chunk_id=f"{source_key}-p{page_number}-c{chunk_index}", text=chunk_text, chapter_number=chapter_number, chapter_name=chapter_name, topic=topic, page_number=page_number, source_file=source_file, ) ) return all_chunks def extract_chunks_from_pdf( file_path: Path, chunk_size: int, chunk_overlap: int, ) -> list[ChunkRecord]: reader = get_pdf_reader(str(file_path)) meta = pdf_metadata(file_path) page_texts = [ (page_index, page.extract_text() or "") for page_index, page in enumerate(reader.pages, start=1) ] return build_chunk_records( page_texts, source_key=file_path.stem, chapter_number=int(meta["chapter_number"]), chapter_name=str(meta["chapter_name"]), source_file=str(meta["source_file"]), chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) def uploaded_file_metadata(file_name: str) -> dict[str, str | int]: path = Path(file_name) chapter_number = parse_chapter_number(path) if chapter_number is None: return { "chapter_number": -1, "chapter_name": clean_title(path.stem), "source_file": path.name, } return { "chapter_number": chapter_number, "chapter_name": CHAPTER_CATALOG.get(chapter_number, clean_title(path.stem)), "source_file": path.name, } def extract_chunks_from_pdf_bytes( file_name: str, file_bytes: bytes, chunk_size: int, chunk_overlap: int, ) -> list[ChunkRecord]: meta = uploaded_file_metadata(file_name) reader = get_pdf_reader(BytesIO(file_bytes)) source_hash = hashlib.sha1(file_bytes).hexdigest()[:12] page_texts = [ (page_index, page.extract_text() or "") for page_index, page in enumerate(reader.pages, start=1) ] return build_chunk_records( page_texts, source_key=f"upload-{source_hash}", chapter_number=int(meta["chapter_number"]), chapter_name=str(meta["chapter_name"]), source_file=str(meta["source_file"]), chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) def extract_chunks_from_text_bytes( file_name: str, file_bytes: bytes, chunk_size: int, chunk_overlap: int, ) -> list[ChunkRecord]: meta = uploaded_file_metadata(file_name) source_hash = hashlib.sha1(file_bytes).hexdigest()[:12] text = file_bytes.decode("utf-8", errors="ignore") return build_chunk_records( [(1, text)], source_key=f"upload-{source_hash}", chapter_number=int(meta["chapter_number"]), chapter_name=str(meta["chapter_name"]), source_file=str(meta["source_file"]), chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) def extract_chunks_from_uploaded_file( file_name: str, file_bytes: bytes, chunk_size: int, chunk_overlap: int, ) -> list[ChunkRecord]: extension = Path(file_name).suffix.lower() if extension == ".pdf": return extract_chunks_from_pdf_bytes( file_name=file_name, file_bytes=file_bytes, chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) if extension in {".txt", ".md"}: return extract_chunks_from_text_bytes( file_name=file_name, file_bytes=file_bytes, chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) supported = ", ".join(sorted(SUPPORTED_UPLOAD_EXTENSIONS)) raise ValueError(f"Unsupported file type for {file_name}. Use one of: {supported}.")