| """ |
| GEO Scoring Module |
| Analyzes content for Generative Engine Optimization (GEO) performance |
| """ |
|
|
| import json |
| from typing import Dict, Any, List |
| from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate |
|
|
|
|
| class GEOScorer: |
| """Main class for calculating GEO scores and analysis""" |
| |
| def __init__(self, llm): |
| self.llm = llm |
| self.setup_prompts() |
| |
    def setup_prompts(self):
        """Initialize prompts for different types of analysis"""
        # NOTE: all three strings are later fed to ChatPromptTemplate, which
        # treats them as templates -- hence the doubled braces ("{{" / "}}")
        # wherever a literal "{" or "}" must survive template rendering.

        # Detailed analysis prompt: eight 1-10 GEO criteria plus topic/entity
        # extraction, gap analysis, and recommendations, with a strict JSON
        # response schema spelled out for the model.
        self.geo_analysis_prompt = (
            "You are a Generative Engine Optimization (GEO) Specialist. Your task is to critically analyze the input content for its effectiveness in AI-powered search engines and large language model (LLM) systems. "
            "Evaluate the content using the following GEO criteria, assigning a score from 1 to 10 for each: \n\n"
            "1. AI Search Visibility - How likely is the content to be surfaced by AI search engines?\n"
            "2. Query Intent Matching - How well does the content align with common user queries?\n"
            "3. Factual Accuracy & Authority - How trustworthy and authoritative is the information?\n"
            "4. Conversational Readiness - Is the content well-suited for AI chat responses?\n"
            "5. Semantic Richness - Does the content effectively use relevant semantic keywords?\n"
            "6. Context Completeness - Is the content self-contained and does it provide complete answers?\n"
            "7. Citation Worthiness - How likely is the content to be cited by AI systems?\n"
            "8. Multi-Query Coverage - Does the content address multiple related questions?\n\n"
            "Also provide:\n"
            "- Key topics and entities mentioned\n"
            "- Missing information or content gaps\n"
            "- Specific optimization opportunities\n"
            "- Actionable enhancement recommendations\n\n"
            "Respond strictly in JSON format using the structure below (double curly braces shown here to escape string formatting, do NOT include them in actual output):\n\n"
            "{{\n"
            " \"geo_scores\": {{\n"
            " \"ai_search_visibility\": 0.0,\n"
            " \"query_intent_matching\": 0.0,\n"
            " \"factual_accuracy\": 0.0,\n"
            " \"conversational_readiness\": 0.0,\n"
            " \"semantic_richness\": 0.0,\n"
            " \"context_completeness\": 0.0,\n"
            " \"citation_worthiness\": 0.0,\n"
            " \"multi_query_coverage\": 0.0\n"
            " }},\n"
            " \"overall_geo_score\": 0.0,\n"
            " \"primary_topics\": [\"topic1\", \"topic2\"],\n"
            " \"entities\": [\"entity1\", \"entity2\"],\n"
            " \"missing_gaps\": [\"gap1\", \"gap2\"],\n"
            " \"optimization_opportunities\": [\n"
            " {{\n"
            " \"type\": \"semantic_enhancement\",\n"
            " \"description\": \"Describe the improvement opportunity\",\n"
            " \"priority\": \"high\"\n"
            " }}\n"
            " ],\n"
            " \"recommendations\": [\n"
            " \"Write clear and specific suggestions to improve the content\"\n"
            " ]\n"
            "}}"
        )

        # Lightweight variant used when detailed=False in analyze_page_geo:
        # only four criteria and a smaller JSON skeleton, for faster scoring.
        self.quick_score_prompt = (
            "You are an AI Search Optimization Analyst. Evaluate the given content and provide a quick scoring based on key criteria.\n"
            "Rate each of the following from 1 to 10:\n"
            "1. AI Search Visibility\n"
            "2. Query Intent Matching\n"
            "3. Conversational Readiness\n"
            "4. Citation Worthiness\n\n"
            "{{\n"
            " \"scores\": {{\n"
            " \"ai_search_visibility\": 0.0,\n"
            " \"query_intent_matching\": 0.0,\n"
            " \"conversational_readiness\": 0.0,\n"
            " \"citation_worthiness\": 0.0\n"
            " }},\n"
            " \"overall_score\": 0.0,\n"
            " \"top_recommendation\": \"Provide the most critical improvement needed\"\n"
            "}}"
        )

        # Head-to-head comparison prompt; {content_a}/{content_b} are real
        # template input variables, filled in by compare_content_geo().
        self.competitive_prompt = (
            "Compare these content pieces for GEO performance. Identify which performs better for AI search and why.\n"
            "Content A: {content_a}\n"
            "Content B: {content_b}\n"
            "Provide analysis in JSON:\n"
            "{{\n"
            " \"winner\": \"A\" or \"B\",\n"
            " \"score_comparison\": {{\n"
            " \"content_a_score\": 7.5,\n"
            " \"content_b_score\": 8.2\n"
            " }},\n"
            " \"key_differences\": [\"difference1\", \"difference2\"],\n"
            " \"improvement_suggestions\": {{\n"
            " \"content_a\": [\"suggestion1\"],\n"
            " \"content_b\": [\"suggestion1\"]\n"
            " }}\n"
            "}}"
        )
