Spaces:
Running
Running
| # ============================================================================ | |
| # GERMAN LINGUISTICS HUB (CONSOLIDATED APP V22) | |
| # | |
| # This script combines multiple NLP tools into a single Gradio interface. | |
| # | |
| # ============================================================================ | |
| # TABS & FUNCTIONALITY: | |
| # ============================================================================ | |
| # | |
| # --- PRIMARY TABS --- | |
| # | |
| # 1. Word Encyclopedia (DE): | |
| # - NON-CONTEXTUAL analysis of single words. | |
| # - Multi-engine dispatcher with user selection and automatic fallback: | |
| # (Wiktionary -> DWDSmor -> HanTa -> IWNLP) | |
| # - Aggregates all grammatical (Wiktionary, Pattern) and semantic | |
| # (Wiktionary, OdeNet, ConceptNet) possibilities, grouped by Part-of-Speech. | |
| # - Validates and filters artifacts (e.g., "abgeschnitten", "lauf"). | |
| # | |
| # 2. Comprehensive Analyzer (DE): | |
| # - CONTEXTUAL analysis of full sentences. | |
| # - Uses the Word Encyclopedia's dispatcher for robust lemma analysis. | |
| # - Ranks all semantic senses (Wiktionary, OdeNet) by relevance to the sentence. | |
| # | |
| # --- STANDALONE TOOL TABS --- | |
| # | |
| # 3. spaCy Analyzer (Multi-lingual): | |
| # - Direct, raw spaCy output (NER, POS, dependencies) for multiple languages. | |
| # | |
| # 4. Grammar Check (DE): | |
| # - Direct LanguageTool output. | |
| # | |
| # --- RAW ENGINE TABS (for debugging & comparison) --- | |
| # | |
| # 5. Engine: Wiktionary (DE): | |
| # - Standalone access to the Wiktionary DB (Primary) engine. | |
| # | |
| # 6. Engine: DWDSmor (DE): | |
| # - Standalone access to the DWDSmor (Fallback 1) engine. | |
| # | |
| # 7. Engine: HanTa (DE): | |
| # - Standalone access to the HanTa (Fallback 2) engine. | |
| # | |
| # 8. Engine: IWNLP-spaCy (DE): | |
| # - Standalone access to the IWNLP-spaCy (Fallback 3) engine. | |
| # | |
| # --- RAW COMPONENT TABS (for debugging & comparison) --- | |
| # | |
| # 9. Component: Inflections (DE): | |
| # - Direct access to the `pattern.de` library. | |
| # | |
| # 10. Component: Thesaurus (DE): | |
| # - Direct access to the `OdeNet` library. | |
| # | |
| # 11. Component: ConceptNet (Direct): | |
| # - Direct access to the ConceptNet API. | |
| # | |
| # ============================================================================ | |
| # ============================================================================ | |
| # 1. CONSOLIDATED IMPORTS | |
| # ============================================================================ | |
| import gradio as gr | |
| import spacy | |
| from spacy import displacy | |
| import base64 | |
| import traceback | |
| import subprocess | |
| import sys | |
| import os | |
| from pathlib import Path | |
| import importlib | |
| import site | |
| import threading | |
| import queue | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Dict, Any, List, Set, Optional, Tuple | |
| import requests | |
| import zipfile | |
| import re | |
| import sqlite3 | |
| from huggingface_hub import hf_hub_download | |
| # --- Requests and gradio Import (for ConceptNet) --- | |
| try: | |
| import requests | |
| from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout | |
| REQUESTS_AVAILABLE = True | |
| except ImportError: | |
| REQUESTS_AVAILABLE = False | |
| print("="*70) | |
| print("CRITICAL WARNING: `requests` library not found.") | |
| print("ConceptNet features will not function.") | |
| print("="*70) | |
| try: | |
| from gradio_client import Client | |
| GRADIO_CLIENT_AVAILABLE = True | |
| except ImportError: | |
| GRADIO_CLIENT_AVAILABLE = False | |
| print("="*70) | |
| print("CRITICAL WARNING: `gradio_client` library not found.") | |
| print("ConceptNet features will not function.") | |
| print("Install with: pip install gradio_client") | |
| print("="*70) | |
| # --- IWNLP (spaCy Extension) Import --- | |
| try: | |
| from spacy_iwnlp import spaCyIWNLP | |
| IWNLP_AVAILABLE = True | |
| print("β Successfully imported spacy-iwnlp") | |
| except ImportError: | |
| IWNLP_AVAILABLE = False | |
| spaCyIWNLP = object # Dummy definition for error case | |
| print("="*70) | |
| print("WARNING: `spacy-iwnlp` library not found.") | |
| print("The 'Word Encyclopedia' tab will be less accurate.") | |
| print("Install with: pip install spacy-iwnlp") | |
| print("="*70) | |
| # --- LanguageTool Import --- | |
| try: | |
| import language_tool_python | |
| LT_AVAILABLE = True | |
| print("β Successfully imported language_tool") | |
| except ImportError: | |
| LT_AVAILABLE = False | |
| print("="*70) | |
| print("CRITICAL WARNING: `language-tool-python` library not found.") | |
| print("The 'German Grammar Check' tab will not function.") | |
| print("="*70) | |
| # --- OdeNet (wn) Import --- | |
| try: | |
| import wn | |
| WN_AVAILABLE = True | |
| print("β Successfully imported wordnet for odenet") | |
| except ImportError: | |
| WN_AVAILABLE = False | |
| print("="*70) | |
| print("CRITICAL WARNING: `wn` library not found.") | |
| print("The 'German Thesaurus' tab will not function.") | |
| print("="*70) | |
| # --- Pattern.de Import --- | |
| try: | |
| from pattern.de import ( | |
| pluralize, singularize, conjugate, tenses, lemma, lexeme, | |
| attributive, predicative, | |
| article, gender, MALE, FEMALE, NEUTRAL, PLURAL, | |
| INFINITIVE, PRESENT, PAST, PARTICIPLE, | |
| FIRST, SECOND, THIRD, SINGULAR, PLURAL as PL, | |
| INDICATIVE, IMPERATIVE, SUBJUNCTIVE, | |
| NOMINATIVE, ACCUSATIVE, DATIVE, GENITIVE, | |
| SUBJECT, OBJECT, INDIRECT, PROPERTY, | |
| DEFINITE, INDEFINITE, | |
| comparative, superlative, | |
| NOUN, VERB, ADJECTIVE, | |
| parse, split | |
| ) | |
| PATTERN_DE_AVAILABLE = True | |
| print("β Successfully imported pattern.de") | |
| except ImportError as e: | |
| PATTERN_DE_AVAILABLE = False | |
| print("="*70) | |
| print(f"CRITICAL WARNING: `pattern.de` library not found: {e}") | |
| print("The 'German Inflections' tab will not function.") | |
| print("="*70) | |
| # --- HanTa Tagger Import --- | |
| try: | |
| from HanTa.HanoverTagger import HanoverTagger | |
| import HanTa.HanoverTagger | |
| # This sys.modules line is critical for pickle compatibility | |
| sys.modules['HanoverTagger'] = HanTa.HanoverTagger | |
| HANTA_AVAILABLE = True | |
| print("β Successfully imported HanTa") | |
| except ImportError: | |
| HANTA_AVAILABLE = False | |
| HanoverTagger = object # Dummy definition | |
| print("="*70) | |
| print("CRITICAL WARNING: `HanTa` library not found.") | |
| print("The 'Word Encyclopedia' tab will NOT function.") | |
| print("Install with: pip install HanTa") | |
| print("="*70) | |
| # --- DWDSmor Import --- | |
| DWDSMOR_AVAILABLE = False | |
| DwdsmorLemmatizerClass = object # Dummy definition | |
| try: | |
| import dwdsmor | |
| import dwdsmor.spacy # Test this import | |
| DWDSMOR_AVAILABLE = True | |
| print("β Successfully imported dwdsmor") | |
| except ImportError as e: | |
| DWDSMOR_AVAILABLE = False | |
| print("="*70) | |
| print(f"WARNING: `dwdsmor` or a dependency failed to import: {e}") | |
| print("The DWDSmor engine will not be available.") | |
| print("On macOS, run: brew install sfst") | |
| print("On Debian/Ubuntu, run: apt-get install sfst") | |
| print("Then, run: pip install dwdsmor") | |
| print("="*70) | |
| # ============================================================================ | |
| # 2. SHARED GLOBALS & CONFIG | |
| # ============================================================================ | |
VERBOSE = True  # Master switch for the debug output below (added for Pattern.de tracing)


def log(msg):
    """Write ``msg`` to stdout with a ``[DEBUG]`` prefix, but only while VERBOSE is on."""
    if not VERBOSE:
        return
    print(f"[DEBUG] {msg}")
# --- Wiktionary Cache & Lock ---
# Local SQLite file name and the Hugging Face repo id it is associated with.
WIKTIONARY_DB_PATH = "de_wiktionary_normalized.db"
WIKTIONARY_REPO_ID = "cstr/de-wiktionary-sqlite-normalized"
WIKTIONARY_CONN: Optional[sqlite3.Connection] = None
WIKTIONARY_CONN_LOCK = threading.Lock()
WIKTIONARY_AVAILABLE = False

# --- ConceptNet Cache & Lock ---
CONCEPTNET_CACHE: Dict[Tuple[str, str], Any] = {}
CONCEPTNET_LOCK = threading.Lock()
# The annotation is quoted on purpose: module-level annotations are evaluated
# at import time, and `Client` is undefined when the guarded `gradio_client`
# import failed (no dummy fallback is defined for it). Unquoted, that would
# raise NameError and defeat the GRADIO_CLIENT_AVAILABLE fallback.
CONCEPTNET_CLIENT: Optional["Client"] = None
CONCEPTNET_CLIENT_LOCK = threading.Lock()

# --- HanTa Tagger Cache & Lock ---
HANTA_TAGGER_INSTANCE: Optional["HanoverTagger"] = None
HANTA_TAGGER_LOCK = threading.Lock()

# --- DWDSmor Cache & Lock ---
DWDSMOR_LEMMATIZER: Optional[Any] = None
DWDSMOR_LEMMATIZER_LOCK = threading.Lock()
| # --- Helper --- | |
| def _html_wrap(content: str, line_height: str = "2.0") -> str: | |
| """Wraps displaCy HTML in a consistent, scrollable div.""" | |
| return f'<div style="overflow-x:auto; border: 1px solid #e6e9ef; border-radius: 0.25rem; padding: 1rem; line-height: {line_height};">{content}</div>' | |
| # --- Helper for SVA --- | |
def _conjugate_to_person_number(verb_lemma: str, person: str, number: str) -> Optional[str]:
    """Conjugate ``verb_lemma`` for the given person and number via pattern.de.

    ``person`` is one of '1', '2', '3' and ``number`` one of 'sg', 'pl'; the two
    are concatenated into the alias handed to ``conjugate``.

    Returns None when pattern.de is unavailable, when the person/number
    combination is not one of the six accepted aliases, or when ``conjugate``
    raises.
    """
    if not PATTERN_DE_AVAILABLE:
        return None
    accepted_aliases = ("1sg", "2sg", "3sg", "1pl", "2pl", "3pl")
    try:
        alias = f"{person}{number}"
        if alias not in accepted_aliases:
            return None
        return conjugate(verb_lemma, alias)
    except Exception:
        return None
| # ============================================================================ | |
| # 3. SPACY ANALYZER LOGIC | |
| # ============================================================================ | |
| # --- Globals & Config for spaCy --- | |
# Maps the model key used by the UI to (display name, package name, loader kind).
# Loader kind "spacy" models are pre-loaded at start-up; "grecy" models are
# loaded on first use. Insertion order is preserved.
SPACY_MODEL_INFO: Dict[str, Tuple[str, str, str]] = {
    **{
        code: (label, package, "spacy")
        for code, label, package in (
            ("de", "German", "de_core_news_md"),
            ("en", "English", "en_core_web_md"),
            ("es", "Spanish", "es_core_news_md"),
        )
    },
    # For every greCy entry the package name is the key with '-' replaced by '_'.
    **{
        key: (label, key.replace("-", "_"), "grecy")
        for key, label in (
            ("grc-proiel-trf", "Ancient Greek (PROIEL TRF)"),
            ("grc-perseus-trf", "Ancient Greek (Perseus TRF)"),
            ("grc_ner_trf", "Ancient Greek (NER TRF)"),
            ("grc-proiel-lg", "Ancient Greek (PROIEL LG)"),
            ("grc-perseus-lg", "Ancient Greek (Perseus LG)"),
            ("grc-proiel-sm", "Ancient Greek (PROIEL SM)"),
            ("grc-perseus-sm", "Ancient Greek (Perseus SM)"),
        )
    },
}
# Localised labels for the spaCy tab, keyed by lower-case UI language code.
# Lookups elsewhere fall back to the "en" entry for unknown codes.
#
# The accented letters below were restored from mis-encoded text (UTF-8 bytes
# that had been decoded with a Greek single-byte code page).
# NOTE(review): the glyph after "# " in each "title" is also a mis-encoding
# remnant, apparently of an emoji; the original character could not be
# recovered, so it is left untouched - please restore it from the source repo.
SPACY_UI_TEXT = {
    "de": {
        "title": "# π Mehrsprachiger Morpho-Syntaktischer Analysator",
        "subtitle": "Analysieren Sie Texte auf Deutsch, Englisch, Spanisch und Altgriechisch",
        "ui_lang_label": "Benutzeroberflächensprache",
        "model_lang_label": "Textsprache für Analyse",
        "input_label": "Text eingeben",
        "input_placeholder": "Geben Sie hier Ihren Text ein...",
        "button_text": "Text analysieren",
        "button_processing_text": "Verarbeitung läuft...",
        "tab_graphic": "Grafische Darstellung",
        "tab_table": "Tabelle",
        "tab_json": "JSON",
        "tab_ner": "Entitäten",
        "html_label": "Abhängigkeitsparsing",
        "table_label": "Morphologische Analyse",
        "table_headers": ["Wort", "Lemma", "POS", "Tag", "Morphologie", "Abhängigkeit"],
        "json_label": "JSON-Ausgabe",
        "ner_label": "Benannte Entitäten",
        "error_message": "Fehler: ",
    },
    "en": {
        "title": "# π Multilingual Morpho-Syntactic Analyzer",
        "subtitle": "Analyze texts in German, English, Spanish, and Ancient Greek",
        "ui_lang_label": "Interface Language",
        "model_lang_label": "Text Language for Analysis",
        "input_label": "Enter Text",
        "input_placeholder": "Enter your text here...",
        "button_text": "Analyze Text",
        "button_processing_text": "Processing...",
        "tab_graphic": "Graphic View",
        "tab_table": "Table",
        "tab_json": "JSON",
        "tab_ner": "Entities",
        "html_label": "Dependency Parsing",
        "table_label": "Morphological Analysis",
        "table_headers": ["Word", "Lemma", "POS", "Tag", "Morphology", "Dependency"],
        "json_label": "JSON Output",
        "ner_label": "Named Entities",
        "error_message": "Error: ",
    },
    "es": {
        "title": "# π Analizador Morfo-Sintáctico Multilingüe",
        "subtitle": "Analice textos en alemán, inglés, español y griego antiguo",
        "ui_lang_label": "Idioma de la Interfaz",
        "model_lang_label": "Idioma del Texto para Análisis",
        "input_label": "Introducir Texto",
        "input_placeholder": "Ingrese su texto aquí...",
        "button_text": "Analizar Texto",
        "button_processing_text": "Procesando...",
        "tab_graphic": "Vista Gráfica",
        "tab_table": "Tabla",
        "tab_json": "JSON",
        "tab_ner": "Entidades",
        "html_label": "Análisis de Dependencias",
        "table_label": "Análisis Morfológico",
        "table_headers": ["Palabra", "Lema", "POS", "Etiqueta", "Morfología", "Dependencia"],
        "json_label": "Salida JSON",
        "ner_label": "Entidades Nombradas",
        "error_message": "Error: ",
    },
}

# Cache of loaded pipelines keyed by model key; None means "not loaded yet".
SPACY_MODELS: Dict[str, Optional["spacy.Language"]] = {}
| # --- Dependency Installation --- | |
def spacy_install_spacy_transformers_once():
    """Pip-install ``spacy-transformers`` (needed by the ``_trf`` models) at most once.

    A marker file in the current working directory records a successful
    install; when it exists, no pip call is made.

    Returns True when the marker exists or the install succeeded, else False.
    """
    sentinel = Path(".spacy_transformers_installed")
    if sentinel.exists():
        print("β spacy-transformers already installed (marker found)")
        return True

    print("Installing spacy-transformers (for _trf models)...")
    pip_command = [sys.executable, "-m", "pip", "install", "spacy-transformers"]
    try:
        subprocess.run(pip_command, capture_output=True, text=True, check=True, timeout=900)
        print("β Successfully installed spacy-transformers")
        sentinel.touch()
        return True
    except Exception as exc:
        print(f"β FAILED to install spacy-transformers: {exc}")
        # subprocess errors carry the captured streams; other exceptions do not.
        if hasattr(exc, 'stdout'):
            print(f"STDOUT: {exc.stdout}")
        if hasattr(exc, 'stderr'):
            print(f"STDERR: {exc.stderr}")
        return False
def spacy_install_grecy_model_from_github(model_name: str) -> bool:
    """Pip-install the wheel of a greCy model from the GitHub release page.

    A marker file ``.<model_name>_installed`` in the current working directory
    short-circuits repeated installs. Unknown model names are rejected before
    any subprocess is started.

    Returns True when the marker exists or pip succeeded, otherwise False.
    """
    sentinel = Path(f".{model_name}_installed")
    if sentinel.exists():
        print(f"β {model_name} already installed (marker found)")
        return True

    print(f"Installing grecy model: {model_name}...")
    # Version component of each published wheel's file name.
    wheel_versions = {
        "grc_proiel_trf": "3.7.5",
        "grc_perseus_trf": "0.0.0",
        "grc_proiel_lg": "0.0.0",
        "grc_perseus_lg": "0.0.0",
        "grc_proiel_sm": "0.0.0",
        "grc_perseus_sm": "0.0.0",
        "grc_ner_trf": "0.0.0",
    }
    version = wheel_versions.get(model_name)
    if version is None:
        print(f"β Unknown grecy model: {model_name}")
        return False
    wheel_filename = f"{model_name}-{version}-py3-none-any.whl"

    install_url = f"https://github.com/CrispStrobe/greCy/releases/download/v1.0-models/{wheel_filename}"
    cmd = [sys.executable, "-m", "pip", "install", install_url, "--no-deps"]
    print(f"Running: {' '.join(cmd)}")
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=900)
        if completed.stdout:
            print("STDOUT:", completed.stdout)
        if completed.stderr:
            print("STDERR:", completed.stderr)
        print(f"β Successfully installed {model_name} from GitHub")
        sentinel.touch()
        return True
    except subprocess.CalledProcessError as pip_err:
        # pip ran but exited non-zero: show what it printed.
        print(f"β Installation subprocess FAILED with code {pip_err.returncode}")
        print("STDOUT:", pip_err.stdout)
        print("STDERR:", pip_err.stderr)
        return False
    except Exception as exc:
        print(f"β Installation exception: {exc}")
        traceback.print_exc()
        return False
| # --- Model Loading (Lazy Loading) --- | |
def spacy_load_spacy_model(model_name: str) -> Optional[spacy.Language]:
    """Load a standard spaCy model, downloading it first when it is missing.

    Args:
        model_name: spaCy package name, e.g. ``"de_core_news_md"``.

    Returns:
        The loaded pipeline, or None when the download or the load after it
        failed (the error is printed).
    """
    try:
        return spacy.load(model_name)
    except OSError:
        # spacy.load raises OSError when the package cannot be found.
        print(f"Installing {model_name}...")
    try:
        subprocess.check_call([sys.executable, "-m", "spacy", "download", model_name])
        # The package was installed after this interpreter started, so the
        # import system's finder caches must be refreshed before it can be
        # found - the greCy loader does the same after its pip install.
        importlib.invalidate_caches()
        return spacy.load(model_name)
    except Exception as e:
        print(f"β Failed to install {model_name}: {e}")
        if hasattr(e, 'stderr'):
            print(f"STDERR: {e.stderr}")
        return None
def spacy_load_grecy_model(model_name: str) -> Optional[spacy.Language]:
    """Make sure a greCy model wheel is installed, then load it with spaCy.

    Returns the loaded pipeline, or None when installation or loading failed
    (details are printed).
    """
    installed = spacy_install_grecy_model_from_github(model_name)
    if not installed:
        print(f"β Cannot load {model_name} because installation failed.")
        return None

    try:
        # The wheel may have been installed moments ago inside this process.
        print("Refreshing importlib to find new package...")
        importlib.invalidate_caches()
        try:
            importlib.reload(site)
        except Exception:
            pass  # best effort; loading is attempted regardless
        print(f"Trying: spacy.load('{model_name}')")
        pipeline = spacy.load(model_name)
        print(f"β Successfully loaded {model_name}")
        return pipeline
    except Exception as exc:
        print(f"β Model {model_name} is installed but FAILED to load.")
        print(f" Error: {exc}")
        traceback.print_exc()
        return None
def spacy_initialize_models():
    """Eagerly load the standard spaCy models and register the rest for lazy loading.

    Also triggers the one-time ``spacy-transformers`` install. Every key of
    SPACY_MODEL_INFO ends up in SPACY_MODELS: standard models map to the loaded
    pipeline (or None on failure), all other model types map to None until
    first use.
    """
    banner = "=" * 70
    print("\n" + banner)
    print("INITIALIZING SPACY MODELS")
    print(banner + "\n")

    spacy_install_spacy_transformers_once()

    ready = 0
    attempted = 0
    for lang_code, (lang_name, model_name, model_type) in SPACY_MODEL_INFO.items():
        if model_type != "spacy":
            # Deferred: loaded on demand by the analysis function.
            print(f"β {lang_name} ({model_name}) will be loaded on first use.\n")
            SPACY_MODELS[lang_code] = None
            continue

        attempted += 1
        print(f"Loading {lang_name} ({model_name})...")
        pipeline = spacy_load_spacy_model(model_name)
        SPACY_MODELS[lang_code] = pipeline
        if pipeline:
            print(f"β {lang_name} ready\n")
            ready += 1
        else:
            print(f"β {lang_name} FAILED\n")

    print(f"Pre-loaded {ready}/{attempted} standard models.")
    print(banner + "\n")
