|
|
"""
|
|
|
GEO Scoring Module
|
|
|
Analyzes content for Generative Engine Optimization (GEO) performance
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
from typing import Dict, Any, List
|
|
|
from langchain.prompts import ChatPromptTemplate
|
|
|
|
|
|
|
|
|
class GEOScorer:
|
|
|
"""Main class for calculating GEO scores and analysis"""
|
|
|
|
|
|
def __init__(self, llm):
|
|
|
self.llm = llm
|
|
|
self.setup_prompts()
|
|
|
|
|
|
def setup_prompts(self):
|
|
|
"""Initialize prompts for different types of analysis"""
|
|
|
|
|
|
|
|
|
self.geo_analysis_prompt = """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.
|
|
|
|
|
|
Evaluate the content based on these GEO criteria (score 1-10 each):
|
|
|
|
|
|
1. **AI Search Visibility**: How likely is this content to be surfaced by AI search engines?
|
|
|
2. **Query Intent Matching**: How well does the content match common user queries?
|
|
|
3. **Factual Accuracy & Authority**: How trustworthy and authoritative is the information?
|
|
|
4. **Conversational Readiness**: How suitable is the content for AI chat responses?
|
|
|
5. **Semantic Richness**: How well does the content use relevant semantic keywords?
|
|
|
6. **Context Completeness**: Does the content provide complete, self-contained answers?
|
|
|
7. **Citation Worthiness**: How likely are AI systems to cite this content?
|
|
|
8. **Multi-Query Coverage**: Does the content answer multiple related questions?
|
|
|
|
|
|
Also identify:
|
|
|
- Primary topics and entities
|
|
|
- Missing information gaps
|
|
|
- Optimization opportunities
|
|
|
- Specific enhancement recommendations
|
|
|
|
|
|
Format your response as JSON:
|
|
|
|
|
|
```json
|
|
|
{
|
|
|
"geo_scores": {
|
|
|
"ai_search_visibility": 7.5,
|
|
|
"query_intent_matching": 8.0,
|
|
|
"factual_accuracy": 9.0,
|
|
|
"conversational_readiness": 6.5,
|
|
|
"semantic_richness": 7.0,
|
|
|
"context_completeness": 8.5,
|
|
|
"citation_worthiness": 7.8,
|
|
|
"multi_query_coverage": 6.0
|
|
|
},
|
|
|
"overall_geo_score": 7.5,
|
|
|
"primary_topics": ["topic1", "topic2"],
|
|
|
"entities": ["entity1", "entity2"],
|
|
|
"missing_gaps": ["gap1", "gap2"],
|
|
|
"optimization_opportunities": [
|
|
|
{
|
|
|
"type": "semantic_enhancement",
|
|
|
"description": "Add more related terms",
|
|
|
"priority": "high"
|
|
|
}
|
|
|
],
|
|
|
"recommendations": [
|
|
|
"Specific actionable recommendation 1",
|
|
|
"Specific actionable recommendation 2"
|
|
|
]
|
|
|
}
|
|
|
```"""
|
|
|
|
|
|
|
|
|
self.quick_score_prompt = """Analyze this content for AI search optimization. Provide scores (1-10) for:
|
|
|
|
|
|
1. AI Search Visibility
|
|
|
2. Query Intent Matching
|
|
|
3. Conversational Readiness
|
|
|
4. Citation Worthiness
|
|
|
|
|
|
Respond in JSON format:
|
|
|
```json
|
|
|
{
|
|
|
"scores": {
|
|
|
"ai_search_visibility": 7.5,
|
|
|
"query_intent_matching": 8.0,
|
|
|
"conversational_readiness": 6.5,
|
|
|
"citation_worthiness": 7.8
|
|
|
},
|
|
|
"overall_score": 7.5,
|
|
|
"top_recommendation": "Most important improvement needed"
|
|
|
}
|
|
|
```"""
|
|
|
|
|
|
|
|
|
self.competitive_prompt = """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.
|
|
|
|
|
|
Content A: {content_a}
|
|
|
|
|
|
Content B: {content_b}
|
|
|
|
|
|
Provide analysis in JSON:
|
|
|
```json
|
|
|
{
|
|
|
"winner": "A" or "B",
|
|
|
"score_comparison": {
|
|
|
"content_a_score": 7.5,
|
|
|
"content_b_score": 8.2
|
|
|
},
|
|
|
"key_differences": ["difference1", "difference2"],
|
|
|
"improvement_suggestions": {
|
|
|
"content_a": ["suggestion1"],
|
|
|
"content_b": ["suggestion1"]
|
|
|
}
|
|
|
}
|
|
|
```"""
|
|
|
|
|
|
def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Analyze a single page for GEO performance
|
|
|
|
|
|
Args:
|
|
|
content (str): Page content to analyze
|
|
|
title (str): Page title
|
|
|
detailed (bool): Whether to perform detailed analysis
|
|
|
|
|
|
Returns:
|
|
|
Dict: GEO analysis results
|
|
|
"""
|
|
|
try:
|
|
|
|
|
|
if detailed:
|
|
|
prompt_template = ChatPromptTemplate.from_messages([
|
|
|
("system", self.geo_analysis_prompt),
|
|
|
("user", f"Title: {title}\n\nContent: {content[:8000]}")
|
|
|
])
|
|
|
else:
|
|
|
prompt_template = ChatPromptTemplate.from_messages([
|
|
|
("system", self.quick_score_prompt),
|
|
|
("user", f"Title: {title}\n\nContent: {content[:4000]}")
|
|
|
])
|
|
|
|
|
|
|
|
|
chain = prompt_template | self.llm
|
|
|
result = chain.invoke({})
|
|
|
|
|
|
|
|
|
result_content = result.content if hasattr(result, 'content') else str(result)
|
|
|
parsed_result = self._parse_llm_response(result_content)
|
|
|
|
|
|
|
|
|
parsed_result.update({
|
|
|
'analyzed_title': title,
|
|
|
'content_length': len(content),
|
|
|
'word_count': len(content.split()),
|
|
|
'analysis_type': 'detailed' if detailed else 'quick'
|
|
|
})
|
|
|
|
|
|
return parsed_result
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"GEO analysis failed: {str(e)}"}
|
|
|
|
|
|
def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
Analyze multiple pages and return consolidated results
|
|
|
|
|
|
Args:
|
|
|
pages_data (List[Dict]): List of page data with content and metadata
|
|
|
detailed (bool): Whether to perform detailed analysis
|
|
|
|
|
|
Returns:
|
|
|
List[Dict]: List of GEO analysis results
|
|
|
"""
|
|
|
results = []
|
|
|
|
|
|
for i, page_data in enumerate(pages_data):
|
|
|
try:
|
|
|
content = page_data.get('content', '')
|
|
|
title = page_data.get('title', f'Page {i+1}')
|
|
|
|
|
|
analysis = self.analyze_page_geo(content, title, detailed)
|
|
|
|
|
|
|
|
|
analysis.update({
|
|
|
'page_url': page_data.get('url', ''),
|
|
|
'page_index': i,
|
|
|
'source_word_count': page_data.get('word_count', 0)
|
|
|
})
|
|
|
|
|
|
results.append(analysis)
|
|
|
|
|
|
except Exception as e:
|
|
|
results.append({
|
|
|
'page_index': i,
|
|
|
'page_url': page_data.get('url', ''),
|
|
|
'error': f"Analysis failed: {str(e)}"
|
|
|
})
|
|
|
|
|
|
return results
|
|
|
|
|
|
def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Compare two pieces of content for GEO performance
|
|
|
|
|
|
Args:
|
|
|
content_a (str): First content to compare
|
|
|
content_b (str): Second content to compare
|
|
|
titles (tuple): Optional titles for the content pieces
|
|
|
|
|
|
Returns:
|
|
|
Dict: Comparison analysis results
|
|
|
"""
|
|
|
try:
|
|
|
title_a, title_b = titles if titles else ("Content A", "Content B")
|
|
|
|
|
|
prompt_template = ChatPromptTemplate.from_messages([
|
|
|
("system", self.competitive_prompt),
|
|
|
("user", "")
|
|
|
])
|
|
|
|
|
|
|
|
|
formatted_prompt = self.competitive_prompt.format(
|
|
|
content_a=f"Title: {title_a}\nContent: {content_a[:4000]}",
|
|
|
content_b=f"Title: {title_b}\nContent: {content_b[:4000]}"
|
|
|
)
|
|
|
|
|
|
chain = ChatPromptTemplate.from_messages([
|
|
|
("system", formatted_prompt),
|
|
|
("user", "Perform the comparison analysis.")
|
|
|
]) | self.llm
|
|
|
|
|
|
result = chain.invoke({})
|
|
|
result_content = result.content if hasattr(result, 'content') else str(result)
|
|
|
|
|
|
return self._parse_llm_response(result_content)
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Comparison analysis failed: {str(e)}"}
|
|
|
|
|
|
def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Calculate aggregate GEO scores from multiple page analyses
|
|
|
|
|
|
Args:
|
|
|
individual_results (List[Dict]): List of individual page analysis results
|
|
|
|
|
|
Returns:
|
|
|
Dict: Aggregate scores and insights
|
|
|
"""
|
|
|
try:
|
|
|
valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')]
|
|
|
|
|
|
if not valid_results:
|
|
|
return {'error': 'No valid results to aggregate'}
|
|
|
|
|
|
|
|
|
score_keys = list(valid_results[0]['geo_scores'].keys())
|
|
|
avg_scores = {}
|
|
|
|
|
|
for key in score_keys:
|
|
|
scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']]
|
|
|
avg_scores[key] = sum(scores) / len(scores) if scores else 0
|
|
|
|
|
|
overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
|
|
|
|
|
|
|
|
|
all_recommendations = []
|
|
|
all_opportunities = []
|
|
|
all_topics = []
|
|
|
all_entities = []
|
|
|
|
|
|
for result in valid_results:
|
|
|
all_recommendations.extend(result.get('recommendations', []))
|
|
|
all_opportunities.extend(result.get('optimization_opportunities', []))
|
|
|
all_topics.extend(result.get('primary_topics', []))
|
|
|
all_entities.extend(result.get('entities', []))
|
|
|
|
|
|
|
|
|
unique_recommendations = list(set(all_recommendations))
|
|
|
unique_topics = list(set(all_topics))
|
|
|
unique_entities = list(set(all_entities))
|
|
|
|
|
|
|
|
|
best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
|
|
|
worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
|
|
|
|
|
|
return {
|
|
|
'aggregate_scores': avg_scores,
|
|
|
'overall_score': overall_avg,
|
|
|
'pages_analyzed': len(valid_results),
|
|
|
'best_performing_metric': {
|
|
|
'metric': best_score[0],
|
|
|
'score': best_score[1]
|
|
|
},
|
|
|
'lowest_performing_metric': {
|
|
|
'metric': worst_score[0],
|
|
|
'score': worst_score[1]
|
|
|
},
|
|
|
'consolidated_recommendations': unique_recommendations[:10],
|
|
|
'all_topics': unique_topics,
|
|
|
'all_entities': unique_entities,
|
|
|
'high_priority_opportunities': [
|
|
|
opp for opp in all_opportunities
|
|
|
if opp.get('priority') == 'high'
|
|
|
][:5],
|
|
|
'score_distribution': self._calculate_score_distribution(avg_scores)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Aggregation failed: {str(e)}"}
|
|
|
|
|
|
def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Generate a comprehensive GEO report
|
|
|
|
|
|
Args:
|
|
|
analysis_results (Dict): Results from aggregate analysis
|
|
|
website_url (str): Optional website URL for context
|
|
|
|
|
|
Returns:
|
|
|
Dict: Comprehensive GEO report
|
|
|
"""
|
|
|
try:
|
|
|
report = {
|
|
|
'report_metadata': {
|
|
|
'generated_at': self._get_timestamp(),
|
|
|
'website_url': website_url,
|
|
|
'analysis_type': 'GEO Performance Report'
|
|
|
},
|
|
|
'executive_summary': self._generate_executive_summary(analysis_results),
|
|
|
'detailed_scores': analysis_results.get('aggregate_scores', {}),
|
|
|
'performance_insights': self._generate_performance_insights(analysis_results),
|
|
|
'actionable_recommendations': self._prioritize_recommendations(
|
|
|
analysis_results.get('consolidated_recommendations', [])
|
|
|
),
|
|
|
'optimization_roadmap': self._create_optimization_roadmap(analysis_results),
|
|
|
'competitive_position': self._assess_competitive_position(analysis_results),
|
|
|
'technical_details': {
|
|
|
'pages_analyzed': analysis_results.get('pages_analyzed', 0),
|
|
|
'overall_score': analysis_results.get('overall_score', 0),
|
|
|
'score_distribution': analysis_results.get('score_distribution', {})
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return report
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'error': f"Report generation failed: {str(e)}"}
|
|
|
|
|
|
def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
|
|
|
"""Parse LLM response and extract JSON content"""
|
|
|
try:
|
|
|
|
|
|
json_start = response_text.find('{')
|
|
|
json_end = response_text.rfind('}') + 1
|
|
|
|
|
|
if json_start != -1 and json_end != -1:
|
|
|
json_str = response_text[json_start:json_end]
|
|
|
return json.loads(json_str)
|
|
|
else:
|
|
|
|
|
|
return {'raw_response': response_text, 'parsing_error': 'No JSON found'}
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(e)}'}
|
|
|
except Exception as e:
|
|
|
return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'}
|
|
|
|
|
|
def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
|
|
|
"""Calculate distribution of scores for insights"""
|
|
|
if not scores:
|
|
|
return {}
|
|
|
|
|
|
score_values = list(scores.values())
|
|
|
|
|
|
return {
|
|
|
'highest_score': max(score_values),
|
|
|
'lowest_score': min(score_values),
|
|
|
'average_score': sum(score_values) / len(score_values),
|
|
|
'score_range': max(score_values) - min(score_values),
|
|
|
'scores_above_7': len([s for s in score_values if s >= 7.0]),
|
|
|
'scores_below_5': len([s for s in score_values if s < 5.0])
|
|
|
}
|
|
|
|
|
|
def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str:
|
|
|
"""Generate executive summary based on analysis results"""
|
|
|
overall_score = analysis_results.get('overall_score', 0)
|
|
|
pages_analyzed = analysis_results.get('pages_analyzed', 0)
|
|
|
|
|
|
if overall_score >= 8.0:
|
|
|
performance = "excellent"
|
|
|
elif overall_score >= 6.5:
|
|
|
performance = "good"
|
|
|
elif overall_score >= 5.0:
|
|
|
performance = "moderate"
|
|
|
else:
|
|
|
performance = "needs improvement"
|
|
|
|
|
|
return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}."
|
|
|
|
|
|
def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]:
|
|
|
"""Generate performance insights based on analysis"""
|
|
|
insights = []
|
|
|
|
|
|
best_metric = analysis_results.get('best_performing_metric', {})
|
|
|
worst_metric = analysis_results.get('lowest_performing_metric', {})
|
|
|
|
|
|
if best_metric.get('score', 0) >= 8.0:
|
|
|
insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})")
|
|
|
|
|
|
if worst_metric.get('score', 10) < 6.0:
|
|
|
insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})")
|
|
|
|
|
|
score_dist = analysis_results.get('score_distribution', {})
|
|
|
if score_dist.get('score_range', 0) > 3.0:
|
|
|
insights.append("High variability in scores indicates inconsistent optimization across metrics")
|
|
|
|
|
|
return insights
|
|
|
|
|
|
def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]:
|
|
|
"""Prioritize recommendations based on impact potential"""
|
|
|
prioritized = []
|
|
|
|
|
|
|
|
|
high_impact_keywords = ['semantic', 'structure', 'authority', 'factual']
|
|
|
medium_impact_keywords = ['readability', 'clarity', 'format']
|
|
|
|
|
|
for i, rec in enumerate(recommendations):
|
|
|
priority = 'low'
|
|
|
if any(keyword in rec.lower() for keyword in high_impact_keywords):
|
|
|
priority = 'high'
|
|
|
elif any(keyword in rec.lower() for keyword in medium_impact_keywords):
|
|
|
priority = 'medium'
|
|
|
|
|
|
prioritized.append({
|
|
|
'recommendation': rec,
|
|
|
'priority': priority,
|
|
|
'order': i + 1
|
|
|
})
|
|
|
|
|
|
|
|
|
priority_order = {'high': 1, 'medium': 2, 'low': 3}
|
|
|
prioritized.sort(key=lambda x: priority_order[x['priority']])
|
|
|
|
|
|
return prioritized
|
|
|
|
|
|
def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]:
|
|
|
"""Create a phased optimization roadmap"""
|
|
|
roadmap = {
|
|
|
'immediate_actions': [],
|
|
|
'short_term_goals': [],
|
|
|
'long_term_strategy': []
|
|
|
}
|
|
|
|
|
|
overall_score = analysis_results.get('overall_score', 0)
|
|
|
worst_metric = analysis_results.get('lowest_performing_metric', {})
|
|
|
|
|
|
|
|
|
if worst_metric.get('score', 10) < 5.0:
|
|
|
roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}")
|
|
|
|
|
|
|
|
|
if overall_score < 7.0:
|
|
|
roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0")
|
|
|
roadmap['short_term_goals'].append("Enhance content structure and semantic richness")
|
|
|
|
|
|
|
|
|
roadmap['long_term_strategy'].append("Establish consistent GEO optimization process")
|
|
|
roadmap['long_term_strategy'].append("Monitor and track AI search performance")
|
|
|
|
|
|
return roadmap
|
|
|
|
|
|
def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
"""Assess competitive position based on scores"""
|
|
|
overall_score = analysis_results.get('overall_score', 0)
|
|
|
|
|
|
if overall_score >= 8.5:
|
|
|
position = "market_leader"
|
|
|
description = "Content is highly optimized for AI search engines"
|
|
|
elif overall_score >= 7.0:
|
|
|
position = "competitive"
|
|
|
description = "Content performs well but has room for improvement"
|
|
|
elif overall_score >= 5.5:
|
|
|
position = "average"
|
|
|
description = "Content meets basic standards but lacks optimization"
|
|
|
else:
|
|
|
position = "needs_work"
|
|
|
description = "Content requires significant optimization for AI search"
|
|
|
|
|
|
return {
|
|
|
'position': position,
|
|
|
'description': description,
|
|
|
'score': overall_score,
|
|
|
'percentile_estimate': min(overall_score * 10, 100)
|
|
|
}
|
|
|
|
|
|
def _get_timestamp(self) -> str:
|
|
|
"""Get current timestamp"""
|
|
|
from datetime import datetime
|
|
|
return datetime.now().strftime('%Y-%m-%d %H:%M:%S') |