| # import os | |
| # import logging | |
| # import fitz # PyMuPDF | |
| # import numpy as np | |
| # from PIL import Image | |
| # import cv2 | |
| # import re | |
| # # OCR | |
| # from paddleocr import PaddleOCR | |
| # # Optional Mistral OCR | |
| # try: | |
| # from doctr.models import ocr_predictor | |
| # from doctr.io import DocumentFile | |
| # mistral_ocr = ocr_predictor(pretrained=True) | |
| # use_mistral_ocr = True | |
| # except ImportError: | |
| # mistral_ocr = None | |
| # use_mistral_ocr = False | |
| # # Environment paths | |
| # os.environ.setdefault("HOME", "/app") | |
| # os.environ.setdefault("PADDLEOCR_HOME", "/app/.paddleocr") | |
| # # Logging | |
| # logging.basicConfig(level=logging.INFO) | |
| # logger = logging.getLogger(__name__) | |
| # # PaddleOCR | |
| # ocr = PaddleOCR(use_angle_cls=True, lang='en') | |
| # def clean_text(text): | |
| # return re.sub(r'\s+', ' ', text).strip() | |
| # def auto_rotate_image(pil_img): | |
| # """Auto-rotate PIL image safely.""" | |
| # if pil_img.mode != "RGB": | |
| # pil_img = pil_img.convert("RGB") | |
| # img_cv = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2GRAY) | |
| # coords = np.column_stack(np.where(img_cv > 0)) | |
| # if coords.size == 0: | |
| # return pil_img # blank page | |
| # angle = cv2.minAreaRect(coords)[-1] | |
| # angle = -(90 + angle) if angle < -45 else -angle | |
| # (h, w) = img_cv.shape[:2] | |
| # M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0) | |
| # rotated = cv2.warpAffine(img_cv, M, (w, h), | |
| # flags=cv2.INTER_CUBIC, | |
| # borderMode=cv2.BORDER_REPLICATE) | |
| # return Image.fromarray(cv2.cvtColor(rotated, cv2.COLOR_GRAY2RGB)) | |
| # def extract_images_with_fitz(pdf_path, start_page=1, end_page=None): | |
| # images = [] | |
| # try: | |
| # doc = fitz.open(pdf_path) | |
| # total_pages = len(doc) | |
| # end = min(end_page or total_pages, total_pages) | |
| # for i in range(start_page - 1, end): | |
| # try: | |
| # page = doc[i] | |
| # pix = page.get_pixmap(matrix=fitz.Matrix(2, 2)) | |
| # mode = "RGBA" if pix.alpha else "RGB" | |
| # img = Image.frombytes(mode, [pix.width, pix.height], pix.samples) | |
| # images.append((i + 1, img)) | |
| # except Exception as e: | |
| # logger.error(f"Error rendering page {i + 1}: {e}") | |
| # doc.close() | |
| # except Exception as e: | |
| # logger.error(f"Failed to open PDF file: {e}") | |
| # return images | |
| # def extract_text_from_file(file, start_page=None, end_page=None, filename=None): | |
| # ext = os.path.splitext(filename or "")[-1].lower() | |
| # result = [] | |
| # if ext == ".pdf": | |
| # try: | |
| # doc = fitz.open(file.name) | |
| # except Exception as e: | |
| # logger.error(f"Cannot open PDF {filename}: {e}") | |
| # return "[Error opening PDF]" | |
| # images = extract_images_with_fitz(file.name, start_page or 1, end_page) | |
| # total_pages = len(doc) | |
| # start = max(start_page or 1, 1) | |
| # end = min(end_page or total_pages, total_pages) | |
| # for i, page in enumerate(doc): | |
| # page_num = i + 1 | |
| # if not (start <= page_num <= end): | |
| # continue | |
| # text = page.get_text() | |
| # if text.strip(): | |
| # result.append(f"Page {page_num} (Extracted):\n{clean_text(text)}") | |
| # else: | |
| # if i < len(images): | |
| # try: | |
| # img = auto_rotate_image(images[i][1]) | |
| # img_np = np.array(img) | |
| # ocr_text = "" | |
| # # PaddleOCR | |
| # try: | |
| # ocr_result = ocr.ocr(img_np, cls=True) | |
| # ocr_text = "\n".join([line[1][0] for line in ocr_result[0]]) if ocr_result else "" | |
| # except Exception as e: | |
| # logger.warning(f"PaddleOCR failed on page {page_num}: {e}") | |
| # # Mistral OCR fallback | |
| # if not ocr_text and use_mistral_ocr: | |
| # try: | |
| # doc_img = DocumentFile.from_images(img) | |
| # ocr_text = mistral_ocr(doc_img).render() | |
| # except Exception as e: | |
| # logger.warning(f"Mistral OCR failed on page {page_num}: {e}") | |
| # ocr_text = "[OCR Error]" | |
| # result.append(f"Page {page_num} (OCR):\n{clean_text(ocr_text) or '[No OCR Text]'}") | |
| # except Exception as e: | |
| # logger.error(f"OCR processing failed for page {page_num}: {e}") | |
| # result.append(f"Page {page_num}: [OCR Error]") | |
| # else: | |
| # result.append(f"Page {page_num}: [No text or image]") | |
| # doc.close() | |
| # return "\n\n".join(result) | |
| # elif ext == ".docx": | |
| # from docx.api import Document | |
| # doc = Document(file.name) | |
| # paras = [p.text for p in doc.paragraphs if p.text.strip()] | |
| # page_texts = [] | |
| # page_size = 500 | |
| # for i in range(0, len(paras), page_size): | |
| # page_texts.append("\n".join(paras[i:i + page_size])) | |
| # selected_pages = page_texts | |
| # if start_page and end_page: | |
| # selected_pages = page_texts[start_page - 1:end_page] | |
| # return clean_text("\n\n".join(selected_pages)) | |
| # elif ext == ".csv": | |
| # import pandas as pd | |
| # try: | |
| # return pd.read_csv(file.name).to_string(index=False) | |
| # except Exception as e: | |
| # logger.error(f"CSV read error: {e}") | |
| # return "[CSV Read Error]" | |
| # elif ext in [".xls", ".xlsx"]: | |
| # import pandas as pd | |
| # try: | |
| # xl = pd.ExcelFile(file.name) | |
| # return "\n\n".join([ | |
| # f"Sheet: {s}\n{xl.parse(s).to_string(index=False)}" | |
| # for s in xl.sheet_names | |
| # ]) | |
| # except Exception as e: | |
| # logger.error(f"Excel read error: {e}") | |
| # return "[Excel Read Error]" | |
| # else: | |
| # return "[Unsupported file type]" | |
| import os | |
| import logging | |
| import fitz # PyMuPDF | |
| import numpy as np | |
| from PIL import Image | |
| import cv2 | |
| import re | |
| # OCR | |
| from paddleocr import PaddleOCR | |
# Optional second OCR engine. The names say "Mistral", but the backend that is
# actually loaded here is docTR's ``ocr_predictor``. Its absence must never
# prevent this module from importing, so every failure mode disables it.
try:
    from doctr.models import ocr_predictor
    from doctr.io import DocumentFile
    mistral_ocr = ocr_predictor(pretrained=True)
    use_mistral_ocr = True
