| | import math |
| | from collections import Counter |
| | from typing import Optional |
| |
|
| | import numpy as np |
| |
|
| | from core.model_manager import ModelManager |
| | from core.model_runtime.entities.model_entities import ModelType |
| | from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler |
| | from core.rag.embedding.cached_embedding import CacheEmbedding |
| | from core.rag.models.document import Document |
| | from core.rag.rerank.entity.weight import VectorSetting, Weights |
| | from core.rag.rerank.rerank_base import BaseRerankRunner |
| |
|
| |
|
class WeightRerankRunner(BaseRerankRunner):
    """
    Rerank documents by a weighted sum of a vector-similarity score and a
    keyword (TF-IDF cosine) score.
    """

    def __init__(self, tenant_id: str, weights: Weights) -> None:
        """
        :param tenant_id: tenant used to resolve the embedding model instance
        :param weights: vector / keyword weight settings applied in ``run``
        """
        self.tenant_id = tenant_id
        self.weights = weights

    def run(
        self,
        query: str,
        documents: list[Document],
        score_threshold: Optional[float] = None,
        top_n: Optional[int] = None,
        user: Optional[str] = None,
    ) -> list[Document]:
        """
        Run rerank model
        :param query: search query
        :param documents: documents for reranking; duplicates (same
            ``metadata["doc_id"]``) are dropped, keeping the first occurrence
        :param score_threshold: documents scoring below this are discarded;
            ``None`` or ``0`` disables filtering
        :param top_n: maximum number of documents returned; ``None`` or ``0``
            returns every document that passed the threshold
        :param user: unique user id if needed (not used by this runner)

        :return: the surviving documents, highest combined score first, each
            with ``metadata["score"]`` overwritten by the combined score
        """
        # A set keeps de-duplication linear; doc_id values must be hashable.
        seen_doc_ids = set()
        unique_documents = []
        for document in documents:
            doc_id = document.metadata["doc_id"]
            if doc_id in seen_doc_ids:
                continue
            seen_doc_ids.add(doc_id)
            unique_documents.append(document)

        # Nothing to rank: skip keyword extraction and the embedding call.
        if not unique_documents:
            return []

        keyword_scores = self._calculate_keyword_score(query, unique_documents)
        vector_scores = self._calculate_cosine(
            self.tenant_id, query, unique_documents, self.weights.vector_setting
        )

        vector_weight = self.weights.vector_setting.vector_weight
        keyword_weight = self.weights.keyword_setting.keyword_weight

        rerank_documents = []
        for document, keyword_score, vector_score in zip(unique_documents, keyword_scores, vector_scores):
            score = vector_weight * vector_score + keyword_weight * keyword_score
            # Truthiness check is intentional: a threshold of 0 means "no filtering".
            if score_threshold and score < score_threshold:
                continue
            document.metadata["score"] = score
            rerank_documents.append(document)

        # list.sort is stable, so ties keep their original relative order.
        rerank_documents.sort(key=lambda doc: doc.metadata["score"], reverse=True)
        return rerank_documents[:top_n] if top_n else rerank_documents

    @staticmethod
    def _sparse_cosine(vec1: dict, vec2: dict) -> float:
        """
        Cosine similarity of two sparse vectors stored as ``{term: weight}``.

        :return: similarity value, or ``0.0`` when either vector has zero norm
        """
        shared_terms = vec1.keys() & vec2.keys()
        numerator = sum(vec1[term] * vec2[term] for term in shared_terms)

        norm1 = math.sqrt(sum(weight**2 for weight in vec1.values()))
        norm2 = math.sqrt(sum(weight**2 for weight in vec2.values()))
        denominator = norm1 * norm2

        if not denominator:
            return 0.0
        return float(numerator) / denominator

    def _calculate_keyword_score(self, query: str, documents: list[Document]) -> list[float]:
        """
        Calculate keyword scores as the cosine similarity between the TF-IDF
        vector of the query keywords and that of each document's keywords.

        Side effect: stores the extracted keywords of every document in
        ``document.metadata["keywords"]``.

        :param query: search query
        :param documents: documents for reranking

        :return: one similarity per document, in the order of ``documents``
        """
        keyword_table_handler = JiebaKeywordTableHandler()
        query_keywords = keyword_table_handler.extract_keywords(query, None)

        documents_keywords = []
        for document in documents:
            document_keywords = keyword_table_handler.extract_keywords(document.page_content, None)
            document.metadata["keywords"] = document_keywords
            documents_keywords.append(document_keywords)

        total_documents = len(documents)

        # Document frequency in a single pass: each document contributes at
        # most once per keyword, hence the set() around its keywords.
        document_frequency = Counter(
            keyword for document_keywords in documents_keywords for keyword in set(document_keywords)
        )

        # Add-one smoothed IDF; the trailing "+ 1" keeps the weight of a
        # keyword present in every document strictly positive.
        keyword_idf = {
            keyword: math.log((1 + total_documents) / (1 + doc_count)) + 1
            for keyword, doc_count in document_frequency.items()
        }

        def tfidf(keywords) -> dict:
            # Keywords unseen in every document get IDF 0 and therefore weight 0.
            return {keyword: count * keyword_idf.get(keyword, 0) for keyword, count in Counter(keywords).items()}

        query_tfidf = tfidf(query_keywords)
        return [
            self._sparse_cosine(query_tfidf, tfidf(document_keywords)) for document_keywords in documents_keywords
        ]

    def _calculate_cosine(
        self, tenant_id: str, query: str, documents: list[Document], vector_setting: VectorSetting
    ) -> list[float]:
        """
        Calculate Cosine scores between the embedded query and each document.

        A document that already carries ``metadata["score"]`` keeps that value
        as its vector score (presumably the similarity reported by the upstream
        vector search - confirm against callers); otherwise the cosine
        similarity is computed from ``document.vector``.

        :param tenant_id: tenant used to resolve the embedding model instance
        :param query: search query
        :param documents: documents for reranking
        :param vector_setting: embedding provider / model names to use

        :return: one score per document, in the order of ``documents``
        """
        query_vector_scores = []

        model_manager = ModelManager()
        embedding_model = model_manager.get_model_instance(
            tenant_id=tenant_id,
            provider=vector_setting.embedding_provider_name,
            model_type=ModelType.TEXT_EMBEDDING,
            model=vector_setting.embedding_model_name,
        )
        cache_embedding = CacheEmbedding(embedding_model)
        query_vector = cache_embedding.embed_query(query)

        # The query side is loop-invariant: convert and measure it once.
        query_vec = np.array(query_vector)
        query_norm = np.linalg.norm(query_vec)

        for document in documents:
            if "score" in document.metadata:
                query_vector_scores.append(document.metadata["score"])
                continue

            document_vec = np.array(document.vector)
            denominator = query_norm * np.linalg.norm(document_vec)
            if not denominator:
                # A zero-norm vector would otherwise produce NaN, which slips
                # past the threshold comparison and corrupts the final sort.
                query_vector_scores.append(0.0)
                continue

            query_vector_scores.append(float(np.dot(query_vec, document_vec) / denominator))

        return query_vector_scores
| |
|