ai-seo-analyzer / optimizer.py
lsdf's picture
Apply local-first accumulation for chunk-level optimizer progress.
f65551f
raw
history blame
65.9 kB
import json
import re
from itertools import combinations
from typing import Any, Dict, List, Optional, Tuple
import requests
import logic
import nlp_processor
import semantic_graph
# Per-language stop-word sets keyed by language code. Lookups in this file
# fall back to the "en" set when the requested code is not present.
# NOTE(review): several entries look ASCII-folded ("fur", "sa", "sao", and the
# repeated "e" in the "it"/"pt" sets), while _tokenize does not strip accents --
# confirm whether the accented forms should be listed too.
STOP_WORDS = {
"en": {"a", "an", "and", "or", "the", "to", "of", "for", "in", "on", "at", "by", "with", "from", "as", "is", "are", "be", "was", "were"},
"ru": {"и", "или", "в", "во", "на", "по", "с", "со", "к", "ко", "для", "из", "за", "что", "это", "как", "а", "но", "у", "о", "от"},
"de": {"und", "oder", "der", "die", "das", "zu", "von", "mit", "fur", "in", "auf", "ist", "sind"},
"es": {"y", "o", "el", "la", "los", "las", "de", "del", "en", "con", "para", "por", "es", "son"},
"it": {"e", "o", "il", "lo", "la", "i", "gli", "le", "di", "del", "in", "con", "per", "da", "e", "sono"},
"pl": {"i", "oraz", "lub", "w", "na", "z", "ze", "do", "od", "po", "dla", "to", "jest", "sa"},
"pt": {"e", "ou", "o", "a", "os", "as", "de", "do", "da", "em", "no", "na", "com", "para", "por", "e", "sao"},
}
# A keyword whose phrase-level BERT score is below this value is counted as "low".
BERT_TARGET_THRESHOLD = 0.7
# Smallest phrase-score gain that _goal_improved accepts as progress for a BERT goal.
BERT_GOAL_DELTA_MIN = 0.005
# Relative tolerance band (15%) the competitor average must exceed in _is_semantic_gap.
SEMANTIC_GAP_TOLERANCE_PCT = 0.15
# Minimum absolute weight difference required by _is_semantic_gap.
SEMANTIC_GAP_MIN_ABS = 3.0
def _tokenize(text: str) -> List[str]:
return [
x
for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split()
if len(x) >= 2
]
def _filter_stopwords(tokens: List[str], language: str) -> List[str]:
    """Return *tokens* without the stop words of *language*.

    Languages missing from ``STOP_WORDS`` use the English stop-word set.
    Token order and duplicates are preserved.
    """
    blocked = STOP_WORDS.get(language)
    if blocked is None:
        blocked = STOP_WORDS["en"]
    kept: List[str] = []
    for token in tokens:
        if token not in blocked:
            kept.append(token)
    return kept
def _split_sentences(text: str) -> List[str]:
text = (text or "").strip()
if not text:
return []
parts = re.split(r"(?<=[\.\!\?])\s+", text)
parts = [p.strip() for p in parts if p.strip()]
if len(parts) <= 1:
parts = [p.strip() for p in re.split(r"\n+", text) if p.strip()]
return parts
def _max_sentences_for_level(cascade_level: int, operation: str) -> int:
if operation == "insert":
return 2
if cascade_level <= 1:
return 2
if cascade_level == 2:
return 3
return 4
def _validate_candidate_text(edited_text: str, cascade_level: int, operation: str) -> List[str]:
    """Run cheap sanity checks on a candidate edit.

    Returns:
        A list of rejection reason codes; an empty list means the candidate
        passed. An empty candidate short-circuits with ``["empty_candidate"]``.
    """
    candidate = (edited_text or "").strip()
    if not candidate:
        return ["empty_candidate"]

    problems: List[str] = []

    limit = _max_sentences_for_level(cascade_level, operation)
    if len(_split_sentences(candidate)) > limit:
        problems.append(f"too_many_sentences>{limit}")

    # The same word or entity repeated back to back (case-insensitive).
    immediate_repeat = re.search(
        r"\b([A-Za-z][A-Za-z0-9-]{1,})\s+\1\b", candidate, flags=re.IGNORECASE
    )
    if immediate_repeat:
        problems.append("duplicated_entity_or_word")

    # A long lowercase run glued to a capitalised word (e.g. "likemBit").
    # Brand-style CamelCase such as "RedDogCasino" starts uppercase and is allowed.
    if re.search(r"\b[a-z]{6,}[A-Z][a-z]+\b", candidate):
        problems.append("suspicious_token_join")

    return problems
def _build_analysis_snapshot(
    target_text: str,
    competitors: List[str],
    keywords: List[str],
    language: str,
    target_title: str,
    competitor_titles: List[str],
) -> Dict[str, Any]:
    """Run the ``logic`` analyses for the target text and bundle the results.

    Returns:
        A dict with keys ``word_counts`` (target, per-competitor and rounded
        competitor averages), ``ngram_stats``, ``bm25_recommendations``,
        ``bert_analysis`` and ``title_analysis``. ``title_analysis`` is an
        empty dict when *target_title* is blank.
    """
    target_counts = logic.count_words(target_text, language)
    competitor_counts = [logic.count_words(doc, language) for doc in competitors]

    # Competitor averages default to 0 when there is nothing to average.
    mean_total = 0
    mean_significant = 0
    if competitor_counts:
        how_many = len(competitor_counts)
        mean_total = sum(entry["total"] for entry in competitor_counts) / how_many
        mean_significant = sum(entry["significant"] for entry in competitor_counts) / how_many

    ngrams = logic.calculate_ngram_stats(target_text, competitors, language)
    phrases, _ = logic.parse_keywords(keywords, language)
    bm25_result = logic.calculate_bm25_recommendations(target_text, competitors, keywords, language)
    bert_result = logic.perform_bert_analysis(target_text, competitors, phrases, language)

    title_result = {}
    if (target_title or "").strip():
        title_result = logic.analyze_title(target_title, competitor_titles, keywords, language)

    word_counts = {
        "target": target_counts,
        "competitors": competitor_counts,
        "avg": {"total": round(mean_total), "significant": round(mean_significant)},
    }
    return {
        "word_counts": word_counts,
        "ngram_stats": ngrams,
        "bm25_recommendations": bm25_result,
        "bert_analysis": bert_result,
        "title_analysis": title_result,
    }
def _build_semantic_snapshot(
    target_text: str,
    competitors: List[str],
    language: str,
) -> Dict[str, Any]:
    """Compare semantic-graph term weights of the target against competitors.

    Blank competitor texts are ignored. For every term seen in any document
    the result holds the target weight, the average competitor weight and
    the number of competitors in which the term has a positive weight.

    Returns:
        ``{"comparison": {"term_power_table": [...], "num_competitors": n}}``.
    """

    def _build_doc(text: str, doc_id: int) -> Dict[str, Any]:
        # Preprocess -> semantic graph -> frontend graph data (used for stats only).
        processed = nlp_processor.preprocess_text(text, language)
        graph, weights = semantic_graph.build_semantic_graph(processed, lang=language)
        frontend = semantic_graph.get_graph_data_for_frontend(graph)
        stats = {
            "nodes": len(frontend.get("nodes", [])),
            "links": len(frontend.get("links", [])),
        }
        return {"id": doc_id, "text": text, "word_weights": weights, "stats": stats}

    target_doc = _build_doc(target_text, 0)
    usable_competitors = [entry for entry in competitors if (entry or "").strip()]
    competitor_docs = [
        _build_doc(entry, position + 1) for position, entry in enumerate(usable_competitors)
    ]
    competitor_total = len(competitor_docs)

    target_weights = target_doc["word_weights"]
    vocabulary = set(target_weights.keys())
    for doc in competitor_docs:
        vocabulary.update(doc["word_weights"].keys())

    rows = []
    for term in vocabulary:
        per_competitor = [int(doc["word_weights"].get(term, 0)) for doc in competitor_docs]
        rows.append(
            {
                "term": term,
                "target_weight": int(target_weights.get(term, 0)),
                # max(1, ...) avoids division by zero when there are no competitors.
                "competitor_avg_weight": round(sum(per_competitor) / max(1, competitor_total), 2),
                "comp_occurrence": sum(1 for weight in per_competitor if weight > 0),
                "comp_total": competitor_total,
            }
        )

    return {"comparison": {"term_power_table": rows, "num_competitors": competitor_total}}
