import csv
import json
import os
import subprocess
import tempfile
import zipfile
from typing import Optional, Dict, Any, List

import chardet
import docx
import pandas as pd
import PyPDF2
from openpyxl import load_workbook
from PIL import Image
from pptx import Presentation


class FileProcessor:
    """Enhanced file processor for various document types with improved error handling"""

    def __init__(self):
        self.supported_extensions = {
            'pdf': self._process_pdf,
            'docx': self._process_docx,
            'doc': self._process_doc,
            'pptx': self._process_pptx,
            'ppt': self._process_ppt,
            'xlsx': self._process_xlsx,
            'xls': self._process_xls,
            'csv': self._process_csv,
            'txt': self._process_txt,
            'json': self._process_json,
            'rtf': self._process_rtf,
            'odt': self._process_odt,
            'ods': self._process_ods,
            'odp': self._process_odp,
        }

        # Processing limits
        self.max_file_size = 100 * 1024 * 1024  # 100 MB
        self.max_text_length = 1000000          # characters
        self.max_pages_pdf = 500
        self.max_sheets_excel = 50

    def process_file(self, file_path: str, extension: Optional[str] = None) -> Optional[str]:
        """Process a file and extract its text content with enhanced error handling"""
        try:
            if not os.path.exists(file_path):
                print(f"File not found: {file_path}")
                return None

            file_size = os.path.getsize(file_path)
            if file_size > self.max_file_size:
                print(f"File too large: {file_size} bytes (max: {self.max_file_size})")
                return f"File too large for processing: {file_size / (1024*1024):.1f}MB"

            if not extension:
                extension = file_path.split('.')[-1].lower() if '.' in file_path else ''

            extension = extension.lower().strip('.')

            if extension not in self.supported_extensions:
                print(f"Unsupported file extension: {extension}")
                return f"Unsupported file type: .{extension}"

            processor = self.supported_extensions[extension]
            content = processor(file_path)

            if content:
                if len(content) > self.max_text_length:
                    content = content[:self.max_text_length] + "\n[Content truncated due to length limit]"

                print(f"Successfully processed {extension.upper()} file: {os.path.basename(file_path)}")
                return content
            else:
                print(f"No content extracted from: {os.path.basename(file_path)}")
                return f"Could not extract content from {extension.upper()} file"

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")
            return f"Error processing file: {str(e)}"

    def _process_pdf(self, file_path: str) -> Optional[str]:
        """Extract text from PDF files with enhanced handling"""
        try:
            text_content = []

            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                num_pages = len(pdf_reader.pages)

                if num_pages > self.max_pages_pdf:
                    print(f"PDF too long ({num_pages} pages), processing first {self.max_pages_pdf}")
                    num_pages = self.max_pages_pdf

                for page_num, page in enumerate(pdf_reader.pages[:num_pages]):
                    try:
                        page_text = page.extract_text()
                        if page_text and page_text.strip():
                            text_content.append(f"--- Page {page_num + 1} ---")
                            text_content.append(page_text)
                            text_content.append("")
                    except Exception as e:
                        text_content.append(f"--- Page {page_num + 1} (Error reading) ---")
                        print(f"Error reading PDF page {page_num + 1}: {e}")

            if not text_content:
                # Fallback: try pdfplumber if it is installed
                try:
                    import pdfplumber
                    with pdfplumber.open(file_path) as pdf:
                        for page_num, page in enumerate(pdf.pages[:self.max_pages_pdf]):
                            page_text = page.extract_text()
                            if page_text:
                                text_content.append(f"--- Page {page_num + 1} ---")
                                text_content.append(page_text)
                                text_content.append("")
                except ImportError:
                    return "PDF contains non-text content or requires advanced processing"

            return "\n".join(text_content) if text_content else None

        except Exception as e:
            print(f"Error processing PDF: {e}")
            return None

    def _process_docx(self, file_path: str) -> Optional[str]:
        """Extract text from DOCX files with enhanced table handling"""
        try:
            doc = docx.Document(file_path)
            text_content = []

            # Paragraph text
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text_content.append(paragraph.text)

            # Table text
            for table_num, table in enumerate(doc.tables, 1):
                text_content.append(f"\n--- Table {table_num} ---")
                for row in table.rows:
                    row_text = []
                    for cell in row.cells:
                        cell_text = cell.text.strip().replace('\n', ' ').replace('\t', ' ')
                        row_text.append(cell_text)
                    if any(row_text):
                        text_content.append(" | ".join(row_text))
                text_content.append("--- End Table ---\n")

            return "\n".join(text_content) if text_content else None

        except Exception as e:
            print(f"Error processing DOCX: {e}")
            return None

    def _process_doc(self, file_path: str) -> Optional[str]:
        """Extract text from DOC files using available tools"""
        try:
            # Strategy 1: docx2txt, if installed
            try:
                import docx2txt
                text = docx2txt.process(file_path)
                return text if text.strip() else None
            except ImportError:
                pass

            # Strategy 2: antiword, if available on PATH
            try:
                result = subprocess.run(
                    ['antiword', file_path],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if result.returncode == 0:
                    return result.stdout
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            # Strategy 3: headless LibreOffice conversion to plain text
            try:
                temp_dir = tempfile.mkdtemp()
                result = subprocess.run([
                    'libreoffice', '--headless', '--convert-to', 'txt',
                    '--outdir', temp_dir, file_path
                ], capture_output=True, timeout=60)

                if result.returncode == 0:
                    txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
                    if os.path.exists(txt_file):
                        with open(txt_file, 'r', encoding='utf-8') as f:
                            content = f.read()

                        import shutil
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return content
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            return "DOC file processing requires additional tools (docx2txt, antiword, or LibreOffice)"

        except Exception as e:
            print(f"Error processing DOC: {e}")
            return None

    def _process_pptx(self, file_path: str) -> Optional[str]:
        """Extract text from PPTX files with enhanced slide handling"""
        try:
            presentation = Presentation(file_path)
            text_content = []

            for slide_num, slide in enumerate(presentation.slides, 1):
                slide_text = []
                slide_text.append(f"--- Slide {slide_num} ---")

                # Text from shapes, flagging title placeholders
                for shape in slide.shapes:
                    if hasattr(shape, "text") and shape.text.strip():
                        if hasattr(shape, 'placeholder_format') and shape.placeholder_format:
                            if shape.placeholder_format.type == 1:
                                slide_text.append(f"TITLE: {shape.text}")
                            else:
                                slide_text.append(shape.text)
                        else:
                            slide_text.append(shape.text)

                # Tables on the slide (not every shape type exposes has_table)
                for shape in slide.shapes:
                    if getattr(shape, "has_table", False):
                        slide_text.append("TABLE:")
                        table = shape.table
                        for row in table.rows:
                            row_text = [cell.text.strip() for cell in row.cells]
                            slide_text.append(" | ".join(row_text))

                # Speaker notes
                if slide.has_notes_slide:
                    notes_text = slide.notes_slide.notes_text_frame.text
                    if notes_text.strip():
                        slide_text.append(f"NOTES: {notes_text}")

                if len(slide_text) > 1:
                    text_content.extend(slide_text)
                    text_content.append("")

            return "\n".join(text_content) if text_content else None

        except Exception as e:
            print(f"Error processing PPTX: {e}")
            return None

    def _process_ppt(self, file_path: str) -> Optional[str]:
        """Extract text from PPT files using LibreOffice if available"""
        try:
            try:
                temp_dir = tempfile.mkdtemp()
                result = subprocess.run([
                    'libreoffice', '--headless', '--convert-to', 'txt',
                    '--outdir', temp_dir, file_path
                ], capture_output=True, timeout=60)

                if result.returncode == 0:
                    txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
                    if os.path.exists(txt_file):
                        with open(txt_file, 'r', encoding='utf-8') as f:
                            content = f.read()

                        import shutil
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return content
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            return "PPT file processing requires LibreOffice or conversion to PPTX format"

        except Exception as e:
            print(f"Error processing PPT: {e}")
            return None

    def _process_xlsx(self, file_path: str) -> Optional[str]:
        """Extract text from XLSX files with enhanced sheet handling"""
        try:
            workbook = load_workbook(file_path, data_only=True)
            text_content = []

            sheet_count = 0
            for sheet_name in workbook.sheetnames:
                if sheet_count >= self.max_sheets_excel:
                    text_content.append(f"[Additional {len(workbook.sheetnames) - sheet_count} sheets truncated]")
                    break

                sheet = workbook[sheet_name]
                text_content.append(f"--- Sheet: {sheet_name} ---")

                # Cap the scanned range so very large sheets stay manageable
                max_row = min(sheet.max_row, 1000)
                max_col = min(sheet.max_column, 100)

                data = []
                for row in sheet.iter_rows(min_row=1, max_row=max_row, min_col=1, max_col=max_col, values_only=True):
                    if any(cell is not None for cell in row):
                        row_data = [str(cell) if cell is not None else "" for cell in row]
                        # Drop trailing empty cells
                        while row_data and not row_data[-1]:
                            row_data.pop()
                        if row_data:
                            data.append(row_data)

                if data:
                    for row in data[:100]:
                        text_content.append(" | ".join(row))
                else:
                    text_content.append("[Empty sheet]")

                text_content.append("")
                sheet_count += 1

            return "\n".join(text_content) if text_content else None

        except Exception as e:
            print(f"Error processing XLSX: {e}")
            return None

    def _process_xls(self, file_path: str) -> Optional[str]:
        """Extract text from XLS files with enhanced error handling"""
        try:
            xl_file = pd.ExcelFile(file_path)
            text_content = []

            sheet_count = 0
            for sheet_name in xl_file.sheet_names:
                if sheet_count >= self.max_sheets_excel:
                    text_content.append(f"[Additional {len(xl_file.sheet_names) - sheet_count} sheets truncated]")
                    break

                text_content.append(f"--- Sheet: {sheet_name} ---")

                try:
                    df = pd.read_excel(file_path, sheet_name=sheet_name)

                    if not df.empty:
                        limited_df = df.head(100).iloc[:, :20]
                        text_content.append(limited_df.to_string(index=False))

                        if len(df) > 100:
                            text_content.append(f"[{len(df) - 100} additional rows not shown]")
                    else:
                        text_content.append("[Empty sheet]")

                except Exception as e:
                    text_content.append(f"[Error reading sheet: {e}]")

                text_content.append("")
                sheet_count += 1

            return "\n".join(text_content) if text_content else None

        except Exception as e:
            print(f"Error processing XLS: {e}")
            return None

    def _process_csv(self, file_path: str) -> Optional[str]:
        """Extract text from CSV files with enhanced encoding detection"""
        try:
            # Detect encoding from the first 10 KB of the file
            encoding = 'utf-8'
            try:
                with open(file_path, 'rb') as f:
                    raw_data = f.read(10000)
                    detected = chardet.detect(raw_data)
                    if detected['encoding'] and detected['confidence'] > 0.7:
                        encoding = detected['encoding']
            except Exception:
                pass

            try:
                df = pd.read_csv(file_path, encoding=encoding)
            except UnicodeDecodeError:
                # Fall back to common single-byte encodings
                for fallback_encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
                    try:
                        df = pd.read_csv(file_path, encoding=fallback_encoding)
                        break
                    except UnicodeDecodeError:
                        continue
                else:
                    return "Could not decode CSV file with any encoding"

            text_content = []
            text_content.append("--- CSV Data ---")
            text_content.append(f"Columns ({len(df.columns)}): {', '.join(df.columns.astype(str).tolist())}")
            text_content.append(f"Total rows: {len(df)}")
            text_content.append("")

            limited_df = df.head(100)
            if len(df.columns) > 20:
                limited_df = limited_df.iloc[:, :20]
                text_content.append(f"[Showing first 20 of {len(df.columns)} columns]")

            text_content.append(limited_df.to_string(index=False))

            if len(df) > 100:
                text_content.append(f"\n[{len(df) - 100} additional rows not shown]")

            return "\n".join(text_content)

        except Exception as e:
            print(f"Error processing CSV: {e}")
            return None

    def _process_txt(self, file_path: str) -> Optional[str]:
        """Extract text from TXT files with encoding detection"""
        try:
            # Detect encoding with chardet, falling back to utf-8
            encoding = 'utf-8'
            try:
                with open(file_path, 'rb') as f:
                    raw_data = f.read()
                    detected = chardet.detect(raw_data)
                    if detected['encoding'] and detected['confidence'] > 0.7:
                        encoding = detected['encoding']
            except Exception:
                pass

            encodings_to_try = [encoding, 'utf-8', 'utf-16', 'latin-1', 'cp1252']

            for enc in encodings_to_try:
                try:
                    with open(file_path, 'r', encoding=enc) as file:
                        content = file.read()
                        return content if content.strip() else None
                except UnicodeDecodeError:
                    continue

            # Last resort: decode with replacement characters
            with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
                return file.read()

        except Exception as e:
            print(f"Error processing TXT: {e}")
            return None

    def _process_json(self, file_path: str) -> Optional[str]:
        """Extract text from JSON files with pretty formatting"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                data = json.load(file)

            if isinstance(data, dict):
                text_content = ["--- JSON Object ---"]
                text_content.append(json.dumps(data, indent=2, ensure_ascii=False)[:50000])
            elif isinstance(data, list):
                text_content = ["--- JSON Array ---"]
                text_content.append(f"Array with {len(data)} items:")
                sample_items = min(10, len(data))
                text_content.append(json.dumps(data[:sample_items], indent=2, ensure_ascii=False))
                if len(data) > sample_items:
                    text_content.append(f"... and {len(data) - sample_items} more items")
            else:
                text_content = [str(data)]

            return "\n".join(text_content)

        except Exception as e:
            print(f"Error processing JSON: {e}")
            return None

    def _process_rtf(self, file_path: str) -> Optional[str]:
        """Extract text from RTF files"""
        try:
            try:
                from striprtf.striprtf import rtf_to_text
                with open(file_path, 'r', encoding='utf-8') as file:
                    rtf_content = file.read()
                return rtf_to_text(rtf_content)
            except ImportError:
                pass

            try:
                temp_dir = tempfile.mkdtemp()
                result = subprocess.run([
                    'libreoffice', '--headless', '--convert-to', 'txt',
                    '--outdir', temp_dir, file_path
                ], capture_output=True, timeout=60)

                if result.returncode == 0:
                    txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
                    if os.path.exists(txt_file):
                        with open(txt_file, 'r', encoding='utf-8') as f:
                            content = f.read()

                        import shutil
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return content
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            return "RTF file processing requires striprtf package or LibreOffice"

        except Exception as e:
            print(f"Error processing RTF: {e}")
            return None

    def _process_odt(self, file_path: str) -> Optional[str]:
        """Extract text from ODT files using LibreOffice or zip extraction"""
        try:
            try:
                temp_dir = tempfile.mkdtemp()
                result = subprocess.run([
                    'libreoffice', '--headless', '--convert-to', 'txt',
                    '--outdir', temp_dir, file_path
                ], capture_output=True, timeout=60)

                if result.returncode == 0:
                    txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
                    if os.path.exists(txt_file):
                        with open(txt_file, 'r', encoding='utf-8') as f:
                            content = f.read()

                        import shutil
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return content
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            # Fallback: read content.xml directly from the ODT zip archive
            try:
                with zipfile.ZipFile(file_path, 'r') as zip_file:
                    if 'content.xml' in zip_file.namelist():
                        content_xml = zip_file.read('content.xml').decode('utf-8')

                        import re
                        text = re.sub(r'<[^>]+>', ' ', content_xml)
                        text = re.sub(r'\s+', ' ', text)
                        return text.strip() if text.strip() else None
            except Exception:
                pass

            return "ODT file processing requires LibreOffice"

        except Exception as e:
            print(f"Error processing ODT: {e}")
            return None

    def _process_ods(self, file_path: str) -> Optional[str]:
        """Extract text from ODS files"""
        try:
            try:
                temp_dir = tempfile.mkdtemp()
                result = subprocess.run([
                    'libreoffice', '--headless', '--convert-to', 'csv',
                    '--outdir', temp_dir, file_path
                ], capture_output=True, timeout=60)

                if result.returncode == 0:
                    csv_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.csv')
                    if os.path.exists(csv_file):
                        content = self._process_csv(csv_file)

                        import shutil
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return content
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            return "ODS file processing requires LibreOffice"

        except Exception as e:
            print(f"Error processing ODS: {e}")
            return None

    def _process_odp(self, file_path: str) -> Optional[str]:
        """Extract text from ODP files"""
        try:
            try:
                temp_dir = tempfile.mkdtemp()
                result = subprocess.run([
                    'libreoffice', '--headless', '--convert-to', 'txt',
                    '--outdir', temp_dir, file_path
                ], capture_output=True, timeout=60)

                if result.returncode == 0:
                    txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
                    if os.path.exists(txt_file):
                        with open(txt_file, 'r', encoding='utf-8') as f:
                            content = f.read()

                        import shutil
                        shutil.rmtree(temp_dir, ignore_errors=True)
                        return content
            except (subprocess.SubprocessError, FileNotFoundError):
                pass

            return "ODP file processing requires LibreOffice"

        except Exception as e:
            print(f"Error processing ODP: {e}")
            return None

    def get_file_info(self, file_path: str) -> Dict[str, Any]:
        """Get comprehensive information about a file"""
        try:
            stat = os.stat(file_path)
            extension = file_path.split('.')[-1].lower() if '.' in file_path else ''

            return {
                'filename': os.path.basename(file_path),
                'size': stat.st_size,
                'size_mb': round(stat.st_size / (1024 * 1024), 2),
                'size_human': self._format_file_size(stat.st_size),
                'extension': extension,
                'supported': extension in self.supported_extensions,
                'modified': stat.st_mtime,
                'type': self._get_file_type(extension),
                'processing_complexity': self._get_processing_complexity(extension, stat.st_size),
            }
        except Exception as e:
            return {
                'filename': os.path.basename(file_path) if file_path else 'unknown',
                'error': str(e),
                'supported': False,
                'type': 'unknown',
            }

    def _format_file_size(self, size_bytes: int) -> str:
        """Format file size in human readable format"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024.0:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024.0
        return f"{size_bytes:.1f} TB"

    def _get_file_type(self, extension: str) -> str:
        """Get file type category"""
        document_types = {'pdf', 'docx', 'doc', 'txt', 'rtf', 'odt'}
        spreadsheet_types = {'xlsx', 'xls', 'csv', 'ods'}
        presentation_types = {'pptx', 'ppt', 'odp'}
        data_types = {'json', 'xml'}

        if extension in document_types:
            return 'document'
        elif extension in spreadsheet_types:
            return 'spreadsheet'
        elif extension in presentation_types:
            return 'presentation'
        elif extension in data_types:
            return 'data'
        else:
            return 'unknown'

    def _get_processing_complexity(self, extension: str, file_size: int) -> str:
        """Estimate processing complexity"""
        if extension in ['txt', 'csv', 'json']:
            return 'low'
        elif extension in ['docx', 'xlsx', 'pptx'] and file_size < 10 * 1024 * 1024:
            return 'medium'
        elif extension in ['pdf', 'doc', 'xls', 'ppt'] or file_size > 10 * 1024 * 1024:
            return 'high'
        else:
            return 'medium'

    def batch_process_files(self, file_paths: List[str]) -> Dict[str, Any]:
        """Process multiple files and return comprehensive results"""
        results = {
            'successful': [],
            'failed': [],
            'combined_content': [],
            'total_files': len(file_paths),
            'total_size': 0,
            'processing_time': 0,
            'file_types': {},
        }

        import time
        start_time = time.time()

        for file_path in file_paths:
            try:
                file_info = self.get_file_info(file_path)
                results['total_size'] += file_info.get('size', 0)

                file_type = file_info.get('type', 'unknown')
                results['file_types'][file_type] = results['file_types'].get(file_type, 0) + 1

                if file_info.get('supported', False):
                    content = self.process_file(file_path)
                    if content:
                        results['successful'].append({
                            'filename': file_info['filename'],
                            'content': content,
                            'size_mb': file_info['size_mb'],
                            'type': file_type,
                            'complexity': file_info.get('processing_complexity', 'unknown'),
                        })
                        results['combined_content'].append(f"=== {file_info['filename']} ===")
                        results['combined_content'].append(content)
                        results['combined_content'].append("")
                    else:
                        results['failed'].append({
                            'filename': file_info['filename'],
                            'reason': 'No content extracted',
                            'type': file_type,
                        })
                else:
                    results['failed'].append({
                        'filename': file_info['filename'],
                        'reason': 'Unsupported file type',
                        'type': file_type,
                    })

            except Exception as e:
                results['failed'].append({
                    'filename': os.path.basename(file_path) if file_path else 'unknown',
                    'reason': str(e),
                    'type': 'unknown',
                })

        results['processing_time'] = time.time() - start_time
        results['combined_text'] = "\n".join(results['combined_content'])
        results['success_rate'] = len(results['successful']) / len(file_paths) if file_paths else 0
        results['total_size_mb'] = results['total_size'] / (1024 * 1024)

        return results

    def validate_file(self, file_path: str) -> Dict[str, Any]:
        """Validate a file before processing"""
        validation_result = {
            'valid': False,
            'errors': [],
            'warnings': [],
            'info': {},
        }

        try:
            if not os.path.exists(file_path):
                validation_result['errors'].append("File does not exist")
                return validation_result

            file_info = self.get_file_info(file_path)
            validation_result['info'] = file_info

            if file_info['size'] > self.max_file_size:
                validation_result['errors'].append(f"File too large: {file_info['size_human']} (max: {self._format_file_size(self.max_file_size)})")

            if file_info['size'] == 0:
                validation_result['errors'].append("File is empty")

            if not file_info['supported']:
                validation_result['errors'].append(f"Unsupported file type: .{file_info['extension']}")

            complexity = file_info.get('processing_complexity', 'unknown')
            if complexity == 'high':
                validation_result['warnings'].append("File may require significant processing time")

            extension = file_info['extension']
            if extension == 'pdf' and file_info['size'] > 50 * 1024 * 1024:
                validation_result['warnings'].append("Large PDF files may have incomplete text extraction")

            validation_result['valid'] = len(validation_result['errors']) == 0

        except Exception as e:
            validation_result['errors'].append(f"Validation error: {str(e)}")

        return validation_result
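

# --- Example usage (illustrative sketch, not part of the original module) ---
# A minimal way to drive FileProcessor from the command line: validate the file
# first, then extract its text. The default path "example.pdf" is hypothetical
# and only stands in for whatever local document you want to inspect.
if __name__ == "__main__":
    import sys

    processor = FileProcessor()
    # Use the first CLI argument if given, otherwise the hypothetical sample path.
    path = sys.argv[1] if len(sys.argv) > 1 else "example.pdf"

    report = processor.validate_file(path)
    if not report['valid']:
        print(f"Validation failed: {report['errors']}")
    else:
        extracted = processor.process_file(path)
        # Print only the first 500 characters to keep console output readable.
        print(extracted[:500] if extracted else "No content extracted")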