raij-ai / aspect_based_sentiment /aspect_sentiment.py
github-actions[bot]
chore: sync from GitHub 2026-04-12 13:07:52 UTC
ec40ab9
import re
from collections import Counter
from typing import Optional

from nltk.stem import PorterStemmer

from models import ABSA_PIPELINE, SPACY_NLP, TRANSLATOR_AR_EN
_stemmer = PorterStemmer()
_ASPECT_STOPWORDS = {
"the", "a", "an", "this", "that", "these", "those", "my", "our", "your", "their",
"everything", "nothing", "something", "anything", "it", "one", "ones",
"pros", "cons", "pro", "con", "review", "reviews", "star", "stars"
}
# Abstract nouns that are never real product features on their own.
_ABSTRACT_NOUNS = {
"difference", "upgrade", "downgrade", "improvement", "issue", "problem",
"compromise",
"thing", "stuff", "way", "lot", "kind", "type", "sort", "bit",
"matter", "deal", "point", "reason", "result", "change", "experience",
"time", "part",
"feature", "product", "model", "item", "unit", "option", "choice",
"purchase", "buy", "use", "value", "overall",
"look", "dream", "feel", "aspect", "detail",
"drawback", "alternative", "comparison", "competitor", "competitor",
"advantage", "disadvantage", "substitute", "rival", "replacement",
"day",
}
# POS tags considered "filler" — tokens with these tags don't contribute meaning.
_FILLER_POS = {"DET", "PRON", "ADP", "PART", "PUNCT", "SPACE", "SYM"}
# Regex to detect product model codes like 'a6400', 'rx100iii', 'xt30ii'
_MODEL_CODE_RE = re.compile(r"^[a-z]{0,3}\d+[a-z]{0,3}$")
# Regex to detect any non-Latin characters (Arabic, CJK, etc.)
_NON_LATIN_RE = re.compile(r"[^\x00-\x7F]")
# Fixed confidence cutoff for including ABSA predictions in aggregation.
DEFAULT_MIN_CONFIDENCE = 0.85
# Arabic Unicode range for language detection.
_ARABIC_RE = re.compile(r"[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]+")
# Regex to match URLs
_URL_RE = re.compile(r"https?://\S+", re.IGNORECASE)
def _is_arabic(text: str) -> bool:
"""Return True if the text contains a significant amount of Arabic characters."""
arabic_chars = len(_ARABIC_RE.findall(text))
total_alpha = len(re.findall(r"[a-zA-Z\u0600-\u06FF]", text))
if total_alpha == 0:
return False
return arabic_chars / total_alpha > 0.3
def _translate_to_english(text: str) -> str:
    """Translate Arabic text to English using Helsinki-NLP/opus-mt-ar-en.

    Best-effort: any pipeline failure, unexpected output shape, or empty
    translation falls back to returning the input unchanged.
    """
    try:
        outputs = TRANSLATOR_AR_EN(text, max_length=512)
        if isinstance(outputs, list) and outputs:
            candidate = (outputs[0].get("translation_text") or "").strip()
            if candidate:
                return candidate
    except Exception:
        # Translation is optional enrichment — never let it break the caller.
        pass
    return text