def _is_semantic_gap(target_weight: float, competitor_avg_weight: float) -> bool:
    """Tell whether competitors outweigh the target on a term by a real margin.

    A gap needs both a relative margin (competitor average above the target
    plus the tolerance band) and an absolute margin (at least
    ``SEMANTIC_GAP_MIN_ABS``), so small fluctuations are not reported.
    A non-positive competitor average is never a gap.
    """
    if competitor_avg_weight <= 0:
        return False
    above_band = competitor_avg_weight > target_weight * (1.0 + SEMANTIC_GAP_TOLERANCE_PCT)
    above_floor = (competitor_avg_weight - target_weight) >= SEMANTIC_GAP_MIN_ABS
    return above_band and above_floor
def _compute_metrics(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]:
    """Condense the analysis and semantic snapshots into counters and a 0..100 score.

    Args:
        analysis: Snapshot shaped like the output of ``_build_analysis_snapshot``.
        semantic: Snapshot shaped like the output of ``_build_semantic_snapshot``.
        keywords: Raw keyword phrases; their stop-word-filtered tokens plus
            2- and 3-token windows are the terms checked for semantic gaps.
        language: Language code used for stop-word filtering.

    Returns:
        A dict with the composite ``score``, the individual counters, the
        per-phrase BERT scores and the 20 largest semantic gap terms.
    """
    competitor_count = len(analysis.get("word_counts", {}).get("competitors", []))
    # A term must occur in at least this many competitors to count as a signal.
    min_signal = 1 if competitor_count <= 1 else 2

    bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or []
    # `or 0.0` tolerates an explicit None score, the same way bert_phrase_scores does.
    bert_low = [
        d
        for d in bert_details
        if float(d.get("my_max_score", 0) or 0.0) < BERT_TARGET_THRESHOLD
    ]
    bert_phrase_scores = {
        str(d.get("phrase", "")).strip().lower(): float(d.get("my_max_score", 0) or 0.0)
        for d in bert_details
        if str(d.get("phrase", "")).strip()
    }

    bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"]
    bm25_remove_count = len(bm25_remove)

    # Count n-grams that competitors use and the target lacks (or uses at most half as often).
    ngram_signal_count = 0
    ngrams = analysis.get("ngram_stats", {}) or {}
    for bucket_name in ("unigrams", "bigrams"):
        for item in (ngrams.get(bucket_name) or []):
            comp_occ = int(item.get("comp_occurrence", 0))
            if comp_occ < min_signal:
                continue
            target = float(item.get("target_count", 0))
            comp_avg = float(item.get("competitor_avg", 0))
            ratio_signal = comp_avg > 0 if target == 0 else comp_avg >= target * 2
            if ratio_signal:
                ngram_signal_count += 1

    title_score = None
    title_bert = analysis.get("title_analysis", {}).get("bert", {})
    if title_bert and title_bert.get("target_score") is not None:
        title_score = float(title_bert.get("target_score", 0))

    # Expand keywords into single tokens plus 2- and 3-token windows.
    keyword_terms = set()
    for kw in keywords:
        tokens = _filter_stopwords(_tokenize(kw), language)
        for t in tokens:
            keyword_terms.add(t)
        for n in (2, 3):
            for i in range(0, max(0, len(tokens) - n + 1)):
                keyword_terms.add(" ".join(tokens[i : i + n]))

    table = semantic.get("comparison", {}).get("term_power_table", []) or []
    by_term = {str(r.get("term", "")).lower(): r for r in table}
    semantic_gap_count = 0
    semantic_gap_sum = 0.0
    semantic_gap_terms: List[Dict[str, Any]] = []
    for term in keyword_terms:
        row = by_term.get(term)
        if not row:
            continue
        target_w = float(row.get("target_weight", 0))
        comp_w = float(row.get("competitor_avg_weight", 0))
        gap = comp_w - target_w
        if int(row.get("comp_occurrence", 0)) >= min_signal and _is_semantic_gap(target_w, comp_w):
            semantic_gap_count += 1
            semantic_gap_sum += gap
            # Floor the base at 1.0 so a zero target weight does not divide by zero.
            base = max(1.0, target_w)
            semantic_gap_terms.append(
                {
                    "term": term,
                    "target_weight": round(target_w, 2),
                    "competitor_avg_weight": round(comp_w, 2),
                    "gap": round(gap, 2),
                    "gap_pct_of_target": round(gap / base, 4),
                    "comp_occurrence": int(row.get("comp_occurrence", 0)),
                    "comp_total": int(row.get("comp_total", 0)),
                }
            )

    # Composite score (0..100): each component is in 0..1 and weighted below.
    w_bert, w_bm25, w_ng, w_title, w_sem = 30, 20, 15, 10, 25
    bert_comp = 1.0 - (len(bert_low) / max(1, len(bert_details)))
    # Up to three "remove" recommendations are tolerated without penalty.
    bm25_comp = 1.0 if bm25_remove_count <= 3 else max(0.0, 1.0 - ((bm25_remove_count - 3) / 10.0))
    ng_comp = max(0.0, 1.0 - (ngram_signal_count / 15.0))
    # A missing title score counts as a full component.
    title_comp = 1.0 if title_score is None else min(1.0, max(0.0, title_score / 0.65))
    sem_comp = max(0.0, 1.0 - (semantic_gap_count / 20.0))
    weighted = (
        w_bert * bert_comp
        + w_bm25 * bm25_comp
        + w_ng * ng_comp
        + w_title * title_comp
        + w_sem * sem_comp
    )
    total_w = w_bert + w_bm25 + w_ng + w_title + w_sem
    score = round((weighted / total_w) * 100.0, 2)

    return {
        "score": score,
        "competitor_count": competitor_count,
        "min_competitor_signal": min_signal,
        "bert_low_count": len(bert_low),
        "bert_total_keywords": len(bert_details),
        "bert_phrase_scores": bert_phrase_scores,
        "bm25_remove_count": bm25_remove_count,
        "ngram_signal_count": ngram_signal_count,
        "title_bert_score": title_score,
        "semantic_gap_count": semantic_gap_count,
        "semantic_gap_sum": round(semantic_gap_sum, 4),
        "semantic_gap_terms": sorted(
            semantic_gap_terms,
            key=lambda x: (x.get("gap", 0), x.get("gap_pct_of_target", 0)),
            reverse=True,
        )[:20],
    }
