File size: 11,156 Bytes
a74b879 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | import re
from pathlib import Path
from html import unescape
# Optional dependency: use the DataForSEO client when the package-relative
# import resolves, otherwise install a stub with the same call signature.
try:
    from .dataforseo_client import enrich_keywords
except ImportError:
    def enrich_keywords(kws, **kwargs):
        """Stub enrichment used when ``dataforseo_client`` cannot be imported.

        Returns one record per keyword in ``kws`` with ``volume`` and ``cpc``
        set to ``None``. Extra keyword arguments are accepted and ignored.
        """
        placeholder_rows = []
        for keyword in kws:
            placeholder_rows.append({'kw': keyword, 'volume': None, 'cpc': None})
        return placeholder_rows
# Optional dependency: use the real analytics module when the package-relative
# import resolves, otherwise install stubs with the same call signatures.
try:
    from .keyword_analytics import analyze_keywords, format_analytics_report
except ImportError:
    def analyze_keywords(kws, **kwargs):
        """Stub analytics used when ``keyword_analytics`` cannot be imported.

        The keyword extractor in this module reads the ``all_keywords``,
        ``top_keywords``, ``classification`` (``primary`` / ``secondary`` /
        ``long_tail``) and ``clusters`` entries of the report, so the stub
        provides all of them; a report carrying only ``summary`` and
        ``top_keywords`` makes that code raise ``KeyError``.

        Args:
            kws: List of keyword records (dicts with at least ``kw``).
            **kwargs: Accepted for signature compatibility and ignored.

        Returns:
            A report dict with an empty summary, the input keywords as both
            ``top_keywords`` and ``all_keywords``, and empty classification
            buckets and clusters.
        """
        return {
            'summary': {},
            'top_keywords': kws,
            # A separate list holding the same record dicts, so enrichment
            # applied through either key is visible through both.
            'all_keywords': list(kws or []),
            'classification': {'primary': [], 'secondary': [], 'long_tail': []},
            'clusters': {},
        }

    def format_analytics_report(analytics):
        """Stub formatter: return the plain ``str()`` of the report."""
        return str(analytics)
try:
import spacy
_nlp = None
try:
_nlp = spacy.load('en_core_web_sm')
except Exception:
try:
_nlp = spacy.load('xx_ent_wiki_sm')
except Exception:
_nlp = None
except Exception:
_nlp = None
_STOPWORDS = set([
# English generic
'the','and','for','with','from','this','that','are','you','your','www','http','https','com','org','net',
'page','pages','about','more','shop','home','contact','search','menu','cart','login','sign','account',
'view','add','buy','price','click','here','read','more','next','back','new','all','get','use','our',
'will','can','has','have','been','was','were','not','but','also','its','their','they','them','then',
# Legal/admin pages - these pollute keyword extraction
'refund','return','returns','policy','policies','shipping','delivery','checkout','payment','order',
'orders','invoice','receipt','terms','conditions','privacy','cookie','cookies','gdpr','legal',
'copyright','rights','reserved','disclaimer','warranty','guarantee','exchange','cancel','cancellation',
'subscribe','unsubscribe','newsletter','email','address','phone','fax','zip','postal','city','country',
'days','hours','minutes','business','working','processing','handling','tracking','number','code',
'please','note','important','information','details','contact','support','help','faq','questions',
# Numbers and codes that appear in admin pages
'14','30','60','90','100','200','404','500',
# Arabic generic stopwords
'ال','في','من','إلى','على','هذا','هذه','كان','كانت','لكن','أو','ولكن','حيث','عند','بعد','قبل',
'مع','بين','حتى','لذلك','لهذا','كما','أيضا','فقط','جدا','جميع','كل','بعض','أي','لا','لم','لن','لما',
# Arabic admin/legal terms
'سياسة','السياسة','الشحن','التوصيل','الإرجاع','الاسترداد','الدفع','الطلب','الطلبات',
'الخصوصية','الشروط','الأحكام','التواصل','الدعم','المساعدة','الحساب','تسجيل','دخول',
])
# Pages that should be excluded from keyword extraction (admin/legal pages)
# More precise patterns to avoid false positives
_ADMIN_URL_PATTERNS = [
'/privacy-policy', '/refund-policy', '/return-policy', '/shipping-policy',
'/terms-of-service', '/terms-and-conditions', '/cookie-policy',
'/checkout', '/cart', '/login', '/register', '/account', '/my-account',
'/sitemap.xml', '/robots.txt', '/feed', '/rss',
'/wp-admin', '/wp-json', '/xmlrpc.php',
'/disclaimer', '/legal',
]
def _is_admin_page(url: str) -> bool:
"""Check if a URL is an admin/legal page that should be excluded from keyword extraction.
Uses more precise matching to avoid false positives.
"""
url_lower = url.lower()
# Only exclude if URL contains EXACT admin patterns (with slashes)
# This prevents excluding pages like "privacy" in "privacy-matters.html"
return any(pattern in url_lower for pattern in _ADMIN_URL_PATTERNS)
def _clean_html(text):
"""Strip HTML tags and decode entities."""
text = re.sub(r'<script[^>]*>.*?</script>', ' ', text, flags=re.DOTALL|re.IGNORECASE)
text = re.sub(r'<style[^>]*>.*?</style>', ' ', text, flags=re.DOTALL|re.IGNORECASE)
text = re.sub(r'<[^>]+>', ' ', text)
text = unescape(text)
return text
def _merge_enrichment(records, enriched_map):
    """Copy enrichment fields onto each record whose ``kw`` is in *enriched_map*."""
    for record in records:
        if record['kw'] in enriched_map:
            record.update(enriched_map[record['kw']])


