Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import hashlib | |
| import re | |
| from dataclasses import dataclass | |
| from io import BytesIO | |
| from pathlib import Path | |
| try: | |
| from pypdf import PdfReader | |
| except ImportError: | |
| PdfReader = None | |
| from .catalog import CHAPTER_CATALOG, clean_title, parse_chapter_number, pdf_metadata | |
@dataclass
class ChunkRecord:
    """One chunk of extracted text plus the metadata describing where it came from.

    Instances are created with keyword arguments by ``build_chunk_records``, so the
    class must be a dataclass to get a generated ``__init__``.
    """

    # Identifier of the form "<source_key>-p<page_number>-c<chunk_index>".
    chunk_id: str
    # The cleaned chunk text.
    text: str
    # Chapter number; uploads whose name has no parseable chapter number use -1.
    chapter_number: int
    # Human-readable chapter title.
    chapter_name: str
    # Heading guessed from the chunk text, or the chapter name when none is found.
    topic: str
    # Page the chunk was taken from (numbering starts at 1).
    page_number: int
    # Name of the file the chunk came from.
    source_file: str
# Lower-case file suffixes (leading dot included) that are listed in the
# "Unsupported file type" error raised by extract_chunks_from_uploaded_file.
SUPPORTED_UPLOAD_EXTENSIONS = {".md", ".pdf", ".txt"}
def get_pdf_reader(source: str | Path | BytesIO) -> object:
    """Build a PDF reader for *source* using the optionally imported ``PdfReader``.

    Args:
        source: Value handed straight to ``PdfReader``; callers in this module
            pass either a path string or an in-memory ``BytesIO``.

    Returns:
        Whatever ``PdfReader(source)`` returns.

    Raises:
        RuntimeError: If the module-level ``PdfReader`` is ``None`` because the
            ``pypdf`` import failed.
    """
    if PdfReader is not None:
        return PdfReader(source)
    raise RuntimeError(
        "PDF support needs the `pypdf` package. Run `python3 -m pip install -r requirements.txt`."
    )
def discover_pdfs(root_dir: Path) -> list[Path]:
    """Return the ``*.pdf`` files directly inside *root_dir*, sorted by path.

    Only the top level of the directory is searched, and entries that match the
    pattern but are not regular files (e.g. a directory named ``x.pdf``) are
    left out.
    """
    pdf_files = [entry for entry in root_dir.glob("*.pdf") if entry.is_file()]
    pdf_files.sort()
    return pdf_files
def clean_text(text: str) -> str:
    """Normalise raw extracted text before it is split into chunks.

    Steps, in order: NUL characters become spaces; a hyphen followed by a line
    break (with optional whitespace in between) is removed so hyphenated words
    are rejoined; any whitespace run containing a newline collapses to a single
    newline; runs of spaces/tabs collapse to one space; finally the result is
    stripped.
    """
    # Order matters: hyphenation repair must see the original line breaks.
    substitutions = (
        (r"-\s*\n", ""),
        (r"\s*\n\s*", "\n"),
        (r"[ \t]+", " "),
    )
    cleaned = text.replace("\x00", " ")
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def candidate_topic(text: str, fallback: str) -> str:
    """Pick the first heading-like line of *text* as a topic label.

    Each line has its whitespace collapsed, surrounding ``" .:-"`` characters
    trimmed and any trailing digits removed. A line is rejected when it is then
    empty, longer than 80 characters, made up only of digits/dots/spaces, or
    starts with "mathematics" (case-insensitive). The first remaining line that
    contains a letter is returned in title case.

    Args:
        text: Chunk text to scan line by line.
        fallback: Value returned when no line qualifies.

    Returns:
        The title-cased heading, or *fallback*.
    """
    trim_chars = " .:-"
    for raw_line in text.splitlines():
        heading = re.sub(r"\s+", " ", raw_line).strip(trim_chars)
        # Drop a trailing number (e.g. a page number glued to a heading).
        heading = re.sub(r"\d+$", "", heading).strip(trim_chars)
        if not heading or len(heading) > 80:
            continue
        numeric_only = re.fullmatch(r"[0-9. ]+", heading) is not None
        if numeric_only or heading.lower().startswith("mathematics"):
            continue
        if any(symbol.isalpha() for symbol in heading):
            return heading.title()
    return fallback
