Spaces:
Sleeping
Sleeping
| import os | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_core.documents import Document | |
| from embeddings.embedding_model import embedding_model | |
| from rag.chunking import split_documents | |
| from config import VECTOR_DB_PATH | |
# NOTE(review): this constant is not referenced by any code visible in this
# file; build_vector_store walks the "documents" folder instead. Confirm
# whether it is still needed by other modules.
PDF_FOLDER = "documents/pdfs"
| def _ocr_pdf(pdf_path: str) -> list[Document]: | |
| """ | |
| Fallback OCR using PyMuPDF (fitz) + Tesseract. | |
| No Poppler required — PyMuPDF handles PDF-to-image conversion natively. | |
| Requires: pip install pymupdf pytesseract pillow | |
| Tesseract installed: winget install UB-Mannheim.TesseractOCR | |
| """ | |
| try: | |
| import fitz # PyMuPDF | |
| import pytesseract | |
| from PIL import Image | |
| import io | |
| except ImportError as e: | |
| print(f" [OCR] Missing package: {e}. Run: pip install pymupdf pytesseract pillow") | |
| return [] | |
| print(f" [OCR] Running Tesseract OCR on: {os.path.basename(pdf_path)}") | |
| try: | |
| pdf_doc = fitz.open(pdf_path) | |
| except Exception as e: | |
| print(f" [OCR] Could not open PDF with PyMuPDF: {e}") | |
| return [] | |
| documents = [] | |
| for i, page in enumerate(pdf_doc): | |
| # Render page to image at 200 DPI | |
| matrix = fitz.Matrix(200 / 72, 200 / 72) | |
| pix = page.get_pixmap(matrix=matrix) | |
| img_bytes = pix.tobytes("png") | |
| img = Image.open(io.BytesIO(img_bytes)) | |
| try: | |
| text = pytesseract.image_to_string(img) | |
| except Exception as e: | |
| print(f" [OCR] Tesseract failed on page {i+1}: {e}") | |
| print(" [OCR] Is Tesseract installed? Run: winget install UB-Mannheim.TesseractOCR") | |
| break | |
| char_count = len(text.strip()) | |
| print(f" [OCR] Page {i+1}: extracted {char_count} chars") | |
| if text.strip(): | |
| documents.append(Document( | |
| page_content=text, | |
| metadata={"source": pdf_path, "page": i} | |
| )) | |
| pdf_doc.close() | |
| return documents | |
def _load_pdf(pdf_path: str) -> list[Document]:
    """
    Load a PDF using PyPDFLoader, falling back to Tesseract OCR.

    OCR is attempted when the loader raises, or when every page it returns
    has blank text (which is what a scanned PDF looks like to the loader).

    Args:
        pdf_path: Path of the PDF file to load.

    Returns:
        The non-blank pages from PyPDFLoader when there are any; otherwise
        whatever ``_ocr_pdf`` returns, which may be an empty list.
    """
    filename = os.path.basename(pdf_path)
    try:
        docs = PyPDFLoader(pdf_path).load()
    except Exception as e:
        print(f" [WARNING] PyPDFLoader failed to read '{filename}': {e} — attempting OCR...")
    else:
        print(f"Loaded {len(docs)} page(s) from {filename}")
        # Keep only pages that contain real text.
        text_docs = [d for d in docs if d.page_content.strip()]
        if text_docs:
            return text_docs
        # Only reported when the loader succeeded but found nothing, so a
        # loader failure no longer prints two different warnings.
        print(f" [WARNING] No text extracted from '{filename}' — attempting OCR...")

    ocr_docs = _ocr_pdf(pdf_path)
    if not ocr_docs:
        print(
            f" [ERROR] OCR also failed for '{filename}'.\n"
            " Make sure Tesseract is installed and on PATH:\n"
            " winget install UB-Mannheim.TesseractOCR\n"
        )
    return ocr_docs
