| import re |
| from collections import Counter |
| from nltk.stem import PorterStemmer |
| from models import ABSA_PIPELINE, SPACY_NLP, TRANSLATOR_AR_EN |
|
|
# Shared Porter stemmer instance; stemming is far faster than spaCy
# lemmatisation for the simple word-matching done in this module.
_stemmer = PorterStemmer()


# Words that carry no aspect meaning on their own; noun chunks whose
# content words fall entirely inside this set are discarded.
_ASPECT_STOPWORDS = {
    "the", "a", "an", "this", "that", "these", "those", "my", "our", "your", "their",
    "everything", "nothing", "something", "anything", "it", "one", "ones",
    "pros", "cons", "pro", "con", "review", "reviews", "star", "stars"
}
|
|
| |
| _ABSTRACT_NOUNS = { |
| "difference", "upgrade", "downgrade", "improvement", "issue", "problem", |
| "compromise", |
| "thing", "stuff", "way", "lot", "kind", "type", "sort", "bit", |
| "matter", "deal", "point", "reason", "result", "change", "experience", |
| "time", "part", |
| "feature", "product", "model", "item", "unit", "option", "choice", |
| "purchase", "buy", "use", "value", "overall", |
| "look", "dream", "feel", "aspect", "detail", |
| "drawback", "alternative", "comparison", "competitor", "competitor", |
| "advantage", "disadvantage", "substitute", "rival", "replacement", |
| "day", |
| } |
|
|
| |
# spaCy POS tags treated as non-content; chunks made only of these are noise.
_FILLER_POS = {"DET", "PRON", "ADP", "PART", "PUNCT", "SPACE", "SYM"}


# Short alphanumeric model-code shape (e.g. "a6400", "rx100").
# NOTE(review): appears unused in this module — _is_model_code re-implements
# the check inline; confirm no external importers before removing.
_MODEL_CODE_RE = re.compile(r"^[a-z]{0,3}\d+[a-z]{0,3}$")


# Matches any non-ASCII character. Despite the name, this is broader than
# "non-Latin": accented Latin letters also match.
_NON_LATIN_RE = re.compile(r"[^\x00-\x7F]")


# Minimum ABSA classifier confidence for a mention to be counted.
DEFAULT_MIN_CONFIDENCE = 0.85


# Runs of Arabic-script characters (main block, supplement, Extended-A).
_ARABIC_RE = re.compile(r"[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]+")


# http/https URLs — used to scrub scraped review text.
_URL_RE = re.compile(r"https?://\S+", re.IGNORECASE)
|
|
|
|
def _is_arabic(text: str) -> bool:
    """Return True when a significant share of the text's letters are Arabic.

    The ratio compares Arabic-script runs against the count of individual
    Latin/Arabic letters; anything above 30% counts as Arabic.
    """
    letter_count = len(re.findall(r"[a-zA-Z\u0600-\u06FF]", text))
    if not letter_count:
        return False
    arabic_count = len(_ARABIC_RE.findall(text))
    return arabic_count / letter_count > 0.3
|
|
|
|
def _translate_to_english(text: str) -> str:
    """Translate Arabic text to English using Helsinki-NLP/opus-mt-ar-en.

    Best-effort: returns the translated string, or the original text when
    the pipeline fails or produces an empty result.
    """
    try:
        outputs = TRANSLATOR_AR_EN(text, max_length=512)
        if outputs and isinstance(outputs, list):
            candidate = (outputs[0].get("translation_text") or "").strip()
            if candidate:
                return candidate
    except Exception:
        # Translation is optional — any pipeline failure falls through
        # to returning the untranslated input.
        pass
    return text
