"""
Improved GEO Scoring Module

Analyzes content for Generative Engine Optimization (GEO) performance.
"""

import json
import re
import logging
import hashlib
import asyncio
import copy
from collections import OrderedDict
from datetime import datetime
from typing import Dict, Any, List, Union, Optional, Tuple
from functools import lru_cache
from dataclasses import dataclass

try:
    from langchain.prompts import (
        ChatPromptTemplate,
        SystemMessagePromptTemplate,
        HumanMessagePromptTemplate,
    )
except ImportError:
    # LangChain is only needed at the moment an LLM call is made; the
    # validation, processing, parsing and aggregation helpers work without it.
    # GEOScorer._invoke_llm raises a clear ImportError if it is missing.
    ChatPromptTemplate = None
    SystemMessagePromptTemplate = None
    HumanMessagePromptTemplate = None


_DETAILED_PROMPT = """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.

Evaluate the content based on these GEO criteria (score 1-10 each):

1. **AI Search Visibility**: How likely is this content to be surfaced by AI search engines?
2. **Query Intent Matching**: How well does the content match common user queries?
3. **Factual Accuracy & Authority**: How trustworthy and authoritative is the information?
4. **Conversational Readiness**: How suitable is the content for AI chat responses?
5. **Semantic Richness**: How well does the content use relevant semantic keywords?
6. **Context Completeness**: Does the content provide complete, self-contained answers?
7. **Citation Worthiness**: How likely are AI systems to cite this content?
8. **Multi-Query Coverage**: Does the content answer multiple related questions?

Also identify:
- Primary topics and entities
- Missing information gaps
- Optimization opportunities
- Specific enhancement recommendations

IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.

{
    "geo_scores": {
        "ai_search_visibility": 7.5,
        "query_intent_matching": 8.0,
        "factual_accuracy": 9.0,
        "conversational_readiness": 6.5,
        "semantic_richness": 7.0,
        "context_completeness": 8.5,
        "citation_worthiness": 7.8,
        "multi_query_coverage": 6.0
    },
    "overall_geo_score": 7.5,
    "primary_topics": ["topic1", "topic2"],
    "entities": ["entity1", "entity2"],
    "missing_gaps": ["gap1", "gap2"],
    "optimization_opportunities": [
        {
            "type": "semantic_enhancement",
            "description": "Add more related terms",
            "priority": "high"
        }
    ],
    "recommendations": [
        "Specific actionable recommendation 1",
        "Specific actionable recommendation 2"
    ]
}"""

_QUICK_PROMPT = """Analyze this content for AI search optimization. Provide scores (1-10) for:

1. AI Search Visibility
2. Query Intent Matching
3. Conversational Readiness
4. Citation Worthiness

IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.

{
    "scores": {
        "ai_search_visibility": 7.5,
        "query_intent_matching": 8.0,
        "conversational_readiness": 6.5,
        "citation_worthiness": 7.8
    },
    "overall_score": 7.5,
    "top_recommendation": "Most important improvement needed"
}"""

_COMPETITIVE_PROMPT = """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.

IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.

{
    "winner": "A",
    "score_comparison": {
        "content_a_score": 7.5,
        "content_b_score": 8.2
    },
    "key_differences": ["difference1", "difference2"],
    "improvement_suggestions": {
        "content_a": ["suggestion1"],
        "content_b": ["suggestion1"]
    }
}"""

