Spaces:
Sleeping
Sleeping
| import json | |
| import re | |
| from itertools import combinations | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
| import logic | |
| import nlp_processor | |
| import semantic_graph | |
# Per-language stop-word sets used to drop function words before token-level
# comparison.  Unknown languages fall back to the English set (see
# _filter_stopwords).
# NOTE(review): several entries appear ASCII-folded ("fur" vs "für", "sa" vs
# "są", "sao" vs "são"; "it"/"pt" list "e" twice, likely a lost "è"/"é") —
# confirm whether the upstream tokenizer strips diacritics before these match.
STOP_WORDS = {
    "en": {"a", "an", "and", "or", "the", "to", "of", "for", "in", "on", "at", "by", "with", "from", "as", "is", "are", "be", "was", "were"},
    "ru": {"и", "или", "в", "во", "на", "по", "с", "со", "к", "ко", "для", "из", "за", "что", "это", "как", "а", "но", "у", "о", "от"},
    "de": {"und", "oder", "der", "die", "das", "zu", "von", "mit", "fur", "in", "auf", "ist", "sind"},
    "es": {"y", "o", "el", "la", "los", "las", "de", "del", "en", "con", "para", "por", "es", "son"},
    "it": {"e", "o", "il", "lo", "la", "i", "gli", "le", "di", "del", "in", "con", "per", "da", "e", "sono"},
    "pl": {"i", "oraz", "lub", "w", "na", "z", "ze", "do", "od", "po", "dla", "to", "jest", "sa"},
    "pt": {"e", "ou", "o", "a", "os", "as", "de", "do", "da", "em", "no", "na", "com", "para", "por", "e", "sao"},
}
# A keyword phrase whose best BERT similarity is below this is considered
# under-optimized and becomes an editing target.
BERT_TARGET_THRESHOLD = 0.7
# Smallest per-phrase BERT gain still counted as real progress toward a goal.
BERT_GOAL_DELTA_MIN = 0.005
# A competitor's average term weight counts as a "semantic gap" only when it
# exceeds the target's weight by this relative tolerance band...
SEMANTIC_GAP_TOLERANCE_PCT = 0.15
# ...and by at least this absolute margin (filters normalized-weight noise).
SEMANTIC_GAP_MIN_ABS = 3.0
| def _tokenize(text: str) -> List[str]: | |
| return [ | |
| x | |
| for x in re.sub(r"[^\w\s-]+", " ", (text or "").lower(), flags=re.UNICODE).split() | |
| if len(x) >= 2 | |
| ] | |
def _filter_stopwords(tokens: List[str], language: str) -> List[str]:
    """Drop tokens that are stop words for *language* (English fallback)."""
    stopset = STOP_WORDS.get(language, STOP_WORDS["en"])
    kept: List[str] = []
    for token in tokens:
        if token not in stopset:
            kept.append(token)
    return kept
| def _split_sentences(text: str) -> List[str]: | |
| text = (text or "").strip() | |
| if not text: | |
| return [] | |
| parts = re.split(r"(?<=[\.\!\?])\s+", text) | |
| parts = [p.strip() for p in parts if p.strip()] | |
| if len(parts) <= 1: | |
| parts = [p.strip() for p in re.split(r"\n+", text) if p.strip()] | |
| return parts | |
| def _max_sentences_for_level(cascade_level: int, operation: str) -> int: | |
| if operation == "insert": | |
| return 2 | |
| if cascade_level <= 1: | |
| return 2 | |
| if cascade_level == 2: | |
| return 3 | |
| return 4 | |
def _validate_candidate_text(edited_text: str, cascade_level: int, operation: str) -> List[str]:
    """Heuristic QA for an LLM-edited chunk.

    Returns a list of rejection reason codes (empty list means the candidate
    passes):
      - "empty_candidate"           : blank text (short-circuits the rest)
      - "too_many_sentences>N"      : exceeds the cascade-level sentence cap
      - "duplicated_entity_or_word" : the same word repeated back-to-back
      - "suspicious_token_join"     : broken lowercase+Camel join like "likemBit"
    """
    reasons: List[str] = []
    text = (edited_text or "").strip()
    if not text:
        reasons.append("empty_candidate")
        return reasons
    sentence_count = len(_split_sentences(text))
    max_sent = _max_sentences_for_level(cascade_level, operation)
    if sentence_count > max_sent:
        reasons.append(f"too_many_sentences>{max_sent}")
    # Duplicated consecutive words/entities.  Use a Unicode-aware letter class
    # ([^\W\d_] matches any letter) instead of the previous [A-Za-z]: the tool
    # supports ru/de/es/it/pl/pt texts (see STOP_WORDS), and the Latin-only
    # pattern silently missed duplications such as "казино казино".
    if re.search(r"\b([^\W\d_][\w-]{1,})\s+\1\b", text, flags=re.IGNORECASE | re.UNICODE):
        reasons.append("duplicated_entity_or_word")
    # Catch broken lowercase+Camel join artifacts like "likemBit", but allow
    # brand CamelCase like "RedDogCasino" (short lowercase prefixes pass).
    if re.search(r"\b[a-z]{6,}[A-Z][a-z]+\b", text):
        reasons.append("suspicious_token_join")
    return reasons
def _build_analysis_snapshot(
    target_text: str,
    competitors: List[str],
    keywords: List[str],
    language: str,
    target_title: str,
    competitor_titles: List[str],
) -> Dict[str, Any]:
    """Run the lexical/statistical analysis pipeline (word counts, n-gram
    stats, BM25 advice, BERT keyword scores, optional title analysis) and
    bundle everything into one snapshot dict."""
    target_counts = logic.count_words(target_text, language)
    competitor_counts = [logic.count_words(text, language) for text in competitors]
    avg_total: float = 0
    avg_sig: float = 0
    if competitor_counts:
        divisor = len(competitor_counts)
        avg_total = sum(entry["total"] for entry in competitor_counts) / divisor
        avg_sig = sum(entry["significant"] for entry in competitor_counts) / divisor
    ngram_stats = logic.calculate_ngram_stats(target_text, competitors, language)
    key_phrases, _ = logic.parse_keywords(keywords, language)
    bm25 = logic.calculate_bm25_recommendations(target_text, competitors, keywords, language)
    bert = logic.perform_bert_analysis(target_text, competitors, key_phrases, language)
    # Title analysis is optional — skipped entirely when no title was supplied.
    title_data: Dict[str, Any] = {}
    if (target_title or "").strip():
        title_data = logic.analyze_title(target_title, competitor_titles, keywords, language)
    snapshot = {
        "word_counts": {
            "target": target_counts,
            "competitors": competitor_counts,
            "avg": {"total": round(avg_total), "significant": round(avg_sig)},
        },
        "ngram_stats": ngram_stats,
        "bm25_recommendations": bm25,
        "bert_analysis": bert,
        "title_analysis": title_data,
    }
    return snapshot
def _build_semantic_snapshot(
    target_text: str,
    competitors: List[str],
    language: str,
) -> Dict[str, Any]:
    """Build semantic-graph word weights for the target and each non-empty
    competitor, then derive a per-term comparison table (target weight vs
    competitor average and occurrence count)."""

    def _make_doc(raw_text: str, doc_id: int) -> Dict[str, Any]:
        # One document = preprocessed sentences -> semantic graph -> weights.
        sentence_data = nlp_processor.preprocess_text(raw_text, language)
        graph, weights = semantic_graph.build_semantic_graph(sentence_data, lang=language)
        frontend = semantic_graph.get_graph_data_for_frontend(graph)
        return {
            "id": doc_id,
            "text": raw_text,
            "word_weights": weights,
            "stats": {
                "nodes": len(frontend.get("nodes", [])),
                "links": len(frontend.get("links", [])),
            },
        }

    target_doc = _make_doc(target_text, 0)
    non_empty = [text for text in competitors if (text or "").strip()]
    competitor_docs = [_make_doc(text, pos + 1) for pos, text in enumerate(non_empty)]
    total_comp = len(competitor_docs)
    target_weights = target_doc["word_weights"]
    # Union vocabulary across target and all competitors.
    vocabulary = set(target_weights)
    for doc in competitor_docs:
        vocabulary |= set(doc["word_weights"])
    table: List[Dict[str, Any]] = []
    for term in vocabulary:
        weights_per_comp = [int(doc["word_weights"].get(term, 0)) for doc in competitor_docs]
        table.append(
            {
                "term": term,
                "target_weight": int(target_weights.get(term, 0)),
                "competitor_avg_weight": round(sum(weights_per_comp) / max(1, total_comp), 2),
                "comp_occurrence": sum(1 for w in weights_per_comp if w > 0),
                "comp_total": total_comp,
            }
        )
    return {"comparison": {"term_power_table": table, "num_competitors": total_comp}}
