# HuggingFace file-page residue (uploader/commit metadata), commented out so
# this module remains valid Python:
#   navaneethkrishnan's picture
#   Upload 10 files
#   9ea5e05 verified
# email_eval/api.py — v2.3, with LLM-based clarity and grammar
# Six-metric evaluator for Subject + Body
# - Clarity: LLM-based
# - Length: hard-coded class-aware
# - Spam: LLM counts for SAFE marketing phrases + heuristics
# - Personalization: LLM cues + deterministic medium-best curve
# - Tone: LLM flags + deterministic math
# - Grammar: LLM-based
# Exports:
# - evaluate(subject, body, engine="openai", weights=None) -> dict
# - metric_keys() -> list[str]
from typing import Dict, Any, Tuple
import regex as re
import logging
from .config import DEFAULT_WEIGHT_PRESETS, CLASS_BANDS, SPAM_TIER_WEIGHTS
from .spam_llm import spam_counts
from .personalization_llm import personalization_flags
from .tone_llm import tone_flags
from .clarity_llm import clarity_score
from .grammar_llm import grammar_score
from .preprocess import sentences, norm_text, word_count
from .rules import PASSIVE_AGGRESSIVE, HOSTILE
from .comments_llm import subjective_comments
logging.basicConfig(level=logging.ERROR) # For error logging
# ------------ public helper for UI/CSV order ------------
def metric_keys():
    """Canonical metric ordering shared by the UI and CSV export."""
    return [
        "clarity",
        "length",
        "spam_score",
        "personalization",
        "tone",
        "grammatical_hygiene",
    ]
# ------------ weights / usage utils ------------
def _normalize_weights(weights: Dict[str, float]) -> Dict[str, float]:
if not weights:
weights = DEFAULT_WEIGHT_PRESETS["research_defaults"]
total = sum(max(0.0, float(v)) for v in weights.values()) or 1.0
scale = 6.0 / total
return {k: max(0.0, float(v)) * scale for k, v in weights.items()}
def _safe_sum_usage(u: Any) -> int:
if not isinstance(u, dict):
return 0
return int(u.get("prompt_tokens", 0)) + int(u.get("completion_tokens", 0))
# ------------ deterministic class heuristic ------------
def _infer_class(subject: str, body: str) -> str:
s = f"{subject} {body}".lower()
if any(k in s for k in ("invoice","receipt","order #","otp","password reset")): return "transactional"
if any(k in s for k in ("promo","sale","discount","offer","exclusive","reward","prize","cash","limited time","act now")): return "promo"
if any(k in s for k in ("follow up","following up","gentle reminder")): return "follow_up"
if any(k in s for k in ("support","ticket","issue id")): return "support"
if any(k in s for k in ("intro","nice to meet","partnership","demo","outreach")): return "outreach"
return "internal_request"
# ------------ length scorer (class-aware) ------------
def _score_length(subject: str, body: str, klass: str) -> Tuple[float, list]:
    """Class-aware length score: subject contributes 0-3, body 0-7, clamped to [0, 10].

    Band boundaries for the body come from CLASS_BANDS[klass] (falling back
    to the "internal_request" bands for unknown classes).
    """
    band = CLASS_BANDS.get(klass, CLASS_BANDS["internal_request"])
    subj_chars = len(norm_text(subject))
    body_words = word_count(body)
    # Subject: 3 points in the 30-60 sweet spot, 2 in the soft 20-80 band,
    # 1 for any other non-empty subject, 0 when empty.
    if 30 <= subj_chars <= 60:
        subj_pts = 3
    elif 20 <= subj_chars <= 80:
        subj_pts = 2
    elif subj_chars > 0:
        subj_pts = 1
    else:
        subj_pts = 0
    # Body: full 7 inside the ideal band; linear falloff (floored at 4)
    # across the good band; quadratic decay toward 0 outside it.
    (ideal_lo, ideal_hi), (good_lo, good_hi) = band["ideal"], band["good"]
    if ideal_lo <= body_words <= ideal_hi:
        body_pts = 7.0
    elif good_lo <= body_words <= good_hi:
        span = (good_hi - good_lo) or 1
        offset = abs(body_words - (ideal_lo + ideal_hi) / 2)
        body_pts = max(4.0, 7.0 - 3.0 * (offset / (span / 2)))
    else:
        gap = min(abs(body_words - good_lo), abs(body_words - good_hi))
        body_pts = max(0.0, 4.0 - (gap / 50.0) ** 2)
    notes = [f"subject_len={subj_chars}", f"body_wc={body_words}", f"class={klass}"]
    return round(max(0.0, min(10.0, subj_pts + body_pts)), 2), notes
