Spaces:
Sleeping
Sleeping
| import json | |
| import difflib | |
| import html as html_lib | |
| import re | |
| from itertools import combinations | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import logic | |
| import nlp_processor | |
| import semantic_graph | |
# Per-language stop words removed from token lists by _filter_stopwords();
# languages without an entry fall back to the "en" set there.
# The tokenizers lowercase text but keep diacritics, so the accented spellings
# ("für", "è", "są", "é", "são") are listed next to the ASCII-folded forms that
# were already present; the folded forms stay for transliterated input.
STOP_WORDS = {
    "en": {"a", "an", "and", "or", "the", "to", "of", "for", "in", "on", "at", "by", "with", "from", "as", "is", "are", "be", "was", "were"},
    "ru": {"и", "или", "в", "во", "на", "по", "с", "со", "к", "ко", "для", "из", "за", "что", "это", "как", "а", "но", "у", "о", "от"},
    "de": {"und", "oder", "der", "die", "das", "zu", "von", "mit", "fur", "für", "in", "auf", "ist", "sind"},
    "es": {"y", "o", "el", "la", "los", "las", "de", "del", "en", "con", "para", "por", "es", "son"},
    "it": {"e", "è", "o", "il", "lo", "la", "i", "gli", "le", "di", "del", "in", "con", "per", "da", "sono"},
    "pl": {"i", "oraz", "lub", "w", "na", "z", "ze", "do", "od", "po", "dla", "to", "jest", "sa", "są"},
    "pt": {"e", "é", "ou", "o", "a", "os", "as", "de", "do", "da", "em", "no", "na", "com", "para", "por", "sao", "são"},
}
# --- Tuning constants -------------------------------------------------------

# Default BERT score a key phrase must reach; phrases scoring below it become
# "bert" goals and count as "low" in the metrics.
BERT_TARGET_THRESHOLD = 0.7
# NOTE(review): not referenced in the visible part of this module; presumably
# the minimal BERT score improvement treated as progress — confirm at use site.
BERT_GOAL_DELTA_MIN = 0.005
# Title BERT score below which a "title" goal is produced.
TITLE_TARGET_THRESHOLD = 0.65
# A semantic gap needs the competitor average to exceed the target weight by
# more than this relative margin ...
SEMANTIC_GAP_TOLERANCE_PCT = 0.15
# ... and by at least this absolute amount (see _is_semantic_gap).
SEMANTIC_GAP_MIN_ABS = 3.0
# Recognized stage names (used by _normalize_stage_name for validation).
STAGE_ORDER = ["bert", "bm25", "ngram", "semantic", "title"]
# NOTE(review): not referenced in the visible part of this module; presumably
# the retry budget per n-gram term — confirm at use site.
NGRAM_ATTEMPTS_PER_TERM = 3
def _normalize_stage_name(v: Any) -> str:
    """Return the lowercased, trimmed stage name if it is listed in STAGE_ORDER, else ""."""
    candidate = str(v or "").strip().lower()
    if candidate in STAGE_ORDER:
        return candidate
    return ""
| def _goal_label_canonical(goal: Dict[str, Any]) -> str: | |
| t = str(goal.get("type", "") or "").strip().lower() | |
| if t == "bm25": | |
| term = str(goal.get("bm25_word", "") or "").strip().lower() | |
| if term: | |
| return term | |
| label = str(goal.get("label", "") or "").strip().lower() | |
| if label.startswith("reduce spam:"): | |
| return label.replace("reduce spam:", "", 1).strip() | |
| return label | |
| return str(goal.get("label", "") or "").strip().lower() | |
| def _build_custom_goal(stage: str, value: str, language: str) -> Optional[Dict[str, Any]]: | |
| raw = str(value or "").strip() | |
| if not raw: | |
| return None | |
| if stage == "bert": | |
| return { | |
| "type": "bert", | |
| "label": raw, | |
| "focus_terms": _filter_stopwords(_tokenize(raw), language)[:6], | |
| "avoid_terms": [], | |
| "bert_phrase_score": 0.0, | |
| "bert_target": float(BERT_TARGET_THRESHOLD), | |
| } | |
| if stage == "bm25": | |
| return { | |
| "type": "bm25", | |
| "label": f"reduce spam: {raw}", | |
| "focus_terms": [], | |
| "avoid_terms": [raw], | |
| "bm25_count": 2, | |
| "bm25_word": raw, | |
| } | |
| if stage == "ngram": | |
| return { | |
| "type": "ngram", | |
| "label": raw, | |
| "focus_terms": [raw], | |
| "avoid_terms": [], | |
| "ngram_target_count": 0.0, | |
| "ngram_comp_avg": 1.0, | |
| "ngram_tolerance_pct": 0.5, | |
| "ngram_lower_bound": 0.5, | |
| "ngram_upper_bound": 1.5, | |
| "ngram_direction": "increase", | |
| "ngram_rank_index": 0, | |
| "ngram_candidates_total": 1, | |
| } | |
| if stage == "semantic": | |
| return { | |
| "type": "semantic", | |
| "label": raw, | |
| "focus_terms": [raw], | |
| "avoid_terms": [], | |
| "semantic_gap": float(SEMANTIC_GAP_MIN_ABS), | |
| } | |
| if stage == "title": | |
| return { | |
| "type": "title", | |
| "label": "title alignment", | |
| "focus_terms": _filter_stopwords(_tokenize(raw), language)[:8], | |
| "avoid_terms": [], | |
| "title_bert_score": 0.0, | |
| "title_target": float(TITLE_TARGET_THRESHOLD), | |
| } | |
| return None | |
| def _apply_stage_goal_override( | |
| goals: List[Dict[str, Any]], | |
| stage: str, | |
| language: str, | |
| stage_goal_overrides: Optional[Dict[str, Any]], | |
| ) -> List[Dict[str, Any]]: | |
| ov_all = stage_goal_overrides or {} | |
| ov = ov_all.get(stage) if isinstance(ov_all, dict) else None | |
| if not isinstance(ov, dict): | |
| return goals | |
| mode = str(ov.get("mode", "auto") or "auto").strip().lower() | |
| if mode not in {"auto", "manual", "mixed"}: | |
| mode = "auto" | |
| selected_raw = ov.get("selected") or [] | |
| custom_raw = ov.get("custom_add") or [] | |
| selected_set = { | |
| str(x or "").strip().lower() | |
| for x in selected_raw | |
| if str(x or "").strip() | |
| } | |
| custom_goals: List[Dict[str, Any]] = [] | |
| for item in custom_raw: | |
| g = _build_custom_goal(stage, str(item or ""), language) | |
| if g: | |
| custom_goals.append(g) | |
| if mode == "auto": | |
| out = list(goals) | |
| else: | |
| out = [] | |
| for g in goals: | |
| if _goal_label_canonical(g) in selected_set: | |
| out.append(g) | |
| if mode in {"manual", "mixed"}: | |
| out.extend(custom_goals) | |
| # Deduplicate by canonical goal label to keep deterministic cursor behavior. | |
| dedup: Dict[str, Dict[str, Any]] = {} | |
| for g in out: | |
| key = _goal_label_canonical(g) | |
| if key and key not in dedup: | |
| dedup[key] = g | |
| return list(dedup.values()) | |
| def _tokenize(text: str) -> List[str]: | |
| return [ | |
| x | |
| for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split() | |
| if len(x) >= 2 | |
| ] | |
| def _tokenize_ngrams_strict(text: str) -> List[str]: | |
| """ | |
| Tokenizer for strict n-gram matching. | |
| В отличие от _tokenize() здесь НЕ отбрасываем короткие токены (например, "a"), | |
| чтобы совпадать с теми же n-граммами, которые генерирует логика BM25. | |
| """ | |
| return [ | |
| x | |
| for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split() | |
| if x | |
| ] | |
def _count_term_ngrams_strict(text: str, term: str) -> int:
    """
    Strictly count exact token n-gram occurrences (overlapping matches allowed).

    `term` is counted as a contiguous sequence of tokens inside `text`, both
    tokenized with _tokenize_ngrams_strict(). Returns 0 when the term has no
    tokens or the text is shorter than the term.
    """
    haystack = _tokenize_ngrams_strict(text)
    needle = _tokenize_ngrams_strict(term)
    width = len(needle)
    if width == 0 or not haystack or len(haystack) < width:
        return 0
    # Slide a window of `width` tokens over the text.
    last_start = len(haystack) - width
    return sum(
        1
        for start in range(last_start + 1)
        if haystack[start : start + width] == needle
    )
def _filter_stopwords(tokens: List[str], language: str) -> List[str]:
    """Return `tokens` without the stop words of `language` (English set for unknown languages)."""
    stop_set = STOP_WORDS.get(language, STOP_WORDS["en"])
    kept: List[str] = []
    for token in tokens:
        if token in stop_set:
            continue
        kept.append(token)
    return kept
| def _split_sentences(text: str) -> List[str]: | |
| text = (text or "").strip() | |
| if not text: | |
| return [] | |
| parts = re.split(r"(?<=[\.\!\?])\s+", text) | |
| parts = [p.strip() for p in parts if p.strip()] | |
| if len(parts) <= 1: | |
| parts = [p.strip() for p in re.split(r"\n+", text) if p.strip()] | |
| return parts | |
| def _escape_html(v: Any) -> str: | |
| return html_lib.escape(str(v or ""), quote=True) | |
def _diff_sentences_html(before_text: str, after_text: str) -> Tuple[str, List[Dict[str, str]]]:
    """
    Sentence-level diff for UI highlighting.

    Returns a tuple of:
      - HTML that renders ONLY `after_text` (the optimized text), with replaced
        and inserted sentences wrapped in <mark class="diff-changed">;
      - a list of change blocks ({"type", "from", "to"}) describing what the
        text was and what it became.
    """
    old_sents = _split_sentences(before_text)
    new_sents = _split_sentences(after_text)
    opcodes = difflib.SequenceMatcher(None, old_sents, new_sents, autojunk=False).get_opcodes()

    html_parts: List[str] = []
    change_log: List[Dict[str, str]] = []
    for op, old_lo, old_hi, new_lo, new_hi in opcodes:
        new_segment = new_sents[new_lo:new_hi]
        if op == "delete":
            # Deleted sentences are not rendered; only the change is recorded.
            removed = " ".join(old_sents[old_lo:old_hi]).strip()
            change_log.append({"type": "delete", "from": removed, "to": ""})
        elif op == "replace":
            removed = " ".join(old_sents[old_lo:old_hi]).strip()
            added = " ".join(new_segment).strip()
            change_log.append({"type": "replace", "from": removed, "to": added})
            html_parts.extend(
                f'<mark class="diff-changed" data-diff-kind="replace">{_escape_html(sent)}</mark>'
                for sent in new_segment
            )
        elif op == "insert":
            added = " ".join(new_segment).strip()
            change_log.append({"type": "insert", "from": "", "to": added})
            html_parts.extend(
                f'<mark class="diff-changed" data-diff-kind="insert">{_escape_html(sent)}</mark>'
                for sent in new_segment
            )
        else:
            # "equal" (and, defensively, any unexpected tag): emit unmarked text.
            html_parts.extend(_escape_html(sent) for sent in new_segment)

    return " ".join(html_parts).strip(), change_log
| def _diff_title_html(before_title: str, after_title: str) -> Tuple[str, List[Dict[str, str]]]: | |
| before_t = (before_title or "").strip() | |
| after_t = (after_title or "").strip() | |
| if before_t == after_t: | |
| return "", [] | |
| if not after_t: | |
| # Title removed: show nothing in UI, but keep "from/to" for debug. | |
| return "", [{"type": "delete", "from": before_t, "to": ""}] | |
| return ( | |
| f'<mark class="diff-changed" data-diff-kind="replace">{_escape_html(after_t)}</mark>', | |
| [{"type": "replace", "from": before_t, "to": after_t}], | |
| ) | |
| def _max_sentences_for_level(cascade_level: int, operation: str) -> int: | |
| if operation == "insert": | |
| return 2 | |
| if cascade_level <= 1: | |
| return 2 | |
| if cascade_level == 2: | |
| return 3 | |
| return 4 | |
| def _phrase_strategy_variants(mode: str, goal_type: str, goal_label: str) -> List[str]: | |
| normalized = (mode or "auto").strip().lower() | |
| if normalized == "ensemble": | |
| if goal_type != "bert": | |
| return ["auto"] | |
| phrase_len = len(_tokenize(goal_label or "")) | |
| if phrase_len >= 3: | |
| return ["distributed_preferred", "auto", "exact_preferred"] | |
| return ["auto", "exact_preferred", "distributed_preferred"] | |
| if normalized in {"auto", "exact_preferred", "distributed_preferred"}: | |
| return [normalized] | |
| return ["auto"] | |
def _build_phrase_strategy_plan(mode: str, goal_type: str, goal_label: str, count: int) -> List[str]:
    """Return `count` strategies by cycling through _phrase_strategy_variants(); [] for count <= 0."""
    rotation = _phrase_strategy_variants(mode, goal_type, goal_label)
    plan: List[str] = []
    if count > 0:
        width = len(rotation)
        for slot in range(count):
            plan.append(rotation[slot % width])
    return plan
| def _validate_title_candidate(edited_text: str) -> List[str]: | |
| """Guardrails for HTML <title> plain text (not body sentences).""" | |
| reasons: List[str] = [] | |
| text = (edited_text or "").strip() | |
| if not text: | |
| reasons.append("empty_title") | |
| return reasons | |
| if "\n" in text or "\r" in text: | |
| reasons.append("title_multiline") | |
| if len(text) > 90: | |
| reasons.append("title_too_long>90") | |
| if len(text) < 8: | |
| reasons.append("title_too_short") | |
| if re.search(r"<[^>]+>", text): | |
| reasons.append("title_contains_html_tags") | |
| return reasons | |
| def _validate_candidate_text( | |
| edited_text: str, | |
| cascade_level: int, | |
| operation: str, | |
| goal_label: str = "", | |
| focus_terms: Optional[List[str]] = None, | |
| ) -> List[str]: | |
| reasons: List[str] = [] | |
| text = (edited_text or "").strip() | |
| if not text: | |
| reasons.append("empty_candidate") | |
| return reasons | |
| sentence_count = len(_split_sentences(text)) | |
| max_sent = _max_sentences_for_level(cascade_level, operation) | |
| if sentence_count > max_sent: | |
| reasons.append(f"too_many_sentences>{max_sent}") | |
| # Heuristic quality checks: duplicated words/entities and obvious malformed token joins. | |
| if re.search(r"\b([A-Za-z][A-Za-z0-9-]{1,})\s+\1\b", text, flags=re.IGNORECASE): | |
| reasons.append("duplicated_entity_or_word") | |
| # Catch broken lowercase+Camel join artifacts like "likemBit", but allow brand CamelCase like "RedDogCasino". | |
| if re.search(r"\b[a-z]{6,}[A-Z][a-z]+\b", text): | |
| reasons.append("suspicious_token_join") | |
| # Anti-stuffing checks for BERT phrase goals. | |
| focus_terms = focus_terms or [] | |
| phrase = (goal_label or "").strip().lower() | |
| normalized = re.sub(r"\s+", " ", text.lower()) | |
| if phrase: | |
| phrase_occurrences = normalized.count(phrase) | |
| phrase_token_count = len(_tokenize(phrase)) | |
| # For long goal phrases, repeated exact matches are usually unnatural. | |
| if phrase_token_count >= 3 and phrase_occurrences > 1: | |
| reasons.append("exact_phrase_stuffing") | |
| elif phrase_occurrences > 2: | |
| reasons.append("exact_phrase_stuffing") | |
| for term in focus_terms: | |
| tok = (term or "").strip().lower() | |
| if not tok: | |
| continue | |
| term_occurrences = len(re.findall(rf"\b{re.escape(tok)}\b", normalized)) | |
| if term_occurrences > 3: | |
| reasons.append("focus_term_overuse") | |
| break | |
| return reasons | |
def _build_analysis_snapshot(
    target_text: str,
    competitors: List[str],
    keywords: List[str],
    language: str,
    target_title: str,
    competitor_titles: List[str],
) -> Dict[str, Any]:
    """
    Run the `logic` analyses for the target text and bundle their results.

    The returned dict holds word counts (target, per competitor and rounded
    competitor averages), n-gram stats, BM25 recommendations, the BERT
    analysis and, when a non-blank target title is given, the title analysis
    (an empty dict otherwise).
    """
    target_counts = logic.count_words(target_text, language)
    competitor_counts = [logic.count_words(text, language) for text in competitors]

    n_competitors = len(competitor_counts)
    if n_competitors:
        avg_total = sum(c["total"] for c in competitor_counts) / n_competitors
        avg_significant = sum(c["significant"] for c in competitor_counts) / n_competitors
    else:
        avg_total = avg_significant = 0

    ngram_stats = logic.calculate_ngram_stats(target_text, competitors, language)
    key_phrases, _ = logic.parse_keywords(keywords, language)
    bm25 = logic.calculate_bm25_recommendations(target_text, competitors, keywords, language)
    bert = logic.perform_bert_analysis(target_text, competitors, key_phrases, language)

    has_title = bool((target_title or "").strip())
    title_data = (
        logic.analyze_title(target_title, competitor_titles, keywords, language)
        if has_title
        else {}
    )

    word_counts = {
        "target": target_counts,
        "competitors": competitor_counts,
        "avg": {"total": round(avg_total), "significant": round(avg_significant)},
    }
    return {
        "word_counts": word_counts,
        "ngram_stats": ngram_stats,
        "bm25_recommendations": bm25,
        "bert_analysis": bert,
        "title_analysis": title_data,
    }