except ImportError:
    # docTR (or one of its backends) is not installed: run without it.
    mistral_ocr = None
    use_mistral_ocr = False
except Exception as exc:
    # The package imported but the predictor could not be built (for example
    # the pretrained weights could not be fetched). This engine is optional,
    # so report the problem and carry on with the remaining extractors.
    logging.getLogger(__name__).warning("Mistral OCR unavailable: %s", exc)
    mistral_ocr = None
    use_mistral_ocr = False

# Environment paths: only applied when the variables are not already set.
os.environ.setdefault("HOME", "/app")
os.environ.setdefault("PADDLEOCR_HOME", "/app/.paddleocr")

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared PaddleOCR instance, created once at import time. Angle classification
# is configured here on the constructor; callers invoke ``ocr.ocr(image)``
# without a ``cls`` argument.
ocr = PaddleOCR(use_angle_cls=True, lang='en')
def clean_text(text: str) -> str:
    """Collapse every run of whitespace in *text* to one space and trim the ends."""
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()
def auto_rotate_image(pil_img):
    """Deskew a page image by the tilt of its dark (ink) pixels.

    The image is converted to grayscale, the ink is separated from the paper
    with an inverted Otsu threshold, and the minimum-area rectangle around the
    ink pixels gives the skew angle. The grayscale page is then rotated about
    its centre by the opposite angle.

    Args:
        pil_img: A PIL image in any mode; it is converted to RGB first.

    Returns:
        A PIL RGB image. When no ink pixels are found (blank page) the RGB
        version of the input is returned unrotated; otherwise the result is
        the rotated grayscale page expanded back to three channels.
    """
    if pil_img.mode != "RGB":
        pil_img = pil_img.convert("RGB")
    gray = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2GRAY)

    # Treat dark pixels as foreground. Testing ``gray > 0`` instead would
    # select nearly every pixel of a white page, so the rectangle would
    # describe the page itself rather than the text on it.
    # NOTE(review): assumes dark text on a light background. A light-on-dark
    # page yields a page-sized rectangle and is therefore left unrotated.
    _, ink_mask = cv2.threshold(
        gray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
    )
    coords = np.column_stack(np.where(ink_mask > 0))
    if coords.size == 0:
        return pil_img  # blank page

    rect_angle = cv2.minAreaRect(coords)[-1]
    # A rectangle's orientation is only defined modulo 90 degrees. Reducing
    # the reported angle first gives the same correction whether it comes
    # back in [-90, 0) or in (0, 90], and always lands in [-45, 45], so an
    # upright page is never turned on its side.
    residual = rect_angle % 90
    angle = -residual if residual < 45 else 90 - residual

    (h, w) = gray.shape[:2]
    rotation = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
    rotated = cv2.warpAffine(
        gray,
        rotation,
        (w, h),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,  # avoid black wedges in the corners
    )
    return Image.fromarray(cv2.cvtColor(rotated, cv2.COLOR_GRAY2RGB))