# ------------ spam scorer ------------
def _score_spam(subject: str, body: str, llm_counts=None, html_ratio_bad=False) -> Tuple[float, list]:
    """Score spam risk on 0-10 (10 = clean) from heuristics plus LLM lexicon counts.

    Args:
        subject: raw subject line.
        body: raw body text.
        llm_counts: optional dict of LLM tier counts {"A": int, "B": int, "C": int},
            weighted via SPAM_TIER_WEIGHTS with the total penalty capped at 3.0.
        html_ratio_bad: set True by the caller when a low text/image ratio
            was detected (currently always False in evaluate()).

    Returns:
        (score, reasons) — score rounded to 2 decimals in [0, 10]; reasons is a
        list of short machine-readable tags explaining each deduction.
    """
    # Local import kept (mirrors the original); avoids import-cycle risk with .rules.
    from .rules import SPAM_URGENCY, SPAM_REWARD, SPAM_CALLS, SPAM_MARKETING
    score = 10.0
    reasons = []
    subj = norm_text(subject)
    txt = norm_text(body)
    # FIX: the haystack was rebuilt for every pattern family and `regex` was
    # redundantly re-imported as `re2` (the module already imports it as `re`).
    combined = f"{subj} {txt}"
    # ALL CAPS subject
    if subj and subj.isupper():
        score -= 2
        reasons.append("ALL_CAPS_subject")
    # exclamations
    subj_ex = subj.count("!")
    tot_ex = subj_ex + txt.count("!")
    if subj_ex > 1:
        score -= 1
        reasons.append("exclam>1_subject")
    if tot_ex > 2:
        score -= 1
        reasons.append("exclam_total>2")
    # deterministic spam heuristics (urgency/reward/marketing/calls)
    trig = 0.0
    if any(re.search(p, combined) for p in SPAM_URGENCY):
        trig += 1.25
        reasons.append("urgency_markers")
    if any(re.search(p, combined) for p in SPAM_REWARD):
        trig += 1.5
        reasons.append("reward_claims")
    if any(re.search(p, combined) for p in SPAM_CALLS):
        trig += 1.0
        reasons.append("clickbait_calls")
    if any(re.search(p, combined) for p in SPAM_MARKETING):
        trig += 0.75
        reasons.append("marketing_phrases")
    if trig > 0:
        score -= min(6.0, trig)
    # lexicon (LLM counts only; no profanity lists)
    if llm_counts:
        penal = (llm_counts.get("A", 0) * SPAM_TIER_WEIGHTS["A"] +
                 llm_counts.get("B", 0) * SPAM_TIER_WEIGHTS["B"] +
                 llm_counts.get("C", 0) * SPAM_TIER_WEIGHTS["C"])
        penal = min(penal, 3.0)  # cap lexicon penalty
        if penal > 0:
            score -= penal
            reasons.append(f"lexicon_penalty={penal:.2f}")
    # optional HTML heuristic
    if html_ratio_bad:
        score -= 2
        reasons.append("low_text_image_ratio")
    # Additional rule for consistency: too many links or URLs
    url_count = len(re.findall(r"https?://", combined))
    if url_count > 3:
        score -= 1
        reasons.append(f"too_many_urls={url_count}")
    # If multiple high-risk indicators are present alongside shouting or
    # repeated exclamations, cap at <=6 even before the LLM weighs in.
    if any(r in reasons for r in ("reward_claims", "clickbait_calls", "urgency_markers")) and (subj.isupper() or txt.count("!") >= 2):
        score = min(score, 6.0)
    return round(max(0.0, min(10.0, score)), 2), reasons
