| import os |
| import json |
| import hashlib |
| import re |
| from typing import List |
|
|
| try: |
| import openai |
| except Exception: |
| openai = None |
|
|
| try: |
| from groq import Groq |
| except Exception: |
| Groq = None |
|
|
| try: |
| from langdetect import detect |
| except Exception: |
| detect = None |
|
|
# Import-time configuration, all sourced from environment variables.
if openai is not None:
    # Only touch the SDK module when the optional import succeeded.
    openai.api_key = os.environ.get('OPENAI_API_KEY')

# Chat model name used for OpenAI requests; override with OPENAI_MODEL.
DEFAULT_MODEL = os.environ.get('OPENAI_MODEL', 'gpt-4o-mini')
|
|
| |
# Localised recommendation texts. Both language tables expose the same keys.
_RECS_AR = {
    'headings': 'تحسين تسلسل العناوين: تأكد من وجود H1 واحد فقط واستخدام H2 ثم H3 بشكل منطقي لتسهيل الفهرسة.',
    'density': 'زيادة عمق المحتوى: استهدف 40-120 كلمة في الفقرات الرئيسية مع تقديم إجابات مباشرة وسهلة القراءة.',
    'entities': 'إضافة الكيانات المسماة: اذكر أسماء المنظمة، الأشخاص، والمنتجات بوضوح واربطها ببيانات Schema.',
    'faq': 'إنشاء صفحات الأسئلة والأجوبة (FAQ): أضف قسم للأسئلة الشائعة باستخدام JSON-LD لزيادة فرص الظهور في المحركات التوليدية.',
    'ai_visibility': 'تحسين الظهور في الذكاء الاصطناعي: أنشئ محتوى تعريفياً قصيراً (Definitional Content) يسهل على النماذج مثل ChatGPT وPerplexity اقتباسه.',
    'h1_missing': 'إضافة عنوان H1: أضف عنواناً رئيسياً واضحاً يحتوي على الكلمة المفتاحية واسم العلامة التجارية.',
    'short_paras': 'تطوير الفقرات: اجعل الفقرة الأولى تبدأ بإجابة مباشرة ومختصرة لزيادة احتمالية الاقتباس.',
    'thin_content': 'محتوى ضيق: أضف فقرات تعريفية وبيانات منظمة للمؤسسة (Organization Schema).',
}

_RECS_EN = {
    'headings': 'Fix heading hierarchy: Ensure one H1 per page and incremental H2 → H3 structure for better indexing.',
    'density': 'Increase content depth: Aim for 40–120 words in core paragraphs with direct, readable answers.',
    'entities': 'Add named entities: Clearly mention Organizations, People, and Products and link them via Schema.',
    'faq': 'Create FAQ sections: Use FAQPage JSON-LD to increase chances of being featured in AI search results.',
    'ai_visibility': 'Optimize for AI: Create short, authoritative definitions of your services to encourage LLM citations.',
    'h1_missing': 'Add H1 heading: Ensure a clear H1 containing your primary keyword and brand name.',
    'short_paras': 'Expand paragraphs: Lead with a one-sentence "direct answer" to improve AI extraction likelihood.',
    'thin_content': 'Thin content: Add a definitional paragraph and an Organization JSON-LD block.',
}