def _enrichment_lookup(records):
    """Run ``enrich_keywords`` on the ``kw`` of each record and index the rows by ``kw``."""
    enriched = enrich_keywords([record['kw'] for record in records])
    return {row['kw']: row for row in enriched or []}


def _finalize_keywords(results, total_words, top_n, enrich, analytics, expected_keywords):
    """Turn ranked ``{kw, count}`` records into the caller-facing result.

    With ``analytics`` set, returns the report from ``analyze_keywords``
    (enriching its keyword lists in place when ``enrich`` is set). Otherwise
    returns the first ``top_n`` records, enriched when ``enrich`` is set.
    """
    if analytics:
        report = analyze_keywords(
            results, total_words=total_words, expected_keywords=expected_keywords
        )
        # Read report sections defensively: a report that lacks a section is
        # treated as having an empty one instead of raising KeyError.
        all_kws = report.get('all_keywords') or []
        if enrich and all_kws:
            # Only the first 50 keywords are sent for enrichment.
            enriched_map = _enrichment_lookup(all_kws[:50])
            _merge_enrichment(all_kws, enriched_map)
            _merge_enrichment(report.get('top_keywords') or [], enriched_map)
            classification = report.get('classification') or {}
            for category in ('primary', 'secondary', 'long_tail'):
                _merge_enrichment(classification.get(category) or [], enriched_map)
            for cluster_kws in (report.get('clusters') or {}).values():
                _merge_enrichment(cluster_kws, enriched_map)
        return report

    # Standard flow (no analytics)
    results = results[:top_n]
    if enrich and results:
        _merge_enrichment(results, _enrichment_lookup(results))
    return results