def _choose_optimization_goal(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]:
    """Pick the next optimization goal from the current snapshots.

    Goals are tried in priority order and the first applicable one wins:

    1. ``bert``     - the keyword phrase with the lowest score below the threshold;
    2. ``bm25``     - at least four "remove" recommendations (reduce spam);
    3. ``semantic`` - the keyword term with the largest semantic gap;
    4. ``ngram``    - the first n-gram the target lacks or uses at most half as often;
    5. ``none``     - nothing to do.

    Returns:
        A dict with keys ``type``, ``label``, ``focus_terms`` and ``avoid_terms``.
    """

    def _score(entry: Dict[str, Any]) -> float:
        # `or 0.0` tolerates an explicit None score instead of raising TypeError.
        return float(entry.get("my_max_score", 0) or 0.0)

    bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or []
    low_bert = [x for x in bert_details if _score(x) < BERT_TARGET_THRESHOLD]
    if low_bert:
        # min() returns the first of equally low entries, like a stable sort would.
        worst = min(low_bert, key=_score)
        focus_terms = _filter_stopwords(_tokenize(worst.get("phrase", "")), language)[:4]
        return {"type": "bert", "label": str(worst.get("phrase", "")), "focus_terms": focus_terms, "avoid_terms": []}

    bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"]
    if len(bm25_remove) >= 4:
        most_frequent = sorted(bm25_remove, key=lambda r: int(r.get("count", 0)), reverse=True)[:4]
        spam_terms = [str(x.get("word", "")) for x in most_frequent]
        return {"type": "bm25", "label": "reduce spam", "focus_terms": [], "avoid_terms": spam_terms}

    # Semantic keyword gaps: keyword tokens plus 2- and 3-token windows.
    keyword_terms = set()
    for kw in keywords:
        toks = _filter_stopwords(_tokenize(kw), language)
        keyword_terms.update(toks)
        for n in (2, 3):
            for i in range(0, max(0, len(toks) - n + 1)):
                keyword_terms.add(" ".join(toks[i : i + n]))

    table = semantic.get("comparison", {}).get("term_power_table", []) or []
    candidate_rows: List[Tuple[str, float]] = []
    for row in table:
        term = str(row.get("term", "")).lower()
        if term not in keyword_terms:
            continue
        target_w = float(row.get("target_weight", 0))
        comp_w = float(row.get("competitor_avg_weight", 0))
        # NOTE(review): _compute_metrics also requires comp_occurrence to reach a
        # minimum before counting a gap; that filter is not applied here -- confirm
        # whether goal selection should match it.
        if _is_semantic_gap(target_w, comp_w):
            candidate_rows.append((term, comp_w - target_w))
    if candidate_rows:
        top_term = max(candidate_rows, key=lambda x: x[1])[0]
        return {"type": "semantic", "label": top_term, "focus_terms": [top_term], "avoid_terms": []}

    # Fallback: ngram add signal
    for bucket_name in ("unigrams", "bigrams"):
        bucket = analysis.get("ngram_stats", {}).get(bucket_name, []) or []
        for item in bucket:
            target = float(item.get("target_count", 0))
            comp_avg = float(item.get("competitor_avg", 0))
            if (target == 0 and comp_avg > 0) or (target > 0 and comp_avg >= target * 2):
                ngram = str(item.get("ngram", ""))
                return {"type": "ngram", "label": ngram, "focus_terms": _tokenize(ngram)[:3], "avoid_terms": []}

    return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []}
def _choose_sentence_idx(sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str) -> int:
    """Pick the index of the sentence most worth editing.

    When *avoid_terms* is non-empty, the sentence with the most occurrences
    of those terms wins. Otherwise the sentence with the most occurrences of
    the non-stop-word focus terms wins. Ties go to the earliest sentence.
    With neither kind of term the third sentence is used, or the last one
    when there are fewer than three. An empty sentence list yields 0.
    """
    if not sentences:
        return 0

    stop = STOP_WORDS.get(language, STOP_WORDS["en"])
    focus = [term for term in focus_terms if term and term not in stop]

    def _first_best(needles: List[str]) -> int:
        # Case-insensitive substring counts; index() returns the earliest maximum.
        lowered_needles = [needle.lower() for needle in needles]
        hits = [
            sum(sentence.lower().count(needle) for needle in lowered_needles)
            for sentence in sentences
        ]
        return hits.index(max(hits))

    if avoid_terms:
        return _first_best([term for term in avoid_terms if term])
    if focus:
        return _first_best(focus)
    return min(2, len(sentences) - 1)
def _rank_sentence_indices(
    sentences: List[str],
    focus_terms: List[str],
    avoid_terms: List[str],
    language: str,
    goal_type: str = "",
    goal_label: str = "",
) -> List[int]:
    """Order sentence indices from most to least promising edit target.

    The score rewards goal relevance and occurrences of focus/avoid terms,
    and penalises noise-like sentences and distance from the middle of the
    text. Equal scores are ordered shortest sentence first. For ``bert``
    goals noise-like sentences are left out whenever any other sentence
    exists. An empty sentence list yields ``[0]``.
    """
    if not sentences:
        return [0]

    stop = STOP_WORDS.get(language, STOP_WORDS["en"])
    focus = [term for term in focus_terms if term and term not in stop]
    avoid = [term for term in avoid_terms if term]
    midpoint = (len(sentences) - 1) / 2.0

    # For BERT optimization prefer natural prose chunks over list/menu/noisy blocks.
    pool = list(range(len(sentences)))
    if goal_type == "bert":
        prose_only = [i for i in pool if not _is_noise_like_sentence(sentences[i])]
        if prose_only:
            pool = prose_only

    sort_keys = {}
    for idx in pool:
        sentence = sentences[idx]
        lowered = sentence.lower()
        focus_score = sum(lowered.count(term.lower()) for term in focus)
        avoid_score = sum(lowered.count(term.lower()) for term in avoid)
        chunk_rel = _chunk_goal_relevance(sentence, goal_type, goal_label, focus_terms, language)
        noise_penalty = 1.0 if _is_noise_like_sentence(sentence) else 0.0
        # Prefer semantically relevant and lexical matches; push noisy headers/CTA lower.
        score = (chunk_rel * 4.0) + (focus_score * 3.0) + (avoid_score * 2.0) - (noise_penalty * 3.0) - (abs(idx - midpoint) * 0.05)
        sort_keys[idx] = (score, -len(sentence))

    ranked = sorted(pool, key=lambda i: sort_keys[i], reverse=True)
    return ranked or list(range(len(sentences)))
def _span_variants_for_level(cascade_level: int) -> List[Tuple[str, int, int]]:
# (operation, left_radius, right_radius)
if cascade_level <= 1:
return [("rewrite", 0, 0), ("rewrite", 0, 1), ("rewrite", 1, 0)]
if cascade_level == 2:
return [("rewrite", 1, 1), ("rewrite", 0, 2), ("rewrite", 2, 0), ("rewrite", 1, 2), ("rewrite", 2, 1)]
if cascade_level == 3:
return [("insert", 0, 0), ("insert", 0, 0), ("insert", 0, 0), ("rewrite", 1, 1)]
return [("rewrite", 2, 2), ("rewrite", 1, 3), ("rewrite", 3, 1), ("rewrite", 2, 3), ("rewrite", 3, 2)]
def _choose_edit_span(
    sentences: List[str],
    goal: Dict[str, Any],
    language: str,
    cascade_level: int,
    attempt_cursor: int,
) -> Tuple[str, int, int, int, int]:
    """Map an attempt counter onto a (sentence, span variant) combination.

    Successive cursor values walk through all ranked sentences for the first
    variant, then all sentences for the second variant, and so on, wrapping
    around once every combination has been used.

    Returns:
        ``(operation, span_start, span_end, sent_idx, variant_pick)``. For an
        insert the span is just the picked sentence; for a rewrite it is the
        picked sentence widened by the variant radii and clamped to the text.
    """
    ranked = _rank_sentence_indices(
        sentences,
        goal.get("focus_terms", []) or [],
        goal.get("avoid_terms", []) or [],
        language,
        str(goal.get("type", "") or ""),
        str(goal.get("label", "") or ""),
    )
    variants = _span_variants_for_level(cascade_level)

    combination_count = max(1, len(ranked) * len(variants))
    variant_slot, sentence_slot = divmod(attempt_cursor % combination_count, len(ranked))
    variant_pick = variant_slot % len(variants)
    sent_idx = ranked[sentence_slot]
    operation, left_radius, right_radius = variants[variant_pick]

    if operation == "insert":
        return operation, sent_idx, sent_idx, sent_idx, variant_pick

    span_start = max(0, sent_idx - left_radius)
    span_end = min(len(sentences) - 1, sent_idx + right_radius)
    return operation, span_start, span_end, sent_idx, variant_pick