|
|
|
|
| def _is_model_code(text: str) -> bool: |
| """Return True if the text looks like a product model code (e.g. a6400, rx100).""" |
| tokens = text.strip().split() |
| if len(tokens) == 1: |
| tok = tokens[0] |
| |
| if "-" in tok: |
| return False |
| has_letters = bool(re.search(r"[a-z]", tok)) |
| has_digits = bool(re.search(r"\d", tok)) |
| return has_letters and has_digits and len(tok) <= 8 |
| return False |
|
|
|
|
def _build_product_stem_set(product_title: str, product_tags: list[str] = None, product_categories: list[str] = None) -> set[str]:
    """Build a lower-cased set of meaningful word stems from a product title, tags, and categories.

    Uses PorterStemmer instead of spaCy for near-instant execution. The
    resulting set serves as a robust filter to skip extracted aspects that
    merely refer to the product itself or its category (e.g. 'phone' from
    tags ['smart phone']).
    """
    elements: list[str] = []
    if product_title:
        elements.append(product_title)
    if product_tags:
        elements.extend(product_tags)
    if product_categories:
        elements.extend(product_categories)

    joined = " ".join(elements).lower()

    # Expand common compound words so both halves contribute stems
    # (e.g. "smartphone" also yields stems for "smart" and "phone").
    compounds = {
        "smartphone": "smart phone",
        "smartwatch": "smart watch",
        "smartband": "smart band",
        "headphone": "head phone",
        "earphone": "ear phone",
        "earbud": "ear bud",
        "mousepad": "mouse pad",
        "webcam": "web cam"
    }
    expanded = joined
    for compound, spaced in compounds.items():
        expanded = expanded.replace(compound, spaced)

    trivial = {"the", "a", "an", "for", "with", "and", "or", "of", "in", "by", "to"}

    # Stem every non-trivial word from both the original and expanded text,
    # so compound words and their parts are all represented.
    stems = {
        _stemmer.stem(word)
        for word in re.findall(r'[a-z]+', f"{joined} {expanded}")
        if word not in trivial and len(word) > 1
    }

    # Keep each full element verbatim too, so multi-word tags match exactly.
    for element in elements:
        normalized = element.strip().lower()
        if len(normalized) > 1 and normalized not in trivial:
            stems.add(normalized)

    return stems
|
|
|
|
| |
def _build_product_lemma_set(product_title: str, product_tags: list[str] = None, product_categories: list[str] = None) -> set[str]:
    """Backward-compatible alias for _build_product_stem_set.

    The implementation moved from spaCy lemmas to Porter stems; callers
    that still import the old name get identical behavior.
    """
    return _build_product_stem_set(
        product_title,
        product_tags,
        product_categories,
    )
|
|
|
|
def _clean_review_text(text: str) -> str:
    """Clean scraped/poorly-formatted review content before NLP processing.

    Many review records in the DB are comma-joined concatenations of multiple
    fields (title, rating label, image URL, short phrases, etc.).
    This function:
    1. Removes all URLs / image links.
    2. Splits on commas and newlines.
    3. Discards fragments that look like noise: a bare '-', a single word
       <= 3 chars (e.g. '-', 'Ok'), pure numbers, or lone punctuation.
    4. Rejoins surviving fragments with a period-space so spaCy sees proper
       sentence boundaries.
    """
    stripped = _URL_RE.sub("", text)

    kept: list[str] = []
    for piece in re.split(r"[,\n]+", stripped):
        piece = piece.strip().strip("'\"")
        if not piece:
            continue
        # Digits, dashes, and lone punctuation carry no sentiment content.
        if re.fullmatch(r"[-\d\s!?.]+", piece):
            continue
        words = piece.split()
        # Keep multi-word fragments, or a single alphabetic word long
        # enough to be meaningful (>= 4 chars).
        if len(words) >= 2 or (len(words) == 1 and len(words[0]) >= 4 and words[0].isalpha()):
            kept.append(piece)

    if not kept:
        # Nothing survived filtering — fall back to the URL-stripped input.
        return stripped.strip()

    return ". ".join(kept)