| |
| def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]: |
| """ |
| Analyze a single page for GEO performance |
| """ |
| try: |
| |
| if detailed: |
| system_prompt = self.geo_analysis_prompt |
| user_message = f"Title: {title}\n\nContent: {content[:8000]}" |
| else: |
| system_prompt = self.quick_score_prompt |
| user_message = f"Title: {title}\n\nContent: {content[:4000]}" |
|
|
| |
| prompt_template = ChatPromptTemplate.from_messages([ |
| SystemMessagePromptTemplate.from_template(system_prompt), |
| HumanMessagePromptTemplate.from_template(user_message) |
| ]) |
| |
| |
| chain = prompt_template | self.llm |
| result = chain.invoke({}) |
|
|
| |
| result_content = result.content if hasattr(result, 'content') else str(result) |
| parsed_result = self._parse_llm_response(result_content) |
|
|
| |
| parsed_result.update({ |
| 'analyzed_title': title, |
| 'content_length': len(content), |
| 'word_count': len(content.split()), |
| 'analysis_type': 'detailed' if detailed else 'quick' |
| }) |
|
|
| return parsed_result |
|
|
| except Exception as e: |
| return {'error': f"GEO analysis failed: {str(e)}"} |
| |
| def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]: |
| """ |
| Analyze multiple pages and return consolidated results |
| |
| Args: |
| pages_data (List[Dict]): List of page data with content and metadata |
| detailed (bool): Whether to perform detailed analysis |
| |
| Returns: |
| List[Dict]: List of GEO analysis results |
| """ |
| results = [] |
| |
| for i, page_data in enumerate(pages_data): |
| try: |
| content = page_data.get('content', '') |
| title = page_data.get('title', f'Page {i+1}') |
| |
| analysis = self.analyze_page_geo(content, title, detailed) |
| |
| |
| analysis.update({ |
| 'page_url': page_data.get('url', ''), |
| 'page_index': i, |
| 'source_word_count': page_data.get('word_count', 0) |
| }) |
| |
| results.append(analysis) |
| |
| except Exception as e: |
| results.append({ |
| 'page_index': i, |
| 'page_url': page_data.get('url', ''), |
| 'error': f"Analysis failed: {str(e)}" |
| }) |
| |
| return results |
| |
| def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]: |
| """ |
| Compare two pieces of content for GEO performance |
| |
| Args: |
| content_a (str): First content to compare |
| content_b (str): Second content to compare |
| titles (tuple): Optional titles for the content pieces |
| |
| Returns: |
| Dict: Comparison analysis results |
| """ |
| try: |
| title_a, title_b = titles if titles else ("Content A", "Content B") |
| |
| prompt_template = ChatPromptTemplate.from_messages([ |
| ("system", self.competitive_prompt), |
| ("user", "") |
| ]) |
| |
| |
| formatted_prompt = self.competitive_prompt.format( |
| content_a=f"Title: {title_a}\nContent: {content_a[:4000]}", |
| content_b=f"Title: {title_b}\nContent: {content_b[:4000]}" |
| ) |
| |
| chain = ChatPromptTemplate.from_messages([ |
| ("system", formatted_prompt), |
| ("user", "Perform the comparison analysis.") |
| ]) | self.llm |
| |
| result = chain.invoke({}) |
| result_content = result.content if hasattr(result, 'content') else str(result) |
| |
| return self._parse_llm_response(result_content) |
| |
| except Exception as e: |
| return {'error': f"Comparison analysis failed: {str(e)}"} |
| |
| def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]: |
| """ |
| Calculate aggregate GEO scores from multiple page analyses |
| |
| Args: |
| individual_results (List[Dict]): List of individual page analysis results |
| |
| Returns: |
| Dict: Aggregate scores and insights |
| """ |
| try: |
| valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')] |
| |
| if not valid_results: |
| return {'error': 'No valid results to aggregate'} |
| |
| |
| score_keys = list(valid_results[0]['geo_scores'].keys()) |
| avg_scores = {} |
| |
| for key in score_keys: |
| scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']] |
| avg_scores[key] = sum(scores) / len(scores) if scores else 0 |
| |
| overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0 |
| |
| |
| all_recommendations = [] |
| all_opportunities = [] |
| all_topics = [] |
| all_entities = [] |
| |
| for result in valid_results: |
| all_recommendations.extend(result.get('recommendations', [])) |
| all_opportunities.extend(result.get('optimization_opportunities', [])) |
| all_topics.extend(result.get('primary_topics', [])) |
| all_entities.extend(result.get('entities', [])) |
| |
| |
| unique_recommendations = list(set(all_recommendations)) |
| unique_topics = list(set(all_topics)) |
| unique_entities = list(set(all_entities)) |
| |
| |
| best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0) |
| worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0) |
| |
| return { |
| 'aggregate_scores': avg_scores, |
| 'overall_score': overall_avg, |
| 'pages_analyzed': len(valid_results), |
| 'best_performing_metric': { |
| 'metric': best_score[0], |
| 'score': best_score[1] |
| }, |
| 'lowest_performing_metric': { |
| 'metric': worst_score[0], |
| 'score': worst_score[1] |
| }, |
| 'consolidated_recommendations': unique_recommendations[:10], |
| 'all_topics': unique_topics, |
| 'all_entities': unique_entities, |
| 'high_priority_opportunities': [ |
| opp for opp in all_opportunities |
| if opp.get('priority') == 'high' |
| ][:5], |
| 'score_distribution': self._calculate_score_distribution(avg_scores) |
| } |
| |
| except Exception as e: |
| return {'error': f"Aggregation failed: {str(e)}"} |
| |
| def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]: |
| """ |
| Generate a comprehensive GEO report |
| |
| Args: |
| analysis_results (Dict): Results from aggregate analysis |
| website_url (str): Optional website URL for context |
| |
| Returns: |
| Dict: Comprehensive GEO report |
| """ |
| try: |
| report = { |
| 'report_metadata': { |
| 'generated_at': self._get_timestamp(), |
| 'website_url': website_url, |
| 'analysis_type': 'GEO Performance Report' |
| }, |
| 'executive_summary': self._generate_executive_summary(analysis_results), |
| 'detailed_scores': analysis_results.get('aggregate_scores', {}), |
| 'performance_insights': self._generate_performance_insights(analysis_results), |
| 'actionable_recommendations': self._prioritize_recommendations( |
| analysis_results.get('consolidated_recommendations', []) |
| ), |
| 'optimization_roadmap': self._create_optimization_roadmap(analysis_results), |
| 'competitive_position': self._assess_competitive_position(analysis_results), |
| 'technical_details': { |
| 'pages_analyzed': analysis_results.get('pages_analyzed', 0), |
| 'overall_score': analysis_results.get('overall_score', 0), |
| 'score_distribution': analysis_results.get('score_distribution', {}) |
| } |
| } |
| |
| return report |
| |
| except Exception as e: |
| return {'error': f"Report generation failed: {str(e)}"} |
| |
| def _parse_llm_response(self, response_text: str) -> Dict[str, Any]: |
| """Parse LLM response and extract JSON content""" |
| try: |
| |
| json_start = response_text.find('{') |
| json_end = response_text.rfind('}') + 1 |
| |
| if json_start != -1 and json_end != -1: |
| json_str = response_text[json_start:json_end] |
| return json.loads(json_str) |
| else: |
| |
| return {'raw_response': response_text, 'parsing_error': 'No JSON found'} |
| |
| except json.JSONDecodeError as e: |
| return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(e)}'} |
| except Exception as e: |
| return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'} |
| |
| def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]: |
| """Calculate distribution of scores for insights""" |
| if not scores: |
| return {} |
| |
| score_values = list(scores.values()) |
| |
| return { |
| 'highest_score': max(score_values), |
| 'lowest_score': min(score_values), |
| 'average_score': sum(score_values) / len(score_values), |
| 'score_range': max(score_values) - min(score_values), |
| 'scores_above_7': len([s for s in score_values if s >= 7.0]), |
| 'scores_below_5': len([s for s in score_values if s < 5.0]) |
| } |
| |
| def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str: |
| """Generate executive summary based on analysis results""" |
| overall_score = analysis_results.get('overall_score', 0) |
| pages_analyzed = analysis_results.get('pages_analyzed', 0) |
| |
| if overall_score >= 8.0: |
| performance = "excellent" |
| elif overall_score >= 6.5: |
| performance = "good" |
| elif overall_score >= 5.0: |
| performance = "moderate" |
| else: |
| performance = "needs improvement" |
| |
| return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}." |
| |
| def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]: |
| """Generate performance insights based on analysis""" |
| insights = [] |
| |
| best_metric = analysis_results.get('best_performing_metric', {}) |
| worst_metric = analysis_results.get('lowest_performing_metric', {}) |
| |
| if best_metric.get('score', 0) >= 8.0: |
| insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})") |
| |
| if worst_metric.get('score', 10) < 6.0: |
| insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})") |
| |
| score_dist = analysis_results.get('score_distribution', {}) |
| if score_dist.get('score_range', 0) > 3.0: |
| insights.append("High variability in scores indicates inconsistent optimization across metrics") |
| |
| return insights |
| |
| def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]: |
| """Prioritize recommendations based on impact potential""" |
| prioritized = [] |
| |
| |
| high_impact_keywords = ['semantic', 'structure', 'authority', 'factual'] |
| medium_impact_keywords = ['readability', 'clarity', 'format'] |
| |
| for i, rec in enumerate(recommendations): |
| priority = 'low' |
| if any(keyword in rec.lower() for keyword in high_impact_keywords): |
| priority = 'high' |
| elif any(keyword in rec.lower() for keyword in medium_impact_keywords): |
| priority = 'medium' |
| |
| prioritized.append({ |
| 'recommendation': rec, |
| 'priority': priority, |
| 'order': i + 1 |
| }) |
| |
| |
| priority_order = {'high': 1, 'medium': 2, 'low': 3} |
| prioritized.sort(key=lambda x: priority_order[x['priority']]) |
| |
| return prioritized |
| |
| def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]: |
| """Create a phased optimization roadmap""" |
| roadmap = { |
| 'immediate_actions': [], |
| 'short_term_goals': [], |
| 'long_term_strategy': [] |
| } |
| |
| overall_score = analysis_results.get('overall_score', 0) |
| worst_metric = analysis_results.get('lowest_performing_metric', {}) |
| |
| |
| if worst_metric.get('score', 10) < 5.0: |
| roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}") |
| |
| |
| if overall_score < 7.0: |
| roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0") |
| roadmap['short_term_goals'].append("Enhance content structure and semantic richness") |
| |
| |
| roadmap['long_term_strategy'].append("Establish consistent GEO optimization process") |
| roadmap['long_term_strategy'].append("Monitor and track AI search performance") |
| |
| return roadmap |
| |
| def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]: |
| """Assess competitive position based on scores""" |
| overall_score = analysis_results.get('overall_score', 0) |
| |
| if overall_score >= 8.5: |
| position = "market_leader" |
| description = "Content is highly optimized for AI search engines" |
| elif overall_score >= 7.0: |
| position = "competitive" |
| description = "Content performs well but has room for improvement" |
| elif overall_score >= 5.5: |
| position = "average" |
| description = "Content meets basic standards but lacks optimization" |
| else: |
| position = "needs_work" |
| description = "Content requires significant optimization for AI search" |
| |
| return { |
| 'position': position, |
| 'description': description, |
| 'score': overall_score, |
| 'percentile_estimate': min(overall_score * 10, 100) |
| } |
| |
| def _get_timestamp(self) -> str: |
| """Get current timestamp""" |
| from datetime import datetime |
| return datetime.now().strftime('%Y-%m-%d %H:%M:%S') |