import re
from html import unescape
from pathlib import Path
from urllib.parse import urlparse
|
|
try:
    from .dataforseo_client import enrich_keywords
except ImportError:
    # Offline stub: echo each keyword with empty volume/CPC metrics so
    # callers can rely on the enriched record shape even without the client.
    def enrich_keywords(kws, **kwargs):
        empty_metrics = {'volume': None, 'cpc': None}
        return [{'kw': k, **empty_metrics} for k in kws]
|
|
try:
    from .keyword_analytics import analyze_keywords, format_analytics_report
except ImportError:
    # Offline stubs so extraction keeps working without the analytics
    # module: pass keywords straight through, stringify the report.
    def analyze_keywords(kws, **kwargs):
        return dict(summary={}, top_keywords=kws)

    def format_analytics_report(analytics):
        return f"{analytics}"
|
|
| try: |
| import spacy |
| _nlp = None |
| try: |
| _nlp = spacy.load('en_core_web_sm') |
| except Exception: |
| try: |
| _nlp = spacy.load('xx_ent_wiki_sm') |
| except Exception: |
| _nlp = None |
| except Exception: |
| _nlp = None |
|
|
| _STOPWORDS = set([ |
| |
| 'the','and','for','with','from','this','that','are','you','your','www','http','https','com','org','net', |
| 'page','pages','about','more','shop','home','contact','search','menu','cart','login','sign','account', |
| 'view','add','buy','price','click','here','read','more','next','back','new','all','get','use','our', |
| 'will','can','has','have','been','was','were','not','but','also','its','their','they','them','then', |
| |
| 'refund','return','returns','policy','policies','shipping','delivery','checkout','payment','order', |
| 'orders','invoice','receipt','terms','conditions','privacy','cookie','cookies','gdpr','legal', |
| 'copyright','rights','reserved','disclaimer','warranty','guarantee','exchange','cancel','cancellation', |
| 'subscribe','unsubscribe','newsletter','email','address','phone','fax','zip','postal','city','country', |
| 'days','hours','minutes','business','working','processing','handling','tracking','number','code', |
| 'please','note','important','information','details','contact','support','help','faq','questions', |
| |
| '14','30','60','90','100','200','404','500', |
| |
| 'ال','في','من','إلى','على','هذا','هذه','كان','كانت','لكن','أو','ولكن','حيث','عند','بعد','قبل', |
| 'مع','بين','حتى','لذلك','لهذا','كما','أيضا','فقط','جدا','جميع','كل','بعض','أي','لا','لم','لن','لما', |
| |
| 'سياسة','السياسة','الشحن','التوصيل','الإرجاع','الاسترداد','الدفع','الطلب','الطلبات', |
| 'الخصوصية','الشروط','الأحكام','التواصل','الدعم','المساعدة','الحساب','تسجيل','دخول', |
| ]) |
|
|
| |
| |
| _ADMIN_URL_PATTERNS = [ |
| '/privacy-policy', '/refund-policy', '/return-policy', '/shipping-policy', |
| '/terms-of-service', '/terms-and-conditions', '/cookie-policy', |
| '/checkout', '/cart', '/login', '/register', '/account', '/my-account', |
| '/sitemap.xml', '/robots.txt', '/feed', '/rss', |
| '/wp-admin', '/wp-json', '/xmlrpc.php', |
| '/disclaimer', '/legal', |
| ] |
|
|
| def _is_admin_page(url: str) -> bool: |
| """Check if a URL is an admin/legal page that should be excluded from keyword extraction. |
| Uses more precise matching to avoid false positives. |
| """ |
| url_lower = url.lower() |
| |
| |
| return any(pattern in url_lower for pattern in _ADMIN_URL_PATTERNS) |
|
|
| def _clean_html(text): |
| """Strip HTML tags and decode entities.""" |
| text = re.sub(r'<script[^>]*>.*?</script>', ' ', text, flags=re.DOTALL|re.IGNORECASE) |
| text = re.sub(r'<style[^>]*>.*?</style>', ' ', text, flags=re.DOTALL|re.IGNORECASE) |
| text = re.sub(r'<[^>]+>', ' ', text) |
| text = unescape(text) |
| return text |
|
|
def _page_text(page):
    """Assemble the visible text of one audit page dict.

    The title is repeated so title words carry extra weight in the
    frequency counts; headings, paragraphs and any raw HTML body are
    appended after cleaning.
    """
    title = page.get('title', '')
    headings = ' '.join(
        h.get('text', '') if isinstance(h, dict) else str(h)
        for h in page.get('headings', [])
    )
    paras = page.get('paragraphs', [])
    if isinstance(paras, list):
        paras = ' '.join(
            par.get('text', '') if isinstance(par, dict) else str(par)
            for par in paras
        )
    else:
        paras = str(paras)
    raw = page.get('text') or page.get('content') or page.get('html') or ''
    return f"{title} {title} {headings} {paras} {_clean_html(raw)}"


def _merge_enrichment(records, enriched_map):
    """Update each keyword record in place with its enrichment data, if any."""
    for rec in records:
        if rec['kw'] in enriched_map:
            rec.update(enriched_map[rec['kw']])