# Keyed by language code ('ar' / 'en'), then by recommendation identifier.
RECS_CONTENT = {
    'ar': _RECS_AR,
    'en': _RECS_EN,
}
|
|
| def _is_arabic(text: str) -> bool: |
| if not text: return False |
| return bool(re.search(r'[\u0600-\u06FF]', text)) |
|
|
| def _build_prompt(pages: List[dict]): |
| lines = [] |
| for p in pages: |
| title = p.get('title') or p.get('url') |
| first_para = (p.get('paragraphs') or [None])[0] or '' |
| lines.append(f"TITLE: {title}\nURL: {p.get('url')}\nTEXT: {first_para}\n---") |
| return "\n".join(lines) |
|
|
| def _cache_key_for_prompt(prompt: str, prefix: str = 'openai') -> str: |
| return f"{prefix}:{hashlib.sha256(prompt.encode('utf-8')).hexdigest()}" |
|
|
def analyze_with_openai(pages: List[dict], api_key: str = None):
    """Ask an OpenAI chat model for a JSON summary of the crawled pages.

    Args:
        pages: Crawled page dicts; they are rendered with ``_build_prompt``.
        api_key: Explicit API key. When falsy, ``OPENAI_API_KEY`` from the
            environment is used instead.

    Returns:
        A dict that always carries an ``enabled`` flag:

        * ``{'enabled': False, 'reason': ...}`` - nothing was attempted
          (no key available, or the ``openai`` package failed to import).
        * ``{'enabled': False, 'error': ...}`` - the request raised.
        * ``{'enabled': True, 'result': ...}`` - the reply parsed as JSON.
        * ``{'enabled': True, 'raw': ...}`` - the reply was not valid JSON.

    This function does not raise for request failures; they are reported
    through the ``error`` entry.
    """
    key = api_key or os.getenv('OPENAI_API_KEY')
    if not key:
        return {'enabled': False, 'reason': 'OPENAI_API_KEY not set'}

    # The SDK import is optional; report its absence explicitly (mirroring the
    # Groq helper) instead of leaking an AttributeError message on ``None``.
    if openai is None:
        return {'enabled': False, 'reason': 'openai client not installed'}

    try:
        prompt_content = _build_prompt(pages)
        system = (
            "You are an analytics assistant. Given crawled pages (title, url, text),"
            " produce a JSON object with keys: summary (one-paragraph), topics (array of top 6 topics),"
            " suggestions (array of action items). Return ONLY valid JSON."
        )
        messages = [
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': prompt_content},
        ]

        client = openai.OpenAI(api_key=key)
        resp = client.chat.completions.create(
            model=DEFAULT_MODEL,
            messages=messages,
            temperature=0.2,
            max_tokens=800,
        )

        text = resp.choices[0].message.content
        try:
            parsed = json.loads(text)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError; TypeError covers a
            # ``None`` message body. Hand the unparsed reply back to the caller.
            return {'enabled': True, 'raw': text}
        return {'enabled': True, 'result': parsed}
    except Exception as e:
        # Service boundary: any SDK/network failure becomes a disabled result.
        return {'enabled': False, 'error': str(e)}
|
|
def analyze_with_groq(pages: List[dict], api_key: str = None):
    """Ask a Groq-hosted chat model for a JSON analysis of the crawled pages.

    Args:
        pages: Crawled page dicts; they are rendered with ``_build_prompt``.
        api_key: Explicit API key. When falsy, ``GROQ_API_KEY`` from the
            environment is used instead.

    Returns:
        A dict that always carries an ``enabled`` flag:

        * ``{'enabled': False, 'reason': ...}`` - nothing was attempted
          (the ``groq`` package failed to import, or no key is available).
        * ``{'enabled': False, 'error': ...}`` - the request raised.
        * ``{'enabled': True, 'result': ...}`` - the reply parsed as JSON.
        * ``{'enabled': True, 'raw': ...}`` - the reply was not valid JSON.

    The model name comes from ``GROQ_MODEL`` (default
    ``llama-3.1-8b-instant``). Request failures are reported through the
    ``error`` entry rather than raised.
    """
    if Groq is None:
        return {'enabled': False, 'reason': 'groq client not installed'}

    groq_key = api_key or os.getenv('GROQ_API_KEY')
    if not groq_key:
        return {'enabled': False, 'reason': 'GROQ_API_KEY not set'}

    try:
        client = Groq(api_key=groq_key)
        prompt = _build_prompt(pages)

        completion = client.chat.completions.create(
            model=os.getenv('GROQ_MODEL', 'llama-3.1-8b-instant'),
            messages=[
                {'role': 'system', 'content': 'You are an analytics assistant producing JSON output.'},
                {'role': 'user', 'content': prompt},
            ],
            temperature=0.2,
            max_completion_tokens=2048,
            stream=False,
        )

        text = completion.choices[0].message.content
        try:
            parsed = json.loads(text)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError; TypeError covers a
            # ``None`` message body. Hand the unparsed reply back to the caller.
            return {'enabled': True, 'raw': text}
        return {'enabled': True, 'result': parsed}
    except Exception as e:
        # Service boundary: any SDK/network failure becomes a disabled result.
        return {'enabled': False, 'error': str(e)}
