Spaces:
Running
Running
| # email_eval/api.py — v2.3, with LLM-based clarity and grammar | |
| # Six-metric evaluator for Subject + Body | |
| # - Clarity: LLM-based | |
| # - Length: hard-coded class-aware | |
| # - Spam: LLM counts for SAFE marketing phrases + heuristics | |
| # - Personalization: LLM cues + deterministic medium-best curve | |
| # - Tone: LLM flags + deterministic math | |
| # - Grammar: LLM-based | |
| # Exports: | |
| # - evaluate(subject, body, engine="openai", weights=None) -> dict | |
| # - metric_keys() -> list[str] | |
| from typing import Dict, Any, Tuple | |
| import regex as re | |
| import logging | |
| from .config import DEFAULT_WEIGHT_PRESETS, CLASS_BANDS, SPAM_TIER_WEIGHTS | |
| from .spam_llm import spam_counts | |
| from .personalization_llm import personalization_flags | |
| from .tone_llm import tone_flags | |
| from .clarity_llm import clarity_score | |
| from .grammar_llm import grammar_score | |
| from .preprocess import sentences, norm_text, word_count | |
| from .rules import PASSIVE_AGGRESSIVE, HOSTILE | |
| from .comments_llm import subjective_comments | |
| logging.basicConfig(level=logging.ERROR) # For error logging | |
| # ------------ public helper for UI/CSV order ------------ | |
def metric_keys():
    """Return the six metric names in canonical UI/CSV column order."""
    return [
        "clarity",
        "length",
        "spam_score",
        "personalization",
        "tone",
        "grammatical_hygiene",
    ]
| # ------------ weights / usage utils ------------ | |
| def _normalize_weights(weights: Dict[str, float]) -> Dict[str, float]: | |
| if not weights: | |
| weights = DEFAULT_WEIGHT_PRESETS["research_defaults"] | |
| total = sum(max(0.0, float(v)) for v in weights.values()) or 1.0 | |
| scale = 6.0 / total | |
| return {k: max(0.0, float(v)) * scale for k, v in weights.items()} | |
| def _safe_sum_usage(u: Any) -> int: | |
| if not isinstance(u, dict): | |
| return 0 | |
| return int(u.get("prompt_tokens", 0)) + int(u.get("completion_tokens", 0)) | |
| # ------------ deterministic class heuristic ------------ | |
| def _infer_class(subject: str, body: str) -> str: | |
| s = f"{subject} {body}".lower() | |
| if any(k in s for k in ("invoice","receipt","order #","otp","password reset")): return "transactional" | |
| if any(k in s for k in ("promo","sale","discount","offer","exclusive","reward","prize","cash","limited time","act now")): return "promo" | |
| if any(k in s for k in ("follow up","following up","gentle reminder")): return "follow_up" | |
| if any(k in s for k in ("support","ticket","issue id")): return "support" | |
| if any(k in s for k in ("intro","nice to meet","partnership","demo","outreach")): return "outreach" | |
| return "internal_request" | |
| # ------------ length scorer (class-aware) ------------ | |
def _score_length(subject: str, body: str, klass: str) -> Tuple[float, list]:
    """Score subject+body length 0..10 using class-specific word-count bands.

    Subject contributes 0..3 points (character count), body 0..7 points
    (word count against the class's ideal/good bands from CLASS_BANDS).
    """
    bands = CLASS_BANDS.get(klass, CLASS_BANDS["internal_request"])
    subj_chars = len(norm_text(subject))
    body_words = word_count(body)

    # Subject 0..3: peak inside 30-60 chars, soft band 20-80, 1 if non-empty.
    if 30 <= subj_chars <= 60:
        subj_pts = 3
    elif 20 <= subj_chars <= 80:
        subj_pts = 2
    elif subj_chars > 0:
        subj_pts = 1
    else:
        subj_pts = 0

    # Body 0..7: ideal band -> 7; good band -> linear falloff to 4 at the
    # edges; outside -> quadratic decay toward 0.
    i_lo, i_hi = bands["ideal"]
    g_lo, g_hi = bands["good"]
    if i_lo <= body_words <= i_hi:
        body_pts = 7.0
    elif g_lo <= body_words <= g_hi:
        half_span = ((g_hi - g_lo) or 1) / 2
        center_dist = abs(body_words - (i_lo + i_hi) / 2)
        body_pts = max(4.0, 7.0 - 3.0 * (center_dist / half_span))
    else:
        edge_dist = min(abs(body_words - g_lo), abs(body_words - g_hi))
        body_pts = max(0.0, 4.0 - (edge_dist / 50.0) ** 2)

    notes = [f"subject_len={subj_chars}", f"body_wc={body_words}", f"class={klass}"]
    return round(max(0.0, min(10.0, subj_pts + body_pts)), 2), notes