|
|
|
|
def _extract_aspects_from_doc(doc, product_stems: set[str]) -> list[str]:
    """Extract candidate aspect terms from a pre-processed spaCy Doc.

    This is the core extraction logic shared by both single-text and batch
    modes. Uses stem-based comparison for product-word filtering.
    """

    def _candidate(chunk):
        """Return the normalized aspect phrase for a noun chunk, or None to reject it."""
        phrase = re.sub(r"^(and|or|but)\s+", "", chunk.text.strip().lower())

        # Reject anything containing non-ASCII characters.
        if _NON_LATIN_RE.search(phrase):
            return None

        # Reject single characters and chunks made entirely of filler tokens.
        if len(phrase) <= 1 or all(t.pos_ in _FILLER_POS for t in chunk):
            return None

        meaningful = [t for t in chunk if t.pos_ not in _FILLER_POS]
        if not meaningful:
            return None

        # Reject chunks whose alphabetic words are all generic stopwords.
        words = {t.text.lower() for t in meaningful if t.is_alpha}
        if words and words.issubset(_ASPECT_STOPWORDS):
            return None

        # Reject bare product model codes (e.g. "a6400").
        if len(meaningful) == 1 and _is_model_code(meaningful[0].text.lower()):
            return None

        # Reject chunks rooted at an abstract noun ("thing", "issue", ...).
        if chunk.root.lemma_.lower() in _ABSTRACT_NOUNS:
            return None

        # Require at least one noun or proper noun.
        if not any(t.pos_ in {"NOUN", "PROPN"} for t in meaningful):
            return None

        # Reject chunks that merely restate the product itself.
        stems = {_stemmer.stem(t.text.lower()) for t in meaningful if t.is_alpha}
        if product_stems and stems and stems.issubset(product_stems):
            return None

        return phrase

    found: set[str] = set()
    for chunk in doc.noun_chunks:
        aspect = _candidate(chunk)
        if aspect is not None:
            found.add(aspect)
    return list(found)
|
|
|
|
def extract_aspects(text: str, product_title: str = "", product_tags: list[str] = None, product_categories: list[str] = None, product_lemmas: set[str] = None) -> list[str]:
    """Extract candidate aspect terms from a single text (backward-compatible).

    For batch processing, use extract_aspects_batch() instead.
    """
    stems = product_lemmas
    if stems is None:
        # No pre-built set supplied — derive one from the product metadata.
        stems = _build_product_stem_set(product_title, product_tags, product_categories)

    doc = SPACY_NLP(_clean_review_text(text))
    return _extract_aspects_from_doc(doc, stems)
|
|
|
|
def extract_aspects_batch(texts: list[str], product_stems: set[str]) -> list[list[str]]:
    """Extract aspects from multiple texts using spaCy's nlp.pipe() for speed.

    Returns a list of aspect-lists, one per input text (order preserved).
    """
    cleaned = [_clean_review_text(text) for text in texts]
    # nlp.pipe() processes documents in batches, far faster than per-text calls.
    return [
        _extract_aspects_from_doc(doc, product_stems)
        for doc in SPACY_NLP.pipe(cleaned, batch_size=64)
    ]
|
|
|
|
| def _extract_sentences(text: str) -> list[str]: |
| """Split text into individual sentences.""" |
| return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()] |
|
|
|
|
def _normalize_aspect(aspect: str) -> str:
    """Normalize extracted aspects so similar mentions are grouped together."""
    # Drop Arabic-script aspects entirely.
    if re.search(r"[\u0600-\u06FF]", aspect):
        return ""
    kept: list[str] = []
    for token in aspect.lower().strip().split():
        # Trim punctuation at both ends, keeping inner hyphens and digits.
        core = re.sub(r"^[^a-z0-9\-]+|[^a-z0-9\-]+$", "", token)
        if core and core not in _ASPECT_STOPWORDS:
            kept.append(core)
    # Cap at three tokens so long phrases collapse onto a common key.
    return " ".join(kept[:3]) if kept else ""
