Spaces:
Sleeping
Sleeping
| from collections import Counter, defaultdict | |
| from typing import Dict, List, Any, Tuple | |
| import math | |
| import networkx as nx | |
| import logic | |
| def _normalize_to_1_100(values: Dict[str, float]) -> Dict[str, int]: | |
| if not values: | |
| return {} | |
| v_min = min(values.values()) | |
| v_max = max(values.values()) | |
| if v_max == v_min: | |
| return {k: 50 for k in values.keys()} | |
| return { | |
| k: int(round(1 + ((v - v_min) / (v_max - v_min)) * 99)) | |
| for k, v in values.items() | |
| } | |
| def _extract_significant_lemmas(sent: Dict[str, Any]) -> List[str]: | |
| lemmas: List[str] = [] | |
| for tok in sent.get("tokens", []): | |
| if tok.get("is_significant") and tok.get("lemma"): | |
| lemmas.append(str(tok["lemma"])) | |
| return lemmas | |
# Substrings that mark boilerplate/legal/CTA text; matched (case-insensitively,
# after whitespace normalization) against sentence text in _is_noise_sentence.
NOISE_PATTERNS = (
    "contact us",
    "responsible gaming",
    "play responsibly",
    "copyright",
    "terms of use",
    "new customers only",
    "t&c apply",
    "18+",
    "info@",
)
# Overly generic gambling-domain lemmas (Russian + English). Nodes matching
# (or, for phrases, containing) these terms get their score penalized when
# ranking graph nodes in build_semantic_graph.
GENERIC_TERMS = {
    "игра", "играть", "казино", "ставка", "деньга", "деньги",
    "демо", "режим", "уровень", "шаг", "выигрыш", "риск",
    "game", "play", "casino", "bet", "demo", "mode", "level", "risk", "win",
}
def _is_noise_sentence(text: str) -> bool:
    """Return True for boilerplate sentences that must not enter the graph."""
    normalized = " ".join((text or "").lower().split())
    if not normalized:
        return True
    for pattern in NOISE_PATTERNS:
        if pattern in normalized:
            return True
    # Frequent CTA / service one-liners.
    short_cta = {"играть", "play", "chicken road", "chicken road game"}
    return len(normalized.split()) <= 3 and normalized in short_cta
| def _canonicalize_term(term: str) -> str: | |
| t = " ".join((term or "").lower().replace("ё", "е").replace("-", " ").split()) | |
| if not t: | |
| return t | |
| # Бренд/наименование игры. | |
| if t in {"чикен роад", "chicken road", "chickenroad", "chiken road"}: | |
| return "chicken road" | |
| if t in {"игры inout", "inout games", "inout game", "inout"}: | |
| return "inout games" | |
| # Полезная нормализация русской фразы под один термин. | |
| if t in {"реальные деньги", "реальный деньги"}: | |
| return "реальные деньги" | |
| return t | |
def _extract_phrase_candidates(sentence_text: str, lang: str) -> List[str]:
    """Pull bi- and tri-gram phrase candidates for one sentence.

    Delegates to the project's existing n-gram logic so that natural
    collocations with stop words inside them are preserved.
    """
    collected: List[str] = []
    for size in (2, 3):
        collected += logic.generate_ngrams_safe(sentence_text, lang, size)
    return collected
