# Azure_Powered_AI_Summary / file_processors.py
# Provenance: uploaded by Chirapath ("Upload 9 files", commit 8418b54, verified).
import os
import json
import tempfile
import subprocess
from typing import Optional, Dict, Any, List
import PyPDF2
import docx
from openpyxl import load_workbook
import pandas as pd
from pptx import Presentation
from PIL import Image
import zipfile
import csv
import chardet
class FileProcessor:
    """Enhanced file processor for various document types with improved error handling"""

    def __init__(self) -> None:
        # Dispatch table: lowercase extension -> bound extraction method.
        # process_file() looks handlers up here, so supporting a new format
        # means adding one entry plus its _process_* method.
        self.supported_extensions = {
            'pdf': self._process_pdf,
            'docx': self._process_docx,
            'doc': self._process_doc,
            'pptx': self._process_pptx,
            'ppt': self._process_ppt,
            'xlsx': self._process_xlsx,
            'xls': self._process_xls,
            'csv': self._process_csv,
            'txt': self._process_txt,
            'json': self._process_json,
            'rtf': self._process_rtf,
            'odt': self._process_odt,
            'ods': self._process_ods,
            'odp': self._process_odp
        }
        # Enhanced processing limits
        self.max_file_size = 100 * 1024 * 1024  # 100MB; checked up front in process_file()
        self.max_text_length = 1000000  # 1M characters; longer extracted text is truncated
        self.max_pages_pdf = 500  # pages read from a PDF before truncating
        self.max_sheets_excel = 50  # sheets read per workbook before truncating
