Spaces:
Runtime error
Runtime error
| import asyncio | |
| import logging | |
| from typing import Callable, Optional | |
| from uuid import UUID | |
| import numpy as np | |
| from ntr_fileparser import ParsedDocument | |
| from ntr_text_fragmentation import (EntitiesExtractor, EntityRepository, | |
| InjectionBuilder, InMemoryEntityRepository, LinkerEntity) | |
| from common.configuration import Configuration | |
| from components.dbo.chunk_repository import ChunkRepository | |
| from components.embedding_extraction import EmbeddingExtractor | |
| from components.llm.deepinfra_api import DeepInfraApi | |
| from components.search.appendices_chunker import APPENDICES_CHUNKER | |
| from components.search.faiss_vector_search import FaissVectorSearch | |
| from components.services.llm_config import LLMConfigService | |
# Logger for this module, named after the module itself.
logger = logging.getLogger(name=__name__)
| class EntityService: | |
| """ | |
| Сервис для работы с сущностями. | |
| Объединяет функциональность chunk_repository, destructurer, injection_builder и faiss_vector_search. | |
| """ | |
| def __init__( | |
| self, | |
| vectorizer: EmbeddingExtractor, | |
| chunk_repository: ChunkRepository, | |
| config: Configuration, | |
| llm_api: DeepInfraApi, | |
| llm_config_service: LLMConfigService, | |
| ) -> None: | |
| """ | |
| Инициализация сервиса. | |
| Args: | |
| vectorizer: Модель для извлечения эмбеддингов | |
| chunk_repository: Репозиторий для работы с чанками | |
| config: Конфигурация приложения | |
| llm_api: Клиент для взаимодействия с LLM API | |
| llm_config_service: Сервис для получения конфигурации LLM | |
| """ | |
| self.vectorizer = vectorizer | |
| self.config = config | |
| self.chunk_repository = chunk_repository | |
| self.llm_api = llm_api | |
| self.llm_config_service = llm_config_service | |
| self.faiss_search = None | |
| self.current_dataset_id = None | |
| self.neighbors_max_distance = config.db_config.entities.neighbors_max_distance | |
| self.max_entities_per_message = config.db_config.search.max_entities_per_message | |
| self.max_entities_per_dialogue = ( | |
| config.db_config.search.max_entities_per_dialogue | |
| ) | |
| self.main_extractor = EntitiesExtractor( | |
| strategy_name=config.db_config.entities.strategy_name, | |
| strategy_params=config.db_config.entities.strategy_params, | |
| process_tables=config.db_config.entities.process_tables, | |
| ) | |
| self.appendices_extractor = EntitiesExtractor( | |
| strategy_name=APPENDICES_CHUNKER, | |
| strategy_params={ | |
| "llm_api": self.llm_api, | |
| "llm_config_service": self.llm_config_service, | |
| }, | |
| process_tables=False, | |
| ) | |
| self._in_memory_cache: InMemoryEntityRepository = None | |
| self._cached_dataset_id: int | None = None | |
| def invalidate_cache(self) -> None: | |
| """Инвалидирует (удаляет) текущий кеш в памяти.""" | |
| if self._in_memory_cache: | |
| self._in_memory_cache = None | |
| self._cached_dataset_id = None | |
| else: | |
| logger.info("In-memory кеш уже пуст. Ничего не делаем.") | |
| def build_cache(self, dataset_id: int) -> None: | |
| """Строит кеш для указанного датасета.""" | |
| all_entities = self.chunk_repository.get_all_entities_for_dataset(dataset_id) | |
| in_memory_repo = InMemoryEntityRepository(entities=all_entities) | |
| self._in_memory_cache = in_memory_repo | |
| self._cached_dataset_id = dataset_id | |
| async def build_or_rebuild_cache_async(self, dataset_id: int) -> None: | |
| """ | |
| Строит или перестраивает кеш для указанного датасета, удаляя предыдущий кеш. | |
| """ | |
| all_entities = await self.chunk_repository.get_all_entities_for_dataset_async(dataset_id) | |
| if not all_entities: | |
| logger.warning(f"No entities found for dataset {dataset_id}. Cache not built.") | |
| self._in_memory_cache = None | |
| self._cached_dataset_id = None | |
| return | |
| logger.info(f"Building new in-memory cache for dataset {dataset_id}") | |
| in_memory_repo = InMemoryEntityRepository(entities=all_entities) | |
| self._in_memory_cache = in_memory_repo | |
| self._cached_dataset_id = dataset_id | |
| logger.info(f"Cached {len(all_entities)} entities for dataset {dataset_id}") | |
| def _get_repository_for_dataset(self, dataset_id: int) -> EntityRepository: | |
| """ | |
| Возвращает кешированный репозиторий, если он существует и соответствует | |
| запрошенному dataset_id, иначе возвращает основной репозиторий ChunkRepository. | |
| """ | |
| # Проверяем совпадение ID с закешированным | |
| if self._cached_dataset_id == dataset_id and self._in_memory_cache is not None: | |
| return self._in_memory_cache | |
| else: | |
| # Логируем причину промаха кеша для диагностики | |
| if not self._in_memory_cache: | |
| logger.warning(f"Cache miss for dataset {dataset_id}: Cache is empty. Using ChunkRepository (DB).") | |
| elif self._cached_dataset_id != dataset_id: | |
| logger.warning(f"Cache miss for dataset {dataset_id}: Cache contains data for dataset {self._cached_dataset_id}. Using ChunkRepository (DB).") | |
| else: # На случай непредвиденной ситуации | |
| logger.warning(f"Cache miss for dataset {dataset_id}: Unknown reason. Using ChunkRepository (DB).") | |
| return self.chunk_repository | |
| def _ensure_faiss_initialized(self, dataset_id: int) -> None: | |
| """ | |
| Проверяет и при необходимости инициализирует или обновляет FAISS индекс. | |
| Args: | |
| dataset_id: ID датасета для инициализации | |
| """ | |
| # Переинициализируем FAISS, только если ID датасета изменился | |
| if self.faiss_search is None or self.current_dataset_id != dataset_id: | |
| logger.info(f'Initializing FAISS for dataset {dataset_id}') | |
| entities, embeddings = self.chunk_repository.get_searching_entities( | |
| dataset_id | |
| ) | |
| if entities: | |
| embeddings_dict = { | |
| str(entity.id): embedding # Преобразуем UUID в строку для ключа | |
| for entity, embedding in zip(entities, embeddings) | |
| if embedding is not None | |
| } | |
| if embeddings_dict: # Проверяем, что есть хотя бы один эмбеддинг | |
| self.faiss_search = FaissVectorSearch( | |
| self.vectorizer, | |
| embeddings_dict, | |
| ) | |
| self.current_dataset_id = dataset_id | |
| logger.info( | |
| f'FAISS initialized for dataset {dataset_id} with {len(embeddings_dict)} embeddings' | |
| ) | |
| else: | |
| logger.warning( | |
| f'No valid embeddings found for dataset {dataset_id}' | |
| ) | |
| self.faiss_search = None | |
| self.current_dataset_id = None | |
| else: | |
| logger.warning(f'No entities found for dataset {dataset_id}') | |
| self.faiss_search = None | |
| self.current_dataset_id = None | |
| async def process_document( | |
| self, | |
| document: ParsedDocument, | |
| dataset_id: int, | |
| progress_callback: Optional[Callable] = None, | |
| ) -> None: | |
| """ | |
| Асинхронная обработка документа: разбиение на чанки и сохранение в базу. | |
| Args: | |
| document: Документ для обработки | |
| dataset_id: ID датасета | |
| progress_callback: Функция для отслеживания прогресса | |
| """ | |
| logger.info(f"Processing document {document.name} for dataset {dataset_id}") | |
| # Определяем экстрактор в зависимости от имени документа | |
| if 'Приложение' in document.name: | |
| entities = await self.appendices_extractor.extract_async(document) | |
| else: | |
| entities = await self.main_extractor.extract_async(document) | |
| # Фильтруем сущности для поиска | |
| filtering_entities = [ | |
| entity for entity in entities if entity.in_search_text is not None | |
| ] | |
| filtering_texts = [entity.in_search_text for entity in filtering_entities] | |
| embeddings = self.vectorizer.vectorize(filtering_texts, progress_callback) | |
| # Собираем словарь эмбеддингов только для найденных сущностей | |
| embeddings_dict = {} | |
| if embeddings is not None: | |
| embeddings_dict = { | |
| str(entity.id): embedding | |
| for entity, embedding in zip(filtering_entities, embeddings) | |
| if embedding is not None | |
| } | |
| else: | |
| logger.warning(f"Vectorizer returned None for document {document.name}") | |
| # Сохраняем в базу | |
| await self.chunk_repository.add_entities_async(entities, dataset_id, embeddings_dict) | |
| logger.info(f"Added {len(entities)} entities to dataset {dataset_id}") | |
| async def add_entities_batch_async( | |
| self, | |
| dataset_id: int, | |
| entities: list[LinkerEntity], | |
| embeddings: dict[str, np.ndarray], | |
| ): | |
| """Асинхронно добавляет батч сущностей и их эмбеддингов в БД.""" | |
| if not entities: | |
| logger.info("add_entities_batch_async called with empty entities list. Nothing to add.") | |
| return | |
| logger.info(f"Starting batch insertion of {len(entities)} entities for dataset {dataset_id}...") | |
| try: | |
| await asyncio.to_thread( | |
| self.chunk_repository.add_entities, | |
| entities, | |
| dataset_id, | |
| embeddings | |
| ) | |
| logger.info(f"Batch insertion of {len(entities)} entities finished for dataset {dataset_id}.") | |
| except Exception as e: | |
| logger.error( | |
| f"Error during batch insertion for dataset {dataset_id}: {e}", | |
| exc_info=True, | |
| ) | |
| raise e | |
| async def prepare_document_data_async( | |
| self, | |
| document: ParsedDocument, | |
| progress_callback: Optional[Callable] = None, | |
| ) -> tuple[list[LinkerEntity], dict[str, np.ndarray]]: | |
| """Асинхронно извлекает сущности и векторы для документа. | |
| Не сохраняет данные в репозиторий, а возвращает их для последующей | |
| батчевой обработки. | |
| Args: | |
| document: Документ для обработки. | |
| progress_callback: Функция для отслеживания прогресса векторизации. | |
| Returns: | |
| Кортеж: (список извлеченных LinkerEntity, словарь эмбеддингов {id_str: embedding}). | |
| """ | |
| logger.debug(f"Preparing data for document {document.name}") | |
| # 1. Извлечение сущностей | |
| if 'Приложение' in document.name: | |
| entities = await self.appendices_extractor.extract_async(document) | |
| else: | |
| entities = await self.main_extractor.extract_async(document) | |
| # 2. Векторизация (если нужно) | |
| filtering_entities = [ | |
| entity for entity in entities if entity.in_search_text is not None | |
| ] | |
| filtering_texts = [entity.in_search_text for entity in filtering_entities] | |
| embeddings = self.vectorizer.vectorize(filtering_texts, progress_callback) | |
| embeddings_dict = {} | |
| if embeddings is not None: | |
| embeddings_dict = { | |
| str(entity.id): embedding | |
| for entity, embedding in zip(filtering_entities, embeddings) | |
| if embedding is not None | |
| } | |
| else: | |
| logger.warning(f"Vectorizer returned None for document {document.name}") | |
| logger.debug(f"Prepared data for document {document.name}: {len(entities)} entities, {len(embeddings_dict)} embeddings.") | |
| return entities, embeddings_dict | |
| async def build_text_async( | |
| self, | |
| entities: list[str], | |
| dataset_id: int, | |
| chunk_scores: Optional[list[float]] = None, | |
| include_tables: bool = True, | |
| max_documents: Optional[int] = None, | |
| ) -> str: | |
| """ | |
| Асинхронная сборка текста из сущностей с использованием кешированного или основного репозитория. | |
| Args: | |
| entities: Список идентификаторов сущностей (строки UUID) | |
| dataset_id: ID датасета для получения репозитория (кешированного или БД) | |
| chunk_scores: Список весов чанков (соответствует порядку entities) | |
| include_tables: Флаг включения таблиц | |
| max_documents: Максимальное количество документов | |
| Returns: | |
| Собранный текст | |
| """ | |
| if not entities: | |
| logger.warning("build_text called with empty entities list.") | |
| return "" | |
| try: | |
| entity_ids = [UUID(entity) for entity in entities] | |
| except ValueError as e: | |
| logger.error(f"Invalid UUID format found in entities list: {e}") | |
| raise ValueError(f"Invalid UUID format in entities list: {entities}") from e | |
| repository = self._get_repository_for_dataset(dataset_id) | |
| # Передаем репозиторий (кеш или БД) в InjectionBuilder | |
| builder = InjectionBuilder(repository=repository) | |
| # Создаем словарь score_map UUID -> score, если chunk_scores предоставлены | |
| scores_map: dict[UUID, float] | None = None | |
| if chunk_scores is not None: | |
| if len(entity_ids) == len(chunk_scores): | |
| scores_map = {eid: score for eid, score in zip(entity_ids, chunk_scores)} | |
| else: | |
| logger.warning(f"Length mismatch between entities ({len(entity_ids)}) and chunk_scores ({len(chunk_scores)}). Scores ignored.") | |
| logger.info(f"Building text for {len(entity_ids)} entities from dataset {dataset_id} using {repository.__class__.__name__}") | |
| # Вызываем асинхронный метод сборщика | |
| return await builder.build_async( | |
| entities=entity_ids, # Передаем список UUID | |
| scores=scores_map, # Передаем словарь UUID -> score | |
| include_tables=include_tables, | |
| neighbors_max_distance=self.neighbors_max_distance, | |
| max_documents=max_documents, | |
| ) | |
| def search_similar_old( | |
| self, | |
| query: str, | |
| dataset_id: int, | |
| k: int | None = None, | |
| ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """ | |
| Поиск похожих сущностей. | |
| Args: | |
| query: Текст запроса | |
| dataset_id: ID датасета | |
| k: Максимальное количество возвращаемых результатов (по умолчанию - все). | |
| Returns: | |
| tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| - Вектор запроса | |
| - Оценки сходства | |
| - Идентификаторы найденных сущностей | |
| """ | |
| logger.info(f"Searching similar entities for dataset {dataset_id} with k={k}") | |
| self._ensure_faiss_initialized(dataset_id) | |
| if self.faiss_search is None: | |
| logger.warning( | |
| f"FAISS search not initialized for dataset {dataset_id}. Returning empty results." | |
| ) | |
| return np.array([]), np.array([]), np.array([]) | |
| # Выполняем поиск с использованием параметра k | |
| query_vector, scores, ids = self.faiss_search.search_vectors(query, max_entities=k) | |
| logger.info(f"Found {len(ids)} similar entities.") | |
| return query_vector, scores, ids | |
| def search_similar( | |
| self, | |
| query: str, | |
| dataset_id: int, | |
| previous_entities: list[list[str]] = None, | |
| ) -> tuple[list[list[str]], list[str], list[float]]: | |
| """ | |
| Поиск похожих сущностей. | |
| Args: | |
| query: Текст запроса | |
| dataset_id: ID датасета | |
| previous_entities: Список идентификаторов сущностей, которые уже были найдены | |
| Returns: | |
| tuple[list[list[str]], list[str], list[float]]: | |
| - Перефильтрованный список идентификаторов сущностей из прошлых запросов | |
| - Список идентификаторов найденных сущностей (строки UUID) | |
| - Скоры найденных сущностей | |
| """ | |
| self._ensure_faiss_initialized(dataset_id) | |
| if self.faiss_search is None: | |
| return previous_entities, [], [] | |
| if ( | |
| sum(len(entities) for entities in previous_entities) | |
| < self.max_entities_per_dialogue - self.max_entities_per_message | |
| ): | |
| _, scores, ids = self.faiss_search.search_vectors( | |
| query, self.max_entities_per_message | |
| ) | |
| try: | |
| scores = scores.tolist() | |
| ids = ids.tolist() | |
| except: | |
| scores = list(scores) | |
| ids = list(ids) | |
| return previous_entities, ids, scores | |
| if previous_entities: | |
| _, scores, ids = self.faiss_search.search_vectors( | |
| query, self.max_entities_per_dialogue | |
| ) | |
| scores = scores.tolist() | |
| ids = ids.tolist() | |
| print(ids) | |
| previous_entities_ids = [ | |
| [entity for entity in sublist if entity in ids] | |
| for sublist in previous_entities | |
| ] | |
| previous_entities_flat = [ | |
| entity for sublist in previous_entities_ids for entity in sublist | |
| ] | |
| new_entities = [] | |
| new_scores = [] | |
| for id_, score in zip(ids, scores): | |
| if id_ not in previous_entities_flat: | |
| new_entities.append(id_) | |
| new_scores.append(score) | |
| if len(new_entities) >= self.max_entities_per_message: | |
| break | |
| return previous_entities, new_entities, new_scores | |
| else: | |
| _, scores, ids = self.faiss_search.search_vectors( | |
| query, self.max_entities_per_dialogue | |
| ) | |
| scores = scores.tolist() | |
| ids = ids.tolist() | |
| return [], ids, scores | |