diff --git "a/app.py" "b/app.py" new file mode 100644--- /dev/null +++ "b/app.py" @@ -0,0 +1,3054 @@ +# ============================================================================ +# GERMAN LINGUISTICS HUB (CONSOLIDATED APP V3) +# +# This script combines multiple NLP tools into a single Gradio interface. +# +# TABS & FUNCTIONALITY: +# 1. Comprehensive Analyzer (DE): +# - CONTEXTUAL analysis of full sentences. +# - Ranks all semantics by relevance to the sentence. +# 2. Word Encyclopedia (DE): (NEW!) +# - NON-CONTEXTUAL analysis of single words. +# - Finds ALL grammatical (Pattern) and semantic (OdeNet, ConceptNet) +# possibilities, cross-validated and grouped by Part-of-Speech. +# - Ideal for enriching word lists. +# 3. spaCy Analyzer (Multi-lingual): Direct spaCy output. +# 4. Grammar Check (DE): LanguageTool. +# 5. Inflections (DE): Direct Pattern.de output. +# 6. Thesaurus (DE): Direct OdeNet output. +# 7. ConceptNet (Direct): Direct ConceptNet API output. +# ============================================================================ + + +# ============================================================================ +# 1. CONSOLIDATED IMPORTS +# ============================================================================ +import gradio as gr +import spacy +from spacy import displacy +import base64 +import traceback +import subprocess +import sys +import os +from pathlib import Path +import importlib +import site +import threading +import queue +from dataclasses import dataclass +from enum import Enum +from typing import Dict, Any, List, Set, Optional, Tuple +import requests +import zipfile +import re +import sqlite3 +from huggingface_hub import hf_hub_download + +# --- Requests and gradio Import (for ConceptNet) --- +try: + import requests + from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout + REQUESTS_AVAILABLE = True +except ImportError: + REQUESTS_AVAILABLE = False + print("="*70) + print("CRITICAL WARNING: `requests` library not found.") + print("ConceptNet features will not function.") + print("="*70) + + +try: + from gradio_client import Client + GRADIO_CLIENT_AVAILABLE = True + +except ImportError: + GRADIO_CLIENT_AVAILABLE = False + print("="*70) + print("CRITICAL WARNING: `gradio_client` library not found.") + print("ConceptNet features will not function.") + print("Install with: pip install gradio_client") + print("="*70) + +# --- IWNLP (spaCy Extension) Import --- +try: + from spacy_iwnlp import spaCyIWNLP + IWNLP_AVAILABLE = True + print("✓ Successfully imported spacy-iwnlp") +except ImportError: + IWNLP_AVAILABLE = False + spaCyIWNLP = object # Dummy definition for error case + print("="*70) + print("WARNING: `spacy-iwnlp` library not found.") + print("The 'Word Encyclopedia' tab will be less accurate.") + print("Install with: pip install spacy-iwnlp") + print("="*70) + +# --- LanguageTool Import --- +try: + import language_tool_python + LT_AVAILABLE = True + print("✓ Successfully imported language_tool") +except ImportError: + LT_AVAILABLE = False + print("="*70) + print("CRITICAL WARNING: `language-tool-python` library not found.") + print("The 'German Grammar Check' tab will not function.") + print("="*70) + +# --- OdeNet (wn) Import --- +try: + import wn + WN_AVAILABLE = True + print("✓ Successfully imported wordnet for odenet") +except ImportError: + WN_AVAILABLE = False + print("="*70) + print("CRITICAL WARNING: `wn` library not found.") + print("The 'German Thesaurus' tab will not function.") + print("="*70) + +# --- Pattern.de 
Import --- +try: + from pattern.de import ( + pluralize, singularize, conjugate, tenses, lemma, lexeme, + attributive, predicative, + article, gender, MALE, FEMALE, NEUTRAL, PLURAL, + INFINITIVE, PRESENT, PAST, PARTICIPLE, + FIRST, SECOND, THIRD, SINGULAR, PLURAL as PL, + INDICATIVE, IMPERATIVE, SUBJUNCTIVE, + NOMINATIVE, ACCUSATIVE, DATIVE, GENITIVE, + SUBJECT, OBJECT, INDIRECT, PROPERTY, + DEFINITE, INDEFINITE, + comparative, superlative, + NOUN, VERB, ADJECTIVE, + parse, split + ) + PATTERN_DE_AVAILABLE = True + print("✓ Successfully imported pattern.de") +except ImportError as e: + PATTERN_DE_AVAILABLE = False + print("="*70) + print(f"CRITICAL WARNING: `pattern.de` library not found: {e}") + print("The 'German Inflections' tab will not function.") + print("="*70) + +# --- HanTa Tagger Import --- +try: + from HanTa.HanoverTagger import HanoverTagger + import HanTa.HanoverTagger + # This sys.modules line is critical for pickle compatibility + sys.modules['HanoverTagger'] = HanTa.HanoverTagger + HANTA_AVAILABLE = True + print("✓ Successfully imported HanTa") +except ImportError: + HANTA_AVAILABLE = False + HanoverTagger = object # Dummy definition + print("="*70) + print("CRITICAL WARNING: `HanTa` library not found.") + print("The 'Word Encyclopedia' tab will NOT function.") + print("Install with: pip install HanTa") + print("="*70) + +# ============================================================================ +# 2. SHARED GLOBALS & CONFIG +# ============================================================================ +VERBOSE = True # Enable verbose debug output for Pattern.de +def log(msg): + """Print debug messages if verbose mode is on.""" + if VERBOSE: + print(f"[DEBUG] {msg}") + +# --- Wiktionary Cache & Lock --- +WIKTIONARY_DB_PATH = "de_wiktionary_normalized.db" +WIKTIONARY_REPO_ID = "cstr/de-wiktionary-sqlite-normalized" +WIKTIONARY_CONN: Optional[sqlite3.Connection] = None +WIKTIONARY_CONN_LOCK = threading.Lock() +WIKTIONARY_AVAILABLE = False + +# --- ConceptNet Cache & Lock --- +CONCEPTNET_CACHE: Dict[Tuple[str, str], Any] = {} +CONCEPTNET_LOCK = threading.Lock() + +# --- HanTa Tagger Cache & Lock --- +HANTA_TAGGER_INSTANCE: Optional[HanoverTagger] = None +HANTA_TAGGER_LOCK = threading.Lock() + +# --- Helper --- +def _html_wrap(content: str, line_height: str = "2.0") -> str: + """Wraps displaCy HTML in a consistent, scrollable div.""" + return f'
<div style="line-height: {line_height}; max-width: 100%; overflow-x: auto;">{content}</div>
' + +# --- Helper for SVA --- +def _conjugate_to_person_number(verb_lemma: str, person: str, number: str) -> Optional[str]: + """ + Return a present tense finite form for given person/number. + person in {'1','2','3'}, number in {'sg','pl'}. + """ + if not PATTERN_DE_AVAILABLE: + return None + try: + alias = {"1sg":"1sg","2sg":"2sg","3sg":"3sg","1pl":"1pl","2pl":"2pl","3pl":"3pl"}[f"{person}{number}"] + return conjugate(verb_lemma, alias) + except Exception: + return None + +# ============================================================================ +# 3. SPACY ANALYZER LOGIC +# ============================================================================ +# --- Globals & Config for spaCy --- +SPACY_MODEL_INFO: Dict[str, Tuple[str, str, str]] = { + "de": ("German", "de_core_news_md", "spacy"), + "en": ("English", "en_core_web_md", "spacy"), + "es": ("Spanish", "es_core_news_md", "spacy"), + "grc-proiel-trf": ("Ancient Greek (PROIEL TRF)", "grc_proiel_trf", "grecy"), + "grc-perseus-trf": ("Ancient Greek (Perseus TRF)", "grc_perseus_trf", "grecy"), + "grc_ner_trf": ("Ancient Greek (NER TRF)", "grc_ner_trf", "grecy"), + "grc-proiel-lg": ("Ancient Greek (PROIEL LG)", "grc_proiel_lg", "grecy"), + "grc-perseus-lg": ("Ancient Greek (Perseus LG)", "grc_perseus_lg", "grecy"), + "grc-proiel-sm": ("Ancient Greek (PROIEL SM)", "grc_proiel_sm", "grecy"), + "grc-perseus-sm": ("Ancient Greek (Perseus SM)", "grc_perseus_sm", "grecy"), +} +SPACY_UI_TEXT = { + "de": { + "title": "# 🔍 Mehrsprachiger Morpho-Syntaktischer Analysator", + "subtitle": "Analysieren Sie Texte auf Deutsch, Englisch, Spanisch und Altgriechisch", + "ui_lang_label": "Benutzeroberflächensprache", + "model_lang_label": "Textsprache für Analyse", + "input_label": "Text eingeben", + "input_placeholder": "Geben Sie hier Ihren Text ein...", + "button_text": "Text analysieren", + "button_processing_text": "Verarbeitung läuft...", + "tab_graphic": "Grafische Darstellung", + "tab_table": "Tabelle", + "tab_json": "JSON", + "tab_ner": "Entitäten", + "html_label": "Abhängigkeitsparsing", + "table_label": "Morphologische Analyse", + "table_headers": ["Wort", "Lemma", "POS", "Tag", "Morphologie", "Abhängigkeit"], + "json_label": "JSON-Ausgabe", + "ner_label": "Benannte Entitäten", + "error_message": "Fehler: " + }, + "en": { + "title": "# 🔍 Multilingual Morpho-Syntactic Analyzer", + "subtitle": "Analyze texts in German, English, Spanish, and Ancient Greek", + "ui_lang_label": "Interface Language", + "model_lang_label": "Text Language for Analysis", + "input_label": "Enter Text", + "input_placeholder": "Enter your text here...", + "button_text": "Analyze Text", + "button_processing_text": "Processing...", + "tab_graphic": "Graphic View", + "tab_table": "Table", + "tab_json": "JSON", + "tab_ner": "Entities", + "html_label": "Dependency Parsing", + "table_label": "Morphological Analysis", + "table_headers": ["Word", "Lemma", "POS", "Tag", "Morphology", "Dependency"], + "json_label": "JSON Output", + "ner_label": "Named Entities", + "error_message": "Error: " + }, + "es": { + "title": "# 🔍 Analizador Morfo-Sintáctico Multilingüe", + "subtitle": "Analice textos en alemán, inglés, español y griego antiguo", + "ui_lang_label": "Idioma de la Interfaz", + "model_lang_label": "Idioma del Texto para Análisis", + "input_label": "Introducir Texto", + "input_placeholder": "Ingrese su texto aquí...", + "button_text": "Analizar Texto", + "button_processing_text": "Procesando...", + "tab_graphic": "Vista Gráfica", + "tab_table": "Tabla", + "tab_json": "JSON", + 
"tab_ner": "Entidades", + "html_label": "Análisis de Dependencias", + "table_label": "Análisis Morfológico", + "table_headers": ["Palabra", "Lema", "POS", "Etiqueta", "Morfología", "Dependencia"], + "json_label": "Salida JSON", + "ner_label": "Entidades Nombradas", + "error_message": "Error: " + } +} +SPACY_MODELS: Dict[str, Optional[spacy.Language]] = {} + +# --- Dependency Installation --- +def spacy_install_spacy_transformers_once(): + """ Installs spacy-transformers, required for all _trf models. """ + marker_file = Path(".spacy_transformers_installed") + if marker_file.exists(): + print("✓ spacy-transformers already installed (marker found)") + return True + + print("Installing spacy-transformers (for _trf models)...") + cmd = [sys.executable, "-m", "pip", "install", "spacy-transformers"] + try: + subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=900) + print("✓ Successfully installed spacy-transformers") + marker_file.touch() + return True + except Exception as e: + print(f"✗ FAILED to install spacy-transformers: {e}") + if hasattr(e, 'stdout'): print(f"STDOUT: {e.stdout}") + if hasattr(e, 'stderr'): print(f"STDERR: {e.stderr}") + return False + +def spacy_install_grecy_model_from_github(model_name: str) -> bool: + """ Installs a greCy model from GitHub Release. """ + marker_file = Path(f".{model_name}_installed") + if marker_file.exists(): + print(f"✓ {model_name} already installed (marker found)") + return True + print(f"Installing grecy model: {model_name}...") + if model_name == "grc_proiel_trf": + wheel_filename = "grc_proiel_trf-3.7.5-py3-none-any.whl" + elif model_name in ["grc_perseus_trf", "grc_proiel_lg", "grc_perseus_lg", + "grc_proiel_sm", "grc_perseus_sm", "grc_ner_trf"]: + wheel_filename = f"{model_name}-0.0.0-py3-none-any.whl" + else: + print(f"✗ Unknown grecy model: {model_name}") + return False + install_url = f"https://github.com/CrispStrobe/greCy/releases/download/v1.0-models/{wheel_filename}" + cmd = [sys.executable, "-m", "pip", "install", install_url, "--no-deps"] + print(f"Running: {' '.join(cmd)}") + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=900) + if result.stdout: print("STDOUT:", result.stdout) + if result.stderr: print("STDERR:", result.stderr) + print(f"✓ Successfully installed {model_name} from GitHub") + marker_file.touch() + return True + except subprocess.CalledProcessError as e: + print(f"✗ Installation subprocess FAILED with code {e.returncode}") + print("STDOUT:", e.stdout) + print("STDERR:", e.stderr) + return False + except Exception as e: + print(f"✗ Installation exception: {e}") + traceback.print_exc() + return False + +# --- Model Loading (Lazy Loading) --- +def spacy_load_spacy_model(model_name: str) -> Optional[spacy.Language]: + """Load or install a standard spaCy model.""" + try: + return spacy.load(model_name) + except OSError: + print(f"Installing {model_name}...") + try: + subprocess.check_call([sys.executable, "-m", "spacy", "download", model_name]) + return spacy.load(model_name) + except Exception as e: + print(f"✗ Failed to install {model_name}: {e}") + if hasattr(e, 'stderr'): print(f"STDERR: {e.stderr}") + return None + +def spacy_load_grecy_model(model_name: str) -> Optional[spacy.Language]: + """ Load a grecy model, installing from GitHub if needed. 
""" + if not spacy_install_grecy_model_from_github(model_name): + print(f"✗ Cannot load {model_name} because installation failed.") + return None + try: + print("Refreshing importlib to find new package...") + importlib.invalidate_caches() + try: importlib.reload(site) + except Exception: pass + print(f"Trying: spacy.load('{model_name}')") + nlp = spacy.load(model_name) + print(f"✓ Successfully loaded {model_name}") + return nlp + except Exception as e: + print(f"✗ Model {model_name} is installed but FAILED to load.") + print(f" Error: {e}") + traceback.print_exc() + return None + +def spacy_initialize_models(): + """ Pre-load standard models and ensure _trf dependencies are ready. """ + print("\n" + "="*70) + print("INITIALIZING SPACY MODELS") + print("="*70 + "\n") + spacy_install_spacy_transformers_once() + loaded_count = 0 + spacy_model_count = 0 + for lang_code, (lang_name, model_name, model_type) in SPACY_MODEL_INFO.items(): + if model_type == "spacy": + spacy_model_count += 1 + print(f"Loading {lang_name} ({model_name})...") + nlp = spacy_load_spacy_model(model_name) + SPACY_MODELS[lang_code] = nlp + if nlp: + print(f"✓ {lang_name} ready\n") + loaded_count += 1 + else: + print(f"✗ {lang_name} FAILED\n") + else: + print(f"✓ {lang_name} ({model_name}) will be loaded on first use.\n") + SPACY_MODELS[lang_code] = None + print(f"Pre-loaded {loaded_count}/{spacy_model_count} standard models.") + print("="*70 + "\n") + +# --- Analysis Logic --- +def spacy_get_analysis(ui_lang: str, model_lang_key: str, text: str): + """Analyze text and return results.""" + ui_config = SPACY_UI_TEXT.get(ui_lang.lower(), SPACY_UI_TEXT["en"]) + error_prefix = ui_config["error_message"] + try: + if not text.strip(): + return ([], [], "
<div><p>No text provided.</p></div>", "<div><p>No text provided.</p></div>
", + gr.Button(value=ui_config["button_text"], interactive=True)) + nlp = SPACY_MODELS.get(model_lang_key) + if nlp is None: + print(f"First use of {model_lang_key}. Loading model...") + if model_lang_key not in SPACY_MODEL_INFO: + raise ValueError(f"Unknown model key: {model_lang_key}") + _, model_name, model_type = SPACY_MODEL_INFO[model_lang_key] + if model_type == "grecy": + nlp = spacy_load_grecy_model(model_name) + else: + nlp = spacy_load_spacy_model(model_name) + if nlp is None: + SPACY_MODELS.pop(model_lang_key, None) + err_msg = f"Model for {model_lang_key} ({model_name}) FAILED to load. Check logs." + err_html = f"
<div><p>{err_msg}</p></div>
" + return ([], {"error": err_msg}, err_html, err_html, + gr.Button(value=ui_config["button_text"], interactive=True)) + else: + SPACY_MODELS[model_lang_key] = nlp + print(f"✓ {model_lang_key} is now loaded and cached.") + doc = nlp(text) + dataframe_output = [] + json_output = [] + for token in doc: + lemma_str = token.lemma_ + morph_str = str(token.morph) if token.morph else '' + dep_str = token.dep_ if doc.is_parsed else '' + tag_str = token.tag_ or '' + pos_str = token.pos_ or '' + json_output.append({ + "word": token.text, "lemma": lemma_str, "pos": pos_str, + "tag": tag_str, "morphology": morph_str, "dependency": dep_str, + "is_stopword": token.is_stop + }) + dataframe_output.append([token.text, lemma_str, pos_str, tag_str, morph_str, dep_str]) + html_dep_out = "" + if "parser" in nlp.pipe_names and doc.is_parsed: + try: + options = {"compact": True, "bg": "#ffffff", "color": "#000000", "font": "Source Sans Pro"} + html_svg = displacy.render(doc, style="dep", jupyter=False, options=options) + html_dep_out = _html_wrap(html_svg, line_height="2.5") + except Exception as e: + html_dep_out = f"
<div><p>Visualization error (DEP): {e}</p></div>
" + else: + html_dep_out = "
<div><p>Dependency parsing ('parser') not available or doc not parsed.</p></div>
" + html_ner_out = "" + if "ner" in nlp.pipe_names: + if doc.ents: + try: + html_ner = displacy.render(doc, style="ent", jupyter=False) + html_ner_out = _html_wrap(html_ner, line_height="2.5") + except Exception as e: + html_ner_out = f"
<div><p>Visualization error (NER): {e}</p></div>
" + else: + html_ner_out = "
<div><p>No named entities found in this text.</p></div>
" + else: + html_ner_out = "
<div><p>Named Entity Recognition ('ner') not available for this model.</p></div>
" + return (dataframe_output, json_output, html_dep_out, html_ner_out, + gr.Button(value=ui_config["button_text"], interactive=True)) + except Exception as e: + traceback.print_exc() + error_html = f"
<div><p>{error_prefix} {str(e)}</p></div>
" + return ([], {"error": str(e)}, error_html, error_html, + gr.Button(value=ui_config["button_text"], interactive=True)) + +# --- UI Update Logic --- +def spacy_update_ui(ui_lang: str): + """Update UI language for the spaCy tab.""" + ui_config = SPACY_UI_TEXT.get(ui_lang.lower(), SPACY_UI_TEXT["en"]) + return [ + gr.update(value=ui_config["title"]), + gr.update(value=ui_config["subtitle"]), + gr.update(label=ui_config["ui_lang_label"]), + gr.update(label=ui_config["model_lang_label"]), + gr.update(label=ui_config["input_label"], placeholder=ui_config["input_placeholder"]), + gr.update(value=ui_config["button_text"]), + gr.update(label=ui_config["tab_graphic"]), + gr.update(label=ui_config["tab_table"]), + gr.update(label=ui_config["tab_json"]), + gr.update(label=ui_config["tab_ner"]), + gr.update(label=ui_config["html_label"]), + gr.update(label=ui_config["table_label"], headers=ui_config["table_headers"]), + gr.update(label=ui_config["json_label"]), + gr.update(label=ui_config["ner_label"]) + ] + +# ============================================================================ +# 3b. IWNLP PIPELINE (NEW) +# ============================================================================ +IWNLP_PIPELINE: Optional[spacy.Language] = None +IWNLP_LOCK = threading.Lock() + +# Define paths for the data +DATA_DIR = "data" +LEMMATIZER_JSON_NAME = "IWNLP.Lemmatizer_20181001.json" +LEMMATIZER_JSON_PATH = os.path.join(DATA_DIR, LEMMATIZER_JSON_NAME) +LEMMATIZER_ZIP_URL = "https://dbs.cs.uni-duesseldorf.de/datasets/iwnlp/IWNLP.Lemmatizer_20181001.zip" +LEMMATIZER_ZIP_PATH = os.path.join(DATA_DIR, "IWNLP.Lemmatizer_20181001.zip") + +def iwnlp_download_and_unzip_data(): + """ + Checks for IWNLP data file. Downloads and unzips if not present. + """ + if os.path.exists(LEMMATIZER_JSON_PATH): + print("✓ IWNLP data file already exists.") + return True + + # --- File not found, must download and unzip --- + try: + os.makedirs(DATA_DIR, exist_ok=True) + + # 1. Download the ZIP file if it's not already here + if not os.path.exists(LEMMATIZER_ZIP_PATH): + print(f"IWNLP data not found. Downloading from {LEMMATIZER_ZIP_URL}...") + with requests.get(LEMMATIZER_ZIP_URL, stream=True) as r: + r.raise_for_status() + with open(LEMMATIZER_ZIP_PATH, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + print("✓ IWNLP Download complete.") + else: + print("✓ IWNLP zip file already present.") + + # 2. Unzip the file + print(f"Unzipping '{LEMMATIZER_ZIP_PATH}'...") + with zipfile.ZipFile(LEMMATIZER_ZIP_PATH, 'r') as zip_ref: + # Extract the specific file we need to the data directory + zip_ref.extract(LEMMATIZER_JSON_NAME, path=DATA_DIR) + print(f"✓ Unzip complete. File extracted to {LEMMATIZER_JSON_PATH}") + + if not os.path.exists(LEMMATIZER_JSON_PATH): + raise Exception("Unzip appeared to succeed, but the .json file is still missing.") + + return True + + except Exception as e: + print(f"✗ CRITICAL: Failed to download or unzip IWNLP data: {e}") + traceback.print_exc() + return False + + +def iwnlp_get_pipeline() -> Optional[spacy.Language]: + """ Thread-safe function to get a single instance of the IWNLP pipeline. """ + global IWNLP_PIPELINE + if not IWNLP_AVAILABLE: + raise ImportError("spacy-iwnlp library is not installed.") + + if IWNLP_PIPELINE: + return IWNLP_PIPELINE + + with IWNLP_LOCK: + if IWNLP_PIPELINE: + return IWNLP_PIPELINE + + try: + print("Initializing spaCy-IWNLP pipeline...") + + # --- 1. 
Ensure data file exists --- + if not iwnlp_download_and_unzip_data(): + return None # Failed to get data + + # --- 2. Load spaCy model --- + print("Loading 'de_core_news_md' for IWNLP...") + nlp_de = SPACY_MODELS.get("de") + if not nlp_de: + nlp_de = spacy_load_spacy_model("de_core_news_md") + if nlp_de: + SPACY_MODELS["de"] = nlp_de + else: + raise Exception("Failed to load 'de_core_news_md' for IWNLP.") + + # --- 3. Add IWNLP pipe --- + if not nlp_de.has_pipe("iwnlp"): + # This is the V3.0 initialization method + nlp_de.add_pipe('iwnlp', config={'lemmatizer_path': LEMMATIZER_JSON_PATH}) + print("✓ IWNLP pipe added to 'de' model.") + else: + print("✓ IWNLP pipe already present.") + + IWNLP_PIPELINE = nlp_de + return IWNLP_PIPELINE + + except Exception as e: + print(f"CRITICAL ERROR: Failed to initialize IWNLP pipeline: {e}") + traceback.print_exc() + return None + +# ============================================================================ +# 4. LANGUAGETOOL LOGIC +# ============================================================================ +# --- Globals for LanguageTool --- +LT_TOOL_INSTANCE: Optional[language_tool_python.LanguageTool] = None +LT_TOOL_LOCK = threading.Lock() +def lt_get_language_tool() -> Optional[language_tool_python.LanguageTool]: + """ Thread-safe function to get a single instance of the LanguageTool. """ + global LT_TOOL_INSTANCE + if not LT_AVAILABLE: + raise ImportError("language-tool-python library is not installed.") + if LT_TOOL_INSTANCE: + return LT_TOOL_INSTANCE + with LT_TOOL_LOCK: + if LT_TOOL_INSTANCE: + return LT_TOOL_INSTANCE + try: + print("Initializing LanguageTool for German (de-DE)...") + tool = language_tool_python.LanguageTool('de-DE') + try: + tool.picky = True + except Exception: + pass + _ = tool.check("Dies ist ein Test.") + print("LanguageTool (local server) initialized successfully.") + LT_TOOL_INSTANCE = tool + return LT_TOOL_INSTANCE + except Exception as e: + print(f"CRITICAL ERROR: Failed to initialize LanguageTool: {e}") + return None +# --- Grammar Checking Logic --- +def lt_check_grammar(text: str) -> List[Dict[str, Any]]: + """ Checks a German text for grammar and spelling errors and returns a JSON list. """ + try: + tool = lt_get_language_tool() + if tool is None: + return [{"error": "LanguageTool service failed to initialize."}] + if not text or not text.strip(): + return [{"info": "No text provided to check."}] + print(f"Checking text: {text}") + matches = tool.check(text) + if not matches: + try: + tool.picky = True + matches = tool.check(text) + except Exception: + pass + if not matches: + return [{"info": "No errors found!", "status": "perfect"}] + errors_list = [] + for match in matches: + error = { + "message": match.message, + "rule_id": match.ruleId, + "category": getattr(match.category, 'name', match.category), + "incorrect_text": text[match.offset : match.offset + match.errorLength], + "replacements": match.replacements, + "offset": match.offset, + "length": match.errorLength, + "context": getattr(match, "context", None), + "short_message": getattr(match, "shortMessage", None) + } + errors_list.append(error) + print(f"Found {len(errors_list)} errors.") + return errors_list + except Exception as e: + traceback.print_exc() + return [{"error": f"An unexpected error occurred: {str(e)}"}] + +# ============================================================================ +# 5. 
ODENET THESAURUS LOGIC +# ============================================================================ +# --- Globals & Classes for OdeNet --- +@dataclass +class OdeNetWorkItem: + """Represents a lookup request.""" + word: str + response_queue: queue.Queue +class OdeNetWorkerState(Enum): + NOT_STARTED = 1 + INITIALIZING = 2 + READY = 3 + ERROR = 4 +odenet_worker_state = OdeNetWorkerState.NOT_STARTED +odenet_worker_thread = None +odenet_work_queue = queue.Queue() +odenet_de_wn = None +# --- Worker Thread Logic --- +def odenet_download_wordnet_data(): + """Download WordNet data. Called once by worker thread.""" + if not WN_AVAILABLE: + print("[OdeNet Worker] 'wn' library not available. Skipping download.") + return False + try: + print("[OdeNet Worker] Downloading WordNet data...") + try: + wn.download('odenet:1.4') + except Exception as e: + print(f"[OdeNet Worker] Note: odenet download: {e}") + try: + wn.download('cili:1.0') + except Exception as e: + print(f"[OdeNet Worker] Note: cili download: {e}") + print("[OdeNet Worker] ✓ WordNet data ready") + return True + except Exception as e: + print(f"[OdeNet Worker] ✗ Failed to download WordNet data: {e}") + return False +def odenet_worker_loop(): + """ Worker thread main loop. """ + global odenet_worker_state, odenet_de_wn + if not WN_AVAILABLE: + print("[OdeNet Worker] 'wn' library not available. Worker cannot start.") + odenet_worker_state = OdeNetWorkerState.ERROR + return + try: + print("[OdeNet Worker] Starting worker thread...") + odenet_worker_state = OdeNetWorkerState.INITIALIZING + if not odenet_download_wordnet_data(): + odenet_worker_state = OdeNetWorkerState.ERROR + print("[OdeNet Worker] Failed to initialize") + return + print("[OdeNet Worker] Creating WordNet instance...") + odenet_de_wn = wn.Wordnet('odenet:1.4') + odenet_worker_state = OdeNetWorkerState.READY + print("[OdeNet Worker] Ready to process requests") + while True: + try: + item: OdeNetWorkItem = odenet_work_queue.get(timeout=1) + try: + result = odenet_process_word_lookup(item.word) + item.response_queue.put(("success", result)) + except Exception as e: + traceback.print_exc() + item.response_queue.put(("error", str(e))) + finally: + odenet_work_queue.task_done() + except queue.Empty: + continue + except Exception as e: + print(f"[OdeNet Worker] Fatal error: {e}") + traceback.print_exc() + odenet_worker_state = OdeNetWorkerState.ERROR +def odenet_process_word_lookup(word: str) -> Dict[str, Any]: + """ Process a single word lookup. Runs in the worker thread. 
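+
+    Illustrative shape of a successful result (keys match the sense_info
+    dict built below; the values here are invented for illustration):
+
+        {"input_word": "haus",
+         "senses": [{"pos": "n",
+                     "definition": "...",
+                     "synonyms": [...],
+                     "antonyms": [...],
+                     "hypernyms (is a type of)": [...],
+                     "hyponyms (examples are)": [...],
+                     "holonyms (is part of)": [...],
+                     "meronyms (has parts)": [...]}]}
+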
""" + global odenet_de_wn + if not word or not word.strip(): + return {"info": "No word provided to check."} + word = word.strip().lower() + senses = odenet_de_wn.senses(word) + if not senses: + return {"info": f"The word '{word}' was not found in the thesaurus."} + results: Dict[str, Any] = {"input_word": word, "senses": []} + for sense in senses: + synset = sense.synset() + def get_lemmas(synsets, remove_self=False): + lemmas: Set[str] = set() + for s in synsets: + for lemma in s.lemmas(): + if not (remove_self and lemma == word): + lemmas.add(lemma) + return sorted(list(lemmas)) + antonym_words: Set[str] = set() + try: + for ant_sense in sense.get_related('antonym'): + antonym_words.add(ant_sense.word().lemma()) + except Exception: + pass + sense_info = { + "pos": synset.pos, + "definition": synset.definition() or "No definition available.", + "synonyms": get_lemmas([synset], remove_self=True), + "antonyms": sorted(list(antonym_words)), + "hypernyms (is a type of)": get_lemmas(synset.hypernyms()), + "hyponyms (examples are)": get_lemmas(synset.hyponyms()), + "holonyms (is part of)": get_lemmas(synset.holonyms()), + "meronyms (has parts)": get_lemmas(synset.meronyms()), + } + results["senses"].append(sense_info) + print(f"[OdeNet Worker] Found {len(results['senses'])} senses for '{word}'") + return results +def odenet_start_worker(): + """Start the worker thread if not already started.""" + global odenet_worker_thread, odenet_worker_state + if odenet_worker_state != OdeNetWorkerState.NOT_STARTED: + return + if not WN_AVAILABLE: + print("[OdeNet] 'wn' library not available. Worker will not be started.") + odenet_worker_state = OdeNetWorkerState.ERROR + return + odenet_worker_thread = threading.Thread(target=odenet_worker_loop, daemon=True, name="OdeNetWorker") + odenet_worker_thread.start() + timeout = 30 + for _ in range(timeout * 10): + if odenet_worker_state in (OdeNetWorkerState.READY, OdeNetWorkerState.ERROR): + break + threading.Event().wait(0.1) + if odenet_worker_state != OdeNetWorkerState.READY: + raise Exception("OdeNet Worker failed to initialize") +# --- Public API (Called by Gradio) --- +def odenet_get_thesaurus_info(word: str) -> Dict[str, Any]: + """ Public API: Finds thesaurus info for a German word. Thread-safe. """ + if not WN_AVAILABLE: + return {"error": "WordNet (wn) library is not available."} + if odenet_worker_state != OdeNetWorkerState.READY: + return {"error": "WordNet service is not ready yet. Please try again in a moment."} + try: + response_queue = queue.Queue() + item = OdeNetWorkItem(word=word, response_queue=response_queue) + odenet_work_queue.put(item) + try: + status, result = response_queue.get(timeout=30) + if status == "success": + return result + else: + return {"error": f"Lookup failed: {result}"} + except queue.Empty: + return {"error": "Request timed out"} + except Exception as e: + traceback.print_exc() + return {"error": f"An unexpected error occurred: {str(e)}"} + +# ============================================================================ +# 6. PATTERN INFLECTION LOGIC +# ============================================================================ +# --- Word Type Detection --- +def pattern_detect_word_type(word: str) -> Dict[str, Any]: + """ Use pattern.de's parser as a hint. 
""" + if not PATTERN_DE_AVAILABLE: + return {'pos': None, 'lemma': word, 'type': 'unknown'} + if not word or not word.strip() or all(ch in ".,;:!?()[]{}-–—'.../\|" for ch in word): + return {'pos': None, 'lemma': word, 'type': 'unknown'} + word_norm = word.strip() + log(f"Detecting type for: {word_norm}") + parser_result = {'pos': None, 'lemma': word_norm, 'type': None} + try: + parsed = parse(word_norm, lemmata=True) + for sentence in split(parsed): + if hasattr(sentence, "words") and sentence.words: + w = sentence.words[0] + w_type = getattr(w, "type", None) or getattr(w, "pos", None) + w_lemma = (getattr(w, "lemma", None) or word_norm) + non_content_prefixes = ("DT","ART","IN","APPR","APPRART","APPO","APZR","PTK","PRP","PPER","PPOS","PDS","PIS","KOUI","KON","$,","$.") + if w_type and any(w_type.startswith(p) for p in non_content_prefixes): + return {'pos': w_type, 'lemma': w_lemma, 'type': None} + parser_result['pos'] = w_type or "" + parser_result['lemma'] = w_lemma + if w_type and w_type.startswith('NN'): + parser_result['type'] = 'noun' + elif w_type and w_type.startswith('VB'): + parser_result['type'] = 'verb' + elif w_type and w_type.startswith('JJ'): + parser_result['type'] = 'adjective' + log(f" Parser says: POS={w_type}, lemma={w_lemma}, type={parser_result['type']}") + except Exception as e: + log(f" Parser failed: {e}") + return parser_result + +def pattern_is_good_analysis(analysis, analysis_type): + """Check if an analysis has meaningful data.""" + if not analysis: return False + + if analysis_type == 'noun': + # Check for declensions, either in the simple or ambiguous map + return len(analysis.get('declension', {})) >= 4 or len(analysis.get('declension_by_gender', {})) > 0 + + elif analysis_type == 'verb': + present = analysis.get('conjugation', {}).get('Präsens', {}) + if len(present) < 4: return False + unique_forms = set(present.values()) + if len(unique_forms) < 2: return False + return True + + elif analysis_type == 'adjective': + # **FIX: Better adjective validation** + # Must have attributive forms + if len(analysis.get('attributive', {})) == 0: + log(" ✗ Not a good adjective: No attributive forms.") + return False + + pred = analysis.get('predicative', '') + comp = analysis.get('comparative', '') + sup = analysis.get('superlative', '') + + if not pred: + log(" ✗ Not a good adjective: No predicative form.") + return False + + # Filter out nonsense: "lauf" -> "laufer", "laufst" + # Real comparatives end in -er. Real superlatives end in -st or -est. 
+ # This allows "rasch" (rascher, raschst) but rejects "lauf" (laufer, laufst) + if comp and not comp.endswith("er"): + log(f" ✗ Not a good adjective: Comparative '{comp}' doesn't end in -er.") + return False + if sup and not (sup.endswith("st") or sup.endswith("est")): + log(f" ✗ Not a good adjective: Superlative '{sup}' doesn't end in -st/-est.") + return False + + return True + + return False + + +# --- Inflection Generators --- +def pattern_analyze_as_noun(word: str, hint_lemma: str = None) -> Dict[str, Any]: + """Comprehensive noun inflection analysis.""" + log(f" Analyzing as noun (hint_lemma={hint_lemma})") + analysis = {} + singular = singularize(word) + plural = pluralize(word) + log(f" singularize({word}) = {singular}") + log(f" pluralize({word}) = {plural}") + if plural != word and singular != word: + base = word + log(f" Word changes when pluralized => base = {base}") + elif singular != word: + base = singular + log(f" Word changes when singularized => base = {base}") + elif hint_lemma and hint_lemma != word: + base = hint_lemma + log(f" Using hint lemma => base = {base}") + else: + # This is a valid case, e.g. "Lauf" (singular) + base = word + log(f" Word is already base form => base = {base}") + + g = gender(base, pos=NOUN) + log(f" gender({base}) = {g}") + + # --- AMBIGUITY HANDLING for Nouns (e.g., der/das See) --- + if isinstance(g, tuple): + genders = list(g) + log(f" Detected ambiguous gender: {genders}") + elif g is None: + genders = [MALE] # Default + log(f" Gender unknown, defaulting to MALE") + else: + genders = [g] + + analysis["base_form"] = base + analysis["plural"] = pluralize(base) + analysis["singular"] = base + analysis["declension_by_gender"] = {} + + for gen in genders: + gender_str = {MALE: "Masculine", FEMALE: "Feminine", NEUTRAL: "Neuter"}.get(gen, "Unknown") + gen_declension = {} + for number, number_name in [(SINGULAR, "Singular"), (PLURAL, "Plural")]: + word_form = base if number == SINGULAR else pluralize(base) + word_form_cap = word_form.capitalize() + gender_for_article = gen if number == SINGULAR else PLURAL + for case, case_name in [(NOMINATIVE, "Nominativ"), (ACCUSATIVE, "Akkusativ"), + (DATIVE, "Dativ"), (GENITIVE, "Genitiv")]: + try: + def_art = article(word_form, DEFINITE, gender_for_article, case) + indef_art = article(word_form, INDEFINITE, gender_for_article, case) + indef_form = f"{indef_art} {word_form_cap}" if indef_art else word_form_cap + if number == PLURAL: + indef_form = "—" + gen_declension[f"{case_name} {number_name}"] = { + "definite": f"{def_art} {word_form_cap}" if def_art else word_form_cap, + "indefinite": indef_form, + "bare": word_form_cap + } + except Exception as e: + log(f" Failed to get article for {gender_str}/{case_name} {number_name}: {e}") + analysis["declension_by_gender"][gender_str] = gen_declension + + log(f" Generated declensions for {len(genders)} gender(s)") + if len(genders) == 1: + analysis["declension"] = analysis["declension_by_gender"][list(analysis["declension_by_gender"].keys())[0]] + analysis["gender"] = list(analysis["declension_by_gender"].keys())[0] + + return analysis +def pattern_analyze_as_verb(word: str, hint_lemma: str = None) -> Dict[str, Any]: + """Comprehensive verb conjugation analysis.""" + log(f" Analyzing as verb (hint_lemma={hint_lemma})") + verb_lemma = lemma(word) + log(f" lemma({word}) = {verb_lemma}") + if not verb_lemma or verb_lemma == word: + if hint_lemma and hint_lemma != word: + verb_lemma = hint_lemma + log(f" Using hint lemma: {verb_lemma}") + elif not verb_lemma: + log(f" 
No lemma found, trying base word") + verb_lemma = word # e.g. "lauf" + + analysis = {"infinitive": verb_lemma} + try: + lex = lexeme(verb_lemma) + if lex and len(lex) > 1: + analysis["lexeme"] = lex + log(f" lexeme has {len(lex)} forms") + except Exception as e: + log(f" Failed to get lexeme: {e}") + analysis["conjugation"] = {} + analysis["conjugation"]["Präsens"] = {} + present_count = 0 + for alias, name in [("1sg", "ich"), ("2sg", "du"), ("3sg", "er/sie/es"), + ("1pl", "wir"), ("2pl", "ihr"), ("3pl", "sie/Sie")]: + try: + form = conjugate(verb_lemma, alias) + if form: + analysis["conjugation"]["Präsens"][name] = form + present_count += 1 + except Exception as e: + log(f" Failed conjugate({verb_lemma}, {alias}): {e}") + log(f" Generated {present_count} present tense forms") + if present_count < 4: + # Try again with infinitive, e.g. if input was "lauf" + try: + verb_lemma = conjugate(word, INFINITIVE) + log(f" Retrying with infinitive '{verb_lemma}'") + analysis["infinitive"] = verb_lemma + present_count = 0 + for alias, name in [("1sg", "ich"), ("2sg", "du"), ("3sg", "er/sie/es"), + ("1pl", "wir"), ("2pl", "ihr"), ("3pl", "sie/Sie")]: + form = conjugate(verb_lemma, alias) + if form: + analysis["conjugation"]["Präsens"][name] = form + present_count += 1 + if present_count < 4: + log(f" Too few present forms, not a valid verb") + return None + except Exception as e: + log(f" Retry failed, not a valid verb: {e}") + return None + + analysis["conjugation"]["Präteritum"] = {} + for alias, name in [("1sgp", "ich"), ("2sgp", "du"), ("3sgp", "er/sie/es"), + ("1ppl", "wir"), ("2ppl", "ihr"), ("3ppl", "sie/Sie")]: + try: + form = conjugate(verb_lemma, alias) + if form: analysis["conjugation"]["Präteritum"][name] = form + except: pass + analysis["participles"] = {} + try: + form = conjugate(verb_lemma, "part") + if form: analysis["participles"]["Partizip Präsens"] = form + except: pass + try: + form = conjugate(verb_lemma, "ppart") + if form: analysis["participles"]["Partizip Perfekt"] = form + except: pass + analysis["conjugation"]["Imperativ"] = {} + for alias, name in [("2sg!", "du"), ("2pl!", "ihr")]: + try: + form = conjugate(verb_lemma, alias) + if form: analysis["conjugation"]["Imperativ"][name] = form + except: pass + analysis["conjugation"]["Konjunktiv I"] = {} + for alias, name in [("1sg?", "ich"), ("2sg?", "du"), ("3sg?", "er/sie/es"), + ("1pl?", "wir"), ("2pl?", "ihr"), ("3pl?", "sie/Sie")]: + try: + form = conjugate(verb_lemma, alias) + if form: analysis["conjugation"]["Konjunktiv I"][name] = form + except: pass + analysis["conjugation"]["Konjunktiv II"] = {} + for alias, name in [("1sgp?", "ich"), ("2sgp?", "du"), ("3sgp?", "er/sie/es"), + ("1ppl?", "wir"), ("2ppl?", "ihr"), ("3ppl?", "sie/Sie")]: + try: + form = conjugate(verb_lemma, alias) + if form: analysis["conjugation"]["Konjunktiv II"][name] = form + except: pass + return analysis +def pattern_analyze_as_adjective(word: str, hint_lemma: str = None) -> Dict[str, Any]: + """Comprehensive adjective inflection analysis.""" + log(f" Analyzing as adjective (hint_lemma={hint_lemma})") + base = predicative(word) + log(f" predicative({word}) = {base}") + if base == word.lower() and hint_lemma and hint_lemma != word: + base = hint_lemma + log(f" Using hint lemma: {base}") + + analysis = {} + analysis["predicative"] = base + + # *** FIX: Removed pos=ADJECTIVE, which was causing a crash *** + try: + analysis["comparative"] = comparative(base) + except Exception as e: + log(f" Failed to get comparative: {e}") + analysis["comparative"] = 
f"{base}er" # Fallback + + try: + analysis["superlative"] = superlative(base) + except Exception as e: + log(f" Failed to get superlative: {e}") + analysis["superlative"] = f"{base}st" # Fallback + + log(f" comparative = {analysis['comparative']}") + log(f" superlative = {analysis['superlative']}") + + analysis["attributive"] = {} + attr_count = 0 + for article_type, article_name in [(None, "Strong"), (INDEFINITE, "Mixed"), (DEFINITE, "Weak")]: + analysis["attributive"][article_name] = {} + for gender, gender_name in [(MALE, "Masculine"), (FEMALE, "Feminine"), + (NEUTRAL, "Neuter"), (PLURAL, "Plural")]: + analysis["attributive"][article_name][gender_name] = {} + for case, case_name in [(NOMINATIVE, "Nom"), (ACCUSATIVE, "Acc"), + (DATIVE, "Dat"), (GENITIVE, "Gen")]: + try: + attr_form = attributive(base, gender, case, article_type) + if article_type: + art = article("_", article_type, gender, case) + full_form = f"{art} {attr_form} [Noun]" if art else f"{attr_form} [Noun]" + else: + full_form = f"{attr_form} [Noun]" + analysis["attributive"][article_name][gender_name][case_name] = { + "form": attr_form, "example": full_form + } + attr_count += 1 + except Exception as e: + log(f" Failed attributive for {article_name}/{gender_name}/{case_name}: {e}") + + log(f" Generated {attr_count} attributive forms") + if attr_count == 0: + return None + return analysis +# --- Public API (Called by Gradio) --- +def pattern_get_all_inflections(word: str) -> Dict[str, Any]: + """ + Generates ALL possible inflections for a German word. + Analyzes the word as-is AND its lowercase version to catch + ambiguities like "Lauf" (noun) vs "lauf" (verb). + """ + if not PATTERN_DE_AVAILABLE: + return {"error": "`PatternLite` library not available."} + if not word or not word.strip(): + return {"info": "Please enter a word."} + word = word.strip() + word_lc = word.lower() + log("="*70); log(f"ANALYZING: {word} (and {word_lc})"); log("="*70) + + # --- Analyze word as-is (e.g., "Lauf") --- + detection_as_is = pattern_detect_word_type(word) + analyses_as_is: Dict[str, Any] = {} + try: + log("\n--- Trying analysis for: " + word + " ---") + noun_analysis_as_is = pattern_analyze_as_noun(word, detection_as_is['lemma']) + if noun_analysis_as_is and pattern_is_good_analysis(noun_analysis_as_is, 'noun'): + log("✓ Noun analysis is good") + analyses_as_is["noun"] = noun_analysis_as_is + verb_analysis_as_is = pattern_analyze_as_verb(word, detection_as_is['lemma']) + if verb_analysis_as_is and pattern_is_good_analysis(verb_analysis_as_is, 'verb'): + log("✓ Verb analysis is good") + analyses_as_is["verb"] = verb_analysis_as_is + adj_analysis_as_is = pattern_analyze_as_adjective(word, detection_as_is['lemma']) + if adj_analysis_as_is and pattern_is_good_analysis(adj_analysis_as_is, 'adjective'): + log("✓ Adjective analysis is good") + analyses_as_is["adjective"] = adj_analysis_as_is + except Exception as e: + log(f"\nERROR during 'as-is' analysis: {e}") + traceback.print_exc() + return {"error": f"An unexpected error occurred during 'as-is' analysis: {str(e)}"} + + # --- Analyze lowercase version (e.g., "lauf") if different --- + analyses_lc: Dict[str, Any] = {} + if word != word_lc: + detection_lc = pattern_detect_word_type(word_lc) + try: + log("\n--- Trying analysis for: " + word_lc + " ---") + noun_analysis_lc = pattern_analyze_as_noun(word_lc, detection_lc['lemma']) + if noun_analysis_lc and pattern_is_good_analysis(noun_analysis_lc, 'noun'): + log("✓ Noun analysis (lc) is good") + analyses_lc["noun"] = noun_analysis_lc + 
verb_analysis_lc = pattern_analyze_as_verb(word_lc, detection_lc['lemma']) + if verb_analysis_lc and pattern_is_good_analysis(verb_analysis_lc, 'verb'): + log("✓ Verb analysis (lc) is good") + analyses_lc["verb"] = verb_analysis_lc + adj_analysis_lc = pattern_analyze_as_adjective(word_lc, detection_lc['lemma']) + if adj_analysis_lc and pattern_is_good_analysis(adj_analysis_lc, 'adjective'): + log("✓ Adjective analysis (lc) is good") + analyses_lc["adjective"] = adj_analysis_lc + except Exception as e: + log(f"\nERROR during 'lowercase' analysis: {e}") + traceback.print_exc() + return {"error": f"An unexpected error occurred during 'lowercase' analysis: {str(e)}"} + + # --- Merge the results --- + final_analyses = analyses_as_is.copy() + for key, value in analyses_lc.items(): + if key not in final_analyses: + final_analyses[key] = value + + results: Dict[str, Any] = { + "input_word": word, + "analyses": final_analyses + } + if not results["analyses"]: + results["info"] = "Word could not be analyzed as noun, verb, or adjective." + log(f"\nFinal merged result: {len(results['analyses'])} analysis/analyses") + return results + +def word_appears_in_inflections(word: str, inflections: Dict[str, Any], pos_type: str) -> bool: + """ + Check if the input word appears in the inflection forms AND + cross-validate the POS with OdeNet to reject artifacts. + """ + import re + word_lower = word.lower() + word_cap = word.capitalize() + + # 1. Extract all actual inflection forms (not metadata) + actual_forms = [] + if pos_type == 'noun': + declension = inflections.get('declension', {}) + declension_by_gender = inflections.get('declension_by_gender', {}) + for case_data in declension.values(): + if isinstance(case_data, dict): actual_forms.append(case_data.get('bare', '')) + for gender_data in declension_by_gender.values(): + if isinstance(gender_data, dict): + for case_data in gender_data.values(): + if isinstance(case_data, dict): actual_forms.append(case_data.get('bare', '')) + + elif pos_type == 'verb': + conjugation = inflections.get('conjugation', {}) + for tense_data in conjugation.values(): + if isinstance(tense_data, dict): actual_forms.extend(tense_data.values()) + participles = inflections.get('participles', {}) + actual_forms.extend(participles.values()) + actual_forms.extend(inflections.get('lexeme', [])) + actual_forms.append(inflections.get('infinitive', '')) + + elif pos_type == 'adjective': + actual_forms.append(inflections.get('predicative', '')) + actual_forms.append(inflections.get('comparative', '')) + actual_forms.append(inflections.get('superlative', '')) + attributive = inflections.get('attributive', {}) + for article_data in attributive.values(): + if isinstance(article_data, dict): + for gender_data in article_data.values(): + if isinstance(gender_data, dict): + for case_data in gender_data.values(): + if isinstance(case_data, dict): actual_forms.append(case_data.get('form', '')) + + # 2. 
Clean forms and check for match + cleaned_forms = set() + for form in actual_forms: + if not form or form == '—': continue + # For simple forms (most verb forms, adjectives), use as-is + # For complex forms (nouns with articles), extract words + if ' ' in form or '[' in form: + words = re.findall(r'\b[\wäöüÄÖÜß]+\b', form) + cleaned_forms.update(w.lower() for w in words) + else: + cleaned_forms.add(form.lower()) + + articles = {'der', 'die', 'das', 'den', 'dem', 'des', 'ein', 'eine', 'einen', 'einem', 'eines', 'einer'} + cleaned_forms = {f for f in cleaned_forms if f not in articles} + + word_found_in_forms = False + if pos_type == 'noun': + # Nouns can be input as lowercase, but inflections are capitalized. + # We check if the *lowercase* input word matches a *lowercase* form. + if word_lower in cleaned_forms: + word_found_in_forms = True + else: + # For verbs/adjectives, a lowercase match is sufficient + if word_lower in cleaned_forms: + word_found_in_forms = True + + if not word_found_in_forms: + log(f" ✗ Word '{word}' not found in any {pos_type} inflection forms.") + return False + + log(f" ✓ Word '{word}' was found in the {pos_type} inflection table.") + + # 3. Cross-validate POS with OdeNet to filter artifacts (e.g., 'heute' as 'heuen') + if not WN_AVAILABLE: + log(" ⚠️ OdeNet (WN_AVAILABLE=False) is not available to validate POS. Accepting pattern.de's analysis.") + return True + + try: + if pos_type == 'noun': + pos_lemma = inflections.get("base_form", word_lower) + expected_pos_tag = 'n' + elif pos_type == 'verb': + pos_lemma = inflections.get("infinitive", word_lower) + expected_pos_tag = 'v' + elif pos_type == 'adjective': + pos_lemma = inflections.get("predicative", word_lower) + expected_pos_tag = 'a' + else: + log(f" ? Unknown pos_type '{pos_type}' for OdeNet check.") + return True # Don't block unknown types + + log(f" Validating {pos_type} (lemma: '{pos_lemma}') with OdeNet (expecting pos='{expected_pos_tag}')...") + odenet_result = odenet_get_thesaurus_info(pos_lemma) + senses = odenet_result.get('senses', []) + pos_senses = [s for s in senses if s.get('pos') == expected_pos_tag] + + # If no senses for lemma, check input word as fallback + if not pos_senses and pos_lemma.lower() != word.lower(): + log(f" No '{expected_pos_tag}' senses for lemma '{pos_lemma}'. Checking input word '{word}'...") + odenet_result = odenet_get_thesaurus_info(word) + senses = odenet_result.get('senses', []) + pos_senses = [s for s in senses if s.get('pos') == expected_pos_tag] + + if not pos_senses: + log(f" ✗ REJECTED: OdeNet has no '{expected_pos_tag}' senses for '{pos_lemma}' or '{word}'. This is likely a pattern.de artifact.") + return False + else: + log(f" ✓ VERIFIED: OdeNet found {len(pos_senses)} '{expected_pos_tag}' sense(s).") + return True + + except Exception as e: + log(f" ⚠️ OdeNet validation check failed with error: {e}") + return True # Fail open: If OdeNet fails, trust pattern.de + +# ============================================================================ +# 6b. CONCEPTNET HELPER LOGIC (V2 - ROBUST PARSER) +# ============================================================================ +def conceptnet_get_relations(word: str, language: str = 'de') -> Dict[str, Any]: + """ + Fetches relations from the cstr/conceptnet_normalized Gradio API. + + This V2 version uses a robust regex parser to correctly handle the + Markdown output and filter self-referential junk. + """ + if not GRADIO_CLIENT_AVAILABLE: + return {"error": "`gradio_client` library is not installed. 
Install with: pip install gradio_client"} + + if not word or not word.strip(): + return {"info": "No word provided."} + + word_lower = word.strip().lower() + cache_key = (word_lower, language) + + # --- 1. Check Cache --- + with CONCEPTNET_LOCK: + if cache_key in CONCEPTNET_CACHE: + log(f"ConceptNet: Found '{word_lower}' in cache.") + return CONCEPTNET_CACHE[cache_key] + + log(f"ConceptNet: Fetching '{word_lower}' from Gradio API...") + + try: + # --- 2. Call Gradio API --- + client = Client("cstr/conceptnet_normalized") + + selected_relations = [ + "RelatedTo", "IsA", "PartOf", "HasA", "UsedFor", + "CapableOf", "AtLocation", "Synonym", "Antonym", + "Causes", "HasProperty", "MadeOf", "HasSubevent", + "DerivedFrom", "SimilarTo", "Desires", "CausesDesire" + ] + + result_markdown = client.predict( + word=word_lower, + lang=language, + selected_relations=selected_relations, + api_name="/get_semantic_profile" + ) + + # --- 3. Parse the Markdown Result (Robustly) --- + relations_list = [] + if not isinstance(result_markdown, str): + raise TypeError(f"ConceptNet API returned type {type(result_markdown)}, expected str.") + + lines = result_markdown.split('\n') + current_relation = None + + # Regex to capture: "- `[WEIGHT]`" + # Groups: (1: Node1) (2: Relation) (3: Node2) (4: Weight) + line_pattern = None + + for line in lines: + line = line.strip() + if not line: + continue + + # Check for relation headers (e.g., "## IsA") + if line.startswith('## '): + current_relation = line[3:].strip() + if current_relation: + # Pre-compile the regex for this specific relation + line_pattern = re.compile( + r"-\s*(.+?)\s+(%s)\s+→\s+(.+?)\s+\`\[([\d.]+)\]\`" % re.escape(current_relation) + ) + continue + + # Parse relation entries + if line.startswith('- ') and current_relation and line_pattern: + match = line_pattern.search(line) + + if not match: + log(f"ConceptNet Parser: No match for line '{line}' with relation '{current_relation}'") + continue + + try: + # Extract parts + node1 = match.group(1).strip().strip('*') + relation = match.group(2) # This is current_relation + node2 = match.group(3).strip().strip('*') + weight = float(match.group(4)) + + other_node = None + direction = None + + # Determine direction and filter self-references + if node1.lower() == word_lower and node2.lower() != word_lower: + other_node = node2 + direction = "->" + elif node2.lower() == word_lower and node1.lower() != word_lower: + other_node = node1 + direction = "<-" + else: + # This filters "schnell Synonym → schnell" + continue + + relations_list.append({ + "relation": relation, + "direction": direction, + "other_node": other_node, + "other_lang": language, # We assume the other node is also in the same lang + "weight": weight, + "surface": f"{node1} {relation} {node2}" + }) + + except Exception as e: + log(f"ConceptNet Parser: Error parsing line '{line}': {e}") + continue + + # --- 4. 
Finalize and Cache Result --- + if not relations_list: + final_result = {"info": f"No valid (non-self-referential) relations found for '{word_lower}'."} + else: + # Sort by weight, descending + relations_list.sort(key=lambda x: x.get('weight', 0.0), reverse=True) + final_result = {"relations": relations_list} + + with CONCEPTNET_LOCK: + CONCEPTNET_CACHE[cache_key] = final_result + + log(f"ConceptNet: Returning {len(relations_list)} relations for '{word_lower}'") + return final_result + + except Exception as e: + error_msg = f"ConceptNet Gradio API request failed: {type(e).__name__} - {e}" + log(f"ConceptNet API error for '{word_lower}': {e}") + traceback.print_exc() + return {"error": error_msg, "traceback": traceback.format_exc()} + +# ============================================================================ +# 6c. NEW: HANTA INITIALIZER & HELPERS +# ============================================================================ + +def hanta_get_tagger() -> Optional[HanoverTagger]: + """ Thread-safe function to get a single instance of the HanTa Tagger. """ + global HANTA_TAGGER_INSTANCE + if not HANTA_AVAILABLE: + raise ImportError("HanTa library is not installed.") + + if HANTA_TAGGER_INSTANCE: + return HANTA_TAGGER_INSTANCE + + with HANTA_TAGGER_LOCK: + if HANTA_TAGGER_INSTANCE: + return HANTA_TAGGER_INSTANCE + + try: + print("Initializing HanTa Tagger (loading model)...") + PACKAGE_DIR = os.path.dirname(HanTa.HanoverTagger.__file__) + MODEL_PATH = os.path.join(PACKAGE_DIR, 'morphmodel_ger.pgz') + + if not os.path.exists(MODEL_PATH): + print(f"CRITICAL: HanTa model file 'morphmodel_ger.pgz' not found at {MODEL_PATH}") + raise FileNotFoundError("HanTa model file missing. Please ensure HanTa is correctly installed.") + + tagger = HanoverTagger(MODEL_PATH) + _ = tagger.analyze("Test") # Warm-up call + print("✓ HanTa Tagger initialized successfully.") + HANTA_TAGGER_INSTANCE = tagger + return HANTA_TAGGER_INSTANCE + except Exception as e: + print(f"CRITICAL ERROR: Failed to initialize HanTa Tagger: {e}") + traceback.print_exc() + return None + +def _get_odenet_senses_by_pos(word: str) -> Dict[str, List[Dict[str, Any]]]: + """ + (Helper) Fetches OdeNet senses for a word and groups them by POS. + + *** V18 FIX: OdeNet uses 'a' for BOTH Adjective and Adverb. *** + """ + senses_by_pos: Dict[str, List[Dict]] = { + "noun": [], "verb": [], "adjective": [], "adverb": [] + } + if not WN_AVAILABLE: + log(f"OdeNet check skipped for '{word}': WN_AVAILABLE=False") + # If OdeNet is down, we can't validate, so we must return + # non-empty lists to avoid incorrectly rejecting a POS. + # This is a "fail-open" strategy. 
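+ # Each placeholder entry below is a non-empty list, so downstream POS
+ # checks treat every part of speech as still possible rather than
+ # rejecting the word outright.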
+ return {"noun": [{"info": "OdeNet unavailable"}], + "verb": [{"info": "OdeNet unavailable"}], + "adjective": [{"info": "OdeNet unavailable"}], + "adverb": [{"info": "OdeNet unavailable"}]} + + try: + all_senses = odenet_get_thesaurus_info(word).get("senses", []) + for sense in all_senses: + if "error" in sense: continue + pos_tag = sense.get("pos") + + if pos_tag == 'n': + senses_by_pos["noun"].append(sense) + elif pos_tag == 'v': + senses_by_pos["verb"].append(sense) + + # --- THIS IS THE CRITICAL FIX --- + elif pos_tag == 'a': + log(f"Found OdeNet 'a' tag (Adj/Adv) for sense: {sense.get('definition', '...')[:30]}") + senses_by_pos["adjective"].append(sense) + senses_by_pos["adverb"].append(sense) + # --- END OF FIX --- + + except Exception as e: + log(f"OdeNet helper check failed for '{word}': {e}") + + log(f"OdeNet senses for '{word}': " + f"{len(senses_by_pos['noun'])}N, " + f"{len(senses_by_pos['verb'])}V, " + f"{len(senses_by_pos['adjective'])}Adj, " + f"{len(senses_by_pos['adverb'])}Adv") + return senses_by_pos + +def _hanta_get_candidates(word: str, hanta_tagger: "HanoverTagger") -> Set[str]: + """ + (Helper) Gets all possible HanTa STTS tags for a word, + checking both lowercase and capitalized versions. + """ + all_tags = set() + try: + # Check lowercase (for verbs, adjs, advs) + tags_lower = hanta_tagger.tag_word(word.lower(), cutoff=20) + all_tags.update(tag[0] for tag in tags_lower) + except Exception as e: + log(f"HanTa tag_word (lower) failed for '{word}': {e}") + + try: + # Check capitalized (for nouns) + tags_upper = hanta_tagger.tag_word(word.capitalize(), cutoff=20) + all_tags.update(tag[0] for tag in tags_upper) + except Exception as e: + log(f"HanTa tag_word (upper) failed for '{word}': {e}") + + log(f"HanTa candidates for '{word}': {all_tags}") + return all_tags + +def _hanta_map_tags_to_pos(hanta_tags: Set[str]) -> Dict[str, Set[str]]: + """ + (Helper) Maps STTS tags to simplified POS groups and injects the + ADJ(D) -> ADV heuristic. + """ + pos_groups = {"noun": set(), "verb": set(), "adjective": set(), "adverb": set()} + has_adjd = False + + for tag in hanta_tags: + # Nouns (NN), Proper Nouns (NE), Nominalized Inf. (NNI), Nom. Adj. (NNA) + if tag.startswith("NN") or tag == "NE": + pos_groups["noun"].add(tag) + # Verbs (VV...), Auxiliaries (VA...), Modals (VM...) + elif tag.startswith("VV") or tag.startswith("VA") or tag.startswith("VM"): + pos_groups["verb"].add(tag) + # Adjectives (Attributive ADJ(A), Predicative ADJ(D)) + elif tag.startswith("ADJ"): + pos_groups["adjective"].add(tag) + if tag == "ADJ(D)": + has_adjd = True + # Adverbs + elif tag == "ADV": + pos_groups["adverb"].add(tag) + + # --- The Core Heuristic --- + # If HanTa found a predicative adjective (ADJD), it can *also* be used + # as an adverb (e..g, "er singt schön" [ADV] vs. "er ist schön" [ADJD]). + if has_adjd: + log("Injecting ADV possibility based on ADJ(D) tag.") + pos_groups["adverb"].add("ADV (from ADJD)") + + # Filter out empty groups + return {k: v for k, v in pos_groups.items() if v} + +def _hanta_get_lemma_for_pos(word: str, pos_group: str, hanta_tagger: "HanoverTagger") -> str: + """ + (Helper) Gets the correct lemma for a given word and POS group + using case-sensitive analysis. 
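+
+    Purely illustrative calls (actual lemmas depend on HanTa's model;
+    'tagger' is the instance returned by hanta_get_tagger()):
+
+        _hanta_get_lemma_for_pos("häuser", "noun", tagger)  # e.g. "Haus"
+        _hanta_get_lemma_for_pos("lief", "verb", tagger)    # e.g. "laufen"
+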
+ """ + lemma = "" + try: + if pos_group == "noun": + # Nouns must be lemmatized from their capitalized form + lemma = hanta_tagger.analyze(word.capitalize(), casesensitive=True)[0] + elif pos_group == "verb": + # Verbs must be lemmatized from their lowercase form + lemma = hanta_tagger.analyze(word.lower(), casesensitive=True)[0] + elif pos_group == "adjective": + # Adjectives are lemmatized from their lowercase form + lemma = hanta_tagger.analyze(word.lower(), casesensitive=True)[0] + elif pos_group == "adverb": + # Adverbs are also lemmatized from lowercase + lemma = hanta_tagger.analyze(word.lower(), casesensitive=True)[0] + + except Exception as e: + log(f"HanTa analyze failed for {word}/{pos_group}: {e}. Falling back.") + + # Fallback logic + if not lemma: + if pos_group == "noun": + return word.capitalize() + return word.lower() + + return lemma + +def _build_semantics(lemma: str, odenet_senses: List[Dict], top_n: int) -> Dict[str, Any]: + """ + (Helper) Builds the semantics block with OdeNet and ConceptNet. + """ + conceptnet_relations = [] + if REQUESTS_AVAILABLE: + try: + conceptnet_result = conceptnet_get_relations(lemma, language='de') + conceptnet_relations = conceptnet_result.get("relations", []) + except Exception as e: + conceptnet_relations = [{"error": str(e)}] + + if top_n > 0: + odenet_senses = odenet_senses[:top_n] + conceptnet_relations.sort(key=lambda x: x.get('weight', 0.0), reverse=True) + conceptnet_relations = conceptnet_relations[:top_n] + + return { + "lemma": lemma, + "odenet_senses": odenet_senses, + "conceptnet_relations": conceptnet_relations + } + +# ============================================================================ +# 6d. WIKTIONARY DATABASE LOGIC (NEW PRIMARY ENGINE) +# ============================================================================ + +# ============================================================================ +# 6d. WIKTIONARY DATABASE LOGIC (NEW PRIMARY ENGINE) +# ============================================================================ + +def wiktionary_download_db() -> bool: + """ + Downloads the Wiktionary DB from Hugging Face Hub if it doesn't exist. + """ + global WIKTIONARY_AVAILABLE + if os.path.exists(WIKTIONARY_DB_PATH): + print(f"✓ Wiktionary DB '{WIKTIONARY_DB_PATH}' already exists.") + WIKTIONARY_AVAILABLE = True + return True + + print(f"Wiktionary DB not found. Downloading from '{WIKTIONARY_REPO_ID}'...") + try: + hf_hub_download( + repo_id=WIKTIONARY_REPO_ID, + filename=WIKTIONARY_DB_PATH, + repo_type="dataset", + local_dir=".", + local_dir_use_symlinks=False + ) + print(f"✓ Wiktionary DB downloaded successfully.") + WIKTIONARY_AVAILABLE = True + return True + except Exception as e: + print(f"✗ CRITICAL: Failed to download Wiktionary DB: {e}") + traceback.print_exc() + return False + +def wiktionary_get_connection() -> Optional[sqlite3.Connection]: + """ + Thread-safe function to get a single, read-only SQLite connection. 
+ """ + global WIKTIONARY_CONN, WIKTIONARY_AVAILABLE + if not WIKTIONARY_AVAILABLE: + log("Wiktionary DB is not available, cannot create connection.") + return None + + if WIKTIONARY_CONN: + return WIKTIONARY_CONN + + with WIKTIONARY_CONN_LOCK: + if WIKTIONARY_CONN: + return WIKTIONARY_CONN + + if not os.path.exists(WIKTIONARY_DB_PATH): + log("Wiktionary DB file missing, connection failed.") + WIKTIONARY_AVAILABLE = False + return None + + try: + log("Creating new read-only connection to Wiktionary DB...") + # URI mode for read-only connection + db_uri = f"file:{WIKTIONARY_DB_PATH}?mode=ro" + conn = sqlite3.connect(db_uri, uri=True, check_same_thread=False) + conn.row_factory = sqlite3.Row # Makes results dict-like + + # Test query + _ = conn.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1").fetchone() + + print("✓ Wiktionary DB connection successful.") + WIKTIONARY_CONN = conn + return WIKTIONARY_CONN + except Exception as e: + print(f"✗ CRITICAL: Failed to connect to Wiktionary DB: {e}") + traceback.print_exc() + WIKTIONARY_AVAILABLE = False + return None + +def _wiktionary_map_pos_key(wikt_pos: Optional[str]) -> str: + """Maps Wiktionary POS tags to our internal keys.""" + if not wikt_pos: + return "unknown" + if wikt_pos == "noun": return "noun" + if wikt_pos == "verb": return "verb" + if wikt_pos == "adj": return "adjective" + if wikt_pos == "adv": return "adverb" + return wikt_pos # E.g., "phrase", "abbrev" + +def _wiktionary_build_report_for_entry(entry_id: int, conn: sqlite3.Connection) -> Dict[str, Any]: + """ + Fetches all associated data for a single Wiktionary entry_id. + """ + report = {} + + # 1. Get Base Entry Info + entry_data = conn.execute( + "SELECT word, pos, pos_title, lang FROM entries WHERE id = ?", (entry_id,) + ).fetchone() + if not entry_data: + return {"error": "Entry ID not found"} + report.update(dict(entry_data)) + report["entry_id"] = entry_id + report["lemma"] = entry_data["word"] # Alias for clarity + + # 2. Get Senses (Definitions) + senses_q = conn.execute( + """ + SELECT s.id as sense_id, g.gloss_text + FROM senses s + JOIN glosses g ON s.id = g.sense_id + WHERE s.entry_id = ? + ORDER BY s.id, g.id + """, (entry_id,) + ).fetchall() + report["senses"] = [dict(s) for s in senses_q] + + # 3. Get Inflected Forms + forms_q = conn.execute( + """ + SELECT f.form_text, GROUP_CONCAT(t.tag, ', ') as tags + FROM forms f + LEFT JOIN form_tags ft ON f.id = ft.form_id + LEFT JOIN tags t ON ft.tag_id = t.id + WHERE f.entry_id = ? + GROUP BY f.id + ORDER BY f.id + """, (entry_id,) + ).fetchall() + report["forms"] = [dict(f) for f in forms_q] + + # 4. Get Pronunciations + sounds_q = conn.execute( + "SELECT ipa, audio FROM sounds WHERE entry_id = ?", (entry_id,) + ).fetchall() + report["sounds"] = [dict(s) for s in sounds_q] + + # 5. Get Synonyms + syn_q = conn.execute( + "SELECT synonym_word FROM synonyms WHERE entry_id = ?", (entry_id,) + ).fetchall() + report["synonyms"] = [s["synonym_word"] for s in syn_q] + + # 6. Get Antonyms + ant_q = conn.execute( + "SELECT antonym_word FROM antonyms WHERE entry_id = ?", (entry_id,) + ).fetchall() + report["antonyms"] = [a["antonym_word"] for a in ant_q] + + # 7. Get Examples (Limit 5 for brevity) + ex_q = conn.execute( + """ + SELECT ex.text + FROM examples ex + JOIN senses s ON ex.sense_id = s.id + WHERE s.entry_id = ? 
+ LIMIT 5 + """, (entry_id,) + ).fetchall() + report["examples"] = [ex["text"] for ex in ex_q] + + return report + +def _wiktionary_find_all_entries(word: str, conn: sqlite3.Connection) -> List[Dict[str, Any]]: + """ + Finds all entries related to a word, checking both lemmas and + NON-VARIANT inflected forms. + Returns a list of full entry reports. + """ + log(f"Wiktionary: Querying for '{word}'...") + found_entry_ids: Set[int] = set() + + # 1. Check if the word is a lemma (base form) + # e.g., input "Haus" finds "Haus (Substantiv)" + # e.g., input "gehe" finds "gehe (Konjugierte Form)" + lemma_q = conn.execute( + "SELECT id FROM entries WHERE word = ? AND lang = 'Deutsch'", (word,) + ).fetchall() + for row in lemma_q: + found_entry_ids.add(row["id"]) + + # 2. Check if the word is a true inflected form, but NOT a "variant" + # e.g., input "gehe" finds "gehen (Verb)" + # e.g., input "Haus" finds "Hau (Substantiv)" + # This WILL NOT find "Häusle" from "Haus" anymore. + form_q = conn.execute( + """ + SELECT DISTINCT e.id + FROM forms f + JOIN entries e ON f.entry_id = e.id + WHERE f.form_text = ? AND e.lang = 'Deutsch' + AND f.id NOT IN ( + -- Exclude all form_ids that are tagged as 'variant' + SELECT ft.form_id + FROM form_tags ft + JOIN tags t ON ft.tag_id = t.id + WHERE t.tag = 'variant' + ) + """, (word,) + ).fetchall() + for row in form_q: + found_entry_ids.add(row["id"]) + + log(f"Wiktionary: Found {len(found_entry_ids)} unique matching entries.") + + # 3. Build a full report for each unique entry + all_reports = [] + for entry_id in found_entry_ids: + try: + report = _wiktionary_build_report_for_entry(entry_id, conn) + all_reports.append(report) + except Exception as e: + log(f"Wiktionary: Failed to build report for entry {entry_id}: {e}") + + return all_reports + +def _wiktionary_format_semantics_block( + wikt_report: Dict[str, Any], + lemma: str, + top_n: int +) -> Dict[str, Any]: + """ + Combines Wiktionary senses with OdeNet/ConceptNet senses. + """ + # 1. Get Wiktionary senses + wiktionary_senses = [] + for sense in wikt_report.get("senses", []): + wiktionary_senses.append({ + "definition": sense.get("gloss_text"), + "source": "wiktionary" + }) + + # 2. Get OdeNet senses for this lemma + pos_key = _wiktionary_map_pos_key(wikt_report.get("pos")) + odenet_senses = [] + if WN_AVAILABLE: + try: + # Use the corrected helper from your V18 code + senses_by_pos = _get_odenet_senses_by_pos(lemma) + odenet_senses_raw = senses_by_pos.get(pos_key, []) + + # Filter out placeholder + if odenet_senses_raw and "info" not in odenet_senses_raw[0]: + odenet_senses = odenet_senses_raw + except Exception as e: + log(f"OdeNet lookup failed for {lemma} ({pos_key}): {e}") + + # 3. Get ConceptNet relations + conceptnet_relations = [] + if REQUESTS_AVAILABLE: + try: + conceptnet_result = conceptnet_get_relations(lemma, language='de') + conceptnet_relations = conceptnet_result.get("relations", []) + except Exception as e: + conceptnet_relations = [{"error": str(e)}] + + # 4. 
Apply top_n limit + if top_n > 0: + wiktionary_senses = wiktionary_senses[:top_n] + odenet_senses = odenet_senses[:top_n] + conceptnet_relations.sort(key=lambda x: x.get('weight', 0.0), reverse=True) + conceptnet_relations = conceptnet_relations[:top_n] + + return { + "lemma": lemma, + "wiktionary_senses": wiktionary_senses, + "odenet_senses": odenet_senses, + "conceptnet_relations": conceptnet_relations, + "wiktionary_synonyms": wikt_report.get("synonyms", []), + "wiktionary_antonyms": wikt_report.get("antonyms", []) + } + +def _analyze_word_with_wiktionary(word: str, top_n: int) -> Dict[str, Any]: + """ + (NEW PRIMARY ENGINE) Analyzes a word using the Wiktionary DB. + Returns {} on failure to signal dispatcher to fall back. + """ + final_result: Dict[str, Any] = { + "input_word": word, + "analysis": {} + } + + conn = wiktionary_get_connection() + if not conn: + return {} # Return empty dict to signal failure + + try: + wiktionary_reports = _wiktionary_find_all_entries(word, conn) + except Exception as e: + log(f"Wiktionary query failed: {e}") + return {} # Signal failure + + if not wiktionary_reports: + return {} # No results, signal to fallback + + for wikt_report in wiktionary_reports: + pos_key = _wiktionary_map_pos_key(wikt_report.get("pos")) + lemma = wikt_report.get("lemma", word) + + # Build the inflection block from Wiktionary data + inflections_block = { + "base_form": lemma, + "forms_list": wikt_report.get("forms", []), + "source": "wiktionary" + } + + # Build the semantics block + semantics_block = _wiktionary_format_semantics_block(wikt_report, lemma, top_n) + + # Add Pattern.de analysis for comparison/completeness + pattern_block = {} + if PATTERN_DE_AVAILABLE: + try: + if pos_key == "noun": + pattern_block = pattern_analyze_as_noun(lemma) + elif pos_key == "verb": + pattern_block = pattern_analyze_as_verb(lemma) + elif pos_key == "adjective": + pattern_block = pattern_analyze_as_adjective(lemma) + except Exception: + pattern_block = {"error": "Pattern.de analysis failed."} + + # Build the final report for this POS entry + pos_entry_report = { + "inflections_wiktionary": inflections_block, + "inflections_pattern": pattern_block, + "semantics_combined": semantics_block, + "wiktionary_metadata": { + "pos_title": wikt_report.get("pos_title"), + "pronunciation": wikt_report.get("sounds"), + "examples": wikt_report.get("examples") + } + } + + # Append to the list for this POS key + if pos_key not in final_result["analysis"]: + final_result["analysis"][pos_key] = [] + + final_result["analysis"][pos_key].append(pos_entry_report) + + final_result["info"] = f"Analysis from Wiktionary (Primary Engine). Found {len(wiktionary_reports)} matching entry/entries." + return final_result + +# ============================================================================ +# 7. CONSOLIDATED ANALYZER LOGIC +# ============================================================================ + +# --- 7a. Comprehensive (Contextual) Analyzer --- + +def comprehensive_german_analysis(text: str, top_n_value: Optional[float] = 0) -> Dict[str, Any]: + """ + (CONTEXTUAL) Combines NLP tools for a deep analysis of German text. + + ** V19 UPDATE: ** Reads the new list-based, multi-engine output + from `analyze_word_encyclopedia` and combines all senses for ranking. 
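+
+    Abridged output sketch (keys as built below; values truncated):
+
+        {
+          "input_text": "...",
+          "grammar_check": [...],
+          "spacy_analysis": [...],
+          "lemma_deep_dive": {
+            "<lemma>": {
+              "inflection_analysis": {...},
+              "semantic_analysis": {...}
+            }
+          }
+        }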
+ """ + + try: + if not text or not text.strip(): + return {"info": "Please enter text to analyze."} + top_n = int(top_n_value) if top_n_value is not None else 0 + + print(f"\n[Comprehensive Analysis] Starting analysis for: \"{text}\" (top_n={top_n})") + results: Dict[str, Any] = {"input_text": text} + nlp_de = None + context_doc = None + + # --- 1. LanguageTool Grammar Check --- + print("[Comprehensive Analysis] Running LanguageTool...") + if LT_AVAILABLE: + try: + results["grammar_check"] = lt_check_grammar(text) + except Exception as e: + results["grammar_check"] = {"error": f"LanguageTool failed: {e}"} + else: + results["grammar_check"] = {"error": "LanguageTool not available."} + + # --- 2. spaCy Morpho-Syntactic Backbone --- + print("[Comprehensive Analysis] Running spaCy...") + spacy_json_output = [] + try: + _, spacy_json, _, _, _ = spacy_get_analysis("en", "de", text) + if isinstance(spacy_json, list): + spacy_json_output = spacy_json + results["spacy_analysis"] = spacy_json_output + nlp_de = SPACY_MODELS.get("de") + if nlp_de: + context_doc = nlp_de(text) + if not context_doc.has_vector or context_doc.vector_norm == 0: + print("[Comprehensive Analysis] WARNING: Context sentence has no vector.") + context_doc = None + else: + results["spacy_analysis"] = spacy_json + except Exception as e: + results["spacy_analysis"] = {"error": f"spaCy analysis failed: {e}"} + + # --- 2b. Heuristic SVA check --- + try: + if isinstance(results.get("grammar_check"), list) and any(d.get("status") == "perfect" for d in results["grammar_check"]): + subj_num = None + verb_num = None + verb_token = None + subj_token = None + for tok in spacy_json_output: + if tok.get("dependency") in {"sb", "nsubj"}: + m = tok.get("morphology","") + if "Number=Sing" in m: + subj_num = "Sing" + subj_token = tok + spacy_pos_up = (tok.get("pos") or "").upper() + if (spacy_pos_up in {"VERB", "AUX"}) and ("VerbForm=Fin" in tok.get("morphology","")): + verb_token = tok + m = tok.get("morphology","") + if "Number=Plur" in m: + verb_num = "Plur" + if subj_num == "Sing" and verb_num == "Plur": + corrected_sentence_sg = None + corrected_sentence_pl = None + replacements = [] + v_lemma = verb_token.get("lemma") if verb_token else None + v_word = verb_token.get("word") if verb_token else None + v_3sg = _conjugate_to_person_number(v_lemma, "3", "sg") if v_lemma else None + if v_3sg and v_word: + corrected_sentence_sg = text.replace(v_word, v_3sg, 1) + replacements.append(corrected_sentence_sg) + subj_word = subj_token.get("word") if subj_token else None + subj_pl = None + if subj_word and PATTERN_DE_AVAILABLE: + try: subj_pl = pluralize(subj_word) + except Exception: subj_pl = None + if subj_word and subj_pl and subj_pl != subj_word: + corrected_sentence_pl = text.replace(subj_word, subj_pl, 1) + replacements.append(corrected_sentence_pl) + sva = { + "message": "Möglicher Kongruenzfehler: Singular-Subjekt mit pluralischer Verbform.", + "rule_id": "HEURISTIC_SUBJ_VERB_AGREEMENT", + "category": "Grammar", + "incorrect_text": f"{verb_token.get('word')}" if verb_token else "", + "replacements": replacements, "offset": None, "length": None, + "context": None, "short_message": "Subjekt–Verb-Kongruenz" + } + results["grammar_check"] = [sva] + except Exception as e: + print(f"SVA Heuristic failed: {e}") + pass + + # --- 3. 
Lemma-by-Lemma Deep Dive (V19 LOGIC) --- + print("[Comprehensive Analysis] Running Lemma Deep Dive...") + FUNCTION_POS = {"DET","ADP","AUX","PUNCT","SCONJ","CCONJ","PART","PRON","NUM","SYM","X", "SPACE"} + lemma_deep_dive: Dict[str, Any] = {} + processed_lemmas: Set[str] = set() + + if not spacy_json_output: + print("[Comprehensive Analysis] No spaCy tokens to analyze. Skipping deep dive.") + else: + for token in spacy_json_output: + lemma = token.get("lemma") + pos = (token.get("pos") or "").upper() + + if not lemma or lemma == "--" or pos in FUNCTION_POS or lemma in processed_lemmas: + continue + processed_lemmas.add(lemma) + print(f"[Deep Dive] Analyzing lemma: '{lemma}' (from token '{token.get('word')}')") + + # --- 3a. Get Validated Grammatical & Semantic Analysis --- + # We call our new, multi-engine dispatcher. + lemma_report: Dict[str, Any] = {} + inflection_analysis = {} + semantic_analysis = {} + + try: + # We pass top_n=0 to get ALL semantic possibilities for ranking + encyclopedia_data = analyze_word_encyclopedia(lemma, 0) + + # The "analysis" key contains {"noun": [ ... ], "verb": [ ... ], ...} + word_analysis = encyclopedia_data.get("analysis", {}) + + # *** THIS IS THE KEY CHANGE *** + # Iterate over the POS keys and the *list* of entries for each + for pos_key, entry_list in word_analysis.items(): + if not entry_list: + continue + + # For context, we only rank the *first* (most likely) entry + # provided by the encyclopedia for that POS. + data = entry_list[0] + + # Store all inflection blocks + inflection_analysis[f"{pos_key}_wiktionary"] = data.get("inflections_wiktionary") + inflection_analysis[f"{pos_key}_pattern"] = data.get("inflections_pattern") + + # --- Combine ALL senses (Wiktionary, OdeNet) for ranking --- + all_senses_for_pos = [] + semantics_block = data.get("semantics_combined", {}) + + # Add Wiktionary senses + wikt_senses = semantics_block.get("wiktionary_senses", []) + for s in wikt_senses: + s["source"] = "wiktionary" + all_senses_for_pos.append(s) + + # Add OdeNet senses + odenet_senses = semantics_block.get("odenet_senses", []) + for s in odenet_senses: + s["source"] = "odenet" + all_senses_for_pos.append(s) + + semantic_analysis[f"{pos_key}_senses"] = all_senses_for_pos + + # Add ConceptNet relations (store separately, as they are not "senses") + if "conceptnet_relations" not in semantic_analysis: + semantic_analysis["conceptnet_relations"] = [] + semantic_analysis["conceptnet_relations"].extend( + semantics_block.get("conceptnet_relations", []) + ) + + lemma_report["inflection_analysis"] = inflection_analysis + + except Exception as e: + lemma_report["inflection_analysis"] = {"error": f"V19 Analyzer failed: {e}", "traceback": traceback.format_exc()} + + + # --- 3b. Contextual Re-ranking (Unchanged) --- + # re-rank the semantic data we gathered in step 3a. + + # OdeNet Senses (now combined with Wiktionary senses) + for key in semantic_analysis: + if key.endswith("_senses") and nlp_de: + ranked_senses = [] + for sense in semantic_analysis[key]: + # ... (your existing re-ranking code) ... 
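+                        # Relevance = spaCy vector similarity between the
+                        # sense definition and the full input sentence;
+                        # definitions without a usable vector score 0.0.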
+                        if "error" in sense: continue
+                        definition = sense.get("definition", "")
+                        relevance = 0.0
+                        if definition and context_doc:
+                            try:
+                                def_doc = nlp_de(definition)
+                                if def_doc.has_vector and def_doc.vector_norm > 0:
+                                    relevance = context_doc.similarity(def_doc)
+                            except Exception:
+                                relevance = 0.0
+                        sense["relevance_score"] = float(relevance)
+                        ranked_senses.append(sense)
+
+                    ranked_senses.sort(key=lambda x: x.get('relevance_score', 0.0), reverse=True)
+                    if top_n > 0:
+                        ranked_senses = ranked_senses[:top_n]
+                    semantic_analysis[key] = ranked_senses
+
+            # ConceptNet Relations
+            if "conceptnet_relations" in semantic_analysis and nlp_de:
+                ranked_relations = []
+                # Score each relation's surface text against the sentence vector.
+                for rel in semantic_analysis["conceptnet_relations"]:
+                    if "error" in rel: continue
+                    text_to_score = rel.get('surface') or rel.get('other_node', '')
+                    relevance = 0.0
+                    if text_to_score and context_doc:
+                        try:
+                            rel_doc = nlp_de(text_to_score)
+                            if rel_doc.has_vector and rel_doc.vector_norm > 0:
+                                relevance = context_doc.similarity(rel_doc)
+                        except Exception:
+                            relevance = 0.0
+                    rel["relevance_score"] = float(relevance)
+                    ranked_relations.append(rel)
+
+                ranked_relations.sort(key=lambda x: x.get('relevance_score', 0.0), reverse=True)
+                if top_n > 0:
+                    ranked_relations = ranked_relations[:top_n]
+                semantic_analysis["conceptnet_relations"] = ranked_relations
+
+            lemma_report["semantic_analysis"] = semantic_analysis
+            lemma_deep_dive[lemma] = lemma_report
+
+        results["lemma_deep_dive"] = lemma_deep_dive
+        print("[Comprehensive Analysis] Analysis complete.")
+        return results
+
+    except Exception as e:
+        print(f"[Comprehensive Analysis] FATAL ERROR: {e}")
+        traceback.print_exc()
+        return {
+            "error": f"Analysis failed: {str(e)}",
+            "traceback": traceback.format_exc(),
+            "input_text": text
+        }
+
+# --- 7b. NEW: Word Encyclopedia (Non-Contextual) Analyzer ---
+def _analyze_word_with_hanta(word: str, top_n_value: int) -> Dict[str, Any]:
+    """
+    (FALLBACK ENGINE 1) Analyzes a single word using HanTa + OdeNet + Pattern.
+    Called by the `analyze_word_encyclopedia` dispatcher when the Wiktionary
+    engine yields no result. Internally it prefers the HanTa-led engine and,
+    if HanTa is unavailable or fails, hands off to the spaCy-IWNLP-led engine
+    (V16 logic) as a last resort. Returns {} on failure so the dispatcher
+    can fall back.
+    """
+    if not word or not word.strip():
+        return {"info": "Please enter a word."}
+
+    top_n = int(top_n_value) if top_n_value is not None else 0
+
+    # --- PRIMARY ENGINE: HanTa-led (V17) ---
+    if HANTA_AVAILABLE:
+        print(f"\n[Word Encyclopedia] Starting V18 (HanTa) analysis for: \"{word}\"")
+        final_result: Dict[str, Any] = {
+            "input_word": word,
+            "analysis": {}
+        }
+
+        try:
+            hanta_tagger = hanta_get_tagger()
+            if not hanta_tagger:
+                raise Exception("HanTa Tagger failed to initialize.")  # Will be caught and trigger fallback
+
+            # --- 1. Get All Grammatical Candidates (HanTa) ---
+            hanta_tags = _hanta_get_candidates(word, hanta_tagger)
+            if not hanta_tags:
+                return {"info": f"No grammatical analysis found for '{word}'."}
+
+            # --- 2. Map Tags to POS Groups (with Adverb Heuristic) ---
+            pos_groups_map = _hanta_map_tags_to_pos(hanta_tags)
+            log(f"Found {len(pos_groups_map)} possible POS group(s): {list(pos_groups_map.keys())}")
+
+            # --- 3. Validate and Build Report for each POS Group ---
+            for pos_group, specific_tags in pos_groups_map.items():
+                print(f"--- Analyzing as: {pos_group.upper()} ---")
+
+                # --- 3a.
Get Lemma (HanTa) --- + lemma = _hanta_get_lemma_for_pos(word, pos_group, hanta_tagger) + log(f"Lemma for {pos_group} is: '{lemma}'") + + # --- 3b. Get Semantics & VALIDATE (OdeNet) --- + # We call the NEW, CORRECTED helper from Section 6c + all_odenet_senses = _get_odenet_senses_by_pos(lemma) + pos_odenet_senses = all_odenet_senses.get(pos_group, []) + + # We only reject if OdeNet is working and returns no senses. + # If OdeNet is down, the list will contain a placeholder and we proceed. + if not pos_odenet_senses: + log(f"✗ REJECTED {pos_group}: OdeNet is available but has no '{pos_group}' senses for lemma '{lemma}'.") + continue + + # Filter out the placeholder if OdeNet is down + if pos_odenet_senses and "info" in pos_odenet_senses[0]: + log(f"✓ VERIFIED {pos_group}: OdeNet is unavailable, proceeding without validation.") + pos_odenet_senses = [] # Clear the placeholder + else: + log(f"✓ VERIFIED {pos_group}: OdeNet found {len(pos_odenet_senses)} sense(s).") + + # --- 3c. Get Inflections (Pattern) --- + inflection_report = {} + if not PATTERN_DE_AVAILABLE: + inflection_report = {"info": "pattern.de library not available. No inflections generated."} + else: + try: + if pos_group == "noun": + inflection_report = pattern_analyze_as_noun(lemma) + elif pos_group == "verb": + inflection_report = pattern_analyze_as_verb(lemma) + elif pos_group == "adjective": + inflection_report = pattern_analyze_as_adjective(lemma) + elif pos_group == "adverb": + inflection_report = {"base_form": lemma, "info": "Adverbs are non-inflecting."} + + if not pattern_is_good_analysis(inflection_report, pos_group) and pos_group != "adverb": + log(f"⚠️ Warning: pattern.de generated a poor inflection table for {lemma} ({pos_group}).") + inflection_report["warning"] = "Inflection table from pattern.de seems incomplete or invalid." + except Exception as e: + log(f"pattern.de inflection failed for {lemma} ({pos_group}): {e}") + inflection_report = {"error": f"pattern.de failed: {e}", "traceback": traceback.format_exc()} + + # --- 3d. Build Final Report Block --- + final_result["analysis"][pos_group] = { + "hanta_analysis": { + "detected_tags": sorted(list(specific_tags)), + "lemma": lemma, + "morphemes": [ + hanta_tagger.analyze(word.capitalize() if pos_group == 'noun' else word.lower(), taglevel=3) + ] + }, + "inflections": inflection_report, + "semantics": _build_semantics(lemma, pos_odenet_senses, top_n) + } + + if not final_result["analysis"]: + return { + "input_word": word, + "info": f"No valid, semantically-verified analysis found for '{word}'. It may be a typo or a function word." + } + + final_result["info"] = "Analysis performed by HanTa-led fallback engine." + return final_result + + except Exception as e: + print(f"[Word Encyclopedia] HanTa FALLBACK Engine FAILED: {e}") + traceback.print_exc() + return {} # Signal failure + + # --- FALLBACK ENGINE: spaCy-IWNLP-led (V16) --- + if IWNLP_AVAILABLE: + try: + log("--- Dispatcher: HanTa not found or failed. Attempting IWNLP Fallback Engine ---") + # We call your existing V16 function, which we just made robust in Step 2. + result = _analyze_word_with_iwnlp(word, top_n_value) + result["info"] = result.get("info", "") + " (Analysis performed by IWNLP-based fallback engine)" + return result + except Exception as e: + log(f"--- IWNLP Fallback Engine FAILED: {e} ---") + traceback.print_exc() + return {"error": f"IWNLP Fallback Engine failed: {e}"} + + # --- No engines available --- + log("--- Dispatcher: No valid analysis engines found. 
---") + return { + "input_word": word, + "error": "Fatal Error: Neither HanTa nor spacy-iwnlp are available. " + "Please install at least one to use the Word Encyclopedia." + } + +def _analyze_word_with_iwnlp(word: str, top_n_value: int) -> Dict[str, Any]: + """ + (FALLBACK ENGINE 2) Analyzes a single word using IWNLP + OdeNet + Pattern. + This was the V16 engine. + + V19 UPDATE: This function *must* be modified to match the new + output format: `analysis: { "pos_key": [ ...list... ] }` + + (NON-CONTEXTUAL) Analyzes a single word for ALL its possible + grammatical and semantic forms. + + ** Strategy: IWNLP Lemmas + spaCy POS + Pattern.de Validators** + 1. Get spaCy's primary POS (e.g., "ADV" for "heute"). + 2. Get IWNLP's list of *lemmas* (e.g., "Lauf" -> ['Lauf', 'laufen']). + 3. Create a unique set of all possible lemmas from spaCy, IWNLP, and the word itself. + 4. Iterate this lemma set: + - Try to analyze each lemma as NOUN (capitalized). + - Try to analyze each lemma as VERB. + - Try to analyze each lemma as ADJECTIVE. + - Validate each with pattern_is_good_analysis AND by checking for OdeNet senses. + 5. After checking inflections, check if spaCy's POS was 'ADV'. + If so, and OdeNet has 'r' senses, add an 'adverb' report. + 6. This finds all inflecting forms ("Lauf", "gut") AND non-inflecting + forms ("heute") while rejecting artifacts ("klauf", "heutst"). + """ + if not word or not word.strip(): + return {"info": "Please enter a word."} + + if not IWNLP_AVAILABLE: + return {"error": "`spacy-iwnlp` library not available. This tab requires it."} + + top_n = int(top_n_value) if top_n_value is not None else 0 + + print(f"\n[Word Encyclopedia] Starting IWNP-fallback analysis for: \"{word}\" (top_n={top_n})") + + final_result: Dict[str, Any] = { + "input_word": word, + "analysis": {} + } + + # --- Helper: Get OdeNet senses --- + def _get_odenet_senses_by_pos(w): + """ + (Internal helper for IWNLP fallback) + + *** V18 FIX: OdeNet uses 'a' for BOTH Adjective and Adverb. 
*** + """ + senses_by_pos: Dict[str, List[Dict]] = { + "noun": [], "verb": [], "adjective": [], "adverb": [] + } + if not WN_AVAILABLE: + log(f"[IWNLP Fallback] OdeNet check skipped for '{w}': WN_AVAILABLE=False") + # Fail-open strategy + return {"noun": [{"info": "OdeNet unavailable"}], + "verb": [{"info": "OdeNet unavailable"}], + "adjective": [{"info": "OdeNet unavailable"}], + "adverb": [{"info": "OdeNet unavailable"}]} + + try: + all_senses = odenet_get_thesaurus_info(w).get("senses", []) + for sense in all_senses: + if "error" in sense: continue + pos_tag = sense.get("pos") + + if pos_tag == 'n': + senses_by_pos["noun"].append(sense) + elif pos_tag == 'v': + senses_by_pos["verb"].append(sense) + + # --- THIS IS THE CRITICAL FIX --- + elif pos_tag == 'a': + log(f"[IWNLP Fallback] Found OdeNet 'a' tag (Adj/Adv) for sense: {sense.get('definition', '...')[:30]}") + senses_by_pos["adjective"].append(sense) + senses_by_pos["adverb"].append(sense) + # --- END OF FIX --- + + except Exception as e: + print(f"[Word Encyclopedia] OdeNet check failed: {e}") + return senses_by_pos + + # --- Helper: Build semantics block --- + def _build_semantics(lemma, odenet_senses, top_n): + conceptnet_relations = [] + if REQUESTS_AVAILABLE: + try: + conceptnet_result = conceptnet_get_relations(lemma, language='de') + conceptnet_relations = conceptnet_result.get("relations", []) + except Exception as e: + conceptnet_relations = [{"error": str(e)}] + + if top_n > 0: + odenet_senses = odenet_senses[:top_n] + conceptnet_relations.sort(key=lambda x: x.get('weight', 0.0), reverse=True) + conceptnet_relations = conceptnet_relations[:top_n] + + return { + "lemma": lemma, + "odenet_senses": odenet_senses, + "conceptnet_relations": conceptnet_relations + } + + # --- 1. GET ALL LEMMA CANDIDATES & SPACY POS --- + try: + iwnlp = iwnlp_get_pipeline() + if not iwnlp: + return {"error": "IWNLP pipeline failed to initialize."} + + doc = iwnlp(word) + token = doc[0] + + # Get spaCy's best POS guess + spacy_pos = token.pos_ # e.g., "NOUN" for "Lauf", "ADV" for "heute" + spacy_lemma = token.lemma_ + + # *** THIS IS THE FIX *** + # Get IWNLP's lemma list (it only registers 'iwnlp_lemmas') + iwnlp_lemmas_list = token._.iwnlp_lemmas or [] + + # Combine all possible lemmas + all_lemmas = set(iwnlp_lemmas_list) + all_lemmas.add(spacy_lemma) + all_lemmas.add(word) # Add the word itself + + print(f"[Word Encyclopedia] spaCy POS: {spacy_pos}") + print(f"[Word Encyclopedia] All lemmas to check: {all_lemmas}") + + except Exception as e: + traceback.print_exc() + return {"error": f"IWNLP analysis failed: {e}"} + + # --- 2. 
CHECK INFLECTING POSSIBILITIES FOR EACH LEMMA --- + + # This dict will hold the *best* analysis for each POS + # e.g., "gut" -> { 'adjective': {...}, 'noun': {...} } + + valid_analyses: Dict[str, Dict[str, Any]] = {} + + for lemma in all_lemmas: + if not lemma: continue + + odenet_senses_by_pos = _get_odenet_senses_by_pos(lemma) + + # --- Check NOUN --- + if 'noun' not in valid_analyses: + noun_inflections = {} + is_good_noun = False + + if not PATTERN_DE_AVAILABLE: + noun_inflections = {"info": "pattern.de not available."} + is_good_noun = True + else: + try: + noun_inflections = pattern_analyze_as_noun(lemma.capitalize()) + if pattern_is_good_analysis(noun_inflections, "noun"): + is_good_noun = True + except Exception as e: + noun_inflections = {"error": f"pattern.de failed: {e}"} + + if is_good_noun: + odenet_senses = odenet_senses_by_pos.get('noun', []) + if not odenet_senses and lemma.lower() == word.lower(): + odenet_senses = _get_odenet_senses_by_pos(lemma.capitalize()).get('noun', []) + + # We accept if (senses exist) OR (OdeNet is down and we can't check) + if odenet_senses: + # We must filter out the "unavailable" placeholder + if "info" not in odenet_senses[0]: + log(f" ✓ [IWNLP Fallback] Valid NOUN found: {lemma}") + valid_analyses['noun'] = { + "lemma": noun_inflections.get("base_form", lemma), + "inflections": noun_inflections, + "odenet_senses": odenet_senses + } + elif not WN_AVAILABLE: # OdeNet is down + log(f" ✓ [IWNLP Fallback] Accepting NOUN (OdeNet unavailable): {lemma}") + valid_analyses['noun'] = { + "lemma": noun_inflections.get("base_form", lemma), + "inflections": noun_inflections, + "odenet_senses": [] # No senses to show + } + + + # --- Check VERB --- + if 'verb' not in valid_analyses: + verb_inflections = {} + is_good_verb = False + + if not PATTERN_DE_AVAILABLE: + verb_inflections = {"info": "pattern.de not available."} + is_good_verb = True + else: + try: + verb_inflections = pattern_analyze_as_verb(lemma) + if pattern_is_good_analysis(verb_inflections, "verb"): + is_good_verb = True + except Exception as e: + verb_inflections = {"error": f"pattern.de failed: {e}"} + + if is_good_verb: + odenet_senses = odenet_senses_by_pos.get('verb', []) + + if odenet_senses: + if "info" not in odenet_senses[0]: + log(f" ✓ [IWNLP Fallback] Valid VERB found: {lemma}") + valid_analyses['verb'] = { + "lemma": verb_inflections.get("infinitive", lemma), + "inflections": verb_inflections, + "odenet_senses": odenet_senses + } + elif not WN_AVAILABLE: + log(f" ✓ [IWNLP Fallback] Accepting VERB (OdeNet unavailable): {lemma}") + valid_analyses['verb'] = { + "lemma": verb_inflections.get("infinitive", lemma), + "inflections": verb_inflections, + "odenet_senses": [] + } + + # --- Check ADJECTIVE --- + if 'adjective' not in valid_analyses: + adj_inflections = {} + is_good_adj = False + + if not PATTERN_DE_AVAILABLE: + adj_inflections = {"info": "pattern.de not available."} + is_good_adj = True + else: + try: + adj_inflections = pattern_analyze_as_adjective(lemma) + if pattern_is_good_analysis(adj_inflections, "adjective"): + is_good_adj = True + except Exception as e: + adj_inflections = {"error": f"pattern.de failed: {e}"} + + if is_good_adj: + odenet_senses = odenet_senses_by_pos.get('adjective', []) + + if odenet_senses: + if "info" not in odenet_senses[0]: + log(f" ✓ [IWNLP Fallback] Valid ADJECTIVE found: {lemma}") + valid_analyses['adjective'] = { + "lemma": adj_inflections.get("predicative", lemma), + "inflections": adj_inflections, + "odenet_senses": odenet_senses + } + elif not 
WN_AVAILABLE:
+                    log(f"  ✓ [IWNLP Fallback] Accepting ADJECTIVE (OdeNet unavailable): {lemma}")
+                    valid_analyses['adjective'] = {
+                        "lemma": adj_inflections.get("predicative", lemma),
+                        "inflections": adj_inflections,
+                        "odenet_senses": []
+                    }
+
+    # --- 3. CHECK NON-INFLECTING POS (ADVERB) ---
+    if spacy_pos == "ADV":
+        odenet_senses = _get_odenet_senses_by_pos(word).get('adverb', [])
+
+        if odenet_senses:
+            if "info" not in odenet_senses[0]:
+                log(f"  ✓ [IWNLP Fallback] Valid ADVERB found: {word}")
+                valid_analyses['adverb'] = {
+                    "lemma": word,
+                    "inflections": {"base_form": word},
+                    "odenet_senses": odenet_senses
+                }
+            elif not WN_AVAILABLE:
+                log(f"  ✓ [IWNLP Fallback] Accepting ADVERB (OdeNet unavailable): {word}")
+                valid_analyses['adverb'] = {
+                    "lemma": word,
+                    "inflections": {"base_form": word},
+                    "odenet_senses": []
+                }
+
+    # --- 4. CHECK OTHER FUNCTION WORDS (e.g. "mein" -> DET) ---
+    # We add this if spaCy found a function word AND we haven't found any
+    # content-word analyses (which are more informative).
+    FUNCTION_POS = {"DET", "PRON", "ADP", "AUX", "CCONJ", "SCONJ", "PART", "PUNCT", "SYM"}
+    if spacy_pos in FUNCTION_POS and not valid_analyses:
+        pos_key = spacy_pos.lower()
+        print(f"  ✓ Valid Function Word found: {word} (POS: {spacy_pos})")
+        valid_analyses[pos_key] = {
+            "lemma": spacy_lemma,
+            "inflections": {"base_form": spacy_lemma},
+            "odenet_senses": [],  # Function words aren't in OdeNet
+            "spacy_analysis": {  # Add the spaCy info
+                "word": token.text, "lemma": token.lemma_,
+                "pos_UPOS": token.pos_, "pos_TAG": token.tag_,
+                "morphology": str(token.morph)
+            }
+        }
+
+    # --- 5. BUILD FINAL REPORT ---
+    for pos_key, analysis_data in valid_analyses.items():
+        pos_report = {
+            "inflections_pattern": analysis_data["inflections"],
+            "semantics_combined": _build_semantics(
+                analysis_data["lemma"],
+                analysis_data["odenet_senses"],
+                top_n
+            )
+        }
+        # Add spaCy analysis if it was included
+        if "spacy_analysis" in analysis_data:
+            pos_report["spacy_analysis"] = analysis_data["spacy_analysis"]
+
+        # Wrap each report in a list to match the V19 output format
+        final_result["analysis"][pos_key] = [pos_report]
+
+    if not final_result["analysis"]:
+        return {}  # No results
+
+    final_result["info"] = "Analysis performed by IWNLP-based fallback engine."
+    return final_result
+
+
+# --- THIS IS THE NEW PUBLIC DISPATCHER FUNCTION ---
+def analyze_word_encyclopedia(word: str, top_n_value: Optional[float] = 0) -> Dict[str, Any]:
+    """
+    (PUBLIC DISPATCHER V19) Analyzes a single word for all possible forms.
+
+    This function intelligently selects the best available engine in order:
+    1. PRIMARY: Wiktionary DB (Accurate, pre-compiled data)
+    2. FALLBACK 1: HanTa-led engine (Good heuristics)
+    3. FALLBACK 2: IWNLP-led engine (Different heuristics)
+    """
+    if not word or not word.strip():
+        return {"info": "Please enter a word."}
+
+    word = word.strip()
+    top_n = int(top_n_value) if top_n_value is not None else 0
+
+    # --- 1. Try PRIMARY Engine: Wiktionary ---
+    if WIKTIONARY_AVAILABLE:
+        wikt_result = _analyze_word_with_wiktionary(word, top_n)
+        if wikt_result and wikt_result.get("analysis"):
+            log("V19 Dispatcher: Returning Wiktionary result.")
+            return wikt_result
+        else:
+            log("V19 Dispatcher: Wiktionary is available but found no results.")
+    else:
+        log("V19 Dispatcher: Wiktionary failed to initialize, falling back.")
+
+    # --- 2. Try FALLBACK 1: HanTa ---
+    if HANTA_AVAILABLE:
+        hanta_result = _analyze_word_with_hanta(word, top_n)
+        if hanta_result and hanta_result.get("analysis"):
+            log("V19 Dispatcher: Wiktionary yielded nothing, returning HanTa result.")
+            return hanta_result
+        else:
+            log("V19 Dispatcher: HanTa fallback found no results.")
+
+    # --- 3. Try FALLBACK 2: IWNLP ---
+    if IWNLP_AVAILABLE:
+        iwnlp_result = _analyze_word_with_iwnlp(word, top_n)
+        if iwnlp_result and iwnlp_result.get("analysis"):
+            log("V19 Dispatcher: HanTa yielded nothing, returning IWNLP result.")
+            return iwnlp_result
+        else:
+            log("V19 Dispatcher: IWNLP fallback found no results.")
+
+    # --- No engines available or no results ---
+    log("--- Dispatcher: No valid analysis engines found or no results. ---")
+    return {
+        "input_word": word,
+        "error": "No analysis found for this word.",
+        "info": "The word was not found in Wiktionary, and no fallback "
+                "engines (HanTa, IWNLP) could produce a valid analysis."
+    }
+
+
+# ============================================================================
+# 8. GRADIO UI CREATION
+# ============================================================================
+def create_spacy_tab():
+    """Creates the UI for the spaCy tab."""
+    config = SPACY_UI_TEXT["en"]
+    model_choices = list(SPACY_MODEL_INFO.keys())
+    with gr.Row():
+        ui_lang_radio = gr.Radio(["DE", "EN", "ES"], label=config["ui_lang_label"], value="EN")
+        model_lang_radio = gr.Radio(
+            choices=[(SPACY_MODEL_INFO[k][0], k) for k in model_choices],
+            label=config["model_lang_label"],
+            value=model_choices[0]
+        )
+    markdown_title = gr.Markdown(config["title"])
+    markdown_subtitle = gr.Markdown(config["subtitle"])
+    text_input = gr.Textbox(label=config["input_label"], placeholder=config["input_placeholder"], lines=5)
+    analyze_button = gr.Button(config["button_text"], variant="primary")
+    with gr.Tabs():
+        with gr.Tab(config["tab_graphic"]) as tab_graphic:
+            html_dep_out = gr.HTML(label=config["html_label"])
+        with gr.Tab(config["tab_ner"]) as tab_ner:
+            html_ner_out = gr.HTML(label=config["ner_label"])
+        with gr.Tab(config["tab_table"]) as tab_table:
+            df_out = gr.DataFrame(label=config["table_label"], headers=config["table_headers"], interactive=False)
+        with gr.Tab(config["tab_json"]) as tab_json:
+            json_out = gr.JSON(label=config["json_label"])
+    analyze_button.click(fn=spacy_get_analysis,
+                         inputs=[ui_lang_radio, model_lang_radio, text_input],
+                         outputs=[df_out, json_out, html_dep_out, html_ner_out, analyze_button],
+                         api_name="get_morphology")
+    ui_lang_radio.change(fn=spacy_update_ui,
+                         inputs=ui_lang_radio,
+                         outputs=[markdown_title, markdown_subtitle, ui_lang_radio, model_lang_radio,
+                                  text_input, analyze_button, tab_graphic, tab_table, tab_json, tab_ner,
+                                  html_dep_out, df_out, json_out, html_ner_out])
+
+def create_languagetool_tab():
+    """Creates the UI for the LanguageTool tab."""
+    gr.Markdown("# 🇩🇪 German Grammar & Spelling Checker")
+    gr.Markdown("Powered by `language-tool-python`. This service checks German text for grammatical errors and spelling mistakes.")
+    with gr.Column():
+        text_input = gr.Textbox(
+            label="German Text to Check",
+            placeholder="e.g., Ich sehe dem Mann.
Das ist ein Huas.", + lines=5 + ) + check_button = gr.Button("Check Text", variant="primary") + output = gr.JSON(label="Detected Errors (JSON)") + check_button.click( + fn=lt_check_grammar, + inputs=[text_input], + outputs=[output], + api_name="check_grammar" + ) + gr.Examples( + [["Das ist ein Huas."], ["Ich sehe dem Mann."], + ["Die Katze schlafen auf dem Tisch."], ["Er fragt ob er gehen kann."]], + inputs=[text_input], outputs=[output], fn=lt_check_grammar + ) + +def create_odenet_tab(): + """Creates the UI for the OdeNet tab.""" + gr.Markdown("# 🇩🇪 German Thesaurus (WordNet) Service") + gr.Markdown("Powered by `wn` and `OdeNet (odenet:1.4)`. Finds synonyms, antonyms, and other semantic relations for German words.") + with gr.Column(): + word_input = gr.Textbox( + label="German Word", + placeholder="e.g., Haus, schnell, gut, Katze" + ) + check_button = gr.Button("Find Relations", variant="primary") + output = gr.JSON(label="Thesaurus Information (JSON)") + check_button.click( + fn=odenet_get_thesaurus_info, + inputs=[word_input], + outputs=[output], + api_name="get_thesaurus" + ) + gr.Examples( + [["Hund"], ["gut"], ["laufen"], ["Haus"], ["schnell"]], + inputs=[word_input], outputs=[output], fn=odenet_get_thesaurus_info + ) + +def create_pattern_tab(): + """Creates the UI for the Pattern.de tab.""" + gr.Markdown("# 🇩🇪 Complete German Word Inflection System") + gr.Markdown("Powered by `PatternLite`. Generates complete inflection tables (declension, conjugation) for German words. Robustly handles ambiguity (e.g., 'Lauf' vs 'lauf').") + with gr.Column(): + word_input = gr.Textbox( + label="German Word", + placeholder="z.B. Haus, gehen, schön, besser, lief, Lauf, See" + ) + generate_button = gr.Button("Generate All Forms", variant="primary") + output = gr.JSON(label="Complete Inflection Analysis") + generate_button.click( + fn=pattern_get_all_inflections, + inputs=[word_input], + outputs=[output], + api_name="get_all_inflections" + ) + gr.Examples( + [["Haus"], ["gehen"], ["schön"], ["besser"], ["ging"], ["schnellem"], ["Katze"], ["Lauf"], ["See"]], + inputs=[word_input], outputs=[output], fn=pattern_get_all_inflections + ) + +def create_conceptnet_tab(): + """--- NEW: Creates the UI for the ConceptNet tab ---""" + gr.Markdown("# 🌍 ConceptNet Knowledge Graph (Direct API)") + gr.Markdown("Powered by `api.conceptnet.io`. Fetches semantic relations for a word in any language.") + with gr.Row(): + word_input = gr.Textbox( + label="Word or Phrase", + placeholder="e.g., Baum, tree, Katze" + ) + lang_input = gr.Textbox( + label="Language Code", + placeholder="de", + value="de" + ) + check_button = gr.Button("Find Relations", variant="primary") + output = gr.JSON(label="ConceptNet Relations (JSON)") + + check_button.click( + fn=conceptnet_get_relations, + inputs=[word_input, lang_input], + outputs=[output], + api_name="get_conceptnet" + ) + gr.Examples( + [["Baum", "de"], ["tree", "en"], ["Katze", "de"], ["gato", "es"]], + inputs=[word_input, lang_input], outputs=[output], fn=conceptnet_get_relations + ) + +def create_combined_tab(): + """Creates the UI for the CONTEXTUAL Comprehensive Analyzer tab.""" + gr.Markdown("# 🚀 Comprehensive Analyzer (Contextual)") + gr.Markdown("This tool provides a deep, **lemma-based** analysis *in context*. 
It integrates all tools and uses the **full sentence** to rank semantic senses by relevance.") + with gr.Column(): + text_input = gr.Textbox( + label="German Text", + placeholder="e.g., Die schnelle Katze springt über den faulen Hund.", + lines=5 + ) + top_n_number = gr.Number( + label="Limit Semantic Senses per POS (0 for all)", + value=0, + step=1, + minimum=0, + interactive=True + ) + analyze_button = gr.Button("Run Comprehensive Analysis", variant="primary") + + # *** ADD STATUS OUTPUT *** + status_output = gr.Markdown(value="", visible=True) + output = gr.JSON(label="Comprehensive Analysis (JSON)") + + # *** WRAPPER FUNCTION TO FORCE REFRESH *** + def run_analysis_with_status(text, top_n): + try: + status = "🔄 Analyzing..." + yield status, {} + + result = comprehensive_german_analysis(text, top_n) + + status = f"✅ Analysis complete! Found {len(result.get('lemma_deep_dive', {}))} lemmas." + yield status, result + + except Exception as e: + error_status = f"❌ Error: {str(e)}" + error_result = {"error": str(e), "traceback": traceback.format_exc()} + yield error_status, error_result + + analyze_button.click( + fn=run_analysis_with_status, + inputs=[text_input, top_n_number], + outputs=[status_output, output], + api_name="comprehensive_analysis" + ) + + gr.Examples( + [["Die Katze schlafen auf dem Tisch.", 3], + ["Das ist ein Huas.", 0], + ["Ich laufe schnell.", 3], + ["Der Gärtner pflanzt einen Baum.", 5], + ["Ich fahre an den See.", 3]], + inputs=[text_input, top_n_number], + outputs=[status_output, output], + fn=run_analysis_with_status + ) + +def create_word_encyclopedia_tab(): + """--- NEW: Creates the UI for the NON-CONTEXTUAL Word Analyzer tab ---""" + gr.Markdown("# 📖 Word Encyclopedia (Non-Contextual)") + gr.Markdown("This tool analyzes a **single word** for *all possible* grammatical and semantic forms. It's ideal for enriching word lists. 
It finds ambiguities (e.g., 'Lauf' as noun and verb) and groups all data by Part-of-Speech.")
+    with gr.Column():
+        word_input = gr.Textbox(
+            label="Single German Word",
+            placeholder="e.g., Lauf, See, schnell"
+        )
+        top_n_number = gr.Number(
+            label="Limit Semantic Senses per POS (0 for all)",
+            value=0,
+            step=1,
+            minimum=0,
+            interactive=True
+        )
+        analyze_button = gr.Button("Analyze Word", variant="primary")
+
+        output = gr.JSON(label="Word Encyclopedia Analysis (JSON)")
+
+        analyze_button.click(
+            fn=analyze_word_encyclopedia,
+            inputs=[word_input, top_n_number],
+            outputs=[output],
+            api_name="analyze_word"
+        )
+
+        gr.Examples(
+            [["Lauf", 3],
+             ["See", 0],
+             ["schnell", 3],
+             ["Hund", 5]],
+            inputs=[word_input, top_n_number],
+            outputs=[output],
+            fn=analyze_word_encyclopedia
+        )
+
+# --- Main UI Builder ---
+def create_consolidated_interface():
+    """Builds the final Gradio app with all tabs."""
+    with gr.Blocks(title="Consolidated Linguistics Hub", theme=gr.themes.Soft()) as demo:
+        gr.Markdown("# 🏛️ Consolidated Linguistics Hub")
+        gr.Markdown("A suite of advanced tools for German linguistics, providing both contextual and non-contextual analysis.")
+
+        with gr.Tabs():
+            # --- NEW "Word Encyclopedia" TAB ---
+            with gr.Tab("📖 Word Encyclopedia (DE)"):
+                create_word_encyclopedia_tab()
+
+            with gr.Tab("🚀 Comprehensive Analyzer (DE)"):
+                create_combined_tab()
+
+            with gr.Tab("🔬 spaCy Analyzer (Multi-lingual)"):
+                create_spacy_tab()
+
+            with gr.Tab("✅ Grammar Check (DE)"):
+                create_languagetool_tab()
+
+            with gr.Tab("📚 Inflections (DE)"):
+                create_pattern_tab()
+
+            with gr.Tab("📖 Thesaurus (DE)"):
+                create_odenet_tab()
+
+            with gr.Tab("🌐 ConceptNet (Direct)"):
+                create_conceptnet_tab()
+
+    return demo
+
+# ============================================================================
+# 9. MAIN EXECUTION BLOCK
+# ============================================================================
+
+if __name__ == "__main__":
+    print("\n" + "="*70)
+    print("CONSOLIDATED LINGUISTICS HUB (STARTING)")
+    print("="*70 + "\n")
+
+    # --- 1. Initialize spaCy Models ---
+    print("--- Initializing spaCy Models ---")
+    spacy_initialize_models()
+    print("--- spaCy Done ---\n")
+
+    # --- 2. Initialize OdeNet Worker ---
+    print("--- Initializing OdeNet Worker ---")
+    if WN_AVAILABLE:
+        try:
+            odenet_start_worker()
+            print("✓ OdeNet worker is starting/ready.")
+        except Exception as e:
+            print(f"✗ FAILED to start OdeNet worker: {e}")
+            print("  'Thesaurus' and 'Comprehensive' tabs may fail.")
+    else:
+        print("INFO: OdeNet ('wn') library not available, skipping worker.")
+    print("--- OdeNet Done ---\n")
+
+    # --- 3. Initialize Wiktionary ---
+    print("--- Initializing Wiktionary DB ---")
+    try:
+        if not wiktionary_download_db():
+            print("✗ WARNING: Failed to download Wiktionary DB. Primary engine is disabled.")
+        else:
+            # Try to pre-warm the connection
+            _ = wiktionary_get_connection()
+    except Exception as e:
+        print(f"✗ FAILED to initialize Wiktionary: {e}")
+    print("--- Wiktionary Done ---\n")
+
+    # --- 4. Initialize HanTa Tagger ---
+    print("--- Initializing HanTa Tagger ---")
+    if HANTA_AVAILABLE:
+        try:
+            hanta_get_tagger()  # Call the function to load the model
+        except Exception as e:
+            print(f"✗ FAILED to start HanTa tagger: {e}")
+            print("  'Word Encyclopedia' tab will fail.")
+    else:
+        print("INFO: HanTa library not available, skipping tagger.")
+    print("--- HanTa Done ---\n")
+
+    # --- 5.
Check LanguageTool ---
+    print("--- Checking LanguageTool ---")
+    if not LT_AVAILABLE:
+        print("WARNING: language-tool-python not available. 'Grammar' tab will fail.")
+    else:
+        print("✓ LanguageTool library is available (will lazy-load on first use).")
+    print("--- LanguageTool Done ---\n")
+
+    # --- 6. Check Pattern.de ---
+    print("--- Checking Pattern.de ---")
+    if not PATTERN_DE_AVAILABLE:
+        print("WARNING: pattern.de library not available. 'Inflections' tab will fail.")
+    else:
+        print("✓ Pattern.de library is available.")
+    print("--- Pattern.de Done ---\n")
+
+    # --- 7. Check Requests (for ConceptNet) ---
+    print("--- Checking Requests (for ConceptNet) ---")
+    if not REQUESTS_AVAILABLE:
+        print("WARNING: requests library not available. 'ConceptNet' features will fail.")
+    else:
+        print("✓ Requests library is available.")
+    print("--- Requests Done ---\n")
+
+    print("="*70)
+    print("All services initialized. Launching Gradio Hub...")
+    print("="*70 + "\n")
+
+    # --- 8. Launch Gradio ---
+    demo = create_consolidated_interface()
+    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)
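+
+# ----------------------------------------------------------------------------
+# Usage sketch (illustrative, not part of the app): querying the hub's named
+# API endpoints from another process with `gradio_client`. The endpoint names
+# match the `api_name` values registered above; the URL is a placeholder for
+# wherever this app is actually hosted.
+#
+#     from gradio_client import Client
+#
+#     client = Client("http://localhost:7860/")
+#     result = client.predict("Lauf", 3, api_name="/analyze_word")
+#     print(result)
+# ----------------------------------------------------------------------------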