def _is_model_code(text: str) -> bool:
"""Return True if the text looks like a product model code (e.g. a6400, rx100)."""
tokens = text.strip().split()
if len(tokens) == 1:
tok = tokens[0]
# Ignore anything with a hyphen (e.g. '10-bit', 'v-log')
if "-" in tok:
return False
has_letters = bool(re.search(r"[a-z]", tok))
has_digits = bool(re.search(r"\d", tok))
return has_letters and has_digits and len(tok) <= 8
return False
def _build_product_stem_set(
    product_title: str,
    product_tags: Optional[list[str]] = None,
    product_categories: Optional[list[str]] = None,
) -> set[str]:
    """Build a lower-cased set of meaningful word stems from a product title, tags, and categories.

    Uses PorterStemmer instead of spaCy for near-instant execution.

    Used as a robust filter to skip extracted aspects that merely refer to the
    product itself or its category (e.g. 'phone' from tags ['smart phone']).

    (Fixed: the optional parameters were annotated plain ``list[str]`` with a
    ``None`` default; they are now correctly ``Optional``.)
    """
    elements: list[str] = []
    if product_title:
        elements.append(product_title)
    if product_tags:
        elements.extend(product_tags)
    if product_categories:
        elements.extend(product_categories)
    text_before = " ".join(elements).lower()

    # Expand common compound product words so their individual bases can be
    # matched (e.g. 'smartphone' also contributes 'smart' and 'phone').
    compounds = {
        "smartphone": "smart phone",
        "smartwatch": "smart watch",
        "smartband": "smart band",
        "headphone": "head phone",
        "earphone": "ear phone",
        "earbud": "ear bud",
        "mousepad": "mouse pad",
        "webcam": "web cam",
    }
    text_after = text_before
    for compound, expansion in compounds.items():
        text_after = text_after.replace(compound, expansion)

    trivial = {"the", "a", "an", "for", "with", "and", "or", "of", "in", "by", "to"}
    stems: set[str] = set()
    # Stem words from both the raw and the compound-expanded text so either
    # surface form of a word matches later aspect stems.
    for word in re.findall(r"[a-z]+", text_before + " " + text_after):
        if word not in trivial and len(word) > 1:
            stems.add(_stemmer.stem(word))

    # Keep whole elements too (multi-word titles/tags verbatim).
    for elem in elements:
        clean_elem = elem.strip().lower()
        if len(clean_elem) > 1 and clean_elem not in trivial:
            stems.add(clean_elem)
    return stems
# Backward-compatible alias
def _build_product_lemma_set(
    product_title: str,
    product_tags: Optional[list[str]] = None,
    product_categories: Optional[list[str]] = None,
) -> set[str]:
    """Backward-compatible alias — now uses stemming internally.

    (Fixed: optional parameters are now correctly annotated ``Optional``.)
    """
    return _build_product_stem_set(product_title, product_tags, product_categories)
def _clean_review_text(text: str) -> str:
    """Clean scraped/poorly-formatted review content before NLP processing.

    Many review records in the DB are comma-joined concatenations of multiple
    fields (title, rating label, image URL, short phrases, etc.).

    This function:
    1. Removes all URLs / image links.
    2. Splits on commas and newlines.
    3. Discards fragments that look like noise: a bare '-', a single word
       <= 3 chars (e.g. '-', 'Ok'), pure numbers, or lone punctuation.
    4. Rejoins surviving fragments with a period-space so spaCy sees proper
       sentence boundaries.
    """
    # Remove URLs first so they cannot pollute the fragments below.
    text = _URL_RE.sub("", text)
    fragments = re.split(r"[,\n]+", text)
    good: list[str] = []
    for frag in fragments:
        frag = frag.strip().strip("'\"")
        if not frag:
            continue
        # Bare dashes, pure numbers, and lone punctuation carry no meaning.
        if re.fullmatch(r"[-\d\s!?.]+", frag):
            continue
        # Keep fragments that look like a real phrase (>= 2 words OR a single
        # alphabetic word with >= 4 chars).
        words = frag.split()
        if len(words) >= 2:
            good.append(frag)
        elif len(words) == 1 and len(words[0]) >= 4 and words[0].isalpha():
            good.append(frag)
    if not good:
        # Fallback: `text` was already URL-stripped above, so just trim it.
        # (Fixed: the original redundantly re-ran _URL_RE.sub here.)
        return text.strip()
    return ". ".join(good)