# Matches a JSON object wrapped in a Markdown code fence (``` or ```json).
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*(\{.*?\})\s*```', re.DOTALL)
# Matches everything from the first "{" to the last "}" in the text.
_GREEDY_JSON_RE = re.compile(r'\{.*\}', re.DOTALL)


def _is_number(value: Any) -> bool:
    """Return True for real numeric scores (bool is deliberately excluded)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


@dataclass
class GEOConfig:
    """Configuration class for GEO scoring parameters"""
    MAX_CONTENT_LENGTH: int = 8000      # truncation limit for detailed analysis
    MIN_CONTENT_LENGTH: int = 100       # shortest content accepted by validation
    QUICK_CONTENT_LENGTH: int = 4000    # truncation limit for quick/comparison analysis
    DEFAULT_TIMEOUT: int = 30
    MAX_RETRIES: int = 3
    CACHE_SIZE: int = 100               # max cached analyses per scorer (<= 0 disables)
    SMART_TRUNCATE_THRESHOLD: float = 0.8  # a break must lie past this fraction of the limit


class GEOValidator:
    """Input validation utilities for GEO analysis"""

    @staticmethod
    def validate_content_inputs(content: str, title: str, config: GEOConfig) -> Tuple[bool, str]:
        """Validate content and title inputs.

        Returns:
            ``(True, "")`` when the inputs are acceptable, otherwise
            ``(False, message)`` describing the first failed check.
        """
        if not isinstance(content, str) or not isinstance(title, str):
            return False, "Content and title must be strings"

        if len(content.strip()) < config.MIN_CONTENT_LENGTH:
            return False, f"Content must be at least {config.MIN_CONTENT_LENGTH} characters"

        if not title.strip():
            return False, "Title cannot be empty"

        if len(title) > 200:
            return False, "Title too long (max 200 characters)"

        return True, ""

    @staticmethod
    def validate_pages_data(pages_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
        """Validate that pages data is a non-empty list of dicts with 'content'."""
        if not isinstance(pages_data, list):
            return False, "Pages data must be a list"

        if not pages_data:
            return False, "Pages data cannot be empty"

        for i, page in enumerate(pages_data):
            if not isinstance(page, dict):
                return False, f"Page {i} must be a dictionary"
            if 'content' not in page:
                return False, f"Page {i} missing 'content' field"

        return True, ""


class GEOContentProcessor:
    """Content processing utilities for GEO analysis"""

    def __init__(self, config: GEOConfig):
        self.config = config
        # Regexes (matched case-insensitively) that are replaced by [FILTERED].
        self.dangerous_patterns = [
            r'ignore\s+previous\s+instructions',
            r'system\s*:',
            r'assistant\s*:',
            r'```json\s*{.*"prompt"',
            r'<\s*system\s*>',
            r'<\s*user\s*>',
            r'forget\s+everything',
            r'new\s+instructions\s*:',
        ]

    def sanitize_content(self, content: str) -> str:
        """Sanitize content to prevent prompt injection.

        Replaces every match of ``dangerous_patterns`` with ``[FILTERED]``,
        collapses all whitespace runs to a single space, and caps the result at
        twice ``MAX_CONTENT_LENGTH`` characters.
        """
        if not content:
            return ""

        sanitized = content
        for pattern in self.dangerous_patterns:
            sanitized = re.sub(pattern, '[FILTERED]', sanitized, flags=re.IGNORECASE)

        sanitized = re.sub(r'\s+', ' ', sanitized).strip()

        # Hard limit on length
        return sanitized[:self.config.MAX_CONTENT_LENGTH * 2]

    def smart_truncate(self, content: str, max_length: int) -> str:
        """Truncate content to at most ``max_length`` characters, preferring natural breaks.

        Preference order: the latest sentence ending, then the latest paragraph
        break, each only if it lies beyond ``SMART_TRUNCATE_THRESHOLD`` of the
        limit; otherwise a hard cut with a trailing ellipsis.
        """
        if len(content) <= max_length:
            return content

        max_length = max(max_length, 0)
        truncated = content[:max_length]
        min_cut = max_length * self.config.SMART_TRUNCATE_THRESHOLD

        # Look for the latest sentence ending inside the window.
        best_cut = -1
        for ending in ('. ', '! ', '? '):
            last_occurrence = truncated.rfind(ending)
            if last_occurrence > min_cut:
                # Keep the punctuation mark, drop the trailing space.
                best_cut = max(best_cut, last_occurrence + len(ending) - 1)

        if best_cut > 0:
            return truncated[:best_cut]

        # If no good sentence break, look for paragraph breaks
        last_paragraph = truncated.rfind('\n\n')
        if last_paragraph > min_cut:
            return truncated[:last_paragraph]

        # No good break: hard cut. Reserve room for the ellipsis so the result
        # never exceeds max_length.
        if max_length <= 3:
            return truncated
        return truncated[:max_length - 3].rstrip() + "..."

    def generate_content_hash(self, content: str, title: str, analysis_type: str) -> str:
        """Generate hash for content caching (MD5 used as a cache key, not for security)."""
        combined = f"{title}|{content}|{analysis_type}"
        return hashlib.md5(combined.encode()).hexdigest()


