import json import re from itertools import combinations from typing import Any, Dict, List, Optional, Tuple import requests import logic import nlp_processor import semantic_graph STOP_WORDS = { "en": {"a", "an", "and", "or", "the", "to", "of", "for", "in", "on", "at", "by", "with", "from", "as", "is", "are", "be", "was", "were"}, "ru": {"и", "или", "в", "во", "на", "по", "с", "со", "к", "ко", "для", "из", "за", "что", "это", "как", "а", "но", "у", "о", "от"}, "de": {"und", "oder", "der", "die", "das", "zu", "von", "mit", "fur", "in", "auf", "ist", "sind"}, "es": {"y", "o", "el", "la", "los", "las", "de", "del", "en", "con", "para", "por", "es", "son"}, "it": {"e", "o", "il", "lo", "la", "i", "gli", "le", "di", "del", "in", "con", "per", "da", "e", "sono"}, "pl": {"i", "oraz", "lub", "w", "na", "z", "ze", "do", "od", "po", "dla", "to", "jest", "sa"}, "pt": {"e", "ou", "o", "a", "os", "as", "de", "do", "da", "em", "no", "na", "com", "para", "por", "e", "sao"}, } BERT_TARGET_THRESHOLD = 0.7 BERT_GOAL_DELTA_MIN = 0.005 SEMANTIC_GAP_TOLERANCE_PCT = 0.15 SEMANTIC_GAP_MIN_ABS = 3.0 def _tokenize(text: str) -> List[str]: return [ x for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split() if len(x) >= 2 ] def _filter_stopwords(tokens: List[str], language: str) -> List[str]: stop = STOP_WORDS.get(language, STOP_WORDS["en"]) return [t for t in tokens if t not in stop] def _split_sentences(text: str) -> List[str]: text = (text or "").strip() if not text: return [] parts = re.split(r"(?<=[\.\!\?])\s+", text) parts = [p.strip() for p in parts if p.strip()] if len(parts) <= 1: parts = [p.strip() for p in re.split(r"\n+", text) if p.strip()] return parts def _max_sentences_for_level(cascade_level: int, operation: str) -> int: if operation == "insert": return 2 if cascade_level <= 1: return 2 if cascade_level == 2: return 3 return 4 def _validate_candidate_text(edited_text: str, cascade_level: int, operation: str) -> List[str]: reasons: List[str] = [] text = (edited_text or "").strip() if not text: reasons.append("empty_candidate") return reasons sentence_count = len(_split_sentences(text)) max_sent = _max_sentences_for_level(cascade_level, operation) if sentence_count > max_sent: reasons.append(f"too_many_sentences>{max_sent}") # Heuristic quality checks: duplicated words/entities and obvious malformed token joins. if re.search(r"\b([A-Za-z][A-Za-z0-9-]{1,})\s+\1\b", text, flags=re.IGNORECASE): reasons.append("duplicated_entity_or_word") # Catch broken lowercase+Camel join artifacts like "likemBit", but allow brand CamelCase like "RedDogCasino". if re.search(r"\b[a-z]{6,}[A-Z][a-z]+\b", text): reasons.append("suspicious_token_join") return reasons def _build_analysis_snapshot( target_text: str, competitors: List[str], keywords: List[str], language: str, target_title: str, competitor_titles: List[str], ) -> Dict[str, Any]: wc_target = logic.count_words(target_text, language) wc_comp = [logic.count_words(t, language) for t in competitors] if wc_comp: avg_total = sum(c["total"] for c in wc_comp) / len(wc_comp) avg_sig = sum(c["significant"] for c in wc_comp) / len(wc_comp) else: avg_total = 0 avg_sig = 0 ngram_stats = logic.calculate_ngram_stats(target_text, competitors, language) key_phrases, _ = logic.parse_keywords(keywords, language) bm25 = logic.calculate_bm25_recommendations(target_text, competitors, keywords, language) bert = logic.perform_bert_analysis(target_text, competitors, key_phrases, language) title_data = {} if (target_title or "").strip(): title_data = logic.analyze_title(target_title, competitor_titles, keywords, language) return { "word_counts": { "target": wc_target, "competitors": wc_comp, "avg": {"total": round(avg_total), "significant": round(avg_sig)}, }, "ngram_stats": ngram_stats, "bm25_recommendations": bm25, "bert_analysis": bert, "title_analysis": title_data, } def _build_semantic_snapshot( target_text: str, competitors: List[str], language: str, ) -> Dict[str, Any]: def _build_doc(text: str, doc_id: int) -> Dict[str, Any]: sentences_data = nlp_processor.preprocess_text(text, language) graph, word_weights = semantic_graph.build_semantic_graph(sentences_data, lang=language) graph_data = semantic_graph.get_graph_data_for_frontend(graph) return { "id": doc_id, "text": text, "word_weights": word_weights, "stats": { "nodes": len(graph_data.get("nodes", [])), "links": len(graph_data.get("links", [])), }, } target_doc = _build_doc(target_text, 0) comp_docs = [] for idx, c in enumerate([x for x in competitors if (x or "").strip()]): comp_docs.append(_build_doc(c, idx + 1)) num_comp = len(comp_docs) target_weights = target_doc["word_weights"] all_terms = set(target_weights.keys()) for c in comp_docs: all_terms.update(c["word_weights"].keys()) term_power_table = [] for term in all_terms: target_weight = int(target_weights.get(term, 0)) comp_weights = [int(c["word_weights"].get(term, 0)) for c in comp_docs] comp_avg = round(sum(comp_weights) / max(1, num_comp), 2) comp_occ = sum(1 for w in comp_weights if w > 0) term_power_table.append( { "term": term, "target_weight": target_weight, "competitor_avg_weight": comp_avg, "comp_occurrence": comp_occ, "comp_total": num_comp, } ) return {"comparison": {"term_power_table": term_power_table, "num_competitors": num_comp}} def _is_semantic_gap(target_weight: float, competitor_avg_weight: float) -> bool: # Gap is significant only when competitor is meaningfully above target: # - relative margin above tolerance band (15%) # - and absolute margin to avoid micro-noise around normalized weights if competitor_avg_weight <= 0: return False abs_gap = competitor_avg_weight - target_weight rel_threshold = target_weight * (1.0 + SEMANTIC_GAP_TOLERANCE_PCT) return (competitor_avg_weight > rel_threshold) and (abs_gap >= SEMANTIC_GAP_MIN_ABS) def _compute_metrics(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]: competitor_count = len(analysis.get("word_counts", {}).get("competitors", [])) min_signal = 1 if competitor_count <= 1 else 2 bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or [] bert_low = [d for d in bert_details if float(d.get("my_max_score", 0)) < BERT_TARGET_THRESHOLD] bert_phrase_scores = { str(d.get("phrase", "")).strip().lower(): float(d.get("my_max_score", 0) or 0.0) for d in bert_details if str(d.get("phrase", "")).strip() } bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"] bm25_remove_count = len(bm25_remove) ngram_signal_count = 0 ngrams = analysis.get("ngram_stats", {}) or {} for bucket_name in ("unigrams", "bigrams"): for item in (ngrams.get(bucket_name) or []): comp_occ = int(item.get("comp_occurrence", 0)) if comp_occ < min_signal: continue target = float(item.get("target_count", 0)) comp_avg = float(item.get("competitor_avg", 0)) ratio_signal = comp_avg > 0 if target == 0 else comp_avg >= target * 2 if ratio_signal: ngram_signal_count += 1 title_score = None title_bert = analysis.get("title_analysis", {}).get("bert", {}) if title_bert and title_bert.get("target_score") is not None: title_score = float(title_bert.get("target_score", 0)) keyword_terms = set() for kw in keywords: tokens = _filter_stopwords(_tokenize(kw), language) for t in tokens: keyword_terms.add(t) for n in (2, 3): for i in range(0, max(0, len(tokens) - n + 1)): keyword_terms.add(" ".join(tokens[i : i + n])) table = semantic.get("comparison", {}).get("term_power_table", []) or [] by_term = {str(r.get("term", "")).lower(): r for r in table} semantic_gap_count = 0 semantic_gap_sum = 0.0 semantic_gap_terms: List[Dict[str, Any]] = [] for term in keyword_terms: row = by_term.get(term) if not row: continue target_w = float(row.get("target_weight", 0)) comp_w = float(row.get("competitor_avg_weight", 0)) gap = comp_w - target_w if int(row.get("comp_occurrence", 0)) >= min_signal and _is_semantic_gap(target_w, comp_w): semantic_gap_count += 1 semantic_gap_sum += gap base = max(1.0, target_w) semantic_gap_terms.append( { "term": term, "target_weight": round(target_w, 2), "competitor_avg_weight": round(comp_w, 2), "gap": round(gap, 2), "gap_pct_of_target": round(gap / base, 4), "comp_occurrence": int(row.get("comp_occurrence", 0)), "comp_total": int(row.get("comp_total", 0)), } ) # Composite score (0..100) w_bert, w_bm25, w_ng, w_title, w_sem = 30, 20, 15, 10, 25 bert_comp = 1.0 - (len(bert_low) / max(1, len(bert_details))) bm25_comp = 1.0 if bm25_remove_count <= 3 else max(0.0, 1.0 - ((bm25_remove_count - 3) / 10.0)) ng_comp = max(0.0, 1.0 - (ngram_signal_count / 15.0)) title_comp = 1.0 if title_score is None else min(1.0, max(0.0, title_score / 0.65)) sem_comp = max(0.0, 1.0 - (semantic_gap_count / 20.0)) weighted = ( w_bert * bert_comp + w_bm25 * bm25_comp + w_ng * ng_comp + w_title * title_comp + w_sem * sem_comp ) total_w = w_bert + w_bm25 + w_ng + w_title + w_sem score = round((weighted / total_w) * 100.0, 2) return { "score": score, "competitor_count": competitor_count, "min_competitor_signal": min_signal, "bert_low_count": len(bert_low), "bert_total_keywords": len(bert_details), "bert_phrase_scores": bert_phrase_scores, "bm25_remove_count": bm25_remove_count, "ngram_signal_count": ngram_signal_count, "title_bert_score": title_score, "semantic_gap_count": semantic_gap_count, "semantic_gap_sum": round(semantic_gap_sum, 4), "semantic_gap_terms": sorted( semantic_gap_terms, key=lambda x: (x.get("gap", 0), x.get("gap_pct_of_target", 0)), reverse=True, )[:20], } def _choose_optimization_goal(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]: bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or [] low_bert = [x for x in bert_details if float(x.get("my_max_score", 0)) < BERT_TARGET_THRESHOLD] if low_bert: worst = sorted(low_bert, key=lambda x: float(x.get("my_max_score", 0)))[0] focus_terms = _filter_stopwords(_tokenize(worst.get("phrase", "")), language)[:4] return {"type": "bert", "label": str(worst.get("phrase", "")), "focus_terms": focus_terms, "avoid_terms": []} bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"] if len(bm25_remove) >= 4: spam_terms = [str(x.get("word", "")) for x in sorted(bm25_remove, key=lambda r: int(r.get("count", 0)), reverse=True)[:4]] return {"type": "bm25", "label": "reduce spam", "focus_terms": [], "avoid_terms": spam_terms} # Semantic keyword gaps lang_stop = STOP_WORDS.get(language, STOP_WORDS["en"]) keyword_terms = set() for kw in keywords: toks = [t for t in _tokenize(kw) if t not in lang_stop] keyword_terms.update(toks) for n in (2, 3): for i in range(0, max(0, len(toks) - n + 1)): keyword_terms.add(" ".join(toks[i : i + n])) table = semantic.get("comparison", {}).get("term_power_table", []) or [] candidate_rows: List[Tuple[str, float]] = [] for row in table: term = str(row.get("term", "")).lower() if term not in keyword_terms: continue target_w = float(row.get("target_weight", 0)) comp_w = float(row.get("competitor_avg_weight", 0)) gap = comp_w - target_w if _is_semantic_gap(target_w, comp_w): candidate_rows.append((term, gap)) if candidate_rows: top_term = sorted(candidate_rows, key=lambda x: x[1], reverse=True)[0][0] return {"type": "semantic", "label": top_term, "focus_terms": [top_term], "avoid_terms": []} # Fallback: ngram add signal for bucket_name in ("unigrams", "bigrams"): bucket = analysis.get("ngram_stats", {}).get(bucket_name, []) or [] for item in bucket: target = float(item.get("target_count", 0)) comp_avg = float(item.get("competitor_avg", 0)) if (target == 0 and comp_avg > 0) or (target > 0 and comp_avg >= target * 2): return {"type": "ngram", "label": str(item.get("ngram", "")), "focus_terms": _tokenize(str(item.get("ngram", "")))[:3], "avoid_terms": []} return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []} def _choose_sentence_idx(sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str) -> int: if not sentences: return 0 stop = STOP_WORDS.get(language, STOP_WORDS["en"]) focus = [x for x in focus_terms if x and x not in stop] if avoid_terms: best_idx, best_score = 0, -1.0 for i, s in enumerate(sentences): lower = s.lower() score = sum(lower.count(t.lower()) for t in avoid_terms if t) if score > best_score: best_idx, best_score = i, score return best_idx if focus: best_idx, best_score = 0, -1.0 for i, s in enumerate(sentences): lower = s.lower() score = sum(lower.count(t.lower()) for t in focus) if score > best_score: best_idx, best_score = i, score return best_idx return min(2, len(sentences) - 1) def _rank_sentence_indices( sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str, goal_type: str = "", goal_label: str = "", ) -> List[int]: if not sentences: return [0] stop = STOP_WORDS.get(language, STOP_WORDS["en"]) focus = [x for x in focus_terms if x and x not in stop] avoid = [x for x in avoid_terms if x] center = (len(sentences) - 1) / 2.0 # For BERT optimization prefer natural prose chunks over list/menu/noisy blocks. candidate_indices = list(range(len(sentences))) if goal_type == "bert": non_noise = [i for i, s in enumerate(sentences) if not _is_noise_like_sentence(s)] if non_noise: candidate_indices = non_noise scored: List[Tuple[int, float, int]] = [] for idx in candidate_indices: s = sentences[idx] lower = s.lower() focus_score = sum(lower.count(t.lower()) for t in focus) avoid_score = sum(lower.count(t.lower()) for t in avoid) chunk_rel = _chunk_goal_relevance(s, goal_type, goal_label, focus_terms, language) noise_penalty = 1.0 if _is_noise_like_sentence(s) else 0.0 # Prefer semantically relevant and lexical matches; push noisy headers/CTA lower. score = (chunk_rel * 4.0) + (focus_score * 3.0) + (avoid_score * 2.0) - (noise_penalty * 3.0) - (abs(idx - center) * 0.05) scored.append((idx, score, len(s))) scored.sort(key=lambda x: (x[1], -x[2]), reverse=True) ordered = [idx for idx, _, _ in scored] if not ordered: ordered = list(range(len(sentences))) return ordered def _span_variants_for_level(cascade_level: int) -> List[Tuple[str, int, int]]: # (operation, left_radius, right_radius) if cascade_level <= 1: return [("rewrite", 0, 0), ("rewrite", 0, 1), ("rewrite", 1, 0)] if cascade_level == 2: return [("rewrite", 1, 1), ("rewrite", 0, 2), ("rewrite", 2, 0), ("rewrite", 1, 2), ("rewrite", 2, 1)] if cascade_level == 3: return [("insert", 0, 0), ("insert", 0, 0), ("insert", 0, 0), ("rewrite", 1, 1)] return [("rewrite", 2, 2), ("rewrite", 1, 3), ("rewrite", 3, 1), ("rewrite", 2, 3), ("rewrite", 3, 2)] def _choose_edit_span( sentences: List[str], goal: Dict[str, Any], language: str, cascade_level: int, attempt_cursor: int, ) -> Tuple[str, int, int, int, int]: ranked = _rank_sentence_indices( sentences, goal.get("focus_terms", []) or [], goal.get("avoid_terms", []) or [], language, str(goal.get("type", "") or ""), str(goal.get("label", "") or ""), ) variants = _span_variants_for_level(cascade_level) total = max(1, len(ranked) * len(variants)) pick = attempt_cursor % total sent_pick = pick % len(ranked) variant_pick = (pick // len(ranked)) % len(variants) sent_idx = ranked[sent_pick] operation, left_radius, right_radius = variants[variant_pick] if operation == "insert": span_start = sent_idx span_end = sent_idx else: span_start = max(0, sent_idx - left_radius) span_end = min(len(sentences) - 1, sent_idx + right_radius) return operation, span_start, span_end, sent_idx, variant_pick def _is_noise_like_sentence(text: str) -> bool: s = (text or "").strip() if not s: return True lower = s.lower() tokens = _tokenize(lower) if len(tokens) <= 2: return True if len(tokens) <= 8 and re.search(r"\b(play|explore|best|top|contact|login|signup|casino)\b", lower): return True if len(s) <= 90 and re.fullmatch(r"[A-Z0-9\s\-\|\:\.]+", s): return True if re.search(r"\b(best|top)\b.{0,20}\b(alternative|alternatives|casino|casinos)\b", lower) and len(tokens) <= 12: return True return False def _chunk_goal_relevance( text: str, goal_type: str, goal_label: str, focus_terms: List[str], language: str, ) -> float: chunk = (text or "").strip() if not chunk: return 0.0 if goal_type == "bert" and (goal_label or "").strip(): try: model = logic.get_bert_model() embeddings = model.encode([goal_label.strip(), chunk], convert_to_tensor=True) return float(logic.util.cos_sim(embeddings[0:1], embeddings[1:2])[0][0].item()) except Exception: pass # Lexical fallback for non-BERT goals or if embedding scoring is unavailable. toks = _filter_stopwords(_tokenize(chunk), language) if not toks: return 0.0 focus = [f.lower() for f in (focus_terms or []) if f] if not focus: return 0.0 overlap = 0.0 token_str = " ".join(toks) for f in focus: if " " in f: overlap += 1.0 if f in token_str else 0.0 else: overlap += 1.0 if f in toks else 0.0 return overlap / max(1.0, float(len(focus))) def _chunk_goal_delta( before_text: str, after_text: str, goal_type: str, goal_label: str, focus_terms: List[str], language: str, ) -> float: before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language) after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language) return round(after_rel - before_rel, 4) def _min_chunk_delta(goal_type: str) -> float: if goal_type == "bert": return 0.01 return 0.05 def _chunk_relevance_pair( before_text: str, after_text: str, goal_type: str, goal_label: str, focus_terms: List[str], language: str, ) -> Tuple[float, float]: before_rel = _chunk_goal_relevance(before_text, goal_type, goal_label, focus_terms, language) after_rel = _chunk_goal_relevance(after_text, goal_type, goal_label, focus_terms, language) return round(before_rel, 4), round(after_rel, 4) def _term_diff(before_text: str, after_text: str, language: str) -> Dict[str, List[str]]: before_tokens = _filter_stopwords(_tokenize(before_text), language) after_tokens = _filter_stopwords(_tokenize(after_text), language) before_set = set(before_tokens) after_set = set(after_tokens) removed = sorted(list(before_set - after_set))[:12] added = sorted(list(after_set - before_set))[:12] return {"added_terms": added, "removed_terms": removed} def _edits_conflict(a: Dict[str, Any], b: Dict[str, Any]) -> bool: if a.get("operation") == "insert" or b.get("operation") == "insert": return int(a.get("span_start", 0)) == int(b.get("span_start", 0)) a0, a1 = int(a.get("span_start", 0)), int(a.get("span_end", 0)) b0, b1 = int(b.get("span_start", 0)), int(b.get("span_end", 0)) return not (a1 < b0 or b1 < a0) def _apply_edits_to_sentences(sentences: List[str], edits: List[Dict[str, Any]]) -> List[str]: updated = list(sentences) ordered = sorted( edits, key=lambda e: (int(e.get("span_start", 0)), int(e.get("span_end", 0))), reverse=True, ) for edit in ordered: op = str(edit.get("operation", "rewrite")) start = int(edit.get("span_start", 0)) end = int(edit.get("span_end", start)) text = str(edit.get("edited_text", "")).strip() if not text: continue if op == "insert": updated = _insert_after(updated, end, text) else: updated = _replace_span(updated, start, end, text) return updated def _metrics_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> Dict[str, Any]: keys = [ "score", "bert_low_count", "bm25_remove_count", "ngram_signal_count", "semantic_gap_count", "semantic_gap_sum", ] out: Dict[str, Any] = {} for k in keys: pv = prev_metrics.get(k) nv = next_metrics.get(k) if pv is None or nv is None: continue out[k] = round(float(nv) - float(pv), 4) pv_t = prev_metrics.get("title_bert_score") nv_t = next_metrics.get("title_bert_score") if pv_t is not None and nv_t is not None: out["title_bert_score"] = round(float(nv_t) - float(pv_t), 4) prev_terms = {str(x.get("term", "")).lower() for x in (prev_metrics.get("semantic_gap_terms") or []) if x.get("term")} next_terms = {str(x.get("term", "")).lower() for x in (next_metrics.get("semantic_gap_terms") or []) if x.get("term")} if prev_terms or next_terms: out["semantic_gap_terms_added"] = sorted(list(next_terms - prev_terms))[:12] out["semantic_gap_terms_removed"] = sorted(list(prev_terms - next_terms))[:12] return out def _non_conflicting_edit_combos(candidates: List[Dict[str, Any]], min_size: int = 2, max_size: int = 4) -> List[List[Dict[str, Any]]]: if not candidates: return [] n = len(candidates) combos: List[List[Dict[str, Any]]] = [] for r in range(max(2, min_size), min(max_size, n) + 1): for idxs in combinations(range(n), r): combo = [candidates[i] for i in idxs] conflict = False for i in range(len(combo)): for j in range(i + 1, len(combo)): e1 = combo[i].get("edit_payload") e2 = combo[j].get("edit_payload") if not e1 or not e2: conflict = True break if _edits_conflict(e1, e2): conflict = True break if conflict: break if not conflict: combos.append(combo) return combos def _extract_json_object(text: str) -> Optional[Dict[str, Any]]: raw = (text or "").strip() if not raw: return None try: return json.loads(raw) except Exception: pass m = re.search(r"\{[\s\S]*\}", raw) if not m: return None try: return json.loads(m.group(0)) except Exception: return None def _llm_edit_chunk( *, api_key: str, base_url: str, model: str, language: str, full_text: str, chunk_text: str, operation: str, context_before: str, context_after: str, cascade_level: int, goal_type: str, goal_label: str, focus_terms: List[str], avoid_terms: List[str], temperature: float, ) -> Dict[str, Any]: endpoint = base_url.rstrip("/") + "/chat/completions" op = operation if operation in {"rewrite", "insert"} else "rewrite" system_msg = ( "You are an SEO copy editor. Work locally, preserve narrative flow, factual tone, and language. " "Return strict JSON only: {\"edited_text\": \"...\"}. " "Do not rewrite the whole text. Never change topic or introduce unrelated entities." ) op_instruction = ( "Rewrite the provided chunk only." if op == "rewrite" else "Create a short bridge chunk (1-2 sentences) to insert after the chunk." ) max_sent = _max_sentences_for_level(cascade_level, op) user_msg = ( f"Language: {language}\n" f"Operation: {op}\n" f"Cascade level: L{cascade_level}\n" f"Goal: {goal_type} ({goal_label})\n" f"Instruction: {op_instruction}\n" f"Must preserve overall narrative and style.\n" "Text must be grammatically correct and natural for native readers.\n" "Keep edits tightly local to the provided chunk and immediate context only.\n" "Edit must be substantive (not just synonyms) and should increase relevance to the goal phrase.\n" f"Focus terms to strengthen: {', '.join(focus_terms) if focus_terms else '-'}\n" f"Terms to de-emphasize/avoid overuse: {', '.join(avoid_terms) if avoid_terms else '-'}\n\n" f"Chunk to edit/expand:\n{chunk_text}\n\n" f"Prev context:\n{context_before}\n\n" f"Next context:\n{context_after}\n\n" "Constraints:\n" "1) Keep text concise and locally coherent.\n" "2) Keep local coherence with surrounding text.\n" f"3) Max {max_sent} sentence(s) in edited_text.\n" "4) Keep key named entities from the original chunk unchanged when possible.\n" "5) For BERT goal, improve semantic match to goal phrase without keyword stuffing.\n" "6) Only output JSON object." ) payload = { "model": model, "temperature": float(max(0.0, min(1.2, temperature))), "messages": [ {"role": "system", "content": system_msg}, {"role": "user", "content": user_msg}, ], "response_format": {"type": "json_object"}, } headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} response = requests.post(endpoint, headers=headers, json=payload, timeout=60) response.raise_for_status() data = response.json() content = ( data.get("choices", [{}])[0] .get("message", {}) .get("content", "") ) parsed = _extract_json_object(content) edited = "" if parsed: edited = str(parsed.get("edited_text") or parsed.get("revised_sentence") or parsed.get("rewrite") or "").strip() if not edited: raise ValueError("LLM returned invalid JSON edit payload.") return { "edited_text": edited, "prompt_debug": { "operation": op, "cascade_level": cascade_level, "goal_type": goal_type, "goal_label": goal_label, "focus_terms": focus_terms, "avoid_terms": avoid_terms, "max_sentences": max_sent, "chunk_text": chunk_text, "context_before": context_before, "context_after": context_after, "temperature": float(max(0.0, min(1.2, temperature))), }, } def _replace_span(sentences: List[str], start_idx: int, end_idx: int, replacement_text: str) -> List[str]: replacement = _split_sentences(replacement_text) if not replacement: replacement = [replacement_text.strip()] return sentences[:start_idx] + replacement + sentences[end_idx + 1 :] def _insert_after(sentences: List[str], after_idx: int, inserted_text: str) -> List[str]: insertion = _split_sentences(inserted_text) if not insertion: insertion = [inserted_text.strip()] return sentences[: after_idx + 1] + insertion + sentences[after_idx + 1 :] def _goal_improved( goal_type: str, goal_label: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any], ) -> bool: if goal_type == "bert": key = (goal_label or "").strip().lower() prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) # Accept smaller but real phrase-level gains to accumulate progress toward threshold (e.g. 0.51 -> 0.70). return (next_phrase - prev_phrase) >= BERT_GOAL_DELTA_MIN or next_metrics["bert_low_count"] < prev_metrics["bert_low_count"] if goal_type == "bm25": return next_metrics["bm25_remove_count"] < prev_metrics["bm25_remove_count"] if goal_type == "semantic": return next_metrics["semantic_gap_count"] < prev_metrics["semantic_gap_count"] if goal_type == "ngram": return next_metrics["ngram_signal_count"] < prev_metrics["ngram_signal_count"] return next_metrics["score"] > prev_metrics["score"] def _bert_phrase_delta(goal_label: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> float: key = (goal_label or "").strip().lower() prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) return round(next_phrase - prev_phrase, 4) def _is_candidate_valid( prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any], goal_type: str, goal_label: str, optimization_mode: str, ) -> Tuple[bool, List[str], bool]: mode = (optimization_mode or "balanced").lower() if mode not in {"conservative", "balanced", "aggressive"}: mode = "balanced" cfg = { "conservative": {"max_score_drop": 0.0, "max_title_drop": 0.02}, "balanced": {"max_score_drop": 1.0, "max_title_drop": 0.03}, "aggressive": {"max_score_drop": 2.0, "max_title_drop": 0.05}, }[mode] reasons = [] score_drop = float(prev_metrics["score"]) - float(next_metrics["score"]) if score_drop > cfg["max_score_drop"]: reasons.append(f"score_drop>{cfg['max_score_drop']}") # Hard regressions in critical counters. if next_metrics["bm25_remove_count"] > prev_metrics["bm25_remove_count"] + (1 if mode == "aggressive" else 0): reasons.append("bm25_remove_regression") if next_metrics["bert_low_count"] > prev_metrics["bert_low_count"] + (1 if mode == "aggressive" else 0): reasons.append("bert_low_regression") if next_metrics["semantic_gap_count"] > prev_metrics["semantic_gap_count"] + (1 if mode == "aggressive" else 0): reasons.append("semantic_gap_regression") prev_title = prev_metrics.get("title_bert_score") next_title = next_metrics.get("title_bert_score") if prev_title is not None and next_title is not None and next_title < (prev_title - cfg["max_title_drop"]): reasons.append("title_bert_drop") improved = _goal_improved(goal_type, goal_label, prev_metrics, next_metrics) # In conservative mode require explicit goal improvement. if mode == "conservative" and not improved: reasons.append("goal_not_improved") return (len(reasons) == 0), reasons, improved def optimize_text(request_data: Dict[str, Any]) -> Dict[str, Any]: target_text = str(request_data.get("target_text", "")).strip() competitors = [str(x) for x in (request_data.get("competitors") or []) if str(x).strip()] keywords = [str(x) for x in (request_data.get("keywords") or []) if str(x).strip()] language = str(request_data.get("language", "en")).strip() or "en" target_title = str(request_data.get("target_title", "") or "") competitor_titles = [str(x) for x in (request_data.get("competitor_titles") or [])] api_key = str(request_data.get("api_key", "")).strip() if not api_key: raise ValueError("API key is required.") base_url = str(request_data.get("api_base_url", "https://api.deepseek.com/v1")).strip() or "https://api.deepseek.com/v1" model = str(request_data.get("model", "deepseek-chat")).strip() or "deepseek-chat" max_iterations = int(request_data.get("max_iterations", 2) or 2) max_iterations = max(1, min(8, max_iterations)) candidates_per_iteration = int(request_data.get("candidates_per_iteration", 2) or 2) candidates_per_iteration = max(1, min(5, candidates_per_iteration)) temperature = float(request_data.get("temperature", 0.25) or 0.25) optimization_mode = str(request_data.get("optimization_mode", "balanced") or "balanced") baseline_analysis = _build_analysis_snapshot( target_text, competitors, keywords, language, target_title, competitor_titles ) baseline_semantic = _build_semantic_snapshot(target_text, competitors, language) baseline_metrics = _compute_metrics(baseline_analysis, baseline_semantic, keywords, language) current_text = target_text current_analysis = baseline_analysis current_semantic = baseline_semantic current_metrics = baseline_metrics logs: List[Dict[str, Any]] = [] applied_changes = 0 seen_candidate_rewrites = set() cascade_level = 1 consecutive_failures = 0 goal_attempt_cursor: Dict[str, int] = {} attempted_spans = set() queued_candidates: List[Dict[str, Any]] = [] for step in range(max_iterations): goal = _choose_optimization_goal(current_analysis, current_semantic, keywords, language) if goal["type"] == "none": logs.append({"step": step + 1, "status": "stopped", "reason": "No optimization goals left."}) break sentences = _split_sentences(current_text) if not sentences: logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."}) break goal_key = f"{goal.get('type', '')}:{goal.get('label', '')}".strip().lower() base_attempt_cursor = int(goal_attempt_cursor.get(goal_key, 0)) span_trials = 2 if cascade_level <= 2 else 3 local_candidates = candidates_per_iteration if cascade_level <= 2 else min(6, candidates_per_iteration + 1) candidates: List[Dict[str, Any]] = [] chosen_spans: List[Dict[str, Any]] = [] candidate_idx = 0 for st in range(span_trials): attempt_cursor = base_attempt_cursor + st operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span( sentences, goal, language, cascade_level, attempt_cursor ) max_span_retries = max(1, len(sentences) * 4) retries = 0 while retries < max_span_retries: span_key = (goal_key, cascade_level, operation, span_start, span_end) if span_key not in attempted_spans: attempted_spans.add(span_key) break attempt_cursor += 1 operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span( sentences, goal, language, cascade_level, attempt_cursor ) retries += 1 original_span_text = " ".join(sentences[span_start : span_end + 1]).strip() context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip() context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip() chosen_spans.append( { "operation": operation, "span_start": span_start, "span_end": span_end, "sentence_index": sent_idx, "span_variant": span_variant, "sentence_before": original_span_text, } ) per_span_candidates = max(1, local_candidates // span_trials) for ci in range(per_span_candidates): candidate_idx += 1 temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07)) try: llm_result = _llm_edit_chunk( api_key=api_key, base_url=base_url, model=model, language=language, full_text=current_text, chunk_text=original_span_text, operation=operation, context_before=context_before, context_after=context_after, cascade_level=cascade_level, goal_type=goal["type"], goal_label=goal["label"], focus_terms=goal["focus_terms"], avoid_terms=goal["avoid_terms"], temperature=temp, ) edited_text = str((llm_result or {}).get("edited_text", "")).strip() prompt_debug = (llm_result or {}).get("prompt_debug", {}) if not edited_text or edited_text == original_span_text: continue quality_issues = _validate_candidate_text(edited_text, cascade_level, operation) before_rel, after_rel = _chunk_relevance_pair( original_span_text, edited_text, goal["type"], goal["label"], goal.get("focus_terms", []) or [], language, ) chunk_delta = _chunk_goal_delta( original_span_text, edited_text, goal["type"], goal["label"], goal.get("focus_terms", []) or [], language, ) local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"]) if quality_issues: candidates.append( { "candidate_index": candidate_idx, "error": "quality_validation_failed", "valid": False, "goal_improved": False, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "invalid_reasons": quality_issues, "delta_score": -999.0, "candidate_score": None, "sentence_after": edited_text, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "sentence_before": original_span_text, } ) continue candidate_key = (operation, span_start, span_end, edited_text.strip().lower()) if candidate_key in seen_candidate_rewrites: candidates.append( { "candidate_index": candidate_idx, "error": "duplicate_candidate_rewrite", "valid": False, "goal_improved": False, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "invalid_reasons": ["duplicate_candidate_rewrite"], "delta_score": -999.0, "candidate_score": None, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "sentence_before": original_span_text, } ) continue seen_candidate_rewrites.add(candidate_key) if operation == "insert": candidate_sentences = _insert_after(sentences, span_end, edited_text) else: candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text) candidate_text = " ".join(candidate_sentences).strip() cand_analysis = _build_analysis_snapshot( candidate_text, competitors, keywords, language, target_title, competitor_titles ) cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language) cand_metrics = _compute_metrics(cand_analysis, cand_semantic, keywords, language) valid, invalid_reasons, goal_improved = _is_candidate_valid( current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode ) delta_score = round(cand_metrics["score"] - current_metrics["score"], 3) bert_phrase_delta = _bert_phrase_delta(goal["label"], current_metrics, cand_metrics) if goal.get("type") == "bert" else 0.0 md = _metrics_delta(current_metrics, cand_metrics) candidates.append( { "candidate_index": candidate_idx, "sentence_before": original_span_text, "sentence_after": edited_text, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "text": candidate_text, "analysis": cand_analysis, "semantic": cand_semantic, "metrics": cand_metrics, "valid": valid, "goal_improved": goal_improved, "bert_phrase_delta": bert_phrase_delta, "local_chunk_improved": local_chunk_improved, "chunk_goal_delta": chunk_delta, "chunk_relevance_before": before_rel, "chunk_relevance_after": after_rel, "term_diff": _term_diff(original_span_text, edited_text, language), "llm_prompt_debug": prompt_debug, "invalid_reasons": invalid_reasons, "delta_score": delta_score, "candidate_score": cand_metrics.get("score"), "metrics_delta": md, "edit_payload": { "operation": operation, "span_start": span_start, "span_end": span_end, "edited_text": edited_text, }, } ) except Exception as e: candidates.append( { "candidate_index": candidate_idx, "error": str(e), "valid": False, "goal_improved": False, "local_chunk_improved": False, "chunk_goal_delta": -999.0, "invalid_reasons": [str(e)], "delta_score": -999.0, "candidate_score": None, "llm_prompt_debug": { "operation": operation, "cascade_level": cascade_level, "goal_type": goal.get("type"), "goal_label": goal.get("label"), }, "operation": operation, "sentence_index": sent_idx, "span_start": span_start, "span_end": span_end, "span_variant": span_variant, "sentence_before": original_span_text, } ) goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials primary_span = chosen_spans[0] if chosen_spans else {"operation": "-", "span_start": 0, "span_end": 0, "sentence_index": 0, "span_variant": 0, "sentence_before": ""} valid_raw_candidates = [c for c in candidates if c.get("valid")] valid_candidates = [ c for c in valid_raw_candidates if c.get("valid") and ( c.get("goal_improved") or (goal.get("type") == "bert" and float(c.get("bert_phrase_delta") or 0.0) > 0.0) or float(c.get("candidate_score") or -1) > float(current_metrics["score"]) ) and ( goal.get("type") != "bert" or float(c.get("bert_phrase_delta") or 0.0) > 0.0 or c.get("local_chunk_improved") ) ] if not valid_candidates: # Local-first accumulation mode: # if we have guardrail-valid candidates that improve chunk relevance, # apply the strongest local edit immediately and continue optimizing next chunks. local_progress_candidates = [ c for c in valid_raw_candidates if c.get("local_chunk_improved") ] if local_progress_candidates: best_local = sorted( local_progress_candidates, key=lambda c: ( float(c.get("chunk_goal_delta") or 0.0), float(c.get("bert_phrase_delta") or 0.0), float(c.get("candidate_score") or -999.0), ), reverse=True, )[0] prev_metrics = current_metrics current_text = best_local["text"] current_analysis = best_local["analysis"] current_semantic = best_local["semantic"] current_metrics = best_local["metrics"] applied_changes += 1 queued_candidates = [] logs.append( { "step": step + 1, "status": "applied_local_progress", "goal": goal, "cascade_level": cascade_level, "operation": best_local.get("operation"), "sentence_index": best_local.get("sentence_index"), "span_start": best_local.get("span_start"), "span_end": best_local.get("span_end"), "span_variant": best_local.get("span_variant"), "sentence_before": best_local.get("sentence_before"), "sentence_after": best_local.get("sentence_after"), "reason": "Applied best local-improvement candidate despite no immediate global gain.", "current_score": prev_metrics.get("score"), "metrics_before": prev_metrics, "metrics_after": current_metrics, "delta_score": round(float(current_metrics.get("score", 0)) - float(prev_metrics.get("score", 0)), 3), "chosen_candidate_index": best_local.get("candidate_index"), "chosen_chunk_goal_delta": best_local.get("chunk_goal_delta"), "chosen_bert_phrase_delta": best_local.get("bert_phrase_delta"), "chosen_metrics_delta": best_local.get("metrics_delta"), "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "metrics_delta": c.get("metrics_delta"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) consecutive_failures = 0 cascade_level = 1 goal_attempt_cursor[goal_key] = base_attempt_cursor + 1 continue local_pool = [ c for c in candidates if c.get("local_chunk_improved") and c.get("edit_payload") and c.get("candidate_score") is not None ] local_pool.sort( key=lambda c: ( float(c.get("chunk_goal_delta") or -999.0), float(c.get("candidate_score") or -999.0), ), reverse=True, ) for c in local_pool[:4]: queue_key = ( goal_key, c.get("operation"), c.get("span_start"), c.get("span_end"), str((c.get("sentence_after") or "")).strip().lower(), ) if not any(x.get("queue_key") == queue_key for x in queued_candidates): queued_candidates.append({"queue_key": queue_key, "candidate": c}) batch_applied = False batch_info: Dict[str, Any] = {} if len(queued_candidates) >= 2: pool = [x["candidate"] for x in queued_candidates[:6]] combos = _non_conflicting_edit_combos(pool, min_size=2, max_size=4) best_batch: Optional[Dict[str, Any]] = None prev_metrics = current_metrics for combo in combos: edits = [c.get("edit_payload") for c in combo if c.get("edit_payload")] if len(edits) != len(combo): continue batch_sentences = _apply_edits_to_sentences(sentences, edits) batch_text = " ".join(batch_sentences).strip() batch_analysis = _build_analysis_snapshot( batch_text, competitors, keywords, language, target_title, competitor_titles ) batch_semantic = _build_semantic_snapshot(batch_text, competitors, language) batch_metrics = _compute_metrics(batch_analysis, batch_semantic, keywords, language) b_valid, b_reasons, b_goal = _is_candidate_valid( current_metrics, batch_metrics, goal["type"], goal["label"], optimization_mode ) b_delta = round(batch_metrics["score"] - current_metrics["score"], 3) local_sum = sum(float(c.get("chunk_goal_delta") or 0.0) for c in combo) if not (b_valid and (b_goal or b_delta > 0)): continue if goal.get("type") == "bert" and local_sum < (_min_chunk_delta("bert") * len(combo)): continue cand = { "combo": combo, "batch_text": batch_text, "batch_analysis": batch_analysis, "batch_semantic": batch_semantic, "batch_metrics": batch_metrics, "b_delta": b_delta, "local_sum": local_sum, "b_reasons": b_reasons, "b_goal": b_goal, } if best_batch is None or ( cand["b_delta"], cand["local_sum"], len(cand["combo"]), ) > ( best_batch["b_delta"], best_batch["local_sum"], len(best_batch["combo"]), ): best_batch = cand if best_batch: current_text = best_batch["batch_text"] current_analysis = best_batch["batch_analysis"] current_semantic = best_batch["batch_semantic"] current_metrics = best_batch["batch_metrics"] applied_changes += 1 batch_applied = True batch_info = { "status": "applied_batch", "batch_candidate_ids": [c.get("candidate_index") for c in best_batch["combo"]], "batch_size": len(best_batch["combo"]), "batch_local_chunk_delta_sum": round(best_batch["local_sum"], 4), "delta_score": best_batch["b_delta"], "metrics_before": prev_metrics, "metrics_after": best_batch["batch_metrics"], "metrics_delta": _metrics_delta(prev_metrics, best_batch["batch_metrics"]), } queued_candidates = [] consecutive_failures = 0 cascade_level = 1 goal_attempt_cursor[goal_key] = 0 if not batch_applied: batch_info = { "status": "batch_rejected", "reason": "Queued local improvements could not pass global constraints together.", } if batch_applied: logs.append( { "step": step + 1, "status": "applied_batch", "goal": goal, "cascade_level": cascade_level, "operation": "batch", "sentence_index": primary_span.get("sentence_index"), "span_start": primary_span.get("span_start"), "span_end": primary_span.get("span_end"), "span_variant": primary_span.get("span_variant"), "sentence_before": primary_span.get("sentence_before"), "current_score": (batch_info.get("metrics_before") or {}).get("score"), "metrics_before": batch_info.get("metrics_before"), "metrics_after": current_metrics, "reason": "Applied queued local-improvement edits as a batch.", "batch_info": batch_info, "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "metrics_delta": c.get("metrics_delta"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) continue logs.append( { "step": step + 1, "status": "rejected", "goal": goal, "cascade_level": cascade_level, "operation": primary_span.get("operation"), "sentence_index": primary_span.get("sentence_index"), "span_start": primary_span.get("span_start"), "span_end": primary_span.get("span_end"), "span_variant": primary_span.get("span_variant"), "sentence_before": primary_span.get("sentence_before"), "current_score": current_metrics["score"], "reason": ( "No valid candidate satisfied constraints." if not valid_raw_candidates else "Valid candidates existed but none improved goal or total score." ), "valid_candidates_count": len(valid_raw_candidates), "promotable_candidates_count": len(valid_candidates), "queued_local_candidates": len(local_pool), "queued_total": len(queued_candidates), "batch_info": batch_info if batch_info else None, "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "metrics_delta": c.get("metrics_delta"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) consecutive_failures += 1 if consecutive_failures >= 2 and cascade_level < 4: cascade_level += 1 consecutive_failures = 0 logs[-1]["escalated_to_level"] = cascade_level continue best = sorted( valid_candidates, key=lambda c: ( 1 if c.get("goal_improved") else 0, float(c.get("bert_phrase_delta") or 0.0), float(c.get("chunk_goal_delta") or 0.0), c["metrics"]["score"], ), reverse=True, )[0] prev_metrics = current_metrics current_text = best["text"] current_analysis = best["analysis"] current_semantic = best["semantic"] current_metrics = best["metrics"] applied_changes += 1 queued_candidates = [] logs.append( { "step": step + 1, "status": "applied", "goal": goal, "cascade_level": cascade_level, "operation": best.get("operation"), "sentence_index": best.get("sentence_index"), "span_start": best.get("span_start"), "span_end": best.get("span_end"), "span_variant": best.get("span_variant"), "sentence_before": best.get("sentence_before"), "sentence_after": best["sentence_after"], "current_score": prev_metrics["score"], "metrics_before": prev_metrics, "metrics_after": current_metrics, "delta_score": round(current_metrics["score"] - prev_metrics["score"], 3), "chosen_candidate_index": best.get("candidate_index"), "candidates": [ { "candidate_index": c.get("candidate_index"), "valid": c.get("valid", False), "goal_improved": c.get("goal_improved", False), "bert_phrase_delta": c.get("bert_phrase_delta"), "local_chunk_improved": c.get("local_chunk_improved", False), "chunk_goal_delta": c.get("chunk_goal_delta"), "chunk_relevance_before": c.get("chunk_relevance_before"), "chunk_relevance_after": c.get("chunk_relevance_after"), "term_diff": c.get("term_diff"), "llm_prompt_debug": c.get("llm_prompt_debug"), "metrics_delta": c.get("metrics_delta"), "invalid_reasons": c.get("invalid_reasons", []), "delta_score": c.get("delta_score"), "candidate_score": c.get("candidate_score"), "sentence_after": c.get("sentence_after"), "error": c.get("error"), } for c in candidates ], } ) # After successful edit, return to cheapest level and reset failure streak. consecutive_failures = 0 cascade_level = 1 goal_attempt_cursor[goal_key] = 0 return { "ok": True, "optimized_text": current_text, "baseline_metrics": baseline_metrics, "final_metrics": current_metrics, "iterations": logs, "applied_changes": applied_changes, "optimization_mode": optimization_mode, }