def _propagate_enrichment(report):
    """Enrich the top 50 keywords and copy volume/CPC into every view of an
    analytics report (all_keywords, top_keywords, classification buckets,
    clusters).

    Uses .get() throughout so a minimal report (e.g. from the offline
    analyze_keywords stub) is a no-op instead of a KeyError.
    """
    all_kws = report.get('all_keywords') or []
    if not all_kws:
        return
    enriched = enrich_keywords([r['kw'] for r in all_kws[:50]])
    enriched_map = {e['kw']: e for e in enriched}
    _merge_enrichment(all_kws, enriched_map)
    _merge_enrichment(report.get('top_keywords', []), enriched_map)
    classification = report.get('classification', {})
    for category in ('primary', 'secondary', 'long_tail'):
        _merge_enrichment(classification.get(category, []), enriched_map)
    for cluster_kws in report.get('clusters', {}).values():
        _merge_enrichment(cluster_kws, enriched_map)


def _top_results(results, top_n, enrich):
    """Trim results to top_n and optionally merge enrichment data in place."""
    results = results[:top_n]
    if enrich and results:
        enriched = enrich_keywords([r['kw'] for r in results])
        _merge_enrichment(results, {e['kw']: e for e in enriched})
    return results


def _spacy_counts(text):
    """Count noun-chunk and named-entity keyword candidates via spaCy.

    Propagates any spaCy failure so the caller can fall back to the regex
    extractor. Candidates shorter than 3 chars or in _STOPWORDS are skipped.
    """
    doc = _nlp(text[:100000])  # cap document size to bound memory/time
    counts = {}
    for chunk in doc.noun_chunks:
        kw = ' '.join(t.text for t in chunk if not t.is_stop).strip().lower()
        if len(kw) >= 3 and kw not in _STOPWORDS:
            counts[kw] = counts.get(kw, 0) + 1
    for ent in doc.ents:
        kw = ent.text.strip().lower()
        if len(kw) >= 3 and kw not in _STOPWORDS:
            counts[kw] = counts.get(kw, 0) + 1
    return counts


def _regex_counts(text):
    """Frequency-count single words and 2-3 word phrases by regex.

    Phrases are weighted double; a phrase is dropped when any stopword
    occurs in it as a substring (the original heuristic, kept as-is).
    The \\u0600-\\u06FF range admits Arabic words alongside \\w.

    Returns:
        (counts dict, total word count for density analytics)
    """
    lowered = text.lower()
    # The regex already requires 3+ chars, so no extra length check needed.
    words = re.findall(r"[\w\u0600-\u06FF]{3,}", lowered)
    phrases = re.findall(
        r"\b([\w\u0600-\u06FF]+(?:\s+[\w\u0600-\u06FF]+){1,2})\b", lowered)
    counts = {}
    for w in words:
        if w in _STOPWORDS:
            continue
        counts[w] = counts.get(w, 0) + 1
    for p in phrases:
        if len(p) > 5 and not any(s in p for s in _STOPWORDS):
            counts[p] = counts.get(p, 0) + 2
    return counts, len(words)


def extract_keywords_from_audit(audit_obj, top_n=20, enrich=False, analytics=False, expected_keywords=None):
    """Extract candidate keywords from an audit object.

    Uses spaCy noun-chunk/entity extraction when a model is available,
    otherwise falls back to regex tokenization plus frequency counts.
    Admin/legal pages (per _is_admin_page) are excluded up front because
    their boilerplate would pollute the counts.

    Args:
        audit_obj: Audit dict with a 'pages' list; anything else yields [].
        top_n: Number of keywords to return (non-analytics mode).
        enrich: Merge DataForSEO volume/CPC data into the results
            (DISABLED - use SerpAPI instead).
        analytics: Return a full analytics report dict instead of a list.
        expected_keywords: Optional expected keywords for coverage analysis
            (analytics mode only).

    Returns:
        analytics=False: list of {'kw', 'count'[, 'volume', 'cpc']} dicts
        ordered by count desc; analytics=True: the analyze_keywords report;
        [] when no usable page text exists.
    """
    pages = audit_obj.get('pages', []) if isinstance(audit_obj, dict) else []
    texts = [_page_text(p) for p in pages if not _is_admin_page(p.get('url', ''))]
    combined = '\n'.join(texts)
    if not combined.strip():
        return []

    counts = None
    total_words = None
    trim = None
    if _nlp is not None:
        # Guard only the spaCy extraction itself (unlike before, where the
        # whole branch incl. analytics was wrapped, so a post-processing
        # bug silently switched extraction method).
        try:
            counts = _spacy_counts(combined)
            total_words = len(combined.split())
        except Exception:
            counts = None  # spaCy failed; use the regex fallback below
    if counts is None:
        counts, total_words = _regex_counts(combined)
        trim = top_n * 2  # keep extra candidates for the analytics report

    items = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    if trim is not None:
        items = items[:trim]
    results = [{'kw': k, 'count': v} for k, v in items]

    if analytics:
        report = analyze_keywords(results, total_words=total_words,
                                  expected_keywords=expected_keywords)
        if enrich:
            _propagate_enrichment(report)
        return report

    return _top_results(results, top_n, enrich)
|
|