File size: 2,265 Bytes
cc67867
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4d03adc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from __future__ import annotations

import io
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import List

from langchain_core.documents import Document

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


@dataclass
class IngestedFile:
    """Result record for one ingested file.

    Attributes:
        filename: Name of the file the documents were extracted from.
        file_type: Lower-case file extension without the leading dot.
        page_count: Number of entries in ``documents``.
        char_count: Combined length of the text held by ``documents``.
        documents: The extracted ``Document`` objects; a fresh empty list
            is created per instance when none are supplied.
    """

    # Identity of the source file.
    filename: str
    file_type: str

    # Size statistics for the extracted content.
    page_count: int
    char_count: int

    # default_factory gives every instance its own list instead of a shared one.
    documents: List[Document] = field(default_factory=list)


class DocumentProcessor:
    """Convert raw file bytes into ``Document`` objects.

    PDF input produces one ``Document`` per page that yields text; plain-text
    input produces a single ``Document`` for the whole file.
    """

    # Lower-case file extensions (without the dot) that ``ingest`` accepts.
    SUPPORTED_TYPES = {"pdf", "txt"}

    def ingest(self, file_bytes: bytes, filename: str) -> IngestedFile:
        """Parse *file_bytes* and return the extracted documents with stats.

        Args:
            file_bytes: Raw content of the file.
            filename: Name of the file. Its extension selects the parser and
                the full name is stored as ``source`` in document metadata.

        Returns:
            An ``IngestedFile`` holding the documents, their count and the
            total number of characters of extracted text.

        Raises:
            ValueError: If the extension is not in ``SUPPORTED_TYPES`` or if
                no text could be extracted from the file.
        """
        ext = Path(filename).suffix.lstrip(".").lower()
        if ext not in self.SUPPORTED_TYPES:
            # Sort before formatting: the iteration order of a set of strings
            # varies between interpreter runs, which made the message unstable.
            supported = ", ".join(sorted(self.SUPPORTED_TYPES))
            raise ValueError(f"Unsupported file type '.{ext}'. Supported: {supported}")

        if ext == "pdf":
            docs = self._parse_pdf(file_bytes, filename)
        else:
            docs = self._parse_txt(file_bytes, filename)

        if not docs:
            raise ValueError(f"No text could be extracted from '{filename}'.")

        total_chars = sum(len(d.page_content) for d in docs)
        return IngestedFile(
            filename=filename,
            file_type=ext,
            page_count=len(docs),
            char_count=total_chars,
            documents=docs,
        )

    @staticmethod
    def _parse_pdf(file_bytes: bytes, filename: str) -> List[Document]:
        """Extract one ``Document`` per PDF page that contains text.

        Pages whose extracted text is empty after stripping whitespace are
        skipped. The ``page`` metadata is the 1-based position of the page in
        the PDF, so numbers of skipped pages are not reused.

        Args:
            file_bytes: Raw PDF content.
            filename: Stored as ``source`` in each document's metadata.

        Returns:
            The documents in page order; empty if no page yielded text.
        """
        # Imported here rather than at module level, so pypdf is only needed
        # when a PDF is actually parsed.
        from pypdf import PdfReader

        reader = PdfReader(io.BytesIO(file_bytes))
        docs = []
        for page_num, page in enumerate(reader.pages, start=1):
            # ``or ""`` guards against extract_text() returning None.
            text = (page.extract_text() or "").strip()
            if text:
                docs.append(Document(
                    page_content=text,
                    metadata={"source": filename, "page": page_num, "file_type": "pdf"},
                ))
        return docs

    @staticmethod
    def _parse_txt(file_bytes: bytes, filename: str) -> List[Document]:
        """Decode a text file into at most one ``Document``.

        The bytes are decoded as UTF-8, with a leading byte-order mark
        removed if present. Input that is not valid UTF-8 is decoded as
        latin-1 instead.

        Args:
            file_bytes: Raw text-file content.
            filename: Stored as ``source`` in the document's metadata.

        Returns:
            A one-element list with ``page`` set to 0, or an empty list when
            the file contains nothing but whitespace (or only a BOM).
        """
        try:
            # "utf-8-sig" decodes exactly like "utf-8" but also drops a
            # leading BOM. str.strip() does not treat U+FEFF as whitespace, so
            # with plain "utf-8" the BOM would end up in page_content.
            text = file_bytes.decode("utf-8-sig")
        except UnicodeDecodeError:
            # latin-1 maps every possible byte to a code point, so this
            # fallback cannot raise and no further error branch is needed.
            text = file_bytes.decode("latin-1")

        text = text.strip()
        if not text:
            return []
        return [Document(
            page_content=text,
            metadata={"source": filename, "page": 0, "file_type": "txt"},
        )]