|
|
def analyze_pages(pages: List[dict], api_keys: dict = None):
    """Run both provider analyses over *pages* and collect their results.

    ``api_keys`` may carry per-provider keys under ``'openai'`` and
    ``'groq'``; missing entries are passed through as ``None``. The returned
    dict maps those same two provider names to each helper's result.
    """
    keys = api_keys or {}
    # OpenAI is queried first, then Groq.
    openai_outcome = analyze_with_openai(pages, api_key=keys.get('openai'))
    groq_outcome = analyze_with_groq(pages, api_key=keys.get('groq'))
    return {'openai': openai_outcome, 'groq': groq_outcome}
|
|
def _avg_words_per_paragraph(paragraphs) -> float:
    """Mean whitespace-delimited word count of *paragraphs* (0.0 when empty)."""
    if not paragraphs:
        return 0.0
    return sum(len(str(x).split()) for x in paragraphs) / len(paragraphs)


def compute_geo_score(pages: List[dict], audit: dict = None, ai_visibility: dict = None):
    """Compute a GEO visibility score (0-100) from crawled pages.

    Five components contribute up to 20 points each: pages with an H1,
    paragraph density, entities, FAQ-like structure and AI visibility.

    Args:
        pages: Page dicts. Only ``'headings'`` (list of dicts with a ``'tag'``
            entry) and ``'paragraphs'`` (list of text items) are read; either
            may be missing or ``None``.
        audit: Accepted for interface compatibility; not read here.
        ai_visibility: Optional dict. When its ``'enabled'`` entry is truthy,
            ``'results'`` is read as a list of dicts and each truthy
            ``'mentioned'`` entry counts as one mention.

    Returns:
        A dict with ``score`` (int), ``status`` (``'Elite'``, ``'Authority'``,
        ``'Needs Work'`` or ``'Critical'``), a per-component ``breakdown``,
        per-page triage ``counts`` and ``ai_radar_stats``.
    """
    total_pages = max(1, len(pages))  # avoids division by zero for an empty crawl
    headings_ok_count = 0
    density_total = 0.0
    # NOTE(review): nothing in this function ever increments entity_counts, so
    # the entity component is always 0 and the reachable maximum is 80.
    entity_counts = 0
    faq_count = 0
    critical_issues = 0
    warnings = 0
    passed = 0

    for p in pages:
        # ``or []`` also covers keys that are present but set to None.
        headings = p.get('headings') or []
        paras = p.get('paragraphs') or []
        avg = _avg_words_per_paragraph(paras)

        if 'h1' in [h.get('tag', '') for h in headings]:
            headings_ok_count += 1

        # Full credit from 40 words per paragraph upwards, proportional below.
        # NOTE(review): very long paragraphs are not penalised (as before).
        density_total += min(1.0, avg / 40.0)

        # Heuristic: every H3 on a page that has paragraphs counts as one
        # FAQ-style entry.
        if paras:
            faq_count += sum(1 for h in headings if h.get('tag') == 'h3')

        # Per-page triage for the summary counters.
        if not headings or avg < 20:
            critical_issues += 1
        elif avg < 40:
            warnings += 1
        else:
            passed += 1

    headings_score = headings_ok_count / total_pages * 20.0
    density_score = density_total / total_pages * 20.0
    entity_score = 20.0 if entity_counts > 0 else 0.0
    faq_score = min(faq_count, total_pages) / total_pages * 20.0

    ai_score = 0.0
    mentions = 0
    total_q = 0
    if ai_visibility and ai_visibility.get('enabled'):
        res = ai_visibility.get('results') or []
        mentions = sum(1 for r in res if r.get('mentioned'))
        # Report the real number of queries; an empty result list is 0, not 1.
        total_q = len(res)
        if total_q:
            ai_score = (mentions / total_q) * 20

    raw_score = headings_score + density_score + entity_score + faq_score + ai_score
    score = int(round(min(raw_score, 100)))
    if score >= 85:
        status = 'Elite'
    elif score >= 70:
        status = 'Authority'
    elif score >= 40:
        status = 'Needs Work'
    else:
        status = 'Critical'

    return {
        'score': score,
        'status': status,
        'breakdown': {
            'headings': int(round(headings_score)),
            'density': int(round(density_score)),
            'entities': int(round(entity_score)),
            'faq': int(round(faq_score)),
            'ai_visibility': int(round(ai_score)),
        },
        'counts': {
            'critical': critical_issues,
            'warnings': warnings,
            'passed': passed,
        },
        'ai_radar_stats': {
            'mentions': mentions,
            'total_queries': total_q,
            'visibility_percent': int((mentions / total_q) * 100) if total_q > 0 else 0,
        },
    }
