"""Database helpers for the FastAPI application."""

from __future__ import annotations

import os
from functools import lru_cache
from pathlib import Path

import duckdb

BASE_DIR = Path(__file__).resolve().parent
DEFAULT_DATA_DIR = BASE_DIR / "data"

# Views that may be interpolated into the random-word query. View names cannot
# be bound as SQL parameters, so they are checked against this whitelist.
_RANDOM_WORD_VIEWS = frozenset({"v_english_curated", "v_english_deep"})

# Pick the starting word: prefer English, then the entry with the most links.
_START_WORD_SQL = """
    SELECT w.word_ix, w.lang, w.lexeme, w.sense
    FROM words w
    LEFT JOIN links l ON l.source = w.word_ix
    WHERE lower(w.lexeme) = lower(?)
    GROUP BY w.word_ix, w.lang, w.lexeme, w.sense
    ORDER BY
        CASE WHEN w.lang = 'en' THEN 0 ELSE 1 END,
        COUNT(l.target) DESC,
        w.word_ix
    LIMIT 1
"""

# Recursive traversal that handles both simple links and compound etymologies.
# When target < 0, it is a sequence ID that resolves to multiple parent words.
# is_compound is tracked so compound edges can be styled differently in the UI.
_TRAVERSAL_SQL = """
    WITH RECURSIVE
    -- Resolve negative targets through sequences table
    resolved_links AS (
        -- Simple links (positive target = direct word reference)
        SELECT source, target AS parent_ix, FALSE AS is_compound
        FROM links
        WHERE target > 0
        UNION ALL
        -- Compound links (negative target = sequence, resolve to parents)
        SELECT l.source, s.parent_ix, TRUE AS is_compound
        FROM links l
        JOIN sequences s ON s.seq_ix = l.target
        WHERE l.target < 0
    ),
    traversal(child_ix, parent_ix, is_compound, lvl) AS (
        SELECT source, parent_ix, is_compound, 1
        FROM resolved_links
        WHERE source = ?
        UNION ALL
        -- Only follow FROM parents that have valid sense (non-NULL for English)
        -- This keeps sense=NULL entries as nodes but doesn't traverse their garbage links
        SELECT rl.source, rl.parent_ix, rl.is_compound, lvl + 1
        FROM traversal t
        JOIN resolved_links rl ON rl.source = t.parent_ix
        JOIN words parent_word ON parent_word.word_ix = t.parent_ix
        WHERE lvl < ?
          AND (parent_word.lang != 'en' OR parent_word.sense IS NOT NULL)
    )
    SELECT
        child.word_ix AS child_ix,
        child.lexeme AS child_lexeme,
        child.lang AS child_lang,
        child.sense AS child_sense,
        parent.word_ix AS parent_ix,
        parent.lexeme AS parent_lexeme,
        parent.lang AS parent_lang,
        parent.sense AS parent_sense,
        tr.is_compound
    FROM traversal tr
    JOIN words child ON child.word_ix = tr.child_ix
    JOIN words parent ON parent.word_ix = tr.parent_ix
"""

# Primary definitions (entry=0, meaning=0, def=0) joined with a total count.
# starts_with() is used instead of LIKE so that '%' and '_' typed by the user
# are matched literally rather than acting as wildcards.
_SEARCH_SQL = """
    SELECT w.lexeme, w.sense, d.definition, d.part_of_speech, dc.def_count
    FROM v_english_curated w
    LEFT JOIN definitions d
        ON d.lexeme = lower(w.lexeme)
        AND d.entry_idx = 0 AND d.meaning_idx = 0 AND d.def_idx = 0
    LEFT JOIN (
        SELECT lexeme, COUNT(*) as def_count
        FROM definitions
        GROUP BY lexeme
    ) dc ON dc.lexeme = lower(w.lexeme)
    WHERE starts_with(lower(w.lexeme), lower(?))
    ORDER BY
        CASE WHEN lower(w.lexeme) = lower(?) THEN 0 ELSE 1 END,
        length(w.lexeme),
        w.lexeme,
        w.word_ix
"""


def _resolve_path(env_var: str, default: Path) -> Path:
    """Return the path stored in *env_var*, or *default* if unset or empty."""
    value = os.environ.get(env_var)
    return Path(value) if value else default


