| from docx import Document
|
| import pdfplumber
|
| import re
|
| from typing import Tuple
|
| import os
|
|
|
def extract_docx(file_obj) -> str:
    """
    Extract text from DOCX with enhanced error handling and formatting preservation.

    Pulls every non-empty paragraph, then flattens each table row into a
    single " | "-joined line, and finally normalizes the combined text via
    clean_extracted_text(). On failure, logs and returns an error string
    instead of raising.

    Args:
        file_obj: Path or file-like object accepted by docx.Document.

    Returns:
        The cleaned extracted text, or an "Error extracting DOCX: ..." string.
    """
    try:
        document = Document(file_obj)

        # Body paragraphs first, skipping whitespace-only ones.
        chunks = [p.text.strip() for p in document.paragraphs if p.text.strip()]

        # Then tables: one pipe-separated line per row with content.
        for tbl in document.tables:
            for tbl_row in tbl.rows:
                cells = [c.text.strip() for c in tbl_row.cells if c.text.strip()]
                if cells:
                    chunks.append(" | ".join(cells))

        combined = "\n\n".join(chunks)
        return clean_extracted_text(combined)

    except Exception as e:
        message = f"[DOCX Extraction Error] {str(e)}"
        print(message)
        return f"Error extracting DOCX: {str(e)}"
|
|
|
|
|
def extract_pdf(file_obj) -> str:
    """
    Extract text from PDF with multiple strategies and enhanced error handling.

    For each page, tries pdfplumber's default extraction, then layout mode,
    then tighter x/y tolerances, accepting the first pass that yields a
    reasonable amount of text. Per-page failures are logged and skipped so
    one bad page does not sink the whole document.

    Args:
        file_obj: Path or file-like object accepted by pdfplumber.open.

    Returns:
        Cleaned text with "--- Page N ---" markers, a "[PDF Error] ..."
        string when no page yields text, or "Error extracting PDF: ..."
        on a document-level failure.
    """
    try:
        pages_out = []

        with pdfplumber.open(file_obj) as pdf:
            ok_count = 0
            page_total = len(pdf.pages)

            for idx, pdf_page in enumerate(pdf.pages, 1):
                try:
                    text = pdf_page.extract_text()

                    # Under ~50 chars looks like a failed pass; retry in layout mode.
                    if not text or len(text.strip()) < 50:
                        text = pdf_page.extract_text(layout=True)

                    # Last resort: tighter character-grouping tolerances.
                    if not text or len(text.strip()) < 50:
                        text = pdf_page.extract_text(x_tolerance=2, y_tolerance=2)

                    if text and text.strip():
                        pages_out.append(f"--- Page {idx} ---\n{text.strip()}")
                        ok_count += 1
                    else:
                        print(f"[PDF Warning] Page {idx} yielded no text")

                except Exception as page_error:
                    # Skip the broken page; keep extracting the rest.
                    print(f"[PDF Warning] Error on page {idx}: {page_error}")
                    continue

            if ok_count == 0:
                return "[PDF Error] No text could be extracted from any page. The PDF may be image-based or corrupted."

            if ok_count < page_total * 0.5:
                print(f"[PDF Warning] Only {ok_count}/{page_total} pages extracted successfully")

        return clean_extracted_text("\n\n".join(pages_out))

    except Exception as e:
        error_text = f"[PDF Extraction Error] {str(e)}"
        print(error_text)
        return f"Error extracting PDF: {str(e)}"
