| | |
| | from typing import List |
| | from langchain.vectorstores import Chroma |
| | from langchain.retrievers.multi_vector import MultiVectorRetriever |
| | from langchain.storage import InMemoryStore |
| | import uuid |
| | from langchain.document_loaders import TextLoader, DirectoryLoader |
| | import os |
| | from sentence_transformers.cross_encoder import CrossEncoder |
| | import numpy as np |
| | from langchain.schema import BaseRetriever, Document |
| | from typing import List |
| | from langchain.callbacks.manager import CallbackManagerForRetrieverRun |
| | from langchain.vectorstores import VectorStore |
| | from langchain.load import dumps, loads |
| | from typing import Any |
| |
|
| |
|
def load_data(data_path):
    """Load every text document found under the subfolders of *data_path*.

    Each immediate subdirectory of ``data_path`` is scanned with a
    ``DirectoryLoader`` (loading individual files via ``TextLoader``),
    and all resulting documents are concatenated into one flat list.
    """
    loaded_documents = []
    for folder in os.listdir(data_path):
        folder_loader = DirectoryLoader(os.path.join(data_path, folder), loader_cls=TextLoader)
        loaded_documents.extend(folder_loader.load())
    return loaded_documents
| |
|
def process_data(data: List[Document], child_text_splitter, embedding,
                 vectorstore_name: str) -> tuple[Chroma, MultiVectorRetriever]:
    """Build a parent/child MultiVectorRetriever over *data*.

    Small child chunks are embedded into a Chroma collection for similarity
    search, while the full parent documents live in an in-memory docstore.
    Each child chunk carries its parent's UUID in metadata so a matching
    chunk can be mapped back to the parent document at query time.

    Args:
        data: parent documents to index (each must support
            ``split_documents`` input and have ``.metadata``).
        child_text_splitter: splitter producing the child chunks.
        embedding: embedding function handed to Chroma.
        vectorstore_name: Chroma collection name.

    Returns:
        ``(vectorstore, retriever)`` — the Chroma store and the configured
        MultiVectorRetriever.  (The previous annotation claimed only a
        MultiVectorRetriever was returned; the function has always returned
        the pair.)
    """
    # Vector store that indexes the small child chunks.
    vectorstore = Chroma(
        collection_name=vectorstore_name,
        embedding_function=embedding,
    )

    # In-memory store for the full parent documents, keyed by doc_id.
    store = InMemoryStore()
    id_key = "doc_id"

    retriever = MultiVectorRetriever(
        vectorstore=vectorstore,
        docstore=store,
        id_key=id_key,
        search_kwargs={"k": 25},  # fetch up to 25 candidates per query
    )

    # One stable UUID per parent document.
    doc_ids = [str(uuid.uuid4()) for _ in data]
    sub_docs = []
    for doc_id, doc in zip(doc_ids, data):
        chunks = child_text_splitter.split_documents([doc])
        # Tag every child chunk with its parent's id so retrieval can
        # resolve chunk hits back to whole parent documents.
        for chunk in chunks:
            chunk.metadata[id_key] = doc_id
        sub_docs.extend(chunks)

    retriever.vectorstore.add_documents(sub_docs)
    retriever.docstore.mset(list(zip(doc_ids, data)))

    return vectorstore, retriever
