"""
GEO Scoring Module

Analyzes content for Generative Engine Optimization (GEO) performance.
"""

import json
from typing import Any, Dict, List, Optional

from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate


class GEOScorer:
    """Main class for calculating GEO scores and analysis.

    Wraps a chat model (``llm``) and asks it to score content against a set
    of Generative Engine Optimization criteria.  The model is expected to
    answer in JSON; responses are parsed into plain dictionaries.  Public
    methods never raise: failures are reported as ``{'error': '...'}``.
    """

    # Maximum number of content characters sent to the model per request.
    DETAILED_CONTENT_LIMIT = 8000
    QUICK_CONTENT_LIMIT = 4000
    COMPARISON_CONTENT_LIMIT = 4000

    # Human-message template; title/content are supplied as variables so that
    # braces inside scraped content are never interpreted as placeholders.
    _PAGE_MESSAGE_TEMPLATE = "Title: {title}\n\nContent: {content}"

    def __init__(self, llm):
        """Store the chat model and build the prompt texts.

        Args:
            llm: Object that can be piped after a ``ChatPromptTemplate``
                (``prompt | llm``) and invoked.
        """
        self.llm = llm
        self.setup_prompts()

    def setup_prompts(self):
        """Initialize prompts for different types of analysis.

        The prompt texts are template strings: literal JSON braces are
        escaped as ``{{`` / ``}}`` and only ``competitive_prompt`` declares
        variables (``content_a`` and ``content_b``).
        """
        # NOTE(review): the "double curly braces" remark below is part of the
        # text sent to the model, which sees single braces after templating.
        # Left untouched because changing prompt wording changes model output.
        self.geo_analysis_prompt = (
            "You are a Generative Engine Optimization (GEO) Specialist. Your task is to critically analyze the input content for its effectiveness in AI-powered search engines and large language model (LLM) systems. "
            "Evaluate the content using the following GEO criteria, assigning a score from 1 to 10 for each: \n\n"
            "1. AI Search Visibility - How likely is the content to be surfaced by AI search engines?\n"
            "2. Query Intent Matching - How well does the content align with common user queries?\n"
            "3. Factual Accuracy & Authority - How trustworthy and authoritative is the information?\n"
            "4. Conversational Readiness - Is the content well-suited for AI chat responses?\n"
            "5. Semantic Richness - Does the content effectively use relevant semantic keywords?\n"
            "6. Context Completeness - Is the content self-contained and does it provide complete answers?\n"
            "7. Citation Worthiness - How likely is the content to be cited by AI systems?\n"
            "8. Multi-Query Coverage - Does the content address multiple related questions?\n\n"
            "Also provide:\n"
            "- Key topics and entities mentioned\n"
            "- Missing information or content gaps\n"
            "- Specific optimization opportunities\n"
            "- Actionable enhancement recommendations\n\n"
            "Respond strictly in JSON format using the structure below (double curly braces shown here to escape string formatting, do NOT include them in actual output):\n\n"
            "{{\n"
            " \"geo_scores\": {{\n"
            " \"ai_search_visibility\": 0.0,\n"
            " \"query_intent_matching\": 0.0,\n"
            " \"factual_accuracy\": 0.0,\n"
            " \"conversational_readiness\": 0.0,\n"
            " \"semantic_richness\": 0.0,\n"
            " \"context_completeness\": 0.0,\n"
            " \"citation_worthiness\": 0.0,\n"
            " \"multi_query_coverage\": 0.0\n"
            " }},\n"
            " \"overall_geo_score\": 0.0,\n"
            " \"primary_topics\": [\"topic1\", \"topic2\"],\n"
            " \"entities\": [\"entity1\", \"entity2\"],\n"
            " \"missing_gaps\": [\"gap1\", \"gap2\"],\n"
            " \"optimization_opportunities\": [\n"
            " {{\n"
            " \"type\": \"semantic_enhancement\",\n"
            " \"description\": \"Describe the improvement opportunity\",\n"
            " \"priority\": \"high\"\n"
            " }}\n"
            " ],\n"
            " \"recommendations\": [\n"
            " \"Write clear and specific suggestions to improve the content\"\n"
            " ]\n"
            "}}"
        )

        self.quick_score_prompt = (
            "You are an AI Search Optimization Analyst. Evaluate the given content and provide a quick scoring based on key criteria.\n"
            "Rate each of the following from 1 to 10:\n"
            "1. AI Search Visibility\n"
            "2. Query Intent Matching\n"
            "3. Conversational Readiness\n"
            "4. Citation Worthiness\n\n"
            "{{\n"
            " \"scores\": {{\n"
            " \"ai_search_visibility\": 0.0,\n"
            " \"query_intent_matching\": 0.0,\n"
            " \"conversational_readiness\": 0.0,\n"
            " \"citation_worthiness\": 0.0\n"
            " }},\n"
            " \"overall_score\": 0.0,\n"
            " \"top_recommendation\": \"Provide the most critical improvement needed\"\n"
            "}}"
        )

        self.competitive_prompt = (
            "Compare these content pieces for GEO performance. Identify which performs better for AI search and why.\n"
            "Content A: {content_a}\n"
            "Content B: {content_b}\n"
            "Provide analysis in JSON:\n"
            "{{\n"
            " \"winner\": \"A\" or \"B\",\n"
            " \"score_comparison\": {{\n"
            " \"content_a_score\": 7.5,\n"
            " \"content_b_score\": 8.2\n"
            " }},\n"
            " \"key_differences\": [\"difference1\", \"difference2\"],\n"
            " \"improvement_suggestions\": {{\n"
            " \"content_a\": [\"suggestion1\"],\n"
            " \"content_b\": [\"suggestion1\"]\n"
            " }}\n"
            "}}"
        )

    def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
        """Analyze a single page for GEO performance.

        Args:
            content: Page text.  Only the first 8000 characters (detailed) or
                4000 characters (quick) are sent to the model.
            title: Page title, included in the message to the model.
            detailed: Use the full GEO analysis prompt when true, otherwise
                the quick-score prompt.

        Returns:
            The parsed model response extended with ``analyzed_title``,
            ``content_length``, ``word_count`` and ``analysis_type``; or
            ``{'error': ...}`` if anything raised.
        """
        try:
            if detailed:
                system_prompt = self.geo_analysis_prompt
                limit = self.DETAILED_CONTENT_LIMIT
            else:
                system_prompt = self.quick_score_prompt
                limit = self.QUICK_CONTENT_LIMIT

            # Title and content are passed as template *variables*.  Embedding
            # them in the template text would make any "{" or "}" in the page
            # content look like a placeholder and break formatting.
            prompt_template = ChatPromptTemplate.from_messages([
                SystemMessagePromptTemplate.from_template(system_prompt),
                HumanMessagePromptTemplate.from_template(self._PAGE_MESSAGE_TEMPLATE),
            ])

            chain = prompt_template | self.llm
            result = chain.invoke({"title": title, "content": content[:limit]})

            # Chat models return a message object; plain LLMs may return text.
            result_content = result.content if hasattr(result, 'content') else str(result)
            parsed_result = self._parse_llm_response(result_content)

            parsed_result.update({
                'analyzed_title': title,
                'content_length': len(content),
                'word_count': len(content.split()),
                'analysis_type': 'detailed' if detailed else 'quick',
            })
            return parsed_result

        except Exception as e:  # boundary: callers expect an error dict, not a raise
            return {'error': f"GEO analysis failed: {str(e)}"}

    def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
        """Analyze multiple pages and return consolidated results.

        Args:
            pages_data (List[Dict]): List of page data with content and metadata.
                The keys read are ``content``, ``title``, ``url`` and
                ``word_count``; all are optional.
            detailed (bool): Whether to perform detailed analysis.

        Returns:
            List[Dict]: One GEO analysis result per input page, in input
            order, each tagged with ``page_url``, ``page_index`` and
            ``source_word_count``.  A page that fails yields a dict with
            ``page_index``, ``page_url`` and ``error``.
        """
        results = []

        for i, page_data in enumerate(pages_data):
            try:
                content = page_data.get('content', '')
                title = page_data.get('title', f'Page {i+1}')

                analysis = self.analyze_page_geo(content, title, detailed)
                analysis.update({
                    'page_url': page_data.get('url', ''),
                    'page_index': i,
                    'source_word_count': page_data.get('word_count', 0),
                })
                results.append(analysis)

            except Exception as e:
                # page_data itself may be malformed (not a dict), so do not
                # assume .get is available while reporting the failure.
                url = page_data.get('url', '') if isinstance(page_data, dict) else ''
                results.append({
                    'page_index': i,
                    'page_url': url,
                    'error': f"Analysis failed: {str(e)}",
                })

        return results

    def compare_content_geo(self, content_a: str, content_b: str, titles: Optional[tuple] = None) -> Dict[str, Any]:
        """Compare two pieces of content for GEO performance.

        Args:
            content_a (str): First content to compare (first 4000 chars used).
            content_b (str): Second content to compare (first 4000 chars used).
            titles (tuple): Optional ``(title_a, title_b)`` pair; defaults to
                ``("Content A", "Content B")``.

        Returns:
            Dict: Parsed comparison analysis, or ``{'error': ...}`` on failure.
        """
        try:
            title_a, title_b = titles if titles else ("Content A", "Content B")

            # The competitive prompt is used directly as the template and the
            # two content pieces are supplied as variables.  Formatting it
            # first and then re-templating the result would turn the JSON
            # example's braces (and any braces in the content) into bogus
            # placeholders.
            prompt_template = ChatPromptTemplate.from_messages([
                ("system", self.competitive_prompt),
                ("user", "Perform the comparison analysis."),
            ])

            chain = prompt_template | self.llm
            result = chain.invoke({
                "content_a": f"Title: {title_a}\nContent: {content_a[:self.COMPARISON_CONTENT_LIMIT]}",
                "content_b": f"Title: {title_b}\nContent: {content_b[:self.COMPARISON_CONTENT_LIMIT]}",
            })
            result_content = result.content if hasattr(result, 'content') else str(result)

            return self._parse_llm_response(result_content)

        except Exception as e:
            return {'error': f"Comparison analysis failed: {str(e)}"}

    def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate aggregate GEO scores from multiple page analyses.

        Args:
            individual_results (List[Dict]): List of individual page analysis
                results.  Entries without a ``geo_scores`` dict, or with a
                truthy ``error``, are ignored.

        Returns:
            Dict: Per-metric averages (``aggregate_scores``), their mean
            (``overall_score``), best/lowest metric, de-duplicated
            recommendations (first 10), topics, entities, up to five
            high-priority opportunities and a score distribution; or
            ``{'error': ...}`` when nothing can be aggregated.
        """
        try:
            valid_results = [
                r for r in individual_results
                if isinstance(r, dict)
                and isinstance(r.get('geo_scores'), dict)
                and not r.get('error')
            ]

            if not valid_results:
                return {'error': 'No valid results to aggregate'}

            # Collect metric names from every page (first-seen order), not
            # just the first one, since the model may omit or add keys.
            score_keys = list(dict.fromkeys(
                key for r in valid_results for key in r['geo_scores']
            ))

            avg_scores = {}
            for key in score_keys:
                scores = [
                    r['geo_scores'][key] for r in valid_results
                    if self._is_number(r['geo_scores'].get(key))
                ]
                # A metric with no numeric value anywhere is left out rather
                # than dragging the averages down with a made-up zero.
                if scores:
                    avg_scores[key] = sum(scores) / len(scores)

            overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

            all_recommendations = []
            all_opportunities = []
            all_topics = []
            all_entities = []

            for result in valid_results:
                all_recommendations.extend(self._as_list(result.get('recommendations')))
                all_opportunities.extend(self._as_list(result.get('optimization_opportunities')))
                all_topics.extend(self._as_list(result.get('primary_topics')))
                all_entities.extend(self._as_list(result.get('entities')))

            # Order-preserving de-duplication keeps output deterministic and
            # tolerates unhashable items (a plain set() would not).
            unique_recommendations = self._unique_in_order(all_recommendations)
            unique_topics = self._unique_in_order(all_topics)
            unique_entities = self._unique_in_order(all_entities)

            best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
            worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)

            return {
                'aggregate_scores': avg_scores,
                'overall_score': overall_avg,
                'pages_analyzed': len(valid_results),
                'best_performing_metric': {
                    'metric': best_score[0],
                    'score': best_score[1],
                },
                'lowest_performing_metric': {
                    'metric': worst_score[0],
                    'score': worst_score[1],
                },
                'consolidated_recommendations': unique_recommendations[:10],
                'all_topics': unique_topics,
                'all_entities': unique_entities,
                'high_priority_opportunities': [
                    opp for opp in all_opportunities
                    if isinstance(opp, dict) and opp.get('priority') == 'high'
                ][:5],
                'score_distribution': self._calculate_score_distribution(avg_scores),
            }

        except Exception as e:
            return {'error': f"Aggregation failed: {str(e)}"}

    def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]:
        """Generate a comprehensive GEO report.

        Args:
            analysis_results (Dict): Results from aggregate analysis (the
                shape returned by ``calculate_aggregate_scores``).
            website_url (str): Optional website URL for context.

        Returns:
            Dict: Report with metadata, executive summary, detailed scores,
            insights, prioritized recommendations, roadmap, competitive
            position and technical details; or ``{'error': ...}`` on failure.
        """
        try:
            return {
                'report_metadata': {
                    'generated_at': self._get_timestamp(),
                    'website_url': website_url,
                    'analysis_type': 'GEO Performance Report',
                },
                'executive_summary': self._generate_executive_summary(analysis_results),
                'detailed_scores': analysis_results.get('aggregate_scores', {}),
                'performance_insights': self._generate_performance_insights(analysis_results),
                'actionable_recommendations': self._prioritize_recommendations(
                    analysis_results.get('consolidated_recommendations', [])
                ),
                'optimization_roadmap': self._create_optimization_roadmap(analysis_results),
                'competitive_position': self._assess_competitive_position(analysis_results),
                'technical_details': {
                    'pages_analyzed': analysis_results.get('pages_analyzed', 0),
                    'overall_score': analysis_results.get('overall_score', 0),
                    'score_distribution': analysis_results.get('score_distribution', {}),
                },
            }

        except Exception as e:
            return {'error': f"Report generation failed: {str(e)}"}

    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
        """Parse LLM response and extract JSON content.

        Takes the span from the first ``{`` to the last ``}`` and decodes it.

        Returns:
            The decoded object, or a dict with ``raw_response`` and
            ``parsing_error`` when no such span exists or decoding fails.
        """
        try:
            json_start = response_text.find('{')
            json_end = response_text.rfind('}') + 1  # 0 when no '}' present

            # Require an opening brace followed by a closing brace; comparing
            # json_end with -1 could never fail because of the +1 above.
            if json_start != -1 and json_end > json_start:
                return json.loads(response_text[json_start:json_end])

            return {'raw_response': response_text, 'parsing_error': 'No JSON found'}

        except json.JSONDecodeError as e:
            return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(e)}'}
        except Exception as e:
            return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'}

    def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
        """Calculate distribution of scores for insights.

        Returns an empty dict for empty input; otherwise the highest, lowest
        and average score, their range, and counts of scores ``>= 7.0`` and
        ``< 5.0``.
        """
        if not scores:
            return {}

        score_values = list(scores.values())
        highest = max(score_values)
        lowest = min(score_values)

        return {
            'highest_score': highest,
            'lowest_score': lowest,
            'average_score': sum(score_values) / len(score_values),
            'score_range': highest - lowest,
            'scores_above_7': sum(1 for s in score_values if s >= 7.0),
            'scores_below_5': sum(1 for s in score_values if s < 5.0),
        }

    def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str:
        """Generate executive summary based on analysis results.

        Maps the overall score to a label (>= 8.0 excellent, >= 6.5 good,
        >= 5.0 moderate, otherwise needs improvement) and names the lowest
        performing metric as the key opportunity.
        """
        overall_score = analysis_results.get('overall_score', 0)
        pages_analyzed = analysis_results.get('pages_analyzed', 0)

        if overall_score >= 8.0:
            performance = "excellent"
        elif overall_score >= 6.5:
            performance = "good"
        elif overall_score >= 5.0:
            performance = "moderate"
        else:
            performance = "needs improvement"

        return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}."

    def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]:
        """Generate performance insights based on analysis.

        Emits a line for a best metric scoring >= 8.0, a lowest metric
        scoring < 6.0, and a score range wider than 3.0.
        """
        insights = []

        best_metric = analysis_results.get('best_performing_metric', {})
        worst_metric = analysis_results.get('lowest_performing_metric', {})

        if best_metric.get('score', 0) >= 8.0:
            insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})")

        # Default of 10 means "no data" never triggers the warning.
        if worst_metric.get('score', 10) < 6.0:
            insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})")

        score_dist = analysis_results.get('score_distribution', {})
        if score_dist.get('score_range', 0) > 3.0:
            insights.append("High variability in scores indicates inconsistent optimization across metrics")

        return insights

    def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]:
        """Prioritize recommendations based on impact potential.

        Priority is a case-insensitive keyword match on the recommendation
        text (high-impact keywords win over medium ones; default is low).

        Returns:
            Dicts with ``recommendation``, ``priority`` and ``order`` (the
            1-based input position), sorted high -> medium -> low.  The sort
            is stable, so input order is kept within each priority.
        """
        high_impact_keywords = ['semantic', 'structure', 'authority', 'factual']
        medium_impact_keywords = ['readability', 'clarity', 'format']

        prioritized = []
        for i, rec in enumerate(recommendations):
            # str() guards against non-string items coming back from the model.
            text = str(rec).lower()

            if any(keyword in text for keyword in high_impact_keywords):
                priority = 'high'
            elif any(keyword in text for keyword in medium_impact_keywords):
                priority = 'medium'
            else:
                priority = 'low'

            prioritized.append({
                'recommendation': rec,
                'priority': priority,
                'order': i + 1,
            })

        priority_order = {'high': 1, 'medium': 2, 'low': 3}
        prioritized.sort(key=lambda x: priority_order[x['priority']])

        return prioritized

    def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]:
        """Create a phased optimization roadmap.

        Adds an immediate action when the lowest metric scores below 5.0,
        short-term goals when the overall score is below 7.0, and always adds
        the two long-term strategy items.
        """
        roadmap = {
            'immediate_actions': [],
            'short_term_goals': [],
            'long_term_strategy': [],
        }

        overall_score = analysis_results.get('overall_score', 0)
        worst_metric = analysis_results.get('lowest_performing_metric', {})

        if worst_metric.get('score', 10) < 5.0:
            roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}")

        if overall_score < 7.0:
            roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0")
            roadmap['short_term_goals'].append("Enhance content structure and semantic richness")

        roadmap['long_term_strategy'].append("Establish consistent GEO optimization process")
        roadmap['long_term_strategy'].append("Monitor and track AI search performance")

        return roadmap

    def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
        """Assess competitive position based on scores.

        Thresholds on the overall score: >= 8.5 market_leader, >= 7.0
        competitive, >= 5.5 average, otherwise needs_work.  The percentile
        estimate is simply the score times ten, capped at 100.
        """
        overall_score = analysis_results.get('overall_score', 0)

        if overall_score >= 8.5:
            position = "market_leader"
            description = "Content is highly optimized for AI search engines"
        elif overall_score >= 7.0:
            position = "competitive"
            description = "Content performs well but has room for improvement"
        elif overall_score >= 5.5:
            position = "average"
            description = "Content meets basic standards but lacks optimization"
        else:
            position = "needs_work"
            description = "Content requires significant optimization for AI search"

        return {
            'position': position,
            'description': description,
            'score': overall_score,
            'percentile_estimate': min(overall_score * 10, 100),
        }

    def _get_timestamp(self) -> str:
        """Get current local timestamp formatted as ``YYYY-MM-DD HH:MM:SS``."""
        from datetime import datetime

        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values (bool is deliberately excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _as_list(value: Any) -> list:
        """Return *value* if it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _unique_in_order(items: list) -> list:
        """Drop duplicates while keeping first-seen order; accepts unhashable items."""
        seen = set()
        unique = []
        for item in items:
            try:
                if item in seen:
                    continue
                seen.add(item)
            except TypeError:
                # Unhashable (e.g. dict/list): fall back to a linear scan.
                if item in unique:
                    continue
            unique.append(item)
        return unique