|
|
def _brand_from_url(url: str) -> str:
    """Derive a title-cased brand guess from a URL's host, or '' if none."""
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
        # ``hostname`` is lower-cased and excludes credentials and the port.
        host = parsed.hostname or ''
    except ValueError:
        return ''
    if not host:
        # Scheme-less input such as "example.com/about" lands entirely in
        # ``path``; keep only the part before the first slash.
        host = parsed.path.split('/', 1)[0].lower()

    name = re.sub(r'^www\.', '', host)
    name = re.sub(r'\.[a-z]{2,}$', '', name)  # drop the last domain label
    name = name.replace('-', ' ').replace('_', ' ').strip()
    return name.title() if len(name) > 1 else ''


def infer_brand_name(pages: List[dict]) -> str:
    """Guess the site's brand name from the first few crawled pages.

    For each of the first five pages the following are tried in order: the
    ``og:site_name`` / ``application-name`` meta entry, the first usable H1
    text, then the leading segment of the page title. Generic values such as
    "Home" are skipped. If no page yields a name, the host of the first
    page's URL is used, and finally the literal ``"Company"``.

    Args:
        pages: Page dicts; ``'meta'``, ``'headings'``, ``'title'`` and
            ``'url'`` are read, and each may be missing or ``None``.

    Returns:
        The inferred brand name (never empty).
    """
    if not pages:
        return "Company"

    for page in pages[:5]:
        meta = page.get('meta') or {}
        raw_site = meta.get('og:site_name') or meta.get('application-name')
        # Strip before comparing so padded generic values are still rejected
        # and a whitespace-only value is not returned as an empty brand.
        site = str(raw_site).strip() if raw_site else ''
        if site and site.lower() not in ('company', 'website', 'home'):
            return site

        for h in page.get('headings') or []:
            if h.get('tag') != 'h1':
                continue
            txt = str(h.get('text') or '').strip()
            if txt and len(txt) < 60 and txt.lower() not in ('home', 'welcome', 'company'):
                return txt

        title_str = str(page.get('title') or '').strip()
        if title_str:
            # Dashes only act as separators when surrounded by whitespace, so
            # hyphenated names such as "Coca-Cola" stay intact.
            parts = re.split(r'\s+[-–—]\s+|\s*[|»]\s*', title_str)
            title = parts[0].strip()
            if title and len(title) < 60 and title.lower() not in ('home', 'welcome', 'company', 'homepage'):
                return title

    try:
        from_url = _brand_from_url(str(pages[0].get('url') or ''))
    except Exception:
        # Best effort only: a malformed first page must not break inference.
        from_url = ''
    return from_url or "Company"
