import logging
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import PyPDF2
from docx import Document
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
from pptx.shapes.base import BaseShape


class FileProcessor:
    """Convert files of several formats into structured plain text.

    Supported extensions are ``.ppt``/``.pptx``, ``.doc``/``.docx``,
    ``.pdf`` and ``.txt``; the extension alone selects the converter.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Store the optional configuration and create a module logger.

        Args:
            config: Optional settings mapping; ``None`` is replaced by an
                empty dict. None of the methods in this class read it.
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

    async def convert_to_text(self, file_path: str) -> str:
        """Convert the file at *file_path* to text.

        The method is declared ``async`` but contains no ``await``: the
        conversion itself runs synchronously.

        Args:
            file_path: Path to the file; its lower-cased suffix selects
                the converter.

        Returns:
            The extracted text.

        Raises:
            ValueError: If the file extension is not supported.
            Exception: Any error raised by the selected converter is
                logged and re-raised unchanged.
        """
        file_ext = Path(file_path).suffix.lower()
        # NOTE(review): '.ppt' and '.doc' are routed to python-pptx /
        # python-docx exactly as before; those libraries target the OOXML
        # formats, so legacy binary files presumably fail inside them.
        converters = {
            '.ppt': self._convert_presentation,
            '.pptx': self._convert_presentation,
            '.doc': self._convert_word,
            '.docx': self._convert_word,
            '.pdf': self._convert_pdf,
            '.txt': self._read_text_file,
        }

        try:
            converter = converters.get(file_ext)
            if converter is None:
                # Raised inside the try block so that unsupported formats
                # are logged like every other conversion failure.
                raise ValueError(f"Nieobsługiwany format pliku: {file_ext}")
            return converter(file_path)
        except Exception as e:
            self.logger.error(
                "Błąd podczas konwersji pliku %s: %s", file_path, e
            )
            raise

    def _convert_presentation(self, file_path: str) -> str:
        """Extract slide text and speaker notes from a presentation.

        Each slide contributes a ``=== Slajd N ===`` header, the stripped
        text of every shape exposing a non-blank ``text`` attribute, and
        (when present and non-blank) its notes wrapped in ``[Notatki: ...]``.

        Args:
            file_path: Path to the presentation file.

        Returns:
            The collected parts joined with newlines.
        """
        presentation = Presentation(file_path)
        text_parts = []

        for i, slide in enumerate(presentation.slides, 1):
            text_parts.append(f"\n=== Slajd {i} ===\n")

            for shape in slide.shapes:
                # Not every shape type carries text (e.g. pictures).
                if hasattr(shape, 'text') and shape.text.strip():
                    text_parts.append(shape.text.strip())

            # Check has_notes_slide before touching notes_slide, as the
            # original did, so the notes slide is only accessed when present.
            if slide.has_notes_slide:
                notes_frame = slide.notes_slide.notes_text_frame
                # notes_text_frame is None when the notes slide has no
                # notes placeholder; skip instead of raising AttributeError.
                if notes_frame is not None:
                    notes = notes_frame.text.strip()
                    if notes:
                        text_parts.append(f"\n[Notatki: {notes}]\n")

        return '\n'.join(text_parts)

    def _convert_word(self, file_path: str) -> str:
        """Extract paragraph and table text from a Word document.

        All non-blank paragraphs come first (text kept unstripped),
        followed by one line per table row with its non-blank cells
        stripped and joined by ``' | '``.

        Args:
            file_path: Path to the Word document.

        Returns:
            The collected lines joined with newlines.
        """
        doc = Document(file_path)
        text_parts = [
            paragraph.text
            for paragraph in doc.paragraphs
            if paragraph.text.strip()
        ]

        for table in doc.tables:
            for row in table.rows:
                stripped_cells = (cell.text.strip() for cell in row.cells)
                row_texts = [cell_text for cell_text in stripped_cells if cell_text]
                if row_texts:
                    text_parts.append(' | '.join(row_texts))

        return '\n'.join(text_parts)

    def _convert_pdf(self, file_path: str) -> str:
        """Extract text from a PDF, page by page.

        Every page with non-blank text contributes a ``=== Strona N ===``
        header (1-based page number) followed by the page text.

        Args:
            file_path: Path to the PDF file.

        Returns:
            The collected parts joined with newlines.
        """
        text_parts = []

        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)

            for i, page in enumerate(reader.pages):
                # Defensive: tolerate a falsy (None/empty) extraction
                # result rather than failing on ``.strip()``.
                text = page.extract_text() or ""
                if text.strip():
                    text_parts.append(f"\n=== Strona {i + 1} ===\n")
                    text_parts.append(text)

        return '\n'.join(text_parts)

    def _read_text_file(self, file_path: str) -> str:
        """Return the full contents of a UTF-8 encoded text file.

        Args:
            file_path: Path to the text file.

        Returns:
            The file contents as a single string.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()