def _build_semantic_snapshot(
    target_text: str,
    competitors: List[str],
    language: str,
) -> Dict[str, Any]:
    """
    Build semantic graphs for the target and every non-blank competitor text
    and compare their per-term weights.

    Returns {"comparison": {"term_power_table": [...], "num_competitors": N}},
    where each table row carries the term, its integer target weight, the
    competitor average weight (rounded to 2 decimals), the number of
    competitors in which the term has a positive weight, and N.
    """

    def _analyze(text: str, doc_id: int) -> Dict[str, Any]:
        # Preprocess -> semantic graph -> frontend payload (used for node/link counts).
        sentences = nlp_processor.preprocess_text(text, language)
        graph, weights = semantic_graph.build_semantic_graph(sentences, lang=language)
        frontend = semantic_graph.get_graph_data_for_frontend(graph)
        stats = {
            "nodes": len(frontend.get("nodes", [])),
            "links": len(frontend.get("links", [])),
        }
        return {"id": doc_id, "text": text, "word_weights": weights, "stats": stats}

    target_doc = _analyze(target_text, 0)
    non_blank = [text for text in competitors if (text or "").strip()]
    # Competitor documents are numbered from 1; id 0 is the target.
    comp_docs = [_analyze(text, doc_id) for doc_id, text in enumerate(non_blank, start=1)]
    num_comp = len(comp_docs)

    target_weights = target_doc["word_weights"]
    all_terms = set(target_weights.keys())
    for doc in comp_docs:
        all_terms.update(doc["word_weights"].keys())

    rows: List[Dict[str, Any]] = []
    for term in all_terms:
        own_weight = int(target_weights.get(term, 0))
        per_competitor = [int(doc["word_weights"].get(term, 0)) for doc in comp_docs]
        rows.append(
            {
                "term": term,
                "target_weight": own_weight,
                "competitor_avg_weight": round(sum(per_competitor) / max(1, num_comp), 2),
                "comp_occurrence": sum(1 for weight in per_competitor if weight > 0),
                "comp_total": num_comp,
            }
        )

    return {"comparison": {"term_power_table": rows, "num_competitors": num_comp}}
| def _is_semantic_gap(target_weight: float, competitor_avg_weight: float) -> bool: | |
| # Gap is significant only when competitor is meaningfully above target: | |
| # - relative margin above tolerance band (15%) | |
| # - and absolute margin to avoid micro-noise around normalized weights | |
| if competitor_avg_weight <= 0: | |
| return False | |
| abs_gap = competitor_avg_weight - target_weight | |
| rel_threshold = target_weight * (1.0 + SEMANTIC_GAP_TOLERANCE_PCT) | |
| return (competitor_avg_weight > rel_threshold) and (abs_gap >= SEMANTIC_GAP_MIN_ABS) | |
| def _ngram_tolerance_pct(competitor_avg: float) -> float: | |
| # User rule: | |
| # - avg >= 4 -> +/-20% | |
| # - avg < 4 -> +/-50% | |
| return 0.20 if competitor_avg >= 4.0 else 0.50 | |
| def _is_ngram_outside_tolerance(target_count: float, competitor_avg: float) -> bool: | |
| if competitor_avg <= 0: | |
| return False | |
| tol = _ngram_tolerance_pct(competitor_avg) | |
| low = competitor_avg * (1.0 - tol) | |
| high = competitor_avg * (1.0 + tol) | |
| return target_count < low or target_count > high | |
| def _ngram_deviation_ratio(target_count: float, competitor_avg: float) -> float: | |
| if competitor_avg <= 0: | |
| return 0.0 | |
| return abs(target_count - competitor_avg) / max(competitor_avg, 1e-6) | |
| def _keyword_unigram_set(keywords: List[str], language: str) -> set: | |
| out = set() | |
| for kw in keywords: | |
| toks = _filter_stopwords(_tokenize(kw), language) | |
| for t in toks: | |
| out.add(t) | |
| return out | |
| def _is_ngram_stage_candidate( | |
| ngram_label: str, | |
| comp_occurrence: int, | |
| competitor_count: int, | |
| keyword_unigrams: set, | |
| ) -> bool: | |
| ngram = (ngram_label or "").strip().lower() | |
| if not ngram: | |
| return False | |
| tokens = _tokenize(ngram) | |
| n = len(tokens) | |
| if competitor_count > 1: | |
| if comp_occurrence < 2: | |
| return False | |
| if n >= 2: | |
| # For multi-competitor mode, bi/tri-grams with K>=2 are always candidates. | |
| return True | |
| # Unigrams are candidates only if they belong to key phrases. | |
| return n == 1 and tokens[0] in keyword_unigrams | |
| # Single-competitor mode: keep broader eligibility. | |
| return comp_occurrence >= 1 | |
def _chunk_ngram_count(text: str, ngram_label: str, language: str) -> int:
    """
    Count occurrences of `ngram_label` in `text` after both are tokenized and
    stripped of stop words; multi-token phrases are matched as contiguous
    token runs (overlaps allowed).
    """
    text_tokens = _filter_stopwords(_tokenize(text), language)
    phrase_tokens = _filter_stopwords(_tokenize(ngram_label), language)
    if not text_tokens or not phrase_tokens:
        return 0

    width = len(phrase_tokens)
    if width == 1:
        return text_tokens.count(phrase_tokens[0])

    return sum(
        1
        for start in range(len(text_tokens) - width + 1)
        if text_tokens[start : start + width] == phrase_tokens
    )
def _build_ngram_stage_rows(
    analysis: Dict[str, Any],
    keywords: List[str],
    language: str,
) -> List[Tuple[str, float, float, float, int, float]]:
    """
    List the eligible, underrepresented n-gram targets for the ngram stage.

    Each row is (ngram_label, target_count, comp_avg, tolerance_pct,
    comp_occurrence, dev_ratio). Only n-grams that pass
    _is_ngram_stage_candidate(), fall outside the tolerance band and sit
    below the competitor average are kept.

    Rows are ordered (user policy) by competitor coverage, then competitor
    average, then deviation from the average — all descending.
    """
    stats = analysis.get("ngram_stats", {}) or {}
    competitor_count = len((analysis.get("word_counts", {}) or {}).get("competitors", []) or [])
    keyword_unigrams = _keyword_unigram_set(keywords, language)

    rows: List[Tuple[str, float, float, float, int, float]] = []
    for bucket in stats.values():
        if not isinstance(bucket, list):
            continue
        for entry in bucket:
            label = str(entry.get("ngram", "")).strip()
            if not label:
                continue
            own_count = float(entry.get("target_count", 0))
            comp_avg = float(entry.get("competitor_avg", 0))
            coverage = int(entry.get("comp_occurrence", 0))

            eligible = (
                _is_ngram_stage_candidate(label, coverage, competitor_count, keyword_unigrams)
                and _is_ngram_outside_tolerance(own_count, comp_avg)
                and not (own_count >= comp_avg)
            )
            if not eligible:
                continue

            rows.append(
                (
                    label,
                    own_count,
                    comp_avg,
                    _ngram_tolerance_pct(comp_avg),
                    coverage,
                    _ngram_deviation_ratio(own_count, comp_avg),
                )
            )

    return sorted(rows, key=lambda row: (row[4], row[2], row[5]), reverse=True)
| def _score_ngram_candidate_window(window_sentences: List[str], goal_label: str, language: str) -> float: | |
| """Heuristic: good place to add phrase — low local duplication, topical proximity, not boilerplate.""" | |
| chunk = " ".join(s for s in window_sentences if s).strip() | |
| if not chunk: | |
| return -1e6 | |
| phrase_count = float(_chunk_ngram_count(chunk, goal_label, language)) | |
| noise_n = sum(1 for s in window_sentences if _is_noise_like_sentence(s)) | |
| noise_frac = noise_n / max(1, len(window_sentences)) | |
| phrase_tokens = [t.lower() for t in _filter_stopwords(_tokenize(goal_label), language) if t] | |
| chunk_l = chunk.lower() | |
| unigram_hits = sum(1 for t in phrase_tokens if t and len(t) > 1 and t in chunk_l) | |
| rel_proxy = unigram_hits / max(1, len(phrase_tokens)) if phrase_tokens else 0.0 | |
| return ( | |
| -3.0 * phrase_count | |
| + 2.2 * rel_proxy | |
| - 4.0 * noise_frac | |
| + min(len(chunk) / 1200.0, 0.35) | |
| ) | |
| def _rank_ngram_overlap_sentence_indices( | |
| sentences: List[str], | |
| goal_label: str, | |
| language: str, | |
| ) -> List[int]: | |
| """ | |
| Slide overlapping multi-sentence windows over the document; each sentence gets the | |
| best score among windows that contain it. Order sentences by that score (desc). | |
| """ | |
| n = len(sentences) | |
| if n <= 0: | |
| return [0] | |
| if n == 1: | |
| return [0] | |
| # 2–4 sentences per window, stride 1 for strong overlap. | |
| w = min(4, max(2, n)) | |
| best: List[float] = [-1e9] * n | |
| for start in range(0, n - w + 1): | |
| win = sentences[start : start + w] | |
| sc = _score_ngram_candidate_window(win, goal_label, language) | |
| for j in range(start, start + w): | |
| if sc > best[j]: | |
| best[j] = sc | |
| center = (n - 1) / 2.0 | |
| scored_idx = [(i, best[i], -abs(i - center)) for i in range(n)] | |
| scored_idx.sort(key=lambda t: (t[1], t[2]), reverse=True) | |
| return [t[0] for t in scored_idx] | |
def _compute_metrics(
    analysis: Dict[str, Any],
    semantic: Dict[str, Any],
    keywords: List[str],
    language: str,
    bert_stage_target: float = BERT_TARGET_THRESHOLD,
) -> Dict[str, Any]:
    """
    Summarize an analysis + semantic snapshot into per-signal metrics and a
    composite 0..100 score.

    Args:
        analysis: snapshot with "word_counts", "bert_analysis",
            "bm25_recommendations", "ngram_stats" and "title_analysis"
            sections; missing or None sections are treated as empty.
        semantic: snapshot with comparison.term_power_table rows.
        keywords: key phrases; their stop-word-free tokens and 2-/3-grams are
            the only terms considered for semantic gaps.
        language: language code used for stop-word filtering.
        bert_stage_target: BERT score below which a phrase counts as "low".

    Returns:
        Dict with the composite "score", the individual counters and sums,
        the per-phrase BERT scores, the resolved title and up to 20 largest
        semantic gap terms.
    """
    competitor_count = len((analysis.get("word_counts") or {}).get("competitors") or [])
    # With several competitors a term must appear in at least two of them.
    min_signal = 1 if competitor_count <= 1 else 2

    # --- BERT: phrases scoring below the stage target.
    bert_details = (analysis.get("bert_analysis") or {}).get("detailed") or []
    bert_low = [d for d in bert_details if float(d.get("my_max_score", 0)) < float(bert_stage_target)]
    bert_phrase_scores = {
        str(d.get("phrase", "")).strip().lower(): float(d.get("my_max_score", 0) or 0.0)
        for d in bert_details
        if str(d.get("phrase", "")).strip()
    }

    # --- BM25: number of "remove" recommendations.
    bm25_remove_count = sum(
        1 for rec in (analysis.get("bm25_recommendations") or []) if rec.get("action") == "remove"
    )

    # --- N-grams: eligible n-grams outside the tolerance band (either direction).
    ngram_signal_count = 0
    ngram_gap_sum = 0.0
    keyword_unigrams = _keyword_unigram_set(keywords, language)
    for bucket in (analysis.get("ngram_stats") or {}).values():
        if not isinstance(bucket, list):
            continue
        for item in bucket:
            comp_occ = int(item.get("comp_occurrence", 0))
            ngram_label = str(item.get("ngram", ""))
            if not _is_ngram_stage_candidate(ngram_label, comp_occ, competitor_count, keyword_unigrams):
                continue
            target = float(item.get("target_count", 0))
            comp_avg = float(item.get("competitor_avg", 0))
            if _is_ngram_outside_tolerance(target, comp_avg):
                ngram_signal_count += 1
                ngram_gap_sum += _ngram_deviation_ratio(target, comp_avg)

    # --- Title: BERT score of the target title, if it was analyzed.
    title_analysis = analysis.get("title_analysis") or {}
    title_bert = title_analysis.get("bert") or {}
    title_score = None
    if title_bert.get("target_score") is not None:
        title_score = float(title_bert["target_score"])

    # --- Semantic gaps, restricted to keyword tokens and their 2-/3-grams.
    keyword_terms = set()
    for kw in keywords:
        tokens = _filter_stopwords(_tokenize(kw), language)
        for t in tokens:
            keyword_terms.add(t)
        for n in (2, 3):
            for i in range(0, max(0, len(tokens) - n + 1)):
                keyword_terms.add(" ".join(tokens[i : i + n]))

    table = (semantic.get("comparison") or {}).get("term_power_table") or []
    by_term = {str(r.get("term", "")).lower(): r for r in table}
    semantic_gap_count = 0
    semantic_gap_sum = 0.0
    semantic_gap_terms: List[Dict[str, Any]] = []
    for term in keyword_terms:
        row = by_term.get(term)
        if not row:
            continue
        target_w = float(row.get("target_weight", 0))
        comp_w = float(row.get("competitor_avg_weight", 0))
        gap = comp_w - target_w
        if int(row.get("comp_occurrence", 0)) >= min_signal and _is_semantic_gap(target_w, comp_w):
            semantic_gap_count += 1
            semantic_gap_sum += gap
            # Avoid dividing by tiny/zero target weights.
            base = max(1.0, target_w)
            semantic_gap_terms.append(
                {
                    "term": term,
                    "target_weight": round(target_w, 2),
                    "competitor_avg_weight": round(comp_w, 2),
                    "gap": round(gap, 2),
                    "gap_pct_of_target": round(gap / base, 4),
                    "comp_occurrence": int(row.get("comp_occurrence", 0)),
                    "comp_total": int(row.get("comp_total", 0)),
                }
            )

    # --- Composite score (0..100): weighted mean of per-signal components in [0, 1].
    w_bert, w_bm25, w_ng, w_title, w_sem = 30, 20, 15, 10, 25
    bert_comp = 1.0 - (len(bert_low) / max(1, len(bert_details)))
    # Up to 3 removals are tolerated; each further one costs 10%.
    bm25_comp = 1.0 if bm25_remove_count <= 3 else max(0.0, 1.0 - ((bm25_remove_count - 3) / 10.0))
    ng_comp = max(0.0, 1.0 - (ngram_signal_count / 15.0))
    # The title is "fully aligned" once it reaches the same threshold the
    # title stage targets (TITLE_TARGET_THRESHOLD), not a separate magic number.
    if title_score is None:
        title_comp = 1.0
    else:
        title_comp = min(1.0, max(0.0, title_score / max(TITLE_TARGET_THRESHOLD, 1e-6)))
    sem_comp = max(0.0, 1.0 - (semantic_gap_count / 20.0))
    weighted = (
        w_bert * bert_comp
        + w_bm25 * bm25_comp
        + w_ng * ng_comp
        + w_title * title_comp
        + w_sem * sem_comp
    )
    total_w = w_bert + w_bm25 + w_ng + w_title + w_sem
    score = round((weighted / total_w) * 100.0, 2)

    resolved_title = ""
    if isinstance(title_analysis, dict) and (title_analysis.get("target_title") or "").strip():
        resolved_title = str(title_analysis.get("target_title")).strip()

    return {
        "score": score,
        "competitor_count": competitor_count,
        "min_competitor_signal": min_signal,
        "bert_low_count": len(bert_low),
        "bert_total_keywords": len(bert_details),
        "bert_phrase_scores": bert_phrase_scores,
        "bm25_remove_count": bm25_remove_count,
        "ngram_signal_count": ngram_signal_count,
        "ngram_gap_sum": round(ngram_gap_sum, 4),
        "title_bert_score": title_score,
        "resolved_title": resolved_title,
        "semantic_gap_count": semantic_gap_count,
        "semantic_gap_sum": round(semantic_gap_sum, 4),
        "semantic_gap_terms": sorted(
            semantic_gap_terms,
            key=lambda x: (x.get("gap", 0), x.get("gap_pct_of_target", 0)),
            reverse=True,
        )[:20],
    }
