| |
| """ |
| Shared helpers for M2 metadata indexing and candidate retrieval. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import re |
| import unicodedata |
| from datetime import datetime, timezone |
|
|
|
|
# Pre-compiled once at import time so the helpers below do not re-resolve
# the patterns on every call.

# One or more consecutive whitespace characters.
MULTISPACE_RE: re.Pattern[str] = re.compile(r"\s+")

# One or more consecutive characters that are NOT a digit or a lowercase
# ASCII letter (intended to run on text that is already lowercased).
NON_ALNUM_RE: re.Pattern[str] = re.compile(r"[^0-9a-z]+")

# A maximal run of digits / lowercase ASCII letters, i.e. a single token.
TOKEN_RE: re.Pattern[str] = re.compile(r"[0-9a-z]+")
|
|
|
|
def ts() -> str:
    """Return the current time in UTC as an ISO 8601 formatted string.

    The value is timezone-aware, so the string carries an explicit
    ``+00:00`` offset.
    """
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
|
|
|
|
def stable_hash(text: str, prefix: str, length: int = 12) -> str:
    """Build a deterministic identifier of the form ``"<prefix>:<hex>"``.

    Args:
        text: String to fingerprint; it is UTF-8 encoded before hashing.
        prefix: Label placed before the colon.
        length: Number of leading hex characters of the SHA-1 digest to keep.

    Returns:
        The prefix, a colon, and the truncated SHA-1 hex digest of ``text``.
    """
    digest = hashlib.sha1(text.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest[:length]}"
|
|
|
|
def normalize_text(text: str | None) -> str:
    """Reduce ``text`` to lowercase ASCII alphanumerics separated by spaces.

    Steps, in order: NFKD-decompose, drop every character that cannot be
    encoded as ASCII (this removes combining accents left by the
    decomposition as well as any other non-ASCII character), lowercase,
    replace each run of non ``[0-9a-z]`` characters with a single space,
    collapse whitespace runs and trim both ends.

    Args:
        text: Raw text; ``None`` or any other falsy value yields ``""``.

    Returns:
        The normalized string, possibly empty.
    """
    if not text:
        return ""
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    spaced = NON_ALNUM_RE.sub(" ", ascii_only.lower())
    return MULTISPACE_RE.sub(" ", spaced).strip()
|
|
|
|
def unique_preserve(items: list[str]) -> list[str]:
    """Drop falsy entries and duplicates while keeping first-seen order.

    Args:
        items: Candidate strings, possibly containing repeats or empty values.

    Returns:
        A new list with each truthy item appearing once, ordered by its
        first occurrence in ``items``.
    """
    # dict keys are unique and keep insertion order, which gives an
    # order-preserving de-duplication in one pass.
    return list(dict.fromkeys(entry for entry in items if entry))
|
|
|
|
def tokenize(text: str | None) -> list[str]:
    """Split ``text`` into normalized tokens.

    The text is first passed through ``normalize_text``; every match of
    ``TOKEN_RE`` in the result is returned, in order of appearance.

    Args:
        text: Raw text, or ``None``.

    Returns:
        The list of tokens; empty when nothing matches.
    """
    normalized = normalize_text(text)
    return TOKEN_RE.findall(normalized)
|
|
|
|
def make_acronym(text: str | None) -> str:
    """Build an acronym from the first character of each token in ``text``.

    Args:
        text: Raw text, or ``None``; it is split with ``tokenize``.

    Returns:
        The concatenated initials, or ``""`` when there are fewer than two
        tokens or the acronym is not between 2 and 8 characters long.
    """
    words = tokenize(text)
    if len(words) < 2:
        return ""
    initials = "".join(word[0] for word in words if word)
    return initials if 2 <= len(initials) <= 8 else ""
|
|
|
|
def contains_normalized_term(field_text: str | None, term: str) -> bool:
    """Check whether ``term`` occurs in ``field_text`` on word boundaries.

    Both inputs are run through ``normalize_text`` before comparison.

    Args:
        field_text: Text to search in, or ``None``.
        term: Word or phrase to look for.

    Returns:
        ``True`` when the normalized term appears in the normalized field as
        a whole word / whole-word sequence; ``False`` otherwise, including
        when either side normalizes to an empty string.
    """
    haystack = normalize_text(field_text)
    needle = normalize_text(term)
    if haystack and needle:
        # Padding both sides with a space makes the substring test match
        # whole words only (e.g. "art" does not match inside "cart").
        return f" {needle} " in f" {haystack} "
    return False
|
|
|
|
def fts_quote(term: str) -> str:
    """Render ``term`` so it is safe to embed in a full-text MATCH query.

    A term made only of letters, digits, underscores and non-ASCII
    characters is returned bare (a single trailing ``*`` prefix marker is
    also left intact). Anything else is wrapped in double quotes with
    embedded double quotes doubled.

    Previously only terms containing a space, ``-`` or ``/`` were wrapped,
    so a term such as ``a"b`` came back as ``a""b`` without surrounding
    quotes, and terms containing other punctuation, or the bare operators
    ``AND`` / ``OR`` / ``NOT`` / ``NEAR``, were passed through to be parsed
    as query syntax instead of as a literal term.

    NOTE(review): the quoting rules follow SQLite FTS5 string syntax;
    confirm that is the engine the callers target.

    Args:
        term: The search term; surrounding whitespace is ignored.

    Returns:
        The bare or double-quoted term, or ``""`` when ``term`` is blank.
    """
    term = term.strip()
    if not term:
        return ""

    # Upper-case operator words would be read as operators if left bare.
    reserved = ("AND", "OR", "NOT", "NEAR")

    # Keep a trailing "*" out of the bareword check so an explicit prefix
    # query such as ``foo*`` is returned unchanged, as it was before.
    core = term[:-1] if term.endswith("*") else term
    is_bareword = bool(core) and all(
        ch == "_" or ord(ch) > 127 or ch.isalnum() for ch in core
    )
    if is_bareword and term not in reserved:
        return term

    escaped = term.replace('"', '""')
    return f'"{escaped}"'
|
|
|
|
def build_fts_query(terms: list[str]) -> str:
    """Combine ``terms`` into a single ``OR`` full-text query string.

    Each term is normalized with ``normalize_text``; empty results and
    duplicates are removed (first occurrence wins), the survivors are passed
    through ``fts_quote`` and joined with ``" OR "``.

    Args:
        terms: Raw search terms.

    Returns:
        The query string, or ``""`` when no term survives normalization.
    """
    normalized_terms = [normalize_text(raw) for raw in terms]
    distinct_terms = unique_preserve(normalized_terms)
    return " OR ".join(fts_quote(term) for term in distinct_terms if term)
|
|