# server/keyword_engine.py — Moharek GEO Platform
# Last deploy commit: a74b879 ("Deploy Moharek GEO Platform")
import re
from pathlib import Path
from html import unescape
# Optional sibling modules: when they cannot be imported (e.g. running this
# file in isolation), install inert stubs so the engine still works offline.
try:
    from .dataforseo_client import enrich_keywords
except ImportError:
    def enrich_keywords(kws, **kwargs):
        """Stub: echo keywords back with empty volume/CPC metrics."""
        return [{'kw': k, 'volume': None, 'cpc': None} for k in kws]

try:
    from .keyword_analytics import analyze_keywords, format_analytics_report
except ImportError:
    def analyze_keywords(kws, **kwargs):
        """Stub: minimal report shape with no computed statistics."""
        return {'summary': {}, 'top_keywords': kws}

    def format_analytics_report(analytics):
        """Stub: plain string rendering of the analytics object."""
        return str(analytics)
# Optional spaCy pipeline: prefer the small English model, fall back to the
# multilingual entity model, otherwise leave `_nlp` as None so callers use
# the regex extraction path instead.
_nlp = None
try:
    import spacy
    for _model_name in ('en_core_web_sm', 'xx_ent_wiki_sm'):
        try:
            _nlp = spacy.load(_model_name)
            break
        except Exception:
            continue
except Exception:
    # spaCy itself is unavailable; NLP extraction is disabled.
    _nlp = None
# Words ignored during keyword extraction. Three groups are mixed in one set:
#   * generic English stopwords plus web-chrome vocabulary (menu, cart, ...),
#   * legal/admin vocabulary (refund, privacy, shipping, ...) that pollutes
#     extraction even on content pages,
#   * Arabic equivalents of both groups.
# Membership tests elsewhere in this module are done on lowercased tokens.
_STOPWORDS = set([
    # English generic
    'the','and','for','with','from','this','that','are','you','your','www','http','https','com','org','net',
    'page','pages','about','more','shop','home','contact','search','menu','cart','login','sign','account',
    'view','add','buy','price','click','here','read','more','next','back','new','all','get','use','our',
    'will','can','has','have','been','was','were','not','but','also','its','their','they','them','then',
    # Legal/admin pages - these pollute keyword extraction
    'refund','return','returns','policy','policies','shipping','delivery','checkout','payment','order',
    'orders','invoice','receipt','terms','conditions','privacy','cookie','cookies','gdpr','legal',
    'copyright','rights','reserved','disclaimer','warranty','guarantee','exchange','cancel','cancellation',
    'subscribe','unsubscribe','newsletter','email','address','phone','fax','zip','postal','city','country',
    'days','hours','minutes','business','working','processing','handling','tracking','number','code',
    'please','note','important','information','details','contact','support','help','faq','questions',
    # Numbers and codes that appear in admin pages
    '14','30','60','90','100','200','404','500',
    # Arabic generic stopwords
    'ال','في','من','إلى','على','هذا','هذه','كان','كانت','لكن','أو','ولكن','حيث','عند','بعد','قبل',
    'مع','بين','حتى','لذلك','لهذا','كما','أيضا','فقط','جدا','جميع','كل','بعض','أي','لا','لم','لن','لما',
    # Arabic admin/legal terms
    'سياسة','السياسة','الشحن','التوصيل','الإرجاع','الاسترداد','الدفع','الطلب','الطلبات',
    'الخصوصية','الشروط','الأحكام','التواصل','الدعم','المساعدة','الحساب','تسجيل','دخول',
])
# URL fragments that mark admin/legal pages (policies, checkout, auth,
# feeds, WordPress internals) to be excluded from keyword extraction.
# Every entry carries a leading slash so matching stays precise and does
# not catch content pages that merely mention a keyword.
_ADMIN_URL_PATTERNS = [
    '/privacy-policy', '/refund-policy', '/return-policy', '/shipping-policy',
    '/terms-of-service', '/terms-and-conditions', '/cookie-policy',
    '/checkout', '/cart', '/login', '/register', '/account', '/my-account',
    '/sitemap.xml', '/robots.txt', '/feed', '/rss',
    '/wp-admin', '/wp-json', '/xmlrpc.php',
    '/disclaimer', '/legal',
]


def _is_admin_page(url: str) -> bool:
    """Return True when *url* points at an admin/legal page.

    Matching is case-insensitive substring search, but because every
    pattern starts with '/', pages such as 'privacy-matters.html' are
    not excluded by accident.
    """
    lowered = url.lower()
    for pattern in _ADMIN_URL_PATTERNS:
        if pattern in lowered:
            return True
    return False
def _clean_html(text):
    """Strip HTML markup from *text* and decode HTML entities.

    `<script>` and `<style>` elements are removed together with their
    contents; every other tag is replaced by a single space so adjacent
    words do not fuse. Entities are decoded last.
    """
    flags = re.DOTALL | re.IGNORECASE
    text = re.sub(r'<script[^>]*>.*?</script>', ' ', text, flags=flags)
    text = re.sub(r'<style[^>]*>.*?</style>', ' ', text, flags=flags)
    text = re.sub(r'<[^>]+>', ' ', text)
    return unescape(text)
def _merge_enrichment(records, enriched_map):
    """Update keyword dicts in *records* in place with volume/CPC data."""
    for rec in records:
        if rec['kw'] in enriched_map:
            rec.update(enriched_map[rec['kw']])