def _extract_aspects_from_doc(doc, product_stems: set[str]) -> list[str]:
    """Extract candidate aspect terms from a pre-processed spaCy Doc.

    This is the core extraction logic shared by both single-text and batch
    modes. Uses stem-based comparison for product-word filtering.
    """
    found: set[str] = set()
    for chunk in doc.noun_chunks:
        candidate = re.sub(r"^(and|or|but)\s+", "", chunk.text.strip().lower())

        # 1. Chunks containing non-Latin characters are dropped outright.
        if _NON_LATIN_RE.search(candidate):
            continue

        # 2. Too short, or made up entirely of filler POS tags.
        if len(candidate) <= 1:
            continue
        if all(token.pos_ in _FILLER_POS for token in chunk):
            continue

        # "Content tokens": everything that is not a determiner/pronoun/etc.
        meaningful = [token for token in chunk if token.pos_ not in _FILLER_POS]
        if not meaningful:
            continue

        # 3. If every content word is a stopword, the chunk is noise.
        content_words = {token.text.lower() for token in meaningful if token.is_alpha}
        if content_words and content_words.issubset(_ASPECT_STOPWORDS):
            continue

        # 4. Single-token product model codes (a6400, rx100) are not aspects.
        if len(meaningful) == 1 and _is_model_code(meaningful[0].text.lower()):
            continue

        # 5. Chunks whose HEAD (root) noun is abstract are skipped.
        if chunk.root.lemma_.lower() in _ABSTRACT_NOUNS:
            continue

        # 5.5 Require at least one actual NOUN/PROPN among the content tokens.
        if not any(token.pos_ in {"NOUN", "PROPN"} for token in meaningful):
            continue

        # 6. Skip aspects whose content stems all name the product itself.
        content_stems = {_stemmer.stem(token.text.lower()) for token in meaningful if token.is_alpha}
        if product_stems and content_stems and content_stems.issubset(product_stems):
            continue

        found.add(candidate)
    return list(found)
def extract_aspects(
    text: str,
    product_title: str = "",
    product_tags: Optional[list[str]] = None,
    product_categories: Optional[list[str]] = None,
    product_lemmas: Optional[set[str]] = None,
) -> list[str]:
    """Extract candidate aspect terms from a single text (backward-compatible).

    For batch processing, use extract_aspects_batch() instead.

    If ``product_lemmas`` is not supplied, it is built from the
    title/tags/categories so product self-references can be filtered out.

    (Fixed: optional parameters are now correctly annotated ``Optional``.)
    """
    if product_lemmas is None:
        product_lemmas = _build_product_stem_set(product_title, product_tags, product_categories)
    cleaned = _clean_review_text(text)
    doc = SPACY_NLP(cleaned)
    return _extract_aspects_from_doc(doc, product_lemmas)
def extract_aspects_batch(texts: list[str], product_stems: set[str]) -> list[list[str]]:
    """Extract aspects from multiple texts using spaCy's nlp.pipe() for speed.

    Returns a list of aspect-lists, one per input text.
    """
    cleaned = [_clean_review_text(raw) for raw in texts]
    # Streaming batch processing via nlp.pipe() is much faster than calling
    # nlp(text) once per review.
    return [
        _extract_aspects_from_doc(doc, product_stems)
        for doc in SPACY_NLP.pipe(cleaned, batch_size=64)
    ]
def _extract_sentences(text: str) -> list[str]:
"""Split text into individual sentences."""
return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
def _normalize_aspect(aspect: str) -> str:
    """Normalize extracted aspects so similar mentions are grouped together."""
    # Any remaining Arabic / non-ASCII characters mean the aspect slipped
    # through upstream filtering — reject it.
    if re.search(r"[\u0600-\u06FF]", aspect):
        return ""
    kept: list[str] = []
    for raw in aspect.lower().strip().split():
        # Trim surrounding punctuation but keep internal hyphens.
        word = re.sub(r"^[^a-z0-9\-]+|[^a-z0-9\-]+$", "", raw)
        if word and word not in _ASPECT_STOPWORDS:
            kept.append(word)
    # Cap at three tokens so long chunks collapse onto the same key.
    return " ".join(kept[:3]) if kept else ""
