import json import difflib import html as html_lib import re from itertools import combinations from typing import Any, Dict, List, Optional, Tuple import requests import logic import nlp_processor import semantic_graph STOP_WORDS = { "en": {"a", "an", "and", "or", "the", "to", "of", "for", "in", "on", "at", "by", "with", "from", "as", "is", "are", "be", "was", "were"}, "ru": {"и", "или", "в", "во", "на", "по", "с", "со", "к", "ко", "для", "из", "за", "что", "это", "как", "а", "но", "у", "о", "от"}, "de": {"und", "oder", "der", "die", "das", "zu", "von", "mit", "fur", "in", "auf", "ist", "sind"}, "es": {"y", "o", "el", "la", "los", "las", "de", "del", "en", "con", "para", "por", "es", "son"}, "it": {"e", "o", "il", "lo", "la", "i", "gli", "le", "di", "del", "in", "con", "per", "da", "e", "sono"}, "pl": {"i", "oraz", "lub", "w", "na", "z", "ze", "do", "od", "po", "dla", "to", "jest", "sa"}, "pt": {"e", "ou", "o", "a", "os", "as", "de", "do", "da", "em", "no", "na", "com", "para", "por", "e", "sao"}, } BERT_TARGET_THRESHOLD = 0.7 BERT_GOAL_DELTA_MIN = 0.005 TITLE_TARGET_THRESHOLD = 0.65 SEMANTIC_GAP_TOLERANCE_PCT = 0.15 SEMANTIC_GAP_MIN_ABS = 3.0 STAGE_ORDER = ["bert", "bm25", "ngram", "semantic", "title"] NGRAM_ATTEMPTS_PER_TERM = 3 def _normalize_stage_name(v: Any) -> str: s = str(v or "").strip().lower() return s if s in STAGE_ORDER else "" def _goal_label_canonical(goal: Dict[str, Any]) -> str: t = str(goal.get("type", "") or "").strip().lower() if t == "bm25": term = str(goal.get("bm25_word", "") or "").strip().lower() if term: return term label = str(goal.get("label", "") or "").strip().lower() if label.startswith("reduce spam:"): return label.replace("reduce spam:", "", 1).strip() return label return str(goal.get("label", "") or "").strip().lower() def _build_custom_goal(stage: str, value: str, language: str) -> Optional[Dict[str, Any]]: raw = str(value or "").strip() if not raw: return None if stage == "bert": return { "type": "bert", "label": raw, 
"focus_terms": _filter_stopwords(_tokenize(raw), language)[:6], "avoid_terms": [], "bert_phrase_score": 0.0, "bert_target": float(BERT_TARGET_THRESHOLD), } if stage == "bm25": return { "type": "bm25", "label": f"reduce spam: {raw}", "focus_terms": [], "avoid_terms": [raw], "bm25_count": 2, "bm25_word": raw, } if stage == "ngram": return { "type": "ngram", "label": raw, "focus_terms": [raw], "avoid_terms": [], "ngram_target_count": 0.0, "ngram_comp_avg": 1.0, "ngram_tolerance_pct": 0.5, "ngram_lower_bound": 0.5, "ngram_upper_bound": 1.5, "ngram_direction": "increase", "ngram_rank_index": 0, "ngram_candidates_total": 1, } if stage == "semantic": return { "type": "semantic", "label": raw, "focus_terms": [raw], "avoid_terms": [], "semantic_gap": float(SEMANTIC_GAP_MIN_ABS), } if stage == "title": return { "type": "title", "label": "title alignment", "focus_terms": _filter_stopwords(_tokenize(raw), language)[:8], "avoid_terms": [], "title_bert_score": 0.0, "title_target": float(TITLE_TARGET_THRESHOLD), } return None def _apply_stage_goal_override( goals: List[Dict[str, Any]], stage: str, language: str, stage_goal_overrides: Optional[Dict[str, Any]], ) -> List[Dict[str, Any]]: ov_all = stage_goal_overrides or {} ov = ov_all.get(stage) if isinstance(ov_all, dict) else None if not isinstance(ov, dict): return goals mode = str(ov.get("mode", "auto") or "auto").strip().lower() if mode not in {"auto", "manual", "mixed"}: mode = "auto" selected_raw = ov.get("selected") or [] custom_raw = ov.get("custom_add") or [] selected_set = { str(x or "").strip().lower() for x in selected_raw if str(x or "").strip() } custom_goals: List[Dict[str, Any]] = [] for item in custom_raw: g = _build_custom_goal(stage, str(item or ""), language) if g: custom_goals.append(g) if mode == "auto": out = list(goals) else: out = [] for g in goals: if _goal_label_canonical(g) in selected_set: out.append(g) if mode in {"manual", "mixed"}: out.extend(custom_goals) # Deduplicate by canonical goal label to 
keep deterministic cursor behavior. dedup: Dict[str, Dict[str, Any]] = {} for g in out: key = _goal_label_canonical(g) if key and key not in dedup: dedup[key] = g return list(dedup.values()) def _tokenize(text: str) -> List[str]: return [ x for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split() if len(x) >= 2 ] def _tokenize_ngrams_strict(text: str) -> List[str]: """ Tokenizer for strict n-gram matching. В отличие от _tokenize() здесь НЕ отбрасываем короткие токены (например, "a"), чтобы совпадать с теми же n-граммами, которые генерирует логика BM25. """ return [ x for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split() if x ] def _count_term_ngrams_strict(text: str, term: str) -> int: """ Strict count of exact token n-gram occurrences (overlapping allowed). Считает вхождения term как последовательности токенов. """ text_tokens = _tokenize_ngrams_strict(text) term_tokens = _tokenize_ngrams_strict(term) m = len(term_tokens) if m == 0 or not text_tokens or len(text_tokens) < m: return 0 cnt = 0 # Sliding window over token sequence for i in range(0, len(text_tokens) - m + 1): if text_tokens[i : i + m] == term_tokens: cnt += 1 return cnt def _filter_stopwords(tokens: List[str], language: str) -> List[str]: stop = STOP_WORDS.get(language, STOP_WORDS["en"]) return [t for t in tokens if t not in stop] def _split_sentences(text: str) -> List[str]: text = (text or "").strip() if not text: return [] parts = re.split(r"(?<=[\.\!\?])\s+", text) parts = [p.strip() for p in parts if p.strip()] if len(parts) <= 1: parts = [p.strip() for p in re.split(r"\n+", text) if p.strip()] return parts def _escape_html(v: Any) -> str: return html_lib.escape(str(v or ""), quote=True) def _diff_sentences_html(before_text: str, after_text: str) -> Tuple[str, List[Dict[str, str]]]: """ sentence-level diff for UI highlighting. 
Возвращает: - html для отображения ТОЛЬКО after_text (optimized), - список блоков что было/что стало (для "что именно менять"). """ before_sents = _split_sentences(before_text) after_sents = _split_sentences(after_text) matcher = difflib.SequenceMatcher(None, before_sents, after_sents, autojunk=False) parts: List[str] = [] changes: List[Dict[str, str]] = [] for tag, i1, i2, j1, j2 in matcher.get_opcodes(): if tag == "equal": for j in range(j1, j2): parts.append(_escape_html(after_sents[j])) elif tag == "replace": from_txt = " ".join(before_sents[i1:i2]).strip() to_txt = " ".join(after_sents[j1:j2]).strip() changes.append({"type": "replace", "from": from_txt, "to": to_txt}) for j in range(j1, j2): parts.append( f'{_escape_html(after_sents[j])}' ) elif tag == "insert": to_txt = " ".join(after_sents[j1:j2]).strip() changes.append({"type": "insert", "from": "", "to": to_txt}) for j in range(j1, j2): parts.append( f'{_escape_html(after_sents[j])}' ) elif tag == "delete": from_txt = " ".join(before_sents[i1:i2]).strip() changes.append({"type": "delete", "from": from_txt, "to": ""}) else: # Defensive: should not happen. for j in range(j1, j2): parts.append(_escape_html(after_sents[j])) diff_html = " ".join(parts).strip() return diff_html, changes def _diff_title_html(before_title: str, after_title: str) -> Tuple[str, List[Dict[str, str]]]: before_t = (before_title or "").strip() after_t = (after_title or "").strip() if before_t == after_t: return "", [] if not after_t: # Title removed: show nothing in UI, but keep "from/to" for debug. 
return "", [{"type": "delete", "from": before_t, "to": ""}] return ( f'{_escape_html(after_t)}', [{"type": "replace", "from": before_t, "to": after_t}], ) def _max_sentences_for_level(cascade_level: int, operation: str) -> int: if operation == "insert": return 2 if cascade_level <= 1: return 2 if cascade_level == 2: return 3 return 4 def _phrase_strategy_variants(mode: str, goal_type: str, goal_label: str) -> List[str]: normalized = (mode or "auto").strip().lower() if normalized == "ensemble": if goal_type != "bert": return ["auto"] phrase_len = len(_tokenize(goal_label or "")) if phrase_len >= 3: return ["distributed_preferred", "auto", "exact_preferred"] return ["auto", "exact_preferred", "distributed_preferred"] if normalized in {"auto", "exact_preferred", "distributed_preferred"}: return [normalized] return ["auto"] def _build_phrase_strategy_plan(mode: str, goal_type: str, goal_label: str, count: int) -> List[str]: variants = _phrase_strategy_variants(mode, goal_type, goal_label) if count <= 0: return [] return [variants[i % len(variants)] for i in range(count)] def _validate_title_candidate(edited_text: str) -> List[str]: """Guardrails for HTML plain text (not body sentences).""" reasons: List[str] = [] text = (edited_text or "").strip() if not text: reasons.append("empty_title") return reasons if "\n" in text or "\r" in text: reasons.append("title_multiline") if len(text) > 90: reasons.append("title_too_long>90") if len(text) < 8: reasons.append("title_too_short") if re.search(r"<[^>]+>", text): reasons.append("title_contains_html_tags") return reasons def _validate_candidate_text( edited_text: str, cascade_level: int, operation: str, goal_label: str = "", focus_terms: Optional[List[str]] = None, ) -> List[str]: reasons: List[str] = [] text = (edited_text or "").strip() if not text: reasons.append("empty_candidate") return reasons sentence_count = len(_split_sentences(text)) max_sent = _max_sentences_for_level(cascade_level, operation) if sentence_count > 
max_sent: reasons.append(f"too_many_sentences>{max_sent}") # Heuristic quality checks: duplicated words/entities and obvious malformed token joins. if re.search(r"\b([A-Za-z][A-Za-z0-9-]{1,})\s+\1\b", text, flags=re.IGNORECASE): reasons.append("duplicated_entity_or_word") # Catch broken lowercase+Camel join artifacts like "likemBit", but allow brand CamelCase like "RedDogCasino". if re.search(r"\b[a-z]{6,}[A-Z][a-z]+\b", text): reasons.append("suspicious_token_join") # Anti-stuffing checks for BERT phrase goals. focus_terms = focus_terms or [] phrase = (goal_label or "").strip().lower() normalized = re.sub(r"\s+", " ", text.lower()) if phrase: phrase_occurrences = normalized.count(phrase) phrase_token_count = len(_tokenize(phrase)) # For long goal phrases, repeated exact matches are usually unnatural. if phrase_token_count >= 3 and phrase_occurrences > 1: reasons.append("exact_phrase_stuffing") elif phrase_occurrences > 2: reasons.append("exact_phrase_stuffing") for term in focus_terms: tok = (term or "").strip().lower() if not tok: continue term_occurrences = len(re.findall(rf"\b{re.escape(tok)}\b", normalized)) if term_occurrences > 3: reasons.append("focus_term_overuse") break return reasons def _build_analysis_snapshot( target_text: str, competitors: List[str], keywords: List[str], language: str, target_title: str, competitor_titles: List[str], ) -> Dict[str, Any]: wc_target = logic.count_words(target_text, language) wc_comp = [logic.count_words(t, language) for t in competitors] if wc_comp: avg_total = sum(c["total"] for c in wc_comp) / len(wc_comp) avg_sig = sum(c["significant"] for c in wc_comp) / len(wc_comp) else: avg_total = 0 avg_sig = 0 ngram_stats = logic.calculate_ngram_stats(target_text, competitors, language) key_phrases, _ = logic.parse_keywords(keywords, language) bm25 = logic.calculate_bm25_recommendations(target_text, competitors, keywords, language) bert = logic.perform_bert_analysis(target_text, competitors, key_phrases, language) title_data = 
{} if (target_title or "").strip(): title_data = logic.analyze_title(target_title, competitor_titles, keywords, language) return { "word_counts": { "target": wc_target, "competitors": wc_comp, "avg": {"total": round(avg_total), "significant": round(avg_sig)}, }, "ngram_stats": ngram_stats, "bm25_recommendations": bm25, "bert_analysis": bert, "title_analysis": title_data, } def _build_semantic_snapshot( target_text: str, competitors: List[str], language: str, ) -> Dict[str, Any]: def _build_doc(text: str, doc_id: int) -> Dict[str, Any]: sentences_data = nlp_processor.preprocess_text(text, language) graph, word_weights = semantic_graph.build_semantic_graph(sentences_data, lang=language) graph_data = semantic_graph.get_graph_data_for_frontend(graph) return { "id": doc_id, "text": text, "word_weights": word_weights, "stats": { "nodes": len(graph_data.get("nodes", [])), "links": len(graph_data.get("links", [])), }, } target_doc = _build_doc(target_text, 0) comp_docs = [] for idx, c in enumerate([x for x in competitors if (x or "").strip()]): comp_docs.append(_build_doc(c, idx + 1)) num_comp = len(comp_docs) target_weights = target_doc["word_weights"] all_terms = set(target_weights.keys()) for c in comp_docs: all_terms.update(c["word_weights"].keys()) term_power_table = [] for term in all_terms: target_weight = int(target_weights.get(term, 0)) comp_weights = [int(c["word_weights"].get(term, 0)) for c in comp_docs] comp_avg = round(sum(comp_weights) / max(1, num_comp), 2) comp_occ = sum(1 for w in comp_weights if w > 0) term_power_table.append( { "term": term, "target_weight": target_weight, "competitor_avg_weight": comp_avg, "comp_occurrence": comp_occ, "comp_total": num_comp, } ) return {"comparison": {"term_power_table": term_power_table, "num_competitors": num_comp}} def _is_semantic_gap(target_weight: float, competitor_avg_weight: float) -> bool: # Gap is significant only when competitor is meaningfully above target: # - relative margin above tolerance band (15%) # - 
and absolute margin to avoid micro-noise around normalized weights if competitor_avg_weight <= 0: return False abs_gap = competitor_avg_weight - target_weight rel_threshold = target_weight * (1.0 + SEMANTIC_GAP_TOLERANCE_PCT) return (competitor_avg_weight > rel_threshold) and (abs_gap >= SEMANTIC_GAP_MIN_ABS) def _ngram_tolerance_pct(competitor_avg: float) -> float: # User rule: # - avg >= 4 -> +/-20% # - avg < 4 -> +/-50% return 0.20 if competitor_avg >= 4.0 else 0.50 def _is_ngram_outside_tolerance(target_count: float, competitor_avg: float) -> bool: if competitor_avg <= 0: return False tol = _ngram_tolerance_pct(competitor_avg) low = competitor_avg * (1.0 - tol) high = competitor_avg * (1.0 + tol) return target_count < low or target_count > high def _ngram_deviation_ratio(target_count: float, competitor_avg: float) -> float: if competitor_avg <= 0: return 0.0 return abs(target_count - competitor_avg) / max(competitor_avg, 1e-6) def _keyword_unigram_set(keywords: List[str], language: str) -> set: out = set() for kw in keywords: toks = _filter_stopwords(_tokenize(kw), language) for t in toks: out.add(t) return out def _is_ngram_stage_candidate( ngram_label: str, comp_occurrence: int, competitor_count: int, keyword_unigrams: set, ) -> bool: ngram = (ngram_label or "").strip().lower() if not ngram: return False tokens = _tokenize(ngram) n = len(tokens) if competitor_count > 1: if comp_occurrence < 2: return False if n >= 2: # For multi-competitor mode, bi/tri-grams with K>=2 are always candidates. return True # Unigrams are candidates only if they belong to key phrases. return n == 1 and tokens[0] in keyword_unigrams # Single-competitor mode: keep broader eligibility. 
return comp_occurrence >= 1 def _chunk_ngram_count(text: str, ngram_label: str, language: str) -> int: toks = _filter_stopwords(_tokenize(text), language) phrase_toks = _filter_stopwords(_tokenize(ngram_label), language) if not toks or not phrase_toks: return 0 n = len(phrase_toks) if n == 1: term = phrase_toks[0] return sum(1 for t in toks if t == term) count = 0 for i in range(0, max(0, len(toks) - n + 1)): if toks[i : i + n] == phrase_toks: count += 1 return count def _build_ngram_stage_rows( analysis: Dict[str, Any], keywords: List[str], language: str, ) -> List[Tuple[str, float, float, float, int, float]]: """ Eligible underrepresented n-gram targets for the ngram stage. Each row: (ngram_label, target_count, comp_avg, tolerance_pct, comp_occurrence, dev_ratio). Priority (user policy): maximize competitor coverage Freq(K), then Avg(K), then how far below the band the target is (larger deviation first). Only terms with My=0 or outside the tolerance band below average are included. """ ngram_rows: List[Tuple[str, float, float, float, int, float]] = [] ngram_stats = analysis.get("ngram_stats", {}) or {} competitor_count = len((analysis.get("word_counts", {}) or {}).get("competitors", []) or []) keyword_unigrams = _keyword_unigram_set(keywords, language) for _bucket_name, bucket in ngram_stats.items(): if not isinstance(bucket, list): continue for item in bucket: ngram_label = str(item.get("ngram", "")).strip() if not ngram_label: continue target = float(item.get("target_count", 0)) comp_avg = float(item.get("competitor_avg", 0)) comp_occ = int(item.get("comp_occurrence", 0)) if not _is_ngram_stage_candidate(ngram_label, comp_occ, competitor_count, keyword_unigrams): continue if not _is_ngram_outside_tolerance(target, comp_avg): continue if target >= comp_avg: continue tol = _ngram_tolerance_pct(comp_avg) dev_ratio = _ngram_deviation_ratio(target, comp_avg) ngram_rows.append((ngram_label, target, comp_avg, tol, comp_occ, dev_ratio)) ngram_rows.sort(key=lambda x: 
(x[4], x[2], x[5]), reverse=True) return ngram_rows def _score_ngram_candidate_window(window_sentences: List[str], goal_label: str, language: str) -> float: """Heuristic: good place to add phrase — low local duplication, topical proximity, not boilerplate.""" chunk = " ".join(s for s in window_sentences if s).strip() if not chunk: return -1e6 phrase_count = float(_chunk_ngram_count(chunk, goal_label, language)) noise_n = sum(1 for s in window_sentences if _is_noise_like_sentence(s)) noise_frac = noise_n / max(1, len(window_sentences)) phrase_tokens = [t.lower() for t in _filter_stopwords(_tokenize(goal_label), language) if t] chunk_l = chunk.lower() unigram_hits = sum(1 for t in phrase_tokens if t and len(t) > 1 and t in chunk_l) rel_proxy = unigram_hits / max(1, len(phrase_tokens)) if phrase_tokens else 0.0 return ( -3.0 * phrase_count + 2.2 * rel_proxy - 4.0 * noise_frac + min(len(chunk) / 1200.0, 0.35) ) def _rank_ngram_overlap_sentence_indices( sentences: List[str], goal_label: str, language: str, ) -> List[int]: """ Slide overlapping multi-sentence windows over the document; each sentence gets the best score among windows that contain it. Order sentences by that score (desc). """ n = len(sentences) if n <= 0: return [0] if n == 1: return [0] # 2–4 sentences per window, stride 1 for strong overlap. 
w = min(4, max(2, n)) best: List[float] = [-1e9] * n for start in range(0, n - w + 1): win = sentences[start : start + w] sc = _score_ngram_candidate_window(win, goal_label, language) for j in range(start, start + w): if sc > best[j]: best[j] = sc center = (n - 1) / 2.0 scored_idx = [(i, best[i], -abs(i - center)) for i in range(n)] scored_idx.sort(key=lambda t: (t[1], t[2]), reverse=True) return [t[0] for t in scored_idx] def _compute_metrics( analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str, bert_stage_target: float = BERT_TARGET_THRESHOLD, ) -> Dict[str, Any]: competitor_count = len(analysis.get("word_counts", {}).get("competitors", [])) min_signal = 1 if competitor_count <= 1 else 2 bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or [] bert_low = [d for d in bert_details if float(d.get("my_max_score", 0)) < float(bert_stage_target)] bert_phrase_scores = { str(d.get("phrase", "")).strip().lower(): float(d.get("my_max_score", 0) or 0.0) for d in bert_details if str(d.get("phrase", "")).strip() } bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"] bm25_remove_count = len(bm25_remove) ngram_signal_count = 0 ngram_gap_sum = 0.0 keyword_unigrams = _keyword_unigram_set(keywords, language) ngrams = analysis.get("ngram_stats", {}) or {} for bucket in ngrams.values(): if not isinstance(bucket, list): continue for item in bucket: comp_occ = int(item.get("comp_occurrence", 0)) ngram_label = str(item.get("ngram", "")) if not _is_ngram_stage_candidate(ngram_label, comp_occ, competitor_count, keyword_unigrams): continue target = float(item.get("target_count", 0)) comp_avg = float(item.get("competitor_avg", 0)) if _is_ngram_outside_tolerance(target, comp_avg): ngram_signal_count += 1 ngram_gap_sum += _ngram_deviation_ratio(target, comp_avg) title_score = None title_bert = analysis.get("title_analysis", {}).get("bert", {}) if title_bert and 
title_bert.get("target_score") is not None: title_score = float(title_bert.get("target_score", 0)) keyword_terms = set() for kw in keywords: tokens = _filter_stopwords(_tokenize(kw), language) for t in tokens: keyword_terms.add(t) for n in (2, 3): for i in range(0, max(0, len(tokens) - n + 1)): keyword_terms.add(" ".join(tokens[i : i + n])) table = semantic.get("comparison", {}).get("term_power_table", []) or [] by_term = {str(r.get("term", "")).lower(): r for r in table} semantic_gap_count = 0 semantic_gap_sum = 0.0 semantic_gap_terms: List[Dict[str, Any]] = [] for term in keyword_terms: row = by_term.get(term) if not row: continue target_w = float(row.get("target_weight", 0)) comp_w = float(row.get("competitor_avg_weight", 0)) gap = comp_w - target_w if int(row.get("comp_occurrence", 0)) >= min_signal and _is_semantic_gap(target_w, comp_w): semantic_gap_count += 1 semantic_gap_sum += gap base = max(1.0, target_w) semantic_gap_terms.append( { "term": term, "target_weight": round(target_w, 2), "competitor_avg_weight": round(comp_w, 2), "gap": round(gap, 2), "gap_pct_of_target": round(gap / base, 4), "comp_occurrence": int(row.get("comp_occurrence", 0)), "comp_total": int(row.get("comp_total", 0)), } ) # Composite score (0..100) w_bert, w_bm25, w_ng, w_title, w_sem = 30, 20, 15, 10, 25 bert_comp = 1.0 - (len(bert_low) / max(1, len(bert_details))) bm25_comp = 1.0 if bm25_remove_count <= 3 else max(0.0, 1.0 - ((bm25_remove_count - 3) / 10.0)) ng_comp = max(0.0, 1.0 - (ngram_signal_count / 15.0)) title_comp = 1.0 if title_score is None else min(1.0, max(0.0, title_score / 0.65)) sem_comp = max(0.0, 1.0 - (semantic_gap_count / 20.0)) weighted = ( w_bert * bert_comp + w_bm25 * bm25_comp + w_ng * ng_comp + w_title * title_comp + w_sem * sem_comp ) total_w = w_bert + w_bm25 + w_ng + w_title + w_sem score = round((weighted / total_w) * 100.0, 2) resolved_title = "" _td = analysis.get("title_analysis") or {} if isinstance(_td, dict) and (_td.get("target_title") or 
"").strip(): resolved_title = str(_td.get("target_title")).strip() return { "score": score, "competitor_count": competitor_count, "min_competitor_signal": min_signal, "bert_low_count": len(bert_low), "bert_total_keywords": len(bert_details), "bert_phrase_scores": bert_phrase_scores, "bm25_remove_count": bm25_remove_count, "ngram_signal_count": ngram_signal_count, "ngram_gap_sum": round(ngram_gap_sum, 4), "title_bert_score": title_score, "resolved_title": resolved_title, "semantic_gap_count": semantic_gap_count, "semantic_gap_sum": round(semantic_gap_sum, 4), "semantic_gap_terms": sorted( semantic_gap_terms, key=lambda x: (x.get("gap", 0), x.get("gap_pct_of_target", 0)), reverse=True, )[:20], } def _choose_optimization_goal( analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str, stage: str = "bert", bert_stage_target: float = BERT_TARGET_THRESHOLD, stage_cursor: int = 0, ) -> Dict[str, Any]: goals = _collect_optimization_goals( analysis=analysis, semantic=semantic, keywords=keywords, language=language, stage=stage, bert_stage_target=bert_stage_target, ) if not goals: return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []} pick = max(0, int(stage_cursor)) if pick >= len(goals): return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []} return goals[pick] def _collect_optimization_goals( analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str, stage: str = "bert", bert_stage_target: float = BERT_TARGET_THRESHOLD, stage_goal_overrides: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: goals: List[Dict[str, Any]] = [] bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or [] low_bert = [x for x in bert_details if float(x.get("my_max_score", 0)) < float(bert_stage_target)] if low_bert: for row in sorted(low_bert, key=lambda x: float(x.get("my_max_score", 0))): phrase = str(row.get("phrase", "")).strip() if not phrase: continue 
focus_terms = _filter_stopwords(_tokenize(phrase), language)[:4] goals.append( { "type": "bert", "label": phrase, "focus_terms": focus_terms, "avoid_terms": [], "bert_phrase_score": float(row.get("my_max_score", 0) or 0.0), "bert_target": float(bert_stage_target), } ) bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"] if len(bm25_remove) >= 4: for row in sorted(bm25_remove, key=lambda r: int(r.get("count", 0)), reverse=True)[:8]: word = str(row.get("word", "")).strip() if not word: continue goals.append( { "type": "bm25", "label": f"reduce spam: {word}", "focus_terms": [], "avoid_terms": [word], "bm25_count": int(row.get("count", 0) or 0), "bm25_word": word, } ) # Semantic keyword gaps lang_stop = STOP_WORDS.get(language, STOP_WORDS["en"]) keyword_terms = set() for kw in keywords: toks = [t for t in _tokenize(kw) if t not in lang_stop] keyword_terms.update(toks) for n in (2, 3): for i in range(0, max(0, len(toks) - n + 1)): keyword_terms.add(" ".join(toks[i : i + n])) table = semantic.get("comparison", {}).get("term_power_table", []) or [] candidate_rows: List[Tuple[str, float]] = [] for row in table: term = str(row.get("term", "")).lower() if term not in keyword_terms: continue target_w = float(row.get("target_weight", 0)) comp_w = float(row.get("competitor_avg_weight", 0)) gap = comp_w - target_w if _is_semantic_gap(target_w, comp_w): candidate_rows.append((term, gap)) if candidate_rows: for term, gap in sorted(candidate_rows, key=lambda x: x[1], reverse=True)[:12]: goals.append( { "type": "semantic", "label": term, "focus_terms": [term], "avoid_terms": [], "semantic_gap": float(gap), } ) # N-gram balancing (toward competitor average with tolerance policy). 
ngram_rows = _build_ngram_stage_rows(analysis, keywords, language) if ngram_rows: for rank, (label, target, comp_avg, tol, _, _) in enumerate(ngram_rows): goals.append({ "type": "ngram", "label": label, "focus_terms": [label], "avoid_terms": [], "ngram_target_count": target, "ngram_comp_avg": comp_avg, "ngram_tolerance_pct": tol, "ngram_lower_bound": round(comp_avg * (1.0 - tol), 3), "ngram_upper_bound": round(comp_avg * (1.0 + tol), 3), "ngram_direction": "increase" if target < comp_avg else "decrease", "ngram_rank_index": rank, "ngram_candidates_total": len(ngram_rows), }) title_bert = analysis.get("title_analysis", {}).get("bert", {}) or {} title_target_score = title_bert.get("target_score") if ( keywords and title_target_score is not None and float(title_target_score) < TITLE_TARGET_THRESHOLD ): goals.append( { "type": "title", "label": "title alignment", "focus_terms": _filter_stopwords(_tokenize(" ".join(keywords[:8])), language)[:8], "avoid_terms": [], "title_bert_score": float(title_target_score) if title_target_score is not None else None, "title_target": float(TITLE_TARGET_THRESHOLD), } ) stage_goals = [g for g in goals if g.get("type") == stage] return _apply_stage_goal_override(stage_goals, stage, language, stage_goal_overrides) def _per_goal_budget( goal: Dict[str, Any], max_iterations: int, candidates_per_iteration: int, bert_stage_target: float, ) -> Tuple[int, int]: """ Scale per-goal iteration and candidate budgets by how far the metric is from its target. Returns (effective_max_iterations_for_this_goal, effective_candidates_per_iteration). 
""" t = str(goal.get("type", "") or "") raw = 0.0 if t == "bert": sc = float(goal.get("bert_phrase_score", 0.0) or 0.0) tgt = float(goal.get("bert_target", bert_stage_target) or bert_stage_target) raw = max(0.0, (tgt - sc) / max(tgt, 1e-6)) elif t == "ngram": ca = float(goal.get("ngram_comp_avg", 0.0) or 0.0) tc = float(goal.get("ngram_target_count", 0.0) or 0.0) if str(goal.get("ngram_direction", "increase")) == "increase": need = max(0.0, ca - tc) raw = min(1.0, need / max(ca, 1e-6)) else: need = max(0.0, tc - ca) raw = min(1.0, need / max(tc, 1e-6)) elif t == "semantic": gap = float(goal.get("semantic_gap", 0.0) or 0.0) raw = min(1.0, gap / max(SEMANTIC_GAP_MIN_ABS * 4.0, 1e-6)) elif t == "bm25": c = int(goal.get("bm25_count", 0) or 0) raw = min(1.0, max(0, c - 1) / 8.0) elif t == "title": ts = goal.get("title_bert_score") if ts is None: raw = 0.5 else: tgt = float(goal.get("title_target", TITLE_TARGET_THRESHOLD) or TITLE_TARGET_THRESHOLD) raw = max(0.0, (tgt - float(ts)) / max(tgt, 1e-6)) else: raw = 0.0 iter_mult = 1.0 + 2.0 * min(1.0, raw) cand_mult = 1.0 + 1.0 * min(1.0, raw) eff_iter = max(1, min(int(round(max_iterations * iter_mult)), max_iterations * 3)) eff_cand = max(1, min(int(round(candidates_per_iteration * cand_mult)), 5)) return eff_iter, eff_cand def _estimate_total_loop_budget( analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str, max_iterations: int, candidates_per_iteration: int, bert_stage_target: float, active_stage_order: Optional[List[str]] = None, stage_goal_overrides: Optional[Dict[str, Any]] = None, ) -> int: total = 0 stages = active_stage_order or list(STAGE_ORDER) for st in stages: for g in _collect_optimization_goals( analysis, semantic, keywords, language, stage=st, bert_stage_target=bert_stage_target, stage_goal_overrides=stage_goal_overrides, ): ei, _ = _per_goal_budget(g, max_iterations, candidates_per_iteration, bert_stage_target) total += ei return min(480, max(1, total)) def 
_choose_sentence_idx(sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str) -> int:
    """Pick a single sentence index to edit.

    Priority: the sentence with the most avoid-term hits; otherwise the one
    with the most focus-term hits (stopwords filtered); otherwise a stable
    fallback near the top of the document (index <= 2).
    """
    if not sentences:
        return 0
    stop = STOP_WORDS.get(language, STOP_WORDS["en"])
    focus = [x for x in focus_terms if x and x not in stop]
    if avoid_terms:
        best_idx, best_score = 0, -1.0
        for i, s in enumerate(sentences):
            lower = s.lower()
            # Count raw case-insensitive occurrences of each avoid term.
            score = sum(lower.count(t.lower()) for t in avoid_terms if t)
            if score > best_score:
                best_idx, best_score = i, score
        return best_idx
    if focus:
        best_idx, best_score = 0, -1.0
        for i, s in enumerate(sentences):
            lower = s.lower()
            score = sum(lower.count(t.lower()) for t in focus)
            if score > best_score:
                best_idx, best_score = i, score
        return best_idx
    # No terms at all: default to an early sentence.
    return min(2, len(sentences) - 1)


def _rank_sentence_indices(
    sentences: List[str],
    focus_terms: List[str],
    avoid_terms: List[str],
    language: str,
    goal_type: str = "",
    goal_label: str = "",
) -> List[int]:
    """Rank all sentence indices by suitability for a local edit.

    Returns indices ordered best-first. Ranking blends goal relevance,
    lexical term hits, a noise penalty, and a mild bias toward the
    document center. BERT and bm25 goals get specialized scoring.
    """
    if not sentences:
        return [0]
    stop = STOP_WORDS.get(language, STOP_WORDS["en"])
    focus = [x for x in focus_terms if x and x not in stop]
    goal_phrase = (goal_label or "").strip().lower()
    avoid = [x for x in avoid_terms if x]
    center = (len(sentences) - 1) / 2.0
    # N-gram stage: overlapping sentence windows — pick spans where insertion is natural
    # while document-level phrase count remains the primary optimization signal.
    if goal_type == "ngram" and (goal_label or "").strip():
        return _rank_ngram_overlap_sentence_indices(sentences, str(goal_label).strip(), language)
    # For BERT optimization prefer natural prose chunks over list/menu/noisy blocks.
    candidate_indices = list(range(len(sentences)))
    if goal_type == "bert":
        non_noise = [i for i, s in enumerate(sentences) if not _is_noise_like_sentence(s)]
        if non_noise:
            candidate_indices = non_noise
    scored: List[Tuple[int, float, int]] = []
    for idx in candidate_indices:
        s = sentences[idx]
        lower = s.lower()
        # For bm25 "remove" goals we must select spans that actually contain the exact n-gram term.
        # Otherwise the LLM can "optimize around" the goal without decreasing the real BM25 remove metric.
        if goal_type == "bm25" and avoid and not focus_terms:
            bm25_term = avoid[0]
            bm25_count = _count_term_ngrams_strict(s, bm25_term)
            focus_score = 0
            avoid_score = float(bm25_count)
            chunk_rel = float(bm25_count)
        else:
            focus_score = sum(lower.count(t.lower()) for t in focus)
            avoid_score = sum(lower.count(t.lower()) for t in avoid)
            chunk_rel = _chunk_goal_relevance(s, goal_type, goal_label, focus_terms, language)
        noise_penalty = 1.0 if _is_noise_like_sentence(s) else 0.0
        # For BERT goals, do not over-focus on existing occurrences only:
        # prioritize semantically relevant chunks where phrase/terms may still be underrepresented.
        if goal_type == "bert":
            tokenized = _filter_stopwords(_tokenize(lower), language)
            token_set = set(tokenized)
            core_terms = [t.lower() for t in focus if t]
            core_hits = sum(1 for t in core_terms if t in token_set)
            coverage = (core_hits / max(1, len(core_terms))) if core_terms else 0.0
            phrase_present = 1.0 if (goal_phrase and goal_phrase in lower) else 0.0
            # Boost candidates where semantic context is relevant but explicit core terms are not saturated yet.
            missing_term_boost = (1.0 - coverage) * 1.4 if chunk_rel >= 0.18 else 0.0
            phrase_absent_boost = 0.35 if (goal_phrase and not phrase_present and chunk_rel >= 0.2) else 0.0
            score = (
                (chunk_rel * 5.0)
                + (focus_score * 1.2)
                + (missing_term_boost + phrase_absent_boost)
                + (avoid_score * 1.5)
                - (noise_penalty * 3.0)
                - (abs(idx - center) * 0.05)
            )
        else:
            # Prefer semantically relevant and lexical matches; push noisy headers/CTA lower.
            score = (chunk_rel * 4.0) + (focus_score * 3.0) + (avoid_score * 2.0) - (noise_penalty * 3.0) - (abs(idx - center) * 0.05)
        scored.append((idx, score, len(s)))
    # Sort by score desc; tie-break prefers shorter sentences (len ascending via -len reversed).
    scored.sort(key=lambda x: (x[1], -x[2]), reverse=True)
    ordered = [idx for idx, _, _ in scored]
    if not ordered:
        ordered = list(range(len(sentences)))
    return ordered


def _span_variants_for_level(cascade_level: int) -> List[Tuple[str, int, int]]:
    """Return edit-span variants for a cascade level as (operation, left_radius, right_radius).

    Higher levels widen the rewrite window; level 3 favors insertions
    (repeated entries intentionally weight 'insert' attempts).
    """
    # (operation, left_radius, right_radius)
    if cascade_level <= 1:
        return [("rewrite", 0, 0), ("rewrite", 0, 1), ("rewrite", 1, 0)]
    if cascade_level == 2:
        return [("rewrite", 1, 1), ("rewrite", 0, 2), ("rewrite", 2, 0), ("rewrite", 1, 2), ("rewrite", 2, 1)]
    if cascade_level == 3:
        return [("insert", 0, 0), ("insert", 0, 0), ("insert", 0, 0), ("rewrite", 1, 1)]
    return [("rewrite", 2, 2), ("rewrite", 1, 3), ("rewrite", 3, 1), ("rewrite", 2, 3), ("rewrite", 3, 2)]


def _choose_edit_span(
    sentences: List[str],
    goal: Dict[str, Any],
    language: str,
    cascade_level: int,
    attempt_cursor: int,
) -> Tuple[str, int, int, int, int]:
    """Deterministically choose (operation, span_start, span_end, sentence_index, variant_index).

    The attempt cursor walks the (ranked sentence x span variant) grid so
    repeated attempts for the same goal try different spans.
    """
    ranked = _rank_sentence_indices(
        sentences,
        goal.get("focus_terms", []) or [],
        goal.get("avoid_terms", []) or [],
        language,
        str(goal.get("type", "") or ""),
        str(goal.get("label", "") or ""),
    )
    variants = _span_variants_for_level(cascade_level)
    total = max(1, len(ranked) * len(variants))
    pick = attempt_cursor % total
    sent_pick = pick % len(ranked)
    variant_pick = (pick // len(ranked)) % len(variants)
    sent_idx = ranked[sent_pick]
    operation, left_radius, right_radius = variants[variant_pick]
    if operation == "insert":
        # Inserts anchor on a single sentence; radii are ignored.
        span_start = sent_idx
        span_end = sent_idx
    else:
        span_start = max(0, sent_idx - left_radius)
        span_end = min(len(sentences) - 1, sent_idx + right_radius)
    return operation, span_start, span_end, sent_idx, variant_pick


def _is_noise_like_sentence(text: str) -> bool:
    """Heuristically detect menu/CTA/header-like fragments that are poor edit targets."""
    s = (text or "").strip()
    if not s:
        return True
    lower = s.lower()
    tokens = _tokenize(lower)
    if len(tokens) <= 2:
        return True
    if len(tokens) <= 8 and 
re.search(r"\b(play|explore|best|top|contact|login|signup|casino)\b", lower):
        return True
    # Short ALL-CAPS/pipe-separated lines look like page headers or breadcrumbs.
    if len(s) <= 90 and re.fullmatch(r"[A-Z0-9\s\-\|\:\.]+", s):
        return True
    if re.search(r"\b(best|top)\b.{0,20}\b(alternative|alternatives|casino|casinos)\b", lower) and len(tokens) <= 12:
        return True
    return False


def _chunk_goal_relevance(
    text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
    goal_meta: Optional[Dict[str, Any]] = None,
) -> float:
    """Score how relevant a chunk is to a goal.

    bert: embedding cosine similarity between goal label and chunk
    (falls through to the lexical fallback on any model error);
    ngram: raw phrase count via _chunk_ngram_count; otherwise: fraction
    of focus terms present in the chunk (stopwords removed).
    """
    chunk = (text or "").strip()
    if not chunk:
        return 0.0
    if goal_type == "bert" and (goal_label or "").strip():
        try:
            model = logic.get_bert_model()
            embeddings = model.encode([goal_label.strip(), chunk], convert_to_tensor=True)
            return float(logic.util.cos_sim(embeddings[0:1], embeddings[1:2])[0][0].item())
        except Exception:
            pass
    if goal_type == "ngram" and (goal_label or "").strip():
        return float(_chunk_ngram_count(chunk, goal_label, language))
    # Lexical fallback for non-BERT goals or if embedding scoring is unavailable.
    toks = _filter_stopwords(_tokenize(chunk), language)
    if not toks:
        return 0.0
    focus = [f.lower() for f in (focus_terms or []) if f]
    if not focus:
        return 0.0
    overlap = 0.0
    token_str = " ".join(toks)
    for f in focus:
        if " " in f:
            # Multi-word term: substring match against the joined token stream.
            overlap += 1.0 if f in token_str else 0.0
        else:
            overlap += 1.0 if f in toks else 0.0
    return overlap / max(1.0, float(len(focus)))


def _chunk_goal_delta(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
    goal_meta: Optional[Dict[str, Any]] = None,
) -> float:
    """Return the goal-specific improvement of `after_text` over `before_text` (positive = better)."""
    # BM25 "remove spam" must be judged by strict decrease of the exact n-gram term count.
    # Otherwise the LLM can improve nearby semantics without reducing the real BM25 remove metric.
    if goal_type == "bm25":
        term = None
        avoid = []
        focus = []
        if isinstance(goal_meta, dict):
            term = goal_meta.get("bm25_word")
            avoid = goal_meta.get("avoid_terms") or []
            focus = goal_meta.get("focus_terms") or []
        # Fallback: try to infer from avoid_terms
        if not term and avoid:
            term = avoid[0]
        if not term:
            return 0.0
        before_count = _count_term_ngrams_strict(before_text or "", str(term))
        after_count = _count_term_ngrams_strict(after_text or "", str(term))
        # For now bm25 goals are "remove" (focus_terms is empty, avoid_terms contains term).
        # Keep the sign direction explicit so add-mode can be introduced later safely.
        if avoid and not focus:
            return round(float(before_count - after_count), 4)
        # If ever we add bm25 "add" goals with focus terms, then increase is good.
        return round(float(after_count - before_count), 4)
    before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language, goal_meta)
    after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language, goal_meta)
    if goal_type == "ngram":
        target_avg = float((goal_meta or {}).get("ngram_comp_avg", 0.0))
        if target_avg > 0:
            # Positive delta means closer to competitor average regardless of direction.
            before_dist = abs(before_rel - target_avg)
            after_dist = abs(after_rel - target_avg)
            return round(before_dist - after_dist, 4)
    return round(after_rel - before_rel, 4)


def _min_chunk_delta(goal_type: str) -> float:
    """Minimal chunk-level delta required to count an edit as progress for the goal type."""
    if goal_type == "bert":
        return 0.01
    if goal_type == "title":
        return 0.005
    if goal_type == "ngram":
        # Require at least one occurrence-equivalent movement toward target zone.
        return 0.5
    return 0.05


def _chunk_relevance_pair(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
    goal_meta: Optional[Dict[str, Any]] = None,
) -> Tuple[float, float]:
    """Return (before_relevance, after_relevance), each rounded to 4 decimals."""
    before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language, goal_meta)
    after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language, goal_meta)
    return round(before_rel, 4), round(after_rel, 4)


def _term_diff(before_text: str, after_text: str, language: str) -> Dict[str, List[str]]:
    """Summarize vocabulary change between two texts (stopwords excluded, capped at 12 terms each)."""
    before_tokens = _filter_stopwords(_tokenize(before_text), language)
    after_tokens = _filter_stopwords(_tokenize(after_text), language)
    before_set = set(before_tokens)
    after_set = set(after_tokens)
    removed = sorted(list(before_set - after_set))[:12]
    added = sorted(list(after_set - before_set))[:12]
    return {"added_terms": added, "removed_terms": removed}


def _edits_conflict(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
    """Return True when two edit payloads cannot be applied together.

    Title rewrites conflict with everything; inserts conflict only when
    anchored on the same start index; rewrites conflict on span overlap.
    """
    if a.get("operation") == "title_rewrite" or b.get("operation") == "title_rewrite":
        return True
    if a.get("operation") == "insert" or b.get("operation") == "insert":
        return int(a.get("span_start", 0)) == int(b.get("span_start", 0))
    a0, a1 = int(a.get("span_start", 0)), int(a.get("span_end", 0))
    b0, b1 = int(b.get("span_start", 0)), int(b.get("span_end", 0))
    return not (a1 < b0 or b1 < a0)


def _apply_edits_to_sentences(sentences: List[str], edits: List[Dict[str, Any]]) -> List[str]:
    """Apply edits back-to-front so earlier span indices stay valid while splicing."""
    updated = list(sentences)
    ordered = sorted(
        edits,
        key=lambda e: (int(e.get("span_start", 0)), int(e.get("span_end", 0))),
        reverse=True,
    )
    for edit in ordered:
        op = str(edit.get("operation", "rewrite"))
        start = int(edit.get("span_start", 0))
        end = int(edit.get("span_end", start))
        text = str(edit.get("edited_text", "")).strip()
        if not text:
            # Skip empty payloads rather than deleting content.
            continue
        if op == "insert":
            updated = _insert_after(updated, end, text)
        else:
            updated = _replace_span(updated, start, end, text)
    return updated


def _metrics_delta(prev_metrics: 
Dict[str, Any], next_metrics: Dict[str, Any]) -> Dict[str, Any]:
    """Compute per-key numeric deltas between two metric snapshots, plus semantic-gap term churn."""
    keys = [
        "score",
        "bert_low_count",
        "bm25_remove_count",
        "ngram_signal_count",
        "ngram_gap_sum",
        "semantic_gap_count",
        "semantic_gap_sum",
    ]
    out: Dict[str, Any] = {}
    for k in keys:
        pv = prev_metrics.get(k)
        nv = next_metrics.get(k)
        if pv is None or nv is None:
            # Only report deltas for keys present in both snapshots.
            continue
        out[k] = round(float(nv) - float(pv), 4)
    pv_t = prev_metrics.get("title_bert_score")
    nv_t = next_metrics.get("title_bert_score")
    if pv_t is not None and nv_t is not None:
        out["title_bert_score"] = round(float(nv_t) - float(pv_t), 4)
    prev_terms = {str(x.get("term", "")).lower() for x in (prev_metrics.get("semantic_gap_terms") or []) if x.get("term")}
    next_terms = {str(x.get("term", "")).lower() for x in (next_metrics.get("semantic_gap_terms") or []) if x.get("term")}
    if prev_terms or next_terms:
        out["semantic_gap_terms_added"] = sorted(list(next_terms - prev_terms))[:12]
        out["semantic_gap_terms_removed"] = sorted(list(prev_terms - next_terms))[:12]
    return out


def _non_conflicting_edit_combos(candidates: List[Dict[str, Any]], min_size: int = 2, max_size: int = 4) -> List[List[Dict[str, Any]]]:
    """Enumerate candidate combinations (size >= 2) whose edit payloads are pairwise non-conflicting.

    NOTE(review): `min_size` is floored at 2 by `max(2, min_size)`, so values
    below 2 have no effect — presumably intentional for "combos".
    """
    if not candidates:
        return []
    n = len(candidates)
    combos: List[List[Dict[str, Any]]] = []
    for r in range(max(2, min_size), min(max_size, n) + 1):
        for idxs in combinations(range(n), r):
            combo = [candidates[i] for i in idxs]
            conflict = False
            for i in range(len(combo)):
                for j in range(i + 1, len(combo)):
                    e1 = combo[i].get("edit_payload")
                    e2 = combo[j].get("edit_payload")
                    if not e1 or not e2:
                        # Missing payloads are treated as conflicting (combo rejected).
                        conflict = True
                        break
                    if _edits_conflict(e1, e2):
                        conflict = True
                        break
                if conflict:
                    break
            if not conflict:
                combos.append(combo)
    return combos


def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Parse a JSON object from raw LLM output; falls back to the first {...} span found."""
    raw = (text or "").strip()
    if not raw:
        return None
    try:
        return json.loads(raw)
    except Exception:
        pass
    m = re.search(r"\{[\s\S]*\}", raw)
    if not m:
        return None
    try:
        return json.loads(m.group(0))
    except Exception:
        return None