def _choose_optimization_goal(
    analysis: Dict[str, Any],
    semantic: Dict[str, Any],
    keywords: List[str],
    language: str,
    stage: str = "bert",
    bert_stage_target: float = BERT_TARGET_THRESHOLD,
    stage_cursor: int = 0,
) -> Dict[str, Any]:
    """
    Pick the goal at position `stage_cursor` among the goals of `stage`.

    Negative cursors are clamped to 0. A "none"/"no-op" goal is returned when
    the stage has no goals or the cursor points past the last one.
    """
    no_op: Dict[str, Any] = {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []}

    candidates = _collect_optimization_goals(
        analysis=analysis,
        semantic=semantic,
        keywords=keywords,
        language=language,
        stage=stage,
        bert_stage_target=bert_stage_target,
    )
    if not candidates:
        return no_op

    index = max(0, int(stage_cursor))
    return candidates[index] if index < len(candidates) else no_op
def _collect_bert_goals(
    analysis: Dict[str, Any],
    language: str,
    bert_stage_target: float,
) -> List[Dict[str, Any]]:
    """One "bert" goal per key phrase scoring below the target, weakest phrase first."""
    details = (analysis.get("bert_analysis") or {}).get("detailed") or []
    low = [row for row in details if float(row.get("my_max_score", 0)) < float(bert_stage_target)]
    goals: List[Dict[str, Any]] = []
    for row in sorted(low, key=lambda r: float(r.get("my_max_score", 0))):
        phrase = str(row.get("phrase", "")).strip()
        if not phrase:
            continue
        goals.append(
            {
                "type": "bert",
                "label": phrase,
                "focus_terms": _filter_stopwords(_tokenize(phrase), language)[:4],
                "avoid_terms": [],
                "bert_phrase_score": float(row.get("my_max_score", 0) or 0.0),
                "bert_target": float(bert_stage_target),
            }
        )
    return goals