def _dedupe_preserve_order(sentences: list[str]) -> list[str]:
"""Return unique non-empty sentences while preserving first-seen order."""
seen: set[str] = set()
unique: list[str] = []
for raw in sentences:
sentence = (raw or "").strip()
if not sentence:
continue
key = sentence.lower()
if key in seen:
continue
seen.add(key)
unique.append(sentence)
return unique
def _get_best_sentence(sentences: list[str]) -> str:
    """Return the most informative sentence as a fallback highlight.

    Uses length as a rough proxy for informativeness; ties keep the
    first-seen sentence. Trailing sentence punctuation is stripped.
    """
    unique_sentences = _dedupe_preserve_order(sentences)
    if not unique_sentences:
        return ""
    # max(key=len) is O(n) instead of the original O(n log n) full sort, and
    # — like the stable reverse sort it replaces — returns the FIRST of any
    # equally-long sentences, so behavior is unchanged.
    return max(unique_sentences, key=len).rstrip(".?!")
def _make_aspect_summary_line(
aspect: str,
pos: int,
neg: int,
threshold: float,
) -> str:
"""Build a concise, punchy bullet point for the UI pros/cons lists."""
aspect_name = aspect.lower()
if pos >= threshold and neg >= threshold:
return f"Mixed feedback on {aspect_name}"
if pos >= threshold:
return f"Excellent {aspect_name}"
if neg >= threshold:
return f"Issues with {aspect_name}"
return ""
def _strip_leading_article(text: str) -> str:
"""Remove a leading 'the/a/an' so templates can add their own article."""
stripped = text.lstrip()
for art in ("the ", "a ", "an "):
if stripped.lower().startswith(art):
return stripped[len(art):]
return stripped
def _extract_short_phrase(sentences: list[str], aspect_name: str) -> str:
    """Return a clean noun-phrase descriptor for use in advisory templates.

    NOTE(review): currently a stub — ``sentences`` is ignored and the aspect
    name is returned unchanged. Presumably kept as a hook for richer phrase
    extraction later; confirm before removing the unused parameter.
    """
    return aspect_name