def _llm_edit_chunk(
    *,
    api_key: str,
    base_url: str,
    model: str,
    language: str,
    full_text: str,
    chunk_text: str,
    operation: str,
    context_before: str,
    context_after: str,
    cascade_level: int,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    avoid_terms: List[str],
    temperature: float,
    phrase_strategy_mode: str = "auto",
) -> Dict[str, Any]:
    """Request a local chunk rewrite/insert from the chat-completions API.

    Returns {"edited_text", "rationale", "prompt_debug"}. Raises ValueError
    on title goals (wrong entry point) or invalid LLM JSON; HTTP errors
    propagate via raise_for_status().
    """
    # Title optimization must never touch body text; see optimize_text branch for goal.type == "title".
    if str(goal_type or "").strip().lower() == "title":
        raise ValueError(
            "Internal error: goal_type 'title' must be handled via _llm_edit_title(), not _llm_edit_chunk(). "
            "If you see this on a server, deploy the current optimizer (title-only LLM path)."
        )
    endpoint = base_url.rstrip("/") + "/chat/completions"
    op = operation if operation in {"rewrite", "insert"} else "rewrite"
    system_msg = (
        "You are a semantic-vector optimizer for SEO tasks. "
        "Your task is to improve chunk relevance to the focus terms/goal phrase with minimal local edits. "
        "Preserve narrative flow, factual tone, and language. "
        "Return strict JSON only: {\"edited_text\": \"...\", \"rationale\": \"...\"}. "
        "Do not rewrite the whole text. Never change topic or introduce unrelated entities."
    )
    op_instruction = (
        "Rewrite the provided chunk only."
        if op == "rewrite"
        else "Create a short bridge chunk (1-2 sentences) to insert after the chunk."
    )
    max_sent = _max_sentences_for_level(cascade_level, op)
    phrase_tokens = _filter_stopwords(_tokenize(goal_label or ""), language)
    phrase_len = len(phrase_tokens)
    strategy_mode = (phrase_strategy_mode or "auto").strip().lower()
    if strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}:
        strategy_mode = "auto"
    # Select phrase-placement guidance: explicit mode wins, else fall back on phrase length.
    if strategy_mode == "exact_preferred":
        phrase_strategy = (
            "Prefer one natural exact phrase mention when grammatically correct; otherwise use distributed core-term coverage."
        )
    elif strategy_mode == "distributed_preferred":
        phrase_strategy = (
            "Prefer distributed semantic coverage: spread core terms/lemmas naturally and avoid exact phrase unless absolutely natural."
        )
    elif phrase_len >= 3:
        phrase_strategy = (
            "Prefer distributed semantic coverage for long phrases: naturally spread core terms/lemmas across the local paragraph. "
            "Use exact phrase only if it is grammatically natural."
        )
    elif phrase_len == 2:
        phrase_strategy = (
            "For two-term goals, use either one natural exact phrase or distributed use of both terms without repetition."
        )
    else:
        phrase_strategy = (
            "For single-term goals, improve relevance using natural lexical variants and nearby semantic anchors."
        )
    user_msg = (
        f"Language: {language}\n"
        f"Operation: {op}\n"
        f"Cascade level: L{cascade_level}\n"
        f"Goal: {goal_type} ({goal_label})\n"
        f"Goal token count (without stopwords): {phrase_len}\n"
        f"Instruction: {op_instruction}\n"
        f"Must preserve overall narrative and style.\n"
        "Text must be grammatically correct and natural for native readers.\n"
        "Keep edits tightly local to the provided chunk and immediate context only.\n"
        "Edit must be substantive (not just synonyms) and should increase relevance to the goal phrase.\n"
        "Do not change the sentence subject/entity focus unless absolutely required by grammar.\n"
        f"Phrase strategy: {phrase_strategy}\n"
        f"Focus terms to strengthen: {', '.join(focus_terms) if focus_terms else '-'}\n"
        f"Terms to de-emphasize/avoid overuse: {', '.join(avoid_terms) if avoid_terms else '-'}\n\n"
        f"Chunk to edit/expand:\n{chunk_text}\n\n"
        f"Prev context:\n{context_before}\n\n"
        f"Next context:\n{context_after}\n\n"
        "Constraints:\n"
        "1) Keep text concise and locally coherent.\n"
        "2) Keep local coherence with surrounding text.\n"
        f"3) Max {max_sent} sentence(s) in edited_text.\n"
        "4) Keep key named entities from the original chunk unchanged when possible.\n"
        "5) For BERT goals, prioritize semantic alignment over exact phrase repetition.\n"
        "6) If exact phrase sounds unnatural, do NOT force it; use grammatically correct distributed wording.\n"
        "7) Exact phrase may appear at most once, and only when it reads naturally.\n"
        "8) Avoid repeating the same focus term more than needed; no stuffing.\n"
        "9) For rewrite: preserve original meaning sentence-by-sentence while improving relevance.\n"
        "10) Provide rationale in one short sentence.\n"
        "11) Only output JSON object."
    )
    payload = {
        "model": model,
        # Clamp temperature to the API-safe [0.0, 1.2] range.
        "temperature": float(max(0.0, min(1.2, temperature))),
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        "response_format": {"type": "json_object"},
    }
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    data = response.json()
    content = (
        data.get("choices", [{}])[0]
        .get("message", {})
        .get("content", "")
    )
    parsed = _extract_json_object(content)
    edited = ""
    rationale = ""
    if parsed:
        # Accept a few alternate keys the model might use for the edit text.
        edited = str(parsed.get("edited_text") or parsed.get("revised_sentence") or parsed.get("rewrite") or "").strip()
        rationale = str(parsed.get("rationale") or parsed.get("why") or "").strip()
    if not edited:
        raise ValueError("LLM returned invalid JSON edit payload.")
    return {
        "edited_text": edited,
        "rationale": rationale,
        "prompt_debug": {
            "operation": op,
            "cascade_level": cascade_level,
            "goal_type": goal_type,
            "goal_label": goal_label,
            "focus_terms": focus_terms,
            "avoid_terms": avoid_terms,
            "phrase_strategy_mode": strategy_mode,
            "goal_token_count": phrase_len,
            "phrase_strategy": phrase_strategy,
            "max_sentences": max_sent,
            "chunk_text": chunk_text,
            "context_before": context_before,
            "context_after": context_after,
            "temperature": float(max(0.0, min(1.2, temperature))),
        },
    }