def _collect_bm25_goals(analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Up to 8 "bm25" spam-reduction goals; produced only when there are 4+ "remove" recommendations."""
    removals = [r for r in (analysis.get("bm25_recommendations") or []) if r.get("action") == "remove"]
    if len(removals) < 4:
        return []
    goals: List[Dict[str, Any]] = []
    for row in sorted(removals, key=lambda r: int(r.get("count", 0)), reverse=True)[:8]:
        word = str(row.get("word", "")).strip()
        if not word:
            continue
        goals.append(
            {
                "type": "bm25",
                "label": f"reduce spam: {word}",
                "focus_terms": [],
                "avoid_terms": [word],
                "bm25_count": int(row.get("count", 0) or 0),
                "bm25_word": word,
            }
        )
    return goals


def _collect_semantic_goals(
    semantic: Dict[str, Any],
    keywords: List[str],
    language: str,
) -> List[Dict[str, Any]]:
    """Up to 12 "semantic" goals for keyword terms with a significant weight gap, largest gap first."""
    # Candidate terms: stop-word-free keyword tokens plus their 2- and 3-grams.
    keyword_terms = set()
    for kw in keywords:
        toks = _filter_stopwords(_tokenize(kw), language)
        keyword_terms.update(toks)
        for n in (2, 3):
            for i in range(0, max(0, len(toks) - n + 1)):
                keyword_terms.add(" ".join(toks[i : i + n]))

    table = (semantic.get("comparison") or {}).get("term_power_table") or []
    candidates: List[Tuple[str, float]] = []
    for row in table:
        term = str(row.get("term", "")).lower()
        if term not in keyword_terms:
            continue
        target_w = float(row.get("target_weight", 0))
        comp_w = float(row.get("competitor_avg_weight", 0))
        if _is_semantic_gap(target_w, comp_w):
            candidates.append((term, comp_w - target_w))

    return [
        {
            "type": "semantic",
            "label": term,
            "focus_terms": [term],
            "avoid_terms": [],
            "semantic_gap": float(gap),
        }
        for term, gap in sorted(candidates, key=lambda pair: pair[1], reverse=True)[:12]
    ]


def _collect_ngram_goals(
    analysis: Dict[str, Any],
    keywords: List[str],
    language: str,
) -> List[Dict[str, Any]]:
    """One "ngram" goal per row of _build_ngram_stage_rows(), in that priority order."""
    rows = _build_ngram_stage_rows(analysis, keywords, language)
    goals: List[Dict[str, Any]] = []
    for rank, (label, target, comp_avg, tol, _, _) in enumerate(rows):
        goals.append(
            {
                "type": "ngram",
                "label": label,
                "focus_terms": [label],
                "avoid_terms": [],
                "ngram_target_count": target,
                "ngram_comp_avg": comp_avg,
                "ngram_tolerance_pct": tol,
                "ngram_lower_bound": round(comp_avg * (1.0 - tol), 3),
                "ngram_upper_bound": round(comp_avg * (1.0 + tol), 3),
                "ngram_direction": "increase" if target < comp_avg else "decrease",
                "ngram_rank_index": rank,
                "ngram_candidates_total": len(rows),
            }
        )
    return goals


def _collect_title_goals(
    analysis: Dict[str, Any],
    keywords: List[str],
    language: str,
) -> List[Dict[str, Any]]:
    """A single "title" goal when keywords exist and the title BERT score is below TITLE_TARGET_THRESHOLD."""
    title_bert = (analysis.get("title_analysis") or {}).get("bert") or {}
    score = title_bert.get("target_score")
    if not keywords or score is None or not (float(score) < TITLE_TARGET_THRESHOLD):
        return []
    return [
        {
            "type": "title",
            "label": "title alignment",
            "focus_terms": _filter_stopwords(_tokenize(" ".join(keywords[:8])), language)[:8],
            "avoid_terms": [],
            "title_bert_score": float(score),
            "title_target": float(TITLE_TARGET_THRESHOLD),
        }
    ]


def _collect_optimization_goals(
    analysis: Dict[str, Any],
    semantic: Dict[str, Any],
    keywords: List[str],
    language: str,
    stage: str = "bert",
    bert_stage_target: float = BERT_TARGET_THRESHOLD,
    stage_goal_overrides: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """
    Collect the optimization goals of one stage and apply the user override.

    Only the goals of the requested `stage` ("bert", "bm25", "semantic",
    "ngram" or "title") are computed; any other stage name yields no automatic
    goals. The list is then passed through _apply_stage_goal_override().

    Args:
        analysis: analysis snapshot (BERT, BM25, n-gram and title sections);
            missing or None sections are treated as empty.
        semantic: semantic snapshot with comparison.term_power_table.
        keywords: key phrases driving the semantic and title goals.
        language: language code used for stop-word filtering.
        stage: stage whose goals are requested.
        bert_stage_target: BERT score below which a phrase becomes a goal.
        stage_goal_overrides: optional per-stage override configuration.

    Returns:
        The stage's goal dicts in priority order.
    """
    if stage == "bert":
        stage_goals = _collect_bert_goals(analysis, language, bert_stage_target)
    elif stage == "bm25":
        stage_goals = _collect_bm25_goals(analysis)
    elif stage == "semantic":
        stage_goals = _collect_semantic_goals(semantic, keywords, language)
    elif stage == "ngram":
        stage_goals = _collect_ngram_goals(analysis, keywords, language)
    elif stage == "title":
        stage_goals = _collect_title_goals(analysis, keywords, language)
    else:
        stage_goals = []
    return _apply_stage_goal_override(stage_goals, stage, language, stage_goal_overrides)
| def _per_goal_budget( | |
| goal: Dict[str, Any], | |
| max_iterations: int, | |
| candidates_per_iteration: int, | |
| bert_stage_target: float, | |
| ) -> Tuple[int, int]: | |
| """ | |
| Scale per-goal iteration and candidate budgets by how far the metric is from its target. | |
| Returns (effective_max_iterations_for_this_goal, effective_candidates_per_iteration). | |
| """ | |
| t = str(goal.get("type", "") or "") | |
| raw = 0.0 | |
| if t == "bert": | |
| sc = float(goal.get("bert_phrase_score", 0.0) or 0.0) | |
| tgt = float(goal.get("bert_target", bert_stage_target) or bert_stage_target) | |
| raw = max(0.0, (tgt - sc) / max(tgt, 1e-6)) | |
| elif t == "ngram": | |
| ca = float(goal.get("ngram_comp_avg", 0.0) or 0.0) | |
| tc = float(goal.get("ngram_target_count", 0.0) or 0.0) | |
| if str(goal.get("ngram_direction", "increase")) == "increase": | |
| need = max(0.0, ca - tc) | |
| raw = min(1.0, need / max(ca, 1e-6)) | |
| else: | |
| need = max(0.0, tc - ca) | |
| raw = min(1.0, need / max(tc, 1e-6)) | |
| elif t == "semantic": | |
| gap = float(goal.get("semantic_gap", 0.0) or 0.0) | |
| raw = min(1.0, gap / max(SEMANTIC_GAP_MIN_ABS * 4.0, 1e-6)) | |
| elif t == "bm25": | |
| c = int(goal.get("bm25_count", 0) or 0) | |
| raw = min(1.0, max(0, c - 1) / 8.0) | |
| elif t == "title": | |
| ts = goal.get("title_bert_score") | |
| if ts is None: | |
| raw = 0.5 | |
| else: | |
| tgt = float(goal.get("title_target", TITLE_TARGET_THRESHOLD) or TITLE_TARGET_THRESHOLD) | |
| raw = max(0.0, (tgt - float(ts)) / max(tgt, 1e-6)) | |
| else: | |
| raw = 0.0 | |
| iter_mult = 1.0 + 2.0 * min(1.0, raw) | |
| cand_mult = 1.0 + 1.0 * min(1.0, raw) | |
| eff_iter = max(1, min(int(round(max_iterations * iter_mult)), max_iterations * 3)) | |
| eff_cand = max(1, min(int(round(candidates_per_iteration * cand_mult)), 5)) | |
| return eff_iter, eff_cand | |
def _estimate_total_loop_budget(
    analysis: Dict[str, Any],
    semantic: Dict[str, Any],
    keywords: List[str],
    language: str,
    max_iterations: int,
    candidates_per_iteration: int,
    bert_stage_target: float,
    active_stage_order: Optional[List[str]] = None,
    stage_goal_overrides: Optional[Dict[str, Any]] = None,
) -> int:
    """Estimate the total iteration budget across all active stages.

    For every stage (``active_stage_order`` when non-empty, otherwise
    ``STAGE_ORDER``) the goals are collected and each goal's effective
    iteration budget from ``_per_goal_budget`` is added up.

    Returns:
        The summed budget clamped to the range ``[1, 480]``.
    """
    stage_sequence = active_stage_order or list(STAGE_ORDER)
    iteration_sum = 0
    for stage_name in stage_sequence:
        stage_goals = _collect_optimization_goals(
            analysis,
            semantic,
            keywords,
            language,
            stage=stage_name,
            bert_stage_target=bert_stage_target,
            stage_goal_overrides=stage_goal_overrides,
        )
        for stage_goal in stage_goals:
            goal_iterations, _unused_candidates = _per_goal_budget(
                stage_goal, max_iterations, candidates_per_iteration, bert_stage_target
            )
            iteration_sum += goal_iterations
    return min(480, max(1, iteration_sum))
| def _choose_sentence_idx(sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str) -> int: | |
| if not sentences: | |
| return 0 | |
| stop = STOP_WORDS.get(language, STOP_WORDS["en"]) | |
| focus = [x for x in focus_terms if x and x not in stop] | |
| if avoid_terms: | |
| best_idx, best_score = 0, -1.0 | |
| for i, s in enumerate(sentences): | |
| lower = s.lower() | |
| score = sum(lower.count(t.lower()) for t in avoid_terms if t) | |
| if score > best_score: | |
| best_idx, best_score = i, score | |
| return best_idx | |
| if focus: | |
| best_idx, best_score = 0, -1.0 | |
| for i, s in enumerate(sentences): | |
| lower = s.lower() | |
| score = sum(lower.count(t.lower()) for t in focus) | |
| if score > best_score: | |
| best_idx, best_score = i, score | |
| return best_idx | |
| return min(2, len(sentences) - 1) | |
| def _rank_sentence_indices( | |
| sentences: List[str], | |
| focus_terms: List[str], | |
| avoid_terms: List[str], | |
| language: str, | |
| goal_type: str = "", | |
| goal_label: str = "", | |
| ) -> List[int]: | |
| if not sentences: | |
| return [0] | |
| stop = STOP_WORDS.get(language, STOP_WORDS["en"]) | |
| focus = [x for x in focus_terms if x and x not in stop] | |
| goal_phrase = (goal_label or "").strip().lower() | |
| avoid = [x for x in avoid_terms if x] | |
| center = (len(sentences) - 1) / 2.0 | |
| # N-gram stage: overlapping sentence windows — pick spans where insertion is natural | |
| # while document-level phrase count remains the primary optimization signal. | |
| if goal_type == "ngram" and (goal_label or "").strip(): | |
| return _rank_ngram_overlap_sentence_indices(sentences, str(goal_label).strip(), language) | |
| # For BERT optimization prefer natural prose chunks over list/menu/noisy blocks. | |
| candidate_indices = list(range(len(sentences))) | |
| if goal_type == "bert": | |
| non_noise = [i for i, s in enumerate(sentences) if not _is_noise_like_sentence(s)] | |
| if non_noise: | |
| candidate_indices = non_noise | |
| scored: List[Tuple[int, float, int]] = [] | |
| for idx in candidate_indices: | |
| s = sentences[idx] | |
| lower = s.lower() | |
| # For bm25 "remove" goals we must select spans that actually contain the exact n-gram term. | |
| # Otherwise the LLM can "optimize around" the goal without decreasing the real BM25 remove metric. | |
| if goal_type == "bm25" and avoid and not focus_terms: | |
| bm25_term = avoid[0] | |
| bm25_count = _count_term_ngrams_strict(s, bm25_term) | |
| focus_score = 0 | |
| avoid_score = float(bm25_count) | |
| chunk_rel = float(bm25_count) | |
| else: | |
| focus_score = sum(lower.count(t.lower()) for t in focus) | |
| avoid_score = sum(lower.count(t.lower()) for t in avoid) | |
| chunk_rel = _chunk_goal_relevance(s, goal_type, goal_label, focus_terms, language) | |
| noise_penalty = 1.0 if _is_noise_like_sentence(s) else 0.0 | |
| # For BERT goals, do not over-focus on existing occurrences only: | |
| # prioritize semantically relevant chunks where phrase/terms may still be underrepresented. | |
| if goal_type == "bert": | |
| tokenized = _filter_stopwords(_tokenize(lower), language) | |
| token_set = set(tokenized) | |
| core_terms = [t.lower() for t in focus if t] | |
| core_hits = sum(1 for t in core_terms if t in token_set) | |
| coverage = (core_hits / max(1, len(core_terms))) if core_terms else 0.0 | |
| phrase_present = 1.0 if (goal_phrase and goal_phrase in lower) else 0.0 | |
| # Boost candidates where semantic context is relevant but explicit core terms are not saturated yet. | |
| missing_term_boost = (1.0 - coverage) * 1.4 if chunk_rel >= 0.18 else 0.0 | |
| phrase_absent_boost = 0.35 if (goal_phrase and not phrase_present and chunk_rel >= 0.2) else 0.0 | |
| score = ( | |
| (chunk_rel * 5.0) | |
| + (focus_score * 1.2) | |
| + (missing_term_boost + phrase_absent_boost) | |
| + (avoid_score * 1.5) | |
| - (noise_penalty * 3.0) | |
| - (abs(idx - center) * 0.05) | |
| ) | |
| else: | |
| # Prefer semantically relevant and lexical matches; push noisy headers/CTA lower. | |
| score = (chunk_rel * 4.0) + (focus_score * 3.0) + (avoid_score * 2.0) - (noise_penalty * 3.0) - (abs(idx - center) * 0.05) | |
| scored.append((idx, score, len(s))) | |
| scored.sort(key=lambda x: (x[1], -x[2]), reverse=True) | |
| ordered = [idx for idx, _, _ in scored] | |
| if not ordered: | |
| ordered = list(range(len(sentences))) | |
| return ordered | |
| def _span_variants_for_level(cascade_level: int) -> List[Tuple[str, int, int]]: | |
| # (operation, left_radius, right_radius) | |
| if cascade_level <= 1: | |
| return [("rewrite", 0, 0), ("rewrite", 0, 1), ("rewrite", 1, 0)] | |
| if cascade_level == 2: | |
| return [("rewrite", 1, 1), ("rewrite", 0, 2), ("rewrite", 2, 0), ("rewrite", 1, 2), ("rewrite", 2, 1)] | |
| if cascade_level == 3: | |
| return [("insert", 0, 0), ("insert", 0, 0), ("insert", 0, 0), ("rewrite", 1, 1)] | |
| return [("rewrite", 2, 2), ("rewrite", 1, 3), ("rewrite", 3, 1), ("rewrite", 2, 3), ("rewrite", 3, 2)] | |
def _choose_edit_span(
    sentences: List[str],
    goal: Dict[str, Any],
    language: str,
    cascade_level: int,
    attempt_cursor: int,
) -> Tuple[str, int, int, int, int]:
    """Select the operation and sentence span for the current attempt.

    ``attempt_cursor`` walks the grid of (ranked sentence x span variant):
    it first cycles through all ranked sentences with the first variant, then
    moves on to the next variant, wrapping around after the last combination.

    Returns:
        ``(operation, span_start, span_end, sentence_index, variant_index)``.
        For ``insert`` the span is the anchor sentence itself; for rewrites it
        is the anchor widened by the variant's radii and clipped to the text.
    """
    goal_type = str(goal.get("type", "") or "")
    goal_label = str(goal.get("label", "") or "")
    ranked = _rank_sentence_indices(
        sentences,
        goal.get("focus_terms", []) or [],
        goal.get("avoid_terms", []) or [],
        language,
        goal_type,
        goal_label,
    )
    variants = _span_variants_for_level(cascade_level)

    slot = attempt_cursor % max(1, len(ranked) * len(variants))
    variant_row, sent_pick = divmod(slot, len(ranked))
    variant_pick = variant_row % len(variants)

    anchor = ranked[sent_pick]
    operation, left_radius, right_radius = variants[variant_pick]
    if operation == "insert":
        first, last = anchor, anchor
    else:
        first = max(0, anchor - left_radius)
        last = min(len(sentences) - 1, anchor + right_radius)
    return operation, first, last, anchor, variant_pick
| def _is_noise_like_sentence(text: str) -> bool: | |
| s = (text or "").strip() | |
| if not s: | |
| return True | |
| lower = s.lower() | |
| tokens = _tokenize(lower) | |
| if len(tokens) <= 2: | |
| return True | |
| if len(tokens) <= 8 and re.search(r"\b(play|explore|best|top|contact|login|signup|casino)\b", lower): | |
| return True | |
| if len(s) <= 90 and re.fullmatch(r"[A-Z0-9\s\-\|\:\.]+", s): | |
| return True | |
| if re.search(r"\b(best|top)\b.{0,20}\b(alternative|alternatives|casino|casinos)\b", lower) and len(tokens) <= 12: | |
| return True | |
| return False | |
| def _chunk_goal_relevance( | |
| text: str, | |
| goal_type: str, | |
| goal_label: str, | |
| focus_terms: List[str], | |
| language: str, | |
| goal_meta: Optional[Dict[str, Any]] = None, | |
| ) -> float: | |
| chunk = (text or "").strip() | |
| if not chunk: | |
| return 0.0 | |
| if goal_type == "bert" and (goal_label or "").strip(): | |
| try: | |
| model = logic.get_bert_model() | |
| embeddings = model.encode([goal_label.strip(), chunk], convert_to_tensor=True) | |
| return float(logic.util.cos_sim(embeddings[0:1], embeddings[1:2])[0][0].item()) | |
| except Exception: | |
| pass | |
| if goal_type == "ngram" and (goal_label or "").strip(): | |
| return float(_chunk_ngram_count(chunk, goal_label, language)) | |
| # Lexical fallback for non-BERT goals or if embedding scoring is unavailable. | |
| toks = _filter_stopwords(_tokenize(chunk), language) | |
| if not toks: | |
| return 0.0 | |
| focus = [f.lower() for f in (focus_terms or []) if f] | |
| if not focus: | |
| return 0.0 | |
| overlap = 0.0 | |
| token_str = " ".join(toks) | |
| for f in focus: | |
| if " " in f: | |
| overlap += 1.0 if f in token_str else 0.0 | |
| else: | |
| overlap += 1.0 if f in toks else 0.0 | |
| return overlap / max(1.0, float(len(focus))) | |
| def _chunk_goal_delta( | |
| before_text: str, | |
| after_text: str, | |
| goal_type: str, | |
| goal_label: str, | |
| focus_terms: List[str], | |
| language: str, | |
| goal_meta: Optional[Dict[str, Any]] = None, | |
| ) -> float: | |
| # BM25 "remove spam" must be judged by strict decrease of the exact n-gram term count. | |
| # Иначе LLM может улучшать семантику рядом, но не уменьшать реальный BM25 remove metric. | |
| if goal_type == "bm25": | |
| term = None | |
| avoid = [] | |
| focus = [] | |
| if isinstance(goal_meta, dict): | |
| term = goal_meta.get("bm25_word") | |
| avoid = goal_meta.get("avoid_terms") or [] | |
| focus = goal_meta.get("focus_terms") or [] | |
| # Fallback: try to infer from avoid_terms | |
| if not term and avoid: | |
| term = avoid[0] | |
| if not term: | |
| return 0.0 | |
| before_count = _count_term_ngrams_strict(before_text or "", str(term)) | |
| after_count = _count_term_ngrams_strict(after_text or "", str(term)) | |
| # For now bm25 goals are "remove" (focus_terms is empty, avoid_terms contains term). | |
| # Keep the sign direction explicit so add-mode can be introduced later safely. | |
| if avoid and not focus: | |
| return round(float(before_count - after_count), 4) | |
| # If ever we add bm25 "add" goals with focus terms, then increase is good. | |
| return round(float(after_count - before_count), 4) | |
| before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language, goal_meta) | |
| after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language, goal_meta) | |
| if goal_type == "ngram": | |
| target_avg = float((goal_meta or {}).get("ngram_comp_avg", 0.0)) | |
| if target_avg > 0: | |
| # Positive delta means closer to competitor average regardless of direction. | |
| before_dist = abs(before_rel - target_avg) | |
| after_dist = abs(after_rel - target_avg) | |
| return round(before_dist - after_dist, 4) | |
| return round(after_rel - before_rel, 4) | |
| def _min_chunk_delta(goal_type: str) -> float: | |
| if goal_type == "bert": | |
| return 0.01 | |
| if goal_type == "title": | |
| return 0.005 | |
| if goal_type == "ngram": | |
| # Require at least one occurrence-equivalent movement toward target zone. | |
| return 0.5 | |
| return 0.05 | |
def _chunk_relevance_pair(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
    goal_meta: Optional[Dict[str, Any]] = None,
) -> Tuple[float, float]:
    """Return the goal relevance of the chunk before and after an edit.

    Both values come from ``_chunk_goal_relevance`` and are rounded to
    4 decimals, in ``(before, after)`` order.
    """
    relevance_before, relevance_after = (
        _chunk_goal_relevance(version, goal_type, goal_label, focus_terms, language, goal_meta)
        for version in (before_text, after_text)
    )
    return round(relevance_before, 4), round(relevance_after, 4)
def _term_diff(before_text: str, after_text: str, language: str) -> Dict[str, List[str]]:
    """List the non-stop-word terms an edit added and removed.

    Both texts are tokenized and stop-word filtered for ``language``; each
    result list is sorted alphabetically and truncated to 12 entries.
    """
    vocab_before = set(_filter_stopwords(_tokenize(before_text), language))
    vocab_after = set(_filter_stopwords(_tokenize(after_text), language))
    return {
        "added_terms": sorted(vocab_after - vocab_before)[:12],
        "removed_terms": sorted(vocab_before - vocab_after)[:12],
    }
| def _edits_conflict(a: Dict[str, Any], b: Dict[str, Any]) -> bool: | |
| if a.get("operation") == "title_rewrite" or b.get("operation") == "title_rewrite": | |
| return True | |
| if a.get("operation") == "insert" or b.get("operation") == "insert": | |
| return int(a.get("span_start", 0)) == int(b.get("span_start", 0)) | |
| a0, a1 = int(a.get("span_start", 0)), int(a.get("span_end", 0)) | |
| b0, b1 = int(b.get("span_start", 0)), int(b.get("span_end", 0)) | |
| return not (a1 < b0 or b1 < a0) | |
| def _apply_edits_to_sentences(sentences: List[str], edits: List[Dict[str, Any]]) -> List[str]: | |
| updated = list(sentences) | |
| ordered = sorted( | |
| edits, | |
| key=lambda e: (int(e.get("span_start", 0)), int(e.get("span_end", 0))), | |
| reverse=True, | |
| ) | |
| for edit in ordered: | |
| op = str(edit.get("operation", "rewrite")) | |
| start = int(edit.get("span_start", 0)) | |
| end = int(edit.get("span_end", start)) | |
| text = str(edit.get("edited_text", "")).strip() | |
| if not text: | |
| continue | |
| if op == "insert": | |
| updated = _insert_after(updated, end, text) | |
| else: | |
| updated = _replace_span(updated, start, end, text) | |
| return updated | |
| def _metrics_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> Dict[str, Any]: | |
| keys = [ | |
| "score", | |
| "bert_low_count", | |
| "bm25_remove_count", | |
| "ngram_signal_count", | |
| "ngram_gap_sum", | |
| "semantic_gap_count", | |
| "semantic_gap_sum", | |
| ] | |
| out: Dict[str, Any] = {} | |
| for k in keys: | |
| pv = prev_metrics.get(k) | |
| nv = next_metrics.get(k) | |
| if pv is None or nv is None: | |
| continue | |
| out[k] = round(float(nv) - float(pv), 4) | |
| pv_t = prev_metrics.get("title_bert_score") | |
| nv_t = next_metrics.get("title_bert_score") | |
| if pv_t is not None and nv_t is not None: | |
| out["title_bert_score"] = round(float(nv_t) - float(pv_t), 4) | |
| prev_terms = {str(x.get("term", "")).lower() for x in (prev_metrics.get("semantic_gap_terms") or []) if x.get("term")} | |
| next_terms = {str(x.get("term", "")).lower() for x in (next_metrics.get("semantic_gap_terms") or []) if x.get("term")} | |
| if prev_terms or next_terms: | |
| out["semantic_gap_terms_added"] = sorted(list(next_terms - prev_terms))[:12] | |
| out["semantic_gap_terms_removed"] = sorted(list(prev_terms - next_terms))[:12] | |
| return out | |
| def _non_conflicting_edit_combos(candidates: List[Dict[str, Any]], min_size: int = 2, max_size: int = 4) -> List[List[Dict[str, Any]]]: | |
| if not candidates: | |
| return [] | |
| n = len(candidates) | |
| combos: List[List[Dict[str, Any]]] = [] | |
| for r in range(max(2, min_size), min(max_size, n) + 1): | |
| for idxs in combinations(range(n), r): | |
| combo = [candidates[i] for i in idxs] | |
| conflict = False | |
| for i in range(len(combo)): | |
| for j in range(i + 1, len(combo)): | |
| e1 = combo[i].get("edit_payload") | |
| e2 = combo[j].get("edit_payload") | |
| if not e1 or not e2: | |
| conflict = True | |
| break | |
| if _edits_conflict(e1, e2): | |
| conflict = True | |
| break | |
| if conflict: | |
| break | |
| if not conflict: | |
| combos.append(combo) | |
| return combos | |
| def _extract_json_object(text: str) -> Optional[Dict[str, Any]]: | |
| raw = (text or "").strip() | |
| if not raw: | |
| return None | |
| try: | |
| return json.loads(raw) | |
| except Exception: | |
| pass | |
| m = re.search(r"\{[\s\S]*\}", raw) | |
| if not m: | |
| return None | |
| try: | |
| return json.loads(m.group(0)) | |
| except Exception: | |
| return None | |
| def _llm_edit_chunk( | |
| *, | |
| api_key: str, | |
| base_url: str, | |
| model: str, | |
| language: str, | |
| full_text: str, | |
| chunk_text: str, | |
| operation: str, | |
| context_before: str, | |
| context_after: str, | |
| cascade_level: int, | |
| goal_type: str, | |
| goal_label: str, | |
| focus_terms: List[str], | |
| avoid_terms: List[str], | |
| temperature: float, | |
| phrase_strategy_mode: str = "auto", | |
| ) -> Dict[str, Any]: | |
| # Title optimization must never touch body text; see optimize_text branch for goal.type == "title". | |
| if str(goal_type or "").strip().lower() == "title": | |
| raise ValueError( | |
| "Internal error: goal_type 'title' must be handled via _llm_edit_title(), not _llm_edit_chunk(). " | |
| "If you see this on a server, deploy the current optimizer (title-only LLM path)." | |
| ) | |
| endpoint = base_url.rstrip("/") + "/chat/completions" | |
| op = operation if operation in {"rewrite", "insert"} else "rewrite" | |
| system_msg = ( | |
| "You are a semantic-vector optimizer for SEO tasks. " | |
| "Your task is to improve chunk relevance to the focus terms/goal phrase with minimal local edits. " | |
| "Preserve narrative flow, factual tone, and language. " | |
| "Return strict JSON only: {\"edited_text\": \"...\", \"rationale\": \"...\"}. " | |
| "Do not rewrite the whole text. Never change topic or introduce unrelated entities." | |
| ) | |
| op_instruction = ( | |
| "Rewrite the provided chunk only." | |
| if op == "rewrite" | |
| else "Create a short bridge chunk (1-2 sentences) to insert after the chunk." | |
| ) | |
| max_sent = _max_sentences_for_level(cascade_level, op) | |
| phrase_tokens = _filter_stopwords(_tokenize(goal_label or ""), language) | |
| phrase_len = len(phrase_tokens) | |
| strategy_mode = (phrase_strategy_mode or "auto").strip().lower() | |
| if strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}: | |
| strategy_mode = "auto" | |
| if strategy_mode == "exact_preferred": | |
| phrase_strategy = ( | |
| "Prefer one natural exact phrase mention when grammatically correct; otherwise use distributed core-term coverage." | |
| ) | |
| elif strategy_mode == "distributed_preferred": | |
| phrase_strategy = ( | |
| "Prefer distributed semantic coverage: spread core terms/lemmas naturally and avoid exact phrase unless absolutely natural." | |
| ) | |
| elif phrase_len >= 3: | |
| phrase_strategy = ( | |
| "Prefer distributed semantic coverage for long phrases: naturally spread core terms/lemmas across the local paragraph. " | |
| "Use exact phrase only if it is grammatically natural." | |
| ) | |
| elif phrase_len == 2: | |
| phrase_strategy = ( | |
| "For two-term goals, use either one natural exact phrase or distributed use of both terms without repetition." | |
| ) | |
| else: | |
| phrase_strategy = ( | |
| "For single-term goals, improve relevance using natural lexical variants and nearby semantic anchors." | |
| ) | |
| user_msg = ( | |
| f"Language: {language}\n" | |
| f"Operation: {op}\n" | |
| f"Cascade level: L{cascade_level}\n" | |
| f"Goal: {goal_type} ({goal_label})\n" | |
| f"Goal token count (without stopwords): {phrase_len}\n" | |
| f"Instruction: {op_instruction}\n" | |
| f"Must preserve overall narrative and style.\n" | |
| "Text must be grammatically correct and natural for native readers.\n" | |
| "Keep edits tightly local to the provided chunk and immediate context only.\n" | |
| "Edit must be substantive (not just synonyms) and should increase relevance to the goal phrase.\n" | |
| "Do not change the sentence subject/entity focus unless absolutely required by grammar.\n" | |
| f"Phrase strategy: {phrase_strategy}\n" | |
| f"Focus terms to strengthen: {', '.join(focus_terms) if focus_terms else '-'}\n" | |
| f"Terms to de-emphasize/avoid overuse: {', '.join(avoid_terms) if avoid_terms else '-'}\n\n" | |
| f"Chunk to edit/expand:\n{chunk_text}\n\n" | |
| f"Prev context:\n{context_before}\n\n" | |
| f"Next context:\n{context_after}\n\n" | |
| "Constraints:\n" | |
| "1) Keep text concise and locally coherent.\n" | |
| "2) Keep local coherence with surrounding text.\n" | |
| f"3) Max {max_sent} sentence(s) in edited_text.\n" | |
| "4) Keep key named entities from the original chunk unchanged when possible.\n" | |
| "5) For BERT goals, prioritize semantic alignment over exact phrase repetition.\n" | |
| "6) If exact phrase sounds unnatural, do NOT force it; use grammatically correct distributed wording.\n" | |
| "7) Exact phrase may appear at most once, and only when it reads naturally.\n" | |
| "8) Avoid repeating the same focus term more than needed; no stuffing.\n" | |
| "9) For rewrite: preserve original meaning sentence-by-sentence while improving relevance.\n" | |
| "10) Provide rationale in one short sentence.\n" | |
| "11) Only output JSON object." | |
| ) | |
| payload = { | |
| "model": model, | |
| "temperature": float(max(0.0, min(1.2, temperature))), | |
| "messages": [ | |
| {"role": "system", "content": system_msg}, | |
| {"role": "user", "content": user_msg}, | |
| ], | |
| "response_format": {"type": "json_object"}, | |
| } | |
| headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} | |
| response = requests.post(endpoint, headers=headers, json=payload, timeout=60) | |
| response.raise_for_status() | |
| data = response.json() | |
| content = ( | |
| data.get("choices", [{}])[0] | |
| .get("message", {}) | |
| .get("content", "") | |
| ) | |
| parsed = _extract_json_object(content) | |
| edited = "" | |
| rationale = "" | |
| if parsed: | |
| edited = str(parsed.get("edited_text") or parsed.get("revised_sentence") or parsed.get("rewrite") or "").strip() | |
| rationale = str(parsed.get("rationale") or parsed.get("why") or "").strip() | |
| if not edited: | |
| raise ValueError("LLM returned invalid JSON edit payload.") | |
| return { | |
| "edited_text": edited, | |
| "rationale": rationale, | |
| "prompt_debug": { | |
| "operation": op, | |
| "cascade_level": cascade_level, | |
| "goal_type": goal_type, | |
| "goal_label": goal_label, | |
| "focus_terms": focus_terms, | |
| "avoid_terms": avoid_terms, | |
| "phrase_strategy_mode": strategy_mode, | |
| "goal_token_count": phrase_len, | |
| "phrase_strategy": phrase_strategy, | |
| "max_sentences": max_sent, | |
| "chunk_text": chunk_text, | |
| "context_before": context_before, | |
| "context_after": context_after, | |
| "temperature": float(max(0.0, min(1.2, temperature))), | |
| }, | |
| } | |
def _llm_edit_title(
    *,
    api_key: str,
    base_url: str,
    model: str,
    language: str,
    current_title: str,
    body_excerpt: str,
    competitor_title_hint: str,
    focus_terms: List[str],
    avoid_terms: List[str],
    keywords: List[str],
    cascade_level: int,
    temperature: float,
    phrase_strategy_mode: str = "auto",
) -> Dict[str, Any]:
    """Ask a chat-completions endpoint to rewrite only the page <title> plain text.

    Backend metric (per the original note): mean BERT cos-sim(title, each keyword).

    The request is POSTed to ``{base_url}/chat/completions``. ``temperature``
    is clamped to ``[0.0, 1.2]``; unknown ``phrase_strategy_mode`` values fall
    back to ``"auto"``. At most 24 keywords and 900 characters of
    ``body_excerpt`` are included in the prompt.

    Returns:
        ``{"edited_text", "rationale", "prompt_debug"}`` with
        ``prompt_debug["operation"] == "title_rewrite"``.

    Raises:
        ValueError: if the response carries no usable title text (including
            empty ``choices`` or a non-object payload).
        requests.HTTPError: if the endpoint answers with an error status.
    """
    endpoint = base_url.rstrip("/") + "/chat/completions"
    # Treat a missing keyword list as empty everywhere, not only when building kw_line.
    keyword_list = keywords or []
    # Clamp once so the request and the debug echo can never disagree.
    clamped_temperature = float(max(0.0, min(1.2, temperature)))
    strategy_mode = (phrase_strategy_mode or "auto").strip().lower()
    if strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}:
        strategy_mode = "auto"
    phrase_tokens = _filter_stopwords(_tokenize(" ".join(keyword_list[:6])), language)
    phrase_len = len(phrase_tokens)
    # Explicit modes win; "auto"/"ensemble" pick a strategy from the keyword token count.
    if strategy_mode == "exact_preferred":
        phrase_strategy = (
            "Prefer one natural exact phrase from the keyword list when it fits; otherwise distribute core terms."
        )
    elif strategy_mode == "distributed_preferred":
        phrase_strategy = "Prefer natural distribution of core terms; avoid stiff exact-match phrasing."
    elif phrase_len >= 3:
        phrase_strategy = (
            "Spread core lemmas across the title; exact multi-word match only if it reads like a real title."
        )
    elif phrase_len == 2:
        phrase_strategy = "Use both core ideas in one short title line without repetition."
    else:
        phrase_strategy = "Keep the title specific and aligned with the keyword theme using natural wording."
    system_msg = (
        "You optimize HTML <title> tag text for SEO. "
        "Output JSON only: {\"edited_text\": \"...\", \"rationale\": \"...\"}. "
        "edited_text must be the new title line ONLY: plain text, no <title> tags, no quotes wrapping the whole title. "
        "Do not change the article body. Preserve the primary brand/site name from the current title when present."
    )
    kw_line = ", ".join(k.strip() for k in keyword_list[:24] if k.strip()) or "-"
    user_msg = (
        f"Language: {language}\n"
        f"Cascade level: L{cascade_level}\n"
        "Task: Improve semantic alignment of the TITLE with these keywords (average embedding similarity should rise).\n"
        f"Keywords (priority): {kw_line}\n"
        f"Focus terms: {', '.join(focus_terms) if focus_terms else '-'}\n"
        f"De-emphasize: {', '.join(avoid_terms) if avoid_terms else '-'}\n"
        f"Phrase strategy: {phrase_strategy}\n\n"
        f"Current title:\n{current_title.strip()}\n\n"
        f"Page content excerpt (context only; do not paste into title):\n{(body_excerpt or '')[:900]}\n\n"
        f"{competitor_title_hint}\n\n"
        "Constraints:\n"
        "1) Length about 35–60 characters when possible; hard max 88 characters.\n"
        "2) One line; no line breaks; title case or sentence case per language norms.\n"
        "3) Natural human phrasing; no keyword stuffing; each important term at most once unless grammar needs it.\n"
        "4) Do not add unrelated entities or claims.\n"
        "5) Rationale: one short sentence.\n"
    )
    payload = {
        "model": model,
        "temperature": clamped_temperature,
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        "response_format": {"type": "json_object"},
    }
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    data = response.json()

    # Walk the response defensively: an empty "choices" list, a null "message" or a
    # non-object body must end in the ValueError below, not IndexError/AttributeError.
    choices = data.get("choices") if isinstance(data, dict) else None
    first_choice = choices[0] if isinstance(choices, list) and choices else {}
    message = first_choice.get("message") if isinstance(first_choice, dict) else None
    content = message.get("content") if isinstance(message, dict) else ""
    parsed = _extract_json_object(content if isinstance(content, str) else "")

    edited = ""
    rationale = ""
    if isinstance(parsed, dict):
        edited = str(parsed.get("edited_text") or parsed.get("title") or "").strip()
        rationale = str(parsed.get("rationale") or "").strip()
    if not edited:
        raise ValueError("LLM returned invalid JSON title payload.")
    return {
        "edited_text": edited,
        "rationale": rationale,
        "prompt_debug": {
            "operation": "title_rewrite",
            "cascade_level": cascade_level,
            "goal_type": "title",
            "goal_label": "title alignment",
            "focus_terms": focus_terms,
            "avoid_terms": avoid_terms,
            "phrase_strategy_mode": strategy_mode,
            "phrase_strategy": phrase_strategy,
            "chunk_text": current_title.strip(),
            "context_before": (body_excerpt or "")[:500],
            "context_after": competitor_title_hint[:500] if competitor_title_hint else "",
            "temperature": clamped_temperature,
        },
    }
def _replace_span(sentences: List[str], start_idx: int, end_idx: int, replacement_text: str) -> List[str]:
    """Return a new list with sentences ``start_idx..end_idx`` (inclusive) replaced.

    The replacement text is split into sentences; if splitting yields nothing,
    the stripped text is used as a single sentence.
    """
    new_sentences = _split_sentences(replacement_text)
    if not new_sentences:
        new_sentences = [replacement_text.strip()]
    head = sentences[:start_idx]
    tail = sentences[end_idx + 1 :]
    return head + new_sentences + tail
def _insert_after(sentences: List[str], after_idx: int, inserted_text: str) -> List[str]:
    """Return a new list with the inserted text placed right after ``after_idx``.

    The inserted text is split into sentences; if splitting yields nothing,
    the stripped text is used as a single sentence.
    """
    new_sentences = _split_sentences(inserted_text)
    if not new_sentences:
        new_sentences = [inserted_text.strip()]
    cut = after_idx + 1
    return sentences[:cut] + new_sentences + sentences[cut:]
| def _goal_improved( | |
| goal_type: str, | |
| goal_label: str, | |
| prev_metrics: Dict[str, Any], | |
| next_metrics: Dict[str, Any], | |
| ) -> bool: | |
| if goal_type == "bert": | |
| key = (goal_label or "").strip().lower() | |
| prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) | |
| next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) | |
| # Accept smaller but real phrase-level gains to accumulate progress toward threshold (e.g. 0.51 -> 0.70). | |
| return (next_phrase - prev_phrase) >= BERT_GOAL_DELTA_MIN or next_metrics["bert_low_count"] < prev_metrics["bert_low_count"] | |
| if goal_type == "bm25": | |
| return next_metrics["bm25_remove_count"] < prev_metrics["bm25_remove_count"] | |
| if goal_type == "semantic": | |
| return next_metrics["semantic_gap_count"] < prev_metrics["semantic_gap_count"] | |
| if goal_type == "ngram": | |
| return ( | |
| next_metrics["ngram_signal_count"] < prev_metrics["ngram_signal_count"] | |
| or float(next_metrics.get("ngram_gap_sum", 0.0)) < float(prev_metrics.get("ngram_gap_sum", 0.0)) | |
| ) | |
| if goal_type == "title": | |
| pt = prev_metrics.get("title_bert_score") | |
| nt = next_metrics.get("title_bert_score") | |
| if pt is None or nt is None: | |
| return False | |
| return float(nt) > float(pt) or float(nt) >= TITLE_TARGET_THRESHOLD | |
| return next_metrics["score"] > prev_metrics["score"] | |
| def _bert_phrase_delta(goal_label: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> float: | |
| key = (goal_label or "").strip().lower() | |
| prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) | |
| next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) | |
| return round(next_phrase - prev_phrase, 4) | |
| def _safe_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any], key: str) -> float: | |
| try: | |
| return float(next_metrics.get(key, 0.0)) - float(prev_metrics.get(key, 0.0)) | |
| except Exception: | |
| return 0.0 | |
| def _stage_primary_progress(stage: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> bool: | |
| if stage == "bert": | |
| prev_low = int(prev_metrics.get("bert_low_count", 0)) | |
| next_low = int(next_metrics.get("bert_low_count", 0)) | |
| if next_low < prev_low: | |
| return True | |
| prev_max = max([0.0] + [float(v) for v in (prev_metrics.get("bert_phrase_scores") or {}).values()]) | |
| next_max = max([0.0] + [float(v) for v in (next_metrics.get("bert_phrase_scores") or {}).values()]) | |
| return (next_max - prev_max) >= BERT_GOAL_DELTA_MIN | |
| if stage == "bm25": | |
| return int(next_metrics.get("bm25_remove_count", 0)) < int(prev_metrics.get("bm25_remove_count", 0)) | |
| if stage == "semantic": | |
| return ( | |
| int(next_metrics.get("semantic_gap_count", 0)) < int(prev_metrics.get("semantic_gap_count", 0)) | |
| or float(next_metrics.get("semantic_gap_sum", 0.0)) < float(prev_metrics.get("semantic_gap_sum", 0.0)) | |
| ) | |
| if stage == "ngram": | |
| return ( | |
| int(next_metrics.get("ngram_signal_count", 0)) < int(prev_metrics.get("ngram_signal_count", 0)) | |
| or float(next_metrics.get("ngram_gap_sum", 0.0)) < float(prev_metrics.get("ngram_gap_sum", 0.0)) | |
| ) | |
| if stage == "title": | |
| pv = prev_metrics.get("title_bert_score") | |
| nv = next_metrics.get("title_bert_score") | |
| if pv is None or nv is None: | |
| return False | |
| return float(nv) > float(pv) | |
| return False | |
def _is_stage_complete(stage: str, metrics: Dict[str, Any], bert_stage_target: float = BERT_TARGET_THRESHOLD) -> bool:
    """Decide whether ``stage`` has nothing left to do given ``metrics``.

    Per stage:
      * ``"bert"``     - every value in ``bert_phrase_scores`` is at least
        ``bert_stage_target`` (an empty/missing mapping counts as complete).
      * ``"bm25"``     - ``bm25_remove_count`` is at most 3.
      * ``"semantic"`` - ``semantic_gap_count`` is at most 0.
      * ``"ngram"``    - ``ngram_signal_count`` is at most 0.
      * ``"title"``    - ``title_bert_score`` is None or at least
        ``TITLE_TARGET_THRESHOLD``.

    Unknown stage names are treated as complete.
    """
    if stage == "bert":
        phrase_scores = [float(v) for v in (metrics.get("bert_phrase_scores") or {}).values()]
        if phrase_scores:
            # Per-phrase target: the weakest phrase decides, so one strong
            # phrase cannot end the stage early.
            return min(phrase_scores) >= float(bert_stage_target)
        return True

    # Counter-driven stages: (stage name, metrics key, highest count still considered done).
    counter_rules = (
        ("bm25", "bm25_remove_count", 3),
        ("semantic", "semantic_gap_count", 0),
        ("ngram", "ngram_signal_count", 0),
    )
    for stage_name, counter_key, allowed_max in counter_rules:
        if stage == stage_name:
            return int(metrics.get(counter_key, 0)) <= allowed_max

    if stage == "title":
        title_score = metrics.get("title_bert_score")
        if title_score is None:
            return True
        return float(title_score) >= TITLE_TARGET_THRESHOLD

    return True
def _advance_ngram_term_cursor(cursor_state: Dict[str, Dict[str, int]], stage_key: str) -> None:
    """Record one more attempt for ``stage_key`` in ``cursor_state`` (mutated in place).

    The entry for ``stage_key`` holds ``"term_index"`` and ``"attempt_count"``.
    Each call bumps the attempt count; once it reaches
    ``NGRAM_ATTEMPTS_PER_TERM`` the term index moves forward by one and the
    attempt count resets to 0. A missing or falsy entry starts from zeros.
    """
    current = cursor_state.get(stage_key) or {"term_index": 0, "attempt_count": 0}
    tries = int(current.get("attempt_count", 0)) + 1
    term_pos = int(current.get("term_index", 0))

    if tries >= NGRAM_ATTEMPTS_PER_TERM:
        # Attempt budget for this term is spent: move to the next term.
        term_pos, tries = term_pos + 1, 0

    # Always store a fresh dict rather than mutating the previous entry.
    cursor_state[stage_key] = {"term_index": term_pos, "attempt_count": tries}
def _candidate_utility(
    *,
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
    goal_type: str,
    goal_label: str,
    bert_phrase_delta: float,
    chunk_goal_delta: float,
    local_chunk_improved: bool,
) -> float:
    """Score a candidate edit as a single utility number (higher is better).

    The utility is a weighted sum of the phrase-level BERT delta, the
    chunk-level goal delta and the overall score delta, plus a flat 0.3 bonus
    when ``local_chunk_improved`` is true. Increases in the bm25 / bert-low /
    ngram / semantic-gap counters and drops in the title score are subtracted
    as soft penalties; for ``goal_type == "title"`` a title-score gain earns an
    extra bonus.

    Returns:
        The utility rounded to 4 decimal places.
    """

    def _delta(metric_name: str) -> float:
        return _safe_delta(prev_metrics, next_metrics, metric_name)

    d_score = _delta("score")
    d_bm25 = _delta("bm25_remove_count")
    d_bert_low = _delta("bert_low_count")
    d_ngram = _delta("ngram_signal_count")
    d_sem_gap = _delta("semantic_gap_count")
    d_title = _delta("title_bert_score")

    # Dynamic emphasis: while the targeted phrase is still below the BERT
    # threshold, phrase-level and chunk-level gains are weighted more heavily.
    # Non-BERT regressions remain penalised either way (see below).
    phrase_key = (goal_label or "").strip().lower()
    phrase_before = float((prev_metrics.get("bert_phrase_scores") or {}).get(phrase_key, 0.0))
    pushing_bert = goal_type == "bert" and phrase_before < BERT_TARGET_THRESHOLD
    if pushing_bert:
        phrase_weight, chunk_weight = 7.5, 1.6
    else:
        phrase_weight, chunk_weight = 3.0, 1.0
    score_weight = 1.0

    total = (
        (phrase_weight * float(bert_phrase_delta))
        + (chunk_weight * float(chunk_goal_delta))
        + (score_weight * float(d_score))
    )
    if local_chunk_improved:
        total += 0.3

    # Soft cross-metric guardrails: only increases (regressions) in these
    # counters cost utility; decreases are neither rewarded nor penalised here.
    regression_weights = (
        (d_bm25, 1.8),
        (d_bert_low, 2.4),
        (d_ngram, 0.6),
        (d_sem_gap, 1.5),
    )
    for regression, weight in regression_weights:
        total -= max(0.0, regression) * weight

    # A falling title score is penalised (min(...) is <= 0, so this subtracts).
    total += min(0.0, d_title) * 1.2
    if goal_type == "title":
        total += max(0.0, d_title) * 5.0

    return round(float(total), 4)
def _is_candidate_valid(
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
    goal_type: str,
    goal_label: str,
    optimization_mode: str,
) -> Tuple[bool, List[str], bool]:
    """Apply hard acceptance rules to a candidate's metrics.

    ``optimization_mode`` is one of ``"conservative"``, ``"balanced"`` or
    ``"aggressive"`` (case-insensitive; anything else falls back to
    ``"balanced"``) and selects the tolerated score drop and title-score drop.
    In aggressive mode the three regression counters may each grow by 1.

    Returns:
        ``(valid, reasons, improved)`` where ``reasons`` lists every violated
        rule (empty when valid) and ``improved`` is the result of
        ``_goal_improved`` for the given goal.
    """
    limits_by_mode = {
        "conservative": {"max_score_drop": 0.0, "max_title_drop": 0.02},
        "balanced": {"max_score_drop": 1.0, "max_title_drop": 0.03},
        "aggressive": {"max_score_drop": 2.0, "max_title_drop": 0.05},
    }
    mode = (optimization_mode or "balanced").lower()
    if mode not in limits_by_mode:
        mode = "balanced"
    limits = limits_by_mode[mode]
    # Only aggressive mode tolerates a one-unit rise in the critical counters.
    counter_slack = 1 if mode == "aggressive" else 0

    problems: List[str] = []

    if float(prev_metrics["score"]) - float(next_metrics["score"]) > limits["max_score_drop"]:
        problems.append(f"score_drop>{limits['max_score_drop']}")

    # Hard regressions in critical counters.
    guarded_counters = (
        ("bm25_remove_count", "bm25_remove_regression"),
        ("bert_low_count", "bert_low_regression"),
        ("semantic_gap_count", "semantic_gap_regression"),
    )
    for counter_key, reason in guarded_counters:
        if next_metrics[counter_key] > prev_metrics[counter_key] + counter_slack:
            problems.append(reason)

    title_before = prev_metrics.get("title_bert_score")
    title_after = next_metrics.get("title_bert_score")
    # The title-drop guard is skipped when the title itself is the goal.
    if (
        goal_type != "title"
        and title_before is not None
        and title_after is not None
        and title_after < (title_before - limits["max_title_drop"])
    ):
        problems.append("title_bert_drop")

    improved = _goal_improved(goal_type, goal_label, prev_metrics, next_metrics)
    # Conservative mode additionally demands an explicit goal improvement.
    if mode == "conservative" and not improved:
        problems.append("goal_not_improved")

    return (not problems), problems, improved
| def optimize_text( | |
| request_data: Dict[str, Any], | |
| progress_callback: Optional[Any] = None, | |
| cancel_event: Optional[Any] = None, | |
| ) -> Dict[str, Any]: | |
| target_text = str(request_data.get("target_text", "")).strip() | |
| competitors = [str(x) for x in (request_data.get("competitors") or []) if str(x).strip()] | |
| keywords = [str(x) for x in (request_data.get("keywords") or []) if str(x).strip()] | |
| language = str(request_data.get("language", "en")).strip() or "en" | |
| target_title = str(request_data.get("target_title", "") or "") | |
| competitor_titles = [str(x) for x in (request_data.get("competitor_titles") or [])] | |
| diff_mode_used = str(request_data.get("diff_mode", "diff_from_input") or "diff_from_input").strip().lower() | |
| if diff_mode_used not in {"diff_from_input", "diff_from_original"}: | |
| diff_mode_used = "diff_from_input" | |
| diff_base_text = target_text | |
| diff_base_title = target_title | |
| if diff_mode_used == "diff_from_original": | |
| diff_base_text = str(request_data.get("original_target_text") or target_text or "").strip() | |
| diff_base_title = str(request_data.get("original_target_title") or target_title or "").strip() | |
| else: | |
| diff_base_text = (target_text or "").strip() | |
| diff_base_title = (target_title or "").strip() | |
| api_key = str(request_data.get("api_key", "")).strip() | |
| if not api_key: | |
| raise ValueError("API key is required.") | |
| base_url = str(request_data.get("api_base_url", "https://api.deepseek.com/v1")).strip() or "https://api.deepseek.com/v1" | |
| model = str(request_data.get("model", "deepseek-chat")).strip() or "deepseek-chat" | |
| max_iterations = int(request_data.get("max_iterations", 2) or 2) | |
| max_iterations = max(1, min(8, max_iterations)) | |
| candidates_per_iteration = int(request_data.get("candidates_per_iteration", 2) or 2) | |
| candidates_per_iteration = max(1, min(5, candidates_per_iteration)) | |
| temperature = float(request_data.get("temperature", 0.25) or 0.25) | |
| optimization_mode = str(request_data.get("optimization_mode", "balanced") or "balanced") | |
| phrase_strategy_mode = str(request_data.get("phrase_strategy_mode", "auto") or "auto").strip().lower() | |
| if phrase_strategy_mode not in {"auto", "exact_preferred", "distributed_preferred", "ensemble"}: | |
| phrase_strategy_mode = "auto" | |
| bert_stage_target = float(request_data.get("bert_stage_target", BERT_TARGET_THRESHOLD) or BERT_TARGET_THRESHOLD) | |
| bert_stage_target = max(0.0, min(1.0, bert_stage_target)) | |
| stage_goal_overrides = request_data.get("stage_goal_overrides") or {} | |
| if not isinstance(stage_goal_overrides, dict): | |
| stage_goal_overrides = {} | |
| req_enabled_stages = request_data.get("enabled_stages") or [] | |
| active_stage_order: List[str] = [] | |
| if isinstance(req_enabled_stages, list): | |
| for x in req_enabled_stages: | |
| st = _normalize_stage_name(x) | |
| if st and st not in active_stage_order: | |
| active_stage_order.append(st) | |
| if not active_stage_order: | |
| active_stage_order = list(STAGE_ORDER) | |
| baseline_analysis = _build_analysis_snapshot( | |
| target_text, competitors, keywords, language, target_title, competitor_titles | |
| ) | |
| baseline_semantic = _build_semantic_snapshot(target_text, competitors, language) | |
| baseline_metrics = _compute_metrics( | |
| baseline_analysis, baseline_semantic, keywords, language, bert_stage_target=bert_stage_target | |
| ) | |
| # Per-goal iteration budget scales with deficit; total loop steps = sum(effective iters per goal). | |
| baseline_goal_counts = { | |
| st: len( | |
| _collect_optimization_goals( | |
| baseline_analysis, | |
| baseline_semantic, | |
| keywords, | |
| language, | |
| stage=st, | |
| bert_stage_target=bert_stage_target, | |
| stage_goal_overrides=stage_goal_overrides, | |
| ) | |
| ) | |
| for st in active_stage_order | |
| } | |
| ngram_row_count = int(baseline_goal_counts.get("ngram", 0)) | |
| total_loop_steps = _estimate_total_loop_budget( | |
| baseline_analysis, | |
| baseline_semantic, | |
| keywords, | |
| language, | |
| max_iterations, | |
| candidates_per_iteration, | |
| bert_stage_target, | |
| active_stage_order=active_stage_order, | |
| stage_goal_overrides=stage_goal_overrides, | |
| ) | |
| current_text = target_text | |
| current_title = (target_title or "").strip() | |
| current_analysis = baseline_analysis | |
| current_semantic = baseline_semantic | |
| current_metrics = baseline_metrics | |
| logs: List[Dict[str, Any]] = [] | |
| applied_changes = 0 | |
| def _emit(ev: str, **kwargs: Any) -> None: | |
| if progress_callback: | |
| try: | |
| progress_callback({"event": ev, **kwargs}) | |
| except Exception: | |
| pass | |
| def _cancelled() -> bool: | |
| return cancel_event is not None and getattr(cancel_event, "is_set", lambda: False)() | |
| def _pack_result(stopped_early: bool = False, stop_reason: str = "") -> Dict[str, Any]: | |
| # Title string must match what last metrics used (title_analysis.target_title), not only the mutable var. | |
| _ta = (current_analysis or {}).get("title_analysis") or {} | |
| _ot = "" | |
| if isinstance(_ta, dict) and (_ta.get("target_title") or "").strip(): | |
| _ot = str(_ta.get("target_title")).strip() | |
| if not _ot: | |
| _ot = (current_title or "").strip() | |
| if not _ot: | |
| _ot = (target_title or "").strip() | |
| diff_body_html = "" | |
| diff_changes: List[Dict[str, str]] = [] | |
| if (diff_base_text or "").strip() != (current_text or "").strip(): | |
| dh, dc = _diff_sentences_html(diff_base_text or "", current_text or "") | |
| if dc: | |
| diff_body_html = dh | |
| diff_changes = dc | |
| diff_title_html = "" | |
| diff_title_changes: List[Dict[str, str]] = [] | |
| if (diff_base_title or "").strip() != (_ot or "").strip(): | |
| dth, dtc = _diff_title_html(diff_base_title or "", _ot or "") | |
| if dtc: | |
| diff_title_html = dth | |
| diff_title_changes = dtc | |
| return { | |
| "ok": True, | |
| "optimized_text": current_text, | |
| "optimized_title": _ot, | |
| "baseline_metrics": baseline_metrics, | |
| "final_metrics": current_metrics, | |
| "iterations": logs, | |
| "applied_changes": applied_changes, | |
| "optimization_mode": optimization_mode, | |
| "phrase_strategy_mode": phrase_strategy_mode, | |
| "bert_stage_target": round(bert_stage_target, 4), | |
| "diff_mode": diff_mode_used, | |
| "diff_body_html": diff_body_html, | |
| "diff_title_html": diff_title_html, | |
| "diff_changes": diff_changes, | |
| "diff_title_changes": diff_title_changes, | |
| "stopped_early": stopped_early, | |
| "stop_reason": stop_reason, | |
| } | |
| _emit("preparing", message="Подготовка", phase="baseline") | |
| _emit( | |
| "started", | |
| total_steps=total_loop_steps, | |
| max_iterations_setting=max_iterations, | |
| ngram_targets=ngram_row_count, | |
| stages_order=list(active_stage_order), | |
| ) | |
| seen_candidate_rewrites = set() | |
| cascade_level = 1 | |
| consecutive_failures = 0 | |
| goal_attempt_cursor: Dict[str, int] = {} | |
| attempted_spans = set() | |
| queued_candidates: List[Dict[str, Any]] = [] | |
| stage_idx = 0 | |
| stage_no_progress_steps = 0 | |
| stage_goal_cursor: Dict[str, Dict[str, int]] = {} | |
| for step in range(total_loop_steps): | |
| if _cancelled(): | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "stopped", | |
| "reason": "Остановка пользователем (сохранён текущий текст и метрики).", | |
| } | |
| ) | |
| return _pack_result(stopped_early=True, stop_reason="user_cancelled") | |
| while stage_idx < len(active_stage_order) and _is_stage_complete( | |
| active_stage_order[stage_idx], current_metrics, bert_stage_target=bert_stage_target | |
| ): | |
| stage_idx += 1 | |
| stage_no_progress_steps = 0 | |
| if stage_idx >= len(active_stage_order): | |
| logs.append({"step": step + 1, "status": "stopped", "reason": "All optimization stages completed."}) | |
| break | |
| active_stage = active_stage_order[stage_idx] | |
| goals_for_stage = _collect_optimization_goals( | |
| current_analysis, | |
| current_semantic, | |
| keywords, | |
| language, | |
| stage=active_stage, | |
| bert_stage_target=bert_stage_target, | |
| stage_goal_overrides=stage_goal_overrides, | |
| ) | |
| state = stage_goal_cursor.get(active_stage) or {"goal_index": 0, "attempt_count": 0} | |
| goal_index = int(state.get("goal_index", 0)) | |
| attempt_count = int(state.get("attempt_count", 0)) | |
| # Advance across goals that exhausted per-goal iteration budget (scaled by deficit). | |
| while goal_index < len(goals_for_stage): | |
| g_try = goals_for_stage[goal_index] | |
| eff_max_iter, _ = _per_goal_budget(g_try, max_iterations, candidates_per_iteration, bert_stage_target) | |
| if attempt_count < eff_max_iter: | |
| break | |
| goal_index += 1 | |
| attempt_count = 0 | |
| if goal_index >= len(goals_for_stage): | |
| stage_idx += 1 | |
| stage_no_progress_steps = 0 | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "stage_skipped", | |
| "stage": active_stage, | |
| "reason": f"All goals exhausted for stage '{active_stage}' (per-goal iteration budget).", | |
| } | |
| ) | |
| stage_goal_cursor[active_stage] = {"goal_index": goal_index, "attempt_count": attempt_count} | |
| continue | |
| goal = goals_for_stage[goal_index] | |
| eff_max_iter, eff_cand = _per_goal_budget(goal, max_iterations, candidates_per_iteration, bert_stage_target) | |
| attempt_count += 1 | |
| stage_goal_cursor[active_stage] = {"goal_index": goal_index, "attempt_count": attempt_count} | |
| if goal["type"] == "none": | |
| stage_idx += 1 | |
| stage_no_progress_steps = 0 | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "stage_skipped", | |
| "stage": active_stage, | |
| "reason": f"No actionable goals for stage '{active_stage}', moving to next stage.", | |
| } | |
| ) | |
| continue | |
| _emit( | |
| "step_start", | |
| step=step + 1, | |
| total_steps=total_loop_steps, | |
| active_stage=active_stage, | |
| goal_type=goal.get("type"), | |
| goal_label=goal.get("label"), | |
| score=current_metrics.get("score"), | |
| goal_budget_iter=eff_max_iter, | |
| goal_budget_candidates=eff_cand, | |
| ) | |
| goal_key = f"{goal.get('type', '')}:{goal.get('label', '')}".strip().lower() | |
| base_attempt_cursor = int(goal_attempt_cursor.get(goal_key, 0)) | |
| candidates: List[Dict[str, Any]] = [] | |
| chosen_spans: List[Dict[str, Any]] = [] | |
| candidate_idx = 0 | |
| span_trials_eff = 1 | |
| if goal.get("type") == "title": | |
| if not current_title: | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "stage_skipped", | |
| "stage": active_stage, | |
| "reason": "Поле Title пустое — этап title пропущен.", | |
| } | |
| ) | |
| stage_idx += 1 | |
| continue | |
| chosen_spans = [ | |
| { | |
| "operation": "title_rewrite", | |
| "span_start": 0, | |
| "span_end": 0, | |
| "sentence_index": 0, | |
| "span_variant": 0, | |
| "sentence_before": current_title, | |
| } | |
| ] | |
| original_span_text = current_title | |
| body_excerpt = (current_text or "").strip()[:1200] | |
| comp_ctx = " | ".join(t.strip() for t in competitor_titles[:4] if t.strip()) | |
| competitor_title_hint = ( | |
| f"Примеры title конкурентов (только стиль/длина): {comp_ctx}" if comp_ctx else "" | |
| ) | |
| strategy_plan = _build_phrase_strategy_plan( | |
| phrase_strategy_mode, | |
| "title", | |
| str(goal.get("label", "")), | |
| eff_cand, | |
| ) | |
| for strategy_variant in strategy_plan: | |
| candidate_idx += 1 | |
| temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07)) | |
| if _cancelled(): | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "stopped", | |
| "reason": "Остановка пользователем перед запросом к LLM.", | |
| } | |
| ) | |
| return _pack_result(stopped_early=True, stop_reason="user_cancelled") | |
| _emit( | |
| "llm_call", | |
| candidate_index=candidate_idx, | |
| span_trial=1, | |
| span_trials=1, | |
| strategy=strategy_variant, | |
| ) | |
| try: | |
| llm_result = _llm_edit_title( | |
| api_key=api_key, | |
| base_url=base_url, | |
| model=model, | |
| language=language, | |
| current_title=current_title, | |
| body_excerpt=body_excerpt, | |
| competitor_title_hint=competitor_title_hint, | |
| focus_terms=goal.get("focus_terms", []) or [], | |
| avoid_terms=goal.get("avoid_terms", []) or [], | |
| keywords=keywords, | |
| cascade_level=cascade_level, | |
| temperature=temp, | |
| phrase_strategy_mode=strategy_variant, | |
| ) | |
| edited_text = str((llm_result or {}).get("edited_text", "")).strip() | |
| llm_rationale = str((llm_result or {}).get("rationale", "")).strip() | |
| prompt_debug = (llm_result or {}).get("prompt_debug", {}) | |
| if not edited_text or edited_text == original_span_text.strip(): | |
| continue | |
| quality_issues = _validate_title_candidate(edited_text) | |
| cand_analysis = _build_analysis_snapshot( | |
| current_text, competitors, keywords, language, edited_text, competitor_titles | |
| ) | |
| cand_semantic = _build_semantic_snapshot(current_text, competitors, language) | |
| cand_metrics = _compute_metrics( | |
| cand_analysis, cand_semantic, keywords, language, bert_stage_target=bert_stage_target | |
| ) | |
| before_rel = float(current_metrics.get("title_bert_score") or 0.0) | |
| after_rel = float(cand_metrics.get("title_bert_score") or 0.0) | |
| chunk_delta = round(after_rel - before_rel, 4) | |
| local_chunk_improved = chunk_delta >= _min_chunk_delta("title") | |
| bert_phrase_delta = 0.0 | |
| valid, invalid_reasons, goal_improved = _is_candidate_valid( | |
| current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode | |
| ) | |
| delta_score = round(cand_metrics["score"] - current_metrics["score"], 3) | |
| candidate_utility = _candidate_utility( | |
| prev_metrics=current_metrics, | |
| next_metrics=cand_metrics, | |
| goal_type=str(goal.get("type", "")), | |
| goal_label=str(goal.get("label", "")), | |
| bert_phrase_delta=bert_phrase_delta, | |
| chunk_goal_delta=chunk_delta, | |
| local_chunk_improved=local_chunk_improved, | |
| ) | |
| md = _metrics_delta(current_metrics, cand_metrics) | |
| if quality_issues: | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "error": "quality_validation_failed", | |
| "valid": False, | |
| "goal_improved": False, | |
| "local_chunk_improved": local_chunk_improved, | |
| "chunk_goal_delta": chunk_delta, | |
| "invalid_reasons": quality_issues, | |
| "delta_score": -999.0, | |
| "candidate_score": None, | |
| "sentence_after": edited_text, | |
| "chunk_relevance_before": before_rel, | |
| "chunk_relevance_after": after_rel, | |
| "term_diff": _term_diff(original_span_text, edited_text, language), | |
| "llm_prompt_debug": prompt_debug, | |
| "llm_rationale": llm_rationale, | |
| "operation": "title_rewrite", | |
| "sentence_index": 0, | |
| "span_start": 0, | |
| "span_end": 0, | |
| "span_variant": 0, | |
| "phrase_strategy_used": strategy_variant, | |
| "sentence_before": original_span_text, | |
| } | |
| ) | |
| continue | |
| candidate_key = ("title_rewrite", 0, 0, edited_text.strip().lower()) | |
| if candidate_key in seen_candidate_rewrites: | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "error": "duplicate_candidate_rewrite", | |
| "valid": False, | |
| "goal_improved": False, | |
| "local_chunk_improved": local_chunk_improved, | |
| "chunk_goal_delta": chunk_delta, | |
| "invalid_reasons": ["duplicate_candidate_rewrite"], | |
| "delta_score": -999.0, | |
| "candidate_score": None, | |
| "chunk_relevance_before": before_rel, | |
| "chunk_relevance_after": after_rel, | |
| "term_diff": _term_diff(original_span_text, edited_text, language), | |
| "llm_prompt_debug": prompt_debug, | |
| "llm_rationale": llm_rationale, | |
| "operation": "title_rewrite", | |
| "sentence_index": 0, | |
| "span_start": 0, | |
| "span_end": 0, | |
| "span_variant": 0, | |
| "phrase_strategy_used": strategy_variant, | |
| "sentence_before": original_span_text, | |
| } | |
| ) | |
| continue | |
| seen_candidate_rewrites.add(candidate_key) | |
| candidate_text = current_text | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "sentence_before": original_span_text, | |
| "sentence_after": edited_text, | |
| "operation": "title_rewrite", | |
| "sentence_index": 0, | |
| "span_start": 0, | |
| "span_end": 0, | |
| "span_variant": 0, | |
| "phrase_strategy_used": strategy_variant, | |
| "text": candidate_text, | |
| "new_title": edited_text, | |
| "analysis": cand_analysis, | |
| "semantic": cand_semantic, | |
| "metrics": cand_metrics, | |
| "valid": valid, | |
| "goal_improved": goal_improved, | |
| "bert_phrase_delta": bert_phrase_delta, | |
| "local_chunk_improved": local_chunk_improved, | |
| "chunk_goal_delta": chunk_delta, | |
| "chunk_relevance_before": before_rel, | |
| "chunk_relevance_after": after_rel, | |
| "term_diff": _term_diff(original_span_text, edited_text, language), | |
| "llm_prompt_debug": prompt_debug, | |
| "llm_rationale": llm_rationale, | |
| "invalid_reasons": invalid_reasons, | |
| "delta_score": delta_score, | |
| "candidate_score": cand_metrics.get("score"), | |
| "candidate_utility": candidate_utility, | |
| "metrics_delta": md, | |
| "edit_payload": { | |
| "operation": "title_rewrite", | |
| "span_start": 0, | |
| "span_end": 0, | |
| "edited_text": edited_text, | |
| }, | |
| } | |
| ) | |
| except Exception as e: | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "error": str(e), | |
| "valid": False, | |
| "goal_improved": False, | |
| "local_chunk_improved": False, | |
| "chunk_goal_delta": -999.0, | |
| "invalid_reasons": [str(e)], | |
| "delta_score": -999.0, | |
| "candidate_score": None, | |
| "llm_prompt_debug": { | |
| "operation": "title_rewrite", | |
| "cascade_level": cascade_level, | |
| "goal_type": goal.get("type"), | |
| "goal_label": goal.get("label"), | |
| "phrase_strategy_mode": strategy_variant, | |
| }, | |
| "llm_rationale": "", | |
| "operation": "title_rewrite", | |
| "sentence_index": 0, | |
| "span_start": 0, | |
| "span_end": 0, | |
| "span_variant": 0, | |
| "phrase_strategy_used": strategy_variant, | |
| "sentence_before": current_title, | |
| } | |
| ) | |
| else: | |
| sentences = _split_sentences(current_text) | |
| if not sentences: | |
| logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."}) | |
| break | |
| span_trials = 2 if cascade_level <= 2 else 3 | |
| local_candidates = eff_cand if cascade_level <= 2 else min(6, eff_cand + 1) | |
| span_trials_eff = span_trials | |
| for st in range(span_trials): | |
| attempt_cursor = base_attempt_cursor + st | |
| operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span( | |
| sentences, goal, language, cascade_level, attempt_cursor | |
| ) | |
| max_span_retries = max(1, len(sentences) * 4) | |
| retries = 0 | |
| while retries < max_span_retries: | |
| span_key = (goal_key, cascade_level, operation, span_start, span_end) | |
| if span_key not in attempted_spans: | |
| attempted_spans.add(span_key) | |
| break | |
| attempt_cursor += 1 | |
| operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span( | |
| sentences, goal, language, cascade_level, attempt_cursor | |
| ) | |
| retries += 1 | |
| original_span_text = " ".join(sentences[span_start : span_end + 1]).strip() | |
| context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip() | |
| context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip() | |
| chosen_spans.append( | |
| { | |
| "operation": operation, | |
| "span_start": span_start, | |
| "span_end": span_end, | |
| "sentence_index": sent_idx, | |
| "span_variant": span_variant, | |
| "sentence_before": original_span_text, | |
| } | |
| ) | |
| per_span_candidates = max(1, local_candidates // span_trials) | |
| strategy_plan = _build_phrase_strategy_plan( | |
| phrase_strategy_mode, | |
| str(goal.get("type", "")), | |
| str(goal.get("label", "")), | |
| per_span_candidates, | |
| ) | |
| for strategy_variant in strategy_plan: | |
| candidate_idx += 1 | |
| temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07)) | |
| if _cancelled(): | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "stopped", | |
| "reason": "Остановка пользователем перед запросом к LLM.", | |
| } | |
| ) | |
| return _pack_result(stopped_early=True, stop_reason="user_cancelled") | |
| _emit( | |
| "llm_call", | |
| candidate_index=candidate_idx, | |
| span_trial=st + 1, | |
| span_trials=span_trials, | |
| strategy=strategy_variant, | |
| ) | |
| try: | |
| llm_result = _llm_edit_chunk( | |
| api_key=api_key, | |
| base_url=base_url, | |
| model=model, | |
| language=language, | |
| full_text=current_text, | |
| chunk_text=original_span_text, | |
| operation=operation, | |
| context_before=context_before, | |
| context_after=context_after, | |
| cascade_level=cascade_level, | |
| goal_type=goal["type"], | |
| goal_label=goal["label"], | |
| focus_terms=goal["focus_terms"], | |
| avoid_terms=goal["avoid_terms"], | |
| temperature=temp, | |
| phrase_strategy_mode=strategy_variant, | |
| ) | |
| edited_text = str((llm_result or {}).get("edited_text", "")).strip() | |
| llm_rationale = str((llm_result or {}).get("rationale", "")).strip() | |
| prompt_debug = (llm_result or {}).get("prompt_debug", {}) | |
| if not edited_text or edited_text == original_span_text: | |
| continue | |
| quality_issues = _validate_candidate_text( | |
| edited_text, | |
| cascade_level, | |
| operation, | |
| goal_label=goal.get("label", ""), | |
| focus_terms=goal.get("focus_terms", []) or [], | |
| ) | |
| before_rel, after_rel = _chunk_relevance_pair( | |
| original_span_text, | |
| edited_text, | |
| goal["type"], | |
| goal["label"], | |
| goal.get("focus_terms", []) or [], | |
| language, | |
| goal, | |
| ) | |
| chunk_delta = _chunk_goal_delta( | |
| original_span_text, | |
| edited_text, | |
| goal["type"], | |
| goal["label"], | |
| goal.get("focus_terms", []) or [], | |
| language, | |
| goal, | |
| ) | |
| local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"]) | |
| if quality_issues: | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "error": "quality_validation_failed", | |
| "valid": False, | |
| "goal_improved": False, | |
| "local_chunk_improved": local_chunk_improved, | |
| "chunk_goal_delta": chunk_delta, | |
| "invalid_reasons": quality_issues, | |
| "delta_score": -999.0, | |
| "candidate_score": None, | |
| "sentence_after": edited_text, | |
| "chunk_relevance_before": before_rel, | |
| "chunk_relevance_after": after_rel, | |
| "term_diff": _term_diff(original_span_text, edited_text, language), | |
| "llm_prompt_debug": prompt_debug, | |
| "llm_rationale": llm_rationale, | |
| "operation": operation, | |
| "sentence_index": sent_idx, | |
| "span_start": span_start, | |
| "span_end": span_end, | |
| "span_variant": span_variant, | |
| "phrase_strategy_used": strategy_variant, | |
| "sentence_before": original_span_text, | |
| } | |
| ) | |
| continue | |
| candidate_key = (operation, span_start, span_end, edited_text.strip().lower()) | |
| if candidate_key in seen_candidate_rewrites: | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "error": "duplicate_candidate_rewrite", | |
| "valid": False, | |
| "goal_improved": False, | |
| "local_chunk_improved": local_chunk_improved, | |
| "chunk_goal_delta": chunk_delta, | |
| "invalid_reasons": ["duplicate_candidate_rewrite"], | |
| "delta_score": -999.0, | |
| "candidate_score": None, | |
| "chunk_relevance_before": before_rel, | |
| "chunk_relevance_after": after_rel, | |
| "term_diff": _term_diff(original_span_text, edited_text, language), | |
| "llm_prompt_debug": prompt_debug, | |
| "llm_rationale": llm_rationale, | |
| "operation": operation, | |
| "sentence_index": sent_idx, | |
| "span_start": span_start, | |
| "span_end": span_end, | |
| "span_variant": span_variant, | |
| "phrase_strategy_used": strategy_variant, | |
| "sentence_before": original_span_text, | |
| } | |
| ) | |
| continue | |
| seen_candidate_rewrites.add(candidate_key) | |
| if operation == "insert": | |
| candidate_sentences = _insert_after(sentences, span_end, edited_text) | |
| else: | |
| candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text) | |
| candidate_text = " ".join(candidate_sentences).strip() | |
| cand_analysis = _build_analysis_snapshot( | |
| candidate_text, competitors, keywords, language, current_title, competitor_titles | |
| ) | |
| cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language) | |
| cand_metrics = _compute_metrics( | |
| cand_analysis, cand_semantic, keywords, language, bert_stage_target=bert_stage_target | |
| ) | |
| valid, invalid_reasons, goal_improved = _is_candidate_valid( | |
| current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode | |
| ) | |
| delta_score = round(cand_metrics["score"] - current_metrics["score"], 3) | |
| bert_phrase_delta = _bert_phrase_delta(goal["label"], current_metrics, cand_metrics) if goal.get("type") == "bert" else 0.0 | |
| candidate_utility = _candidate_utility( | |
| prev_metrics=current_metrics, | |
| next_metrics=cand_metrics, | |
| goal_type=str(goal.get("type", "")), | |
| goal_label=str(goal.get("label", "")), | |
| bert_phrase_delta=bert_phrase_delta, | |
| chunk_goal_delta=chunk_delta, | |
| local_chunk_improved=local_chunk_improved, | |
| ) | |
| md = _metrics_delta(current_metrics, cand_metrics) | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "sentence_before": original_span_text, | |
| "sentence_after": edited_text, | |
| "operation": operation, | |
| "sentence_index": sent_idx, | |
| "span_start": span_start, | |
| "span_end": span_end, | |
| "span_variant": span_variant, | |
| "phrase_strategy_used": strategy_variant, | |
| "text": candidate_text, | |
| "analysis": cand_analysis, | |
| "semantic": cand_semantic, | |
| "metrics": cand_metrics, | |
| "valid": valid, | |
| "goal_improved": goal_improved, | |
| "bert_phrase_delta": bert_phrase_delta, | |
| "local_chunk_improved": local_chunk_improved, | |
| "chunk_goal_delta": chunk_delta, | |
| "chunk_relevance_before": before_rel, | |
| "chunk_relevance_after": after_rel, | |
| "term_diff": _term_diff(original_span_text, edited_text, language), | |
| "llm_prompt_debug": prompt_debug, | |
| "llm_rationale": llm_rationale, | |
| "invalid_reasons": invalid_reasons, | |
| "delta_score": delta_score, | |
| "candidate_score": cand_metrics.get("score"), | |
| "candidate_utility": candidate_utility, | |
| "metrics_delta": md, | |
| "edit_payload": { | |
| "operation": operation, | |
| "span_start": span_start, | |
| "span_end": span_end, | |
| "edited_text": edited_text, | |
| }, | |
| } | |
| ) | |
| except Exception as e: | |
| candidates.append( | |
| { | |
| "candidate_index": candidate_idx, | |
| "error": str(e), | |
| "valid": False, | |
| "goal_improved": False, | |
| "local_chunk_improved": False, | |
| "chunk_goal_delta": -999.0, | |
| "invalid_reasons": [str(e)], | |
| "delta_score": -999.0, | |
| "candidate_score": None, | |
| "llm_prompt_debug": { | |
| "operation": operation, | |
| "cascade_level": cascade_level, | |
| "goal_type": goal.get("type"), | |
| "goal_label": goal.get("label"), | |
| "phrase_strategy_mode": strategy_variant, | |
| }, | |
| "llm_rationale": "", | |
| "operation": operation, | |
| "sentence_index": sent_idx, | |
| "span_start": span_start, | |
| "span_end": span_end, | |
| "span_variant": span_variant, | |
| "phrase_strategy_used": strategy_variant, | |
| "sentence_before": original_span_text, | |
| } | |
| ) | |
| goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials_eff | |
| primary_span = chosen_spans[0] if chosen_spans else {"operation": "-", "span_start": 0, "span_end": 0, "sentence_index": 0, "span_variant": 0, "sentence_before": ""} | |
| valid_raw_candidates = [c for c in candidates if c.get("valid")] | |
| valid_candidates = [ | |
| c | |
| for c in valid_raw_candidates | |
| if c.get("valid") | |
| and ( | |
| c.get("goal_improved") | |
| or (goal.get("type") == "bert" and float(c.get("bert_phrase_delta") or 0.0) > 0.0) | |
| or float(c.get("candidate_score") or -1) > float(current_metrics["score"]) | |
| or float(c.get("candidate_utility") or -999.0) > 0.0 | |
| ) | |
| and ( | |
| goal.get("type") != "bert" | |
| or float(c.get("bert_phrase_delta") or 0.0) > 0.0 | |
| or c.get("local_chunk_improved") | |
| ) | |
| ] | |
| if not valid_candidates: | |
| # Local-first accumulation mode: | |
| # if we have guardrail-valid candidates that improve chunk relevance, | |
| # apply the strongest local edit immediately and continue optimizing next chunks. | |
| local_progress_candidates = [ | |
| c | |
| for c in valid_raw_candidates | |
| if c.get("local_chunk_improved") | |
| ] | |
| if local_progress_candidates: | |
| best_local = sorted( | |
| local_progress_candidates, | |
| key=lambda c: ( | |
| float(c.get("candidate_utility") or -999.0), | |
| float(c.get("chunk_goal_delta") or 0.0), | |
| float(c.get("bert_phrase_delta") or 0.0), | |
| float(c.get("candidate_score") or -999.0), | |
| ), | |
| reverse=True, | |
| )[0] | |
| prev_metrics = current_metrics | |
| current_text = best_local["text"] | |
| if str(best_local.get("operation") or "") == "title_rewrite": | |
| nt = (best_local.get("new_title") or best_local.get("sentence_after") or "").strip() | |
| if nt: | |
| current_title = nt | |
| current_analysis = best_local["analysis"] | |
| current_semantic = best_local["semantic"] | |
| current_metrics = best_local["metrics"] | |
| progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics) | |
| if progressed_stage: | |
| stage_no_progress_steps = 0 | |
| else: | |
| stage_no_progress_steps += 1 | |
| applied_changes += 1 | |
| queued_candidates = [] | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "applied_local_progress", | |
| "stage": active_stage, | |
| "goal": goal, | |
| "cascade_level": cascade_level, | |
| "operation": best_local.get("operation"), | |
| "sentence_index": best_local.get("sentence_index"), | |
| "span_start": best_local.get("span_start"), | |
| "span_end": best_local.get("span_end"), | |
| "span_variant": best_local.get("span_variant"), | |
| "sentence_before": best_local.get("sentence_before"), | |
| "sentence_after": best_local.get("sentence_after"), | |
| "reason": "Applied best local-improvement candidate despite no immediate global gain.", | |
| "current_score": prev_metrics.get("score"), | |
| "metrics_before": prev_metrics, | |
| "metrics_after": current_metrics, | |
| "delta_score": round(float(current_metrics.get("score", 0)) - float(prev_metrics.get("score", 0)), 3), | |
| "chosen_candidate_index": best_local.get("candidate_index"), | |
| "chosen_chunk_goal_delta": best_local.get("chunk_goal_delta"), | |
| "chosen_bert_phrase_delta": best_local.get("bert_phrase_delta"), | |
| "chosen_candidate_utility": best_local.get("candidate_utility"), | |
| "chosen_metrics_delta": best_local.get("metrics_delta"), | |
| "candidates": [ | |
| { | |
| "candidate_index": c.get("candidate_index"), | |
| "valid": c.get("valid", False), | |
| "goal_improved": c.get("goal_improved", False), | |
| "bert_phrase_delta": c.get("bert_phrase_delta"), | |
| "local_chunk_improved": c.get("local_chunk_improved", False), | |
| "chunk_goal_delta": c.get("chunk_goal_delta"), | |
| "chunk_relevance_before": c.get("chunk_relevance_before"), | |
| "chunk_relevance_after": c.get("chunk_relevance_after"), | |
| "term_diff": c.get("term_diff"), | |
| "llm_prompt_debug": c.get("llm_prompt_debug"), | |
| "llm_rationale": c.get("llm_rationale"), | |
| "metrics_delta": c.get("metrics_delta"), | |
| "candidate_utility": c.get("candidate_utility"), | |
| "invalid_reasons": c.get("invalid_reasons", []), | |
| "delta_score": c.get("delta_score"), | |
| "candidate_score": c.get("candidate_score"), | |
| "sentence_after": c.get("sentence_after"), | |
| "error": c.get("error"), | |
| } | |
| for c in candidates | |
| ], | |
| } | |
| ) | |
| consecutive_failures = 0 | |
| cascade_level = 1 | |
| goal_attempt_cursor[goal_key] = base_attempt_cursor + 1 | |
| continue | |
| local_pool = [ | |
| c | |
| for c in candidates | |
| if c.get("local_chunk_improved") | |
| and c.get("edit_payload") | |
| and c.get("candidate_score") is not None | |
| and goal.get("type") != "title" | |
| ] | |
| local_pool.sort( | |
| key=lambda c: ( | |
| float(c.get("candidate_utility") or -999.0), | |
| float(c.get("chunk_goal_delta") or -999.0), | |
| float(c.get("candidate_score") or -999.0), | |
| ), | |
| reverse=True, | |
| ) | |
| for c in local_pool[:4]: | |
| queue_key = ( | |
| goal_key, | |
| c.get("operation"), | |
| c.get("span_start"), | |
| c.get("span_end"), | |
| str((c.get("sentence_after") or "")).strip().lower(), | |
| ) | |
| if not any(x.get("queue_key") == queue_key for x in queued_candidates): | |
| queued_candidates.append({"queue_key": queue_key, "candidate": c}) | |
| batch_applied = False | |
| batch_info: Dict[str, Any] = {} | |
| if len(queued_candidates) >= 2 and goal.get("type") != "title": | |
| pool = [x["candidate"] for x in queued_candidates[:6]] | |
| combos = _non_conflicting_edit_combos(pool, min_size=2, max_size=4) | |
| best_batch: Optional[Dict[str, Any]] = None | |
| prev_metrics = current_metrics | |
| for combo in combos: | |
| edits = [c.get("edit_payload") for c in combo if c.get("edit_payload")] | |
| if len(edits) != len(combo): | |
| continue | |
| batch_sentences = _apply_edits_to_sentences(sentences, edits) | |
| batch_text = " ".join(batch_sentences).strip() | |
| batch_analysis = _build_analysis_snapshot( | |
| batch_text, competitors, keywords, language, current_title, competitor_titles | |
| ) | |
| batch_semantic = _build_semantic_snapshot(batch_text, competitors, language) | |
| batch_metrics = _compute_metrics( | |
| batch_analysis, batch_semantic, keywords, language, bert_stage_target=bert_stage_target | |
| ) | |
| b_valid, b_reasons, b_goal = _is_candidate_valid( | |
| current_metrics, batch_metrics, goal["type"], goal["label"], optimization_mode | |
| ) | |
| b_delta = round(batch_metrics["score"] - current_metrics["score"], 3) | |
| local_sum = sum(float(c.get("chunk_goal_delta") or 0.0) for c in combo) | |
| if not (b_valid and (b_goal or b_delta > 0)): | |
| continue | |
| if goal.get("type") == "bert" and local_sum < (_min_chunk_delta("bert") * len(combo)): | |
| continue | |
| cand = { | |
| "combo": combo, | |
| "batch_text": batch_text, | |
| "batch_analysis": batch_analysis, | |
| "batch_semantic": batch_semantic, | |
| "batch_metrics": batch_metrics, | |
| "b_delta": b_delta, | |
| "local_sum": local_sum, | |
| "b_reasons": b_reasons, | |
| "b_goal": b_goal, | |
| } | |
| if best_batch is None or ( | |
| cand["b_delta"], | |
| cand["local_sum"], | |
| len(cand["combo"]), | |
| ) > ( | |
| best_batch["b_delta"], | |
| best_batch["local_sum"], | |
| len(best_batch["combo"]), | |
| ): | |
| best_batch = cand | |
| if best_batch: | |
| current_text = best_batch["batch_text"] | |
| current_analysis = best_batch["batch_analysis"] | |
| current_semantic = best_batch["batch_semantic"] | |
| current_metrics = best_batch["batch_metrics"] | |
| progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics) | |
| if progressed_stage: | |
| stage_no_progress_steps = 0 | |
| else: | |
| stage_no_progress_steps += 1 | |
| applied_changes += 1 | |
| batch_applied = True | |
| batch_info = { | |
| "status": "applied_batch", | |
| "batch_candidate_ids": [c.get("candidate_index") for c in best_batch["combo"]], | |
| "batch_size": len(best_batch["combo"]), | |
| "batch_local_chunk_delta_sum": round(best_batch["local_sum"], 4), | |
| "delta_score": best_batch["b_delta"], | |
| "metrics_before": prev_metrics, | |
| "metrics_after": best_batch["batch_metrics"], | |
| "metrics_delta": _metrics_delta(prev_metrics, best_batch["batch_metrics"]), | |
| } | |
| queued_candidates = [] | |
| consecutive_failures = 0 | |
| cascade_level = 1 | |
| goal_attempt_cursor[goal_key] = 0 | |
| if not batch_applied: | |
| batch_info = { | |
| "status": "batch_rejected", | |
| "reason": "Queued local improvements could not pass global constraints together.", | |
| } | |
| if batch_applied: | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "applied_batch", | |
| "stage": active_stage, | |
| "goal": goal, | |
| "cascade_level": cascade_level, | |
| "operation": "batch", | |
| "sentence_index": primary_span.get("sentence_index"), | |
| "span_start": primary_span.get("span_start"), | |
| "span_end": primary_span.get("span_end"), | |
| "span_variant": primary_span.get("span_variant"), | |
| "sentence_before": primary_span.get("sentence_before"), | |
| "current_score": (batch_info.get("metrics_before") or {}).get("score"), | |
| "metrics_before": batch_info.get("metrics_before"), | |
| "metrics_after": current_metrics, | |
| "reason": "Applied queued local-improvement edits as a batch.", | |
| "batch_info": batch_info, | |
| "candidates": [ | |
| { | |
| "candidate_index": c.get("candidate_index"), | |
| "valid": c.get("valid", False), | |
| "goal_improved": c.get("goal_improved", False), | |
| "bert_phrase_delta": c.get("bert_phrase_delta"), | |
| "local_chunk_improved": c.get("local_chunk_improved", False), | |
| "chunk_goal_delta": c.get("chunk_goal_delta"), | |
| "chunk_relevance_before": c.get("chunk_relevance_before"), | |
| "chunk_relevance_after": c.get("chunk_relevance_after"), | |
| "term_diff": c.get("term_diff"), | |
| "llm_prompt_debug": c.get("llm_prompt_debug"), | |
| "llm_rationale": c.get("llm_rationale"), | |
| "metrics_delta": c.get("metrics_delta"), | |
| "candidate_utility": c.get("candidate_utility"), | |
| "invalid_reasons": c.get("invalid_reasons", []), | |
| "delta_score": c.get("delta_score"), | |
| "candidate_score": c.get("candidate_score"), | |
| "sentence_after": c.get("sentence_after"), | |
| "error": c.get("error"), | |
| } | |
| for c in candidates | |
| ], | |
| } | |
| ) | |
| continue | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "rejected", | |
| "stage": active_stage, | |
| "goal": goal, | |
| "cascade_level": cascade_level, | |
| "operation": primary_span.get("operation"), | |
| "sentence_index": primary_span.get("sentence_index"), | |
| "span_start": primary_span.get("span_start"), | |
| "span_end": primary_span.get("span_end"), | |
| "span_variant": primary_span.get("span_variant"), | |
| "sentence_before": primary_span.get("sentence_before"), | |
| "current_score": current_metrics["score"], | |
| "reason": ( | |
| "No valid candidate satisfied constraints." | |
| if not valid_raw_candidates | |
| else "Valid candidates existed but none improved goal or total score." | |
| ), | |
| "valid_candidates_count": len(valid_raw_candidates), | |
| "promotable_candidates_count": len(valid_candidates), | |
| "queued_local_candidates": len(local_pool), | |
| "queued_total": len(queued_candidates), | |
| "batch_info": batch_info if batch_info else None, | |
| "candidates": [ | |
| { | |
| "candidate_index": c.get("candidate_index"), | |
| "valid": c.get("valid", False), | |
| "goal_improved": c.get("goal_improved", False), | |
| "bert_phrase_delta": c.get("bert_phrase_delta"), | |
| "local_chunk_improved": c.get("local_chunk_improved", False), | |
| "chunk_goal_delta": c.get("chunk_goal_delta"), | |
| "chunk_relevance_before": c.get("chunk_relevance_before"), | |
| "chunk_relevance_after": c.get("chunk_relevance_after"), | |
| "term_diff": c.get("term_diff"), | |
| "llm_prompt_debug": c.get("llm_prompt_debug"), | |
| "llm_rationale": c.get("llm_rationale"), | |
| "metrics_delta": c.get("metrics_delta"), | |
| "candidate_utility": c.get("candidate_utility"), | |
| "invalid_reasons": c.get("invalid_reasons", []), | |
| "delta_score": c.get("delta_score"), | |
| "candidate_score": c.get("candidate_score"), | |
| "sentence_after": c.get("sentence_after"), | |
| "error": c.get("error"), | |
| } | |
| for c in candidates | |
| ], | |
| } | |
| ) | |
| stage_no_progress_steps += 1 | |
| # Stage transition is controlled by per-stage iteration budget and completion checks. | |
| consecutive_failures += 1 | |
| if consecutive_failures >= 2 and cascade_level < 4: | |
| cascade_level += 1 | |
| consecutive_failures = 0 | |
| logs[-1]["escalated_to_level"] = cascade_level | |
| continue | |
| best = sorted( | |
| valid_candidates, | |
| key=lambda c: ( | |
| 1 if c.get("goal_improved") else 0, | |
| float(c.get("candidate_utility") or -999.0), | |
| float(c.get("bert_phrase_delta") or 0.0), | |
| float(c.get("chunk_goal_delta") or 0.0), | |
| c["metrics"]["score"], | |
| ), | |
| reverse=True, | |
| )[0] | |
| prev_metrics = current_metrics | |
| current_text = best["text"] | |
| if str(best.get("operation") or "") == "title_rewrite": | |
| nt = (best.get("new_title") or best.get("sentence_after") or "").strip() | |
| if nt: | |
| current_title = nt | |
| current_analysis = best["analysis"] | |
| current_semantic = best["semantic"] | |
| current_metrics = best["metrics"] | |
| progressed_stage = _stage_primary_progress(active_stage, prev_metrics, current_metrics) | |
| if progressed_stage: | |
| stage_no_progress_steps = 0 | |
| else: | |
| stage_no_progress_steps += 1 | |
| applied_changes += 1 | |
| queued_candidates = [] | |
| logs.append( | |
| { | |
| "step": step + 1, | |
| "status": "applied", | |
| "stage": active_stage, | |
| "goal": goal, | |
| "cascade_level": cascade_level, | |
| "operation": best.get("operation"), | |
| "sentence_index": best.get("sentence_index"), | |
| "span_start": best.get("span_start"), | |
| "span_end": best.get("span_end"), | |
| "span_variant": best.get("span_variant"), | |
| "sentence_before": best.get("sentence_before"), | |
| "sentence_after": best["sentence_after"], | |
| "current_score": prev_metrics["score"], | |
| "metrics_before": prev_metrics, | |
| "metrics_after": current_metrics, | |
| "delta_score": round(current_metrics["score"] - prev_metrics["score"], 3), | |
| "chosen_candidate_index": best.get("candidate_index"), | |
| "chosen_candidate_utility": best.get("candidate_utility"), | |
| "candidates": [ | |
| { | |
| "candidate_index": c.get("candidate_index"), | |
| "valid": c.get("valid", False), | |
| "goal_improved": c.get("goal_improved", False), | |
| "bert_phrase_delta": c.get("bert_phrase_delta"), | |
| "local_chunk_improved": c.get("local_chunk_improved", False), | |
| "chunk_goal_delta": c.get("chunk_goal_delta"), | |
| "chunk_relevance_before": c.get("chunk_relevance_before"), | |
| "chunk_relevance_after": c.get("chunk_relevance_after"), | |
| "term_diff": c.get("term_diff"), | |
| "llm_prompt_debug": c.get("llm_prompt_debug"), | |
| "llm_rationale": c.get("llm_rationale"), | |
| "metrics_delta": c.get("metrics_delta"), | |
| "candidate_utility": c.get("candidate_utility"), | |
| "invalid_reasons": c.get("invalid_reasons", []), | |
| "delta_score": c.get("delta_score"), | |
| "candidate_score": c.get("candidate_score"), | |
| "sentence_after": c.get("sentence_after"), | |
| "error": c.get("error"), | |
| } | |
| for c in candidates | |
| ], | |
| } | |
| ) | |
| # After successful edit, return to cheapest level and reset failure streak. | |
| consecutive_failures = 0 | |
| cascade_level = 1 | |
| goal_attempt_cursor[goal_key] = 0 | |
| return _pack_result() | |