diff --git "a/optimizer.py" "b/optimizer.py"
--- "a/optimizer.py"
+++ "b/optimizer.py"
@@ -1,6 +1,4 @@
import json
-import difflib
-import html as html_lib
import re
from itertools import combinations
from typing import Any, Dict, List, Optional, Tuple
@@ -24,135 +22,8 @@ STOP_WORDS = {
BERT_TARGET_THRESHOLD = 0.7
BERT_GOAL_DELTA_MIN = 0.005
-TITLE_TARGET_THRESHOLD = 0.65
SEMANTIC_GAP_TOLERANCE_PCT = 0.15
SEMANTIC_GAP_MIN_ABS = 3.0
-STAGE_ORDER = ["bert", "bm25", "ngram", "semantic", "title"]
-NGRAM_ATTEMPTS_PER_TERM = 3
-
-
-def _normalize_stage_name(v: Any) -> str:
- s = str(v or "").strip().lower()
- return s if s in STAGE_ORDER else ""
-
-
-def _goal_label_canonical(goal: Dict[str, Any]) -> str:
- t = str(goal.get("type", "") or "").strip().lower()
- if t == "bm25":
- term = str(goal.get("bm25_word", "") or "").strip().lower()
- if term:
- return term
- label = str(goal.get("label", "") or "").strip().lower()
- if label.startswith("reduce spam:"):
- return label.replace("reduce spam:", "", 1).strip()
- return label
- return str(goal.get("label", "") or "").strip().lower()
-
-
-def _build_custom_goal(stage: str, value: str, language: str) -> Optional[Dict[str, Any]]:
- raw = str(value or "").strip()
- if not raw:
- return None
- if stage == "bert":
- return {
- "type": "bert",
- "label": raw,
- "focus_terms": _filter_stopwords(_tokenize(raw), language)[:6],
- "avoid_terms": [],
- "bert_phrase_score": 0.0,
- "bert_target": float(BERT_TARGET_THRESHOLD),
- }
- if stage == "bm25":
- return {
- "type": "bm25",
- "label": f"reduce spam: {raw}",
- "focus_terms": [],
- "avoid_terms": [raw],
- "bm25_count": 2,
- "bm25_word": raw,
- }
- if stage == "ngram":
- return {
- "type": "ngram",
- "label": raw,
- "focus_terms": [raw],
- "avoid_terms": [],
- "ngram_target_count": 0.0,
- "ngram_comp_avg": 1.0,
- "ngram_tolerance_pct": 0.5,
- "ngram_lower_bound": 0.5,
- "ngram_upper_bound": 1.5,
- "ngram_direction": "increase",
- "ngram_rank_index": 0,
- "ngram_candidates_total": 1,
- }
- if stage == "semantic":
- return {
- "type": "semantic",
- "label": raw,
- "focus_terms": [raw],
- "avoid_terms": [],
- "semantic_gap": float(SEMANTIC_GAP_MIN_ABS),
- }
- if stage == "title":
- return {
- "type": "title",
- "label": "title alignment",
- "focus_terms": _filter_stopwords(_tokenize(raw), language)[:8],
- "avoid_terms": [],
- "title_bert_score": 0.0,
- "title_target": float(TITLE_TARGET_THRESHOLD),
- }
- return None
-
-
-def _apply_stage_goal_override(
- goals: List[Dict[str, Any]],
- stage: str,
- language: str,
- stage_goal_overrides: Optional[Dict[str, Any]],
-) -> List[Dict[str, Any]]:
- ov_all = stage_goal_overrides or {}
- ov = ov_all.get(stage) if isinstance(ov_all, dict) else None
- if not isinstance(ov, dict):
- return goals
-
- mode = str(ov.get("mode", "auto") or "auto").strip().lower()
- if mode not in {"auto", "manual", "mixed"}:
- mode = "auto"
-
- selected_raw = ov.get("selected") or []
- custom_raw = ov.get("custom_add") or []
- selected_set = {
- str(x or "").strip().lower()
- for x in selected_raw
- if str(x or "").strip()
- }
-
- custom_goals: List[Dict[str, Any]] = []
- for item in custom_raw:
- g = _build_custom_goal(stage, str(item or ""), language)
- if g:
- custom_goals.append(g)
-
- if mode == "auto":
- out = list(goals)
- else:
- out = []
- for g in goals:
- if _goal_label_canonical(g) in selected_set:
- out.append(g)
-
- if mode in {"manual", "mixed"}:
- out.extend(custom_goals)
-
- # Deduplicate by canonical goal label to keep deterministic cursor behavior.
- dedup: Dict[str, Dict[str, Any]] = {}
- for g in out:
- key = _goal_label_canonical(g)
- if key and key not in dedup:
- dedup[key] = g
- return list(dedup.values())
def _tokenize(text: str) -> List[str]:
@@ -163,38 +34,6 @@ def _tokenize(text: str) -> List[str]:
]
-def _tokenize_ngrams_strict(text: str) -> List[str]:
- """
- Tokenizer for strict n-gram matching.
- В отличие от _tokenize() здесь НЕ отбрасываем короткие токены (например, "a"),
- чтобы совпадать с теми же n-граммами, которые генерирует логика BM25.
- """
- return [
- x
- for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split()
- if x
- ]
-
-
-def _count_term_ngrams_strict(text: str, term: str) -> int:
- """
- Strict count of exact token n-gram occurrences (overlapping allowed).
- Считает вхождения term как последовательности токенов.
- """
- text_tokens = _tokenize_ngrams_strict(text)
- term_tokens = _tokenize_ngrams_strict(term)
- m = len(term_tokens)
- if m == 0 or not text_tokens or len(text_tokens) < m:
- return 0
-
- cnt = 0
- # Sliding window over token sequence
- for i in range(0, len(text_tokens) - m + 1):
- if text_tokens[i : i + m] == term_tokens:
- cnt += 1
- return cnt
-
-
def _filter_stopwords(tokens: List[str], language: str) -> List[str]:
stop = STOP_WORDS.get(language, STOP_WORDS["en"])
return [t for t in tokens if t not in stop]
@@ -211,69 +50,6 @@ def _split_sentences(text: str) -> List[str]:
return parts
-def _escape_html(v: Any) -> str:
- return html_lib.escape(str(v or ""), quote=True)
-
-
-def _diff_sentences_html(before_text: str, after_text: str) -> Tuple[str, List[Dict[str, str]]]:
- """
- sentence-level diff for UI highlighting.
- Возвращает:
- - html для отображения ТОЛЬКО after_text (optimized),
- - список блоков что было/что стало (для "что именно менять").
- """
- before_sents = _split_sentences(before_text)
- after_sents = _split_sentences(after_text)
-
- matcher = difflib.SequenceMatcher(None, before_sents, after_sents, autojunk=False)
- parts: List[str] = []
- changes: List[Dict[str, str]] = []
-
- for tag, i1, i2, j1, j2 in matcher.get_opcodes():
- if tag == "equal":
- for j in range(j1, j2):
- parts.append(_escape_html(after_sents[j]))
- elif tag == "replace":
- from_txt = " ".join(before_sents[i1:i2]).strip()
- to_txt = " ".join(after_sents[j1:j2]).strip()
- changes.append({"type": "replace", "from": from_txt, "to": to_txt})
- for j in range(j1, j2):
- parts.append(
-                    f'<span class="diff-replace">{_escape_html(after_sents[j])}</span>'
- )
- elif tag == "insert":
- to_txt = " ".join(after_sents[j1:j2]).strip()
- changes.append({"type": "insert", "from": "", "to": to_txt})
- for j in range(j1, j2):
- parts.append(
-                    f'<span class="diff-insert">{_escape_html(after_sents[j])}</span>'
- )
- elif tag == "delete":
- from_txt = " ".join(before_sents[i1:i2]).strip()
- changes.append({"type": "delete", "from": from_txt, "to": ""})
- else:
- # Defensive: should not happen.
- for j in range(j1, j2):
- parts.append(_escape_html(after_sents[j]))
-
- diff_html = " ".join(parts).strip()
- return diff_html, changes
-
-
-def _diff_title_html(before_title: str, after_title: str) -> Tuple[str, List[Dict[str, str]]]:
- before_t = (before_title or "").strip()
- after_t = (after_title or "").strip()
- if before_t == after_t:
- return "", []
- if not after_t:
- # Title removed: show nothing in UI, but keep "from/to" for debug.
- return "", [{"type": "delete", "from": before_t, "to": ""}]
- return (
-        f'<span class="diff-replace">{_escape_html(after_t)}</span>',
- [{"type": "replace", "from": before_t, "to": after_t}],
- )
-
-
def _max_sentences_for_level(cascade_level: int, operation: str) -> int:
if operation == "insert":
return 2
@@ -284,52 +60,7 @@ def _max_sentences_for_level(cascade_level: int, operation: str) -> int:
return 4
-def _phrase_strategy_variants(mode: str, goal_type: str, goal_label: str) -> List[str]:
- normalized = (mode or "auto").strip().lower()
- if normalized == "ensemble":
- if goal_type != "bert":
- return ["auto"]
- phrase_len = len(_tokenize(goal_label or ""))
- if phrase_len >= 3:
- return ["distributed_preferred", "auto", "exact_preferred"]
- return ["auto", "exact_preferred", "distributed_preferred"]
- if normalized in {"auto", "exact_preferred", "distributed_preferred"}:
- return [normalized]
- return ["auto"]
-
-
-def _build_phrase_strategy_plan(mode: str, goal_type: str, goal_label: str, count: int) -> List[str]:
- variants = _phrase_strategy_variants(mode, goal_type, goal_label)
- if count <= 0:
- return []
- return [variants[i % len(variants)] for i in range(count)]
-
-
-def _validate_title_candidate(edited_text: str) -> List[str]:
-    """Guardrails for HTML <title> plain text (not body sentences)."""
- reasons: List[str] = []
- text = (edited_text or "").strip()
- if not text:
- reasons.append("empty_title")
- return reasons
- if "\n" in text or "\r" in text:
- reasons.append("title_multiline")
- if len(text) > 90:
- reasons.append("title_too_long>90")
- if len(text) < 8:
- reasons.append("title_too_short")
- if re.search(r"<[^>]+>", text):
- reasons.append("title_contains_html_tags")
- return reasons
-
-
-def _validate_candidate_text(
- edited_text: str,
- cascade_level: int,
- operation: str,
- goal_label: str = "",
- focus_terms: Optional[List[str]] = None,
-) -> List[str]:
+def _validate_candidate_text(edited_text: str, cascade_level: int, operation: str) -> List[str]:
reasons: List[str] = []
text = (edited_text or "").strip()
if not text:
@@ -348,29 +79,6 @@ def _validate_candidate_text(
if re.search(r"\b[a-z]{6,}[A-Z][a-z]+\b", text):
reasons.append("suspicious_token_join")
- # Anti-stuffing checks for BERT phrase goals.
- focus_terms = focus_terms or []
- phrase = (goal_label or "").strip().lower()
- normalized = re.sub(r"\s+", " ", text.lower())
-
- if phrase:
- phrase_occurrences = normalized.count(phrase)
- phrase_token_count = len(_tokenize(phrase))
- # For long goal phrases, repeated exact matches are usually unnatural.
- if phrase_token_count >= 3 and phrase_occurrences > 1:
- reasons.append("exact_phrase_stuffing")
- elif phrase_occurrences > 2:
- reasons.append("exact_phrase_stuffing")
-
- for term in focus_terms:
- tok = (term or "").strip().lower()
- if not tok:
- continue
- term_occurrences = len(re.findall(rf"\b{re.escape(tok)}\b", normalized))
- if term_occurrences > 3:
- reasons.append("focus_term_overuse")
- break
-
return reasons
@@ -472,177 +180,12 @@ def _is_semantic_gap(target_weight: float, competitor_avg_weight: float) -> bool
return (competitor_avg_weight > rel_threshold) and (abs_gap >= SEMANTIC_GAP_MIN_ABS)
-def _ngram_tolerance_pct(competitor_avg: float) -> float:
- # User rule:
- # - avg >= 4 -> +/-20%
- # - avg < 4 -> +/-50%
- return 0.20 if competitor_avg >= 4.0 else 0.50
-
-
-def _is_ngram_outside_tolerance(target_count: float, competitor_avg: float) -> bool:
- if competitor_avg <= 0:
- return False
- tol = _ngram_tolerance_pct(competitor_avg)
- low = competitor_avg * (1.0 - tol)
- high = competitor_avg * (1.0 + tol)
- return target_count < low or target_count > high
-
-
-def _ngram_deviation_ratio(target_count: float, competitor_avg: float) -> float:
- if competitor_avg <= 0:
- return 0.0
- return abs(target_count - competitor_avg) / max(competitor_avg, 1e-6)
-
-
-def _keyword_unigram_set(keywords: List[str], language: str) -> set:
- out = set()
- for kw in keywords:
- toks = _filter_stopwords(_tokenize(kw), language)
- for t in toks:
- out.add(t)
- return out
-
-
-def _is_ngram_stage_candidate(
- ngram_label: str,
- comp_occurrence: int,
- competitor_count: int,
- keyword_unigrams: set,
-) -> bool:
- ngram = (ngram_label or "").strip().lower()
- if not ngram:
- return False
- tokens = _tokenize(ngram)
- n = len(tokens)
- if competitor_count > 1:
- if comp_occurrence < 2:
- return False
- if n >= 2:
- # For multi-competitor mode, bi/tri-grams with K>=2 are always candidates.
- return True
- # Unigrams are candidates only if they belong to key phrases.
- return n == 1 and tokens[0] in keyword_unigrams
- # Single-competitor mode: keep broader eligibility.
- return comp_occurrence >= 1
-
-
-def _chunk_ngram_count(text: str, ngram_label: str, language: str) -> int:
- toks = _filter_stopwords(_tokenize(text), language)
- phrase_toks = _filter_stopwords(_tokenize(ngram_label), language)
- if not toks or not phrase_toks:
- return 0
- n = len(phrase_toks)
- if n == 1:
- term = phrase_toks[0]
- return sum(1 for t in toks if t == term)
- count = 0
- for i in range(0, max(0, len(toks) - n + 1)):
- if toks[i : i + n] == phrase_toks:
- count += 1
- return count
-
-
-def _build_ngram_stage_rows(
- analysis: Dict[str, Any],
- keywords: List[str],
- language: str,
-) -> List[Tuple[str, float, float, float, int, float]]:
- """
- Eligible underrepresented n-gram targets for the ngram stage.
- Each row: (ngram_label, target_count, comp_avg, tolerance_pct, comp_occurrence, dev_ratio).
-
- Priority (user policy): maximize competitor coverage Freq(K), then Avg(K), then
- how far below the band the target is (larger deviation first).
- Only terms with My=0 or outside the tolerance band below average are included.
- """
- ngram_rows: List[Tuple[str, float, float, float, int, float]] = []
- ngram_stats = analysis.get("ngram_stats", {}) or {}
- competitor_count = len((analysis.get("word_counts", {}) or {}).get("competitors", []) or [])
- keyword_unigrams = _keyword_unigram_set(keywords, language)
- for _bucket_name, bucket in ngram_stats.items():
- if not isinstance(bucket, list):
- continue
- for item in bucket:
- ngram_label = str(item.get("ngram", "")).strip()
- if not ngram_label:
- continue
- target = float(item.get("target_count", 0))
- comp_avg = float(item.get("competitor_avg", 0))
- comp_occ = int(item.get("comp_occurrence", 0))
- if not _is_ngram_stage_candidate(ngram_label, comp_occ, competitor_count, keyword_unigrams):
- continue
- if not _is_ngram_outside_tolerance(target, comp_avg):
- continue
- if target >= comp_avg:
- continue
- tol = _ngram_tolerance_pct(comp_avg)
- dev_ratio = _ngram_deviation_ratio(target, comp_avg)
- ngram_rows.append((ngram_label, target, comp_avg, tol, comp_occ, dev_ratio))
- ngram_rows.sort(key=lambda x: (x[4], x[2], x[5]), reverse=True)
- return ngram_rows
-
-
-def _score_ngram_candidate_window(window_sentences: List[str], goal_label: str, language: str) -> float:
- """Heuristic: good place to add phrase — low local duplication, topical proximity, not boilerplate."""
- chunk = " ".join(s for s in window_sentences if s).strip()
- if not chunk:
- return -1e6
- phrase_count = float(_chunk_ngram_count(chunk, goal_label, language))
- noise_n = sum(1 for s in window_sentences if _is_noise_like_sentence(s))
- noise_frac = noise_n / max(1, len(window_sentences))
- phrase_tokens = [t.lower() for t in _filter_stopwords(_tokenize(goal_label), language) if t]
- chunk_l = chunk.lower()
- unigram_hits = sum(1 for t in phrase_tokens if t and len(t) > 1 and t in chunk_l)
- rel_proxy = unigram_hits / max(1, len(phrase_tokens)) if phrase_tokens else 0.0
- return (
- -3.0 * phrase_count
- + 2.2 * rel_proxy
- - 4.0 * noise_frac
- + min(len(chunk) / 1200.0, 0.35)
- )
-
-
-def _rank_ngram_overlap_sentence_indices(
- sentences: List[str],
- goal_label: str,
- language: str,
-) -> List[int]:
- """
- Slide overlapping multi-sentence windows over the document; each sentence gets the
- best score among windows that contain it. Order sentences by that score (desc).
- """
- n = len(sentences)
- if n <= 0:
- return [0]
- if n == 1:
- return [0]
- # 2–4 sentences per window, stride 1 for strong overlap.
- w = min(4, max(2, n))
- best: List[float] = [-1e9] * n
- for start in range(0, n - w + 1):
- win = sentences[start : start + w]
- sc = _score_ngram_candidate_window(win, goal_label, language)
- for j in range(start, start + w):
- if sc > best[j]:
- best[j] = sc
- center = (n - 1) / 2.0
- scored_idx = [(i, best[i], -abs(i - center)) for i in range(n)]
- scored_idx.sort(key=lambda t: (t[1], t[2]), reverse=True)
- return [t[0] for t in scored_idx]
-
-
-def _compute_metrics(
- analysis: Dict[str, Any],
- semantic: Dict[str, Any],
- keywords: List[str],
- language: str,
- bert_stage_target: float = BERT_TARGET_THRESHOLD,
-) -> Dict[str, Any]:
+def _compute_metrics(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]:
competitor_count = len(analysis.get("word_counts", {}).get("competitors", []))
min_signal = 1 if competitor_count <= 1 else 2
bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or []
- bert_low = [d for d in bert_details if float(d.get("my_max_score", 0)) < float(bert_stage_target)]
+ bert_low = [d for d in bert_details if float(d.get("my_max_score", 0)) < BERT_TARGET_THRESHOLD]
bert_phrase_scores = {
str(d.get("phrase", "")).strip().lower(): float(d.get("my_max_score", 0) or 0.0)
for d in bert_details
@@ -653,22 +196,17 @@ def _compute_metrics(
bm25_remove_count = len(bm25_remove)
ngram_signal_count = 0
- ngram_gap_sum = 0.0
- keyword_unigrams = _keyword_unigram_set(keywords, language)
ngrams = analysis.get("ngram_stats", {}) or {}
- for bucket in ngrams.values():
- if not isinstance(bucket, list):
- continue
- for item in bucket:
+ for bucket_name in ("unigrams", "bigrams"):
+ for item in (ngrams.get(bucket_name) or []):
comp_occ = int(item.get("comp_occurrence", 0))
- ngram_label = str(item.get("ngram", ""))
- if not _is_ngram_stage_candidate(ngram_label, comp_occ, competitor_count, keyword_unigrams):
+ if comp_occ < min_signal:
continue
target = float(item.get("target_count", 0))
comp_avg = float(item.get("competitor_avg", 0))
- if _is_ngram_outside_tolerance(target, comp_avg):
+ ratio_signal = comp_avg > 0 if target == 0 else comp_avg >= target * 2
+ if ratio_signal:
ngram_signal_count += 1
- ngram_gap_sum += _ngram_deviation_ratio(target, comp_avg)
title_score = None
title_bert = analysis.get("title_analysis", {}).get("bert", {})
@@ -730,11 +268,6 @@ def _compute_metrics(
total_w = w_bert + w_bm25 + w_ng + w_title + w_sem
score = round((weighted / total_w) * 100.0, 2)
- resolved_title = ""
- _td = analysis.get("title_analysis") or {}
- if isinstance(_td, dict) and (_td.get("target_title") or "").strip():
- resolved_title = str(_td.get("target_title")).strip()
-
return {
"score": score,
"competitor_count": competitor_count,
@@ -744,9 +277,7 @@ def _compute_metrics(
"bert_phrase_scores": bert_phrase_scores,
"bm25_remove_count": bm25_remove_count,
"ngram_signal_count": ngram_signal_count,
- "ngram_gap_sum": round(ngram_gap_sum, 4),
"title_bert_score": title_score,
- "resolved_title": resolved_title,
"semantic_gap_count": semantic_gap_count,
"semantic_gap_sum": round(semantic_gap_sum, 4),
"semantic_gap_terms": sorted(
@@ -757,76 +288,18 @@ def _compute_metrics(
}
-def _choose_optimization_goal(
- analysis: Dict[str, Any],
- semantic: Dict[str, Any],
- keywords: List[str],
- language: str,
- stage: str = "bert",
- bert_stage_target: float = BERT_TARGET_THRESHOLD,
- stage_cursor: int = 0,
-) -> Dict[str, Any]:
- goals = _collect_optimization_goals(
- analysis=analysis,
- semantic=semantic,
- keywords=keywords,
- language=language,
- stage=stage,
- bert_stage_target=bert_stage_target,
- )
- if not goals:
- return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []}
- pick = max(0, int(stage_cursor))
- if pick >= len(goals):
- return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []}
- return goals[pick]
-
-
-def _collect_optimization_goals(
- analysis: Dict[str, Any],
- semantic: Dict[str, Any],
- keywords: List[str],
- language: str,
- stage: str = "bert",
- bert_stage_target: float = BERT_TARGET_THRESHOLD,
- stage_goal_overrides: Optional[Dict[str, Any]] = None,
-) -> List[Dict[str, Any]]:
- goals: List[Dict[str, Any]] = []
+def _choose_optimization_goal(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]:
bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or []
- low_bert = [x for x in bert_details if float(x.get("my_max_score", 0)) < float(bert_stage_target)]
+ low_bert = [x for x in bert_details if float(x.get("my_max_score", 0)) < BERT_TARGET_THRESHOLD]
if low_bert:
- for row in sorted(low_bert, key=lambda x: float(x.get("my_max_score", 0))):
- phrase = str(row.get("phrase", "")).strip()
- if not phrase:
- continue
- focus_terms = _filter_stopwords(_tokenize(phrase), language)[:4]
- goals.append(
- {
- "type": "bert",
- "label": phrase,
- "focus_terms": focus_terms,
- "avoid_terms": [],
- "bert_phrase_score": float(row.get("my_max_score", 0) or 0.0),
- "bert_target": float(bert_stage_target),
- }
- )
+ worst = sorted(low_bert, key=lambda x: float(x.get("my_max_score", 0)))[0]
+ focus_terms = _filter_stopwords(_tokenize(worst.get("phrase", "")), language)[:4]
+ return {"type": "bert", "label": str(worst.get("phrase", "")), "focus_terms": focus_terms, "avoid_terms": []}
bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"]
if len(bm25_remove) >= 4:
- for row in sorted(bm25_remove, key=lambda r: int(r.get("count", 0)), reverse=True)[:8]:
- word = str(row.get("word", "")).strip()
- if not word:
- continue
- goals.append(
- {
- "type": "bm25",
- "label": f"reduce spam: {word}",
- "focus_terms": [],
- "avoid_terms": [word],
- "bm25_count": int(row.get("count", 0) or 0),
- "bm25_word": word,
- }
- )
+ spam_terms = [str(x.get("word", "")) for x in sorted(bm25_remove, key=lambda r: int(r.get("count", 0)), reverse=True)[:4]]
+ return {"type": "bm25", "label": "reduce spam", "focus_terms": [], "avoid_terms": spam_terms}
# Semantic keyword gaps
lang_stop = STOP_WORDS.get(language, STOP_WORDS["en"])
@@ -849,133 +322,19 @@ def _collect_optimization_goals(
if _is_semantic_gap(target_w, comp_w):
candidate_rows.append((term, gap))
if candidate_rows:
- for term, gap in sorted(candidate_rows, key=lambda x: x[1], reverse=True)[:12]:
- goals.append(
- {
- "type": "semantic",
- "label": term,
- "focus_terms": [term],
- "avoid_terms": [],
- "semantic_gap": float(gap),
- }
- )
-
- # N-gram balancing (toward competitor average with tolerance policy).
- ngram_rows = _build_ngram_stage_rows(analysis, keywords, language)
- if ngram_rows:
- for rank, (label, target, comp_avg, tol, _, _) in enumerate(ngram_rows):
- goals.append({
- "type": "ngram",
- "label": label,
- "focus_terms": [label],
- "avoid_terms": [],
- "ngram_target_count": target,
- "ngram_comp_avg": comp_avg,
- "ngram_tolerance_pct": tol,
- "ngram_lower_bound": round(comp_avg * (1.0 - tol), 3),
- "ngram_upper_bound": round(comp_avg * (1.0 + tol), 3),
- "ngram_direction": "increase" if target < comp_avg else "decrease",
- "ngram_rank_index": rank,
- "ngram_candidates_total": len(ngram_rows),
- })
-
- title_bert = analysis.get("title_analysis", {}).get("bert", {}) or {}
- title_target_score = title_bert.get("target_score")
- if (
- keywords
- and title_target_score is not None
- and float(title_target_score) < TITLE_TARGET_THRESHOLD
- ):
- goals.append(
- {
- "type": "title",
- "label": "title alignment",
- "focus_terms": _filter_stopwords(_tokenize(" ".join(keywords[:8])), language)[:8],
- "avoid_terms": [],
- "title_bert_score": float(title_target_score) if title_target_score is not None else None,
- "title_target": float(TITLE_TARGET_THRESHOLD),
- }
- )
-
- stage_goals = [g for g in goals if g.get("type") == stage]
- return _apply_stage_goal_override(stage_goals, stage, language, stage_goal_overrides)
-
-
-def _per_goal_budget(
- goal: Dict[str, Any],
- max_iterations: int,
- candidates_per_iteration: int,
- bert_stage_target: float,
-) -> Tuple[int, int]:
- """
- Scale per-goal iteration and candidate budgets by how far the metric is from its target.
- Returns (effective_max_iterations_for_this_goal, effective_candidates_per_iteration).
- """
- t = str(goal.get("type", "") or "")
- raw = 0.0
-
- if t == "bert":
- sc = float(goal.get("bert_phrase_score", 0.0) or 0.0)
- tgt = float(goal.get("bert_target", bert_stage_target) or bert_stage_target)
- raw = max(0.0, (tgt - sc) / max(tgt, 1e-6))
- elif t == "ngram":
- ca = float(goal.get("ngram_comp_avg", 0.0) or 0.0)
- tc = float(goal.get("ngram_target_count", 0.0) or 0.0)
- if str(goal.get("ngram_direction", "increase")) == "increase":
- need = max(0.0, ca - tc)
- raw = min(1.0, need / max(ca, 1e-6))
- else:
- need = max(0.0, tc - ca)
- raw = min(1.0, need / max(tc, 1e-6))
- elif t == "semantic":
- gap = float(goal.get("semantic_gap", 0.0) or 0.0)
- raw = min(1.0, gap / max(SEMANTIC_GAP_MIN_ABS * 4.0, 1e-6))
- elif t == "bm25":
- c = int(goal.get("bm25_count", 0) or 0)
- raw = min(1.0, max(0, c - 1) / 8.0)
- elif t == "title":
- ts = goal.get("title_bert_score")
- if ts is None:
- raw = 0.5
- else:
- tgt = float(goal.get("title_target", TITLE_TARGET_THRESHOLD) or TITLE_TARGET_THRESHOLD)
- raw = max(0.0, (tgt - float(ts)) / max(tgt, 1e-6))
- else:
- raw = 0.0
-
- iter_mult = 1.0 + 2.0 * min(1.0, raw)
- cand_mult = 1.0 + 1.0 * min(1.0, raw)
- eff_iter = max(1, min(int(round(max_iterations * iter_mult)), max_iterations * 3))
- eff_cand = max(1, min(int(round(candidates_per_iteration * cand_mult)), 5))
- return eff_iter, eff_cand
+ top_term = sorted(candidate_rows, key=lambda x: x[1], reverse=True)[0][0]
+ return {"type": "semantic", "label": top_term, "focus_terms": [top_term], "avoid_terms": []}
+ # Fallback: ngram add signal
+ for bucket_name in ("unigrams", "bigrams"):
+ bucket = analysis.get("ngram_stats", {}).get(bucket_name, []) or []
+ for item in bucket:
+ target = float(item.get("target_count", 0))
+ comp_avg = float(item.get("competitor_avg", 0))
+ if (target == 0 and comp_avg > 0) or (target > 0 and comp_avg >= target * 2):
+ return {"type": "ngram", "label": str(item.get("ngram", "")), "focus_terms": _tokenize(str(item.get("ngram", "")))[:3], "avoid_terms": []}
-def _estimate_total_loop_budget(
- analysis: Dict[str, Any],
- semantic: Dict[str, Any],
- keywords: List[str],
- language: str,
- max_iterations: int,
- candidates_per_iteration: int,
- bert_stage_target: float,
- active_stage_order: Optional[List[str]] = None,
- stage_goal_overrides: Optional[Dict[str, Any]] = None,
-) -> int:
- total = 0
- stages = active_stage_order or list(STAGE_ORDER)
- for st in stages:
- for g in _collect_optimization_goals(
- analysis,
- semantic,
- keywords,
- language,
- stage=st,
- bert_stage_target=bert_stage_target,
- stage_goal_overrides=stage_goal_overrides,
- ):
- ei, _ = _per_goal_budget(g, max_iterations, candidates_per_iteration, bert_stage_target)
- total += ei
- return min(480, max(1, total))
+ return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []}
def _choose_sentence_idx(sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str) -> int:
@@ -1017,15 +376,9 @@ def _rank_sentence_indices(
return [0]
stop = STOP_WORDS.get(language, STOP_WORDS["en"])
focus = [x for x in focus_terms if x and x not in stop]
- goal_phrase = (goal_label or "").strip().lower()
avoid = [x for x in avoid_terms if x]
center = (len(sentences) - 1) / 2.0
- # N-gram stage: overlapping sentence windows — pick spans where insertion is natural
- # while document-level phrase count remains the primary optimization signal.
- if goal_type == "ngram" and (goal_label or "").strip():
- return _rank_ngram_overlap_sentence_indices(sentences, str(goal_label).strip(), language)
-
# For BERT optimization prefer natural prose chunks over list/menu/noisy blocks.
candidate_indices = list(range(len(sentences)))
if goal_type == "bert":
@@ -1037,43 +390,12 @@ def _rank_sentence_indices(
for idx in candidate_indices:
s = sentences[idx]
lower = s.lower()
- # For bm25 "remove" goals we must select spans that actually contain the exact n-gram term.
- # Otherwise the LLM can "optimize around" the goal without decreasing the real BM25 remove metric.
- if goal_type == "bm25" and avoid and not focus_terms:
- bm25_term = avoid[0]
- bm25_count = _count_term_ngrams_strict(s, bm25_term)
- focus_score = 0
- avoid_score = float(bm25_count)
- chunk_rel = float(bm25_count)
- else:
- focus_score = sum(lower.count(t.lower()) for t in focus)
- avoid_score = sum(lower.count(t.lower()) for t in avoid)
- chunk_rel = _chunk_goal_relevance(s, goal_type, goal_label, focus_terms, language)
+ focus_score = sum(lower.count(t.lower()) for t in focus)
+ avoid_score = sum(lower.count(t.lower()) for t in avoid)
+ chunk_rel = _chunk_goal_relevance(s, goal_type, goal_label, focus_terms, language)
noise_penalty = 1.0 if _is_noise_like_sentence(s) else 0.0
-
- # For BERT goals, do not over-focus on existing occurrences only:
- # prioritize semantically relevant chunks where phrase/terms may still be underrepresented.
- if goal_type == "bert":
- tokenized = _filter_stopwords(_tokenize(lower), language)
- token_set = set(tokenized)
- core_terms = [t.lower() for t in focus if t]
- core_hits = sum(1 for t in core_terms if t in token_set)
- coverage = (core_hits / max(1, len(core_terms))) if core_terms else 0.0
- phrase_present = 1.0 if (goal_phrase and goal_phrase in lower) else 0.0
- # Boost candidates where semantic context is relevant but explicit core terms are not saturated yet.
- missing_term_boost = (1.0 - coverage) * 1.4 if chunk_rel >= 0.18 else 0.0
- phrase_absent_boost = 0.35 if (goal_phrase and not phrase_present and chunk_rel >= 0.2) else 0.0
- score = (
- (chunk_rel * 5.0)
- + (focus_score * 1.2)
- + (missing_term_boost + phrase_absent_boost)
- + (avoid_score * 1.5)
- - (noise_penalty * 3.0)
- - (abs(idx - center) * 0.05)
- )
- else:
- # Prefer semantically relevant and lexical matches; push noisy headers/CTA lower.
- score = (chunk_rel * 4.0) + (focus_score * 3.0) + (avoid_score * 2.0) - (noise_penalty * 3.0) - (abs(idx - center) * 0.05)
+ # Prefer semantically relevant and lexical matches; push noisy headers/CTA lower.
+ score = (chunk_rel * 4.0) + (focus_score * 3.0) + (avoid_score * 2.0) - (noise_penalty * 3.0) - (abs(idx - center) * 0.05)
scored.append((idx, score, len(s)))
scored.sort(key=lambda x: (x[1], -x[2]), reverse=True)
@@ -1152,7 +474,6 @@ def _chunk_goal_relevance(
goal_label: str,
focus_terms: List[str],
language: str,
- goal_meta: Optional[Dict[str, Any]] = None,
) -> float:
chunk = (text or "").strip()
if not chunk:
@@ -1164,8 +485,6 @@ def _chunk_goal_relevance(
return float(logic.util.cos_sim(embeddings[0:1], embeddings[1:2])[0][0].item())
except Exception:
pass
- if goal_type == "ngram" and (goal_label or "").strip():
- return float(_chunk_ngram_count(chunk, goal_label, language))
# Lexical fallback for non-BERT goals or if embedding scoring is unavailable.
toks = _filter_stopwords(_tokenize(chunk), language)
@@ -1191,54 +510,15 @@ def _chunk_goal_delta(
goal_label: str,
focus_terms: List[str],
language: str,
- goal_meta: Optional[Dict[str, Any]] = None,
) -> float:
- # BM25 "remove spam" must be judged by strict decrease of the exact n-gram term count.
- # Иначе LLM может улучшать семантику рядом, но не уменьшать реальный BM25 remove metric.
- if goal_type == "bm25":
- term = None
- avoid = []
- focus = []
- if isinstance(goal_meta, dict):
- term = goal_meta.get("bm25_word")
- avoid = goal_meta.get("avoid_terms") or []
- focus = goal_meta.get("focus_terms") or []
- # Fallback: try to infer from avoid_terms
- if not term and avoid:
- term = avoid[0]
- if not term:
- return 0.0
-
- before_count = _count_term_ngrams_strict(before_text or "", str(term))
- after_count = _count_term_ngrams_strict(after_text or "", str(term))
-
- # For now bm25 goals are "remove" (focus_terms is empty, avoid_terms contains term).
- # Keep the sign direction explicit so add-mode can be introduced later safely.
- if avoid and not focus:
- return round(float(before_count - after_count), 4)
- # If ever we add bm25 "add" goals with focus terms, then increase is good.
- return round(float(after_count - before_count), 4)
-
- before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language, goal_meta)
- after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language, goal_meta)
- if goal_type == "ngram":
- target_avg = float((goal_meta or {}).get("ngram_comp_avg", 0.0))
- if target_avg > 0:
- # Positive delta means closer to competitor average regardless of direction.
- before_dist = abs(before_rel - target_avg)
- after_dist = abs(after_rel - target_avg)
- return round(before_dist - after_dist, 4)
+ before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language)
+ after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language)
return round(after_rel - before_rel, 4)
def _min_chunk_delta(goal_type: str) -> float:
if goal_type == "bert":
return 0.01
- if goal_type == "title":
- return 0.005
- if goal_type == "ngram":
- # Require at least one occurrence-equivalent movement toward target zone.
- return 0.5
return 0.05
@@ -1249,10 +529,9 @@ def _chunk_relevance_pair(
goal_label: str,
focus_terms: List[str],
language: str,
- goal_meta: Optional[Dict[str, Any]] = None,
) -> Tuple[float, float]:
- before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language, goal_meta)
- after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language, goal_meta)
+ before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language)
+ after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language)
return round(before_rel, 4), round(after_rel, 4)
@@ -1267,8 +546,6 @@ def _term_diff(before_text: str, after_text: str, language: str) -> Dict[str, Li
def _edits_conflict(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
- if a.get("operation") == "title_rewrite" or b.get("operation") == "title_rewrite":
- return True
if a.get("operation") == "insert" or b.get("operation") == "insert":
return int(a.get("span_start", 0)) == int(b.get("span_start", 0))
a0, a1 = int(a.get("span_start", 0)), int(a.get("span_end", 0))
@@ -1303,7 +580,6 @@ def _metrics_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -
"bert_low_count",
"bm25_remove_count",
"ngram_signal_count",
- "ngram_gap_sum",
"semantic_gap_count",
"semantic_gap_sum",
]
@@ -1386,21 +662,12 @@ def _llm_edit_chunk(
focus_terms: List[str],
avoid_terms: List[str],
temperature: float,
- phrase_strategy_mode: str = "auto",
) -> Dict[str, Any]:
- # Title optimization must never touch body text; see optimize_text branch for goal.type == "title".
- if str(goal_type or "").strip().lower() == "title":
- raise ValueError(
- "Internal error: goal_type 'title' must be handled via _llm_edit_title(), not _llm_edit_chunk(). "
- "If you see this on a server, deploy the current optimizer (title-only LLM path)."
- )
endpoint = base_url.rstrip("/") + "/chat/completions"
op = operation if operation in {"rewrite", "insert"} else "rewrite"
system_msg = (
- "You are a semantic-vector optimizer for SEO tasks. "
- "Your task is to improve chunk relevance to the focus terms/goal phrase with minimal local edits. "
- "Preserve narrative flow, factual tone, and language. "
- "Return strict JSON only: {\"edited_text\": \"...\", \"rationale\": \"...\"}. "
+ "You are an SEO copy editor. Work locally, preserve narrative flow, factual tone, and language. "
+ "Return strict JSON only: {\"edited_text\": \"...\"}. "
"Do not rewrite the whole text. Never change topic or introduce unrelated entities."
)
op_instruction = (
@@ -1409,45 +676,16 @@ def _llm_edit_chunk(
else "Create a short bridge chunk (1-2 sentences) to insert after the chunk."
)
max_sent = _max_sentences_for_level(cascade_level, op)
- phrase_tokens = _filter_stopwords(_tokenize(goal_label or ""), language)
- phrase_len = len(phrase_tokens)
- strategy_mode = (phrase_strategy_mode or "auto").strip().lower()
- if strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}:
- strategy_mode = "auto"
- if strategy_mode == "exact_preferred":
- phrase_strategy = (
- "Prefer one natural exact phrase mention when grammatically correct; otherwise use distributed core-term coverage."
- )
- elif strategy_mode == "distributed_preferred":
- phrase_strategy = (
- "Prefer distributed semantic coverage: spread core terms/lemmas naturally and avoid exact phrase unless absolutely natural."
- )
- elif phrase_len >= 3:
- phrase_strategy = (
- "Prefer distributed semantic coverage for long phrases: naturally spread core terms/lemmas across the local paragraph. "
- "Use exact phrase only if it is grammatically natural."
- )
- elif phrase_len == 2:
- phrase_strategy = (
- "For two-term goals, use either one natural exact phrase or distributed use of both terms without repetition."
- )
- else:
- phrase_strategy = (
- "For single-term goals, improve relevance using natural lexical variants and nearby semantic anchors."
- )
user_msg = (
f"Language: {language}\n"
f"Operation: {op}\n"
f"Cascade level: L{cascade_level}\n"
f"Goal: {goal_type} ({goal_label})\n"
- f"Goal token count (without stopwords): {phrase_len}\n"
f"Instruction: {op_instruction}\n"
f"Must preserve overall narrative and style.\n"
"Text must be grammatically correct and natural for native readers.\n"
"Keep edits tightly local to the provided chunk and immediate context only.\n"
"Edit must be substantive (not just synonyms) and should increase relevance to the goal phrase.\n"
- "Do not change the sentence subject/entity focus unless absolutely required by grammar.\n"
- f"Phrase strategy: {phrase_strategy}\n"
f"Focus terms to strengthen: {', '.join(focus_terms) if focus_terms else '-'}\n"
f"Terms to de-emphasize/avoid overuse: {', '.join(avoid_terms) if avoid_terms else '-'}\n\n"
f"Chunk to edit/expand:\n{chunk_text}\n\n"
@@ -1458,13 +696,8 @@ def _llm_edit_chunk(
"2) Keep local coherence with surrounding text.\n"
f"3) Max {max_sent} sentence(s) in edited_text.\n"
"4) Keep key named entities from the original chunk unchanged when possible.\n"
- "5) For BERT goals, prioritize semantic alignment over exact phrase repetition.\n"
- "6) If exact phrase sounds unnatural, do NOT force it; use grammatically correct distributed wording.\n"
- "7) Exact phrase may appear at most once, and only when it reads naturally.\n"
- "8) Avoid repeating the same focus term more than needed; no stuffing.\n"
- "9) For rewrite: preserve original meaning sentence-by-sentence while improving relevance.\n"
- "10) Provide rationale in one short sentence.\n"
- "11) Only output JSON object."
+ "5) For BERT goal, improve semantic match to goal phrase without keyword stuffing.\n"
+ "6) Only output JSON object."
)
payload = {
"model": model,
@@ -1486,15 +719,12 @@ def _llm_edit_chunk(
)
parsed = _extract_json_object(content)
edited = ""
- rationale = ""
if parsed:
edited = str(parsed.get("edited_text") or parsed.get("revised_sentence") or parsed.get("rewrite") or "").strip()
- rationale = str(parsed.get("rationale") or parsed.get("why") or "").strip()
if not edited:
raise ValueError("LLM returned invalid JSON edit payload.")
return {
"edited_text": edited,
- "rationale": rationale,
"prompt_debug": {
"operation": op,
"cascade_level": cascade_level,
@@ -1502,9 +732,6 @@ def _llm_edit_chunk(
"goal_label": goal_label,
"focus_terms": focus_terms,
"avoid_terms": avoid_terms,
- "phrase_strategy_mode": strategy_mode,
- "goal_token_count": phrase_len,
- "phrase_strategy": phrase_strategy,
"max_sentences": max_sent,
"chunk_text": chunk_text,
"context_before": context_before,
@@ -1514,117 +741,6 @@ def _llm_edit_chunk(
}
-def _llm_edit_title(
- *,
- api_key: str,
- base_url: str,
- model: str,
- language: str,
- current_title: str,
- body_excerpt: str,
- competitor_title_hint: str,
- focus_terms: List[str],
- avoid_terms: List[str],
- keywords: List[str],
- cascade_level: int,
- temperature: float,
- phrase_strategy_mode: str = "auto",
-) -> Dict[str, Any]:
- """
- Rewrite only the page plain text. Backend metric: mean BERT cos-sim(title, each keyword).
- """
- endpoint = base_url.rstrip("/") + "/chat/completions"
- strategy_mode = (phrase_strategy_mode or "auto").strip().lower()
- if strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}:
- strategy_mode = "auto"
- phrase_tokens = _filter_stopwords(_tokenize(" ".join(keywords[:6])), language)
- phrase_len = len(phrase_tokens)
- if strategy_mode == "exact_preferred":
- phrase_strategy = (
- "Prefer one natural exact phrase from the keyword list when it fits; otherwise distribute core terms."
- )
- elif strategy_mode == "distributed_preferred":
- phrase_strategy = "Prefer natural distribution of core terms; avoid stiff exact-match phrasing."
- elif phrase_len >= 3:
- phrase_strategy = (
- "Spread core lemmas across the title; exact multi-word match only if it reads like a real title."
- )
- elif phrase_len == 2:
- phrase_strategy = "Use both core ideas in one short title line without repetition."
- else:
- phrase_strategy = "Keep the title specific and aligned with the keyword theme using natural wording."
-
- system_msg = (
- "You optimize HTML tag text for SEO. "
- "Output JSON only: {\"edited_text\": \"...\", \"rationale\": \"...\"}. "
- "edited_text must be the new title line ONLY: plain text, no tags, no quotes wrapping the whole title. "
- "Do not change the article body. Preserve the primary brand/site name from the current title when present."
- )
- kw_line = ", ".join(k.strip() for k in (keywords or [])[:24] if k.strip()) or "-"
- user_msg = (
- f"Language: {language}\n"
- f"Cascade level: L{cascade_level}\n"
- f"Task: Improve semantic alignment of the TITLE with these keywords (average embedding similarity should rise).\n"
- f"Keywords (priority): {kw_line}\n"
- f"Focus terms: {', '.join(focus_terms) if focus_terms else '-'}\n"
- f"De-emphasize: {', '.join(avoid_terms) if avoid_terms else '-'}\n"
- f"Phrase strategy: {phrase_strategy}\n\n"
- f"Current title:\n{current_title.strip()}\n\n"
- f"Page content excerpt (context only; do not paste into title):\n{(body_excerpt or '')[:900]}\n\n"
- f"{competitor_title_hint}\n\n"
- "Constraints:\n"
- "1) Length about 35–60 characters when possible; hard max 88 characters.\n"
- "2) One line; no line breaks; title case or sentence case per language norms.\n"
- "3) Natural human phrasing; no keyword stuffing; each important term at most once unless grammar needs it.\n"
- "4) Do not add unrelated entities or claims.\n"
- "5) Rationale: one short sentence.\n"
- )
- payload = {
- "model": model,
- "temperature": float(max(0.0, min(1.2, temperature))),
- "messages": [
- {"role": "system", "content": system_msg},
- {"role": "user", "content": user_msg},
- ],
- "response_format": {"type": "json_object"},
- }
- headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
- response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
- response.raise_for_status()
- data = response.json()
- content = (
- data.get("choices", [{}])[0]
- .get("message", {})
- .get("content", "")
- )
- parsed = _extract_json_object(content)
- edited = ""
- rationale = ""
- if parsed:
- edited = str(parsed.get("edited_text") or parsed.get("title") or "").strip()
- rationale = str(parsed.get("rationale") or "").strip()
- if not edited:
- raise ValueError("LLM returned invalid JSON title payload.")
- return {
- "edited_text": edited,
- "rationale": rationale,
- "prompt_debug": {
- "operation": "title_rewrite",
- "cascade_level": cascade_level,
- "goal_type": "title",
- "goal_label": "title alignment",
- "focus_terms": focus_terms,
- "avoid_terms": avoid_terms,
- "phrase_strategy_mode": strategy_mode,
- "phrase_strategy": phrase_strategy,
- "chunk_text": current_title.strip(),
- "context_before": (body_excerpt or "")[:500],
- "context_after": competitor_title_hint[:500] if competitor_title_hint else "",
- "temperature": float(max(0.0, min(1.2, temperature))),
- },
- }
-
-
def _replace_span(sentences: List[str], start_idx: int, end_idx: int, replacement_text: str) -> List[str]:
replacement = _split_sentences(replacement_text)
if not replacement:
@@ -1656,16 +772,7 @@ def _goal_improved(
if goal_type == "semantic":
return next_metrics["semantic_gap_count"] < prev_metrics["semantic_gap_count"]
if goal_type == "ngram":
- return (
- next_metrics["ngram_signal_count"] < prev_metrics["ngram_signal_count"]
- or float(next_metrics.get("ngram_gap_sum", 0.0)) < float(prev_metrics.get("ngram_gap_sum", 0.0))
- )
- if goal_type == "title":
- pt = prev_metrics.get("title_bert_score")
- nt = next_metrics.get("title_bert_score")
- if pt is None or nt is None:
- return False
- return float(nt) > float(pt) or float(nt) >= TITLE_TARGET_THRESHOLD
+ return next_metrics["ngram_signal_count"] < prev_metrics["ngram_signal_count"]
return next_metrics["score"] > prev_metrics["score"]
@@ -1676,120 +783,6 @@ def _bert_phrase_delta(goal_label: str, prev_metrics: Dict[str, Any], next_metri
return round(next_phrase - prev_phrase, 4)
-def _safe_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any], key: str) -> float:
- try:
- return float(next_metrics.get(key, 0.0)) - float(prev_metrics.get(key, 0.0))
- except Exception:
- return 0.0
-
-
-def _stage_primary_progress(stage: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> bool:
- if stage == "bert":
- prev_low = int(prev_metrics.get("bert_low_count", 0))
- next_low = int(next_metrics.get("bert_low_count", 0))
- if next_low < prev_low:
- return True
- prev_max = max([0.0] + [float(v) for v in (prev_metrics.get("bert_phrase_scores") or {}).values()])
- next_max = max([0.0] + [float(v) for v in (next_metrics.get("bert_phrase_scores") or {}).values()])
- return (next_max - prev_max) >= BERT_GOAL_DELTA_MIN
- if stage == "bm25":
- return int(next_metrics.get("bm25_remove_count", 0)) < int(prev_metrics.get("bm25_remove_count", 0))
- if stage == "semantic":
- return (
- int(next_metrics.get("semantic_gap_count", 0)) < int(prev_metrics.get("semantic_gap_count", 0))
- or float(next_metrics.get("semantic_gap_sum", 0.0)) < float(prev_metrics.get("semantic_gap_sum", 0.0))
- )
- if stage == "ngram":
- return (
- int(next_metrics.get("ngram_signal_count", 0)) < int(prev_metrics.get("ngram_signal_count", 0))
- or float(next_metrics.get("ngram_gap_sum", 0.0)) < float(prev_metrics.get("ngram_gap_sum", 0.0))
- )
- if stage == "title":
- pv = prev_metrics.get("title_bert_score")
- nv = next_metrics.get("title_bert_score")
- if pv is None or nv is None:
- return False
- return float(nv) > float(pv)
- return False
-
-
-def _is_stage_complete(stage: str, metrics: Dict[str, Any], bert_stage_target: float = BERT_TARGET_THRESHOLD) -> bool:
- if stage == "bert":
- # Complete only when all tracked BERT phrase scores meet the threshold.
- # This enforces per-phrase target behavior (no early exit on one strong phrase).
- scores = [float(v) for v in (metrics.get("bert_phrase_scores") or {}).values()]
- if not scores:
- return True
- return min(scores) >= float(bert_stage_target)
- if stage == "bm25":
- return int(metrics.get("bm25_remove_count", 0)) <= 3
- if stage == "semantic":
- return int(metrics.get("semantic_gap_count", 0)) <= 0
- if stage == "ngram":
- return int(metrics.get("ngram_signal_count", 0)) <= 0
- if stage == "title":
- score = metrics.get("title_bert_score")
- return (score is None) or (float(score) >= TITLE_TARGET_THRESHOLD)
- return True
-
-
-def _advance_ngram_term_cursor(cursor_state: Dict[str, Dict[str, int]], stage_key: str) -> None:
- state = cursor_state.get(stage_key) or {"term_index": 0, "attempt_count": 0}
- attempts = int(state.get("attempt_count", 0)) + 1
- term_index = int(state.get("term_index", 0))
- if attempts >= NGRAM_ATTEMPTS_PER_TERM:
- term_index += 1
- attempts = 0
- cursor_state[stage_key] = {"term_index": term_index, "attempt_count": attempts}
-
-
-def _candidate_utility(
- *,
- prev_metrics: Dict[str, Any],
- next_metrics: Dict[str, Any],
- goal_type: str,
- goal_label: str,
- bert_phrase_delta: float,
- chunk_goal_delta: float,
- local_chunk_improved: bool,
-) -> float:
- score_delta = _safe_delta(prev_metrics, next_metrics, "score")
- bm25_delta = _safe_delta(prev_metrics, next_metrics, "bm25_remove_count")
- bert_low_delta = _safe_delta(prev_metrics, next_metrics, "bert_low_count")
- ngram_delta = _safe_delta(prev_metrics, next_metrics, "ngram_signal_count")
- sem_gap_delta = _safe_delta(prev_metrics, next_metrics, "semantic_gap_count")
- title_delta = _safe_delta(prev_metrics, next_metrics, "title_bert_score")
-
- # Dynamic emphasis:
- # - if target phrase is still far from threshold, prioritize phrase-level BERT gains
- # - but keep non-BERT regressions as penalties to preserve future optimization capacity
- key = (goal_label or "").strip().lower()
- prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
- bert_push_mode = (goal_type == "bert" and prev_phrase < BERT_TARGET_THRESHOLD)
-
- w_phrase = 7.5 if bert_push_mode else 3.0
- w_chunk = 1.6 if bert_push_mode else 1.0
- w_score = 1.0
-
- utility = (
- (w_phrase * float(bert_phrase_delta))
- + (w_chunk * float(chunk_goal_delta))
- + (w_score * float(score_delta))
- )
- if local_chunk_improved:
- utility += 0.3
-
- # Cross-metric guardrails as soft penalties (in addition to hard validity checks).
- utility -= max(0.0, bm25_delta) * 1.8
- utility -= max(0.0, bert_low_delta) * 2.4
- utility -= max(0.0, ngram_delta) * 0.6
- utility -= max(0.0, sem_gap_delta) * 1.5
- utility += min(0.0, title_delta) * 1.2
- if goal_type == "title":
- utility += max(0.0, title_delta) * 5.0
- return round(float(utility), 4)
-
-
def _is_candidate_valid(
prev_metrics: Dict[str, Any],
next_metrics: Dict[str, Any],
@@ -1822,9 +815,8 @@ def _is_candidate_valid(
prev_title = prev_metrics.get("title_bert_score")
next_title = next_metrics.get("title_bert_score")
- if goal_type != "title":
- if prev_title is not None and next_title is not None and next_title < (prev_title - cfg["max_title_drop"]):
- reasons.append("title_bert_drop")
+ if prev_title is not None and next_title is not None and next_title < (prev_title - cfg["max_title_drop"]):
+ reasons.append("title_bert_drop")
improved = _goal_improved(goal_type, goal_label, prev_metrics, next_metrics)
@@ -1835,11 +827,7 @@ def _is_candidate_valid(
return (len(reasons) == 0), reasons, improved
-def optimize_text(
- request_data: Dict[str, Any],
- progress_callback: Optional[Any] = None,
- cancel_event: Optional[Any] = None,
-) -> Dict[str, Any]:
+def optimize_text(request_data: Dict[str, Any]) -> Dict[str, Any]:
target_text = str(request_data.get("target_text", "")).strip()
competitors = [str(x) for x in (request_data.get("competitors") or []) if str(x).strip()]
keywords = [str(x) for x in (request_data.get("keywords") or []) if str(x).strip()]
@@ -1847,18 +835,6 @@ def optimize_text(
target_title = str(request_data.get("target_title", "") or "")
competitor_titles = [str(x) for x in (request_data.get("competitor_titles") or [])]
- diff_mode_used = str(request_data.get("diff_mode", "diff_from_input") or "diff_from_input").strip().lower()
- if diff_mode_used not in {"diff_from_input", "diff_from_original"}:
- diff_mode_used = "diff_from_input"
- diff_base_text = target_text
- diff_base_title = target_title
- if diff_mode_used == "diff_from_original":
- diff_base_text = str(request_data.get("original_target_text") or target_text or "").strip()
- diff_base_title = str(request_data.get("original_target_title") or target_title or "").strip()
- else:
- diff_base_text = (target_text or "").strip()
- diff_base_title = (target_title or "").strip()
-
api_key = str(request_data.get("api_key", "")).strip()
if not api_key:
raise ValueError("API key is required.")
@@ -1870,339 +846,122 @@ def optimize_text(
candidates_per_iteration = max(1, min(5, candidates_per_iteration))
temperature = float(request_data.get("temperature", 0.25) or 0.25)
optimization_mode = str(request_data.get("optimization_mode", "balanced") or "balanced")
- phrase_strategy_mode = str(request_data.get("phrase_strategy_mode", "auto") or "auto").strip().lower()
- if phrase_strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}:
- phrase_strategy_mode = "auto"
- bert_stage_target = float(request_data.get("bert_stage_target", BERT_TARGET_THRESHOLD) or BERT_TARGET_THRESHOLD)
- bert_stage_target = max(0.0, min(1.0, bert_stage_target))
- stage_goal_overrides = request_data.get("stage_goal_overrides") or {}
- if not isinstance(stage_goal_overrides, dict):
- stage_goal_overrides = {}
- req_enabled_stages = request_data.get("enabled_stages") or []
- active_stage_order: List[str] = []
- if isinstance(req_enabled_stages, list):
- for x in req_enabled_stages:
- st = _normalize_stage_name(x)
- if st and st not in active_stage_order:
- active_stage_order.append(st)
- if not active_stage_order:
- active_stage_order = list(STAGE_ORDER)
baseline_analysis = _build_analysis_snapshot(
target_text, competitors, keywords, language, target_title, competitor_titles
)
baseline_semantic = _build_semantic_snapshot(target_text, competitors, language)
- baseline_metrics = _compute_metrics(
- baseline_analysis, baseline_semantic, keywords, language, bert_stage_target=bert_stage_target
- )
-
- # Per-goal iteration budget scales with deficit; total loop steps = sum(effective iters per goal).
- baseline_goal_counts = {
- st: len(
- _collect_optimization_goals(
- baseline_analysis,
- baseline_semantic,
- keywords,
- language,
- stage=st,
- bert_stage_target=bert_stage_target,
- stage_goal_overrides=stage_goal_overrides,
- )
- )
- for st in active_stage_order
- }
- ngram_row_count = int(baseline_goal_counts.get("ngram", 0))
- total_loop_steps = _estimate_total_loop_budget(
- baseline_analysis,
- baseline_semantic,
- keywords,
- language,
- max_iterations,
- candidates_per_iteration,
- bert_stage_target,
- active_stage_order=active_stage_order,
- stage_goal_overrides=stage_goal_overrides,
- )
+ baseline_metrics = _compute_metrics(baseline_analysis, baseline_semantic, keywords, language)
current_text = target_text
- current_title = (target_title or "").strip()
current_analysis = baseline_analysis
current_semantic = baseline_semantic
current_metrics = baseline_metrics
logs: List[Dict[str, Any]] = []
applied_changes = 0
-
- def _emit(ev: str, **kwargs: Any) -> None:
- if progress_callback:
- try:
- progress_callback({"event": ev, **kwargs})
- except Exception:
- pass
-
- def _cancelled() -> bool:
- return cancel_event is not None and getattr(cancel_event, "is_set", lambda: False)()
-
- def _pack_result(stopped_early: bool = False, stop_reason: str = "") -> Dict[str, Any]:
- # Title string must match what last metrics used (title_analysis.target_title), not only the mutable var.
- _ta = (current_analysis or {}).get("title_analysis") or {}
- _ot = ""
- if isinstance(_ta, dict) and (_ta.get("target_title") or "").strip():
- _ot = str(_ta.get("target_title")).strip()
- if not _ot:
- _ot = (current_title or "").strip()
- if not _ot:
- _ot = (target_title or "").strip()
-
- diff_body_html = ""
- diff_changes: List[Dict[str, str]] = []
- if (diff_base_text or "").strip() != (current_text or "").strip():
- dh, dc = _diff_sentences_html(diff_base_text or "", current_text or "")
- if dc:
- diff_body_html = dh
- diff_changes = dc
-
- diff_title_html = ""
- diff_title_changes: List[Dict[str, str]] = []
- if (diff_base_title or "").strip() != (_ot or "").strip():
- dth, dtc = _diff_title_html(diff_base_title or "", _ot or "")
- if dtc:
- diff_title_html = dth
- diff_title_changes = dtc
- return {
- "ok": True,
- "optimized_text": current_text,
- "optimized_title": _ot,
- "baseline_metrics": baseline_metrics,
- "final_metrics": current_metrics,
- "iterations": logs,
- "applied_changes": applied_changes,
- "optimization_mode": optimization_mode,
- "phrase_strategy_mode": phrase_strategy_mode,
- "bert_stage_target": round(bert_stage_target, 4),
- "diff_mode": diff_mode_used,
- "diff_body_html": diff_body_html,
- "diff_title_html": diff_title_html,
- "diff_changes": diff_changes,
- "diff_title_changes": diff_title_changes,
- "stopped_early": stopped_early,
- "stop_reason": stop_reason,
- }
-
- _emit("preparing", message="Подготовка", phase="baseline")
- _emit(
- "started",
- total_steps=total_loop_steps,
- max_iterations_setting=max_iterations,
- ngram_targets=ngram_row_count,
- stages_order=list(active_stage_order),
- )
-
seen_candidate_rewrites = set()
cascade_level = 1
consecutive_failures = 0
goal_attempt_cursor: Dict[str, int] = {}
attempted_spans = set()
queued_candidates: List[Dict[str, Any]] = []
- stage_idx = 0
- stage_no_progress_steps = 0
- stage_goal_cursor: Dict[str, Dict[str, int]] = {}
-
- for step in range(total_loop_steps):
- if _cancelled():
- logs.append(
- {
- "step": step + 1,
- "status": "stopped",
- "reason": "Остановка пользователем (сохранён текущий текст и метрики).",
- }
- )
- return _pack_result(stopped_early=True, stop_reason="user_cancelled")
-
- while stage_idx < len(active_stage_order) and _is_stage_complete(
- active_stage_order[stage_idx], current_metrics, bert_stage_target=bert_stage_target
- ):
- stage_idx += 1
- stage_no_progress_steps = 0
- if stage_idx >= len(active_stage_order):
- logs.append({"step": step + 1, "status": "stopped", "reason": "All optimization stages completed."})
- break
-
- active_stage = active_stage_order[stage_idx]
- goals_for_stage = _collect_optimization_goals(
- current_analysis,
- current_semantic,
- keywords,
- language,
- stage=active_stage,
- bert_stage_target=bert_stage_target,
- stage_goal_overrides=stage_goal_overrides,
- )
- state = stage_goal_cursor.get(active_stage) or {"goal_index": 0, "attempt_count": 0}
- goal_index = int(state.get("goal_index", 0))
- attempt_count = int(state.get("attempt_count", 0))
-
- # Advance across goals that exhausted per-goal iteration budget (scaled by deficit).
- while goal_index < len(goals_for_stage):
- g_try = goals_for_stage[goal_index]
- eff_max_iter, _ = _per_goal_budget(g_try, max_iterations, candidates_per_iteration, bert_stage_target)
- if attempt_count < eff_max_iter:
- break
- goal_index += 1
- attempt_count = 0
-
- if goal_index >= len(goals_for_stage):
- stage_idx += 1
- stage_no_progress_steps = 0
- logs.append(
- {
- "step": step + 1,
- "status": "stage_skipped",
- "stage": active_stage,
- "reason": f"All goals exhausted for stage '{active_stage}' (per-goal iteration budget).",
- }
- )
- stage_goal_cursor[active_stage] = {"goal_index": goal_index, "attempt_count": attempt_count}
- continue
- goal = goals_for_stage[goal_index]
- eff_max_iter, eff_cand = _per_goal_budget(goal, max_iterations, candidates_per_iteration, bert_stage_target)
- attempt_count += 1
- stage_goal_cursor[active_stage] = {"goal_index": goal_index, "attempt_count": attempt_count}
+ for step in range(max_iterations):
+ goal = _choose_optimization_goal(current_analysis, current_semantic, keywords, language)
if goal["type"] == "none":
- stage_idx += 1
- stage_no_progress_steps = 0
- logs.append(
- {
- "step": step + 1,
- "status": "stage_skipped",
- "stage": active_stage,
- "reason": f"No actionable goals for stage '{active_stage}', moving to next stage.",
- }
- )
- continue
+ logs.append({"step": step + 1, "status": "stopped", "reason": "No optimization goals left."})
+ break
- _emit(
- "step_start",
- step=step + 1,
- total_steps=total_loop_steps,
- active_stage=active_stage,
- goal_type=goal.get("type"),
- goal_label=goal.get("label"),
- score=current_metrics.get("score"),
- goal_budget_iter=eff_max_iter,
- goal_budget_candidates=eff_cand,
- )
+ sentences = _split_sentences(current_text)
+ if not sentences:
+ logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."})
+ break
goal_key = f"{goal.get('type', '')}:{goal.get('label', '')}".strip().lower()
base_attempt_cursor = int(goal_attempt_cursor.get(goal_key, 0))
+ span_trials = 2 if cascade_level <= 2 else 3
+ local_candidates = candidates_per_iteration if cascade_level <= 2 else min(6, candidates_per_iteration + 1)
candidates: List[Dict[str, Any]] = []
chosen_spans: List[Dict[str, Any]] = []
candidate_idx = 0
- span_trials_eff = 1
- if goal.get("type") == "title":
- if not current_title:
- logs.append(
- {
- "step": step + 1,
- "status": "stage_skipped",
- "stage": active_stage,
- "reason": "Поле Title пустое — этап title пропущен.",
- }
+ for st in range(span_trials):
+ attempt_cursor = base_attempt_cursor + st
+ operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
+ sentences, goal, language, cascade_level, attempt_cursor
+ )
+ max_span_retries = max(1, len(sentences) * 4)
+ retries = 0
+ while retries < max_span_retries:
+ span_key = (goal_key, cascade_level, operation, span_start, span_end)
+ if span_key not in attempted_spans:
+ attempted_spans.add(span_key)
+ break
+ attempt_cursor += 1
+ operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
+ sentences, goal, language, cascade_level, attempt_cursor
)
- stage_idx += 1
- continue
+ retries += 1
- chosen_spans = [
+ original_span_text = " ".join(sentences[span_start : span_end + 1]).strip()
+ context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip()
+ context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip()
+ chosen_spans.append(
{
- "operation": "title_rewrite",
- "span_start": 0,
- "span_end": 0,
- "sentence_index": 0,
- "span_variant": 0,
- "sentence_before": current_title,
+ "operation": operation,
+ "span_start": span_start,
+ "span_end": span_end,
+ "sentence_index": sent_idx,
+ "span_variant": span_variant,
+ "sentence_before": original_span_text,
}
- ]
- original_span_text = current_title
- body_excerpt = (current_text or "").strip()[:1200]
- comp_ctx = " | ".join(t.strip() for t in competitor_titles[:4] if t.strip())
- competitor_title_hint = (
- f"Примеры title конкурентов (только стиль/длина): {comp_ctx}" if comp_ctx else ""
- )
- strategy_plan = _build_phrase_strategy_plan(
- phrase_strategy_mode,
- "title",
- str(goal.get("label", "")),
- eff_cand,
)
- for strategy_variant in strategy_plan:
+
+ per_span_candidates = max(1, local_candidates // span_trials)
+ for ci in range(per_span_candidates):
candidate_idx += 1
temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07))
- if _cancelled():
- logs.append(
- {
- "step": step + 1,
- "status": "stopped",
- "reason": "Остановка пользователем перед запросом к LLM.",
- }
- )
- return _pack_result(stopped_early=True, stop_reason="user_cancelled")
- _emit(
- "llm_call",
- candidate_index=candidate_idx,
- span_trial=1,
- span_trials=1,
- strategy=strategy_variant,
- )
try:
- llm_result = _llm_edit_title(
+ llm_result = _llm_edit_chunk(
api_key=api_key,
base_url=base_url,
model=model,
language=language,
- current_title=current_title,
- body_excerpt=body_excerpt,
- competitor_title_hint=competitor_title_hint,
- focus_terms=goal.get("focus_terms", []) or [],
- avoid_terms=goal.get("avoid_terms", []) or [],
- keywords=keywords,
+ full_text=current_text,
+ chunk_text=original_span_text,
+ operation=operation,
+ context_before=context_before,
+ context_after=context_after,
cascade_level=cascade_level,
+ goal_type=goal["type"],
+ goal_label=goal["label"],
+ focus_terms=goal["focus_terms"],
+ avoid_terms=goal["avoid_terms"],
temperature=temp,
- phrase_strategy_mode=strategy_variant,
)
edited_text = str((llm_result or {}).get("edited_text", "")).strip()
- llm_rationale = str((llm_result or {}).get("rationale", "")).strip()
prompt_debug = (llm_result or {}).get("prompt_debug", {})
- if not edited_text or edited_text == original_span_text.strip():
+ if not edited_text or edited_text == original_span_text:
continue
- quality_issues = _validate_title_candidate(edited_text)
- cand_analysis = _build_analysis_snapshot(
- current_text, competitors, keywords, language, edited_text, competitor_titles
+ quality_issues = _validate_candidate_text(edited_text, cascade_level, operation)
+ before_rel, after_rel = _chunk_relevance_pair(
+ original_span_text,
+ edited_text,
+ goal["type"],
+ goal["label"],
+ goal.get("focus_terms", []) or [],
+ language,
)
- cand_semantic = _build_semantic_snapshot(current_text, competitors, language)
- cand_metrics = _compute_metrics(
- cand_analysis, cand_semantic, keywords, language, bert_stage_target=bert_stage_target
- )
- before_rel = float(current_metrics.get("title_bert_score") or 0.0)
- after_rel = float(cand_metrics.get("title_bert_score") or 0.0)
- chunk_delta = round(after_rel - before_rel, 4)
- local_chunk_improved = chunk_delta >= _min_chunk_delta("title")
- bert_phrase_delta = 0.0
- valid, invalid_reasons, goal_improved = _is_candidate_valid(
- current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode
- )
- delta_score = round(cand_metrics["score"] - current_metrics["score"], 3)
- candidate_utility = _candidate_utility(
- prev_metrics=current_metrics,
- next_metrics=cand_metrics,
- goal_type=str(goal.get("type", "")),
- goal_label=str(goal.get("label", "")),
- bert_phrase_delta=bert_phrase_delta,
- chunk_goal_delta=chunk_delta,
- local_chunk_improved=local_chunk_improved,
+ chunk_delta = _chunk_goal_delta(
+ original_span_text,
+ edited_text,
+ goal["type"],
+ goal["label"],
+ goal.get("focus_terms", []) or [],
+ language,
)
- md = _metrics_delta(current_metrics, cand_metrics)
+ local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"])
if quality_issues:
candidates.append(
@@ -2221,19 +980,17 @@ def optimize_text(
"chunk_relevance_after": after_rel,
"term_diff": _term_diff(original_span_text, edited_text, language),
"llm_prompt_debug": prompt_debug,
- "llm_rationale": llm_rationale,
- "operation": "title_rewrite",
- "sentence_index": 0,
- "span_start": 0,
- "span_end": 0,
- "span_variant": 0,
- "phrase_strategy_used": strategy_variant,
+ "operation": operation,
+ "sentence_index": sent_idx,
+ "span_start": span_start,
+ "span_end": span_end,
+ "span_variant": span_variant,
"sentence_before": original_span_text,
}
)
continue
- candidate_key = ("title_rewrite", 0, 0, edited_text.strip().lower())
+ candidate_key = (operation, span_start, span_end, edited_text.strip().lower())
if candidate_key in seen_candidate_rewrites:
candidates.append(
{
@@ -2250,33 +1007,45 @@ def optimize_text(
"chunk_relevance_after": after_rel,
"term_diff": _term_diff(original_span_text, edited_text, language),
"llm_prompt_debug": prompt_debug,
- "llm_rationale": llm_rationale,
- "operation": "title_rewrite",
- "sentence_index": 0,
- "span_start": 0,
- "span_end": 0,
- "span_variant": 0,
- "phrase_strategy_used": strategy_variant,
+ "operation": operation,
+ "sentence_index": sent_idx,
+ "span_start": span_start,
+ "span_end": span_end,
+ "span_variant": span_variant,
"sentence_before": original_span_text,
}
)
continue
seen_candidate_rewrites.add(candidate_key)
- candidate_text = current_text
+ if operation == "insert":
+ candidate_sentences = _insert_after(sentences, span_end, edited_text)
+ else:
+ candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text)
+ candidate_text = " ".join(candidate_sentences).strip()
+
+ cand_analysis = _build_analysis_snapshot(
+ candidate_text, competitors, keywords, language, target_title, competitor_titles
+ )
+ cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language)
+ cand_metrics = _compute_metrics(cand_analysis, cand_semantic, keywords, language)
+ valid, invalid_reasons, goal_improved = _is_candidate_valid(
+ current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode
+ )
+ delta_score = round(cand_metrics["score"] - current_metrics["score"], 3)
+ bert_phrase_delta = _bert_phrase_delta(goal["label"], current_metrics, cand_metrics) if goal.get("type") == "bert" else 0.0
+ md = _metrics_delta(current_metrics, cand_metrics)
candidates.append(
{
"candidate_index": candidate_idx,
"sentence_before": original_span_text,
"sentence_after": edited_text,
- "operation": "title_rewrite",
- "sentence_index": 0,
- "span_start": 0,
- "span_end": 0,
- "span_variant": 0,
- "phrase_strategy_used": strategy_variant,
+ "operation": operation,
+ "sentence_index": sent_idx,
+ "span_start": span_start,
+ "span_end": span_end,
+ "span_variant": span_variant,
"text": candidate_text,
- "new_title": edited_text,
"analysis": cand_analysis,
"semantic": cand_semantic,
"metrics": cand_metrics,
@@ -2289,16 +1058,14 @@ def optimize_text(
"chunk_relevance_after": after_rel,
"term_diff": _term_diff(original_span_text, edited_text, language),
"llm_prompt_debug": prompt_debug,
- "llm_rationale": llm_rationale,
"invalid_reasons": invalid_reasons,
"delta_score": delta_score,
"candidate_score": cand_metrics.get("score"),
- "candidate_utility": candidate_utility,
"metrics_delta": md,
"edit_payload": {
- "operation": "title_rewrite",
- "span_start": 0,
- "span_end": 0,
+ "operation": operation,
+ "span_start": span_start,
+ "span_end": span_end,
"edited_text": edited_text,
},
}
@@ -2316,299 +1083,21 @@ def optimize_text(
"delta_score": -999.0,
"candidate_score": None,
"llm_prompt_debug": {
- "operation": "title_rewrite",
+ "operation": operation,
"cascade_level": cascade_level,
"goal_type": goal.get("type"),
"goal_label": goal.get("label"),
- "phrase_strategy_mode": strategy_variant,
},
- "llm_rationale": "",
- "operation": "title_rewrite",
- "sentence_index": 0,
- "span_start": 0,
- "span_end": 0,
- "span_variant": 0,
- "phrase_strategy_used": strategy_variant,
- "sentence_before": current_title,
+ "operation": operation,
+ "sentence_index": sent_idx,
+ "span_start": span_start,
+ "span_end": span_end,
+ "span_variant": span_variant,
+ "sentence_before": original_span_text,
}
)
- else:
- sentences = _split_sentences(current_text)
- if not sentences:
- logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."})
- break
-
- span_trials = 2 if cascade_level <= 2 else 3
- local_candidates = eff_cand if cascade_level <= 2 else min(6, eff_cand + 1)
- span_trials_eff = span_trials
-
- for st in range(span_trials):
- attempt_cursor = base_attempt_cursor + st
- operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
- sentences, goal, language, cascade_level, attempt_cursor
- )
- max_span_retries = max(1, len(sentences) * 4)
- retries = 0
- while retries < max_span_retries:
- span_key = (goal_key, cascade_level, operation, span_start, span_end)
- if span_key not in attempted_spans:
- attempted_spans.add(span_key)
- break
- attempt_cursor += 1
- operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
- sentences, goal, language, cascade_level, attempt_cursor
- )
- retries += 1
-
- original_span_text = " ".join(sentences[span_start : span_end + 1]).strip()
- context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip()
- context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip()
- chosen_spans.append(
- {
- "operation": operation,
- "span_start": span_start,
- "span_end": span_end,
- "sentence_index": sent_idx,
- "span_variant": span_variant,
- "sentence_before": original_span_text,
- }
- )
-
- per_span_candidates = max(1, local_candidates // span_trials)
- strategy_plan = _build_phrase_strategy_plan(
- phrase_strategy_mode,
- str(goal.get("type", "")),
- str(goal.get("label", "")),
- per_span_candidates,
- )
- for strategy_variant in strategy_plan:
- candidate_idx += 1
- temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07))
- if _cancelled():
- logs.append(
- {
- "step": step + 1,
- "status": "stopped",
- "reason": "Остановка пользователем перед запросом к LLM.",
- }
- )
- return _pack_result(stopped_early=True, stop_reason="user_cancelled")
- _emit(
- "llm_call",
- candidate_index=candidate_idx,
- span_trial=st + 1,
- span_trials=span_trials,
- strategy=strategy_variant,
- )
- try:
- llm_result = _llm_edit_chunk(
- api_key=api_key,
- base_url=base_url,
- model=model,
- language=language,
- full_text=current_text,
- chunk_text=original_span_text,
- operation=operation,
- context_before=context_before,
- context_after=context_after,
- cascade_level=cascade_level,
- goal_type=goal["type"],
- goal_label=goal["label"],
- focus_terms=goal["focus_terms"],
- avoid_terms=goal["avoid_terms"],
- temperature=temp,
- phrase_strategy_mode=strategy_variant,
- )
- edited_text = str((llm_result or {}).get("edited_text", "")).strip()
- llm_rationale = str((llm_result or {}).get("rationale", "")).strip()
- prompt_debug = (llm_result or {}).get("prompt_debug", {})
- if not edited_text or edited_text == original_span_text:
- continue
-
- quality_issues = _validate_candidate_text(
- edited_text,
- cascade_level,
- operation,
- goal_label=goal.get("label", ""),
- focus_terms=goal.get("focus_terms", []) or [],
- )
- before_rel, after_rel = _chunk_relevance_pair(
- original_span_text,
- edited_text,
- goal["type"],
- goal["label"],
- goal.get("focus_terms", []) or [],
- language,
- goal,
- )
- chunk_delta = _chunk_goal_delta(
- original_span_text,
- edited_text,
- goal["type"],
- goal["label"],
- goal.get("focus_terms", []) or [],
- language,
- goal,
- )
- local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"])
-
- if quality_issues:
- candidates.append(
- {
- "candidate_index": candidate_idx,
- "error": "quality_validation_failed",
- "valid": False,
- "goal_improved": False,
- "local_chunk_improved": local_chunk_improved,
- "chunk_goal_delta": chunk_delta,
- "invalid_reasons": quality_issues,
- "delta_score": -999.0,
- "candidate_score": None,
- "sentence_after": edited_text,
- "chunk_relevance_before": before_rel,
- "chunk_relevance_after": after_rel,
- "term_diff": _term_diff(original_span_text, edited_text, language),
- "llm_prompt_debug": prompt_debug,
- "llm_rationale": llm_rationale,
- "operation": operation,
- "sentence_index": sent_idx,
- "span_start": span_start,
- "span_end": span_end,
- "span_variant": span_variant,
- "phrase_strategy_used": strategy_variant,
- "sentence_before": original_span_text,
- }
- )
- continue
-
- candidate_key = (operation, span_start, span_end, edited_text.strip().lower())
- if candidate_key in seen_candidate_rewrites:
- candidates.append(
- {
- "candidate_index": candidate_idx,
- "error": "duplicate_candidate_rewrite",
- "valid": False,
- "goal_improved": False,
- "local_chunk_improved": local_chunk_improved,
- "chunk_goal_delta": chunk_delta,
- "invalid_reasons": ["duplicate_candidate_rewrite"],
- "delta_score": -999.0,
- "candidate_score": None,
- "chunk_relevance_before": before_rel,
- "chunk_relevance_after": after_rel,
- "term_diff": _term_diff(original_span_text, edited_text, language),
- "llm_prompt_debug": prompt_debug,
- "llm_rationale": llm_rationale,
- "operation": operation,
- "sentence_index": sent_idx,
- "span_start": span_start,
- "span_end": span_end,
- "span_variant": span_variant,
- "phrase_strategy_used": strategy_variant,
- "sentence_before": original_span_text,
- }
- )
- continue
- seen_candidate_rewrites.add(candidate_key)
-
- if operation == "insert":
- candidate_sentences = _insert_after(sentences, span_end, edited_text)
- else:
- candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text)
- candidate_text = " ".join(candidate_sentences).strip()
-
- cand_analysis = _build_analysis_snapshot(
- candidate_text, competitors, keywords, language, current_title, competitor_titles
- )
- cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language)
- cand_metrics = _compute_metrics(
- cand_analysis, cand_semantic, keywords, language, bert_stage_target=bert_stage_target
- )
- valid, invalid_reasons, goal_improved = _is_candidate_valid(
- current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode
- )
- delta_score = round(cand_metrics["score"] - current_metrics["score"], 3)
- bert_phrase_delta = _bert_phrase_delta(goal["label"], current_metrics, cand_metrics) if goal.get("type") == "bert" else 0.0
- candidate_utility = _candidate_utility(
- prev_metrics=current_metrics,
- next_metrics=cand_metrics,
- goal_type=str(goal.get("type", "")),
- goal_label=str(goal.get("label", "")),
- bert_phrase_delta=bert_phrase_delta,
- chunk_goal_delta=chunk_delta,
- local_chunk_improved=local_chunk_improved,
- )
- md = _metrics_delta(current_metrics, cand_metrics)
- candidates.append(
- {
- "candidate_index": candidate_idx,
- "sentence_before": original_span_text,
- "sentence_after": edited_text,
- "operation": operation,
- "sentence_index": sent_idx,
- "span_start": span_start,
- "span_end": span_end,
- "span_variant": span_variant,
- "phrase_strategy_used": strategy_variant,
- "text": candidate_text,
- "analysis": cand_analysis,
- "semantic": cand_semantic,
- "metrics": cand_metrics,
- "valid": valid,
- "goal_improved": goal_improved,
- "bert_phrase_delta": bert_phrase_delta,
- "local_chunk_improved": local_chunk_improved,
- "chunk_goal_delta": chunk_delta,
- "chunk_relevance_before": before_rel,
- "chunk_relevance_after": after_rel,
- "term_diff": _term_diff(original_span_text, edited_text, language),
- "llm_prompt_debug": prompt_debug,
- "llm_rationale": llm_rationale,
- "invalid_reasons": invalid_reasons,
- "delta_score": delta_score,
- "candidate_score": cand_metrics.get("score"),
- "candidate_utility": candidate_utility,
- "metrics_delta": md,
- "edit_payload": {
- "operation": operation,
- "span_start": span_start,
- "span_end": span_end,
- "edited_text": edited_text,
- },
- }
- )
- except Exception as e:
- candidates.append(
- {
- "candidate_index": candidate_idx,
- "error": str(e),
- "valid": False,
- "goal_improved": False,
- "local_chunk_improved": False,
- "chunk_goal_delta": -999.0,
- "invalid_reasons": [str(e)],
- "delta_score": -999.0,
- "candidate_score": None,
- "llm_prompt_debug": {
- "operation": operation,
- "cascade_level": cascade_level,
- "goal_type": goal.get("type"),
- "goal_label": goal.get("label"),
- "phrase_strategy_mode": strategy_variant,
- },
- "llm_rationale": "",
- "operation": operation,
- "sentence_index": sent_idx,
- "span_start": span_start,
- "span_end": span_end,
- "span_variant": span_variant,
- "phrase_strategy_used": strategy_variant,
- "sentence_before": original_span_text,
- }
- )
-
- goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials_eff
+ goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials
primary_span = chosen_spans[0] if chosen_spans else {"operation": "-", "span_start": 0, "span_end": 0, "sentence_index": 0, "span_variant": 0, "sentence_before": ""}
valid_raw_candidates = [c for c in candidates if c.get("valid")]
@@ -2620,7 +1109,6 @@ def optimize_text(
c.get("goal_improved")
or (goal.get("type") == "bert" and float(c.get("bert_phrase_delta") or 0.0) > 0.0)
or float(c.get("candidate_score") or -1) > float(current_metrics["score"])
- or float(c.get("candidate_utility") or -999.0) > 0.0
)
and (
goal.get("type") != "bert"
@@ -2641,7 +1129,6 @@ def optimize_text(
best_local = sorted(
local_progress_candidates,
key=lambda c: (
- float(c.get("candidate_utility") or -999.0),
float(c.get("chunk_goal_delta") or 0.0),
float(c.get("bert_phrase_delta") or 0.0),
float(c.get("candidate_score") or -999.0),
@@ -2651,18 +1138,9 @@ def optimize_text(
prev_metrics = current_metrics
current_text = best_local["text"]
- if str(best_local.get("operation") or "") == "title_rewrite":
- nt = (best_local.get("new_title") or best_local.get("sentence_after") or "").strip()
- if nt:
- current_title = nt
current_analysis = best_local["analysis"]
current_semantic = best_local["semantic"]
current_metrics = best_local["metrics"]
- progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics)
- if progressed_stage:
- stage_no_progress_steps = 0
- else:
- stage_no_progress_steps += 1
applied_changes += 1
queued_candidates = []
@@ -2670,7 +1148,6 @@ def optimize_text(
{
"step": step + 1,
"status": "applied_local_progress",
- "stage": active_stage,
"goal": goal,
"cascade_level": cascade_level,
"operation": best_local.get("operation"),
@@ -2688,7 +1165,6 @@ def optimize_text(
"chosen_candidate_index": best_local.get("candidate_index"),
"chosen_chunk_goal_delta": best_local.get("chunk_goal_delta"),
"chosen_bert_phrase_delta": best_local.get("bert_phrase_delta"),
- "chosen_candidate_utility": best_local.get("candidate_utility"),
"chosen_metrics_delta": best_local.get("metrics_delta"),
"candidates": [
{
@@ -2702,9 +1178,7 @@ def optimize_text(
"chunk_relevance_after": c.get("chunk_relevance_after"),
"term_diff": c.get("term_diff"),
"llm_prompt_debug": c.get("llm_prompt_debug"),
- "llm_rationale": c.get("llm_rationale"),
"metrics_delta": c.get("metrics_delta"),
- "candidate_utility": c.get("candidate_utility"),
"invalid_reasons": c.get("invalid_reasons", []),
"delta_score": c.get("delta_score"),
"candidate_score": c.get("candidate_score"),
@@ -2726,11 +1200,9 @@ def optimize_text(
if c.get("local_chunk_improved")
and c.get("edit_payload")
and c.get("candidate_score") is not None
- and goal.get("type") != "title"
]
local_pool.sort(
key=lambda c: (
- float(c.get("candidate_utility") or -999.0),
float(c.get("chunk_goal_delta") or -999.0),
float(c.get("candidate_score") or -999.0),
),
@@ -2749,7 +1221,7 @@ def optimize_text(
batch_applied = False
batch_info: Dict[str, Any] = {}
- if len(queued_candidates) >= 2 and goal.get("type") != "title":
+ if len(queued_candidates) >= 2:
pool = [x["candidate"] for x in queued_candidates[:6]]
combos = _non_conflicting_edit_combos(pool, min_size=2, max_size=4)
best_batch: Optional[Dict[str, Any]] = None
@@ -2761,12 +1233,10 @@ def optimize_text(
batch_sentences = _apply_edits_to_sentences(sentences, edits)
batch_text = " ".join(batch_sentences).strip()
batch_analysis = _build_analysis_snapshot(
- batch_text, competitors, keywords, language, current_title, competitor_titles
+ batch_text, competitors, keywords, language, target_title, competitor_titles
)
batch_semantic = _build_semantic_snapshot(batch_text, competitors, language)
- batch_metrics = _compute_metrics(
- batch_analysis, batch_semantic, keywords, language, bert_stage_target=bert_stage_target
- )
+ batch_metrics = _compute_metrics(batch_analysis, batch_semantic, keywords, language)
b_valid, b_reasons, b_goal = _is_candidate_valid(
current_metrics, batch_metrics, goal["type"], goal["label"], optimization_mode
)
@@ -2802,11 +1272,6 @@ def optimize_text(
current_analysis = best_batch["batch_analysis"]
current_semantic = best_batch["batch_semantic"]
current_metrics = best_batch["batch_metrics"]
- progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics)
- if progressed_stage:
- stage_no_progress_steps = 0
- else:
- stage_no_progress_steps += 1
applied_changes += 1
batch_applied = True
batch_info = {
@@ -2834,7 +1299,6 @@ def optimize_text(
{
"step": step + 1,
"status": "applied_batch",
- "stage": active_stage,
"goal": goal,
"cascade_level": cascade_level,
"operation": "batch",
@@ -2860,9 +1324,7 @@ def optimize_text(
"chunk_relevance_after": c.get("chunk_relevance_after"),
"term_diff": c.get("term_diff"),
"llm_prompt_debug": c.get("llm_prompt_debug"),
- "llm_rationale": c.get("llm_rationale"),
"metrics_delta": c.get("metrics_delta"),
- "candidate_utility": c.get("candidate_utility"),
"invalid_reasons": c.get("invalid_reasons", []),
"delta_score": c.get("delta_score"),
"candidate_score": c.get("candidate_score"),
@@ -2879,7 +1341,6 @@ def optimize_text(
{
"step": step + 1,
"status": "rejected",
- "stage": active_stage,
"goal": goal,
"cascade_level": cascade_level,
"operation": primary_span.get("operation"),
@@ -2911,9 +1372,7 @@ def optimize_text(
"chunk_relevance_after": c.get("chunk_relevance_after"),
"term_diff": c.get("term_diff"),
"llm_prompt_debug": c.get("llm_prompt_debug"),
- "llm_rationale": c.get("llm_rationale"),
"metrics_delta": c.get("metrics_delta"),
- "candidate_utility": c.get("candidate_utility"),
"invalid_reasons": c.get("invalid_reasons", []),
"delta_score": c.get("delta_score"),
"candidate_score": c.get("candidate_score"),
@@ -2924,8 +1383,6 @@ def optimize_text(
],
}
)
- stage_no_progress_steps += 1
- # Stage transition is controlled by per-stage iteration budget and completion checks.
consecutive_failures += 1
if consecutive_failures >= 2 and cascade_level < 4:
cascade_level += 1
@@ -2937,7 +1394,6 @@ def optimize_text(
valid_candidates,
key=lambda c: (
1 if c.get("goal_improved") else 0,
- float(c.get("candidate_utility") or -999.0),
float(c.get("bert_phrase_delta") or 0.0),
float(c.get("chunk_goal_delta") or 0.0),
c["metrics"]["score"],
@@ -2947,18 +1403,9 @@ def optimize_text(
prev_metrics = current_metrics
current_text = best["text"]
- if str(best.get("operation") or "") == "title_rewrite":
- nt = (best.get("new_title") or best.get("sentence_after") or "").strip()
- if nt:
- current_title = nt
current_analysis = best["analysis"]
current_semantic = best["semantic"]
current_metrics = best["metrics"]
- progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics)
- if progressed_stage:
- stage_no_progress_steps = 0
- else:
- stage_no_progress_steps += 1
applied_changes += 1
queued_candidates = []
@@ -2966,7 +1413,6 @@ def optimize_text(
{
"step": step + 1,
"status": "applied",
- "stage": active_stage,
"goal": goal,
"cascade_level": cascade_level,
"operation": best.get("operation"),
@@ -2981,7 +1427,6 @@ def optimize_text(
"metrics_after": current_metrics,
"delta_score": round(current_metrics["score"] - prev_metrics["score"], 3),
"chosen_candidate_index": best.get("candidate_index"),
- "chosen_candidate_utility": best.get("candidate_utility"),
"candidates": [
{
"candidate_index": c.get("candidate_index"),
@@ -2994,9 +1439,7 @@ def optimize_text(
"chunk_relevance_after": c.get("chunk_relevance_after"),
"term_diff": c.get("term_diff"),
"llm_prompt_debug": c.get("llm_prompt_debug"),
- "llm_rationale": c.get("llm_rationale"),
"metrics_delta": c.get("metrics_delta"),
- "candidate_utility": c.get("candidate_utility"),
"invalid_reasons": c.get("invalid_reasons", []),
"delta_score": c.get("delta_score"),
"candidate_score": c.get("candidate_score"),
@@ -3012,4 +1455,12 @@ def optimize_text(
cascade_level = 1
goal_attempt_cursor[goal_key] = 0
- return _pack_result()
+ return {
+ "ok": True,
+ "optimized_text": current_text,
+ "baseline_metrics": baseline_metrics,
+ "final_metrics": current_metrics,
+ "iterations": logs,
+ "applied_changes": applied_changes,
+ "optimization_mode": optimization_mode,
+ }