| def _normalize_lemma_sequence(lemmas: List[str]) -> List[str]: | |
| """ | |
| Убирает подряд идущие дубликаты, чтобы не рождать шумные | |
| термины вроде "дорожка дорожка" из технических повторов. | |
| """ | |
| if not lemmas: | |
| return [] | |
| normalized = [lemmas[0]] | |
| for item in lemmas[1:]: | |
| if item != normalized[-1]: | |
| normalized.append(item) | |
| return normalized | |
def build_semantic_graph(
    sentences_data: List[Dict[str, Any]],
    window_size: int = 5,
    min_phrase_freq: int = 2,
    lang: str = "ru",
) -> Tuple[nx.DiGraph, Dict[str, int]]:
    """
    Build a directed graph of lemmas and compute weights:
    - edge_weight: conditional probability P(B|A) * 100
    - node_weight: PageRank normalized to 1..100

    Version v5:
    - links are counted in a local sliding window (not "all with all")
    - the conditional probability is strictly cooc(a,b)/occ(a), hence <= 100
    - graph nodes: words + stable collocations (bi/tri-grams)
    - collocations are extracted via Smart Window (as in the main n-gram logic)
    - stable phrases get a boosted contribution to the final concept weight
    """
    term_occ = Counter()
    pair_cooc = defaultdict(int)
    phrase_occ = Counter()
    phrase_sent_ids = defaultdict(set)
    term_sent_ids = defaultdict(set)
    sentence_words: List[List[str]] = []
    sentence_phrases: List[List[str]] = []
    for sent_id, sent in enumerate(sentences_data):
        raw_text = sent.get("raw_text", "")
        if _is_noise_sentence(raw_text):
            # Append empty entries to keep both lists index-aligned with sent_id.
            sentence_words.append([])
            sentence_phrases.append([])
            continue
        lemmas_raw = _extract_significant_lemmas(sent)
        lemmas = [_canonicalize_term(x) for x in _normalize_lemma_sequence(lemmas_raw)]
        lemmas = [x for x in lemmas if x]
        sentence_words.append(lemmas)
        phrase_candidates = _extract_phrase_candidates(raw_text, lang)
        # Phrase garbage filter: at least 2 words, no immediately repeated word.
        clean_phrases = []
        for p in phrase_candidates:
            parts = [x.strip() for x in p.split() if x.strip()]
            if len(parts) < 2:
                continue
            bad_repeat = any(parts[i] == parts[i + 1] for i in range(len(parts) - 1))
            if bad_repeat:
                continue
            clean_phrases.append(_canonicalize_term(" ".join(parts)))
        sentence_phrases.append(clean_phrases)
        if not lemmas:
            continue
        # For each occurrence of a, count its local context within the window.
        # cooc(a,b) grows by at most 1 per occurrence of a, which guarantees
        # cooc(a,b) <= occ(a) and therefore P(B|A) <= 100.
        for i, a in enumerate(lemmas):
            term_occ[a] += 1
            left = max(0, i - max(1, window_size))
            right = min(len(lemmas), i + max(1, window_size) + 1)
            neighbors = set()
            for j in range(left, right):
                if j == i:
                    continue
                b = lemmas[j]
                if not b or a == b:
                    continue
                neighbors.add(b)
            for b in neighbors:
                pair_cooc[(a, b)] += 1
        for w in set(lemmas):
            term_sent_ids[w].add(sent_id)
    # Phrase frequencies / sentence coverage.
    for sent_id, phrases in enumerate(sentence_phrases):
        for phrase in phrases:
            phrase_occ[phrase] += 1
            phrase_sent_ids[phrase].add(sent_id)
    # Keep only stable phrases.
    allowed_phrases = {
        p for p, c in phrase_occ.items()
        if c >= max(1, min_phrase_freq) and len(phrase_sent_ids[p]) >= max(1, min_phrase_freq)
    }
    # Add phrase <-> word links at the sentence level.
    for sent_id, words in enumerate(sentence_words):
        if not words:
            continue
        phrases = [p for p in sentence_phrases[sent_id] if p in allowed_phrases]
        if not phrases:
            continue
        uniq_words = set(words)
        uniq_phrases = set(phrases)
        for phrase in uniq_phrases:
            term_occ[phrase] += 1
            term_sent_ids[phrase].add(sent_id)
            parts = set(phrase.split())
            # Always link the phrase with its component words (term core).
            for w in parts:
                pair_cooc[(phrase, w)] += 1
                pair_cooc[(w, phrase)] += 1
            # And with the remaining words of the sentence (term context).
            for w in uniq_words:
                if w in parts:
                    continue
                pair_cooc[(phrase, w)] += 1
                pair_cooc[(w, phrase)] += 1
    graph = nx.DiGraph()
    for term, freq in term_occ.items():
        term_type = "phrase" if " " in term else "word"
        graph.add_node(term, frequency=int(freq), term_type=term_type)
    for (a, b), cooc in pair_cooc.items():
        base = term_occ.get(a, 0)
        if base <= 0:
            continue
        # P(B|A) * 100, clamped defensively to [0, 100].
        weight = (cooc / base) * 100.0
        weight = max(0.0, min(100.0, weight))
        if weight > 0:
            graph.add_edge(a, b, weight=round(weight, 3))
    if graph.number_of_nodes() == 0:
        return graph, {}
    try:
        pr = nx.pagerank(graph, weight="weight")
    except Exception:
        # Fallback (e.g. non-convergence): uniform centrality for all nodes.
        pr = {node: 1.0 for node in graph.nodes()}
    # Combine centrality with a termness factor.
    # Words: a moderate boost for contextual connectivity.
    # Phrases: a stronger boost for frequency/coverage so stable terms rise to the top.
    combined_scores = {}
    total_sent = max(1, len(sentence_words))
    for node, score in pr.items():
        out_deg = graph.out_degree(node)
        in_deg = graph.in_degree(node)
        connectivity_factor = 1.0 + 0.025 * (out_deg + in_deg)
        sent_df = len(term_sent_ids.get(node, set()))
        idf_factor = 1.0 + 0.35 * math.log((1.0 + total_sent) / (1.0 + max(1, sent_df)))
        if graph.nodes[node].get("term_type") == "phrase":
            freq = max(1, int(graph.nodes[node].get("frequency", 1)))
            sent_cover = len(phrase_sent_ids.get(node, set()))
            termness = (1.0 + 0.22 * math.log1p(freq)) * (1.0 + 0.12 * math.log1p(sent_cover))
            contains_generic = any(part in GENERIC_TERMS for part in node.split())
            generic_penalty = 0.85 if contains_generic else 1.0
            combined_scores[node] = score * connectivity_factor * termness * idf_factor * generic_penalty
        else:
            generic_penalty = 0.58 if node in GENERIC_TERMS else 1.0
            combined_scores[node] = score * connectivity_factor * idf_factor * generic_penalty
    node_weights = _normalize_to_1_100(combined_scores)
    for node, w in node_weights.items():
        graph.nodes[node]["weight"] = int(w)
    return graph, node_weights