|
|
|
|
| def _dedupe_preserve_order(sentences: list[str]) -> list[str]: |
| """Return unique non-empty sentences while preserving first-seen order.""" |
| seen: set[str] = set() |
| unique: list[str] = [] |
| for raw in sentences: |
| sentence = (raw or "").strip() |
| if not sentence: |
| continue |
| key = sentence.lower() |
| if key in seen: |
| continue |
| seen.add(key) |
| unique.append(sentence) |
| return unique |
|
|
|
|
def _get_best_sentence(sentences: list[str]) -> str:
    """Return the most informative sentence as a fallback highlight."""
    candidates = _dedupe_preserve_order(sentences)
    if not candidates:
        return ""
    # Longest sentence wins; max() keeps the earliest on ties, which matches
    # a stable descending length sort.
    return max(candidates, key=len).rstrip(".?!")
|
|
|
|
| def _make_aspect_summary_line( |
| aspect: str, |
| pos: int, |
| neg: int, |
| threshold: float, |
| ) -> str: |
| """Build a concise, punchy bullet point for the UI pros/cons lists.""" |
| aspect_name = aspect.lower() |
| |
| if pos >= threshold and neg >= threshold: |
| return f"Mixed feedback on {aspect_name}" |
|
|
| if pos >= threshold: |
| return f"Excellent {aspect_name}" |
|
|
| if neg >= threshold: |
| return f"Issues with {aspect_name}" |
| |
| return "" |
|
|
|
|
| def _strip_leading_article(text: str) -> str: |
| """Remove a leading 'the/a/an' so templates can add their own article.""" |
| stripped = text.lstrip() |
| for art in ("the ", "a ", "an "): |
| if stripped.lower().startswith(art): |
| return stripped[len(art):] |
| return stripped |
|
|
|
|
def _extract_short_phrase(sentences: list[str], aspect_name: str) -> str:
    """Return a clean noun-phrase descriptor for use in advisory templates.

    NOTE(review): currently a pass-through — `sentences` is unused and the
    aspect name itself is returned; the parameter is kept so the interface
    can later grow sentence-aware phrase extraction without breaking callers.
    """
    return aspect_name