|
|
def generate_recommendations(pages: List[dict], geo_score: dict = None, api_keys: dict = None, ai_analysis_results: dict = None, extra_context: dict = None):
    """Produce site-wide and per-page recommendations in Arabic or English.

    The output language is Arabic when the title or first paragraph of any of
    the first three pages contains Arabic script, otherwise English.

    Args:
        pages: Page dicts; ``'url'``, ``'title'``, ``'headings'`` and
            ``'paragraphs'`` are read, and each may be missing or ``None``.
        geo_score: Optional score dict. Its ``'breakdown'`` entries are
            compared against fixed thresholds to select site-wide actions.
        api_keys: Accepted for interface compatibility; not read here.
        ai_analysis_results: Accepted for interface compatibility; not read here.
        extra_context: Accepted for interface compatibility; not read here.

    Returns:
        ``{'actions': [...], 'per_page': [...]}`` where each per-page entry
        holds ``url``, ``title``, ``issues`` and ``suggestions``.
    """
    recs = {
        'actions': [],
        'per_page': [],
    }

    # Language detection on a small sample. Formatting through an f-string
    # tolerates a None title or a non-string first paragraph.
    is_ar = False
    for p in pages[:3]:
        paras = p.get('paragraphs') or []
        sample_text = f"{p.get('title') or ''} {paras[0] if paras else ''}"
        if _is_arabic(sample_text):
            is_ar = True
            break

    lang = 'ar' if is_ar else 'en'
    content = RECS_CONTENT[lang]

    # Site-wide actions: a component scoring below its threshold (each
    # component is worth up to 20 points) triggers the matching text.
    if geo_score:
        b = geo_score.get('breakdown') or {}
        thresholds = (
            ('headings', 12),
            ('density', 12),
            ('entities', 10),
            ('faq', 10),
            ('ai_visibility', 10),
        )
        for component, minimum in thresholds:
            if b.get(component, 0) < minimum:
                recs['actions'].append(content[component])

    # Per-page findings.
    for p in pages:
        page_rec = {
            'url': p.get('url'),
            'title': p.get('title'),
            'issues': [],
            'suggestions': [],
        }

        # ``or []`` also covers keys that are present but set to None.
        tags = [h.get('tag', '') for h in p.get('headings') or []]
        if 'h1' not in tags:
            page_rec['issues'].append('Missing H1' if lang == 'en' else 'عنوان H1 مفقود')
            page_rec['suggestions'].append(content.get('h1_missing', 'Fix H1'))

        paras = p.get('paragraphs') or []
        avg = (sum(len(str(x).split()) for x in paras) / len(paras)) if paras else 0
        if avg < 30:
            page_rec['issues'].append('Short paragraphs' if lang == 'en' else 'فقرات قصيرة جداً')
            page_rec['suggestions'].append(content.get('short_paras', 'Expand paragraphs'))

        recs['per_page'].append(page_rec)

    return recs
|
|
def simulate_visibility(content: str, brand: str, api_keys: dict = None) -> dict:
    """Return a fixed placeholder AI-visibility prediction.

    None of the arguments are inspected; every call yields the same neutral
    values with a fresh, empty ``suggested_fixes`` list.
    """
    prediction = dict(
        ai_visibility_score=50,
        sentiment='Neutral',
        entity_clarity='Medium',
        detailed_analysis='Content analysis available',
        suggested_fixes=[],
    )
    return prediction
|
|
def analyze_ai_visibility_deep(pages: List[dict], brand: str, api_keys: dict = None) -> dict:
    """Return a fixed placeholder for the deep sentiment/visibility report.

    None of the arguments are inspected; the result always contains the same
    neutral ``sentiment_analysis``, ``shopping_visibility`` and
    ``context_analysis`` sections.
    """
    sentiment = {
        'sentiment_score': 0,
        'sentiment_label': 'Neutral',
        'recommendations': ['Improve content density to increase trust.'],
    }
    shopping = {
        'price_detected': False,
        'price_value': None,
        'rating_detected': False,
        'rating_value': 0,
    }
    context = {
        'scenario': 'General',
        'trigger': 'Unknown',
    }
    return {
        'sentiment_analysis': sentiment,
        'shopping_visibility': shopping,
        'context_analysis': context,
    }
|
|