|
|
|
|
|
def clean_extracted_text(text: str) -> str:
    """
    Clean up common issues in extracted text.

    Collapses excess blank lines and repeated spaces, removes bare
    page-number and "Page X of Y" footer lines, normalizes typographic
    punctuation (curly quotes, dashes) to ASCII, and strips zero-width
    characters left over by PDF/DOCX extraction.

    Args:
        text: Raw extracted text.

    Returns:
        The cleaned text, stripped of leading/trailing whitespace.
    """
    # Collapse runs of 3+ newlines and 2+ spaces.
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r' {2,}', ' ', text)

    # Remove lines that are only a bare page number.
    text = re.sub(r'^\s*\d+\s*$', '', text, flags=re.MULTILINE)

    # Remove "Page X of Y" and "X/Y" footer lines.
    text = re.sub(r'^\s*Page \d+ of \d+\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^\s*\d+/\d+\s*$', '', text, flags=re.MULTILINE)

    # Normalize typographic punctuation to plain ASCII in one pass.
    # BUG FIX: the previous version called text.replace('', "'"), which in
    # Python inserts an apostrophe between EVERY character of the text --
    # the curly-quote source characters had been lost from the literals.
    # Explicit Unicode escapes make the intent unambiguous.
    text = text.translate(str.maketrans({
        '\u2018': "'",   # left single quotation mark
        '\u2019': "'",   # right single quotation mark
        '\u201c': '"',   # left double quotation mark
        '\u201d': '"',   # right double quotation mark
        '\u2013': '-',   # en dash
        '\u2014': '-',   # em dash
    }))

    # Drop zero-width space/joiners and BOM characters.
    text = re.sub(r'[\u200b\u200c\u200d\ufeff]', '', text)

    return text.strip()
|
|
|
|
|
def validate_extraction(text: str, filename: str) -> Tuple[bool, str]:
    """
    Validate extracted text quality.

    Applies a series of cheap sanity checks: non-empty, minimum character
    count, no extraction-error prefix, and minimum word count.

    Args:
        text: The extracted text to validate.
        filename: Name of the source file (accepted for caller context;
            not used by the checks themselves).

    Returns:
        A (is_valid, message) tuple; the message explains the failure or
        summarizes the word/character counts on success.
    """
    if not text or not text.strip():
        return False, "No text extracted"

    char_count = len(text)
    if char_count < 100:
        return False, f"Extracted text too short ({char_count} characters)"

    # Error strings from the extractors start with "Error" or "[".
    if text.startswith(("Error", "[")):
        return False, "Extraction error detected"

    word_count = len(text.split())
    if word_count < 50:
        return False, f"Too few words ({word_count})"

    return True, f"Valid extraction: {word_count} words, {char_count} characters"
|
|
|
|
|
def detect_file_encoding(file_path: str) -> str:
    """
    Detect file encoding for text files.

    Uses the optional ``chardet`` package when available. Falls back to
    'utf-8' when chardet is not installed, the file cannot be read, or
    detection is inconclusive.

    Args:
        file_path: Path to the file to inspect.

    Returns:
        The detected encoding name, or 'utf-8' as a safe default.
    """
    try:
        import chardet
        with open(file_path, 'rb') as f:
            raw_data = f.read()
        result = chardet.detect(raw_data)
        # BUG FIX: chardet returns {'encoding': None, ...} for empty or
        # undecidable input; the old code could return None despite the
        # declared -> str return type.
        return result['encoding'] or 'utf-8'
    except (ImportError, OSError):
        # BUG FIX: was a bare `except:`, which also swallowed
        # KeyboardInterrupt and SystemExit.
        return 'utf-8'
|
|
|
|
|
def extract_text_file(file_obj) -> str:
    """
    Extract from plain text file with encoding detection.

    Tries UTF-8 first, then cp1252, then latin-1. On any other failure,
    returns an error string instead of raising.

    Args:
        file_obj: Binary file-like object (read() returns bytes).

    Returns:
        The decoded text, or an "Error reading text file: ..." string.

    Note:
        BUG FIX: the old code tried latin-1 before cp1252, but latin-1
        maps every possible byte, so it never raises and the cp1252
        branch was unreachable dead code. cp1252 is the superset that
        correctly decodes Windows smart quotes (0x93/0x94), so it must
        be attempted first. Reading once also removes the need for the
        old seek(0) retries, and the bare `except:` is now targeted.
    """
    try:
        data = file_obj.read()
        for encoding in ('utf-8', 'cp1252'):
            try:
                return data.decode(encoding)
            except UnicodeDecodeError:
                continue
        # latin-1 assigns a code point to every byte, so this cannot fail.
        return data.decode('latin-1')
    except Exception as e:
        return f"Error reading text file: {str(e)}"