from collections import Counter, defaultdict from typing import Dict, List, Any, Tuple import math import networkx as nx import logic def _normalize_to_1_100(values: Dict[str, float]) -> Dict[str, int]: if not values: return {} v_min = min(values.values()) v_max = max(values.values()) if v_max == v_min: return {k: 50 for k in values.keys()} return { k: int(round(1 + ((v - v_min) / (v_max - v_min)) * 99)) for k, v in values.items() } def _extract_significant_lemmas(sent: Dict[str, Any]) -> List[str]: lemmas: List[str] = [] for tok in sent.get("tokens", []): if tok.get("is_significant") and tok.get("lemma"): lemmas.append(str(tok["lemma"])) return lemmas NOISE_PATTERNS = ( "contact us", "responsible gaming", "play responsibly", "copyright", "terms of use", "new customers only", "t&c apply", "18+", "info@", ) GENERIC_TERMS = { "игра", "играть", "казино", "ставка", "деньга", "деньги", "демо", "режим", "уровень", "шаг", "выигрыш", "риск", "game", "play", "casino", "bet", "demo", "mode", "level", "risk", "win", } def _is_noise_sentence(text: str) -> bool: t = " ".join((text or "").lower().split()) if not t: return True if any(p in t for p in NOISE_PATTERNS): return True # Частые CTA/служебные короткие строки. if len(t.split()) <= 3 and t in {"играть", "play", "chicken road", "chicken road game"}: return True return False def _canonicalize_term(term: str) -> str: t = " ".join((term or "").lower().replace("ё", "е").replace("-", " ").split()) if not t: return t # Бренд/наименование игры. if t in {"чикен роад", "chicken road", "chickenroad", "chiken road"}: return "chicken road" if t in {"игры inout", "inout games", "inout game", "inout"}: return "inout games" # Полезная нормализация русской фразы под один термин. if t in {"реальные деньги", "реальный деньги"}: return "реальные деньги" return t def _extract_phrase_candidates(sentence_text: str, lang: str) -> List[str]: """ Извлекает фразовые кандидаты через существующую n-gram логику проекта, чтобы сохранить естественные сочетания со стоп-словами внутри. """ candidates: List[str] = [] for n in (2, 3): candidates.extend(logic.generate_ngrams_safe(sentence_text, lang, n)) return candidates def _normalize_lemma_sequence(lemmas: List[str]) -> List[str]: """ Убирает подряд идущие дубликаты, чтобы не рождать шумные термины вроде "дорожка дорожка" из технических повторов. """ if not lemmas: return [] normalized = [lemmas[0]] for item in lemmas[1:]: if item != normalized[-1]: normalized.append(item) return normalized def build_semantic_graph( sentences_data: List[Dict[str, Any]], window_size: int = 5, min_phrase_freq: int = 2, lang: str = "ru", ) -> Tuple[nx.DiGraph, Dict[str, int]]: """ Строит направленный граф из лемм и считает веса: - edge_weight: условная вероятность P(B|A) * 100 - node_weight: нормализованный PageRank 1..100 Версия v5: - связи считаются в локальном скользящем окне (а не "все со всеми") - условная вероятность считается строго как cooc(a,b)/occ(a), поэтому <= 100 - узлы графа: слова + устойчивые словосочетания (bi/tri-grams) - словосочетания извлекаются через Smart Window (как в основной логике n-gram) - повышаем вклад устойчивых фраз в итоговый смысловой вес понятия """ term_occ = Counter() pair_cooc = defaultdict(int) phrase_occ = Counter() phrase_sent_ids = defaultdict(set) term_sent_ids = defaultdict(set) sentence_words: List[List[str]] = [] sentence_phrases: List[List[str]] = [] for sent_id, sent in enumerate(sentences_data): raw_text = sent.get("raw_text", "") if _is_noise_sentence(raw_text): sentence_words.append([]) sentence_phrases.append([]) continue lemmas_raw = _extract_significant_lemmas(sent) lemmas = [_canonicalize_term(x) for x in _normalize_lemma_sequence(lemmas_raw)] lemmas = [x for x in lemmas if x] sentence_words.append(lemmas) phrase_candidates = _extract_phrase_candidates(raw_text, lang) # Фильтр мусора фраз: минимум 2 слова, без дублирующегося подряд слова. clean_phrases = [] for p in phrase_candidates: parts = [x.strip() for x in p.split() if x.strip()] if len(parts) < 2: continue bad_repeat = any(parts[i] == parts[i + 1] for i in range(len(parts) - 1)) if bad_repeat: continue clean_phrases.append(_canonicalize_term(" ".join(parts))) sentence_phrases.append(clean_phrases) if not lemmas: continue # Для каждого вхождения a считаем его локальный контекст в окне. # cooc(a,b) увеличивается максимум на 1 на одно вхождение a, # что гарантирует cooc(a,b) <= occ(a) и P(B|A) <= 100. for i, a in enumerate(lemmas): term_occ[a] += 1 left = max(0, i - max(1, window_size)) right = min(len(lemmas), i + max(1, window_size) + 1) neighbors = set() for j in range(left, right): if j == i: continue b = lemmas[j] if not b or a == b: continue neighbors.add(b) for b in neighbors: pair_cooc[(a, b)] += 1 for w in set(lemmas): term_sent_ids[w].add(sent_id) # Частоты/охват фраз по предложениям. for sent_id, phrases in enumerate(sentence_phrases): for phrase in phrases: phrase_occ[phrase] += 1 phrase_sent_ids[phrase].add(sent_id) # Оставляем только устойчивые фразы. allowed_phrases = { p for p, c in phrase_occ.items() if c >= max(1, min_phrase_freq) and len(phrase_sent_ids[p]) >= max(1, min_phrase_freq) } # Добавляем связи phrase <-> word на уровне предложения. for sent_id, words in enumerate(sentence_words): if not words: continue phrases = [p for p in sentence_phrases[sent_id] if p in allowed_phrases] if not phrases: continue uniq_words = set(words) uniq_phrases = set(phrases) for phrase in uniq_phrases: term_occ[phrase] += 1 term_sent_ids[phrase].add(sent_id) parts = set(phrase.split()) # Связываем фразу с ее компонентами всегда (ядро термина). for w in parts: pair_cooc[(phrase, w)] += 1 pair_cooc[(w, phrase)] += 1 # И с остальными словами предложения (контекст термина). for w in uniq_words: if w in parts: continue pair_cooc[(phrase, w)] += 1 pair_cooc[(w, phrase)] += 1 graph = nx.DiGraph() for term, freq in term_occ.items(): term_type = "phrase" if " " in term else "word" graph.add_node(term, frequency=int(freq), term_type=term_type) for (a, b), cooc in pair_cooc.items(): base = term_occ.get(a, 0) if base <= 0: continue weight = (cooc / base) * 100.0 weight = max(0.0, min(100.0, weight)) if weight > 0: graph.add_edge(a, b, weight=round(weight, 3)) if graph.number_of_nodes() == 0: return graph, {} try: pr = nx.pagerank(graph, weight="weight") except Exception: pr = {node: 1.0 for node in graph.nodes()} # Комбинируем centrality с termness-фактором. # Для слов: умеренный буст по контекстной связанности. # Для фраз: более сильный буст по частоте/охвату, чтобы устойчивые термины поднимались в топ. combined_scores = {} total_sent = max(1, len(sentence_words)) for node, score in pr.items(): out_deg = graph.out_degree(node) in_deg = graph.in_degree(node) connectivity_factor = 1.0 + 0.025 * (out_deg + in_deg) sent_df = len(term_sent_ids.get(node, set())) idf_factor = 1.0 + 0.35 * math.log((1.0 + total_sent) / (1.0 + max(1, sent_df))) if graph.nodes[node].get("term_type") == "phrase": freq = max(1, int(graph.nodes[node].get("frequency", 1))) sent_cover = len(phrase_sent_ids.get(node, set())) termness = (1.0 + 0.22 * math.log1p(freq)) * (1.0 + 0.12 * math.log1p(sent_cover)) contains_generic = any(part in GENERIC_TERMS for part in node.split()) generic_penalty = 0.85 if contains_generic else 1.0 combined_scores[node] = score * connectivity_factor * termness * idf_factor * generic_penalty else: generic_penalty = 0.58 if node in GENERIC_TERMS else 1.0 combined_scores[node] = score * connectivity_factor * idf_factor * generic_penalty node_weights = _normalize_to_1_100(combined_scores) for node, w in node_weights.items(): graph.nodes[node]["weight"] = int(w) return graph, node_weights def get_graph_data_for_frontend(graph: nx.DiGraph, top_edges_per_node: int = 8) -> Dict[str, List[Dict[str, Any]]]: nodes = [] links = [] for node, attrs in graph.nodes(data=True): nodes.append( { "id": node, "label": node, "weight": int(attrs.get("weight", 1)), "frequency": int(attrs.get("frequency", 0)), } ) for source in graph.nodes(): out_edges = sorted( graph.out_edges(source, data=True), key=lambda e: e[2].get("weight", 0), reverse=True, )[:top_edges_per_node] for s, t, attrs in out_edges: links.append( { "source": s, "target": t, "weight": float(attrs.get("weight", 0)), } ) return {"nodes": nodes, "links": links} def get_top_keywords(node_weights: Dict[str, int], top_n: int = 20) -> List[Dict[str, Any]]: items = sorted(node_weights.items(), key=lambda x: x[1], reverse=True)[:top_n] return [{"lemma": lemma, "weight": int(weight)} for lemma, weight in items]