last_edit / server /ai_analysis.py
Moharek
Deploy Moharek GEO Platform
a74b879
import os
import json
import hashlib
import re
from typing import List
try:
import openai
except Exception:
openai = None
try:
from groq import Groq
except Exception:
Groq = None
try:
from langdetect import detect
except Exception:
detect = None
# The optional `openai` import above leaves the name as None when the package
# is unavailable; only set the module-level key when the module is present.
if openai is not None:
    openai.api_key = os.environ.get('OPENAI_API_KEY')

# Chat model used for OpenAI requests; override with the OPENAI_MODEL variable.
DEFAULT_MODEL = os.environ.get('OPENAI_MODEL', 'gpt-4o-mini')
# Professional Bilingual Recommendations
#
# Lookup table of user-facing recommendation texts, keyed first by language
# code ('ar' or 'en') and then by recommendation id. Both language tables
# expose the same set of ids so callers can pick a table once and index it
# by id. generate_recommendations() reads 'headings', 'density', 'entities',
# 'faq', 'ai_visibility', 'h1_missing' and 'short_paras'.
# NOTE(review): 'thin_content' is defined in both tables but is not referenced
# by any code visible in this file - confirm whether another module uses it.
RECS_CONTENT = {
    # Arabic texts.
    'ar': {
        'headings': 'تحسين تسلسل العناوين: تأكد من وجود H1 واحد فقط واستخدام H2 ثم H3 بشكل منطقي لتسهيل الفهرسة.',
        'density': 'زيادة عمق المحتوى: استهدف 40-120 كلمة في الفقرات الرئيسية مع تقديم إجابات مباشرة وسهلة القراءة.',
        'entities': 'إضافة الكيانات المسماة: اذكر أسماء المنظمة، الأشخاص، والمنتجات بوضوح واربطها ببيانات Schema.',
        'faq': 'إنشاء صفحات الأسئلة والأجوبة (FAQ): أضف قسم للأسئلة الشائعة باستخدام JSON-LD لزيادة فرص الظهور في المحركات التوليدية.',
        'ai_visibility': 'تحسين الظهور في الذكاء الاصطناعي: أنشئ محتوى تعريفياً قصيراً (Definitional Content) يسهل على النماذج مثل ChatGPT وPerplexity اقتباسه.',
        'h1_missing': 'إضافة عنوان H1: أضف عنواناً رئيسياً واضحاً يحتوي على الكلمة المفتاحية واسم العلامة التجارية.',
        'short_paras': 'تطوير الفقرات: اجعل الفقرة الأولى تبدأ بإجابة مباشرة ومختصرة لزيادة احتمالية الاقتباس.',
        'thin_content': 'محتوى ضيق: أضف فقرات تعريفية وبيانات منظمة للمؤسسة (Organization Schema).',
    },
    # English texts (same ids as the Arabic table).
    'en': {
        'headings': 'Fix heading hierarchy: Ensure one H1 per page and incremental H2 → H3 structure for better indexing.',
        'density': 'Increase content depth: Aim for 40–120 words in core paragraphs with direct, readable answers.',
        'entities': 'Add named entities: Clearly mention Organizations, People, and Products and link them via Schema.',
        'faq': 'Create FAQ sections: Use FAQPage JSON-LD to increase chances of being featured in AI search results.',
        'ai_visibility': 'Optimize for AI: Create short, authoritative definitions of your services to encourage LLM citations.',
        'h1_missing': 'Add H1 heading: Ensure a clear H1 containing your primary keyword and brand name.',
        'short_paras': 'Expand paragraphs: Lead with a one-sentence "direct answer" to improve AI extraction likelihood.',
        'thin_content': 'Thin content: Add a definitional paragraph and an Organization JSON-LD block.',
    }
}
def _is_arabic(text: str) -> bool:
    """Report whether *text* contains a character in the range U+0600-U+06FF.

    Falsy input (``None`` or an empty string) yields ``False``.
    """
    if text:
        hit = re.search(r'[\u0600-\u06FF]', text)
        return hit is not None
    return False