def _data_dir() -> Path:
    """Return the data directory (``ETYM_DATA_DIR`` or the bundled default)."""
    return _resolve_path("ETYM_DATA_DIR", DEFAULT_DATA_DIR)


@lru_cache(maxsize=1)
def database_path() -> Path:
    """Return the configured DuckDB path, creating parent directories.

    The result is cached, so the environment is only consulted on first call.
    """
    path = _resolve_path("ETYM_DB_PATH", _data_dir() / "etymdb.duckdb")
    path.parent.mkdir(parents=True, exist_ok=True)
    return path


def _ensure_database() -> Path:
    """Return the database path, running ingestion first if the file is missing.

    Raises:
        RuntimeError: If ingestion fails, or if the database file still does
            not exist after ingestion completed.
    """
    db_path = database_path()
    if db_path.exists():
        return db_path

    try:
        from . import ingest  # type: ignore[attr-defined]
    except ImportError:  # pragma: no cover - fallback for direct execution
        import ingest

    try:
        ingest.main()
    except Exception as exc:  # pragma: no cover - propagation with context
        raise RuntimeError("Failed to ingest the EtymDB dataset") from exc

    if not db_path.exists():
        raise RuntimeError(f"Expected DuckDB database at {db_path} after ingestion")
    return db_path


class _ConnectionManager:
    """Lazily open DuckDB connections when required.

    Used as a context manager: entering opens a read-only connection (after
    making sure the database file exists) and exiting closes it.
    """

    def __init__(self) -> None:
        self._conn: duckdb.DuckDBPyConnection | None = None

    def __enter__(self) -> duckdb.DuckDBPyConnection:
        db_path = _ensure_database()
        self._conn = duckdb.connect(db_path.as_posix(), read_only=True)
        return self._conn

    def __exit__(self, exc_type, exc, tb) -> None:
        if self._conn is not None:
            self._conn.close()
            # Drop the reference so a closed handle is never kept around.
            self._conn = None


def _normalize_depth(depth: int) -> int:
    """Clamp *depth* so it is never negative."""
    return max(depth, 0)


def _get_language_families(conn: duckdb.DuckDBPyConnection) -> dict[str, dict[str, str]]:
    """Load language families into a lookup dict keyed by language code."""
    rows = conn.execute(
        "SELECT lang_code, lang_name, family, branch FROM language_families"
    ).fetchall()
    return {
        code: {"name": name, "family": family, "branch": branch}
        for code, name, family, branch in rows
    }


def _get_definitions_for_lexemes(
    conn: duckdb.DuckDBPyConnection, lexemes: list[str]
) -> dict[str, str]:
    """Fetch primary definitions for the specified lexemes.

    Returns dict mapping lowercase lexeme -> definition string.
    Uses the first definition (entry_idx=0, meaning_idx=0, def_idx=0).
    Lexemes whose definition is empty after stripping surrounding quotes are
    omitted. The lookup is best-effort: if the query fails an empty dict is
    returned.
    """
    # Lower-case, drop empties and de-duplicate (order-preserving) so each
    # lexeme is bound only once.
    keys = list(dict.fromkeys(lex.lower() for lex in lexemes if lex))
    if not keys:
        return {}

    placeholders = ",".join("?" * len(keys))
    try:
        rows = conn.execute(
            f"""
            SELECT lexeme, definition
            FROM definitions
            WHERE lexeme IN ({placeholders})
              AND definition IS NOT NULL
              AND entry_idx = 0 AND meaning_idx = 0 AND def_idx = 0
            """,
            keys,
        ).fetchall()
    except duckdb.Error:
        # Definitions are optional enrichment; a database-level failure must
        # not break the etymology lookup itself.
        return {}

    definitions: dict[str, str] = {}
    for lexeme, definition in rows:
        cleaned = definition.strip('"') if definition else ""
        if cleaned:
            definitions[lexeme] = cleaned
    return definitions


def _make_node_id(lexeme: str, lang: str) -> str:
    """Create a unique node ID combining lexeme and language."""
    return f"{lexeme}|{lang}"