def _is_semantic_gap(target_weight: float, competitor_avg_weight: float) -> bool:
    """True when competitors are meaningfully stronger on a term.

    The competitor average must clear both a relative tolerance band
    (SEMANTIC_GAP_TOLERANCE_PCT above the target's weight) and an absolute
    margin (SEMANTIC_GAP_MIN_ABS) so micro-noise around normalized weights
    is ignored.
    """
    if competitor_avg_weight <= 0:
        return False
    above_band = competitor_avg_weight > target_weight * (1.0 + SEMANTIC_GAP_TOLERANCE_PCT)
    big_enough = (competitor_avg_weight - target_weight) >= SEMANTIC_GAP_MIN_ABS
    return above_band and big_enough
def _compute_metrics(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]:
    """Condense the analysis + semantic snapshots into one metrics dict.

    Produces a composite 0..100 score plus the raw counters the optimizer
    gates on: low-BERT keyword phrases, BM25 "remove" recommendations,
    n-gram deficit signals, keyword semantic gaps, and the optional title
    BERT score.
    """
    competitor_count = len(analysis.get("word_counts", {}).get("competitors", []))
    # With a single competitor one occurrence is already a signal; with more,
    # require at least two competitors to agree before trusting a signal.
    min_signal = 1 if competitor_count <= 1 else 2
    bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or []
    bert_low = [d for d in bert_details if float(d.get("my_max_score", 0)) < BERT_TARGET_THRESHOLD]
    # phrase (lowercased) -> current score, used later for goal-progress deltas.
    bert_phrase_scores = {
        str(d.get("phrase", "")).strip().lower(): float(d.get("my_max_score", 0) or 0.0)
        for d in bert_details
        if str(d.get("phrase", "")).strip()
    }
    bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"]
    bm25_remove_count = len(bm25_remove)
    # Count n-grams where competitors clearly out-use the target: the target
    # lacks the n-gram entirely, or the competitor average is at least double.
    ngram_signal_count = 0
    ngrams = analysis.get("ngram_stats", {}) or {}
    for bucket_name in ("unigrams", "bigrams"):
        for item in (ngrams.get(bucket_name) or []):
            comp_occ = int(item.get("comp_occurrence", 0))
            if comp_occ < min_signal:
                continue
            target = float(item.get("target_count", 0))
            comp_avg = float(item.get("competitor_avg", 0))
            ratio_signal = comp_avg > 0 if target == 0 else comp_avg >= target * 2
            if ratio_signal:
                ngram_signal_count += 1
    # Title score stays None when no title analysis was produced.
    title_score = None
    title_bert = analysis.get("title_analysis", {}).get("bert", {})
    if title_bert and title_bert.get("target_score") is not None:
        title_score = float(title_bert.get("target_score", 0))
    # Expand keywords into significant unigrams plus their 2-/3-gram phrases.
    keyword_terms = set()
    for kw in keywords:
        tokens = _filter_stopwords(_tokenize(kw), language)
        for t in tokens:
            keyword_terms.add(t)
        for n in (2, 3):
            for i in range(0, max(0, len(tokens) - n + 1)):
                keyword_terms.add(" ".join(tokens[i : i + n]))
    table = semantic.get("comparison", {}).get("term_power_table", []) or []
    by_term = {str(r.get("term", "")).lower(): r for r in table}
    # Collect keyword terms where competitors have a significant weight lead.
    semantic_gap_count = 0
    semantic_gap_sum = 0.0
    semantic_gap_terms: List[Dict[str, Any]] = []
    for term in keyword_terms:
        row = by_term.get(term)
        if not row:
            continue
        target_w = float(row.get("target_weight", 0))
        comp_w = float(row.get("competitor_avg_weight", 0))
        gap = comp_w - target_w
        if int(row.get("comp_occurrence", 0)) >= min_signal and _is_semantic_gap(target_w, comp_w):
            semantic_gap_count += 1
            semantic_gap_sum += gap
            base = max(1.0, target_w)  # avoid division by zero for unseen terms
            semantic_gap_terms.append(
                {
                    "term": term,
                    "target_weight": round(target_w, 2),
                    "competitor_avg_weight": round(comp_w, 2),
                    "gap": round(gap, 2),
                    "gap_pct_of_target": round(gap / base, 4),
                    "comp_occurrence": int(row.get("comp_occurrence", 0)),
                    "comp_total": int(row.get("comp_total", 0)),
                }
            )
    # Composite score (0..100)
    # Each component is normalized to 0..1 and blended by fixed weights.
    w_bert, w_bm25, w_ng, w_title, w_sem = 30, 20, 15, 10, 25
    bert_comp = 1.0 - (len(bert_low) / max(1, len(bert_details)))
    # Up to 3 BM25 "remove" hints are free; each extra one costs 0.1.
    bm25_comp = 1.0 if bm25_remove_count <= 3 else max(0.0, 1.0 - ((bm25_remove_count - 3) / 10.0))
    ng_comp = max(0.0, 1.0 - (ngram_signal_count / 15.0))
    # Missing title analysis is treated as neutral (full component credit).
    title_comp = 1.0 if title_score is None else min(1.0, max(0.0, title_score / 0.65))
    sem_comp = max(0.0, 1.0 - (semantic_gap_count / 20.0))
    weighted = (
        w_bert * bert_comp
        + w_bm25 * bm25_comp
        + w_ng * ng_comp
        + w_title * title_comp
        + w_sem * sem_comp
    )
    total_w = w_bert + w_bm25 + w_ng + w_title + w_sem
    score = round((weighted / total_w) * 100.0, 2)
    return {
        "score": score,
        "competitor_count": competitor_count,
        "min_competitor_signal": min_signal,
        "bert_low_count": len(bert_low),
        "bert_total_keywords": len(bert_details),
        "bert_phrase_scores": bert_phrase_scores,
        "bm25_remove_count": bm25_remove_count,
        "ngram_signal_count": ngram_signal_count,
        "title_bert_score": title_score,
        "semantic_gap_count": semantic_gap_count,
        "semantic_gap_sum": round(semantic_gap_sum, 4),
        # Worst gaps first, capped at 20 entries for the UI/log payload.
        "semantic_gap_terms": sorted(
            semantic_gap_terms,
            key=lambda x: (x.get("gap", 0), x.get("gap_pct_of_target", 0)),
            reverse=True,
        )[:20],
    }
def _choose_optimization_goal(analysis: Dict[str, Any], semantic: Dict[str, Any], keywords: List[str], language: str) -> Dict[str, Any]:
    """Pick the next optimization goal by fixed priority.

    Priority order:
      1. "bert"     — weakest keyword phrase below BERT_TARGET_THRESHOLD;
      2. "bm25"     — keyword-spam reduction when 4+ "remove" hints exist;
      3. "semantic" — largest keyword semantic gap vs competitors;
      4. "ngram"    — first n-gram competitors clearly out-use;
      5. "none"     — nothing left to do.
    Returns {"type", "label", "focus_terms", "avoid_terms"}.
    """
    bert_details = analysis.get("bert_analysis", {}).get("detailed", []) or []
    low_bert = [x for x in bert_details if float(x.get("my_max_score", 0)) < BERT_TARGET_THRESHOLD]
    if low_bert:
        # Target the single worst-scoring phrase first.
        worst = sorted(low_bert, key=lambda x: float(x.get("my_max_score", 0)))[0]
        focus_terms = _filter_stopwords(_tokenize(worst.get("phrase", "")), language)[:4]
        return {"type": "bert", "label": str(worst.get("phrase", "")), "focus_terms": focus_terms, "avoid_terms": []}
    bm25_remove = [x for x in (analysis.get("bm25_recommendations") or []) if x.get("action") == "remove"]
    if len(bm25_remove) >= 4:
        # De-emphasize the four most over-used words.
        spam_terms = [str(x.get("word", "")) for x in sorted(bm25_remove, key=lambda r: int(r.get("count", 0)), reverse=True)[:4]]
        return {"type": "bm25", "label": "reduce spam", "focus_terms": [], "avoid_terms": spam_terms}
    # Semantic keyword gaps: expand keywords into significant unigrams plus
    # 2-/3-gram phrases (mirrors the expansion in _compute_metrics).
    lang_stop = STOP_WORDS.get(language, STOP_WORDS["en"])
    keyword_terms = set()
    for kw in keywords:
        toks = [t for t in _tokenize(kw) if t not in lang_stop]
        keyword_terms.update(toks)
        for n in (2, 3):
            for i in range(0, max(0, len(toks) - n + 1)):
                keyword_terms.add(" ".join(toks[i : i + n]))
    table = semantic.get("comparison", {}).get("term_power_table", []) or []
    candidate_rows: List[Tuple[str, float]] = []
    for row in table:
        term = str(row.get("term", "")).lower()
        if term not in keyword_terms:
            continue
        target_w = float(row.get("target_weight", 0))
        comp_w = float(row.get("competitor_avg_weight", 0))
        gap = comp_w - target_w
        if _is_semantic_gap(target_w, comp_w):
            candidate_rows.append((term, gap))
    if candidate_rows:
        # Chase the largest gap first.
        top_term = sorted(candidate_rows, key=lambda x: x[1], reverse=True)[0][0]
        return {"type": "semantic", "label": top_term, "focus_terms": [top_term], "avoid_terms": []}
    # Fallback: ngram add signal
    for bucket_name in ("unigrams", "bigrams"):
        bucket = analysis.get("ngram_stats", {}).get(bucket_name, []) or []
        for item in bucket:
            target = float(item.get("target_count", 0))
            comp_avg = float(item.get("competitor_avg", 0))
            if (target == 0 and comp_avg > 0) or (target > 0 and comp_avg >= target * 2):
                return {"type": "ngram", "label": str(item.get("ngram", "")), "focus_terms": _tokenize(str(item.get("ngram", "")))[:3], "avoid_terms": []}
    return {"type": "none", "label": "no-op", "focus_terms": [], "avoid_terms": []}
