File size: 12,900 Bytes
13812dc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
"""Database helpers for the FastAPI application."""

from __future__ import annotations

import os
from functools import lru_cache
from pathlib import Path

import duckdb

try:
    from .sql_loader import load_sql
except ImportError:  # pragma: no cover - fallback for direct execution
    from sql_loader import load_sql

BASE_DIR = Path(__file__).resolve().parent
DEFAULT_DATA_DIR = BASE_DIR / "data"


def _resolve_path(env_var: str, default: Path) -> Path:
    value = os.environ.get(env_var)
    return Path(value) if value else default


def _data_dir() -> Path:
    """Return the data directory (``ETYM_DATA_DIR`` overrides the bundled default)."""
    return _resolve_path("ETYM_DATA_DIR", DEFAULT_DATA_DIR)


@lru_cache(maxsize=1)
def database_path() -> Path:
    """Return the configured DuckDB path, creating parent directories.

    The result is cached, so the environment is consulted and the parent
    directory created only once per process.
    """
    db_file = _resolve_path("ETYM_DB_PATH", _data_dir() / "etymdb.duckdb")
    db_file.parent.mkdir(parents=True, exist_ok=True)
    return db_file


def _ensure_database() -> Path:
    """Return the DuckDB database path, running ingestion on first use.

    If the database file already exists it is returned as-is; otherwise the
    ``ingest`` module is imported and run to build it, and a ``RuntimeError``
    is raised when ingestion fails or does not produce the expected file.
    """
    db_path = database_path()
    if db_path.exists():
        return db_path

    # Database missing: build it with the ingestion pipeline. The relative
    # import works inside the package; the plain import covers running the
    # module directly as a script.
    try:
        from . import ingest  # type: ignore[attr-defined]
    except ImportError:  # pragma: no cover - fallback for direct execution
        import ingest

    try:
        ingest.main()
    except Exception as exc:  # pragma: no cover - propagation with context
        raise RuntimeError("Failed to ingest the EtymDB dataset") from exc

    # Ingestion "succeeded" but left no file behind — treat as a hard error.
    if not db_path.exists():
        raise RuntimeError(f"Expected DuckDB database at {db_path} after ingestion")
    return db_path


class _ConnectionManager:
    """Lazily open DuckDB connections when required."""

    def __init__(self) -> None:
        self._conn: duckdb.DuckDBPyConnection | None = None

    def __enter__(self) -> duckdb.DuckDBPyConnection:
        db_path = _ensure_database()
        self._conn = duckdb.connect(db_path.as_posix(), read_only=True)
        return self._conn

    def __exit__(self, exc_type, exc, tb) -> None:
        if self._conn is not None:
            self._conn.close()


def _normalize_depth(depth: int) -> int:
    return max(depth, 0)


def _get_language_families(conn) -> dict[str, dict[str, str]]:
    """Load language families into a lookup dict keyed by language code."""
    lookup: dict[str, dict[str, str]] = {}
    for row in conn.execute(load_sql("queries/get_language_families.sql")).fetchall():
        lookup[row[0]] = {"name": row[1], "family": row[2], "branch": row[3]}
    return lookup


def _get_definitions_for_lexemes(conn, lexemes: list[str]) -> dict[str, str]:
    """Fetch primary definitions for the specified lexemes.

    Returns dict mapping lowercase lexeme -> definition string.
    Uses the first definition (entry_idx=0, meaning_idx=0, def_idx=0).
    """
    if not lexemes:
        return {}
    try:
        placeholders = ",".join(["?" for _ in lexemes])
        rows = conn.execute(
            f"""
            SELECT lexeme, definition
            FROM definitions
            WHERE lexeme IN ({placeholders})
              AND definition IS NOT NULL
              AND entry_idx = 0 AND meaning_idx = 0 AND def_idx = 0
            """,
            [lex.lower() for lex in lexemes],
        ).fetchall()
        return {row[0]: row[1].strip('"') if row[1] else None for row in rows}
    except Exception:
        return {}


def _make_node_id(lexeme: str, lang: str) -> str:
    """Create a unique node ID combining lexeme and language."""
    return f"{lexeme}|{lang}"