def _build_prompt(pages: List[dict]):
    """Render crawled pages as the plain-text user prompt sent to the LLM.

    Each page contributes a TITLE / URL / TEXT stanza terminated by ``---``.
    The title falls back to the URL when missing, and TEXT is the first
    paragraph (or an empty string when the page has none). Stanzas are
    joined with a newline.
    """
    def _stanza(page: dict) -> str:
        heading = page.get('title') or page.get('url')
        paragraphs = page.get('paragraphs') or [None]
        lead = paragraphs[0] or ''
        return f"TITLE: {heading}\nURL: {page.get('url')}\nTEXT: {lead}\n---"

    return "\n".join(_stanza(page) for page in pages)
def _cache_key_for_prompt(prompt: str, prefix: str = 'openai') -> str:
    """Build a cache key of the form ``<prefix>:<sha256 hex digest of prompt>``.

    The prompt is UTF-8 encoded before hashing.
    """
    digest = hashlib.sha256(prompt.encode('utf-8')).hexdigest()
    return '{}:{}'.format(prefix, digest)
def analyze_with_openai(pages: List[dict], api_key: str = None):
    """Ask an OpenAI chat model for a summary, topics and suggestions.

    Args:
        pages: Crawled page dicts; rendered into the prompt by
            ``_build_prompt``.
        api_key: Explicit API key. When falsy, the ``OPENAI_API_KEY``
            environment variable is used instead.

    Returns:
        A dict in one of four shapes:

        * ``{'enabled': False, 'reason': str}`` - no API key, or the optional
          ``openai`` package could not be imported.
        * ``{'enabled': False, 'error': str}`` - building the prompt or the
          API request raised; the message is the stringified exception.
        * ``{'enabled': True, 'result': obj}`` - the reply parsed as JSON.
        * ``{'enabled': True, 'raw': text}`` - the reply was not valid JSON.
    """
    key = api_key or os.getenv('OPENAI_API_KEY')
    if not key:
        return {'enabled': False, 'reason': 'OPENAI_API_KEY not set'}
    # The module-level import falls back to None when the package is missing;
    # report that explicitly instead of surfacing an AttributeError message.
    if openai is None:
        return {'enabled': False, 'reason': 'openai client not installed'}

    system = (
        "You are an analytics assistant. Given crawled pages (title, url, text),"
        " produce a JSON object with keys: summary (one-paragraph), topics (array of top 6 topics),"
        " suggestions (array of action items). Return ONLY valid JSON."
    )
    try:
        messages = [
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': _build_prompt(pages)},
        ]
        client = openai.OpenAI(api_key=key)
        resp = client.chat.completions.create(
            model=DEFAULT_MODEL,
            messages=messages,
            temperature=0.2,
            max_tokens=800,
        )
        text = resp.choices[0].message.content
    except Exception as e:
        # Best-effort boundary: any client/network failure becomes an error dict.
        return {'enabled': False, 'error': str(e)}

    try:
        return {'enabled': True, 'result': json.loads(text)}
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers a None reply.
        return {'enabled': True, 'raw': text}
def analyze_with_groq(pages: List[dict], api_key: str = None):
    """Ask a Groq-hosted chat model to analyse the crawled pages.

    Args:
        pages: Crawled page dicts; rendered into the prompt by
            ``_build_prompt``.
        api_key: Explicit API key. When falsy, the ``GROQ_API_KEY``
            environment variable is used instead.

    Returns:
        A dict in one of four shapes:

        * ``{'enabled': False, 'reason': str}`` - the optional ``groq``
          package could not be imported, or no API key is available.
        * ``{'enabled': False, 'error': str}`` - building the prompt or the
          API request raised; the message is the stringified exception.
        * ``{'enabled': True, 'result': obj}`` - the reply parsed as JSON.
        * ``{'enabled': True, 'raw': text}`` - the reply was not valid JSON.

    The model name comes from the ``GROQ_MODEL`` environment variable and
    defaults to ``llama-3.1-8b-instant``.
    """
    if Groq is None:
        return {'enabled': False, 'reason': 'groq client not installed'}
    groq_key = api_key or os.getenv('GROQ_API_KEY')
    if not groq_key:
        return {'enabled': False, 'reason': 'GROQ_API_KEY not set'}

    try:
        client = Groq(api_key=groq_key)
        prompt = _build_prompt(pages)
        completion = client.chat.completions.create(
            model=os.getenv('GROQ_MODEL', 'llama-3.1-8b-instant'),
            messages=[
                {'role': 'system', 'content': 'You are an analytics assistant producing JSON output.'},
                {'role': 'user', 'content': prompt},
            ],
            temperature=0.2,
            max_completion_tokens=2048,
            stream=False,
        )
        text = completion.choices[0].message.content
    except Exception as e:
        # Best-effort boundary: any client/network failure becomes an error dict.
        return {'enabled': False, 'error': str(e)}

    try:
        return {'enabled': True, 'result': json.loads(text)}
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers a None reply.
        return {'enabled': True, 'raw': text}