def _is_noise_like_sentence(text: str) -> bool:
    """Heuristically flag headers, menu items and call-to-action fragments.

    A sentence is noise-like when it is empty, has at most two tokens, is a
    short line containing a typical navigation/CTA word, is a short line of
    only capitals, digits and separators, or is a short "best/top ...
    alternatives/casinos" style phrase.
    """
    stripped = (text or "").strip()
    if not stripped:
        return True

    lowered = stripped.lower()
    token_count = len(_tokenize(lowered))
    if token_count <= 2:
        return True

    has_cta_word = re.search(r"\b(play|explore|best|top|contact|login|signup|casino)\b", lowered)
    if token_count <= 8 and has_cta_word:
        return True

    is_caps_banner = re.fullmatch(r"[A-Z0-9\s\-\|\:\.]+", stripped)
    if len(stripped) <= 90 and is_caps_banner:
        return True

    is_listicle = re.search(r"\b(best|top)\b.{0,20}\b(alternative|alternatives|casino|casinos)\b", lowered)
    return bool(is_listicle and token_count <= 12)
def _chunk_goal_relevance(
    text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
) -> float:
    """Score how relevant a text chunk is to the current goal.

    For ``bert`` goals with a non-blank label, the score is the cosine
    similarity between the label and the chunk embeddings. For any other
    goal, or when embedding scoring fails, the score is the fraction of
    focus terms present in the chunk's stop-word-filtered tokens.

    Multi-word focus terms must match a run of whole tokens, so a term such
    as "art deco" no longer matches inside "smart decor".

    Returns:
        ``0.0`` for an empty chunk, empty token list or no focus terms;
        otherwise the similarity or the overlap fraction.
    """
    chunk = (text or "").strip()
    if not chunk:
        return 0.0

    if goal_type == "bert" and (goal_label or "").strip():
        try:
            model = logic.get_bert_model()
            embeddings = model.encode([goal_label.strip(), chunk], convert_to_tensor=True)
            return float(logic.util.cos_sim(embeddings[0:1], embeddings[1:2])[0][0].item())
        except Exception:
            # Deliberate best-effort: if the model is unavailable or scoring
            # fails, fall through to the lexical overlap below.
            pass

    # Lexical fallback for non-BERT goals or if embedding scoring is unavailable.
    toks = _filter_stopwords(_tokenize(chunk), language)
    if not toks:
        return 0.0
    focus = [f.lower() for f in (focus_terms or []) if f]
    if not focus:
        return 0.0

    token_set = set(toks)
    # Padding with spaces makes phrase containment respect token boundaries.
    padded_tokens = " " + " ".join(toks) + " "
    overlap = 0.0
    for f in focus:
        if " " in f:
            phrase = " ".join(f.split())
            if phrase and f" {phrase} " in padded_tokens:
                overlap += 1.0
        elif f in token_set:
            overlap += 1.0
    return overlap / max(1.0, float(len(focus)))
def _chunk_goal_delta(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
) -> float:
    """Return the change in goal relevance from *before_text* to *after_text*.

    Positive values mean the edited chunk is more relevant to the goal.
    The result is rounded to four decimals.
    """
    relevance_before = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language)
    relevance_after = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language)
    change = relevance_after - relevance_before
    return round(change, 4)
def _min_chunk_delta(goal_type: str) -> float:
if goal_type == "bert":
return 0.01
return 0.05
def _chunk_relevance_pair(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
) -> Tuple[float, float]:
    """Return ``(before, after)`` goal relevance, each rounded to four decimals."""
    relevance_before = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language)
    relevance_after = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language)
    return (round(relevance_before, 4), round(relevance_after, 4))
def _term_diff(before_text: str, after_text: str, language: str) -> Dict[str, List[str]]:
    """List the non-stop-word terms an edit added and removed.

    Returns:
        ``{"added_terms": [...], "removed_terms": [...]}``, each sorted
        alphabetically and capped at 12 entries.
    """
    vocabulary_before = set(_filter_stopwords(_tokenize(before_text), language))
    vocabulary_after = set(_filter_stopwords(_tokenize(after_text), language))
    return {
        "added_terms": sorted(vocabulary_after - vocabulary_before)[:12],
        "removed_terms": sorted(vocabulary_before - vocabulary_after)[:12],
    }