def _build_node(
    lexeme: str,
    lang: str,
    sense: str,
    lang_families: dict,
    enriched_defs: dict[str, str] | None = None,
) -> dict:
    """Build a rich node with all available metadata.

    Args:
        lexeme: The word
        lang: Language code
        sense: EtymDB sense/definition
        lang_families: Language family lookup dict
        enriched_defs: Optional dict of enriched definitions from Free Dictionary API

    Returns:
        A dict with ``id``, ``lexeme``, ``lang`` and ``lang_name``; ``sense``,
        ``family`` and ``branch`` are added when available.
    """
    node = {
        "id": _make_node_id(lexeme, lang),  # Unique ID includes language
        "lexeme": lexeme,  # Display name
        "lang": lang,
    }

    # Priority: enriched definition (for English) > EtymDB sense. An empty
    # enriched definition falls through to the sense rather than hiding it.
    key = lexeme.lower()
    definition = enriched_defs.get(key) if (enriched_defs and lang == "en") else None
    if not definition and sense and sense.lower() != key:
        definition = sense
    if definition:
        node["sense"] = definition

    # Add language metadata if available, else fall back to the code as name.
    lang_info = lang_families.get(lang)
    if lang_info:
        node["lang_name"] = lang_info["name"]
        node["family"] = lang_info["family"]
        node["branch"] = lang_info["branch"]
    else:
        node["lang_name"] = lang

    return node


def fetch_etymology(word: str, depth: int = 5) -> dict | None:
    """Return an etymology graph for *word* or ``None`` if absent.

    Args:
        word: Lexeme to look up (matched case-insensitively).
        depth: Maximum number of link levels to follow; negative values are
            treated as 0.

    Returns:
        ``{"nodes": [...], "edges": [...]}``, or ``None`` when the word is
        unknown or has no etymology edges. Node ids are unique within the
        returned graph.
    """
    if not word:
        return None
    depth = _normalize_depth(depth)

    with _ConnectionManager() as conn:
        start = conn.execute(_START_WORD_SQL, [word]).fetchone()
        if not start:
            return None
        start_ix, start_lang, start_lexeme, start_sense = start

        records = (
            conn.execute(_TRAVERSAL_SQL, [start_ix, depth]).fetchall() if depth > 0 else []
        )

        # Nodes are keyed by their public ID (lexeme|lang), the same key the
        # edges use, so several word rows sharing a lexeme and language
        # collapse into a single node instead of duplicate ids.
        raw_nodes: dict[str, tuple] = {
            _make_node_id(start_lexeme, start_lang): (start_lexeme, start_lang, start_sense)
        }
        edges: list[dict] = []
        seen_edges: set[tuple[str, str]] = set()

        for row in records:
            _, child_lexeme, child_lang, child_sense = row[:4]
            _, parent_lexeme, parent_lang, parent_sense = row[4:8]
            is_compound = row[8]

            child_id = _make_node_id(child_lexeme, child_lang)
            parent_id = _make_node_id(parent_lexeme, parent_lang)
            raw_nodes.setdefault(child_id, (child_lexeme, child_lang, child_sense))
            raw_nodes.setdefault(parent_id, (parent_lexeme, parent_lang, parent_sense))

            # Skip self-loops and edges that were already emitted.
            edge_key = (child_id, parent_id)
            if child_id == parent_id or edge_key in seen_edges:
                continue
            seen_edges.add(edge_key)
            edge = {"source": child_id, "target": parent_id}
            if is_compound:
                edge["compound"] = True  # Compound flag for UI styling
            edges.append(edge)

        # Return None if word has no etymology (single node, no edges).
        # Checked before the enrichment queries so they are not run for nothing.
        if not edges:
            return None

        # Load language families (small lookup table).
        lang_families = _get_language_families(conn)

        # Fetch definitions only for English lexemes in this graph.
        english_lexemes = [lex for lex, lang, _ in raw_nodes.values() if lang == "en"]
        enriched_defs = _get_definitions_for_lexemes(conn, english_lexemes)

        nodes = [
            _build_node(lexeme, lang, sense, lang_families, enriched_defs)
            for lexeme, lang, sense in raw_nodes.values()
        ]

    return {"nodes": nodes, "edges": edges}