def _llm_edit_title(
    *,
    api_key: str,
    base_url: str,
    model: str,
    language: str,
    current_title: str,
    body_excerpt: str,
    competitor_title_hint: str,
    focus_terms: List[str],
    avoid_terms: List[str],
    keywords: 
List[str],
    cascade_level: int,
    temperature: float,
    phrase_strategy_mode: str = "auto",
) -> Dict[str, Any]:
    """
    Rewrite only the page <title> plain text.
    Backend metric: mean BERT cos-sim(title, each keyword).
    """
    endpoint = base_url.rstrip("/") + "/chat/completions"
    strategy_mode = (phrase_strategy_mode or "auto").strip().lower()
    if strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}:
        strategy_mode = "auto"
    # Phrase length is measured over the first 6 keywords, stopwords removed.
    phrase_tokens = _filter_stopwords(_tokenize(" ".join(keywords[:6])), language)
    phrase_len = len(phrase_tokens)
    if strategy_mode == "exact_preferred":
        phrase_strategy = (
            "Prefer one natural exact phrase from the keyword list when it fits; otherwise distribute core terms."
        )
    elif strategy_mode == "distributed_preferred":
        phrase_strategy = "Prefer natural distribution of core terms; avoid stiff exact-match phrasing."
    elif phrase_len >= 3:
        phrase_strategy = (
            "Spread core lemmas across the title; exact multi-word match only if it reads like a real title."
        )
    elif phrase_len == 2:
        phrase_strategy = "Use both core ideas in one short title line without repetition."
    else:
        phrase_strategy = "Keep the title specific and aligned with the keyword theme using natural wording."
    system_msg = (
        "You optimize HTML <title> tag text for SEO. "
        "Output JSON only: {\"edited_text\": \"...\", \"rationale\": \"...\"}. "
        "edited_text must be the new title line ONLY: plain text, no <title> tags, no quotes wrapping the whole title. "
        "Do not change the article body. Preserve the primary brand/site name from the current title when present."
    )
    kw_line = ", ".join(k.strip() for k in (keywords or [])[:24] if k.strip()) or "-"
    user_msg = (
        f"Language: {language}\n"
        f"Cascade level: L{cascade_level}\n"
        f"Task: Improve semantic alignment of the TITLE with these keywords (average embedding similarity should rise).\n"
        f"Keywords (priority): {kw_line}\n"
        f"Focus terms: {', '.join(focus_terms) if focus_terms else '-'}\n"
        f"De-emphasize: {', '.join(avoid_terms) if avoid_terms else '-'}\n"
        f"Phrase strategy: {phrase_strategy}\n\n"
        f"Current title:\n{current_title.strip()}\n\n"
        f"Page content excerpt (context only; do not paste into title):\n{(body_excerpt or '')[:900]}\n\n"
        f"{competitor_title_hint}\n\n"
        "Constraints:\n"
        "1) Length about 35–60 characters when possible; hard max 88 characters.\n"
        "2) One line; no line breaks; title case or sentence case per language norms.\n"
        "3) Natural human phrasing; no keyword stuffing; each important term at most once unless grammar needs it.\n"
        "4) Do not add unrelated entities or claims.\n"
        "5) Rationale: one short sentence.\n"
    )
    payload = {
        "model": model,
        "temperature": float(max(0.0, min(1.2, temperature))),
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        "response_format": {"type": "json_object"},
    }
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    data = response.json()
    content = (
        data.get("choices", [{}])[0]
        .get("message", {})
        .get("content", "")
    )
    parsed = _extract_json_object(content)
    edited = ""
    rationale = ""
    if parsed:
        edited = str(parsed.get("edited_text") or parsed.get("title") or "").strip()
        rationale = str(parsed.get("rationale") or "").strip()
    if not edited:
        raise ValueError("LLM returned invalid JSON title payload.")
    return {
        "edited_text": edited,
        "rationale": rationale,
        "prompt_debug": {
            "operation": "title_rewrite",
            "cascade_level": cascade_level,
            "goal_type": "title",
            "goal_label": "title alignment",
            "focus_terms": focus_terms,
            "avoid_terms": avoid_terms,
            "phrase_strategy_mode": strategy_mode,
            "phrase_strategy": phrase_strategy,
            "chunk_text": current_title.strip(),
            "context_before": (body_excerpt or "")[:500],
            "context_after": competitor_title_hint[:500] if competitor_title_hint else "",
            "temperature": float(max(0.0, min(1.2, temperature))),
        },
    }