def _edits_conflict(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
if a.get("operation") == "insert" or b.get("operation") == "insert":
return int(a.get("span_start", 0)) == int(b.get("span_start", 0))
a0, a1 = int(a.get("span_start", 0)), int(a.get("span_end", 0))
b0, b1 = int(b.get("span_start", 0)), int(b.get("span_end", 0))
return not (a1 < b0 or b1 < a0)
def _apply_edits_to_sentences(sentences: List[str], edits: List[Dict[str, Any]]) -> List[str]:
    """Apply several edits to a sentence list and return the new list.

    Edits are processed from the highest span to the lowest so that earlier
    indices stay valid while later parts of the list change. Edits whose
    ``edited_text`` is blank are skipped. The input list is not modified.
    """

    def _span_key(edit: Dict[str, Any]) -> Tuple[int, int]:
        return int(edit.get("span_start", 0)), int(edit.get("span_end", 0))

    result = list(sentences)
    for edit in sorted(edits, key=_span_key, reverse=True):
        operation = str(edit.get("operation", "rewrite"))
        first = int(edit.get("span_start", 0))
        last = int(edit.get("span_end", first))
        new_text = str(edit.get("edited_text", "")).strip()
        if not new_text:
            continue
        if operation == "insert":
            result = _insert_after(result, last, new_text)
        else:
            result = _replace_span(result, first, last, new_text)
    return result
def _metrics_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> Dict[str, Any]:
keys = [
"score",
"bert_low_count",
"bm25_remove_count",
"ngram_signal_count",
"semantic_gap_count",
"semantic_gap_sum",
]
out: Dict[str, Any] = {}
for k in keys:
pv = prev_metrics.get(k)
nv = next_metrics.get(k)
if pv is None or nv is None:
continue
out[k] = round(float(nv) - float(pv), 4)
pv_t = prev_metrics.get("title_bert_score")
nv_t = next_metrics.get("title_bert_score")
if pv_t is not None and nv_t is not None:
out["title_bert_score"] = round(float(nv_t) - float(pv_t), 4)
prev_terms = {str(x.get("term", "")).lower() for x in (prev_metrics.get("semantic_gap_terms") or []) if x.get("term")}
next_terms = {str(x.get("term", "")).lower() for x in (next_metrics.get("semantic_gap_terms") or []) if x.get("term")}
if prev_terms or next_terms:
out["semantic_gap_terms_added"] = sorted(list(next_terms - prev_terms))[:12]
out["semantic_gap_terms_removed"] = sorted(list(prev_terms - next_terms))[:12]
return out
def _non_conflicting_edit_combos(candidates: List[Dict[str, Any]], min_size: int = 2, max_size: int = 4) -> List[List[Dict[str, Any]]]:
    """Enumerate groups of candidates whose edits can be applied together.

    Groups contain between ``max(2, min_size)`` and ``max_size`` candidates,
    in their original relative order, smaller groups first. A group is kept
    only when every pair in it has an ``edit_payload`` and the two payloads
    do not conflict.
    """
    if not candidates:
        return []

    def _pair_clashes(first: Dict[str, Any], second: Dict[str, Any]) -> bool:
        payload_a = first.get("edit_payload")
        payload_b = second.get("edit_payload")
        # A missing payload makes the pair unusable, which is treated as a clash.
        if not payload_a or not payload_b:
            return True
        return _edits_conflict(payload_a, payload_b)

    smallest = max(2, min_size)
    largest = min(max_size, len(candidates))
    accepted: List[List[Dict[str, Any]]] = []
    for size in range(smallest, largest + 1):
        for group in combinations(candidates, size):
            if not any(_pair_clashes(x, y) for x, y in combinations(group, 2)):
                accepted.append(list(group))
    return accepted
def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
raw = (text or "").strip()
if not raw:
return None
try:
return json.loads(raw)
except Exception:
pass
m = re.search(r"\{[\s\S]*\}", raw)
if not m:
return None
try:
return json.loads(m.group(0))
except Exception:
return None
def _llm_edit_chunk(
    *,
    api_key: str,
    base_url: str,
    model: str,
    language: str,
    full_text: str,
    chunk_text: str,
    operation: str,
    context_before: str,
    context_after: str,
    cascade_level: int,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    avoid_terms: List[str],
    temperature: float,
) -> Dict[str, Any]:
    """Ask a chat-completions endpoint to rewrite or extend one text chunk.

    Args:
        api_key: Bearer token sent in the ``Authorization`` header.
        base_url: API root; ``/chat/completions`` is appended to it.
        model: Model name placed in the request payload.
        language: Language named in the prompt.
        full_text: Accepted for interface compatibility; not used in the prompt.
        chunk_text: The chunk to rewrite, or to insert a bridge after.
        operation: ``"rewrite"`` or ``"insert"``; anything else means rewrite.
        context_before: Text shown to the model as preceding context.
        context_after: Text shown to the model as following context.
        cascade_level: Shown in the prompt and used for the sentence limit.
        goal_type: Goal type shown in the prompt.
        goal_label: Goal label shown in the prompt.
        focus_terms: Terms the model is asked to strengthen.
        avoid_terms: Terms the model is asked to de-emphasize.
        temperature: Sampling temperature, clamped to ``[0.0, 1.2]``.

    Returns:
        ``{"edited_text": str, "prompt_debug": {...}}``.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        ValueError: The reply held no usable edit. This includes an empty
            ``choices`` list or a null ``message``.
    """
    endpoint = base_url.rstrip("/") + "/chat/completions"
    op = operation if operation in {"rewrite", "insert"} else "rewrite"
    max_sent = _max_sentences_for_level(cascade_level, op)
    # Clamp once so the request and the debug record cannot disagree.
    clamped_temperature = float(max(0.0, min(1.2, temperature)))

    system_msg = (
        "You are an SEO copy editor. Work locally, preserve narrative flow, factual tone, and language. "
        "Return strict JSON only: {\"edited_text\": \"...\"}. "
        "Do not rewrite the whole text. Never change topic or introduce unrelated entities."
    )
    op_instruction = (
        "Rewrite the provided chunk only."
        if op == "rewrite"
        else "Create a short bridge chunk (1-2 sentences) to insert after the chunk."
    )
    user_msg = (
        f"Language: {language}\n"
        f"Operation: {op}\n"
        f"Cascade level: L{cascade_level}\n"
        f"Goal: {goal_type} ({goal_label})\n"
        f"Instruction: {op_instruction}\n"
        "Must preserve overall narrative and style.\n"
        "Text must be grammatically correct and natural for native readers.\n"
        "Keep edits tightly local to the provided chunk and immediate context only.\n"
        "Edit must be substantive (not just synonyms) and should increase relevance to the goal phrase.\n"
        f"Focus terms to strengthen: {', '.join(focus_terms) if focus_terms else '-'}\n"
        f"Terms to de-emphasize/avoid overuse: {', '.join(avoid_terms) if avoid_terms else '-'}\n\n"
        f"Chunk to edit/expand:\n{chunk_text}\n\n"
        f"Prev context:\n{context_before}\n\n"
        f"Next context:\n{context_after}\n\n"
        "Constraints:\n"
        "1) Keep text concise and locally coherent.\n"
        "2) Keep local coherence with surrounding text.\n"
        f"3) Max {max_sent} sentence(s) in edited_text.\n"
        "4) Keep key named entities from the original chunk unchanged when possible.\n"
        "5) For BERT goal, improve semantic match to goal phrase without keyword stuffing.\n"
        "6) Only output JSON object."
    )
    payload = {
        "model": model,
        "temperature": clamped_temperature,
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        "response_format": {"type": "json_object"},
    }
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    data = response.json()

    # Tolerate an empty "choices" list and null "message"/"content" values so a
    # malformed reply ends in the ValueError below, not IndexError/AttributeError.
    choices = data.get("choices") or [{}]
    message = (choices[0] or {}).get("message") or {}
    content = message.get("content") or ""

    parsed = _extract_json_object(content)
    edited = ""
    if parsed:
        # Accept a few alternative key names in the reply.
        edited = str(parsed.get("edited_text") or parsed.get("revised_sentence") or parsed.get("rewrite") or "").strip()
    if not edited:
        raise ValueError("LLM returned invalid JSON edit payload.")

    return {
        "edited_text": edited,
        "prompt_debug": {
            "operation": op,
            "cascade_level": cascade_level,
            "goal_type": goal_type,
            "goal_label": goal_label,
            "focus_terms": focus_terms,
            "avoid_terms": avoid_terms,
            "max_sentences": max_sent,
            "chunk_text": chunk_text,
            "context_before": context_before,
            "context_after": context_after,
            "temperature": clamped_temperature,
        },
    }
def _replace_span(sentences: List[str], start_idx: int, end_idx: int, replacement_text: str) -> List[str]:
    """Return a new list with sentences ``start_idx..end_idx`` (inclusive) replaced.

    The replacement text is split into sentences first; if splitting yields
    nothing, the stripped text is used as a single entry.
    """
    new_pieces = _split_sentences(replacement_text)
    if not new_pieces:
        new_pieces = [replacement_text.strip()]
    head = sentences[:start_idx]
    tail = sentences[end_idx + 1 :]
    return head + new_pieces + tail
def _insert_after(sentences: List[str], after_idx: int, inserted_text: str) -> List[str]:
    """Return a new list with *inserted_text* placed after position *after_idx*.

    The inserted text is split into sentences first; if splitting yields
    nothing, the stripped text is used as a single entry.
    """
    new_pieces = _split_sentences(inserted_text)
    if not new_pieces:
        new_pieces = [inserted_text.strip()]
    cut = after_idx + 1
    return sentences[:cut] + new_pieces + sentences[cut:]
def _goal_improved(
goal_type: str,
goal_label: str,
prev_metrics: Dict[str, Any],
next_metrics: Dict[str, Any],
) -> bool:
if goal_type == "bert":
key = (goal_label or "").strip().lower()
prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
# Accept smaller but real phrase-level gains to accumulate progress toward threshold (e.g. 0.51 -> 0.70).
return (next_phrase - prev_phrase) >= BERT_GOAL_DELTA_MIN or next_metrics["bert_low_count"] < prev_metrics["bert_low_count"]
if goal_type == "bm25":
return next_metrics["bm25_remove_count"] < prev_metrics["bm25_remove_count"]
if goal_type == "semantic":
return next_metrics["semantic_gap_count"] < prev_metrics["semantic_gap_count"]
if goal_type == "ngram":
return next_metrics["ngram_signal_count"] < prev_metrics["ngram_signal_count"]
return next_metrics["score"] > prev_metrics["score"]
def _bert_phrase_delta(goal_label: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> float:
key = (goal_label or "").strip().lower()
prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
return round(next_phrase - prev_phrase, 4)
def _is_candidate_valid(
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
    goal_type: str,
    goal_label: str,
    optimization_mode: str,
) -> Tuple[bool, List[str], bool]:
    """Decide whether a candidate edit may be accepted.

    The mode (``conservative``, ``balanced`` or ``aggressive``; anything
    else means ``balanced``) sets how much the score and the title score may
    drop. Counter regressions are rejected outright, except that aggressive
    mode tolerates an increase of one. Conservative mode also requires the
    goal itself to have improved.

    Returns:
        ``(is_valid, rejection_reasons, goal_improved)``.
    """
    limits_by_mode = {
        "conservative": {"max_score_drop": 0.0, "max_title_drop": 0.02},
        "balanced": {"max_score_drop": 1.0, "max_title_drop": 0.03},
        "aggressive": {"max_score_drop": 2.0, "max_title_drop": 0.05},
    }
    mode = (optimization_mode or "balanced").lower()
    if mode not in limits_by_mode:
        mode = "balanced"
    limits = limits_by_mode[mode]
    slack = 1 if mode == "aggressive" else 0

    problems: List[str] = []

    if float(prev_metrics["score"]) - float(next_metrics["score"]) > limits["max_score_drop"]:
        problems.append(f"score_drop>{limits['max_score_drop']}")

    # Hard regressions in critical counters.
    guarded_counters = (
        ("bm25_remove_count", "bm25_remove_regression"),
        ("bert_low_count", "bert_low_regression"),
        ("semantic_gap_count", "semantic_gap_regression"),
    )
    for counter, reason in guarded_counters:
        if next_metrics[counter] > prev_metrics[counter] + slack:
            problems.append(reason)

    title_before = prev_metrics.get("title_bert_score")
    title_after = next_metrics.get("title_bert_score")
    if title_before is not None and title_after is not None:
        if title_after < (title_before - limits["max_title_drop"]):
            problems.append("title_bert_drop")

    improved = _goal_improved(goal_type, goal_label, prev_metrics, next_metrics)
    # In conservative mode require explicit goal improvement.
    if mode == "conservative" and not improved:
        problems.append("goal_not_improved")

    return (not problems), problems, improved
def _summarize_candidates_for_log(candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Project candidate records onto the compact field set stored in step logs."""
    return [
        {
            "candidate_index": c.get("candidate_index"),
            "valid": c.get("valid", False),
            "goal_improved": c.get("goal_improved", False),
            "bert_phrase_delta": c.get("bert_phrase_delta"),
            "local_chunk_improved": c.get("local_chunk_improved", False),
            "chunk_goal_delta": c.get("chunk_goal_delta"),
            "chunk_relevance_before": c.get("chunk_relevance_before"),
            "chunk_relevance_after": c.get("chunk_relevance_after"),
            "term_diff": c.get("term_diff"),
            "llm_prompt_debug": c.get("llm_prompt_debug"),
            "metrics_delta": c.get("metrics_delta"),
            "invalid_reasons": c.get("invalid_reasons", []),
            "delta_score": c.get("delta_score"),
            "candidate_score": c.get("candidate_score"),
            "sentence_after": c.get("sentence_after"),
            "error": c.get("error"),
        }
        for c in candidates
    ]


def _failed_candidate_record(
    candidate_index: int,
    span_info: Dict[str, Any],
    error: str,
    invalid_reasons: List[str],
    details: Dict[str, Any],
) -> Dict[str, Any]:
    """Build the record for a candidate that was rejected before global scoring."""
    return {
        "candidate_index": candidate_index,
        "error": error,
        "valid": False,
        "goal_improved": False,
        "invalid_reasons": invalid_reasons,
        "delta_score": -999.0,
        "candidate_score": None,
        **details,
        **span_info,
    }


def optimize_text(request_data: Dict[str, Any]) -> Dict[str, Any]:
    """Iteratively rewrite spans of the target text with an LLM and keep edits that pass guardrails.

    Each iteration picks a goal, selects one or more sentence spans, asks the LLM
    for rewrites, re-scores every candidate text and then either
    (1) applies the best candidate that improves the goal or total score,
    (2) applies the best guardrail-valid candidate that only improves the chunk
    locally, (3) applies a batch of queued local improvements, or (4) rejects the
    step and, after two consecutive rejections, escalates the cascade level
    (up to 4).

    Args:
        request_data: Request payload. Keys read here:
            ``target_text``, ``competitors``, ``keywords``, ``language``
            (default ``"en"``), ``target_title``, ``competitor_titles``,
            ``api_key`` (required), ``api_base_url``, ``model``,
            ``max_iterations`` (default 2, clamped to 1..8),
            ``candidates_per_iteration`` (default 2, clamped to 1..5),
            ``temperature`` (default 0.25; an explicit 0 is honoured),
            ``optimization_mode`` (default ``"balanced"``).
            ``None`` values are treated like missing keys.

    Returns:
        Dict with ``ok``, ``optimized_text``, ``baseline_metrics``,
        ``final_metrics``, ``iterations`` (per-step logs), ``applied_changes``
        and ``optimization_mode``.

    Raises:
        ValueError: If ``api_key`` is missing, ``None`` or blank.
    """
    default_base_url = "https://api.deepseek.com/v1"
    default_model = "deepseek-chat"

    # `or <default>` (rather than a .get default) so that an explicit None
    # (JSON null) is not stringified into the literal "None".
    target_text = str(request_data.get("target_text") or "").strip()
    competitors = [str(x) for x in (request_data.get("competitors") or []) if str(x).strip()]
    keywords = [str(x) for x in (request_data.get("keywords") or []) if str(x).strip()]
    language = str(request_data.get("language") or "en").strip() or "en"
    target_title = str(request_data.get("target_title", "") or "")
    competitor_titles = [str(x) for x in (request_data.get("competitor_titles") or [])]
    api_key = str(request_data.get("api_key") or "").strip()
    if not api_key:
        raise ValueError("API key is required.")
    base_url = str(request_data.get("api_base_url") or default_base_url).strip() or default_base_url
    model = str(request_data.get("model") or default_model).strip() or default_model
    max_iterations = int(request_data.get("max_iterations", 2) or 2)
    max_iterations = max(1, min(8, max_iterations))
    candidates_per_iteration = int(request_data.get("candidates_per_iteration", 2) or 2)
    candidates_per_iteration = max(1, min(5, candidates_per_iteration))
    # Only fall back to the default when the value is absent; 0 is a legal temperature.
    raw_temperature = request_data.get("temperature")
    temperature = 0.25 if raw_temperature in (None, "") else float(raw_temperature)
    optimization_mode = str(request_data.get("optimization_mode", "balanced") or "balanced")

    baseline_analysis = _build_analysis_snapshot(
        target_text, competitors, keywords, language, target_title, competitor_titles
    )
    baseline_semantic = _build_semantic_snapshot(target_text, competitors, language)
    baseline_metrics = _compute_metrics(baseline_analysis, baseline_semantic, keywords, language)

    current_text = target_text
    current_analysis = baseline_analysis
    current_semantic = baseline_semantic
    current_metrics = baseline_metrics
    logs: List[Dict[str, Any]] = []
    applied_changes = 0
    seen_candidate_rewrites = set()
    cascade_level = 1
    consecutive_failures = 0
    goal_attempt_cursor: Dict[str, int] = {}
    attempted_spans = set()
    queued_candidates: List[Dict[str, Any]] = []

    for step in range(max_iterations):
        goal = _choose_optimization_goal(current_analysis, current_semantic, keywords, language)
        if goal["type"] == "none":
            logs.append({"step": step + 1, "status": "stopped", "reason": "No optimization goals left."})
            break
        sentences = _split_sentences(current_text)
        if not sentences:
            logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."})
            break

        # Level the candidates of this step are generated at; logged as-is even
        # when the level is reset or escalated at the end of the step.
        step_level = cascade_level
        goal_key = f"{goal.get('type', '')}:{goal.get('label', '')}".strip().lower()
        base_attempt_cursor = int(goal_attempt_cursor.get(goal_key, 0))
        span_trials = 2 if cascade_level <= 2 else 3
        local_candidates = candidates_per_iteration if cascade_level <= 2 else min(6, candidates_per_iteration + 1)
        focus_terms = goal.get("focus_terms", []) or []
        candidates: List[Dict[str, Any]] = []
        chosen_spans: List[Dict[str, Any]] = []
        candidate_idx = 0

        for trial in range(span_trials):
            attempt_cursor = base_attempt_cursor + trial
            operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
                sentences, goal, language, cascade_level, attempt_cursor
            )
            # Advance the cursor until a span not yet tried for this goal/level
            # turns up; after the retry budget the last span is reused.
            max_span_retries = max(1, len(sentences) * 4)
            retries = 0
            while retries < max_span_retries:
                span_key = (goal_key, cascade_level, operation, span_start, span_end)
                if span_key not in attempted_spans:
                    attempted_spans.add(span_key)
                    break
                attempt_cursor += 1
                operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
                    sentences, goal, language, cascade_level, attempt_cursor
                )
                retries += 1

            original_span_text = " ".join(sentences[span_start : span_end + 1]).strip()
            context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip()
            context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip()
            span_info = {
                "operation": operation,
                "span_start": span_start,
                "span_end": span_end,
                "sentence_index": sent_idx,
                "span_variant": span_variant,
                "sentence_before": original_span_text,
            }
            chosen_spans.append(dict(span_info))

            per_span_candidates = max(1, local_candidates // span_trials)
            for _ in range(per_span_candidates):
                candidate_idx += 1
                # Each further candidate is sampled slightly hotter, capped at 1.1.
                temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07))
                try:
                    llm_result = _llm_edit_chunk(
                        api_key=api_key,
                        base_url=base_url,
                        model=model,
                        language=language,
                        full_text=current_text,
                        chunk_text=original_span_text,
                        operation=operation,
                        context_before=context_before,
                        context_after=context_after,
                        cascade_level=cascade_level,
                        goal_type=goal["type"],
                        goal_label=goal["label"],
                        focus_terms=goal["focus_terms"],
                        avoid_terms=goal["avoid_terms"],
                        temperature=temp,
                    )
                    edited_text = str((llm_result or {}).get("edited_text", "")).strip()
                    prompt_debug = (llm_result or {}).get("prompt_debug", {})
                    if not edited_text or edited_text == original_span_text:
                        continue

                    quality_issues = _validate_candidate_text(edited_text, cascade_level, operation)
                    before_rel, after_rel = _chunk_relevance_pair(
                        original_span_text, edited_text, goal["type"], goal["label"], focus_terms, language
                    )
                    chunk_delta = _chunk_goal_delta(
                        original_span_text, edited_text, goal["type"], goal["label"], focus_terms, language
                    )
                    local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"])
                    # Chunk-level fields shared by every record built for this rewrite.
                    local_fields = {
                        "sentence_after": edited_text,
                        "local_chunk_improved": local_chunk_improved,
                        "chunk_goal_delta": chunk_delta,
                        "chunk_relevance_before": before_rel,
                        "chunk_relevance_after": after_rel,
                        "term_diff": _term_diff(original_span_text, edited_text, language),
                        "llm_prompt_debug": prompt_debug,
                    }

                    if quality_issues:
                        candidates.append(
                            _failed_candidate_record(
                                candidate_idx, span_info, "quality_validation_failed", quality_issues, local_fields
                            )
                        )
                        continue

                    candidate_key = (operation, span_start, span_end, edited_text.strip().lower())
                    if candidate_key in seen_candidate_rewrites:
                        candidates.append(
                            _failed_candidate_record(
                                candidate_idx,
                                span_info,
                                "duplicate_candidate_rewrite",
                                ["duplicate_candidate_rewrite"],
                                local_fields,
                            )
                        )
                        continue
                    seen_candidate_rewrites.add(candidate_key)

                    if operation == "insert":
                        candidate_sentences = _insert_after(sentences, span_end, edited_text)
                    else:
                        candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text)
                    candidate_text = " ".join(candidate_sentences).strip()
                    cand_analysis = _build_analysis_snapshot(
                        candidate_text, competitors, keywords, language, target_title, competitor_titles
                    )
                    cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language)
                    cand_metrics = _compute_metrics(cand_analysis, cand_semantic, keywords, language)
                    valid, invalid_reasons, goal_improved = _is_candidate_valid(
                        current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode
                    )
                    delta_score = round(cand_metrics["score"] - current_metrics["score"], 3)
                    bert_phrase_delta = (
                        _bert_phrase_delta(goal["label"], current_metrics, cand_metrics)
                        if goal.get("type") == "bert"
                        else 0.0
                    )
                    candidates.append(
                        {
                            **span_info,
                            **local_fields,
                            "candidate_index": candidate_idx,
                            "text": candidate_text,
                            "analysis": cand_analysis,
                            "semantic": cand_semantic,
                            "metrics": cand_metrics,
                            "valid": valid,
                            "goal_improved": goal_improved,
                            "bert_phrase_delta": bert_phrase_delta,
                            "invalid_reasons": invalid_reasons,
                            "delta_score": delta_score,
                            "candidate_score": cand_metrics.get("score"),
                            "metrics_delta": _metrics_delta(current_metrics, cand_metrics),
                            "edit_payload": {
                                "operation": operation,
                                "span_start": span_start,
                                "span_end": span_end,
                                "edited_text": edited_text,
                            },
                        }
                    )
                except Exception as exc:
                    # Deliberate best-effort: one failed LLM call or scoring run
                    # is recorded as an invalid candidate instead of aborting.
                    candidates.append(
                        _failed_candidate_record(
                            candidate_idx,
                            span_info,
                            str(exc),
                            [str(exc)],
                            {
                                "local_chunk_improved": False,
                                "chunk_goal_delta": -999.0,
                                "llm_prompt_debug": {
                                    "operation": operation,
                                    "cascade_level": cascade_level,
                                    "goal_type": goal.get("type"),
                                    "goal_label": goal.get("label"),
                                },
                            },
                        )
                    )

        goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials
        primary_span = (
            chosen_spans[0]
            if chosen_spans
            else {
                "operation": "-",
                "span_start": 0,
                "span_end": 0,
                "sentence_index": 0,
                "span_variant": 0,
                "sentence_before": "",
            }
        )
        candidate_summaries = _summarize_candidates_for_log(candidates)
        is_bert_goal = goal.get("type") == "bert"

        valid_raw_candidates = [c for c in candidates if c.get("valid")]
        # Promotable: guardrail-valid AND shows some gain (goal, BERT phrase or
        # total score); BERT goals additionally need a phrase or chunk-level gain.
        valid_candidates = [
            c
            for c in valid_raw_candidates
            if (
                c.get("goal_improved")
                or (is_bert_goal and float(c.get("bert_phrase_delta") or 0.0) > 0.0)
                or float(c.get("candidate_score") or -1) > float(current_metrics["score"])
            )
            and (
                not is_bert_goal
                or float(c.get("bert_phrase_delta") or 0.0) > 0.0
                or c.get("local_chunk_improved")
            )
        ]

        if not valid_candidates:
            # Local-first accumulation mode:
            # if we have guardrail-valid candidates that improve chunk relevance,
            # apply the strongest local edit immediately and continue optimizing next chunks.
            local_progress_candidates = [c for c in valid_raw_candidates if c.get("local_chunk_improved")]
            if local_progress_candidates:
                best_local = max(
                    local_progress_candidates,
                    key=lambda c: (
                        float(c.get("chunk_goal_delta") or 0.0),
                        float(c.get("bert_phrase_delta") or 0.0),
                        float(c.get("candidate_score") or -999.0),
                    ),
                )
                prev_metrics = current_metrics
                current_text = best_local["text"]
                current_analysis = best_local["analysis"]
                current_semantic = best_local["semantic"]
                current_metrics = best_local["metrics"]
                applied_changes += 1
                queued_candidates = []
                logs.append(
                    {
                        "step": step + 1,
                        "status": "applied_local_progress",
                        "goal": goal,
                        "cascade_level": step_level,
                        "operation": best_local.get("operation"),
                        "sentence_index": best_local.get("sentence_index"),
                        "span_start": best_local.get("span_start"),
                        "span_end": best_local.get("span_end"),
                        "span_variant": best_local.get("span_variant"),
                        "sentence_before": best_local.get("sentence_before"),
                        "sentence_after": best_local.get("sentence_after"),
                        "reason": "Applied best local-improvement candidate despite no immediate global gain.",
                        "current_score": prev_metrics.get("score"),
                        "metrics_before": prev_metrics,
                        "metrics_after": current_metrics,
                        "delta_score": round(
                            float(current_metrics.get("score", 0)) - float(prev_metrics.get("score", 0)), 3
                        ),
                        "chosen_candidate_index": best_local.get("candidate_index"),
                        "chosen_chunk_goal_delta": best_local.get("chunk_goal_delta"),
                        "chosen_bert_phrase_delta": best_local.get("bert_phrase_delta"),
                        "chosen_metrics_delta": best_local.get("metrics_delta"),
                        "candidates": candidate_summaries,
                    }
                )
                consecutive_failures = 0
                cascade_level = 1
                goal_attempt_cursor[goal_key] = base_attempt_cursor + 1
                continue

            # No single edit is applicable: queue the locally-improving, fully
            # scored candidates and try to apply several of them together.
            local_pool = [
                c
                for c in candidates
                if c.get("local_chunk_improved")
                and c.get("edit_payload")
                and c.get("candidate_score") is not None
            ]
            local_pool.sort(
                key=lambda c: (
                    float(c.get("chunk_goal_delta") or -999.0),
                    float(c.get("candidate_score") or -999.0),
                ),
                reverse=True,
            )
            for c in local_pool[:4]:
                queue_key = (
                    goal_key,
                    c.get("operation"),
                    c.get("span_start"),
                    c.get("span_end"),
                    str((c.get("sentence_after") or "")).strip().lower(),
                )
                if not any(x.get("queue_key") == queue_key for x in queued_candidates):
                    queued_candidates.append({"queue_key": queue_key, "candidate": c})

            batch_applied = False
            batch_info: Dict[str, Any] = {}
            if len(queued_candidates) >= 2:
                pool = [x["candidate"] for x in queued_candidates[:6]]
                combos = _non_conflicting_edit_combos(pool, min_size=2, max_size=4)
                best_batch: Optional[Dict[str, Any]] = None
                prev_metrics = current_metrics
                for combo in combos:
                    edits = [c.get("edit_payload") for c in combo if c.get("edit_payload")]
                    if len(edits) != len(combo):
                        continue
                    batch_sentences = _apply_edits_to_sentences(sentences, edits)
                    batch_text = " ".join(batch_sentences).strip()
                    batch_analysis = _build_analysis_snapshot(
                        batch_text, competitors, keywords, language, target_title, competitor_titles
                    )
                    batch_semantic = _build_semantic_snapshot(batch_text, competitors, language)
                    batch_metrics = _compute_metrics(batch_analysis, batch_semantic, keywords, language)
                    b_valid, b_reasons, b_goal = _is_candidate_valid(
                        current_metrics, batch_metrics, goal["type"], goal["label"], optimization_mode
                    )
                    b_delta = round(batch_metrics["score"] - current_metrics["score"], 3)
                    local_sum = sum(float(c.get("chunk_goal_delta") or 0.0) for c in combo)
                    if not (b_valid and (b_goal or b_delta > 0)):
                        continue
                    if is_bert_goal and local_sum < (_min_chunk_delta("bert") * len(combo)):
                        continue
                    cand = {
                        "combo": combo,
                        "batch_text": batch_text,
                        "batch_analysis": batch_analysis,
                        "batch_semantic": batch_semantic,
                        "batch_metrics": batch_metrics,
                        "b_delta": b_delta,
                        "local_sum": local_sum,
                        "b_reasons": b_reasons,
                        "b_goal": b_goal,
                    }
                    # Prefer larger score gain, then larger summed local gain, then more edits.
                    if best_batch is None or (
                        cand["b_delta"],
                        cand["local_sum"],
                        len(cand["combo"]),
                    ) > (
                        best_batch["b_delta"],
                        best_batch["local_sum"],
                        len(best_batch["combo"]),
                    ):
                        best_batch = cand

                if best_batch:
                    current_text = best_batch["batch_text"]
                    current_analysis = best_batch["batch_analysis"]
                    current_semantic = best_batch["batch_semantic"]
                    current_metrics = best_batch["batch_metrics"]
                    applied_changes += 1
                    batch_applied = True
                    batch_info = {
                        "status": "applied_batch",
                        "batch_candidate_ids": [c.get("candidate_index") for c in best_batch["combo"]],
                        "batch_size": len(best_batch["combo"]),
                        "batch_local_chunk_delta_sum": round(best_batch["local_sum"], 4),
                        "delta_score": best_batch["b_delta"],
                        "metrics_before": prev_metrics,
                        "metrics_after": best_batch["batch_metrics"],
                        "metrics_delta": _metrics_delta(prev_metrics, best_batch["batch_metrics"]),
                    }
                    queued_candidates = []
                    consecutive_failures = 0
                    cascade_level = 1
                    goal_attempt_cursor[goal_key] = 0
                if not batch_applied:
                    batch_info = {
                        "status": "batch_rejected",
                        "reason": "Queued local improvements could not pass global constraints together.",
                    }

            if batch_applied:
                logs.append(
                    {
                        "step": step + 1,
                        "status": "applied_batch",
                        "goal": goal,
                        # Level the edits were generated at, not the reset value.
                        "cascade_level": step_level,
                        "operation": "batch",
                        "sentence_index": primary_span.get("sentence_index"),
                        "span_start": primary_span.get("span_start"),
                        "span_end": primary_span.get("span_end"),
                        "span_variant": primary_span.get("span_variant"),
                        "sentence_before": primary_span.get("sentence_before"),
                        "current_score": (batch_info.get("metrics_before") or {}).get("score"),
                        "metrics_before": batch_info.get("metrics_before"),
                        "metrics_after": current_metrics,
                        "reason": "Applied queued local-improvement edits as a batch.",
                        "batch_info": batch_info,
                        "candidates": candidate_summaries,
                    }
                )
                continue

            logs.append(
                {
                    "step": step + 1,
                    "status": "rejected",
                    "goal": goal,
                    "cascade_level": step_level,
                    "operation": primary_span.get("operation"),
                    "sentence_index": primary_span.get("sentence_index"),
                    "span_start": primary_span.get("span_start"),
                    "span_end": primary_span.get("span_end"),
                    "span_variant": primary_span.get("span_variant"),
                    "sentence_before": primary_span.get("sentence_before"),
                    "current_score": current_metrics["score"],
                    "reason": (
                        "No valid candidate satisfied constraints."
                        if not valid_raw_candidates
                        else "Valid candidates existed but none improved goal or total score."
                    ),
                    "valid_candidates_count": len(valid_raw_candidates),
                    "promotable_candidates_count": len(valid_candidates),
                    "queued_local_candidates": len(local_pool),
                    "queued_total": len(queued_candidates),
                    "batch_info": batch_info if batch_info else None,
                    "candidates": candidate_summaries,
                }
            )
            # Two rejected steps in a row escalate to the next cascade level (max 4).
            consecutive_failures += 1
            if consecutive_failures >= 2 and cascade_level < 4:
                cascade_level += 1
                consecutive_failures = 0
                logs[-1]["escalated_to_level"] = cascade_level
            continue

        best = max(
            valid_candidates,
            key=lambda c: (
                1 if c.get("goal_improved") else 0,
                float(c.get("bert_phrase_delta") or 0.0),
                float(c.get("chunk_goal_delta") or 0.0),
                c["metrics"]["score"],
            ),
        )
        prev_metrics = current_metrics
        current_text = best["text"]
        current_analysis = best["analysis"]
        current_semantic = best["semantic"]
        current_metrics = best["metrics"]
        applied_changes += 1
        queued_candidates = []
        logs.append(
            {
                "step": step + 1,
                "status": "applied",
                "goal": goal,
                "cascade_level": step_level,
                "operation": best.get("operation"),
                "sentence_index": best.get("sentence_index"),
                "span_start": best.get("span_start"),
                "span_end": best.get("span_end"),
                "span_variant": best.get("span_variant"),
                "sentence_before": best.get("sentence_before"),
                "sentence_after": best["sentence_after"],
                "current_score": prev_metrics["score"],
                "metrics_before": prev_metrics,
                "metrics_after": current_metrics,
                "delta_score": round(current_metrics["score"] - prev_metrics["score"], 3),
                "chosen_candidate_index": best.get("candidate_index"),
                "candidates": candidate_summaries,
            }
        )
        # After successful edit, return to cheapest level and reset failure streak.
        consecutive_failures = 0
        cascade_level = 1
        goal_attempt_cursor[goal_key] = 0

    return {
        "ok": True,
        "optimized_text": current_text,
        "baseline_metrics": baseline_metrics,
        "final_metrics": current_metrics,
        "iterations": logs,
        "applied_changes": applied_changes,
        "optimization_mode": optimization_mode,
    }