def _choose_sentence_idx(sentences: List[str], focus_terms: List[str], avoid_terms: List[str], language: str) -> int:
    """Pick the sentence index an edit should anchor on.

    Preference order: the sentence with the most avoid-term hits, else the
    one with the most focus-term hits (stop words removed), else index 2
    clamped to the text length. Ties go to the earliest sentence.
    """
    if not sentences:
        return 0

    def _argmax_hits(terms: List[str]) -> int:
        # First index with the highest total substring-hit count.
        winner, winner_hits = 0, -1.0
        for pos, sentence in enumerate(sentences):
            lowered = sentence.lower()
            hits = sum(lowered.count(term.lower()) for term in terms)
            if hits > winner_hits:
                winner, winner_hits = pos, hits
        return winner

    if avoid_terms:
        return _argmax_hits([term for term in avoid_terms if term])
    stopset = STOP_WORDS.get(language, STOP_WORDS["en"])
    focus = [term for term in focus_terms if term and term not in stopset]
    if focus:
        return _argmax_hits(focus)
    return min(2, len(sentences) - 1)
def _rank_sentence_indices(
    sentences: List[str],
    focus_terms: List[str],
    avoid_terms: List[str],
    language: str,
    goal_type: str = "",
    goal_label: str = "",
) -> List[int]:
    """Order sentence indices by editing priority.

    Priority blends semantic relevance to the goal, lexical focus/avoid term
    hits, a penalty for noise-like fragments, and a mild bias toward the
    middle of the document; ties prefer longer sentences.
    """
    if not sentences:
        return [0]
    stopset = STOP_WORDS.get(language, STOP_WORDS["en"])
    focus = [term for term in focus_terms if term and term not in stopset]
    avoid = [term for term in avoid_terms if term]
    middle = (len(sentences) - 1) / 2.0
    pool = list(range(len(sentences)))
    # For BERT optimization prefer natural prose chunks over list/menu/noisy
    # blocks — restrict the pool when any prose sentence exists.
    if goal_type == "bert":
        prose_only = [i for i, sent in enumerate(sentences) if not _is_noise_like_sentence(sent)]
        if prose_only:
            pool = prose_only
    ranked: List[Tuple[int, float, int]] = []
    for pos in pool:
        sentence = sentences[pos]
        lowered = sentence.lower()
        lexical_focus = sum(lowered.count(term.lower()) for term in focus)
        lexical_avoid = sum(lowered.count(term.lower()) for term in avoid)
        relevance = _chunk_goal_relevance(sentence, goal_type, goal_label, focus_terms, language)
        noise = 1.0 if _is_noise_like_sentence(sentence) else 0.0
        # Semantically relevant + lexical matches rise; noisy headers/CTA sink.
        priority = (
            relevance * 4.0
            + lexical_focus * 3.0
            + lexical_avoid * 2.0
            - noise * 3.0
            - abs(pos - middle) * 0.05
        )
        ranked.append((pos, priority, len(sentence)))
    ranked.sort(key=lambda row: (row[1], -row[2]), reverse=True)
    ordered = [pos for pos, _, _ in ranked]
    if not ordered:
        ordered = list(range(len(sentences)))
    return ordered