def analyze_pages(pages: List[dict], api_keys: dict = None):
    """Run both LLM analyses over *pages* and collect their results.

    ``api_keys`` may carry per-provider keys under ``'openai'`` and
    ``'groq'``; a missing entry is passed on as ``None``. OpenAI is queried
    first, then Groq. Returns ``{'openai': ..., 'groq': ...}``.
    """
    keys = api_keys if api_keys else {}
    openai_report = analyze_with_openai(pages, api_key=keys.get('openai'))
    groq_report = analyze_with_groq(pages, api_key=keys.get('groq'))
    return {'openai': openai_report, 'groq': groq_report}
def compute_geo_score(pages: List[dict], audit: dict = None, ai_visibility: dict = None):
    """Compute GEO visibility score (0-100) from pages.

    The score is the sum of five components, each worth up to 20 points:

    * headings - share of pages whose headings include an ``h1`` tag.
    * density - per page ``min(1, avg_words_per_paragraph / 40)``, averaged
      over all pages.
    * entities - currently always 0 (see note in the body).
    * faq - number of ``h3`` headings on pages that have paragraphs, capped
      at one per page on average.
    * ai_visibility - share of ``ai_visibility['results']`` entries with a
      truthy ``'mentioned'`` value; only counted when
      ``ai_visibility['enabled']`` is truthy.

    Args:
        pages: Page dicts. ``'headings'`` (list of dicts with a ``'tag'``)
            and ``'paragraphs'`` (list of texts) are read; missing or
            ``None`` values are treated as empty. ``None`` for *pages* is
            treated as an empty list.
        audit: Accepted for interface compatibility; not used.
        ai_visibility: Optional dict with ``'enabled'`` and ``'results'``.

    Returns:
        Dict with ``score``, ``status`` (``Elite`` >= 85, ``Authority`` >= 70,
        ``Needs Work`` >= 40, else ``Critical``), a rounded per-component
        ``breakdown``, per-page ``counts`` (critical / warnings / passed) and
        ``ai_radar_stats``.
    """
    pages = pages or []
    total_pages = max(1, len(pages))  # avoid dividing by zero for no pages
    headings_ok_count = 0
    density_total = 0.0
    faq_count = 0
    critical_issues = 0
    warnings = 0
    passed = 0

    for p in pages:
        # `or []` also covers keys that are present but set to None.
        headings = p.get('headings') or []
        paras = p.get('paragraphs') or []
        tags = [h.get('tag', '') for h in headings]

        if 'h1' in tags:
            headings_ok_count += 1

        avg = (sum(len(str(x).split()) for x in paras) / len(paras)) if paras else 0.0
        # Full credit from an average of 40 words per paragraph upwards.
        density_total += min(1.0, avg / 40.0)

        if paras:
            faq_count += sum(1 for tag in tags if tag == 'h3')

        # Per-page traffic light used for the issue counters.
        if not headings or avg < 20:
            critical_issues += 1
        elif avg < 40:
            warnings += 1
        else:
            passed += 1

    headings_score = float(headings_ok_count / total_pages) * 20.0
    density_score = float(density_total / total_pages) * 20.0
    # NOTE(review): no entity signal is extracted from the pages, so this
    # component contributes nothing to the score until one is wired in.
    entity_score = 0.0
    faq_score = float(min(faq_count, total_pages) / total_pages) * 20.0

    ai_score = 0.0
    mentions = 0
    total_q = 0
    if ai_visibility and ai_visibility.get('enabled'):
        res = ai_visibility.get('results') or []
        mentions = sum(1 for r in res if r.get('mentioned'))
        total_q = max(1, len(res))
        ai_score = (mentions / total_q) * 20

    raw_score = headings_score + density_score + entity_score + faq_score + ai_score
    score = int(round(min(raw_score, 100)))
    if score >= 85:
        status = 'Elite'
    elif score >= 70:
        status = 'Authority'
    elif score >= 40:
        status = 'Needs Work'
    else:
        status = 'Critical'

    return {
        'score': score,
        'status': status,
        'breakdown': {
            'headings': int(round(headings_score)),
            'density': int(round(density_score)),
            'entities': int(round(entity_score)),
            'faq': int(round(faq_score)),
            'ai_visibility': int(round(ai_score)),
        },
        'counts': {
            'critical': critical_issues,
            'warnings': warnings,
            'passed': passed,
        },
        'ai_radar_stats': {
            'mentions': mentions,
            'total_queries': total_q,
            'visibility_percent': int((mentions / total_q) * 100) if total_q > 0 else 0,
        },
    }
