# WhizTenderBot1.0 / file_processor.py
# Last commit: "Update file_processor.py" (ab13f19, verified), uploaded by Marek4321
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import os
from pptx import Presentation
from pptx.shapes.base import BaseShape
from pptx.enum.shapes import MSO_SHAPE_TYPE
from docx import Document
import PyPDF2
import re
from pathlib import Path
import logging
class FileProcessor:
    """
    Converts files of several formats into plain, lightly structured text.

    PowerPoint, Word and PDF files are parsed with python-pptx, python-docx
    and PyPDF2 respectively; ``.txt`` files are read directly.
    """

    # Maps a lower-cased file extension to the name of the method handling it.
    # NOTE(review): python-pptx and python-docx target the OOXML formats
    # (.pptx/.docx); the legacy .ppt/.doc extensions are still routed to them
    # exactly as before - confirm those inputs are expected to parse.
    _CONVERTERS = {
        '.ppt': '_convert_presentation',
        '.pptx': '_convert_presentation',
        '.doc': '_convert_word',
        '.docx': '_convert_word',
        '.pdf': '_convert_pdf',
        '.txt': '_read_text_file',
    }

    def __init__(self, config: Optional[Dict] = None):
        """Store the optional configuration and create the logger.

        Args:
            config: Optional settings dictionary; an empty dict is used when
                it is omitted or falsy.
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

    async def convert_to_text(self, file_path: str) -> str:
        """Convert the file at *file_path* to text.

        The converter is chosen from the file extension (case-insensitive).

        Args:
            file_path: Path to the file.

        Returns:
            The extracted text.

        Raises:
            ValueError: If the file extension is not supported.
            Exception: Any error raised by the selected converter; it is
                logged and then re-raised unchanged.
        """
        file_ext = Path(file_path).suffix.lower()

        try:
            method_name = self._CONVERTERS.get(file_ext)
            if method_name is None:
                raise ValueError(f"Nieobsługiwany format pliku: {file_ext}")
            return getattr(self, method_name)(file_path)
        except Exception as e:
            # Log at this boundary, then let the caller decide what to do.
            self.logger.error(
                "Błąd podczas konwersji pliku %s: %s", file_path, e
            )
            raise

    def _convert_presentation(self, file_path: str) -> str:
        """Convert a PowerPoint presentation to text.

        Each slide is introduced by a ``=== Slajd N ===`` header, followed by
        the text of its shapes and, when present, its speaker notes.
        """
        presentation = Presentation(file_path)
        text_parts: List[str] = []

        for i, slide in enumerate(presentation.slides, 1):
            text_parts.append(f"\n=== Slajd {i} ===\n")

            for shape in slide.shapes:
                text_parts.extend(self._shape_texts(shape))

            if slide.has_notes_slide:
                # notes_text_frame is None when the notes slide has no notes
                # placeholder, so it must be checked before reading .text.
                notes_frame = slide.notes_slide.notes_text_frame
                notes = notes_frame.text.strip() if notes_frame is not None else ''
                if notes:
                    text_parts.append(f"\n[Notatki: {notes}]\n")

        return '\n'.join(text_parts)

    def _shape_texts(self, shape) -> List[str]:
        """Return the non-empty text fragments contained in one slide shape."""
        # Group shapes expose their members through ``.shapes``; recurse so
        # text nested inside a group is not lost.
        members = getattr(shape, 'shapes', None)
        if members is not None:
            return [
                fragment
                for member in members
                for fragment in self._shape_texts(member)
            ]

        # Tables live in graphic frames, which have no ``.text``; emit one
        # ' | '-joined line per row, matching the Word table output.
        if getattr(shape, 'has_table', False):
            lines = []
            for row in shape.table.rows:
                cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if cells:
                    lines.append(' | '.join(cells))
            return lines

        text = getattr(shape, 'text', None)
        if text and text.strip():
            return [text.strip()]
        return []

    def _convert_word(self, file_path: str) -> str:
        """Convert a Word document to text.

        All non-blank paragraphs come first, followed by every table with one
        ' | '-joined line per row (empty cells and empty rows are skipped).
        """
        doc = Document(file_path)

        text_parts: List[str] = [
            paragraph.text
            for paragraph in doc.paragraphs
            if paragraph.text.strip()
        ]

        # NOTE(review): python-docx may report a merged cell once per spanned
        # column, which would repeat its text in the row - confirm whether
        # that matters for callers.
        for table in doc.tables:
            for row in table.rows:
                row_texts = [
                    cell.text.strip() for cell in row.cells if cell.text.strip()
                ]
                if row_texts:
                    text_parts.append(' | '.join(row_texts))

        return '\n'.join(text_parts)

    def _convert_pdf(self, file_path: str) -> str:
        """Convert a PDF to text.

        Each page that yields text is introduced by a ``=== Strona N ===``
        header; pages without extractable text are skipped.
        """
        text_parts: List[str] = []

        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            for page_number, page in enumerate(reader.pages, 1):
                # Defensive: treat a missing extraction result as empty text
                # instead of failing on ``None.strip()``.
                text = page.extract_text() or ''
                if text.strip():
                    text_parts.append(f"\n=== Strona {page_number} ===\n")
                    text_parts.append(text)

        return '\n'.join(text_parts)

    def _read_text_file(self, file_path: str) -> str:
        """Read a UTF-8 text file and return its contents.

        A leading byte-order mark, if present, is removed rather than being
        returned as part of the text.
        """
        # 'utf-8-sig' decodes plain UTF-8 identically and strips an optional BOM.
        with open(file_path, 'r', encoding='utf-8-sig') as file:
            return file.read()