| # ------------ spam scorer ------------ | |
def _score_spam(subject: str, body: str, llm_counts=None, html_ratio_bad=False) -> Tuple[float, list]:
    """Score spam-likelihood 0..10 (10 = clean) from deterministic heuristics
    plus optional LLM lexicon tier counts.

    Args:
        subject, body: raw email parts.
        llm_counts: optional {"A": n, "B": n, "C": n} tier counts from the
            spam LLM; weighted by SPAM_TIER_WEIGHTS, penalty capped at 3.0.
        html_ratio_bad: caller-supplied flag for a low text/image ratio.

    Returns:
        (score, reasons) — score rounded to 2 places, reasons listing every
        penalty that fired.
    """
    # Project-local patterns; imported lazily as in the original.
    from .rules import SPAM_URGENCY, SPAM_REWARD, SPAM_CALLS, SPAM_MARKETING

    score = 10.0
    reasons = []
    subj = norm_text(subject)
    txt = norm_text(body)
    combined = f"{subj} {txt}"  # hoisted: every pattern scan uses the same haystack

    # ALL-CAPS subject line.
    if subj and subj.isupper():
        score -= 2
        reasons.append("ALL_CAPS_subject")

    # Excessive exclamation marks (subject-specific and overall).
    subj_ex = subj.count("!")
    tot_ex = subj_ex + txt.count("!")
    if subj_ex > 1:
        score -= 1
        reasons.append("exclam>1_subject")
    if tot_ex > 2:
        score -= 1
        reasons.append("exclam_total>2")

    # Deterministic spam heuristics (urgency/reward/marketing/calls).
    # Uses the module-level `re` (regex); the previous local
    # `import regex as re2` was a redundant duplicate of it.
    trig = 0.0
    if any(re.search(p, combined) for p in SPAM_URGENCY):
        trig += 1.25
        reasons.append("urgency_markers")
    if any(re.search(p, combined) for p in SPAM_REWARD):
        trig += 1.5
        reasons.append("reward_claims")
    if any(re.search(p, combined) for p in SPAM_CALLS):
        trig += 1.0
        reasons.append("clickbait_calls")
    if any(re.search(p, combined) for p in SPAM_MARKETING):
        trig += 0.75
        reasons.append("marketing_phrases")
    if trig > 0:
        score -= min(6.0, trig)

    # LLM lexicon tiers (no local profanity lists); penalty capped at 3.0.
    if llm_counts:
        penal = (llm_counts.get("A", 0) * SPAM_TIER_WEIGHTS["A"]
                 + llm_counts.get("B", 0) * SPAM_TIER_WEIGHTS["B"]
                 + llm_counts.get("C", 0) * SPAM_TIER_WEIGHTS["C"])
        penal = min(penal, 3.0)
        if penal > 0:
            score -= penal
            reasons.append(f"lexicon_penalty={penal:.2f}")

    # Optional HTML heuristic supplied by the caller.
    if html_ratio_bad:
        score -= 2
        reasons.append("low_text_image_ratio")

    # Too many links/URLs.
    url_count = len(re.findall(r"https?://", combined))
    if url_count > 3:
        score -= 1
        reasons.append(f"too_many_urls={url_count}")

    # Cap at <=6 when a high-risk trigger co-occurs with shouting/exclamations.
    if any(r in reasons for r in ("reward_claims", "clickbait_calls", "urgency_markers")) \
            and (subj.isupper() or txt.count("!") >= 2):
        score = min(score, 6.0)

    return round(max(0.0, min(10.0, score)), 2), reasons