| |
|
class CustomRetriever(BaseRetriever):
    """Retriever that fans out over several query variants, fuses the ranked
    result lists with Reciprocal Rank Fusion, reranks the fused list with a
    cross-encoder against the first query, and returns the top 10 documents.
    """

    # Underlying retriever that answers each individual query
    # (must expose ``get_relevant_documents``).
    retriever: Any

    def reciprocal_rank_fusion(self, results: list[list], k=60):
        """Fuse several ranked document lists into one via Reciprocal Rank Fusion.

        Args:
            results: one ranked list of documents per query variant.
            k: RRF damping constant (the conventional default is 60).

        Returns:
            Documents ordered by descending fused RRF score.
        """
        fused_scores = {}
        for docs in results:
            for rank, doc in enumerate(docs):
                # Serialize so identical documents coming from different
                # result lists collapse onto one key.
                doc_str = dumps(doc)
                # RRF: each appearance contributes 1 / (rank + k).
                fused_scores[doc_str] = fused_scores.get(doc_str, 0) + 1 / (rank + k)

        # Highest fused score first; deserialize back to Document objects.
        return [
            loads(doc_str)
            for doc_str, _score in sorted(
                fused_scores.items(), key=lambda item: item[1], reverse=True
            )
        ]

    def _get_relevant_documents(
        self, queries: list, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        # Retrieve once per query variant, then fuse the ranked lists.
        documents = [
            self.retriever.get_relevant_documents(query, callbacks=run_manager.get_child())
            for query in queries
        ]
        unique_documents = self.reciprocal_rank_fusion(documents)
        docs_content = [doc.page_content for doc in unique_documents]

        # NOTE(review): the cross-encoder is re-instantiated on every call;
        # consider caching it at module level if latency matters.
        model = CrossEncoder('nnngoc/ms-marco-MiniLM-L-6-v2-32-6M-1')

        # Score each fused document against queries[0] only — presumably the
        # original user query, with the rest being generated variants; confirm
        # with the caller.
        sentence_combinations = [[queries[0], document] for document in docs_content]
        similarity_scores = model.predict(sentence_combinations)

        # Highest-scoring documents first; keep the top 10.
        ranked_indices = np.argsort(similarity_scores)[::-1]
        return [unique_documents[idx] for idx in ranked_indices[:10]]
| | |
| |
|
import cohere

# SECURITY(review): an actual Cohere API key was previously hard-coded on this
# line — a leaked credential. It now comes from the environment; the old key
# must be revoked and rotated.
COHERE_API_KEY = os.environ.get('COHERE_API_KEY', '')
| |
|
class CustomRetriever1(BaseRetriever):
    """Retriever that fans out over several query variants, fuses results
    with Reciprocal Rank Fusion (keeping the top 30), then reranks those
    candidates with Cohere's rerank API and returns the top 10.
    """

    # Underlying retriever that answers each individual query
    # (must expose ``get_relevant_documents``).
    retriever: Any

    def reciprocal_rank_fusion(self, results: list[list], k=60):
        """Fuse several ranked document lists into one via Reciprocal Rank Fusion.

        Args:
            results: one ranked list of documents per query variant.
            k: RRF damping constant (the conventional default is 60).

        Returns:
            The 30 best documents, ordered by descending fused RRF score.
        """
        fused_scores = {}
        for docs in results:
            for rank, doc in enumerate(docs):
                # Serialize so identical documents coming from different
                # result lists collapse onto one key.
                doc_str = dumps(doc)
                # RRF: each appearance contributes 1 / (rank + k).
                fused_scores[doc_str] = fused_scores.get(doc_str, 0) + 1 / (rank + k)

        ranked = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
        # Cap at 30 candidates to bound the (paid, per-document) rerank call.
        return [loads(doc_str) for doc_str, _score in ranked[:30]]

    def _get_relevant_documents(
        self, queries: list, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        # Retrieve once per query variant, then fuse the ranked lists.
        documents = [
            self.retriever.get_relevant_documents(query, callbacks=run_manager.get_child())
            for query in queries
        ]
        unique_documents = self.reciprocal_rank_fusion(documents)
        docs_content = [doc.page_content for doc in unique_documents]

        # Rerank against queries[0] only — presumably the original user query,
        # with the rest being generated variants; confirm with the caller.
        co = cohere.Client(COHERE_API_KEY)
        results = co.rerank(
            query=queries[0],
            documents=docs_content,
            top_n=10,
            model='rerank-multilingual-v3.0',
            return_documents=True,
        )

        # Map Cohere's returned indices back onto the fused document objects.
        reranked_indices = [result.index for result in results.results]
        return [unique_documents[idx] for idx in reranked_indices]