class GEOPromptManager:
    """Manages prompts for different types of GEO analysis"""

    def __init__(self):
        self.prompts = self._initialize_prompts()

    def _initialize_prompts(self) -> Dict[str, str]:
        """Initialize all prompts"""
        return {
            'detailed_analysis': self._get_detailed_prompt(),
            'quick_analysis': self._get_quick_prompt(),
            'competitive_analysis': self._get_competitive_prompt()
        }

    def _get_detailed_prompt(self) -> str:
        """Return the system prompt for the eight-criterion detailed analysis."""
        return _DETAILED_PROMPT

    def _get_quick_prompt(self) -> str:
        """Return the system prompt for the four-criterion quick analysis."""
        return _QUICK_PROMPT

    def _get_competitive_prompt(self) -> str:
        """Return the system prompt for the A/B content comparison."""
        return _COMPETITIVE_PROMPT

    def get_prompt(self, prompt_type: str) -> str:
        """Get prompt by type, falling back to the detailed prompt for unknown types."""
        return self.prompts.get(prompt_type, self.prompts['detailed_analysis'])


class GEOScorer:
    """Main class for calculating GEO scores and analysis"""

    def __init__(self, llm, config: Optional[GEOConfig] = None, logger: Optional[logging.Logger] = None):
        self.llm = llm
        self.config = config or GEOConfig()
        self.logger = logger or self._setup_logger()

        # Initialize components
        self.validator = GEOValidator()
        self.processor = GEOContentProcessor(self.config)
        self.prompt_manager = GEOPromptManager()

        # Performance tracking
        self.analysis_count = 0
        self.cache_hits = 0

        # Per-instance LRU cache of successful analyses, keyed by content hash.
        self._analysis_cache: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()

    def _setup_logger(self) -> logging.Logger:
        """Setup default logger"""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    def _get_cached_analysis(self, content_hash: str) -> Optional[Dict[str, Any]]:
        """Return a private copy of the cached analysis for ``content_hash``, or None.

        A hit also marks the entry as most recently used.
        """
        cached = self._analysis_cache.get(content_hash)
        if cached is None:
            return None
        self._analysis_cache.move_to_end(content_hash)
        # Callers mutate the returned dict (e.g. add page metadata), so never
        # hand out the stored object itself.
        return copy.deepcopy(cached)

    def _store_cached_analysis(self, content_hash: str, analysis: Dict[str, Any]) -> None:
        """Store a copy of ``analysis``, evicting least-recently-used entries over CACHE_SIZE."""
        capacity = self.config.CACHE_SIZE
        if capacity <= 0:
            return
        self._analysis_cache[content_hash] = copy.deepcopy(analysis)
        self._analysis_cache.move_to_end(content_hash)
        while len(self._analysis_cache) > capacity:
            self._analysis_cache.popitem(last=False)

    def _invoke_llm(self, system_prompt: str, user_message: str) -> str:
        """Send one system + user message pair to the LLM and return the reply text.

        Both texts are supplied as *values* of fixed template variables rather
        than as templates themselves: the prompts contain literal JSON braces
        and page content is untrusted, and either would otherwise be parsed as
        template placeholders.

        Raises:
            ImportError: if LangChain is not installed.
        """
        if ChatPromptTemplate is None:
            raise ImportError("langchain is required to run GEO analysis with an LLM")

        prompt_template = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template("{system_prompt}"),
            HumanMessagePromptTemplate.from_template("{user_message}")
        ])
        chain = prompt_template | self.llm
        result = chain.invoke({
            'system_prompt': system_prompt,
            'user_message': user_message
        })
        return result.content if hasattr(result, 'content') else str(result)

    def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
        """
        Analyze a single page for GEO performance.

        Args:
            content: Page text; must pass ``GEOValidator.validate_content_inputs``.
            title: Page title.
            detailed: Use the detailed prompt and length limit when True,
                the quick ones otherwise.

        Returns:
            The parsed LLM response plus metadata keys, or a dict containing
            ``'error'`` and ``'error_type'`` ('validation', 'parsing' or
            'system') on failure. Successful results are cached per
            title/content/analysis type; cache hits carry ``from_cache=True``.
        """
        start_time = datetime.now()
        self.analysis_count += 1

        try:
            # Input validation
            is_valid, error_msg = self.validator.validate_content_inputs(content, title, self.config)
            if not is_valid:
                self.logger.warning("Input validation failed: %s", error_msg)
                return {'error': error_msg, 'error_type': 'validation'}

            # Check cache
            analysis_type = 'detailed' if detailed else 'quick'
            content_hash = self.processor.generate_content_hash(content, title, analysis_type)
            cached = self._get_cached_analysis(content_hash)
            if cached is not None:
                self.cache_hits += 1
                cached['processing_time_seconds'] = (datetime.now() - start_time).total_seconds()
                cached['from_cache'] = True
                return cached

            # Process content
            sanitized_content = self.processor.sanitize_content(content)
            max_length = self.config.MAX_CONTENT_LENGTH if detailed else self.config.QUICK_CONTENT_LENGTH
            processed_content = self.processor.smart_truncate(sanitized_content, max_length)

            # Get appropriate prompt
            prompt_type = 'detailed_analysis' if detailed else 'quick_analysis'
            system_prompt = self.prompt_manager.get_prompt(prompt_type)
            user_message = f"Title: {title}\n\nContent: {processed_content}"

            # Execute and parse
            result_content = self._invoke_llm(system_prompt, user_message)
            parsed_result = self._parse_llm_response(result_content)

            # Add metadata
            processing_time = (datetime.now() - start_time).total_seconds()
            parsed_result.update({
                'analyzed_title': title,
                'content_length': len(content),
                'processed_content_length': len(processed_content),
                'word_count': len(content.split()),
                'analysis_type': analysis_type,
                'processing_time_seconds': processing_time,
                'content_hash': content_hash,
                'from_cache': False
            })

            if parsed_result.get('error'):
                # Unparseable replies are reported as errors and never cached.
                self.logger.error("JSON parsing failed for title '%s': %s", title, parsed_result['error'])
                return parsed_result

            self._store_cached_analysis(content_hash, parsed_result)
            self.logger.info("Analysis completed for '%s' in %.2fs", title, processing_time)
            return parsed_result

        except Exception as e:
            self.logger.error("Analysis failed for title '%s': %s", title, e)
            return {'error': f"Analysis failed: {str(e)}", 'error_type': 'system', 'title': title}

    def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
        """
        Analyze multiple pages, one result dict per page in input order.

        Each page dict needs 'content' and may carry 'title', 'url' and
        'word_count'. A missing or empty title falls back to ``Page N``.
        Invalid ``pages_data`` yields a single-element list with a validation
        error.
        """
        # Validate input
        is_valid, error_msg = self.validator.validate_pages_data(pages_data)
        if not is_valid:
            self.logger.error("Pages data validation failed: %s", error_msg)
            return [{'error': error_msg, 'error_type': 'validation'}]

        results = []
        successful_analyses = 0

        self.logger.info("Starting analysis of %d pages", len(pages_data))

        for i, page_data in enumerate(pages_data):
            try:
                content = page_data.get('content', '')
                # `or` so that an empty/None title also gets the default.
                title = page_data.get('title') or f'Page {i+1}'

                analysis = self.analyze_page_geo(content, title, detailed)

                # Add page-specific metadata
                analysis.update({
                    'page_url': page_data.get('url', ''),
                    'page_index': i,
                    'source_word_count': page_data.get('word_count', 0)
                })

                if 'error' not in analysis:
                    successful_analyses += 1

                results.append(analysis)

            except Exception as e:
                self.logger.error("Failed to analyze page %d: %s", i, e)
                results.append({
                    'page_index': i,
                    'page_url': page_data.get('url', ''),
                    'error': f"Analysis failed: {str(e)}",
                    'error_type': 'system'
                })

        self.logger.info("Completed analysis: %d/%d successful", successful_analyses, len(pages_data))
        return results

    def compare_content_geo(self, content_a: str, content_b: str, titles: Optional[Tuple[str, str]] = None) -> Dict[str, Any]:
        """
        Compare two pieces of content for GEO performance.

        Args:
            content_a: First content piece.
            content_b: Second content piece.
            titles: Optional ``(title_a, title_b)``; defaults to
                ``("Content A", "Content B")``.

        Returns:
            The parsed comparison plus title/length metadata, or a dict with
            ``'error'`` and ``'error_type'`` on failure.
        """
        try:
            title_a, title_b = titles if titles else ("Content A", "Content B")

            # Validate inputs
            is_valid_a, error_a = self.validator.validate_content_inputs(content_a, title_a, self.config)
            is_valid_b, error_b = self.validator.validate_content_inputs(content_b, title_b, self.config)

            if not is_valid_a:
                return {'error': f"Content A validation failed: {error_a}", 'error_type': 'validation'}
            if not is_valid_b:
                return {'error': f"Content B validation failed: {error_b}", 'error_type': 'validation'}

            # Process content
            processed_a = self.processor.smart_truncate(
                self.processor.sanitize_content(content_a),
                self.config.QUICK_CONTENT_LENGTH
            )
            processed_b = self.processor.smart_truncate(
                self.processor.sanitize_content(content_b),
                self.config.QUICK_CONTENT_LENGTH
            )

            # Build comparison prompt
            system_prompt = self.prompt_manager.get_prompt('competitive_analysis')
            user_message = f"""Content A:
Title: {title_a}
Content: {processed_a}

Content B:
Title: {title_b}
Content: {processed_b}"""

            result_content = self._invoke_llm(system_prompt, user_message)
            comparison_result = self._parse_llm_response(result_content)

            # Add metadata
            comparison_result.update({
                'content_a_title': title_a,
                'content_b_title': title_b,
                'content_a_length': len(content_a),
                'content_b_length': len(content_b)
            })

            return comparison_result

        except Exception as e:
            self.logger.error("Comparison analysis failed: %s", e)
            return {'error': f"Comparison analysis failed: {str(e)}", 'error_type': 'system'}

    @staticmethod
    def _extract_scores(result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the per-metric score dict of a result, or None if it has none.

        Detailed analyses store scores under 'geo_scores', quick analyses
        under 'scores'; both are accepted.
        """
        for key in ('geo_scores', 'scores'):
            scores = result.get(key)
            if isinstance(scores, dict):
                return scores
        return None

    def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Calculate aggregate GEO scores across per-page results.

        Results that carry an ``'error'`` are counted as failures; results with
        a 'geo_scores' or 'scores' dict are averaged metric by metric
        (non-numeric values are ignored).

        Returns:
            Averages, per-metric min/max/count, best and worst metric, insights
            and processing stats; or a dict with ``'error'`` when nothing can
            be aggregated.
        """
        try:
            # Filter out error results
            valid_results = [
                r for r in individual_results
                if isinstance(r, dict) and not r.get('error') and self._extract_scores(r) is not None
            ]
            error_results = [r for r in individual_results if isinstance(r, dict) and r.get('error')]

            if not valid_results:
                return {
                    'error': 'No valid results to aggregate',
                    'error_type': 'no_data',
                    'total_errors': len(error_results),
                    'error_breakdown': self._analyze_errors(error_results)
                }

            # Group numeric scores by metric over ALL valid results, so a metric
            # missing from the first page is still aggregated.
            per_metric: Dict[str, List[float]] = {}
            for result in valid_results:
                for metric, value in self._extract_scores(result).items():
                    if _is_number(value):
                        per_metric.setdefault(metric, []).append(value)

            avg_scores = {}
            score_details = {}
            for metric, scores in per_metric.items():
                avg_scores[metric] = sum(scores) / len(scores)
                score_details[metric] = {
                    'average': avg_scores[metric],
                    'min': min(scores),
                    'max': max(scores),
                    'count': len(scores)
                }

            overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

            # Collect insights
            insights = self._generate_aggregate_insights(valid_results, avg_scores)

            # Find performance patterns
            best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
            worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)

            return {
                'aggregate_scores': avg_scores,
                'score_details': score_details,
                'overall_score': overall_avg,
                'pages_analyzed': len(valid_results),
                'pages_with_errors': len(error_results),
                'success_rate': len(valid_results) / len(individual_results) if individual_results else 0,
                'best_performing_metric': {
                    'metric': best_score[0],
                    'score': best_score[1]
                },
                'lowest_performing_metric': {
                    'metric': worst_score[0],
                    'score': worst_score[1]
                },
                'insights': insights,
                'score_distribution': self._calculate_score_distribution(avg_scores),
                'processing_stats': self._calculate_processing_stats(valid_results)
            }

        except Exception as e:
            self.logger.error("Aggregation failed: %s", e)
            return {'error': f"Aggregation failed: {str(e)}", 'error_type': 'system'}

    def get_performance_stats(self) -> Dict[str, Any]:
        """Get performance statistics for the scorer"""
        return {
            'total_analyses': self.analysis_count,
            'cache_hits': self.cache_hits,
            'cache_hit_rate': self.cache_hits / max(self.analysis_count, 1),
            'config': {
                'max_content_length': self.config.MAX_CONTENT_LENGTH,
                'cache_size': self.config.CACHE_SIZE
            }
        }

    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
        """Extract a JSON object from an LLM reply.

        Tries, in order: objects inside Markdown code fences, the span from the
        first "{" to the last "}", the whole text, and finally a prefix decode
        from the first "{" (handles trailing commentary that contains braces).

        Returns:
            The first candidate that decodes to a dict. Otherwise a failure
            dict with 'error', 'parsing_error', 'error_type' == 'parsing' and
            the untouched 'raw_response'. Never raises.
        """
        cleaned_response = response_text.strip() if isinstance(response_text, str) else str(response_text)

        candidates = list(_FENCED_JSON_RE.findall(cleaned_response))
        greedy_match = _GREEDY_JSON_RE.search(cleaned_response)
        if greedy_match:
            candidates.append(greedy_match.group(0))
        candidates.append(cleaned_response)

        try:
            for candidate in candidates:
                try:
                    parsed = json.loads(candidate)
                except ValueError:
                    continue
                # Callers call .update() on the result, so only objects qualify.
                if isinstance(parsed, dict):
                    return parsed

            first_brace = cleaned_response.find('{')
            if first_brace != -1:
                try:
                    parsed, _ = json.JSONDecoder().raw_decode(cleaned_response, first_brace)
                except ValueError:
                    parsed = None
                if isinstance(parsed, dict):
                    return parsed

            message = 'No valid JSON found in response'
        except Exception as e:  # e.g. RecursionError on absurdly nested input
            message = f'Unexpected parsing error: {str(e)}'

        return {
            'raw_response': response_text,
            'error': message,
            'parsing_error': message,
            'error_type': 'parsing'
        }

    def _analyze_errors(self, error_results: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count error results per 'error_type' ('unknown' when absent)."""
        error_breakdown: Dict[str, int] = {}
        for result in error_results:
            error_type = result.get('error_type', 'unknown')
            error_breakdown[error_type] = error_breakdown.get(error_type, 0) + 1
        return error_breakdown

    def _generate_aggregate_insights(self, valid_results: List[Dict[str, Any]], avg_scores: Dict[str, float]) -> List[str]:
        """Generate human-readable insights from the averaged metric scores."""
        insights = []

        if not avg_scores:
            return ["No valid scores to analyze"]

        overall_avg = sum(avg_scores.values()) / len(avg_scores)

        # Performance level insights
        if overall_avg >= 8.0:
            insights.append("Excellent overall GEO performance across analyzed content")
        elif overall_avg >= 6.5:
            insights.append("Good GEO performance with room for targeted improvements")
        elif overall_avg >= 5.0:
            insights.append("Moderate GEO performance - significant optimization opportunities exist")
        else:
            insights.append("Below-average GEO performance - comprehensive optimization needed")

        # Specific metric insights
        best_metric = max(avg_scores.items(), key=lambda x: x[1])
        worst_metric = min(avg_scores.items(), key=lambda x: x[1])

        if best_metric[1] >= 8.0:
            insights.append(f"Strong performance in {best_metric[0].replace('_', ' ')} (score: {best_metric[1]:.1f})")

        if worst_metric[1] < 6.0:
            insights.append(f"Critical improvement needed in {worst_metric[0].replace('_', ' ')} (score: {worst_metric[1]:.1f})")

        # Consistency insights
        score_values = list(avg_scores.values())
        score_range = max(score_values) - min(score_values)

        if score_range > 3.0:
            insights.append("High variability in scores indicates inconsistent optimization across metrics")
        elif score_range < 1.5:
            insights.append("Consistent performance across all GEO metrics")

        return insights

    def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
        """Calculate distribution statistics (population variance) over metric scores."""
        if not scores:
            return {}

        score_values = list(scores.values())
        highest = max(score_values)
        lowest = min(score_values)
        # Computed once; the variance below reuses it for every element.
        mean = sum(score_values) / len(score_values)

        return {
            'highest_score': highest,
            'lowest_score': lowest,
            'average_score': mean,
            'score_range': highest - lowest,
            'scores_above_8': len([s for s in score_values if s >= 8.0]),
            'scores_above_7': len([s for s in score_values if s >= 7.0]),
            'scores_below_5': len([s for s in score_values if s < 5.0]),
            'score_variance': sum((s - mean) ** 2 for s in score_values) / len(score_values)
        }

    def _calculate_processing_stats(self, valid_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Calculate processing statistics; empty dict when no timings are present."""
        processing_times = [r.get('processing_time_seconds', 0) for r in valid_results if 'processing_time_seconds' in r]
        content_lengths = [r.get('content_length', 0) for r in valid_results if 'content_length' in r]

        if not processing_times:
            return {}

        return {
            'avg_processing_time': sum(processing_times) / len(processing_times),
            'max_processing_time': max(processing_times),
            'min_processing_time': min(processing_times),
            'avg_content_length': sum(content_lengths) / len(content_lengths) if content_lengths else 0,
            'total_processing_time': sum(processing_times)
        }

    def _get_timestamp(self) -> str:
        """Get current timestamp"""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


# Example usage and testing utilities
class GEOScorerTester:
    """Testing utilities for GEOScorer"""

    @staticmethod
    def create_test_content() -> List[Dict[str, Any]]:
        """Create test content for validation"""
        return [
            {
                'title': 'How to Optimize Content for AI Search',
                'content': 'AI search engines are revolutionizing how people find information. To optimize your content for AI-powered search, focus on creating comprehensive, factual, and well-structured content that directly answers user questions. Use semantic keywords, provide clear context, and ensure your content is authoritative and cite-worthy.',
                'url': 'https://example.com/ai-search-optimization'
            },
            {
                'title': 'Best Practices for GEO',
                'content': 'Generative Engine Optimization (GEO) requires a different approach than traditional SEO. Focus on conversational readiness, semantic richness, and multi-query coverage. Ensure your content provides complete answers and is structured in a way that AI systems can easily understand and cite.',
                'url': 'https://example.com/geo-best-practices'
            }
        ]

    @staticmethod
    def run_basic_test(scorer: GEOScorer) -> Dict[str, Any]:
        """Run a quick analysis over the sample pages and return results, aggregate and stats."""
        test_content = GEOScorerTester.create_test_content()
        results = scorer.analyze_multiple_pages(test_content, detailed=False)
        aggregate = scorer.calculate_aggregate_scores(results)
        stats = scorer.get_performance_stats()

        return {
            'individual_results': results,
            'aggregate_results': aggregate,
            'performance_stats': stats
        }