ai-seo-analyzer / logic.py
lsdf's picture
Add Portuguese language support across app and deployment
0c7bd2e
raw
history blame
27.7 kB
import spacy
from collections import Counter
from typing import List, Dict
import numpy as np
from rank_bm25 import BM25Okapi
import re
# Новые импорты для BERT
import torch
from sentence_transformers import SentenceTransformer, util
# --- Глобальные переменные ---
LoadedModels = {} # spaCy модели
BertModel = None # BERT модель (одна на все языки)
MODEL_NAMES = {
"en": "en_core_web_sm",
"ru": "ru_core_news_sm",
"de": "de_core_news_sm",
"es": "es_core_news_sm",
"it": "it_core_news_sm",
"pl": "pl_core_news_sm",
"pt": "pt_core_news_sm",
}
# --- SPACY (Лингвистика) ---
def load_model_if_missing(lang: str):
if lang in LoadedModels: return
model_name = MODEL_NAMES.get(lang)
if not model_name: return
print(f"⏳ Loading spaCy model for {lang}...")
try:
LoadedModels[lang] = spacy.load(model_name)
print(f"✅ Loaded spaCy: {lang}")
except Exception as e:
print(f"❌ Failed to load spaCy {lang}: {e}")
def load_models():
"""
Функция для предзагрузки всех моделей при старте (вызывается из main.py).
"""
print("🚀 Pre-loading all spaCy models...")
for lang in MODEL_NAMES.keys():
load_model_if_missing(lang)
def get_doc(text: str, lang: str):
load_model_if_missing(lang)
nlp = LoadedModels.get(lang)
if not nlp:
load_model_if_missing("en")
nlp = LoadedModels.get("en")
if not nlp: raise RuntimeError("No NLP models loaded.")
nlp.max_length = 2000000
return nlp(text.lower())
# --- НОВАЯ ФУНКЦИЯ ФИЛЬТРАЦИИ ---
def is_valid_token(t):
"""
Проверяет, является ли токен полезным словом.
Исправленная версия: не удаляет слова из букв, даже если AI пометил их как символы.
"""
# 1. Базовые проверки spaCy (Стоп-слова, пунктуация, пробелы)
if t.is_stop or t.is_punct or t.is_space:
return False
# 2. Числа (удаляем "18", "2023", "5")
if t.is_digit or t.like_num:
return False
# 3. СИМВОЛЫ (ИСПРАВЛЕНИЕ)
# Если spaCy говорит, что это символ (SYM), мы верим, ТОЛЬКО если это не буквы.
# Это спасет слова типа "cross", "apk", "bet", которые могут быть ложно помечены.
if t.pos_ == "SYM" and not t.text.isalpha():
return False
# 4. Дополнительная страховка (явный мусор)
garbage_chars = {'|', '+', '-', '—', '–', '>', '<', '=', '/', '\\', '★', '▶', '●', '•', '€', '$', '£'}
if t.text.strip() in garbage_chars:
return False
# 5. Длина: Удаляем одиночные буквы, которые не являются словами
# (опционально, но помогает чистить мусор типа "v", "s" если они не стоп-слова)
if len(t.text) == 1 and not t.text.isalpha():
return False
return True
def get_lemmas_flat(text: str, lang: str) -> List[str]:
"""
Возвращает плоский список лемм для всего текста (нужен для BM25).
"""
if not text: return []
doc = get_doc(text, lang)
# Используем нашу новую функцию фильтрации
return [t.lemma_ for t in doc if is_valid_token(t)]
def generate_ngrams_safe(text: str, lang: str, n: int) -> List[str]:
"""
Генерирует n-граммы.
ЛОГИКА: Smart Window (Та самая, которая работает идеально).
1. Считает N-граммой последовательность из N ЗНАЧИМЫХ слов.
2. Сохраняет стоп-слова ВНУТРИ фразы (для читаемости: "bevor sie mit echtem geld").
3. Игнорирует стоп-слова ПО КРАЯМ.
"""
if not text: return []
# ЕДИНСТВЕННОЕ ИЗМЕНЕНИЕ: Заменяем дефисы на пробелы, чтобы разлепить "Casino-Websites"
text = re.sub(r'[\-\–\—\/]', ' ', text)
# Дальше - ТА САМАЯ логика, которая работала идеально
clean_text = " ".join(text.split())
doc = get_doc(clean_text, lang)
all_ngrams = []
garbage_chars = {'|', '+', '-', '—', '–', '>', '<', '=', '/', '\\', '★', '▶', '●', '•', '€', '$', '£', '™', '®', '«', '»', '"', '(', ')', '[', ']', '!', '?', '.', ',', ':', ';'}
for sent in doc.sents:
# Разбираем предложение на токены
tokens = []
for t in sent:
# Стена
is_wall = (
t.is_punct or t.is_space or t.pos_ == "SYM" or
t.is_digit or t.like_num or
t.text.strip() in garbage_chars
)
if is_wall:
tokens.append(None) # Маркер разрыва
else:
tokens.append({
"text": t.text.lower(),
"is_stop": t.is_stop
})
# Ищем последовательности (Smart Window)
for i in range(len(tokens)):
# Начало фразы не может быть стоп-словом или стеной
if tokens[i] is None or tokens[i]["is_stop"]:
continue
current_phrase = []
significant_count = 0
for j in range(i, len(tokens)):
token = tokens[j]
if token is None: # Стена - конец фразы
break
current_phrase.append(token["text"])
# Если не стоп-слово, увеличиваем счетчик.
# Если стоп-слово - просто добавляем в current_phrase, но счетчик стоит.
# Это сохраняет "sie mit" внутри фразы.
if not token["is_stop"]:
significant_count += 1
# Если набрали N значимых слов
if significant_count == n:
ngram_str = " ".join(current_phrase)
all_ngrams.append(ngram_str)
break
return all_ngrams
# --- WORD COUNT ---
def count_words(text: str, lang: str) -> Dict[str, int]:
"""
Считает слова. ПОЛНОСТЬЮ ИЗОЛИРОВАННАЯ ЛОГИКА.
Не зависит от N-грамм или BM25.
"""
if not text.strip():
return {"total": 0, "significant": 0}
# 1. Получаем чистый объект spaCy (без всяких re.sub и вырезаний)
# Просто убираем лишние пробелы/энтеры
clean_text = " ".join(text.split())
doc = get_doc(clean_text, lang)
total_count = 0
significant_count = 0
for t in doc:
# --- ФИЛЬТР ДЛЯ ОБЩЕГО КОЛИЧЕСТВА ---
# Считаем всё, кроме пунктуации и пробелов
if t.is_punct or t.is_space:
continue
total_count += 1
# --- ФИЛЬТР ДЛЯ ЗНАЧИМЫХ СЛОВ ---
# Если это стоп-слово (der, die, das...) - не считаем в значимые
if t.is_stop:
continue
# Если это цифра - не считаем
if t.is_digit or t.like_num:
continue
# Если это спецсимвол - не считаем
if t.pos_ == "SYM":
continue
significant_count += 1
return {"total": total_count, "significant": significant_count}
# --- ANALYTICS (N-grams & BM25) ---
def calculate_ngram_stats(target_text: str, competitor_texts: List[str], lang: str) -> Dict:
stats = {}
valid_competitors = [t for t in competitor_texts if t.strip()]
for n in range(1, 5):
key = {1: "unigrams", 2: "bigrams", 3: "trigrams", 4: "quadgrams"}[n]
target_ngrams = generate_ngrams_safe(target_text, lang, n)
target_counts = Counter(target_ngrams)
comp_counters = []
all_comp_ngrams_combined = []
for t in valid_competitors:
c_ngrams = generate_ngrams_safe(t, lang, n)
comp_counters.append(Counter(c_ngrams))
all_comp_ngrams_combined.extend(c_ngrams)
comp_counts_total = Counter(all_comp_ngrams_combined)
all_unique = set(target_counts.keys()) | set(comp_counts_total.keys())
ngram_data = []
num_competitors = max(len(valid_competitors), 1)
for ngram in all_unique:
cnt_target = target_counts.get(ngram, 0)
cnt_total_comp = comp_counts_total.get(ngram, 0)
avg_comp = round(cnt_total_comp / num_competitors, 1)
detailed_comp_counts = [c.get(ngram, 0) for c in comp_counters]
# Считаем, у скольких конкурентов встречается эта фраза (> 0)
comp_occurrence = sum(1 for count in detailed_comp_counts if count > 0)
if cnt_target > 0 or avg_comp >= 0.5 or any(c > 0 for c in detailed_comp_counts):
ngram_data.append({
"ngram": ngram,
"target_count": cnt_target,
"competitor_avg": avg_comp,
"competitor_detailed": detailed_comp_counts,
"comp_occurrence": comp_occurrence
})
ngram_data.sort(key=lambda x: max([x["target_count"]] + x["competitor_detailed"]), reverse=True)
stats[key] = ngram_data[:200]
return stats
def parse_keywords(raw_phrases: List[str], lang: str):
key_phrases = []
keywords = set()
for phrase in raw_phrases:
if not phrase.strip(): continue
lemmas = get_lemmas_flat(phrase, lang)
if lemmas:
key_phrases.append(phrase.strip()) # Для BERT храним исходную фразу, а не леммы!
for w in lemmas: keywords.add(w)
return list(key_phrases), list(keywords)
def calculate_bm25_recommendations(target_text: str, competitor_texts: List[str], raw_keywords: List[str], lang: str):
"""
BM25. Использует ТУ ЖЕ функцию генерации, чтобы видеть то же, что и N-граммы.
"""
if not target_text or not raw_keywords: return []
recommendations = []
analyzed_keys = []
seen_terms = set()
for phrase in raw_keywords:
if not phrase.strip(): continue
# Генерируем ключи той же функцией
for n in range(1, 4):
ngrams = generate_ngrams_safe(phrase, lang, n)
for term in ngrams:
if term not in seen_terms:
analyzed_keys.append({"n": n, "term": term, "original": phrase})
seen_terms.add(term)
for n in range(1, 4):
current_n_keys = [k['term'] for k in analyzed_keys if k['n'] == n]
if not current_n_keys: continue
target_ngrams = generate_ngrams_safe(target_text, lang, n)
comp_ngrams_list = []
for t in competitor_texts:
if t.strip():
comp_ngrams_list.append(generate_ngrams_safe(t, lang, n))
else:
comp_ngrams_list.append([])
corpus = [target_ngrams] + comp_ngrams_list
bm25 = BM25Okapi(corpus)
for term in current_n_keys:
scores = bm25.get_scores([term])
score_target = scores[0]
score_avg_comp = np.mean(scores[1:]) if len(scores) > 1 else 0
if n == 1: threshold = 0.5
elif n == 2: threshold = 0.25
else: threshold = 0.15
action = "ok"
count_rec = 0
if score_target < score_avg_comp - threshold:
action = "add"
factor = 0.5 if n == 1 else 0.4
count_rec = max(1, int((score_avg_comp - score_target) * factor))
elif score_target > score_avg_comp + threshold * 2:
action = "remove"
factor = 0.5
count_rec = max(1, int((score_target - score_avg_comp) * factor))
recommendations.append({
"word": term,
"type": f"{n}-gram",
"my_score": round(score_target, 2),
"avg_comp_score": round(score_avg_comp, 2),
"action": action,
"count": count_rec
})
recommendations.sort(key=lambda x: (0 if x["action"] != "ok" else 1, -len(x["word"].split()), x["word"]))
return recommendations
# --- BERT / VECTOR ANALYSIS ---
def get_bert_model():
"""Загружает BERT на GPU, если он доступен"""
global BertModel
if BertModel is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"🚀 Loading BERT model on {device}...")
# Используем легкую и мощную мультиязычную модель
BertModel = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2', device=device)
print("✅ BERT Loaded successfully.")
return BertModel
def perform_bert_analysis(target_text: str, competitor_texts: List[str], key_phrases: List[str], lang: str):
if not key_phrases:
return {"detailed": [], "global_scores": []}
model = get_bert_model()
# 1. Функция-помощник: Получить чанки и их эмбеддинги
def process_text(text):
if not text.strip(): return [], None
# Для BERT используем сырой текст (с абзацами), не пропуская через get_doc
# чтобы сохранить структуру текста и контекст предложений
load_model_if_missing(lang)
nlp = LoadedModels.get(lang)
if not nlp:
load_model_if_missing("en")
nlp = LoadedModels.get("en")
nlp.max_length = 2000000
doc = nlp(text) # Сырой текст без .lower()!
# Разбиваем на предложения > 10 символов
chunks = [sent.text.strip() for sent in doc.sents if len(sent.text.strip()) > 10]
if not chunks: return [], None
embeddings = model.encode(chunks, convert_to_tensor=True)
return chunks, embeddings
# 2. Обрабатываем Наш текст
target_chunks, target_emb = process_text(target_text)
# 3. Обрабатываем Конкурентов (сохраняем структуру)
competitors_data = []
for idx, comp_text in enumerate(competitor_texts):
chunks, emb = process_text(comp_text)
competitors_data.append({
"id": idx + 1,
"chunks": chunks,
"embeddings": emb
})
# Эмбеддинги ключей
keys_emb = model.encode(key_phrases, convert_to_tensor=True)
# --- РАСЧЕТ GLOBAL SCORE ---
# Global Score - это средний Max Score по всем ключевым словам.
# То есть, насколько хорошо текст покрывает ВСЕ ключи в среднем.
global_scores = []
# Считаем для нас
if target_emb is not None:
# Матрица [Key x Chunk]
sims = util.cos_sim(keys_emb, target_emb)
# Берем макс. сходство для каждого ключа (values), потом среднее по всем ключам
# torch.max возвращает (values, indices)
max_scores_per_key, _ = torch.max(sims, dim=1)
avg_relevance = torch.mean(max_scores_per_key).item()
global_scores.append({"name": "Мой текст", "score": round(avg_relevance, 3), "is_me": True})
else:
global_scores.append({"name": "Мой текст", "score": 0, "is_me": True})
# Считаем для конкурентов
for comp in competitors_data:
if comp["embeddings"] is not None:
sims = util.cos_sim(keys_emb, comp["embeddings"])
max_scores_per_key, _ = torch.max(sims, dim=1)
avg_relevance = torch.mean(max_scores_per_key).item()
global_scores.append({"name": f"Конкурент #{comp['id']}", "score": round(avg_relevance, 3), "is_me": False})
else:
global_scores.append({"name": f"Конкурент #{comp['id']}", "score": 0, "is_me": False})
# Сортируем глобальный рейтинг (победитель сверху)
global_scores.sort(key=lambda x: x["score"], reverse=True)
# --- ДЕТАЛЬНЫЙ АНАЛИЗ ПО ФРАЗАМ ---
detailed_results = []
for i, phrase in enumerate(key_phrases):
# 1. Анализ моего текста
my_top = []
my_max = 0
if target_emb is not None:
# Считаем снова локально или берем из матрицы (тут проще локально для чистоты кода)
# scores_target[i] уже посчитано выше в sims, но выше переменная sims переписывалась.
# Для надежности пересчитаем векторную близость для одной фразы (это мгновенно)
phrase_emb = keys_emb[i]
scores = util.cos_sim(phrase_emb, target_emb)[0] # вектор [chunks]
k = min(5, len(target_chunks))
vals, idxs = torch.topk(scores, k)
my_max = vals[0].item() if k > 0 else 0
for rank in range(k):
my_top.append({
"text": target_chunks[idxs[rank].item()],
"score": round(vals[rank].item(), 3)
})
# 2. Анализ конкурентов (Сборная солянка)
# Собираем все чанки всех конкурентов с их скорами и ID
all_comp_candidates = []
for comp in competitors_data:
if comp["embeddings"] is not None:
phrase_emb = keys_emb[i]
scores = util.cos_sim(phrase_emb, comp["embeddings"])[0]
# Берем топ-3 от каждого конкурента, чтобы добавить в общий пул
k = min(3, len(comp["chunks"]))
vals, idxs = torch.topk(scores, k)
for rank in range(k):
all_comp_candidates.append({
"text": comp["chunks"][idxs[rank].item()],
"score": vals[rank].item(),
"source": f"Конкурент #{comp['id']}" # <-- АТРИБУЦИЯ
})
# Сортируем общий пул конкурентов и берем ТОП-5 абсолютных лидеров
all_comp_candidates.sort(key=lambda x: x["score"], reverse=True)
comp_top_5 = all_comp_candidates[:5]
# Округляем скоры для вывода
for item in comp_top_5:
item["score"] = round(item["score"], 3)
comp_max = comp_top_5[0]["score"] if comp_top_5 else 0
# Статус
status = "ok"
rec = "Тема раскрыта хорошо."
if my_max < 0.5:
status = "bad"
rec = "Тема не раскрыта."
elif comp_max > my_max + 0.1:
status = "warning"
rec = "Конкуренты раскрыли тему заметно лучше."
elif my_max >= 0.7:
status = "good"
rec = "Отлично."
detailed_results.append({
"phrase": phrase,
"my_max_score": round(my_max, 2),
"comp_max_score": round(comp_max, 2),
"status": status,
"recommendation": rec,
"my_top_chunks": my_top,
"comp_top_chunks": comp_top_5
})
return {
"global_scores": global_scores,
"detailed": detailed_results
}
# --- TITLE ANALYZER ---
def analyze_title(target_title: str, competitor_titles: List[str], raw_keywords: List[str], lang: str) -> Dict:
"""
Orchestrator: анализирует Title тег.
Возвращает словарь с 4 блоками:
- length: длина в символах и словах
- ngrams: частотный анализ (uni/bi) Target vs конкуренты
- keyword_coverage: покрытие ключевых фраз в Title
- bert: семантическая близость Title к ключам
"""
if not target_title.strip():
return {}
valid_comp_titles = [t for t in competitor_titles if t.strip()]
result = {
"target_title": target_title.strip(),
"competitor_titles": [t.strip() for t in valid_comp_titles],
"length": _title_length(target_title, valid_comp_titles, lang),
"ngrams": _title_ngrams(target_title, valid_comp_titles, lang),
"keyword_coverage": _title_keyword_coverage(target_title, valid_comp_titles, raw_keywords, lang),
"bert": _title_bert(target_title, valid_comp_titles, raw_keywords, lang),
}
return result
def _title_length(target: str, comp_titles: List[str], lang: str) -> Dict:
target_chars = len(target.strip())
target_wc = count_words(target, lang)
comp_data = []
for t in comp_titles:
comp_data.append({
"chars": len(t.strip()),
"words": count_words(t, lang)
})
avg_chars = round(sum(c["chars"] for c in comp_data) / max(len(comp_data), 1))
avg_total = round(sum(c["words"]["total"] for c in comp_data) / max(len(comp_data), 1))
status = "ok"
if target_chars > 60:
status = "too_long"
elif target_chars < 30:
status = "too_short"
return {
"target_chars": target_chars,
"target_words": target_wc,
"comp_data": comp_data,
"avg_chars": avg_chars,
"avg_words_total": avg_total,
"status": status
}
def _title_ngrams(target: str, comp_titles: List[str], lang: str) -> Dict:
stats = {}
for n in [1, 2]:
key = "unigrams" if n == 1 else "bigrams"
target_ngrams = generate_ngrams_safe(target, lang, n)
target_counts = Counter(target_ngrams)
comp_counters = []
all_comp_combined = []
for t in comp_titles:
c_ng = generate_ngrams_safe(t, lang, n)
comp_counters.append(Counter(c_ng))
all_comp_combined.extend(c_ng)
comp_total = Counter(all_comp_combined)
all_unique = set(target_counts.keys()) | set(comp_total.keys())
ngram_data = []
num_comp = max(len(comp_titles), 1)
for ngram in all_unique:
cnt_t = target_counts.get(ngram, 0)
cnt_c = comp_total.get(ngram, 0)
avg_c = round(cnt_c / num_comp, 1)
detailed = [c.get(ngram, 0) for c in comp_counters]
occurrence = sum(1 for x in detailed if x > 0)
if cnt_t > 0 or avg_c >= 0.3 or any(x > 0 for x in detailed):
ngram_data.append({
"ngram": ngram,
"target_count": cnt_t,
"competitor_avg": avg_c,
"competitor_detailed": detailed,
"comp_occurrence": occurrence
})
ngram_data.sort(key=lambda x: x["comp_occurrence"], reverse=True)
stats[key] = ngram_data
return stats
def _title_keyword_coverage(target: str, comp_titles: List[str], raw_keywords: List[str], lang: str) -> List[Dict]:
if not raw_keywords:
return []
target_lower = target.lower()
target_unigrams = set(generate_ngrams_safe(target, lang, 1))
target_bigrams = set(generate_ngrams_safe(target, lang, 2))
results = []
for phrase in raw_keywords:
if not phrase.strip():
continue
phrase_lower = phrase.strip().lower()
exact_match = phrase_lower in target_lower
phrase_uni = set(generate_ngrams_safe(phrase, lang, 1))
phrase_bi = set(generate_ngrams_safe(phrase, lang, 2))
uni_found = phrase_uni & target_unigrams
bi_found = phrase_bi & target_bigrams
if phrase_uni:
word_coverage = round(len(uni_found) / len(phrase_uni) * 100)
else:
word_coverage = 0
comp_presence = 0
for ct in comp_titles:
if phrase_lower in ct.lower():
comp_presence += 1
results.append({
"phrase": phrase.strip(),
"exact_match": exact_match,
"word_coverage": word_coverage,
"words_found": sorted(list(uni_found)),
"words_missing": sorted(list(phrase_uni - uni_found)),
"bigrams_found": sorted(list(bi_found)),
"comp_presence": comp_presence,
"comp_total": len(comp_titles)
})
return results
def _title_bert(target: str, comp_titles: List[str], raw_keywords: List[str], lang: str) -> Dict:
if not raw_keywords:
return {"target_score": 0, "comp_scores": [], "per_keyword": []}
model = get_bert_model()
valid_keywords = [p.strip() for p in raw_keywords if p.strip()]
if not valid_keywords:
return {"target_score": 0, "comp_scores": [], "per_keyword": []}
keys_emb = model.encode(valid_keywords, convert_to_tensor=True)
target_emb = model.encode([target.strip()], convert_to_tensor=True)
sims_target = util.cos_sim(keys_emb, target_emb).squeeze(1)
target_per_kw = [round(s.item(), 3) for s in sims_target]
target_avg = round(torch.mean(sims_target).item(), 3)
comp_scores = []
for ct in comp_titles:
ct_emb = model.encode([ct.strip()], convert_to_tensor=True)
sims_c = util.cos_sim(keys_emb, ct_emb).squeeze(1)
comp_scores.append(round(torch.mean(sims_c).item(), 3))
per_keyword = []
for i, kw in enumerate(valid_keywords):
per_keyword.append({
"keyword": kw,
"target_score": target_per_kw[i],
"comp_scores": [round(util.cos_sim(keys_emb[i:i+1], model.encode([ct.strip()], convert_to_tensor=True))[0][0].item(), 3) for ct in comp_titles]
})
return {
"target_score": target_avg,
"comp_scores": comp_scores,
"per_keyword": per_keyword
}