| # ------------ personalization scorer ------------ | |
| def _score_personalization(subject: str, body: str, cues, too_intrusive: bool) -> Tuple[float, list]: | |
| count = len(cues) | |
| relevant = sum(1 for c in cues if c.get("relevant")) | |
| # degree curve: medium best (research). | |
| if count == 0: base = 3 | |
| elif count == 1: base = 6 if relevant else 5 | |
| elif count == 2: base = 9 if relevant>=1 else 7 | |
| else: base = 6 if not too_intrusive else 5 | |
| subj_bonus = 1 if any(c.get("relevant") and c.get("text","") in (subject or "") for c in cues) else 0 | |
| score = max(0, min(10, base + subj_bonus)) | |
| reasons = [f"cues={count}", f"relevant={relevant}"] + (["too_intrusive"] if too_intrusive else []) | |
| return score, reasons | |
# Tone heuristics used by _score_tone: case-insensitive greeting anchored at
# the start of the body, and a sign-off word anywhere in the body.
GREETINGS = [r"(?i)^(hi|hello|good (morning|afternoon|evening)|dear)\b"]
SIGNOFFS = [r"(?i)\b(regards|best|sincerely|thanks|thank you)\b"]
| # ------------ tone scorer ------------ | |
def _score_tone(subject: str, body: str, flags: Dict) -> Tuple[float, list]:
    """Score tone 0..10 from regex heuristics plus LLM-provided flags.

    Starts at 8.0 so bonuses and penalties both move the score meaningfully;
    caps at 9.0 whenever any negative marker fired so bonuses cannot mask
    tone problems.
    """
    subj = subject or ""
    text = body or ""
    score = 8.0
    reasons = []

    # Courtesy structure bonuses.
    if any(re.search(pat, text) for pat in GREETINGS):
        score += 0.5
        reasons.append("greeting")
    if any(re.search(pat, text) for pat in SIGNOFFS):
        score += 0.5
        reasons.append("signoff")

    # Shouting and punctuation abuse.
    if subj.isupper():
        score -= 2
        reasons.append("ALL_CAPS_subject")
    subj_bangs = subj.count("!")
    if subj_bangs > 1:
        score -= 1
        reasons.append("exclam>1_subject")
    if subj_bangs + text.count("!") > 2:
        score -= 1
        reasons.append("exclam_total>2")

    # Emoji excess: one emoji is free, each extra costs a point.
    # (\p{Emoji} needs the third-party `regex` module imported as `re`.)
    extra_emoji = len(re.findall(r"[\p{Emoji}]", f"{subj} {text}")) - 1
    if extra_emoji > 0:
        score -= extra_emoji
        reasons.append(f"emoji_extra={extra_emoji}")

    # LLM-reported tone flags.
    if flags.get("too_aggressive"):
        score -= 1.5
        reasons.append("too_aggressive")
    if flags.get("overly_casual_for_b2b"):
        score -= 0.75
        reasons.append("overly_casual_for_b2b")
    if flags.get("passive_aggressive_markers"):
        score -= 0.5
        reasons.append("passive_aggressive_markers")

    # Regex-based hostile / passive-aggressive detection on the body.
    hostile_hit = any(re.search(pat, text) for pat in HOSTILE)
    pa_hit = any(re.search(pat, text) for pat in PASSIVE_AGGRESSIVE)
    if hostile_hit:
        score -= 3.0
        reasons.append("hostile_language")
    if pa_hit:
        score -= 1.0
        reasons.append("passive_aggressive_phrasing")

    # Politeness bonus, capped at +0.75.
    polite_hits = len(re.findall(r"(?i)\b(please|thank you|thanks|appreciate)\b", text))
    if polite_hits > 0:
        score += min(0.25 * polite_hits, 0.75)
        reasons.append(f"polite_markers={polite_hits}")

    # Audience adjustment: unless the mail reads family-like, expect a
    # professional tone and penalize hostility/passive-aggression further.
    family_words = ("mom", "dad", "brother", "sister", "family", "love you")
    if not any(word in text.lower() for word in family_words):
        if pa_hit:
            score -= 0.5
        if hostile_hit:
            score -= 0.5

    # Prevent saturation when negative markers are present.
    negative_tags = ("too_aggressive", "overly_casual_for_b2b",
                     "passive_aggressive_phrasing", "hostile_language")
    if any(tag in reasons for tag in negative_tags):
        score = min(score, 9.0)

    return round(max(0.0, min(10.0, score)), 2), reasons