def extract_images_with_fitz(pdf_path, start_page=1, end_page=None):
    """Render a range of PDF pages to PIL images at 2x zoom.

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF.
        start_page: First page to render, 1-based. ``None`` and values below
            1 are treated as 1.
        end_page: Last page to render, 1-based and inclusive. ``None`` (or 0)
            means the last page; values past the end are clamped.

    Returns:
        A list of ``(page_number, image)`` tuples with 1-based page numbers.
        A page that fails to render is logged and left out, so always match
        entries by their page number rather than by list position. If the
        document cannot be opened the error is logged and the list is empty.
    """
    images = []
    try:
        # The context manager closes the document even if something below
        # raises unexpectedly.
        with fitz.open(pdf_path) as doc:
            total_pages = len(doc)
            # Clamp the lower bound: a negative index would otherwise address
            # pages counted from the end of the document.
            first = max(start_page or 1, 1)
            last = min(end_page or total_pages, total_pages)
            for index in range(first - 1, last):
                try:
                    pix = doc[index].get_pixmap(matrix=fitz.Matrix(2, 2))
                    mode = "RGBA" if pix.alpha else "RGB"
                    img = Image.frombytes(mode, (pix.width, pix.height), pix.samples)
                    images.append((index + 1, img))
                except Exception as e:
                    logger.error(f"Error rendering page {index + 1}: {e}")
    except Exception as e:
        logger.error(f"Failed to open PDF file: {e}")
    return images
def _paddle_text_lines(ocr_result):
    """Return the recognised text lines of the first page of a PaddleOCR result."""
    if not ocr_result:
        return []
    first_page = ocr_result[0]
    if not first_page:
        # PaddleOCR reports a page without any detected text as None / empty.
        return []
    if isinstance(first_page, dict):
        # NOTE(review): dict-style results (PaddleOCR 3.x) are expected to
        # carry the lines under "rec_texts" - confirm against the installed
        # version.
        return list(first_page.get("rec_texts") or [])
    # List-style results: one [box, (text, score)] entry per line.
    return [line[1][0] for line in first_page]


