"""User extractor built on the same logic as the supplier search."""

from __future__ import annotations

import re
import importlib
from typing import Any

from extractors.supplier_extractor import ExpenseSupplierExtractor, normalize_text


class ExpenseUserExtractor:
    """Finds a user with the same fuzzy matcher that is used for suppliers.

    Internally reuses :class:`ExpenseSupplierExtractor`, feeding it the user
    list instead of the supplier list, so both entities share one scoring
    pipeline.
    """

    # Minimum lexical support required to accept an arbitrary token.
    MIN_LEXICAL_SUPPORT = 0.40
    # Relaxed support threshold for tokens that morphologically look like a
    # person's name/surname/patronymic.
    MIN_LEXICAL_WITH_PERSON = 0.30

    def __init__(
        self,
        users: list[str],
        suppliers: list[str],
        threshold: float = 0.25,
    ) -> None:
        """Build the matcher.

        Args:
            users: Known user names to match against.
            suppliers: Known supplier names; their normalized forms are
                excluded from user candidates to avoid cross-matching.
            threshold: Minimum score/combined score for a match to count.
        """
        self.threshold = threshold
        self.supplier_terms = {normalize_text(supplier) for supplier in suppliers}
        # The supplier extractor is reused as a generic fuzzy matcher over
        # the user list.
        self.user_matcher = ExpenseSupplierExtractor(suppliers=users)
        # pymorphy3 is optional: if it is not installed, person-grammeme
        # detection is simply disabled (self.morph stays None).
        self.morph: Any = None
        try:
            pymorphy3_module = importlib.import_module("pymorphy3")
            self.morph = pymorphy3_module.MorphAnalyzer()
        except Exception:
            self.morph = None

    def _looks_like_person_token(self, token: str) -> tuple[bool, float, bool]:
        """Decide whether *token* may be part of a user name.

        Returns:
            ``(accepted, lexical_support, has_person_grammeme)`` where
            ``lexical_support`` comes from the user matcher and
            ``has_person_grammeme`` is True when pymorphy3 tags the token's
            best parse as a Name/Surname/Patronymic.
        """
        lexical = self.user_matcher.lexical_support(token)
        has_person_grammeme = False
        if self.morph is not None:
            parses = self.morph.parse(token)
            if parses:
                # Only the top-ranked parse is consulted.
                has_person_grammeme = bool(
                    {"Name", "Surn", "Patr"}.intersection(set(parses[0].tag.grammemes))
                )
        # Keep the lower threshold for person names, but do not let common
        # nouns through.
        accepted = lexical >= self.MIN_LEXICAL_SUPPORT or (
            has_person_grammeme and lexical >= self.MIN_LEXICAL_WITH_PERSON
        )
        return accepted, lexical, has_person_grammeme

    def _build_user_candidate_text(
        self,
        normalized_text: str,
        supplier_phrase: str | None,
        date_phrase: str | None,
        include_debug: bool = False,
    ) -> tuple[str, list[str], list[dict[str, Any]] | None]:
        """Filter *normalized_text* down to tokens that may name a user.

        Tokens belonging to the matched supplier phrase, the date phrase,
        known supplier terms, matcher noise terms, digits, and single
        characters are excluded; the rest are kept only if
        :meth:`_looks_like_person_token` accepts them.

        Returns:
            ``(joined_text, tokens, debug_rows)``; ``debug_rows`` is None
            unless ``include_debug`` is True.
        """
        excluded_tokens: set[str] = set(self.user_matcher.noise_terms)
        if supplier_phrase:
            excluded_tokens.update(normalize_text(supplier_phrase).split())
        if date_phrase:
            excluded_tokens.update(normalize_text(date_phrase).split())
        excluded_tokens.update(self.supplier_terms)
        candidate_tokens: list[str] = []
        candidate_debug: list[dict[str, Any]] | None = [] if include_debug else None
        for token in normalized_text.split():
            if token in excluded_tokens or token.isdigit() or len(token) <= 1:
                continue
            accepted, lexical, has_person_grammeme = self._looks_like_person_token(token)
            if candidate_debug is not None:
                candidate_debug.append({
                    "token": token,
                    "lexical_support": round(lexical, 4),
                    "has_person_grammeme": has_person_grammeme,
                    "accepted": accepted,
                })
            if accepted:
                candidate_tokens.append(token)
        return " ".join(candidate_tokens), candidate_tokens, candidate_debug

    def _match_user_from_candidates(
        self,
        candidate_tokens: list[str],
        include_debug: bool = False,
    ) -> tuple[dict[str, Any], dict[str, Any] | None]:
        """Score every contiguous sub-phrase of the candidates and pick the best.

        Sub-phrases of up to ``user_matcher.max_words`` tokens are generated
        (deduplicated, in first-seen order). Each phrase is scored by the
        matcher; a phrase qualifies when either its raw score or the
        combined score (0.75 * score + 0.25 * lexical support) reaches the
        threshold, and the qualifying phrase with the highest combined score
        wins.

        Returns:
            ``(match_payload, match_debug)`` — payload with ``user``,
            ``user_score`` and ``matched_user_phrase`` keys (all None when
            nothing qualified); debug dict only when ``include_debug``.
        """
        phrases: list[str] = []
        seen: set[str] = set()
        max_words = self.user_matcher.max_words
        for i in range(len(candidate_tokens)):
            for j in range(i + 1, min(i + 1 + max_words, len(candidate_tokens) + 1)):
                phrase = " ".join(candidate_tokens[i:j])
                if phrase not in seen:
                    seen.add(phrase)
                    phrases.append(phrase)
        best_row: dict[str, Any] | None = None
        debug_rows: list[dict[str, Any]] = []
        for phrase in phrases:
            row = self.user_matcher.score_phrase(phrase)
            # Missing score is treated as -1.0, which can never qualify.
            score = float(row.get("score", -1.0))
            support = self.user_matcher.lexical_support(phrase)
            combined = 0.75 * score + 0.25 * support
            if include_debug:
                debug_rows.append({
                    "phrase": phrase,
                    "supplier": row.get("supplier"),
                    "score": round(score, 4),
                    "support": round(support, 4),
                    "combined": round(combined, 4),
                })
            if score >= self.threshold or combined >= self.threshold:
                enriched = {
                    "user": row.get("supplier"),
                    "user_score": round(score, 4) if score >= 0 else None,
                    "matched_user_phrase": phrase,
                    "combined": combined,
                }
                # Ties keep the earlier phrase (strictly-greater comparison).
                if best_row is None or combined > float(best_row.get("combined", -1.0)):
                    best_row = enriched
        if best_row is None:
            match_payload = {
                "user": None,
                "user_score": None,
                "matched_user_phrase": None,
            }
        else:
            match_payload = {
                "user": best_row.get("user"),
                "user_score": best_row.get("user_score"),
                "matched_user_phrase": best_row.get("matched_user_phrase"),
            }
        match_debug = None
        if include_debug:
            match_debug = {
                "phrases_count": len(phrases),
                "score_threshold": self.threshold,
                "combined_threshold": self.threshold,
                # Only the 8 best-combined candidates are exposed.
                "top_candidates": sorted(debug_rows, key=lambda item: item["combined"], reverse=True)[:8],
            }
        return match_payload, match_debug

    def extract(
        self,
        text: str,
        supplier_phrase: str | None = None,
        date_phrase: str | None = None,
        debug: bool = False,
    ) -> dict[str, Any]:
        # NOTE(review): this method is truncated in the visible chunk; the
        # regex below continues in the rest of the file.
        normalized_text = normalize_text(text)
        if re.search(r"(?