| from typing import BinaryIO, Optional |
| from langchain_community.document_loaders import PyPDFLoader, TextLoader |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| import tempfile |
| import os |
| from docx import Document |
|
|
| from src.utils.config import config |
| from src.utils.logger import log_error |
| from src.models.document import DocumentType |
|
|
|
|
class DocumentProcessor:
    """Extract, chunk and classify the text of uploaded documents.

    Text can be pulled from PDF (through LangChain's ``PyPDFLoader``), plain
    text and DOCX (through ``python-docx``) payloads. The extraction methods
    are best-effort: a failure is reported with ``log_error`` and an empty
    string is returned instead of raising.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """Create the processor and the text splitter it uses for chunking.

        Args:
            chunk_size: ``chunk_size`` handed to the splitter; lengths are
                measured with ``len``.
            chunk_overlap: ``chunk_overlap`` handed to the splitter.
        """
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )

    @staticmethod
    def _remove_temp_file(path: str) -> None:
        """Delete a temporary file, logging (not raising) if that fails."""
        try:
            os.unlink(path)
        except OSError as e:
            log_error(f"Error removing temporary file {path}: {str(e)}")

    def extract_text_from_pdf(self, file_content: bytes, filename: str) -> str:
        """Extract text from a PDF payload using LangChain's PyPDFLoader.

        Args:
            file_content: Raw bytes of the PDF.
            filename: Original file name; accepted for a uniform signature
                across the extractors and not used here.

        Returns:
            The ``page_content`` of every loaded document joined with
            newlines, or ``""`` if anything goes wrong (the error is logged).
        """
        temp_file_path = None
        try:
            # The loader is handed a path, so the bytes are spilled to disk.
            # delete=False keeps the file alive after the handle is closed so
            # the loader can reopen it by name.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
                # Record the name before writing so a failed write is still
                # cleaned up below.
                temp_file_path = temp_file.name
                temp_file.write(file_content)

            documents = PyPDFLoader(temp_file_path).load()
            return "\n".join(doc.page_content for doc in documents)
        except Exception as e:
            log_error(f"Error extracting text from PDF: {str(e)}")
            return ""
        finally:
            # Runs on success and on failure alike, so a loader error no
            # longer leaves the temporary file behind.
            if temp_file_path is not None:
                self._remove_temp_file(temp_file_path)

    def extract_text_from_txt(self, file_content: bytes, filename: str) -> str:
        """Decode a plain-text payload, guessing its encoding.

        Encodings are tried in this order:

        1. UTF-8 (a leading byte-order mark is stripped).
        2. UTF-16, only when the payload starts with a UTF-16 byte-order
           mark. Without that guard almost any even-length byte string
           decodes "successfully" as UTF-16 and yields garbage.
        3. cp1252.
        4. latin-1, which maps every byte and therefore never fails.

        Args:
            file_content: Raw bytes of the text file.
            filename: Original file name; accepted for a uniform signature
                across the extractors and not used here.

        Returns:
            The decoded text, or ``""`` if an unexpected error occurs (the
            error is logged).
        """
        try:
            encodings = ["utf-8-sig"]
            if file_content.startswith((b"\xff\xfe", b"\xfe\xff")):
                encodings.append("utf-16")
            # cp1252 must come before latin-1: latin-1 accepts everything, so
            # anything listed after it would be unreachable.
            encodings.append("cp1252")

            for encoding in encodings:
                try:
                    return file_content.decode(encoding)
                except UnicodeDecodeError:
                    continue

            return file_content.decode("latin-1")
        except Exception as e:
            log_error(f"Error extracting text from TXT: {str(e)}")
            return ""

    def extract_text_from_docx(self, file_content: bytes, filename: str) -> str:
        """Extract text from a DOCX payload using python-docx.

        The document is parsed from an in-memory buffer, so no temporary file
        is created. Non-blank paragraph texts come first, followed by the
        non-blank text of every table cell; the pieces are joined with
        newlines.

        Args:
            file_content: Raw bytes of the DOCX file.
            filename: Original file name; accepted for a uniform signature
                across the extractors and not used here.

        Returns:
            The extracted text, or ``""`` if anything goes wrong (the error
            is logged).
        """
        import io

        try:
            from docx import Document as DocxDocument

            doc = DocxDocument(io.BytesIO(file_content))

            text_parts = [
                paragraph.text
                for paragraph in doc.paragraphs
                if paragraph.text.strip()
            ]

            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        if cell.text.strip():
                            text_parts.append(cell.text)

            return "\n".join(text_parts)
        except Exception as e:
            log_error(f"Error extracting text from DOCX: {str(e)}")
            return ""

    def extract_text(self, file_content: bytes, filename: str) -> str:
        """Extract text with the extractor matching the file extension.

        The extension is whatever follows the last ``.`` in ``filename``,
        compared case-insensitively. A name without any dot has no extension
        and is reported as unsupported.

        Args:
            file_content: Raw bytes of the uploaded file.
            filename: Original file name, used only to pick the extractor.

        Returns:
            The extracted text, or ``""`` for unsupported types and for
            extraction failures (both are logged).
        """
        _, dot, file_ext = filename.lower().rpartition(".")
        if not dot:
            # rpartition puts the whole name in the last slot when there is
            # no separator; that is a bare name, not an extension.
            file_ext = ""

        extractors = {
            "pdf": self.extract_text_from_pdf,
            "txt": self.extract_text_from_txt,
            "docx": self.extract_text_from_docx,
            # NOTE(review): "doc" is routed to the DOCX extractor as before;
            # confirm python-docx can actually read the legacy format.
            "doc": self.extract_text_from_docx,
        }

        extractor = extractors.get(file_ext)
        if extractor is None:
            log_error(f"Unsupported file type: {file_ext}")
            return ""
        return extractor(file_content, filename)

    def split_text_into_chunks(self, text: str) -> list:
        """Split ``text`` into chunks using the configured text splitter."""
        return self.text_splitter.split_text(text)

    def detect_document_type(self, text: str) -> "DocumentType":
        """Guess the document type from keywords found in the text.

        For every candidate type, count how many of its keywords occur in the
        lower-cased text (each keyword counts at most once). The type with
        the highest count wins if that count is greater than 2; ties go to
        the type listed first below. Otherwise ``DocumentType.OTHER`` is
        returned.

        Args:
            text: Full document text.

        Returns:
            The best-matching ``DocumentType`` or ``DocumentType.OTHER``.
        """
        text_lower = text.lower()

        # NOTE(review): matching is by substring, so e.g. "rent" also matches
        # inside "current" and "lease" inside "please".
        keywords_by_type = {
            DocumentType.RENTAL: (
                "lease",
                "rent",
                "tenant",
                "landlord",
                "property",
                "premises",
                "deposit",
            ),
            DocumentType.LOAN: (
                "loan",
                "borrow",
                "lender",
                "principal",
                "interest",
                "repayment",
                "credit",
            ),
            DocumentType.EMPLOYMENT: (
                "employment",
                "employee",
                "employer",
                "salary",
                "wages",
                "position",
                "job",
            ),
            DocumentType.NDA: (
                "confidential",
                "non-disclosure",
                "proprietary",
                "trade secret",
            ),
            DocumentType.SERVICE: (
                "service",
                "provider",
                "client",
                "deliverables",
                "scope of work",
            ),
        }

        scores = {
            doc_type: sum(1 for keyword in keywords if keyword in text_lower)
            for doc_type, keywords in keywords_by_type.items()
        }

        # max() returns the first key holding the top score, so insertion
        # order above is the tie-breaker.
        best_type = max(scores, key=scores.get)
        if scores[best_type] > 2:
            return best_type
        return DocumentType.OTHER

    def extract_metadata(self, text: str) -> dict:
        """Compute simple size statistics for ``text``.

        Args:
            text: Full document text.

        Returns:
            A dict with ``word_count`` (whitespace-separated tokens),
            ``character_count`` (``len(text)``) and
            ``estimated_reading_time`` (``word_count // 200``, so texts
            under 200 words report 0).
        """
        word_count = len(text.split())
        return {
            "word_count": word_count,
            "character_count": len(text),
            "estimated_reading_time": word_count // 200,
        }
|
|