| # ------------------ main API ------------------ | |
def evaluate(subject: str, body: str, engine: str = "openai", weights: Dict[str, float] = None) -> Dict[str, Any]:
    """Evaluate an email on six metrics and return scores, reasons, and usage.

    Args:
        subject, body: raw email parts (None is treated as "").
        engine: LLM backend; "openai" (default) credits tokens to
            usage["openai_total"], anything else to usage["claude_total"].
        weights: optional per-metric weights; normalized to sum to 6.0.
            Missing metric keys are treated as weight 0.0.

    Returns:
        dict with keys: class, scores, weighted_total, explanations,
        comments, usage, meta. Every LLM-backed metric degrades gracefully
        (logged + fallback score) when its backend call raises.
    """
    subject, body = subject or "", body or ""
    engine = engine or "openai"
    W = _normalize_weights(weights or DEFAULT_WEIGHT_PRESETS["research_defaults"])

    # Deterministic class inference drives the length bands.
    klass = _infer_class(subject, body)

    # 1) clarity (LLM-based). c_usage is initialized up front instead of the
    # previous fragile `'c_details' in locals()` check.
    c_usage = {}
    try:
        c_score, c_details = clarity_score(subject, body, engine)
        c_usage = c_details.get("usage", {})
        c_reasons = [
            f"ask_signals={len(c_details['llm'].get('ask_signals', []))}",
            f"subject_useful={c_details['llm'].get('subject_useful', False)}",
            f"intro_clear={c_details['llm'].get('intro_clear', False)}",
            "source=llm",
        ]
    except Exception as e:
        logging.error(f"Clarity failed: {e}")
        c_score, c_reasons = 0.0, ["llm_failed"]

    # 2) length (deterministic, class-aware).
    l_score, l_reasons = _score_length(subject, body, klass)

    # 3) spam (LLM tier counts + heuristics).
    try:
        sc_counts, sc_usage = spam_counts(subject, body, engine=engine)
    except Exception as e:
        logging.error(f"Spam counts failed: {e}")
        sc_counts, sc_usage = {"A": 0, "B": 0, "C": 0}, {}
    s_score, s_reasons = _score_spam(subject, body, llm_counts=sc_counts, html_ratio_bad=False)

    # 4) personalization (LLM cues + deterministic medium-best curve).
    try:
        p_flags, p_usage = personalization_flags(subject, body, engine=engine)
        if not isinstance(p_flags, dict):
            p_flags = {"cues": [], "too_intrusive": False}
    except Exception as e:
        logging.error(f"Personalization flags failed: {e}")
        p_flags, p_usage = {"cues": [], "too_intrusive": False}, {}
    p_score, p_reasons = _score_personalization(
        subject, body, p_flags.get("cues", []), bool(p_flags.get("too_intrusive", False))
    )

    # 5) tone (LLM flags + deterministic math).
    try:
        t_flags, t_usage = tone_flags(subject, body, engine=engine)
        if not isinstance(t_flags, dict):
            t_flags = {"too_aggressive": False, "overly_casual_for_b2b": False,
                       "passive_aggressive_markers": []}
    except Exception as e:
        logging.error(f"Tone flags failed: {e}")
        t_flags, t_usage = {"too_aggressive": False, "overly_casual_for_b2b": False,
                            "passive_aggressive_markers": []}, {}
    t_score, t_reasons = _score_tone(subject, body, t_flags)

    # 6) grammar (LLM-based; falls back to a neutral 8.0 on failure).
    try:
        g_score, g_reasons, g_usage = grammar_score(subject, body, engine)
    except Exception as e:
        logging.error(f"Grammar failed: {e}")
        g_score, g_reasons, g_usage = 8.0, ["llm_failed"], {}

    # Aggregate: weighted mean over the six metrics, clamped to 0..10.
    scores = {
        "clarity": float(round(c_score, 2)),
        "length": float(round(l_score, 2)),
        "spam_score": float(round(s_score, 2)),
        "personalization": float(round(p_score, 2)),
        "tone": float(round(t_score, 2)),
        "grammatical_hygiene": float(round(g_score, 2)),
    }
    # W.get(...) throughout: a caller-supplied weights dict may omit metric
    # keys; the previous W[k] raised KeyError in that case.
    denom = sum(W.get(k, 0.0) for k in metric_keys()) or 1.0
    weighted_total = float(round(
        max(0.0, min(10.0, sum(W.get(k, 0.0) * scores[k] for k in metric_keys()) / denom)), 2
    ))

    explanations = {
        "clarity": c_reasons,
        "length": l_reasons,
        "spam_score": s_reasons,
        "personalization": p_reasons,
        "tone": t_reasons,
        "grammatical_hygiene": g_reasons,
    }

    # Token usage, counting only LLM-backed features that actually ran.
    def _u(x):
        return _safe_sum_usage(x)

    usage = {"openai_total": 0, "claude_total": 0, "total": 0}
    llm_tokens = _u(sc_usage) + _u(p_usage) + _u(t_usage) + _u(c_usage) + _u(g_usage)
    if engine == "openai":
        usage["openai_total"] = llm_tokens
    else:
        usage["claude_total"] = llm_tokens
    usage["total"] = usage["openai_total"] + usage["claude_total"]

    # Subjective LLM comments (best-effort; failure degrades to empty dict).
    try:
        comm_data, comm_usage = subjective_comments(subject, body, scores, explanations, engine=engine)
    except Exception as e:
        logging.error(f"Subjective comments failed: {e}")
        comm_data, comm_usage = {}, {}
    if engine == "openai":
        usage["openai_total"] += _u(comm_usage)
    else:
        usage["claude_total"] += _u(comm_usage)
    usage["total"] += _u(comm_usage)

    return {
        "class": klass,
        "scores": scores,
        "weighted_total": weighted_total,
        "explanations": explanations,
        "comments": comm_data,
        "usage": usage,
        "meta": {"engine": engine, "weights": W, "version": "2.3"},
    }