def split_text(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split *text* into overlapping chunks of at most *chunk_size* characters.

    A chunk that is not the last one is cut back to its final newline when that
    newline lies past the midpoint of the window, so chunks tend to end on line
    boundaries. Each chunk is stripped and empty chunks are dropped. Text that
    already fits in one chunk is returned unchanged as a single-element list.

    Args:
        text: Text to split.
        chunk_size: Maximum chunk length in characters; must be positive.
        chunk_overlap: Number of characters the next chunk re-reads from the
            end of the previous one; must not be negative.

    Returns:
        The list of non-empty chunks in document order.

    Raises:
        ValueError: If *chunk_size* is not positive or *chunk_overlap* is
            negative.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}.")
    if chunk_overlap < 0:
        raise ValueError(f"chunk_overlap must not be negative, got {chunk_overlap}.")

    text_length = len(text)
    if text_length <= chunk_size:
        return [text]

    chunks: list[str] = []
    start = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        chunk = text[start:end]
        if end < text_length:
            # Prefer ending on a line break, but only if that keeps more than
            # half of the window; otherwise cut mid-line.
            split_at = chunk.rfind("\n")
            if split_at > chunk_size // 2:
                chunk = chunk[:split_at]
                end = start + split_at
        stripped = chunk.strip()
        if stripped:
            chunks.append(stripped)
        if end == text_length:
            break
        # Stepping back by the overlap must still move forward; when the
        # overlap is as large as the piece just emitted, continue from `end`
        # instead of stalling or moving backwards (which would loop forever).
        next_start = end - chunk_overlap
        start = next_start if next_start > start else end
    return chunks