| def _span_variants_for_level(cascade_level: int) -> List[Tuple[str, int, int]]: | |
| # (operation, left_radius, right_radius) | |
| if cascade_level <= 1: | |
| return [("rewrite", 0, 0), ("rewrite", 0, 1), ("rewrite", 1, 0)] | |
| if cascade_level == 2: | |
| return [("rewrite", 1, 1), ("rewrite", 0, 2), ("rewrite", 2, 0), ("rewrite", 1, 2), ("rewrite", 2, 1)] | |
| if cascade_level == 3: | |
| return [("insert", 0, 0), ("insert", 0, 0), ("insert", 0, 0), ("rewrite", 1, 1)] | |
| return [("rewrite", 2, 2), ("rewrite", 1, 3), ("rewrite", 3, 1), ("rewrite", 2, 3), ("rewrite", 3, 2)] | |
def _choose_edit_span(
    sentences: List[str],
    goal: Dict[str, Any],
    language: str,
    cascade_level: int,
    attempt_cursor: int,
) -> Tuple[str, int, int, int, int]:
    """Deterministically map *attempt_cursor* onto a (sentence, span-variant)
    pair, so repeated attempts walk through ranked sentences and span shapes
    without repeating until the search space is exhausted.

    Returns (operation, span_start, span_end, anchor_idx, variant_idx).
    """
    ranked = _rank_sentence_indices(
        sentences,
        goal.get("focus_terms", []) or [],
        goal.get("avoid_terms", []) or [],
        language,
        str(goal.get("type", "") or ""),
        str(goal.get("label", "") or ""),
    )
    variants = _span_variants_for_level(cascade_level)
    search_space = max(1, len(ranked) * len(variants))
    cursor = attempt_cursor % search_space
    anchor = ranked[cursor % len(ranked)]
    variant_idx = (cursor // len(ranked)) % len(variants)
    operation, left, right = variants[variant_idx]
    if operation == "insert":
        # Inserts anchor on a single sentence; no span widening applies.
        return operation, anchor, anchor, anchor, variant_idx
    start = max(0, anchor - left)
    end = min(len(sentences) - 1, anchor + right)
    return operation, start, end, anchor, variant_idx
def _is_noise_like_sentence(text: str) -> bool:
    """Heuristic: True for menu items, CTA snippets, ALL-CAPS headers and
    other non-prose fragments that should not anchor a BERT-goal edit."""
    stripped = (text or "").strip()
    if not stripped:
        return True
    lowered = stripped.lower()
    words = _tokenize(lowered)
    # Two significant tokens or fewer: almost certainly a label, not prose.
    if len(words) <= 2:
        return True
    # Short fragment containing a call-to-action / listing keyword.
    if len(words) <= 8 and re.search(r"\b(play|explore|best|top|contact|login|signup|casino)\b", lowered):
        return True
    # Short ALL-CAPS (plus digits/separators) block — a header or banner.
    if len(stripped) <= 90 and re.fullmatch(r"[A-Z0-9\s\-\|\:\.]+", stripped):
        return True
    # Compact "best/top ... casino/alternatives" listing headline.
    listy = re.search(r"\b(best|top)\b.{0,20}\b(alternative|alternatives|casino|casinos)\b", lowered)
    return bool(listy) and len(words) <= 12
def _chunk_goal_relevance(
    text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
) -> float:
    """Score how relevant *text* is to the current optimization goal.

    For "bert" goals with a label: cosine similarity between the chunk and
    goal-label embeddings.  Otherwise — or when the embedding path raises —
    the fraction of focus terms found among the chunk's significant tokens
    (multi-word terms matched as substrings of the joined token string).
    NOTE(review): the two paths return values on different scales (cosine
    similarity vs overlap ratio) — callers treat both as a generic score.
    """
    chunk = (text or "").strip()
    if not chunk:
        return 0.0
    if goal_type == "bert" and (goal_label or "").strip():
        try:
            model = logic.get_bert_model()
            embeddings = model.encode([goal_label.strip(), chunk], convert_to_tensor=True)
            return float(logic.util.cos_sim(embeddings[0:1], embeddings[1:2])[0][0].item())
        except Exception:
            # Deliberate best-effort: any embedding failure falls through to
            # the lexical scoring below instead of aborting the attempt.
            pass
    # Lexical fallback for non-BERT goals or if embedding scoring is unavailable.
    toks = _filter_stopwords(_tokenize(chunk), language)
    if not toks:
        return 0.0
    focus = [f.lower() for f in (focus_terms or []) if f]
    if not focus:
        return 0.0
    overlap = 0.0
    token_str = " ".join(toks)
    for f in focus:
        if " " in f:
            # Multi-word focus term: substring match against joined tokens.
            overlap += 1.0 if f in token_str else 0.0
        else:
            overlap += 1.0 if f in toks else 0.0
    return overlap / max(1.0, float(len(focus)))
def _chunk_goal_delta(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
) -> float:
    """Relevance gain of *after_text* over *before_text* for the goal,
    rounded to 4 decimals (negative when the edit regressed)."""
    relevance = [
        _chunk_goal_relevance(chunk, goal_type, goal_label, focus_terms, language)
        for chunk in (before_text, after_text)
    ]
    return round(relevance[1] - relevance[0], 4)
| def _min_chunk_delta(goal_type: str) -> float: | |
| if goal_type == "bert": | |
| return 0.01 | |
| return 0.05 | |
def _chunk_relevance_pair(
    before_text: str,
    after_text: str,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    language: str,
) -> Tuple[float, float]:
    """Goal relevance of (before, after) chunks, each rounded to 4 decimals.
    Companion to _chunk_goal_delta when callers need both raw values."""
    scores = tuple(
        round(_chunk_goal_relevance(chunk, goal_type, goal_label, focus_terms, language), 4)
        for chunk in (before_text, after_text)
    )
    return scores[0], scores[1]
def _term_diff(before_text: str, after_text: str, language: str) -> Dict[str, List[str]]:
    """Significant (stop-word-free) tokens added and removed between two
    texts, each list sorted and capped at 12 entries."""
    before_set = set(_filter_stopwords(_tokenize(before_text), language))
    after_set = set(_filter_stopwords(_tokenize(after_text), language))
    return {
        "added_terms": sorted(after_set - before_set)[:12],
        "removed_terms": sorted(before_set - after_set)[:12],
    }
| def _edits_conflict(a: Dict[str, Any], b: Dict[str, Any]) -> bool: | |
| if a.get("operation") == "insert" or b.get("operation") == "insert": | |
| return int(a.get("span_start", 0)) == int(b.get("span_start", 0)) | |
| a0, a1 = int(a.get("span_start", 0)), int(a.get("span_end", 0)) | |
| b0, b1 = int(b.get("span_start", 0)), int(b.get("span_end", 0)) | |
| return not (a1 < b0 or b1 < a0) | |
def _apply_edits_to_sentences(sentences: List[str], edits: List[Dict[str, Any]]) -> List[str]:
    """Apply accepted edits to a copy of the sentence list and return it.

    Edits are applied back-to-front (sorted by (span_start, span_end)
    descending) so that replacing/inserting at a later span does not shift
    the indices of spans still to be applied.  "insert" payloads go after
    span_end; anything else replaces sentences[span_start..span_end]
    inclusive.  Payloads with blank edited_text are skipped.
    """
    updated = list(sentences)  # work on a copy; the caller keeps the original
    ordered = sorted(
        edits,
        key=lambda e: (int(e.get("span_start", 0)), int(e.get("span_end", 0))),
        reverse=True,
    )
    for edit in ordered:
        op = str(edit.get("operation", "rewrite"))
        start = int(edit.get("span_start", 0))
        end = int(edit.get("span_end", start))
        text = str(edit.get("edited_text", "")).strip()
        if not text:
            continue
        if op == "insert":
            updated = _insert_after(updated, end, text)
        else:
            updated = _replace_span(updated, start, end, text)
    return updated
| def _metrics_delta(prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> Dict[str, Any]: | |
| keys = [ | |
| "score", | |
| "bert_low_count", | |
| "bm25_remove_count", | |
| "ngram_signal_count", | |
| "semantic_gap_count", | |
| "semantic_gap_sum", | |
| ] | |
| out: Dict[str, Any] = {} | |
| for k in keys: | |
| pv = prev_metrics.get(k) | |
| nv = next_metrics.get(k) | |
| if pv is None or nv is None: | |
| continue | |
| out[k] = round(float(nv) - float(pv), 4) | |
| pv_t = prev_metrics.get("title_bert_score") | |
| nv_t = next_metrics.get("title_bert_score") | |
| if pv_t is not None and nv_t is not None: | |
| out["title_bert_score"] = round(float(nv_t) - float(pv_t), 4) | |
| prev_terms = {str(x.get("term", "")).lower() for x in (prev_metrics.get("semantic_gap_terms") or []) if x.get("term")} | |
| next_terms = {str(x.get("term", "")).lower() for x in (next_metrics.get("semantic_gap_terms") or []) if x.get("term")} | |
| if prev_terms or next_terms: | |
| out["semantic_gap_terms_added"] = sorted(list(next_terms - prev_terms))[:12] | |
| out["semantic_gap_terms_removed"] = sorted(list(prev_terms - next_terms))[:12] | |
| return out | |
def _non_conflicting_edit_combos(candidates: List[Dict[str, Any]], min_size: int = 2, max_size: int = 4) -> List[List[Dict[str, Any]]]:
    """Enumerate candidate subsets (sizes min_size..max_size) whose edit
    payloads are pairwise non-conflicting.

    A candidate with a missing/empty "edit_payload" poisons its combo (the
    combo is skipped), matching the previous behavior.

    Fix: the old code clamped the lower bound with ``max(2, min_size)``,
    silently ignoring ``min_size=1``; the floor is now 1 so the parameter is
    honored (the default min_size=2 behavior is unchanged — single-candidate
    "combos" trivially have no conflicting pair).
    """
    if not candidates:
        return []
    total = len(candidates)
    combos: List[List[Dict[str, Any]]] = []
    for size in range(max(1, min_size), min(max_size, total) + 1):
        for index_tuple in combinations(range(total), size):
            chosen = [candidates[i] for i in index_tuple]
            payloads = [c.get("edit_payload") for c in chosen]
            # Falsy payload == unusable candidate -> treat combo as conflicting.
            if any(not payload for payload in payloads):
                continue
            has_conflict = any(
                _edits_conflict(payloads[i], payloads[j])
                for i in range(len(payloads))
                for j in range(i + 1, len(payloads))
            )
            if not has_conflict:
                combos.append(chosen)
    return combos
| def _extract_json_object(text: str) -> Optional[Dict[str, Any]]: | |
| raw = (text or "").strip() | |
| if not raw: | |
| return None | |
| try: | |
| return json.loads(raw) | |
| except Exception: | |
| pass | |
| m = re.search(r"\{[\s\S]*\}", raw) | |
| if not m: | |
| return None | |
| try: | |
| return json.loads(m.group(0)) | |
| except Exception: | |
| return None | |
def _llm_edit_chunk(
    *,
    api_key: str,
    base_url: str,
    model: str,
    language: str,
    full_text: str,
    chunk_text: str,
    operation: str,
    context_before: str,
    context_after: str,
    cascade_level: int,
    goal_type: str,
    goal_label: str,
    focus_terms: List[str],
    avoid_terms: List[str],
    temperature: float,
) -> Dict[str, Any]:
    """Ask an OpenAI-compatible chat endpoint to rewrite or extend one chunk.

    Sends a JSON-mode chat completion to ``{base_url}/chat/completions`` and
    returns {"edited_text": <str>, "prompt_debug": {...}} where prompt_debug
    echoes the exact editing parameters for logging/inspection.

    Raises:
        requests.HTTPError: the API returned a non-2xx status.
        ValueError: the response did not contain a usable JSON edit payload.

    NOTE(review): *full_text* is accepted for interface parity but is never
    included in the prompt — the model only sees the chunk and its local
    context; confirm this is intentional.
    """
    endpoint = base_url.rstrip("/") + "/chat/completions"
    # Unknown operations degrade to a plain rewrite.
    op = operation if operation in {"rewrite", "insert"} else "rewrite"
    system_msg = (
        "You are an SEO copy editor. Work locally, preserve narrative flow, factual tone, and language. "
        "Return strict JSON only: {\"edited_text\": \"...\"}. "
        "Do not rewrite the whole text. Never change topic or introduce unrelated entities."
    )
    op_instruction = (
        "Rewrite the provided chunk only."
        if op == "rewrite"
        else "Create a short bridge chunk (1-2 sentences) to insert after the chunk."
    )
    # Sentence cap mirrors the validation applied later in
    # _validate_candidate_text, so the model is told the same limit we enforce.
    max_sent = _max_sentences_for_level(cascade_level, op)
    user_msg = (
        f"Language: {language}\n"
        f"Operation: {op}\n"
        f"Cascade level: L{cascade_level}\n"
        f"Goal: {goal_type} ({goal_label})\n"
        f"Instruction: {op_instruction}\n"
        f"Must preserve overall narrative and style.\n"
        "Text must be grammatically correct and natural for native readers.\n"
        "Keep edits tightly local to the provided chunk and immediate context only.\n"
        "Edit must be substantive (not just synonyms) and should increase relevance to the goal phrase.\n"
        f"Focus terms to strengthen: {', '.join(focus_terms) if focus_terms else '-'}\n"
        f"Terms to de-emphasize/avoid overuse: {', '.join(avoid_terms) if avoid_terms else '-'}\n\n"
        f"Chunk to edit/expand:\n{chunk_text}\n\n"
        f"Prev context:\n{context_before}\n\n"
        f"Next context:\n{context_after}\n\n"
        "Constraints:\n"
        "1) Keep text concise and locally coherent.\n"
        "2) Keep local coherence with surrounding text.\n"
        f"3) Max {max_sent} sentence(s) in edited_text.\n"
        "4) Keep key named entities from the original chunk unchanged when possible.\n"
        "5) For BERT goal, improve semantic match to goal phrase without keyword stuffing.\n"
        "6) Only output JSON object."
    )
    payload = {
        "model": model,
        # Clamp temperature into [0.0, 1.2].
        "temperature": float(max(0.0, min(1.2, temperature))),
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg},
        ],
        # Ask the API for strict JSON output (OpenAI-compatible JSON mode).
        "response_format": {"type": "json_object"},
    }
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    response.raise_for_status()
    data = response.json()
    content = (
        data.get("choices", [{}])[0]
        .get("message", {})
        .get("content", "")
    )
    parsed = _extract_json_object(content)
    edited = ""
    if parsed:
        # Accept a few alternative key names some models emit.
        edited = str(parsed.get("edited_text") or parsed.get("revised_sentence") or parsed.get("rewrite") or "").strip()
    if not edited:
        raise ValueError("LLM returned invalid JSON edit payload.")
    return {
        "edited_text": edited,
        "prompt_debug": {
            "operation": op,
            "cascade_level": cascade_level,
            "goal_type": goal_type,
            "goal_label": goal_label,
            "focus_terms": focus_terms,
            "avoid_terms": avoid_terms,
            "max_sentences": max_sent,
            "chunk_text": chunk_text,
            "context_before": context_before,
            "context_after": context_after,
            "temperature": float(max(0.0, min(1.2, temperature))),
        },
    }
