|
|
import hashlib
import html
import json
import os
import random
import re
from collections import Counter

import nltk
from flask import Flask, render_template, request, redirect, url_for, jsonify, session
from nltk.corpus import wordnet as wn
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize, sent_tokenize
|
|
|
|
|
|
|
|
# Fetch the NLTK data this module needs before anything tokenizes or tags.
# 'punkt_tab' and 'averaged_perceptron_tagger_eng' are the resource names that
# recent NLTK releases load for word_tokenize/pos_tag; the older names are kept
# so that older NLTK installations continue to work.
for _nltk_resource in (
    'wordnet',
    'punkt',
    'punkt_tab',
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
    'stopwords',
):
    nltk.download(_nltk_resource)
|
|
|
|
|
app = Flask(__name__)

# The secret key signs the session cookie, so it should not live in source
# control. Read it from the SECRET_KEY environment variable; the previous
# literal is kept only as a fallback so existing setups keep working.
app.secret_key = os.environ.get('SECRET_KEY', 'wsd_secret_key_2023')

# JSON file (relative to the working directory) where user feedback is persisted.
FEEDBACK_FILE = 'feedback_data.json'
|
|
|
|
|
class EnhancedLesk:
    """Word-sense disambiguator built around a weighted Lesk gloss overlap.

    For a target word in a sentence, candidate WordNet senses are scored by
    combining four signals: gloss/context word overlap, an optional BERT
    similarity, hand-written indicator rules, and stored user feedback.
    A collocation table can short-circuit the scoring entirely.
    """

    # Tokens at distance d from the target are repeated max(1, 6 - d) times.
    _MAX_CONTEXT_WEIGHT = 6

    # Phrases that, when present in the sentence, select the first synset of
    # the target word whose WordNet lexname equals the dictionary key.
    # NOTE(review): confirm each lexname against WordNet -- e.g. verify that the
    # first "noun.artifact" synset of "bank" is really the intended financial
    # sense; check_collocations returns whichever synset matches first.
    _COLLOCATIONS = {
        "bat": {
            "noun.animal": ["flying bat", "bat flying", "bat wings", "vampire bat", "fruit bat", "bat in the dark", "bat at night"],
            "noun.artifact": ["baseball bat", "cricket bat", "swing the bat", "wooden bat", "hit with bat"],
        },
        "bank": {
            "noun.artifact": ["bank account", "bank manager", "bank loan", "bank robbery", "money in bank"],
            "noun.object": ["river bank", "bank of the river", "west bank", "bank erosion", "along the bank"],
        },
        "bass": {
            "noun.animal": ["bass fish", "catch bass", "fishing bass", "largemouth bass"],
            "noun.attribute": ["bass sound", "bass guitar", "bass player", "bass note", "bass drum"],
        },
        "spring": {
            "noun.time": ["spring season", "this spring", "last spring", "spring weather", "spring flowers"],
            "noun.artifact": ["metal spring", "spring coil", "spring mechanism"],
            "noun.object": ["water spring", "hot spring", "spring water"],
        },
        "crane": {
            "noun.animal": ["crane bird", "crane flew", "crane nest", "crane species"],
            "noun.artifact": ["construction crane", "crane operator", "crane lifted"],
        },
    }

    # Per-word rules, tried in order: (indicator words, score, sense predicate).
    # A rule fires when any indicator occurs in the context AND some candidate
    # sense satisfies the predicate; otherwise the next rule is tried.
    _RULES = {
        "bat": (
            (
                frozenset({"fly", "flying", "flew", "wing", "wings", "night",
                           "dark", "cave", "nocturnal", "mammal", "animal", "leather", "leathery"}),
                10,
                lambda sense: sense.lexname() == "noun.animal",
            ),
            (
                frozenset({"hit", "swing", "ball", "baseball", "cricket",
                           "player", "game", "sport", "team", "wooden"}),
                8,
                lambda sense: sense.lexname() == "noun.artifact",
            ),
        ),
        "bank": (
            (
                frozenset({"money", "account", "deposit", "withdraw", "loan",
                           "credit", "debit", "financial", "cash", "check"}),
                10,
                lambda sense: "financial" in sense.definition() or "money" in sense.definition(),
            ),
            (
                frozenset({"river", "stream", "water", "flow", "shore", "beach"}),
                10,
                lambda sense: "river" in sense.definition() or "stream" in sense.definition(),
            ),
        ),
        "bass": (
            (
                frozenset({"fish", "fishing", "catch", "caught", "water", "lake", "river"}),
                10,
                lambda sense: sense.lexname() == "noun.animal",
            ),
            (
                frozenset({"music", "sound", "guitar", "player", "band", "note", "tone",
                           "instrument", "concert", "loud"}),
                10,
                lambda sense: sense.lexname() == "noun.attribute" or "music" in sense.definition(),
            ),
        ),
    }

    def __init__(self):
        """Load stored feedback, NLTK helpers and, if installed, the BERT model."""
        self.feedback = self.load_feedback()
        self.lemmatizer = WordNetLemmatizer()
        self.stopwords = set(nltk.corpus.stopwords.words('english'))

        # BERT is optional: any failure (missing package, download error, ...)
        # simply disables the embedding-based score.
        try:
            from transformers import AutoTokenizer, AutoModel
            import torch  # noqa: F401 -- imported only to verify torch is installed

            print("Loading BERT models...")
            self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
            self.bert_model = AutoModel.from_pretrained('bert-base-uncased')
            self.bert_available = True
            print("BERT models loaded successfully")
        except Exception as e:
            print(f"BERT models not available: {e}")
            print("Continuing without BERT embeddings")
            self.bert_available = False

    def load_feedback(self):
        """Return the feedback mapping stored in FEEDBACK_FILE.

        A missing, unreadable or malformed file yields an empty dict instead
        of raising, so a corrupt feedback file cannot prevent start-up.
        """
        try:
            with open(FEEDBACK_FILE, encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            print(f"Could not load feedback from {FEEDBACK_FILE}: {e}")
            return {}
        return data if isinstance(data, dict) else {}

    def save_feedback(self):
        """Persist the in-memory feedback mapping to FEEDBACK_FILE as JSON."""
        # Write to a sibling temp file first so an interrupted write cannot
        # leave a truncated feedback file behind.
        tmp_path = f"{FEEDBACK_FILE}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.feedback, f)
        os.replace(tmp_path, FEEDBACK_FILE)

    @staticmethod
    def _feedback_key(word, context):
        """Build the feedback dictionary key for ``word`` in ``context``.

        The key must be identical when feedback is stored (add_feedback) and
        when it is looked up (disambiguate), and must survive restarts, so it
        is derived from a SHA-256 digest of the sorted unique context words
        rather than from the per-process salted built-in ``hash()``.

        ``context`` may be a list of words, a comma/space separated string,
        or None.
        """
        if context is None:
            words = []
        elif isinstance(context, str):
            words = context.replace(',', ' ').split()
        else:
            words = context
        canonical = ' '.join(sorted({str(item).lower() for item in words}))
        digest = hashlib.sha256(canonical.encode('utf-8')).hexdigest()[:12]
        return f"{str(word).lower()}_{digest}"

    def get_wordnet_pos(self, treebank_tag):
        """Convert a Penn Treebank POS tag to a WordNet POS constant (or None)."""
        prefix_to_pos = {'J': wn.ADJ, 'V': wn.VERB, 'N': wn.NOUN, 'R': wn.ADV}
        return prefix_to_pos.get(treebank_tag[:1])

    def process_context(self, sentence, target_word):
        """Return lemmatized context words, repeated according to proximity.

        Alphabetic, non-stopword tokens of ``sentence`` are lemmatized. When
        the target word is found, a token at distance d from its first
        occurrence appears max(1, 6 - d) times in the result; when it is not
        found every token appears once.
        """
        tokens = word_tokenize(sentence.lower())
        target = target_word.lower()
        target_index = next((i for i, token in enumerate(tokens) if token == target), -1)

        context_words = []
        for i, token in enumerate(tokens):
            if not token.isalpha() or token in self.stopwords:
                continue
            lemma = self.lemmatizer.lemmatize(token)
            if target_index >= 0:
                weight = max(1, self._MAX_CONTEXT_WEIGHT - abs(i - target_index))
            else:
                weight = 1
            context_words.extend([lemma] * weight)
        return context_words

    def _content_words(self, text):
        """Return the lower-cased alphabetic, non-stopword tokens of ``text``."""
        lowered = (token.lower() for token in word_tokenize(text))
        # Lower-case BEFORE the stopword test: the stopword list is lower-case,
        # so testing the raw token would let "The", "A", ... slip through.
        return [token for token in lowered if token.isalpha() and token not in self.stopwords]

    def calculate_overlap_score(self, sense, context):
        """Score the overlap between a sense's signature and the context words.

        The signature is built from the sense definition (counted twice), its
        examples, and the definitions of its hypernyms, hyponyms, part/substance
        meronyms and part/substance holonyms. Each shared word contributes
        ``context_count * min(signature_count, 5)``.
        """
        signature = Counter()
        for word in self._content_words(sense.definition()):
            signature[word] += 2

        related_synsets = (
            sense.hypernyms()
            + sense.hyponyms()
            + sense.part_meronyms()
            + sense.substance_meronyms()
            + sense.part_holonyms()
            + sense.substance_holonyms()
        )
        extra_texts = list(sense.examples())
        extra_texts.extend(related.definition() for related in related_synsets)
        for text in extra_texts:
            signature.update(self._content_words(text))

        # NOTE(review): context words are lemmatized but signature words are
        # not, so inflected gloss words cannot match -- consider lemmatizing.
        return sum(
            count * min(signature[word], 5)
            for word, count in Counter(context).items()
            if word in signature
        )

    def bert_similarity(self, sense, context_sentence, target_word):
        """Return 10x the cosine similarity between two BERT token vectors.

        The sentence and the sense definition are encoded as one pair; the
        vector at position 0 is compared with the vector of the token that
        directly follows the first separator token. Returns 0 when BERT is
        unavailable or on any error.
        """
        if not getattr(self, 'bert_available', False):
            return 0

        try:
            import torch

            gloss = sense.definition()
            inputs = self.tokenizer(context_sentence, gloss, return_tensors="pt",
                                    padding=True, truncation=True, max_length=512)

            with torch.no_grad():
                outputs = self.bert_model(**inputs)

            token_ids = inputs.input_ids[0].tolist()
            gloss_start = token_ids.index(self.tokenizer.sep_token_id) + 1
            similarity = torch.cosine_similarity(
                outputs.last_hidden_state[0, 0],
                outputs.last_hidden_state[0, gloss_start]
            ).item()

            return similarity * 10
        except Exception as e:
            print(f"Error in BERT similarity calculation: {e}")
            return 0

    def check_collocations(self, sentence, target_word):
        """Look for a known collocation of ``target_word`` in ``sentence``.

        Returns ``(sense, 15)`` for the first synset of the word whose lexname
        matches the domain of a phrase found in the sentence, otherwise
        ``(None, 0)``.
        """
        domains = self._COLLOCATIONS.get(target_word.lower())
        if domains is None:
            return None, 0

        sentence_lower = sentence.lower()
        synsets = wn.synsets(target_word)
        for domain, phrases in domains.items():
            if not any(phrase.lower() in sentence_lower for phrase in phrases):
                continue
            for sense in synsets:
                if sense.lexname() == domain:
                    return sense, 15

        return None, 0

    def apply_rules(self, word, context, senses):
        """Apply hand-coded indicator rules for a few common ambiguous words.

        Returns ``(score, sense)`` for the first rule that fires, or
        ``(0, None)`` when no rule applies.
        """
        context_words = set(context)
        for indicators, score, matches in self._RULES.get(word.lower(), ()):
            if indicators.isdisjoint(context_words):
                continue
            for sense in senses:
                if matches(sense):
                    return score, sense
        return 0, None

    def safe_compare_synsets(self, synset1, synset2):
        """Safely compare two synsets, handling None values."""
        if synset1 is None or synset2 is None:
            return synset1 is synset2

        try:
            return synset1 == synset2
        except AttributeError:
            return False

    def disambiguate(self, sentence, word):
        """Disambiguate ``word`` in the context of ``sentence``.

        Returns ``(best_sense, alternatives)`` where ``alternatives`` holds up
        to three other candidate senses, or ``(None, [])`` when WordNet has no
        synset for the word.
        """
        word = word.lower()

        # Use the POS tag of the first occurrence to narrow the candidates.
        word_pos = None
        for token, tag in pos_tag(word_tokenize(sentence)):
            if token.lower() == word:
                word_pos = self.get_wordnet_pos(tag)
                break

        all_senses = wn.synsets(word)
        senses = all_senses
        if word_pos:
            # Synset.pos() reports satellite adjectives as 's', which would
            # never equal wn.ADJ ('a'); accept both for adjectives.
            accepted = {word_pos, wn.ADJ_SAT} if word_pos == wn.ADJ else {word_pos}
            senses = [s for s in all_senses if s.pos() in accepted] or all_senses

        if not senses:
            return None, []

        context = self.process_context(sentence, word)

        # A matching collocation wins outright.
        collocation_sense, collocation_score = self.check_collocations(sentence, word)
        if collocation_sense is not None and collocation_score > 0:
            others = [s for s in senses if not self.safe_compare_synsets(s, collocation_sense)]
            return collocation_sense, others[:3]

        rule_score, rule_sense = self.apply_rules(word, context, senses)

        # Same key derivation as add_feedback, so stored feedback is found.
        feedback_scores = self.feedback.get(self._feedback_key(word, context), {})
        use_bert = getattr(self, 'bert_available', False)

        scored_senses = []
        for sense in senses:
            is_rule_sense = rule_sense is not None and self.safe_compare_synsets(sense, rule_sense)
            rule_boost = rule_score if is_rule_sense else 0
            overlap_score = self.calculate_overlap_score(sense, context)
            bert_score = self.bert_similarity(sense, sentence, word) if use_bert else 0
            feedback_score = feedback_scores.get(sense.name(), 0)

            final_score = (
                overlap_score * 0.4 +
                bert_score * 0.3 +
                rule_boost * 0.2 +
                feedback_score * 0.1
            )
            scored_senses.append((final_score, sense))

        # Stable sort: ties keep WordNet's original sense order.
        scored_senses.sort(reverse=True, key=lambda pair: pair[0])

        best_sense = scored_senses[0][1]
        top_senses = [pair[1] for pair in scored_senses[1:4]]
        return best_sense, top_senses

    def add_feedback(self, word, context, correct_sense):
        """Store user feedback to improve future disambiguation.

        Adds 5 to the stored score of ``correct_sense`` for this word/context,
        subtracts 1 (not below 0) from every other already-scored sense of the
        word, and saves the feedback file.

        Returns a dict with the ``definition`` and ``examples`` of the chosen
        sense, or None when ``correct_sense`` is not a synset of ``word`` (in
        which case nothing is stored, since such an entry could never be
        looked up again).
        """
        synsets = wn.synsets(word)
        chosen = next((s for s in synsets if s.name() == correct_sense), None)
        if chosen is None:
            return None

        scores = self.feedback.setdefault(self._feedback_key(word, context), {})
        scores[correct_sense] = scores.get(correct_sense, 0) + 5

        for sense in synsets:
            name = sense.name()
            if name != correct_sense and name in scores:
                scores[name] = max(0, scores[name] - 1)

        self.save_feedback()

        return {
            'definition': chosen.definition(),
            'examples': chosen.examples()
        }
|
|
|
|
|
|
|
|
# Single module-level disambiguator used by the route handlers below.
# Constructing it reads the stored feedback file and attempts to load the
# optional BERT model, so both happen once, at import time.
lesk_processor = EnhancedLesk()
|
|
|
|
|
@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the input form; on submit, forward the text and word to /results."""
    if request.method != 'POST':
        return render_template('index.html')

    # 'text' is mandatory (a missing field raises Flask's bad-request error);
    # the target word is optional and defaults to an empty string.
    submitted_text = request.form['text']
    chosen_word = request.form.get('target_word', '')
    results_url = url_for('results', text=submitted_text, word=chosen_word)
    return redirect(results_url)
|
|
|
|
|
def _highlight_matches(text, pattern):
    """Return ``text`` HTML-escaped, with every ``pattern`` match wrapped in a highlight span."""
    pieces = []
    cursor = 0
    for match in pattern.finditer(text):
        pieces.append(html.escape(text[cursor:match.start()]))
        # Keep the matched text itself so the original capitalisation survives.
        pieces.append(f'<span class="highlight-word">{html.escape(match.group(0))}</span>')
        cursor = match.end()
    pieces.append(html.escape(text[cursor:]))
    return ''.join(pieces)


@app.route('/results')
def results():
    """Disambiguate the target word in the submitted text and render the result.

    Query parameters:
        text: the text to analyse.
        word: the word to disambiguate; when empty, the first non-stopword
            token with more than one WordNet synset is used.

    Renders ``results.html``, or ``error.html`` if disambiguation raises.
    """
    text = request.args.get('text', '')
    target_word = request.args.get('word', '').strip().lower()

    if not target_word:
        # Skip stopwords: tokens such as "i" or "a" have several WordNet
        # synsets and would otherwise be chosen before any content word.
        target_word = next(
            (
                token
                for token in word_tokenize(text.lower())
                if token.isalpha()
                and token not in lesk_processor.stopwords
                and len(wn.synsets(token)) > 1
            ),
            '',
        )

    best_sense = None
    top_senses = []
    # The highlighted text carries markup, so user input is escaped before
    # it is combined with the <span> tags.
    # NOTE(review): assumes results.html renders highlighted_text with |safe -- confirm.
    highlighted_text = html.escape(text)
    sentence = ""
    context_words = []

    if target_word:
        word_pattern = re.compile(r'\b' + re.escape(target_word) + r'\b', re.IGNORECASE)
        # Only the first sentence containing the word is disambiguated.
        for sent in sent_tokenize(text):
            if not word_pattern.search(sent):
                continue

            sentence = sent
            context_words = lesk_processor.process_context(sent, target_word)
            try:
                best_sense, top_senses = lesk_processor.disambiguate(sent, target_word)
            except Exception as e:
                print(f"Disambiguation error: {e}")
                return render_template('error.html',
                                       error_message=f"Could not disambiguate the word '{target_word}'. Please try a different word or sentence.",
                                       error_details=str(e))

            highlighted_text = _highlight_matches(text, word_pattern)
            break

    if best_sense:
        # Remembered so a later feedback request can refer to this result.
        session['last_disambiguation'] = {
            'word': target_word,
            'context': context_words,
            'sentence': sentence
        }

    # dict.fromkeys de-duplicates while keeping sentence order, so the preview
    # is deterministic (a plain set has arbitrary iteration order).
    unique_context = list(dict.fromkeys(context_words))[:10]

    return render_template('results.html',
                           text=text,
                           highlighted_text=highlighted_text,
                           target_word=target_word,
                           best_sense=best_sense,
                           top_senses=top_senses,
                           sentence=sentence,
                           context_words=', '.join(unique_context))
|
|
|
|
|
@app.route('/feedback', methods=['POST'])
def feedback():
    """Record which sense the user marked as correct.

    Expects a JSON object with ``word``, ``correct_sense`` and optionally
    ``context`` (the context words). Responds with the JSON returned by
    ``add_feedback`` or, for a missing/invalid payload, a 400 error object.
    """
    # silent=True: a non-JSON or malformed body yields None instead of an
    # exception, so it is reported through the same 400 response below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid feedback data'}), 400

    word = data.get('word')
    context = data.get('context') or []
    correct_sense = data.get('correct_sense')

    if word and correct_sense and isinstance(word, str) and isinstance(correct_sense, str):
        updated_sense = lesk_processor.add_feedback(word, context, correct_sense)
        return jsonify(updated_sense)

    return jsonify({'error': 'Invalid feedback data'}), 400
|
|
|
|
|
@app.route('/lesk-explained')
def lesk_explained():
    """Serve the static page rendered from the ``lesk_explained.html`` template."""
    page = render_template('lesk_explained.html')
    return page
|
|
|
|
|
|
|
|
@app.route('/error')
def error():
    """Render the error page from the ``message`` and ``details`` query parameters."""
    query = request.args
    return render_template(
        'error.html',
        error_message=query.get('message', 'An unknown error occurred'),
        error_details=query.get('details', ''),
    )
|
|
|
|
|
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must never be
    # reachable from an untrusted network. It stays on by default for local
    # development but can be switched off with FLASK_DEBUG=0.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled)
|
|
|