# app/crud.py
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, CSVLoader, UnstructuredExcelLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from markitdown import MarkItDown
import os
import logging
from typing import List, Optional

# from app.db.models.docs import *
# from app.schemas.schemas import DocumentCreate, DocumentUpdate

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def load_file_with_markitdown(file_path: str, llm_client=None, model: Optional[str] = None):
    """
    Converts a file to Markdown using MarkItDown, optionally using an LLM
    client to describe embedded images.

    Returns:
        DocumentConverterResult: Result exposing .title and .text_content.
    """
    if llm_client and model:
        # MarkItDown takes these as keyword arguments; passing them
        # positionally would bind them to the wrong parameters.
        markitdown = MarkItDown(llm_client=llm_client, llm_model=model)
    else:
        markitdown = MarkItDown()
    return markitdown.convert(file_path)
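
# A minimal usage sketch, assuming an OpenAI-compatible client as shown in the
# MarkItDown README; the client and model name below are illustrative:
#
#     from openai import OpenAI
#     result = await load_file_with_markitdown(
#         "report.pdf", llm_client=OpenAI(), model="gpt-4o"
#     )
#     print(result.text_content)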


async def load_pdf_with_langchain(file_path):
    """
    Loads and extracts text from a PDF file using LangChain's PyPDFLoader.

    Parameters:
        file_path (str): Path to the PDF file.

    Returns:
        List[Document]: A list of LangChain Document objects with metadata.
    """
    loader = PyPDFLoader(file_path, extract_images=True)
    documents = loader.load()
    return documents  # Returns a list of Document objects
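
# Note: extract_images=True makes PyPDFLoader OCR images embedded in the PDF,
# which (in current langchain-community releases) requires the optional
# rapidocr-onnxruntime dependency and noticeably slows loading.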


async def load_file_with_langchain(file_path: str):
    """
    Loads and extracts text from a PDF, DOCX, CSV, or XLSX file using the
    appropriate LangChain loader.

    Parameters:
        file_path (str): Path to the file.

    Returns:
        List[Document]: A list of LangChain Document objects with metadata.
    """
    # Determine the file extension
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()

    # Choose the loader based on the file extension
    if file_extension == '.pdf':
        loader = PyPDFLoader(file_path)
    elif file_extension == '.docx':
        loader = Docx2txtLoader(file_path)
    elif file_extension == '.csv':
        loader = CSVLoader(file_path)
    elif file_extension == '.xlsx':
        loader = UnstructuredExcelLoader(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a PDF, DOCX, CSV, or XLSX file.")

    # Load the documents
    documents = loader.load()
    return documents


async def split_documents(documents, chunk_size=10000, chunk_overlap=1000):
    """
    Splits documents into smaller chunks with overlap.

    Parameters:
        documents (List[Document]): List of LangChain Document objects.
        chunk_size (int): The maximum size of each chunk.
        chunk_overlap (int): The number of characters to overlap between chunks.

    Returns:
        List[Document]: List of chunked Document objects.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_docs = text_splitter.split_documents(documents)
    return split_docs
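
# Sketch of how the loader and splitter compose (the file name is illustrative):
#
#     docs = await load_file_with_langchain("notes.docx")
#     chunks = await split_documents(docs, chunk_size=10000, chunk_overlap=1000)
#     # Each chunk's page_content is at most ~10,000 characters and overlaps
#     # the previous chunk by up to 1,000 characters.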


async def process_uploaded_file(
    id, file_path,
    rag_system=None,
    llm_client=None,
    llm_model=None
):
    """
    Loads an uploaded file with MarkItDown and, if a RAG system is provided,
    indexes the converted text under a doc_id derived from `id` and the
    document title.
    """
    try:
        # Load the document using MarkItDown
        documents = await load_file_with_markitdown(file_path, llm_client=llm_client, model=llm_model)
        logger.info(f"Loaded document: {file_path}")
        # Concatenate all pages to get the full document text for context generation
        # whole_document_content = "\n".join([doc.page_content for doc in documents])
    except Exception as e:
        logger.error(f"Failed to load document {file_path}: {e}")
        raise RuntimeError(f"Error loading document: {file_path}") from e

    # # Generate context for each chunk if llm is provided
    # if llm:
    #     for doc in split_docs:
    #         try:
    #             context = await llm.generate_context(doc, whole_document_content=whole_document_content)
    #             # Add context to the beginning of the page content
    #             doc.page_content = f"{context.replace('<|eot_id|>', '')}\n\n{doc.page_content}"
    #             logger.info(f"Context generated and added for chunk {split_docs.index(doc)}")
    #         except Exception as e:
    #             logger.error(f"Failed to generate context for chunk {split_docs.index(doc)}: {e}")
    #             raise RuntimeError(f"Error generating context for chunk {split_docs.index(doc)}") from e

    # Add the converted document to the RAG system if one is provided
    if rag_system:
        try:
            rag_system.add_document(doc_id=f"{id}_{documents.title}", text=documents.text_content, meta_data={"source": file_path})
            logger.debug(f"doc_id: {id}_{documents.title}")
            logger.debug(f"content: {documents.text_content}")
            # print(f"New Page Content: {doc.page_content}")
            logger.info(f"Document successfully added to RAG system for file {file_path}")
        except Exception as e:
            logger.error(f"Failed to add document to RAG system for {file_path}: {e}")
            raise RuntimeError(f"Error adding document to RAG system: {file_path}") from e
    else:
        logger.info(f"Loaded document {file_path}, but not added to RAG system")
    return documents
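

# Minimal smoke test for the pipeline above. The stand-in RAG class and file
# path are illustrative assumptions; any object exposing an add_document
# method matching the keyword call above would work.
if __name__ == "__main__":
    import asyncio

    class _PrintRAG:
        """Stand-in backend that just reports what would be indexed."""

        def add_document(self, doc_id, text, meta_data):
            print(f"indexed {doc_id!r}: {len(text)} chars from {meta_data['source']}")

    async def _demo():
        result = await process_uploaded_file(1, "example.pdf", rag_system=_PrintRAG())
        print(f"title: {result.title}")

    asyncio.run(_demo())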