def infer_brand_name(pages: List[dict]) -> str:
    """Extract brand name from pages.

    The first five pages are inspected in order; for each page the first
    usable candidate wins:

    1. ``meta['og:site_name']`` (or ``meta['application-name']``),
    2. the text of an ``h1`` heading shorter than 60 characters,
    3. the part of the page title before the first ``|``, ``-``, ``—`` or
       ``»`` separator, if shorter than 60 characters.

    Generic values such as "home", "welcome" or "company" are skipped. When
    no page yields a name, the host of the first page's URL is used: a
    leading ``www.`` and the final dot-suffix are removed, ``-``/``_`` become
    spaces and the result is title-cased.

    Args:
        pages: Page dicts; ``meta``, ``headings``, ``title`` and ``url`` are
            read. Missing or ``None`` values are tolerated.

    Returns:
        The inferred brand name, or ``"Company"`` when nothing usable exists.
    """
    from urllib.parse import urlparse

    if not pages:
        return "Company"

    for page in pages[:5]:
        meta = page.get('meta', {}) or {}
        og_site = meta.get('og:site_name') or meta.get('application-name')
        if isinstance(og_site, str):
            # Strip before comparing so ' Home ' is recognised as generic and
            # a whitespace-only value is never returned as an empty brand.
            og_site = og_site.strip()
            if og_site and og_site.lower() not in ('company', 'website', 'home'):
                return og_site

        for h in page.get('headings') or []:
            if h.get('tag') == 'h1':
                txt = str(h.get('text') or '').strip()
                if txt and len(txt) < 60 and txt.lower() not in ('home', 'welcome', 'company'):
                    return txt

        if page.get('title'):
            title_str = str(page.get('title') or '').strip()
            title = re.split(r'[\|\-—»]', title_str)[0].strip()
            if title and len(title) < 60 and title.lower() not in ('home', 'welcome', 'company', 'homepage'):
                return title

    first_url = pages[0].get('url', '')
    if first_url:
        try:
            parsed = urlparse(first_url)
            # Scheme-less URLs ("example.com/about") parse with an empty
            # netloc; the host is then the first segment of the path.
            host = parsed.netloc or parsed.path.split('/', 1)[0]
            host = host.rsplit('@', 1)[-1]      # drop any userinfo
            host = re.sub(r':\d+$', '', host)   # drop any port
            domain_clean = re.sub(r'^www\.', '', host)
            domain_clean = re.sub(r'\.[a-z]{2,}$', '', domain_clean)
            domain_clean = domain_clean.replace('-', ' ').replace('_', ' ').strip()
            if domain_clean and len(domain_clean) > 1:
                return domain_clean.title()
        except (ValueError, TypeError, AttributeError):
            # Unparseable or non-string URL: fall through to the default.
            pass
    return "Company"
