File size: 2,569 Bytes
50231a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d0ef3b
50231a8
 
2d0ef3b
 
50231a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d0ef3b
50231a8
 
 
 
 
2d0ef3b
50231a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from pathlib import Path

import docx2txt
from openpyxl import load_workbook
from PIL import Image
from pypdf import PdfReader
import pytesseract

from app.core.exceptions import ExtractionError


# Extensions (lower-case, with leading dot) grouped by extraction strategy.

# Structured documents, each handled by a format-specific parser.
DOC_EXTENSIONS = {
    ".docx",
    ".pdf",
    ".xlsx",
}

# Raster images, whose text is recovered through OCR.
IMAGE_EXTENSIONS = {
    ".bmp",
    ".gif",
    ".jpeg",
    ".jpg",
    ".png",
    ".tiff",
}

# Plain-text formats that are read from disk as-is.
TEXT_EXTENSIONS = {
    ".csv",
    ".json",
    ".md",
    ".txt",
}


class ExtractionService:
    """Extract plain text from documents, images, and text files.

    The extractor is selected from the extension of the supplied file name.
    Every failure surfaces to callers as ``ExtractionError``.
    """

    @staticmethod
    def _extract_pdf(file_path: Path, first_page_only: bool = False) -> str:
        """Return the text of a PDF, one chunk per non-blank page.

        Args:
            file_path: Location of the PDF on disk.
            first_page_only: When true, only the first page is read.

        Returns:
            Page texts joined by newlines. Pages whose extracted text is
            empty or whitespace-only are skipped.
        """
        reader = PdfReader(str(file_path))
        pages = reader.pages[:1] if first_page_only else reader.pages
        # ``or ""`` guards against a falsy result from extract_text().
        page_texts = ((page.extract_text() or "") for page in pages)
        return "\n".join(text for text in page_texts if text.strip())

    @staticmethod
    def _extract_docx(file_path: Path) -> str:
        """Return the text of a .docx file as produced by ``docx2txt``."""
        return docx2txt.process(str(file_path))

    @staticmethod
    def _extract_xlsx(file_path: Path) -> str:
        """Return the cell values of every worksheet as text.

        Each non-empty row becomes one line, with its non-blank cell values
        separated by single spaces. Cached values are read instead of
        formulas (``data_only=True``).

        Args:
            file_path: Location of the workbook on disk.

        Returns:
            Row lines from all sheets joined by newlines.
        """
        workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
        # A read-only workbook keeps its file handle open until close() is
        # called, so release it even if iterating the rows raises.
        try:
            lines: list[str] = []
            for sheet in workbook.worksheets:
                for row in sheet.iter_rows(values_only=True):
                    cells = (str(value).strip() for value in row if value is not None)
                    row_values = [cell for cell in cells if cell]
                    if row_values:
                        lines.append(" ".join(row_values))
            return "\n".join(lines)
        finally:
            workbook.close()

    def extract_text(self, file_name: str, file_path: Path, pdf_first_page_only: bool = False) -> str:
        """Extract text from ``file_path`` using the extension of ``file_name``.

        Args:
            file_name: Name whose suffix (case-insensitive) selects the
                extractor; it may differ from the name of ``file_path``.
            file_path: Location of the file contents on disk.
            pdf_first_page_only: For PDFs, read only the first page.

        Returns:
            The extracted text.

        Raises:
            ExtractionError: If the extension is unsupported, or if the
                underlying extractor fails for any reason (the original
                exception is chained as the cause).
        """
        extension = Path(file_name).suffix.lower()

        try:
            if extension in DOC_EXTENSIONS:
                if extension == ".pdf":
                    return self._extract_pdf(file_path, first_page_only=pdf_first_page_only)
                if extension == ".docx":
                    return self._extract_docx(file_path)
                if extension == ".xlsx":
                    return self._extract_xlsx(file_path)

            if extension in IMAGE_EXTENSIONS:
                # The context manager closes the image's file handle once
                # OCR has finished, including when OCR raises.
                with Image.open(file_path) as image:
                    return pytesseract.image_to_string(image)

            if extension in TEXT_EXTENSIONS:
                # Undecodable bytes are dropped rather than failing the read.
                return file_path.read_text(encoding="utf-8", errors="ignore")

            raise ExtractionError(f"Unsupported file extension: {extension or 'unknown'}")
        except ExtractionError:
            # Already the domain error; let it through unwrapped.
            raise
        except Exception as exc:
            raise ExtractionError("Failed to extract text from file") from exc


# Module-level instance created at import time. ExtractionService defines no
# instance state, so a single object can serve every caller — presumably this
# is the instance other modules import; verify against callers.
extraction_service = ExtractionService()