# ------------ personalization scorer ------------
def _score_personalization(subject: str, body: str, cues, too_intrusive: bool) -> Tuple[float, list]:
count = len(cues)
relevant = sum(1 for c in cues if c.get("relevant"))
# degree curve: medium best (research).
if count == 0: base = 3
elif count == 1: base = 6 if relevant else 5
elif count == 2: base = 9 if relevant>=1 else 7
else: base = 6 if not too_intrusive else 5
subj_bonus = 1 if any(c.get("relevant") and c.get("text","") in (subject or "") for c in cues) else 0
score = max(0, min(10, base + subj_bonus))
reasons = [f"cues={count}", f"relevant={relevant}"] + (["too_intrusive"] if too_intrusive else [])
return score, reasons
# Case-insensitive regex cues used by the tone scorer: GREETINGS must match at
# the start of the body; SIGNOFFS may appear anywhere in it.
GREETINGS = [r"(?i)^(hi|hello|good (morning|afternoon|evening)|dear)\b"]
SIGNOFFS = [r"(?i)\b(regards|best|sincerely|thanks|thank you)\b"]
# ------------ tone scorer ------------
def _score_tone(subject: str, body: str, flags: Dict) -> Tuple[float, list]:
    """Tone score 0-10 from deterministic cues plus LLM-provided flags.

    Starts at 8.0 (below 10 so bonuses/penalties move meaningfully), applies
    greeting/sign-off bonuses, shouting/exclamation/emoji penalties, the LLM
    flags, regex-based hostility checks, a polite-marker bonus, and an
    audience-aware professionalism adjustment.

    Args:
        subject: raw subject line (None tolerated).
        body: raw body text (None tolerated).
        flags: LLM tone flags — keys "too_aggressive",
            "overly_casual_for_b2b", "passive_aggressive_markers".

    Returns:
        (score rounded to 2 decimals in [0, 10], list of reason tags).
    """
    score = 8.0
    reasons = []
    text = body or ""
    subj = subject or ""
    if any(re.search(p, text) for p in GREETINGS):
        score += 0.5
        reasons.append("greeting")
    if any(re.search(p, text) for p in SIGNOFFS):
        score += 0.5
        reasons.append("signoff")
    if subj.isupper():
        score -= 2
        reasons.append("ALL_CAPS_subject")
    subj_ex = subj.count("!")
    tot_ex = subj_ex + text.count("!")
    if subj_ex > 1:
        score -= 1
        reasons.append("exclam>1_subject")
    if tot_ex > 2:
        score -= 1
        reasons.append("exclam_total>2")
    # emojis (simple heuristic): the first emoji is free, each extra costs 1
    emojis = re.findall(r"[\p{Emoji}]", f"{subj} {text}")
    if len(emojis) > 1:
        extra = len(emojis) - 1
        score -= extra
        reasons.append(f"emoji_extra={extra}")
    # LLM flags
    if flags.get("too_aggressive"):
        score -= 1.5
        reasons.append("too_aggressive")
    if flags.get("overly_casual_for_b2b"):
        score -= 0.75
        reasons.append("overly_casual_for_b2b")
    if flags.get("passive_aggressive_markers"):
        score -= 0.5
        reasons.append("passive_aggressive_markers")
    # Regex-based hostile/passive-aggressive detection.
    # FIX: these pattern lists were scanned twice (here and again in the
    # audience adjustment below); each is now evaluated exactly once.
    hostile_hit = any(re.search(p, text) for p in HOSTILE)
    passive_hit = any(re.search(p, text) for p in PASSIVE_AGGRESSIVE)
    if hostile_hit:
        score -= 3.0
        reasons.append("hostile_language")
    if passive_hit:
        score -= 1.0
        reasons.append("passive_aggressive_phrasing")
    # Additional rule: polite markers bonus (capped at +0.75)
    polite_count = len(re.findall(r"(?i)\b(please|thank you|thanks|appreciate)\b", text))
    if polite_count > 0:
        score += min(0.25 * polite_count, 0.75)
        reasons.append(f"polite_markers={polite_count}")
    # Audience-aware adjustment: family-style mail is exempt; everything else
    # is expected to read professionally, so hostility costs a bit extra.
    lower_body = text.lower()
    is_family_like = any(k in lower_body for k in ("mom", "dad", "brother", "sister", "family", "love you"))
    if not is_family_like:
        if passive_hit:
            score -= 0.5
        if hostile_hit:
            score -= 0.5
    # Prevent saturation when negative markers are present
    if any(t in reasons for t in ("too_aggressive", "overly_casual_for_b2b", "passive_aggressive_phrasing", "hostile_language")):
        score = min(score, 9.0)
    return round(max(0.0, min(10.0, score)), 2), reasons