def generate_summary(highlights: list[dict]) -> list[str]:
    """Produce 3–4 advisory-tone sentences summarising all reviews.

    Each highlight dict must carry ``aspect``, ``positive_mentions`` and
    ``negative_mentions`` keys (as built by aggregate_pros_cons).

    Sentence distribution follows Noon.com advisory rules:
    - pros >> cons → 2-3 pro sentences, 1 con sentence
    - cons >> pros → 1 pro sentence, 2-3 con sentences
    - balanced     → 2 pro sentences, 2 con sentences

    Mixed-sentiment aspects get their own "receives mixed feedback" sentence.
    Always outputs English with advisory phrasing. Template choice uses
    ``random.choice``, so output wording varies between calls.
    """
    import random  # local import: only needed when a summary is generated
    # ── Bucket aspects into pro / mixed / con ─────────────────────────
    pro_highlights = []
    mixed_highlights = []
    con_highlights = []
    for h in highlights:
        pos = h["positive_mentions"]
        neg = h["negative_mentions"]
        # An aspect is clearly positive/negative only when one polarity
        # outnumbers the other by more than 2x; otherwise it is "mixed".
        if pos > neg * 2:
            pro_highlights.append(h)
        elif neg > pos * 2:
            con_highlights.append(h)
        else:
            mixed_highlights.append(h)
    # Filter out any blank phrases (result of failed aspect normalization)
    pro_phrases = [p for p in [_strip_leading_article(h["aspect"]) for h in pro_highlights] if p.strip()]
    con_phrases = [p for p in [_strip_leading_article(h["aspect"]) for h in con_highlights] if p.strip()]
    mixed_phrases = [p for p in [_strip_leading_article(h["aspect"]) for h in mixed_highlights] if p.strip()]
    n_pro = len(pro_phrases)
    n_con = len(con_phrases)
    # ── Determine sentence distribution ───────────────────────────────
    # pros >> cons → 2-3 pro + 1 con
    # cons >> pros → 1 pro + 2-3 con
    # balanced     → 2 pro + 2 con
    if n_pro > n_con * 2:
        mode = "pro_dominated"
    elif n_con > n_pro * 2:
        mode = "con_dominated"
    else:
        mode = "balanced"
    result: list[str] = []
    # ── PRO sentence templates ────────────────────────────────────────
    def _make_pro_sentences(phrases, count):
        """Generate `count` pro sentences from the available phrases without repeating any."""
        # `idx` walks forward through `phrases`; each sentence consumes one or
        # two phrases so no phrase appears twice.
        sents = []
        if not phrases:
            return sents
        idx = 0
        # Sentence 1
        if count >= 1 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                p0, p1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"This product is widely praised for its excellent {p0} and {p1}.",
                    f"Many people appreciate its {p0} and {p1}, calling it a great choice.",
                    f"Customers frequently highlight the {p0} and {p1}.",
                ]))
            else:
                p0 = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"This product is praised for its {p0}.",
                    f"Many people appreciate the {p0}.",
                    f"Customers frequently highlight the {p0}.",
                ]))
        # Sentence 2
        if count >= 2 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                p0, p1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"Both the {p0} and {p1} are also frequently highlighted by buyers.",
                    f"The {p0} and {p1} receive consistently positive feedback.",
                    f"Users are happy with the {p0}, often describing the {p1} as a major plus.",
                ]))
            else:
                p = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Its {p} is also frequently highlighted by buyers.",
                    f"The {p} receives consistently positive feedback.",
                    f"Users love its {p}, often describing it as a major plus.",
                ]))
        # Sentence 3
        if count >= 3 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                p0, p1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"Positive mentions of the {p0} and {p1} further support its appeal.",
                    f"The {p0} and {p1} offer a great overall experience that many customers appreciate.",
                    f"Buyers are particularly satisfied with the {p0} and {p1} as well.",
                ]))
            else:
                p = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Positive mentions of the {p} further support its appeal.",
                    f"The {p} offers a great overall experience that many customers appreciate.",
                    f"Buyers are particularly satisfied with the {p} as well.",
                ]))
        return sents
    # ── CON sentence templates ────────────────────────────────────────
    def _make_con_sentences(phrases, count):
        """Generate `count` con sentences from the available phrases without repeating any."""
        # Same forward-walking `idx` scheme as _make_pro_sentences; the first
        # sentence may consume up to three phrases.
        sents = []
        if not phrases:
            return sents
        idx = 0
        # Sentence 1
        if count >= 1 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 3:
                joined = ", ".join(phrases[idx:idx+2]) + ", and " + phrases[idx+2]
                idx += 3
                sents.append(random.choice([
                    f"Some users reported concerns with {joined}.",
                    f"A significant number of users raised concerns regarding {joined}.",
                    f"Issues with {joined} were common complaints.",
                ]))
            elif rem == 2:
                c0, c1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"Some users reported issues with {c0} and {c1}.",
                    f"Common concerns include the {c0} and {c1}.",
                    f"Issues with {c0} and {c1} were frequent complaints.",
                ]))
            else:
                c0 = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"A major concern is the {c0}.",
                    f"Some users reported issues with {c0}.",
                    f"A frequent complaint is the {c0}."
                ]))
        # Sentence 2
        if count >= 2 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                joined = " and ".join(phrases[idx:idx+2])
                idx += 2
                sents.append(random.choice([
                    f"Many people have raised serious concerns about the {joined}.",
                    f"A significant number of users reported that the {joined} simply does not meet expectations.",
                    f"Dissatisfaction with {joined} was a common theme among reviews.",
                ]))
            else:
                c = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Many people have raised concerns about the {c}.",
                    f"A significant number of users reported that the {c} does not meet expectations.",
                    f"The {c} is frequently mentioned as a negative point.",
                ]))
        # Sentence 3
        if count >= 3 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                joined = " and ".join(phrases[idx:idx+2])
                idx += 2
                sents.append(random.choice([
                    f"Issues with {joined} were also noted across multiple reviews.",
                    f"The {joined} can be problematic, leading to frustration for some buyers.",
                    f"Overall, many customers noted these areas as needing improvement.",
                ]))
            else:
                c = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Issues with the {c} were also noted across multiple reviews.",
                    f"The {c} can be problematic, leading to frustration for some buyers.",
                    f"Overall, many customers noted this area as needing improvement.",
                ]))
        return sents
    # ── MIXED sentence templates ──────────────────────────────────────
    def _make_mixed_sentences(phrases, max_count=1):
        # Emits up to two "mixed feedback" sentences, one phrase each.
        sents = []
        if not phrases:
            return sents
        idx = 0
        if max_count >= 1 and idx < len(phrases):
            m0 = phrases[idx]
            idx += 1
            sents.append(random.choice([
                f"The {m0} receives mixed feedback, with many finding it average or disappointing.",
                f"The {m0} gets mixed feedback; some find it excellent, others feel it's disappointing.",
                f"{m0.capitalize()} can be inconsistent, so it's often a point of debate among buyers.",
            ]))
        if max_count >= 2 and idx < len(phrases):
            m1 = phrases[idx]
            idx += 1
            sents.append(random.choice([
                f"Similarly, the {m1} receives mixed feedback from customers.",
                f"The {m1} also gets divided opinions; results may vary.",
            ]))
        return sents
    # ── Assemble based on mode ────────────────────────────────────────
    if mode == "pro_dominated":
        # 2-3 pro sentences, maybe 1 mixed, then 1 con sentence. Total <= 4
        pro_count = 3 if n_pro >= 3 else 2
        mixed_count = 0 if pro_count == 3 else 1
        result.extend(_make_pro_sentences(pro_phrases, pro_count))
        result.extend(_make_mixed_sentences(mixed_phrases, max_count=mixed_count))
        result.extend(_make_con_sentences(con_phrases, count=1))
    elif mode == "con_dominated":
        # 1 pro sentence (or acknowledgment), maybe 1 mixed, 2-3 con. Total <= 4
        con_count = 3 if n_con >= 3 else 2
        mixed_count = 0 if con_count == 3 else 1
        if n_pro >= 1:
            p0 = pro_phrases[0]
            result.append(random.choice([
                f"While one person praised its {p0}, many users highlighted several cons.",
                f"While the {p0} received some praise, customers frequently mentioned negative aspects.",
                f"The {p0} is one of the few aspects that received positive feedback.",
            ]))
        else:
            result.append(random.choice([
                "Users frequently mentioned negative aspects and cons regarding this product.",
                "Customer reviews highlighted several cons and negative aspects of this product.",
                "Many reviewers pointed out negative aspects and areas for improvement."
            ]))
        result.extend(_make_mixed_sentences(mixed_phrases, max_count=mixed_count))
        result.extend(_make_con_sentences(con_phrases, count=con_count))
    else:  # balanced
        # 2 pro sentences, maybe 1 mixed, 1-2 con sentences. Total <= 4
        result.extend(_make_pro_sentences(pro_phrases, count=2))
        if mixed_phrases:
            result.extend(_make_mixed_sentences(mixed_phrases, max_count=1))
            result.extend(_make_con_sentences(con_phrases, count=1))
        else:
            result.extend(_make_con_sentences(con_phrases, count=2))
    # Fallback
    if not result:
        result.append("This product has not received enough detailed feedback to extract highlights.")
    return result[:4]