| # --- Analysis Logic --- | |
def spacy_get_analysis(ui_lang: str, model_lang_key: str, text: str):
    """Run the selected spaCy pipeline on ``text`` and build every output of the spaCy tab.

    Args:
        ui_lang: UI language code used to pick button/error labels; unknown or
            missing values fall back to English.
        model_lang_key: Key into SPACY_MODEL_INFO / SPACY_MODELS. Models not yet
            cached are loaded (and cached) on first use.
        text: The text to analyse.

    Returns:
        A 5-tuple ``(table_rows, json_data, dependency_html, ner_html, button)``.
        On failure ``table_rows`` is empty, ``json_data`` is ``{"error": ...}``
        and both HTML slots carry the error message. ``button`` is always a
        re-enabled ``gr.Button`` with the localised caption.
    """
    import html  # local import so this block needs no new file-level dependency

    # Tolerate a missing UI language instead of failing outside the try block.
    ui_config = SPACY_UI_TEXT.get((ui_lang or "en").lower(), SPACY_UI_TEXT["en"])
    error_prefix = ui_config["error_message"]

    def _reset_button():
        return gr.Button(value=ui_config["button_text"], interactive=True)

    try:
        if not text or not text.strip():
            return ([], [], "<p style='color: orange;'>No text provided.</p>", "<p>No text provided.</p>",
                    _reset_button())

        nlp = SPACY_MODELS.get(model_lang_key)
        if nlp is None:
            print(f"First use of {model_lang_key}. Loading model...")
            if model_lang_key not in SPACY_MODEL_INFO:
                raise ValueError(f"Unknown model key: {model_lang_key}")
            _, model_name, model_type = SPACY_MODEL_INFO[model_lang_key]
            if model_type == "grecy":
                nlp = spacy_load_grecy_model(model_name)
            else:
                nlp = spacy_load_spacy_model(model_name)
            if nlp is None:
                # Drop the placeholder so the next request retries the load.
                SPACY_MODELS.pop(model_lang_key, None)
                err_msg = f"Model for {model_lang_key} ({model_name}) FAILED to load. Check logs."
                err_html = f"<p style='color: red;'>{html.escape(err_msg)}</p>"
                return ([], {"error": err_msg}, err_html, err_html, _reset_button())
            SPACY_MODELS[model_lang_key] = nlp
            print(f"β {model_lang_key} is now loaded and cached.")

        doc = nlp(text)

        # `Doc.is_parsed` is deprecated in spaCy v3; `has_annotation("DEP")` is
        # its documented replacement. It is a per-document fact, so compute it
        # once rather than once per token.
        has_dependencies = doc.has_annotation("DEP")

        dataframe_output = []
        json_output = []
        for token in doc:
            lemma_str = token.lemma_
            morph_str = str(token.morph) if token.morph else ''
            dep_str = token.dep_ if has_dependencies else ''
            tag_str = token.tag_ or ''
            pos_str = token.pos_ or ''
            json_output.append({
                "word": token.text, "lemma": lemma_str, "pos": pos_str,
                "tag": tag_str, "morphology": morph_str, "dependency": dep_str,
                "is_stopword": token.is_stop
            })
            dataframe_output.append([token.text, lemma_str, pos_str, tag_str, morph_str, dep_str])

        # --- Dependency visualisation ---
        if "parser" in nlp.pipe_names and has_dependencies:
            try:
                options = {"compact": True, "bg": "#ffffff", "color": "#000000", "font": "Source Sans Pro"}
                html_svg = displacy.render(doc, style="dep", jupyter=False, options=options)
                html_dep_out = _html_wrap(html_svg, line_height="2.5")
            except Exception as e:
                html_dep_out = f"<p style='color: orange;'>Visualization error (DEP): {html.escape(str(e))}</p>"
        else:
            html_dep_out = "<p style='color: orange;'>Dependency parsing ('parser') not available or doc not parsed.</p>"

        # --- Named-entity visualisation ---
        if "ner" not in nlp.pipe_names:
            html_ner_out = "<p style='color: orange;'>Named Entity Recognition ('ner') not available for this model.</p>"
        elif not doc.ents:
            html_ner_out = "<p>No named entities found in this text.</p>"
        else:
            try:
                html_ner = displacy.render(doc, style="ent", jupyter=False)
                html_ner_out = _html_wrap(html_ner, line_height="2.5")
            except Exception as e:
                html_ner_out = f"<p style='color: orange;'>Visualization error (NER): {html.escape(str(e))}</p>"

        return (dataframe_output, json_output, html_dep_out, html_ner_out, _reset_button())
    except Exception as e:
        traceback.print_exc()
        # The message can echo user-supplied values (e.g. the model key), so it
        # is escaped before being embedded in HTML.
        error_html = f"<div style='color: red; border: 1px solid red; padding: 10px; border-radius: 5px; background-color: #fff5f5;'><strong>{error_prefix}</strong> {html.escape(str(e))}</div>"
        return ([], {"error": str(e)}, error_html, error_html, _reset_button())
| # --- UI Update Logic --- | |
def spacy_update_ui(ui_lang: str):
    """Build the ``gr.update`` objects that re-label the spaCy tab in the chosen UI language.

    Unknown language codes fall back to English. The list has 14 entries in a
    fixed order: title, subtitle, UI-language label, model-language label,
    input box, button, the four tab labels, dependency HTML label, table
    (label and headers), JSON label, NER label.
    """
    cfg = SPACY_UI_TEXT.get(ui_lang.lower(), SPACY_UI_TEXT["en"])

    def relabel(*keys):
        # One label-only update per config key, in the order given.
        return [gr.update(label=cfg[key]) for key in keys]

    updates = [gr.update(value=cfg[key]) for key in ("title", "subtitle")]
    updates += relabel("ui_lang_label", "model_lang_label")
    updates.append(gr.update(label=cfg["input_label"], placeholder=cfg["input_placeholder"]))
    updates.append(gr.update(value=cfg["button_text"]))
    updates += relabel("tab_graphic", "tab_table", "tab_json", "tab_ner", "html_label")
    updates.append(gr.update(label=cfg["table_label"], headers=cfg["table_headers"]))
    updates += relabel("json_label", "ner_label")
    return updates
| # ============================================================================ | |
| # 3b. IWNLP PIPELINE (NEW) | |
| # ============================================================================ | |
# Lazily created singleton pipeline and the lock guarding its creation.
IWNLP_PIPELINE: Optional["spacy.Language"] = None
IWNLP_LOCK = threading.Lock()

# Locations of the IWNLP lemmatizer data (paths are relative to the working directory).
DATA_DIR = "data"
LEMMATIZER_JSON_NAME = "IWNLP.Lemmatizer_20181001.json"
LEMMATIZER_JSON_PATH = os.path.join(DATA_DIR, LEMMATIZER_JSON_NAME)
LEMMATIZER_ZIP_URL = "https://dbs.cs.uni-duesseldorf.de/datasets/iwnlp/IWNLP.Lemmatizer_20181001.zip"
LEMMATIZER_ZIP_PATH = os.path.join(DATA_DIR, "IWNLP.Lemmatizer_20181001.zip")