def _enrich_analytics_report(report):
    """Enrich every keyword list inside an analytics report, in place.

    Only the top 50 keywords are sent to the enrichment backend to bound
    API cost. Defensive `.get` lookups keep this working even with the
    minimal report shape produced by the stub `analyze_keywords` fallback
    (which lacks 'all_keywords'/'classification'/'clusters').
    """
    all_kws = report.get('all_keywords') or []
    if not all_kws:
        return
    enriched = enrich_keywords([rec['kw'] for rec in all_kws[:50]])
    enriched_map = {e['kw']: e for e in enriched}
    _merge_enrichment(all_kws, enriched_map)
    _merge_enrichment(report.get('top_keywords') or [], enriched_map)
    classification = report.get('classification') or {}
    for category in ('primary', 'secondary', 'long_tail'):
        _merge_enrichment(classification.get(category) or [], enriched_map)
    for cluster_kws in (report.get('clusters') or {}).values():
        _merge_enrichment(cluster_kws, enriched_map)


def _enrich_flat(results):
    """Enrich a flat keyword list in place with volume/CPC data."""
    enriched = enrich_keywords([rec['kw'] for rec in results])
    _merge_enrichment(results, {e['kw']: e for e in enriched})


def _rank_counts(counts):
    """Convert a {keyword: count} map into a count-descending list of dicts."""
    ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return [{'kw': kw, 'count': n} for kw, n in ordered]


def _collect_page_texts(pages):
    """Build one weighted text blob per non-admin page.

    The title is emitted twice so it weighs more in frequency counts;
    headings, paragraph text and any raw HTML body (tag-stripped) follow.
    """
    texts = []
    for page in pages:
        # Skip admin/legal pages - they pollute keyword extraction.
        if _is_admin_page(page.get('url', '')):
            continue
        title = page.get('title', '')
        headings = ' '.join(
            h.get('text', '') if isinstance(h, dict) else str(h)
            for h in page.get('headings', [])
        )
        paras = page.get('paragraphs', [])
        if isinstance(paras, list):
            paras = ' '.join(
                par.get('text', '') if isinstance(par, dict) else str(par)
                for par in paras
            )
        else:
            paras = str(paras)
        cleaned = _clean_html(page.get('text') or page.get('content') or page.get('html') or '')
        texts.append(f"{title} {title} {headings} {paras} {cleaned}")
    return texts


def extract_keywords_from_audit(audit_obj, top_n=20, enrich=False, analytics=False, expected_keywords=None):
    """Extract candidate keywords from an audit object.

    Uses spaCy noun-chunk/entity extraction when a model is available,
    otherwise falls back to regex tokenization (Arabic-aware) with
    frequency counts.

    Args:
        audit_obj: Audit dictionary with a 'pages' list.
        top_n: Number of keywords to return in simple mode.
        enrich: Whether to enrich with DataForSEO volume/CPC data
            (DISABLED - use SerpAPI instead).
        analytics: Whether to return a full analytics report instead of
            a plain keyword list.
        expected_keywords: List of expected keywords for coverage analysis.

    Returns:
        Simple mode: list of dicts {'kw', 'count'[, 'volume', 'cpc']}
        ordered by count descending ([] when there is no usable text).
        Analytics mode: the report dict produced by analyze_keywords().
    """
    pages = audit_obj.get('pages', []) if isinstance(audit_obj, dict) else []
    combined = '\n'.join(_collect_page_texts(pages))
    if not combined.strip():
        return []

    # spaCy path: count noun chunks plus named entities.
    if _nlp is not None:
        try:
            doc = _nlp(combined[:100000])  # cap input to avoid memory issues
            counts = {}
            for chunk in doc.noun_chunks:
                txt = ' '.join(t.text for t in chunk if not t.is_stop).strip().lower()
                if len(txt) >= 3 and txt not in _STOPWORDS:
                    counts[txt] = counts.get(txt, 0) + 1
            for ent in doc.ents:
                txt = ent.text.strip().lower()
                if len(txt) >= 3 and txt not in _STOPWORDS:
                    counts[txt] = counts.get(txt, 0) + 1
            results = _rank_counts(counts)
            if analytics:
                report = analyze_keywords(
                    results,
                    total_words=len(combined.split()),
                    expected_keywords=expected_keywords,
                )
                if enrich:
                    _enrich_analytics_report(report)
                return report
            results = results[:top_n]
            if enrich and results:
                _enrich_flat(results)
            return results
        except Exception:
            # Best effort: any spaCy failure falls through to the regex path.
            pass

    # Regex fallback (handles Arabic letters via the \u0600-\u06FF range).
    lowered = combined.lower()
    # Single words of 3+ chars; the regex already enforces the length floor.
    words = re.findall(r"[\w\u0600-\u06FF]{3,}", lowered)
    # 2-3 word phrases.
    phrases = re.findall(r"\b([\w\u0600-\u06FF]+(?:\s+[\w\u0600-\u06FF]+){1,2})\b", lowered)
    counts = {}
    for word in words:
        if word in _STOPWORDS:
            continue
        counts[word] = counts.get(word, 0) + 1
    for phrase in phrases:
        # BUGFIX: compare whole words against the stopword list. The previous
        # substring test discarded valid phrases whenever any stopword
        # occurred *inside* a word (e.g. 'the' inside 'theme park').
        if len(phrase) > 5 and not any(w in _STOPWORDS for w in phrase.split()):
            counts[phrase] = counts.get(phrase, 0) + 2  # boost multi-word phrases
    results = _rank_counts(counts)[:top_n * 2]  # keep extra entries for analytics

    if analytics:
        report = analyze_keywords(
            results,
            total_words=len(words),  # word count drives density metrics
            expected_keywords=expected_keywords,
        )
        if enrich:
            _enrich_analytics_report(report)
        return report

    # Standard flow (no analytics).
    results = results[:top_n]
    if enrich and results:
        _enrich_flat(results)
    return results