| def _load_docx(docx_path: str) -> list[Document]: | |
| """ | |
| Load a Word (.docx) file and return a list of Documents. | |
| """ | |
| from docx import Document as DocxDocument | |
| filename = os.path.basename(docx_path) | |
| print(f"Loading Word document: {filename}") | |
| try: | |
| doc = DocxDocument(docx_path) | |
| full_text = [] | |
| for para in doc.paragraphs: | |
| if para.text.strip(): | |
| full_text.append(para.text.strip()) | |
| # Also extract text from tables | |
| for table in doc.tables: | |
| for row in table.rows: | |
| row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()] | |
| if row_text: | |
| full_text.append(" | ".join(row_text)) | |
| text_content = "\n".join(full_text) | |
| if text_content.strip(): | |
| return [Document( | |
| page_content=text_content, | |
| metadata={"source": docx_path} | |
| )] | |
| except Exception as e: | |
| print(f" [ERROR] Failed to read Word file '{filename}': {e}") | |
| return [] | |
| def _load_excel(excel_path: str) -> list[Document]: | |
| """ | |
| Load an Excel (.xlsx, .xls) file using pandas and openpyxl, | |
| returning a text representation of the tables. | |
| """ | |
| import pandas as pd | |
| filename = os.path.basename(excel_path) | |
| print(f"Loading Excel spreadsheet: {filename}") | |
| try: | |
| with pd.ExcelFile(excel_path) as xls: | |
| documents = [] | |
| for sheet_name in xls.sheet_names: | |
| df = pd.read_excel(xls, sheet_name=sheet_name) | |
| if df.empty: | |
| continue | |
| # Convert sheet to string representation | |
| sheet_text = f"Sheet: {sheet_name}\n" | |
| sheet_text += df.to_string(index=False) | |
| if sheet_text.strip(): | |
| documents.append(Document( | |
| page_content=sheet_text, | |
| metadata={"source": excel_path, "sheet": sheet_name} | |
| )) | |
| return documents | |
| except Exception as e: | |
| print(f" [ERROR] Failed to read Excel file '{filename}': {e}") | |
| return [] | |
def load_any_document(file_path: str) -> list[Document]:
    """
    Route a file to the loader that matches its extension.

    The extension is compared case-insensitively: ``.pdf`` is handled by
    ``_load_pdf``, ``.docx`` by ``_load_docx`` and ``.xlsx`` / ``.xls`` by
    ``_load_excel``. Any other extension yields an empty list.
    """
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    if suffix == ".pdf":
        return _load_pdf(file_path)
    if suffix == ".docx":
        return _load_docx(file_path)
    if suffix in (".xlsx", ".xls"):
        return _load_excel(file_path)

    # Unsupported file type: nothing to load.
    return []
def build_vector_store(documents_folder: str = "documents"):
    """
    Build a FAISS index from every supported file under a folder and save it.

    The folder is created if it does not exist. It is walked recursively;
    each file is passed to ``load_any_document``, the results are chunked
    with ``split_documents``, embedded with ``embedding_model`` and saved to
    ``VECTOR_DB_PATH``.

    Args:
        documents_folder: Root folder to scan. Defaults to "documents".

    Returns:
        The new FAISS vector store, or None when no documents could be
        loaded or chunking produced nothing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(documents_folder, exist_ok=True)

    documents = []
    # Recursively find all supported files in the documents directory
    for root, dirs, files in os.walk(documents_folder):
        # Sorting in place makes os.walk descend in a stable order, so the
        # index is built from the same document sequence on every run.
        dirs.sort()
        for file in sorted(files):
            # Skip temp/lock files (e.g. ~$Doc.docx)
            if file.startswith("~$"):
                continue
            docs = load_any_document(os.path.join(root, file))
            if docs:
                documents.extend(docs)

    if not documents:
        print(f"No documents found or extracted in {documents_folder}/ folder.")
        return None

    chunks = split_documents(documents)
    if not chunks:
        print("Chunking produced no results. Check CHUNK_SIZE / CHUNK_OVERLAP in config.py.")
        return None

    print(f"Building FAISS index from {len(chunks)} chunk(s)...")
    vector_db = FAISS.from_documents(chunks, embedding_model)
    vector_db.save_local(VECTOR_DB_PATH)
    print(f"FAISS vector database created and saved to '{VECTOR_DB_PATH}'.")
    return vector_db
def load_vector_store():
    """
    Load the saved FAISS index from ``VECTOR_DB_PATH``, building it if absent.

    Returns:
        The loaded FAISS vector store, or whatever ``build_vector_store``
        returns (possibly None) when the saved index is missing or
        incomplete.
    """
    # A saved store consists of both files; if either one is missing,
    # load_local would fail, so rebuild instead of crashing.
    required_files = ("index.faiss", "index.pkl")
    index_complete = all(
        os.path.exists(os.path.join(VECTOR_DB_PATH, name))
        for name in required_files
    )
    if not index_complete:
        print("FAISS index not found.")
        print("Creating new vector database...")
        return build_vector_store()

    # SECURITY: load_local unpickles index.pkl, which can execute arbitrary
    # code. This is only safe because the index is produced locally by
    # build_vector_store; never point VECTOR_DB_PATH at untrusted files.
    vector_db = FAISS.load_local(
        VECTOR_DB_PATH,
        embedding_model,
        allow_dangerous_deserialization=True,
    )
    return vector_db