# app/utils/lexical_utils.py
import re
from typing import List, Optional, Tuple, Set
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from rapidfuzz.distance import Levenshtein
from app.config import (
MIN_WORDS_PER_SENTENCE,
MIN_SENTENCE_LENGTH,
SEQUENCE_THRESHOLD,
EXACT_MATCH_SCORE,
)
nltk.download("punkt")
nltk.download("stopwords")
nltk.download("wordnet")
nltk.download('punkt_tab')
lemmatizer = WordNetLemmatizer()
def normalize_text(text: str) -> str:
    """Lowercase, join hyphenated line breaks, normalize punctuation and lemmatize tokens."""
    if not text:
        return ""
text = text.lower()
text = text.replace("\u00ad", "")
text = re.sub(r"-\s*\n\s*", "", text)
text = text.replace("\n", " ")
text = re.sub(r"[^\x20-\x7E]+", " ", text)
text = (text.replace("'", "'").replace("'", "'")
.replace(""", '"').replace(""", '"')
.replace("β€”", "-").replace("–", "-"))
text = re.sub(r"[^\w\s-]", " ", text)
text = re.sub(r"\s+", " ", text).strip()
tokens = word_tokenize(text)
lemmas = [lemmatizer.lemmatize(tok) for tok in tokens]
normalized = " ".join(lemmas)
normalized = re.sub(r"\s+", " ", normalized).strip()
return normalized
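# Illustrative behaviour (assuming the NLTK data downloaded above is present):
# normalize_text("Dogs chase  cats!\n") comes out roughly as "dog chase cat"
# (lowercased, punctuation removed, whitespace collapsed, WordNet noun lemmas).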
def get_meaningful_sentences(text: str) -> List[str]:
"""Extract meaningful sentences from text."""
sentences = sent_tokenize(text or "")
filtered = []
for s in sentences:
words = word_tokenize(s)
if len(words) >= MIN_WORDS_PER_SENTENCE and len(s.strip()) >= MIN_SENTENCE_LENGTH:
filtered.append(s.strip())
return filtered
def extract_keywords(text: str, max_keywords: int = 5) -> List[str]:
"""Extract top keywords from text for search queries."""
words = word_tokenize((text or "").lower())
stop_words = set(stopwords.words("english"))
filtered = [w for w in words if w.isalpha() and w not in stop_words and len(w) > 3]
freq = nltk.FreqDist(filtered)
return [word for word, _ in freq.most_common(max_keywords)]
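# Illustrative: extract_keywords("Plagiarism detection compares plagiarism fingerprints", 2)
# favours "plagiarism" (frequency 2) over the remaining single-occurrence content words.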
def _word_shingles(norm_text: str, k: int = 7) -> List[str]:
tokens = norm_text.split()
if len(tokens) < k:
return []
return [" ".join(tokens[i:i+k]) for i in range(len(tokens) - k + 1)]
def _shingle_sets(a_norm: str, b_norm: str, k: int = 7) -> Tuple[Set[str], Set[str]]:
return set(_word_shingles(a_norm, k)), set(_word_shingles(b_norm, k))
def _jaccard(a: Set[str], b: Set[str]) -> float:
if not a and not b:
return 0.0
inter = len(a & b)
union = len(a | b) or 1
return inter / union
def _containment(a: Set[str], b: Set[str]) -> float:
if not a:
return 0.0
inter = len(a & b)
return inter / len(a)
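# Illustrative: with A = {"a b c", "b c d"} and B = {"b c d"}, _jaccard(A, B) == 0.5,
# _containment(A, B) == 0.5 and _containment(B, A) == 1.0. Containment is asymmetric,
# so a short passage fully reused inside a much longer text still scores highly.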
def _winnowing_hashes(norm_text: str, k: int = 7, w: int = 4) -> List[Tuple[int, int]]:
tokens = norm_text.split()
if len(tokens) < k:
return []
shingles = [" ".join(tokens[i:i+k]) for i in range(len(tokens) - k + 1)]
hashes = [(hash(s) & 0xFFFFFFFF, i) for i, s in enumerate(shingles)]
if w <= 1 or len(hashes) <= w:
return list(dict.fromkeys(hashes))
fps: List[Tuple[int, int]] = []
last_min_abs = -1
for i in range(0, len(hashes) - w + 1):
window = hashes[i:i+w]
        min_hash, min_idx = None, None
        for j, (h, _) in enumerate(window):
            # Keep the right-most minimum on ties, as in the standard winnowing scheme.
            if min_hash is None or h < min_hash or (h == min_hash and j > min_idx):
                min_hash, min_idx = h, j
        abs_idx = i + min_idx
if abs_idx != last_min_abs:
fps.append(hashes[abs_idx])
last_min_abs = abs_idx
return list(dict.fromkeys(fps))
def _winnowing_overlap(a_fp: List[Tuple[int, int]], b_fp: List[Tuple[int, int]]) -> float:
a_set, b_set = set(a_fp), set(b_fp)
if not a_set or not b_set:
return 0.0
shared = len(a_set & b_set)
denom = min(len(a_set), len(b_set)) or 1
return shared / denom
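# Note: _winnowing_hashes uses Python's built-in hash(), which is salted per process
# for strings, so fingerprints are only comparable when both texts are hashed within
# the same run; that is how find_exact_matches uses them below.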
def _exact_substring(norm_sentence: str, norm_external: str) -> bool:
if not norm_sentence or not norm_external:
return False
    return norm_sentence in norm_external
def _levenshtein_sim(a: str, b: str) -> float:
if not a or not b:
return 0.0
return Levenshtein.normalized_similarity(a, b)
def _lcs_length(a: str, b: str) -> int:
    """Length, in characters, of the longest common substring of a and b."""
    if not a or not b:
        return 0
prev = [0] * (len(b) + 1)
best = 0
for i in range(1, len(a) + 1):
curr = [0]
ai = a[i-1]
for j in range(1, len(b) + 1):
if ai == b[j-1]:
v = prev[j-1] + 1
curr.append(v)
if v > best:
best = v
else:
curr.append(0)
prev = curr
return best
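# Illustrative: _lcs_length("abcdef", "zabcy") == 3 ("abc"). The helper measures the
# longest common substring in characters, not the classic longest common subsequence.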
def _extract_phrases(norm_text: str, min_words: int = 3, max_words: int = 7) -> List[str]:
"""Extract all phrases of varying lengths for partial match detection."""
tokens = norm_text.split()
phrases = []
for k in range(min_words, min(max_words + 1, len(tokens) + 1)):
for i in range(len(tokens) - k + 1):
phrases.append(" ".join(tokens[i:i+k]))
return phrases
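# Illustrative: _extract_phrases("a b c d", min_words=3, max_words=7) yields
# ["a b c", "b c d", "a b c d"]; every contiguous window of 3..7 words is enumerated.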
def _phrase_containment_match(norm_sentence: str, norm_external: str) -> Optional[Tuple[str, float]]:
"""Check if ANY phrase (3-7 words) from sentence appears in external text."""
phrases = _extract_phrases(norm_sentence, min_words=3, max_words=7)
best_phrase = None
best_score = 0.0
for phrase in phrases:
if phrase in norm_external:
score = len(phrase.split()) / len(norm_sentence.split())
if score > best_score:
best_score = score
best_phrase = phrase
if best_phrase and best_score >= 0.4:
return best_phrase, round(best_score, 3)
return None
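# Illustrative: for a 10-word normalized sentence, a verbatim 4-word phrase found in
# the external text scores 4 / 10 = 0.4 and just clears the 0.4 acceptance floor.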
def find_exact_matches(sentence: str, external_text: str) -> Optional[float]:
"""
    Full-sentence lexical match with multiple strategies:
    1) Exact substring match -> EXACT_MATCH_SCORE (100%)
    2) Winnowing fingerprints -> robust near-verbatim plagiarism detection
    3) Edit distance (Levenshtein) -> catches lightly edited, near-verbatim content
    4) Longest common substring -> catches remaining character-level overlaps
"""
norm_s = normalize_text(sentence)
norm_e = normalize_text(external_text)
if len(norm_s) < MIN_SENTENCE_LENGTH:
return None
# 1) Exact full-sentence match
if _exact_substring(norm_s, norm_e):
return EXACT_MATCH_SCORE
# 2) Winnowing overlap
win_sim = _winnowing_overlap(
_winnowing_hashes(norm_s, k=5, w=4),
_winnowing_hashes(norm_e, k=5, w=4),
)
if win_sim >= SEQUENCE_THRESHOLD * 0.9:
return round(win_sim, 3)
# 3) Edit-distance fallback
lev = _levenshtein_sim(norm_s, norm_e)
if lev >= SEQUENCE_THRESHOLD:
return round(lev, 3)
# 4) LCS fallback
lcs_len = _lcs_length(norm_s, norm_e)
LCS_MIN_CHARS = 15
if lcs_len >= LCS_MIN_CHARS:
return round(lcs_len / max(1, len(norm_s)), 3)
return None
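# Illustrative usage (assuming the sentence clears MIN_SENTENCE_LENGTH from app.config):
#   find_exact_matches("The quick brown fox jumps over the lazy dog",
#                      "Reports say the quick brown fox jumps over the lazy dog daily.")
# returns EXACT_MATCH_SCORE because the normalized sentence is a substring of the
# normalized external text; weaker overlaps fall through to winnowing, Levenshtein
# and longest-common-substring scoring in that order.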
def find_partial_phrase_match(sentence: str, external_text: str) -> Optional[Tuple[str, float]]:
"""
Partial reuse detection via multiple strategies:
    1) Phrase containment (verbatim 3-7 word phrases)
    2) Word-shingle Jaccard/containment, first with 7-word then 5-word shingles
    3) Edit distance (Levenshtein) as a last resort
"""
norm_s = normalize_text(sentence)
norm_e = normalize_text(external_text)
if not norm_s or not norm_e:
return None
# Strategy 1: Check if any 3-7 word phrase appears verbatim
phrase_match = _phrase_containment_match(norm_s, norm_e)
if phrase_match:
return sentence, phrase_match[1]
# Strategy 2: 7-word shingles
A, B = _shingle_sets(norm_s, norm_e, k=7)
if A and B:
jac = _jaccard(A, B)
con = _containment(A, B)
score = max(jac, con)
if score >= SEQUENCE_THRESHOLD * 0.8:
return sentence, round(score, 3)
# Strategy 3: Smaller shingles (5-word) for more matches
A_small, B_small = _shingle_sets(norm_s, norm_e, k=5)
if A_small and B_small:
jac_small = _jaccard(A_small, B_small)
con_small = _containment(A_small, B_small)
score_small = max(jac_small, con_small)
if score_small >= SEQUENCE_THRESHOLD * 0.75:
return sentence, round(score_small, 3)
# Strategy 4: Edit distance as last resort
lev = _levenshtein_sim(norm_s, norm_e)
if lev >= SEQUENCE_THRESHOLD * 0.85:
return sentence, round(lev, 3)
return None
def find_partial_phrase_match_for_internal(sentence: str, external_text: str) -> Optional[Tuple[str, float]]:
"""
Wrapper around find_partial_phrase_match that extracts the actual matched phrase
that appears in BOTH documents, not just the full sentence from the first doc.
"""
result = find_partial_phrase_match(sentence, external_text)
if not result:
return None
matched_text, score = result
norm_s = normalize_text(sentence)
norm_e = normalize_text(external_text)
if not norm_s or not norm_e:
return result
# Find the longest common substring that appears in both
words_s = norm_s.split()
words_e = norm_e.split()
best_common = ""
best_len = 0
# Try all consecutive word sequences from sentence A
for i in range(len(words_s)):
for j in range(i + 1, len(words_s) + 1):
phrase = ' '.join(words_s[i:j])
# Check if this phrase exists in document B
if phrase in norm_e:
phrase_len = len(phrase)
if phrase_len > best_len:
best_common = phrase
best_len = phrase_len
# If we found a common phrase, return it
if best_common:
return best_common, score
# Fallback: return original result
return matched_text, score
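if __name__ == "__main__":
    # Illustrative smoke test, not part of the original module. It assumes app.config
    # is importable and that the NLTK data downloaded above is available locally.
    src = "The quick brown fox jumps over the lazy dog while the farmer watches."
    ext = ("Witnesses reported that the quick brown fox jumps over the lazy dog "
           "while the farmer watches from the porch.")
    print("exact match score:", find_exact_matches(src, ext))
    print("partial match:", find_partial_phrase_match(src, ext))
    print("internal match:", find_partial_phrase_match_for_internal(src, ext))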