Spaces:
Runtime error
Runtime error
| """ | |
| Fixed GEO Scoring Module - Drop-in replacement for your original | |
| This version fixes the data format issues while keeping your existing structure | |
| """ | |
| import json | |
| import re | |
| import logging | |
| from typing import Dict, Any, List, Union, Optional | |
| from datetime import datetime | |
| from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate | |
| class GEOScorer: | |
| """Main class for calculating GEO scores and analysis - IMPROVED VERSION""" | |
| def __init__(self, llm, logger=None): | |
| self.llm = llm | |
| self.logger = logger or self._setup_logger() | |
| self.setup_prompts() | |
| def _setup_logger(self): | |
| """Setup default logger""" | |
| logger = logging.getLogger(__name__) | |
| if not logger.handlers: | |
| handler = logging.StreamHandler() | |
| formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') | |
| handler.setFormatter(formatter) | |
| logger.addHandler(handler) | |
| logger.setLevel(logging.INFO) | |
| return logger | |
| def setup_prompts(self): | |
| """Initialize prompts for different types of analysis""" | |
| # Main GEO analysis prompt | |
| self.geo_analysis_prompt = """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems. | |
| Evaluate the content based on these GEO criteria (score 1-10 each): | |
| 1. **AI Search Visibility**: How likely is this content to be surfaced by AI search engines? | |
| 2. **Query Intent Matching**: How well does the content match common user queries? | |
| 3. **Factual Accuracy & Authority**: How trustworthy and authoritative is the information? | |
| 4. **Conversational Readiness**: How suitable is the content for AI chat responses? | |
| 5. **Semantic Richness**: How well does the content use relevant semantic keywords? | |
| 6. **Context Completeness**: Does the content provide complete, self-contained answers? | |
| 7. **Citation Worthiness**: How likely are AI systems to cite this content? | |
| 8. **Multi-Query Coverage**: Does the content answer multiple related questions? | |
| Also identify: | |
| - Primary topics and entities | |
| - Missing information gaps | |
| - Optimization opportunities | |
| - Specific enhancement recommendations | |
| IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON. | |
| { | |
| "geo_scores": { | |
| "ai_search_visibility": 7.5, | |
| "query_intent_matching": 8.0, | |
| "factual_accuracy": 9.0, | |
| "conversational_readiness": 6.5, | |
| "semantic_richness": 7.0, | |
| "context_completeness": 8.5, | |
| "citation_worthiness": 7.8, | |
| "multi_query_coverage": 6.0 | |
| }, | |
| "overall_geo_score": 7.5, | |
| "primary_topics": ["topic1", "topic2"], | |
| "entities": ["entity1", "entity2"], | |
| "missing_gaps": ["gap1", "gap2"], | |
| "optimization_opportunities": [ | |
| { | |
| "type": "semantic_enhancement", | |
| "description": "Add more related terms", | |
| "priority": "high" | |
| } | |
| ], | |
| "recommendations": [ | |
| "Specific actionable recommendation 1", | |
| "Specific actionable recommendation 2" | |
| ] | |
| }""" | |
| # Quick scoring prompt for faster analysis | |
| self.quick_score_prompt = """Analyze this content for AI search optimization. Provide scores (1-10) for: | |
| 1. AI Search Visibility | |
| 2. Query Intent Matching | |
| 3. Conversational Readiness | |
| 4. Citation Worthiness | |
| IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON. | |
| { | |
| "scores": { | |
| "ai_search_visibility": 7.5, | |
| "query_intent_matching": 8.0, | |
| "conversational_readiness": 6.5, | |
| "citation_worthiness": 7.8 | |
| }, | |
| "overall_score": 7.5, | |
| "top_recommendation": "Most important improvement needed" | |
| }""" | |
| # Competitive analysis prompt | |
| self.competitive_prompt = """Compare these content pieces for GEO performance. Identify which performs better for AI search and why. | |
| Content A: {content_a} | |
| Content B: {content_b} | |
| IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON. | |
| { | |
| "winner": "A", | |
| "score_comparison": { | |
| "content_a_score": 7.5, | |
| "content_b_score": 8.2 | |
| }, | |
| "key_differences": ["difference1", "difference2"], | |
| "improvement_suggestions": { | |
| "content_a": ["suggestion1"], | |
| "content_b": ["suggestion1"] | |
| } | |
| }""" | |
| def _normalize_page_data(self, page_data): | |
| """ | |
| FIXED: Normalize different data formats from web scrapers | |
| This handles the 'content' key error you were seeing | |
| """ | |
| if not isinstance(page_data, dict): | |
| self.logger.warning(f"Expected dict, got {type(page_data)}") | |
| return None | |
| # Try different field names for content | |
| content_fields = ['content', 'text', 'body', 'html_content', 'page_content', 'main_content'] | |
| content = "" | |
| for field in content_fields: | |
| if field in page_data and page_data[field]: | |
| content = str(page_data[field]) | |
| break | |
| if not content: | |
| self.logger.warning(f"No content found in page data. Available keys: {list(page_data.keys())}") | |
| return None | |
| # Try different field names for title | |
| title_fields = ['title', 'page_title', 'heading', 'h1', 'name'] | |
| title = "Untitled Page" | |
| for field in title_fields: | |
| if field in page_data and page_data[field]: | |
| title = str(page_data[field]) | |
| break | |
| # Try different field names for URL | |
| url_fields = ['url', 'link', 'page_url', 'source_url', 'href'] | |
| url = "" | |
| for field in url_fields: | |
| if field in page_data and page_data[field]: | |
| url = str(page_data[field]) | |
| break | |
| return { | |
| 'content': content, | |
| 'title': title, | |
| 'url': url, | |
| 'word_count': len(content.split()) if content else 0 | |
| } | |
| def _sanitize_content(self, content): | |
| """Basic content sanitization""" | |
| if not content: | |
| return "" | |
| # Remove potential prompt injection patterns | |
| dangerous_patterns = [ | |
| r'ignore\s+previous\s+instructions', | |
| r'system\s*:', | |
| r'assistant\s*:', | |
| ] | |
| sanitized = content | |
| for pattern in dangerous_patterns: | |
| sanitized = re.sub(pattern, '[FILTERED]', sanitized, flags=re.IGNORECASE) | |
| return sanitized[:8000] # Limit length | |
| def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]: | |
| """ | |
| Analyze a single page for GEO performance | |
| FIXED: Better error handling and validation | |
| """ | |
| try: | |
| # Input validation | |
| if not content or not content.strip(): | |
| return {'error': 'Empty or missing content', 'error_type': 'validation'} | |
| if len(content.strip()) < 50: | |
| return {'error': 'Content too short for analysis', 'error_type': 'validation'} | |
| # Sanitize content | |
| sanitized_content = self._sanitize_content(content) | |
| # Choose prompt based on detail level | |
| if detailed: | |
| system_prompt = self.geo_analysis_prompt | |
| max_length = 8000 | |
| else: | |
| system_prompt = self.quick_score_prompt | |
| max_length = 4000 | |
| # Smart truncation | |
| if len(sanitized_content) > max_length: | |
| truncated = sanitized_content[:max_length] | |
| # Try to end at a sentence | |
| last_period = truncated.rfind('. ') | |
| if last_period > max_length * 0.8: | |
| sanitized_content = truncated[:last_period + 1] | |
| else: | |
| sanitized_content = truncated + "..." | |
| user_message = f"Title: {title}\n\nContent: {sanitized_content}" | |
| # Build prompt and run analysis | |
| prompt_template = ChatPromptTemplate.from_messages([ | |
| SystemMessagePromptTemplate.from_template(system_prompt), | |
| HumanMessagePromptTemplate.from_template(user_message) | |
| ]) | |
| chain = prompt_template | self.llm | |
| result = chain.invoke({}) | |
| # Extract and parse result | |
| result_content = result.content if hasattr(result, 'content') else str(result) | |
| parsed_result = self._parse_llm_response(result_content) | |
| # Add metadata | |
| parsed_result.update({ | |
| 'analyzed_title': title, | |
| 'content_length': len(content), | |
| 'word_count': len(content.split()), | |
| 'analysis_type': 'detailed' if detailed else 'quick' | |
| }) | |
| return parsed_result | |
| except json.JSONDecodeError as e: | |
| self.logger.error(f"JSON parsing failed for '{title}': {e}") | |
| return {'error': 'Invalid response format from LLM', 'error_type': 'parsing'} | |
| except Exception as e: | |
| self.logger.error(f"Analysis failed for '{title}': {e}") | |
| return {'error': f"Analysis failed: {str(e)}", 'error_type': 'system'} | |
| def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]: | |
| """ | |
| FIXED: Analyze multiple pages with automatic data normalization | |
| This handles different data formats from web scrapers | |
| """ | |
| if not pages_data: | |
| self.logger.error("No pages data provided") | |
| return [{'error': 'No pages data provided', 'error_type': 'validation'}] | |
| results = [] | |
| successful_analyses = 0 | |
| self.logger.info(f"Starting analysis of {len(pages_data)} pages") | |
| for i, page_data in enumerate(pages_data): | |
| try: | |
| # FIXED: Normalize the data format | |
| normalized_page = self._normalize_page_data(page_data) | |
| if not normalized_page: | |
| self.logger.warning(f"Page {i}: Could not extract content. Available keys: {list(page_data.keys()) if isinstance(page_data, dict) else 'Not a dict'}") | |
| results.append({ | |
| 'page_index': i, | |
| 'error': 'Could not extract content from page data', | |
| 'error_type': 'data_format', | |
| 'available_keys': list(page_data.keys()) if isinstance(page_data, dict) else None | |
| }) | |
| continue | |
| content = normalized_page['content'] | |
| title = normalized_page['title'] | |
| analysis = self.analyze_page_geo(content, title, detailed) | |
| # Add page-specific metadata | |
| analysis.update({ | |
| 'page_url': normalized_page.get('url', ''), | |
| 'page_index': i, | |
| 'source_word_count': normalized_page.get('word_count', 0) | |
| }) | |
| if 'error' not in analysis: | |
| successful_analyses += 1 | |
| results.append(analysis) | |
| except Exception as e: | |
| self.logger.error(f"Failed to analyze page {i}: {e}") | |
| results.append({ | |
| 'page_index': i, | |
| 'error': f"Analysis failed: {str(e)}", | |
| 'error_type': 'system' | |
| }) | |
| self.logger.info(f"Completed analysis: {successful_analyses}/{len(pages_data)} successful") | |
| return results | |
| def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]: | |
| """ | |
| Compare two pieces of content for GEO performance | |
| """ | |
| try: | |
| title_a, title_b = titles if titles else ("Content A", "Content B") | |
| # Sanitize content | |
| content_a = self._sanitize_content(content_a) | |
| content_b = self._sanitize_content(content_b) | |
| # Format the competitive analysis prompt | |
| formatted_prompt = self.competitive_prompt.format( | |
| content_a=f"Title: {title_a}\nContent: {content_a[:4000]}", | |
| content_b=f"Title: {title_b}\nContent: {content_b[:4000]}" | |
| ) | |
| chain = ChatPromptTemplate.from_messages([ | |
| ("system", formatted_prompt), | |
| ("user", "Perform the comparison analysis.") | |
| ]) | self.llm | |
| result = chain.invoke({}) | |
| result_content = result.content if hasattr(result, 'content') else str(result) | |
| return self._parse_llm_response(result_content) | |
| except Exception as e: | |
| self.logger.error(f"Comparison analysis failed: {e}") | |
| return {'error': f"Comparison analysis failed: {str(e)}", 'error_type': 'system'} | |
| def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Calculate aggregate GEO scores from multiple page analyses | |
| FIXED: Better error handling for missing data | |
| """ | |
| try: | |
| valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')] | |
| error_results = [r for r in individual_results if r.get('error')] | |
| if not valid_results: | |
| error_summary = {} | |
| for result in error_results: | |
| error_type = result.get('error_type', 'unknown') | |
| error_summary[error_type] = error_summary.get(error_type, 0) + 1 | |
| return { | |
| 'error': 'No valid results to aggregate', | |
| 'error_type': 'no_data', | |
| 'total_pages': len(individual_results), | |
| 'error_breakdown': error_summary, | |
| 'sample_errors': [r.get('error', 'Unknown error') for r in error_results[:3]] | |
| } | |
| # Calculate average scores | |
| score_keys = list(valid_results[0]['geo_scores'].keys()) | |
| avg_scores = {} | |
| for key in score_keys: | |
| scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']] | |
| avg_scores[key] = sum(scores) / len(scores) if scores else 0 | |
| overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0 | |
| # Collect all recommendations and opportunities | |
| all_recommendations = [] | |
| all_opportunities = [] | |
| all_topics = [] | |
| all_entities = [] | |
| for result in valid_results: | |
| all_recommendations.extend(result.get('recommendations', [])) | |
| all_opportunities.extend(result.get('optimization_opportunities', [])) | |
| all_topics.extend(result.get('primary_topics', [])) | |
| all_entities.extend(result.get('entities', [])) | |
| # Remove duplicates | |
| unique_recommendations = list(set(all_recommendations)) | |
| unique_topics = list(set(all_topics)) | |
| unique_entities = list(set(all_entities)) | |
| # Find highest and lowest performing areas | |
| best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0) | |
| worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0) | |
| return { | |
| 'aggregate_scores': avg_scores, | |
| 'overall_score': overall_avg, | |
| 'pages_analyzed': len(valid_results), | |
| 'pages_with_errors': len(error_results), | |
| 'success_rate': len(valid_results) / len(individual_results) if individual_results else 0, | |
| 'best_performing_metric': { | |
| 'metric': best_score[0], | |
| 'score': best_score[1] | |
| }, | |
| 'lowest_performing_metric': { | |
| 'metric': worst_score[0], | |
| 'score': worst_score[1] | |
| }, | |
| 'consolidated_recommendations': unique_recommendations[:10], | |
| 'all_topics': unique_topics, | |
| 'all_entities': unique_entities, | |
| 'high_priority_opportunities': [ | |
| opp for opp in all_opportunities | |
| if isinstance(opp, dict) and opp.get('priority') == 'high' | |
| ][:5], | |
| 'score_distribution': self._calculate_score_distribution(avg_scores) | |
| } | |
| except Exception as e: | |
| self.logger.error(f"Aggregation failed: {e}") | |
| return {'error': f"Aggregation failed: {str(e)}", 'error_type': 'system'} | |
| def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]: | |
| """ | |
| Generate a comprehensive GEO report | |
| """ | |
| try: | |
| report = { | |
| 'report_metadata': { | |
| 'generated_at': self._get_timestamp(), | |
| 'website_url': website_url, | |
| 'analysis_type': 'GEO Performance Report' | |
| }, | |
| 'executive_summary': self._generate_executive_summary(analysis_results), | |
| 'detailed_scores': analysis_results.get('aggregate_scores', {}), | |
| 'performance_insights': self._generate_performance_insights(analysis_results), | |
| 'actionable_recommendations': self._prioritize_recommendations( | |
| analysis_results.get('consolidated_recommendations', []) | |
| ), | |
| 'optimization_roadmap': self._create_optimization_roadmap(analysis_results), | |
| 'competitive_position': self._assess_competitive_position(analysis_results), | |
| 'technical_details': { | |
| 'pages_analyzed': analysis_results.get('pages_analyzed', 0), | |
| 'overall_score': analysis_results.get('overall_score', 0), | |
| 'score_distribution': analysis_results.get('score_distribution', {}) | |
| } | |
| } | |
| return report | |
| except Exception as e: | |
| self.logger.error(f"Report generation failed: {e}") | |
| return {'error': f"Report generation failed: {str(e)}", 'error_type': 'system'} | |
| def _parse_llm_response(self, response_text: str) -> Dict[str, Any]: | |
| """FIXED: Enhanced LLM response parsing""" | |
| try: | |
| # Clean response text | |
| cleaned_response = response_text.strip() | |
| # Try to find JSON content with multiple patterns | |
| json_patterns = [ | |
| r'\{.*\}', # Simple JSON object | |
| r'```json\s*(\{.*?\})\s*```', # JSON in code blocks | |
| r'```\s*(\{.*?\})\s*```' # Generic code blocks | |
| ] | |
| for pattern in json_patterns: | |
| matches = re.findall(pattern, cleaned_response, re.DOTALL) | |
| if matches: | |
| json_str = matches[0] if len(matches) == 1 else matches[0] | |
| try: | |
| return json.loads(json_str) | |
| except json.JSONDecodeError: | |
| continue | |
| # Try parsing the entire response | |
| try: | |
| return json.loads(cleaned_response) | |
| except json.JSONDecodeError: | |
| pass | |
| # If all else fails, return structured error | |
| return { | |
| 'raw_response': response_text[:500], | |
| 'parsing_error': 'No valid JSON found in LLM response', | |
| 'error_type': 'parsing' | |
| } | |
| except Exception as e: | |
| return { | |
| 'raw_response': response_text[:500], | |
| 'parsing_error': f'Parsing error: {str(e)}', | |
| 'error_type': 'parsing' | |
| } | |
| def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]: | |
| """Calculate distribution of scores for insights""" | |
| if not scores: | |
| return {} | |
| score_values = list(scores.values()) | |
| return { | |
| 'highest_score': max(score_values), | |
| 'lowest_score': min(score_values), | |
| 'average_score': sum(score_values) / len(score_values), | |
| 'score_range': max(score_values) - min(score_values), | |
| 'scores_above_7': len([s for s in score_values if s >= 7.0]), | |
| 'scores_below_5': len([s for s in score_values if s < 5.0]) | |
| } | |
| def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str: | |
| """Generate executive summary based on analysis results""" | |
| overall_score = analysis_results.get('overall_score', 0) | |
| pages_analyzed = analysis_results.get('pages_analyzed', 0) | |
| if overall_score >= 8.0: | |
| performance = "excellent" | |
| elif overall_score >= 6.5: | |
| performance = "good" | |
| elif overall_score >= 5.0: | |
| performance = "moderate" | |
| else: | |
| performance = "needs improvement" | |
| return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}." | |
| def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]: | |
| """Generate performance insights based on analysis""" | |
| insights = [] | |
| best_metric = analysis_results.get('best_performing_metric', {}) | |
| worst_metric = analysis_results.get('lowest_performing_metric', {}) | |
| if best_metric.get('score', 0) >= 8.0: | |
| insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})") | |
| if worst_metric.get('score', 10) < 6.0: | |
| insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})") | |
| score_dist = analysis_results.get('score_distribution', {}) | |
| if score_dist.get('score_range', 0) > 3.0: | |
| insights.append("High variability in scores indicates inconsistent optimization across metrics") | |
| return insights | |
| def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]: | |
| """Prioritize recommendations based on impact potential""" | |
| prioritized = [] | |
| # Simple prioritization based on keywords | |
| high_impact_keywords = ['semantic', 'structure', 'authority', 'factual'] | |
| medium_impact_keywords = ['readability', 'clarity', 'format'] | |
| for i, rec in enumerate(recommendations): | |
| priority = 'low' | |
| if any(keyword in rec.lower() for keyword in high_impact_keywords): | |
| priority = 'high' | |
| elif any(keyword in rec.lower() for keyword in medium_impact_keywords): | |
| priority = 'medium' | |
| prioritized.append({ | |
| 'recommendation': rec, | |
| 'priority': priority, | |
| 'order': i + 1 | |
| }) | |
| # Sort by priority | |
| priority_order = {'high': 1, 'medium': 2, 'low': 3} | |
| prioritized.sort(key=lambda x: priority_order[x['priority']]) | |
| return prioritized | |
| def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]: | |
| """Create a phased optimization roadmap""" | |
| roadmap = { | |
| 'immediate_actions': [], | |
| 'short_term_goals': [], | |
| 'long_term_strategy': [] | |
| } | |
| overall_score = analysis_results.get('overall_score', 0) | |
| worst_metric = analysis_results.get('lowest_performing_metric', {}) | |
| # Immediate actions based on worst performing metric | |
| if worst_metric.get('score', 10) < 5.0: | |
| roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}") | |
| # Short-term goals | |
| if overall_score < 7.0: | |
| roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0") | |
| roadmap['short_term_goals'].append("Enhance content structure and semantic richness") | |
| # Long-term strategy | |
| roadmap['long_term_strategy'].append("Establish consistent GEO optimization process") | |
| roadmap['long_term_strategy'].append("Monitor and track AI search performance") | |
| return roadmap | |
| def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]: | |
| """Assess competitive position based on scores""" | |
| overall_score = analysis_results.get('overall_score', 0) | |
| if overall_score >= 8.5: | |
| position = "market_leader" | |
| description = "Content is highly optimized for AI search engines" | |
| elif overall_score >= 7.0: | |
| position = "competitive" | |
| description = "Content performs well but has room for improvement" | |
| elif overall_score >= 5.5: | |
| position = "average" | |
| description = "Content meets basic standards but lacks optimization" | |
| else: | |
| position = "needs_work" | |
| description = "Content requires significant optimization for AI search" | |
| return { | |
| 'position': position, | |
| 'description': description, | |
| 'score': overall_score, | |
| 'percentile_estimate': min(overall_score * 10, 100) | |
| } | |
| def _get_timestamp(self) -> str: | |
| """Get current timestamp""" | |
| return datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
| # Debug utility function | |
| def debug_scraped_data_format(scraped_data): | |
| """ | |
| Quick debug function to see what your scraper is returning | |
| Add this to your code to debug data format issues | |
| """ | |
| print("=== SCRAPED DATA DEBUG ===") | |
| print(f"Data type: {type(scraped_data)}") | |
| if isinstance(scraped_data, list): | |
| print(f"List length: {len(scraped_data)}") | |
| if scraped_data: | |
| print(f"First item type: {type(scraped_data[0])}") | |
| if isinstance(scraped_data[0], dict): | |
| print(f"First item keys: {list(scraped_data[0].keys())}") | |
| for key, value in list(scraped_data[0].items())[:3]: | |
| print(f" {key}: {str(value)[:100]}...") | |
| elif isinstance(scraped_data, dict): | |
| print(f"Dict keys: {list(scraped_data.keys())}") | |
| for key, value in list(scraped_data.items())[:3]: | |
| print(f" {key}: {str(value)[:100]}...") | |
| print("=== END DEBUG ===") |