def _replace_span(sentences: List[str], start_idx: int, end_idx: int, replacement_text: str) -> List[str]:
    """Replace sentences[start_idx..end_idx] (inclusive) with the sentences
    parsed from *replacement_text*; if splitting yields nothing, the whole
    stripped text becomes a single sentence."""
    new_sentences = _split_sentences(replacement_text) or [replacement_text.strip()]
    return sentences[:start_idx] + new_sentences + sentences[end_idx + 1 :]
def _insert_after(sentences: List[str], after_idx: int, inserted_text: str) -> List[str]:
    """Insert the sentences parsed from *inserted_text* immediately after
    index *after_idx* (the whole stripped text as one sentence when
    splitting yields nothing)."""
    addition = _split_sentences(inserted_text) or [inserted_text.strip()]
    head = sentences[: after_idx + 1]
    tail = sentences[after_idx + 1 :]
    return head + addition + tail
def _goal_improved(
    goal_type: str,
    goal_label: str,
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
) -> bool:
    """Did the chosen optimization goal move in the right direction between
    two metric snapshots?"""
    if goal_type == "bert":
        key = (goal_label or "").strip().lower()
        phrase_before = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
        phrase_after = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0))
        # Small but real phrase-level gains still count, so progress toward
        # the threshold (e.g. 0.51 -> 0.70) can accumulate across iterations.
        if (phrase_after - phrase_before) >= BERT_GOAL_DELTA_MIN:
            return True
        return next_metrics["bert_low_count"] < prev_metrics["bert_low_count"]
    counter_by_goal = {
        "bm25": "bm25_remove_count",
        "semantic": "semantic_gap_count",
        "ngram": "ngram_signal_count",
    }
    counter = counter_by_goal.get(goal_type)
    if counter is not None:
        return next_metrics[counter] < prev_metrics[counter]
    # Unknown/no-op goal: fall back to the composite score.
    return next_metrics["score"] > prev_metrics["score"]