|
|
|
|
def generate_summary(highlights: list[dict]) -> list[str]:
    """Produce 3–4 advisory-tone sentences summarising all reviews.

    Sentence distribution follows Noon.com advisory rules:
    - pros >> cons → 2-3 pro sentences, 1 con sentence
    - cons >> pros → 1 pro sentence, 2-3 con sentences
    - balanced → 2 pro sentences, 2 con sentences
    Mixed-sentiment aspects get their own "receives mixed feedback" sentence.

    Always outputs English with advisory phrasing.
    """
    # Local import: random is only needed for template selection here.
    import random

    # Bucket each highlight by which sentiment clearly dominates
    # (more than a 2x margin); everything else counts as mixed.
    pro_highlights = []
    mixed_highlights = []
    con_highlights = []

    for h in highlights:
        pos = h["positive_mentions"]
        neg = h["negative_mentions"]
        if pos > neg * 2:
            pro_highlights.append(h)
        elif neg > pos * 2:
            con_highlights.append(h)
        else:
            mixed_highlights.append(h)

    # Strip leading articles so the sentence templates can add their own,
    # and drop any phrases that end up empty.
    pro_phrases = [p for p in [_strip_leading_article(h["aspect"]) for h in pro_highlights] if p.strip()]
    con_phrases = [p for p in [_strip_leading_article(h["aspect"]) for h in con_highlights] if p.strip()]
    mixed_phrases = [p for p in [_strip_leading_article(h["aspect"]) for h in mixed_highlights] if p.strip()]

    n_pro = len(pro_phrases)
    n_con = len(con_phrases)

    # Choose the overall tone from the pro/con balance (same 2x margin rule).
    if n_pro > n_con * 2:
        mode = "pro_dominated"
    elif n_con > n_pro * 2:
        mode = "con_dominated"
    else:
        mode = "balanced"

    result: list[str] = []

    def _make_pro_sentences(phrases, count):
        """Generate `count` pro sentences from the available phrases without repeating any."""
        sents = []
        if not phrases:
            return sents

        # idx walks forward through phrases so none is used twice.
        idx = 0

        # First pro sentence: pairs two phrases when at least two remain.
        if count >= 1 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                p0, p1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"This product is widely praised for its excellent {p0} and {p1}.",
                    f"Many people appreciate its {p0} and {p1}, calling it a great choice.",
                    f"Customers frequently highlight the {p0} and {p1}.",
                ]))
            else:
                p0 = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"This product is praised for its {p0}.",
                    f"Many people appreciate the {p0}.",
                    f"Customers frequently highlight the {p0}.",
                ]))

        # Second pro sentence, drawing the next unused phrases.
        if count >= 2 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                p0, p1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"Both the {p0} and {p1} are also frequently highlighted by buyers.",
                    f"The {p0} and {p1} receive consistently positive feedback.",
                    f"Users are happy with the {p0}, often describing the {p1} as a major plus.",
                ]))
            else:
                p = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Its {p} is also frequently highlighted by buyers.",
                    f"The {p} receives consistently positive feedback.",
                    f"Users love its {p}, often describing it as a major plus.",
                ]))

        # Third pro sentence, if enough phrases remain.
        if count >= 3 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                p0, p1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"Positive mentions of the {p0} and {p1} further support its appeal.",
                    f"The {p0} and {p1} offer a great overall experience that many customers appreciate.",
                    f"Buyers are particularly satisfied with the {p0} and {p1} as well.",
                ]))
            else:
                p = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Positive mentions of the {p} further support its appeal.",
                    f"The {p} offers a great overall experience that many customers appreciate.",
                    f"Buyers are particularly satisfied with the {p} as well.",
                ]))

        return sents

    def _make_con_sentences(phrases, count):
        """Generate `count` con sentences from the available phrases without repeating any."""
        sents = []
        if not phrases:
            return sents

        # idx walks forward through phrases so none is used twice.
        idx = 0

        # First con sentence: can consume up to three phrases at once.
        if count >= 1 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 3:
                joined = ", ".join(phrases[idx:idx+2]) + ", and " + phrases[idx+2]
                idx += 3
                sents.append(random.choice([
                    f"Some users reported concerns with {joined}.",
                    f"A significant number of users raised concerns regarding {joined}.",
                    f"Issues with {joined} were common complaints.",
                ]))
            elif rem == 2:
                c0, c1 = phrases[idx], phrases[idx+1]
                idx += 2
                sents.append(random.choice([
                    f"Some users reported issues with {c0} and {c1}.",
                    f"Common concerns include the {c0} and {c1}.",
                    f"Issues with {c0} and {c1} were frequent complaints.",
                ]))
            else:
                c0 = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"A major concern is the {c0}.",
                    f"Some users reported issues with {c0}.",
                    f"A frequent complaint is the {c0}."
                ]))

        # Second con sentence, drawing the next unused phrases.
        if count >= 2 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                joined = " and ".join(phrases[idx:idx+2])
                idx += 2
                sents.append(random.choice([
                    f"Many people have raised serious concerns about the {joined}.",
                    f"A significant number of users reported that the {joined} simply does not meet expectations.",
                    f"Dissatisfaction with {joined} was a common theme among reviews.",
                ]))
            else:
                c = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Many people have raised concerns about the {c}.",
                    f"A significant number of users reported that the {c} does not meet expectations.",
                    f"The {c} is frequently mentioned as a negative point.",
                ]))

        # Third con sentence, if enough phrases remain.
        if count >= 3 and idx < len(phrases):
            rem = len(phrases) - idx
            if rem >= 2:
                joined = " and ".join(phrases[idx:idx+2])
                idx += 2
                sents.append(random.choice([
                    f"Issues with {joined} were also noted across multiple reviews.",
                    f"The {joined} can be problematic, leading to frustration for some buyers.",
                    f"Overall, many customers noted these areas as needing improvement.",
                ]))
            else:
                c = phrases[idx]
                idx += 1
                sents.append(random.choice([
                    f"Issues with the {c} were also noted across multiple reviews.",
                    f"The {c} can be problematic, leading to frustration for some buyers.",
                    f"Overall, many customers noted this area as needing improvement.",
                ]))

        return sents

    def _make_mixed_sentences(phrases, max_count=1):
        # Generate up to `max_count` "mixed feedback" sentences.
        sents = []
        if not phrases:
            return sents

        idx = 0
        if max_count >= 1 and idx < len(phrases):
            m0 = phrases[idx]
            idx += 1
            sents.append(random.choice([
                f"The {m0} receives mixed feedback, with many finding it average or disappointing.",
                f"The {m0} gets mixed feedback; some find it excellent, others feel it's disappointing.",
                f"{m0.capitalize()} can be inconsistent, so it's often a point of debate among buyers.",
            ]))
        if max_count >= 2 and idx < len(phrases):
            m1 = phrases[idx]
            idx += 1
            sents.append(random.choice([
                f"Similarly, the {m1} receives mixed feedback from customers.",
                f"The {m1} also gets divided opinions; results may vary.",
            ]))
        return sents

    if mode == "pro_dominated":
        # 2-3 pro sentences; a mixed sentence only when just 2 pros fit.
        pro_count = 3 if n_pro >= 3 else 2
        mixed_count = 0 if pro_count == 3 else 1

        result.extend(_make_pro_sentences(pro_phrases, pro_count))
        result.extend(_make_mixed_sentences(mixed_phrases, max_count=mixed_count))
        result.extend(_make_con_sentences(con_phrases, count=1))

    elif mode == "con_dominated":
        # 2-3 con sentences; a mixed sentence only when just 2 cons fit.
        con_count = 3 if n_con >= 3 else 2
        mixed_count = 0 if con_count == 3 else 1

        # Lead with the lone positive note (or a generic negative opener).
        if n_pro >= 1:
            p0 = pro_phrases[0]
            result.append(random.choice([
                f"While one person praised its {p0}, many users highlighted several cons.",
                f"While the {p0} received some praise, customers frequently mentioned negative aspects.",
                f"The {p0} is one of the few aspects that received positive feedback.",
            ]))
        else:
            result.append(random.choice([
                "Users frequently mentioned negative aspects and cons regarding this product.",
                "Customer reviews highlighted several cons and negative aspects of this product.",
                "Many reviewers pointed out negative aspects and areas for improvement."
            ]))

        result.extend(_make_mixed_sentences(mixed_phrases, max_count=mixed_count))
        result.extend(_make_con_sentences(con_phrases, count=con_count))

    else:
        # Balanced: 2 pro sentences, then either mixed + 1 con or 2 cons.
        result.extend(_make_pro_sentences(pro_phrases, count=2))
        if mixed_phrases:
            result.extend(_make_mixed_sentences(mixed_phrases, max_count=1))
            result.extend(_make_con_sentences(con_phrases, count=1))
        else:
            result.extend(_make_con_sentences(con_phrases, count=2))

    # Guarantee at least one sentence, and never more than four.
    if not result:
        result.append("This product has not received enough detailed feedback to extract highlights.")

    return result[:4]