def get_graph_data_for_frontend(graph: nx.DiGraph, top_edges_per_node: int = 8) -> Dict[str, List[Dict[str, Any]]]:
    """Serialize the graph as ``{"nodes": [...], "links": [...]}`` for the UI.

    Only the ``top_edges_per_node`` heaviest outgoing edges of each node are
    exported, which keeps the frontend payload compact.
    """
    nodes = [
        {
            "id": name,
            "label": name,
            "weight": int(data.get("weight", 1)),
            "frequency": int(data.get("frequency", 0)),
        }
        for name, data in graph.nodes(data=True)
    ]
    links: List[Dict[str, Any]] = []
    for node in graph.nodes():
        ranked = sorted(
            graph.out_edges(node, data=True),
            key=lambda edge: edge[2].get("weight", 0),
            reverse=True,
        )
        for src, dst, data in ranked[:top_edges_per_node]:
            links.append(
                {
                    "source": src,
                    "target": dst,
                    "weight": float(data.get("weight", 0)),
                }
            )
    return {"nodes": nodes, "links": links}
def get_top_keywords(node_weights: Dict[str, int], top_n: int = 20) -> List[Dict[str, Any]]:
    """Return the ``top_n`` heaviest terms as ``[{"lemma": ..., "weight": ...}]``."""
    ranked = sorted(node_weights.items(), key=lambda kv: kv[1], reverse=True)
    return [{"lemma": term, "weight": int(score)} for term, score in ranked[:top_n]]