def extract_keywords_from_audit(audit_obj, top_n=20, enrich=False, analytics=False, expected_keywords=None):
    """Extract candidate keywords from an audit object.

    Text is gathered from every page that is not an admin/legal page (title,
    counted twice, plus headings, paragraphs and the cleaned ``text`` /
    ``content`` / ``html`` body). When a spaCy pipeline is loaded, noun chunks
    and named entities are counted; if that path raises, or no pipeline is
    loaded, a regex tokenizer counts single words (3+ characters) and 2-3 word
    phrases, with phrases weighted double.

    Args:
        audit_obj: Audit dictionary with a ``pages`` list. Anything that is
            not a dict, or has no pages, yields an empty list.
        top_n: Number of keywords to return in the non-analytics flow. The
            regex path keeps ``top_n * 2`` candidates before analytics.
        enrich: Whether to merge the fields returned by ``enrich_keywords``
            into the keyword records. (The original note marks DataForSEO
            enrichment as disabled in favour of SerpAPI, but the flag is
            still honoured by the code.)
        analytics: Whether to return the full report from
            ``analyze_keywords`` instead of a plain keyword list.
        expected_keywords: List of expected keywords, forwarded to
            ``analyze_keywords`` for coverage analysis.

    Returns:
        ``[]`` when no page text is available. Otherwise, with
        ``analytics=False``, a list of ``{'kw': ..., 'count': ...}`` dicts
        ordered by count descending (plus enrichment fields when ``enrich``
        is set); with ``analytics=True``, the analytics report dict.
    """
    # Function-scope import so the module's import block does not need to change.
    import logging

    pages = audit_obj.get('pages') if isinstance(audit_obj, dict) else None
    texts = []
    for p in pages or []:
        if not isinstance(p, dict):
            continue
        # Skip admin/legal pages - they pollute keyword extraction
        if _is_admin_page(str(p.get('url') or '')):
            continue
        # `or ''` / `or []` keep explicit None values from leaking the text
        # "None" into the corpus or breaking iteration.
        title = p.get('title') or ''
        headings = ' '.join(
            str(h.get('text') or '') if isinstance(h, dict) else str(h)
            for h in p.get('headings') or []
        )
        paras = p.get('paragraphs') or []
        if isinstance(paras, list):
            paras = ' '.join(
                str(par.get('text') or '') if isinstance(par, dict) else str(par)
                for par in paras
            )
        else:
            paras = str(paras)
        raw = p.get('text') or p.get('content') or p.get('html') or ''
        cleaned = _clean_html(raw if isinstance(raw, str) else str(raw))
        # The title is included twice so its terms weigh more in the counts.
        texts.append(f"{title} {title} {headings} {paras} {cleaned}")

    combined = '\n'.join(texts)
    if not combined.strip():
        return []

    # spaCy path
    if _nlp is not None:
        try:
            doc = _nlp(combined[:100000])  # limit to avoid memory issues
            candidates = []
            for chunk in doc.noun_chunks:
                txt = ' '.join([t.text for t in chunk if not t.is_stop]).strip().lower()
                if len(txt) < 3 or txt in _STOPWORDS:
                    continue
                candidates.append(txt)
            for ent in doc.ents:
                txt = ent.text.strip().lower()
                if len(txt) < 3 or txt in _STOPWORDS:
                    continue
                candidates.append(txt)
            counts = {}
            for c in candidates:
                counts[c] = counts.get(c, 0) + 1
            items = sorted(counts.items(), key=lambda x: x[1], reverse=True)
            results = [{'kw': k, 'count': v} for k, v in items]
            return _finalize_keywords(
                results, len(combined.split()), top_n, enrich, analytics, expected_keywords
            )
        except Exception as exc:
            # Best-effort: any failure on the spaCy path falls back to the
            # regex extractor below, but is no longer silent.
            logging.getLogger(__name__).warning(
                "spaCy keyword extraction failed, using regex fallback: %s", exc
            )

    # simple regex fallback (supports Arabic letters too)
    lowered = combined.lower()
    words = re.findall(r"[\w\u0600-\u06FF]{3,}", lowered)
    # extract 2-3 word phrases
    phrases = re.findall(r"\b([\w\u0600-\u06FF]+(?:\s+[\w\u0600-\u06FF]+){1,2})\b", lowered)
    counts = {}
    for w in words:
        if w in _STOPWORDS:
            continue
        counts[w] = counts.get(w, 0) + 1
    for p in phrases:
        tokens = p.split()
        # Normalise internal whitespace so the same phrase split by a newline
        # or tab is counted under one key.
        phrase = ' '.join(tokens)
        # Reject a phrase only when one of its WORDS is a stopword. A
        # substring test would discard e.g. "weather" (contains "the") and
        # any Arabic word carrying the definite article.
        if len(phrase) > 5 and not any(tok in _STOPWORDS for tok in tokens):
            counts[phrase] = counts.get(phrase, 0) + 2  # boost phrases
    items = sorted(counts.items(), key=lambda x: x[1], reverse=True)[:top_n * 2]  # Get more for analytics
    results = [{'kw': k, 'count': v} for k, v in items]

    # total_words for density is the number of 3+ character tokens found.
    return _finalize_keywords(results, len(words), top_n, enrich, analytics, expected_keywords)
|