def _replace_span(sentences: List[str], start_idx: int, end_idx: int, replacement_text: str) -> List[str]:
    """Replace sentences[start_idx..end_idx] (inclusive) with the split replacement text."""
    replacement = _split_sentences(replacement_text)
    if not replacement:
        replacement = [replacement_text.strip()]
    return sentences[:start_idx] + replacement + sentences[end_idx + 1 :]


def _insert_after(sentences: List[str], after_idx: int, inserted_text: str) -> List[str]:
    """Insert the split text immediately after sentences[after_idx]."""
    insertion = _split_sentences(inserted_text)
    if not insertion:
        insertion = [inserted_text.strip()]
    return sentences[: after_idx + 1] + insertion + sentences[after_idx + 1 :]


def _goal_improved(
    goal_type: str,
    goal_label: str,
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
) -> bool:
    """Return True when the metrics snapshot moved in the goal's preferred direction."""
    if goal_type == "bert":
        key = (goal_label or "").strip().lower()
        prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
        next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
        # Accept smaller but real phrase-level gains to accumulate progress toward threshold (e.g. 0.51 -> 0.70).
        return (next_phrase - prev_phrase) >= BERT_GOAL_DELTA_MIN or next_metrics["bert_low_count"] < prev_metrics["bert_low_count"]
    if goal_type == "bm25":
        return next_metrics["bm25_remove_count"] < prev_metrics["bm25_remove_count"]
    if goal_type == "semantic":
        return next_metrics["semantic_gap_count"] < prev_metrics["semantic_gap_count"]
    if goal_type == "ngram":
        return (
            next_metrics["ngram_signal_count"] < prev_metrics["ngram_signal_count"]
            or float(next_metrics.get("ngram_gap_sum", 0.0)) < float(prev_metrics.get("ngram_gap_sum", 0.0))
        )
    if goal_type == "title":
        pt = prev_metrics.get("title_bert_score")
        nt = next_metrics.get("title_bert_score")
        if pt is None or nt is None:
            return False
        return float(nt) > float(pt) or float(nt) >= TITLE_TARGET_THRESHOLD
    # Unknown goal type: fall back to the aggregate score.
    return next_metrics["score"] > prev_metrics["score"]