def classify_aspects(review_text: str, aspects: list[str]) -> list[dict]:
    """Run the DeBERTa ABSA model on each aspect within the review context (single review, backward-compatible)."""
    if not aspects:
        return []
    # The ABSA model expects '[CLS] <text> [SEP] <aspect> [SEP]' pair inputs.
    prompts = [f"[CLS] {review_text} [SEP] {aspect} [SEP]" for aspect in aspects]
    predictions = ABSA_PIPELINE(prompts, batch_size=32)
    return [
        {
            "aspect": aspect,
            "sentiment": pred["label"],
            "confidence": round(pred["score"], 4),
        }
        for aspect, pred in zip(aspects, predictions)
    ]
def classify_aspects_batch(items: list[tuple[str, list[str]]]) -> list[dict]:
    """Batch-classify all (review_text, aspects) pairs in a single pipeline call.

    items: list of (review_text, aspects_list) tuples.
    Returns a flat list of {aspect, sentiment, confidence} dicts.
    """
    prompts: list[str] = []
    flat_aspects: list[str] = []
    for review_text, aspects in items:
        for aspect in aspects:
            prompts.append(f"[CLS] {review_text} [SEP] {aspect} [SEP]")
            flat_aspects.append(aspect)
    if not prompts:
        return []
    predictions = ABSA_PIPELINE(prompts, batch_size=32)
    return [
        {
            "aspect": aspect,
            "sentiment": pred["label"],
            "confidence": round(pred["score"], 4),
        }
        for aspect, pred in zip(flat_aspects, predictions)
    ]
