|
|
"""Database helpers for the FastAPI application.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import os |
|
|
from functools import lru_cache |
|
|
from pathlib import Path |
|
|
|
|
|
import duckdb |
|
|
|
|
|
# Directory containing this module; anchors all default data paths.
BASE_DIR = Path(__file__).resolve().parent

# Default dataset directory; _data_dir() allows ETYM_DATA_DIR to override it.
DEFAULT_DATA_DIR = BASE_DIR / "data"
|
|
|
|
|
|
|
|
def _resolve_path(env_var: str, default: Path) -> Path: |
|
|
value = os.environ.get(env_var) |
|
|
return Path(value) if value else default |
|
|
|
|
|
|
|
|
def _data_dir() -> Path:
    """Return the dataset directory, honouring the ``ETYM_DATA_DIR`` override."""
    data_dir = _resolve_path("ETYM_DATA_DIR", DEFAULT_DATA_DIR)
    return data_dir
|
|
|
|
|
|
|
|
@lru_cache(maxsize=1)
def database_path() -> Path:
    """Return the configured DuckDB path, creating parent directories.

    NOTE: the result is cached, so ``ETYM_DB_PATH``/``ETYM_DATA_DIR`` are
    only consulted on the first call of the process.
    """
    default = _data_dir() / "etymdb.duckdb"
    db_file = _resolve_path("ETYM_DB_PATH", default)
    db_file.parent.mkdir(parents=True, exist_ok=True)
    return db_file
|
|
|
|
|
|
|
|
def _ensure_database() -> Path:
    """Return the DuckDB file path, running the ingest pipeline if it is missing."""
    db_path = database_path()
    if not db_path.exists():
        # Support both package-relative and flat script execution layouts.
        try:
            from . import ingest
        except ImportError:
            import ingest

        try:
            ingest.main()
        except Exception as exc:
            raise RuntimeError("Failed to ingest the EtymDB dataset") from exc

        # Ingestion ran without raising but still must have produced the file.
        if not db_path.exists():
            raise RuntimeError(f"Expected DuckDB database at {db_path} after ingestion")
    return db_path
|
|
|
|
|
|
|
|
class _ConnectionManager:
    """Lazily open DuckDB connections when required.

    Each ``with`` entry ingests the database if needed and opens a fresh
    read-only connection; exit closes it.
    """

    def __init__(self) -> None:
        # Opened in __enter__, closed and cleared in __exit__.
        self._conn: duckdb.DuckDBPyConnection | None = None

    def __enter__(self) -> duckdb.DuckDBPyConnection:
        db_path = _ensure_database()
        self._conn = duckdb.connect(db_path.as_posix(), read_only=True)
        return self._conn

    def __exit__(self, exc_type, exc, tb) -> None:
        if self._conn is not None:
            self._conn.close()
            # Fix: clear the reference so a manager instance that is reused
            # (or re-entered) never exposes a stale, already-closed connection
            # and close() cannot be called twice on the same handle.
            self._conn = None
|
|
|
|
|
|
|
|
def _normalize_depth(depth: int) -> int: |
|
|
return max(depth, 0) |
|
|
|
|
|
|
|
|
def _get_language_families(conn) -> dict[str, dict[str, str]]: |
|
|
"""Load language families into a lookup dict.""" |
|
|
rows = conn.execute( |
|
|
"SELECT lang_code, lang_name, family, branch FROM language_families" |
|
|
).fetchall() |
|
|
return {row[0]: {"name": row[1], "family": row[2], "branch": row[3]} for row in rows} |
|
|
|
|
|
|
|
|
def _get_definitions_for_lexemes(conn, lexemes: list[str]) -> dict[str, str]: |
|
|
"""Fetch primary definitions for the specified lexemes. |
|
|
|
|
|
Returns dict mapping lowercase lexeme -> definition string. |
|
|
Uses the first definition (entry_idx=0, meaning_idx=0, def_idx=0). |
|
|
""" |
|
|
if not lexemes: |
|
|
return {} |
|
|
try: |
|
|
placeholders = ",".join(["?" for _ in lexemes]) |
|
|
rows = conn.execute( |
|
|
f""" |
|
|
SELECT lexeme, definition |
|
|
FROM definitions |
|
|
WHERE lexeme IN ({placeholders}) |
|
|
AND definition IS NOT NULL |
|
|
AND entry_idx = 0 AND meaning_idx = 0 AND def_idx = 0 |
|
|
""", |
|
|
[lex.lower() for lex in lexemes], |
|
|
).fetchall() |
|
|
return {row[0]: row[1].strip('"') if row[1] else None for row in rows} |
|
|
except Exception: |
|
|
return {} |
|
|
|
|
|
|
|
|
def _make_node_id(lexeme: str, lang: str) -> str: |
|
|
"""Create a unique node ID combining lexeme and language.""" |
|
|
return f"{lexeme}|{lang}" |
|
|
|
|
|
|
|
|
def _build_node(
    lexeme: str,
    lang: str,
    sense: str,
    lang_families: dict,
    enriched_defs: dict[str, str] | None = None,
) -> dict:
    """Build a rich node with all available metadata.

    Args:
        lexeme: The word
        lang: Language code
        sense: EtymDB sense/definition
        lang_families: Language family lookup dict
        enriched_defs: Optional dict of enriched definitions from Free Dictionary API
    """
    key = lexeme.lower()
    node = {
        "id": _make_node_id(lexeme, lang),
        "lexeme": lexeme,
        "lang": lang,
    }

    # Prefer the enriched English definition when present; otherwise fall back
    # to the EtymDB sense, but only when it adds information beyond the lexeme.
    if enriched_defs and lang == "en" and key in enriched_defs:
        definition = enriched_defs[key]
    elif sense and sense.lower() != key:
        definition = sense
    else:
        definition = None
    if definition:
        node["sense"] = definition

    info = lang_families.get(lang)
    if info:
        node["lang_name"] = info["name"]
        node["family"] = info["family"]
        node["branch"] = info["branch"]
    else:
        # Unknown language code: surface the raw code as the display name.
        node["lang_name"] = lang
    return node
|
|
|
|
|
|
|
|
def fetch_etymology(word: str, depth: int = 5) -> dict | None:
    """Return an etymology graph for *word* or ``None`` if absent.

    Args:
        word: Lexeme to look up (matched case-insensitively).
        depth: Maximum ancestor-traversal depth; negatives are clamped to 0.

    Returns:
        ``{"nodes": [...], "edges": [...]}`` where edges point from child word
        to parent (ancestor) and node IDs are ``"lexeme|lang"`` strings.
        Returns ``None`` for unknown words, and also when no edges were found
        at all (even though a start node exists).
    """
    if not word:
        return None

    depth = _normalize_depth(depth)
    with _ConnectionManager() as conn:
        # Language-code lookup reused for every node built below.
        lang_families = _get_language_families(conn)

        # Pick a single starting row for the lexeme: prefer English entries,
        # then the most-linked entry, then the lowest word_ix as a
        # deterministic tie-break.
        start = conn.execute(
            """
            SELECT w.word_ix, w.lang, w.lexeme, w.sense
            FROM words w
            LEFT JOIN links l ON l.source = w.word_ix
            WHERE lower(w.lexeme) = lower(?)
            GROUP BY w.word_ix, w.lang, w.lexeme, w.sense
            ORDER BY
                CASE WHEN w.lang = 'en' THEN 0 ELSE 1 END,
                COUNT(l.target) DESC,
                w.word_ix
            LIMIT 1
            """,
            [word],
        ).fetchone()
        if not start:
            return None

        start_ix, start_lang, start_lexeme, start_sense = start

        # word_ix -> (lexeme, lang, sense); seeded with the start word.
        raw_nodes: dict[int, tuple] = {start_ix: (start_lexeme, start_lang, start_sense)}
        edges = []
        # Deduplicates edges by (child_id, parent_id) node-ID pair.
        seen_edges: set[tuple[str, str]] = set()

        if depth > 0:
            # Recursive CTE: first normalise links (negative targets are
            # sequence references resolved to their parent words, flagged as
            # compounds), then walk up to `depth` levels of ancestors.
            records = conn.execute(
                """
                WITH RECURSIVE
                -- Resolve negative targets through sequences table
                resolved_links AS (
                    -- Simple links (positive target = direct word reference)
                    SELECT source, target AS parent_ix, FALSE AS is_compound
                    FROM links
                    WHERE target > 0
                    UNION ALL
                    -- Compound links (negative target = sequence, resolve to parents)
                    SELECT l.source, s.parent_ix, TRUE AS is_compound
                    FROM links l
                    JOIN sequences s ON s.seq_ix = l.target
                    WHERE l.target < 0
                ),
                traversal(child_ix, parent_ix, is_compound, lvl) AS (
                    SELECT source, parent_ix, is_compound, 1
                    FROM resolved_links
                    WHERE source = ?
                    UNION ALL
                    -- Only follow FROM parents that have valid sense (non-NULL for English)
                    -- This keeps sense=NULL entries as nodes but doesn't traverse their garbage links
                    SELECT rl.source, rl.parent_ix, rl.is_compound, lvl + 1
                    FROM traversal t
                    JOIN resolved_links rl ON rl.source = t.parent_ix
                    JOIN words parent_word ON parent_word.word_ix = t.parent_ix
                    WHERE lvl < ?
                      AND (parent_word.lang != 'en' OR parent_word.sense IS NOT NULL)
                )
                SELECT
                    child.word_ix AS child_ix,
                    child.lexeme AS child_lexeme,
                    child.lang AS child_lang,
                    child.sense AS child_sense,
                    parent.word_ix AS parent_ix,
                    parent.lexeme AS parent_lexeme,
                    parent.lang AS parent_lang,
                    parent.sense AS parent_sense,
                    tr.is_compound
                FROM traversal tr
                JOIN words child ON child.word_ix = tr.child_ix
                JOIN words parent ON parent.word_ix = tr.parent_ix
                """,
                [start_ix, depth],
            ).fetchall()

            for row in records:
                child_ix, child_lexeme, child_lang, child_sense = row[:4]
                parent_ix, parent_lexeme, parent_lang, parent_sense = row[4:8]
                is_compound = row[8]

                # setdefault keeps the first-seen sense for a given word_ix.
                raw_nodes.setdefault(child_ix, (child_lexeme, child_lang, child_sense))
                raw_nodes.setdefault(parent_ix, (parent_lexeme, parent_lang, parent_sense))

                # Edges are keyed on (lexeme, lang) IDs, so distinct word_ix
                # rows that render to the same ID collapse; self-loops skipped.
                child_id = _make_node_id(child_lexeme, child_lang)
                parent_id = _make_node_id(parent_lexeme, parent_lang)
                if child_id != parent_id:
                    edge_key = (child_id, parent_id)
                    if edge_key not in seen_edges:
                        seen_edges.add(edge_key)
                        edge = {"source": child_id, "target": parent_id}
                        if is_compound:
                            edge["compound"] = True
                        edges.append(edge)

        # Enrich all English nodes with Free Dictionary definitions (best-effort).
        english_lexemes = [lex for lex, lang, _ in raw_nodes.values() if lang == "en"]
        enriched_defs = _get_definitions_for_lexemes(conn, english_lexemes)

        nodes = {
            ix: _build_node(lexeme, lang, sense, lang_families, enriched_defs)
            for ix, (lexeme, lang, sense) in raw_nodes.items()
        }

        # NOTE(review): an edge-less word returns None rather than a
        # single-node graph — confirm this is the intended API contract.
        if not edges:
            return None

        return {"nodes": list(nodes.values()), "edges": edges}
|
|
|
|
|
|
|
|
def fetch_random_word(include_compound: bool = True) -> dict[str, str | None]:
    """Return a random curated English word (has etymology, no phrases/proper nouns).

    Args:
        include_compound: If True, include compound-only words (e.g., "acquaintanceship").
            If False, only return words with "deep" etymology chains.
    """
    if include_compound:
        view = "v_english_curated"
    else:
        view = "v_english_deep"

    # Internal invariant: `view` can only ever be one of the two vetted names,
    # so the identifier interpolated into the SQL below is never user data.
    assert view in ("v_english_curated", "v_english_deep"), f"Invalid view: {view}"
    with _ConnectionManager() as conn:
        record = conn.execute(f"SELECT lexeme FROM {view} ORDER BY random() LIMIT 1").fetchone()
        return {"word": record[0] if record else None}
|
|
|
|
|
|
|
|
def fetch_language_info(lang_code: str) -> dict[str, str] | None:
    """Return language family info for a language code, or None if unknown."""
    with _ConnectionManager() as conn:
        record = conn.execute(
            "SELECT lang_name, family, branch FROM language_families WHERE lang_code = ?",
            [lang_code],
        ).fetchone()
    if record is None:
        return None
    name, family, branch = record
    return {"name": name, "family": family, "branch": branch}
|
|
|
|
|
|
|
|
def fetch_all_language_families() -> dict[str, dict[str, str]]:
    """Return all language family mappings keyed by language code.

    Delegates to ``_get_language_families`` so the query and the
    row-to-dict mapping live in exactly one place (this function previously
    duplicated that helper verbatim).
    """
    with _ConnectionManager() as conn:
        return _get_language_families(conn)
|
|
|
|
|
|
|
|
def _is_useful_sense(sense: str | None, lexeme: str) -> bool: |
|
|
"""Check if a sense provides useful information beyond the lexeme itself. |
|
|
|
|
|
NULL senses are filtered out - they're structural entries without |
|
|
meaningful definitions. We prefer entries where sense differs from lexeme. |
|
|
""" |
|
|
if sense is None: |
|
|
return False |
|
|
sense_lower = sense.lower().strip('"') |
|
|
lexeme_lower = lexeme.lower() |
|
|
|
|
|
return sense_lower != "" and sense_lower != lexeme_lower |
|
|
|
|
|
|
|
|
def _format_sense_for_display(sense: str) -> str: |
|
|
"""Format a sense for display in the UI.""" |
|
|
return sense.strip('"') |
|
|
|
|
|
|
|
|
def search_words(query: str, limit: int = 10) -> list[dict[str, str]]:
    """Search for English words matching the query (fuzzy prefix search).

    Returns words with etymology data. Shows EtymDB sense when it differs
    from lexeme, otherwise falls back to Free Dictionary definition.
    When multiple senses exist for a word, shows all of them.

    For words with multiple Free Dictionary definitions, shows the primary
    definition with a count indicator (e.g., "+3 more").

    TODO: The subquery for def_count could be optimized by pre-computing
    definition counts into a materialized column or separate table. This
    would require a schema change to the definitions table.
    """
    # Queries shorter than 2 chars would match too much; return nothing.
    if not query or len(query) < 2:
        return []

    with _ConnectionManager() as conn:
        # Prefix match against curated English words; join the primary
        # definition (entry/meaning/def index 0) plus a per-lexeme count.
        # Exact matches sort first, then shorter words, then alphabetically.
        # NOTE(review): LIKE wildcards in *query* ('%', '_') are not escaped —
        # confirm this pass-through fuzziness is intended.
        rows = conn.execute(
            """
            SELECT w.lexeme, w.sense, d.definition, d.part_of_speech, dc.def_count
            FROM v_english_curated w
            LEFT JOIN definitions d ON d.lexeme = lower(w.lexeme)
                AND d.entry_idx = 0 AND d.meaning_idx = 0 AND d.def_idx = 0
            LEFT JOIN (
                SELECT lexeme, COUNT(*) as def_count
                FROM definitions
                GROUP BY lexeme
            ) dc ON dc.lexeme = lower(w.lexeme)
            WHERE lower(w.lexeme) LIKE lower(?) || '%'
            ORDER BY
                CASE WHEN lower(w.lexeme) = lower(?) THEN 0 ELSE 1 END,
                length(w.lexeme),
                w.lexeme,
                w.word_ix
            """,
            [query, query],
        ).fetchall()

        # Group rows per lexeme, preserving the SQL ordering of lexemes.
        seen_lexemes: dict[str, list[tuple]] = {}
        for lexeme, sense, definition, pos, def_count in rows:
            if lexeme not in seen_lexemes:
                seen_lexemes[lexeme] = []
            seen_lexemes[lexeme].append((sense, definition, pos, def_count or 0))

        results = []
        for lexeme, entries in seen_lexemes.items():
            useful_senses = [s for s, _, _, _ in entries if _is_useful_sense(s, lexeme)]

            if useful_senses:
                # EtymDB senses win: one result row per distinct useful sense.
                for sense in useful_senses:
                    display = _format_sense_for_display(sense)
                    results.append({"word": lexeme, "sense": display})
            else:
                # Fall back to the first Free Dictionary definition (may be None).
                _, definition, pos, def_count = entries[0]
                display = definition.strip('"') if definition else None

                # Indicate how many additional definitions exist beyond the
                # primary one, prefixed with part of speech when available.
                if display and def_count > 1:
                    pos_str = f"({pos}) " if pos else ""
                    display = f"{pos_str}{display} (+{def_count - 1} more)"

                results.append({"word": lexeme, "sense": display})

            # A multi-sense lexeme can push past `limit`; trimmed below.
            if len(results) >= limit:
                break

        return results[:limit]
|
|
|