def generate_recommendations(pages: List[dict], geo_score: dict = None, api_keys: dict = None, ai_analysis_results: dict = None, extra_context: dict = None):
    """Produce actionable recommendations based on pages and GEO score.

    The output language is Arabic when the title or first paragraph of any
    of the first three pages contains Arabic characters, otherwise English.

    Args:
        pages: Page dicts; ``title``, ``url``, ``headings`` and
            ``paragraphs`` are read. Missing or ``None`` values are treated
            as empty.
        geo_score: Optional score dict; its ``'breakdown'`` values drive the
            site-wide actions (a component below its threshold adds the
            matching recommendation).
        api_keys: Accepted for interface compatibility; not used.
        ai_analysis_results: Accepted for interface compatibility; not used.
        extra_context: Accepted for interface compatibility; not used.

    Returns:
        ``{'actions': [str, ...], 'per_page': [{'url', 'title', 'issues',
        'suggestions'}, ...]}``.
    """
    pages = pages or []
    recs = {
        'actions': [],
        'per_page': [],
    }

    # Detect language from a small sample of pages.
    is_ar = False
    for p in pages[:3]:
        paras = p.get('paragraphs') or []
        # Coerce to str so a None title or first paragraph cannot break the
        # concatenation.
        title_text = str(p.get('title') or '')
        lead_text = str(paras[0] or '') if paras else ''
        if _is_arabic(title_text + ' ' + lead_text):
            is_ar = True
            break
    lang = 'ar' if is_ar else 'en'
    content = RECS_CONTENT[lang]

    # Heuristic actions based on geo_score: (breakdown key, minimum points).
    if geo_score:
        breakdown = geo_score.get('breakdown') or {}
        thresholds = (
            ('headings', 12),
            ('density', 12),
            ('entities', 10),
            ('faq', 10),
            ('ai_visibility', 10),
        )
        for key, minimum in thresholds:
            if breakdown.get(key, 0) < minimum:
                recs['actions'].append(content[key])

    # Per-page recommendations
    for p in pages:
        page_rec = {
            'url': p.get('url'),
            'title': p.get('title'),
            'issues': [],
            'suggestions': [],
        }

        tags = [h.get('tag', '') for h in (p.get('headings') or [])]
        if 'h1' not in tags:
            page_rec['issues'].append('Missing H1' if lang == 'en' else 'عنوان H1 مفقود')
            page_rec['suggestions'].append(content.get('h1_missing', 'Fix H1'))

        paras = p.get('paragraphs') or []
        avg = (sum(len(str(x).split()) for x in paras) / len(paras)) if paras else 0
        if avg < 30:
            page_rec['issues'].append('Short paragraphs' if lang == 'en' else 'فقرات قصيرة جداً')
            page_rec['suggestions'].append(content.get('short_paras', 'Expand paragraphs'))

        recs['per_page'].append(page_rec)

    return recs
def simulate_visibility(content: str, brand: str, api_keys: dict = None) -> dict:
    """Predicts AI Visibility for content.

    Currently a placeholder: the arguments are ignored and a fixed, neutral
    prediction is returned. Each call builds a fresh dict (and a fresh
    ``suggested_fixes`` list).
    """
    prediction = dict(
        ai_visibility_score=50,
        sentiment='Neutral',
        entity_clarity='Medium',
        detailed_analysis='Content analysis available',
    )
    prediction['suggested_fixes'] = []
    return prediction
def analyze_ai_visibility_deep(pages: List[dict], brand: str, api_keys: dict = None) -> dict:
    """Performs deep sentiment and visibility analysis.

    Currently a placeholder: the arguments are ignored and a fixed neutral
    report with three sections (sentiment, shopping visibility, context) is
    returned. Each call builds fresh containers.
    """
    sentiment = {
        'sentiment_score': 0,
        'sentiment_label': 'Neutral',
        'recommendations': ['Improve content density to increase trust.'],
    }
    shopping = {
        'price_detected': False,
        'price_value': None,
        'rating_detected': False,
        'rating_value': 0,
    }
    context = {
        'scenario': 'General',
        'trigger': 'Unknown',
    }
    report = {}
    report['sentiment_analysis'] = sentiment
    report['shopping_visibility'] = shopping
    report['context_analysis'] = context
    return report