def _build_node(
    lexeme: str,
    lang: str,
    sense: str,
    lang_families: dict,
    enriched_defs: dict[str, str] | None = None,
) -> dict:
    """Assemble a graph node enriched with every piece of available metadata.

    Args:
        lexeme: The word.
        lang: Language code.
        sense: EtymDB sense/definition.
        lang_families: Language family lookup dict.
        enriched_defs: Optional enriched definitions from the Free Dictionary API.
    """
    # The node ID embeds the language so the same spelling in two languages
    # yields two distinct graph nodes.
    node = {
        "id": _make_node_id(lexeme, lang),
        "lexeme": lexeme,
        "lang": lang,
    }

    # Choose a definition: an enriched (Free Dictionary) definition wins for
    # English words; otherwise the EtymDB sense is used, but only when it
    # says more than the lexeme itself.
    key = lexeme.lower()
    definition = None
    if enriched_defs and lang == "en" and key in enriched_defs:
        definition = enriched_defs[key]
    elif sense and sense.lower() != key:
        definition = sense
    if definition:
        node["sense"] = definition

    # Attach language metadata when known; otherwise fall back to the code.
    info = lang_families.get(lang)
    if info is None:
        node["lang_name"] = lang
    else:
        node["lang_name"] = info["name"]
        node["family"] = info["family"]
        node["branch"] = info["branch"]
    return node


def get_db_stats() -> dict:
    """Return row counts for key tables."""
    with _ConnectionManager() as conn:
        word_count = conn.execute("SELECT COUNT(*) FROM v_english_curated").fetchone()[0]
        try:
            def_count = conn.execute("SELECT COUNT(*) FROM definitions").fetchone()[0]
        except Exception:
            # The definitions table is optional enrichment; report 0 if absent.
            def_count = 0
    return {"words": word_count, "definitions": def_count}


def fetch_etymology(word: str, depth: int = 5) -> dict | None:
    """Return an etymology graph for *word* or ``None`` if absent."""
    if not word:
        return None

    max_depth = _normalize_depth(depth)
    with _ConnectionManager() as conn:
        # Language-family lookup (small table, 53 rows).
        lang_families = _get_language_families(conn)

        # Locate the starting word (prefers English, then most etymology links).
        start = conn.execute(
            load_sql("queries/find_start_word.sql"),
            [word],
        ).fetchone()
        if start is None:
            return None

        start_ix, start_lang, start_lexeme, start_sense = start

        # Gather raw node tuples first; definitions get attached afterwards.
        raw_nodes: dict[int, tuple] = {start_ix: (start_lexeme, start_lang, start_sense)}
        edges: list[dict] = []
        seen_edges: set[tuple[str, str]] = set()

        if max_depth > 0:
            # Recursive traversal handling both simple links and compound
            # etymologies (a negative target is a sequence ID resolving to
            # several parent words). is_compound lets the UI style compound
            # edges differently.
            records = conn.execute(
                load_sql("queries/traverse_etymology.sql"),
                [start_ix, max_depth],
            ).fetchall()

            for record in records:
                (
                    child_ix,
                    child_lexeme,
                    child_lang,
                    child_sense,
                    parent_ix,
                    parent_lexeme,
                    parent_lang,
                    parent_sense,
                    is_compound,
                    link_type,
                ) = record[:10]

                raw_nodes.setdefault(child_ix, (child_lexeme, child_lang, child_sense))
                raw_nodes.setdefault(parent_ix, (parent_lexeme, parent_lang, parent_sense))

                child_id = _make_node_id(child_lexeme, child_lang)
                parent_id = _make_node_id(parent_lexeme, parent_lang)
                if child_id == parent_id:
                    continue  # skip self-loops
                edge_key = (child_id, parent_id)
                if edge_key in seen_edges:
                    continue  # edge already emitted
                seen_edges.add(edge_key)
                edge: dict = {"source": child_id, "target": parent_id}
                if is_compound:
                    edge["compound"] = True
                if link_type:
                    edge["type"] = link_type
                edges.append(edge)

        # Definitions are fetched only for the English lexemes in this graph.
        english_lexemes = [lex for lex, lang, _ in raw_nodes.values() if lang == "en"]
        enriched_defs = _get_definitions_for_lexemes(conn, english_lexemes)

        # Final nodes, in first-seen order (dicts preserve insertion order).
        nodes = [
            _build_node(lexeme, lang, sense, lang_families, enriched_defs)
            for lexeme, lang, sense in raw_nodes.values()
        ]

        if not edges:
            # The word exists but carries no etymology links.
            return {
                "nodes": nodes,
                "edges": [],
                "no_etymology": True,
                "lexeme": start_lexeme,
            }

        return {"nodes": nodes, "edges": edges}


