ai-seo-analyzer / semantic_graph.py
lsdf's picture
Add semantic cleanup, canonicalization, and generic-term downweighting
2aba76e
from collections import Counter, defaultdict
from typing import Dict, List, Any, Tuple
import math
import networkx as nx
import logic
def _normalize_to_1_100(values: Dict[str, float]) -> Dict[str, int]:
if not values:
return {}
v_min = min(values.values())
v_max = max(values.values())
if v_max == v_min:
return {k: 50 for k in values.keys()}
return {
k: int(round(1 + ((v - v_min) / (v_max - v_min)) * 99))
for k, v in values.items()
}
def _extract_significant_lemmas(sent: Dict[str, Any]) -> List[str]:
lemmas: List[str] = []
for tok in sent.get("tokens", []):
if tok.get("is_significant") and tok.get("lemma"):
lemmas.append(str(tok["lemma"]))
return lemmas
NOISE_PATTERNS = (
"contact us",
"responsible gaming",
"play responsibly",
"copyright",
"terms of use",
"new customers only",
"t&c apply",
"18+",
"info@",
)
GENERIC_TERMS = {
"игра", "играть", "казино", "ставка", "деньга", "деньги",
"демо", "режим", "уровень", "шаг", "выигрыш", "риск",
"game", "play", "casino", "bet", "demo", "mode", "level", "risk", "win",
}
def _is_noise_sentence(text: str) -> bool:
t = " ".join((text or "").lower().split())
if not t:
return True
if any(p in t for p in NOISE_PATTERNS):
return True
# Частые CTA/служебные короткие строки.
if len(t.split()) <= 3 and t in {"играть", "play", "chicken road", "chicken road game"}:
return True
return False
def _canonicalize_term(term: str) -> str:
t = " ".join((term or "").lower().replace("ё", "е").replace("-", " ").split())
if not t:
return t
# Бренд/наименование игры.
if t in {"чикен роад", "chicken road", "chickenroad", "chiken road"}:
return "chicken road"
if t in {"игры inout", "inout games", "inout game", "inout"}:
return "inout games"
# Полезная нормализация русской фразы под один термин.
if t in {"реальные деньги", "реальный деньги"}:
return "реальные деньги"
return t
def _extract_phrase_candidates(sentence_text: str, lang: str) -> List[str]:
"""
Извлекает фразовые кандидаты через существующую n-gram логику проекта,
чтобы сохранить естественные сочетания со стоп-словами внутри.
"""
candidates: List[str] = []
for n in (2, 3):
candidates.extend(logic.generate_ngrams_safe(sentence_text, lang, n))
return candidates
def _normalize_lemma_sequence(lemmas: List[str]) -> List[str]:
"""
Убирает подряд идущие дубликаты, чтобы не рождать шумные
термины вроде "дорожка дорожка" из технических повторов.
"""
if not lemmas:
return []
normalized = [lemmas[0]]
for item in lemmas[1:]:
if item != normalized[-1]:
normalized.append(item)
return normalized
def build_semantic_graph(
sentences_data: List[Dict[str, Any]],
window_size: int = 5,
min_phrase_freq: int = 2,
lang: str = "ru",
) -> Tuple[nx.DiGraph, Dict[str, int]]:
"""
Строит направленный граф из лемм и считает веса:
- edge_weight: условная вероятность P(B|A) * 100
- node_weight: нормализованный PageRank 1..100
Версия v5:
- связи считаются в локальном скользящем окне (а не "все со всеми")
- условная вероятность считается строго как cooc(a,b)/occ(a), поэтому <= 100
- узлы графа: слова + устойчивые словосочетания (bi/tri-grams)
- словосочетания извлекаются через Smart Window (как в основной логике n-gram)
- повышаем вклад устойчивых фраз в итоговый смысловой вес понятия
"""
term_occ = Counter()
pair_cooc = defaultdict(int)
phrase_occ = Counter()
phrase_sent_ids = defaultdict(set)
term_sent_ids = defaultdict(set)
sentence_words: List[List[str]] = []
sentence_phrases: List[List[str]] = []
for sent_id, sent in enumerate(sentences_data):
raw_text = sent.get("raw_text", "")
if _is_noise_sentence(raw_text):
sentence_words.append([])
sentence_phrases.append([])
continue
lemmas_raw = _extract_significant_lemmas(sent)
lemmas = [_canonicalize_term(x) for x in _normalize_lemma_sequence(lemmas_raw)]
lemmas = [x for x in lemmas if x]
sentence_words.append(lemmas)
phrase_candidates = _extract_phrase_candidates(raw_text, lang)
# Фильтр мусора фраз: минимум 2 слова, без дублирующегося подряд слова.
clean_phrases = []
for p in phrase_candidates:
parts = [x.strip() for x in p.split() if x.strip()]
if len(parts) < 2:
continue
bad_repeat = any(parts[i] == parts[i + 1] for i in range(len(parts) - 1))
if bad_repeat:
continue
clean_phrases.append(_canonicalize_term(" ".join(parts)))
sentence_phrases.append(clean_phrases)
if not lemmas:
continue
# Для каждого вхождения a считаем его локальный контекст в окне.
# cooc(a,b) увеличивается максимум на 1 на одно вхождение a,
# что гарантирует cooc(a,b) <= occ(a) и P(B|A) <= 100.
for i, a in enumerate(lemmas):
term_occ[a] += 1
left = max(0, i - max(1, window_size))
right = min(len(lemmas), i + max(1, window_size) + 1)
neighbors = set()
for j in range(left, right):
if j == i:
continue
b = lemmas[j]
if not b or a == b:
continue
neighbors.add(b)
for b in neighbors:
pair_cooc[(a, b)] += 1
for w in set(lemmas):
term_sent_ids[w].add(sent_id)
# Частоты/охват фраз по предложениям.
for sent_id, phrases in enumerate(sentence_phrases):
for phrase in phrases:
phrase_occ[phrase] += 1
phrase_sent_ids[phrase].add(sent_id)
# Оставляем только устойчивые фразы.
allowed_phrases = {
p for p, c in phrase_occ.items()
if c >= max(1, min_phrase_freq) and len(phrase_sent_ids[p]) >= max(1, min_phrase_freq)
}
# Добавляем связи phrase <-> word на уровне предложения.
for sent_id, words in enumerate(sentence_words):
if not words:
continue
phrases = [p for p in sentence_phrases[sent_id] if p in allowed_phrases]
if not phrases:
continue
uniq_words = set(words)
uniq_phrases = set(phrases)
for phrase in uniq_phrases:
term_occ[phrase] += 1
term_sent_ids[phrase].add(sent_id)
parts = set(phrase.split())
# Связываем фразу с ее компонентами всегда (ядро термина).
for w in parts:
pair_cooc[(phrase, w)] += 1
pair_cooc[(w, phrase)] += 1
# И с остальными словами предложения (контекст термина).
for w in uniq_words:
if w in parts:
continue
pair_cooc[(phrase, w)] += 1
pair_cooc[(w, phrase)] += 1
graph = nx.DiGraph()
for term, freq in term_occ.items():
term_type = "phrase" if " " in term else "word"
graph.add_node(term, frequency=int(freq), term_type=term_type)
for (a, b), cooc in pair_cooc.items():
base = term_occ.get(a, 0)
if base <= 0:
continue
weight = (cooc / base) * 100.0
weight = max(0.0, min(100.0, weight))
if weight > 0:
graph.add_edge(a, b, weight=round(weight, 3))
if graph.number_of_nodes() == 0:
return graph, {}
try:
pr = nx.pagerank(graph, weight="weight")
except Exception:
pr = {node: 1.0 for node in graph.nodes()}
# Комбинируем centrality с termness-фактором.
# Для слов: умеренный буст по контекстной связанности.
# Для фраз: более сильный буст по частоте/охвату, чтобы устойчивые термины поднимались в топ.
combined_scores = {}
total_sent = max(1, len(sentence_words))
for node, score in pr.items():
out_deg = graph.out_degree(node)
in_deg = graph.in_degree(node)
connectivity_factor = 1.0 + 0.025 * (out_deg + in_deg)
sent_df = len(term_sent_ids.get(node, set()))
idf_factor = 1.0 + 0.35 * math.log((1.0 + total_sent) / (1.0 + max(1, sent_df)))
if graph.nodes[node].get("term_type") == "phrase":
freq = max(1, int(graph.nodes[node].get("frequency", 1)))
sent_cover = len(phrase_sent_ids.get(node, set()))
termness = (1.0 + 0.22 * math.log1p(freq)) * (1.0 + 0.12 * math.log1p(sent_cover))
contains_generic = any(part in GENERIC_TERMS for part in node.split())
generic_penalty = 0.85 if contains_generic else 1.0
combined_scores[node] = score * connectivity_factor * termness * idf_factor * generic_penalty
else:
generic_penalty = 0.58 if node in GENERIC_TERMS else 1.0
combined_scores[node] = score * connectivity_factor * idf_factor * generic_penalty
node_weights = _normalize_to_1_100(combined_scores)
for node, w in node_weights.items():
graph.nodes[node]["weight"] = int(w)
return graph, node_weights
def get_graph_data_for_frontend(graph: nx.DiGraph, top_edges_per_node: int = 8) -> Dict[str, List[Dict[str, Any]]]:
nodes = []
links = []
for node, attrs in graph.nodes(data=True):
nodes.append(
{
"id": node,
"label": node,
"weight": int(attrs.get("weight", 1)),
"frequency": int(attrs.get("frequency", 0)),
}
)
for source in graph.nodes():
out_edges = sorted(
graph.out_edges(source, data=True),
key=lambda e: e[2].get("weight", 0),
reverse=True,
)[:top_edges_per_node]
for s, t, attrs in out_edges:
links.append(
{
"source": s,
"target": t,
"weight": float(attrs.get("weight", 0)),
}
)
return {"nodes": nodes, "links": links}
def get_top_keywords(node_weights: Dict[str, int], top_n: int = 20) -> List[Dict[str, Any]]:
items = sorted(node_weights.items(), key=lambda x: x[1], reverse=True)[:top_n]
return [{"lemma": lemma, "weight": int(weight)} for lemma, weight in items]