"""Экстрактор пользователей на той же логике, что и поиск поставщика."""

from __future__ import annotations

import re
import importlib
from typing import Any

from extractors.supplier_extractor import ExpenseSupplierExtractor, normalize_text


class ExpenseUserExtractor:
    """Ищет пользователя тем же fuzzy-matcher, что и поставщика."""

    MIN_LEXICAL_SUPPORT = 0.40
    MIN_LEXICAL_WITH_PERSON = 0.30

    def __init__(
        self,
        users: list[str],
        suppliers: list[str],
        threshold: float = 0.25,
    ) -> None:
        self.threshold = threshold
        self.supplier_terms = {normalize_text(supplier) for supplier in suppliers}
        self.user_matcher = ExpenseSupplierExtractor(suppliers=users)
        self.morph: Any = None
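        # pymorphy3 is optional: without it, person-grammeme detection is
        # skipped and candidate tokens are judged on lexical support alone.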
        try:
            pymorphy3_module = importlib.import_module("pymorphy3")
            self.morph = pymorphy3_module.MorphAnalyzer()
        except Exception:
            self.morph = None

    def _looks_like_person_token(self, token: str) -> tuple[bool, float, bool]:
        lexical = self.user_matcher.lexical_support(token)
        has_person_grammeme = False
        if self.morph is not None:
            parses = self.morph.parse(token)
            if parses:
                has_person_grammeme = bool(
                    {"Name", "Surn", "Patr"}.intersection(set(parses[0].tag.grammemes))
                )

        # Keep the low threshold for names, but do not let common nouns through.
        accepted = lexical >= self.MIN_LEXICAL_SUPPORT or (
            has_person_grammeme and lexical >= self.MIN_LEXICAL_WITH_PERSON
        )
        return accepted, lexical, has_person_grammeme

    def _build_user_candidate_text(
        self,
        normalized_text: str,
        supplier_phrase: str | None,
        date_phrase: str | None,
        include_debug: bool = False,
    ) -> tuple[str, list[str], list[dict[str, Any]] | None]:
        excluded_tokens: set[str] = set(self.user_matcher.noise_terms)
        if supplier_phrase:
            excluded_tokens.update(normalize_text(supplier_phrase).split())
        if date_phrase:
            excluded_tokens.update(normalize_text(date_phrase).split())
        excluded_tokens.update(self.supplier_terms)

        candidate_tokens: list[str] = []
        candidate_debug: list[dict[str, Any]] | None = [] if include_debug else None
        for token in normalized_text.split():
            if token in excluded_tokens or token.isdigit() or len(token) <= 1:
                continue
            accepted, lexical, has_person_grammeme = self._looks_like_person_token(token)
            if candidate_debug is not None:
                candidate_debug.append({
                    "token": token,
                    "lexical_support": round(lexical, 4),
                    "has_person_grammeme": has_person_grammeme,
                    "accepted": accepted,
                })
            if accepted:
                candidate_tokens.append(token)

        return " ".join(candidate_tokens), candidate_tokens, candidate_debug

    def _match_user_from_candidates(
        self,
        candidate_tokens: list[str],
        include_debug: bool = False,
    ) -> tuple[dict[str, Any], dict[str, Any] | None]:
        phrases: list[str] = []
        seen: set[str] = set()
        max_words = self.user_matcher.max_words
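        # Enumerate every contiguous window of up to max_words tokens,
        # deduplicating while preserving first-seen order.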
        for i in range(len(candidate_tokens)):
            for j in range(i + 1, min(i + 1 + max_words, len(candidate_tokens) + 1)):
                phrase = " ".join(candidate_tokens[i:j])
                if phrase not in seen:
                    seen.add(phrase)
                    phrases.append(phrase)

        best_row: dict[str, Any] | None = None
        debug_rows: list[dict[str, Any]] = []
        for phrase in phrases:
            row = self.user_matcher.score_phrase(phrase)
            score = float(row.get("score", -1.0))
            support = self.user_matcher.lexical_support(phrase)
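            # Blend the matcher score (75%) with lexical support (25%);
            # clearing the threshold on either the raw score or the blend
            # qualifies the phrase as a candidate.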
            combined = 0.75 * score + 0.25 * support

            if include_debug:
                debug_rows.append({
                    "phrase": phrase,
                    "supplier": row.get("supplier"),
                    "score": round(score, 4),
                    "support": round(support, 4),
                    "combined": round(combined, 4),
                })

            if score >= self.threshold or combined >= self.threshold:
                enriched = {
                    "user": row.get("supplier"),
                    "user_score": round(score, 4) if score >= 0 else None,
                    "matched_user_phrase": phrase,
                    "combined": combined,
                }
                if best_row is None or combined > float(best_row.get("combined", -1.0)):
                    best_row = enriched

        if best_row is None:
            match_payload = {
                "user": None,
                "user_score": None,
                "matched_user_phrase": None,
            }
        else:
            match_payload = {
                "user": best_row.get("user"),
                "user_score": best_row.get("user_score"),
                "matched_user_phrase": best_row.get("matched_user_phrase"),
            }

        match_debug = None
        if include_debug:
            match_debug = {
                "phrases_count": len(phrases),
                "score_threshold": self.threshold,
                "combined_threshold": self.threshold,
                "top_candidates": sorted(debug_rows, key=lambda item: item["combined"], reverse=True)[:8],
            }

        return match_payload, match_debug

    def extract(
        self,
        text: str,
        supplier_phrase: str | None = None,
        date_phrase: str | None = None,
        debug: bool = False,
    ) -> dict[str, Any]:
        normalized_text = normalize_text(text)

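        # A standalone "я" ("I") short-circuits matching: the speaker is the user.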
        if re.search(r"(?<!\S)я(?!\S)", normalized_text, re.IGNORECASE):
            payload = {
                "user": "Я",
                "user_score": 1.0,
                "matched_user_phrase": "я",
            }
            if debug:
                payload["user_debug"] = {
                    "mode": "direct-pronoun",
                    "normalized_text": normalized_text,
                }
            return payload

        candidate_text, candidate_tokens, candidate_debug = self._build_user_candidate_text(
            normalized_text=normalized_text,
            supplier_phrase=supplier_phrase,
            date_phrase=date_phrase,
            include_debug=debug,
        )

        if not candidate_text:
            payload = {
                "user": None,
                "user_score": None,
                "matched_user_phrase": None,
            }
            match_debug = None
        else:
            payload, match_debug = self._match_user_from_candidates(candidate_tokens, include_debug=debug)

        if debug:
            payload["user_debug"] = {
                "mode": "user-matcher",
                "threshold": self.threshold,
                "rules": {
                    "min_lexical_support": self.MIN_LEXICAL_SUPPORT,
                    "min_lexical_with_person_grammeme": self.MIN_LEXICAL_WITH_PERSON,
                    "morph_enabled": self.morph is not None,
                },
                "excluded_supplier_phrase": supplier_phrase,
                "normalized_text": normalized_text,
                "candidate_text": candidate_text,
                "candidate_tokens": candidate_tokens,
                "candidate_token_debug": candidate_debug or [],
                "matcher_debug": match_debug,
            }

        return payload
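

# Minimal usage sketch (illustrative data): the user and supplier lists and
# the input phrase below are assumptions for demonstration, not part of the
# module's real configuration.
if __name__ == "__main__":
    extractor = ExpenseUserExtractor(
        users=["Иван Петров", "Мария Сидорова"],
        suppliers=["Ашан", "Пятёрочка"],
    )
    result = extractor.extract(
        "обед в Ашане оплатил Иван",
        supplier_phrase="Ашане",
        debug=True,
    )
    print(result["user"], result["user_score"], result["matched_user_phrase"])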