def fetch_random_word(include_compound: bool = True) -> dict[str, str | None]:
    """Return a random curated English word (has etymology, no phrases/proper nouns).

    Args:
        include_compound: If True, include compound-only words (e.g., "acquaintanceship").
                         If False, only return words with "deep" etymology chains.

    Raises:
        ValueError: If the derived view name is not on the allowlist
            (cannot happen for the bool-derived value; defense in depth).
    """
    view = "v_english_curated" if include_compound else "v_english_deep"
    # The view name is interpolated into SQL, so enforce the allowlist with
    # a real check rather than `assert`, which is stripped under `python -O`.
    if view not in ("v_english_curated", "v_english_deep"):
        raise ValueError(f"Invalid view: {view}")
    with _ConnectionManager() as conn:
        row = conn.execute(f"SELECT lexeme FROM {view} ORDER BY random() LIMIT 1").fetchone()
        return {"word": row[0] if row else None}


def fetch_language_info(lang_code: str) -> dict[str, str] | None:
    """Return language family info for a language code, or None if unknown."""
    with _ConnectionManager() as conn:
        row = conn.execute(
            load_sql("queries/get_language_info.sql"),
            [lang_code],
        ).fetchone()
    if row is None:
        return None
    return {"name": row[0], "family": row[1], "branch": row[2]}


def fetch_all_language_families() -> dict[str, dict[str, str]]:
    """Return all language family mappings keyed by language code."""
    with _ConnectionManager() as conn:
        rows = conn.execute(load_sql("queries/get_language_families.sql")).fetchall()
    return {
        row[0]: dict(zip(("name", "family", "branch"), row[1:4]))
        for row in rows
    }


def _is_useful_sense(sense: str | None, lexeme: str) -> bool:
    """Check if a sense provides useful information beyond the lexeme itself.

    NULL senses are filtered out - they're structural entries without
    meaningful definitions. We prefer entries where sense differs from lexeme.
    """
    if sense is None:
        return False
    sense_lower = sense.lower().strip('"')
    lexeme_lower = lexeme.lower()
    # Not useful: NULL, empty string, or equals lexeme
    return sense_lower != "" and sense_lower != lexeme_lower


def _format_sense_for_display(sense: str) -> str:
    """Return *sense* with surrounding double quotes stripped for UI display."""
    return sense.strip('"')


def search_words(query: str, limit: int = 10) -> list[dict[str, str]]:
    """Search for English words matching the query (fuzzy prefix search).

    Returns words with etymology data. Shows EtymDB sense when it differs
    from lexeme, otherwise falls back to Free Dictionary definition.
    When multiple senses exist for a word, shows all of them.

    For words with multiple Free Dictionary definitions, shows the primary
    definition with a count indicator (e.g., "+3 more").

    TODO: The subquery for def_count could be optimized by pre-computing
    definition counts into a materialized column or separate table. This
    would require a schema change to the definitions table.
    """
    # Queries shorter than two characters are too broad to be useful.
    if not query or len(query) < 2:
        return []

    with _ConnectionManager() as conn:
        # Primary definitions (entry=0, meaning=0, def=0) plus total count.
        rows = conn.execute(
            load_sql("queries/search_words.sql"),
            [query, query],
        ).fetchall()

    # Group rows by lexeme so duplicate lexemes are handled together.
    grouped: dict[str, list[tuple]] = {}
    for lexeme, sense, definition, pos, def_count in rows:
        grouped.setdefault(lexeme, []).append((sense, definition, pos, def_count or 0))

    results: list[dict[str, str]] = []
    for lexeme, entries in grouped.items():
        useful = [entry[0] for entry in entries if _is_useful_sense(entry[0], lexeme)]

        if useful:
            # One result per useful EtymDB sense.
            results.extend(
                {"word": lexeme, "sense": _format_sense_for_display(s)} for s in useful
            )
        else:
            # No useful senses: fall back to the Free Dictionary definition
            # of the first entry, annotated with a count for polysemous words.
            _, definition, pos, def_count = entries[0]
            display = definition.strip('"') if definition else None
            if display and def_count > 1:
                prefix = f"({pos}) " if pos else ""
                display = f"{prefix}{display} (+{def_count - 1} more)"
            results.append({"word": lexeme, "sense": display})

        if len(results) >= limit:
            break

    # A single lexeme may push the count past the limit; trim the overflow.
    return results[:limit]