def aggregate_pros_cons(
    all_aspect_sentiments: list[dict],
    total_reviews: int,
    min_confidence: float = DEFAULT_MIN_CONFIDENCE,
    threshold_divisor: float = 4.0,
) -> dict:
    """
    Aggregate aspect sentiments across all reviews into a product-level summary.

    Counts positive and negative mentions per normalized aspect and returns:
    - highlights: ranked aspect summaries with mention counts; each item's
      ``summary`` field contains the advisory-tone sentence for that aspect
    - pros/cons: summary lines split by dominant sentiment

    Thresholds are computed as:
    - threshold = total_reviews / threshold_divisor
    """
    pos_counts: Counter = Counter()
    neg_counts: Counter = Counter()
    for item in all_aspect_sentiments:
        if item["confidence"] < min_confidence:
            continue
        aspect = _normalize_aspect(item["aspect"])
        if not aspect:
            # Failed normalization yields "" — drop it here instead of
            # counting it and filtering it out again later.
            continue
        if item["sentiment"] == "Positive":
            pos_counts[aspect] += 1
        elif item["sentiment"] == "Negative":
            neg_counts[aspect] += 1

    # Guard against zero/negative inputs so the threshold stays meaningful.
    safe_total_reviews = max(total_reviews, 1)
    safe_divisor = threshold_divisor if threshold_divisor > 0 else 4.0
    threshold = safe_total_reviews / safe_divisor

    highlights = []
    # dict views support set union directly — no need to wrap in set().
    for aspect in pos_counts.keys() | neg_counts.keys():
        pos = pos_counts[aspect]  # Counter returns 0 for missing keys
        neg = neg_counts[aspect]
        total = pos + neg
        if total == 0:
            continue
        highlights.append(
            {
                "aspect": aspect,
                "summary": _make_aspect_summary_line(aspect, pos, neg, threshold),
                "positive_mentions": pos,
                "negative_mentions": neg,
                "total_mentions": total,
            }
        )
    highlights.sort(key=lambda item: item["total_mentions"], reverse=True)

    pros = [
        item["summary"]
        for item in highlights
        if (
            item["positive_mentions"] > item["negative_mentions"]
            and item["positive_mentions"] >= threshold
        )
    ]
    cons = [
        item["summary"]
        for item in highlights
        if (
            item["negative_mentions"] > item["positive_mentions"]
            and item["negative_mentions"] >= threshold
        )
    ]

    # ── Overall advisory summary (uses ALL highlights, no threshold) ──
    advisory_sentences = generate_summary(highlights)

    # Pair each advisory sentence with the i-th ranked aspect so the UI can
    # show mention counts beside it; extra sentences get synthetic keys.
    final_highlights = []
    for i, text in enumerate(advisory_sentences):
        if i < len(highlights):
            h = highlights[i]
            final_highlights.append({
                "aspect": h["aspect"],
                "summary": text,
                "positive_mentions": h["positive_mentions"],
                "negative_mentions": h["negative_mentions"],
                "total_mentions": h["total_mentions"],
            })
        else:
            final_highlights.append({
                "aspect": f"summary_{i}",
                "summary": text,
                "positive_mentions": 0,
                "negative_mentions": 0,
                "total_mentions": 0,
            })
    return {
        "highlights": final_highlights,
        "pros": pros,
        "cons": cons,
    }