# ------------------ main API ------------------
def evaluate(subject: str, body: str, engine: str = "openai", weights: Dict[str, float] = None) -> Dict[str, Any]:
    """Run all six metrics over an email and aggregate a weighted total.

    Args:
        subject: email subject line (None coerced to "").
        body: email body text (None coerced to "").
        engine: LLM backend selector passed to every LLM-backed scorer;
            "openai" books token usage under openai_total, anything else
            under claude_total.
        weights: optional per-metric weights; clamped/normalized to sum to
            6.0, defaulting to DEFAULT_WEIGHT_PRESETS["research_defaults"].

    Returns:
        dict with keys: class, scores, weighted_total, explanations,
        comments, usage, meta. Each LLM scorer is wrapped in its own
        try/except so one provider failure degrades only that metric.
    """
    subject, body = subject or "", body or ""
    engine = engine or "openai"
    W = _normalize_weights(weights or DEFAULT_WEIGHT_PRESETS["research_defaults"])
    # class for length
    klass = _infer_class(subject, body)
    # 1) clarity (LLM-based)
    try:
        c_score, c_details = clarity_score(subject, body, engine)
        c_reasons = [f"ask_signals={len(c_details['llm'].get('ask_signals', []))}", f"subject_useful={c_details['llm'].get('subject_useful', False)}", f"intro_clear={c_details['llm'].get('intro_clear', False)}", "source=llm"]
    except Exception as e:
        logging.error(f"Clarity failed: {e}")
        c_score, c_reasons = 0.0, ["llm_failed"]
    # c_details is unbound when clarity_score raised, hence the locals() probe
    c_usage = c_details.get("usage", {}) if 'c_details' in locals() else {}
    # 2) length
    l_score, l_reasons = _score_length(subject, body, klass)
    # 3) spam (LLM counts + heuristics)
    try:
        sc_counts, sc_usage = spam_counts(subject, body, engine=engine)
    except Exception as e:
        logging.error(f"Spam counts failed: {e}")
        sc_counts, sc_usage = {"A":0,"B":0,"C":0}, {}
    s_score, s_reasons = _score_spam(subject, body, llm_counts=sc_counts, html_ratio_bad=False)
    # 4) personalization (LLM cues + deterministic curve)
    try:
        p_flags, p_usage = personalization_flags(subject, body, engine=engine)
        if not isinstance(p_flags, dict): p_flags = {"cues": [], "too_intrusive": False}
    except Exception as e:
        logging.error(f"Personalization flags failed: {e}")
        p_flags, p_usage = {"cues": [], "too_intrusive": False}, {}
    p_score, p_reasons = _score_personalization(subject, body, p_flags.get("cues", []), bool(p_flags.get("too_intrusive", False)))
    # 5) tone (LLM flags + deterministic math)
    try:
        t_flags, t_usage = tone_flags(subject, body, engine=engine)
        if not isinstance(t_flags, dict):
            t_flags = {"too_aggressive": False, "overly_casual_for_b2b": False, "passive_aggressive_markers": []}
    except Exception as e:
        logging.error(f"Tone flags failed: {e}")
        t_flags, t_usage = {"too_aggressive": False, "overly_casual_for_b2b": False, "passive_aggressive_markers": []}, {}
    t_score, t_reasons = _score_tone(subject, body, t_flags)
    # 6) grammar (LLM-based)
    try:
        g_score, g_reasons, g_usage = grammar_score(subject, body, engine)
    except Exception as e:
        logging.error(f"Grammar failed: {e}")
        # neutral-ish fallback (not 0) so a grammar outage doesn't tank totals
        g_score, g_reasons, g_usage = 8.0, ["llm_failed"], {}
    # aggregate
    scores = {
        "clarity": float(round(c_score, 2)),
        "length": float(round(l_score, 2)),
        "spam_score": float(round(s_score, 2)),
        "personalization": float(round(p_score, 2)),
        "tone": float(round(t_score, 2)),
        "grammatical_hygiene": float(round(g_score, 2)),
    }
    denom = sum(W.get(k, 0.0) for k in metric_keys()) or 1.0
    # BUG FIX: use W.get(k, 0.0) in the numerator too — a caller-supplied
    # weights dict that omits a metric key previously raised KeyError here
    # while the denominator tolerated it.
    weighted_total = float(round(max(0.0, min(10.0, sum(W.get(k, 0.0) * scores[k] for k in metric_keys()) / denom)), 2))
    explanations = {
        "clarity": c_reasons,
        "length": l_reasons,
        "spam_score": s_reasons,
        "personalization": p_reasons,
        "tone": t_reasons,
        "grammatical_hygiene": g_reasons,
    }
    # usage (only for LLM-backed features that actually ran)
    def _u(x): return _safe_sum_usage(x)
    usage = {"openai_total": 0, "claude_total": 0, "total": 0}
    if engine == "openai":
        usage["openai_total"] = _u(sc_usage) + _u(p_usage) + _u(t_usage) + _u(c_usage) + _u(g_usage)
    else:
        usage["claude_total"] = _u(sc_usage) + _u(p_usage) + _u(t_usage) + _u(c_usage) + _u(g_usage)
    usage["total"] = usage["openai_total"] + usage["claude_total"]
    # subjective LLM comments
    try:
        comm_data, comm_usage = subjective_comments(subject, body, scores, explanations, engine=engine)
    except Exception as e:
        logging.error(f"Subjective comments failed: {e}")
        comm_data, comm_usage = {}, {}
    if engine == "openai":
        usage["openai_total"] += _u(comm_usage)
    else:
        usage["claude_total"] += _u(comm_usage)
    usage["total"] += _u(comm_usage)
    return {
        "class": klass,
        "scores": scores,
        "weighted_total": weighted_total,
        "explanations": explanations,
        "comments": comm_data,
        "usage": usage,
        "meta": {"engine": engine, "weights": W, "version": "2.3"},
    }