| def _bert_phrase_delta(goal_label: str, prev_metrics: Dict[str, Any], next_metrics: Dict[str, Any]) -> float: | |
| key = (goal_label or "").strip().lower() | |
| prev_phrase = float((prev_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) | |
| next_phrase = float((next_metrics.get("bert_phrase_scores") or {}).get(key, 0.0)) | |
| return round(next_phrase - prev_phrase, 4) | |
def _is_candidate_valid(
    prev_metrics: Dict[str, Any],
    next_metrics: Dict[str, Any],
    goal_type: str,
    goal_label: str,
    optimization_mode: str,
) -> Tuple[bool, List[str], bool]:
    """Gate a candidate edit on metric regressions.

    Returns (valid, rejection_reasons, goal_improved).  Tolerances scale
    with the optimization mode; unknown modes fall back to "balanced", and
    "conservative" additionally requires explicit goal improvement.
    """
    mode = (optimization_mode or "balanced").lower()
    if mode not in {"conservative", "balanced", "aggressive"}:
        mode = "balanced"
    limits = {
        "conservative": {"max_score_drop": 0.0, "max_title_drop": 0.02},
        "balanced": {"max_score_drop": 1.0, "max_title_drop": 0.03},
        "aggressive": {"max_score_drop": 2.0, "max_title_drop": 0.05},
    }[mode]
    slack = 1 if mode == "aggressive" else 0
    reasons: List[str] = []
    drop = float(prev_metrics["score"]) - float(next_metrics["score"])
    if drop > limits["max_score_drop"]:
        reasons.append(f"score_drop>{limits['max_score_drop']}")
    # Hard regressions in critical counters (aggressive mode tolerates +1).
    for counter, label in (
        ("bm25_remove_count", "bm25_remove_regression"),
        ("bert_low_count", "bert_low_regression"),
        ("semantic_gap_count", "semantic_gap_regression"),
    ):
        if next_metrics[counter] > prev_metrics[counter] + slack:
            reasons.append(label)
    title_before = prev_metrics.get("title_bert_score")
    title_after = next_metrics.get("title_bert_score")
    if (
        title_before is not None
        and title_after is not None
        and title_after < (title_before - limits["max_title_drop"])
    ):
        reasons.append("title_bert_drop")
    improved = _goal_improved(goal_type, goal_label, prev_metrics, next_metrics)
    # In conservative mode require explicit goal improvement.
    if mode == "conservative" and not improved:
        reasons.append("goal_not_improved")
    return (not reasons), reasons, improved
def optimize_text(request_data: Dict[str, Any]) -> Dict[str, Any]:
    """Iteratively rewrite ``target_text`` via LLM-generated span edits.

    Each iteration: pick an optimization goal, choose one or more sentence
    spans, ask the LLM for candidate rewrites of each span, and apply the
    best candidate that passes the mode-specific guardrails
    (``_is_candidate_valid``). When no candidate yields a global gain, the
    loop falls back to (a) applying the strongest locally-improving edit,
    then (b) trying queued local edits together as a batch, and finally
    (c) escalating the "cascade level" (1..4) so later steps attempt more
    aggressive edits.

    Args:
        request_data: Request payload. Required keys: ``target_text``,
            ``api_key``. Optional: ``competitors``, ``keywords``,
            ``language``, ``target_title``, ``competitor_titles``,
            ``api_base_url``, ``model``, ``max_iterations`` (clamped to
            1..8), ``candidates_per_iteration`` (clamped to 1..5),
            ``temperature``, ``optimization_mode``.

    Returns:
        Dict with the optimized text, baseline/final metrics, per-step
        iteration logs, the number of applied changes, and the
        optimization mode used.

    Raises:
        ValueError: If ``api_key`` is missing or empty.
    """
    # --- Parse and normalize request inputs -------------------------------
    target_text = str(request_data.get("target_text", "")).strip()
    competitors = [str(x) for x in (request_data.get("competitors") or []) if str(x).strip()]
    keywords = [str(x) for x in (request_data.get("keywords") or []) if str(x).strip()]
    language = str(request_data.get("language", "en")).strip() or "en"
    target_title = str(request_data.get("target_title", "") or "")
    competitor_titles = [str(x) for x in (request_data.get("competitor_titles") or [])]
    api_key = str(request_data.get("api_key", "")).strip()
    if not api_key:
        raise ValueError("API key is required.")
    base_url = str(request_data.get("api_base_url", "https://api.deepseek.com/v1")).strip() or "https://api.deepseek.com/v1"
    model = str(request_data.get("model", "deepseek-chat")).strip() or "deepseek-chat"
    max_iterations = int(request_data.get("max_iterations", 2) or 2)
    max_iterations = max(1, min(8, max_iterations))
    candidates_per_iteration = int(request_data.get("candidates_per_iteration", 2) or 2)
    candidates_per_iteration = max(1, min(5, candidates_per_iteration))
    temperature = float(request_data.get("temperature", 0.25) or 0.25)
    optimization_mode = str(request_data.get("optimization_mode", "balanced") or "balanced")
    # --- Baseline snapshots/metrics for the unedited text -----------------
    baseline_analysis = _build_analysis_snapshot(
        target_text, competitors, keywords, language, target_title, competitor_titles
    )
    baseline_semantic = _build_semantic_snapshot(target_text, competitors, language)
    baseline_metrics = _compute_metrics(baseline_analysis, baseline_semantic, keywords, language)
    # --- Mutable optimization state ---------------------------------------
    current_text = target_text
    current_analysis = baseline_analysis
    current_semantic = baseline_semantic
    current_metrics = baseline_metrics
    logs: List[Dict[str, Any]] = []
    applied_changes = 0
    # Dedupe identical (operation, span, rewritten-text) candidates across steps.
    seen_candidate_rewrites = set()
    # Edit aggressiveness level; escalates after repeated failed steps, resets on success.
    cascade_level = 1
    consecutive_failures = 0
    # Per-goal offset into span selection so retries probe different spans.
    goal_attempt_cursor: Dict[str, int] = {}
    # (goal, level, operation, span) combos already tried, to avoid re-editing.
    attempted_spans = set()
    # Guardrail-valid local improvements waiting to be applied together as a batch.
    queued_candidates: List[Dict[str, Any]] = []
    for step in range(max_iterations):
        # Pick the next optimization goal from the current state; "none" means done.
        goal = _choose_optimization_goal(current_analysis, current_semantic, keywords, language)
        if goal["type"] == "none":
            logs.append({"step": step + 1, "status": "stopped", "reason": "No optimization goals left."})
            break
        sentences = _split_sentences(current_text)
        if not sentences:
            logs.append({"step": step + 1, "status": "stopped", "reason": "No sentences available for editing."})
            break
        goal_key = f"{goal.get('type', '')}:{goal.get('label', '')}".strip().lower()
        base_attempt_cursor = int(goal_attempt_cursor.get(goal_key, 0))
        # Higher cascade levels probe more spans and generate more candidates.
        span_trials = 2 if cascade_level <= 2 else 3
        local_candidates = candidates_per_iteration if cascade_level <= 2 else min(6, candidates_per_iteration + 1)
        candidates: List[Dict[str, Any]] = []
        chosen_spans: List[Dict[str, Any]] = []
        candidate_idx = 0
        for st in range(span_trials):
            attempt_cursor = base_attempt_cursor + st
            # _choose_edit_span returns (operation, span_start, span_end, sentence_index, span_variant).
            operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
                sentences, goal, language, cascade_level, attempt_cursor
            )
            # Skip spans already attempted for this goal/level; advance the cursor
            # until an unseen span is found or the retry budget runs out.
            max_span_retries = max(1, len(sentences) * 4)
            retries = 0
            while retries < max_span_retries:
                span_key = (goal_key, cascade_level, operation, span_start, span_end)
                if span_key not in attempted_spans:
                    attempted_spans.add(span_key)
                    break
                attempt_cursor += 1
                operation, span_start, span_end, sent_idx, span_variant = _choose_edit_span(
                    sentences, goal, language, cascade_level, attempt_cursor
                )
                retries += 1
            original_span_text = " ".join(sentences[span_start : span_end + 1]).strip()
            # Up to two sentences of surrounding context are passed to the LLM.
            context_before = " ".join(sentences[max(0, span_start - 2) : span_start]).strip()
            context_after = " ".join(sentences[span_end + 1 : min(len(sentences), span_end + 3)]).strip()
            chosen_spans.append(
                {
                    "operation": operation,
                    "span_start": span_start,
                    "span_end": span_end,
                    "sentence_index": sent_idx,
                    "span_variant": span_variant,
                    "sentence_before": original_span_text,
                }
            )
            # Split the candidate budget evenly across the probed spans.
            per_span_candidates = max(1, local_candidates // span_trials)
            for ci in range(per_span_candidates):
                candidate_idx += 1
                # Slightly raise temperature for each extra candidate (capped at 1.1)
                # to diversify rewrites.
                temp = min(1.1, max(0.0, temperature + (candidate_idx - 1) * 0.07))
                try:
                    llm_result = _llm_edit_chunk(
                        api_key=api_key,
                        base_url=base_url,
                        model=model,
                        language=language,
                        full_text=current_text,
                        chunk_text=original_span_text,
                        operation=operation,
                        context_before=context_before,
                        context_after=context_after,
                        cascade_level=cascade_level,
                        goal_type=goal["type"],
                        goal_label=goal["label"],
                        focus_terms=goal["focus_terms"],
                        avoid_terms=goal["avoid_terms"],
                        temperature=temp,
                    )
                    edited_text = str((llm_result or {}).get("edited_text", "")).strip()
                    prompt_debug = (llm_result or {}).get("prompt_debug", {})
                    # Empty or unchanged rewrites are useless; skip silently.
                    if not edited_text or edited_text == original_span_text:
                        continue
                    quality_issues = _validate_candidate_text(edited_text, cascade_level, operation)
                    # Local (chunk-level) relevance before/after and goal delta,
                    # used for the local-progress and batching fallbacks.
                    before_rel, after_rel = _chunk_relevance_pair(
                        original_span_text,
                        edited_text,
                        goal["type"],
                        goal["label"],
                        goal.get("focus_terms", []) or [],
                        language,
                    )
                    chunk_delta = _chunk_goal_delta(
                        original_span_text,
                        edited_text,
                        goal["type"],
                        goal["label"],
                        goal.get("focus_terms", []) or [],
                        language,
                    )
                    local_chunk_improved = chunk_delta >= _min_chunk_delta(goal["type"])
                    # Record quality failures for the log, but do not evaluate further.
                    if quality_issues:
                        candidates.append(
                            {
                                "candidate_index": candidate_idx,
                                "error": "quality_validation_failed",
                                "valid": False,
                                "goal_improved": False,
                                "local_chunk_improved": local_chunk_improved,
                                "chunk_goal_delta": chunk_delta,
                                "invalid_reasons": quality_issues,
                                "delta_score": -999.0,
                                "candidate_score": None,
                                "sentence_after": edited_text,
                                "chunk_relevance_before": before_rel,
                                "chunk_relevance_after": after_rel,
                                "term_diff": _term_diff(original_span_text, edited_text, language),
                                "llm_prompt_debug": prompt_debug,
                                "operation": operation,
                                "sentence_index": sent_idx,
                                "span_start": span_start,
                                "span_end": span_end,
                                "span_variant": span_variant,
                                "sentence_before": original_span_text,
                            }
                        )
                        continue
                    # Skip rewrites we have already produced for this exact span.
                    candidate_key = (operation, span_start, span_end, edited_text.strip().lower())
                    if candidate_key in seen_candidate_rewrites:
                        candidates.append(
                            {
                                "candidate_index": candidate_idx,
                                "error": "duplicate_candidate_rewrite",
                                "valid": False,
                                "goal_improved": False,
                                "local_chunk_improved": local_chunk_improved,
                                "chunk_goal_delta": chunk_delta,
                                "invalid_reasons": ["duplicate_candidate_rewrite"],
                                "delta_score": -999.0,
                                "candidate_score": None,
                                "chunk_relevance_before": before_rel,
                                "chunk_relevance_after": after_rel,
                                "term_diff": _term_diff(original_span_text, edited_text, language),
                                "llm_prompt_debug": prompt_debug,
                                "operation": operation,
                                "sentence_index": sent_idx,
                                "span_start": span_start,
                                "span_end": span_end,
                                "span_variant": span_variant,
                                "sentence_before": original_span_text,
                            }
                        )
                        continue
                    seen_candidate_rewrites.add(candidate_key)
                    # Build the full candidate text: insert after the span, or replace it.
                    if operation == "insert":
                        candidate_sentences = _insert_after(sentences, span_end, edited_text)
                    else:
                        candidate_sentences = _replace_span(sentences, span_start, span_end, edited_text)
                    candidate_text = " ".join(candidate_sentences).strip()
                    # Full re-analysis of the candidate text to get global metrics.
                    cand_analysis = _build_analysis_snapshot(
                        candidate_text, competitors, keywords, language, target_title, competitor_titles
                    )
                    cand_semantic = _build_semantic_snapshot(candidate_text, competitors, language)
                    cand_metrics = _compute_metrics(cand_analysis, cand_semantic, keywords, language)
                    valid, invalid_reasons, goal_improved = _is_candidate_valid(
                        current_metrics, cand_metrics, goal["type"], goal["label"], optimization_mode
                    )
                    delta_score = round(cand_metrics["score"] - current_metrics["score"], 3)
                    # Phrase-level BERT delta only matters for "bert"-type goals.
                    bert_phrase_delta = _bert_phrase_delta(goal["label"], current_metrics, cand_metrics) if goal.get("type") == "bert" else 0.0
                    md = _metrics_delta(current_metrics, cand_metrics)
                    candidates.append(
                        {
                            "candidate_index": candidate_idx,
                            "sentence_before": original_span_text,
                            "sentence_after": edited_text,
                            "operation": operation,
                            "sentence_index": sent_idx,
                            "span_start": span_start,
                            "span_end": span_end,
                            "span_variant": span_variant,
                            "text": candidate_text,
                            "analysis": cand_analysis,
                            "semantic": cand_semantic,
                            "metrics": cand_metrics,
                            "valid": valid,
                            "goal_improved": goal_improved,
                            "bert_phrase_delta": bert_phrase_delta,
                            "local_chunk_improved": local_chunk_improved,
                            "chunk_goal_delta": chunk_delta,
                            "chunk_relevance_before": before_rel,
                            "chunk_relevance_after": after_rel,
                            "term_diff": _term_diff(original_span_text, edited_text, language),
                            "llm_prompt_debug": prompt_debug,
                            "invalid_reasons": invalid_reasons,
                            "delta_score": delta_score,
                            "candidate_score": cand_metrics.get("score"),
                            "metrics_delta": md,
                            # Raw edit description, used later when batching edits.
                            "edit_payload": {
                                "operation": operation,
                                "span_start": span_start,
                                "span_end": span_end,
                                "edited_text": edited_text,
                            },
                        }
                    )
                except Exception as e:
                    # Any failure (LLM call, scoring, etc.) becomes a logged
                    # invalid candidate instead of aborting the whole step.
                    candidates.append(
                        {
                            "candidate_index": candidate_idx,
                            "error": str(e),
                            "valid": False,
                            "goal_improved": False,
                            "local_chunk_improved": False,
                            "chunk_goal_delta": -999.0,
                            "invalid_reasons": [str(e)],
                            "delta_score": -999.0,
                            "candidate_score": None,
                            "llm_prompt_debug": {
                                "operation": operation,
                                "cascade_level": cascade_level,
                                "goal_type": goal.get("type"),
                                "goal_label": goal.get("label"),
                            },
                            "operation": operation,
                            "sentence_index": sent_idx,
                            "span_start": span_start,
                            "span_end": span_end,
                            "span_variant": span_variant,
                            "sentence_before": original_span_text,
                        }
                    )
        # Advance the per-goal cursor past the spans probed this step.
        goal_attempt_cursor[goal_key] = base_attempt_cursor + span_trials
        primary_span = chosen_spans[0] if chosen_spans else {"operation": "-", "span_start": 0, "span_end": 0, "sentence_index": 0, "span_variant": 0, "sentence_before": ""}
        valid_raw_candidates = [c for c in candidates if c.get("valid")]
        # Promotable candidates: guardrail-valid AND showing either goal
        # improvement, a positive BERT phrase delta, or a higher total score;
        # bert goals additionally need phrase/local progress.
        valid_candidates = [
            c
            for c in valid_raw_candidates
            if c.get("valid")
            and (
                c.get("goal_improved")
                or (goal.get("type") == "bert" and float(c.get("bert_phrase_delta") or 0.0) > 0.0)
                or float(c.get("candidate_score") or -1) > float(current_metrics["score"])
            )
            and (
                goal.get("type") != "bert"
                or float(c.get("bert_phrase_delta") or 0.0) > 0.0
                or c.get("local_chunk_improved")
            )
        ]
        if not valid_candidates:
            # Local-first accumulation mode:
            # if we have guardrail-valid candidates that improve chunk relevance,
            # apply the strongest local edit immediately and continue optimizing next chunks.
            local_progress_candidates = [
                c
                for c in valid_raw_candidates
                if c.get("local_chunk_improved")
            ]
            if local_progress_candidates:
                # Rank by chunk delta, then phrase delta, then total score.
                best_local = sorted(
                    local_progress_candidates,
                    key=lambda c: (
                        float(c.get("chunk_goal_delta") or 0.0),
                        float(c.get("bert_phrase_delta") or 0.0),
                        float(c.get("candidate_score") or -999.0),
                    ),
                    reverse=True,
                )[0]
                prev_metrics = current_metrics
                current_text = best_local["text"]
                current_analysis = best_local["analysis"]
                current_semantic = best_local["semantic"]
                current_metrics = best_local["metrics"]
                applied_changes += 1
                queued_candidates = []
                logs.append(
                    {
                        "step": step + 1,
                        "status": "applied_local_progress",
                        "goal": goal,
                        "cascade_level": cascade_level,
                        "operation": best_local.get("operation"),
                        "sentence_index": best_local.get("sentence_index"),
                        "span_start": best_local.get("span_start"),
                        "span_end": best_local.get("span_end"),
                        "span_variant": best_local.get("span_variant"),
                        "sentence_before": best_local.get("sentence_before"),
                        "sentence_after": best_local.get("sentence_after"),
                        "reason": "Applied best local-improvement candidate despite no immediate global gain.",
                        "current_score": prev_metrics.get("score"),
                        "metrics_before": prev_metrics,
                        "metrics_after": current_metrics,
                        "delta_score": round(float(current_metrics.get("score", 0)) - float(prev_metrics.get("score", 0)), 3),
                        "chosen_candidate_index": best_local.get("candidate_index"),
                        "chosen_chunk_goal_delta": best_local.get("chunk_goal_delta"),
                        "chosen_bert_phrase_delta": best_local.get("bert_phrase_delta"),
                        "chosen_metrics_delta": best_local.get("metrics_delta"),
                        "candidates": [
                            {
                                "candidate_index": c.get("candidate_index"),
                                "valid": c.get("valid", False),
                                "goal_improved": c.get("goal_improved", False),
                                "bert_phrase_delta": c.get("bert_phrase_delta"),
                                "local_chunk_improved": c.get("local_chunk_improved", False),
                                "chunk_goal_delta": c.get("chunk_goal_delta"),
                                "chunk_relevance_before": c.get("chunk_relevance_before"),
                                "chunk_relevance_after": c.get("chunk_relevance_after"),
                                "term_diff": c.get("term_diff"),
                                "llm_prompt_debug": c.get("llm_prompt_debug"),
                                "metrics_delta": c.get("metrics_delta"),
                                "invalid_reasons": c.get("invalid_reasons", []),
                                "delta_score": c.get("delta_score"),
                                "candidate_score": c.get("candidate_score"),
                                "sentence_after": c.get("sentence_after"),
                                "error": c.get("error"),
                            }
                            for c in candidates
                        ],
                    }
                )
                consecutive_failures = 0
                cascade_level = 1
                # Only advance the cursor by one here so the next attempt
                # revisits nearby spans after this partial progress.
                goal_attempt_cursor[goal_key] = base_attempt_cursor + 1
                continue
            # Queue up to four strongest local improvers (with edit payloads
            # and scores) for possible combined application later.
            local_pool = [
                c
                for c in candidates
                if c.get("local_chunk_improved")
                and c.get("edit_payload")
                and c.get("candidate_score") is not None
            ]
            local_pool.sort(
                key=lambda c: (
                    float(c.get("chunk_goal_delta") or -999.0),
                    float(c.get("candidate_score") or -999.0),
                ),
                reverse=True,
            )
            for c in local_pool[:4]:
                queue_key = (
                    goal_key,
                    c.get("operation"),
                    c.get("span_start"),
                    c.get("span_end"),
                    str((c.get("sentence_after") or "")).strip().lower(),
                )
                if not any(x.get("queue_key") == queue_key for x in queued_candidates):
                    queued_candidates.append({"queue_key": queue_key, "candidate": c})
            # --- Batch fallback: try applying queued local edits together ---
            batch_applied = False
            batch_info: Dict[str, Any] = {}
            if len(queued_candidates) >= 2:
                pool = [x["candidate"] for x in queued_candidates[:6]]
                combos = _non_conflicting_edit_combos(pool, min_size=2, max_size=4)
                best_batch: Optional[Dict[str, Any]] = None
                prev_metrics = current_metrics
                for combo in combos:
                    edits = [c.get("edit_payload") for c in combo if c.get("edit_payload")]
                    if len(edits) != len(combo):
                        continue
                    batch_sentences = _apply_edits_to_sentences(sentences, edits)
                    batch_text = " ".join(batch_sentences).strip()
                    batch_analysis = _build_analysis_snapshot(
                        batch_text, competitors, keywords, language, target_title, competitor_titles
                    )
                    batch_semantic = _build_semantic_snapshot(batch_text, competitors, language)
                    batch_metrics = _compute_metrics(batch_analysis, batch_semantic, keywords, language)
                    b_valid, b_reasons, b_goal = _is_candidate_valid(
                        current_metrics, batch_metrics, goal["type"], goal["label"], optimization_mode
                    )
                    b_delta = round(batch_metrics["score"] - current_metrics["score"], 3)
                    local_sum = sum(float(c.get("chunk_goal_delta") or 0.0) for c in combo)
                    # The batch must be valid and yield goal or score improvement.
                    if not (b_valid and (b_goal or b_delta > 0)):
                        continue
                    # For bert goals, require enough combined local progress.
                    if goal.get("type") == "bert" and local_sum < (_min_chunk_delta("bert") * len(combo)):
                        continue
                    cand = {
                        "combo": combo,
                        "batch_text": batch_text,
                        "batch_analysis": batch_analysis,
                        "batch_semantic": batch_semantic,
                        "batch_metrics": batch_metrics,
                        "b_delta": b_delta,
                        "local_sum": local_sum,
                        "b_reasons": b_reasons,
                        "b_goal": b_goal,
                    }
                    # Lexicographic best: score delta, then local sum, then size.
                    if best_batch is None or (
                        cand["b_delta"],
                        cand["local_sum"],
                        len(cand["combo"]),
                    ) > (
                        best_batch["b_delta"],
                        best_batch["local_sum"],
                        len(best_batch["combo"]),
                    ):
                        best_batch = cand
                if best_batch:
                    current_text = best_batch["batch_text"]
                    current_analysis = best_batch["batch_analysis"]
                    current_semantic = best_batch["batch_semantic"]
                    current_metrics = best_batch["batch_metrics"]
                    applied_changes += 1
                    batch_applied = True
                    batch_info = {
                        "status": "applied_batch",
                        "batch_candidate_ids": [c.get("candidate_index") for c in best_batch["combo"]],
                        "batch_size": len(best_batch["combo"]),
                        "batch_local_chunk_delta_sum": round(best_batch["local_sum"], 4),
                        "delta_score": best_batch["b_delta"],
                        "metrics_before": prev_metrics,
                        "metrics_after": best_batch["batch_metrics"],
                        "metrics_delta": _metrics_delta(prev_metrics, best_batch["batch_metrics"]),
                    }
                    queued_candidates = []
                    consecutive_failures = 0
                    cascade_level = 1
                    goal_attempt_cursor[goal_key] = 0
            if not batch_applied:
                batch_info = {
                    "status": "batch_rejected",
                    "reason": "Queued local improvements could not pass global constraints together.",
                }
            if batch_applied:
                logs.append(
                    {
                        "step": step + 1,
                        "status": "applied_batch",
                        "goal": goal,
                        "cascade_level": cascade_level,
                        "operation": "batch",
                        "sentence_index": primary_span.get("sentence_index"),
                        "span_start": primary_span.get("span_start"),
                        "span_end": primary_span.get("span_end"),
                        "span_variant": primary_span.get("span_variant"),
                        "sentence_before": primary_span.get("sentence_before"),
                        "current_score": (batch_info.get("metrics_before") or {}).get("score"),
                        "metrics_before": batch_info.get("metrics_before"),
                        "metrics_after": current_metrics,
                        "reason": "Applied queued local-improvement edits as a batch.",
                        "batch_info": batch_info,
                        "candidates": [
                            {
                                "candidate_index": c.get("candidate_index"),
                                "valid": c.get("valid", False),
                                "goal_improved": c.get("goal_improved", False),
                                "bert_phrase_delta": c.get("bert_phrase_delta"),
                                "local_chunk_improved": c.get("local_chunk_improved", False),
                                "chunk_goal_delta": c.get("chunk_goal_delta"),
                                "chunk_relevance_before": c.get("chunk_relevance_before"),
                                "chunk_relevance_after": c.get("chunk_relevance_after"),
                                "term_diff": c.get("term_diff"),
                                "llm_prompt_debug": c.get("llm_prompt_debug"),
                                "metrics_delta": c.get("metrics_delta"),
                                "invalid_reasons": c.get("invalid_reasons", []),
                                "delta_score": c.get("delta_score"),
                                "candidate_score": c.get("candidate_score"),
                                "sentence_after": c.get("sentence_after"),
                                "error": c.get("error"),
                            }
                            for c in candidates
                        ],
                    }
                )
                continue
            # --- Nothing applied this step: log rejection and maybe escalate ---
            logs.append(
                {
                    "step": step + 1,
                    "status": "rejected",
                    "goal": goal,
                    "cascade_level": cascade_level,
                    "operation": primary_span.get("operation"),
                    "sentence_index": primary_span.get("sentence_index"),
                    "span_start": primary_span.get("span_start"),
                    "span_end": primary_span.get("span_end"),
                    "span_variant": primary_span.get("span_variant"),
                    "sentence_before": primary_span.get("sentence_before"),
                    "current_score": current_metrics["score"],
                    "reason": (
                        "No valid candidate satisfied constraints."
                        if not valid_raw_candidates
                        else "Valid candidates existed but none improved goal or total score."
                    ),
                    "valid_candidates_count": len(valid_raw_candidates),
                    "promotable_candidates_count": len(valid_candidates),
                    "queued_local_candidates": len(local_pool),
                    "queued_total": len(queued_candidates),
                    "batch_info": batch_info if batch_info else None,
                    "candidates": [
                        {
                            "candidate_index": c.get("candidate_index"),
                            "valid": c.get("valid", False),
                            "goal_improved": c.get("goal_improved", False),
                            "bert_phrase_delta": c.get("bert_phrase_delta"),
                            "local_chunk_improved": c.get("local_chunk_improved", False),
                            "chunk_goal_delta": c.get("chunk_goal_delta"),
                            "chunk_relevance_before": c.get("chunk_relevance_before"),
                            "chunk_relevance_after": c.get("chunk_relevance_after"),
                            "term_diff": c.get("term_diff"),
                            "llm_prompt_debug": c.get("llm_prompt_debug"),
                            "metrics_delta": c.get("metrics_delta"),
                            "invalid_reasons": c.get("invalid_reasons", []),
                            "delta_score": c.get("delta_score"),
                            "candidate_score": c.get("candidate_score"),
                            "sentence_after": c.get("sentence_after"),
                            "error": c.get("error"),
                        }
                        for c in candidates
                    ],
                }
            )
            consecutive_failures += 1
            # Two consecutive failed steps escalate the cascade level (max 4).
            if consecutive_failures >= 2 and cascade_level < 4:
                cascade_level += 1
                consecutive_failures = 0
                logs[-1]["escalated_to_level"] = cascade_level
            continue
        # --- Apply the best promotable candidate ------------------------------
        # Ranking: goal improvement first, then phrase delta, chunk delta, score.
        best = sorted(
            valid_candidates,
            key=lambda c: (
                1 if c.get("goal_improved") else 0,
                float(c.get("bert_phrase_delta") or 0.0),
                float(c.get("chunk_goal_delta") or 0.0),
                c["metrics"]["score"],
            ),
            reverse=True,
        )[0]
        prev_metrics = current_metrics
        current_text = best["text"]
        current_analysis = best["analysis"]
        current_semantic = best["semantic"]
        current_metrics = best["metrics"]
        applied_changes += 1
        queued_candidates = []
        logs.append(
            {
                "step": step + 1,
                "status": "applied",
                "goal": goal,
                "cascade_level": cascade_level,
                "operation": best.get("operation"),
                "sentence_index": best.get("sentence_index"),
                "span_start": best.get("span_start"),
                "span_end": best.get("span_end"),
                "span_variant": best.get("span_variant"),
                "sentence_before": best.get("sentence_before"),
                "sentence_after": best["sentence_after"],
                "current_score": prev_metrics["score"],
                "metrics_before": prev_metrics,
                "metrics_after": current_metrics,
                "delta_score": round(current_metrics["score"] - prev_metrics["score"], 3),
                "chosen_candidate_index": best.get("candidate_index"),
                "candidates": [
                    {
                        "candidate_index": c.get("candidate_index"),
                        "valid": c.get("valid", False),
                        "goal_improved": c.get("goal_improved", False),
                        "bert_phrase_delta": c.get("bert_phrase_delta"),
                        "local_chunk_improved": c.get("local_chunk_improved", False),
                        "chunk_goal_delta": c.get("chunk_goal_delta"),
                        "chunk_relevance_before": c.get("chunk_relevance_before"),
                        "chunk_relevance_after": c.get("chunk_relevance_after"),
                        "term_diff": c.get("term_diff"),
                        "llm_prompt_debug": c.get("llm_prompt_debug"),
                        "metrics_delta": c.get("metrics_delta"),
                        "invalid_reasons": c.get("invalid_reasons", []),
                        "delta_score": c.get("delta_score"),
                        "candidate_score": c.get("candidate_score"),
                        "sentence_after": c.get("sentence_after"),
                        "error": c.get("error"),
                    }
                    for c in candidates
                ],
            }
        )
        # After successful edit, return to cheapest level and reset failure streak.
        consecutive_failures = 0
        cascade_level = 1
        goal_attempt_cursor[goal_key] = 0
    return {
        "ok": True,
        "optimized_text": current_text,
        "baseline_metrics": baseline_metrics,
        "final_metrics": current_metrics,
        "iterations": logs,
        "applied_changes": applied_changes,
        "optimization_mode": optimization_mode,
    }