from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    PyPDFLoader,
    UnstructuredWordDocumentLoader,
    TextLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
)
import os

from logger import logger

# Maps a lowercase file extension to its langchain document loader class.
# Hoisted to module level so the dict is built once, not per call.
_LOADER_MAP = {
    "pdf": PyPDFLoader,
    "docx": UnstructuredWordDocumentLoader,
    "txt": TextLoader,
    "html": UnstructuredHTMLLoader,
    "md": UnstructuredMarkdownLoader,
}


def add_file_to_chroma(file_path, file_id, hugging_face_ef, db, logger):
    """Load, chunk, and index a document file into ChromaDB.

    Args:
        file_path: Path to the uploaded file on disk. The file is removed
            after processing, on both success and failure (original
            behavior — it is treated as a temporary upload).
        file_id: Identifier stamped into each chunk's metadata so the
            file's chunks can later be removed as a group.
        hugging_face_ef: Embedding function forwarded to
            ``db.add_documents``.
        db: ChromaDB vector-store instance.
        logger: Logger for progress/error messages. NOTE(review): this
            parameter shadows the module-level ``logger`` import; kept
            as-is for interface stability.

    Returns:
        True on success.

    Raises:
        ValueError: If the file extension has no registered loader.
        Exception: Re-raises any error from loading/splitting/indexing.
    """
    extension = file_path.split(".")[-1].lower()
    if extension not in _LOADER_MAP:
        raise ValueError(f"Unsupported file type: {extension}")

    try:
        # Load the document with the loader registered for this extension.
        documents = _LOADER_MAP[extension](file_path).load()

        # Split into overlapping chunks; add_start_index records each
        # chunk's offset in its metadata for source tracing.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1500,
            chunk_overlap=200,
            length_function=len,
            add_start_index=True,
        )
        texts = text_splitter.split_documents(documents)

        # Tag every chunk so it can be filtered/deleted by file later
        # (see remove_file_from_chroma).
        file_name = os.path.basename(file_path)
        for text in texts:
            text.metadata.update(
                {
                    "file_id": str(file_id),
                    "file_name": file_name,
                    "file_type": extension,
                }
            )

        db.add_documents(texts, embedding=hugging_face_ef)

        logger.info(f"Added file '{file_path}' to ChromaDB")
        return True
    except Exception as e:
        logger.error(f"Error processing file {file_path}: {str(e)}")
        raise  # bare raise preserves the original traceback (was `raise e`)
    finally:
        # Single cleanup point replaces the duplicated removal that was
        # in both the success and error paths.
        if os.path.exists(file_path):
            os.remove(file_path)


def remove_file_from_chroma(file_id, db):
    """Delete all ChromaDB chunks whose metadata matches ``file_id``.

    Args:
        file_id: Identifier originally stored by ``add_file_to_chroma``;
            coerced to ``str`` to match how it was written.
        db: ChromaDB vector-store instance.

    Returns:
        True if matching chunks were found and deleted; False if none
        existed or an error occurred (errors are logged, not raised).
    """
    try:
        # Fetch the ids of every chunk tagged with this file_id.
        results = db.get(where={"file_id": str(file_id)})
        if results and results["ids"]:
            db.delete(ids=results["ids"])
            return True
        return False
    except Exception as e:
        logger.error(f"Error removing file from ChromaDB: {str(e)}")
        return False


def generate_query_response(query, db, llm_model, PROMPT_TEMPLATE,
                            relevance_threshold=0.4):
    """Answer ``query`` using documents retrieved from ChromaDB.

    Args:
        query: User's question.
        db: ChromaDB vector-store instance.
        llm_model: Model exposing ``generate_content(prompt).text``.
        PROMPT_TEMPLATE: Template with ``{context}`` and ``{query}`` slots.
        relevance_threshold: Minimum relevance score of the top hit for
            the response to be flagged relevant (default 0.4, previously
            hard-coded).

    Returns:
        On success, a dict with ``is_relevant``, ``answer`` and
        ``sources`` (each source: page_content, score, metadata).
        On failure, a dict with ``is_relevant`` False, a generic
        ``answer`` and the ``error`` string.
    """
    try:
        # Retrieve the top-k chunks together with relevance scores.
        top_related = db.similarity_search_with_relevance_scores(query, k=4)

        # Relevance is judged from the best-scoring hit only.
        is_relevant = top_related[0][1] >= relevance_threshold if top_related else False

        # Context deliberately includes ALL retrieved chunks, even those
        # below the threshold (original behavior).
        context = "\n".join(chunk[0].page_content for chunk in top_related)

        prompt = PROMPT_TEMPLATE.format(context=context, query=query)
        answer = llm_model.generate_content(prompt).text

        logger.info(f"Query : {query}\nResponse:{answer}")

        return {
            "is_relevant": is_relevant,
            "answer": answer,
            "sources": [
                {
                    "page_content": chunk[0].page_content,
                    "score": chunk[1],
                    "metadata": chunk[0].metadata,
                }
                for chunk in top_related
            ],
        }
    except Exception as e:
        logger.error(f"Error generating response: {str(e)}")
        return {
            "is_relevant": False,
            "answer": "An error occurred while processing your query.",
            "error": str(e),
        }