def _extract_pdf_text(pdf_path, start_page, end_page, filename):
    """Run every available extractor on each selected PDF page and label the outputs."""
    try:
        doc = fitz.open(pdf_path)
    except Exception as e:
        logger.error(f"Cannot open PDF {filename}: {e}")
        return "[Error opening PDF]"

    all_results = []  # one entry per page, in page order
    try:
        total_pages = len(doc)
        start = max(start_page or 1, 1)
        end = min(end_page or total_pages, total_pages)

        # Key the rendered images by page number. The list only covers the
        # requested range and skips pages that failed to render, so its
        # positions do not line up with page indexes.
        rendered = dict(extract_images_with_fitz(pdf_path, start, end))

        for page_num in range(start, end + 1):
            page_results = {}

            # --- PyMuPDF ---
            try:
                text = doc[page_num - 1].get_text()
                if text.strip():
                    page_results["PyMuPDF"] = f"Page {page_num}:\n{clean_text(text)}"
            except Exception as e:
                logger.warning(f"PyMuPDF failed on page {page_num}: {e}")

            image = rendered.get(page_num)

            # --- PaddleOCR ---
            paddle_text = ""
            if image is not None:
                try:
                    img_np = np.array(auto_rotate_image(image))
                    lines = _paddle_text_lines(ocr.ocr(img_np))
                    paddle_text = clean_text("\n".join(lines))
                except Exception as e:
                    logger.warning(f"PaddleOCR failed on page {page_num}: {e}")
            if paddle_text:
                page_results["PaddleOCR"] = f"Page {page_num}:\n{paddle_text}"

            # --- MistralOCR ---
            mistral_text = ""
            if use_mistral_ocr and image is not None:
                try:
                    # The predictor takes a list of H x W x 3 uint8 arrays.
                    # DocumentFile.from_images only reads paths or raw bytes,
                    # so an in-memory PIL image cannot be passed to it.
                    page_array = np.array(image.convert("RGB"))
                    mistral_text = clean_text(mistral_ocr([page_array]).render())
                except Exception as e:
                    logger.warning(f"Mistral OCR failed on page {page_num}: {e}")
            if mistral_text:
                page_results["MistralOCR"] = f"Page {page_num}:\n{mistral_text}"

            # Append collected method outputs for this page
            combined_output = [
                f"===== Method: {method} =====\n{out}"
                for method, out in page_results.items()
            ]
            if combined_output:
                all_results.append("\n".join(combined_output))
            else:
                all_results.append(f"Page {page_num}: [No text extracted by any method]")
    finally:
        doc.close()
    return "\n\n".join(all_results)


def _extract_docx_text(docx_path, start_page, end_page):
    """Return DOCX text, where a "page" is a chunk of 500 non-empty paragraphs."""
    from docx.api import Document

    doc = Document(docx_path)
    paras = [p.text for p in doc.paragraphs if p.text.strip()]
    page_size = 500
    page_texts = [
        "\n".join(paras[i:i + page_size])
        for i in range(0, len(paras), page_size)
    ]
    # Honour each bound on its own, the same way the PDF path does.
    first = max(start_page or 1, 1) - 1
    last = end_page or len(page_texts)
    return clean_text("\n\n".join(page_texts[first:last]))


def _extract_csv_text(csv_path):
    """Return a CSV file rendered as a plain-text table without the index."""
    import pandas as pd

    try:
        return pd.read_csv(csv_path).to_string(index=False)
    except Exception as e:
        logger.error(f"CSV read error: {e}")
        return "[CSV Read Error]"


def _extract_excel_text(excel_path):
    """Return every sheet of a workbook as a labelled plain-text table."""
    import pandas as pd

    try:
        xl = pd.ExcelFile(excel_path)
        return "\n\n".join([
            f"Sheet: {s}\n{xl.parse(s).to_string(index=False)}"
            for s in xl.sheet_names
        ])
    except Exception as e:
        logger.error(f"Excel read error: {e}")
        return "[Excel Read Error]"


def extract_text_from_file(file, start_page=None, end_page=None, filename=None):
    """Extract text from a PDF, DOCX, CSV or Excel file.

    Args:
        file: Object whose ``name`` attribute is the path of the file on disk.
        start_page: First page to include, 1-based. Used for PDF and DOCX;
            ``None`` means the first page.
        end_page: Last page to include, 1-based and inclusive. Used for PDF
            and DOCX; ``None`` means the last page.
        filename: Name used to pick the extractor by extension and quoted in
            log messages. When omitted, the extension of ``file.name`` is
            used instead.

    Returns:
        The extracted text. For PDFs each selected page contributes one
        section per extractor that produced text (PyMuPDF, PaddleOCR and,
        when available, MistralOCR), each under a "===== Method: ... ====="
        header. Failures are reported as bracketed placeholder strings such
        as "[Error opening PDF]" or "[Unsupported file type]" instead of
        exceptions, except that DOCX loading errors propagate.
    """
    name_for_ext = filename or getattr(file, "name", "") or ""
    ext = os.path.splitext(str(name_for_ext))[-1].lower()

    if ext == ".pdf":
        return _extract_pdf_text(file.name, start_page, end_page, filename)
    if ext == ".docx":
        return _extract_docx_text(file.name, start_page, end_page)
    if ext == ".csv":
        return _extract_csv_text(file.name)
    if ext in (".xls", ".xlsx"):
        return _extract_excel_text(file.name)
    return "[Unsupported file type]"