Spaces:
Runtime error
Runtime error
| """ | |
| GEO Scoring Module | |
| Analyzes content for Generative Engine Optimization (GEO) performance | |
| """ | |
| import json | |
| from typing import Dict, Any, List | |
| from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate | |
class GEOScorer:
    """Calculate Generative Engine Optimization (GEO) scores with an LLM.

    The scorer sends page content to the supplied language model together
    with one of three instruction prompts (detailed analysis, quick scoring,
    or A/B comparison), parses the JSON the model returns, and offers helpers
    to aggregate per-page results into a site-level report.

    Public methods never raise on analysis failures; they return a dict
    containing an ``'error'`` key instead.
    """

    # Maximum number of content characters forwarded to the model.
    DETAILED_CONTENT_LIMIT = 8000
    QUICK_CONTENT_LIMIT = 4000
    COMPARISON_CONTENT_LIMIT = 4000

    def __init__(self, llm):
        """Store the model and build the prompt texts.

        Args:
            llm: Object that can be piped after a ``ChatPromptTemplate``
                (``prompt | llm``) and invoked with a dict of variables.
        """
        self.llm = llm
        self.setup_prompts()

    def setup_prompts(self):
        """Initialize prompts for different types of analysis.

        All prompts are prompt *templates*: literal braces of the JSON
        examples are escaped as ``{{`` / ``}}`` and must only be rendered by
        the template engine, never pre-formatted by hand.
        """
        # Main GEO analysis prompt
        self.geo_analysis_prompt = (
            "You are a Generative Engine Optimization (GEO) Specialist. Your task is to critically analyze the input content for its effectiveness in AI-powered search engines and large language model (LLM) systems. "
            "Evaluate the content using the following GEO criteria, assigning a score from 1 to 10 for each: \n\n"
            "1. AI Search Visibility - How likely is the content to be surfaced by AI search engines?\n"
            "2. Query Intent Matching - How well does the content align with common user queries?\n"
            "3. Factual Accuracy & Authority - How trustworthy and authoritative is the information?\n"
            "4. Conversational Readiness - Is the content well-suited for AI chat responses?\n"
            "5. Semantic Richness - Does the content effectively use relevant semantic keywords?\n"
            "6. Context Completeness - Is the content self-contained and does it provide complete answers?\n"
            "7. Citation Worthiness - How likely is the content to be cited by AI systems?\n"
            "8. Multi-Query Coverage - Does the content address multiple related questions?\n\n"
            "Also provide:\n"
            "- Key topics and entities mentioned\n"
            "- Missing information or content gaps\n"
            "- Specific optimization opportunities\n"
            "- Actionable enhancement recommendations\n\n"
            "Respond strictly in JSON format using the structure below (double curly braces shown here to escape string formatting, do NOT include them in actual output):\n\n"
            "{{\n"
            " \"geo_scores\": {{\n"
            " \"ai_search_visibility\": 0.0,\n"
            " \"query_intent_matching\": 0.0,\n"
            " \"factual_accuracy\": 0.0,\n"
            " \"conversational_readiness\": 0.0,\n"
            " \"semantic_richness\": 0.0,\n"
            " \"context_completeness\": 0.0,\n"
            " \"citation_worthiness\": 0.0,\n"
            " \"multi_query_coverage\": 0.0\n"
            " }},\n"
            " \"overall_geo_score\": 0.0,\n"
            " \"primary_topics\": [\"topic1\", \"topic2\"],\n"
            " \"entities\": [\"entity1\", \"entity2\"],\n"
            " \"missing_gaps\": [\"gap1\", \"gap2\"],\n"
            " \"optimization_opportunities\": [\n"
            " {{\n"
            " \"type\": \"semantic_enhancement\",\n"
            " \"description\": \"Describe the improvement opportunity\",\n"
            " \"priority\": \"high\"\n"
            " }}\n"
            " ],\n"
            " \"recommendations\": [\n"
            " \"Write clear and specific suggestions to improve the content\"\n"
            " ]\n"
            "}}"
        )
        # Quick scoring prompt for faster analysis
        self.quick_score_prompt = (
            "You are an AI Search Optimization Analyst. Evaluate the given content and provide a quick scoring based on key criteria.\n"
            "Rate each of the following from 1 to 10:\n"
            "1. AI Search Visibility\n"
            "2. Query Intent Matching\n"
            "3. Conversational Readiness\n"
            "4. Citation Worthiness\n\n"
            "{{\n"
            " \"scores\": {{\n"
            " \"ai_search_visibility\": 0.0,\n"
            " \"query_intent_matching\": 0.0,\n"
            " \"conversational_readiness\": 0.0,\n"
            " \"citation_worthiness\": 0.0\n"
            " }},\n"
            " \"overall_score\": 0.0,\n"
            " \"top_recommendation\": \"Provide the most critical improvement needed\"\n"
            "}}"
        )
        # Competitive analysis prompt ({content_a} / {content_b} are template variables)
        self.competitive_prompt = (
            "Compare these content pieces for GEO performance. Identify which performs better for AI search and why.\n"
            "Content A: {content_a}\n"
            "Content B: {content_b}\n"
            "Provide analysis in JSON:\n"
            "{{\n"
            " \"winner\": \"A\" or \"B\",\n"
            " \"score_comparison\": {{\n"
            " \"content_a_score\": 7.5,\n"
            " \"content_b_score\": 8.2\n"
            " }},\n"
            " \"key_differences\": [\"difference1\", \"difference2\"],\n"
            " \"improvement_suggestions\": {{\n"
            " \"content_a\": [\"suggestion1\"],\n"
            " \"content_b\": [\"suggestion1\"]\n"
            " }}\n"
            "}}"
        )

    def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
        """Analyze a single page for GEO performance.

        Args:
            content (str): Page text; truncated to the class content limit
                for the chosen analysis type before being sent to the model.
            title (str): Page title included in the user message.
            detailed (bool): Use the full GEO prompt when True, the quick
                scoring prompt otherwise.

        Returns:
            Dict: Parsed model output plus ``analyzed_title``,
            ``content_length``, ``word_count`` and ``analysis_type``; or
            ``{'error': ...}`` if anything fails.
        """
        try:
            # Choose prompt and truncation limit based on detail level
            if detailed:
                system_prompt = self.geo_analysis_prompt
                content_limit = self.DETAILED_CONTENT_LIMIT
            else:
                system_prompt = self.quick_score_prompt
                content_limit = self.QUICK_CONTENT_LIMIT

            # Title and content are passed as template *variables*. Embedding
            # them in the template text would make any "{" or "}" in the page
            # content be parsed as a placeholder and break the call.
            prompt_template = ChatPromptTemplate.from_messages([
                SystemMessagePromptTemplate.from_template(system_prompt),
                HumanMessagePromptTemplate.from_template("Title: {title}\n\nContent: {content}"),
            ])
            chain = prompt_template | self.llm
            result = chain.invoke({'title': title, 'content': content[:content_limit]})

            # Extract and parse result
            result_content = result.content if hasattr(result, 'content') else str(result)
            parsed_result = self._parse_llm_response(result_content)

            # Add metadata
            parsed_result.update({
                'analyzed_title': title,
                'content_length': len(content),
                'word_count': len(content.split()),
                'analysis_type': 'detailed' if detailed else 'quick'
            })
            return parsed_result
        except Exception as e:
            return {'error': f"GEO analysis failed: {str(e)}"}

    def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
        """Analyze multiple pages and return one result per page.

        Args:
            pages_data (List[Dict]): List of page data; the keys read are
                ``content``, ``title``, ``url`` and ``word_count``.
            detailed (bool): Whether to perform detailed analysis

        Returns:
            List[Dict]: List of GEO analysis results, in input order. Each
            entry carries ``page_index`` and ``page_url``; failed pages carry
            an ``error`` message instead of scores.
        """
        results = []
        for i, page_data in enumerate(pages_data):
            # A malformed entry must not abort the batch (and must not crash
            # the error handler below, which reads from the entry).
            if not isinstance(page_data, dict):
                results.append({
                    'page_index': i,
                    'page_url': '',
                    'error': f"Analysis failed: expected a dict of page data, got {type(page_data).__name__}"
                })
                continue
            try:
                content = page_data.get('content', '')
                # Fall back to a positional title when it is missing or empty
                title = page_data.get('title') or f'Page {i+1}'
                analysis = self.analyze_page_geo(content, title, detailed)
                # Add page-specific metadata
                analysis.update({
                    'page_url': page_data.get('url', ''),
                    'page_index': i,
                    'source_word_count': page_data.get('word_count', 0)
                })
                results.append(analysis)
            except Exception as e:
                results.append({
                    'page_index': i,
                    'page_url': page_data.get('url', ''),
                    'error': f"Analysis failed: {str(e)}"
                })
        return results

    def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]:
        """Compare two pieces of content for GEO performance.

        Args:
            content_a (str): First content to compare
            content_b (str): Second content to compare
            titles (tuple): Optional ``(title_a, title_b)`` pair; defaults to
                ``("Content A", "Content B")``.

        Returns:
            Dict: Parsed comparison from the model, or ``{'error': ...}``.
        """
        try:
            title_a, title_b = titles if titles else ("Content A", "Content B")
            limit = self.COMPARISON_CONTENT_LIMIT

            # The competitive prompt is handed over unformatted: the template
            # engine fills {content_a}/{content_b} and un-escapes the JSON
            # example braces in one pass. Pre-formatting the string and then
            # treating it as a template again would turn the JSON example
            # (and any braces in the content) into unknown placeholders.
            chain = ChatPromptTemplate.from_messages([
                ("system", self.competitive_prompt),
                ("user", "Perform the comparison analysis.")
            ]) | self.llm
            result = chain.invoke({
                'content_a': f"Title: {title_a}\nContent: {content_a[:limit]}",
                'content_b': f"Title: {title_b}\nContent: {content_b[:limit]}"
            })
            result_content = result.content if hasattr(result, 'content') else str(result)
            return self._parse_llm_response(result_content)
        except Exception as e:
            return {'error': f"Comparison analysis failed: {str(e)}"}

    def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate aggregate GEO scores from multiple page analyses.

        Only results that contain a ``geo_scores`` dict and no ``error`` are
        used. Score values that are not finite numbers (or numeric strings)
        are ignored rather than failing the whole aggregation.

        Args:
            individual_results (List[Dict]): List of individual page analysis results

        Returns:
            Dict: Aggregate scores and insights, or ``{'error': ...}`` when
            there is nothing to aggregate or aggregation fails.
        """
        try:
            valid_results = [
                r for r in individual_results
                if isinstance(r, dict)
                and isinstance(r.get('geo_scores'), dict)
                and not r.get('error')
            ]
            if not valid_results:
                return {'error': 'No valid results to aggregate'}

            # Collect metric names from every page (first-seen order) so a
            # metric missing from the first page is not silently dropped.
            score_keys = self._unique_in_order(
                key for r in valid_results for key in r['geo_scores']
            )

            # Calculate average scores
            avg_scores = {}
            for key in score_keys:
                scores = [
                    number
                    for number in (self._coerce_score(r['geo_scores'].get(key)) for r in valid_results)
                    if number is not None
                ]
                if scores:
                    avg_scores[key] = sum(scores) / len(scores)
            overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

            # Collect all recommendations and opportunities
            all_recommendations = []
            all_opportunities = []
            all_topics = []
            all_entities = []
            for result in valid_results:
                all_recommendations.extend(self._as_list(result.get('recommendations')))
                all_opportunities.extend(self._as_list(result.get('optimization_opportunities')))
                all_topics.extend(self._as_list(result.get('primary_topics')))
                all_entities.extend(self._as_list(result.get('entities')))

            # Remove duplicates while keeping first-seen order, so the
            # truncated recommendation list is deterministic.
            unique_recommendations = self._unique_in_order(all_recommendations)
            unique_topics = self._unique_in_order(all_topics)
            unique_entities = self._unique_in_order(all_entities)

            # Find highest and lowest performing areas
            best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
            worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)

            return {
                'aggregate_scores': avg_scores,
                'overall_score': overall_avg,
                'pages_analyzed': len(valid_results),
                'best_performing_metric': {
                    'metric': best_score[0],
                    'score': best_score[1]
                },
                'lowest_performing_metric': {
                    'metric': worst_score[0],
                    'score': worst_score[1]
                },
                'consolidated_recommendations': unique_recommendations[:10],
                'all_topics': unique_topics,
                'all_entities': unique_entities,
                'high_priority_opportunities': [
                    opp for opp in all_opportunities
                    if isinstance(opp, dict)
                    and str(opp.get('priority', '')).strip().lower() == 'high'
                ][:5],
                'score_distribution': self._calculate_score_distribution(avg_scores)
            }
        except Exception as e:
            return {'error': f"Aggregation failed: {str(e)}"}

    def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]:
        """Generate a comprehensive GEO report.

        Args:
            analysis_results (Dict): Results from aggregate analysis
            website_url (str): Optional website URL for context

        Returns:
            Dict: Report with metadata, summary, scores, insights,
            recommendations, roadmap and competitive position; or
            ``{'error': ...}`` if any section fails to build.
        """
        try:
            report = {
                'report_metadata': {
                    'generated_at': self._get_timestamp(),
                    'website_url': website_url,
                    'analysis_type': 'GEO Performance Report'
                },
                'executive_summary': self._generate_executive_summary(analysis_results),
                'detailed_scores': analysis_results.get('aggregate_scores', {}),
                'performance_insights': self._generate_performance_insights(analysis_results),
                'actionable_recommendations': self._prioritize_recommendations(
                    analysis_results.get('consolidated_recommendations', [])
                ),
                'optimization_roadmap': self._create_optimization_roadmap(analysis_results),
                'competitive_position': self._assess_competitive_position(analysis_results),
                'technical_details': {
                    'pages_analyzed': analysis_results.get('pages_analyzed', 0),
                    'overall_score': analysis_results.get('overall_score', 0),
                    'score_distribution': analysis_results.get('score_distribution', {})
                }
            }
            return report
        except Exception as e:
            return {'error': f"Report generation failed: {str(e)}"}

    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
        """Parse LLM response and extract JSON content.

        The text between the first ``{`` and the last ``}`` is tried first.
        If that is not valid JSON (for example because prose after the object
        contains braces), the first complete JSON object starting at the
        first ``{`` is used instead.

        Returns:
            Dict: The decoded object, or a dict with ``raw_response`` and
            ``parsing_error`` when no JSON could be extracted.
        """
        try:
            # Find JSON content in the response
            json_start = response_text.find('{')
            json_end = response_text.rfind('}')
            if json_start == -1 or json_end < json_start:
                # No opening brace, or no closing brace after it
                return {'raw_response': response_text, 'parsing_error': 'No JSON found'}
            try:
                return json.loads(response_text[json_start:json_end + 1])
            except json.JSONDecodeError as first_error:
                try:
                    parsed, _ = json.JSONDecoder().raw_decode(response_text, json_start)
                    return parsed
                except json.JSONDecodeError:
                    # Report the error from the primary attempt
                    return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(first_error)}'}
        except Exception as e:
            return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'}

    @staticmethod
    def _coerce_score(value):
        """Return ``value`` as a finite float, or None if it is not a usable score."""
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            number = float(value)
        elif isinstance(value, str):
            try:
                number = float(value.strip())
            except ValueError:
                return None
        else:
            return None
        # Reject NaN and infinities so they cannot poison the averages
        if number != number or number in (float('inf'), float('-inf')):
            return None
        return number

    @staticmethod
    def _as_list(value) -> List[Any]:
        """Normalize a model-supplied field to a list (None -> [], scalar -> [scalar])."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    @staticmethod
    def _unique_in_order(items) -> List[Any]:
        """Drop duplicates keeping first-seen order; unhashable items are compared by repr."""
        seen = set()
        unique = []
        for item in items:
            try:
                marker = ('hashable', item)
                hash(marker)
            except TypeError:
                marker = ('repr', repr(item))
            if marker not in seen:
                seen.add(marker)
                unique.append(item)
        return unique

    def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
        """Calculate distribution of scores for insights.

        Returns an empty dict when ``scores`` is empty.
        """
        if not scores:
            return {}
        score_values = list(scores.values())
        highest = max(score_values)
        lowest = min(score_values)
        return {
            'highest_score': highest,
            'lowest_score': lowest,
            'average_score': sum(score_values) / len(score_values),
            'score_range': highest - lowest,
            'scores_above_7': len([s for s in score_values if s >= 7.0]),
            'scores_below_5': len([s for s in score_values if s < 5.0])
        }

    def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str:
        """Generate a one-paragraph executive summary from aggregate results."""
        overall_score = analysis_results.get('overall_score', 0)
        pages_analyzed = analysis_results.get('pages_analyzed', 0)
        if overall_score >= 8.0:
            performance = "excellent"
        elif overall_score >= 6.5:
            performance = "good"
        elif overall_score >= 5.0:
            performance = "moderate"
        else:
            performance = "needs improvement"
        return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}."

    def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]:
        """Generate performance insights based on best/worst metrics and score spread."""
        insights = []
        best_metric = analysis_results.get('best_performing_metric', {})
        worst_metric = analysis_results.get('lowest_performing_metric', {})
        if best_metric.get('score', 0) >= 8.0:
            insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})")
        # Default of 10 means "no data" never triggers the warning
        if worst_metric.get('score', 10) < 6.0:
            insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})")
        score_dist = analysis_results.get('score_distribution', {})
        if score_dist.get('score_range', 0) > 3.0:
            insights.append("High variability in scores indicates inconsistent optimization across metrics")
        return insights

    def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]:
        """Prioritize recommendations based on impact potential.

        Priority is assigned by simple keyword matching; the result is sorted
        high -> medium -> low, keeping the input order within each priority.
        ``order`` is the 1-based position in the input list.
        """
        prioritized = []
        # Simple prioritization based on keywords
        high_impact_keywords = ['semantic', 'structure', 'authority', 'factual']
        medium_impact_keywords = ['readability', 'clarity', 'format']
        for i, rec in enumerate(recommendations):
            # The model may return non-string items; match on their text form
            text = (rec if isinstance(rec, str) else str(rec)).lower()
            priority = 'low'
            if any(keyword in text for keyword in high_impact_keywords):
                priority = 'high'
            elif any(keyword in text for keyword in medium_impact_keywords):
                priority = 'medium'
            prioritized.append({
                'recommendation': rec,
                'priority': priority,
                'order': i + 1
            })
        # Sort by priority (list.sort is stable, so input order is kept within a priority)
        priority_order = {'high': 1, 'medium': 2, 'low': 3}
        prioritized.sort(key=lambda x: priority_order[x['priority']])
        return prioritized

    def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]:
        """Create a phased optimization roadmap (immediate / short term / long term)."""
        roadmap = {
            'immediate_actions': [],
            'short_term_goals': [],
            'long_term_strategy': []
        }
        overall_score = analysis_results.get('overall_score', 0)
        worst_metric = analysis_results.get('lowest_performing_metric', {})
        # Immediate actions based on worst performing metric
        if worst_metric.get('score', 10) < 5.0:
            roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}")
        # Short-term goals
        if overall_score < 7.0:
            roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0")
            roadmap['short_term_goals'].append("Enhance content structure and semantic richness")
        # Long-term strategy
        roadmap['long_term_strategy'].append("Establish consistent GEO optimization process")
        roadmap['long_term_strategy'].append("Monitor and track AI search performance")
        return roadmap

    def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
        """Assess competitive position based on the overall score."""
        overall_score = analysis_results.get('overall_score', 0)
        if overall_score >= 8.5:
            position = "market_leader"
            description = "Content is highly optimized for AI search engines"
        elif overall_score >= 7.0:
            position = "competitive"
            description = "Content performs well but has room for improvement"
        elif overall_score >= 5.5:
            position = "average"
            description = "Content meets basic standards but lacks optimization"
        else:
            position = "needs_work"
            description = "Content requires significant optimization for AI search"
        return {
            'position': position,
            'description': description,
            'score': overall_score,
            'percentile_estimate': min(overall_score * 10, 100)  # Rough percentile estimate
        }

    def _get_timestamp(self) -> str:
        """Get the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        from datetime import datetime
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')