ai-seo-analyzer / logic.py
lsdf's picture
Add Title analyzer: length, ngrams, keyword coverage, BERT semantic
e3802f1
raw
history blame
27.6 kB
import spacy
from collections import Counter
from typing import List, Dict
import numpy as np
from rank_bm25 import BM25Okapi
import re
# Новые импорты для BERT
import torch
from sentence_transformers import SentenceTransformer, util
# --- Глобальные переменные ---
LoadedModels = {} # spaCy модели
BertModel = None # BERT модель (одна на все языки)
MODEL_NAMES = {
"en": "en_core_web_sm",
"ru": "ru_core_news_sm",
"de": "de_core_news_sm",
"es": "es_core_news_sm",
"it": "it_core_news_sm"
}
# --- SPACY (Лингвистика) ---
def load_model_if_missing(lang: str):
if lang in LoadedModels: return
model_name = MODEL_NAMES.get(lang)
if not model_name: return
print(f"⏳ Loading spaCy model for {lang}...")
try:
LoadedModels[lang] = spacy.load(model_name)
print(f"✅ Loaded spaCy: {lang}")
except Exception as e:
print(f"❌ Failed to load spaCy {lang}: {e}")
def load_models():
"""
Функция для предзагрузки всех моделей при старте (вызывается из main.py).
"""
print("🚀 Pre-loading all spaCy models...")
for lang in MODEL_NAMES.keys():
load_model_if_missing(lang)
def get_doc(text: str, lang: str):
load_model_if_missing(lang)
nlp = LoadedModels.get(lang)
if not nlp:
load_model_if_missing("en")
nlp = LoadedModels.get("en")
if not nlp: raise RuntimeError("No NLP models loaded.")
nlp.max_length = 2000000
return nlp(text.lower())
# --- НОВАЯ ФУНКЦИЯ ФИЛЬТРАЦИИ ---
def is_valid_token(t):
"""
Проверяет, является ли токен полезным словом.
Исправленная версия: не удаляет слова из букв, даже если AI пометил их как символы.
"""
# 1. Базовые проверки spaCy (Стоп-слова, пунктуация, пробелы)
if t.is_stop or t.is_punct or t.is_space:
return False
# 2. Числа (удаляем "18", "2023", "5")
if t.is_digit or t.like_num:
return False
# 3. СИМВОЛЫ (ИСПРАВЛЕНИЕ)
# Если spaCy говорит, что это символ (SYM), мы верим, ТОЛЬКО если это не буквы.
# Это спасет слова типа "cross", "apk", "bet", которые могут быть ложно помечены.
if t.pos_ == "SYM" and not t.text.isalpha():
return False
# 4. Дополнительная страховка (явный мусор)
garbage_chars = {'|', '+', '-', '—', '–', '>', '<', '=', '/', '\\', '★', '▶', '●', '•', '€', '$', '£'}
if t.text.strip() in garbage_chars:
return False
# 5. Длина: Удаляем одиночные буквы, которые не являются словами
# (опционально, но помогает чистить мусор типа "v", "s" если они не стоп-слова)
if len(t.text) == 1 and not t.text.isalpha():
return False
return True
def get_lemmas_flat(text: str, lang: str) -> List[str]:
"""
Возвращает плоский список лемм для всего текста (нужен для BM25).
"""
if not text: return []
doc = get_doc(text, lang)
# Используем нашу новую функцию фильтрации
return [t.lemma_ for t in doc if is_valid_token(t)]
def generate_ngrams_safe(text: str, lang: str, n: int) -> List[str]:
"""
Генерирует n-граммы.
ЛОГИКА: Smart Window (Та самая, которая работает идеально).
1. Считает N-граммой последовательность из N ЗНАЧИМЫХ слов.
2. Сохраняет стоп-слова ВНУТРИ фразы (для читаемости: "bevor sie mit echtem geld").
3. Игнорирует стоп-слова ПО КРАЯМ.
"""
if not text: return []
# ЕДИНСТВЕННОЕ ИЗМЕНЕНИЕ: Заменяем дефисы на пробелы, чтобы разлепить "Casino-Websites"
text = re.sub(r'[\-\–\—\/]', ' ', text)
# Дальше - ТА САМАЯ логика, которая работала идеально
clean_text = " ".join(text.split())
doc = get_doc(clean_text, lang)
all_ngrams = []
garbage_chars = {'|', '+', '-', '—', '–', '>', '<', '=', '/', '\\', '★', '▶', '●', '•', '€', '$', '£', '™', '®', '«', '»', '"', '(', ')', '[', ']', '!', '?', '.', ',', ':', ';'}
for sent in doc.sents:
# Разбираем предложение на токены
tokens = []
for t in sent:
# Стена
is_wall = (
t.is_punct or t.is_space or t.pos_ == "SYM" or
t.is_digit or t.like_num or
t.text.strip() in garbage_chars
)
if is_wall:
tokens.append(None) # Маркер разрыва
else:
tokens.append({
"text": t.text.lower(),
"is_stop": t.is_stop
})
# Ищем последовательности (Smart Window)
for i in range(len(tokens)):
# Начало фразы не может быть стоп-словом или стеной
if tokens[i] is None or tokens[i]["is_stop"]:
continue
current_phrase = []
significant_count = 0
for j in range(i, len(tokens)):
token = tokens[j]
if token is None: # Стена - конец фразы
break
current_phrase.append(token["text"])
# Если не стоп-слово, увеличиваем счетчик.
# Если стоп-слово - просто добавляем в current_phrase, но счетчик стоит.
# Это сохраняет "sie mit" внутри фразы.
if not token["is_stop"]:
significant_count += 1
# Если набрали N значимых слов
if significant_count == n:
ngram_str = " ".join(current_phrase)
all_ngrams.append(ngram_str)
break
return all_ngrams
# --- WORD COUNT ---
def count_words(text: str, lang: str) -> Dict[str, int]:
"""
Считает слова. ПОЛНОСТЬЮ ИЗОЛИРОВАННАЯ ЛОГИКА.
Не зависит от N-грамм или BM25.
"""
if not text.strip():
return {"total": 0, "significant": 0}
# 1. Получаем чистый объект spaCy (без всяких re.sub и вырезаний)
# Просто убираем лишние пробелы/энтеры
clean_text = " ".join(text.split())
doc = get_doc(clean_text, lang)
total_count = 0
significant_count = 0
for t in doc:
# --- ФИЛЬТР ДЛЯ ОБЩЕГО КОЛИЧЕСТВА ---
# Считаем всё, кроме пунктуации и пробелов
if t.is_punct or t.is_space:
continue
total_count += 1
# --- ФИЛЬТР ДЛЯ ЗНАЧИМЫХ СЛОВ ---
# Если это стоп-слово (der, die, das...) - не считаем в значимые
if t.is_stop:
continue
# Если это цифра - не считаем
if t.is_digit or t.like_num:
continue
# Если это спецсимвол - не считаем
if t.pos_ == "SYM":
continue
significant_count += 1
return {"total": total_count, "significant": significant_count}
# --- ANALYTICS (N-grams & BM25) ---
def calculate_ngram_stats(target_text: str, competitor_texts: List[str], lang: str) -> Dict:
stats = {}
valid_competitors = [t for t in competitor_texts if t.strip()]
for n in range(1, 5):
key = {1: "unigrams", 2: "bigrams", 3: "trigrams", 4: "quadgrams"}[n]
target_ngrams = generate_ngrams_safe(target_text, lang, n)
target_counts = Counter(target_ngrams)
comp_counters = []
all_comp_ngrams_combined = []
for t in valid_competitors:
c_ngrams = generate_ngrams_safe(t, lang, n)
comp_counters.append(Counter(c_ngrams))
all_comp_ngrams_combined.extend(c_ngrams)
comp_counts_total = Counter(all_comp_ngrams_combined)
all_unique = set(target_counts.keys()) | set(comp_counts_total.keys())
ngram_data = []
num_competitors = max(len(valid_competitors), 1)
for ngram in all_unique:
cnt_target = target_counts.get(ngram, 0)
cnt_total_comp = comp_counts_total.get(ngram, 0)
avg_comp = round(cnt_total_comp / num_competitors, 1)
detailed_comp_counts = [c.get(ngram, 0) for c in comp_counters]
# Считаем, у скольких конкурентов встречается эта фраза (> 0)
comp_occurrence = sum(1 for count in detailed_comp_counts if count > 0)
if cnt_target > 0 or avg_comp >= 0.5 or any(c > 0 for c in detailed_comp_counts):
ngram_data.append({
"ngram": ngram,
"target_count": cnt_target,
"competitor_avg": avg_comp,
"competitor_detailed": detailed_comp_counts,
"comp_occurrence": comp_occurrence
})
ngram_data.sort(key=lambda x: max([x["target_count"]] + x["competitor_detailed"]), reverse=True)
stats[key] = ngram_data[:200]
return stats
def parse_keywords(raw_phrases: List[str], lang: str):
key_phrases = []
keywords = set()
for phrase in raw_phrases:
if not phrase.strip(): continue
lemmas = get_lemmas_flat(phrase, lang)
if lemmas:
key_phrases.append(phrase.strip()) # Для BERT храним исходную фразу, а не леммы!
for w in lemmas: keywords.add(w)
return list(key_phrases), list(keywords)
def calculate_bm25_recommendations(target_text: str, competitor_texts: List[str], raw_keywords: List[str], lang: str):
"""
BM25. Использует ТУ ЖЕ функцию генерации, чтобы видеть то же, что и N-граммы.
"""
if not target_text or not raw_keywords: return []
recommendations = []
analyzed_keys = []
seen_terms = set()
for phrase in raw_keywords:
if not phrase.strip(): continue
# Генерируем ключи той же функцией
for n in range(1, 4):
ngrams = generate_ngrams_safe(phrase, lang, n)
for term in ngrams:
if term not in seen_terms:
analyzed_keys.append({"n": n, "term": term, "original": phrase})
seen_terms.add(term)
for n in range(1, 4):
current_n_keys = [k['term'] for k in analyzed_keys if k['n'] == n]
if not current_n_keys: continue
target_ngrams = generate_ngrams_safe(target_text, lang, n)
comp_ngrams_list = []
for t in competitor_texts:
if t.strip():
comp_ngrams_list.append(generate_ngrams_safe(t, lang, n))
else:
comp_ngrams_list.append([])
corpus = [target_ngrams] + comp_ngrams_list
bm25 = BM25Okapi(corpus)
for term in current_n_keys:
scores = bm25.get_scores([term])
score_target = scores[0]
score_avg_comp = np.mean(scores[1:]) if len(scores) > 1 else 0
if n == 1: threshold = 0.5
elif n == 2: threshold = 0.25
else: threshold = 0.15
action = "ok"
count_rec = 0
if score_target < score_avg_comp - threshold:
action = "add"
factor = 0.5 if n == 1 else 0.4
count_rec = max(1, int((score_avg_comp - score_target) * factor))
elif score_target > score_avg_comp + threshold * 2:
action = "remove"
factor = 0.5
count_rec = max(1, int((score_target - score_avg_comp) * factor))
recommendations.append({
"word": term,
"type": f"{n}-gram",
"my_score": round(score_target, 2),
"avg_comp_score": round(score_avg_comp, 2),
"action": action,
"count": count_rec
})
recommendations.sort(key=lambda x: (0 if x["action"] != "ok" else 1, -len(x["word"].split()), x["word"]))
return recommendations
# --- BERT / VECTOR ANALYSIS ---
def get_bert_model():
"""Загружает BERT на GPU, если он доступен"""
global BertModel
if BertModel is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 Loading BERT model on {device}...")
# Используем легкую и мощную мультиязычную модель
BertModel = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2', device=device)
print("✅ BERT Loaded successfully.")
return BertModel
def perform_bert_analysis(target_text: str, competitor_texts: List[str], key_phrases: List[str], lang: str):
if not key_phrases:
return {"detailed": [], "global_scores": []}
model = get_bert_model()
# 1. Функция-помощник: Получить чанки и их эмбеддинги
def process_text(text):
if not text.strip(): return [], None
# Для BERT используем сырой текст (с абзацами), не пропуская через get_doc
# чтобы сохранить структуру текста и контекст предложений
load_model_if_missing(lang)
nlp = LoadedModels.get(lang)
if not nlp:
load_model_if_missing("en")
nlp = LoadedModels.get("en")
nlp.max_length = 2000000
doc = nlp(text) # Сырой текст без .lower()!
# Разбиваем на предложения > 10 символов
chunks = [sent.text.strip() for sent in doc.sents if len(sent.text.strip()) > 10]
if not chunks: return [], None
embeddings = model.encode(chunks, convert_to_tensor=True)
return chunks, embeddings
# 2. Обрабатываем Наш текст
target_chunks, target_emb = process_text(target_text)
# 3. Обрабатываем Конкурентов (сохраняем структуру)
competitors_data = []
for idx, comp_text in enumerate(competitor_texts):
chunks, emb = process_text(comp_text)
competitors_data.append({
"id": idx + 1,
"chunks": chunks,
"embeddings": emb
})
# Эмбеддинги ключей
keys_emb = model.encode(key_phrases, convert_to_tensor=True)
# --- РАСЧЕТ GLOBAL SCORE ---
# Global Score - это средний Max Score по всем ключевым словам.
# То есть, насколько хорошо текст покрывает ВСЕ ключи в среднем.
global_scores = []
# Считаем для нас
if target_emb is not None:
# Матрица [Key x Chunk]
sims = util.cos_sim(keys_emb, target_emb)
# Берем макс. сходство для каждого ключа (values), потом среднее по всем ключам
# torch.max возвращает (values, indices)
max_scores_per_key, _ = torch.max(sims, dim=1)
avg_relevance = torch.mean(max_scores_per_key).item()
global_scores.append({"name": "Мой текст", "score": round(avg_relevance, 3), "is_me": True})
else:
global_scores.append({"name": "Мой текст", "score": 0, "is_me": True})
# Считаем для конкурентов
for comp in competitors_data:
if comp["embeddings"] is not None:
sims = util.cos_sim(keys_emb, comp["embeddings"])
max_scores_per_key, _ = torch.max(sims, dim=1)
avg_relevance = torch.mean(max_scores_per_key).item()
global_scores.append({"name": f"Конкурент #{comp['id']}", "score": round(avg_relevance, 3), "is_me": False})
else:
global_scores.append({"name": f"Конкурент #{comp['id']}", "score": 0, "is_me": False})
# Сортируем глобальный рейтинг (победитель сверху)
global_scores.sort(key=lambda x: x["score"], reverse=True)
# --- ДЕТАЛЬНЫЙ АНАЛИЗ ПО ФРАЗАМ ---
detailed_results = []
for i, phrase in enumerate(key_phrases):
# 1. Анализ моего текста
my_top = []
my_max = 0
if target_emb is not None:
# Считаем снова локально или берем из матрицы (тут проще локально для чистоты кода)
# scores_target[i] уже посчитано выше в sims, но выше переменная sims переписывалась.
# Для надежности пересчитаем векторную близость для одной фразы (это мгновенно)
phrase_emb = keys_emb[i]
scores = util.cos_sim(phrase_emb, target_emb)[0] # вектор [chunks]
k = min(5, len(target_chunks))
vals, idxs = torch.topk(scores, k)
my_max = vals[0].item() if k > 0 else 0
for rank in range(k):
my_top.append({
"text": target_chunks[idxs[rank].item()],
"score": round(vals[rank].item(), 3)
})
# 2. Анализ конкурентов (Сборная солянка)
# Собираем все чанки всех конкурентов с их скорами и ID
all_comp_candidates = []
for comp in competitors_data:
if comp["embeddings"] is not None:
phrase_emb = keys_emb[i]
scores = util.cos_sim(phrase_emb, comp["embeddings"])[0]
# Берем топ-3 от каждого конкурента, чтобы добавить в общий пул
k = min(3, len(comp["chunks"]))
vals, idxs = torch.topk(scores, k)
for rank in range(k):
all_comp_candidates.append({
"text": comp["chunks"][idxs[rank].item()],
"score": vals[rank].item(),
"source": f"Конкурент #{comp['id']}" # <-- АТРИБУЦИЯ
})
# Сортируем общий пул конкурентов и берем ТОП-5 абсолютных лидеров
all_comp_candidates.sort(key=lambda x: x["score"], reverse=True)
comp_top_5 = all_comp_candidates[:5]
# Округляем скоры для вывода
for item in comp_top_5:
item["score"] = round(item["score"], 3)
comp_max = comp_top_5[0]["score"] if comp_top_5 else 0
# Статус
status = "ok"
rec = "Тема раскрыта хорошо."
if my_max < 0.5:
status = "bad"
rec = "Тема не раскрыта."
elif comp_max > my_max + 0.1:
status = "warning"
rec = "Конкуренты раскрыли тему заметно лучше."
elif my_max >= 0.7:
status = "good"
rec = "Отлично."
detailed_results.append({
"phrase": phrase,
"my_max_score": round(my_max, 2),
"comp_max_score": round(comp_max, 2),
"status": status,
"recommendation": rec,
"my_top_chunks": my_top,
"comp_top_chunks": comp_top_5
})
return {
"global_scores": global_scores,
"detailed": detailed_results
}
# --- TITLE ANALYZER ---
def analyze_title(target_title: str, competitor_titles: List[str], raw_keywords: List[str], lang: str) -> Dict:
"""
Orchestrator: анализирует Title тег.
Возвращает словарь с 4 блоками:
- length: длина в символах и словах
- ngrams: частотный анализ (uni/bi) Target vs конкуренты
- keyword_coverage: покрытие ключевых фраз в Title
- bert: семантическая близость Title к ключам
"""
if not target_title.strip():
return {}
valid_comp_titles = [t for t in competitor_titles if t.strip()]
result = {
"target_title": target_title.strip(),
"competitor_titles": [t.strip() for t in valid_comp_titles],
"length": _title_length(target_title, valid_comp_titles, lang),
"ngrams": _title_ngrams(target_title, valid_comp_titles, lang),
"keyword_coverage": _title_keyword_coverage(target_title, valid_comp_titles, raw_keywords, lang),
"bert": _title_bert(target_title, valid_comp_titles, raw_keywords, lang),
}
return result
def _title_length(target: str, comp_titles: List[str], lang: str) -> Dict:
target_chars = len(target.strip())
target_wc = count_words(target, lang)
comp_data = []
for t in comp_titles:
comp_data.append({
"chars": len(t.strip()),
"words": count_words(t, lang)
})
avg_chars = round(sum(c["chars"] for c in comp_data) / max(len(comp_data), 1))
avg_total = round(sum(c["words"]["total"] for c in comp_data) / max(len(comp_data), 1))
status = "ok"
if target_chars > 60:
status = "too_long"
elif target_chars < 30:
status = "too_short"
return {
"target_chars": target_chars,
"target_words": target_wc,
"comp_data": comp_data,
"avg_chars": avg_chars,
"avg_words_total": avg_total,
"status": status
}
def _title_ngrams(target: str, comp_titles: List[str], lang: str) -> Dict:
stats = {}
for n in [1, 2]:
key = "unigrams" if n == 1 else "bigrams"
target_ngrams = generate_ngrams_safe(target, lang, n)
target_counts = Counter(target_ngrams)
comp_counters = []
all_comp_combined = []
for t in comp_titles:
c_ng = generate_ngrams_safe(t, lang, n)
comp_counters.append(Counter(c_ng))
all_comp_combined.extend(c_ng)
comp_total = Counter(all_comp_combined)
all_unique = set(target_counts.keys()) | set(comp_total.keys())
ngram_data = []
num_comp = max(len(comp_titles), 1)
for ngram in all_unique:
cnt_t = target_counts.get(ngram, 0)
cnt_c = comp_total.get(ngram, 0)
avg_c = round(cnt_c / num_comp, 1)
detailed = [c.get(ngram, 0) for c in comp_counters]
occurrence = sum(1 for x in detailed if x > 0)
if cnt_t > 0 or avg_c >= 0.3 or any(x > 0 for x in detailed):
ngram_data.append({
"ngram": ngram,
"target_count": cnt_t,
"competitor_avg": avg_c,
"competitor_detailed": detailed,
"comp_occurrence": occurrence
})
ngram_data.sort(key=lambda x: x["comp_occurrence"], reverse=True)
stats[key] = ngram_data
return stats
def _title_keyword_coverage(target: str, comp_titles: List[str], raw_keywords: List[str], lang: str) -> List[Dict]:
if not raw_keywords:
return []
target_lower = target.lower()
target_unigrams = set(generate_ngrams_safe(target, lang, 1))
target_bigrams = set(generate_ngrams_safe(target, lang, 2))
results = []
for phrase in raw_keywords:
if not phrase.strip():
continue
phrase_lower = phrase.strip().lower()
exact_match = phrase_lower in target_lower
phrase_uni = set(generate_ngrams_safe(phrase, lang, 1))
phrase_bi = set(generate_ngrams_safe(phrase, lang, 2))
uni_found = phrase_uni & target_unigrams
bi_found = phrase_bi & target_bigrams
if phrase_uni:
word_coverage = round(len(uni_found) / len(phrase_uni) * 100)
else:
word_coverage = 0
comp_presence = 0
for ct in comp_titles:
if phrase_lower in ct.lower():
comp_presence += 1
results.append({
"phrase": phrase.strip(),
"exact_match": exact_match,
"word_coverage": word_coverage,
"words_found": sorted(list(uni_found)),
"words_missing": sorted(list(phrase_uni - uni_found)),
"bigrams_found": sorted(list(bi_found)),
"comp_presence": comp_presence,
"comp_total": len(comp_titles)
})
return results
def _title_bert(target: str, comp_titles: List[str], raw_keywords: List[str], lang: str) -> Dict:
if not raw_keywords:
return {"target_score": 0, "comp_scores": [], "per_keyword": []}
model = get_bert_model()
valid_keywords = [p.strip() for p in raw_keywords if p.strip()]
if not valid_keywords:
return {"target_score": 0, "comp_scores": [], "per_keyword": []}
keys_emb = model.encode(valid_keywords, convert_to_tensor=True)
target_emb = model.encode([target.strip()], convert_to_tensor=True)
sims_target = util.cos_sim(keys_emb, target_emb).squeeze(1)
target_per_kw = [round(s.item(), 3) for s in sims_target]
target_avg = round(torch.mean(sims_target).item(), 3)
comp_scores = []
for ct in comp_titles:
ct_emb = model.encode([ct.strip()], convert_to_tensor=True)
sims_c = util.cos_sim(keys_emb, ct_emb).squeeze(1)
comp_scores.append(round(torch.mean(sims_c).item(), 3))
per_keyword = []
for i, kw in enumerate(valid_keywords):
per_keyword.append({
"keyword": kw,
"target_score": target_per_kw[i],
"comp_scores": [round(util.cos_sim(keys_emb[i:i+1], model.encode([ct.strip()], convert_to_tensor=True))[0][0].item(), 3) for ct in comp_titles]
})
return {
"target_score": target_avg,
"comp_scores": comp_scores,
"per_keyword": per_keyword
}