# Source: utils/scorer.py (revision bf3d3b7, 28.4 kB)
"""
Improved GEO Scoring Module
Analyzes content for Generative Engine Optimization (GEO) performance
"""
import json
import re
import logging
import hashlib
import asyncio
from datetime import datetime
from typing import Dict, Any, List, Union, Optional, Tuple
from functools import lru_cache
from dataclasses import dataclass
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
@dataclass
class GEOConfig:
    """Tunable limits and thresholds shared by the GEO scoring components.

    Field order is part of the positional constructor signature and must not
    be rearranged.
    """

    # Content-size limits, measured in characters.
    MAX_CONTENT_LENGTH: int = 8000    # truncation limit for detailed analysis
    MIN_CONTENT_LENGTH: int = 100     # shorter (stripped) content is rejected
    QUICK_CONTENT_LENGTH: int = 4000  # truncation limit for quick/comparison runs

    # NOTE(review): not referenced by the visible code; presumably seconds and
    # attempt count for the LLM call -- confirm before relying on them.
    DEFAULT_TIMEOUT: int = 30
    MAX_RETRIES: int = 3

    # Intended capacity of the analysis cache.
    CACHE_SIZE: int = 100

    # A sentence/paragraph break is only used as a cut point when it lies past
    # this fraction of the length limit.
    SMART_TRUNCATE_THRESHOLD: float = 0.8
