ConvertAudioToJSON / extractors /date_extractor.py
VladGeekPro
SupplierDebugging
c743599
raw
history blame
24.7 kB
"""
Экстрактор дат из русского текста.
Классы:
- UniversalDateParser: парсер дат с поддержкой относительных и абсолютных дат
- ExpenseDateExtractor: обёртка для извлечения дат из текста
- ParsedDate: результат парсинга
- Token: токен текста
"""
from __future__ import annotations
import calendar
import difflib
import re
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Any, Optional
from dateparser.search import search_dates
from pymorphy3 import MorphAnalyzer
MORPH = MorphAnalyzer()
WORD_RE = re.compile(r"[0-9]+(?:[./-][0-9]+)*|[а-яё]+", re.IGNORECASE)
@dataclass(frozen=True)
class ParsedDate:
"""Результат парсинга даты."""
date_iso: str
matched_expression: Optional[str]
@dataclass(frozen=True)
class Token:
"""Токен текста с морфологической информацией."""
original: str
normalized: str
raw_lemma: str
lemma: str
lemma_correction: Optional[str]
start: int
end: int
lemma_start: int
lemma_end: int
class UniversalDateParser:
"""
Универсальный парсер дат для русского языка.
Поддерживает:
- Прямые относительные даты: вчера, завтра, позавчера, послезавтра
- Недели: на следующей неделе, на прошлой неделе
- Периоды: через 2 дня, 3 недели назад, через месяц
- Текстовые даты: 5 марта, 15 января 2025
- Числовые даты: 15.01.2025, 2025-01-15
- Края периодов: в конце месяца, в начале недели
"""
MONTHS = {
"январь": 1, "февраль": 2, "март": 3, "апрель": 4, "май": 5, "июнь": 6,
"июль": 7, "август": 8, "сентябрь": 9, "октябрь": 10, "ноябрь": 11, "декабрь": 12,
}
WEEKDAYS = {
"понедельник": 0, "вторник": 1, "среда": 2, "четверг": 3,
"пятница": 4, "суббота": 5, "воскресенье": 6,
}
DIRECT_RELATIVE = {
"послезавтра": 2, "позавчера": -2, "сегодня": 0, "вчера": -1, "завтра": 1
}
ORDINAL_DAYS = {
"первый": 1, "второй": 2, "третий": 3, "четвертый": 4, "пятый": 5, "шестой": 6,
"седьмой": 7, "восьмой": 8, "девятый": 9, "десятый": 10, "одиннадцатый": 11,
"двенадцатый": 12, "тринадцатый": 13, "четырнадцатый": 14, "пятнадцатый": 15,
"шестнадцатый": 16, "семнадцатый": 17, "восемнадцатый": 18, "девятнадцатый": 19,
"двадцатый": 20, "двадцать первый": 21, "двадцать второй": 22, "двадцать третий": 23,
"двадцать четвертый": 24, "двадцать пятый": 25, "двадцать шестой": 26,
"двадцать седьмой": 27, "двадцать восьмой": 28, "двадцать девятый": 29,
"тридцатый": 30, "тридцать первый": 31,
}
NUMBER_WORDS = {
"ноль": 0, "один": 1, "два": 2, "три": 3, "четыре": 4, "пять": 5, "шесть": 6,
"семь": 7, "восемь": 8, "девять": 9, "десять": 10, "одиннадцать": 11,
"двенадцать": 12, "тринадцать": 13, "четырнадцать": 14, "пятнадцать": 15,
"шестнадцать": 16, "семнадцать": 17, "восемнадцать": 18, "девятнадцать": 19,
"двадцать": 20, "тридцать": 30,
}
FUTURE_HINTS = ("завтра", "послезавтра", "через", "быть", "заплатить", "следующий", "последующий")
PAST_HINTS = ("вчера", "позавчера", "назад", "прошлый", "предыдущий", "оплатить", "купить", "заказать")
# Регулярные выражения для парсинга
DIRECT_RELATIVE_RE = re.compile(r"(?<!\S)(послезавтра|позавчера|сегодня|вчера|завтра)(?!\S)")
WEEK_RELATIVE_RE = re.compile(
r"(?<!\S)на (?P<which>следующий|последующий|прошлый|предыдущий|этот) неделя"
r"(?: (?P<prep>в|во|на) (?P<weekday>понедельник|вторник|среда|четверг|пятница|суббота|воскресенье))?(?!\S)"
)
QUANTITY_RELATIVE_RE = re.compile(
r"(?<!\S)(?P<number>\d+|[а-яё]+(?: [а-яё]+)?) "
r"(?P<unit>месяц|неделя|день) "
r"(?P<ago>назад)"
r"(?: (?P<prep>в|во|на) (?P<weekday>понедельник|вторник|среда|четверг|пятница|суббота|воскресенье))?(?!\S)",
re.IGNORECASE,
)
FORWARD_QUANTITY_RE = re.compile(
r"(?<!\S)(?P<through>через) "
r"(?P<number>\d+|[а-яё]+(?: [а-яё]+)?) "
r"(?P<unit>месяц|неделя|день)"
r"(?: (?P<prep>в|во|на) (?P<weekday>понедельник|вторник|среда|четверг|пятница|суббота|воскресенье))?(?!\S)",
re.IGNORECASE,
)
FORWARD_SINGLE_UNIT_RE = re.compile(
r"(?<!\S)(?P<through>через) "
r"(?P<unit>месяц|неделя|день)"
r"(?: (?P<prep>в|во|на) (?P<weekday>понедельник|вторник|среда|четверг|пятница|суббота|воскресенье))?(?!\S)",
re.IGNORECASE,
)
TEXTUAL_ABSOLUTE_RE = re.compile(
r"(?<!\S)(?P<day>\d{1,2}|[а-яё]+(?: [а-яё]+)?) "
r"(?P<month>январь|февраль|март|апрель|май|июнь|июль|август|сентябрь|октябрь|ноябрь|декабрь)"
r"(?: (?P<year>\d{4}))?(?!\S)",
re.IGNORECASE,
)
PERIOD_EDGE_RE = re.compile(
r"(?<!\S)(?:в )?(?P<edge>начало|конец) (?P<which>этот|следующий|последующий|прошлый|предыдущий) (?P<unit>неделя|месяц)(?!\S)",
re.IGNORECASE,
)
@classmethod
def temporal_vocabulary(cls) -> set[str]:
"""Возвращает словарь временных терминов."""
vocab: set[str] = set()
vocab.update(cls.MONTHS)
vocab.update(cls.WEEKDAYS)
vocab.update(cls.DIRECT_RELATIVE)
vocab.update(cls.ORDINAL_DAYS)
vocab.update(cls.NUMBER_WORDS)
vocab.update({
"неделя", "месяц", "день", "назад", "через", "начало", "конец", "на", "в", "во",
"этот", "прошлый", "предыдущий", "следующий", "последующий",
})
return vocab
@staticmethod
def similarity(left: str, right: str) -> float:
"""Вычисляет схожесть двух строк."""
return difflib.SequenceMatcher(None, left, right).ratio()
@classmethod
def pick_temporal_correction(cls, normalized: str, raw_lemma: str) -> tuple[str, Optional[str]]:
"""Подбирает коррекцию для временного термина."""
vocab = cls.temporal_vocabulary()
if raw_lemma in vocab or not normalized.isalpha() or len(normalized) < 5:
return raw_lemma, None
candidates = list(difflib.get_close_matches(normalized, list(vocab), n=4, cutoff=0.74))
candidates.extend(difflib.get_close_matches(raw_lemma, list(vocab), n=4, cutoff=0.74))
candidates = list(dict.fromkeys(candidates))
if not candidates:
return raw_lemma, None
best = max(candidates, key=lambda item: max(cls.similarity(normalized, item), cls.similarity(raw_lemma, item)))
best_score = max(cls.similarity(normalized, best), cls.similarity(raw_lemma, best))
return (best, f"{raw_lemma}->{best}") if best_score >= 0.80 else (raw_lemma, None)
@staticmethod
def normalize_word(word: str) -> str:
"""Нормализует слово."""
return word.lower().replace("ё", "е")
@classmethod
def lemmatize(cls, word: str) -> str:
"""Возвращает лемму слова."""
return MORPH.parse(word)[0].normal_form if word.isalpha() else word
@classmethod
def tokenize(cls, text: str) -> list[Token]:
"""Токенизирует текст."""
tokens: list[Token] = []
lemma_cursor = 0
for match in WORD_RE.finditer(text):
original = match.group(0)
normalized = cls.normalize_word(original)
raw_lemma = cls.lemmatize(normalized)
lemma, correction = cls.pick_temporal_correction(normalized, raw_lemma)
lemma_start = lemma_cursor
lemma_end = lemma_start + len(lemma)
tokens.append(Token(original, normalized, raw_lemma, lemma, correction, match.start(), match.end(), lemma_start, lemma_end))
lemma_cursor = lemma_end + 1
return tokens
@staticmethod
def lemma_text(tokens: list[Token]) -> str:
"""Возвращает текст из лемм токенов."""
return " ".join(token.lemma for token in tokens)
@staticmethod
def surface_text(text: str, tokens: list[Token], start_idx: int, end_idx: int) -> str:
"""Возвращает исходный текст по индексам токенов."""
return text[tokens[start_idx].start:tokens[end_idx].end].strip() if tokens else ""
@staticmethod
def lemma_span_to_token_range(tokens: list[Token], span: tuple[int, int]) -> Optional[tuple[int, int]]:
"""Преобразует позиции в тексте лемм в индексы токенов."""
start_char, end_char = span
start_idx = end_idx = None
for idx, token in enumerate(tokens):
if start_idx is None and token.lemma_start <= start_char < token.lemma_end:
start_idx = idx
if token.lemma_start < end_char <= token.lemma_end:
end_idx = idx
break
return (start_idx, end_idx) if start_idx is not None and end_idx is not None else None
@classmethod
def make_parsed_date(cls, text: str, tokens: list[Token], match, parsed_date: date) -> Optional[ParsedDate]:
"""Создаёт ParsedDate из результата match."""
token_span = cls.lemma_span_to_token_range(tokens, match.span())
if token_span is None:
return None
return ParsedDate(parsed_date.isoformat(), cls.surface_text(text, tokens, token_span[0], token_span[1]))
@classmethod
def parse_number_phrase(cls, phrase: str) -> Optional[int]:
"""Парсит числовую фразу (цифры или слова)."""
phrase = phrase.strip()
if not phrase:
return None
if phrase.isdigit():
return int(phrase)
parts = phrase.split()
if len(parts) == 1:
return cls.NUMBER_WORDS.get(parts[0])
if len(parts) == 2 and parts[0] in {"двадцать", "тридцать"}:
base = cls.NUMBER_WORDS.get(parts[0])
addon = cls.NUMBER_WORDS.get(parts[1])
if base is not None and addon is not None and 1 <= addon <= 9:
return base + addon
return None
@classmethod
def parse_day_phrase(cls, phrase: str) -> Optional[int]:
"""Парсит день (число или порядковое слово)."""
if phrase.isdigit():
value = int(phrase)
return value if 1 <= value <= 31 else None
return cls.ORDINAL_DAYS.get(phrase.strip())
@staticmethod
def shift_months(value: date, months: int) -> date:
"""Сдвигает дату на указанное число месяцев."""
month_index = value.month - 1 + months
year = value.year + month_index // 12
month = month_index % 12 + 1
day = min(value.day, calendar.monthrange(year, month)[1])
return date(year, month, day)
@staticmethod
def parse_numeric_absolute(tokens: list[Token]) -> Optional[ParsedDate]:
"""Парсит числовые даты: 15.01.2025, 2025-01-15."""
for token in tokens:
separator = "." if "." in token.original else "-" if "-" in token.original else "/" if "/" in token.original else None
if separator is None:
continue
parts = token.original.split(separator)
if len(parts) != 3 or not all(part.isdigit() for part in parts):
continue
try:
if len(parts[0]) == 4:
parsed = date(int(parts[0]), int(parts[1]), int(parts[2]))
elif len(parts[2]) == 4:
parsed = date(int(parts[2]), int(parts[1]), int(parts[0]))
else:
continue
return ParsedDate(parsed.isoformat(), token.original)
except ValueError:
continue
return None
@classmethod
def parse_textual_absolute(cls, text: str, tokens: list[Token], reference_date: date) -> Optional[ParsedDate]:
"""Парсит текстовые даты: 5 марта, 15 января 2025."""
lemma_text = cls.lemma_text(tokens)
for match in cls.TEXTUAL_ABSOLUTE_RE.finditer(lemma_text):
day = cls.parse_day_phrase(match.group("day"))
month = cls.MONTHS.get(match.group("month"))
if day is None or month is None:
continue
year = int(match.group("year")) if match.group("year") else reference_date.year
try:
parsed = date(year, month, day)
except ValueError:
continue
result = cls.make_parsed_date(text, tokens, match, parsed)
if result is not None:
return result
return None
@classmethod
def parse_direct_relative(cls, text: str, tokens: list[Token], reference_date: date) -> Optional[ParsedDate]:
"""Парсит прямые относительные даты: вчера, завтра, позавчера, послезавтра."""
lemma_text = cls.lemma_text(tokens)
match = cls.DIRECT_RELATIVE_RE.search(lemma_text)
if not match:
return None
parsed = reference_date + timedelta(days=cls.DIRECT_RELATIVE[match.group(1)])
return cls.make_parsed_date(text, tokens, match, parsed)
@staticmethod
def week_monday(value: date) -> date:
"""Возвращает понедельник недели для указанной даты."""
return value - timedelta(days=value.weekday())
@classmethod
def parse_week_relative(cls, text: str, tokens: list[Token], reference_date: date) -> Optional[ParsedDate]:
"""Парсит недельные относительные даты: на следующей неделе, на прошлой неделе."""
lemma_text = cls.lemma_text(tokens)
match = cls.WEEK_RELATIVE_RE.search(lemma_text)
if not match:
return None
offsets = {"следующий": 7, "последующий": 7, "прошлый": -7, "предыдущий": -7, "этот": 0}
anchor = reference_date + timedelta(days=offsets[match.group("which")])
if match.group("weekday"):
anchor = cls.week_monday(anchor) + timedelta(days=cls.WEEKDAYS[match.group("weekday")])
return cls.make_parsed_date(text, tokens, match, anchor)
@classmethod
def parse_period_edge(cls, text: str, tokens: list[Token], reference_date: date) -> Optional[ParsedDate]:
"""Парсит края периодов: в конце месяца, в начале недели."""
lemma_text = cls.lemma_text(tokens)
match = cls.PERIOD_EDGE_RE.search(lemma_text)
if not match:
return None
edge, which, unit = match.group("edge"), match.group("which"), match.group("unit")
if unit == "неделя":
offsets = {"прошлый": -7, "предыдущий": -7, "этот": 0, "следующий": 7, "последующий": 7}
monday = cls.week_monday(reference_date + timedelta(days=offsets[which]))
parsed_date = monday if edge == "начало" else monday + timedelta(days=6)
else:
month_offset = {"прошлый": -1, "предыдущий": -1, "этот": 0, "следующий": 1, "последующий": 1}[which]
shifted = cls.shift_months(date(reference_date.year, reference_date.month, 1), month_offset)
parsed_date = shifted if edge == "начало" else date(shifted.year, shifted.month, calendar.monthrange(shifted.year, shifted.month)[1])
return cls.make_parsed_date(text, tokens, match, parsed_date)
@classmethod
def parse_quantity_relative(cls, text: str, tokens: list[Token], reference_date: date) -> Optional[ParsedDate]:
"""Парсит количественные относительные даты: через 2 дня, 3 недели назад."""
lemma_text = cls.lemma_text(tokens)
for regex, direction in ((cls.QUANTITY_RELATIVE_RE, -1), (cls.FORWARD_QUANTITY_RE, 1)):
for match in regex.finditer(lemma_text):
number = cls.parse_number_phrase(match.group("number"))
if number is None:
continue
unit = match.group("unit")
if unit == "месяц":
anchor = cls.shift_months(reference_date, direction * number)
else:
days = number * 7 if unit == "неделя" else number
anchor = reference_date + timedelta(days=direction * days)
if match.group("weekday"):
anchor = cls.week_monday(anchor) + timedelta(days=cls.WEEKDAYS[match.group("weekday")])
result = cls.make_parsed_date(text, tokens, match, anchor)
if result is not None:
return result
for match in cls.FORWARD_SINGLE_UNIT_RE.finditer(lemma_text):
unit = match.group("unit")
if unit == "месяц":
anchor = cls.shift_months(reference_date, 1)
else:
days = 7 if unit == "неделя" else 1
anchor = reference_date + timedelta(days=days)
if match.group("weekday"):
anchor = cls.week_monday(anchor) + timedelta(days=cls.WEEKDAYS[match.group("weekday")])
result = cls.make_parsed_date(text, tokens, match, anchor)
if result is not None:
return result
return None
@classmethod
def preference_for_text(cls, tokens: list[Token]) -> str:
"""Определяет предпочтение: прошлое или будущее."""
lemmas = [token.lemma for token in tokens]
future = sum(1 for hint in cls.FUTURE_HINTS if hint in lemmas)
past = sum(1 for hint in cls.PAST_HINTS if hint in lemmas)
return "future" if future > past else "past"
@staticmethod
def choose_best(matches: list[tuple[str, datetime]]) -> tuple[str, datetime]:
"""Выбирает лучший результат из списка."""
return sorted(matches, key=lambda item: (len(item[0]), -item[1].timestamp()), reverse=True)[0]
def parse(self, text: str, reference_date: date) -> Optional[ParsedDate]:
"""
Основной метод парсинга даты из текста.
Args:
text: Текст для парсинга
reference_date: Базовая дата для относительных вычислений
Returns:
ParsedDate с результатом или None
"""
tokens = self.tokenize(text)
# Пробуем все парсеры по очереди
for parser in (
lambda: self.parse_numeric_absolute(tokens),
lambda: self.parse_textual_absolute(text, tokens, reference_date),
lambda: self.parse_direct_relative(text, tokens, reference_date),
lambda: self.parse_week_relative(text, tokens, reference_date),
lambda: self.parse_period_edge(text, tokens, reference_date),
lambda: self.parse_quantity_relative(text, tokens, reference_date),
):
parsed = parser()
if parsed is not None:
return parsed
# Fallback: dateparser
normalized = " ".join(token.normalized for token in tokens)
relative_base = datetime.combine(reference_date, datetime.min.time()).replace(hour=12)
result = search_dates(
normalized,
languages=["ru"],
settings={
"RELATIVE_BASE": relative_base,
"PREFER_DATES_FROM": self.preference_for_text(tokens),
"STRICT_PARSING": False,
"REQUIRE_PARTS": [],
"NORMALIZE": True,
"RETURN_AS_TIMEZONE_AWARE": False,
"DATE_ORDER": "DMY",
},
)
filtered: list[tuple[str, datetime]] = []
for matched, value in result or []:
if isinstance(value, datetime) and not matched.strip().isdigit() and 2020 <= value.year <= 2100:
filtered.append((matched.strip(), value))
if not filtered:
return None
matched_expression, value = self.choose_best(filtered)
return ParsedDate(date_iso=value.date().isoformat(), matched_expression=matched_expression)
class ExpenseDateExtractor:
"""
Экстрактор дат для текста расходов.
Обёртка над UniversalDateParser с удобным интерфейсом.
"""
def __init__(self) -> None:
self.parser = UniversalDateParser()
def extract(self, text: str, reference_date: str | date | None = None, debug: bool = False) -> dict[str, Any]:
"""
Извлекает дату из текста.
Args:
text: Текст для анализа
reference_date: Базовая дата (по умолчанию сегодня)
debug: Включить отладочную информацию
Returns:
Словарь с date, date_iso, matched_date_phrase
"""
ref_date = self.to_date(reference_date or date.today().isoformat())
parsed = self.parser.parse(text=text, reference_date=ref_date)
matched_phrase = parsed.matched_expression if parsed else None
matched_span = None
if matched_phrase:
idx = text.lower().find(matched_phrase.lower())
if idx != -1:
matched_span = [idx, idx + len(matched_phrase)]
payload = {
"date": datetime.strptime(parsed.date_iso, "%Y-%m-%d").strftime("%d.%m.%Y") if parsed else None,
"date_iso": parsed.date_iso if parsed else None,
"matched_date_phrase": matched_phrase,
"matched_date_span": matched_span,
}
if debug:
payload["date_debug"] = {
"reference_date": ref_date.isoformat(),
"input_text": text,
"matched_date_phrase": payload["matched_date_phrase"],
"matched_date_span": payload["matched_date_span"],
"date_iso": payload["date_iso"],
}
return payload
@staticmethod
def to_date(value: str | date) -> date:
"""Преобразует строку или date в date."""
return value if isinstance(value, date) else datetime.strptime(value, "%Y-%m-%d").date()