import re from pathlib import Path from html import unescape try: from .dataforseo_client import enrich_keywords except ImportError: def enrich_keywords(kws, **kwargs): return [{'kw': k, 'volume': None, 'cpc': None} for k in kws] try: from .keyword_analytics import analyze_keywords, format_analytics_report except ImportError: def analyze_keywords(kws, **kwargs): return {'summary': {}, 'top_keywords': kws} def format_analytics_report(analytics): return str(analytics) try: import spacy _nlp = None try: _nlp = spacy.load('en_core_web_sm') except Exception: try: _nlp = spacy.load('xx_ent_wiki_sm') except Exception: _nlp = None except Exception: _nlp = None _STOPWORDS = set([ # English generic 'the','and','for','with','from','this','that','are','you','your','www','http','https','com','org','net', 'page','pages','about','more','shop','home','contact','search','menu','cart','login','sign','account', 'view','add','buy','price','click','here','read','more','next','back','new','all','get','use','our', 'will','can','has','have','been','was','were','not','but','also','its','their','they','them','then', # Legal/admin pages - these pollute keyword extraction 'refund','return','returns','policy','policies','shipping','delivery','checkout','payment','order', 'orders','invoice','receipt','terms','conditions','privacy','cookie','cookies','gdpr','legal', 'copyright','rights','reserved','disclaimer','warranty','guarantee','exchange','cancel','cancellation', 'subscribe','unsubscribe','newsletter','email','address','phone','fax','zip','postal','city','country', 'days','hours','minutes','business','working','processing','handling','tracking','number','code', 'please','note','important','information','details','contact','support','help','faq','questions', # Numbers and codes that appear in admin pages '14','30','60','90','100','200','404','500', # Arabic generic stopwords 'ال','في','من','إلى','على','هذا','هذه','كان','كانت','لكن','أو','ولكن','حيث','عند','بعد','قبل', 'مع','بين','حتى','لذلك','لهذا','كما','أيضا','فقط','جدا','جميع','كل','بعض','أي','لا','لم','لن','لما', # Arabic admin/legal terms 'سياسة','السياسة','الشحن','التوصيل','الإرجاع','الاسترداد','الدفع','الطلب','الطلبات', 'الخصوصية','الشروط','الأحكام','التواصل','الدعم','المساعدة','الحساب','تسجيل','دخول', ]) # Pages that should be excluded from keyword extraction (admin/legal pages) # More precise patterns to avoid false positives _ADMIN_URL_PATTERNS = [ '/privacy-policy', '/refund-policy', '/return-policy', '/shipping-policy', '/terms-of-service', '/terms-and-conditions', '/cookie-policy', '/checkout', '/cart', '/login', '/register', '/account', '/my-account', '/sitemap.xml', '/robots.txt', '/feed', '/rss', '/wp-admin', '/wp-json', '/xmlrpc.php', '/disclaimer', '/legal', ] def _is_admin_page(url: str) -> bool: """Check if a URL is an admin/legal page that should be excluded from keyword extraction. Uses more precise matching to avoid false positives. """ url_lower = url.lower() # Only exclude if URL contains EXACT admin patterns (with slashes) # This prevents excluding pages like "privacy" in "privacy-matters.html" return any(pattern in url_lower for pattern in _ADMIN_URL_PATTERNS) def _clean_html(text): """Strip HTML tags and decode entities.""" text = re.sub(r']*>.*?', ' ', text, flags=re.DOTALL|re.IGNORECASE) text = re.sub(r']*>.*?', ' ', text, flags=re.DOTALL|re.IGNORECASE) text = re.sub(r'<[^>]+>', ' ', text) text = unescape(text) return text def extract_keywords_from_audit(audit_obj, top_n=20, enrich=False, analytics=False, expected_keywords=None): """Extract candidate keywords from an audit object. Tries to use spaCy noun-chunk extraction when available, otherwise falls back to simple regex tokenization and frequency counts. Returns a list of dicts: {kw: , count: , volume: , cpc: } ordered by count desc. Args: audit_obj: Audit dictionary with pages top_n: Number of keywords to return enrich: Whether to enrich with DataForSEO volume/CPC data (DISABLED - use SerpAPI instead) analytics: Whether to return full analytics report expected_keywords: List of expected keywords for coverage analysis """ pages = audit_obj.get('pages', []) if isinstance(audit_obj, dict) else [] texts = [] for p in pages: # Skip admin/legal pages - they pollute keyword extraction page_url = p.get('url', '') if _is_admin_page(page_url): continue title = p.get('title', '') headings = ' '.join(h.get('text', '') if isinstance(h, dict) else str(h) for h in p.get('headings', [])) paras = p.get('paragraphs', []) if isinstance(paras, list): paras = ' '.join(par.get('text', '') if isinstance(par, dict) else str(par) for par in paras) else: paras = str(paras) raw = p.get('text') or p.get('content') or p.get('html') or '' cleaned = _clean_html(raw) texts.append(f"{title} {title} {headings} {paras} {cleaned}") combined = '\n'.join(texts) if not combined.strip(): return [] # spaCy path if _nlp is not None: try: doc = _nlp(combined[:100000]) # limit to avoid memory issues candidates = [] for chunk in doc.noun_chunks: txt = ' '.join([t.text for t in chunk if not t.is_stop]).strip().lower() if len(txt) < 3 or txt in _STOPWORDS: continue candidates.append(txt) for ent in doc.ents: txt = ent.text.strip().lower() if len(txt) < 3 or txt in _STOPWORDS: continue candidates.append(txt) counts = {} for c in candidates: counts[c] = counts.get(c, 0) + 1 items = sorted(counts.items(), key=lambda x: x[1], reverse=True) results = [{'kw': k, 'count': v} for k, v in items] # If analytics requested, process through analytics if analytics: total_words = len(combined.split()) analytics_report = analyze_keywords(results, total_words=total_words, expected_keywords=expected_keywords) # Enrich if requested if enrich and analytics_report['all_keywords']: all_kws = analytics_report['all_keywords'] enriched = enrich_keywords([r['kw'] for r in all_kws[:50]]) enriched_map = {e['kw']: e for e in enriched} for r in all_kws: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) for r in analytics_report['top_keywords']: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) for category in ['primary', 'secondary', 'long_tail']: for r in analytics_report['classification'][category]: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) for cluster_kws in analytics_report['clusters'].values(): for r in cluster_kws: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) return analytics_report # Simple mode results = results[:top_n] if enrich: enriched = enrich_keywords([r['kw'] for r in results]) enriched_map = {e['kw']: e for e in enriched} for r in results: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) return results except Exception: pass # simple regex fallback (supports Arabic letters too) words = re.findall(r"[\w\u0600-\u06FF]{3,}", combined.lower()) # extract 2-3 word phrases phrases = re.findall(r"\b([\w\u0600-\u06FF]+(?:\s+[\w\u0600-\u06FF]+){1,2})\b", combined.lower()) counts = {} for w in words: if w in _STOPWORDS or len(w) < 3: continue counts[w] = counts.get(w, 0) + 1 for p in phrases: if len(p) > 5 and not any(s in p for s in _STOPWORDS): counts[p] = counts.get(p, 0) + 2 # boost phrases items = sorted(counts.items(), key=lambda x: x[1], reverse=True)[:top_n * 2] # Get more for analytics results = [{'kw': k, 'count': v} for k, v in items] # Calculate total words for density total_words = len(words) # Run analytics if requested if analytics: analytics_report = analyze_keywords(results, total_words=total_words, expected_keywords=expected_keywords) # Enrich with DataForSEO if requested if enrich and analytics_report['all_keywords']: # Enrich all keywords all_kws = analytics_report['all_keywords'] enriched = enrich_keywords([r['kw'] for r in all_kws[:50]]) # Enrich top 50 enriched_map = {e['kw']: e for e in enriched} # Update all keyword lists with enrichment data for r in all_kws: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) # Update top_keywords for r in analytics_report['top_keywords']: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) # Update classification keywords for category in ['primary', 'secondary', 'long_tail']: for r in analytics_report['classification'][category]: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) # Update cluster keywords for cluster_kws in analytics_report['clusters'].values(): for r in cluster_kws: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) return analytics_report # Standard flow (no analytics) results = results[:top_n] # Enrich with DataForSEO if requested if enrich and results: enriched = enrich_keywords([r['kw'] for r in results]) enriched_map = {e['kw']: e for e in enriched} for r in results: if r['kw'] in enriched_map: r.update(enriched_map[r['kw']]) return results