Spaces:
Running
Running
| import spacy | |
| from collections import Counter | |
| from typing import List, Dict | |
| import numpy as np | |
| from rank_bm25 import BM25Okapi | |
| import re | |
| # Новые импорты для BERT | |
| import torch | |
| from sentence_transformers import SentenceTransformer, util | |
| # --- Глобальные переменные --- | |
| LoadedModels = {} # spaCy модели | |
| BertModel = None # BERT модель (одна на все языки) | |
| MODEL_NAMES = { | |
| "en": "en_core_web_sm", | |
| "ru": "ru_core_news_sm", | |
| "de": "de_core_news_sm", | |
| "es": "es_core_news_sm", | |
| "it": "it_core_news_sm", | |
| "pl": "pl_core_news_sm", | |
| "pt": "pt_core_news_sm", | |
| } | |
| # --- SPACY (Лингвистика) --- | |
| def load_model_if_missing(lang: str): | |
| if lang in LoadedModels: return | |
| model_name = MODEL_NAMES.get(lang) | |
| if not model_name: return | |
| print(f"⏳ Loading spaCy model for {lang}...") | |
| try: | |
| LoadedModels[lang] = spacy.load(model_name) | |
| print(f"✅ Loaded spaCy: {lang}") | |
| except Exception as e: | |
| print(f"❌ Failed to load spaCy {lang}: {e}") | |
| def load_models(): | |
| """ | |
| Функция для предзагрузки всех моделей при старте (вызывается из main.py). | |
| """ | |
| print("🚀 Pre-loading all spaCy models...") | |
| for lang in MODEL_NAMES.keys(): | |
| load_model_if_missing(lang) | |
| def get_doc(text: str, lang: str): | |
| load_model_if_missing(lang) | |
| nlp = LoadedModels.get(lang) | |
| if not nlp: | |
| load_model_if_missing("en") | |
| nlp = LoadedModels.get("en") | |
| if not nlp: raise RuntimeError("No NLP models loaded.") | |
| nlp.max_length = 2000000 | |
| return nlp(text.lower()) | |
| # --- НОВАЯ ФУНКЦИЯ ФИЛЬТРАЦИИ --- | |
| def is_valid_token(t): | |
| """ | |
| Проверяет, является ли токен полезным словом. | |
| Исправленная версия: не удаляет слова из букв, даже если AI пометил их как символы. | |
| """ | |
| # 1. Базовые проверки spaCy (Стоп-слова, пунктуация, пробелы) | |
| if t.is_stop or t.is_punct or t.is_space: | |
| return False | |
| # 2. Числа (удаляем "18", "2023", "5") | |
| if t.is_digit or t.like_num: | |
| return False | |
| # 3. СИМВОЛЫ (ИСПРАВЛЕНИЕ) | |
| # Если spaCy говорит, что это символ (SYM), мы верим, ТОЛЬКО если это не буквы. | |
| # Это спасет слова типа "cross", "apk", "bet", которые могут быть ложно помечены. | |
| if t.pos_ == "SYM" and not t.text.isalpha(): | |
| return False | |
| # 4. Дополнительная страховка (явный мусор) | |
| garbage_chars = {'|', '+', '-', '—', '–', '>', '<', '=', '/', '\\', '★', '▶', '●', '•', '€', '$', '£'} | |
| if t.text.strip() in garbage_chars: | |
| return False | |
| # 5. Длина: Удаляем одиночные буквы, которые не являются словами | |
| # (опционально, но помогает чистить мусор типа "v", "s" если они не стоп-слова) | |
| if len(t.text) == 1 and not t.text.isalpha(): | |
| return False | |
| return True | |
| def get_lemmas_flat(text: str, lang: str) -> List[str]: | |
| """ | |
| Возвращает плоский список лемм для всего текста (нужен для BM25). | |
| """ | |
| if not text: return [] | |
| doc = get_doc(text, lang) | |
| # Используем нашу новую функцию фильтрации | |
| return [t.lemma_ for t in doc if is_valid_token(t)] | |
| def generate_ngrams_safe(text: str, lang: str, n: int) -> List[str]: | |
| """ | |
| Генерирует n-граммы. | |
| ЛОГИКА: Smart Window (Та самая, которая работает идеально). | |
| 1. Считает N-граммой последовательность из N ЗНАЧИМЫХ слов. | |
| 2. Сохраняет стоп-слова ВНУТРИ фразы (для читаемости: "bevor sie mit echtem geld"). | |
| 3. Игнорирует стоп-слова ПО КРАЯМ. | |
| """ | |
| if not text: return [] | |
| # ЕДИНСТВЕННОЕ ИЗМЕНЕНИЕ: Заменяем дефисы на пробелы, чтобы разлепить "Casino-Websites" | |
| text = re.sub(r'[\-\–\—\/]', ' ', text) | |
| # Дальше - ТА САМАЯ логика, которая работала идеально | |
| clean_text = " ".join(text.split()) | |
| doc = get_doc(clean_text, lang) | |
| all_ngrams = [] | |
| garbage_chars = {'|', '+', '-', '—', '–', '>', '<', '=', '/', '\\', '★', '▶', '●', '•', '€', '$', '£', '™', '®', '«', '»', '"', '(', ')', '[', ']', '!', '?', '.', ',', ':', ';'} | |
| for sent in doc.sents: | |
| # Разбираем предложение на токены | |
| tokens = [] | |
| for t in sent: | |
| # Стена | |
| is_wall = ( | |
| t.is_punct or t.is_space or t.pos_ == "SYM" or | |
| t.is_digit or t.like_num or | |
| t.text.strip() in garbage_chars | |
| ) | |
| if is_wall: | |
| tokens.append(None) # Маркер разрыва | |
| else: | |
| tokens.append({ | |
| "text": t.text.lower(), | |
| "is_stop": t.is_stop | |
| }) | |
| # Ищем последовательности (Smart Window) | |
| for i in range(len(tokens)): | |
| # Начало фразы не может быть стоп-словом или стеной | |
| if tokens[i] is None or tokens[i]["is_stop"]: | |
| continue | |
| current_phrase = [] | |
| significant_count = 0 | |
| for j in range(i, len(tokens)): | |
| token = tokens[j] | |
| if token is None: # Стена - конец фразы | |
| break | |
| current_phrase.append(token["text"]) | |
| # Если не стоп-слово, увеличиваем счетчик. | |
| # Если стоп-слово - просто добавляем в current_phrase, но счетчик стоит. | |
| # Это сохраняет "sie mit" внутри фразы. | |
| if not token["is_stop"]: | |
| significant_count += 1 | |
| # Если набрали N значимых слов | |
| if significant_count == n: | |
| ngram_str = " ".join(current_phrase) | |
| all_ngrams.append(ngram_str) | |
| break | |
| return all_ngrams | |
| # --- WORD COUNT --- | |
| def count_words(text: str, lang: str) -> Dict[str, int]: | |
| """ | |
| Считает слова. ПОЛНОСТЬЮ ИЗОЛИРОВАННАЯ ЛОГИКА. | |
| Не зависит от N-грамм или BM25. | |
| """ | |
| if not text.strip(): | |
| return {"total": 0, "significant": 0} | |
| # 1. Получаем чистый объект spaCy (без всяких re.sub и вырезаний) | |
| # Просто убираем лишние пробелы/энтеры | |
| clean_text = " ".join(text.split()) | |
| doc = get_doc(clean_text, lang) | |
| total_count = 0 | |
| significant_count = 0 | |
| for t in doc: | |
| # --- ФИЛЬТР ДЛЯ ОБЩЕГО КОЛИЧЕСТВА --- | |
| # Считаем всё, кроме пунктуации и пробелов | |
| if t.is_punct or t.is_space: | |
| continue | |
| total_count += 1 | |
| # --- ФИЛЬТР ДЛЯ ЗНАЧИМЫХ СЛОВ --- | |
| # Если это стоп-слово (der, die, das...) - не считаем в значимые | |
| if t.is_stop: | |
| continue | |
| # Если это цифра - не считаем | |
| if t.is_digit or t.like_num: | |
| continue | |
| # Если это спецсимвол - не считаем | |
| if t.pos_ == "SYM": | |
| continue | |
| significant_count += 1 | |
| return {"total": total_count, "significant": significant_count} | |
| # --- ANALYTICS (N-grams & BM25) --- | |
| def calculate_ngram_stats(target_text: str, competitor_texts: List[str], lang: str) -> Dict: | |
| stats = {} | |
| valid_competitors = [t for t in competitor_texts if t.strip()] | |
| for n in range(1, 5): | |
| key = {1: "unigrams", 2: "bigrams", 3: "trigrams", 4: "quadgrams"}[n] | |
| target_ngrams = generate_ngrams_safe(target_text, lang, n) | |
| target_counts = Counter(target_ngrams) | |
| comp_counters = [] | |
| all_comp_ngrams_combined = [] | |
| for t in valid_competitors: | |
| c_ngrams = generate_ngrams_safe(t, lang, n) | |
| comp_counters.append(Counter(c_ngrams)) | |
| all_comp_ngrams_combined.extend(c_ngrams) | |
| comp_counts_total = Counter(all_comp_ngrams_combined) | |
| all_unique = set(target_counts.keys()) | set(comp_counts_total.keys()) | |
| ngram_data = [] | |
| num_competitors = max(len(valid_competitors), 1) | |
| for ngram in all_unique: | |
| cnt_target = target_counts.get(ngram, 0) | |
| cnt_total_comp = comp_counts_total.get(ngram, 0) | |
| avg_comp = round(cnt_total_comp / num_competitors, 1) | |
| detailed_comp_counts = [c.get(ngram, 0) for c in comp_counters] | |
| # Считаем, у скольких конкурентов встречается эта фраза (> 0) | |
| comp_occurrence = sum(1 for count in detailed_comp_counts if count > 0) | |
| if cnt_target > 0 or avg_comp >= 0.5 or any(c > 0 for c in detailed_comp_counts): | |
| ngram_data.append({ | |
| "ngram": ngram, | |
| "target_count": cnt_target, | |
| "competitor_avg": avg_comp, | |
| "competitor_detailed": detailed_comp_counts, | |
| "comp_occurrence": comp_occurrence | |
| }) | |
| ngram_data.sort(key=lambda x: max([x["target_count"]] + x["competitor_detailed"]), reverse=True) | |
| stats[key] = ngram_data[:200] | |
| return stats | |
| def parse_keywords(raw_phrases: List[str], lang: str): | |
| key_phrases = [] | |
| keywords = set() | |
| for phrase in raw_phrases: | |
| if not phrase.strip(): continue | |
| lemmas = get_lemmas_flat(phrase, lang) | |
| if lemmas: | |
| key_phrases.append(phrase.strip()) # Для BERT храним исходную фразу, а не леммы! | |
| for w in lemmas: keywords.add(w) | |
| return list(key_phrases), list(keywords) | |
| def calculate_bm25_recommendations(target_text: str, competitor_texts: List[str], raw_keywords: List[str], lang: str): | |
| """ | |
| BM25. Использует ТУ ЖЕ функцию генерации, чтобы видеть то же, что и N-граммы. | |
| """ | |
| if not target_text or not raw_keywords: return [] | |
| recommendations = [] | |
| analyzed_keys = [] | |
| seen_terms = set() | |
| for phrase in raw_keywords: | |
| if not phrase.strip(): continue | |
| # Генерируем ключи той же функцией | |
| for n in range(1, 4): | |
| ngrams = generate_ngrams_safe(phrase, lang, n) | |
| for term in ngrams: | |
| if term not in seen_terms: | |
| analyzed_keys.append({"n": n, "term": term, "original": phrase}) | |
| seen_terms.add(term) | |
| for n in range(1, 4): | |
| current_n_keys = [k['term'] for k in analyzed_keys if k['n'] == n] | |
| if not current_n_keys: continue | |
| target_ngrams = generate_ngrams_safe(target_text, lang, n) | |
| comp_ngrams_list = [] | |
| for t in competitor_texts: | |
| if t.strip(): | |
| comp_ngrams_list.append(generate_ngrams_safe(t, lang, n)) | |
| else: | |
| comp_ngrams_list.append([]) | |
| corpus = [target_ngrams] + comp_ngrams_list | |
| bm25 = BM25Okapi(corpus) | |
| for term in current_n_keys: | |
| scores = bm25.get_scores([term]) | |
| score_target = scores[0] | |
| score_avg_comp = np.mean(scores[1:]) if len(scores) > 1 else 0 | |
| if n == 1: threshold = 0.5 | |
| elif n == 2: threshold = 0.25 | |
| else: threshold = 0.15 | |
| action = "ok" | |
| count_rec = 0 | |
| if score_target < score_avg_comp - threshold: | |
| action = "add" | |
| factor = 0.5 if n == 1 else 0.4 | |
| count_rec = max(1, int((score_avg_comp - score_target) * factor)) | |
| elif score_target > score_avg_comp + threshold * 2: | |
| action = "remove" | |
| factor = 0.5 | |
| count_rec = max(1, int((score_target - score_avg_comp) * factor)) | |
| recommendations.append({ | |
| "word": term, | |
| "type": f"{n}-gram", | |
| "my_score": round(score_target, 2), | |
| "avg_comp_score": round(score_avg_comp, 2), | |
| "action": action, | |
| "count": count_rec | |
| }) | |
| recommendations.sort(key=lambda x: (0 if x["action"] != "ok" else 1, -len(x["word"].split()), x["word"])) | |
| return recommendations | |
| # --- BERT / VECTOR ANALYSIS --- | |
| def get_bert_model(): | |
| """Загружает BERT на GPU, если он доступен""" | |
| global BertModel | |
| if BertModel is None: | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| print(f"🚀 Loading BERT model on {device}...") | |
| # Используем легкую и мощную мультиязычную модель | |
| BertModel = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2', device=device) | |
| print("✅ BERT Loaded successfully.") | |
| return BertModel | |
| def perform_bert_analysis(target_text: str, competitor_texts: List[str], key_phrases: List[str], lang: str): | |
| if not key_phrases: | |
| return {"detailed": [], "global_scores": []} | |
| model = get_bert_model() | |
| # 1. Функция-помощник: Получить чанки и их эмбеддинги | |
| def process_text(text): | |
| if not text.strip(): return [], None | |
| # Для BERT используем сырой текст (с абзацами), не пропуская через get_doc | |
| # чтобы сохранить структуру текста и контекст предложений | |
| load_model_if_missing(lang) | |
| nlp = LoadedModels.get(lang) | |
| if not nlp: | |
| load_model_if_missing("en") | |
| nlp = LoadedModels.get("en") | |
| nlp.max_length = 2000000 | |
| doc = nlp(text) # Сырой текст без .lower()! | |
| # Разбиваем на предложения > 10 символов | |
| chunks = [sent.text.strip() for sent in doc.sents if len(sent.text.strip()) > 10] | |
| if not chunks: return [], None | |
| embeddings = model.encode(chunks, convert_to_tensor=True) | |
| return chunks, embeddings | |
| # 2. Обрабатываем Наш текст | |
| target_chunks, target_emb = process_text(target_text) | |
| # 3. Обрабатываем Конкурентов (сохраняем структуру) | |
| competitors_data = [] | |
| for idx, comp_text in enumerate(competitor_texts): | |
| chunks, emb = process_text(comp_text) | |
| competitors_data.append({ | |
| "id": idx + 1, | |
| "chunks": chunks, | |
| "embeddings": emb | |
| }) | |
| # Эмбеддинги ключей | |
| keys_emb = model.encode(key_phrases, convert_to_tensor=True) | |
| # --- РАСЧЕТ GLOBAL SCORE --- | |
| # Global Score - это средний Max Score по всем ключевым словам. | |
| # То есть, насколько хорошо текст покрывает ВСЕ ключи в среднем. | |
| global_scores = [] | |
| # Считаем для нас | |
| if target_emb is not None: | |
| # Матрица [Key x Chunk] | |
| sims = util.cos_sim(keys_emb, target_emb) | |
| # Берем макс. сходство для каждого ключа (values), потом среднее по всем ключам | |
| # torch.max возвращает (values, indices) | |
| max_scores_per_key, _ = torch.max(sims, dim=1) | |
| avg_relevance = torch.mean(max_scores_per_key).item() | |
| global_scores.append({"name": "Мой текст", "score": round(avg_relevance, 3), "is_me": True}) | |
| else: | |
| global_scores.append({"name": "Мой текст", "score": 0, "is_me": True}) | |
| # Считаем для конкурентов | |
| for comp in competitors_data: | |
| if comp["embeddings"] is not None: | |
| sims = util.cos_sim(keys_emb, comp["embeddings"]) | |
| max_scores_per_key, _ = torch.max(sims, dim=1) | |
| avg_relevance = torch.mean(max_scores_per_key).item() | |
| global_scores.append({"name": f"Конкурент #{comp['id']}", "score": round(avg_relevance, 3), "is_me": False}) | |
| else: | |
| global_scores.append({"name": f"Конкурент #{comp['id']}", "score": 0, "is_me": False}) | |
| # Сортируем глобальный рейтинг (победитель сверху) | |
| global_scores.sort(key=lambda x: x["score"], reverse=True) | |
| # --- ДЕТАЛЬНЫЙ АНАЛИЗ ПО ФРАЗАМ --- | |
| detailed_results = [] | |
| for i, phrase in enumerate(key_phrases): | |
| # 1. Анализ моего текста | |
| my_top = [] | |
| my_max = 0 | |
| if target_emb is not None: | |
| # Считаем снова локально или берем из матрицы (тут проще локально для чистоты кода) | |
| # scores_target[i] уже посчитано выше в sims, но выше переменная sims переписывалась. | |
| # Для надежности пересчитаем векторную близость для одной фразы (это мгновенно) | |
| phrase_emb = keys_emb[i] | |
| scores = util.cos_sim(phrase_emb, target_emb)[0] # вектор [chunks] | |
| k = min(5, len(target_chunks)) | |
| vals, idxs = torch.topk(scores, k) | |
| my_max = vals[0].item() if k > 0 else 0 | |
| for rank in range(k): | |
| my_top.append({ | |
| "text": target_chunks[idxs[rank].item()], | |
| "score": round(vals[rank].item(), 3) | |
| }) | |
| # 2. Анализ конкурентов (Сборная солянка) | |
| # Собираем все чанки всех конкурентов с их скорами и ID | |
| all_comp_candidates = [] | |
| for comp in competitors_data: | |
| if comp["embeddings"] is not None: | |
| phrase_emb = keys_emb[i] | |
| scores = util.cos_sim(phrase_emb, comp["embeddings"])[0] | |
| # Берем топ-3 от каждого конкурента, чтобы добавить в общий пул | |
| k = min(3, len(comp["chunks"])) | |
| vals, idxs = torch.topk(scores, k) | |
| for rank in range(k): | |
| all_comp_candidates.append({ | |
| "text": comp["chunks"][idxs[rank].item()], | |
| "score": vals[rank].item(), | |
| "source": f"Конкурент #{comp['id']}" # <-- АТРИБУЦИЯ | |
| }) | |
| # Сортируем общий пул конкурентов и берем ТОП-5 абсолютных лидеров | |
| all_comp_candidates.sort(key=lambda x: x["score"], reverse=True) | |
| comp_top_5 = all_comp_candidates[:5] | |
| # Округляем скоры для вывода | |
| for item in comp_top_5: | |
| item["score"] = round(item["score"], 3) | |
| comp_max = comp_top_5[0]["score"] if comp_top_5 else 0 | |
| # Статус | |
| status = "ok" | |
| rec = "Тема раскрыта хорошо." | |
| if my_max < 0.5: | |
| status = "bad" | |
| rec = "Тема не раскрыта." | |
| elif comp_max > my_max + 0.1: | |
| status = "warning" | |
| rec = "Конкуренты раскрыли тему заметно лучше." | |
| elif my_max >= 0.7: | |
| status = "good" | |
| rec = "Отлично." | |
| detailed_results.append({ | |
| "phrase": phrase, | |
| "my_max_score": round(my_max, 2), | |
| "comp_max_score": round(comp_max, 2), | |
| "status": status, | |
| "recommendation": rec, | |
| "my_top_chunks": my_top, | |
| "comp_top_chunks": comp_top_5 | |
| }) | |
| return { | |
| "global_scores": global_scores, | |
| "detailed": detailed_results | |
| } | |
| # --- TITLE ANALYZER --- | |
| def analyze_title(target_title: str, competitor_titles: List[str], raw_keywords: List[str], lang: str) -> Dict: | |
| """ | |
| Orchestrator: анализирует Title тег. | |
| Возвращает словарь с 4 блоками: | |
| - length: длина в символах и словах | |
| - ngrams: частотный анализ (uni/bi) Target vs конкуренты | |
| - keyword_coverage: покрытие ключевых фраз в Title | |
| - bert: семантическая близость Title к ключам | |
| """ | |
| if not target_title.strip(): | |
| return {} | |
| valid_comp_titles = [t for t in competitor_titles if t.strip()] | |
| result = { | |
| "target_title": target_title.strip(), | |
| "competitor_titles": [t.strip() for t in valid_comp_titles], | |
| "length": _title_length(target_title, valid_comp_titles, lang), | |
| "ngrams": _title_ngrams(target_title, valid_comp_titles, lang), | |
| "keyword_coverage": _title_keyword_coverage(target_title, valid_comp_titles, raw_keywords, lang), | |
| "bert": _title_bert(target_title, valid_comp_titles, raw_keywords, lang), | |
| } | |
| return result | |
| def _title_length(target: str, comp_titles: List[str], lang: str) -> Dict: | |
| target_chars = len(target.strip()) | |
| target_wc = count_words(target, lang) | |
| comp_data = [] | |
| for t in comp_titles: | |
| comp_data.append({ | |
| "chars": len(t.strip()), | |
| "words": count_words(t, lang) | |
| }) | |
| avg_chars = round(sum(c["chars"] for c in comp_data) / max(len(comp_data), 1)) | |
| avg_total = round(sum(c["words"]["total"] for c in comp_data) / max(len(comp_data), 1)) | |
| status = "ok" | |
| if target_chars > 60: | |
| status = "too_long" | |
| elif target_chars < 30: | |
| status = "too_short" | |
| return { | |
| "target_chars": target_chars, | |
| "target_words": target_wc, | |
| "comp_data": comp_data, | |
| "avg_chars": avg_chars, | |
| "avg_words_total": avg_total, | |
| "status": status | |
| } | |
| def _title_ngrams(target: str, comp_titles: List[str], lang: str) -> Dict: | |
| stats = {} | |
| for n in [1, 2]: | |
| key = "unigrams" if n == 1 else "bigrams" | |
| target_ngrams = generate_ngrams_safe(target, lang, n) | |
| target_counts = Counter(target_ngrams) | |
| comp_counters = [] | |
| all_comp_combined = [] | |
| for t in comp_titles: | |
| c_ng = generate_ngrams_safe(t, lang, n) | |
| comp_counters.append(Counter(c_ng)) | |
| all_comp_combined.extend(c_ng) | |
| comp_total = Counter(all_comp_combined) | |
| all_unique = set(target_counts.keys()) | set(comp_total.keys()) | |
| ngram_data = [] | |
| num_comp = max(len(comp_titles), 1) | |
| for ngram in all_unique: | |
| cnt_t = target_counts.get(ngram, 0) | |
| cnt_c = comp_total.get(ngram, 0) | |
| avg_c = round(cnt_c / num_comp, 1) | |
| detailed = [c.get(ngram, 0) for c in comp_counters] | |
| occurrence = sum(1 for x in detailed if x > 0) | |
| if cnt_t > 0 or avg_c >= 0.3 or any(x > 0 for x in detailed): | |
| ngram_data.append({ | |
| "ngram": ngram, | |
| "target_count": cnt_t, | |
| "competitor_avg": avg_c, | |
| "competitor_detailed": detailed, | |
| "comp_occurrence": occurrence | |
| }) | |
| ngram_data.sort(key=lambda x: x["comp_occurrence"], reverse=True) | |
| stats[key] = ngram_data | |
| return stats | |
| def _title_keyword_coverage(target: str, comp_titles: List[str], raw_keywords: List[str], lang: str) -> List[Dict]: | |
| if not raw_keywords: | |
| return [] | |
| target_lower = target.lower() | |
| target_unigrams = set(generate_ngrams_safe(target, lang, 1)) | |
| target_bigrams = set(generate_ngrams_safe(target, lang, 2)) | |
| results = [] | |
| for phrase in raw_keywords: | |
| if not phrase.strip(): | |
| continue | |
| phrase_lower = phrase.strip().lower() | |
| exact_match = phrase_lower in target_lower | |
| phrase_uni = set(generate_ngrams_safe(phrase, lang, 1)) | |
| phrase_bi = set(generate_ngrams_safe(phrase, lang, 2)) | |
| uni_found = phrase_uni & target_unigrams | |
| bi_found = phrase_bi & target_bigrams | |
| if phrase_uni: | |
| word_coverage = round(len(uni_found) / len(phrase_uni) * 100) | |
| else: | |
| word_coverage = 0 | |
| comp_presence = 0 | |
| for ct in comp_titles: | |
| if phrase_lower in ct.lower(): | |
| comp_presence += 1 | |
| results.append({ | |
| "phrase": phrase.strip(), | |
| "exact_match": exact_match, | |
| "word_coverage": word_coverage, | |
| "words_found": sorted(list(uni_found)), | |
| "words_missing": sorted(list(phrase_uni - uni_found)), | |
| "bigrams_found": sorted(list(bi_found)), | |
| "comp_presence": comp_presence, | |
| "comp_total": len(comp_titles) | |
| }) | |
| return results | |
| def _title_bert(target: str, comp_titles: List[str], raw_keywords: List[str], lang: str) -> Dict: | |
| if not raw_keywords: | |
| return {"target_score": 0, "comp_scores": [], "per_keyword": []} | |
| model = get_bert_model() | |
| valid_keywords = [p.strip() for p in raw_keywords if p.strip()] | |
| if not valid_keywords: | |
| return {"target_score": 0, "comp_scores": [], "per_keyword": []} | |
| keys_emb = model.encode(valid_keywords, convert_to_tensor=True) | |
| target_emb = model.encode([target.strip()], convert_to_tensor=True) | |
| sims_target = util.cos_sim(keys_emb, target_emb).squeeze(1) | |
| target_per_kw = [round(s.item(), 3) for s in sims_target] | |
| target_avg = round(torch.mean(sims_target).item(), 3) | |
| comp_scores = [] | |
| for ct in comp_titles: | |
| ct_emb = model.encode([ct.strip()], convert_to_tensor=True) | |
| sims_c = util.cos_sim(keys_emb, ct_emb).squeeze(1) | |
| comp_scores.append(round(torch.mean(sims_c).item(), 3)) | |
| per_keyword = [] | |
| for i, kw in enumerate(valid_keywords): | |
| per_keyword.append({ | |
| "keyword": kw, | |
| "target_score": target_per_kw[i], | |
| "comp_scores": [round(util.cos_sim(keys_emb[i:i+1], model.encode([ct.strip()], convert_to_tensor=True))[0][0].item(), 3) for ct in comp_titles] | |
| }) | |
| return { | |
| "target_score": target_avg, | |
| "comp_scores": comp_scores, | |
| "per_keyword": per_keyword | |
| } |