def process_file(self, file_path: str, extension: str = None) -> Optional[str]:
"""Process a file and extract its text content with enhanced error handling"""
try:
if not os.path.exists(file_path):
print(f"File not found: {file_path}")
return None
# Check file size
file_size = os.path.getsize(file_path)
if file_size > self.max_file_size:
print(f"File too large: {file_size} bytes (max: {self.max_file_size})")
return f"File too large for processing: {file_size / (1024*1024):.1f}MB"
# Determine extension if not provided
if not extension:
extension = file_path.split('.')[-1].lower() if '.' in file_path else ''
extension = extension.lower().strip('.')
if extension not in self.supported_extensions:
print(f"Unsupported file extension: {extension}")
return f"Unsupported file type: .{extension}"
# Process file based on extension
processor = self.supported_extensions[extension]
content = processor(file_path)
if content:
# Truncate if too long
if len(content) > self.max_text_length:
content = content[:self.max_text_length] + "\n[Content truncated due to length limit]"
print(f"Successfully processed {extension.upper()} file: {os.path.basename(file_path)}")
return content
else:
print(f"No content extracted from: {os.path.basename(file_path)}")
return f"Could not extract content from {extension.upper()} file"
except Exception as e:
print(f"Error processing file {file_path}: {e}")
return f"Error processing file: {str(e)}"
def _process_pdf(self, file_path: str) -> Optional[str]:
"""Extract text from PDF files with enhanced handling"""
try:
text_content = []
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
num_pages = len(pdf_reader.pages)
if num_pages > self.max_pages_pdf:
print(f"PDF too long ({num_pages} pages), processing first {self.max_pages_pdf}")
num_pages = self.max_pages_pdf
for page_num, page in enumerate(pdf_reader.pages[:num_pages]):
try:
page_text = page.extract_text()
if page_text.strip():
text_content.append(f"--- Page {page_num + 1} ---")
text_content.append(page_text)
text_content.append("")
except Exception as e:
text_content.append(f"--- Page {page_num + 1} (Error reading) ---")
print(f"Error reading PDF page {page_num + 1}: {e}")
if not text_content:
# Try alternative PDF processing with pdfplumber if available
try:
import pdfplumber
with pdfplumber.open(file_path) as pdf:
for page_num, page in enumerate(pdf.pages[:self.max_pages_pdf]):
page_text = page.extract_text()
if page_text:
text_content.append(f"--- Page {page_num + 1} ---")
text_content.append(page_text)
text_content.append("")
except ImportError:
return "PDF contains non-text content or requires advanced processing"
return "\n".join(text_content) if text_content else None
except Exception as e:
print(f"Error processing PDF: {e}")
return None
def _process_docx(self, file_path: str) -> Optional[str]:
"""Extract text from DOCX files with enhanced table handling"""
try:
doc = docx.Document(file_path)
text_content = []
# Extract paragraphs
for paragraph in doc.paragraphs:
if paragraph.text.strip():
text_content.append(paragraph.text)
# Extract tables with improved formatting
for table_num, table in enumerate(doc.tables, 1):
text_content.append(f"\n--- Table {table_num} ---")
for row_num, row in enumerate(table.rows):
row_text = []
for cell in row.cells:
cell_text = cell.text.strip().replace('\n', ' ').replace('\t', ' ')
row_text.append(cell_text)
if any(row_text): # Only add non-empty rows
text_content.append(" | ".join(row_text))
text_content.append("--- End Table ---\n")
return "\n".join(text_content) if text_content else None
except Exception as e:
print(f"Error processing DOCX: {e}")
return None
def _process_doc(self, file_path: str) -> Optional[str]:
"""Extract text from DOC files using available tools"""
try:
# Try with docx2txt first
try:
import docx2txt
text = docx2txt.process(file_path)
return text if text.strip() else None
except ImportError:
pass
# Try with antiword (if available on system)
try:
result = subprocess.run(
['antiword', file_path],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return result.stdout
except (subprocess.SubprocessError, FileNotFoundError):
pass
# Try with LibreOffice (if available)
try:
temp_dir = tempfile.mkdtemp()
result = subprocess.run([
'libreoffice', '--headless', '--convert-to', 'txt',
'--outdir', temp_dir, file_path
], capture_output=True, timeout=60)
if result.returncode == 0:
txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read()
# Clean up
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
return content
except (subprocess.SubprocessError, FileNotFoundError):
pass
return "DOC file processing requires additional tools (docx2txt, antiword, or LibreOffice)"
except Exception as e:
print(f"Error processing DOC: {e}")
return None
def _process_pptx(self, file_path: str) -> Optional[str]:
"""Extract text from PPTX files with enhanced slide handling"""
try:
presentation = Presentation(file_path)
text_content = []
for slide_num, slide in enumerate(presentation.slides, 1):
slide_text = []
slide_text.append(f"--- Slide {slide_num} ---")
# Extract text from shapes
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
# Check if it's a title or content
if hasattr(shape, 'placeholder_format') and shape.placeholder_format:
if shape.placeholder_format.type == 1: # Title
slide_text.append(f"TITLE: {shape.text}")
else:
slide_text.append(shape.text)
else:
slide_text.append(shape.text)
# Extract table content if present
for shape in slide.shapes:
if shape.has_table:
slide_text.append("TABLE:")
table = shape.table
for row in table.rows:
row_text = [cell.text.strip() for cell in row.cells]
slide_text.append(" | ".join(row_text))
# Extract notes
if slide.has_notes_slide:
notes_text = slide.notes_slide.notes_text_frame.text
if notes_text.strip():
slide_text.append(f"NOTES: {notes_text}")
if len(slide_text) > 1: # More than just the slide header
text_content.extend(slide_text)
text_content.append("")
return "\n".join(text_content) if text_content else None
except Exception as e:
print(f"Error processing PPTX: {e}")
return None
def _process_ppt(self, file_path: str) -> Optional[str]:
"""Extract text from PPT files using LibreOffice if available"""
try:
# Try with LibreOffice
try:
temp_dir = tempfile.mkdtemp()
result = subprocess.run([
'libreoffice', '--headless', '--convert-to', 'txt',
'--outdir', temp_dir, file_path
], capture_output=True, timeout=60)
if result.returncode == 0:
txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read()
# Clean up
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
return content
except (subprocess.SubprocessError, FileNotFoundError):
pass
return "PPT file processing requires LibreOffice or conversion to PPTX format"
except Exception as e:
print(f"Error processing PPT: {e}")
return None
def _process_xlsx(self, file_path: str) -> Optional[str]:
"""Extract text from XLSX files with enhanced sheet handling"""
try:
workbook = load_workbook(file_path, data_only=True)
text_content = []
sheet_count = 0
for sheet_name in workbook.sheetnames:
if sheet_count >= self.max_sheets_excel:
text_content.append(f"[Additional {len(workbook.sheetnames) - sheet_count} sheets truncated]")
break
sheet = workbook[sheet_name]
text_content.append(f"--- Sheet: {sheet_name} ---")
# Get data dimensions
max_row = min(sheet.max_row, 1000) # Limit rows
max_col = min(sheet.max_column, 100) # Limit columns
# Extract data efficiently
data = []
for row in sheet.iter_rows(min_row=1, max_row=max_row, min_col=1, max_col=max_col, values_only=True):
if any(cell is not None for cell in row):
row_data = [str(cell) if cell is not None else "" for cell in row]
# Remove trailing empty cells
while row_data and not row_data[-1]:
row_data.pop()
if row_data:
data.append(row_data)
if data:
# Format as table
for row in data[:100]: # Limit to first 100 rows per sheet
text_content.append(" | ".join(row))
else:
text_content.append("[Empty sheet]")
text_content.append("")
sheet_count += 1
return "\n".join(text_content) if text_content else None
except Exception as e:
print(f"Error processing XLSX: {e}")
return None
def _process_xls(self, file_path: str) -> Optional[str]:
"""Extract text from XLS files with enhanced error handling"""
try:
# Use pandas to read XLS files
xl_file = pd.ExcelFile(file_path)
text_content = []
sheet_count = 0
for sheet_name in xl_file.sheet_names:
if sheet_count >= self.max_sheets_excel:
text_content.append(f"[Additional {len(xl_file.sheet_names) - sheet_count} sheets truncated]")
break
text_content.append(f"--- Sheet: {sheet_name} ---")
try:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# Convert DataFrame to string representation
if not df.empty:
# Limit rows and columns for processing
limited_df = df.head(100).iloc[:, :20] # First 100 rows, 20 columns
text_content.append(limited_df.to_string(index=False))
if len(df) > 100:
text_content.append(f"[{len(df) - 100} additional rows not shown]")
else:
text_content.append("[Empty sheet]")
except Exception as e:
text_content.append(f"[Error reading sheet: {e}]")
text_content.append("")
sheet_count += 1
return "\n".join(text_content) if text_content else None
except Exception as e:
print(f"Error processing XLS: {e}")
return None
def _process_csv(self, file_path: str) -> Optional[str]:
"""Extract text from CSV files with enhanced encoding detection"""
try:
# Detect encoding
encoding = 'utf-8'
try:
with open(file_path, 'rb') as f:
raw_data = f.read(10000) # Read first 10KB
detected = chardet.detect(raw_data)
if detected['encoding'] and detected['confidence'] > 0.7:
encoding = detected['encoding']
except:
pass
# Try reading with detected encoding
try:
df = pd.read_csv(file_path, encoding=encoding)
except UnicodeDecodeError:
# Fallback encodings
for fallback_encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
try:
df = pd.read_csv(file_path, encoding=fallback_encoding)
break
except UnicodeDecodeError:
continue
else:
return "Could not decode CSV file with any encoding"
text_content = []
text_content.append("--- CSV Data ---")
text_content.append(f"Columns ({len(df.columns)}): {', '.join(df.columns.astype(str).tolist())}")
text_content.append(f"Total rows: {len(df)}")
text_content.append("")
# Show first 100 rows and limit columns if too many
limited_df = df.head(100)
if len(df.columns) > 20:
limited_df = limited_df.iloc[:, :20]
text_content.append(f"[Showing first 20 of {len(df.columns)} columns]")
text_content.append(limited_df.to_string(index=False))
if len(df) > 100:
text_content.append(f"\n[{len(df) - 100} additional rows not shown]")
return "\n".join(text_content)
except Exception as e:
print(f"Error processing CSV: {e}")
return None
def _process_txt(self, file_path: str) -> Optional[str]:
"""Extract text from TXT files with encoding detection"""
try:
# Detect encoding
encoding = 'utf-8'
try:
with open(file_path, 'rb') as f:
raw_data = f.read()
detected = chardet.detect(raw_data)
if detected['encoding'] and detected['confidence'] > 0.7:
encoding = detected['encoding']
except:
pass
# Try reading with detected encoding first
encodings_to_try = [encoding, 'utf-8', 'utf-16', 'latin-1', 'cp1252']
for enc in encodings_to_try:
try:
with open(file_path, 'r', encoding=enc) as file:
content = file.read()
return content if content.strip() else None
except UnicodeDecodeError:
continue
# If all encodings fail, try with error handling
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
return file.read()
except Exception as e:
print(f"Error processing TXT: {e}")
return None
def _process_json(self, file_path: str) -> Optional[str]:
"""Extract text from JSON files with pretty formatting"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
# Convert JSON to readable text format
if isinstance(data, dict):
text_content = ["--- JSON Object ---"]
text_content.append(json.dumps(data, indent=2, ensure_ascii=False)[:50000]) # Limit size
elif isinstance(data, list):
text_content = ["--- JSON Array ---"]
text_content.append(f"Array with {len(data)} items:")
sample_items = min(10, len(data))
text_content.append(json.dumps(data[:sample_items], indent=2, ensure_ascii=False))
if len(data) > sample_items:
text_content.append(f"... and {len(data) - sample_items} more items")
else:
text_content = [str(data)]
return "\n".join(text_content)
except Exception as e:
print(f"Error processing JSON: {e}")
return None
def _process_rtf(self, file_path: str) -> Optional[str]:
"""Extract text from RTF files"""
try:
# Try with striprtf if available
try:
from striprtf.striprtf import rtf_to_text
with open(file_path, 'r', encoding='utf-8') as file:
rtf_content = file.read()
return rtf_to_text(rtf_content)
except ImportError:
pass
# Try with LibreOffice
try:
temp_dir = tempfile.mkdtemp()
result = subprocess.run([
'libreoffice', '--headless', '--convert-to', 'txt',
'--outdir', temp_dir, file_path
], capture_output=True, timeout=60)
if result.returncode == 0:
txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read()
# Clean up
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
return content
except (subprocess.SubprocessError, FileNotFoundError):
pass
return "RTF file processing requires striprtf package or LibreOffice"
except Exception as e:
print(f"Error processing RTF: {e}")
return None
def _process_odt(self, file_path: str) -> Optional[str]:
"""Extract text from ODT files using LibreOffice or zip extraction"""
try:
# Try with LibreOffice first
try:
temp_dir = tempfile.mkdtemp()
result = subprocess.run([
'libreoffice', '--headless', '--convert-to', 'txt',
'--outdir', temp_dir, file_path
], capture_output=True, timeout=60)
if result.returncode == 0:
txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read()
# Clean up
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
return content
except (subprocess.SubprocessError, FileNotFoundError):
pass
# Try manual XML extraction
try:
with zipfile.ZipFile(file_path, 'r') as zip_file:
if 'content.xml' in zip_file.namelist():
content_xml = zip_file.read('content.xml').decode('utf-8')
# Basic XML tag removal (not perfect but functional)
import re
text = re.sub(r'<[^>]+>', ' ', content_xml)
text = re.sub(r'\s+', ' ', text)
return text.strip() if text.strip() else None
except Exception:
pass
return "ODT file processing requires LibreOffice"
except Exception as e:
print(f"Error processing ODT: {e}")
return None
def _process_ods(self, file_path: str) -> Optional[str]:
"""Extract text from ODS files"""
try:
# Try with LibreOffice
try:
temp_dir = tempfile.mkdtemp()
result = subprocess.run([
'libreoffice', '--headless', '--convert-to', 'csv',
'--outdir', temp_dir, file_path
], capture_output=True, timeout=60)
if result.returncode == 0:
csv_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.csv')
if os.path.exists(csv_file):
content = self._process_csv(csv_file)
# Clean up
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
return content
except (subprocess.SubprocessError, FileNotFoundError):
pass
return "ODS file processing requires LibreOffice"
except Exception as e:
print(f"Error processing ODS: {e}")
return None
def _process_odp(self, file_path: str) -> Optional[str]:
"""Extract text from ODP files"""
try:
# Try with LibreOffice
try:
temp_dir = tempfile.mkdtemp()
result = subprocess.run([
'libreoffice', '--headless', '--convert-to', 'txt',
'--outdir', temp_dir, file_path
], capture_output=True, timeout=60)
if result.returncode == 0:
txt_file = os.path.join(temp_dir, os.path.splitext(os.path.basename(file_path))[0] + '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read()
# Clean up
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
return content
except (subprocess.SubprocessError, FileNotFoundError):
pass
return "ODP file processing requires LibreOffice"
except Exception as e:
print(f"Error processing ODP: {e}")
return None
def get_file_info(self, file_path: str) -> Dict[str, Any]:
"""Get comprehensive information about a file"""
try:
stat = os.stat(file_path)
extension = file_path.split('.')[-1].lower() if '.' in file_path else ''
return {
'filename': os.path.basename(file_path),
'size': stat.st_size,
'size_mb': round(stat.st_size / (1024 * 1024), 2),
'size_human': self._format_file_size(stat.st_size),
'extension': extension,
'supported': extension in self.supported_extensions,
'modified': stat.st_mtime,
'type': self._get_file_type(extension),
'processing_complexity': self._get_processing_complexity(extension, stat.st_size)
}
except Exception as e:
return {
'filename': os.path.basename(file_path) if file_path else 'unknown',
'error': str(e),
'supported': False,
'type': 'unknown'
}
def _format_file_size(self, size_bytes: int) -> str:
"""Format file size in human readable format"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
def _get_file_type(self, extension: str) -> str:
"""Get file type category"""
document_types = {'pdf', 'docx', 'doc', 'txt', 'rtf', 'odt'}
spreadsheet_types = {'xlsx', 'xls', 'csv', 'ods'}
presentation_types = {'pptx', 'ppt', 'odp'}
data_types = {'json', 'xml'}
if extension in document_types:
return 'document'
elif extension in spreadsheet_types:
return 'spreadsheet'
elif extension in presentation_types:
return 'presentation'
elif extension in data_types:
return 'data'
else:
return 'unknown'
def _get_processing_complexity(self, extension: str, file_size: int) -> str:
"""Estimate processing complexity"""
if extension in ['txt', 'csv', 'json']:
return 'low'
elif extension in ['docx', 'xlsx', 'pptx'] and file_size < 10 * 1024 * 1024:
return 'medium'
elif extension in ['pdf', 'doc', 'xls', 'ppt'] or file_size > 10 * 1024 * 1024:
return 'high'
else:
return 'medium'
def batch_process_files(self, file_paths: List[str]) -> Dict[str, Any]:
"""Process multiple files and return comprehensive results"""
results = {
'successful': [],
'failed': [],
'combined_content': [],
'total_files': len(file_paths),
'total_size': 0,
'processing_time': 0,
'file_types': {}
}
import time
start_time = time.time()
for file_path in file_paths:
try:
file_info = self.get_file_info(file_path)
results['total_size'] += file_info.get('size', 0)
file_type = file_info.get('type', 'unknown')
results['file_types'][file_type] = results['file_types'].get(file_type, 0) + 1
if file_info.get('supported', False):
content = self.process_file(file_path)
if content:
results['successful'].append({
'filename': file_info['filename'],
'content': content,
'size_mb': file_info['size_mb'],
'type': file_type,
'complexity': file_info.get('processing_complexity', 'unknown')
})
results['combined_content'].append(f"=== {file_info['filename']} ===")
results['combined_content'].append(content)
results['combined_content'].append("")
else:
results['failed'].append({
'filename': file_info['filename'],
'reason': 'No content extracted',
'type': file_type
})
else:
results['failed'].append({
'filename': file_info['filename'],
'reason': 'Unsupported file type',
'type': file_type
})
except Exception as e:
results['failed'].append({
'filename': os.path.basename(file_path) if file_path else 'unknown',
'reason': str(e),
'type': 'unknown'
})
results['processing_time'] = time.time() - start_time
results['combined_text'] = "\n".join(results['combined_content'])
results['success_rate'] = len(results['successful']) / len(file_paths) if file_paths else 0
results['total_size_mb'] = results['total_size'] / (1024 * 1024)
return results
def validate_file(self, file_path: str) -> Dict[str, Any]:
"""Validate a file before processing"""
validation_result = {
'valid': False,
'errors': [],
'warnings': [],
'info': {}
}
try:
if not os.path.exists(file_path):
validation_result['errors'].append("File does not exist")
return validation_result
file_info = self.get_file_info(file_path)
validation_result['info'] = file_info
# Check file size
if file_info['size'] > self.max_file_size:
validation_result['errors'].append(f"File too large: {file_info['size_human']} (max: {self._format_file_size(self.max_file_size)})")
if file_info['size'] == 0:
validation_result['errors'].append("File is empty")
# Check file type
if not file_info['supported']:
validation_result['errors'].append(f"Unsupported file type: .{file_info['extension']}")
# Check processing complexity
complexity = file_info.get('processing_complexity', 'unknown')
if complexity == 'high':
validation_result['warnings'].append("File may require significant processing time")
# Additional format-specific validation
extension = file_info['extension']
if extension == 'pdf' and file_info['size'] > 50 * 1024 * 1024:
validation_result['warnings'].append("Large PDF files may have incomplete text extraction")
validation_result['valid'] = len(validation_result['errors']) == 0
except Exception as e:
validation_result['errors'].append(f"Validation error: {str(e)}")
return validation_result