|
|
|
|
def classify_aspects(review_text: str, aspects: list[str]) -> list[dict]:
    """Run the DeBERTa ABSA model on each aspect within the review context (single review, backward-compatible)."""
    if not aspects:
        return []
    # The ABSA model expects "[CLS] <sentence> [SEP] <aspect> [SEP]" pairs.
    prompts = [f"[CLS] {review_text} [SEP] {aspect} [SEP]" for aspect in aspects]
    predictions = ABSA_PIPELINE(prompts, batch_size=32)
    return [
        {
            "aspect": aspect,
            "sentiment": prediction["label"],
            "confidence": round(prediction["score"], 4),
        }
        for aspect, prediction in zip(aspects, predictions)
    ]
|
|
|
|
def classify_aspects_batch(items: list[tuple[str, list[str]]]) -> list[dict]:
    """Batch-classify all (review_text, aspects) pairs in a single pipeline call.

    items: list of (review_text, aspects_list) tuples.
    Returns a flat list of {aspect, sentiment, confidence} dicts.
    """
    # Flatten every (review, aspect) pair into one prompt list so the
    # pipeline runs a single batched forward pass.
    prompts: list[str] = []
    flat_aspects: list[str] = []
    for review_text, aspects in items:
        for aspect in aspects:
            prompts.append(f"[CLS] {review_text} [SEP] {aspect} [SEP]")
            flat_aspects.append(aspect)

    if not prompts:
        return []

    predictions = ABSA_PIPELINE(prompts, batch_size=32)
    return [
        {
            "aspect": aspect,
            "sentiment": prediction["label"],
            "confidence": round(prediction["score"], 4),
        }
        for aspect, prediction in zip(flat_aspects, predictions)
    ]
