# services/extract_text.py
# Hosting-page header captured with this file:
#   "redhairedshanks1 - Update services/extract_text.py - a0113c0 (verified)"
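# --- Earlier version of this module, kept commented out for reference. ---
# --- The live implementation starts at the uncommented imports below.  ---
# import os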
# import logging
# import fitz # PyMuPDF
# import numpy as np
# from PIL import Image
# import cv2
# import re
# # OCR
# from paddleocr import PaddleOCR
# # Optional Mistral OCR
# try:
# from doctr.models import ocr_predictor
# from doctr.io import DocumentFile
# mistral_ocr = ocr_predictor(pretrained=True)
# use_mistral_ocr = True
# except ImportError:
# mistral_ocr = None
# use_mistral_ocr = False
# # Environment paths
# os.environ.setdefault("HOME", "/app")
# os.environ.setdefault("PADDLEOCR_HOME", "/app/.paddleocr")
# # Logging
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)
# # PaddleOCR
# ocr = PaddleOCR(use_angle_cls=True, lang='en')
# def clean_text(text):
# return re.sub(r'\s+', ' ', text).strip()
# def auto_rotate_image(pil_img):
# """Auto-rotate PIL image safely."""
# if pil_img.mode != "RGB":
# pil_img = pil_img.convert("RGB")
# img_cv = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2GRAY)
# coords = np.column_stack(np.where(img_cv > 0))
# if coords.size == 0:
# return pil_img # blank page
# angle = cv2.minAreaRect(coords)[-1]
# angle = -(90 + angle) if angle < -45 else -angle
# (h, w) = img_cv.shape[:2]
# M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
# rotated = cv2.warpAffine(img_cv, M, (w, h),
# flags=cv2.INTER_CUBIC,
# borderMode=cv2.BORDER_REPLICATE)
# return Image.fromarray(cv2.cvtColor(rotated, cv2.COLOR_GRAY2RGB))
# def extract_images_with_fitz(pdf_path, start_page=1, end_page=None):
# images = []
# try:
# doc = fitz.open(pdf_path)
# total_pages = len(doc)
# end = min(end_page or total_pages, total_pages)
# for i in range(start_page - 1, end):
# try:
# page = doc[i]
# pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
# mode = "RGBA" if pix.alpha else "RGB"
# img = Image.frombytes(mode, [pix.width, pix.height], pix.samples)
# images.append((i + 1, img))
# except Exception as e:
# logger.error(f"Error rendering page {i + 1}: {e}")
# doc.close()
# except Exception as e:
# logger.error(f"Failed to open PDF file: {e}")
# return images
# def extract_text_from_file(file, start_page=None, end_page=None, filename=None):
# ext = os.path.splitext(filename or "")[-1].lower()
# result = []
# if ext == ".pdf":
# try:
# doc = fitz.open(file.name)
# except Exception as e:
# logger.error(f"Cannot open PDF {filename}: {e}")
# return "[Error opening PDF]"
# images = extract_images_with_fitz(file.name, start_page or 1, end_page)
# total_pages = len(doc)
# start = max(start_page or 1, 1)
# end = min(end_page or total_pages, total_pages)
# for i, page in enumerate(doc):
# page_num = i + 1
# if not (start <= page_num <= end):
# continue
# text = page.get_text()
# if text.strip():
# result.append(f"Page {page_num} (Extracted):\n{clean_text(text)}")
# else:
# if i < len(images):
# try:
# img = auto_rotate_image(images[i][1])
# img_np = np.array(img)
# ocr_text = ""
# # PaddleOCR
# try:
# ocr_result = ocr.ocr(img_np, cls=True)
# ocr_text = "\n".join([line[1][0] for line in ocr_result[0]]) if ocr_result else ""
# except Exception as e:
# logger.warning(f"PaddleOCR failed on page {page_num}: {e}")
# # Mistral OCR fallback
# if not ocr_text and use_mistral_ocr:
# try:
# doc_img = DocumentFile.from_images(img)
# ocr_text = mistral_ocr(doc_img).render()
# except Exception as e:
# logger.warning(f"Mistral OCR failed on page {page_num}: {e}")
# ocr_text = "[OCR Error]"
# result.append(f"Page {page_num} (OCR):\n{clean_text(ocr_text) or '[No OCR Text]'}")
# except Exception as e:
# logger.error(f"OCR processing failed for page {page_num}: {e}")
# result.append(f"Page {page_num}: [OCR Error]")
# else:
# result.append(f"Page {page_num}: [No text or image]")
# doc.close()
# return "\n\n".join(result)
# elif ext == ".docx":
# from docx.api import Document
# doc = Document(file.name)
# paras = [p.text for p in doc.paragraphs if p.text.strip()]
# page_texts = []
# page_size = 500
# for i in range(0, len(paras), page_size):
# page_texts.append("\n".join(paras[i:i + page_size]))
# selected_pages = page_texts
# if start_page and end_page:
# selected_pages = page_texts[start_page - 1:end_page]
# return clean_text("\n\n".join(selected_pages))
# elif ext == ".csv":
# import pandas as pd
# try:
# return pd.read_csv(file.name).to_string(index=False)
# except Exception as e:
# logger.error(f"CSV read error: {e}")
# return "[CSV Read Error]"
# elif ext in [".xls", ".xlsx"]:
# import pandas as pd
# try:
# xl = pd.ExcelFile(file.name)
# return "\n\n".join([
# f"Sheet: {s}\n{xl.parse(s).to_string(index=False)}"
# for s in xl.sheet_names
# ])
# except Exception as e:
# logger.error(f"Excel read error: {e}")
# return "[Excel Read Error]"
# else:
# return "[Unsupported file type]"
import os
import logging
import fitz # PyMuPDF
import numpy as np
from PIL import Image
import cv2
import re
# OCR
from paddleocr import PaddleOCR
# Environment defaults. They are applied before any OCR model is built so that
# libraries which resolve their cache directory from the environment at
# model-construction time see them.
# NOTE(review): `paddleocr` is already imported above this point, so anything
# it reads from the environment at import time is not influenced by these
# defaults - confirm whether that matters for the deployed PaddleOCR version.
os.environ.setdefault("HOME", "/app")
os.environ.setdefault("PADDLEOCR_HOME", "/app/.paddleocr")

# Logging is configured before the optional engine below so that a failure to
# initialise it can be reported instead of aborting the import.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Optional second OCR engine. It is built with docTR's `ocr_predictor` but is
# exposed under the historical "mistral" names used by the rest of the module.
try:
    from doctr.models import ocr_predictor
    from doctr.io import DocumentFile
    mistral_ocr = ocr_predictor(pretrained=True)
    use_mistral_ocr = True
except ImportError:
    # docTR is not installed: run without the optional engine.
    mistral_ocr = None
    use_mistral_ocr = False
except Exception as exc:
    # The package imported but the predictor could not be built. The engine is
    # optional, so disable it rather than making the whole module unimportable.
    logger.warning("Optional doctr OCR engine could not be initialised: %s", exc)
    mistral_ocr = None
    use_mistral_ocr = False

# Primary OCR engine. Angle classification is enabled here at construction
# time; no `cls` argument is passed when the engine is called.
ocr = PaddleOCR(use_angle_cls=True, lang='en')
def clean_text(text: str) -> str:
    """Collapse each run of whitespace in *text* to one space and trim both ends."""
    single_spaced = re.sub(r'\s+', ' ', text)
    return single_spaced.strip()
def auto_rotate_image(pil_img):
    """Deskew a page image and return it as an RGB PIL image.

    The image is converted to grayscale, the minimum-area rectangle around
    its non-zero pixels is measured with ``cv2.minAreaRect``, and the page is
    rotated about its centre by the opposite of that rectangle's tilt.

    Args:
        pil_img: PIL image in any mode; non-RGB inputs are converted to RGB.

    Returns:
        A PIL image in RGB mode. If the grayscale page has no non-zero pixel
        the (RGB-converted) input is returned unrotated; otherwise the result
        is the rotated grayscale page expanded back to three channels.
    """
    if pil_img.mode != "RGB":
        pil_img = pil_img.convert("RGB")
    img_cv = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2GRAY)

    # NOTE(review): every non-zero pixel is treated as foreground. On a page
    # with a light background that is nearly the whole image, so the measured
    # rectangle is presumably the page itself rather than the text - confirm
    # whether an inverted threshold was intended here.
    coords = np.column_stack(np.where(img_cv > 0))
    if coords.size == 0:
        return pil_img  # blank page

    rect_angle = cv2.minAreaRect(coords)[-1]
    # OpenCV releases differ in the range reported for this angle (older
    # builds use [-90, 0), newer ones (0, 90]). Folding it modulo 90 degrees
    # into [-45, 45) gives exactly the original correction for the older
    # range and keeps an upright page from being turned by a quarter turn on
    # the newer one.
    tilt = (rect_angle + 45.0) % 90.0 - 45.0
    angle = -tilt

    (h, w) = img_cv.shape[:2]
    M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
    rotated = cv2.warpAffine(img_cv, M, (w, h),
                             flags=cv2.INTER_CUBIC,
                             borderMode=cv2.BORDER_REPLICATE)
    return Image.fromarray(cv2.cvtColor(rotated, cv2.COLOR_GRAY2RGB))
def extract_images_with_fitz(pdf_path, start_page=1, end_page=None):
    """Render a range of PDF pages to PIL images at 2x zoom.

    Args:
        pdf_path: Path handed to ``fitz.open``.
        start_page: First page to render, 1-based. ``None``, 0 and negative
            values are treated as page 1.
        end_page: Last page to render, 1-based and inclusive. ``None`` (or 0)
            means the last page; values past the end are clamped.

    Returns:
        A list of ``(page_number, image)`` tuples in page order, with 1-based
        page numbers. A page that fails to render is logged and left out, so
        list positions do not necessarily line up with page numbers. An empty
        list is returned when the file cannot be opened.
    """
    images = []
    try:
        doc = fitz.open(pdf_path)
    except Exception as e:
        logger.error(f"Failed to open PDF file: {e}")
        return images

    try:
        total_pages = len(doc)
        first = max(start_page or 1, 1)
        last = min(end_page or total_pages, total_pages)
        zoom = fitz.Matrix(2, 2)
        for page_index in range(first - 1, last):
            try:
                pix = doc[page_index].get_pixmap(matrix=zoom)
                mode = "RGBA" if pix.alpha else "RGB"
                img = Image.frombytes(mode, [pix.width, pix.height], pix.samples)
                images.append((page_index + 1, img))
            except Exception as e:
                logger.error(f"Error rendering page {page_index + 1}: {e}")
    except Exception as e:
        # Keeps the original never-raise contract while reporting what
        # actually went wrong (the file itself opened fine at this point).
        logger.error(f"Failed to render PDF pages: {e}")
    finally:
        # Release the document even when page-range handling or rendering fails.
        doc.close()
    return images
def _paddle_text_lines(ocr_result):
    """Return the recognised text lines for the first image of a PaddleOCR result."""
    if not ocr_result:
        return []
    first_page = ocr_result[0]
    # PaddleOCR yields None/empty for an image in which nothing was detected.
    if not first_page:
        return []
    if isinstance(first_page, dict):
        # NOTE(review): dict-style results carrying a "rec_texts" list are
        # presumably what PaddleOCR 3.x returns - confirm against the
        # installed version.
        texts = first_page.get("rec_texts")
        return [] if texts is None else [str(t) for t in texts]
    # List-style results: one [box, (text, confidence)] entry per line.
    return [line[1][0] for line in first_page]


def _ocr_with_paddle(image, page_num):
    """Run PaddleOCR on a deskewed copy of *image*; return cleaned text or ''."""
    try:
        img_np = np.array(auto_rotate_image(image))
        lines = _paddle_text_lines(ocr.ocr(img_np))
        return clean_text("\n".join(lines))
    except Exception as e:
        logger.warning("PaddleOCR failed on page %s: %s", page_num, e)
        return ""


def _ocr_with_doctr(image, page_num):
    """Run the optional doctr predictor on *image*; return cleaned text or ''."""
    try:
        rgb = image if image.mode == "RGB" else image.convert("RGB")
        # The predictor is called with a list of page arrays. The PIL image
        # is converted here because `DocumentFile.from_images` only reads
        # paths or encoded bytes, not PIL images.
        return clean_text(mistral_ocr([np.array(rgb)]).render())
    except Exception as e:
        logger.warning("Mistral OCR failed on page %s: %s", page_num, e)
        return ""


def _extract_pdf_text(file, start_page, end_page, filename):
    """Run every available extraction method on each requested PDF page."""
    try:
        doc = fitz.open(file.name)
    except Exception as e:
        logger.error("Cannot open PDF %s: %s", filename, e)
        return "[Error opening PDF]"

    sections = []
    try:
        total_pages = len(doc)
        start = max(start_page or 1, 1)
        end = min(end_page or total_pages, total_pages)
        # Key the rendered images by page number: the list only covers the
        # requested range and may skip pages that failed to render, so list
        # position is not a reliable page index.
        images_by_page = dict(extract_images_with_fitz(file.name, start, end))

        for page_num in range(start, end + 1):
            page = doc[page_num - 1]
            page_results = {}

            # --- PyMuPDF (embedded text layer) ---
            try:
                text = page.get_text()
                if text.strip():
                    page_results["PyMuPDF"] = f"Page {page_num}:\n{clean_text(text)}"
            except Exception as e:
                logger.warning("PyMuPDF failed on page %s: %s", page_num, e)

            image = images_by_page.get(page_num)
            if image is not None:
                # --- PaddleOCR ---
                paddle_text = _ocr_with_paddle(image, page_num)
                if paddle_text:
                    page_results["PaddleOCR"] = f"Page {page_num}:\n{paddle_text}"

                # --- MistralOCR (optional doctr predictor) ---
                if use_mistral_ocr:
                    mistral_text = _ocr_with_doctr(image, page_num)
                    if mistral_text:
                        page_results["MistralOCR"] = f"Page {page_num}:\n{mistral_text}"

            if page_results:
                sections.append("\n".join(
                    f"===== Method: {method} =====\n{out}"
                    for method, out in page_results.items()
                ))
            else:
                sections.append(f"Page {page_num}: [No text extracted by any method]")
    finally:
        doc.close()
    return "\n\n".join(sections)


def _extract_docx_text(file, start_page, end_page):
    """Return DOCX text, treating every 500 non-empty paragraphs as one 'page'."""
    from docx.api import Document
    paras = [p.text for p in Document(file.name).paragraphs if p.text.strip()]
    page_size = 500
    page_texts = [
        "\n".join(paras[i:i + page_size])
        for i in range(0, len(paras), page_size)
    ]
    # Honour a range given by either bound, matching the PDF branch.
    if start_page or end_page:
        first = max(start_page or 1, 1)
        last = end_page or len(page_texts)
        page_texts = page_texts[first - 1:last]
    return clean_text("\n\n".join(page_texts))


def _extract_csv_text(file):
    """Return the CSV rendered as a plain-text table, or an error marker."""
    import pandas as pd
    try:
        return pd.read_csv(file.name).to_string(index=False)
    except Exception as e:
        logger.error("CSV read error: %s", e)
        return "[CSV Read Error]"


def _extract_excel_text(file):
    """Return every sheet rendered as a plain-text table, or an error marker."""
    import pandas as pd
    try:
        # The context manager closes the workbook handle even if a sheet
        # fails to parse.
        with pd.ExcelFile(file.name) as xl:
            return "\n\n".join([
                f"Sheet: {s}\n{xl.parse(s).to_string(index=False)}"
                for s in xl.sheet_names
            ])
    except Exception as e:
        logger.error("Excel read error: %s", e)
        return "[Excel Read Error]"


def extract_text_from_file(file, start_page=None, end_page=None, filename=None):
    """Extract text from an uploaded PDF, DOCX, CSV or Excel file.

    The format is chosen from the extension of *filename*; the data itself is
    read from the path in ``file.name``.

    Args:
        file: Object whose ``name`` attribute is the path of the file to read.
        start_page: Optional first page, 1-based (PDF pages, or 500-paragraph
            chunks for DOCX). Ignored for CSV and Excel.
        end_page: Optional last page, 1-based and inclusive. Ignored for CSV
            and Excel.
        filename: Name used to detect the file type and in log messages.

    Returns:
        The extracted text. For PDFs, each page contributes one section per
        extraction method that produced text (PyMuPDF, PaddleOCR and, when
        available, the optional second OCR engine). Bracketed markers such as
        ``"[Unsupported file type]"`` or ``"[Error opening PDF]"`` are
        returned instead of raising for unsupported or unreadable files.
    """
    ext = os.path.splitext(filename or "")[-1].lower()
    if ext == ".pdf":
        return _extract_pdf_text(file, start_page, end_page, filename)
    if ext == ".docx":
        return _extract_docx_text(file, start_page, end_page)
    if ext == ".csv":
        return _extract_csv_text(file)
    if ext in (".xls", ".xlsx"):
        return _extract_excel_text(file)
    return "[Unsupported file type]"