def iwnlp_download_and_unzip_data():
    """Ensure the IWNLP lemmatizer JSON exists, downloading and unzipping it if needed.

    Steps: return immediately when the JSON is present; otherwise download the
    zip (unless one is already on disk) and extract the single JSON member into
    DATA_DIR.

    The download is streamed into a ``.part`` file and renamed only after it
    completed, so an interrupted download can never be mistaken for a finished
    zip on the next run. A zip that turns out to be corrupt is deleted so that
    the next call downloads it again.

    Returns:
        True when the JSON file is available, False on any failure (the error
        and traceback are printed).
    """
    if os.path.exists(LEMMATIZER_JSON_PATH):
        print("β IWNLP data file already exists.")
        return True

    # --- File not found, must download and unzip ---
    try:
        os.makedirs(DATA_DIR, exist_ok=True)

        # 1. Download the ZIP file if it is not already here
        if not os.path.exists(LEMMATIZER_ZIP_PATH):
            print(f"IWNLP data not found. Downloading from {LEMMATIZER_ZIP_URL}...")
            partial_path = LEMMATIZER_ZIP_PATH + ".part"
            try:
                # (connect, read) timeouts: without them a stalled server
                # would block this call - and app start-up - indefinitely.
                with requests.get(LEMMATIZER_ZIP_URL, stream=True, timeout=(10, 60)) as r:
                    r.raise_for_status()
                    with open(partial_path, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                os.replace(partial_path, LEMMATIZER_ZIP_PATH)
            finally:
                # Only still present when the download failed part-way.
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            print("β IWNLP Download complete.")
        else:
            print("β IWNLP zip file already present.")

        # 2. Unzip the file
        print(f"Unzipping '{LEMMATIZER_ZIP_PATH}'...")
        try:
            with zipfile.ZipFile(LEMMATIZER_ZIP_PATH, 'r') as zip_ref:
                # Extract only the file we need into the data directory
                zip_ref.extract(LEMMATIZER_JSON_NAME, path=DATA_DIR)
        except zipfile.BadZipFile:
            # A broken archive would otherwise be reused forever.
            try:
                os.remove(LEMMATIZER_ZIP_PATH)
            except OSError:
                pass
            raise
        print(f"β Unzip complete. File extracted to {LEMMATIZER_JSON_PATH}")

        if not os.path.exists(LEMMATIZER_JSON_PATH):
            raise Exception("Unzip appeared to succeed, but the .json file is still missing.")
        return True
    except Exception as e:
        print(f"β CRITICAL: Failed to download or unzip IWNLP data: {e}")
        traceback.print_exc()
        return False
def iwnlp_get_pipeline() -> Optional[spacy.Language]:
    """Return the shared German spaCy pipeline with the ``iwnlp`` component attached.

    The pipeline is built on first call: the IWNLP data file is fetched if
    necessary, the cached (or freshly loaded) ``de_core_news_md`` model is
    taken from / stored into SPACY_MODELS["de"], and the ``iwnlp`` pipe is
    added to it. The cached instance is checked once without the lock and
    again after acquiring IWNLP_LOCK.

    Returns:
        The pipeline, or None when initialisation failed (error is printed).

    Raises:
        ImportError: when the spacy-iwnlp library is not installed.
    """
    global IWNLP_PIPELINE
    if not IWNLP_AVAILABLE:
        raise ImportError("spacy-iwnlp library is not installed.")

    if IWNLP_PIPELINE:
        return IWNLP_PIPELINE

    with IWNLP_LOCK:
        # Another caller may have finished initialisation while we waited.
        if IWNLP_PIPELINE:
            return IWNLP_PIPELINE
        try:
            print("Initializing spaCy-IWNLP pipeline...")

            # --- 1. Ensure data file exists ---
            data_ready = iwnlp_download_and_unzip_data()
            if not data_ready:
                return None

            # --- 2. Reuse the cached German model or load it now ---
            print("Loading 'de_core_news_md' for IWNLP...")
            german_nlp = SPACY_MODELS.get("de") or spacy_load_spacy_model("de_core_news_md")
            if not german_nlp:
                raise Exception("Failed to load 'de_core_news_md' for IWNLP.")
            SPACY_MODELS["de"] = german_nlp

            # --- 3. Attach the IWNLP component (string-name add_pipe with config) ---
            if german_nlp.has_pipe("iwnlp"):
                print("β IWNLP pipe already present.")
            else:
                german_nlp.add_pipe('iwnlp', config={'lemmatizer_path': LEMMATIZER_JSON_PATH})
                print("β IWNLP pipe added to 'de' model.")

            IWNLP_PIPELINE = german_nlp
            return IWNLP_PIPELINE
        except Exception as exc:
            print(f"CRITICAL ERROR: Failed to initialize IWNLP pipeline: {exc}")
            traceback.print_exc()
            return None
| # ============================================================================ | |
| # 4. LANGUAGETOOL LOGIC | |
| # ============================================================================ | |
| # --- Globals for LanguageTool --- | |
# The annotations below are quoted on purpose: unquoted they are evaluated at
# import / function-definition time and raise NameError when the guarded
# `import language_tool_python` failed, which would defeat the LT_AVAILABLE
# fallback and crash the whole app at start-up.
LT_TOOL_INSTANCE: Optional["language_tool_python.LanguageTool"] = None
LT_TOOL_LOCK = threading.Lock()


def lt_get_language_tool() -> Optional["language_tool_python.LanguageTool"]:
    """Return the shared German (de-DE) LanguageTool instance, creating it on first use.

    The cached instance is checked once without the lock and again after
    acquiring LT_TOOL_LOCK. A short test sentence is checked before the
    instance is cached.

    Returns:
        The LanguageTool instance, or None when initialisation failed (the
        error is printed).

    Raises:
        ImportError: when the language-tool-python library is not installed.
    """
    global LT_TOOL_INSTANCE
    if not LT_AVAILABLE:
        raise ImportError("language-tool-python library is not installed.")
    if LT_TOOL_INSTANCE:
        return LT_TOOL_INSTANCE
    with LT_TOOL_LOCK:
        # Another caller may have finished initialisation while we waited.
        if LT_TOOL_INSTANCE:
            return LT_TOOL_INSTANCE
        try:
            print("Initializing LanguageTool for German (de-DE)...")
            tool = language_tool_python.LanguageTool('de-DE')
            try:
                tool.picky = True
            except Exception:
                pass  # best effort: continue with the default rule level
            # Smoke test so that a broken backend is detected here, not later.
            _ = tool.check("Dies ist ein Test.")
            print("LanguageTool (local server) initialized successfully.")
            LT_TOOL_INSTANCE = tool
            return LT_TOOL_INSTANCE
        except Exception as e:
            print(f"CRITICAL ERROR: Failed to initialize LanguageTool: {e}")
            return None
| # --- Grammar Checking Logic --- | |
def lt_check_grammar(text: str) -> List[Dict[str, Any]]:
    """Check German ``text`` with LanguageTool and return the findings as JSON-ready dicts.

    Returns:
        A list with one dict per match (message, rule id, category, the
        offending text, replacements, offset, length, context, short message).
        Special cases are reported as a single-element list: ``{"error": ...}``
        when the tool is unavailable or an exception occurred, ``{"info": ...}``
        for empty input or a text without findings.
    """
    try:
        tool = lt_get_language_tool()
        if tool is None:
            return [{"error": "LanguageTool service failed to initialize."}]
        if not text or not text.strip():
            return [{"info": "No text provided to check."}]

        print(f"Checking text: {text}")
        matches = tool.check(text)

        # Retry in picky mode only if the shared tool is not already picky:
        # the singleton is normally switched to picky at creation, and
        # repeating the identical check would just double the work for every
        # clean text.
        if not matches and not getattr(tool, "picky", False):
            try:
                tool.picky = True
                matches = tool.check(text)
            except Exception as retry_err:
                # Keep the (empty) first result, but do not hide the failure.
                print(f"Picky re-check failed: {retry_err}")

        if not matches:
            return [{"info": "No errors found!", "status": "perfect"}]

        errors_list = [
            {
                "message": match.message,
                "rule_id": match.ruleId,
                # category may be an object with a .name or already a plain value
                "category": getattr(match.category, 'name', match.category),
                "incorrect_text": text[match.offset : match.offset + match.errorLength],
                "replacements": match.replacements,
                "offset": match.offset,
                "length": match.errorLength,
                "context": getattr(match, "context", None),
                "short_message": getattr(match, "shortMessage", None),
            }
            for match in matches
        ]
        print(f"Found {len(errors_list)} errors.")
        return errors_list
    except Exception as e:
        traceback.print_exc()
        return [{"error": f"An unexpected error occurred: {str(e)}"}]
| # ============================================================================ | |
| # 5. ODENET THESAURUS LOGIC | |
| # ============================================================================ | |
| # --- Globals & Classes for OdeNet --- | |
@dataclass
class OdeNetWorkItem:
    """A single thesaurus lookup request handed to the OdeNet worker thread.

    The class previously declared only annotations, so it had no constructor
    taking the fields. As a dataclass it can be built positionally or by
    keyword; both fields have defaults, so argument-less construction followed
    by attribute assignment keeps working as well.
    """

    # The word to look up.
    word: str = ""
    # Queue on which the worker posts ("success", result) or ("error", message).
    response_queue: Optional[queue.Queue] = None

    def __post_init__(self) -> None:
        # Give every item its own reply queue unless the caller supplied one.
        if self.response_queue is None:
            self.response_queue = queue.Queue()
class OdeNetWorkerState(Enum):
    """Lifecycle states of the OdeNet background worker."""

    NOT_STARTED = 1   # initial value; no worker thread has been launched
    INITIALIZING = 2  # set by the worker before it prepares the WordNet data
    READY = 3         # set once the WordNet instance has been created
    ERROR = 4         # set when start-up or the worker loop failed


# Module-level worker state; the worker functions rebind these via `global`.
odenet_worker_state = OdeNetWorkerState.NOT_STARTED
odenet_worker_thread = None          # the worker thread, once started
odenet_work_queue = queue.Queue()    # pending lookup requests
odenet_de_wn = None                  # the WordNet instance, created by the worker
| # --- Worker Thread Logic --- | |
def odenet_download_wordnet_data():
    """Fetch the OdeNet and CILI resources through ``wn`` (invoked by the worker thread).

    A failure of an individual download is only printed as a note and does not
    abort the function.

    Returns:
        False when the ``wn`` library is unavailable or an unexpected error
        escapes the download loop; True otherwise.
    """
    if not WN_AVAILABLE:
        print("[OdeNet Worker] 'wn' library not available. Skipping download.")
        return False

    try:
        print("[OdeNet Worker] Downloading WordNet data...")
        for label, resource in (("odenet", 'odenet:1.4'), ("cili", 'cili:1.0')):
            try:
                wn.download(resource)
            except Exception as note:
                print(f"[OdeNet Worker] Note: {label} download: {note}")
        print("[OdeNet Worker] β WordNet data ready")
        return True
    except Exception as exc:
        print(f"[OdeNet Worker] β Failed to download WordNet data: {exc}")
        return False
def odenet_worker_loop():
    """Body of the OdeNet worker thread: initialise WordNet, then serve lookups forever.

    Updates ``odenet_worker_state`` as it progresses (INITIALIZING, then READY,
    or ERROR on failure) and stores the WordNet instance in ``odenet_de_wn``.
    Each queued item is answered on its own response queue with either
    ``("success", result)`` or ``("error", message)``.
    """
    global odenet_worker_state, odenet_de_wn

    if not WN_AVAILABLE:
        print("[OdeNet Worker] 'wn' library not available. Worker cannot start.")
        odenet_worker_state = OdeNetWorkerState.ERROR
        return

    try:
        print("[OdeNet Worker] Starting worker thread...")
        odenet_worker_state = OdeNetWorkerState.INITIALIZING

        data_ok = odenet_download_wordnet_data()
        if not data_ok:
            odenet_worker_state = OdeNetWorkerState.ERROR
            print("[OdeNet Worker] Failed to initialize")
            return

        print("[OdeNet Worker] Creating WordNet instance...")
        odenet_de_wn = wn.Wordnet('odenet:1.4')
        odenet_worker_state = OdeNetWorkerState.READY
        print("[OdeNet Worker] Ready to process requests")

        while True:
            # Wake up once per second when idle rather than blocking forever.
            try:
                request: OdeNetWorkItem = odenet_work_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                answer = odenet_process_word_lookup(request.word)
                request.response_queue.put(("success", answer))
            except Exception as lookup_err:
                traceback.print_exc()
                request.response_queue.put(("error", str(lookup_err)))
            finally:
                # Always mark the item done, whether the lookup worked or not.
                odenet_work_queue.task_done()
    except Exception as fatal:
        print(f"[OdeNet Worker] Fatal error: {fatal}")
        traceback.print_exc()
        odenet_worker_state = OdeNetWorkerState.ERROR
def _odenet_collect_lemmas(synsets, exclude: Optional[str] = None) -> List[str]:
    """Return the sorted, de-duplicated lemmas of ``synsets``.

    When ``exclude`` is given, lemmas equal to it ignoring case are dropped.
    """
    excluded_key = exclude.casefold() if exclude is not None else None
    collected: Set[str] = set()
    for synset in synsets:
        for lemma in synset.lemmas():
            if excluded_key is not None and str(lemma).casefold() == excluded_key:
                continue
            collected.add(lemma)
    return sorted(collected)


def odenet_process_word_lookup(word: str) -> Dict[str, Any]:
    """Look up ``word`` in OdeNet and describe each of its senses. Runs in the worker thread.

    Args:
        word: The word to look up; surrounding whitespace is stripped and the
            word is lower-cased before the query.

    Returns:
        ``{"info": ...}`` for empty input or an unknown word, otherwise
        ``{"input_word": ..., "senses": [...]}`` where every sense holds its
        part of speech, definition, synonyms, antonyms and the lemmas of its
        hypernyms, hyponyms, holonyms and meronyms.

    Raises:
        RuntimeError: when the WordNet instance has not been created yet.
    """
    if not word or not word.strip():
        return {"info": "No word provided to check."}
    if odenet_de_wn is None:
        # Clearer than the AttributeError on None this situation used to cause.
        raise RuntimeError("OdeNet WordNet instance is not initialized.")

    word = word.strip().lower()
    senses = odenet_de_wn.senses(word)
    if not senses:
        return {"info": f"The word '{word}' was not found in the thesaurus."}

    results: Dict[str, Any] = {"input_word": word, "senses": []}
    for sense in senses:
        synset = sense.synset()

        antonym_words: Set[str] = set()
        try:
            for ant_sense in sense.get_related('antonym'):
                antonym_words.add(ant_sense.word().lemma())
        except Exception:
            pass  # best effort: antonyms are optional extra information

        results["senses"].append({
            "pos": synset.pos,
            "definition": synset.definition() or "No definition available.",
            # The query was lower-cased while lemmas keep their original case
            # (German nouns are capitalised), so the query word has to be
            # excluded case-insensitively or it shows up as its own synonym.
            "synonyms": _odenet_collect_lemmas([synset], exclude=word),
            "antonyms": sorted(antonym_words),
            "hypernyms (is a type of)": _odenet_collect_lemmas(synset.hypernyms()),
            "hyponyms (examples are)": _odenet_collect_lemmas(synset.hyponyms()),
            "holonyms (is part of)": _odenet_collect_lemmas(synset.holonyms()),
            "meronyms (has parts)": _odenet_collect_lemmas(synset.meronyms()),
        })

    print(f"[OdeNet Worker] Found {len(results['senses'])} senses for '{word}'")
    return results
def odenet_start_worker():
    """Launch the OdeNet worker thread once and wait for it to come up.

    Does nothing if the worker state is anything other than NOT_STARTED.
    Marks the state as ERROR and returns if the 'wn' library is missing.
    Otherwise starts a daemon thread and polls the state for up to 30 seconds.

    Raises:
        Exception: if the worker is not READY after the polling loop ends.
    """
    global odenet_worker_thread, odenet_worker_state

    if odenet_worker_state != OdeNetWorkerState.NOT_STARTED:
        return

    if not WN_AVAILABLE:
        print("[OdeNet] 'wn' library not available. Worker will not be started.")
        odenet_worker_state = OdeNetWorkerState.ERROR
        return

    odenet_worker_thread = threading.Thread(
        target=odenet_worker_loop,
        daemon=True,
        name="OdeNetWorker",
    )
    odenet_worker_thread.start()

    # Poll in 0.1 s steps; 30 s * 10 polls per second.
    timeout = 30
    polls_left = timeout * 10
    terminal_states = (OdeNetWorkerState.READY, OdeNetWorkerState.ERROR)
    while polls_left > 0 and odenet_worker_state not in terminal_states:
        threading.Event().wait(0.1)
        polls_left -= 1

    if odenet_worker_state != OdeNetWorkerState.READY:
        raise Exception("OdeNet Worker failed to initialize")
| # --- Public API (Called by Gradio) --- | |
def odenet_get_thesaurus_info(word: str) -> Dict[str, Any]:
    """Public API: request thesaurus info for a German word from the worker.

    The request is placed on ``odenet_work_queue`` and the answer is awaited
    on a private reply queue for up to 30 seconds.

    Returns:
        The worker's result dict on success, or ``{"error": ...}`` when the
        library is missing, the worker is not ready, the lookup failed, the
        request timed out, or an unexpected exception occurred.
    """
    if not WN_AVAILABLE:
        return {"error": "WordNet (wn) library is not available."}

    if odenet_worker_state != OdeNetWorkerState.READY:
        return {"error": "WordNet service is not ready yet. Please try again in a moment."}

    try:
        reply_box = queue.Queue()
        odenet_work_queue.put(OdeNetWorkItem(word=word, response_queue=reply_box))

        try:
            status, payload = reply_box.get(timeout=30)
        except queue.Empty:
            return {"error": "Request timed out"}

        if status == "success":
            return payload
        return {"error": f"Lookup failed: {payload}"}
    except Exception as exc:
        traceback.print_exc()
        return {"error": f"An unexpected error occurred: {str(exc)}"}
| # ============================================================================ | |
| # 6. PATTERN INFLECTION LOGIC | |
| # ============================================================================ | |
| # --- Word Type Detection --- | |
def pattern_detect_word_type(word: str) -> Dict[str, Any]:
    """Ask pattern.de's parser for a part-of-speech hint for a single word.

    Returns:
        A dict with keys 'pos', 'lemma' and 'type'. 'type' is 'noun', 'verb'
        or 'adjective' when the tag starts with NN, VB or JJ; None otherwise.
        It is 'unknown' when the library is unavailable or the input is
        blank or consists only of punctuation.
    """
    unknown = {'pos': None, 'lemma': word, 'type': 'unknown'}
    if not PATTERN_DE_AVAILABLE:
        return unknown

    if not word or not word.strip() or all(ch in ".,;:!?()[]{}-ββ'.../\|" for ch in word):
        return unknown

    word_norm = word.strip()
    log(f"Detecting type for: {word_norm}")

    # Tags with these prefixes are treated as function words / punctuation.
    non_content_prefixes = ("DT","ART","IN","APPR","APPRART","APPO","APZR","PTK","PRP","PPER","PPOS","PDS","PIS","KOUI","KON","$,","$.")
    type_by_prefix = (('NN', 'noun'), ('VB', 'verb'), ('JJ', 'adjective'))

    parser_result = {'pos': None, 'lemma': word_norm, 'type': None}
    try:
        parsed = parse(word_norm, lemmata=True)
        for sentence in split(parsed):
            if not (hasattr(sentence, "words") and sentence.words):
                continue

            first = sentence.words[0]
            w_type = getattr(first, "type", None) or getattr(first, "pos", None)
            w_lemma = (getattr(first, "lemma", None) or word_norm)

            if w_type and w_type.startswith(non_content_prefixes):
                return {'pos': w_type, 'lemma': w_lemma, 'type': None}

            parser_result['pos'] = w_type or ""
            parser_result['lemma'] = w_lemma

            if w_type:
                for prefix, label in type_by_prefix:
                    if w_type.startswith(prefix):
                        parser_result['type'] = label
                        break

            log(f" Parser says: POS={w_type}, lemma={w_lemma}, type={parser_result['type']}")
    except Exception as exc:
        log(f" Parser failed: {exc}")

    return parser_result
def pattern_is_good_analysis(analysis, analysis_type):
    """Decide whether a pattern.de analysis contains meaningful data.

    Args:
        analysis: Result dict of one of the ``pattern_analyze_as_*`` helpers
            (may be None or empty).
        analysis_type: 'noun', 'verb' or 'adjective'; any other value is
            rejected.

    Returns:
        True when the analysis passes the plausibility checks for its type.
    """
    if not analysis:
        return False

    if analysis_type == 'noun':
        # Either the flat declension table or the per-gender table must be filled.
        flat_table = analysis.get('declension', {})
        per_gender = analysis.get('declension_by_gender', {})
        return len(flat_table) >= 4 or len(per_gender) > 0

    if analysis_type == 'verb':
        present = analysis.get('conjugation', {}).get('PrΓ€sens', {})
        if len(present) < 4:
            return False
        # A real verb has at least two distinct present-tense forms.
        return len(set(present.values())) >= 2

    if analysis_type == 'adjective':
        if len(analysis.get('attributive', {})) == 0:
            log(" β Not a good adjective: No attributive forms.")
            return False

        predicative_form = analysis.get('predicative', '')
        comparative_form = analysis.get('comparative', '')
        superlative_form = analysis.get('superlative', '')

        if not predicative_form:
            log(" β Not a good adjective: No predicative form.")
            return False

        # Reject nonsense such as "lauf" -> "laufer"/"laufst": comparatives
        # must end in -er, superlatives in -st or -est.
        if comparative_form and not comparative_form.endswith("er"):
            log(f" β Not a good adjective: Comparative '{comparative_form}' doesn't end in -er.")
            return False

        if superlative_form and not superlative_form.endswith(("st", "est")):
            log(f" β Not a good adjective: Superlative '{superlative_form}' doesn't end in -st/-est.")
            return False

        return True

    return False
| # --- Inflection Generators --- | |
def pattern_analyze_as_noun(word: str, hint_lemma: str = None) -> Dict[str, Any]:
    """Build a declension analysis of ``word`` treated as a noun.

    Args:
        word: The surface form to analyse.
        hint_lemma: Optional lemma suggested by the parser; used as the base
            form only when singularize/pluralize leave the word unchanged.

    Returns:
        A dict with 'base_form', 'plural', 'singular' and
        'declension_by_gender'. When exactly one gender was determined, it
        also carries 'declension' and 'gender'.
    """
    log(f" Analyzing as noun (hint_lemma={hint_lemma})")
    analysis = {}

    singular = singularize(word)
    plural = pluralize(word)
    log(f" singularize({word}) = {singular}")
    log(f" pluralize({word}) = {plural}")

    # Pick the base (singular) form.
    singular_differs = singular != word
    if singular_differs and plural != word:
        base = word
        log(f" Word changes when pluralized => base = {base}")
    elif singular_differs:
        base = singular
        log(f" Word changes when singularized => base = {base}")
    elif hint_lemma and hint_lemma != word:
        base = hint_lemma
        log(f" Using hint lemma => base = {base}")
    else:
        # Valid case, e.g. "Lauf" (already singular).
        base = word
        log(f" Word is already base form => base = {base}")

    detected = gender(base, pos=NOUN)
    log(f" gender({base}) = {detected}")

    # A tuple means the gender is ambiguous (e.g. der/das See).
    if isinstance(detected, tuple):
        genders = list(detected)
        log(f" Detected ambiguous gender: {genders}")
    elif detected is None:
        genders = [MALE]  # Default
        log(f" Gender unknown, defaulting to MALE")
    else:
        genders = [detected]

    analysis["base_form"] = base
    analysis["plural"] = pluralize(base)
    analysis["singular"] = base
    analysis["declension_by_gender"] = {}

    gender_labels = {MALE: "Masculine", FEMALE: "Feminine", NEUTRAL: "Neuter"}
    numbers = [(SINGULAR, "Singular"), (PLURAL, "Plural")]
    cases = [(NOMINATIVE, "Nominativ"), (ACCUSATIVE, "Akkusativ"),
             (DATIVE, "Dativ"), (GENITIVE, "Genitiv")]

    for gen in genders:
        gender_str = gender_labels.get(gen, "Unknown")
        table = {}
        for number, number_name in numbers:
            if number == SINGULAR:
                word_form = base
                gender_for_article = gen
            else:
                word_form = pluralize(base)
                gender_for_article = PLURAL
            word_form_cap = word_form.capitalize()

            for case, case_name in cases:
                try:
                    def_art = article(word_form, DEFINITE, gender_for_article, case)
                    indef_art = article(word_form, INDEFINITE, gender_for_article, case)

                    if number == PLURAL:
                        # No indefinite article in the plural.
                        indefinite_text = "β"
                    elif indef_art:
                        indefinite_text = f"{indef_art} {word_form_cap}"
                    else:
                        indefinite_text = word_form_cap

                    definite_text = f"{def_art} {word_form_cap}" if def_art else word_form_cap
                    table[f"{case_name} {number_name}"] = {
                        "definite": definite_text,
                        "indefinite": indefinite_text,
                        "bare": word_form_cap
                    }
                except Exception as exc:
                    log(f" Failed to get article for {gender_str}/{case_name} {number_name}: {exc}")
        analysis["declension_by_gender"][gender_str] = table

    log(f" Generated declensions for {len(genders)} gender(s)")

    if len(genders) == 1:
        # Unambiguous gender: also expose the flat table and the gender label.
        only_label = next(iter(analysis["declension_by_gender"]))
        analysis["declension"] = analysis["declension_by_gender"][only_label]
        analysis["gender"] = only_label

    return analysis
def pattern_analyze_as_verb(word: str, hint_lemma: str = None) -> Optional[Dict[str, Any]]:
    """Build a conjugation analysis of ``word`` treated as a verb.

    Args:
        word: The surface form to analyse.
        hint_lemma: Optional lemma suggested by the parser; used when
            pattern.de's own lemmatisation returns nothing or the word itself.

    Returns:
        A dict with 'infinitive', optionally 'lexeme', 'conjugation'
        (one sub-dict per tense/mood) and 'participles'. Returns None when
        fewer than four present-tense forms can be produced even after
        retrying with the infinitive derived from ``word``.
    """
    persons = ("ich", "du", "er/sie/es", "wir", "ihr", "sie/Sie")

    def with_persons(*aliases):
        """Pair conjugation aliases with the six person labels."""
        return list(zip(aliases, persons))

    def collect_forms(infinitive, alias_labels):
        """Conjugate each alias, skipping (and logging) the ones that fail."""
        forms = {}
        for alias, label in alias_labels:
            try:
                form = conjugate(infinitive, alias)
            except Exception as e:
                # `except Exception` rather than a bare `except`, so that
                # KeyboardInterrupt/SystemExit are not swallowed.
                log(f" Failed conjugate({infinitive}, {alias}): {e}")
                continue
            if form:
                forms[label] = form
        return forms

    present_aliases = with_persons("1sg", "2sg", "3sg", "1pl", "2pl", "3pl")

    log(f" Analyzing as verb (hint_lemma={hint_lemma})")
    verb_lemma = lemma(word)
    log(f" lemma({word}) = {verb_lemma}")

    if not verb_lemma or verb_lemma == word:
        if hint_lemma and hint_lemma != word:
            verb_lemma = hint_lemma
            log(f" Using hint lemma: {verb_lemma}")
        elif not verb_lemma:
            log(f" No lemma found, trying base word")
            verb_lemma = word  # e.g. "lauf"

    analysis = {"infinitive": verb_lemma}

    try:
        lex = lexeme(verb_lemma)
        if lex and len(lex) > 1:
            analysis["lexeme"] = lex
            log(f" lexeme has {len(lex)} forms")
    except Exception as e:
        log(f" Failed to get lexeme: {e}")

    analysis["conjugation"] = {}
    present = collect_forms(verb_lemma, present_aliases)
    analysis["conjugation"]["PrΓ€sens"] = present
    log(f" Generated {len(present)} present tense forms")

    if len(present) < 4:
        # Try again with the infinitive, e.g. if the input was "lauf".
        try:
            verb_lemma = conjugate(word, INFINITIVE)
            log(f" Retrying with infinitive '{verb_lemma}'")
            analysis["infinitive"] = verb_lemma
            # Start from an empty table so forms generated for the first,
            # rejected lemma cannot leak into the result.
            present = {}
            for alias, label in present_aliases:
                form = conjugate(verb_lemma, alias)
                if form:
                    present[label] = form
        except Exception as e:
            log(f" Retry failed, not a valid verb: {e}")
            return None
        if len(present) < 4:
            log(f" Too few present forms, not a valid verb")
            return None
        analysis["conjugation"]["PrΓ€sens"] = present

    analysis["conjugation"]["PrΓ€teritum"] = collect_forms(
        verb_lemma, with_persons("1sgp", "2sgp", "3sgp", "1ppl", "2ppl", "3ppl"))

    analysis["participles"] = collect_forms(
        verb_lemma, [("part", "Partizip PrΓ€sens"), ("ppart", "Partizip Perfekt")])

    analysis["conjugation"]["Imperativ"] = collect_forms(
        verb_lemma, [("2sg!", "du"), ("2pl!", "ihr")])

    analysis["conjugation"]["Konjunktiv I"] = collect_forms(
        verb_lemma, with_persons("1sg?", "2sg?", "3sg?", "1pl?", "2pl?", "3pl?"))

    analysis["conjugation"]["Konjunktiv II"] = collect_forms(
        verb_lemma, with_persons("1sgp?", "2sgp?", "3sgp?", "1ppl?", "2ppl?", "3ppl?"))

    return analysis
def pattern_analyze_as_adjective(word: str, hint_lemma: str = None) -> Dict[str, Any]:
    """Build an inflection analysis of ``word`` treated as an adjective.

    Args:
        word: The surface form to analyse.
        hint_lemma: Optional lemma suggested by the parser; used when the
            predicative form equals the lower-cased input.

    Returns:
        A dict with 'predicative', 'comparative', 'superlative' and
        'attributive' (declension type -> gender -> case -> form/example),
        or None when no attributive form could be generated.
    """
    log(f" Analyzing as adjective (hint_lemma={hint_lemma})")

    base = predicative(word)
    log(f" predicative({word}) = {base}")

    if base == word.lower() and hint_lemma and hint_lemma != word:
        base = hint_lemma
        log(f" Using hint lemma: {base}")

    analysis = {"predicative": base}

    # Degrees of comparison, with a naive suffix fallback if pattern.de fails.
    try:
        analysis["comparative"] = comparative(base)
    except Exception as exc:
        log(f" Failed to get comparative: {exc}")
        analysis["comparative"] = f"{base}er"  # Fallback

    try:
        analysis["superlative"] = superlative(base)
    except Exception as exc:
        log(f" Failed to get superlative: {exc}")
        analysis["superlative"] = f"{base}st"  # Fallback

    log(f" comparative = {analysis['comparative']}")
    log(f" superlative = {analysis['superlative']}")

    declension_types = [(None, "Strong"), (INDEFINITE, "Mixed"), (DEFINITE, "Weak")]
    gender_table = [(MALE, "Masculine"), (FEMALE, "Feminine"),
                    (NEUTRAL, "Neuter"), (PLURAL, "Plural")]
    case_table = [(NOMINATIVE, "Nom"), (ACCUSATIVE, "Acc"),
                  (DATIVE, "Dat"), (GENITIVE, "Gen")]

    analysis["attributive"] = {}
    attr_count = 0

    for article_type, article_name in declension_types:
        by_gender = analysis["attributive"][article_name] = {}
        for gender_const, gender_name in gender_table:
            by_case = by_gender[gender_name] = {}
            for case, case_name in case_table:
                try:
                    attr_form = attributive(base, gender_const, case, article_type)
                    example = f"{attr_form} [Noun]"
                    if article_type:
                        art = article("_", article_type, gender_const, case)
                        if art:
                            example = f"{art} {attr_form} [Noun]"
                    by_case[case_name] = {
                        "form": attr_form, "example": example
                    }
                    attr_count += 1
                except Exception as exc:
                    log(f" Failed attributive for {article_name}/{gender_name}/{case_name}: {exc}")

    log(f" Generated {attr_count} attributive forms")

    if attr_count == 0:
        return None
    return analysis
| # --- Public API (Called by Gradio) --- | |
def pattern_get_all_inflections(word: str) -> Dict[str, Any]:
    """Public API: generate all plausible inflection tables for a German word.

    The word is analysed as a noun, a verb and an adjective, first exactly as
    typed and then, if it differs, in lower case. This catches ambiguities
    such as "Lauf" (noun) vs "lauf" (verb). Results of the as-typed pass take
    precedence when both passes produce the same part of speech.

    Returns:
        ``{"input_word": ..., "analyses": {...}}`` (plus an "info" entry when
        nothing could be analysed), ``{"info": ...}`` for blank input, or
        ``{"error": ...}`` when the library is missing or an analysis raised.
    """
    if not PATTERN_DE_AVAILABLE:
        return {"error": "`PatternLite` library not available."}
    if not word or not word.strip():
        return {"info": "Please enter a word."}

    word = word.strip()
    word_lc = word.lower()

    log("="*70); log(f"ANALYZING: {word} (and {word_lc})"); log("="*70)

    analyzers = (
        ("noun", "Noun", pattern_analyze_as_noun),
        ("verb", "Verb", pattern_analyze_as_verb),
        ("adjective", "Adjective", pattern_analyze_as_adjective),
    )

    def run_pass(target: str, detection: Dict[str, Any], tag: str) -> Dict[str, Any]:
        """Run all three analyzers on ``target`` and keep the good results."""
        kept: Dict[str, Any] = {}
        log("\n--- Trying analysis for: " + target + " ---")
        for key, title, analyzer in analyzers:
            candidate = analyzer(target, detection['lemma'])
            if candidate and pattern_is_good_analysis(candidate, key):
                log(f"β {title} analysis{tag} is good")
                kept[key] = candidate
        return kept

    # --- Pass 1: the word exactly as typed (e.g. "Lauf") ---
    detection_as_is = pattern_detect_word_type(word)
    try:
        analyses_as_is = run_pass(word, detection_as_is, "")
    except Exception as exc:
        log(f"\nERROR during 'as-is' analysis: {exc}")
        traceback.print_exc()
        return {"error": f"An unexpected error occurred during 'as-is' analysis: {str(exc)}"}

    # --- Pass 2: the lower-cased word (e.g. "lauf"), only if different ---
    analyses_lc: Dict[str, Any] = {}
    if word != word_lc:
        detection_lc = pattern_detect_word_type(word_lc)
        try:
            analyses_lc = run_pass(word_lc, detection_lc, " (lc)")
        except Exception as exc:
            log(f"\nERROR during 'lowercase' analysis: {exc}")
            traceback.print_exc()
            return {"error": f"An unexpected error occurred during 'lowercase' analysis: {str(exc)}"}

    # --- Merge: as-typed results win, lower-case results fill the gaps ---
    final_analyses = dict(analyses_as_is)
    for key, value in analyses_lc.items():
        final_analyses.setdefault(key, value)

    results: Dict[str, Any] = {
        "input_word": word,
        "analyses": final_analyses
    }

    if not final_analyses:
        results["info"] = "Word could not be analyzed as noun, verb, or adjective."

    log(f"\nFinal merged result: {len(results['analyses'])} analysis/analyses")
    return results
def word_appears_in_inflections(word: str, inflections: Dict[str, Any], pos_type: str) -> bool:
    """Validate a pattern.de analysis against its own forms and against OdeNet.

    The analysis is accepted only if (1) the input word occurs among the
    generated inflection forms and (2) OdeNet knows the lemma (or the input
    word) with the matching part of speech. Step 2 fails open: if OdeNet is
    unavailable, returns an error result, or raises, the pattern.de analysis
    is trusted.

    Args:
        word: The word the user entered.
        inflections: Analysis dict produced for ``pos_type``.
        pos_type: 'noun', 'verb' or 'adjective'.

    Returns:
        True if the analysis should be kept, False if it looks like an artifact.
    """
    import re

    word_lower = word.lower()

    # 1. Extract all actual inflection forms (not metadata)
    actual_forms = []
    if pos_type == 'noun':
        for case_data in inflections.get('declension', {}).values():
            if isinstance(case_data, dict):
                actual_forms.append(case_data.get('bare', ''))
        for gender_data in inflections.get('declension_by_gender', {}).values():
            if isinstance(gender_data, dict):
                for case_data in gender_data.values():
                    if isinstance(case_data, dict):
                        actual_forms.append(case_data.get('bare', ''))
    elif pos_type == 'verb':
        for tense_data in inflections.get('conjugation', {}).values():
            if isinstance(tense_data, dict):
                actual_forms.extend(tense_data.values())
        actual_forms.extend(inflections.get('participles', {}).values())
        actual_forms.extend(inflections.get('lexeme', []))
        actual_forms.append(inflections.get('infinitive', ''))
    elif pos_type == 'adjective':
        actual_forms.append(inflections.get('predicative', ''))
        actual_forms.append(inflections.get('comparative', ''))
        actual_forms.append(inflections.get('superlative', ''))
        for article_data in inflections.get('attributive', {}).values():
            if isinstance(article_data, dict):
                for gender_data in article_data.values():
                    if isinstance(gender_data, dict):
                        for case_data in gender_data.values():
                            if isinstance(case_data, dict):
                                actual_forms.append(case_data.get('form', ''))

    # 2. Clean forms and check for match
    cleaned_forms = set()
    for form in actual_forms:
        if not form or form == 'β':
            continue
        # Multi-word forms (e.g. with articles) are split into their words;
        # simple forms are used as-is.
        if ' ' in form or '[' in form:
            words = re.findall(r'\b[\wÀâüΓΓΓΓ]+\b', form)
            cleaned_forms.update(w.lower() for w in words)
        else:
            cleaned_forms.add(form.lower())

    articles = {'der', 'die', 'das', 'den', 'dem', 'des', 'ein', 'eine', 'einen', 'einem', 'eines', 'einer'}
    cleaned_forms -= articles

    # All forms were lower-cased above, so a lower-case comparison is
    # sufficient for every part of speech (nouns included).
    if word_lower not in cleaned_forms:
        log(f" β Word '{word}' not found in any {pos_type} inflection forms.")
        return False

    log(f" β Word '{word}' was found in the {pos_type} inflection table.")

    # 3. Cross-validate POS with OdeNet to filter artifacts (e.g., 'heute' as 'heuen')
    if not WN_AVAILABLE:
        log(" β οΈ OdeNet (WN_AVAILABLE=False) is not available to validate POS. Accepting pattern.de's analysis.")
        return True

    lemma_key_and_tag = {
        'noun': ("base_form", 'n'),
        'verb': ("infinitive", 'v'),
        'adjective': ("predicative", 'a'),
    }

    def matching_senses(lookup_word, expected_tag):
        """Return OdeNet senses with the expected POS; raise if OdeNet errored."""
        odenet_result = odenet_get_thesaurus_info(lookup_word)
        if "error" in odenet_result:
            # An error result (service not ready, timeout, ...) says nothing
            # about the word; escalate so the fail-open handler below applies
            # instead of wrongly rejecting the analysis.
            raise RuntimeError(odenet_result["error"])
        return [s for s in odenet_result.get('senses', []) if s.get('pos') == expected_tag]

    try:
        if pos_type not in lemma_key_and_tag:
            log(f" ? Unknown pos_type '{pos_type}' for OdeNet check.")
            return True  # Don't block unknown types

        lemma_key, expected_pos_tag = lemma_key_and_tag[pos_type]
        pos_lemma = inflections.get(lemma_key, word_lower)

        log(f" Validating {pos_type} (lemma: '{pos_lemma}') with OdeNet (expecting pos='{expected_pos_tag}')...")
        pos_senses = matching_senses(pos_lemma, expected_pos_tag)

        # If no senses for lemma, check input word as fallback
        if not pos_senses and pos_lemma.lower() != word.lower():
            log(f" No '{expected_pos_tag}' senses for lemma '{pos_lemma}'. Checking input word '{word}'...")
            pos_senses = matching_senses(word, expected_pos_tag)

        if not pos_senses:
            log(f" β REJECTED: OdeNet has no '{expected_pos_tag}' senses for '{pos_lemma}' or '{word}'. This is likely a pattern.de artifact.")
            return False

        log(f" β VERIFIED: OdeNet found {len(pos_senses)} '{expected_pos_tag}' sense(s).")
        return True
    except Exception as e:
        log(f" β οΈ OdeNet validation check failed with error: {e}")
        return True  # Fail open: If OdeNet fails, trust pattern.de
| # ============================================================================ | |
| # 6b. CONCEPTNET HELPER LOGIC (V2 - ROBUST PARSER) | |
| # ============================================================================ | |
def get_conceptnet_client() -> Optional[Client]:
    """Return the shared Gradio Client for ConceptNet, creating it on first use.

    The cached client is checked once without the lock and again while
    holding ``CONCEPTNET_CLIENT_LOCK`` before a new one is created.

    Returns:
        The client instance, or None when ``gradio_client`` is unavailable or
        initialisation fails.
    """
    global CONCEPTNET_CLIENT

    if not GRADIO_CLIENT_AVAILABLE:
        return None

    if CONCEPTNET_CLIENT:
        return CONCEPTNET_CLIENT

    with CONCEPTNET_CLIENT_LOCK:
        # Another thread may have created the client while we waited for the lock.
        if not CONCEPTNET_CLIENT:
            try:
                print("Initializing Gradio Client for ConceptNet...")
                new_client = Client("cstr/conceptnet_normalized")
                print("β Gradio Client for ConceptNet initialized.")
                CONCEPTNET_CLIENT = new_client
            except Exception as exc:
                print(f"β CRITICAL: Failed to initialize ConceptNet Gradio Client: {exc}")
                traceback.print_exc()
                return None
        return CONCEPTNET_CLIENT
def conceptnet_get_relations(word: str, language: str = 'de') -> Dict[str, Any]:
    """Fetch ConceptNet relations for a word via the Gradio API and parse them.

    The API answers with Markdown: "## <Relation>" headers followed by bullet
    lines. Each bullet is parsed with a regex built for the current relation;
    entries where the queried word is on both sides or on neither side are
    dropped. Successful results (including the "no relations" info result)
    are cached per ``(word, language)``.

    Returns:
        ``{"relations": [...]}`` sorted by descending weight, ``{"info": ...}``
        when nothing usable was found or no word was given, or
        ``{"error": ...}`` on failure.
    """
    if not GRADIO_CLIENT_AVAILABLE:
        return {"error": "`gradio_client` library is not installed. Install with: pip install gradio_client"}
    if not word or not word.strip():
        return {"info": "No word provided."}

    word_lower = word.strip().lower()
    cache_key = (word_lower, language)

    # --- 1. Check Cache ---
    with CONCEPTNET_LOCK:
        if cache_key in CONCEPTNET_CACHE:
            log(f"ConceptNet: Found '{word_lower}' in cache.")
            return CONCEPTNET_CACHE[cache_key]

    log(f"ConceptNet: Fetching '{word_lower}' from Gradio API...")

    try:
        # --- 2. Call Gradio API ---
        client = get_conceptnet_client()
        if not client:
            return {"error": "ConceptNet Gradio Client is not available."}

        selected_relations = [
            "RelatedTo", "IsA", "PartOf", "HasA", "UsedFor",
            "CapableOf", "AtLocation", "Synonym", "Antonym",
            "Causes", "HasProperty", "MadeOf", "HasSubevent",
            "DerivedFrom", "SimilarTo", "Desires", "CausesDesire"
        ]

        result_markdown = client.predict(
            word=word_lower,
            lang=language,
            selected_relations=selected_relations,
            api_name="/get_semantic_profile"
        )

        # --- 3. Parse the Markdown Result ---
        relations_list = []

        if not isinstance(result_markdown, str):
            raise TypeError(f"ConceptNet API returned type {type(result_markdown)}, expected str.")

        current_relation = None
        # Groups: (1: Node1) (2: Relation) (3: Node2) (4: Weight)
        line_pattern = None

        for raw_line in result_markdown.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            # Relation header, e.g. "## IsA": switch to a regex for that relation.
            if line.startswith('## '):
                current_relation = line[3:].strip()
                if current_relation:
                    line_pattern = re.compile(
                        r"-\s*(.+?)\s+(%s)\s+β\s+(.+?)\s+\`\[([\d.]+)\]\`" % re.escape(current_relation)
                    )
                continue

            # Only bullet lines under a known relation are parsed.
            if not (line.startswith('- ') and current_relation and line_pattern):
                continue

            match = line_pattern.search(line)
            if not match:
                log(f"ConceptNet Parser: No match for line '{line}' with relation '{current_relation}'")
                continue

            try:
                node1 = match.group(1).strip().strip('*')
                relation = match.group(2)  # equals current_relation
                node2 = match.group(3).strip().strip('*')
                weight = float(match.group(4))

                first_is_word = node1.lower() == word_lower
                second_is_word = node2.lower() == word_lower
                # Keep only entries where exactly one side is the queried
                # word; this filters e.g. "schnell Synonym β schnell".
                if first_is_word == second_is_word:
                    continue

                if first_is_word:
                    other_node, direction = node2, "->"
                else:
                    other_node, direction = node1, "<-"

                relations_list.append({
                    "relation": relation,
                    "direction": direction,
                    "other_node": other_node,
                    "other_lang": language,  # the other node is assumed to be in the same language
                    "weight": weight,
                    "surface": f"{node1} {relation} {node2}"
                })
            except Exception as exc:
                log(f"ConceptNet Parser: Error parsing line '{line}': {exc}")
                continue

        # --- 4. Finalize and Cache Result ---
        if relations_list:
            # Heaviest relations first.
            relations_list.sort(key=lambda x: x.get('weight', 0.0), reverse=True)
            final_result = {"relations": relations_list}
        else:
            final_result = {"info": f"No valid (non-self-referential) relations found for '{word_lower}'."}

        with CONCEPTNET_LOCK:
            CONCEPTNET_CACHE[cache_key] = final_result

        log(f"ConceptNet: Returning {len(relations_list)} relations for '{word_lower}'")
        return final_result

    except Exception as exc:
        error_msg = f"ConceptNet Gradio API request failed: {type(exc).__name__} - {exc}"
        log(f"ConceptNet API error for '{word_lower}': {exc}")
        traceback.print_exc()
        return {"error": error_msg, "traceback": traceback.format_exc()}
| # ============================================================================ | |
| # 6c. NEW: HANTA INITIALIZER & HELPERS | |
| # ============================================================================ | |
def hanta_get_tagger() -> Optional[HanoverTagger]:
    """Return the shared HanTa tagger, loading the German model on first use.

    The cached instance is checked once without the lock and again while
    holding ``HANTA_TAGGER_LOCK`` before the model is loaded.

    Returns:
        The tagger instance, or None if loading the model failed.

    Raises:
        ImportError: if the HanTa library is not installed.
    """
    global HANTA_TAGGER_INSTANCE

    if not HANTA_AVAILABLE:
        raise ImportError("HanTa library is not installed.")

    if HANTA_TAGGER_INSTANCE:
        return HANTA_TAGGER_INSTANCE

    with HANTA_TAGGER_LOCK:
        # Another thread may have finished loading while we waited for the lock.
        if not HANTA_TAGGER_INSTANCE:
            try:
                print("Initializing HanTa Tagger (loading model)...")

                # The model file is looked up next to the HanTa package itself.
                package_dir = os.path.dirname(HanTa.HanoverTagger.__file__)
                model_path = os.path.join(package_dir, 'morphmodel_ger.pgz')

                if not os.path.exists(model_path):
                    print(f"CRITICAL: HanTa model file 'morphmodel_ger.pgz' not found at {model_path}")
                    raise FileNotFoundError("HanTa model file missing. Please ensure HanTa is correctly installed.")

                new_tagger = HanoverTagger(model_path)
                _ = new_tagger.analyze("Test")  # Warm-up call
                print("β HanTa Tagger initialized successfully.")
                HANTA_TAGGER_INSTANCE = new_tagger
            except Exception as exc:
                print(f"CRITICAL ERROR: Failed to initialize HanTa Tagger: {exc}")
                traceback.print_exc()
                return None
        return HANTA_TAGGER_INSTANCE
def _get_odenet_senses_by_pos(word: str) -> Dict[str, List[Dict[str, Any]]]:
    """(Helper) Fetch OdeNet senses for a word and group them by POS.

    Senses tagged 'a' are filed under both "adjective" and "adverb". When
    the 'wn' library is unavailable, every group gets a placeholder entry so
    that callers do not reject a POS merely because validation is impossible.

    Returns:
        A dict with the keys "noun", "verb", "adjective" and "adverb", each
        mapping to a list of sense dicts.
    """
    pos_groups = ("noun", "verb", "adjective", "adverb")

    if not WN_AVAILABLE:
        log(f"OdeNet check skipped for '{word}': WN_AVAILABLE=False")
        # Fail open: non-empty lists keep every POS acceptable.
        return {group: [{"info": "OdeNet unavailable"}] for group in pos_groups}

    senses_by_pos: Dict[str, List[Dict]] = {group: [] for group in pos_groups}
    groups_for_tag = {
        'n': ("noun",),
        'v': ("verb",),
        'a': ("adjective", "adverb"),
    }

    try:
        all_senses = odenet_get_thesaurus_info(word).get("senses", [])
        for sense in all_senses:
            if "error" in sense:
                continue
            pos_tag = sense.get("pos")
            if pos_tag == 'a':
                log(f"Found OdeNet 'a' tag (Adj/Adv) for sense: {sense.get('definition', '...')[:30]}")
            for group in groups_for_tag.get(pos_tag, ()):
                senses_by_pos[group].append(sense)
    except Exception as exc:
        log(f"OdeNet helper check failed for '{word}': {exc}")

    log(f"OdeNet senses for '{word}': "
        f"{len(senses_by_pos['noun'])}N, "
        f"{len(senses_by_pos['verb'])}V, "
        f"{len(senses_by_pos['adjective'])}Adj, "
        f"{len(senses_by_pos['adverb'])}Adv")
    return senses_by_pos
def _hanta_get_candidates(word: str, hanta_tagger: "HanoverTagger") -> Set[str]:
    """
    (Helper) Collects every tag HanTa proposes for a word.

    The word is tagged twice with tag_word(..., cutoff=20): once lowercased
    (for verbs, adjectives, adverbs) and once capitalized (for nouns). The
    first element of each returned item is taken as the tag. A failure of one
    variant is logged and does not prevent the other from being tried.

    Args:
        word: The surface form to tag.
        hanta_tagger: An initialized HanTa tagger.

    Returns:
        The union of tags found for both spellings (possibly empty).
    """
    collected: Set[str] = set()
    # (log label, str method producing the variant); lowercase goes first.
    for label, method_name in (("lower", "lower"), ("upper", "capitalize")):
        try:
            variant = getattr(word, method_name)()
            scored_tags = hanta_tagger.tag_word(variant, cutoff=20)
            collected.update(item[0] for item in scored_tags)
        except Exception as e:
            log(f"HanTa tag_word ({label}) failed for '{word}': {e}")
    log(f"HanTa candidates for '{word}': {collected}")
    return collected
def _hanta_map_tags_to_pos(hanta_tags: Set[str]) -> Dict[str, Set[str]]:
    """
    (Helper) Sorts HanTa tags into the simplified POS groups
    "noun", "verb", "adjective" and "adverb".

    Grouping rules:
      * "NE" or anything starting with "NN"      -> noun
      * anything starting with "VV", "VA", "VM"  -> verb
      * anything starting with "ADJ"             -> adjective
      * exactly "ADV"                            -> adverb
    Tags matching none of these are discarded.

    Heuristic: when the tag "ADJ(D)" (predicative adjective) is present, the
    synthetic marker "ADV (from ADJD)" is added to the adverb group, because
    such a form can also be used adverbially ("er singt schön" vs.
    "er ist schön").

    Returns:
        Only the groups that ended up non-empty.
    """
    nouns: Set[str] = set()
    verbs: Set[str] = set()
    adjectives: Set[str] = set()
    adverbs: Set[str] = set()

    for tag in hanta_tags:
        if tag.startswith("NN") or tag == "NE":
            nouns.add(tag)
        elif tag.startswith(("VV", "VA", "VM")):
            verbs.add(tag)
        elif tag.startswith("ADJ"):
            adjectives.add(tag)
        elif tag == "ADV":
            adverbs.add(tag)

    # --- The Core Heuristic: ADJ(D) implies a possible adverb reading ---
    if "ADJ(D)" in adjectives:
        log("Injecting ADV possibility based on ADJ(D) tag.")
        adverbs.add("ADV (from ADJD)")

    grouped = {"noun": nouns, "verb": verbs, "adjective": adjectives, "adverb": adverbs}
    return {group_name: members for group_name, members in grouped.items() if members}
def _hanta_get_lemma_for_pos(word: str, pos_group: str, hanta_tagger: "HanoverTagger") -> str:
    """
    (Helper) Lemmatizes a word for one POS group via case-sensitive analysis.

    Nouns are analyzed in their capitalized spelling; verbs, adjectives and
    adverbs in their lowercase spelling. The first element of the analyze()
    result is used as the lemma. For any other pos_group no analysis is run.

    Returns:
        The lemma from HanTa. If the analysis raised, was not run, or gave an
        empty lemma, the capitalized word (for "noun") or the lowercased word
        (for everything else) is returned instead.
    """
    is_noun = pos_group == "noun"
    lemma = ""
    try:
        if is_noun:
            lemma = hanta_tagger.analyze(word.capitalize(), casesensitive=True)[0]
        elif pos_group in ("verb", "adjective", "adverb"):
            lemma = hanta_tagger.analyze(word.lower(), casesensitive=True)[0]
    except Exception as e:
        log(f"HanTa analyze failed for {word}/{pos_group}: {e}. Falling back.")

    if lemma:
        return lemma
    # Fallback: derive a plausible lemma from the spelling alone.
    return word.capitalize() if is_noun else word.lower()
def _build_semantics(lemma: str, odenet_senses: List[Dict], top_n: int) -> Dict[str, Any]:
    """
    (Helper) Assembles the semantics block from OdeNet senses and ConceptNet.

    ConceptNet is only queried when REQUESTS_AVAILABLE is true; if the query
    raises, the relations list becomes a single {"error": ...} entry.

    Args:
        lemma: Lemma to look up in ConceptNet (language 'de').
        odenet_senses: Senses already fetched for this lemma.
        top_n: When > 0, both lists are cut to this many items and the
            relations are first sorted by descending 'weight' (missing
            weights count as 0.0). When <= 0, nothing is sorted or cut.

    Returns:
        Dict with "lemma", "odenet_senses" and "conceptnet_relations".
    """
    relations = []
    if REQUESTS_AVAILABLE:
        try:
            relations = conceptnet_get_relations(lemma, language='de').get("relations", [])
        except Exception as e:
            relations = [{"error": str(e)}]

    limit_results = top_n > 0
    if limit_results:
        odenet_senses = odenet_senses[:top_n]
        relations.sort(key=lambda relation: relation.get('weight', 0.0), reverse=True)
        relations = relations[:top_n]

    semantics: Dict[str, Any] = {"lemma": lemma}
    semantics["odenet_senses"] = odenet_senses
    semantics["conceptnet_relations"] = relations
    return semantics
| # ============================================================================ | |
| # 6d. WIKTIONARY DATABASE LOGIC (PRIMARY ENGINE) | |
| # ============================================================================ | |
def wiktionary_download_db() -> bool:
    """
    Makes sure the Wiktionary DB file is present locally.

    If WIKTIONARY_DB_PATH already exists nothing is downloaded. Otherwise the
    file is fetched with hf_hub_download from the dataset repo
    WIKTIONARY_REPO_ID into the current directory.

    Returns:
        True when the file exists or was downloaded (WIKTIONARY_AVAILABLE is
        then set to True); False when the download raised (the error and its
        traceback are printed, WIKTIONARY_AVAILABLE is left untouched).
    """
    global WIKTIONARY_AVAILABLE

    if not os.path.exists(WIKTIONARY_DB_PATH):
        print(f"Wiktionary DB not found. Downloading from '{WIKTIONARY_REPO_ID}'...")
        try:
            hf_hub_download(
                repo_id=WIKTIONARY_REPO_ID,
                filename=WIKTIONARY_DB_PATH,
                repo_type="dataset",
                local_dir=".",
                local_dir_use_symlinks=False,
            )
            print("β Wiktionary DB downloaded successfully.")
        except Exception as download_error:
            print(f"β CRITICAL: Failed to download Wiktionary DB: {download_error}")
            traceback.print_exc()
            return False
    else:
        print(f"β Wiktionary DB '{WIKTIONARY_DB_PATH}' already exists.")

    WIKTIONARY_AVAILABLE = True
    return True
def wiktionary_get_connection() -> Optional[sqlite3.Connection]:
    """
    Returns the shared read-only SQLite connection to the Wiktionary DB,
    opening it on first use.

    The connection is cached in WIKTIONARY_CONN. It is created while holding
    WIKTIONARY_CONN_LOCK, and the cache is re-checked after the lock is taken.
    The DB is opened through a `file:...?mode=ro` URI with
    check_same_thread=False and sqlite3.Row as row factory, then probed with
    a small query against sqlite_master.

    Returns:
        The cached connection, or None when the DB is flagged unavailable,
        the file is missing, or opening/probing failed. In the latter two
        cases WIKTIONARY_AVAILABLE is set to False.
    """
    global WIKTIONARY_CONN, WIKTIONARY_AVAILABLE
    if not WIKTIONARY_AVAILABLE:
        log("Wiktionary DB is not available, cannot create connection.")
        return None
    if WIKTIONARY_CONN:
        return WIKTIONARY_CONN

    with WIKTIONARY_CONN_LOCK:
        # Re-check: another caller may have connected while we waited.
        if WIKTIONARY_CONN:
            return WIKTIONARY_CONN
        if not os.path.exists(WIKTIONARY_DB_PATH):
            log("Wiktionary DB file missing, connection failed.")
            WIKTIONARY_AVAILABLE = False
            return None

        conn = None
        try:
            log("Creating new read-only connection to Wiktionary DB...")
            # URI mode for read-only connection
            db_uri = f"file:{WIKTIONARY_DB_PATH}?mode=ro"
            conn = sqlite3.connect(db_uri, uri=True, check_same_thread=False)
            conn.row_factory = sqlite3.Row  # Makes results dict-like
            # Test query: fails here if the file is not a usable database.
            conn.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1").fetchone()
            print("β Wiktionary DB connection successful.")
            WIKTIONARY_CONN = conn
            return WIKTIONARY_CONN
        except Exception as e:
            print(f"β CRITICAL: Failed to connect to Wiktionary DB: {e}")
            traceback.print_exc()
            # connect() may have succeeded before the probe failed; release
            # the handle instead of leaking it.
            if conn is not None:
                try:
                    conn.close()
                except Exception as close_error:
                    log(f"Wiktionary: closing the failed connection also failed: {close_error}")
            WIKTIONARY_AVAILABLE = False
            return None
def _wiktionary_map_pos_key(wikt_pos: Optional[str]) -> str:
    """
    Maps a Wiktionary POS tag to the internal POS key.

    "adj" becomes "adjective" and "adv" becomes "adverb"; a missing or empty
    tag becomes "unknown". Every other tag ("noun", "verb", "phrase",
    "abbrev", ...) is returned unchanged.
    """
    if not wikt_pos:
        return "unknown"
    # Only the abbreviated tags need renaming; the rest pass straight through.
    renamed_tags = {"adj": "adjective", "adv": "adverb"}
    return renamed_tags.get(wikt_pos, wikt_pos)
def _wiktionary_build_report_for_entry(entry_id: int, conn: sqlite3.Connection) -> Dict[str, Any]:
    """
    Collects everything stored for one Wiktionary entry.

    Args:
        entry_id: Primary key in the `entries` table.
        conn: Connection whose rows support access by column name
            (e.g. row_factory = sqlite3.Row).

    Returns:
        {"error": "Entry ID not found"} when no such entry exists. Otherwise
        a dict with the entry columns (word, pos, pos_title, lang) plus:
          entry_id  - the id that was passed in
          lemma     - alias of `word`
          senses    - [{sense_id, gloss_text}], ordered by sense id, gloss id
          forms     - [{form_text, tags}], tags joined with ', '
          sounds    - [{ipa, audio}]
          synonyms  - list of synonym words
          antonyms  - list of antonym words
          examples  - at most 5 example texts
    """
    params = (entry_id,)

    def fetch_rows(sql):
        # Every follow-up query is keyed by the entry id alone.
        return conn.execute(sql, params).fetchall()

    # 1. Base entry
    entry_row = conn.execute(
        "SELECT word, pos, pos_title, lang FROM entries WHERE id = ?", params
    ).fetchone()
    if not entry_row:
        return {"error": "Entry ID not found"}

    report: Dict[str, Any] = dict(entry_row)
    report["entry_id"] = entry_id
    report["lemma"] = entry_row["word"]  # Alias for clarity

    # 2. Senses (definitions)
    sense_rows = fetch_rows(
        """
        SELECT s.id as sense_id, g.gloss_text
        FROM senses s
        JOIN glosses g ON s.id = g.sense_id
        WHERE s.entry_id = ?
        ORDER BY s.id, g.id
        """
    )
    report["senses"] = [dict(row) for row in sense_rows]

    # 3. Inflected forms, with their tags collapsed into one string
    form_rows = fetch_rows(
        """
        SELECT f.form_text, GROUP_CONCAT(t.tag, ', ') as tags
        FROM forms f
        LEFT JOIN form_tags ft ON f.id = ft.form_id
        LEFT JOIN tags t ON ft.tag_id = t.id
        WHERE f.entry_id = ?
        GROUP BY f.id
        ORDER BY f.id
        """
    )
    report["forms"] = [dict(row) for row in form_rows]

    # 4. Pronunciations
    sound_rows = fetch_rows("SELECT ipa, audio FROM sounds WHERE entry_id = ?")
    report["sounds"] = [dict(row) for row in sound_rows]

    # 5. Synonyms
    synonym_rows = fetch_rows("SELECT synonym_word FROM synonyms WHERE entry_id = ?")
    report["synonyms"] = [row["synonym_word"] for row in synonym_rows]

    # 6. Antonyms
    antonym_rows = fetch_rows("SELECT antonym_word FROM antonyms WHERE entry_id = ?")
    report["antonyms"] = [row["antonym_word"] for row in antonym_rows]

    # 7. Examples (limited to 5 for brevity)
    example_rows = fetch_rows(
        """
        SELECT ex.text
        FROM examples ex
        JOIN senses s ON ex.sense_id = s.id
        WHERE s.entry_id = ?
        LIMIT 5
        """
    )
    report["examples"] = [row["text"] for row in example_rows]
    return report
def _wiktionary_find_all_entries(word: str, conn: sqlite3.Connection) -> List[Dict[str, Any]]:
    """
    Looks up every German ('Deutsch') entry that matches a word, either as
    the entry's headword or as one of its NON-VARIANT inflected forms.

    Args:
        word: Exact spelling to search for.
        conn: Connection to the Wiktionary DB (rows accessed by column name).

    Returns:
        One full report (see _wiktionary_build_report_for_entry) per unique
        matching entry. Entries whose report cannot be built are logged and
        left out.
    """
    log(f"Wiktionary: Querying for '{word}'...")
    matching_ids: Set[int] = set()

    # 1. The word is itself an entry headword
    #    ("Haus" -> "Haus (Substantiv)", "gehe" -> "gehe (Konjugierte Form)").
    headword_rows = conn.execute(
        "SELECT id FROM entries WHERE word = ? AND lang = 'Deutsch'", (word,)
    ).fetchall()
    matching_ids.update(row["id"] for row in headword_rows)

    # 2. The word is listed as an inflected form of another entry
    #    ("gehe" -> "gehen (Verb)"). Forms tagged 'variant' are skipped, so
    #    "Haus" no longer pulls in e.g. "Häusle".
    inflection_rows = conn.execute(
        """
        SELECT DISTINCT e.id
        FROM forms f
        JOIN entries e ON f.entry_id = e.id
        WHERE f.form_text = ? AND e.lang = 'Deutsch'
        AND f.id NOT IN (
        -- Exclude all form_ids that are tagged as 'variant'
        SELECT ft.form_id
        FROM form_tags ft
        JOIN tags t ON ft.tag_id = t.id
        WHERE t.tag = 'variant'
        )
        """, (word,)
    ).fetchall()
    matching_ids.update(row["id"] for row in inflection_rows)

    log(f"Wiktionary: Found {len(matching_ids)} unique matching entries.")

    # 3. Expand every id into a full report; one bad entry must not sink the rest.
    reports = []
    for entry_id in matching_ids:
        try:
            reports.append(_wiktionary_build_report_for_entry(entry_id, conn))
        except Exception as e:
            log(f"Wiktionary: Failed to build report for entry {entry_id}: {e}")
    return reports
def _wiktionary_format_semantics_block(
    wikt_report: Dict[str, Any],
    pattern_block: Dict[str, Any],
    top_n: int
) -> Dict[str, Any]:
    """
    Combines Wiktionary senses with OdeNet/ConceptNet data, looked up under
    the lemma taken from the pattern.de analysis block.

    The lemma used for the semantic lookup is read from pattern_block:
    "infinitive" for verbs, "base_form" for nouns, "predicative" for
    adjectives. If that value is missing or empty (pattern.de failed, or the
    POS does not inflect) the Wiktionary report's own lemma is used.

    The OdeNet/ConceptNet lookup and its top_n handling are delegated to
    _build_semantics_block_for_lemma so the logic exists in one place only;
    this function then fills in the Wiktionary-specific fields.

    Args:
        wikt_report: Report from _wiktionary_build_report_for_entry.
        pattern_block: pattern.de inflection block for the same entry
            (may be empty or an {"error": ...} dict).
        top_n: When > 0, each sense/relation list is cut to this many items.

    Returns:
        Dict with "lemma", "wiktionary_senses", "odenet_senses",
        "conceptnet_relations", "wiktionary_synonyms", "wiktionary_antonyms".
    """
    pos_key = _wiktionary_map_pos_key(wikt_report.get("pos"))

    # Which pattern.de field holds the dictionary form for each POS.
    lemma_field_by_pos = {
        "verb": "infinitive",
        "noun": "base_form",
        "adjective": "predicative",
    }
    lemma_field = lemma_field_by_pos.get(pos_key)
    semantic_lemma = pattern_block.get(lemma_field) if lemma_field else ""
    # Fallback if pattern.de fails or it's a non-inflecting POS
    if not semantic_lemma:
        semantic_lemma = wikt_report.get("lemma") or ""
    log(f"[DEBUG] Wiktionary Semantics: Building block for lemma='{semantic_lemma}', pos='{pos_key}'")

    # Wiktionary senses come from the original report, not the semantic lemma.
    wiktionary_senses = [
        {"definition": sense.get("gloss_text"), "source": "wiktionary"}
        for sense in wikt_report.get("senses", [])
    ]
    if top_n > 0:
        wiktionary_senses = wiktionary_senses[:top_n]

    # OdeNet + ConceptNet (already limited to top_n) from the shared helper.
    semantics_block = _build_semantics_block_for_lemma(semantic_lemma, pos_key, top_n)
    semantics_block["wiktionary_senses"] = wiktionary_senses
    semantics_block["wiktionary_synonyms"] = wikt_report.get("synonyms", [])
    semantics_block["wiktionary_antonyms"] = wikt_report.get("antonyms", [])
    return semantics_block
def _analyze_word_with_wiktionary(word: str, top_n: int) -> Dict[str, Any]:
    """
    (PRIMARY ENGINE) Analyzes a word using the Wiktionary DB.

    Steps:
      1. Ask the IWNLP/spaCy pipeline (if available) for a POS and lemma
         hint, used only to order the results.
      2. Fetch all matching Wiktionary entries.
      3. Sort them: POS+lemma match with the hint, POS match, headword
         equals the input, everything else.
      4. For each entry build Wiktionary/pattern.de inflection blocks and a
         semantics block, then keep it only if the input word is the
         headword of a base-form entry or appears in its list of forms.

    Args:
        word: The single word to analyze.
        top_n: Limit passed on to the semantics block (<= 0 means no limit).

    Returns:
        {} when no DB connection is available, the query fails, or nothing
        is found (signals the dispatcher to fall back). Otherwise a dict
        with "input_word", "analysis" ({pos_key: [entry reports]}) and
        "info". Note that "analysis" can be empty when entries were found
        but none passed validation.
    """
    final_result: Dict[str, Any] = {
        "input_word": word,
        "analysis": {}
    }
    conn = wiktionary_get_connection()
    if not conn:
        return {}  # Return empty dict to signal failure

    # --- 1. GET SPACY/IWNLP HINT FOR PRIORITIZATION ---
    spacy_pos_hint = None
    spacy_lemma_hint = None
    if IWNLP_AVAILABLE:
        try:
            iwnlp = iwnlp_get_pipeline()
            if iwnlp:
                doc = iwnlp(word)
                token = doc[0]
                # Map spaCy POS to our internal keys ("noun"/"verb" are
                # already identical once lowercased).
                spacy_pos_raw = token.pos_.lower()
                spacy_pos_hint = {"adj": "adjective", "adv": "adverb"}.get(spacy_pos_raw, spacy_pos_raw)
                spacy_lemma_hint = token.lemma_
                log(f"[DEBUG] Wiktionary Priority Hint: spaCy POS is '{spacy_pos_hint}', lemma is '{spacy_lemma_hint}'")
        except Exception as e:
            log(f"[DEBUG] Wiktionary Priority Hint: spaCy/IWNLP failed: {e}")

    # --- 2. FIND ALL WIKTIONARY ENTRIES ---
    try:
        wiktionary_reports = _wiktionary_find_all_entries(word, conn)
    except Exception as e:
        log(f"[DEBUG] Wiktionary query failed: {e}")
        return {}  # Signal failure
    if not wiktionary_reports:
        return {}  # No results, signal to fallback

    word_lower = word.lower()

    # --- 3. PRIORITIZE/SORT THE WIKTIONARY ENTRIES ---
    def get_priority_score(report):
        wikt_pos = _wiktionary_map_pos_key(report.get("pos"))
        # A report may lack a lemma (error report or NULL word); treat it as
        # empty so the sort key never raises.
        wikt_lemma = report.get("lemma") or ""
        # Priority 1/2: POS matches the spaCy hint (bonus if lemma matches too)
        if spacy_pos_hint and wikt_pos == spacy_pos_hint:
            if spacy_lemma_hint and wikt_lemma == spacy_lemma_hint:
                return 1
            return 2
        # Priority 3: Input word is the lemma (e.g., "Haus" -> "Haus")
        if wikt_lemma.lower() == word_lower:
            return 3
        # Priority 4: Other inflected forms (e.g. "gehe" -> "gehen")
        return 4

    wiktionary_reports.sort(key=get_priority_score)
    # Format with f-strings rather than '+': lemma/pos can be None and a
    # debug line must not be able to abort the whole engine.
    sorted_labels = [f"{r.get('lemma')} ({r.get('pos')})" for r in wiktionary_reports]
    log(f"[DEBUG] Wiktionary: Sorted entries: {sorted_labels}")

    # Patterns used to reduce a listed form to its bare word.
    parenthetical_re = re.compile(r"\(.*\)")
    leading_article_re = re.compile(r"^(der|die|das|ein|eine|am)\s+", flags=re.IGNORECASE)

    # --- 4. BUILD AND *VALIDATE* THE FINAL REPORT (PATH-PURE) ---
    for wikt_report in wiktionary_reports:
        if "error" in wikt_report:
            # Not a real entry; without this it would default to the input
            # word as lemma and be wrongly validated.
            log(f"[DEBUG] Wiktionary: Skipping error report: {wikt_report.get('error')}")
            continue

        pos_key = _wiktionary_map_pos_key(wikt_report.get("pos"))
        lemma = wikt_report.get("lemma") or word
        pos_title = wikt_report.get("pos_title") or ""  # column may be NULL

        # --- A. Build Wiktionary Inflection Block ---
        inflections_wikt_block = {
            "base_form": lemma,
            "forms_list": wikt_report.get("forms", []),
            "source": "wiktionary"
        }

        # --- B. Build Pattern Inflection Block (CRITICAL for finding true lemma) ---
        pattern_block = {}
        if PATTERN_DE_AVAILABLE:
            try:
                if pos_key == "noun" or "Substantiv" in pos_title:
                    pattern_block = pattern_analyze_as_noun(lemma)
                elif pos_key == "verb" or "Verb" in pos_title or "Konjugierte Form" in pos_title:
                    # Use the *input word* for inflected forms to find the right lemma
                    if "Konjugierte Form" in pos_title:
                        pattern_block = pattern_analyze_as_verb(word)
                    else:
                        pattern_block = pattern_analyze_as_verb(lemma)
                elif pos_key == "adjective" or "Adjektiv" in pos_title or "Deklinierte Form" in pos_title:
                    # Use the *input word* for inflected forms
                    if "Deklinierte Form" in pos_title:
                        pattern_block = pattern_analyze_as_adjective(word)
                    else:
                        pattern_block = pattern_analyze_as_adjective(lemma)
                elif pos_key == "adverb":
                    pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
            except Exception as e:
                pattern_block = {"error": f"Pattern.de analysis for {pos_key}('{lemma}') failed: {e}"}

        # --- C. Build Semantics Block (using correct lemma from pattern_block) ---
        semantics_block = _wiktionary_format_semantics_block(wikt_report, pattern_block, top_n)

        # --- D. Assemble the report (pre-validation) ---
        pos_entry_report = {
            "inflections_wiktionary": inflections_wikt_block,
            "inflections_pattern": pattern_block,
            "semantics_combined": semantics_block,
            "wiktionary_metadata": {
                "pos_title": pos_title,
                "pronunciation": wikt_report.get("sounds"),
                "examples": wikt_report.get("examples")
            }
        }

        # --- E. VALIDATION FILTER ---
        is_valid = False
        is_inflected_entry = "Konjugierte Form" in pos_title or "Deklinierte Form" in pos_title

        # Check 1: Is the input word the lemma OF A BASE FORM entry?
        if not is_inflected_entry and lemma.lower() == word_lower:
            is_valid = True
            log(f"[DEBUG] Wiktionary: KEEPING entry '{lemma}' ({pos_key}) because input word matches lemma of a base entry.")

        # Check 2: Is the input word in the *bare* forms list?
        # (This is the only check that should apply to inflected entries)
        if not is_valid:
            for form_entry in inflections_wikt_block.get("forms_list") or []:
                form_text = form_entry.get("form_text") or ""
                # Strip parenthesised notes, a leading article/"am", and
                # trailing "!" / "." to get the bare word form.
                bare_form = parenthetical_re.sub("", form_text).strip()
                bare_form = leading_article_re.sub("", bare_form).strip()
                bare_form = bare_form.rstrip("!.")
                if bare_form.lower() == word_lower:
                    is_valid = True
                    log(f"[DEBUG] Wiktionary: KEEPING entry '{lemma}' ({pos_key}) because input word found in form: '{form_text}'")
                    break

        # --- F. Add to final result if valid ---
        if is_valid:
            final_result["analysis"].setdefault(pos_key, []).append(pos_entry_report)
        else:
            log(f"[DEBUG] Wiktionary: DROPPING entry '{lemma}' ({pos_key}, {pos_title}) because input word '{word}' was not found in its valid forms.")
    # --- END OF VALIDATION ---

    kept_count = sum(len(entries) for entries in final_result["analysis"].values())
    final_result["info"] = f"Analysis from Wiktionary (Primary Engine). Found {len(wiktionary_reports)} matching entries, kept {kept_count}."
    return final_result
| # ============================================================================ | |
| # 6e. SHARED SEMANTIC HELPER | |
| # ============================================================================ | |
def _build_semantics_block_for_lemma(lemma: str, pos_key: str, top_n: int) -> Dict[str, Any]:
    """
    (REUSABLE HELPER)
    Gathers OdeNet senses and ConceptNet relations for one lemma + POS.

    OdeNet is consulted only when WN_AVAILABLE is true, ConceptNet only when
    REQUESTS_AVAILABLE is true. A failing OdeNet lookup is logged and yields
    no senses; a failing ConceptNet lookup yields a single {"error": ...}
    relation.

    Args:
        lemma: Lemma to look up.
        pos_key: Internal POS key selecting the OdeNet sense group.
        top_n: When > 0, relations are sorted by descending 'weight'
            (missing weight counts as 0.0) and both lists are cut to top_n.

    Returns:
        Dict with "lemma", "odenet_senses", "conceptnet_relations" and the
        Wiktionary fields ("wiktionary_senses", "wiktionary_synonyms",
        "wiktionary_antonyms") present but empty.
    """
    log(f"[DEBUG] Building semantics for lemma='{lemma}', pos='{pos_key}'")

    # 1. OdeNet senses for this lemma + POS
    odenet_senses = []
    if WN_AVAILABLE:
        try:
            candidates = _get_odenet_senses_by_pos(lemma).get(pos_key, [])
            # Ignore the {"info": ...} placeholder returned when OdeNet is down.
            if candidates and "info" not in candidates[0]:
                odenet_senses = candidates
        except Exception as e:
            log(f"[DEBUG] OdeNet lookup failed for {lemma} ({pos_key}): {e}")

    # 2. ConceptNet relations for this lemma
    relations = []
    if REQUESTS_AVAILABLE:
        try:
            relations = conceptnet_get_relations(lemma, language='de').get("relations", [])
        except Exception as e:
            relations = [{"error": str(e)}]

    # 3. Apply the top_n limit
    if top_n > 0:
        odenet_senses = odenet_senses[:top_n]
        relations.sort(key=lambda relation: relation.get('weight', 0.0), reverse=True)
        relations = relations[:top_n]

    semantics: Dict[str, Any] = {"lemma": lemma}
    semantics["wiktionary_senses"] = []  # This block is for non-Wiktionary engines
    semantics["odenet_senses"] = odenet_senses
    semantics["conceptnet_relations"] = relations
    semantics["wiktionary_synonyms"] = []
    semantics["wiktionary_antonyms"] = []
    return semantics
| # ============================================================================ | |
| # 6f. DWDSMOR ENGINE (NEW FALLBACK 1) | |
| # ============================================================================ | |
def dwdsmor_get_lemmatizer() -> Optional[Any]:  # Return type is 'sfst.Transducer'
    """
    Returns the shared DWDSmor analyzer, creating it on first use.

    The analyzer is cached in DWDSMOR_LEMMATIZER. It is built while holding
    DWDSMOR_LEMMATIZER_LOCK, with the cache re-checked after the lock is
    taken. Construction goes through dwdsmor.automaton.automata() and
    selects the "lemma" automaton; per the original author's note this
    locates/downloads the automata from the Hugging Face Hub. A warm-up
    analysis of "Test" is run before the instance is published.

    Returns:
        The cached analyzer, or None if initialization failed (the error and
        its traceback are printed).

    Raises:
        ImportError: if DWDSMOR_AVAILABLE is false.
    """
    global DWDSMOR_LEMMATIZER
    if not DWDSMOR_AVAILABLE:
        raise ImportError("dwdsmor library is not installed.")

    if not DWDSMOR_LEMMATIZER:
        with DWDSMOR_LEMMATIZER_LOCK:
            # Re-check: another caller may have initialized it meanwhile.
            if not DWDSMOR_LEMMATIZER:
                try:
                    print("Initializing DWDSmor lemmatizer (loading automata)...")
                    # Use dwdsmor's own API (as in its analysis.py tool).
                    from dwdsmor import automaton
                    lemma_analyzer = automaton.automata().analyzer("lemma")  # Use the 'lemma' automaton
                    # Convert to a list so the traversal really executes.
                    print("[DEBUG] DWDSmor: Running warm-up call...")
                    list(lemma_analyzer.analyze("Test", join_tags=True))
                    print("β DWDSmor lemmatizer initialized successfully.")
                    DWDSMOR_LEMMATIZER = lemma_analyzer
                except Exception as init_error:
                    print(f"β CRITICAL: Failed to initialize DWDSmor: {init_error}")
                    traceback.print_exc()
                    return None
    return DWDSMOR_LEMMATIZER
def _dwdsmor_map_pos_key(dwdsmor_pos: str) -> str:
    """
    Maps a DWDSmor POS tag to the internal POS key.

    "V" -> "verb", "NN" and "NPROP" (proper noun) -> "noun",
    "ADJ" -> "adjective", "ADV" -> "adverb". Any other tag is returned
    lowercased.
    """
    known_tags = {
        "V": "verb",
        "NN": "noun",
        "NPROP": "noun",  # Proper Noun
        "ADJ": "adjective",
        "ADV": "adverb",
    }
    internal_key = known_tags.get(dwdsmor_pos)
    if internal_key is None:
        return dwdsmor_pos.lower()  # Fallback for others
    return internal_key
def _analyze_word_with_dwdsmor(word: str, top_n: int) -> Dict[str, Any]:
    """
    (FALLBACK ENGINE 1) Analyzes a single word with DWDSmor, then adds
    pattern.de inflections and OdeNet/ConceptNet semantics per reading.

    Each DWDSmor traversal contributes its `.analysis` (lemma) and `.pos`;
    `.spec` is stored as the full analysis string. Traversals lacking a lemma
    or POS are skipped, and each (lemma, POS key) pair is processed once.

    Args:
        word: The single word to analyze.
        top_n: Limit passed on to the semantics block.

    Returns:
        {} when DWDSmor is unavailable, yields no usable analysis, or any
        step raises (the error is printed). Otherwise a dict with
        "input_word", "analysis" ({pos_key: [entry reports]}) and "info".
    """
    if not DWDSMOR_AVAILABLE:
        return {}  # Signal failure

    print(f"\n[Word Encyclopedia] Running V21 (DWDSmor) engine for: \"{word}\"")
    analysis_by_pos: Dict[str, Any] = {}
    final_result: Dict[str, Any] = {
        "input_word": word,
        "analysis": analysis_by_pos
    }
    try:
        analyzer = dwdsmor_get_lemmatizer()
        if not analyzer:
            raise Exception("DWDSmor lemmatizer failed to initialize.")

        traversals = list(analyzer.analyze(word, join_tags=True))
        if not traversals:
            return {}  # No results
        log(f"[DEBUG] DWDSmor: Found {len(traversals)} potential analyses.")

        seen_paths: Set[Tuple[str, str]] = set()
        for traversal in traversals:
            # Both the lemma (.analysis) and the POS tag (.pos) are required.
            if not (traversal.analysis and traversal.pos):
                continue
            lemma = traversal.analysis  # Use .analysis, not .lemma
            pos_key = _dwdsmor_map_pos_key(traversal.pos)

            path = (lemma, pos_key)
            if path in seen_paths:
                continue
            seen_paths.add(path)
            log(f"--- Analyzing DWDSmor path: lemma='{lemma}', pos='{pos_key}' ---")

            # --- 1. Inflections (pattern.de) ---
            pattern_block = {}
            if PATTERN_DE_AVAILABLE:
                try:
                    if pos_key == "noun":
                        pattern_block = pattern_analyze_as_noun(lemma)
                    elif pos_key == "verb":
                        pattern_block = pattern_analyze_as_verb(lemma)
                    elif pos_key == "adjective":
                        pattern_block = pattern_analyze_as_adjective(lemma)
                    elif pos_key == "adverb":
                        pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
                except Exception as e:
                    pattern_block = {"error": f"Pattern.de analysis for {pos_key}('{lemma}') failed: {e}"}

            # --- 2. Semantics ---
            semantics_block = _build_semantics_block_for_lemma(lemma, pos_key, top_n)

            # --- 3. Report for this reading ---
            dwdsmor_details = {
                "lemma": lemma,
                "pos": traversal.pos,
                "analysis_string": traversal.spec,  # .spec is the full string
                "source": "dwdsmor"
            }
            analysis_by_pos.setdefault(pos_key, []).append({
                "dwdsmor_analysis": dwdsmor_details,
                "inflections_pattern": pattern_block,
                "semantics_combined": semantics_block
            })

        if not analysis_by_pos:
            return {}  # No valid paths found
        final_result["info"] = "Analysis performed by DWDSmor-led engine."
        return final_result
    except Exception as e:
        print(f"[Word Encyclopedia] DWDSmor Engine FAILED: {e}")
        traceback.print_exc()
        return {}  # Signal failure
| # ============================================================================ | |
| # 7. CONSOLIDATED ANALYZER LOGIC | |
| # ============================================================================ | |
| # --- 7a. Comprehensive (Contextual) Analyzer --- | |
| def comprehensive_german_analysis(text: str, top_n_value: Optional[float] = 0) -> Dict[str, Any]: | |
| """ | |
| (CONTEXTUAL) Combines NLP tools for a deep analysis of German text. | |
| Reads the list-based, multi-engine output | |
| from `analyze_word_encyclopedia` and combines all senses for ranking. | |
| """ | |
| try: | |
| if not text or not text.strip(): | |
| return {"info": "Please enter text to analyze."} | |
| top_n = int(top_n_value) if top_n_value is not None else 0 | |
| print(f"\n[Comprehensive Analysis] Starting analysis for: \"{text}\" (top_n={top_n})") | |
| results: Dict[str, Any] = {"input_text": text} | |
| nlp_de = None | |
| context_doc = None | |
| # --- 1. LanguageTool Grammar Check --- | |
| print("[Comprehensive Analysis] Running LanguageTool...") | |
| if LT_AVAILABLE: | |
| try: | |
| results["grammar_check"] = lt_check_grammar(text) | |
| except Exception as e: | |
| results["grammar_check"] = {"error": f"LanguageTool failed: {e}"} | |
| else: | |
| results["grammar_check"] = {"error": "LanguageTool not available."} | |
| # --- 2. spaCy Morpho-Syntactic Backbone --- | |
| print("[Comprehensive Analysis] Running spaCy...") | |
| spacy_json_output = [] | |
| try: | |
| _, spacy_json, _, _, _ = spacy_get_analysis("en", "de", text) | |
| if isinstance(spacy_json, list): | |
| spacy_json_output = spacy_json | |
| results["spacy_analysis"] = spacy_json_output | |
| nlp_de = SPACY_MODELS.get("de") | |
| if nlp_de: | |
| context_doc = nlp_de(text) | |
| if not context_doc.has_vector or context_doc.vector_norm == 0: | |
| print("[Comprehensive Analysis] WARNING: Context sentence has no vector.") | |
| context_doc = None | |
| else: | |
| results["spacy_analysis"] = spacy_json | |
| except Exception as e: | |
| results["spacy_analysis"] = {"error": f"spaCy analysis failed: {e}"} | |
| # --- 2b. Heuristic SVA check --- | |
| try: | |
| if isinstance(results.get("grammar_check"), list) and any(d.get("status") == "perfect" for d in results["grammar_check"]): | |
| subj_num = None | |
| verb_num = None | |
| verb_token = None | |
| subj_token = None | |
| for tok in spacy_json_output: | |
| if tok.get("dependency") in {"sb", "nsubj"}: | |
| m = tok.get("morphology","") | |
| if "Number=Sing" in m: | |
| subj_num = "Sing" | |
| subj_token = tok | |
| spacy_pos_up = (tok.get("pos") or "").upper() | |
| if (spacy_pos_up in {"VERB", "AUX"}) and ("VerbForm=Fin" in tok.get("morphology","")): | |
| verb_token = tok | |
| m = tok.get("morphology","") | |
| if "Number=Plur" in m: | |
| verb_num = "Plur" | |
| if subj_num == "Sing" and verb_num == "Plur": | |
| corrected_sentence_sg = None | |
| corrected_sentence_pl = None | |
| replacements = [] | |
| v_lemma = verb_token.get("lemma") if verb_token else None | |
| v_word = verb_token.get("word") if verb_token else None | |
| v_3sg = _conjugate_to_person_number(v_lemma, "3", "sg") if v_lemma else None | |
| if v_3sg and v_word: | |
| corrected_sentence_sg = text.replace(v_word, v_3sg, 1) | |
| replacements.append(corrected_sentence_sg) | |
| subj_word = subj_token.get("word") if subj_token else None | |
| subj_pl = None | |
| if subj_word and PATTERN_DE_AVAILABLE: | |
| try: subj_pl = pluralize(subj_word) | |
| except Exception: subj_pl = None | |
| if subj_word and subj_pl and subj_pl != subj_word: | |
| corrected_sentence_pl = text.replace(subj_word, subj_pl, 1) | |
| replacements.append(corrected_sentence_pl) | |
| sva = { | |
| "message": "MΓΆglicher Kongruenzfehler: Singular-Subjekt mit pluralischer Verbform.", | |
| "rule_id": "HEURISTIC_SUBJ_VERB_AGREEMENT", | |
| "category": "Grammar", | |
| "incorrect_text": f"{verb_token.get('word')}" if verb_token else "", | |
| "replacements": replacements, "offset": None, "length": None, | |
| "context": None, "short_message": "SubjektβVerb-Kongruenz" | |
| } | |
| results["grammar_check"] = [sva] | |
| except Exception as e: | |
| print(f"SVA Heuristic failed: {e}") | |
| pass | |
| # --- 3. Lemma-by-Lemma Deep Dive (V19 LOGIC) --- | |
| print("[Comprehensive Analysis] Running Lemma Deep Dive...") | |
| FUNCTION_POS = {"DET","ADP","AUX","PUNCT","SCONJ","CCONJ","PART","PRON","NUM","SYM","X", "SPACE"} | |
| lemma_deep_dive: Dict[str, Any] = {} | |
| processed_lemmas: Set[str] = set() | |
| if not spacy_json_output: | |
| print("[Comprehensive Analysis] No spaCy tokens to analyze. Skipping deep dive.") | |
| else: | |
| for token in spacy_json_output: | |
| lemma = token.get("lemma") | |
| pos = (token.get("pos") or "").upper() | |
| if not lemma or lemma == "--" or pos in FUNCTION_POS or lemma in processed_lemmas: | |
| continue | |
| processed_lemmas.add(lemma) | |
| print(f"[Deep Dive] Analyzing lemma: '{lemma}' (from token '{token.get('word')}')") | |
| # --- 3a. Get Validated Grammatical & Semantic Analysis --- | |
| # We call our new, multi-engine dispatcher. | |
| lemma_report: Dict[str, Any] = {} | |
| inflection_analysis = {} | |
| semantic_analysis = {} | |
| try: | |
| # We pass top_n=0 to get ALL semantic possibilities for ranking | |
| encyclopedia_data = analyze_word_encyclopedia(lemma, 0) | |
| # The "analysis" key contains {"noun": [ ... ], "verb": [ ... ], ...} | |
| word_analysis = encyclopedia_data.get("analysis", {}) | |
| # *** THIS IS THE KEY CHANGE *** | |
| # Iterate over the POS keys and the *list* of entries for each | |
| for pos_key, entry_list in word_analysis.items(): | |
| if not entry_list: | |
| continue | |
| # For context, we only rank the *first* (most likely) entry | |
| # provided by the encyclopedia for that POS. | |
| data = entry_list[0] | |
| # Store all inflection blocks | |
| inflection_analysis[f"{pos_key}_wiktionary"] = data.get("inflections_wiktionary") | |
| inflection_analysis[f"{pos_key}_pattern"] = data.get("inflections_pattern") | |
| # --- Combine ALL senses (Wiktionary, OdeNet) for ranking --- | |
| all_senses_for_pos = [] | |
| semantics_block = data.get("semantics_combined", {}) | |
| # Add Wiktionary senses | |
| wikt_senses = semantics_block.get("wiktionary_senses", []) | |
| for s in wikt_senses: | |
| s["source"] = "wiktionary" | |
| all_senses_for_pos.append(s) | |
| # Add OdeNet senses | |
| odenet_senses = semantics_block.get("odenet_senses", []) | |
| for s in odenet_senses: | |
| s["source"] = "odenet" | |
| all_senses_for_pos.append(s) | |
| semantic_analysis[f"{pos_key}_senses"] = all_senses_for_pos | |
| # Add ConceptNet relations (store separately, as they are not "senses") | |
| if "conceptnet_relations" not in semantic_analysis: | |
| semantic_analysis["conceptnet_relations"] = [] | |
| semantic_analysis["conceptnet_relations"].extend( | |
| semantics_block.get("conceptnet_relations", []) | |
| ) | |
| lemma_report["inflection_analysis"] = inflection_analysis | |
| except Exception as e: | |
| lemma_report["inflection_analysis"] = {"error": f"V19 Analyzer failed: {e}", "traceback": traceback.format_exc()} | |
| # --- 3b. Contextual Re-ranking (Unchanged) --- | |
| # re-rank the semantic data we gathered in step 3a. | |
| # OdeNet Senses (now combined with Wiktionary senses) | |
| for key in semantic_analysis: | |
| if key.endswith("_senses") and nlp_de: | |
| ranked_senses = [] | |
| for sense in semantic_analysis[key]: | |
| if "error" in sense: continue | |
| definition = sense.get("definition", "") | |
| relevance = 0.0 | |
| if definition and context_doc: | |
| try: | |
| def_doc = nlp_de(definition) | |
| if def_doc.has_vector and def_doc.vector_norm > 0: | |
| relevance = context_doc.similarity(def_doc) | |
| except Exception: | |
| relevance = 0.0 | |
| sense["relevance_score"] = float(relevance) | |
| ranked_senses.append(sense) | |
| ranked_senses.sort(key=lambda x: x.get('relevance_score', 0.0), reverse=True) | |
| if top_n > 0: | |
| ranked_senses = ranked_senses[:top_n] | |
| semantic_analysis[key] = ranked_senses | |
| # ConceptNet Relations | |
| if "conceptnet_relations" in semantic_analysis and nlp_de: | |
| ranked_relations = [] | |
| for rel in semantic_analysis["conceptnet_relations"]: | |
| if "error" in rel: continue | |
| text_to_score = rel.get('surface') or rel.get('other_node', '') | |
| relevance = 0.0 | |
| if text_to_score and context_doc: | |
| try: | |
| rel_doc = nlp_de(text_to_score) | |
| if rel_doc.has_vector and rel_doc.vector_norm > 0: | |
| relevance = context_doc.similarity(rel_doc) | |
| except Exception: | |
| relevance = 0.0 | |
| rel["relevance_score"] = float(relevance) | |
| ranked_relations.append(rel) | |
| ranked_relations.sort(key=lambda x: x.get('relevance_score', 0.0), reverse=True) | |
| if top_n > 0: | |
| ranked_relations = ranked_relations[:top_n] | |
| semantic_analysis["conceptnet_relations"] = ranked_relations | |
| lemma_report["semantic_analysis"] = semantic_analysis | |
| lemma_deep_dive[lemma] = lemma_report | |
| results["lemma_deep_dive"] = lemma_deep_dive | |
| print("[Comprehensive Analysis] Analysis complete.") | |
| return results | |
| except Exception as e: | |
| print(f"[Comprehensive Analysis] FATAL ERROR: {e}") | |
| traceback.print_exc() | |
| return { | |
| "error": f"Analysis failed: {str(e)}", | |
| "traceback": traceback.format_exc(), | |
| "input_text": text | |
| } | |
| # --- 7b. NEW: Word Encyclopedia (Non-Contextual) Analyzer --- | |
| def _analyze_word_with_hanta(word: str, top_n_value: Optional[float] = 0) -> Dict[str, Any]: | |
| """ | |
| (FALLBACK ENGINE 2) Analyzes a single word using HanTa + OdeNet + Pattern. | |
| This was the V18 engine. Returns {} on failure. | |
| """ | |
| if not HANTA_AVAILABLE: | |
| return {} # Signal failure | |
| top_n = int(top_n_value) if top_n_value is not None else 0 | |
| print(f"\n[Word Encyclopedia] Running V18 (HanTa) fallback for: \"{word}\"") | |
| final_result: Dict[str, Any] = { | |
| "input_word": word, | |
| "analysis": {} | |
| } | |
| word_lower = word.lower() # For validation | |
| try: | |
| hanta_tagger = hanta_get_tagger() | |
| if not hanta_tagger: | |
| raise Exception("HanTa Tagger failed to initialize.") | |
| hanta_tags = _hanta_get_candidates(word, hanta_tagger) | |
| if not hanta_tags: | |
| return {} | |
| pos_groups_map = _hanta_map_tags_to_pos(hanta_tags) | |
| log(f"Found {len(pos_groups_map)} possible POS group(s): {list(pos_groups_map.keys())}") | |
| for pos_group, specific_tags in pos_groups_map.items(): | |
| print(f"--- Analyzing as: {pos_group.upper()} ---") | |
| lemma = _hanta_get_lemma_for_pos(word, pos_group, hanta_tagger) | |
| log(f"Lemma for {pos_group} is: '{lemma}'") | |
| all_odenet_senses = _get_odenet_senses_by_pos(lemma) | |
| pos_odenet_senses = all_odenet_senses.get(pos_group, []) | |
| if not pos_odenet_senses: | |
| log(f"β REJECTED {pos_group}: OdeNet is available but has no '{pos_group}' senses for lemma '{lemma}'.") | |
| continue | |
| if pos_odenet_senses and "info" in pos_odenet_senses[0]: | |
| log(f"β VERIFIED {pos_group}: OdeNet is unavailable, proceeding without validation.") | |
| pos_odenet_senses = [] | |
| else: | |
| log(f"β VERIFIED {pos_group}: OdeNet found {len(pos_odenet_senses)} sense(s).") | |
| # --- 1. Get Inflections (Pattern) --- | |
| inflection_report = {} | |
| if not PATTERN_DE_AVAILABLE: | |
| inflection_report = {"info": "pattern.de library not available. No inflections generated."} | |
| else: | |
| try: | |
| if pos_group == "noun": | |
| inflection_report = pattern_analyze_as_noun(lemma) | |
| elif pos_group == "verb": | |
| inflection_report = pattern_analyze_as_verb(lemma) | |
| elif pos_group == "adjective": | |
| inflection_report = pattern_analyze_as_adjective(lemma) | |
| elif pos_group == "adverb": | |
| inflection_report = {"base_form": lemma, "info": "Adverbs are non-inflecting."} | |
| if not pattern_is_good_analysis(inflection_report, pos_group) and pos_group != "adverb": | |
| log(f"β οΈ Warning: pattern.de generated a poor inflection table for {lemma} ({pos_group}).") | |
| inflection_report["warning"] = "Inflection table from pattern.de seems incomplete or invalid." | |
| except Exception as e: | |
| log(f"pattern.de inflection failed for {lemma} ({pos_group}): {e}") | |
| inflection_report = {"error": f"pattern.de failed: {e}", "traceback": traceback.format_exc()} | |
| # --- 2. Build Semantics Block --- | |
| semantics_block = _build_semantics_block_for_lemma(lemma, pos_group, top_n) | |
| # --- 3. Build Final Report Block --- | |
| pos_entry_report = { | |
| "hanta_analysis": { | |
| "detected_tags": sorted(list(specific_tags)), | |
| "lemma": lemma, | |
| "morphemes": [ | |
| hanta_tagger.analyze(word.capitalize() if pos_group == 'noun' else word.lower(), taglevel=3) | |
| ] | |
| }, | |
| "inflections_pattern": inflection_report, | |
| "semantics_combined": semantics_block | |
| } | |
| # --- 4. *** VALIDATION FILTER *** --- | |
| is_valid = False | |
| if lemma.lower() == word_lower: | |
| is_valid = True | |
| log(f"[DEBUG] HanTa: KEEPING entry '{lemma}' ({pos_group}) because input word matches lemma.") | |
| if not is_valid: | |
| # Check pattern.de's lexeme (for verbs) | |
| for form in inflection_report.get("lexeme", []): | |
| if form.lower() == word_lower: | |
| is_valid = True | |
| log(f"[DEBUG] HanTa: KEEPING entry '{lemma}' ({pos_group}) because input word found in pattern.de lexeme.") | |
| break | |
| if not is_valid: | |
| # Check pattern.de's participles (for "abgeschnitten") | |
| for part_form in inflection_report.get("participles", {}).values(): | |
| if part_form.lower() == word_lower: | |
| is_valid = True | |
| log(f"[DEBUG] HanTa: KEEPING entry '{lemma}' ({pos_group}) because input word found in pattern.de participles.") | |
| break | |
| if not is_valid and pos_group == "adjective": | |
| # Check adjective forms | |
| if word_lower == inflection_report.get("predicative", "").lower() or \ | |
| word_lower == inflection_report.get("comparative", "").lower() or \ | |
| word_lower == inflection_report.get("superlative", "").lower(): | |
| is_valid = True | |
| log(f"[DEBUG] HanTa: KEEPING entry '{lemma}' ({pos_group}) because input word matches adj comparison form.") | |
| if not is_valid and pos_group == "noun": | |
| # Check noun forms | |
| if word_lower == inflection_report.get("singular", "").lower() or \ | |
| word_lower == inflection_report.get("plural", "").lower(): | |
| is_valid = True | |
| log(f"[DEBUG] HanTa: KEEPING entry '{lemma}' ({pos_group}) because input word matches noun singular/plural.") | |
| if not is_valid and pos_group == "adverb": | |
| is_valid = True # Adverbs are non-inflecting, always keep. | |
| if is_valid: | |
| if pos_group not in final_result["analysis"]: | |
| final_result["analysis"][pos_group] = [] | |
| final_result["analysis"][pos_group].append(pos_entry_report) | |
| else: | |
| log(f"[DEBUG] HanTa: DROPPING entry '{lemma}' ({pos_group}) because input word '{word}' was not found in its valid forms.") | |
| # --- END OF VALIDATION --- | |
| if not final_result["analysis"]: | |
| return {} # No results | |
| final_result["info"] = "Analysis performed by HanTa-led fallback engine." | |
| return final_result | |
| except Exception as e: | |
| print(f"[Word Encyclopedia] HanTa FALLBACK Engine FAILED: {e}") | |
| traceback.print_exc() | |
| return {} # Signal failure | |
| def _analyze_word_with_iwnlp(word: str, top_n_value: Optional[float] = 0) -> Dict[str, Any]: | |
| """ | |
| (FALLBACK ENGINE 3) Analyzes a single word using IWNLP + OdeNet + Pattern. | |
| This is the full V16/V18 logic, restored and with the new validation filter. | |
| Returns {} on failure. | |
| """ | |
| if not word or not word.strip(): | |
| return {} # Use empty dict for "info" | |
| if not IWNLP_AVAILABLE: | |
| return {} # Signal failure | |
| top_n = int(top_n_value) if top_n_value is not None else 0 | |
| print(f"\n[Word Encyclopedia] Running IWNLP-fallback analysis for: \"{word}\" (top_n={top_n})") | |
| final_result: Dict[str, Any] = { | |
| "input_word": word, | |
| "analysis": {} | |
| } | |
| word_lower = word.lower() # For validation | |
| # --- Helper: Get OdeNet senses --- | |
| def _get_odenet_senses_by_pos_internal(w): | |
| """ | |
| (Internal helper for IWNLP fallback) | |
| OdeNet uses 'a' for BOTH Adjective and Adverb. | |
| """ | |
| senses_by_pos: Dict[str, List[Dict]] = { | |
| "noun": [], "verb": [], "adjective": [], "adverb": [] | |
| } | |
| if not WN_AVAILABLE: | |
| log(f"[IWNLP Fallback] OdeNet check skipped for '{w}': WN_AVAILABLE=False") | |
| # Fail-open strategy | |
| return {"noun": [{"info": "OdeNet unavailable"}], | |
| "verb": [{"info": "OdeNet unavailable"}], | |
| "adjective": [{"info": "OdeNet unavailable"}], | |
| "adverb": [{"info": "OdeNet unavailable"}]} | |
| try: | |
| all_senses = odenet_get_thesaurus_info(w).get("senses", []) | |
| for sense in all_senses: | |
| if "error" in sense: continue | |
| pos_tag = sense.get("pos") | |
| if pos_tag == 'n': | |
| senses_by_pos["noun"].append(sense) | |
| elif pos_tag == 'v': | |
| senses_by_pos["verb"].append(sense) | |
| elif pos_tag == 'a': | |
| log(f"[IWNLP Fallback] Found OdeNet 'a' tag (Adj/Adv) for sense: {sense.get('definition', '...')[:30]}") | |
| senses_by_pos["adjective"].append(sense) | |
| senses_by_pos["adverb"].append(sense) | |
| except Exception as e: | |
| print(f"[Word Encyclopedia] OdeNet check failed: {e}") | |
| return senses_by_pos | |
| # --- 1. GET ALL LEMMA CANDIDATES & SPACY POS --- | |
| try: | |
| iwnlp = iwnlp_get_pipeline() | |
| if not iwnlp: | |
| return {} # Signal failure | |
| doc = iwnlp(word) | |
| token = doc[0] | |
| spacy_pos = token.pos_ # e.g., "NOUN" for "Lauf", "ADV" for "heute" | |
| spacy_lemma = token.lemma_ | |
| iwnlp_lemmas_list = token._.iwnlp_lemmas or [] | |
| all_lemmas = set(iwnlp_lemmas_list) | |
| all_lemmas.add(spacy_lemma) | |
| all_lemmas.add(word) # Add the word itself | |
| print(f"[Word Encyclopedia] spaCy POS: {spacy_pos}") | |
| print(f"[Word Encyclopedia] All lemmas to check: {all_lemmas}") | |
| except Exception as e: | |
| traceback.print_exc() | |
| return {} # Signal failure | |
| # --- 2. CHECK INFLECTING POSSIBILITIES FOR EACH LEMMA --- | |
| valid_analyses: Dict[str, Dict[str, Any]] = {} | |
| for lemma in all_lemmas: | |
| if not lemma: continue | |
| odenet_senses_by_pos = _get_odenet_senses_by_pos_internal(lemma) | |
| # --- Check NOUN --- | |
| if 'noun' not in valid_analyses: | |
| noun_inflections = {} | |
| is_good_noun = False | |
| if not PATTERN_DE_AVAILABLE: | |
| noun_inflections = {"info": "pattern.de not available."} | |
| is_good_noun = True | |
| else: | |
| try: | |
| noun_inflections = pattern_analyze_as_noun(lemma.capitalize()) | |
| if pattern_is_good_analysis(noun_inflections, "noun"): | |
| is_good_noun = True | |
| except Exception as e: | |
| noun_inflections = {"error": f"pattern.de failed: {e}"} | |
| if is_good_noun: | |
| odenet_senses = odenet_senses_by_pos.get('noun', []) | |
| if not odenet_senses and lemma.lower() == word.lower(): | |
| odenet_senses = _get_odenet_senses_by_pos_internal(lemma.capitalize()).get('noun', []) | |
| if odenet_senses: | |
| if "info" not in odenet_senses[0] or not WN_AVAILABLE: | |
| log(f" β [IWNLP Fallback] Valid NOUN found: {lemma}") | |
| valid_analyses['noun'] = { | |
| "lemma": noun_inflections.get("base_form", lemma), | |
| "inflections": noun_inflections, | |
| "odenet_senses": [] if "info" in odenet_senses[0] else odenet_senses | |
| } | |
| # --- Check VERB --- | |
| if 'verb' not in valid_analyses: | |
| verb_inflections = {} | |
| is_good_verb = False | |
| if not PATTERN_DE_AVAILABLE: | |
| verb_inflections = {"info": "pattern.de not available."} | |
| is_good_verb = True | |
| else: | |
| try: | |
| verb_inflections = pattern_analyze_as_verb(lemma) | |
| if pattern_is_good_analysis(verb_inflections, "verb"): | |
| is_good_verb = True | |
| except Exception as e: | |
| verb_inflections = {"error": f"pattern.de failed: {e}"} | |
| if is_good_verb: | |
| odenet_senses = odenet_senses_by_pos.get('verb', []) | |
| if odenet_senses: | |
| if "info" not in odenet_senses[0] or not WN_AVAILABLE: | |
| log(f" β [IWNLP Fallback] Valid VERB found: {lemma}") | |
| valid_analyses['verb'] = { | |
| "lemma": verb_inflections.get("infinitive", lemma), | |
| "inflections": verb_inflections, | |
| "odenet_senses": [] if "info" in odenet_senses[0] else odenet_senses | |
| } | |
| # --- Check ADJECTIVE --- | |
| if 'adjective' not in valid_analyses: | |
| adj_inflections = {} | |
| is_good_adj = False | |
| if not PATTERN_DE_AVAILABLE: | |
| adj_inflections = {"info": "pattern.de not available."} | |
| is_good_adj = True | |
| else: | |
| try: | |
| adj_inflections = pattern_analyze_as_adjective(lemma) | |
| if pattern_is_good_analysis(adj_inflections, "adjective"): | |
| is_good_adj = True | |
| except Exception as e: | |
| adj_inflections = {"error": f"pattern.de failed: {e}"} | |
| if is_good_adj: | |
| odenet_senses = odenet_senses_by_pos.get('adjective', []) | |
| if odenet_senses: | |
| if "info" not in odenet_senses[0] or not WN_AVAILABLE: | |
| log(f" β [IWNLP Fallback] Valid ADJECTIVE found: {lemma}") | |
| valid_analyses['adjective'] = { | |
| "lemma": adj_inflections.get("predicative", lemma), | |
| "inflections": adj_inflections, | |
| "odenet_senses": [] if "info" in odenet_senses[0] else odenet_senses | |
| } | |
| # --- 3. CHECK NON-INFLECTING POS (ADVERB) --- | |
| if spacy_pos == "ADV": | |
| odenet_senses = _get_odenet_senses_by_pos_internal(word).get('adverb', []) | |
| if odenet_senses: | |
| if "info" not in odenet_senses[0] or not WN_AVAILABLE: | |
| log(f" β [IWNLP Fallback] Valid ADVERB found: {word}") | |
| valid_analyses['adverb'] = { | |
| "lemma": word, | |
| "inflections": {"base_form": word}, | |
| "odenet_senses": [] if "info" in odenet_senses[0] else odenet_senses | |
| } | |
| # --- 4. CHECK OTHER FUNCTION WORDS (e.g. "mein" -> DET) --- | |
| FUNCTION_POS = {"DET", "PRON", "ADP", "AUX", "CCONJ", "SCONJ", "PART", "PUNCT", "SYM"} | |
| if spacy_pos in FUNCTION_POS and not valid_analyses: | |
| pos_key = spacy_pos.lower() | |
| print(f" β Valid Function Word found: {word} (POS: {spacy_pos})") | |
| valid_analyses[pos_key] = { | |
| "lemma": spacy_lemma, | |
| "inflections": {"base_form": spacy_lemma}, | |
| "odenet_senses": [], | |
| "spacy_analysis": { | |
| "word": token.text, "lemma": token.lemma_, | |
| "pos_UPOS": token.pos_, "pos_TAG": token.tag_, | |
| "morphology": str(token.morph) | |
| } | |
| } | |
| # --- 5. BUILD FINAL REPORT (V21 MODIFIED + VALIDATION) --- | |
| for pos_key, analysis_data in valid_analyses.items(): | |
| lemma = analysis_data["lemma"] | |
| inflection_block = analysis_data["inflections"] | |
| # --- E. VALIDATION FILTER --- | |
| is_valid = False | |
| if lemma.lower() == word_lower: | |
| is_valid = True | |
| log(f"[DEBUG] IWNLP: KEEPING entry '{lemma}' ({pos_key}) because input word matches lemma.") | |
| if not is_valid: | |
| # Check pattern.de's lexeme (for verbs) | |
| for form in inflection_block.get("lexeme", []): | |
| if form.lower() == word_lower: | |
| is_valid = True | |
| log(f"[DEBUG] IWNLP: KEEPING entry '{lemma}' ({pos_key}) because input word found in pattern.de lexeme.") | |
| break | |
| if not is_valid: | |
| # Check pattern.de's participles (for "abgeschnitten") | |
| for part_form in inflection_block.get("participles", {}).values(): | |
| if part_form.lower() == word_lower: | |
| is_valid = True | |
| log(f"[DEBUG] IWNLP: KEEPING entry '{lemma}' ({pos_key}) because input word found in pattern.de participles.") | |
| break | |
| if not is_valid and pos_key == "adjective": | |
| # Check adjective forms | |
| if word_lower == inflection_block.get("predicative", "").lower() or \ | |
| word_lower == inflection_block.get("comparative", "").lower() or \ | |
| word_lower == inflection_block.get("superlative", "").lower(): | |
| is_valid = True | |
| log(f"[DEBUG] IWNLP: KEEPING entry '{lemma}' ({pos_key}) because input word matches adj comparison form.") | |
| if not is_valid and pos_key == "noun": | |
| # Check noun forms | |
| if word_lower == inflection_block.get("singular", "").lower() or \ | |
| word_lower == inflection_block.get("plural", "").lower(): | |
| is_valid = True | |
| log(f"[DEBUG] IWNLP: KEEPING entry '{lemma}' ({pos_key}) because input word matches noun singular/plural.") | |
| if not is_valid and (pos_key == "adverb" or "spacy_analysis" in analysis_data): | |
| is_valid = True # Adverbs and Function Words are non-inflecting, always keep. | |
| log(f"[DEBUG] IWNLP: KEEPING entry '{lemma}' ({pos_key}) because it is a non-inflecting word (ADV/FUNC).") | |
| if is_valid: | |
| pos_report = { | |
| "inflections_pattern": inflection_block, | |
| # Use the new global helper | |
| "semantics_combined": _build_semantics_block_for_lemma( | |
| lemma, | |
| pos_key, | |
| top_n | |
| ) | |
| } | |
| if "spacy_analysis" in analysis_data: | |
| pos_report["spacy_analysis"] = analysis_data["spacy_analysis"] | |
| if pos_key not in final_result["analysis"]: | |
| final_result["analysis"][pos_key] = [] | |
| final_result["analysis"][pos_key].append(pos_report) | |
| else: | |
| log(f"[DEBUG] IWNLP: DROPPING entry '{lemma}' ({pos_key}) because input word '{word}' was not found in its valid forms.") | |
| # --- END VALIDATION --- | |
| if not final_result["analysis"]: | |
| return {} # No results | |
| final_result["info"] = "Analysis performed by IWNLP-based fallback engine." | |
| return final_result | |
| # --- 7b. Word Encyclopedia (Non-Contextual) Analyzer --- | |
| # --- Public dispatcher: tries the selected engine, then falls back down the chain --- | |
def analyze_word_encyclopedia(word: str, top_n_value: Optional[float] = 0, engine_choice: str = "wiktionary") -> Dict[str, Any]:
    """
    (PUBLIC DISPATCHER V22) Analyzes a single word using the selected engine
    as a starting point, then automatically falls back if no results are found.
    Chain: Wiktionary -> DWDSmor -> HanTa -> IWNLP

    The chain is entered AT the selected engine and only walks forward, so
    choosing "iwnlp" runs IWNLP alone and choosing "hanta" runs HanTa and
    then IWNLP. An unrecognized ``engine_choice`` is treated like the default
    and starts at Wiktionary.

    Args:
        word: The word to analyze; surrounding whitespace is stripped.
        top_n_value: Sense limit forwarded to the engines (``None`` -> 0).
        engine_choice: One of "wiktionary", "dwdsmor", "hanta", "iwnlp"
            (matched case-insensitively).

    Returns:
        The first engine result that contains a non-empty "analysis". Results
        from any engine other than Wiktionary get an "info" string naming the
        engine and the engines that failed before it. If nothing is found, a
        dict with "input_word" and "info" is returned; if an engine raises, a
        dict with "input_word", "error" and "traceback". Blank input yields
        ``{"info": "Please enter a word."}``.
    """
    if not word or not word.strip():
        return {"info": "Please enter a word."}
    word = word.strip()
    top_n = int(top_n_value) if top_n_value is not None else 0
    info_log: List[str] = []  # To track which engines failed
    log(f"\n[Word Encyclopedia] User selected engine: '{engine_choice}' for word: '{word}'")

    # (key, display name, tier, runner). The lambdas defer the name lookup
    # and the call itself, so both happen inside the try block below.
    chain = (
        ("wiktionary", "Wiktionary", "Primary", lambda: _analyze_word_with_wiktionary(word, top_n)),
        ("dwdsmor", "DWDSmor", "Fallback 1", lambda: _analyze_word_with_dwdsmor(word, top_n)),
        ("hanta", "HanTa", "Fallback 2", lambda: _analyze_word_with_hanta(word, top_n)),
        ("iwnlp", "IWNLP", "Fallback 3", lambda: _analyze_word_with_iwnlp(word, top_n)),
    )
    engine_keys = [entry[0] for entry in chain]
    normalized_choice = str(engine_choice or "").strip().lower()
    start = engine_keys.index(normalized_choice) if normalized_choice in engine_keys else 0

    try:
        for position in range(start, len(chain)):
            key, label, tier, run_engine = chain[position]
            log(f"[DEBUG] V22 Dispatcher: Trying {label} ({tier}) for '{word}'...")
            result = run_engine()
            if isinstance(result, dict) and result.get("analysis"):
                # Wiktionary results are passed through untouched; fallback
                # results are annotated with the path that led to them.
                if key != "wiktionary":
                    result["info"] = f"Analysis from {label} ({tier}). {(' '.join(info_log))}"
                return result  # Success
            info_log.append(f"{label} found no results.")
            if position + 1 < len(chain):
                next_label = chain[position + 1][1]
                log(f"[DEBUG] V22 Dispatcher: {label} found no results. Falling back to {next_label}...")
    except Exception as e:
        log(f"--- Dispatcher FAILED for engine {engine_choice}: {e} ---")
        traceback.print_exc()
        return {
            "input_word": word,
            "error": "An engine failed during analysis.",
            "traceback": traceback.format_exc()
        }

    # --- No engines found anything ---
    log(f"[DEBUG] V22 Dispatcher: All engines failed to find results for '{word}'.")
    return {
        "input_word": word,
        "info": f"No analysis found. All engines failed. ({' '.join(info_log)})"
    }
| # ============================================================================ | |
| # 8. GRADIO UI CREATION | |
| # ============================================================================ | |
| def create_spacy_tab(): | |
| """ | |
| Creates the UI for the spaCy tab. | |
| Builds two radio selectors (UI language, model language), a title and | |
| subtitle, a text input, an analyze button and four result tabs, then | |
| wires the button to `spacy_get_analysis` and the UI-language radio to | |
| `spacy_update_ui`. Nothing is returned. | |
| """ | |
| # Initial labels are taken from the "en" entry of SPACY_UI_TEXT; the UI-language radio below also defaults to "EN". | |
| config = SPACY_UI_TEXT["en"] | |
| model_choices = list(SPACY_MODEL_INFO.keys()) | |
| with gr.Row(): | |
| ui_lang_radio = gr.Radio(["DE", "EN", "ES"], label=config["ui_lang_label"], value="EN") | |
| # Each choice is a (label, value) pair: element 0 of the SPACY_MODEL_INFO entry is shown, the key is the value. | |
| model_lang_radio = gr.Radio( | |
| choices=[(SPACY_MODEL_INFO[k][0], k) for k in model_choices], | |
| label=config["model_lang_label"], | |
| value=model_choices[0] | |
| ) | |
| markdown_title = gr.Markdown(config["title"]) | |
| markdown_subtitle = gr.Markdown(config["subtitle"]) | |
| text_input = gr.Textbox(label=config["input_label"], placeholder=config["input_placeholder"], lines=5) | |
| analyze_button = gr.Button(config["button_text"], variant="primary") | |
| # One result tab per output: dependency graphic (HTML), NER (HTML), token table (DataFrame), raw JSON. | |
| with gr.Tabs(): | |
| with gr.Tab(config["tab_graphic"]) as tab_graphic: | |
| html_dep_out = gr.HTML(label=config["html_label"]) | |
| with gr.Tab(config["tab_ner"]) as tab_ner: | |
| html_ner_out = gr.HTML(label=config["ner_label"]) | |
| with gr.Tab(config["tab_table"]) as tab_table: | |
| df_out = gr.DataFrame(label=config["table_label"], headers=config["table_headers"], interactive=False) | |
| with gr.Tab(config["tab_json"]) as tab_json: | |
| json_out = gr.JSON(label=config["json_label"]) | |
| # The button is itself listed as an output, so the handler is expected to return a fifth value for it (TODO confirm against spacy_get_analysis). | |
| analyze_button.click(fn=spacy_get_analysis, | |
| inputs=[ui_lang_radio, model_lang_radio, text_input], | |
| outputs=[df_out, json_out, html_dep_out, html_ner_out, analyze_button], | |
| api_name="get_morphology") | |
| # Changing the UI language passes the new value to spacy_update_ui; its return values are applied to every component listed in outputs. | |
| ui_lang_radio.change(fn=spacy_update_ui, | |
| inputs=ui_lang_radio, | |
| outputs=[markdown_title, markdown_subtitle, ui_lang_radio, model_lang_radio, | |
| text_input, analyze_button, tab_graphic, tab_table, tab_json, tab_ner, | |
| html_dep_out, df_out, json_out, html_ner_out]) | |
| def create_languagetool_tab(): | |
| """ | |
| Creates the UI for the LanguageTool tab. | |
| Adds a heading, a multi-line text input, a check button and a JSON | |
| output, and wires the button to `lt_check_grammar`. Nothing is returned. | |
| """ | |
| gr.Markdown("# π©πͺ German Grammar & Spelling Checker") | |
| gr.Markdown("Powered by `language-tool-python`. This service checks German text for grammatical errors and spelling mistakes.") | |
| with gr.Column(): | |
| text_input = gr.Textbox( | |
| label="German Text to Check", | |
| placeholder="e.g., Ich sehe dem Mann. Das ist ein Huas.", | |
| lines=5 | |
| ) | |
| check_button = gr.Button("Check Text", variant="primary") | |
| output = gr.JSON(label="Detected Errors (JSON)") | |
| # The click handler is registered under the API name "check_grammar". | |
| check_button.click( | |
| fn=lt_check_grammar, | |
| inputs=[text_input], | |
| outputs=[output], | |
| api_name="check_grammar" | |
| ) | |
| # Sample inputs run through the same function as the button; they appear to be deliberately faulty sentences (e.g. the misspelling "Huas"). | |
| gr.Examples( | |
| [["Das ist ein Huas."], ["Ich sehe dem Mann."], | |
| ["Die Katze schlafen auf dem Tisch."], ["Er fragt ob er gehen kann."]], | |
| inputs=[text_input], outputs=[output], fn=lt_check_grammar | |
| ) | |
| def create_odenet_tab(): | |
| """ | |
| Creates the UI for the OdeNet tab. | |
| Adds a heading, a single-word text input, a button and a JSON output, | |
| and wires the button to `odenet_get_thesaurus_info`. Nothing is returned. | |
| """ | |
| gr.Markdown("# π©πͺ German Thesaurus (WordNet) Service") | |
| gr.Markdown("Powered by `wn` and `OdeNet (odenet:1.4)`. Finds synonyms, antonyms, and other semantic relations for German words.") | |
| with gr.Column(): | |
| word_input = gr.Textbox( | |
| label="German Word", | |
| placeholder="e.g., Haus, schnell, gut, Katze" | |
| ) | |
| check_button = gr.Button("Find Relations", variant="primary") | |
| output = gr.JSON(label="Thesaurus Information (JSON)") | |
| # The click handler is registered under the API name "get_thesaurus". | |
| check_button.click( | |
| fn=odenet_get_thesaurus_info, | |
| inputs=[word_input], | |
| outputs=[output], | |
| api_name="get_thesaurus" | |
| ) | |
| # Sample words, run through the same function as the button. | |
| gr.Examples( | |
| [["Hund"], ["gut"], ["laufen"], ["Haus"], ["schnell"]], | |
| inputs=[word_input], outputs=[output], fn=odenet_get_thesaurus_info | |
| ) | |
| def create_pattern_tab(): | |
| """ | |
| Creates the UI for the Pattern.de tab. | |
| Adds a heading, a single-word text input, a button and a JSON output, | |
| and wires the button to `pattern_get_all_inflections`. Nothing is returned. | |
| """ | |
| gr.Markdown("# π©πͺ Complete German Word Inflection System") | |
| gr.Markdown("Powered by `PatternLite`. Generates complete inflection tables (declension, conjugation) for German words. Robustly handles ambiguity (e.g., 'Lauf' vs 'lauf').") | |
| with gr.Column(): | |
| word_input = gr.Textbox( | |
| label="German Word", | |
| placeholder="z.B. Haus, gehen, schΓΆn, besser, lief, Lauf, See" | |
| ) | |
| generate_button = gr.Button("Generate All Forms", variant="primary") | |
| output = gr.JSON(label="Complete Inflection Analysis") | |
| # The click handler is registered under the API name "get_all_inflections". | |
| generate_button.click( | |
| fn=pattern_get_all_inflections, | |
| inputs=[word_input], | |
| outputs=[output], | |
| api_name="get_all_inflections" | |
| ) | |
| # Sample words, run through the same function as the button. | |
| gr.Examples( | |
| [["Haus"], ["gehen"], ["schΓΆn"], ["besser"], ["ging"], ["schnellem"], ["Katze"], ["Lauf"], ["See"]], | |
| inputs=[word_input], outputs=[output], fn=pattern_get_all_inflections | |
| ) | |
| def create_conceptnet_tab(): | |
| """ | |
| Creates the UI for the ConceptNet tab. | |
| Adds a heading, a word/phrase input, a language-code input (default | |
| "de"), a button and a JSON output, and wires the button to | |
| `conceptnet_get_relations`. Nothing is returned. | |
| """ | |
| gr.Markdown("# π ConceptNet Knowledge Graph (Direct API)") | |
| gr.Markdown("Powered by `api.conceptnet.io`. Fetches semantic relations for a word in any language.") | |
| with gr.Row(): | |
| word_input = gr.Textbox( | |
| label="Word or Phrase", | |
| placeholder="e.g., Baum, tree, Katze" | |
| ) | |
| lang_input = gr.Textbox( | |
| label="Language Code", | |
| placeholder="de", | |
| value="de" | |
| ) | |
| check_button = gr.Button("Find Relations", variant="primary") | |
| output = gr.JSON(label="ConceptNet Relations (JSON)") | |
| # Both inputs are passed to the handler in order (word, language code); registered under the API name "get_conceptnet". | |
| check_button.click( | |
| fn=conceptnet_get_relations, | |
| inputs=[word_input, lang_input], | |
| outputs=[output], | |
| api_name="get_conceptnet" | |
| ) | |
| # Sample (word, language code) pairs, run through the same function as the button. | |
| gr.Examples( | |
| [["Baum", "de"], ["tree", "en"], ["Katze", "de"], ["gato", "es"]], | |
| inputs=[word_input, lang_input], outputs=[output], fn=conceptnet_get_relations | |
| ) | |
| def create_combined_tab(): | |
| """ | |
| Creates the UI for the CONTEXTUAL Comprehensive Analyzer tab. | |
| Adds a heading, a multi-line text input, a numeric sense limit, a run | |
| button, a Markdown status line and a JSON output. The button runs | |
| `comprehensive_german_analysis` through a local generator wrapper that | |
| reports status before and after the analysis. Nothing is returned. | |
| """ | |
| gr.Markdown("# π Comprehensive Analyzer (Contextual)") | |
| gr.Markdown("This tool provides a deep, **lemma-based** analysis *in context*. It integrates all tools and uses the **full sentence** to rank semantic senses by relevance.") | |
| with gr.Column(): | |
| text_input = gr.Textbox( | |
| label="German Text", | |
| placeholder="e.g., Die schnelle Katze springt ΓΌber den faulen Hund.", | |
| lines=5 | |
| ) | |
| # 0 means "no limit", as stated in the label. | |
| top_n_number = gr.Number( | |
| label="Limit Semantic Senses per POS (0 for all)", | |
| value=0, | |
| step=1, | |
| minimum=0, | |
| interactive=True | |
| ) | |
| analyze_button = gr.Button("Run Comprehensive Analysis", variant="primary") | |
| # Status line, initially empty; filled by the wrapper below. | |
| status_output = gr.Markdown(value="", visible=True) | |
| output = gr.JSON(label="Comprehensive Analysis (JSON)") | |
| # Generator wrapper: each yield is a (status text, JSON payload) pair matching outputs=[status_output, output]. | |
| def run_analysis_with_status(text, top_n): | |
| try: | |
| # First yield: progress message and an empty JSON payload, before the analysis runs. | |
| status = "π Analyzing..." | |
| yield status, {} | |
| result = comprehensive_german_analysis(text, top_n) | |
| # The lemma count is the number of keys under "lemma_deep_dive" (0 when the key is missing). | |
| status = f"β Analysis complete! Found {len(result.get('lemma_deep_dive', {}))} lemmas." | |
| yield status, result | |
| except Exception as e: | |
| # Any exception is reported in the status line and as an error/traceback payload instead of propagating. | |
| error_status = f"β Error: {str(e)}" | |
| error_result = {"error": str(e), "traceback": traceback.format_exc()} | |
| yield error_status, error_result | |
| # The click handler is registered under the API name "comprehensive_analysis". | |
| analyze_button.click( | |
| fn=run_analysis_with_status, | |
| inputs=[text_input, top_n_number], | |
| outputs=[status_output, output], | |
| api_name="comprehensive_analysis" | |
| ) | |
| # Sample (sentence, sense limit) pairs, run through the same wrapper as the button. | |
| gr.Examples( | |
| [["Die Katze schlafen auf dem Tisch.", 3], | |
| ["Das ist ein Huas.", 0], | |
| ["Ich laufe schnell.", 3], | |
| ["Der GΓ€rtner pflanzt einen Baum.", 5], | |
| ["Ich fahre an den See.", 3]], | |
| inputs=[text_input, top_n_number], | |
| outputs=[status_output, output], | |
| fn=run_analysis_with_status | |
| ) | |
def create_word_encyclopedia_tab():
    """Build the NON-CONTEXTUAL Word Encyclopedia tab.

    Collects a single word, a per-POS sense limit and the preferred analysis
    engine, and sends all three to `analyze_word_encyclopedia`; the answer is
    rendered as JSON.
    """
    # (label shown to the user, value handed to the handler)
    engine_options = [
        ("Wiktionary (Default)", "wiktionary"),
        ("DWDSmor (New)", "dwdsmor"),
        ("HanTa (Fallback 2)", "hanta"),
        ("IWNLP (Fallback 3)", "iwnlp"),
    ]
    sample_rows = [
        ["Lauf", 3, "wiktionary"],
        ["See", 0, "wiktionary"],
        ["schnell", 3, "wiktionary"],
        ["heute", 0, "wiktionary"],
        ["gebildet", 0, "dwdsmor"],  # demonstrates the DWDSmor engine
    ]

    gr.Markdown("# π Word Encyclopedia (Non-Contextual)")
    gr.Markdown("This tool analyzes a **single word** for *all possible* grammatical and semantic forms. It finds ambiguities (e.g., 'Lauf' as noun and verb) and groups all data by Part-of-Speech.")

    # NOTE(review): the source lost its indentation; the nesting of the row
    # and the button inside the column is assumed -- confirm against the repo.
    with gr.Column():
        word_box = gr.Textbox(
            placeholder="e.g., Lauf, See, schnell, heute",
            label="Single German Word",
        )
        with gr.Row():
            sense_limit = gr.Number(
                value=0,
                minimum=0,
                step=1,
                interactive=True,
                label="Limit Semantic Senses per POS (0 for all)",
            )
            engine_choice = gr.Radio(
                choices=engine_options,
                value="wiktionary",
                interactive=True,
                label="Select Analysis Engine (will auto-fallback)",
            )
        run_button = gr.Button("Analyze Word", variant="primary")

    result_view = gr.JSON(label="Word Encyclopedia Analysis (JSON)")

    run_button.click(
        api_name="analyze_word",
        fn=analyze_word_encyclopedia,
        inputs=[word_box, sense_limit, engine_choice],
        outputs=[result_view],
    )
    gr.Examples(
        sample_rows,
        fn=analyze_word_encyclopedia,
        inputs=[word_box, sense_limit, engine_choice],
        outputs=[result_view],
    )
def create_wiktionary_tab():
    """Build the standalone Wiktionary engine tab.

    A single word is passed straight to `_analyze_word_with_wiktionary` with
    the sense limit fixed at 0, and the returned data is shown as JSON.
    """
    def lookup_unlimited(word):
        # The raw-engine tab has no limit control, so top_n is hardcoded to 0.
        return _analyze_word_with_wiktionary(word, 0)

    gr.Markdown("# π Wiktionary Lookup (Raw Engine)")
    gr.Markdown("Directly query the Wiktionary (Primary) engine. This shows the raw, combined data from the database, Pattern.de, and semantic sources.")

    # NOTE(review): the source lost its indentation; box and button are
    # assumed to sit inside the column -- confirm against the repo.
    with gr.Column():
        word_box = gr.Textbox(
            placeholder="e.g., Haus, gehe, heute",
            label="Single German Word",
        )
        lookup_button = gr.Button("Lookup Word in Wiktionary", variant="primary")

    result_view = gr.JSON(label="Wiktionary Engine Analysis (JSON)")

    lookup_button.click(
        api_name="wiktionary_lookup",
        fn=lookup_unlimited,
        inputs=[word_box],
        outputs=[result_view],
    )
    gr.Examples(
        [["Haus"], ["gehe"], ["heute"], ["Lauf"]],
        fn=lookup_unlimited,
        inputs=[word_box],
        outputs=[result_view],
    )
def create_dwdsmor_tab():
    """Creates the UI for the standalone DWDSmor lookup tab.

    The tab takes one word, runs it through the DWDSmor analyzer via the
    nested `dwdsmor_raw_analysis` handler and shows the result as JSON.
    """
    # NOTE(review): the glyphs after "#" look like a mis-decoded emoji; the
    # original code point cannot be recovered from this text, so it is kept.
    gr.Markdown("# ποΈ DWDSmor Morphology (Raw Engine)")
    gr.Markdown("Directly query the `dwdsmor` FST-based engine. This is a high-precision morphological analyzer.")

    def dwdsmor_raw_analysis(word):
        """Return the raw DWDSmor analyses of `word` as a JSON-friendly dict.

        Args:
            word: The word from the text box; surrounding whitespace is
                ignored and `None` is treated as empty.

        Returns:
            `{"input_word", "analyses"}` on success, `{"info": ...}` when the
            input is empty or nothing was found, or `{"error": ...}` (plus a
            traceback when an exception occurred).
        """
        if not DWDSMOR_AVAILABLE:
            return {"error": "DWDSmor library not installed."}

        # Stray whitespace from the text box would otherwise be fed to the
        # analyzer as part of the word.
        word = (word or "").strip()
        if not word:
            return {"info": "Please enter a word."}

        try:
            analyzer = dwdsmor_get_lemmatizer()
            if not analyzer:
                return {"error": "DWDSmor lemmatizer failed to initialize."}

            # analyze() yields result objects lazily; materialize them so
            # emptiness can be tested.
            analyses = list(analyzer.analyze(word, join_tags=True))
            if not analyses:
                return {"info": f"No analysis found for '{word}'."}

            # Flatten each result object into a plain dict for JSON output.
            # NOTE(review): per the original author's note, `.analysis` holds
            # the lemma and `.spec` the full analysis string -- confirm
            # against the dwdsmor documentation.
            results = [
                {
                    "lemma": item.analysis,
                    "pos": item.pos,
                    "analysis_string": item.spec,
                    "tags": item.tags,
                }
                for item in analyses
            ]
            return {"input_word": word, "analyses": results}
        except Exception as e:
            # UI boundary: surface the failure in the JSON pane.
            return {"error": str(e), "traceback": traceback.format_exc()}

    # NOTE(review): the source lost its indentation; box and button are
    # assumed to sit inside the column -- confirm against the repo.
    with gr.Column():
        word_input = gr.Textbox(
            label="Single German Word",
            # Repaired mis-encoded umlaut (was "HΓ€user").
            placeholder="e.g., gebildet, schnell, Häuser",
        )
        analyze_button = gr.Button("Analyze Word with DWDSmor", variant="primary")

    output = gr.JSON(label="DWDSmor Raw Analysis (JSON)")

    analyze_button.click(
        fn=dwdsmor_raw_analysis,
        inputs=[word_input],
        outputs=[output],
        api_name="dwdsmor_lookup",
    )
    gr.Examples(
        # Repaired mis-encoded umlaut (was "HΓ€user").
        [["gebildet"], ["schnell"], ["Häuser"], ["gehe"]],
        inputs=[word_input], outputs=[output], fn=dwdsmor_raw_analysis,
    )
def create_hanta_tab():
    """Creates the UI for the standalone HanTa Engine tab.

    A single word is passed to `_analyze_word_with_hanta` with the sense
    limit fixed at 0, and the returned data is shown as JSON.
    """
    def lookup_with_hanta(word):
        # The raw-engine tab has no limit control, so top_n is hardcoded to 0.
        return _analyze_word_with_hanta(word, 0)

    gr.Markdown("# π€ HanTa Lookup (Raw Engine)")
    # HanTa is the second fallback (Wiktionary -> DWDSmor -> HanTa -> IWNLP),
    # matching the engine selector's "HanTa (Fallback 2)" label; the text
    # previously said "Fallback 1".
    gr.Markdown("Directly query the HanTa (Fallback 2) engine. This shows the raw, combined data from HanTa, Pattern.de, and semantic sources.")

    # NOTE(review): the source lost its indentation; box and button are
    # assumed to sit inside the column -- confirm against the repo.
    with gr.Column():
        word_input = gr.Textbox(
            label="Single German Word",
            placeholder="e.g., Haus, gehe, heute",
        )
        analyze_button = gr.Button("Lookup Word with HanTa", variant="primary")

    output = gr.JSON(label="HanTa Engine Analysis (JSON)")

    analyze_button.click(
        fn=lookup_with_hanta,
        inputs=[word_input],
        outputs=[output],
        api_name="hanta_lookup",
    )
    gr.Examples(
        [["Haus"], ["gehe"], ["heute"], ["Lauf"]],
        inputs=[word_input], outputs=[output], fn=lookup_with_hanta,
    )
def create_iwnlp_tab():
    """Creates the UI for the standalone IWNLP Engine tab.

    A single word is passed to `_analyze_word_with_iwnlp` with the sense
    limit fixed at 0, and the returned data is shown as JSON.
    """
    def lookup_with_iwnlp(word):
        # The raw-engine tab has no limit control, so top_n is hardcoded to 0.
        return _analyze_word_with_iwnlp(word, 0)

    gr.Markdown("# π¬ IWNLP-spaCy Lookup (Raw Engine)")
    # IWNLP is the third fallback (Wiktionary -> DWDSmor -> HanTa -> IWNLP),
    # matching the engine selector's "IWNLP (Fallback 3)" label; the text
    # previously said "Fallback 2".
    gr.Markdown("Directly query the IWNLP-spaCy (Fallback 3) engine. This shows the raw, combined data from spaCy, IWNLP, Pattern.de, and semantic sources.")

    # NOTE(review): the source lost its indentation; box and button are
    # assumed to sit inside the column -- confirm against the repo.
    with gr.Column():
        word_input = gr.Textbox(
            label="Single German Word",
            placeholder="e.g., Haus, gehe, heute",
        )
        analyze_button = gr.Button("Lookup Word with IWNLP", variant="primary")

    output = gr.JSON(label="IWNLP Engine Analysis (JSON)")

    analyze_button.click(
        fn=lookup_with_iwnlp,
        inputs=[word_input],
        outputs=[output],
        api_name="iwnlp_lookup",
    )
    gr.Examples(
        [["Haus"], ["gehe"], ["heute"], ["Lauf"]],
        inputs=[word_input], outputs=[output], fn=lookup_with_iwnlp,
    )
| # --- Main UI Builder --- | |
def create_consolidated_interface():
    """Assemble the complete Gradio app: a title, a short description and one tab per tool.

    Returns:
        The `gr.Blocks` object containing every tab; the caller launches it.
    """
    # (tab title, builder function) pairs, listed in display order.
    tab_layout = [
        # --- Main Tools ---
        ("π Word Encyclopedia (DE)", create_word_encyclopedia_tab),
        ("π Comprehensive Analyzer (DE)", create_combined_tab),
        ("π¬ spaCy Analyzer (Multi-lingual)", create_spacy_tab),
        ("β Grammar Check (DE)", create_languagetool_tab),
        # --- Standalone Engine Tabs ---
        ("π Engine: Wiktionary (DE)", create_wiktionary_tab),
        ("π€ Engine: HanTa (DE)", create_hanta_tab),
        ("π¬ Engine: IWNLP-spaCy (DE)", create_iwnlp_tab),
        ("ποΈ Engine: DWDSmor (DE)", create_dwdsmor_tab),
        # --- Standalone Component Tabs ---
        ("π Component: Inflections (DE)", create_pattern_tab),
        ("π Component: Thesaurus (DE)", create_odenet_tab),
        ("π Component: ConceptNet (Direct)", create_conceptnet_tab),
    ]

    with gr.Blocks(title="Consolidated Linguistics Hub", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# ποΈ Consolidated Linguistics Hub")
        gr.Markdown("A suite of advanced tools for German linguistics, providing both contextual and non-contextual analysis.")
        with gr.Tabs():
            for tab_title, build_tab in tab_layout:
                # Each builder adds its components to the tab that is open.
                with gr.Tab(tab_title):
                    build_tab()
    return demo
| # ============================================================================ | |
| # 9. MAIN EXECUTION BLOCK | |
| # ============================================================================ | |
# Script entry point: initialise every backing service in turn, report the
# outcome on stdout, then build and launch the Gradio app. A failing or
# missing service is reported but never aborts start-up.
if __name__ == "__main__":
    print("\n" + "="*70)
    print("CONSOLIDATED LINGUISTICS HUB (STARTING)")
    print("="*70 + "\n")

    # NOTE(review): the leading "β" in several messages below looks like the
    # remnant of a mis-decoded status symbol; the original code points cannot
    # be recovered from this text, so those strings are left untouched.

    # --- 1. Initialize spaCy Models ---
    print("--- Initializing spaCy Models ---")
    spacy_initialize_models()
    print("--- spaCy Done ---\n")

    # --- 2. Initialize OdeNet Worker ---
    print("--- Initializing OdeNet Worker ---")
    if WN_AVAILABLE:
        try:
            odenet_start_worker()
            print("β OdeNet worker is starting/ready.")
        except Exception as e:
            print(f"β FAILED to start OdeNet worker: {e}")
            print(" 'Thesaurus' and 'Comprehensive' tabs may fail.")
    else:
        print("INFO: OdeNet ('wn') library not available, skipping worker.")
    print("--- OdeNet Done ---\n")

    # --- 3. Initialize Wiktionary ---
    print("--- Initializing Wiktionary DB ---")
    try:
        if not wiktionary_download_db():
            print("β WARNING: Failed to download Wiktionary DB. Primary engine is disabled.")
        else:
            # Pre-warm the connection; the return value is not needed here.
            wiktionary_get_connection()
    except Exception as e:
        print(f"β FAILED to initialize Wiktionary: {e}")
    print("--- Wiktionary Done ---\n")

    # --- 4. Initialize DWDSmor ---
    print("--- Initializing DWDSmor Lemmatizer ---")
    if DWDSMOR_AVAILABLE:
        try:
            dwdsmor_get_lemmatizer()  # Call the function to load the model
        except Exception as e:
            print(f"β FAILED to start DWDSmor: {e}")
            print(" 'Word Encyclopedia' DWDSmor engine will fail.")
    else:
        print("INFO: DWDSmor library not available, skipping lemmatizer.")
    print("--- DWDSmor Done ---\n")

    # --- 5. Initialize HanTa Tagger ---
    print("--- Initializing HanTa Tagger ---")
    if HANTA_AVAILABLE:
        try:
            hanta_get_tagger()  # Call the function to load the model
        except Exception as e:
            print(f"β FAILED to start HanTa tagger: {e}")
            # Removed a stray mis-decoded non-breaking space from this message.
            print(" 'Word Encyclopedia' tab will fail.")
    else:
        print("INFO: HanTa library not available, skipping tagger.")
    print("--- HanTa Done ---\n")

    # --- 6. Check LanguageTool ---
    print("--- Checking LanguageTool ---")
    if not LT_AVAILABLE:
        print("WARNING: language-tool-python not available. 'Grammar' tab will fail.")
    else:
        print("β LanguageTool library is available (will lazy-load on first use).")
    print("--- LanguageTool Done ---\n")

    # --- 7. Check Pattern.de ---
    print("--- Checking Pattern.de ---")
    if not PATTERN_DE_AVAILABLE:
        print("WARNING: pattern.de library not available. 'Inflections' tab will fail.")
    else:
        print("β Pattern.de library is available.")
    print("--- Pattern.de Done ---\n")

    # --- 8. Check Requests (for ConceptNet) ---
    print("--- Checking Requests (for ConceptNet) ---")
    if not REQUESTS_AVAILABLE:
        print("WARNING: requests library not available. 'ConceptNet' features will fail.")
    else:
        print("β Requests library is available.")
    print("--- Requests Done ---\n")

    # --- 9. Initialize ConceptNet Client ---
    print("--- Initializing ConceptNet Client ---")
    if GRADIO_CLIENT_AVAILABLE:
        try:
            get_conceptnet_client()  # Call the function to load the client
        except Exception as e:
            print(f"β FAILED to start ConceptNet Client: {e}")
    else:
        print("INFO: gradio_client not available, skipping ConceptNet client.")
    print("--- ConceptNet Client Done ---\n")

    print("="*70)
    print("All services initialized. Launching Gradio Hub...")
    print("="*70 + "\n")

    # --- 10. Launch Gradio ---
    demo = create_consolidated_interface()
    # Listen on all interfaces on port 7860; show_error surfaces handler
    # exceptions in the UI.
    demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)