|
|
|
|
def aggregate_pros_cons(
    all_aspect_sentiments: list[dict],
    total_reviews: int,
    min_confidence: float = DEFAULT_MIN_CONFIDENCE,
    threshold_divisor: float = 4.0,
) -> dict:
    """
    Aggregate aspect sentiments across all reviews into a product-level summary.

    Counts positive and negative mentions per normalized aspect and returns:
    - highlights: ranked aspect summaries with mention counts; each item's
      ``summary`` field contains the advisory-tone sentence for that aspect
    - pros/cons: summary lines split by dominant sentiment

    Thresholds are computed as:
    - threshold = total_reviews / threshold_divisor
    """
    pos_counts: Counter = Counter()
    neg_counts: Counter = Counter()

    # Tally confident mentions under their normalized aspect key.
    for entry in all_aspect_sentiments:
        if entry["confidence"] < min_confidence:
            continue
        key = _normalize_aspect(entry["aspect"])
        if entry["sentiment"] == "Positive":
            pos_counts[key] += 1
        elif entry["sentiment"] == "Negative":
            neg_counts[key] += 1

    # Guard against zero/negative inputs before computing the mention threshold.
    safe_total = max(total_reviews, 1)
    safe_divisor = threshold_divisor if threshold_divisor > 0 else 4.0
    threshold = safe_total / safe_divisor

    highlights = []
    for aspect in set(pos_counts) | set(neg_counts):
        # Skip aspects that normalized to the empty string.
        if not aspect:
            continue
        pos = pos_counts.get(aspect, 0)
        neg = neg_counts.get(aspect, 0)
        total = pos + neg
        if total == 0:
            continue
        highlights.append({
            "aspect": aspect,
            "summary": _make_aspect_summary_line(aspect, pos, neg, threshold),
            "positive_mentions": pos,
            "negative_mentions": neg,
            "total_mentions": total,
        })

    # Most-mentioned aspects first.
    highlights.sort(key=lambda item: item["total_mentions"], reverse=True)

    def _dominant_summaries(winner: str, loser: str) -> list[str]:
        # Summary lines where one sentiment both outnumbers the other
        # and crosses the mention threshold.
        return [
            item["summary"]
            for item in highlights
            if item[winner] > item[loser] and item[winner] >= threshold
        ]

    pros = _dominant_summaries("positive_mentions", "negative_mentions")
    cons = _dominant_summaries("negative_mentions", "positive_mentions")

    # Replace per-aspect summaries with advisory-tone sentences; sentences
    # beyond the ranked aspects get synthetic placeholder entries.
    advisory_sentences = generate_summary(highlights)
    final_highlights = []
    for i, sentence in enumerate(advisory_sentences):
        if i < len(highlights):
            source = highlights[i]
            final_highlights.append({
                "aspect": source["aspect"],
                "summary": sentence,
                "positive_mentions": source["positive_mentions"],
                "negative_mentions": source["negative_mentions"],
                "total_mentions": source["total_mentions"]
            })
        else:
            final_highlights.append({
                "aspect": f"summary_{i}",
                "summary": sentence,
                "positive_mentions": 0,
                "negative_mentions": 0,
                "total_mentions": 0
            })

    return {
        "highlights": final_highlights,
        "pros": pros,
        "cons": cons,
    }
|
|