class GEOValidator:
    """Stateless input checks run before any GEO analysis.

    Every validator returns ``(is_valid, message)``; ``message`` is the empty
    string on success and a human-readable reason on failure.
    """

    @staticmethod
    def validate_content_inputs(content: str, title: str, config: "GEOConfig") -> Tuple[bool, str]:
        """Check a single content/title pair.

        Args:
            content: Page body; must be a string whose stripped length is at
                least ``config.MIN_CONTENT_LENGTH``.
            title: Page title; must be a non-blank string of at most 200
                characters (the length is measured before stripping).
            config: Object supplying ``MIN_CONTENT_LENGTH``.

        Returns:
            ``(True, "")`` when every check passes, otherwise ``(False, reason)``
            for the first failing check.
        """
        both_are_text = isinstance(content, str) and isinstance(title, str)
        if not both_are_text:
            return False, "Content and title must be strings"

        minimum = config.MIN_CONTENT_LENGTH
        if len(content.strip()) < minimum:
            return False, f"Content must be at least {minimum} characters"

        if not title.strip():
            return False, "Title cannot be empty"

        if len(title) > 200:
            return False, "Title too long (max 200 characters)"

        return True, ""

    @staticmethod
    def validate_pages_data(pages_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
        """Check that ``pages_data`` is a non-empty list of dicts with 'content'.

        Only the presence of the ``'content'`` key is verified here; its value
        is not inspected.

        Returns:
            ``(True, "")`` when the structure is acceptable, otherwise
            ``(False, reason)`` naming the first offending page index.
        """
        if not isinstance(pages_data, list):
            return False, "Pages data must be a list"
        if not pages_data:
            return False, "Pages data cannot be empty"

        for index, entry in enumerate(pages_data):
            if not isinstance(entry, dict):
                return False, f"Page {index} must be a dictionary"
            if 'content' not in entry:
                return False, f"Page {index} missing 'content' field"

        return True, ""
class GEOContentProcessor:
"""Content processing utilities for GEO analysis"""
def __init__(self, config: GEOConfig):
self.config = config
self.dangerous_patterns = [
r'ignore\s+previous\s+instructions',
r'system\s*:',
r'assistant\s*:',
r'```json\s*{.*"prompt"',
r'<\s*system\s*>',
r'<\s*user\s*>',
r'forget\s+everything',
r'new\s+instructions\s*:',
]
def sanitize_content(self, content: str) -> str:
"""Sanitize content to prevent prompt injection"""
if not content:
return ""
# Remove potential prompt injection patterns
sanitized = content
for pattern in self.dangerous_patterns:
sanitized = re.sub(pattern, '[FILTERED]', sanitized, flags=re.IGNORECASE)
# Remove excessive whitespace
sanitized = re.sub(r'\s+', ' ', sanitized).strip()
# Hard limit on length
return sanitized[:self.config.MAX_CONTENT_LENGTH * 2]
def smart_truncate(self, content: str, max_length: int) -> str:
"""Intelligently truncate content preserving meaning"""
if len(content) <= max_length:
return content
# Find last complete sentence within limit
truncated = content[:max_length]
# Look for sentence endings
sentence_endings = ['. ', '! ', '? ']
best_cut = -1
for ending in sentence_endings:
last_occurrence = truncated.rfind(ending)
if last_occurrence > max_length * self.config.SMART_TRUNCATE_THRESHOLD:
best_cut = max(best_cut, last_occurrence + len(ending) - 1)
if best_cut > 0:
return truncated[:best_cut]
# If no good sentence break, look for paragraph breaks
last_paragraph = truncated.rfind('\n\n')
if last_paragraph > max_length * self.config.SMART_TRUNCATE_THRESHOLD:
return truncated[:last_paragraph]
# If no good breaks, just truncate and add ellipsis
return truncated.rstrip() + "..."
def generate_content_hash(self, content: str, title: str, analysis_type: str) -> str:
"""Generate hash for content caching"""
combined = f"{title}|{content}|{analysis_type}"
return hashlib.md5(combined.encode()).hexdigest()
class GEOPromptManager:
    """Registry of the system prompts used for each kind of GEO analysis.

    The prompts are plain text containing literal JSON examples (including
    brace characters); they are stored and returned verbatim.
    """

    def __init__(self):
        self.prompts = self._initialize_prompts()

    def _initialize_prompts(self) -> Dict[str, str]:
        """Build the mapping from prompt type to prompt text."""
        builders = (
            ('detailed_analysis', self._get_detailed_prompt),
            ('quick_analysis', self._get_quick_prompt),
            ('competitive_analysis', self._get_competitive_prompt),
        )
        return {prompt_type: build() for prompt_type, build in builders}

    def _get_detailed_prompt(self) -> str:
        """Prompt requesting eight criterion scores plus topics, gaps and recommendations."""
        return """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.
Evaluate the content based on these GEO criteria (score 1-10 each):
1. **AI Search Visibility**: How likely is this content to be surfaced by AI search engines?
2. **Query Intent Matching**: How well does the content match common user queries?
3. **Factual Accuracy & Authority**: How trustworthy and authoritative is the information?
4. **Conversational Readiness**: How suitable is the content for AI chat responses?
5. **Semantic Richness**: How well does the content use relevant semantic keywords?
6. **Context Completeness**: Does the content provide complete, self-contained answers?
7. **Citation Worthiness**: How likely are AI systems to cite this content?
8. **Multi-Query Coverage**: Does the content answer multiple related questions?
Also identify:
- Primary topics and entities
- Missing information gaps
- Optimization opportunities
- Specific enhancement recommendations
IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
{
"geo_scores": {
"ai_search_visibility": 7.5,
"query_intent_matching": 8.0,
"factual_accuracy": 9.0,
"conversational_readiness": 6.5,
"semantic_richness": 7.0,
"context_completeness": 8.5,
"citation_worthiness": 7.8,
"multi_query_coverage": 6.0
},
"overall_geo_score": 7.5,
"primary_topics": ["topic1", "topic2"],
"entities": ["entity1", "entity2"],
"missing_gaps": ["gap1", "gap2"],
"optimization_opportunities": [
{
"type": "semantic_enhancement",
"description": "Add more related terms",
"priority": "high"
}
],
"recommendations": [
"Specific actionable recommendation 1",
"Specific actionable recommendation 2"
]
}"""

    def _get_quick_prompt(self) -> str:
        """Prompt requesting four scores, an overall score and one recommendation."""
        return """Analyze this content for AI search optimization. Provide scores (1-10) for:
1. AI Search Visibility
2. Query Intent Matching
3. Conversational Readiness
4. Citation Worthiness
IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
{
"scores": {
"ai_search_visibility": 7.5,
"query_intent_matching": 8.0,
"conversational_readiness": 6.5,
"citation_worthiness": 7.8
},
"overall_score": 7.5,
"top_recommendation": "Most important improvement needed"
}"""

    def _get_competitive_prompt(self) -> str:
        """Prompt requesting a head-to-head comparison of two content pieces."""
        return """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.
IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
{
"winner": "A",
"score_comparison": {
"content_a_score": 7.5,
"content_b_score": 8.2
},
"key_differences": ["difference1", "difference2"],
"improvement_suggestions": {
"content_a": ["suggestion1"],
"content_b": ["suggestion1"]
}
}"""

    def get_prompt(self, prompt_type: str) -> str:
        """Return the prompt registered for ``prompt_type``.

        Unknown types fall back to the detailed-analysis prompt.
        """
        fallback = self.prompts['detailed_analysis']
        return self.prompts.get(prompt_type, fallback)
class GEOScorer:
    """Score content for Generative Engine Optimization (GEO) using an LLM.

    The scorer validates and sanitises input, sends it to the supplied LLM
    with one of the prompts from ``GEOPromptManager``, parses the JSON reply
    and can aggregate many per-page results into summary statistics.

    Failures are reported as dictionaries containing an ``'error'`` message
    and an ``'error_type'`` rather than being raised.
    """

    def __init__(self, llm, config: Optional["GEOConfig"] = None, logger: Optional[logging.Logger] = None):
        """Create a scorer.

        Args:
            llm: A runnable that can be piped after a ``ChatPromptTemplate``
                (``prompt | llm``) and invoked.
            config: Scoring configuration; a default ``GEOConfig`` is used
                when omitted.
            logger: Logger to use; a stream logger is created when omitted.
        """
        self.llm = llm
        self.config = config or GEOConfig()
        self.logger = logger or self._setup_logger()
        # Collaborators
        self.validator = GEOValidator()
        self.processor = GEOContentProcessor(self.config)
        self.prompt_manager = GEOPromptManager()
        # Performance tracking
        self.analysis_count = 0
        self.cache_hits = 0
        # content_hash -> JSON-serialised analysis, oldest entry first.
        self._analysis_cache: Dict[str, str] = {}

    def _setup_logger(self) -> logging.Logger:
        """Return the module logger, attaching a stream handler on first use."""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

    # ------------------------------------------------------------------
    # Caching
    # ------------------------------------------------------------------
    def _cache_entries(self) -> Dict[str, str]:
        """Return the per-instance cache dict, creating it if necessary."""
        entries = getattr(self, '_analysis_cache', None)
        if entries is None:
            entries = self._analysis_cache = {}
        return entries

    def _get_cached_analysis(self, content_hash: str) -> Optional[Dict[str, Any]]:
        """Return a fresh copy of the cached analysis for ``content_hash``.

        Returns ``None`` on a miss. A hit marks the entry as most recently
        used. Entries are stored serialised, so callers may freely mutate
        the returned dictionary without affecting the cache.
        """
        entries = self._cache_entries()
        serialized = entries.pop(content_hash, None)
        if serialized is None:
            return None
        entries[content_hash] = serialized  # re-insert at the "newest" end
        return json.loads(serialized)

    def _store_cached_analysis(self, content_hash: str, result: Dict[str, Any]) -> None:
        """Remember ``result`` under ``content_hash``, evicting the oldest entries.

        Nothing is stored when ``config.CACHE_SIZE`` is not positive or when
        the result cannot be serialised to JSON.
        """
        capacity = self.config.CACHE_SIZE
        if capacity <= 0:
            return
        try:
            serialized = json.dumps(result)
        except (TypeError, ValueError):
            return
        entries = self._cache_entries()
        entries.pop(content_hash, None)
        entries[content_hash] = serialized
        while len(entries) > capacity:
            # dicts iterate in insertion order, so the first key is the
            # least recently used one.
            del entries[next(iter(entries))]

    # ------------------------------------------------------------------
    # LLM invocation
    # ------------------------------------------------------------------
    @staticmethod
    def _escape_template_braces(text: str) -> str:
        """Double every brace so the text survives f-string style templating.

        The prompts contain literal JSON and user content may contain any
        character; unescaped braces would be read as template variables.
        """
        return text.replace('{', '{{').replace('}', '}}')

    def _invoke_llm(self, system_prompt: str, user_message: str) -> str:
        """Send a system/user message pair to the LLM and return the reply text."""
        prompt_template = ChatPromptTemplate.from_messages([
            SystemMessagePromptTemplate.from_template(self._escape_template_braces(system_prompt)),
            HumanMessagePromptTemplate.from_template(self._escape_template_braces(user_message)),
        ])
        chain = prompt_template | self.llm
        result = chain.invoke({})
        return result.content if hasattr(result, 'content') else str(result)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
        """Analyze a single page for GEO performance.

        Args:
            content: Page text.
            title: Page title.
            detailed: Use the detailed prompt and ``MAX_CONTENT_LENGTH`` when
                true, otherwise the quick prompt and ``QUICK_CONTENT_LENGTH``.

        Returns:
            The parsed LLM response extended with metadata (title, lengths,
            word count, analysis type, processing time and content hash), or
            a dictionary with ``'error'`` and ``'error_type'`` on failure.
            Successful results are cached per title/content/analysis type and
            returned from the cache on repeated calls.
        """
        start_time = datetime.now()
        self.analysis_count += 1
        try:
            is_valid, error_msg = self.validator.validate_content_inputs(content, title, self.config)
            if not is_valid:
                self.logger.warning(f"Input validation failed: {error_msg}")
                return {'error': error_msg, 'error_type': 'validation'}

            analysis_type = 'detailed' if detailed else 'quick'
            content_hash = self.processor.generate_content_hash(content, title, analysis_type)

            cached = self._get_cached_analysis(content_hash)
            if cached is not None:
                self.cache_hits += 1
                self.logger.info(f"Cache hit for '{title}'")
                return cached

            # Process content
            sanitized_content = self.processor.sanitize_content(content)
            max_length = self.config.MAX_CONTENT_LENGTH if detailed else self.config.QUICK_CONTENT_LENGTH
            processed_content = self.processor.smart_truncate(sanitized_content, max_length)

            # The title is caller-supplied too, so it gets the same filtering
            # before entering the prompt.
            prompt_title = self.processor.sanitize_content(title)
            prompt_type = 'detailed_analysis' if detailed else 'quick_analysis'
            system_prompt = self.prompt_manager.get_prompt(prompt_type)
            user_message = f"Title: {prompt_title}\n\nContent: {processed_content}"

            result_content = self._invoke_llm(system_prompt, user_message)
            parsed_result = self._parse_llm_response(result_content)

            processing_time = (datetime.now() - start_time).total_seconds()
            parsed_result.update({
                'analyzed_title': title,
                'content_length': len(content),
                'processed_content_length': len(processed_content),
                'word_count': len(content.split()),
                'analysis_type': analysis_type,
                'processing_time_seconds': processing_time,
                'content_hash': content_hash
            })

            # Only successful analyses are worth replaying.
            if 'error' not in parsed_result:
                self._store_cached_analysis(content_hash, parsed_result)

            self.logger.info(f"Analysis completed for '{title}' in {processing_time:.2f}s")
            return parsed_result
        except Exception as e:
            self.logger.error(f"Analysis failed for title '{title}': {e}")
            return {'error': f"Analysis failed: {str(e)}", 'error_type': 'system', 'title': title}

    def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
        """Analyze several pages, one LLM call per page.

        Args:
            pages_data: List of dictionaries with a ``'content'`` key and
                optional ``'title'``, ``'url'`` and ``'word_count'`` keys.
            detailed: Passed through to ``analyze_page_geo``.

        Returns:
            One result dictionary per page, in input order, each extended
            with ``page_url``, ``page_index`` and ``source_word_count``. If
            ``pages_data`` itself is invalid, a single-element list holding
            the validation error is returned.
        """
        is_valid, error_msg = self.validator.validate_pages_data(pages_data)
        if not is_valid:
            self.logger.error(f"Pages data validation failed: {error_msg}")
            return [{'error': error_msg, 'error_type': 'validation'}]

        results = []
        successful_analyses = 0
        self.logger.info(f"Starting analysis of {len(pages_data)} pages")
        for i, page_data in enumerate(pages_data):
            try:
                content = page_data.get('content', '')
                title = page_data.get('title', f'Page {i+1}')
                analysis = self.analyze_page_geo(content, title, detailed)
                analysis.update({
                    'page_url': page_data.get('url', ''),
                    'page_index': i,
                    'source_word_count': page_data.get('word_count', 0)
                })
                if 'error' not in analysis:
                    successful_analyses += 1
                results.append(analysis)
            except Exception as e:
                self.logger.error(f"Failed to analyze page {i}: {e}")
                results.append({
                    'page_index': i,
                    'page_url': page_data.get('url', ''),
                    'error': f"Analysis failed: {str(e)}",
                    'error_type': 'system'
                })
        self.logger.info(f"Completed analysis: {successful_analyses}/{len(pages_data)} successful")
        return results

    def compare_content_geo(self, content_a: str, content_b: str, titles: Optional[Tuple[str, str]] = None) -> Dict[str, Any]:
        """Compare two pieces of content for GEO performance.

        Args:
            content_a: First content piece.
            content_b: Second content piece.
            titles: Optional ``(title_a, title_b)``; defaults to
                ``("Content A", "Content B")``.

        Returns:
            The parsed comparison extended with both titles and content
            lengths, or a dictionary with ``'error'`` and ``'error_type'``.
        """
        try:
            title_a, title_b = titles if titles else ("Content A", "Content B")

            is_valid_a, error_a = self.validator.validate_content_inputs(content_a, title_a, self.config)
            is_valid_b, error_b = self.validator.validate_content_inputs(content_b, title_b, self.config)
            if not is_valid_a:
                return {'error': f"Content A validation failed: {error_a}", 'error_type': 'validation'}
            if not is_valid_b:
                return {'error': f"Content B validation failed: {error_b}", 'error_type': 'validation'}

            processed_a = self.processor.smart_truncate(
                self.processor.sanitize_content(content_a),
                self.config.QUICK_CONTENT_LENGTH
            )
            processed_b = self.processor.smart_truncate(
                self.processor.sanitize_content(content_b),
                self.config.QUICK_CONTENT_LENGTH
            )
            prompt_title_a = self.processor.sanitize_content(title_a)
            prompt_title_b = self.processor.sanitize_content(title_b)

            system_prompt = self.prompt_manager.get_prompt('competitive_analysis')
            user_message = (
                f"Content A:\n"
                f"Title: {prompt_title_a}\n"
                f"Content: {processed_a}\n"
                f"Content B:\n"
                f"Title: {prompt_title_b}\n"
                f"Content: {processed_b}"
            )

            result_content = self._invoke_llm(system_prompt, user_message)
            comparison_result = self._parse_llm_response(result_content)
            comparison_result.update({
                'content_a_title': title_a,
                'content_b_title': title_b,
                'content_a_length': len(content_a),
                'content_b_length': len(content_b)
            })
            return comparison_result
        except Exception as e:
            self.logger.error(f"Comparison analysis failed: {e}")
            return {'error': f"Comparison analysis failed: {str(e)}", 'error_type': 'system'}

    @staticmethod
    def _extract_scores(result: Dict[str, Any]) -> Dict[str, Any]:
        """Return the numeric per-metric scores of one analysis result.

        Detailed analyses report them under ``'geo_scores'`` and quick
        analyses under ``'scores'``; both are accepted. Non-numeric values
        are dropped so one malformed score cannot break aggregation.
        """
        raw = result.get('geo_scores') or result.get('scores')
        if not isinstance(raw, dict):
            return {}
        return {
            key: value
            for key, value in raw.items()
            if isinstance(value, (int, float)) and not isinstance(value, bool)
        }

    def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate per-page results into averages, extremes and insights.

        Args:
            individual_results: Results as returned by ``analyze_page_geo``
                or ``analyze_multiple_pages`` (detailed or quick).

        Returns:
            A summary dictionary with ``aggregate_scores``, ``score_details``,
            ``overall_score``, page counts, ``success_rate``, best/lowest
            metric, ``insights``, ``score_distribution`` and
            ``processing_stats``; or an ``'error'`` dictionary when no result
            carries usable scores or aggregation fails.
        """
        try:
            error_results = [r for r in individual_results if r.get('error')]

            valid_results = []
            score_sets = []
            for result in individual_results:
                if result.get('error'):
                    continue
                scores = self._extract_scores(result)
                if scores:
                    valid_results.append(result)
                    score_sets.append(scores)

            if not valid_results:
                return {
                    'error': 'No valid results to aggregate',
                    'error_type': 'no_data',
                    'total_errors': len(error_results),
                    'error_breakdown': self._analyze_errors(error_results)
                }

            # Union of metric names in first-seen order, so a metric missing
            # from the first page is still aggregated.
            score_keys: List[str] = []
            for scores in score_sets:
                for key in scores:
                    if key not in score_keys:
                        score_keys.append(key)

            avg_scores = {}
            score_details = {}
            for key in score_keys:
                values = [scores[key] for scores in score_sets if key in scores]
                average = sum(values) / len(values)
                avg_scores[key] = average
                score_details[key] = {
                    'average': average,
                    'min': min(values),
                    'max': max(values),
                    'count': len(values)
                }

            overall_avg = sum(avg_scores.values()) / len(avg_scores)
            insights = self._generate_aggregate_insights(valid_results, avg_scores)
            best_score = max(avg_scores.items(), key=lambda item: item[1])
            worst_score = min(avg_scores.items(), key=lambda item: item[1])

            return {
                'aggregate_scores': avg_scores,
                'score_details': score_details,
                'overall_score': overall_avg,
                'pages_analyzed': len(valid_results),
                'pages_with_errors': len(error_results),
                'success_rate': len(valid_results) / len(individual_results) if individual_results else 0,
                'best_performing_metric': {
                    'metric': best_score[0],
                    'score': best_score[1]
                },
                'lowest_performing_metric': {
                    'metric': worst_score[0],
                    'score': worst_score[1]
                },
                'insights': insights,
                'score_distribution': self._calculate_score_distribution(avg_scores),
                'processing_stats': self._calculate_processing_stats(valid_results)
            }
        except Exception as e:
            self.logger.error(f"Aggregation failed: {e}")
            return {'error': f"Aggregation failed: {str(e)}", 'error_type': 'system'}

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return call counters, the cache hit rate and key configuration values."""
        return {
            'total_analyses': self.analysis_count,
            'cache_hits': self.cache_hits,
            'cache_hit_rate': self.cache_hits / max(self.analysis_count, 1),
            'config': {
                'max_content_length': self.config.MAX_CONTENT_LENGTH,
                'cache_size': self.config.CACHE_SIZE
            }
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
        """Extract a JSON object from an LLM reply.

        The whole reply is tried first, then the widest ``{...}`` span, then
        fenced code blocks. Only JSON objects (dicts) are accepted.

        Returns:
            The parsed dictionary, or a failure dictionary containing
            ``raw_response``, ``parsing_error``, ``error_type='parsing'`` and
            an ``error`` message so callers treat it as a failed analysis.
            This method never raises.
        """
        def failure(reason: str) -> Dict[str, Any]:
            return {
                'raw_response': response_text,
                'parsing_error': reason,
                'error_type': 'parsing',
                'error': 'Invalid response format from LLM'
            }

        try:
            cleaned_response = response_text.strip()
            json_patterns = [
                r'\{.*\}',                     # widest brace-delimited span
                r'```json\s*(\{.*?\})\s*```',  # JSON in code blocks
                r'```\s*(\{.*?\})\s*```'       # Generic code blocks
            ]
            candidates = [cleaned_response]
            for pattern in json_patterns:
                candidates.extend(re.findall(pattern, cleaned_response, re.DOTALL))

            for candidate in candidates:
                try:
                    parsed = json.loads(candidate)
                except json.JSONDecodeError:
                    continue
                # Callers call .update() on the result, so only objects qualify.
                if isinstance(parsed, dict):
                    return parsed
            return failure('No valid JSON found in response')
        except Exception as e:
            return failure(f'Unexpected parsing error: {str(e)}')

    def _analyze_errors(self, error_results: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count error results by their ``error_type`` ('unknown' if absent)."""
        error_breakdown: Dict[str, int] = {}
        for result in error_results:
            error_type = result.get('error_type', 'unknown')
            error_breakdown[error_type] = error_breakdown.get(error_type, 0) + 1
        return error_breakdown

    def _generate_aggregate_insights(self, valid_results: List[Dict[str, Any]], avg_scores: Dict[str, float]) -> List[str]:
        """Turn average scores into human-readable observations.

        Args:
            valid_results: Accepted for interface compatibility; not used.
            avg_scores: Mapping of metric name to average score.
        """
        if not avg_scores:
            return ["No valid scores to analyze"]

        insights = []
        overall_avg = sum(avg_scores.values()) / len(avg_scores)

        # Overall performance band
        if overall_avg >= 8.0:
            insights.append("Excellent overall GEO performance across analyzed content")
        elif overall_avg >= 6.5:
            insights.append("Good GEO performance with room for targeted improvements")
        elif overall_avg >= 5.0:
            insights.append("Moderate GEO performance - significant optimization opportunities exist")
        else:
            insights.append("Below-average GEO performance - comprehensive optimization needed")

        # Strongest and weakest metric
        best_metric = max(avg_scores.items(), key=lambda item: item[1])
        worst_metric = min(avg_scores.items(), key=lambda item: item[1])
        if best_metric[1] >= 8.0:
            insights.append(f"Strong performance in {best_metric[0].replace('_', ' ')} (score: {best_metric[1]:.1f})")
        if worst_metric[1] < 6.0:
            insights.append(f"Critical improvement needed in {worst_metric[0].replace('_', ' ')} (score: {worst_metric[1]:.1f})")

        # Spread between metrics
        score_range = best_metric[1] - worst_metric[1]
        if score_range > 3.0:
            insights.append("High variability in scores indicates inconsistent optimization across metrics")
        elif score_range < 1.5:
            insights.append("Consistent performance across all GEO metrics")
        return insights

    def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
        """Return extremes, mean, range, threshold counts and population variance."""
        if not scores:
            return {}
        score_values = list(scores.values())
        count = len(score_values)
        highest = max(score_values)
        lowest = min(score_values)
        mean = sum(score_values) / count  # computed once, reused for variance
        return {
            'highest_score': highest,
            'lowest_score': lowest,
            'average_score': mean,
            'score_range': highest - lowest,
            'scores_above_8': len([s for s in score_values if s >= 8.0]),
            'scores_above_7': len([s for s in score_values if s >= 7.0]),
            'scores_below_5': len([s for s in score_values if s < 5.0]),
            'score_variance': sum((s - mean) ** 2 for s in score_values) / count
        }

    def _calculate_processing_stats(self, valid_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise processing times and content lengths of the given results.

        Returns an empty dictionary when no result carries a processing time.
        """
        processing_times = [r.get('processing_time_seconds', 0) for r in valid_results if 'processing_time_seconds' in r]
        content_lengths = [r.get('content_length', 0) for r in valid_results if 'content_length' in r]
        if not processing_times:
            return {}
        return {
            'avg_processing_time': sum(processing_times) / len(processing_times),
            'max_processing_time': max(processing_times),
            'min_processing_time': min(processing_times),
            'avg_content_length': sum(content_lengths) / len(content_lengths) if content_lengths else 0,
            'total_processing_time': sum(processing_times)
        }

    def _get_timestamp(self) -> str:
        """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Example usage and testing utilities
class GEOScorerTester:
    """Small smoke-test helpers for exercising a ``GEOScorer``."""

    @staticmethod
    def create_test_content() -> List[Dict[str, Any]]:
        """Return two sample pages, each with 'title', 'content' and 'url'."""
        ai_search_page = {
            'title': 'How to Optimize Content for AI Search',
            'content': 'AI search engines are revolutionizing how people find information. To optimize your content for AI-powered search, focus on creating comprehensive, factual, and well-structured content that directly answers user questions. Use semantic keywords, provide clear context, and ensure your content is authoritative and cite-worthy.',
            'url': 'https://example.com/ai-search-optimization'
        }
        geo_practices_page = {
            'title': 'Best Practices for GEO',
            'content': 'Generative Engine Optimization (GEO) requires a different approach than traditional SEO. Focus on conversational readiness, semantic richness, and multi-query coverage. Ensure your content provides complete answers and is structured in a way that AI systems can easily understand and cite.',
            'url': 'https://example.com/geo-best-practices'
        }
        return [ai_search_page, geo_practices_page]

    @staticmethod
    def run_basic_test(scorer: "GEOScorer") -> Dict[str, Any]:
        """Run a quick analysis of the sample pages through ``scorer``.

        Calls, in order, ``analyze_multiple_pages`` (with ``detailed=False``),
        ``calculate_aggregate_scores`` and ``get_performance_stats``.

        Returns:
            A dictionary with ``individual_results``, ``aggregate_results``
            and ``performance_stats``.
        """
        sample_pages = GEOScorerTester.create_test_content()
        page_results = scorer.analyze_multiple_pages(sample_pages, detailed=False)
        summary = scorer.calculate_aggregate_scores(page_results)
        report = {'individual_results': page_results, 'aggregate_results': summary}
        report['performance_stats'] = scorer.get_performance_stats()
        return report