def fetch_random_word(include_compound: bool = True) -> dict[str, str | None]:
    """Return a random curated English word (has etymology, no phrases/proper nouns).

    Args:
        include_compound: If True, include compound-only words (e.g., "acquaintanceship").
                         If False, only return words with "deep" etymology chains.

    Returns:
        ``{"word": <lexeme>}``, or ``{"word": None}`` if the view is empty.
    """
    view = "v_english_curated" if include_compound else "v_english_deep"
    # Guard against SQL injection (view name is interpolated). An explicit
    # check is used rather than ``assert`` so it still runs under ``python -O``.
    if view not in _RANDOM_WORD_VIEWS:
        raise ValueError(f"Invalid view: {view}")

    with _ConnectionManager() as conn:
        row = conn.execute(f"SELECT lexeme FROM {view} ORDER BY random() LIMIT 1").fetchone()
    return {"word": row[0] if row else None}


def fetch_language_info(lang_code: str) -> dict[str, str] | None:
    """Return language family info for a language code, or ``None`` if unknown."""
    with _ConnectionManager() as conn:
        row = conn.execute(
            "SELECT lang_name, family, branch FROM language_families WHERE lang_code = ?",
            [lang_code],
        ).fetchone()
    if row:
        return {"name": row[0], "family": row[1], "branch": row[2]}
    return None


def fetch_all_language_families() -> dict[str, dict[str, str]]:
    """Return all language family mappings."""
    with _ConnectionManager() as conn:
        return _get_language_families(conn)


def _is_useful_sense(sense: str | None, lexeme: str) -> bool:
    """Check if a sense provides useful information beyond the lexeme itself.

    NULL senses are filtered out - they're structural entries without
    meaningful definitions. We prefer entries where sense differs from lexeme.
    """
    if sense is None:
        return False
    normalized = sense.lower().strip('"')
    # Not useful: empty string, or equal to the lexeme itself.
    return normalized not in ("", lexeme.lower())


def _format_sense_for_display(sense: str) -> str:
    """Format a sense for display in the UI."""
    return sense.strip('"')


def search_words(query: str, limit: int = 10) -> list[dict[str, str | None]]:
    """Search for English words matching the query (literal prefix search).

    Returns words with etymology data. Shows EtymDB sense when it differs
    from lexeme, otherwise falls back to Free Dictionary definition.
    When multiple senses exist for a word, shows all of them.

    For words with multiple Free Dictionary definitions, shows the primary
    definition with a count indicator (e.g., "+3 more").

    Queries shorter than two characters and non-positive limits yield an
    empty list.

    TODO: The subquery for def_count could be optimized by pre-computing
    definition counts into a materialized column or separate table.
    This would require a schema change to the definitions table.
    """
    if not query or len(query) < 2 or limit <= 0:
        return []

    with _ConnectionManager() as conn:
        rows = conn.execute(_SEARCH_SQL, [query, query]).fetchall()

    # Group by lexeme to handle duplicates; dict order keeps the SQL ranking.
    by_lexeme: dict[str, list[tuple]] = {}
    for lexeme, sense, definition, pos, def_count in rows:
        by_lexeme.setdefault(lexeme, []).append((sense, definition, pos, def_count or 0))

    results: list[dict[str, str | None]] = []
    for lexeme, entries in by_lexeme.items():
        useful_senses = [s for s, _, _, _ in entries if _is_useful_sense(s, lexeme)]
        if useful_senses:
            # Show all entries with useful senses.
            results.extend(
                {"word": lexeme, "sense": _format_sense_for_display(sense)}
                for sense in useful_senses
            )
        else:
            # No useful senses - show Free Dictionary definition with count.
            _, definition, pos, def_count = entries[0]
            display = definition.strip('"') if definition else None
            # Add count indicator for polysemous words.
            if display and def_count > 1:
                pos_str = f"({pos}) " if pos else ""
                display = f"{pos_str}{display} (+{def_count - 1} more)"
            results.append({"word": lexeme, "sense": display})

        if len(results) >= limit:
            break

    # A lexeme with several senses can overshoot the limit; trim to size.
    return results[:limit]