# ============================================================================
# ENGLISH LINGUISTICS HUB (CONSOLIDATED APP V24-EN)
#
# This script provides a comprehensive Linguistics Hub for English analysis,
# adding NLTK, Stanza, TextBlob, HanTa (EN), OEWN, and OpenBLP.
# It maintains the exact same JSON output structure as the German app.
#
# ============================================================================
# TABS & FUNCTIONALITY:
# ============================================================================
#
# --- PRIMARY TABS ---
#
# 1. Word Encyclopedia (EN):
# - NON-CONTEXTUAL analysis of single words.
# - Multi-engine dispatcher with user selection and automatic fallback:
# (Wiktionary -> HanTa -> Stanza -> NLTK -> TextBlob)
# - Aggregates all grammatical (Wiktionary, Pattern) and semantic
# (Wiktionary, OEWN, OpenBLP, ConceptNet) possibilities.
#
# 2. Comprehensive Analyzer (EN):
# - CONTEXTUAL analysis of full sentences.
# - Uses the Word Encyclopedia's dispatcher for robust lemma analysis.
# - Ranks all semantic senses (Wiktionary, OEWN) by relevance.
#
# ============================================================================
# ============================================================================
# 1. CONSOLIDATED IMPORTS
# ============================================================================
import gradio as gr
import spacy
from spacy import displacy
import base64
import traceback
import subprocess
import sys
import os
from pathlib import Path
import importlib
import site
import threading
import queue
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Any, List, Set, Optional, Tuple
import zipfile
import re
import sqlite3
import json
from huggingface_hub import hf_hub_download
import gzip
import shutil
# --- requests & gradio_client Imports (for ConceptNet) ---
try:
import requests
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
REQUESTS_AVAILABLE = True
print("✓ Successfully imported requests.")
except ImportError:
REQUESTS_AVAILABLE = False
print("CRITICAL WARNING: `requests` library not found.")
try:
from gradio_client import Client
GRADIO_CLIENT_AVAILABLE = True
except ImportError:
GRADIO_CLIENT_AVAILABLE = False
print("CRITICAL WARNING: `gradio_client` library not found.")
# --- LanguageTool Import ---
try:
import language_tool_python
LT_AVAILABLE = True
print("✓ Successfully imported language_tool")
except ImportError:
LT_AVAILABLE = False
print("CRITICAL WARNING: `language-tool-python` library not found.")
# --- WordNet (wn) Import (for OEWN) ---
try:
import wn
WN_AVAILABLE = True
print("✓ Successfully imported wordnet (for OEWN)")
except ImportError:
WN_AVAILABLE = False
print("CRITICAL WARNING: `wn` library not found.")
# --- Pattern.en Import (ENGLISH) ---
PATTERN_EN_AVAILABLE = False
# Define constants locally as fallbacks (Pattern standard values) to prevent import errors
# Tenses
INFINITIVE = "inf"
PRESENT = "pres"
PAST = "pst"
FUTURE = "fut"
PARTICIPLE = "part"
# Person/Number
FIRST = 1
SECOND = 2
THIRD = 3
SINGULAR = "sg"
PLURAL = "pl"
# POS
NOUN = "NN"
VERB = "VB"
ADJECTIVE = "JJ"
try:
print("Trying to import pattern.en")
import pattern.en
# Import functions safely
from pattern.en import (
pluralize, singularize,
conjugate, lemma, lexeme, tenses,
comparative, superlative,
predicative, attributive,
article,
parse, split
)
# Try to import constants, but don't fail if they are missing (we use fallbacks)
print("Trying to import pattern constants.")
try:
from pattern.en import (
INFINITIVE, PRESENT, PAST, PARTICIPLE,
FIRST, SECOND, THIRD, SINGULAR, PLURAL,
NOUN, VERB, ADJECTIVE
)
except ImportError:
print("Using local fallback constants for Pattern.en")
PATTERN_EN_AVAILABLE = True
print("✓ Successfully imported pattern.en")
except ImportError:
print("Using PatternLite fallback logic...")
try:
# Attempt simple import for PatternLite structure
import pattern.en
from pattern.en import pluralize, singularize, conjugate, lemma, lexeme
        # comparative/superlative are not guaranteed in PatternLite; import them only if available
        try:
            from pattern.en import comparative, superlative
        except ImportError:
            pass
PATTERN_EN_AVAILABLE = True
print("✓ Successfully imported pattern.en (via PatternLite)")
except ImportError as e:
PATTERN_EN_AVAILABLE = False
print(f"CRITICAL WARNING: `pattern.en` library not found: {e}")
# --- HanTa Tagger Import (for EN) ---
try:
from HanTa.HanoverTagger import HanoverTagger
import HanTa.HanoverTagger
sys.modules['HanoverTagger'] = HanTa.HanoverTagger
HANTA_AVAILABLE = True
print("✓ Successfully imported HanTa")
except ImportError:
HANTA_AVAILABLE = False
print("CRITICAL WARNING: `HanTa` library not found.")
# --- NLTK & TextBlob Import ---
try:
import nltk
from nltk.corpus import wordnet as nltk_wn
from nltk.stem import WordNetLemmatizer
# --- CRITICAL: Download required NLTK data ---
    # These packages prevent the common "LookupError" and "MissingCorpusError" failures
print("Downloading NLTK data...")
_nltk_packages = [
'wordnet',
'omw-1.4',
'averaged_perceptron_tagger',
'averaged_perceptron_tagger_eng', # Specific for newer NLTK
'punkt',
'punkt_tab' # Specific for newer TextBlob/NLTK
]
for pkg in _nltk_packages:
try:
nltk.download(pkg, quiet=True)
except Exception as e:
print(f"Warning: Failed to download NLTK package '{pkg}': {e}")
NLTK_AVAILABLE = True
print("✓ Successfully imported nltk and downloaded data")
except ImportError:
NLTK_AVAILABLE = False
print("WARNING: `nltk` library not found.")
except Exception as e:
NLTK_AVAILABLE = False
print(f"WARNING: `nltk` data download failed: {e}")
try:
from textblob import TextBlob
TEXTBLOB_AVAILABLE = True
print("✓ Successfully imported textblob")
except ImportError:
TEXTBLOB_AVAILABLE = False
print("WARNING: `textblob` library not found.")
# --- Stanza Import ---
try:
import stanza
STANZA_AVAILABLE = True
print("✓ Successfully imported stanza")
except ImportError:
STANZA_AVAILABLE = False
print("WARNING: `stanza` library not found.")
# --- German-specific imports are not needed ---
IWNLP_AVAILABLE = False
DWDSMOR_AVAILABLE = False
# ============================================================================
# 2. SHARED GLOBALS & CONFIG
# ============================================================================
VERBOSE = True
def log(msg):
if VERBOSE:
print(f"[DEBUG] {msg}")
# --- Wiktionary Cache & Lock (ENGLISH) ---
WIKTIONARY_REPO_ID = "cstr/en-wiktionary-sqlite-all"
WIKTIONARY_REMOTE_FILE = "en_wiktionary_normalized_all.db.gz" # Filename in Wiktionary Repo
WIKTIONARY_DB_PATH = "en_wiktionary_normalized.db" # Local extracted file
WIKTIONARY_CONN: Optional[sqlite3.Connection] = None
WIKTIONARY_CONN_LOCK = threading.Lock()
WIKTIONARY_AVAILABLE = False
# --- ConceptNet Cache & Lock ---
CONCEPTNET_CACHE: Dict[Tuple[str, str], Any] = {}
CONCEPTNET_LOCK = threading.Lock()
CONCEPTNET_CLIENT: Optional["Client"] = None  # quoted type: gradio_client may be unavailable
CONCEPTNET_CLIENT_LOCK = threading.Lock()
# --- HanTa Tagger Cache & Lock (for EN) ---
HANTA_TAGGER_EN: Optional["HanoverTagger"] = None  # quoted type: HanTa may be unavailable
HANTA_TAGGER_LOCK = threading.Lock()
# --- Stanza Cache & Lock (for EN) ---
STANZA_PIPELINE_EN: Optional["stanza.Pipeline"] = None  # quoted type: stanza may be unavailable
STANZA_PIPELINE_LOCK = threading.Lock()
# --- NLTK Cache & Lock (for EN) ---
NLTK_LEMMATIZER: Optional["WordNetLemmatizer"] = None  # quoted type: nltk may be unavailable
NLTK_LEMMATIZER_LOCK = threading.Lock()
# --- Helper ---
def _html_wrap(content: str, line_height: str = "2.0") -> str:
    # Wrap rendered HTML in a simple div so the line height can be controlled.
    return f'<div style="line-height: {line_height};">{content}</div>'
# --- Helper for SVA (ENGLISH) ---
def _conjugate_to_person_number_en(verb_lemma: str, person: str, number: str) -> Optional[str]:
"""
Return a present tense finite form for given person/number (English).
person in {'1','2','3'}, number in {'sg','pl'}.
"""
if not PATTERN_EN_AVAILABLE:
return None
try:
p_num = int(person)
# Use the constants defined in the import block
n_num = SINGULAR if number == 'sg' else PLURAL
# Explicitly name arguments for safety across Pattern versions
return conjugate(verb_lemma, tense=PRESENT, person=p_num, number=n_num)
except Exception:
return None
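# Illustrative usage (assumes pattern.en is installed; exact forms come from its lexicon):
#   _conjugate_to_person_number_en("run", "3", "sg")  -> "runs"
#   _conjugate_to_person_number_en("be", "1", "pl")   -> "are"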
# ============================================================================
# 3. SPACY ANALYZER LOGIC
# ============================================================================
# --- Globals & Config for spaCy (Updated for English focus) ---
SPACY_MODEL_INFO: Dict[str, Tuple[str, str, str]] = {
"en": ("English", "en_core_web_md", "spacy"),
"de": ("German", "de_core_news_md", "spacy"),
"es": ("Spanish", "es_core_news_md", "spacy"),
"grc-proiel-trf": ("Ancient Greek (PROIEL TRF)", "grc_proiel_trf", "grecy"),
"grc-perseus-trf": ("Ancient Greek (Perseus TRF)", "grc_perseus_trf", "grecy"),
"grc_ner_trf": ("Ancient Greek (NER TRF)", "grc_ner_trf", "grecy"),
"grc-proiel-lg": ("Ancient Greek (PROIEL LG)", "grc_proiel_lg", "grecy"),
"grc-perseus-lg": ("Ancient Greek (Perseus LG)", "grc_perseus_lg", "grecy"),
"grc-proiel-sm": ("Ancient Greek (PROIEL SM)", "grc_proiel_sm", "grecy"),
"grc-perseus-sm": ("Ancient Greek (Perseus SM)", "grc_perseus_sm", "grecy"),
}
SPACY_UI_TEXT = {
"de": {
"title": "# 🔍 Mehrsprachiger Morpho-Syntaktischer Analysator",
"subtitle": "Analysieren Sie Texte auf Deutsch, Englisch, Spanisch und Altgriechisch",
"ui_lang_label": "Benutzeroberflächensprache",
"model_lang_label": "Textsprache für Analyse",
"input_label": "Text eingeben",
"input_placeholder": "Geben Sie hier Ihren Text ein...",
"button_text": "Text analysieren",
"button_processing_text": "Verarbeitung läuft...",
"tab_graphic": "Grafische Darstellung",
"tab_table": "Tabelle",
"tab_json": "JSON",
"tab_ner": "Entitäten",
"html_label": "Abhängigkeitsparsing",
"table_label": "Morphologische Analyse",
"table_headers": ["Wort", "Lemma", "POS", "Tag", "Morphologie", "Abhängigkeit"],
"json_label": "JSON-Ausgabe",
"ner_label": "Benannte Entitäten",
"error_message": "Fehler: "
},
"en": {
"title": "# 🔍 Multilingual Morpho-Syntactic Analyzer",
"subtitle": "Analyze texts in German, English, Spanish, and Ancient Greek",
"ui_lang_label": "Interface Language",
"model_lang_label": "Text Language for Analysis",
"input_label": "Enter Text",
"input_placeholder": "Enter your text here...",
"button_text": "Analyze Text",
"button_processing_text": "Processing...",
"tab_graphic": "Graphic View",
"tab_table": "Table",
"tab_json": "JSON",
"tab_ner": "Entities",
"html_label": "Dependency Parsing",
"table_label": "Morphological Analysis",
"table_headers": ["Word", "Lemma", "POS", "Tag", "Morphology", "Dependency"],
"json_label": "JSON Output",
"ner_label": "Named Entities",
"error_message": "Error: "
},
"es": {
"title": "# 🔍 Analizador Morfo-Sintáctico Multilingüe",
"subtitle": "Analice textos en alemán, inglés, español y griego antiguo",
"ui_lang_label": "Idioma de la Interfaz",
"model_lang_label": "Idioma del Texto para Análisis",
"input_label": "Introducir Texto",
"input_placeholder": "Ingrese su texto aquí...",
"button_text": "Analizar Texto",
"button_processing_text": "Procesando...",
"tab_graphic": "Vista Gráfica",
"tab_table": "Tabla",
"tab_json": "JSON",
"tab_ner": "Entidades",
"html_label": "Análisis de Dependencias",
"table_label": "Análisis Morfológico",
"table_headers": ["Palabra", "Lema", "POS", "Etiqueta", "Morfología", "Dependencia"],
"json_label": "Salida JSON",
"ner_label": "Entidades Nombradas",
"error_message": "Error: "
}
}
SPACY_MODELS: Dict[str, Optional[spacy.Language]] = {}
# --- Dependency Installation & Model Loading ---
def spacy_install_spacy_transformers_once():
""" Installs spacy-transformers, required for all _trf models. """
marker_file = Path(".spacy_transformers_installed")
if marker_file.exists():
print("✓ spacy-transformers already installed (marker found)")
return True
print("Installing spacy-transformers (for _trf models)...")
cmd = [sys.executable, "-m", "pip", "install", "spacy-transformers"]
try:
subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=900)
print("✓ Successfully installed spacy-transformers")
marker_file.touch()
return True
except Exception as e:
print(f"✗ FAILED to install spacy-transformers: {e}")
return False
def spacy_install_grecy_model_from_github(model_name: str) -> bool:
""" Installs a greCy model from GitHub Release. """
marker_file = Path(f".{model_name}_installed")
if marker_file.exists():
print(f"✓ {model_name} already installed (marker found)")
return True
print(f"Installing grecy model: {model_name}...")
if model_name == "grc_proiel_trf":
wheel_filename = "grc_proiel_trf-3.7.5-py3-none-any.whl"
elif model_name in ["grc_perseus_trf", "grc_proiel_lg", "grc_perseus_lg",
"grc_proiel_sm", "grc_perseus_sm", "grc_ner_trf"]:
wheel_filename = f"{model_name}-0.0.0-py3-none-any.whl"
else:
print(f"✗ Unknown grecy model: {model_name}")
return False
install_url = f"https://github.com/CrispStrobe/greCy/releases/download/v1.0-models/{wheel_filename}"
cmd = [sys.executable, "-m", "pip", "install", install_url, "--no-deps"]
print(f"Running: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=900)
if result.stdout: print("STDOUT:", result.stdout)
if result.stderr: print("STDERR:", result.stderr)
print(f"✓ Successfully installed {model_name} from GitHub")
marker_file.touch()
return True
except subprocess.CalledProcessError as e:
print(f"✗ Installation subprocess FAILED with code {e.returncode}")
print("STDOUT:", e.stdout)
print("STDERR:", e.stderr)
return False
except Exception as e:
print(f"✗ Installation exception: {e}")
traceback.print_exc()
return False
def spacy_load_spacy_model(model_name: str) -> Optional[spacy.Language]:
"""Load or install a standard spaCy model."""
try:
return spacy.load(model_name)
except OSError:
print(f"Installing {model_name}...")
try:
subprocess.check_call([sys.executable, "-m", "spacy", "download", model_name])
return spacy.load(model_name)
except Exception as e:
print(f"✗ Failed to install {model_name}: {e}")
return None
def spacy_load_grecy_model(model_name: str) -> Optional[spacy.Language]:
""" Load a grecy model, installing from GitHub if needed. """
if not spacy_install_grecy_model_from_github(model_name):
print(f"✗ Cannot load {model_name} because installation failed.")
return None
try:
print("Refreshing importlib to find new package...")
importlib.invalidate_caches()
try: importlib.reload(site)
except Exception: pass
print(f"Trying: spacy.load('{model_name}')")
nlp = spacy.load(model_name)
print(f"✓ Successfully loaded {model_name}")
return nlp
except Exception as e:
print(f"✗ Model {model_name} is installed but FAILED to load.")
print(f" Error: {e}")
traceback.print_exc()
return None
def spacy_initialize_models():
""" Pre-load standard models and ensure _trf dependencies are ready. """
print("\n" + "="*70)
print("INITIALIZING SPACY MODELS")
print("="*70 + "\n")
spacy_install_spacy_transformers_once()
loaded_count = 0
spacy_model_count = 0
for lang_code, (lang_name, model_name, model_type) in SPACY_MODEL_INFO.items():
if model_type == "spacy":
spacy_model_count += 1
print(f"Loading {lang_name} ({model_name})...")
nlp = spacy_load_spacy_model(model_name)
SPACY_MODELS[lang_code] = nlp
if nlp:
print(f"✓ {lang_name} ready\n")
loaded_count += 1
else:
print(f"✗ {lang_name} FAILED\n")
else:
print(f"✓ {lang_name} ({model_name}) will be loaded on first use.\n")
SPACY_MODELS[lang_code] = None
print(f"Pre-loaded {loaded_count}/{spacy_model_count} standard models.")
print("="*70 + "\n")
def spacy_get_analysis(ui_lang: str, model_lang_key: str, text: str):
"""Analyze text and return results."""
ui_config = SPACY_UI_TEXT.get(ui_lang.lower(), SPACY_UI_TEXT["en"])
error_prefix = ui_config.get("error_message", "Error: ")
try:
        if not text.strip():
            return ([], [], "<p>No text provided.</p>", "<p>No text provided.</p>",
                    gr.Button(value=ui_config.get("button_text", "Analyze"), interactive=True))
nlp = SPACY_MODELS.get(model_lang_key)
if nlp is None:
# Try loading one last time
if model_lang_key in SPACY_MODEL_INFO:
_, model_name, model_type = SPACY_MODEL_INFO[model_lang_key]
if model_type == 'grecy': nlp = spacy_load_grecy_model(model_name)
else: nlp = spacy_load_spacy_model(model_name)
SPACY_MODELS[model_lang_key] = nlp
if nlp is None:
return ([], {"error": "Model load failed"}, "Error", "Error", gr.Button(interactive=True))
doc = nlp(text)
dataframe_output = []
json_output = []
for token in doc:
lemma_str = token.lemma_
morph_str = str(token.morph) if token.morph else ''
dep_str = token.dep_ if doc.is_parsed else ''
tag_str = token.tag_ or ''
pos_str = token.pos_ or ''
json_output.append({
"word": token.text, "lemma": lemma_str, "pos": pos_str,
"tag": tag_str, "morphology": morph_str, "dependency": dep_str,
"is_stopword": token.is_stop
})
dataframe_output.append([token.text, lemma_str, pos_str, tag_str, morph_str, dep_str])
html_dep_out = ""
if "parser" in nlp.pipe_names and doc.is_parsed:
try:
options = {"compact": True, "bg": "#ffffff", "color": "#000000", "font": "Source Sans Pro"}
html_svg = displacy.render(doc, style="dep", jupyter=False, options=options)
html_dep_out = _html_wrap(html_svg, line_height="2.5")
            except Exception as e:
                html_dep_out = f"<p>Visualization error: {e}</p>"
html_ner_out = ""
if "ner" in nlp.pipe_names:
if doc.ents:
try:
html_ner = displacy.render(doc, style="ent", jupyter=False)
html_ner_out = _html_wrap(html_ner, line_height="2.5")
                except Exception: html_ner_out = "<p>Error rendering NER</p>"
            else: html_ner_out = "<p>No entities found.</p>"
return (dataframe_output, json_output, html_dep_out, html_ner_out,
gr.Button(value=ui_config.get("button_text", "Analyze"), interactive=True))
except Exception as e:
traceback.print_exc()
        error_html = f"<p>{error_prefix} {str(e)}</p>"
return ([], {"error": str(e)}, error_html, error_html, gr.Button(interactive=True))
def spacy_update_ui(ui_lang: str):
"""Update UI language for the spaCy tab."""
# Placeholder - actual implementation would update labels
return [gr.update()] * 14
# ============================================================================
# 4. GRAMMAR CHECKER LOGIC (LanguageTool Only)
# ============================================================================
# --- Globals for LanguageTool ---
LT_TOOL_INSTANCES: Dict[str, Optional["language_tool_python.LanguageTool"]] = {}
LT_TOOL_LOCK = threading.Lock()
def lt_get_language_tool(lang: str = 'en') -> Optional["language_tool_python.LanguageTool"]:
""" Thread-safe function to get a LanguageTool instance for a specific language. """
global LT_TOOL_INSTANCES
if not LT_AVAILABLE:
raise ImportError("language-tool-python library is not installed.")
lang_code = 'en-US' if lang == 'en' else 'de-DE'
if lang_code in LT_TOOL_INSTANCES:
return LT_TOOL_INSTANCES[lang_code]
with LT_TOOL_LOCK:
if lang_code in LT_TOOL_INSTANCES:
return LT_TOOL_INSTANCES[lang_code]
try:
print(f"Initializing LanguageTool for {lang_code}...")
tool = language_tool_python.LanguageTool(lang_code)
_ = tool.check("This is a test.") if lang == 'en' else tool.check("Dies ist ein Test.")
print(f"LanguageTool ({lang_code}) initialized successfully.")
LT_TOOL_INSTANCES[lang_code] = tool
return tool
except Exception as e:
print(f"CRITICAL ERROR: Failed to initialize LanguageTool for {lang_code}: {e}")
return None
def lt_check_grammar(text: str, lang: str = 'en') -> List[Dict[str, Any]]:
""" Checks text for grammar errors and returns a JSON list. """
try:
tool = lt_get_language_tool(lang)
if tool is None:
return [{"error": f"LanguageTool service for '{lang}' failed to initialize."}]
if not text or not text.strip():
return [{"info": "No text provided to check."}]
matches = tool.check(text)
if not matches:
return [{"info": "No errors found!", "status": "perfect"}]
errors_list = []
for match in matches:
error = {
"message": match.message,
"rule_id": match.ruleId,
"category": getattr(match.category, 'name', match.category),
"incorrect_text": text[match.offset : match.offset + match.errorLength],
"replacements": match.replacements,
"offset": match.offset,
"length": match.errorLength,
}
errors_list.append(error)
return errors_list
except Exception as e:
traceback.print_exc()
return [{"error": f"An unexpected error occurred: {str(e)}"}]
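# Illustrative usage (exact matches depend on the installed LanguageTool version and rule set):
#   lt_check_grammar("She go to school.", lang='en')
#   -> a list of dicts, e.g. [{"message": "...", "incorrect_text": "go",
#      "replacements": ["goes"], "offset": 4, "length": 2, ...}]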
# ============================================================================
# 5. WORDNET THESAURUS LOGIC (OEWN)
# ============================================================================
# --- Globals & Classes for WordNet ---
@dataclass
class WordNetWorkItem:
word: str
lang: str
response_queue: queue.Queue
class WordNetWorkerState(Enum):
NOT_STARTED = 1
INITIALIZING = 2
READY = 3
ERROR = 4
wordnet_worker_state = WordNetWorkerState.NOT_STARTED
wordnet_worker_thread = None
wordnet_work_queue = queue.Queue()
wordnet_en_instance = None # For OEWN
# --- Worker Thread Logic (Adapted for OEWN) ---
def wordnet_download_data():
"""Download WordNet data. Called once by worker thread."""
if not WN_AVAILABLE:
print("[WordNet Worker] 'wn' library not available. Skipping download.")
return False
try:
print("[WordNet Worker] Downloading WordNet data...")
# --- OEWN REPLACEMENT ---
try:
wn.download('oewn') # Open English WordNet
print("✓ Downloaded OEWN")
except Exception as e:
print(f"[WordNet Worker] Note: oewn download: {e}")
# --- END REPLACEMENT ---
try:
wn.download('cili:1.0')
except Exception as e:
print(f"[WordNet Worker] Note: cili download: {e}")
print("[WordNet Worker] ✓ WordNet data ready")
return True
except Exception as e:
print(f"[WordNet Worker] ✗ Failed to download WordNet data: {e}")
return False
def wordnet_worker_loop():
""" Worker thread main loop. """
global wordnet_worker_state, wordnet_en_instance
if not WN_AVAILABLE:
wordnet_worker_state = WordNetWorkerState.ERROR
return
try:
print("[WordNet Worker] Starting worker thread...")
wordnet_worker_state = WordNetWorkerState.INITIALIZING
if not wordnet_download_data():
wordnet_worker_state = WordNetWorkerState.ERROR
return
print("[WordNet Worker] Creating WordNet instances...")
# --- OEWN REPLACEMENT ---
wordnet_en_instance = wn.Wordnet('oewn')
print("✓ Loaded OEWN (English)")
# --- END REPLACEMENT ---
wordnet_worker_state = WordNetWorkerState.READY
print("[WordNet Worker] Ready to process requests")
while True:
try:
item: WordNetWorkItem = wordnet_work_queue.get(timeout=1)
try:
if item.lang == 'en':
wn_instance = wordnet_en_instance
else:
# This app is English-only, but we keep the structure
raise Exception(f"Language '{item.lang}' not supported by this worker.")
if wn_instance is None:
raise Exception(f"WordNet instance for '{item.lang}' is not loaded.")
result = wordnet_process_word_lookup(item.word, wn_instance)
item.response_queue.put(("success", result))
except Exception as e:
traceback.print_exc()
item.response_queue.put(("error", str(e)))
finally:
wordnet_work_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"[WordNet Worker] Fatal error: {e}")
traceback.print_exc()
wordnet_worker_state = WordNetWorkerState.ERROR
def wordnet_process_word_lookup(word: str, wn_instance: "wn.Wordnet") -> Dict[str, Any]:
""" Process a single word lookup. Runs in the worker thread. """
if not word or not word.strip():
return {"info": "No word provided to check."}
word = word.strip().lower()
senses = wn_instance.senses(word)
if not senses:
return {"info": f"The word '{word}' was not found in the thesaurus."}
results: Dict[str, Any] = {"input_word": word, "senses": []}
for sense in senses:
synset = sense.synset()
def get_lemmas(synsets, remove_self=False):
lemmas: Set[str] = set()
for s in synsets:
for lemma in s.lemmas():
if not (remove_self and lemma == word):
lemmas.add(lemma)
return sorted(list(lemmas))
antonym_words: Set[str] = set()
try:
for ant_sense in sense.get_related('antonym'):
antonym_words.add(ant_sense.word().lemma())
except Exception:
pass
sense_info = {
"pos": synset.pos,
"definition": synset.definition() or "No definition available.",
"synonyms": get_lemmas([synset], remove_self=True),
"antonyms": sorted(list(antonym_words)),
"hypernyms (is a type of)": get_lemmas(synset.hypernyms()),
"hyponyms (examples are)": get_lemmas(synset.hyponyms()),
"holonyms (is part of)": get_lemmas(synset.holonyms()),
"meronyms (has parts)": get_lemmas(synset.meronyms()),
}
results["senses"].append(sense_info)
return results
def wordnet_start_worker():
"""Start the worker thread if not already started."""
global wordnet_worker_thread, wordnet_worker_state
if wordnet_worker_state != WordNetWorkerState.NOT_STARTED:
return
if not WN_AVAILABLE:
wordnet_worker_state = WordNetWorkerState.ERROR
return
wordnet_worker_thread = threading.Thread(target=wordnet_worker_loop, daemon=True, name="WordNetWorker")
wordnet_worker_thread.start()
timeout = 30
for _ in range(timeout * 10):
if wordnet_worker_state in (WordNetWorkerState.READY, WordNetWorkerState.ERROR):
break
threading.Event().wait(0.1)
if wordnet_worker_state != WordNetWorkerState.READY:
        raise Exception("WordNet worker failed to initialize")
# --- Public API (Adapted) ---
def wordnet_get_thesaurus_info(word: str, lang: str = 'en') -> Dict[str, Any]:
""" Public API: Finds thesaurus info. Thread-safe. """
if not WN_AVAILABLE:
return {"error": "WordNet (wn) library is not available."}
if wordnet_worker_state != WordNetWorkerState.READY:
return {"error": "WordNet service is not ready. Please try again."}
try:
response_queue = queue.Queue()
item = WordNetWorkItem(word=word, lang=lang, response_queue=response_queue) # <-- Pass lang
wordnet_work_queue.put(item)
try:
status, result = response_queue.get(timeout=30)
if status == "success":
return result
else:
return {"error": f"Lookup failed: {result}"}
except queue.Empty:
return {"error": "Request timed out"}
except Exception as e:
traceback.print_exc()
return {"error": f"An unexpected error occurred: {str(e)}"}
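# Illustrative usage (requires wordnet_start_worker() to have completed successfully):
#   wordnet_get_thesaurus_info("dog", lang='en')
#   -> {"input_word": "dog", "senses": [{"pos": "n", "definition": "...", "synonyms": [...],
#       "antonyms": [...], "hypernyms (is a type of)": [...], ...}, ...]}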
# ============================================================================
# 6. PATTERN INFLECTION LOGIC (pattern.en)
# ============================================================================
def pattern_is_good_analysis(analysis, analysis_type):
"""Check if an analysis has meaningful data."""
if not analysis: return False
if analysis_type == 'noun':
return 'plural' in analysis and analysis['plural'] != analysis['singular']
elif analysis_type == 'verb':
present = analysis.get('conjugation', {}).get('Present', {})
if len(present) < 3: return False
return True
elif analysis_type == 'adjective':
return 'comparative' in analysis or 'superlative' in analysis
return False
def pattern_analyze_as_noun_en(word: str, hint_lemma: str = None) -> Dict[str, Any]:
"""Comprehensive noun inflection analysis for English."""
log(f" Analyzing as noun (hint_lemma={hint_lemma})")
if not PATTERN_EN_AVAILABLE: return {'error': 'pattern.en not available'}
# 1. Determine Singular/Plural base
# If the word is already plural, singularize it to get the lemma
try:
singular_form = singularize(word)
plural_form = pluralize(singular_form)
except Exception as e:
return {'error': f'Inflection failed: {e}'}
# 2. Get Indefinite Article (a/an)
try:
art = article(singular_form)
art_str = f"{art} {singular_form}"
except Exception:
art_str = f"a/an {singular_form}"
analysis = {
"base_form": singular_form,
"singular": singular_form,
"plural": plural_form,
"article": art_str,
"declension": {
"Singular": {"form": singular_form},
"Plural": {"form": plural_form}
},
"gender": "N/A" # English nouns strictly do not have grammatical gender
}
return analysis
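# Illustrative output (assumes pattern.en is installed):
#   pattern_analyze_as_noun_en("cat")
#   -> {"base_form": "cat", "singular": "cat", "plural": "cats", "article": "a cat",
#       "declension": {"Singular": {"form": "cat"}, "Plural": {"form": "cats"}}, "gender": "N/A"}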
def pattern_analyze_as_verb_en(word: str, hint_lemma: str = None) -> Dict[str, Any]:
"""Comprehensive verb conjugation analysis for English."""
log(f" Analyzing as verb (hint_lemma={hint_lemma})")
if not PATTERN_EN_AVAILABLE: return {'error': 'pattern.en not available'}
# 1. Get Lemma
try:
verb_lemma = lemma(word)
    except Exception:
verb_lemma = word
analysis = {"infinitive": verb_lemma}
# 2. Get Lexeme (List of all forms)
try:
# lexeme returns: [infinitive, 3sg, present_participle, past, past_participle]
# e.g., be => ['be', 'is', 'being', 'was', 'been']
forms = lexeme(verb_lemma)
analysis["lexeme"] = forms
except Exception as e:
log(f" Failed to get lexeme: {e}")
analysis["lexeme"] = []
# 3. Conjugation Table
analysis["conjugation"] = {}
try:
# Present Tense
analysis["conjugation"]["Present"] = {
"I (1sg)": conjugate(verb_lemma, tense=PRESENT, person=1, number=SINGULAR),
"you (2sg)": conjugate(verb_lemma, tense=PRESENT, person=2, number=SINGULAR),
"he/she (3sg)": conjugate(verb_lemma, tense=PRESENT, person=3, number=SINGULAR),
"we (1pl)": conjugate(verb_lemma, tense=PRESENT, person=1, number=PLURAL),
"you (2pl)": conjugate(verb_lemma, tense=PRESENT, person=2, number=PLURAL),
"they (3pl)": conjugate(verb_lemma, tense=PRESENT, person=3, number=PLURAL),
}
# Past Tense (Pattern usually handles simple past variations)
analysis["conjugation"]["Past"] = {
"I (1sg)": conjugate(verb_lemma, tense=PAST, person=1, number=SINGULAR),
"he/she (3sg)": conjugate(verb_lemma, tense=PAST, person=3, number=SINGULAR),
"General": conjugate(verb_lemma, tense=PAST) # For regular verbs where all are same
}
# Participles
analysis["participles"] = {
"Present Participle (gerund)": conjugate(verb_lemma, tense=PRESENT, aspect="progressive"), # or aspect=PROGRESSIVE
"Past Participle": conjugate(verb_lemma, tense=PAST, aspect="perfective") # or use PARTICIPLE constant
}
except Exception as e:
log(f" Failed to conjugate: {e}")
return analysis
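# Illustrative output (assumes pattern.en is installed; exact forms come from its verb lexicon):
#   pattern_analyze_as_verb_en("went")["infinitive"]                      -> "go"
#   pattern_analyze_as_verb_en("went")["lexeme"]                          -> roughly ["go", "goes", "going", "went", "gone"]
#   pattern_analyze_as_verb_en("went")["conjugation"]["Past"]["General"]  -> "went"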
def pattern_analyze_as_adjective_en(word: str, hint_lemma: str = None) -> Dict[str, Any]:
"""Comprehensive adjective inflection analysis for English."""
log(f" Analyzing as adjective (hint_lemma={hint_lemma})")
if not PATTERN_EN_AVAILABLE: return {'error': 'pattern.en not available'}
    # If the word is comparative/superlative there is no reliable way in Pattern to
    # recover the base form, so the input is treated as the base (positive) form.
    base = word
analysis = {}
analysis["predicative"] = base
try:
comp = comparative(base)
sup = superlative(base)
analysis["comparative"] = comp
analysis["superlative"] = sup
analysis["grading"] = {
"Positive": base,
"Comparative": comp,
"Superlative": sup
}
except Exception as e:
log(f" Failed to get comparison: {e}")
analysis["grading"] = {"error": "Could not grade adjective"}
return analysis
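# Illustrative output (assumes pattern.en is installed):
#   pattern_analyze_as_adjective_en("tall")["grading"]
#   -> {"Positive": "tall", "Comparative": "taller", "Superlative": "tallest"}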
# --- Public API (Adapted) ---
def pattern_get_all_inflections(word: str, lang: str = 'en') -> Dict[str, Any]:
"""
Generates ALL possible inflections for an English word.
"""
if lang != 'en' or not PATTERN_EN_AVAILABLE:
return {"error": "`pattern.en` library not available or lang not 'en'."}
word = word.strip()
log(f"ANALYZING (EN): {word}")
analyses: Dict[str, Any] = {}
try:
noun_analysis = pattern_analyze_as_noun_en(word)
if noun_analysis and not noun_analysis.get("error"):
analyses["noun"] = noun_analysis
verb_analysis = pattern_analyze_as_verb_en(word)
if verb_analysis and not verb_analysis.get("error"):
analyses["verb"] = verb_analysis
adj_analysis = pattern_analyze_as_adjective_en(word)
if adj_analysis and not adj_analysis.get("error"):
analyses["adjective"] = adj_analysis
except Exception as e:
return {"error": f"An unexpected error occurred: {str(e)}"}
results: Dict[str, Any] = {
"input_word": word,
"analyses": analyses
}
if not results["analyses"]:
results["info"] = "Word could not be analyzed as noun, verb, or adjective."
return results
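# Illustrative usage: the result always carries "input_word" and an "analyses" dict whose keys
# are whichever of "noun" / "verb" / "adjective" could be analyzed without error:
#   pattern_get_all_inflections("run")
#   -> {"input_word": "run", "analyses": {"noun": {...}, "verb": {...}, "adjective": {...}}}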
def word_appears_in_inflections_en(word: str, inflections: Dict[str, Any], pos_type: str) -> bool:
"""
Check if the input word appears in the English inflection forms.
"""
word_lower = word.lower()
actual_forms = set()
if pos_type == 'noun':
actual_forms.add(inflections.get('singular', '').lower())
actual_forms.add(inflections.get('plural', '').lower())
elif pos_type == 'verb':
conjugation = inflections.get('conjugation', {})
for tense_data in conjugation.values():
if isinstance(tense_data, dict): actual_forms.update(v.lower() for v in tense_data.values())
participles = inflections.get('participles', {})
actual_forms.update(v.lower() for v in participles.values())
actual_forms.update(f.lower() for f in inflections.get('lexeme', []))
actual_forms.add(inflections.get('infinitive', '').lower())
elif pos_type == 'adjective':
actual_forms.add(inflections.get('predicative', '').lower())
actual_forms.add(inflections.get('comparative', '').lower())
actual_forms.add(inflections.get('superlative', '').lower())
elif pos_type == 'adverb':
return True # Adverbs are non-inflecting, always valid
if word_lower in actual_forms:
log(f" ✓ Word '{word}' was found in the {pos_type} inflection table.")
return True
log(f" ✗ Word '{word}' not found in any {pos_type} inflection forms.")
return False
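# Illustrative usage (assumes pattern.en is installed):
#   verb_block = pattern_analyze_as_verb_en("run")
#   word_appears_in_inflections_en("ran", verb_block, "verb")    # -> True  ("ran" is a past form)
#   word_appears_in_inflections_en("table", verb_block, "verb")  # -> False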
# ============================================================================
# 6b. CONCEPTNET & OPENBLP LOGIC
# ============================================================================
def get_conceptnet_client() -> Optional["Client"]:
""" Thread-safe function to get a single instance of the Gradio Client. """
global CONCEPTNET_CLIENT
if not GRADIO_CLIENT_AVAILABLE:
return None
if CONCEPTNET_CLIENT:
return CONCEPTNET_CLIENT
with CONCEPTNET_CLIENT_LOCK:
if CONCEPTNET_CLIENT:
return CONCEPTNET_CLIENT
try:
print("Initializing Gradio Client for ConceptNet...")
client = Client("cstr/conceptnet_normalized")
print("✓ Gradio Client for ConceptNet initialized.")
CONCEPTNET_CLIENT = client
return CONCEPTNET_CLIENT
except Exception as e:
print(f"✗ CRITICAL: Failed to initialize ConceptNet Gradio Client: {e}")
return None
def conceptnet_get_relations(word: str, language: str = 'en') -> Dict[str, Any]:
"""
Fetches relations from the cstr/conceptnet_normalized Gradio API.
"""
if not GRADIO_CLIENT_AVAILABLE:
return {"error": "`gradio_client` library is not installed."}
if not word or not word.strip():
return {"info": "No word provided."}
word_lower = word.strip().lower()
cache_key = (word_lower, language)
with CONCEPTNET_LOCK:
if cache_key in CONCEPTNET_CACHE:
log(f"ConceptNet: Found '{word_lower}' in cache.")
return CONCEPTNET_CACHE[cache_key]
log(f"ConceptNet: Fetching '{word_lower}' from Gradio API...")
try:
client = get_conceptnet_client()
if not client:
return {"error": "ConceptNet Gradio Client is not available."}
selected_relations = ["RelatedTo", "IsA", "PartOf", "HasA", "UsedFor", "CapableOf", "AtLocation", "Synonym", "Antonym", "Causes", "HasProperty", "MadeOf", "HasSubevent", "DerivedFrom", "SimilarTo"]
result_markdown = client.predict(
word=word_lower,
lang=language,
selected_relations=selected_relations,
api_name="/get_semantic_profile"
)
relations_list = []
if not isinstance(result_markdown, str):
raise TypeError(f"ConceptNet API returned type {type(result_markdown)}, expected str.")
lines = result_markdown.split('\n')
current_relation = None
line_pattern = None
for line in lines:
line = line.strip()
if not line: continue
if line.startswith('## '):
current_relation = line[3:].strip()
if current_relation:
line_pattern = re.compile(
r"-\s*(.+?)\s+(%s)\s+→\s+(.+?)\s+\`\[([\d.]+)\]\`" % re.escape(current_relation)
)
continue
if line.startswith('- ') and current_relation and line_pattern:
match = line_pattern.search(line)
if not match: continue
try:
node1 = match.group(1).strip().strip('*')
relation = match.group(2)
node2 = match.group(3).strip().strip('*')
weight = float(match.group(4))
other_node, direction = None, None
if node1.lower() == word_lower and node2.lower() != word_lower:
other_node, direction = node2, "->"
elif node2.lower() == word_lower and node1.lower() != word_lower:
other_node, direction = node1, "<-"
else:
continue
relations_list.append({
"relation": relation, "direction": direction, "other_node": other_node,
"other_lang": language, "weight": weight,
"surface": f"{node1} {relation} {node2}"
})
except Exception as e:
log(f"ConceptNet Parser: Error parsing line '{line}': {e}")
if not relations_list:
final_result = {"info": f"No valid relations found for '{word_lower}'."}
else:
relations_list.sort(key=lambda x: x.get('weight', 0.0), reverse=True)
final_result = {"relations": relations_list}
with CONCEPTNET_LOCK:
CONCEPTNET_CACHE[cache_key] = final_result
log(f"ConceptNet: Returning {len(relations_list)} relations for '{word_lower}'")
return final_result
except Exception as e:
error_msg = f"ConceptNet Gradio API request failed: {type(e).__name__} - {e}"
return {"error": error_msg}
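# The parser above expects the Space's Markdown report to use "## <Relation>" section headers
# followed by bullet lines roughly shaped like (illustrative):
#   - *dog* IsA → *animal* `[2.0]`
# Lines that do not match this pattern are skipped.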
# --- OpenBLP Stub ---
def openblp_get_relations(lemma: str) -> List[Dict[str, Any]]:
"""
Stub function to query OpenBLP.
Replace this with your actual OpenBLP database/API query.
"""
# Placeholder logic
if lemma == "dog":
return [
{"relation": "HasProperty", "other_node": "loyal", "weight": 0.9, "source": "openblp"},
{"relation": "IsA", "other_node": "animal", "weight": 1.0, "source": "openblp"}
]
if lemma == "cat":
return [
{"relation": "HasProperty", "other_node": "independent", "weight": 0.8, "source": "openblp"}
]
return []
# ============================================================================
# 6c. NEW: HANTA (EN) INITIALIZER & ENGINE
# ============================================================================
def hanta_get_tagger_en() -> Optional["HanoverTagger"]:
""" Thread-safe function to get the ENGLISH HanTa Tagger. """
global HANTA_TAGGER_EN
if not HANTA_AVAILABLE:
raise ImportError("HanTa library is not installed.")
if HANTA_TAGGER_EN:
return HANTA_TAGGER_EN
with HANTA_TAGGER_LOCK:
if HANTA_TAGGER_EN:
return HANTA_TAGGER_EN
try:
print("Initializing HanTa Tagger (English)...")
PACKAGE_DIR = os.path.dirname(HanTa.HanoverTagger.__file__)
MODEL_PATH = os.path.join(PACKAGE_DIR, 'morphmodel_en.pgz')
if not os.path.exists(MODEL_PATH):
raise FileNotFoundError(f"HanTa English model not found at {MODEL_PATH}")
tagger = HanoverTagger(MODEL_PATH)
_ = tagger.analyze("Test") # Warm-up call
print("✓ HanTa Tagger (English) initialized successfully.")
HANTA_TAGGER_EN = tagger
return HANTA_TAGGER_EN
except Exception as e:
print(f"CRITICAL ERROR: Failed to initialize HanTa (EN) Tagger: {e}")
return None
def _hanta_pos_to_key(hanta_pos: str) -> Optional[str]:
""" Maps HanTa's complex POS tags to simple keys. """
if hanta_pos.startswith('N'): return "noun"
if hanta_pos.startswith('VV'): return "verb"
if hanta_pos.startswith('ADJ'): return "adjective"
if hanta_pos == 'ADV': return "adverb"
return None
def _analyze_word_with_hanta_en(word: str, top_n: int) -> Dict[str, Any]:
""" (FALLBACK ENGINE 1) Analyzes a single word using HanTa (EN). """
if not HANTA_AVAILABLE: return {}
print(f"\n[Word Encyclopedia] Running HanTa (EN) fallback for: \"{word}\"")
final_result = {"input_word": word, "analysis": {}}
try:
tagger = hanta_get_tagger_en()
if not tagger: return {}
possible_tags = tagger.tag_word(word.lower())
possible_tags.extend(tagger.tag_word(word.capitalize()))
processed_lemmas_pos: Set[Tuple[str, str]] = set()
for hanta_pos, _ in possible_tags:
pos_key = _hanta_pos_to_key(hanta_pos)
if not pos_key: continue
raw_analysis = tagger.analyze(word.lower() if pos_key != 'noun' else word.capitalize())
lemma = raw_analysis[0] # The lemma
if (lemma, pos_key) in processed_lemmas_pos:
continue
processed_lemmas_pos.add((lemma, pos_key))
log(f"--- Analyzing HanTa (EN) path: lemma='{lemma}', pos='{pos_key}' ---")
pattern_block = {}
if PATTERN_EN_AVAILABLE:
if pos_key == "noun": pattern_block = pattern_analyze_as_noun_en(lemma)
elif pos_key == "verb": pattern_block = pattern_analyze_as_verb_en(lemma)
elif pos_key == "adjective": pattern_block = pattern_analyze_as_adjective_en(lemma)
elif pos_key == "adverb": pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
semantics_block = _build_semantics_block_for_lemma(lemma, pos_key, top_n, 'en')
pos_entry_report = {
"hanta_analysis": {
"lemma": lemma,
"pos_tag": hanta_pos,
"analysis_string": str(raw_analysis),
"source": "hanta_en"
},
"inflections_pattern": pattern_block,
"semantics_combined": semantics_block
}
if word_appears_in_inflections_en(word, pattern_block, pos_key):
if pos_key not in final_result["analysis"]:
final_result["analysis"][pos_key] = []
final_result["analysis"][pos_key].append(pos_entry_report)
else:
log(f" ✗ HanTa (EN) path {lemma}/{pos_key} REJECTED by validation.")
if not final_result["analysis"]: return {}
final_result["info"] = "Analysis from HanTa (EN) (Fallback 1)."
return final_result
except Exception as e:
log(f"HanTa (EN) Engine FAILED: {e}")
traceback.print_exc()
return {}
# ============================================================================
# 6d. WIKTIONARY DATABASE LOGIC (EN)
# ============================================================================
def wiktionary_download_db() -> bool:
""" Downloads the compressed English Wiktionary DB and extracts it. """
global WIKTIONARY_AVAILABLE
# Check if the extracted DB already exists
if os.path.exists(WIKTIONARY_DB_PATH):
print(f"✓ English Wiktionary DB '{WIKTIONARY_DB_PATH}' already exists.")
WIKTIONARY_AVAILABLE = True
return True
print(f"English Wiktionary DB not found. Downloading '{WIKTIONARY_REMOTE_FILE}' from '{WIKTIONARY_REPO_ID}'...")
try:
# 1. Download the .gz file
downloaded_gz_path = hf_hub_download(
repo_id=WIKTIONARY_REPO_ID,
filename=WIKTIONARY_REMOTE_FILE,
repo_type="dataset",
local_dir="."
# Removed deprecated `local_dir_use_symlinks`
)
# 2. Decompress the .gz file to the .db file
print(f"Downloading complete. Extracting '{downloaded_gz_path}' to '{WIKTIONARY_DB_PATH}'...")
with gzip.open(downloaded_gz_path, 'rb') as f_in:
with open(WIKTIONARY_DB_PATH, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# Optional: Cleanup the .gz file to save space
try:
os.remove(downloaded_gz_path)
except OSError:
pass
print(f"✓ English Wiktionary DB downloaded and extracted successfully.")
WIKTIONARY_AVAILABLE = True
return True
except Exception as e:
print(f"✗ CRITICAL: Failed to download/extract English Wiktionary DB: {e}")
# traceback.print_exc() # Uncomment for deep debugging
return False
def wiktionary_run_startup_diagnostics():
""" Runs critical checks on the DB structure and content at startup. """
print("\n" + "="*50)
print("RUNNING WIKTIONARY DB DIAGNOSTICS")
print("="*50)
conn = wiktionary_get_connection()
if not conn:
print("✗ Diagnostics aborted: No DB connection.")
return
try:
# 1. Check Table Structure
print("[1] Checking Tables...")
tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
table_names = [t['name'] for t in tables]
print(f" Found tables: {table_names}")
if 'entries' not in table_names:
print("CRITICAL ERROR: 'entries' table missing!")
return
        # 2. Check language encoding (a common cause of empty lookup results)
print("\n[2] Checking Language Format (Top 5)...")
langs = conn.execute("SELECT lang, COUNT(*) as c FROM entries GROUP BY lang ORDER BY c DESC LIMIT 5").fetchall()
for row in langs:
print(f" - '{row['lang']}': {row['c']} entries")
# 3. Check Specific 'Missing' Words
test_words = ["ready", "runner", "run", "house"]
print(f"\n[3] Probing missing words: {test_words}")
for word in test_words:
# Check exact match raw
raw = conn.execute("SELECT count(*) as c FROM entries WHERE word = ?", (word,)).fetchone()
print(f" - '{word}' (Raw check): Found {raw['c']} rows")
if raw['c'] == 0:
# Check case insensitive
nocase = conn.execute("SELECT word FROM entries WHERE word LIKE ? LIMIT 1", (word,)).fetchone()
if nocase:
print(f" ! WARNING: '{word}' not found exactly, but found '{nocase['word']}' (Case mismatch?)")
else:
print(f" ! CRITICAL: '{word}' does not exist in DB at all.")
except Exception as e:
print(f"✗ Diagnostics crashed: {e}")
traceback.print_exc()
print("="*50 + "\n")
def wiktionary_get_connection() -> Optional[sqlite3.Connection]:
""" Thread-safe function to get a single, read-only SQLite connection. """
global WIKTIONARY_CONN, WIKTIONARY_AVAILABLE
if not WIKTIONARY_AVAILABLE:
log("Wiktionary DB is not available, cannot create connection.")
return None
if WIKTIONARY_CONN:
return WIKTIONARY_CONN
with WIKTIONARY_CONN_LOCK:
if WIKTIONARY_CONN:
return WIKTIONARY_CONN
if not os.path.exists(WIKTIONARY_DB_PATH):
log("Wiktionary DB file missing, connection failed.")
WIKTIONARY_AVAILABLE = False
return None
try:
log("Creating new read-only connection to Wiktionary DB...")
db_uri = f"file:{WIKTIONARY_DB_PATH}?mode=ro"
conn = sqlite3.connect(db_uri, uri=True, check_same_thread=False)
conn.row_factory = sqlite3.Row
_ = conn.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1").fetchone()
print("✓ Wiktionary DB connection successful.")
WIKTIONARY_CONN = conn
return WIKTIONARY_CONN
except Exception as e:
print(f"✗ CRITICAL: Failed to connect to Wiktionary DB: {e}")
WIKTIONARY_AVAILABLE = False
return None
def _wiktionary_map_pos_key(wikt_pos: Optional[str]) -> str:
"""Maps Wiktionary POS tags to our internal keys."""
if not wikt_pos: return "unknown"
if wikt_pos == "noun": return "noun"
if wikt_pos == "verb": return "verb"
if wikt_pos == "adj": return "adjective"
if wikt_pos == "adv": return "adverb"
return wikt_pos
def _wiktionary_build_report_for_entry(entry_id: int, conn: sqlite3.Connection) -> Dict[str, Any]:
""" (REVISED FOR FULL DB V3) Fetches ALL data for a single entry_id. """
report = {}
entry_data = conn.execute(
"SELECT word, title, redirect, pos, pos_title, lang, etymology_text FROM entries WHERE id = ?", (entry_id,)
).fetchone()
if not entry_data:
return {"error": "Entry ID not found"}
report.update(dict(entry_data))
report["entry_id"] = entry_id
report["lemma"] = entry_data["word"]
senses_q = conn.execute(
"""
SELECT
s.id as sense_id, s.sense_index,
(SELECT GROUP_CONCAT(g.gloss_text, '; ') FROM glosses g WHERE g.sense_id = s.id) as glosses,
(SELECT GROUP_CONCAT(t.tag, ', ') FROM sense_tags st JOIN tags t ON st.tag_id = t.id WHERE st.sense_id = s.id) as tags,
(SELECT GROUP_CONCAT(top.topic, ', ') FROM sense_topics stop JOIN topics top ON stop.topic_id = top.id WHERE stop.sense_id = s.id) as topics
FROM senses s
WHERE s.entry_id = ? ORDER BY s.id
""", (entry_id,)
).fetchall()
senses_list = []
for sense_row in senses_q:
sense_dict = dict(sense_row)
sense_id = sense_dict["sense_id"]
examples_q = conn.execute(
"SELECT text, ref FROM examples WHERE sense_id = ?", (sense_id,)
).fetchall()
sense_dict["examples"] = [dict(ex) for ex in examples_q]
senses_list.append(sense_dict)
report["senses"] = senses_list
forms_q = conn.execute(
"""
SELECT f.form_text, f.sense_index,
(SELECT GROUP_CONCAT(t.tag, ', ') FROM form_tags ft JOIN tags t ON ft.tag_id = t.id WHERE ft.form_id = f.id) as tags
FROM forms f
WHERE f.entry_id = ? GROUP BY f.id ORDER BY f.id
""", (entry_id,)
).fetchall()
report["forms"] = [dict(f) for f in forms_q]
return report
def _wiktionary_find_all_entries(word: str, conn: sqlite3.Connection) -> List[Dict[str, Any]]:
""" Finds entries with verbose debugging if lookup fails. """
log(f"Wiktionary (EN): Querying for '{word}'...")
found_entry_ids: Set[int] = set()
lang_query = 'English'
form_titles = ("Inflected form", "verb form", "noun form", "adjective form", "Comparative", "Superlative")
# Search variants: input, lowercase, title-case
search_variants = list(set([word, word.lower(), word.title()]))
placeholders = ', '.join('?' for _ in search_variants)
# 1. Search Lemmatized Entries
sql_lemma = f"SELECT id, pos_title, word FROM entries WHERE word IN ({placeholders}) AND lang = ?"
params_lemma = list(search_variants) + [lang_query]
lemma_q = conn.execute(sql_lemma, params_lemma).fetchall()
parent_lemmas_to_find: Set[str] = set()
for row in lemma_q:
entry_id = row["id"]
pos_title = row["pos_title"] or ""
found_entry_ids.add(entry_id)
# Check for parent lemma in "form_of" field
if any(ft in pos_title for ft in form_titles):
form_of_q = conn.execute("SELECT form_of FROM senses WHERE entry_id = ?", (entry_id,)).fetchall()
for form_row in form_of_q:
form_of_json = form_row["form_of"]
if not form_of_json: continue
try:
form_of_data = json.loads(form_of_json)
if isinstance(form_of_data, list) and form_of_data:
parent = form_of_data[0].get("word")
if parent: parent_lemmas_to_find.add(parent)
except json.JSONDecodeError: pass
# 2. Search Inflected Forms
sql_form = f"""
SELECT DISTINCT e.id
FROM forms f
JOIN entries e ON f.entry_id = e.id
WHERE f.form_text IN ({placeholders}) AND e.lang = ?
AND f.id NOT IN (
SELECT ft.form_id FROM form_tags ft JOIN tags t ON ft.tag_id = t.id
WHERE t.tag IN ('variant', 'auxiliary')
)
"""
params_form = list(search_variants) + [lang_query]
form_q = conn.execute(sql_form, params_form).fetchall()
for row in form_q:
found_entry_ids.add(row["id"])
# 3. Add Parent Lemmas
if parent_lemmas_to_find:
for lemma_word in parent_lemmas_to_find:
parent_id_q = conn.execute("SELECT id FROM entries WHERE word = ? AND lang = ?", (lemma_word, lang_query)).fetchall()
for row in parent_id_q: found_entry_ids.add(row["id"])
# =========================================================
# 🔍 VERBOSE DEBUG DETECTIVE (Triggered on Failure)
# =========================================================
if not found_entry_ids:
log(f"⚠ [DEBUG-VERBOSE] Zero results for '{word}'. Running diagnostics...")
try:
# Check 1: Does it exist in ANY language?
any_lang = conn.execute(
f"SELECT lang, word FROM entries WHERE word IN ({placeholders}) LIMIT 5",
list(search_variants)
).fetchall()
if any_lang:
found_langs = [f"{r['word']} ({r['lang']})" for r in any_lang]
log(f" -> FOUND in other languages/cases: {found_langs}")
log(f" -> CONCLUSION: Language filter '{lang_query}' might be too strict.")
else:
log(f" -> NOT FOUND in 'entries' table (any language).")
# Check 2: Does it exist as a form?
any_form = conn.execute(
f"SELECT form_text FROM forms WHERE form_text IN ({placeholders}) LIMIT 1",
list(search_variants)
).fetchone()
if any_form:
log(f" -> FOUND in 'forms' table as '{any_form['form_text']}'! (But failed to link to an English entry)")
else:
log(f" -> NOT FOUND in 'forms' table either.")
# Check 3: Is it there but with whitespace issues?
fuzzy = conn.execute("SELECT word FROM entries WHERE word LIKE ? LIMIT 1", (f"%{word}%",)).fetchone()
if fuzzy:
log(f" -> PARTIAL MATCH found: '{fuzzy['word']}'. (Check for whitespace/punctuation?)")
else:
log(f" -> COMPLETELY MISSING from DB.")
except Exception as e:
log(f" -> Detective crashed: {e}")
log(f"Wiktionary: Found {len(found_entry_ids)} unique matching entries.")
all_reports = []
for entry_id in found_entry_ids:
try:
report = _wiktionary_build_report_for_entry(entry_id, conn)
all_reports.append(report)
except Exception as e:
log(f"Wiktionary: Failed to build report for entry {entry_id}: {e}")
return all_reports
def _wiktionary_format_semantics_block(wikt_report: Dict[str, Any], pattern_block: Dict[str, Any], top_n: int) -> Dict[str, Any]:
""" Combines English Wiktionary senses with OEWN/ConceptNet. """
pos_key = _wiktionary_map_pos_key(wikt_report.get("pos"))
semantic_lemma = wikt_report.get("lemma")
wiktionary_senses = []
for sense in wikt_report.get("senses", []):
wiktionary_senses.append({
"definition": sense.get("glosses"),
"source": "wiktionary"
})
oewn_senses = []
if WN_AVAILABLE:
try:
senses_by_pos = _get_wordnet_senses_by_pos(semantic_lemma, 'en')
oewn_senses_raw = senses_by_pos.get(pos_key, [])
if oewn_senses_raw and "info" not in oewn_senses_raw[0]:
oewn_senses = oewn_senses_raw
except Exception as e:
log(f"[DEBUG] OEWN lookup failed for {semantic_lemma} ({pos_key}): {e}")
conceptnet_relations = []
    if GRADIO_CLIENT_AVAILABLE:
try:
conceptnet_result = conceptnet_get_relations(semantic_lemma, language='en')
conceptnet_relations = conceptnet_result.get("relations", [])
except Exception: pass
if top_n > 0:
wiktionary_senses = wiktionary_senses[:top_n]
oewn_senses = oewn_senses[:top_n]
conceptnet_relations.sort(key=lambda x: x.get('weight', 0.0), reverse=True)
conceptnet_relations = conceptnet_relations[:top_n]
return {
"lemma": semantic_lemma,
"wiktionary_senses": wiktionary_senses,
"odenet_senses": oewn_senses, # Key name preserved
"conceptnet_relations": conceptnet_relations,
"wiktionary_synonyms": wikt_report.get("synonyms", []),
"wiktionary_antonyms": wikt_report.get("antonyms", [])
}
# ============================================================================
# 6e. SHARED SEMANTIC HELPER (OEWN + OpenBLP)
# ============================================================================
def _get_wordnet_senses_by_pos(word: str, lang: str = 'en') -> Dict[str, List[Dict[str, Any]]]:
""" (Helper) Fetches WordNet (OEWN) senses for a word and groups them by POS. """
senses_by_pos: Dict[str, List[Dict]] = {
"noun": [], "verb": [], "adjective": [], "adverb": []
}
if not WN_AVAILABLE:
return {"noun": [{"info": "WordNet unavailable"}], "verb": [{"info": "WordNet unavailable"}],
"adjective": [{"info": "WordNet unavailable"}], "adverb": [{"info": "WordNet unavailable"}]}
try:
all_senses = wordnet_get_thesaurus_info(word, lang).get("senses", [])
for sense in all_senses:
if "error" in sense: continue
pos_tag = sense.get("pos")
if pos_tag == 'n': senses_by_pos["noun"].append(sense)
elif pos_tag == 'v': senses_by_pos["verb"].append(sense)
elif pos_tag == 'a' or pos_tag == 's': senses_by_pos["adjective"].append(sense)
elif pos_tag == 'r': senses_by_pos["adverb"].append(sense)
except Exception as e:
log(f"WordNet helper check failed for '{word}': {e}")
return senses_by_pos
def _build_semantics_block_for_lemma(lemma: str, pos_key: str, top_n: int, lang: str = 'en') -> Dict[str, Any]:
""" (REUSABLE HELPER) Fetches OEWN, ConceptNet, and OpenBLP data. """
log(f"[DEBUG] Building semantics for lemma='{lemma}', pos='{pos_key}', lang='{lang}'")
oewn_senses = []
if WN_AVAILABLE:
try:
senses_by_pos = _get_wordnet_senses_by_pos(lemma, lang)
oewn_senses_raw = senses_by_pos.get(pos_key, [])
if oewn_senses_raw and "info" not in oewn_senses_raw[0]:
oewn_senses = oewn_senses_raw
except Exception as e:
log(f"[DEBUG] OEWN lookup failed for {lemma} ({pos_key}): {e}")
conceptnet_relations = []
    if GRADIO_CLIENT_AVAILABLE:
try:
conceptnet_result = conceptnet_get_relations(lemma, language=lang)
conceptnet_relations = conceptnet_result.get("relations", [])
except Exception as e:
conceptnet_relations = [{"error": str(e)}]
openblp_relations = []
try:
openblp_relations = openblp_get_relations(lemma)
except Exception as e:
openblp_relations = [{"error": f"OpenBLP stub failed: {e}"}]
if top_n > 0:
oewn_senses = oewn_senses[:top_n]
conceptnet_relations.sort(key=lambda x: x.get('weight', 0.0), reverse=True)
conceptnet_relations = conceptnet_relations[:top_n]
openblp_relations.sort(key=lambda x: x.get('weight', 0.0), reverse=True)
openblp_relations = openblp_relations[:top_n]
return {
"lemma": lemma,
"wiktionary_senses": [],
"odenet_senses": oewn_senses,
"conceptnet_relations": conceptnet_relations,
"openblp_relations": openblp_relations,
"wiktionary_synonyms": [],
"wiktionary_antonyms": []
}
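# The returned block uses the same JSON shape as the Wiktionary engine's semantics block
# (with "openblp_relations" added), e.g. (illustrative):
#   {"lemma": "dog", "wiktionary_senses": [], "odenet_senses": [...],
#    "conceptnet_relations": [...], "openblp_relations": [...],
#    "wiktionary_synonyms": [], "wiktionary_antonyms": []}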
# ============================================================================
# 6f. PRIMARY & FALLBACK ENGINES
# ============================================================================
# --- PRIMARY ENGINE: WIKTIONARY (EN) ---
def _analyze_word_with_wiktionary(word: str, top_n: int) -> Dict[str, Any]:
""" (PRIMARY ENGINE) Analyzes an English word using the Wiktionary DB. """
final_result: Dict[str, Any] = {"input_word": word, "analysis": {}}
conn = wiktionary_get_connection()
if not conn: return {}
spacy_pos_hint, spacy_lemma_hint = None, None
try:
nlp_en = SPACY_MODELS.get("en")
if nlp_en:
doc = nlp_en(word)
token = doc[0]
spacy_pos_hint = token.pos_.lower()
spacy_lemma_hint = token.lemma_
except Exception: pass
try:
wiktionary_reports = _wiktionary_find_all_entries(word, conn)
except Exception as e:
log(f"[DEBUG] Wiktionary (EN) query failed: {e}")
return {}
if not wiktionary_reports: return {}
def get_priority_score(report):
wikt_pos = _wiktionary_map_pos_key(report.get("pos"))
wikt_lemma = report.get("lemma")
if spacy_pos_hint and wikt_pos == spacy_pos_hint:
if spacy_lemma_hint and wikt_lemma == spacy_lemma_hint: return 1
return 2
if wikt_lemma and wikt_lemma.lower() == word.lower(): return 3
return 4
wiktionary_reports.sort(key=get_priority_score)
word_lower = word.lower()
for wikt_report in wiktionary_reports:
# --- FIX START: Safe Extraction ---
pos_key = _wiktionary_map_pos_key(wikt_report.get("pos"))
lemma = wikt_report.get("lemma") or word
pos_title = wikt_report.get("pos_title") or "" # FORCE STRING
# --- FIX END ---
inflections_wikt_block = {
"base_form": lemma,
"forms_list": wikt_report.get("forms", []),
"source": "wiktionary"
}
pattern_block = {}
if PATTERN_EN_AVAILABLE:
try:
use_word = word if "form" in pos_title.lower() else lemma
if pos_key == "noun": pattern_block = pattern_analyze_as_noun_en(use_word)
elif pos_key == "verb": pattern_block = pattern_analyze_as_verb_en(use_word)
elif pos_key == "adjective": pattern_block = pattern_analyze_as_adjective_en(use_word)
elif pos_key == "adverb": pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
except Exception as e:
pattern_block = {"error": f"Pattern.en analysis failed: {e}"}
semantics_block = _wiktionary_format_semantics_block(wikt_report, pattern_block, top_n)
pos_entry_report = {
"inflections_wiktionary": inflections_wikt_block,
"inflections_pattern": pattern_block,
"semantics_combined": semantics_block,
"wiktionary_metadata": {
"pos_title": pos_title,
"etymology": wikt_report.get("etymology_text") or "",
"pronunciation": wikt_report.get("sounds") or "",
}
}
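        # Validation: keep this entry only if the queried word is the entry's own lemma,
        # or (for base entries) appears among its listed forms; pure "form-of" entries are
        # accepted solely via the lemma match.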
is_valid = False
is_inflected_entry = any(ft in pos_title for ft in ["form", "Comparative", "Superlative"])
if lemma.lower() == word_lower: is_valid = True
if not is_valid and not is_inflected_entry:
for form_entry in inflections_wikt_block.get("forms_list", []):
form_text = form_entry.get("form_text", "").strip()
if form_text.lower() == word_lower:
is_valid = True
break
if is_valid:
if pos_key not in final_result["analysis"]:
final_result["analysis"][pos_key] = []
final_result["analysis"][pos_key].append(pos_entry_report)
final_result["info"] = f"Analysis from Wiktionary. Found {len(wiktionary_reports)} raw entries."
return final_result
# --- FALLBACK 2: STANZA ---
def stanza_get_pipeline_en() -> Optional["stanza.Pipeline"]:
""" Thread-safe function to get the ENGLISH Stanza Pipeline. """
global STANZA_PIPELINE_EN
if not STANZA_AVAILABLE:
raise ImportError("Stanza library is not installed.")
if STANZA_PIPELINE_EN:
return STANZA_PIPELINE_EN
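    # Double-checked locking: re-test inside the lock so concurrent callers build the pipeline at most once.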
with STANZA_PIPELINE_LOCK:
if STANZA_PIPELINE_EN:
return STANZA_PIPELINE_EN
try:
print("Initializing Stanza Pipeline (English)...")
stanza.download('en', verbose=False, processors='tokenize,pos,lemma')
pipeline = stanza.Pipeline('en', verbose=False, processors='tokenize,pos,lemma')
print("✓ Stanza Pipeline (English) initialized successfully.")
STANZA_PIPELINE_EN = pipeline
return STANZA_PIPELINE_EN
except Exception as e:
print(f"CRITICAL ERROR: Failed to initialize Stanza (EN) Pipeline: {e}")
return None
def _analyze_word_with_stanza(word: str, top_n: int) -> Dict[str, Any]:
""" (FALLBACK ENGINE 2) Analyzes with Stanza. Must match JSON. """
if not STANZA_AVAILABLE: return {}
print(f"\n[Word Encyclopedia] Running Stanza fallback for: \"{word}\"")
final_result = {"input_word": word, "analysis": {}}
try:
pipeline = stanza_get_pipeline_en()
if not pipeline: return {}
doc = pipeline(word)
processed_lemmas_pos: Set[Tuple[str, str]] = set()
for sent in doc.sentences:
for token in sent.words:
pos_map = {"NOUN": "noun", "VERB": "verb", "ADJ": "adjective", "ADV": "adverb"}
if token.pos not in pos_map: continue
pos_key = pos_map[token.pos]
lemma = token.lemma
if not lemma: continue
if (lemma, pos_key) in processed_lemmas_pos: continue
processed_lemmas_pos.add((lemma, pos_key))
log(f"--- Analyzing Stanza path: lemma='{lemma}', pos='{pos_key}' ---")
pattern_block = {}
if PATTERN_EN_AVAILABLE:
if pos_key == "noun": pattern_block = pattern_analyze_as_noun_en(lemma)
elif pos_key == "verb": pattern_block = pattern_analyze_as_verb_en(lemma)
elif pos_key == "adjective": pattern_block = pattern_analyze_as_adjective_en(lemma)
elif pos_key == "adverb": pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
semantics_block = _build_semantics_block_for_lemma(lemma, pos_key, top_n, 'en')
pos_entry_report = {
"stanza_analysis": { # <-- New key for this engine
"lemma": lemma,
"pos_UPOS": token.pos,
"pos_XPOS": token.xpos,
"morphology": str(token.feats) if token.feats else "",
"source": "stanza"
},
"inflections_pattern": pattern_block,
"semantics_combined": semantics_block
}
if word_appears_in_inflections_en(word, pattern_block, pos_key):
if pos_key not in final_result["analysis"]:
final_result["analysis"][pos_key] = []
final_result["analysis"][pos_key].append(pos_entry_report)
else:
log(f" ✗ Stanza path {lemma}/{pos_key} REJECTED by validation.")
if not final_result["analysis"]: return {}
final_result["info"] = "Analysis from Stanza (Fallback 2)."
return final_result
except Exception as e:
log(f"Stanza Engine FAILED: {e}")
traceback.print_exc()
return {}
# --- FALLBACK 3: NLTK ---
def nltk_get_lemmatizer() -> Optional["WordNetLemmatizer"]:
""" Thread-safe function to get the NLTK Lemmatizer. """
global NLTK_LEMMATIZER
if not NLTK_AVAILABLE:
return None # Don't raise error, just return None to trigger graceful fallback
if NLTK_LEMMATIZER:
return NLTK_LEMMATIZER
with NLTK_LEMMATIZER_LOCK:
if NLTK_LEMMATIZER:
return NLTK_LEMMATIZER
try:
# Ensure data is present one last time before init
try:
nltk.data.find('corpora/wordnet.zip')
except LookupError:
nltk.download('wordnet', quiet=True)
NLTK_LEMMATIZER = WordNetLemmatizer()
# Warm up
_ = NLTK_LEMMATIZER.lemmatize("cats")
print("✓ NLTK Lemmatizer initialized.")
return NLTK_LEMMATIZER
except Exception as e:
print(f"✗ NLTK Init Failed: {e}")
return None
def _nltk_get_wordnet_pos(treebank_tag):
"""Converts NLTK's Treebank POS tag to a WordNet tag."""
if treebank_tag.startswith('J'): return nltk_wn.ADJ
if treebank_tag.startswith('V'): return nltk_wn.VERB
if treebank_tag.startswith('N'): return nltk_wn.NOUN
if treebank_tag.startswith('R'): return nltk_wn.ADV
return None
def _analyze_word_with_nltk(word: str, top_n: int) -> Dict[str, Any]:
""" (FALLBACK ENGINE 3) Analyzes with NLTK. """
if not NLTK_AVAILABLE: return {}
print(f"\n[Word Encyclopedia] Running NLTK fallback for: \"{word}\"")
final_result = {"input_word": word, "analysis": {}}
try:
lemmatizer = nltk_get_lemmatizer()
if not lemmatizer: return {}
# NLTK's POS tagger needs a list
# This specific call was crashing because 'averaged_perceptron_tagger_eng' was missing
try:
tag = nltk.pos_tag([word])[0][1]
except LookupError:
# Last ditch attempt to download if it was missing
nltk.download('averaged_perceptron_tagger_eng', quiet=True)
tag = nltk.pos_tag([word])[0][1]
wn_pos = _nltk_get_wordnet_pos(tag)
if not wn_pos:
log(f" ✗ NLTK path REJECTED: Unknown POS tag '{tag}'.")
return {}
lemma = lemmatizer.lemmatize(word, wn_pos)
# Map NLTK WN constants to strings
pos_map_rev = {nltk_wn.NOUN: "noun", nltk_wn.VERB: "verb", nltk_wn.ADJ: "adjective", nltk_wn.ADV: "adverb"}
pos_key = pos_map_rev.get(wn_pos)
if not pos_key: return {}
log(f"--- Analyzing NLTK path: lemma='{lemma}', pos='{pos_key}' ---")
pattern_block = {}
if PATTERN_EN_AVAILABLE:
# Use the fixed pattern functions from previous step
if pos_key == "noun": pattern_block = pattern_analyze_as_noun_en(lemma)
elif pos_key == "verb": pattern_block = pattern_analyze_as_verb_en(lemma)
elif pos_key == "adjective": pattern_block = pattern_analyze_as_adjective_en(lemma)
elif pos_key == "adverb": pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
semantics_block = _build_semantics_block_for_lemma(lemma, pos_key, top_n, 'en')
pos_entry_report = {
"nltk_analysis": {
"lemma": lemma,
"pos_Treebank": tag,
"pos_WordNet": wn_pos,
"source": "nltk"
},
"inflections_pattern": pattern_block,
"semantics_combined": semantics_block
}
if word_appears_in_inflections_en(word, pattern_block, pos_key):
if pos_key not in final_result["analysis"]:
final_result["analysis"][pos_key] = []
final_result["analysis"][pos_key].append(pos_entry_report)
else:
log(f" ✗ NLTK path {lemma}/{pos_key} REJECTED by validation.")
if not final_result["analysis"]: return {}
final_result["info"] = "Analysis from NLTK (Fallback 3)."
return final_result
except Exception as e:
log(f"NLTK Engine FAILED: {e}")
# traceback.print_exc() # Optional: Uncomment for deep debugging
return {}
# --- FALLBACK 4: TEXTBLOB ---
def _analyze_word_with_textblob(word: str, top_n: int) -> Dict[str, Any]:
""" (FALLBACK ENGINE 4) Analyzes with TextBlob. """
if not TEXTBLOB_AVAILABLE: return {}
print(f"\n[Word Encyclopedia] Running TextBlob fallback for: \"{word}\"")
final_result = {"input_word": word, "analysis": {}}
def get_wordnet_pos_tb(treebank_tag):
if treebank_tag.startswith('J'): return 'a'
if treebank_tag.startswith('V'): return 'v'
if treebank_tag.startswith('N'): return 'n'
if treebank_tag.startswith('R'): return 'r'
return None
try:
try:
blob = TextBlob(word)
# This access triggers the tokenizer
tags = blob.tags
        except Exception as e:
if "punkt" in str(e):
print("Attempting to download missing TextBlob/NLTK data...")
import nltk
nltk.download('punkt_tab', quiet=True)
nltk.download('punkt', quiet=True)
blob = TextBlob(word)
tags = blob.tags
else:
raise e
if not tags: return {}
processed_lemmas_pos: Set[Tuple[str, str]] = set()
for tb_word, tag in tags:
tb_pos = get_wordnet_pos_tb(tag)
if not tb_pos: continue
lemma = tb_word.lemmatize(tb_pos)
pos_map = {'n': "noun", 'v': "verb", 'a': "adjective", 'r': "adverb"}
pos_key = pos_map.get(tb_pos)
if not pos_key: continue
if (lemma, pos_key) in processed_lemmas_pos: continue
processed_lemmas_pos.add((lemma, pos_key))
log(f"--- Analyzing TextBlob path: lemma='{lemma}', pos='{pos_key}' ---")
pattern_block = {}
if PATTERN_EN_AVAILABLE:
if pos_key == "noun": pattern_block = pattern_analyze_as_noun_en(lemma)
elif pos_key == "verb": pattern_block = pattern_analyze_as_verb_en(lemma)
elif pos_key == "adjective": pattern_block = pattern_analyze_as_adjective_en(lemma)
elif pos_key == "adverb": pattern_block = {"base_form": lemma, "info": "Adverbs are non-inflecting."}
semantics_block = _build_semantics_block_for_lemma(lemma, pos_key, top_n, 'en')
pos_entry_report = {
"textblob_analysis": {
"lemma": lemma,
"pos_Treebank": tag,
"source": "textblob"
},
"inflections_pattern": pattern_block,
"semantics_combined": semantics_block
}
if word_appears_in_inflections_en(word, pattern_block, pos_key):
if pos_key not in final_result["analysis"]:
final_result["analysis"][pos_key] = []
final_result["analysis"][pos_key].append(pos_entry_report)
else:
log(f" ✗ TextBlob path {lemma}/{pos_key} REJECTED by validation.")
if not final_result["analysis"]: return {}
final_result["info"] = "Analysis from TextBlob (Fallback 4)."
return final_result
except Exception as e:
log(f"TextBlob Engine FAILED: {e}")
return {}
# ============================================================================
# 7. CONSOLIDATED ANALYZER LOGIC
# ============================================================================
# --- 7a. Comprehensive (Contextual) Analyzer ---
def comprehensive_english_analysis(text: str, top_n_value: Optional[float] = 0) -> Dict[str, Any]:
"""
(CONTEXTUAL) Combines NLP tools for a deep analysis of English text.
"""
try:
if not text or not text.strip():
return {"info": "Please enter text to analyze."}
top_n = int(top_n_value) if top_n_value is not None else 0
print(f"\n[Comprehensive Analysis (EN)] Starting analysis for: \"{text}\"")
results: Dict[str, Any] = {"input_text": text}
nlp_en = None
context_doc = None
# --- 1. LanguageTool Grammar Check (default) ---
print("[Comprehensive Analysis (EN)] Running LanguageTool...")
if LT_AVAILABLE:
try:
results["grammar_check"] = lt_check_grammar(text, 'en')
except Exception as e:
results["grammar_check"] = {"error": f"LanguageTool failed: {e}"}
else:
results["grammar_check"] = {"error": "LanguageTool not available."}
# --- 2. spaCy Morpho-Syntactic Backbone ---
print("[Comprehensive Analysis (EN)] Running spaCy...")
spacy_json_output = []
try:
_, spacy_json, _, _, _ = spacy_get_analysis("en", "en", text)
if isinstance(spacy_json, list):
spacy_json_output = spacy_json
results["spacy_analysis"] = spacy_json_output
nlp_en = SPACY_MODELS.get("en")
if nlp_en:
context_doc = nlp_en(text)
if not context_doc.has_vector or context_doc.vector_norm == 0:
context_doc = None
else:
results["spacy_analysis"] = spacy_json
except Exception as e:
results["spacy_analysis"] = {"error": f"spaCy analysis failed: {e}"}
# --- 2b. Heuristic SVA check (English) ---
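        # Heuristic: only runs when LanguageTool reported a clean sentence, and flags the single
        # pattern "singular nominal subject + plural finite verb" that LT may have missed.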
try:
if isinstance(results.get("grammar_check"), list) and any(d.get("status") == "perfect" for d in results["grammar_check"]):
subj_num, verb_num, verb_token, subj_token = None, None, None, None
for tok in spacy_json_output:
if tok.get("dependency") == "nsubj":
m = tok.get("morphology","")
if "Number=Sing" in m: subj_num, subj_token = "Sing", tok
spacy_pos_up = (tok.get("pos") or "").upper()
if (spacy_pos_up in {"VERB", "AUX"}) and ("VerbForm=Fin" in tok.get("morphology","")):
verb_token = tok
m = tok.get("morphology","")
if "Number=Plur" in m: verb_num = "Plur"
if subj_num == "Sing" and verb_num == "Plur":
# ... (Simplified SVA logic for English) ...
sva = { "message": "Possible Subject-Verb Agreement Error: Singular subject with plural verb.", "rule_id": "HEURISTIC_SVA_EN", "category": "Grammar", "incorrect_text": f"{verb_token.get('word')}" if verb_token else "", "replacements": [] }
results["grammar_check"] = [sva]
except Exception as e:
print(f"SVA Heuristic failed: {e}")
# --- 3. Lemma-by-Lemma Deep Dive ---
print("[Comprehensive Analysis (EN)] Running Lemma Deep Dive...")
FUNCTION_POS = {"DET","ADP","AUX","PUNCT","SCONJ","CCONJ","PART","PRON","NUM","SYM","X", "SPACE"}
lemma_deep_dive: Dict[str, Any] = {}
processed_lemmas: Set[str] = set()
if not spacy_json_output:
print("[Comprehensive Analysis (EN)] No spaCy tokens to analyze.")
else:
for token in spacy_json_output:
lemma = token.get("lemma")
pos = (token.get("pos") or "").upper()
if not lemma or lemma == "--" or pos in FUNCTION_POS or lemma in processed_lemmas:
continue
processed_lemmas.add(lemma)
print(f"[Deep Dive (EN)] Analyzing lemma: '{lemma}'")
lemma_report: Dict[str, Any] = {}
inflection_analysis = {}
semantic_analysis = {}
try:
# --- Call our NEW English dispatcher ---
encyclopedia_data = analyze_word_encyclopedia(lemma, 0, "wiktionary", 'en')
word_analysis = encyclopedia_data.get("analysis", {})
for pos_key, entry_list in word_analysis.items():
if not entry_list: continue
data = entry_list[0] # Use first, best analysis
inflection_analysis[f"{pos_key}_wiktionary"] = data.get("inflections_wiktionary")
inflection_analysis[f"{pos_key}_pattern"] = data.get("inflections_pattern")
all_senses_for_pos = []
semantics_block = data.get("semantics_combined", {})
# Add Wiktionary senses
wikt_senses = semantics_block.get("wiktionary_senses", [])
for s in wikt_senses:
s["source"] = "wiktionary"
all_senses_for_pos.append(s)
# Add OEWN (OdeNet) senses
wordnet_senses = semantics_block.get("odenet_senses", [])
for s in wordnet_senses:
s["source"] = "oewn" # Label it correctly
all_senses_for_pos.append(s)
semantic_analysis[f"{pos_key}_senses"] = all_senses_for_pos
# Add ConceptNet
if "conceptnet_relations" not in semantic_analysis:
semantic_analysis["conceptnet_relations"] = []
semantic_analysis["conceptnet_relations"].extend(
semantics_block.get("conceptnet_relations", [])
)
# Add OpenBLP
if "openblp_relations" not in semantic_analysis:
semantic_analysis["openblp_relations"] = []
semantic_analysis["openblp_relations"].extend(
semantics_block.get("openblp_relations", [])
)
lemma_report["inflection_analysis"] = inflection_analysis
except Exception as e:
lemma_report["inflection_analysis"] = {"error": f"Analyzer failed: {e}"}
# --- 3b. Contextual Re-ranking ---
# (This logic is identical, it just needs the `nlp_en` model)
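                # Relevance is the similarity spaCy reports between the full input sentence
                # (context_doc) and a mini-doc built from each definition or relation target;
                # items with no usable vector keep a score of 0.0 and sort last.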
if nlp_en and context_doc:
# Rank Senses (Wiktionary + OEWN)
for key in semantic_analysis:
if key.endswith("_senses"):
ranked_senses = []
for sense in semantic_analysis[key]:
if "error" in sense: continue
definition = sense.get("definition", "")
relevance = 0.0
if definition:
try:
def_doc = nlp_en(definition)
if def_doc.has_vector and def_doc.vector_norm > 0:
relevance = context_doc.similarity(def_doc)
except Exception: relevance = 0.0
sense["relevance_score"] = float(relevance)
ranked_senses.append(sense)
ranked_senses.sort(key=lambda x: x.get('relevance_score', 0.0), reverse=True)
if top_n > 0:
ranked_senses = ranked_senses[:top_n]
semantic_analysis[key] = ranked_senses
# Rank Relations (ConceptNet, OpenBLP)
for key in ["conceptnet_relations", "openblp_relations"]:
if key in semantic_analysis:
ranked_relations = []
for rel in semantic_analysis[key]:
if "error" in rel: continue
text_to_score = rel.get('surface') or rel.get('other_node', '')
relevance = 0.0
if text_to_score:
try:
rel_doc = nlp_en(text_to_score)
if rel_doc.has_vector and rel_doc.vector_norm > 0:
relevance = context_doc.similarity(rel_doc)
except Exception: relevance = 0.0
rel["relevance_score"] = float(relevance)
ranked_relations.append(rel)
ranked_relations.sort(key=lambda x: x.get('relevance_score', 0.0), reverse=True)
if top_n > 0:
ranked_relations = ranked_relations[:top_n]
semantic_analysis[key] = ranked_relations
lemma_report["semantic_analysis"] = semantic_analysis
lemma_deep_dive[lemma] = lemma_report
results["lemma_deep_dive"] = lemma_deep_dive
print("[Comprehensive Analysis (EN)] Analysis complete.")
return results
except Exception as e:
print(f"[Comprehensive Analysis (EN)] FATAL ERROR: {e}")
return {
"error": f"Analysis failed: {str(e)}",
"traceback": traceback.format_exc(),
}
# --- 7b. Word Encyclopedia (Non-Contextual) Analyzer ---
def analyze_word_encyclopedia(word: str, top_n_value: Optional[float] = 0, engine_choice: str = "wiktionary", lang: str = 'en') -> Dict[str, Any]:
"""
(PUBLIC DISPATCHER EN) Analyzes a single English word.
Chain: Wiktionary -> HanTa -> Stanza -> NLTK -> TextBlob
"""
if lang != 'en': return {"error": "This is the English app."}
if not word or not word.strip(): return {"info": "Please enter a word."}
word = word.strip()
top_n = int(top_n_value) if top_n_value is not None else 0
result = {}
info_log = []
# Define the full chain of engines to try
engine_functions = {
"wiktionary": _analyze_word_with_wiktionary,
"hanta": _analyze_word_with_hanta_en,
"stanza": _analyze_word_with_stanza,
"nltk": _analyze_word_with_nltk,
"textblob": _analyze_word_with_textblob
}
# Start the chain based on user's choice
start_engines = list(engine_functions.keys())
if engine_choice in start_engines:
start_index = start_engines.index(engine_choice)
start_engines = start_engines[start_index:]
else:
start_engines = list(engine_functions.keys()) # Default to full chain
try:
for engine_name in start_engines:
log(f"[DEBUG] EN Dispatcher: Trying Engine '{engine_name}' for '{word}'...")
if not engine_functions[engine_name]:
info_log.append(f"{engine_name} is not available.")
continue
engine_func = engine_functions[engine_name]
result = engine_func(word, top_n)
if result and result.get("analysis"):
# Success!
if info_log:
result["info"] = f"{result.get('info', '')} (Fallbacks: {' '.join(info_log)})"
return result
info_log.append(f"{engine_name} found no results.")
log(f"[DEBUG] EN Dispatcher: Engine '{engine_name}' found no results. Falling back...")
except Exception as e:
log(f"--- Dispatcher FAILED for engine {engine_choice}: {e} ---")
traceback.print_exc()
return { "error": f"An engine failed during analysis.", "traceback": traceback.format_exc() }
# --- No engines found anything ---
return {
"input_word": word,
"info": f"No analysis found. All engines failed. ({' '.join(info_log)})"
}
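# Illustrative call (not executed here): the dispatcher starts at the chosen engine and walks the
# rest of the chain until one engine returns a non-empty "analysis" dict, e.g.
#     analyze_word_encyclopedia("running", top_n_value=3, engine_choice="stanza", lang="en")
# would try Stanza, then NLTK, then TextBlob, and note engines that found nothing in the "info" field.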
# ============================================================================
# 7.5 VISUALIZATION & HTML HELPERS (NEW)
# ============================================================================
HTML_CSS = """
"""
def _format_word_analysis_html(data: Dict[str, Any]) -> str:
    """ Generates HTML for a single word analysis result. """
    # HTML_CSS is empty above, so the class names used below are informational only.
    if not data or "analysis" not in data:
        return f"{HTML_CSS}<p>No analysis data available. {data.get('info', '')}</p>"
    html = HTML_CSS
    analysis = data["analysis"]
    # Iterate over POS
    for pos_key, entries in analysis.items():
        if not entries:
            continue
        entry = entries[0]  # Take best candidate
        # --- POS Display Logic ---
        # Map internal keys to nice display names and CSS classes
        display_pos = pos_key.upper()
        css_class = "pos-other"
        if pos_key == 'noun':
            css_class = "pos-noun"
        elif pos_key == 'verb':
            css_class = "pos-verb"
        elif pos_key in ('adj', 'adjective'):
            css_class = "pos-adj"
            display_pos = "ADJECTIVE"
        elif pos_key in ('adv', 'adverb'):
            css_class = "pos-adv"
            display_pos = "ADVERB"
        elif pos_key == 'name':
            css_class = "pos-name"
            display_pos = "PROPER NOUN"
        # Data Extraction
        inf_wikt = entry.get("inflections_wiktionary") or {}
        inf_pat = entry.get("inflections_pattern") or {}
        sem_comb = entry.get("semantics_combined") or {}
        lemma = inf_wikt.get("base_form") or \
                inf_pat.get("base_form") or \
                sem_comb.get("lemma") or \
                data.get("input_word") or "?"
        # --- CARD START ---
        html += f"<div class='word-card'><div class='card-header'><span class='lemma'>{lemma}</span> <span class='{css_class}'>{display_pos}</span></div>"
        # --- Inflections Section ---
        html += "<h4>Morphology & Inflections</h4>"
        html += "<table class='inflection-table'>"
        # We check Pattern data first. If it's empty, we show '-' or rely on Wiktionary forms.
        has_pattern_data = bool(inf_pat) and "error" not in inf_pat
        if pos_key == 'noun':
            html += f"<tr><td>Singular</td><td>{inf_pat.get('singular', lemma if not has_pattern_data else '-')}</td></tr>"
            html += f"<tr><td>Plural</td><td>{inf_pat.get('plural', '-')}</td></tr>"
            if has_pattern_data:
                html += f"<tr><td>Context</td><td>{inf_pat.get('article', '-')}</td></tr>"
        elif pos_key == 'verb':
            cj = inf_pat.get('conjugation') or {}
            pres = cj.get('Present') or {}
            past = cj.get('Past') or {}
            parts = inf_pat.get('participles') or {}
            html += f"<tr><td>Infinitive</td><td>{inf_pat.get('infinitive', lemma)}</td></tr>"
            html += f"<tr><td>3rd Person (He/She)</td><td>{pres.get('he/she (3sg)', '-')}</td></tr>"
            html += f"<tr><td>Past Simple</td><td>{past.get('General', '-')}</td></tr>"
            html += f"<tr><td>Participle (Ing)</td><td>{parts.get('Present Participle (gerund)', '-')}</td></tr>"
            html += f"<tr><td>Participle (Past)</td><td>{parts.get('Past Participle', '-')}</td></tr>"
        elif pos_key in ('adjective', 'adj'):
            gr = inf_pat.get('grading') or {}
            html += f"<tr><td>Positive</td><td>{gr.get('Positive', lemma)}</td></tr>"
            html += f"<tr><td>Comparative</td><td>{gr.get('Comparative', '-')}</td></tr>"
            html += f"<tr><td>Superlative</td><td>{gr.get('Superlative', '-')}</td></tr>"
        # Wiktionary Forms (the "Other Forms" row)
        forms_list = inf_wikt.get("forms_list") or []
        if forms_list:
            # Extract text carefully
            forms_str_list = []
            for f in forms_list[:8]:  # Show up to 8 forms
                txt = f.get('form_text')
                tags = f.get('tags')
                if txt:
                    # Append tag if available, e.g. "cats (plural)"
                    display_txt = f"{txt} ({tags})" if tags else txt
                    forms_str_list.append(display_txt)
            if forms_str_list:
                html += f"<tr><td>Forms (DB)</td><td>{', '.join(forms_str_list)}</td></tr>"
        html += "</table>"
        # --- Semantics Section ---
        html += "<h4>Definitions & Senses</h4>"
        wikt_senses = sem_comb.get("wiktionary_senses") or []
        oewn_senses = sem_comb.get("odenet_senses") or []
        if not wikt_senses and not oewn_senses:
            html += "<p class='no-senses'>No definitions found.</p>"
        for s in wikt_senses[:3]:
            gloss_raw = s.get("definition") or ""
            gloss = str(gloss_raw).replace(";", "<br>")  # Ensure string
            if gloss:
                html += f"<p class='sense'><b>Wikt</b> {gloss}</p>"
        for s in oewn_senses[:3]:
            defi = s.get("definition") or ""
            if defi:
                html += f"<p class='sense'><b>OEWN</b> {defi}</p>"
        # --- Relations Section ---
        rels = sem_comb.get("conceptnet_relations") or []
        if rels:
            html += "<h4>Knowledge Graph</h4>"
            top_n = 5
            visible_rels = rels[:top_n]
            hidden_rels = rels[top_n:]
            def render_rel(r):
                # Robust extraction: prefer other_node, fall back to parsing surface, fall back to '?'
                rel_name = r.get("relation", "Rel")
                target = r.get("other_node") or "?"
                # Clean up surface text if needed
                if target == "?" and "surface" in r:
                    parts = str(r["surface"]).split()
                    if len(parts) > 2:
                        target = parts[-1]
                return f"<li><b>{rel_name}</b>: {target}</li>"
            html += "<ul class='relations'>"
            for r in visible_rels:
                html += render_rel(r)
            html += "</ul>"
            if hidden_rels:
                html += f"<details><summary>Show {len(hidden_rels)} more relations</summary><ul class='relations'>"
                for r in hidden_rels:
                    html += render_rel(r)
                html += "</ul></details>"
        html += "</div>"  # End Card
    return html
def _format_comprehensive_html(data: Dict[str, Any]) -> str:
    """ Generates HTML for the comprehensive sentence analysis. """
    if "error" in data:
        return f"{HTML_CSS}<p class='error'>{data['error']}</p>"
    html = HTML_CSS
    # 1. Grammar Check Banner
    gc = data.get("grammar_check", [])
    if isinstance(gc, list) and len(gc) == 1 and gc[0].get("status") == "perfect":
        html += "<p class='grammar-ok'>✓ Grammar Check Passed: No obvious errors detected.</p>"
    elif isinstance(gc, list) and gc:
        html += "<div class='grammar-issues'><p>⚠ Grammar Issues Detected:</p>"
        for err in gc:
            msg = err.get("message", "Error")
            bad = err.get("incorrect_text", "")
            html += f"<p>• {msg} (in: '{bad}')</p>"
        html += "</div>"
    # 2. Lemma Deep Dive Accordion
    deep_dive = data.get("lemma_deep_dive", {})
    if not deep_dive:
        html += "<p>No deep analysis available.</p>"
    else:
        html += "<h3>Word-by-Word Analysis</h3>"
        for lemma, details in deep_dive.items():
            # Reuse the single-word formatter by reshaping the deep-dive structure.
            # The deep dive separates "inflection_analysis" from "semantic_analysis",
            # while the word analyzer groups both by POS entry, so we rebuild a
            # simplified { "analysis": { pos: [ entry ] } } view for each lemma.
            html += f"<details class='lemma-accordion'><summary>{lemma}</summary>"
            inflections = details.get("inflection_analysis", {})
            semantics = details.get("semantic_analysis", {})
            # Guess the POS keys present from the inflection key prefixes
            all_keys = {k.split('_')[0] for k in inflections.keys()}
            reconstructed_data = {"analysis": {}}
            for pos in all_keys:
                entry = {
                    "inflections_wiktionary": inflections.get(f"{pos}_wiktionary"),
                    "inflections_pattern": inflections.get(f"{pos}_pattern"),
                    "semantics_combined": {
                        "lemma": lemma,
                        "wiktionary_senses": [s for s in semantics.get(f"{pos}_senses", []) if s.get('source') == 'wiktionary'],
                        "odenet_senses": [s for s in semantics.get(f"{pos}_senses", []) if s.get('source') == 'oewn'],
                        "conceptnet_relations": semantics.get("conceptnet_relations", [])
                    }
                }
                reconstructed_data["analysis"][pos] = [entry]
            html += _format_word_analysis_html(reconstructed_data)
            html += "</details>"
    return html
# ============================================================================
# 8. GRADIO UI CREATION (Adapted for English)
# ============================================================================
def create_spacy_tab():
"""Creates the UI for the spaCy tab."""
config = SPACY_UI_TEXT["en"]
model_choices = list(SPACY_MODEL_INFO.keys())
with gr.Row():
ui_lang_radio = gr.Radio(["DE", "EN", "ES"], label=config["ui_lang_label"], value="EN")
model_lang_radio = gr.Radio(
choices=[(SPACY_MODEL_INFO[k][0], k) for k in model_choices],
label=config["model_lang_label"],
value="en" # <-- Default to English
)
markdown_title = gr.Markdown(config["title"])
markdown_subtitle = gr.Markdown(config["subtitle"])
text_input = gr.Textbox(label=config["input_label"], placeholder=config["input_placeholder"], lines=5)
analyze_button = gr.Button(config["button_text"], variant="primary")
with gr.Tabs():
with gr.Tab(config["tab_graphic"]) as tab_graphic:
html_dep_out = gr.HTML(label=config["html_label"])
with gr.Tab(config["tab_ner"]) as tab_ner:
html_ner_out = gr.HTML(label=config["ner_label"])
with gr.Tab(config["tab_table"]) as tab_table:
df_out = gr.DataFrame(label=config["table_label"], headers=config["table_headers"], interactive=False)
with gr.Tab(config["tab_json"]) as tab_json:
json_out = gr.JSON(label=config["json_label"])
analyze_button.click(fn=spacy_get_analysis,
inputs=[ui_lang_radio, model_lang_radio, text_input],
outputs=[df_out, json_out, html_dep_out, html_ner_out, analyze_button],
api_name="get_morphology")
ui_lang_radio.change(fn=spacy_update_ui,
inputs=ui_lang_radio,
outputs=[markdown_title, markdown_subtitle, ui_lang_radio, model_lang_radio,
text_input, analyze_button, tab_graphic, tab_table, tab_json, tab_ner,
html_dep_out, df_out, json_out, html_ner_out])
def create_languagetool_tab():
"""Creates the UI for the Grammar Checker tab with LT."""
gr.Markdown("# 🇬🇧 English Grammar & Spelling Checker")
gr.Markdown("Powered by `LanguageTool`.")
with gr.Row():
text_input = gr.Textbox(
label="English Text to Check",
placeholder="e.g., I seen the man. This is a houze.",
lines=5,
scale=3
)
check_button = gr.Button("Check Text", variant="primary")
output = gr.JSON(label="Detected Errors (JSON)")
check_button.click(
fn=lambda text: lt_check_grammar(text, 'en'),
inputs=[text_input],
outputs=[output],
api_name="check_grammar"
)
gr.Examples(
[["This is a houze."], ["I seen the man."],
["The cat sleep on the table."], ["He asks if he can go."]],
inputs=[text_input], outputs=[output], fn=lambda text: lt_check_grammar(text, 'en'),
cache_examples=False
)
def create_wordnet_tab():
"""Creates the UI for the OEWN tab."""
gr.Markdown("# 🇬🇧 English Thesaurus (OEWN) Service")
gr.Markdown("Powered by `wn` and `Open English WordNet (oewn)`.")
with gr.Column():
word_input = gr.Textbox(
label="English Word",
placeholder="e.g., house, fast, good, cat"
)
check_button = gr.Button("Find Relations", variant="primary")
output = gr.JSON(label="Thesaurus Information (JSON)")
check_button.click(
fn=lambda word: wordnet_get_thesaurus_info(word, 'en'),
inputs=[word_input],
outputs=[output],
api_name="get_thesaurus"
)
gr.Examples(
[["dog"], ["good"], ["run"], ["house"], ["fast"]],
inputs=[word_input], outputs=[output], fn=lambda word: wordnet_get_thesaurus_info(word, 'en'),
cache_examples=False
)
def create_pattern_tab():
"""Creates the UI for the Pattern.en tab."""
gr.Markdown("# 🇬🇧 Complete English Word Inflection System")
gr.Markdown("Powered by `pattern.en`. Generates inflection tables.")
with gr.Column():
word_input = gr.Textbox(
label="English Word",
placeholder="e.g., house, go, beautiful, better, went, cat"
)
generate_button = gr.Button("Generate All Forms", variant="primary")
output = gr.JSON(label="Complete Inflection Analysis")
generate_button.click(
fn=lambda word: pattern_get_all_inflections(word, 'en'),
inputs=[word_input],
outputs=[output],
api_name="get_all_inflections"
)
gr.Examples(
[["house"], ["go"], ["beautiful"], ["better"], ["went"], ["cat"], ["run"]],
inputs=[word_input], outputs=[output], fn=lambda word: pattern_get_all_inflections(word, 'en'),
cache_examples=False
)
def create_conceptnet_tab():
"""--- Creates the UI for the ConceptNet tab ---"""
gr.Markdown("# 🌍 ConceptNet Knowledge Graph (Direct API)")
gr.Markdown("Fetches semantic relations for a word in any language.")
with gr.Row():
word_input = gr.Textbox(label="Word or Phrase", placeholder="e.g., tree, Katze")
lang_input = gr.Textbox(label="Language Code", value="en") # <-- Default to 'en'
check_button = gr.Button("Find Relations", variant="primary")
output = gr.JSON(label="ConceptNet Relations (JSON)")
check_button.click(
fn=conceptnet_get_relations,
inputs=[word_input, lang_input],
outputs=[output],
api_name="get_conceptnet"
)
gr.Examples(
[["tree", "en"], ["Baum", "de"], ["cat", "en"], ["gato", "es"]],
inputs=[word_input, lang_input], outputs=[output], fn=conceptnet_get_relations,
cache_examples=False
)
def create_openblp_tab():
"""--- Creates the UI for the OpenBLP tab ---"""
gr.Markdown("# 🔗 OpenBLP Knowledge Graph (Stub)")
gr.Markdown("Stub component to query OpenBLP relations.")
with gr.Column():
word_input = gr.Textbox(
label="English Lemma",
placeholder="e.g., dog, cat"
)
check_button = gr.Button("Find Relations", variant="primary")
output = gr.JSON(label="OpenBLP Relations (JSON)")
check_button.click(
fn=openblp_get_relations,
inputs=[word_input],
outputs=[output],
api_name="get_openblp"
)
gr.Examples(
[["dog"], ["cat"], ["house"]],
inputs=[word_input], outputs=[output], fn=openblp_get_relations,
cache_examples=False
)
def create_combined_tab():
"""Creates the UI for the CONTEXTUAL Comprehensive Analyzer tab."""
gr.Markdown("# 🚀 Comprehensive Analyzer (Contextual - EN)")
gr.Markdown("This tool provides a deep, **lemma-based** analysis *in context* for English.")
with gr.Column():
text_input = gr.Textbox(
label="English Text",
placeholder="e.g., The quick brown fox jumps over the lazy dog.",
lines=5
)
top_n_number = gr.Number(
label="Limit Semantic Senses per POS (0 for all)",
value=0, step=1, minimum=0, interactive=True
)
analyze_button = gr.Button("Run Comprehensive Analysis", variant="primary")
status_output = gr.Markdown(value="", visible=True)
# --- CHANGED: Added HTML output ---
html_output = gr.HTML(label="Visual Report")
json_output = gr.JSON(label="Raw JSON Data")
# --- CHANGED: Wrapper to return Status, HTML, and JSON ---
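    # The handler below is a generator (it yields), so Gradio can push the status line first
    # and then replace it with the finished HTML report and raw JSON once analysis completes.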
def run_analysis_with_status_visual(text, top_n):
try:
status = "🔄 Analyzing..."
yield status, "", {} # Clear outputs
result = comprehensive_english_analysis(text, top_n)
# Generate HTML
html = _format_comprehensive_html(result)
status = f"✅ Analysis complete! Found {len(result.get('lemma_deep_dive', {}))} lemmas."
yield status, html, result
except Exception as e:
error_status = f"❌ Error: {str(e)}"
yield error_status, f"{str(e)}
", {"error": str(e), "traceback": traceback.format_exc()}
analyze_button.click(
fn=run_analysis_with_status_visual,
inputs=[text_input, top_n_number],
outputs=[status_output, html_output, json_output],
api_name="comprehensive_analysis"
)
gr.Examples(
[["The cat sleeps on the table.", 3]],
inputs=[text_input, top_n_number],
outputs=[status_output, html_output, json_output],
fn=run_analysis_with_status_visual,
cache_examples=False
)
def create_word_encyclopedia_tab():
"""--- UI for the NON-CONTEXTUAL Word Analyzer tab ---"""
gr.Markdown("# 📖 Word Encyclopedia (Non-Contextual - EN)")
gr.Markdown("Analyzes a **single English word** for all possible forms, using a chain of engines.")
with gr.Column():
word_input = gr.Textbox(
label="Single English Word",
placeholder="e.g., run, water, fast, beautiful"
)
with gr.Row():
top_n_number = gr.Number(
label="Limit Semantic Senses per POS (0 for all)",
value=0, step=1, minimum=0, interactive=True
)
engine_radio = gr.Radio(
label="Select Analysis Engine",
choices=[
("Wiktionary (Default)", "wiktionary"),
("HanTa (EN)", "hanta"),
("Stanza", "stanza"),
("NLTK", "nltk"),
("TextBlob", "textblob"),
],
value="wiktionary",
interactive=True
)
analyze_button = gr.Button("Analyze Word", variant="primary")
# --- CHANGED: Added HTML output component ---
html_output = gr.HTML(label="Visual Report")
json_output = gr.JSON(label="Raw JSON Data")
# --- CHANGED: Wrapper function to return both HTML and JSON ---
def run_word_visual(word, top_n, engine):
data = analyze_word_encyclopedia(word, top_n, engine, 'en')
html = _format_word_analysis_html(data)
return html, data
analyze_button.click(
fn=run_word_visual, # Use wrapper
inputs=[word_input, top_n_number, engine_radio],
outputs=[html_output, json_output], # Output to both
api_name="analyze_word"
)
gr.Examples(
[["run", 3, "wiktionary"], ["water", 0, "wiktionary"]],
inputs=[word_input, top_n_number, engine_radio],
outputs=[html_output, json_output],
fn=run_word_visual,
cache_examples=False
)
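    # Hypothetical remote call against the named endpoint above (URL and port are placeholders,
    # assuming the app is reachable locally):
    #     from gradio_client import Client
    #     client = Client("http://localhost:7860")
    #     html, data = client.predict("running", 3, "wiktionary", api_name="/analyze_word")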
# --- Standalone Engine Tabs ---
def create_wiktionary_tab():
gr.Markdown("# 📙 Wiktionary Lookup (Raw Engine - EN)")
gr.Markdown("Directly query the English Wiktionary (Primary) engine.")
word_input = gr.Textbox(label="Single English Word", placeholder="e.g., house, go, today")
analyze_button = gr.Button("Lookup Word in Wiktionary", variant="primary")
output = gr.JSON(label="Wiktionary Engine Analysis (JSON)")
analyze_button.click(
fn=lambda word: _analyze_word_with_wiktionary(word, 0),
inputs=[word_input], outputs=[output], api_name="wiktionary_lookup"
)
gr.Examples([["house"], ["go"], ["today"], ["run"]], inputs=[word_input], outputs=[output],
fn=lambda word: _analyze_word_with_wiktionary(word, 0), cache_examples=False)
def create_hanta_tab():
gr.Markdown("# 🤖 HanTa Lookup (Raw Engine - EN)")
gr.Markdown("Directly query the HanTa (EN) (Fallback 1) engine.")
word_input = gr.Textbox(label="Single English Word", placeholder="e.g., running, houses, unhappiest")
analyze_button = gr.Button("Lookup Word with HanTa", variant="primary")
output = gr.JSON(label="HanTa Engine Analysis (JSON)")
analyze_button.click(
fn=lambda word: _analyze_word_with_hanta_en(word, 0),
inputs=[word_input], outputs=[output], api_name="hanta_lookup"
)
gr.Examples([["running"], ["houses"], ["unhappiest"], ["fast"]], inputs=[word_input], outputs=[output],
fn=lambda word: _analyze_word_with_hanta_en(word, 0), cache_examples=False)
def create_stanza_tab():
gr.Markdown("# 🏛️ Stanza Lookup (Raw Engine - EN)")
gr.Markdown("Directly query the Stanza (Fallback 2) engine.")
word_input = gr.Textbox(label="Single English Word", placeholder="e.g., ran, better, was")
analyze_button = gr.Button("Lookup Word with Stanza", variant="primary")
output = gr.JSON(label="Stanza Engine Analysis (JSON)")
analyze_button.click(
fn=lambda word: _analyze_word_with_stanza(word, 0),
inputs=[word_input], outputs=[output], api_name="stanza_lookup"
)
gr.Examples([["ran"], ["better"], ["was"], ["dogs"]], inputs=[word_input], outputs=[output],
fn=lambda word: _analyze_word_with_stanza(word, 0), cache_examples=False)
def create_nltk_tab():
gr.Markdown("# 📚 NLTK Lookup (Raw Engine - EN)")
gr.Markdown("Directly query the NLTK (Fallback 3) engine.")
word_input = gr.Textbox(label="Single English Word", placeholder="e.g., corpora, went")
analyze_button = gr.Button("Lookup Word with NLTK", variant="primary")
output = gr.JSON(label="NLTK Engine Analysis (JSON)")
analyze_button.click(
fn=lambda word: _analyze_word_with_nltk(word, 0),
inputs=[word_input], outputs=[output], api_name="nltk_lookup"
)
gr.Examples([["corpora"], ["went"], ["best"], ["running"]], inputs=[word_input], outputs=[output],
fn=lambda word: _analyze_word_with_nltk(word, 0), cache_examples=False)
def create_textblob_tab():
gr.Markdown("# 💬 TextBlob Lookup (Raw Engine - EN)")
gr.Markdown("Directly query the TextBlob (Fallback 4) engine.")
word_input = gr.Textbox(label="Single English Word", placeholder="e.g., worse, cacti")
analyze_button = gr.Button("Lookup Word with TextBlob", variant="primary")
output = gr.JSON(label="TextBlob Engine Analysis (JSON)")
analyze_button.click(
fn=lambda word: _analyze_word_with_textblob(word, 0),
inputs=[word_input], outputs=[output], api_name="textblob_lookup"
)
gr.Examples([["worse"], ["cacti"], ["spoke"], ["fastest"]], inputs=[word_input], outputs=[output],
fn=lambda word: _analyze_word_with_textblob(word, 0), cache_examples=False)
# --- Main UI Builder ---
def create_consolidated_interface():
"""Builds the final Gradio app with all tabs."""
with gr.Blocks(title="Consolidated Linguistics Hub (EN)", theme=gr.themes.Soft()) as demo:
gr.Markdown("# 🏛️ Consolidated Linguistics Hub (ENGLISH)")
gr.Markdown("A suite of advanced tools for English linguistics, built on OEWN, Stanza, NLTK, TextBlob, and more.")
with gr.Tabs():
# --- Main Tools ---
with gr.Tab("📖 Word Encyclopedia (EN)"):
create_word_encyclopedia_tab()
with gr.Tab("🚀 Comprehensive Analyzer (EN)"):
create_combined_tab()
with gr.Tab("🔬 spaCy Analyzer (Multi-lingual)"):
create_spacy_tab()
with gr.Tab("✅ Grammar Check (EN)"):
create_languagetool_tab()
# --- Standalone Engine Tabs (NEW & EXPANDED) ---
with gr.Tab("📙 Engine: Wiktionary (EN)"):
create_wiktionary_tab()
with gr.Tab("🤖 Engine: HanTa (EN)"):
create_hanta_tab()
with gr.Tab("🏛️ Engine: Stanza (EN)"):
create_stanza_tab()
with gr.Tab("📚 Engine: NLTK (EN)"):
create_nltk_tab()
with gr.Tab("💬 Engine: TextBlob (EN)"):
create_textblob_tab()
# --- Standalone Component Tabs ---
with gr.Tab("📚 Component: Inflections (EN)"):
create_pattern_tab()
with gr.Tab("📖 Component: Thesaurus (OEWN)"):
create_wordnet_tab()
with gr.Tab("🌐 Component: ConceptNet (Direct)"):
create_conceptnet_tab()
with gr.Tab("🔗 Component: OpenBLP (EN)"):
create_openblp_tab()
return demo
# ============================================================================
# 9. MAIN EXECUTION BLOCK
# ============================================================================
if __name__ == "__main__":
print("\n" + "="*70)
print("CONSOLIDATED LINGUISTICS HUB (ENGLISH) (STARTING)")
print("="*70 + "\n")
# --- 1. Initialize spaCy Models ---
print("--- Initializing spaCy Models ---")
spacy_initialize_models()
print("--- spaCy Done ---\n")
# --- 2. Initialize WordNet Worker (OEWN) ---
print("--- Initializing OEWN Worker ---")
if WN_AVAILABLE:
try:
wordnet_start_worker()
print("✓ OEWN worker is starting/ready.")
except Exception as e:
print(f"✗ FAILED to start OEWN worker: {e}")
else:
print("INFO: OEWN ('wn') library not available, skipping worker.")
print("--- OEWN Done ---\n")
# --- 3. Initialize Wiktionary (English) ---
print("--- Initializing English Wiktionary DB ---")
try:
if not wiktionary_download_db():
print("✗ WARNING: Failed to download English Wiktionary DB. Primary engine is disabled.")
else:
_ = wiktionary_get_connection() # Pre-warm
wiktionary_run_startup_diagnostics()
except Exception as e:
print(f"✗ FAILED to initialize Wiktionary: {e}")
print("--- Wiktionary Done ---\n")
# --- 4. Initialize HanTa Tagger (EN) ---
print("--- Initializing HanTa Tagger (EN) ---")
if HANTA_AVAILABLE:
try:
hanta_get_tagger_en()
except Exception as e:
print(f"✗ FAILED to start HanTa (EN) tagger: {e}")
else:
print("INFO: HanTa library not available, skipping tagger.")
print("--- HanTa Done ---\n")
# --- 5. Initialize Stanza Pipeline (EN) ---
print("--- Initializing Stanza Pipeline (EN) ---")
if STANZA_AVAILABLE:
try:
stanza_get_pipeline_en()
except Exception as e:
print(f"✗ FAILED to start Stanza (EN) pipeline: {e}")
else:
print("INFO: Stanza library not available, skipping pipeline.")
print("--- Stanza Done ---\n")
# --- 6. Initialize NLTK Lemmatizer ---
print("--- Initializing NLTK Lemmatizer ---")
if NLTK_AVAILABLE:
try:
nltk_get_lemmatizer()
except Exception as e:
print(f"✗ FAILED to start NLTK: {e}")
else:
print("INFO: NLTK library not available, skipping lemmatizer.")
print("--- NLTK Done ---\n")
    # --- 7. Check Pattern.en ---
print("--- Checking Pattern.en ---")
if not PATTERN_EN_AVAILABLE:
print("WARNING: pattern.en library not available. 'Inflections' tab will fail.")
else:
print("✓ Pattern.en library is available.")
print("--- Pattern.en Done ---\n")
    # --- 8. Initialize ConceptNet Client ---
print("--- Initializing ConceptNet Client ---")
if GRADIO_CLIENT_AVAILABLE:
try:
get_conceptnet_client()
except Exception as e:
print(f"✗ FAILED to start ConceptNet Client: {e}")
else:
print("INFO: gradio_client not available, skipping ConceptNet client.")
print("--- ConceptNet Client Done ---\n")
print("="*70)
print("All services initialized. Launching Gradio Hub (EN)...")
print("="*70 + "\n")
    # --- 9. Launch Gradio ---
demo = create_consolidated_interface()
    # A fixed port (e.g. 7861) would avoid clashing with the German app:
    #   demo.launch(server_name="0.0.0.0", server_port=7861, show_error=True)
    # Here we omit server_port and let Gradio pick its default port instead.
demo.launch(server_name="0.0.0.0", show_error=True)