def _bert_phrase_delta(goal_label: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> float:
    """Phrase-level BERT score change for the goal label (0.0 when the phrase is untracked)."""
    key = (goal_label or "").strip().lower()
    prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
    next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
    return round(next_phrase - prev_phrase, 4)


def _safe_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any], key: str) -> float:
    """next - prev for a numeric metric key; 0.0 on missing/non-numeric values."""
    try:
        return float(next_metrics.get(key, 0.0)) - float(prev_metrics.get(key, 0.0))
    except Exception:
        return 0.0


def _stage_primary_progress(stage: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> bool:
    """Return True when the stage's primary metric moved forward between snapshots."""
    if stage == "bert":
        prev_low = int(prev_metrics.get("bert_low_count", 0))
        next_low = int(next_metrics.get("bert_low_count", 0))
        if next_low < prev_low:
            return True
        # Otherwise accept a real gain on the best-scoring phrase.
        prev_max = max([0.0] + [float(v) for v in (prev_metrics.get("bert_phrase_scores") or {}).values()])
        next_max = max([0.0] + [float(v) for v in (next_metrics.get("bert_phrase_scores") or {}).values()])
        return (next_max - prev_max) >= BERT_GOAL_DELTA_MIN
    if stage == "bm25":
        return int(next_metrics.get("bm25_remove_count", 0)) < 
int(prev_metrics.get("bm25_remove_count", 0))
    if stage == "semantic":
        return (
            int(next_metrics.get("semantic_gap_count", 0)) < int(prev_metrics.get("semantic_gap_count", 0))
            or float(next_metrics.get("semantic_gap_sum", 0.0)) < float(prev_metrics.get("semantic_gap_sum", 0.0))
        )
    if stage == "ngram":
        return (
            int(next_metrics.get("ngram_signal_count", 0)) < int(prev_metrics.get("ngram_signal_count", 0))
            or float(next_metrics.get("ngram_gap_sum", 0.0)) < float(prev_metrics.get("ngram_gap_sum", 0.0))
        )
    if stage == "title":
        pv = prev_metrics.get("title_bert_score")
        nv = next_metrics.get("title_bert_score")
        if pv is None or nv is None:
            return False
        return float(nv) > float(pv)
    return False


def _is_stage_complete(stage: str, metrics: Dict[str, Any], bert_stage_target: float = BERT_TARGET_THRESHOLD) -> bool:
    """Return True when the stage's completion criterion is satisfied by `metrics`."""
    if stage == "bert":
        # Complete only when all tracked BERT phrase scores meet the threshold.
        # This enforces per-phrase target behavior (no early exit on one strong phrase).
        scores = [float(v) for v in (metrics.get("bert_phrase_scores") or {}).values()]
        if not scores:
            return True
        return min(scores) >= float(bert_stage_target)
    if stage == "bm25":
        return int(metrics.get("bm25_remove_count", 0)) <= 3
    if stage == "semantic":
        return int(metrics.get("semantic_gap_count", 0)) <= 0
    if stage == "ngram":
        return int(metrics.get("ngram_signal_count", 0)) <= 0
    if stage == "title":
        score = metrics.get("title_bert_score")
        return (score is None) or (float(score) >= TITLE_TARGET_THRESHOLD)
    # Unknown stages are considered complete.
    return True


def _advance_ngram_term_cursor(cursor_state: Dict[str, Dict[str, int]], stage_key: str) -> None:
    """Advance the per-stage attempt counter in place; roll to the next term after NGRAM_ATTEMPTS_PER_TERM tries."""
    state = cursor_state.get(stage_key) or {"term_index": 0, "attempt_count": 0}
    attempts = int(state.get("attempt_count", 0)) + 1
    term_index = int(state.get("term_index", 0))
    if attempts >= NGRAM_ATTEMPTS_PER_TERM:
        term_index += 1
        attempts = 0
    cursor_state[stage_key] = {"term_index": term_index, "attempt_count": attempts}


def _candidate_utility(
    *,
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
    goal_type: str,
    goal_label: str,
    bert_phrase_delta: float,
    chunk_goal_delta: float,
    local_chunk_improved: bool,
) -> float:
    """Score an edit candidate: weighted gains minus cross-metric regression penalties."""
    score_delta = _safe_delta(prev_metrics, next_metrics, "score")
    bm25_delta = _safe_delta(prev_metrics, next_metrics, "bm25_remove_count")
    bert_low_delta = _safe_delta(prev_metrics, next_metrics, "bert_low_count")
    ngram_delta = _safe_delta(prev_metrics, next_metrics, "ngram_signal_count")
    sem_gap_delta = _safe_delta(prev_metrics, next_metrics, "semantic_gap_count")
    title_delta = _safe_delta(prev_metrics, next_metrics, "title_bert_score")
    # Dynamic emphasis:
    # - if target phrase is still far from threshold, prioritize phrase-level BERT gains
    # - but keep non-BERT regressions as penalties to preserve future optimization capacity
    key = (goal_label or "").strip().lower()
    prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
    bert_push_mode = (goal_type == "bert" and prev_phrase < BERT_TARGET_THRESHOLD)
    w_phrase = 7.5 if bert_push_mode else 3.0
    w_chunk = 1.6 if bert_push_mode else 1.0
    w_score = 1.0
    utility = (
        (w_phrase * float(bert_phrase_delta))
        + (w_chunk * float(chunk_goal_delta))
        + (w_score * float(score_delta))
    )
    if local_chunk_improved:
        utility += 0.3
    # Cross-metric guardrails as soft penalties (in addition to hard validity checks).
    utility -= max(0.0, bm25_delta) * 1.8
    utility -= max(0.0, bert_low_delta) * 2.4
    utility -= max(0.0, ngram_delta) * 0.6
    utility -= max(0.0, sem_gap_delta) * 1.5
    # Title drops subtract (min(...) is <= 0); title gains only count for title goals below.
    utility += min(0.0, title_delta) * 1.2
    if goal_type == "title":
        utility += max(0.0, title_delta) * 5.0
    return round(float(utility), 4)


def _is_candidate_valid(
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
    goal_type: str,
    goal_label: str,
    optimization_mode: str,
) -> Tuple[bool, List[str], bool]:
    """Validate a candidate against mode-dependent regression limits.

    Returns (is_valid, rejection_reasons, goal_improved).
    """
    mode = (optimization_mode or "balanced").lower()
    if mode not in {"conservative", "balanced", "aggressive"}:
        mode = "balanced"
    cfg = {
        "conservative": {"max_score_drop": 0.0, "max_title_drop": 0.02},
        "balanced": {"max_score_drop": 1.0, "max_title_drop": 0.03},
        "aggressive": {"max_score_drop": 2.0, "max_title_drop": 0.05},
    }[mode]
    reasons = []
    score_drop = float(prev_metrics["score"]) - float(next_metrics["score"])
    if score_drop > cfg["max_score_drop"]:
        reasons.append(f"score_drop>{cfg['max_score_drop']}")
    # Hard regressions in critical counters.
    if next_metrics["bm25_remove_count"] > prev_metrics["bm25_remove_count"] + (1 if mode == "aggressive" else 0):
        reasons.append("bm25_remove_regression")
    if next_metrics["bert_low_count"] > prev_metrics["bert_low_count"] + (1 if mode == "aggressive" else 0):
        reasons.append("bert_low_regression")
    if next_metrics["semantic_gap_count"] > prev_metrics["semantic_gap_count"] + (1 if mode == "aggressive" else 0):
        reasons.append("semantic_gap_regression")
    prev_title = prev_metrics.get("title_bert_score")
    next_title = next_metrics.get("title_bert_score")
    if goal_type != "title":
        # Body edits must not degrade title alignment beyond the mode's tolerance.
        if prev_title is not None and next_title is not None and next_title < (prev_title - cfg["max_title_drop"]):
            reasons.append("title_bert_drop")
    improved = _goal_improved(goal_type, goal_label, prev_metrics, next_metrics)
    # In conservative mode require explicit goal improvement.
    if mode == "conservative" and not improved:
        reasons.append("goal_not_improved")
    return (len(reasons) == 0), reasons, improved


def optimize_text(
    request_data: Dict[str, Any],
    progress_callback: Optional[Any] = None,
    cancel_event: Optional[Any] = None,
) -> Dict[str, Any]:
    """Main staged optimization loop (definition continues beyond this chunk).

    Parses and clamps request parameters, builds baseline analysis/metrics,
    then iterates LLM edit candidates per stage/goal.
    """
    target_text = str(request_data.get("target_text", "")).strip()
    competitors = [str(x) for x in (request_data.get("competitors") or []) if str(x).strip()]
    keywords = [str(x) for x in (request_data.get("keywords") or []) if str(x).strip()]
    language = str(request_data.get("language", "en")).strip() or "en"
    target_title = str(request_data.get("target_title", "") or "")
    competitor_titles = [str(x) for x in (request_data.get("competitor_titles") or [])]
    diff_mode_used = str(request_data.get("diff_mode", "diff_from_input") or "diff_from_input").strip().lower()
    if diff_mode_used not in {"diff_from_input", "diff_from_original"}:
        diff_mode_used = "diff_from_input"
    diff_base_text = target_text
    diff_base_title = target_title
    if diff_mode_used == "diff_from_original":
        diff_base_text = str(request_data.get("original_target_text") or target_text or "").strip()
        diff_base_title = str(request_data.get("original_target_title") or target_title or "").strip()
    else:
        diff_base_text = (target_text or "").strip()
        diff_base_title = (target_title or "").strip()
    api_key = str(request_data.get("api_key", "")).strip()
    if not api_key:
        raise ValueError("API key is required.")
    base_url = str(request_data.get("api_base_url", "https://api.deepseek.com/v1")).strip() or "https://api.deepseek.com/v1"
    model = str(request_data.get("model", "deepseek-chat")).strip() or "deepseek-chat"
    # Clamp loop budgets to sane bounds.
    max_iterations = int(request_data.get("max_iterations", 2) or 2)
    max_iterations = max(1, min(8, max_iterations))
    candidates_per_iteration = int(request_data.get("candidates_per_iteration", 2) or 2)
    candidates_per_iteration = max(1, min(5, candidates_per_iteration))
    temperature = float(request_data.get("temperature", 0.25) or 0.25)
    optimization_mode = 
str(request_data.get("optimization_mode", "balanced") or "balanced") phrase_strategy_mode = str(request_data.get("phrase_strategy_mode", "auto") or "auto").strip().lower() if phrase_strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}: phrase_strategy_mode = "auto" bert_stage_target = float(request_data.get("bert_stage_target", BERT_TARGET_THRESHOLD) or BERT_TARGET_THRESHOLD) bert_stage_target = max(0.0, min(1.0, bert_stage_target)) stage_goal_overrides = request_data.get("stage_goal_overrides") or {} if not isinstance(stage_goal_overrides, dict): stage_goal_overrides = {} req_enabled_stages = request_data.get("enabled_stages") or [] active_stage_order: List[str] = [] if isinstance(req_enabled_stages, list): for x in req_enabled_stages: st = _normalize_stage_name(x) if st and st not in active_stage_order: active_stage_order.append(st) if not active_stage_order: active_stage_order = list(STAGE_ORDER) baseline_analysis = _build_analysis_snapshot( target_text, competitors, keywords, language, target_title, competitor_titles ) baseline_semantic = _build_semantic_snapshot(target_text, competitors, language) baseline_metrics = _compute_metrics( baseline_analysis, baseline_semantic, keywords, language, bert_stage_target=bert_stage_target ) # Per-goal iteration budget scales with deficit; total loop steps = sum(effective iters per goal). 
baseline_goal_counts = { st: len( _collect_optimization_goals( baseline_analysis, baseline_semantic, keywords, language, stage=st, bert_stage_target=bert_stage_target, stage_goal_overrides=stage_goal_overrides, ) ) for st in active_stage_order } ngram_row_count = int(baseline_goal_counts.get("ngram", 0)) total_loop_steps = _estimate_total_loop_budget( baseline_analysis, baseline_semantic, keywords, language, max_iterations, candidates_per_iteration, bert_stage_target, active_stage_order=active_stage_order, stage_goal_overrides=stage_goal_overrides, ) current_text = target_text current_title = (target_title or "").strip() current_analysis = baseline_analysis current_semantic = baseline_semantic current_metrics = baseline_metrics logs: List[Dict[str, Any]] = [] applied_changes = 0 def _emit(ev: str, **kwargs: Any) -> None: if progress_callback: try: progress_callback({"event": ev, **kwargs}) except Exception: pass def _cancelled() -> bool: return cancel_event is not None and getattr(cancel_event, "is_set", lambda: False)() def _pack_result(stopped_early: bool = False, stop_reason: str = "") -> Dict[str, Any]: # Title string must match what last metrics used (title_analysis.target_title), not only the mutable var. 
_ta = (current_analysis or {}).get("title_analysis") or {} _ot = "" if isinstance(_ta, dict) and (_ta.get("target_title") or "").strip(): _ot = str(_ta.get("target_title")).strip() if not _ot: _ot = (current_title or "").strip() if not _ot: _ot = (target_title or "").strip() diff_body_html = "" diff_changes: List[Dict[str, str]] = [] if (diff_base_text or "").strip() != (current_text or "").strip(): dh, dc = _diff_sentences_html(diff_base_text or "", current_text or "") if dc: diff_body_html = dh diff_changes = dc diff_title_html = "" diff_title_changes: List[Dict[str, str]] = [] if (diff_base_title or "").strip() != (_ot or "").strip(): dth, dtc = _diff_title_html(diff_base_title or "", _ot or "") if dtc: diff_title_html = dth diff_title_changes = dtc return { "ok": True, "optimized_text": current_text, "optimized_title": _ot, "baseline_metrics": baseline_metrics, "final_metrics": current_metrics, "iterations": logs, "applied_changes": applied_changes, "optimization_mode": optimization_mode, "phrase_strategy_mode": phrase_strategy_mode, "bert_stage_target": round(bert_stage_target, 4), "diff_mode": diff_mode_used, "diff_body_html": diff_body_html, "diff_title_html": diff_title_html, "diff_changes": diff_changes, "diff_title_changes": diff_title_changes, "stopped_early": stopped_early, "stop_reason": stop_reason, } _emit("preparing", message="Подготовка", phase="baseline") _emit( "started", total_steps=total_loop_steps, max_iterations_setting=max_iterations, ngram_targets=ngram_row_count, stages_order=list(active_stage_order), ) seen_candidate_rewrites = set() cascade_level = 1 consecutive_failures = 0 goal_attempt_cursor: Dict[str, int] = {} attempted_spans = set() queued_candidates: List[Dict[str, Any]] = [] stage_idx = 0 stage_no_progress_steps = 0 stage_goal_cursor: Dict[str, Dict[str, int]] = {} for step in range(total_loop_steps): if _cancelled(): logs.append( { "step": step + 1, "status": "stopped", "reason": "Остановка пользователем (сохранён текущий текст 
и метрики).", } ) return _pack_result(stopped_early=True, stop_reason="user_cancelled") while stage_idx < len(active_stage_order) and _is_stage_complete( active_stage_order[stage_idx], current_metrics, bert_stage_target=bert_stage_target ): stage_idx += 1 stage_no_progress_steps = 0 if stage_idx >= len(active_stage_order): logs.append({"step": step + 1, "status": "stopped", "reason": "All optimization stages completed."}) break active_stage = active_stage_order[stage_idx] goals_for_stage = _collect_optimization_goals( current_analysis, current_semantic, keywords, language, stage=active_stage, bert_stage_target=bert_stage_target, stage_goal_overrides=stage_goal_overrides, ) state = stage_goal_cursor.get(active_stage) or {"goal_index": 0, "attempt_count": 0} goal_index = int(state.get("goal_index", 0)) attempt_count = int(state.get("attempt_count", 0)) # Advance across goals that exhausted per-goal iteration budget (scaled by deficit). while goal_index < len(goals_for_stage): g_try = goals_for_stage[goal_index] eff_max_iter, _ = _per_goal_budget(g_try, max_iterations, candidates_per_iteration, bert_stage_target) if attempt_count < eff_max_iter: break goal_index += 1 attempt_count = 0 if goal_index >= len(goals_for_stage): stage_idx += 1 stage_no_progress_steps = 0 logs.append( { "step": step + 1, "status": "stage_skipped", "stage": active_stage, "reason": f"All goals exhausted for stage '{active_stage}' (per-goal iteration budget).", } ) stage_goal_cursor[active_stage] = {"goal_index": goal_index, "attempt_count": attempt_count} continue goal = goals_for_stage[goal_index] eff_max_iter, eff_cand = _per_goal_budget(goal, max_iterations, candidates_per_iteration, bert_stage_target) attempt_count += 1 stage_goal_cursor[active_stage] = {"goal_index": goal_index, "attempt_count": attempt_count} if goal["type"] == "none": stage_idx += 1 stage_no_progress_steps = 0 logs.append( { "step": step + 1, "status": "stage_skipped", "stage": active_stage, "reason": f"No actionable 
goals for stage '{active_stage}', moving to next stage.", } ) continue _emit( "step_start", step=step + 1, total_steps=total_loop_steps, active_stage=active_stage, goal_type=goal.get("type"), goal_label=goal.get("label"), score=current_metrics.get("score"), goal_budget_iter=eff_max_iter, goal_budget_candidates=eff_cand, ) goal_key = f"{goal.get('type', '')}:{goal.get('label', '')}".strip().lower() base_attempt_cursor = int(goal_attempt_cursor.get(goal_key, 0)) candidates: List[Dict[str, Any]] = [] chosen_spans: List[Dict[str, Any]] = [] candidate_idx = 0 span_trials_eff = 1 if goal.get("type") == "title": if not current_title: logs.append( { "step": step + 1, "status": "stage_skipped", "stage": active_stage, "reason": "Поле Title пустое — этап title пропущен.", } ) stage_idx += 1 continue chosen_spans = [ { "operation": "title_rewrite", "span_start": 0, "span_end": 0, "sentence_index": 0, "span_variant": 0, "sentence_before": current_title, } ] original_span_text = current_title body_excerpt = (current_text or "").strip()[:1200] comp_ctx = " | ".join(t.strip() for t in competitor_titles[:4] if t.strip()) competitor_title_hint = ( f"Примеры title конкурентов (только стиль/длина): {comp_ctx}" if comp_ctx else "" ) strategy_plan = _build_phrase_strategy_plan( phrase_strategy_mode, "title", str(goal.get("label", "")), eff_cand, ) for strategy_variant in strategy_plan: candidate_idx += 1 temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07)) if _cancelled(): logs.append( { "step": step + 1, "status": "stopped", "reason": "Остановка пользователем перед запросом к LLM.", } ) return _pack_result(stopped_early=True, stop_reason="user_cancelled") _emit( "llm_call", candidate_index=candidate_idx, span_trial=1, span_trials=1, strategy=strategy_variant, ) try: llm_result = _llm_edit_title( api_key=api_key, base_url=base_url, model=model, language=language, current_title=current_title, body_excerpt=body_excerpt, competitor_title_hint=competitor_title_hint, 
focus_terms=goal.get("focus_terms", []) or [], avoid_terms=goal.get("avoid_terms", []) or [], keywords=keywords, cascade_level=cascade_level, temperature=temp, phrase_strategy_mode=strategy_variant, ) edited_text = str((llm_result or {}).get("edited_text", "")).strip() llm_rationale = str((llm_result or {}).get("rationale", "")).strip() prompt_debug = (llm_result or {}).get("prompt_debug", {}) if not edited_text or edited_text == original_span_text.strip(): continue quality_issues = _validate_title_candidate(edited_text) cand_analysis = _build_analysis_snapshot( current_text, competitors, keywords, language, edited_text, competitor_titles ) cand_semantic = _build_semantic_snapshot(current_text, competitors, language) cand_metrics = _compute_metrics( cand_analysis, cand_semantic, keywords, language, bert_stage_target=bert_stage_target ) before_rel = float(current_metrics.get("title_bert_score") or 0.0) after_rel = float(cand_metrics.get("title_bert_score") or 0.0) chunk_delta = round(after_rel - before_rel, 4) local_chunk_improved = chunk_delta >= _min_chunk_delta("title") bert_phrase_delta = 0.0 valid, invalid_reasons, goal_improved = _is_candidate_valid( current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode ) delta_score = round(cand_metrics["score"] - current_metrics["score"], 3) candidate_utility = _candidate_utility( prev_metrics=current_metrics, next_metrics=cand_metrics, goal_type=str(goal.get("type", "")), goal_label=str(goal.get("label", "")), bert_phrase_delta=bert_phrase_delta, chunk_goal_delta=chunk_delta, local_chunk_improved=local_chunk_improved, ) md = _metrics_delta(current_metrics, cand_metrics) if quality_issues: candidates.append( { "candidate_index": candidate_idx, "error": "quality_validation_failed", "valid": False, "goal_improved": False, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "invalid_reasons": quality_issues, "delta_score": -999.0, "candidate_score": None, "sentence_after": 
edited_text, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "llm_rationale": llm_rationale, "operation": "title_rewrite", "sentence_index": 0, "span_start": 0, "span_end": 0, "span_variant": 0, "phrase_strategy_used": strategy_variant, "sentence_before": original_span_text, } ) continue candidate_key = ("title_rewrite", 0, 0, edited_text.strip().lower()) if candidate_key in seen_candidate_rewrites: candidates.append( { "candidate_index": candidate_idx, "error": "duplicate_candidate_rewrite", "valid": False, "goal_improved": False, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "invalid_reasons": ["duplicate_candidate_rewrite"], "delta_score": -999.0, "candidate_score": None, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "llm_rationale": llm_rationale, "operation": "title_rewrite", "sentence_index": 0, "span_start": 0, "span_end": 0, "span_variant": 0, "phrase_strategy_used": strategy_variant, "sentence_before": original_span_text, } ) continue seen_candidate_rewrites.add(candidate_key) candidate_text = current_text candidates.append( { "candidate_index": candidate_idx, "sentence_before": original_span_text, "sentence_after": edited_text, "operation": "title_rewrite", "sentence_index": 0, "span_start": 0, "span_end": 0, "span_variant": 0, "phrase_strategy_used": strategy_variant, "text": candidate_text, "new_title": edited_text, "analysis": cand_analysis, "semantic": cand_semantic, "metrics": cand_metrics, "valid": valid, "goal_improved": goal_improved, "bert_phrase_delta": bert_phrase_delta, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": 
_term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "llm_rationale": llm_rationale, "invalid_reasons": invalid_reasons, "delta_score": delta_score, "candidate_score": cand_metrics.get("score"), "candidate_utility": candidate_utility, "metrics_delta": md, "edit_payload": { "operation": "title_rewrite", "span_start": 0, "span_end": 0, "edited_text": edited_text, }, } ) except Exception as e: candidates.append( { "candidate_index": candidate_idx, "error": str(e), "valid": False, "goal_improved": False, "local_chunk_improved": False, "chunk_goal_delta": -999.0, "invalid_reasons": [str(e)], "delta_score": -999.0, "candidate_score": None, "llm_prompt_debug": { "operation": "title_rewrite", "cascade_level": cascade_level, "goal_type": goal.get("type"), "goal_label": goal.get("label"), "phrase_strategy_mode": strategy_variant, }, "llm_rationale": "", "operation": "title_rewrite", "sentence_index": 0, "span_start": 0, "span_end": 0, "span_variant": 0, "phrase_strategy_used": strategy_variant, "sentence_before": current_title, } ) else: sentences = _split_sentences(current_text) if not sentences: logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."}) break span_trials = 2 if cascade_level <= 2 else 3 local_candidates = eff_cand if cascade_level <= 2 else min(6, eff_cand + 1) span_trials_eff = span_trials for st in range(span_trials): attempt_cursor = base_attempt_cursor + st operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span( sentences, goal, language, cascade_level, attempt_cursor ) max_span_retries = max(1, len(sentences) * 4) retries = 0 while retries < max_span_retries: span_key = (goal_key, cascade_level, operation, span_start, span_end) if span_key not in attempted_spans: attempted_spans.add(span_key) break attempt_cursor += 1 operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span( sentences, goal, language, cascade_level, attempt_cursor ) 
retries += 1 original_span_text = " ".join(sentences[span_start : span_end + 1]).strip() context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip() context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip() chosen_spans.append( { "operation": operation, "span_start": span_start, "span_end": span_end, "sentence_index": sent_idx, "span_variant": span_variant, "sentence_before": original_span_text, } ) per_span_candidates = max(1, local_candidates // span_trials) strategy_plan = _build_phrase_strategy_plan( phrase_strategy_mode, str(goal.get("type", "")), str(goal.get("label", "")), per_span_candidates, ) for strategy_variant in strategy_plan: candidate_idx += 1 temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07)) if _cancelled(): logs.append( { "step": step + 1, "status": "stopped", "reason": "Остановка пользователем перед запросом к LLM.", } ) return _pack_result(stopped_early=True, stop_reason="user_cancelled") _emit( "llm_call", candidate_index=candidate_idx, span_trial=st + 1, span_trials=span_trials, strategy=strategy_variant, ) try: llm_result = _llm_edit_chunk( api_key=api_key, base_url=base_url, model=model, language=language, full_text=current_text, chunk_text=original_span_text, operation=operation, context_before=context_before, context_after=context_after, cascade_level=cascade_level, goal_type=goal["type"], goal_label=goal["label"], focus_terms=goal["focus_terms"], avoid_terms=goal["avoid_terms"], temperature=temp, phrase_strategy_mode=strategy_variant, ) edited_text = str((llm_result or {}).get("edited_text", "")).strip() llm_rationale = str((llm_result or {}).get("rationale", "")).strip() prompt_debug = (llm_result or {}).get("prompt_debug", {}) if not edited_text or edited_text == original_span_text: continue quality_issues = _validate_candidate_text( edited_text, cascade_level, operation, goal_label=goal.get("label", ""), focus_terms=goal.get("focus_terms", []) or [], ) 
before_rel, after_rel = _chunk_relevance_pair( original_span_text, edited_text, goal["type"], goal["label"], goal.get("focus_terms", []) or [], language, goal, ) chunk_delta = _chunk_goal_delta( original_span_text, edited_text, goal["type"], goal["label"], goal.get("focus_terms", []) or [], language, goal, ) local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"]) if quality_issues: candidates.append( { "candidate_index": candidate_idx, "error": "quality_validation_failed", "valid": False, "goal_improved": False, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "invalid_reasons": quality_issues, "delta_score": -999.0, "candidate_score": None, "sentence_after": edited_text, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "llm_rationale": llm_rationale, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "phrase_strategy_used": strategy_variant, "sentence_before": original_span_text, } ) continue candidate_key = (operation, span_start, span_end, edited_text.strip().lower()) if candidate_key in seen_candidate_rewrites: candidates.append( { "candidate_index": candidate_idx, "error": "duplicate_candidate_rewrite", "valid": False, "goal_improved": False, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "invalid_reasons": ["duplicate_candidate_rewrite"], "delta_score": -999.0, "candidate_score": None, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "llm_rationale": llm_rationale, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "phrase_strategy_used": strategy_variant, "sentence_before": 
original_span_text, } ) continue seen_candidate_rewrites.add(candidate_key) if operation == "insert": candidate_sentences = _insert_after(sentences, span_end, edited_text) else: candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text) candidate_text = " ".join(candidate_sentences).strip() cand_analysis = _build_analysis_snapshot( candidate_text, competitors, keywords, language, current_title, competitor_titles ) cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language) cand_metrics = _compute_metrics( cand_analysis, cand_semantic, keywords, language, bert_stage_target=bert_stage_target ) valid, invalid_reasons, goal_improved = _is_candidate_valid( current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode ) delta_score = round(cand_metrics["score"] - current_metrics["score"], 3) bert_phrase_delta = _bert_phrase_delta(goal["label"], current_metrics, cand_metrics) if goal.get("type") == "bert" else 0.0 candidate_utility = _candidate_utility( prev_metrics=current_metrics, next_metrics=cand_metrics, goal_type=str(goal.get("type", "")), goal_label=str(goal.get("label", "")), bert_phrase_delta=bert_phrase_delta, chunk_goal_delta=chunk_delta, local_chunk_improved=local_chunk_improved, ) md = _metrics_delta(current_metrics, cand_metrics) candidates.append( { "candidate_index": candidate_idx, "sentence_before": original_span_text, "sentence_after": edited_text, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "phrase_strategy_used": strategy_variant, "text": candidate_text, "analysis": cand_analysis, "semantic": cand_semantic, "metrics": cand_metrics, "valid": valid, "goal_improved": goal_improved, "bert_phrase_delta": bert_phrase_delta, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": 
_term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "llm_rationale": llm_rationale, "invalid_reasons": invalid_reasons, "delta_score": delta_score, "candidate_score": cand_metrics.get("score"), "candidate_utility": candidate_utility, "metrics_delta": md, "edit_payload": { "operation": operation, "span_start": span_start, "span_end": span_end, "edited_text": edited_text, }, } ) except Exception as e: candidates.append( { "candidate_index": candidate_idx, "error": str(e), "valid": False, "goal_improved": False, "local_chunk_improved": False, "chunk_goal_delta": -999.0, "invalid_reasons": [str(e)], "delta_score": -999.0, "candidate_score": None, "llm_prompt_debug": { "operation": operation, "cascade_level": cascade_level, "goal_type": goal.get("type"), "goal_label": goal.get("label"), "phrase_strategy_mode": strategy_variant, }, "llm_rationale": "", "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "phrase_strategy_used": strategy_variant, "sentence_before": original_span_text, } ) goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials_eff primary_span = chosen_spans[0] if chosen_spans else {"operation": "-", "span_start": 0, "span_end": 0, "sentence_index": 0, "span_variant": 0, "sentence_before": ""} valid_raw_candidates = [c for c in candidates if c.get("valid")] valid_candidates = [ c for c in valid_raw_candidates if c.get("valid") and ( c.get("goal_improved") or (goal.get("type") == "bert" and float(c.get("bert_phrase_delta") or 0.0) > 0.0) or float(c.get("candidate_score") or -1) > float(current_metrics["score"]) or float(c.get("candidate_utility") or -999.0) > 0.0 ) and ( goal.get("type") != "bert" or float(c.get("bert_phrase_delta") or 0.0) > 0.0 or c.get("local_chunk_improved") ) ] if not valid_candidates: # Local-first accumulation mode: # if we have guardrail-valid candidates that improve chunk relevance, # apply the strongest 
local edit immediately and continue optimizing next chunks. local_progress_candidates = [ c for c in valid_raw_candidates if c.get("local_chunk_improved") ] if local_progress_candidates: best_local = sorted( local_progress_candidates, key=lambda c: ( float(c.get("candidate_utility") or -999.0), float(c.get("chunk_goal_delta") or 0.0), float(c.get("bert_phrase_delta") or 0.0), float(c.get("candidate_score") or -999.0), ), reverse=True, )[0] prev_metrics = current_metrics current_text = best_local["text"] if str(best_local.get("operation") or "") == "title_rewrite": nt = (best_local.get("new_title") or best_local.get("sentence_after") or "").strip() if nt: current_title = nt current_analysis = best_local["analysis"] current_semantic = best_local["semantic"] current_metrics = best_local["metrics"] progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics) if progressed_stage: stage_no_progress_steps = 0 else: stage_no_progress_steps += 1 applied_changes += 1 queued_candidates = [] logs.append( { "step": step + 1, "status": "applied_local_progress", "stage": active_stage, "goal": goal, "cascade_level": cascade_level, "operation": best_local.get("operation"), "sentence_index": best_local.get("sentence_index"), "span_start": best_local.get("span_start"), "span_end": best_local.get("span_end"), "span_variant": best_local.get("span_variant"), "sentence_before": best_local.get("sentence_before"), "sentence_after": best_local.get("sentence_after"), "reason": "Applied best local-improvement candidate despite no immediate global gain.", "current_score": prev_metrics.get("score"), "metrics_before": prev_metrics, "metrics_after": current_metrics, "delta_score": round(float(current_metrics.get("score", 0)) - float(prev_metrics.get("score", 0)), 3), "chosen_candidate_index": best_local.get("candidate_index"), "chosen_chunk_goal_delta": best_local.get("chunk_goal_delta"), "chosen_bert_phrase_delta": best_local.get("bert_phrase_delta"), 
"chosen_candidate_utility": best_local.get("candidate_utility"), "chosen_metrics_delta": best_local.get("metrics_delta"), "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "llm_rationale": c.get("llm_rationale"), "metrics_delta": c.get("metrics_delta"), "candidate_utility": c.get("candidate_utility"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) consecutive_failures = 0 cascade_level = 1 goal_attempt_cursor[goal_key] = base_attempt_cursor + 1 continue local_pool = [ c for c in candidates if c.get("local_chunk_improved") and c.get("edit_payload") and c.get("candidate_score") is not None and goal.get("type") != "title" ] local_pool.sort( key=lambda c: ( float(c.get("candidate_utility") or -999.0), float(c.get("chunk_goal_delta") or -999.0), float(c.get("candidate_score") or -999.0), ), reverse=True, ) for c in local_pool[:4]: queue_key = ( goal_key, c.get("operation"), c.get("span_start"), c.get("span_end"), str((c.get("sentence_after") or "")).strip().lower(), ) if not any(x.get("queue_key") == queue_key for x in queued_candidates): queued_candidates.append({"queue_key": queue_key, "candidate": c}) batch_applied = False batch_info: Dict[str, Any] = {} if len(queued_candidates) >= 2 and goal.get("type") != "title": pool = [x["candidate"] for x in queued_candidates[:6]] combos = _non_conflicting_edit_combos(pool, min_size=2, max_size=4) best_batch: 
Optional[Dict[str, Any]] = None prev_metrics = current_metrics for combo in combos: edits = [c.get("edit_payload") for c in combo if c.get("edit_payload")] if len(edits) != len(combo): continue batch_sentences = _apply_edits_to_sentences(sentences, edits) batch_text = " ".join(batch_sentences).strip() batch_analysis = _build_analysis_snapshot( batch_text, competitors, keywords, language, current_title, competitor_titles ) batch_semantic = _build_semantic_snapshot(batch_text, competitors, language) batch_metrics = _compute_metrics( batch_analysis, batch_semantic, keywords, language, bert_stage_target=bert_stage_target ) b_valid, b_reasons, b_goal = _is_candidate_valid( current_metrics, batch_metrics, goal["type"], goal["label"], optimization_mode ) b_delta = round(batch_metrics["score"] - current_metrics["score"], 3) local_sum = sum(float(c.get("chunk_goal_delta") or 0.0) for c in combo) if not (b_valid and (b_goal or b_delta > 0)): continue if goal.get("type") == "bert" and local_sum < (_min_chunk_delta("bert") * len(combo)): continue cand = { "combo": combo, "batch_text": batch_text, "batch_analysis": batch_analysis, "batch_semantic": batch_semantic, "batch_metrics": batch_metrics, "b_delta": b_delta, "local_sum": local_sum, "b_reasons": b_reasons, "b_goal": b_goal, } if best_batch is None or ( cand["b_delta"], cand["local_sum"], len(cand["combo"]), ) > ( best_batch["b_delta"], best_batch["local_sum"], len(best_batch["combo"]), ): best_batch = cand if best_batch: current_text = best_batch["batch_text"] current_analysis = best_batch["batch_analysis"] current_semantic = best_batch["batch_semantic"] current_metrics = best_batch["batch_metrics"] progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics) if progressed_stage: stage_no_progress_steps = 0 else: stage_no_progress_steps += 1 applied_changes += 1 batch_applied = True batch_info = { "status": "applied_batch", "batch_candidate_ids": [c.get("candidate_index") for c in 
best_batch["combo"]], "batch_size": len(best_batch["combo"]), "batch_local_chunk_delta_sum": round(best_batch["local_sum"], 4), "delta_score": best_batch["b_delta"], "metrics_before": prev_metrics, "metrics_after": best_batch["batch_metrics"], "metrics_delta": _metrics_delta(prev_metrics, best_batch["batch_metrics"]), } queued_candidates = [] consecutive_failures = 0 cascade_level = 1 goal_attempt_cursor[goal_key] = 0 if not batch_applied: batch_info = { "status": "batch_rejected", "reason": "Queued local improvements could not pass global constraints together.", } if batch_applied: logs.append( { "step": step + 1, "status": "applied_batch", "stage": active_stage, "goal": goal, "cascade_level": cascade_level, "operation": "batch", "sentence_index": primary_span.get("sentence_index"), "span_start": primary_span.get("span_start"), "span_end": primary_span.get("span_end"), "span_variant": primary_span.get("span_variant"), "sentence_before": primary_span.get("sentence_before"), "current_score": (batch_info.get("metrics_before") or {}).get("score"), "metrics_before": batch_info.get("metrics_before"), "metrics_after": current_metrics, "reason": "Applied queued local-improvement edits as a batch.", "batch_info": batch_info, "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "llm_rationale": c.get("llm_rationale"), "metrics_delta": c.get("metrics_delta"), "candidate_utility": c.get("candidate_utility"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), 
"sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) continue logs.append( { "step": step + 1, "status": "rejected", "stage": active_stage, "goal": goal, "cascade_level": cascade_level, "operation": primary_span.get("operation"), "sentence_index": primary_span.get("sentence_index"), "span_start": primary_span.get("span_start"), "span_end": primary_span.get("span_end"), "span_variant": primary_span.get("span_variant"), "sentence_before": primary_span.get("sentence_before"), "current_score": current_metrics["score"], "reason": ( "No valid candidate satisfied constraints." if not valid_raw_candidates else "Valid candidates existed but none improved goal or total score." ), "valid_candidates_count": len(valid_raw_candidates), "promotable_candidates_count": len(valid_candidates), "queued_local_candidates": len(local_pool), "queued_total": len(queued_candidates), "batch_info": batch_info if batch_info else None, "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "llm_rationale": c.get("llm_rationale"), "metrics_delta": c.get("metrics_delta"), "candidate_utility": c.get("candidate_utility"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) stage_no_progress_steps += 1 # Stage transition is controlled by per-stage iteration budget and completion checks. 
consecutive_failures += 1 if consecutive_failures >= 2 and cascade_level < 4: cascade_level += 1 consecutive_failures = 0 logs[-1]["escalated_to_level"] = cascade_level continue best = sorted( valid_candidates, key=lambda c: ( 1 if c.get("goal_improved") else 0, float(c.get("candidate_utility") or -999.0), float(c.get("bert_phrase_delta") or 0.0), float(c.get("chunk_goal_delta") or 0.0), c["metrics"]["score"], ), reverse=True, )[0] prev_metrics = current_metrics current_text = best["text"] if str(best.get("operation") or "") == "title_rewrite": nt = (best.get("new_title") or best.get("sentence_after") or "").strip() if nt: current_title = nt current_analysis = best["analysis"] current_semantic = best["semantic"] current_metrics = best["metrics"] progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics) if progressed_stage: stage_no_progress_steps = 0 else: stage_no_progress_steps += 1 applied_changes += 1 queued_candidates = [] logs.append( { "step": step + 1, "status": "applied", "stage": active_stage, "goal": goal, "cascade_level": cascade_level, "operation": best.get("operation"), "sentence_index": best.get("sentence_index"), "span_start": best.get("span_start"), "span_end": best.get("span_end"), "span_variant": best.get("span_variant"), "sentence_before": best.get("sentence_before"), "sentence_after": best["sentence_after"], "current_score": prev_metrics["score"], "metrics_before": prev_metrics, "metrics_after": current_metrics, "delta_score": round(current_metrics["score"] - prev_metrics["score"], 3), "chosen_candidate_index": best.get("candidate_index"), "chosen_candidate_utility": best.get("candidate_utility"), "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), 
"chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "llm_rationale": c.get("llm_rationale"), "metrics_delta": c.get("metrics_delta"), "candidate_utility": c.get("candidate_utility"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) # After successful edit, return to cheapest level and reset failure streak. consecutive_failures = 0 cascade_level = 1 goal_attempt_cursor[goal_key] = 0 return _pack_result()