def build_chunk_records(
    page_texts: list[tuple[int, str]],
    *,
    source_key: str,
    chapter_number: int,
    chapter_name: str,
    source_file: str,
    chunk_size: int,
    chunk_overlap: int,
) -> list[ChunkRecord]:
    """Turn per-page raw text into a flat list of ``ChunkRecord`` objects.

    Each page is cleaned with ``clean_text``; pages that end up empty are
    skipped. The remaining text is split with ``split_text`` and every piece
    becomes one record whose topic comes from ``candidate_topic`` (falling back
    to *chapter_name*).

    Args:
        page_texts: ``(page_number, raw_text)`` pairs in reading order.
        source_key: Prefix for chunk ids, which have the form
            ``"<source_key>-p<page_number>-c<index>"`` with the index starting
            at 1 on every page.
        chapter_number: Stored unchanged on every record.
        chapter_name: Stored on every record and used as the topic fallback.
        source_file: Stored unchanged on every record.
        chunk_size: Forwarded to ``split_text``.
        chunk_overlap: Forwarded to ``split_text``.

    Returns:
        All records, ordered by page and then by position within the page.
    """
    records: list[ChunkRecord] = []
    for page_number, raw_text in page_texts:
        cleaned = clean_text(raw_text)
        if cleaned:
            pieces = split_text(cleaned, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
            records.extend(
                ChunkRecord(
                    chunk_id=f"{source_key}-p{page_number}-c{position}",
                    text=piece,
                    chapter_number=chapter_number,
                    chapter_name=chapter_name,
                    topic=candidate_topic(piece, fallback=chapter_name),
                    page_number=page_number,
                    source_file=source_file,
                )
                for position, piece in enumerate(pieces, start=1)
            )
    return records
def extract_chunks_from_pdf(
    file_path: Path,
    chunk_size: int,
    chunk_overlap: int,
) -> list[ChunkRecord]:
    """Read a PDF from disk and return its text as chunk records.

    The file is opened through ``get_pdf_reader`` and its chapter metadata is
    looked up with ``pdf_metadata``. Pages are numbered from 1, and a page whose
    ``extract_text()`` result is falsy contributes an empty string. Chunk ids
    are prefixed with the file's stem.

    Args:
        file_path: Location of the PDF.
        chunk_size: Forwarded to ``build_chunk_records``.
        chunk_overlap: Forwarded to ``build_chunk_records``.

    Returns:
        The records produced by ``build_chunk_records``.
    """
    pdf = get_pdf_reader(str(file_path))
    info = pdf_metadata(file_path)

    pages: list[tuple[int, str]] = []
    for number, page in enumerate(pdf.pages, start=1):
        pages.append((number, page.extract_text() or ""))

    return build_chunk_records(
        pages,
        source_key=file_path.stem,
        chapter_number=int(info["chapter_number"]),
        chapter_name=str(info["chapter_name"]),
        source_file=str(info["source_file"]),
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
def uploaded_file_metadata(file_name: str) -> dict[str, str | int]:
    """Derive chapter metadata for an uploaded file from its name alone.

    Args:
        file_name: Name (or path) of the uploaded file.

    Returns:
        A dict with the keys ``"chapter_number"``, ``"chapter_name"`` and
        ``"source_file"``. When ``parse_chapter_number`` returns ``None`` the
        chapter number is ``-1`` and the name is the cleaned file stem;
        otherwise the name comes from ``CHAPTER_CATALOG``, falling back to the
        cleaned stem for numbers missing from the catalog.
    """
    upload_path = Path(file_name)
    number = parse_chapter_number(upload_path)
    stem_title = clean_title(upload_path.stem)

    if number is None:
        number = -1
        title = stem_title
    else:
        title = CHAPTER_CATALOG.get(number, stem_title)

    return {
        "chapter_number": number,
        "chapter_name": title,
        "source_file": upload_path.name,
    }
def extract_chunks_from_pdf_bytes(
    file_name: str,
    file_bytes: bytes,
    chunk_size: int,
    chunk_overlap: int,
) -> list[ChunkRecord]:
    """Chunk an uploaded PDF that is held in memory.

    Metadata is derived from *file_name* via ``uploaded_file_metadata``. Chunk
    ids are prefixed with ``"upload-"`` plus the first 12 hex digits of the
    SHA-1 of *file_bytes*, so the prefix depends on the content rather than the
    name. Pages are numbered from 1, and a page whose ``extract_text()`` result
    is falsy contributes an empty string.

    Args:
        file_name: Original name of the upload.
        file_bytes: Raw PDF content.
        chunk_size: Forwarded to ``build_chunk_records``.
        chunk_overlap: Forwarded to ``build_chunk_records``.

    Returns:
        The records produced by ``build_chunk_records``.
    """
    info = uploaded_file_metadata(file_name)
    pdf = get_pdf_reader(BytesIO(file_bytes))
    digest = hashlib.sha1(file_bytes).hexdigest()

    pages: list[tuple[int, str]] = []
    for number, page in enumerate(pdf.pages, start=1):
        pages.append((number, page.extract_text() or ""))

    return build_chunk_records(
        pages,
        source_key=f"upload-{digest[:12]}",
        chapter_number=int(info["chapter_number"]),
        chapter_name=str(info["chapter_name"]),
        source_file=str(info["source_file"]),
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
def extract_chunks_from_text_bytes(
    file_name: str,
    file_bytes: bytes,
    chunk_size: int,
    chunk_overlap: int,
) -> list[ChunkRecord]:
    """Chunk an uploaded plain-text or Markdown file that is held in memory.

    The whole file is treated as a single page numbered 1. Metadata is derived
    from *file_name* via ``uploaded_file_metadata``. Chunk ids are prefixed with
    ``"upload-"`` plus the first 12 hex digits of the SHA-1 of the raw bytes.

    Args:
        file_name: Original name of the upload.
        file_bytes: Raw file content, decoded as UTF-8 with undecodable bytes
            dropped.
        chunk_size: Forwarded to ``build_chunk_records``.
        chunk_overlap: Forwarded to ``build_chunk_records``.

    Returns:
        The records produced by ``build_chunk_records``.
    """
    meta = uploaded_file_metadata(file_name)
    # Hash the raw bytes so the id is independent of how the text is decoded.
    source_hash = hashlib.sha1(file_bytes).hexdigest()[:12]
    # "utf-8-sig" behaves like "utf-8" but also skips a leading byte-order
    # mark, which would otherwise survive as U+FEFF at the start of the first
    # chunk (str.strip() does not remove it).
    text = file_bytes.decode("utf-8-sig", errors="ignore")
    return build_chunk_records(
        [(1, text)],
        source_key=f"upload-{source_hash}",
        chapter_number=int(meta["chapter_number"]),
        chapter_name=str(meta["chapter_name"]),
        source_file=str(meta["source_file"]),
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
def extract_chunks_from_uploaded_file(
    file_name: str,
    file_bytes: bytes,
    chunk_size: int,
    chunk_overlap: int,
) -> list[ChunkRecord]:
    """Dispatch an uploaded file to the extractor matching its suffix.

    The suffix of *file_name* is compared case-insensitively: ``.pdf`` goes to
    ``extract_chunks_from_pdf_bytes``; ``.txt`` and ``.md`` go to
    ``extract_chunks_from_text_bytes``.

    Args:
        file_name: Original name of the upload; only its suffix selects the
            extractor.
        file_bytes: Raw file content.
        chunk_size: Forwarded to the selected extractor.
        chunk_overlap: Forwarded to the selected extractor.

    Returns:
        The records produced by the selected extractor.

    Raises:
        ValueError: If the suffix is none of the handled ones.
    """
    suffix = Path(file_name).suffix.lower()

    if suffix == ".pdf":
        extractor = extract_chunks_from_pdf_bytes
    elif suffix in {".txt", ".md"}:
        extractor = extract_chunks_from_text_bytes
    else:
        supported = ", ".join(sorted(SUPPORTED_UPLOAD_EXTENSIONS))
        raise ValueError(f"Unsupported file type for {file_name}. Use one of: {supported}.")

    return extractor(
        file_name=file_name,
        file_bytes=file_bytes,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )