Alpha108 commited on
Commit
bf3d3b7
·
verified ·
1 Parent(s): 5ecf52b

Update utils/scorer.py

Browse files
Files changed (1) hide show
  1. utils/scorer.py +474 -278
utils/scorer.py CHANGED
@@ -1,25 +1,153 @@
1
  """
2
- GEO Scoring Module
3
  Analyzes content for Generative Engine Optimization (GEO) performance
4
  """
5
 
6
  import json
7
- from typing import Dict, Any, List
 
 
 
 
 
 
 
8
  from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
9
 
10
 
11
- class GEOScorer:
12
- """Main class for calculating GEO scores and analysis"""
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
- def __init__(self, llm):
15
- self.llm = llm
16
- self.setup_prompts()
 
 
 
 
 
 
 
 
 
 
 
 
 
17
 
18
- def setup_prompts(self):
19
- """Initialize prompts for different types of analysis"""
 
 
 
20
 
21
- # Main GEO analysis prompt
22
- self.geo_analysis_prompt = """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
  Evaluate the content based on these GEO criteria (score 1-10 each):
25
 
@@ -38,7 +166,7 @@ Also identify:
38
  - Optimization opportunities
39
  - Specific enhancement recommendations
40
 
41
- Format your response as JSON (do NOT use curly braces for variables):
42
 
43
  {
44
  "geo_scores": {
@@ -66,19 +194,18 @@ Format your response as JSON (do NOT use curly braces for variables):
66
  "Specific actionable recommendation 1",
67
  "Specific actionable recommendation 2"
68
  ]
69
- }
70
- """
71
-
72
- # Quick scoring prompt for faster analysis
73
- self.quick_score_prompt = """Analyze this content for AI search optimization. Provide scores (1-10) for:
74
 
75
  1. AI Search Visibility
76
  2. Query Intent Matching
77
  3. Conversational Readiness
78
  4. Citation Worthiness
79
 
80
- Respond in JSON format:
81
- ```json
82
  {
83
  "scores": {
84
  "ai_search_visibility": 7.5,
@@ -88,20 +215,15 @@ Respond in JSON format:
88
  },
89
  "overall_score": 7.5,
90
  "top_recommendation": "Most important improvement needed"
91
- }
92
- ```"""
93
-
94
- # Competitive analysis prompt
95
- self.competitive_prompt = """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.
96
-
97
- Content A: {content_a}
98
 
99
- Content B: {content_b}
100
 
101
- Provide analysis in JSON:
102
- ```json
103
  {
104
- "winner": "A" or "B",
105
  "score_comparison": {
106
  "content_a_score": 7.5,
107
  "content_b_score": 8.2
@@ -111,61 +233,126 @@ Provide analysis in JSON:
111
  "content_a": ["suggestion1"],
112
  "content_b": ["suggestion1"]
113
  }
114
- }
115
- ```"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
 
117
  def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
118
  """
119
- Analyze a single page for GEO performance
120
  """
 
 
 
121
  try:
122
- # Choose prompt based on detail level
123
- if detailed:
124
- system_prompt = self.geo_analysis_prompt
125
- user_message = f"Title: {title}\n\nContent: {content[:8000]}"
126
- else:
127
- system_prompt = self.quick_score_prompt
128
- user_message = f"Title: {title}\n\nContent: {content[:4000]}"
129
-
130
- # Build prompt and run analysis
 
 
 
 
 
 
 
 
 
 
 
 
131
  prompt_template = ChatPromptTemplate.from_messages([
132
  SystemMessagePromptTemplate.from_template(system_prompt),
133
  HumanMessagePromptTemplate.from_template(user_message)
134
  ])
135
- # ("user", user_message)
136
- # ("system", system_prompt),
137
  chain = prompt_template | self.llm
138
- result = chain.invoke({}) # No variables needed
139
-
140
  # Extract and parse result
141
  result_content = result.content if hasattr(result, 'content') else str(result)
142
  parsed_result = self._parse_llm_response(result_content)
143
-
144
  # Add metadata
 
145
  parsed_result.update({
146
  'analyzed_title': title,
147
  'content_length': len(content),
 
148
  'word_count': len(content.split()),
149
- 'analysis_type': 'detailed' if detailed else 'quick'
 
 
150
  })
151
-
 
152
  return parsed_result
153
 
 
 
 
 
154
  except Exception as e:
155
- return {'error': f"GEO analysis failed: {str(e)}"}
 
156
 
157
  def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
158
  """
159
- Analyze multiple pages and return consolidated results
160
-
161
- Args:
162
- pages_data (List[Dict]): List of page data with content and metadata
163
- detailed (bool): Whether to perform detailed analysis
164
-
165
- Returns:
166
- List[Dict]: List of GEO analysis results
167
  """
 
 
 
 
 
 
168
  results = []
 
 
 
169
 
170
  for i, page_data in enumerate(pages_data):
171
  try:
@@ -181,107 +368,136 @@ Provide analysis in JSON:
181
  'source_word_count': page_data.get('word_count', 0)
182
  })
183
 
 
 
 
184
  results.append(analysis)
185
 
186
  except Exception as e:
 
187
  results.append({
188
  'page_index': i,
189
  'page_url': page_data.get('url', ''),
190
- 'error': f"Analysis failed: {str(e)}"
 
191
  })
192
 
 
193
  return results
194
 
195
- def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]:
196
  """
197
- Compare two pieces of content for GEO performance
198
-
199
- Args:
200
- content_a (str): First content to compare
201
- content_b (str): Second content to compare
202
- titles (tuple): Optional titles for the content pieces
203
-
204
- Returns:
205
- Dict: Comparison analysis results
206
  """
207
  try:
208
  title_a, title_b = titles if titles else ("Content A", "Content B")
209
 
210
- prompt_template = ChatPromptTemplate.from_messages([
211
- ("system", self.competitive_prompt),
212
- ("user", "")
213
- ])
 
 
 
 
214
 
215
- # Format the competitive analysis prompt
216
- formatted_prompt = self.competitive_prompt.format(
217
- content_a=f"Title: {title_a}\nContent: {content_a[:4000]}",
218
- content_b=f"Title: {title_b}\nContent: {content_b[:4000]}"
 
 
 
 
219
  )
220
 
221
- chain = ChatPromptTemplate.from_messages([
222
- ("system", formatted_prompt),
223
- ("user", "Perform the comparison analysis.")
224
- ]) | self.llm
 
 
 
 
 
 
 
 
 
 
225
 
 
226
  result = chain.invoke({})
227
  result_content = result.content if hasattr(result, 'content') else str(result)
228
 
229
- return self._parse_llm_response(result_content)
 
 
 
 
 
 
 
 
 
 
230
 
231
  except Exception as e:
232
- return {'error': f"Comparison analysis failed: {str(e)}"}
 
233
 
234
  def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
235
  """
236
- Calculate aggregate GEO scores from multiple page analyses
237
-
238
- Args:
239
- individual_results (List[Dict]): List of individual page analysis results
240
-
241
- Returns:
242
- Dict: Aggregate scores and insights
243
  """
244
  try:
 
245
  valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')]
 
246
 
247
  if not valid_results:
248
- return {'error': 'No valid results to aggregate'}
 
 
 
 
 
249
 
250
  # Calculate average scores
251
  score_keys = list(valid_results[0]['geo_scores'].keys())
252
  avg_scores = {}
 
253
 
254
  for key in score_keys:
255
  scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']]
256
- avg_scores[key] = sum(scores) / len(scores) if scores else 0
 
 
 
 
 
 
 
 
 
 
257
 
258
  overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
259
 
260
- # Collect all recommendations and opportunities
261
- all_recommendations = []
262
- all_opportunities = []
263
- all_topics = []
264
- all_entities = []
265
-
266
- for result in valid_results:
267
- all_recommendations.extend(result.get('recommendations', []))
268
- all_opportunities.extend(result.get('optimization_opportunities', []))
269
- all_topics.extend(result.get('primary_topics', []))
270
- all_entities.extend(result.get('entities', []))
271
-
272
- # Remove duplicates and prioritize
273
- unique_recommendations = list(set(all_recommendations))
274
- unique_topics = list(set(all_topics))
275
- unique_entities = list(set(all_entities))
276
 
277
- # Find highest and lowest performing areas
278
  best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
279
  worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
280
 
281
  return {
282
  'aggregate_scores': avg_scores,
 
283
  'overall_score': overall_avg,
284
  'pages_analyzed': len(valid_results),
 
 
285
  'best_performing_metric': {
286
  'metric': best_score[0],
287
  'score': best_score[1]
@@ -290,205 +506,185 @@ Provide analysis in JSON:
290
  'metric': worst_score[0],
291
  'score': worst_score[1]
292
  },
293
- 'consolidated_recommendations': unique_recommendations[:10],
294
- 'all_topics': unique_topics,
295
- 'all_entities': unique_entities,
296
- 'high_priority_opportunities': [
297
- opp for opp in all_opportunities
298
- if opp.get('priority') == 'high'
299
- ][:5],
300
- 'score_distribution': self._calculate_score_distribution(avg_scores)
301
  }
302
 
303
  except Exception as e:
304
- return {'error': f"Aggregation failed: {str(e)}"}
 
305
 
306
- def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]:
307
- """
308
- Generate a comprehensive GEO report
309
-
310
- Args:
311
- analysis_results (Dict): Results from aggregate analysis
312
- website_url (str): Optional website URL for context
313
-
314
- Returns:
315
- Dict: Comprehensive GEO report
316
- """
317
- try:
318
- report = {
319
- 'report_metadata': {
320
- 'generated_at': self._get_timestamp(),
321
- 'website_url': website_url,
322
- 'analysis_type': 'GEO Performance Report'
323
- },
324
- 'executive_summary': self._generate_executive_summary(analysis_results),
325
- 'detailed_scores': analysis_results.get('aggregate_scores', {}),
326
- 'performance_insights': self._generate_performance_insights(analysis_results),
327
- 'actionable_recommendations': self._prioritize_recommendations(
328
- analysis_results.get('consolidated_recommendations', [])
329
- ),
330
- 'optimization_roadmap': self._create_optimization_roadmap(analysis_results),
331
- 'competitive_position': self._assess_competitive_position(analysis_results),
332
- 'technical_details': {
333
- 'pages_analyzed': analysis_results.get('pages_analyzed', 0),
334
- 'overall_score': analysis_results.get('overall_score', 0),
335
- 'score_distribution': analysis_results.get('score_distribution', {})
336
- }
337
  }
338
-
339
- return report
340
-
341
- except Exception as e:
342
- return {'error': f"Report generation failed: {str(e)}"}
343
 
344
  def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
345
- """Parse LLM response and extract JSON content"""
346
  try:
347
- # Find JSON content in the response
348
- json_start = response_text.find('{')
349
- json_end = response_text.rfind('}') + 1
350
 
351
- if json_start != -1 and json_end != -1:
352
- json_str = response_text[json_start:json_end]
353
- return json.loads(json_str)
354
- else:
355
- # If no JSON found, return the raw response
356
- return {'raw_response': response_text, 'parsing_error': 'No JSON found'}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
357
 
358
- except json.JSONDecodeError as e:
359
- return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(e)}'}
360
  except Exception as e:
361
- return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'}
 
 
 
 
362
 
363
- def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
364
- """Calculate distribution of scores for insights"""
365
- if not scores:
366
- return {}
 
 
 
 
 
 
 
367
 
368
- score_values = list(scores.values())
 
369
 
370
- return {
371
- 'highest_score': max(score_values),
372
- 'lowest_score': min(score_values),
373
- 'average_score': sum(score_values) / len(score_values),
374
- 'score_range': max(score_values) - min(score_values),
375
- 'scores_above_7': len([s for s in score_values if s >= 7.0]),
376
- 'scores_below_5': len([s for s in score_values if s < 5.0])
377
- }
378
-
379
- def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str:
380
- """Generate executive summary based on analysis results"""
381
- overall_score = analysis_results.get('overall_score', 0)
382
- pages_analyzed = analysis_results.get('pages_analyzed', 0)
383
-
384
- if overall_score >= 8.0:
385
- performance = "excellent"
386
- elif overall_score >= 6.5:
387
- performance = "good"
388
- elif overall_score >= 5.0:
389
- performance = "moderate"
390
- else:
391
- performance = "needs improvement"
392
 
393
- return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}."
394
-
395
- def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]:
396
- """Generate performance insights based on analysis"""
397
- insights = []
 
 
 
 
398
 
399
- best_metric = analysis_results.get('best_performing_metric', {})
400
- worst_metric = analysis_results.get('lowest_performing_metric', {})
 
401
 
402
- if best_metric.get('score', 0) >= 8.0:
403
- insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})")
404
 
405
- if worst_metric.get('score', 10) < 6.0:
406
- insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})")
407
 
408
- score_dist = analysis_results.get('score_distribution', {})
409
- if score_dist.get('score_range', 0) > 3.0:
 
 
410
  insights.append("High variability in scores indicates inconsistent optimization across metrics")
 
 
411
 
412
  return insights
413
 
414
- def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]:
415
- """Prioritize recommendations based on impact potential"""
416
- prioritized = []
417
-
418
- # Simple prioritization based on keywords
419
- high_impact_keywords = ['semantic', 'structure', 'authority', 'factual']
420
- medium_impact_keywords = ['readability', 'clarity', 'format']
421
-
422
- for i, rec in enumerate(recommendations):
423
- priority = 'low'
424
- if any(keyword in rec.lower() for keyword in high_impact_keywords):
425
- priority = 'high'
426
- elif any(keyword in rec.lower() for keyword in medium_impact_keywords):
427
- priority = 'medium'
428
-
429
- prioritized.append({
430
- 'recommendation': rec,
431
- 'priority': priority,
432
- 'order': i + 1
433
- })
434
 
435
- # Sort by priority
436
- priority_order = {'high': 1, 'medium': 2, 'low': 3}
437
- prioritized.sort(key=lambda x: priority_order[x['priority']])
438
 
439
- return prioritized
440
-
441
- def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]:
442
- """Create a phased optimization roadmap"""
443
- roadmap = {
444
- 'immediate_actions': [],
445
- 'short_term_goals': [],
446
- 'long_term_strategy': []
 
447
  }
448
-
449
- overall_score = analysis_results.get('overall_score', 0)
450
- worst_metric = analysis_results.get('lowest_performing_metric', {})
451
-
452
- # Immediate actions based on worst performing metric
453
- if worst_metric.get('score', 10) < 5.0:
454
- roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}")
455
-
456
- # Short-term goals
457
- if overall_score < 7.0:
458
- roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0")
459
- roadmap['short_term_goals'].append("Enhance content structure and semantic richness")
460
-
461
- # Long-term strategy
462
- roadmap['long_term_strategy'].append("Establish consistent GEO optimization process")
463
- roadmap['long_term_strategy'].append("Monitor and track AI search performance")
464
-
465
- return roadmap
466
 
467
- def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
468
- """Assess competitive position based on scores"""
469
- overall_score = analysis_results.get('overall_score', 0)
470
-
471
- if overall_score >= 8.5:
472
- position = "market_leader"
473
- description = "Content is highly optimized for AI search engines"
474
- elif overall_score >= 7.0:
475
- position = "competitive"
476
- description = "Content performs well but has room for improvement"
477
- elif overall_score >= 5.5:
478
- position = "average"
479
- description = "Content meets basic standards but lacks optimization"
480
- else:
481
- position = "needs_work"
482
- description = "Content requires significant optimization for AI search"
483
 
484
  return {
485
- 'position': position,
486
- 'description': description,
487
- 'score': overall_score,
488
- 'percentile_estimate': min(overall_score * 10, 100) # Rough percentile estimate
 
489
  }
490
 
491
  def _get_timestamp(self) -> str:
492
  """Get current timestamp"""
493
- from datetime import datetime
494
- return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Improved GEO Scoring Module
3
  Analyzes content for Generative Engine Optimization (GEO) performance
4
  """
5
 
6
  import json
7
+ import re
8
+ import logging
9
+ import hashlib
10
+ import asyncio
11
+ from datetime import datetime
12
+ from typing import Dict, Any, List, Union, Optional, Tuple
13
+ from functools import lru_cache
14
+ from dataclasses import dataclass
15
  from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
16
 
17
 
18
+ @dataclass
19
+ class GEOConfig:
20
+ """Configuration class for GEO scoring parameters"""
21
+ MAX_CONTENT_LENGTH: int = 8000
22
+ MIN_CONTENT_LENGTH: int = 100
23
+ QUICK_CONTENT_LENGTH: int = 4000
24
+ DEFAULT_TIMEOUT: int = 30
25
+ MAX_RETRIES: int = 3
26
+ CACHE_SIZE: int = 100
27
+ SMART_TRUNCATE_THRESHOLD: float = 0.8
28
+
29
+
30
+ class GEOValidator:
31
+ """Input validation utilities for GEO analysis"""
32
 
33
+ @staticmethod
34
+ def validate_content_inputs(content: str, title: str, config: GEOConfig) -> Tuple[bool, str]:
35
+ """Validate content and title inputs"""
36
+ if not isinstance(content, str) or not isinstance(title, str):
37
+ return False, "Content and title must be strings"
38
+
39
+ if len(content.strip()) < config.MIN_CONTENT_LENGTH:
40
+ return False, f"Content must be at least {config.MIN_CONTENT_LENGTH} characters"
41
+
42
+ if len(title.strip()) == 0:
43
+ return False, "Title cannot be empty"
44
+
45
+ if len(title) > 200:
46
+ return False, "Title too long (max 200 characters)"
47
+
48
+ return True, ""
49
 
50
+ @staticmethod
51
+ def validate_pages_data(pages_data: List[Dict[str, Any]]) -> Tuple[bool, str]:
52
+ """Validate pages data structure"""
53
+ if not isinstance(pages_data, list):
54
+ return False, "Pages data must be a list"
55
 
56
+ if len(pages_data) == 0:
57
+ return False, "Pages data cannot be empty"
58
+
59
+ for i, page in enumerate(pages_data):
60
+ if not isinstance(page, dict):
61
+ return False, f"Page {i} must be a dictionary"
62
+
63
+ if 'content' not in page:
64
+ return False, f"Page {i} missing 'content' field"
65
+
66
+ return True, ""
67
+
68
+
69
+ class GEOContentProcessor:
70
+ """Content processing utilities for GEO analysis"""
71
+
72
+ def __init__(self, config: GEOConfig):
73
+ self.config = config
74
+ self.dangerous_patterns = [
75
+ r'ignore\s+previous\s+instructions',
76
+ r'system\s*:',
77
+ r'assistant\s*:',
78
+ r'```json\s*{.*"prompt"',
79
+ r'<\s*system\s*>',
80
+ r'<\s*user\s*>',
81
+ r'forget\s+everything',
82
+ r'new\s+instructions\s*:',
83
+ ]
84
+
85
+ def sanitize_content(self, content: str) -> str:
86
+ """Sanitize content to prevent prompt injection"""
87
+ if not content:
88
+ return ""
89
+
90
+ # Remove potential prompt injection patterns
91
+ sanitized = content
92
+ for pattern in self.dangerous_patterns:
93
+ sanitized = re.sub(pattern, '[FILTERED]', sanitized, flags=re.IGNORECASE)
94
+
95
+ # Remove excessive whitespace
96
+ sanitized = re.sub(r'\s+', ' ', sanitized).strip()
97
+
98
+ # Hard limit on length
99
+ return sanitized[:self.config.MAX_CONTENT_LENGTH * 2]
100
+
101
+ def smart_truncate(self, content: str, max_length: int) -> str:
102
+ """Intelligently truncate content preserving meaning"""
103
+ if len(content) <= max_length:
104
+ return content
105
+
106
+ # Find last complete sentence within limit
107
+ truncated = content[:max_length]
108
+
109
+ # Look for sentence endings
110
+ sentence_endings = ['. ', '! ', '? ']
111
+ best_cut = -1
112
+
113
+ for ending in sentence_endings:
114
+ last_occurrence = truncated.rfind(ending)
115
+ if last_occurrence > max_length * self.config.SMART_TRUNCATE_THRESHOLD:
116
+ best_cut = max(best_cut, last_occurrence + len(ending) - 1)
117
+
118
+ if best_cut > 0:
119
+ return truncated[:best_cut]
120
+
121
+ # If no good sentence break, look for paragraph breaks
122
+ last_paragraph = truncated.rfind('\n\n')
123
+ if last_paragraph > max_length * self.config.SMART_TRUNCATE_THRESHOLD:
124
+ return truncated[:last_paragraph]
125
+
126
+ # If no good breaks, just truncate and add ellipsis
127
+ return truncated.rstrip() + "..."
128
+
129
+ def generate_content_hash(self, content: str, title: str, analysis_type: str) -> str:
130
+ """Generate hash for content caching"""
131
+ combined = f"{title}|{content}|{analysis_type}"
132
+ return hashlib.md5(combined.encode()).hexdigest()
133
+
134
+
135
+ class GEOPromptManager:
136
+ """Manages prompts for different types of GEO analysis"""
137
+
138
+ def __init__(self):
139
+ self.prompts = self._initialize_prompts()
140
+
141
+ def _initialize_prompts(self) -> Dict[str, str]:
142
+ """Initialize all prompts"""
143
+ return {
144
+ 'detailed_analysis': self._get_detailed_prompt(),
145
+ 'quick_analysis': self._get_quick_prompt(),
146
+ 'competitive_analysis': self._get_competitive_prompt()
147
+ }
148
+
149
+ def _get_detailed_prompt(self) -> str:
150
+ return """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.
151
 
152
  Evaluate the content based on these GEO criteria (score 1-10 each):
153
 
 
166
  - Optimization opportunities
167
  - Specific enhancement recommendations
168
 
169
+ IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
170
 
171
  {
172
  "geo_scores": {
 
194
  "Specific actionable recommendation 1",
195
  "Specific actionable recommendation 2"
196
  ]
197
+ }"""
198
+
199
+ def _get_quick_prompt(self) -> str:
200
+ return """Analyze this content for AI search optimization. Provide scores (1-10) for:
 
201
 
202
  1. AI Search Visibility
203
  2. Query Intent Matching
204
  3. Conversational Readiness
205
  4. Citation Worthiness
206
 
207
+ IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
208
+
209
  {
210
  "scores": {
211
  "ai_search_visibility": 7.5,
 
215
  },
216
  "overall_score": 7.5,
217
  "top_recommendation": "Most important improvement needed"
218
+ }"""
219
+
220
+ def _get_competitive_prompt(self) -> str:
221
+ return """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.
 
 
 
222
 
223
+ IMPORTANT: Respond ONLY with valid JSON. Do not include any text before or after the JSON.
224
 
 
 
225
  {
226
+ "winner": "A",
227
  "score_comparison": {
228
  "content_a_score": 7.5,
229
  "content_b_score": 8.2
 
233
  "content_a": ["suggestion1"],
234
  "content_b": ["suggestion1"]
235
  }
236
+ }"""
237
+
238
+ def get_prompt(self, prompt_type: str) -> str:
239
+ """Get prompt by type"""
240
+ return self.prompts.get(prompt_type, self.prompts['detailed_analysis'])
241
+
242
+
243
+ class GEOScorer:
244
+ """Main class for calculating GEO scores and analysis"""
245
+
246
+ def __init__(self, llm, config: Optional[GEOConfig] = None, logger: Optional[logging.Logger] = None):
247
+ self.llm = llm
248
+ self.config = config or GEOConfig()
249
+ self.logger = logger or self._setup_logger()
250
+
251
+ # Initialize components
252
+ self.validator = GEOValidator()
253
+ self.processor = GEOContentProcessor(self.config)
254
+ self.prompt_manager = GEOPromptManager()
255
+
256
+ # Performance tracking
257
+ self.analysis_count = 0
258
+ self.cache_hits = 0
259
+
260
+ def _setup_logger(self) -> logging.Logger:
261
+ """Setup default logger"""
262
+ logger = logging.getLogger(__name__)
263
+ if not logger.handlers:
264
+ handler = logging.StreamHandler()
265
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
266
+ handler.setFormatter(formatter)
267
+ logger.addHandler(handler)
268
+ logger.setLevel(logging.INFO)
269
+ return logger
270
+
271
+ @lru_cache(maxsize=100)
272
+ def _get_cached_analysis(self, content_hash: str) -> Optional[Dict[str, Any]]:
273
+ """Cache mechanism for repeated analyses"""
274
+ # This is a simple in-memory cache using lru_cache
275
+ # In production, you might want to use Redis or similar
276
+ return None
277
 
278
  def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
279
  """
280
+ Analyze a single page for GEO performance with improved error handling and validation
281
  """
282
+ start_time = datetime.now()
283
+ self.analysis_count += 1
284
+
285
  try:
286
+ # Input validation
287
+ is_valid, error_msg = self.validator.validate_content_inputs(content, title, self.config)
288
+ if not is_valid:
289
+ self.logger.warning(f"Input validation failed: {error_msg}")
290
+ return {'error': error_msg, 'error_type': 'validation'}
291
+
292
+ # Check cache
293
+ analysis_type = 'detailed' if detailed else 'quick'
294
+ content_hash = self.processor.generate_content_hash(content, title, analysis_type)
295
+
296
+ # Process content
297
+ sanitized_content = self.processor.sanitize_content(content)
298
+ max_length = self.config.MAX_CONTENT_LENGTH if detailed else self.config.QUICK_CONTENT_LENGTH
299
+ processed_content = self.processor.smart_truncate(sanitized_content, max_length)
300
+
301
+ # Get appropriate prompt
302
+ prompt_type = 'detailed_analysis' if detailed else 'quick_analysis'
303
+ system_prompt = self.prompt_manager.get_prompt(prompt_type)
304
+ user_message = f"Title: {title}\n\nContent: {processed_content}"
305
+
306
+ # Build and execute prompt
307
  prompt_template = ChatPromptTemplate.from_messages([
308
  SystemMessagePromptTemplate.from_template(system_prompt),
309
  HumanMessagePromptTemplate.from_template(user_message)
310
  ])
311
+
 
312
  chain = prompt_template | self.llm
313
+ result = chain.invoke({})
314
+
315
  # Extract and parse result
316
  result_content = result.content if hasattr(result, 'content') else str(result)
317
  parsed_result = self._parse_llm_response(result_content)
318
+
319
  # Add metadata
320
+ processing_time = (datetime.now() - start_time).total_seconds()
321
  parsed_result.update({
322
  'analyzed_title': title,
323
  'content_length': len(content),
324
+ 'processed_content_length': len(processed_content),
325
  'word_count': len(content.split()),
326
+ 'analysis_type': analysis_type,
327
+ 'processing_time_seconds': processing_time,
328
+ 'content_hash': content_hash
329
  })
330
+
331
+ self.logger.info(f"Analysis completed for '{title}' in {processing_time:.2f}s")
332
  return parsed_result
333
 
334
+ except json.JSONDecodeError as e:
335
+ self.logger.error(f"JSON parsing failed for title '{title}': {e}")
336
+ return {'error': 'Invalid response format from LLM', 'error_type': 'parsing', 'title': title}
337
+
338
  except Exception as e:
339
+ self.logger.error(f"Analysis failed for title '{title}': {e}")
340
+ return {'error': f"Analysis failed: {str(e)}", 'error_type': 'system', 'title': title}
341
 
342
  def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
343
  """
344
+ Analyze multiple pages with improved validation and error handling
 
 
 
 
 
 
 
345
  """
346
+ # Validate input
347
+ is_valid, error_msg = self.validator.validate_pages_data(pages_data)
348
+ if not is_valid:
349
+ self.logger.error(f"Pages data validation failed: {error_msg}")
350
+ return [{'error': error_msg, 'error_type': 'validation'}]
351
+
352
  results = []
353
+ successful_analyses = 0
354
+
355
+ self.logger.info(f"Starting analysis of {len(pages_data)} pages")
356
 
357
  for i, page_data in enumerate(pages_data):
358
  try:
 
368
  'source_word_count': page_data.get('word_count', 0)
369
  })
370
 
371
+ if 'error' not in analysis:
372
+ successful_analyses += 1
373
+
374
  results.append(analysis)
375
 
376
  except Exception as e:
377
+ self.logger.error(f"Failed to analyze page {i}: {e}")
378
  results.append({
379
  'page_index': i,
380
  'page_url': page_data.get('url', ''),
381
+ 'error': f"Analysis failed: {str(e)}",
382
+ 'error_type': 'system'
383
  })
384
 
385
+ self.logger.info(f"Completed analysis: {successful_analyses}/{len(pages_data)} successful")
386
  return results
387
 
388
+ def compare_content_geo(self, content_a: str, content_b: str, titles: Optional[Tuple[str, str]] = None) -> Dict[str, Any]:
389
  """
390
+ Compare two pieces of content for GEO performance with improved validation
 
 
 
 
 
 
 
 
391
  """
392
  try:
393
  title_a, title_b = titles if titles else ("Content A", "Content B")
394
 
395
+ # Validate inputs
396
+ is_valid_a, error_a = self.validator.validate_content_inputs(content_a, title_a, self.config)
397
+ is_valid_b, error_b = self.validator.validate_content_inputs(content_b, title_b, self.config)
398
+
399
+ if not is_valid_a:
400
+ return {'error': f"Content A validation failed: {error_a}", 'error_type': 'validation'}
401
+ if not is_valid_b:
402
+ return {'error': f"Content B validation failed: {error_b}", 'error_type': 'validation'}
403
 
404
+ # Process content
405
+ processed_a = self.processor.smart_truncate(
406
+ self.processor.sanitize_content(content_a),
407
+ self.config.QUICK_CONTENT_LENGTH
408
+ )
409
+ processed_b = self.processor.smart_truncate(
410
+ self.processor.sanitize_content(content_b),
411
+ self.config.QUICK_CONTENT_LENGTH
412
  )
413
 
414
+ # Build comparison prompt
415
+ system_prompt = self.prompt_manager.get_prompt('competitive_analysis')
416
+ user_message = f"""Content A:
417
+ Title: {title_a}
418
+ Content: {processed_a}
419
+
420
+ Content B:
421
+ Title: {title_b}
422
+ Content: {processed_b}"""
423
+
424
+ prompt_template = ChatPromptTemplate.from_messages([
425
+ SystemMessagePromptTemplate.from_template(system_prompt),
426
+ HumanMessagePromptTemplate.from_template(user_message)
427
+ ])
428
 
429
+ chain = prompt_template | self.llm
430
  result = chain.invoke({})
431
  result_content = result.content if hasattr(result, 'content') else str(result)
432
 
433
+ comparison_result = self._parse_llm_response(result_content)
434
+
435
+ # Add metadata
436
+ comparison_result.update({
437
+ 'content_a_title': title_a,
438
+ 'content_b_title': title_b,
439
+ 'content_a_length': len(content_a),
440
+ 'content_b_length': len(content_b)
441
+ })
442
+
443
+ return comparison_result
444
 
445
  except Exception as e:
446
+ self.logger.error(f"Comparison analysis failed: {e}")
447
+ return {'error': f"Comparison analysis failed: {str(e)}", 'error_type': 'system'}
448
 
449
  def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
450
  """
451
+ Calculate aggregate GEO scores with improved error handling and insights
 
 
 
 
 
 
452
  """
453
  try:
454
+ # Filter out error results
455
  valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')]
456
+ error_results = [r for r in individual_results if r.get('error')]
457
 
458
  if not valid_results:
459
+ return {
460
+ 'error': 'No valid results to aggregate',
461
+ 'error_type': 'no_data',
462
+ 'total_errors': len(error_results),
463
+ 'error_breakdown': self._analyze_errors(error_results)
464
+ }
465
 
466
  # Calculate average scores
467
  score_keys = list(valid_results[0]['geo_scores'].keys())
468
  avg_scores = {}
469
+ score_details = {}
470
 
471
  for key in score_keys:
472
  scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']]
473
+ if scores:
474
+ avg_scores[key] = sum(scores) / len(scores)
475
+ score_details[key] = {
476
+ 'average': avg_scores[key],
477
+ 'min': min(scores),
478
+ 'max': max(scores),
479
+ 'count': len(scores)
480
+ }
481
+ else:
482
+ avg_scores[key] = 0
483
+ score_details[key] = {'average': 0, 'min': 0, 'max': 0, 'count': 0}
484
 
485
  overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
486
 
487
+ # Collect insights
488
+ insights = self._generate_aggregate_insights(valid_results, avg_scores)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
489
 
490
+ # Find performance patterns
491
  best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
492
  worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
493
 
494
  return {
495
  'aggregate_scores': avg_scores,
496
+ 'score_details': score_details,
497
  'overall_score': overall_avg,
498
  'pages_analyzed': len(valid_results),
499
+ 'pages_with_errors': len(error_results),
500
+ 'success_rate': len(valid_results) / len(individual_results) if individual_results else 0,
501
  'best_performing_metric': {
502
  'metric': best_score[0],
503
  'score': best_score[1]
 
506
  'metric': worst_score[0],
507
  'score': worst_score[1]
508
  },
509
+ 'insights': insights,
510
+ 'score_distribution': self._calculate_score_distribution(avg_scores),
511
+ 'processing_stats': self._calculate_processing_stats(valid_results)
 
 
 
 
 
512
  }
513
 
514
  except Exception as e:
515
+ self.logger.error(f"Aggregation failed: {e}")
516
+ return {'error': f"Aggregation failed: {str(e)}", 'error_type': 'system'}
517
 
518
+ def get_performance_stats(self) -> Dict[str, Any]:
519
+ """Get performance statistics for the scorer"""
520
+ return {
521
+ 'total_analyses': self.analysis_count,
522
+ 'cache_hits': self.cache_hits,
523
+ 'cache_hit_rate': self.cache_hits / max(self.analysis_count, 1),
524
+ 'config': {
525
+ 'max_content_length': self.config.MAX_CONTENT_LENGTH,
526
+ 'cache_size': self.config.CACHE_SIZE
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
527
  }
528
+ }
 
 
 
 
529
 
530
  def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
531
+ """Enhanced LLM response parsing with better error handling"""
532
  try:
533
+ # Clean response text
534
+ cleaned_response = response_text.strip()
 
535
 
536
+ # Try to find JSON content
537
+ json_patterns = [
538
+ r'\{.*\}', # Look for JSON object
539
+ r'```json\s*(\{.*?\})\s*```', # JSON in code blocks
540
+ r'```\s*(\{.*?\})\s*```' # Generic code blocks
541
+ ]
542
+
543
+ for pattern in json_patterns:
544
+ matches = re.findall(pattern, cleaned_response, re.DOTALL)
545
+ if matches:
546
+ json_str = matches[0] if isinstance(matches[0], str) else matches[0]
547
+ try:
548
+ return json.loads(json_str)
549
+ except json.JSONDecodeError:
550
+ continue
551
+
552
+ # If no JSON patterns found, try parsing the entire response
553
+ try:
554
+ return json.loads(cleaned_response)
555
+ except json.JSONDecodeError:
556
+ pass
557
+
558
+ # Last resort: return structured error
559
+ return {
560
+ 'raw_response': response_text,
561
+ 'parsing_error': 'No valid JSON found in response',
562
+ 'error_type': 'parsing'
563
+ }
564
 
 
 
565
  except Exception as e:
566
+ return {
567
+ 'raw_response': response_text,
568
+ 'parsing_error': f'Unexpected parsing error: {str(e)}',
569
+ 'error_type': 'parsing'
570
+ }
571
 
572
+ def _analyze_errors(self, error_results: List[Dict[str, Any]]) -> Dict[str, int]:
573
+ """Analyze error patterns"""
574
+ error_breakdown = {}
575
+ for result in error_results:
576
+ error_type = result.get('error_type', 'unknown')
577
+ error_breakdown[error_type] = error_breakdown.get(error_type, 0) + 1
578
+ return error_breakdown
579
+
580
+ def _generate_aggregate_insights(self, valid_results: List[Dict[str, Any]], avg_scores: Dict[str, float]) -> List[str]:
581
+ """Generate insights from aggregate analysis"""
582
+ insights = []
583
 
584
+ if not avg_scores:
585
+ return ["No valid scores to analyze"]
586
 
587
+ overall_avg = sum(avg_scores.values()) / len(avg_scores)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
588
 
589
+ # Performance level insights
590
+ if overall_avg >= 8.0:
591
+ insights.append("Excellent overall GEO performance across analyzed content")
592
+ elif overall_avg >= 6.5:
593
+ insights.append("Good GEO performance with room for targeted improvements")
594
+ elif overall_avg >= 5.0:
595
+ insights.append("Moderate GEO performance - significant optimization opportunities exist")
596
+ else:
597
+ insights.append("Below-average GEO performance - comprehensive optimization needed")
598
 
599
+ # Specific metric insights
600
+ best_metric = max(avg_scores.items(), key=lambda x: x[1])
601
+ worst_metric = min(avg_scores.items(), key=lambda x: x[1])
602
 
603
+ if best_metric[1] >= 8.0:
604
+ insights.append(f"Strong performance in {best_metric[0].replace('_', ' ')} (score: {best_metric[1]:.1f})")
605
 
606
+ if worst_metric[1] < 6.0:
607
+ insights.append(f"Critical improvement needed in {worst_metric[0].replace('_', ' ')} (score: {worst_metric[1]:.1f})")
608
 
609
+ # Consistency insights
610
+ score_values = list(avg_scores.values())
611
+ score_range = max(score_values) - min(score_values)
612
+ if score_range > 3.0:
613
  insights.append("High variability in scores indicates inconsistent optimization across metrics")
614
+ elif score_range < 1.5:
615
+ insights.append("Consistent performance across all GEO metrics")
616
 
617
  return insights
618
 
619
+ def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
620
+ """Calculate enhanced score distribution statistics"""
621
+ if not scores:
622
+ return {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
623
 
624
+ score_values = list(scores.values())
 
 
625
 
626
+ return {
627
+ 'highest_score': max(score_values),
628
+ 'lowest_score': min(score_values),
629
+ 'average_score': sum(score_values) / len(score_values),
630
+ 'score_range': max(score_values) - min(score_values),
631
+ 'scores_above_8': len([s for s in score_values if s >= 8.0]),
632
+ 'scores_above_7': len([s for s in score_values if s >= 7.0]),
633
+ 'scores_below_5': len([s for s in score_values if s < 5.0]),
634
+ 'score_variance': sum((s - sum(score_values)/len(score_values))**2 for s in score_values) / len(score_values)
635
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
636
 
637
+ def _calculate_processing_stats(self, valid_results: List[Dict[str, Any]]) -> Dict[str, Any]:
638
+ """Calculate processing statistics"""
639
+ processing_times = [r.get('processing_time_seconds', 0) for r in valid_results if 'processing_time_seconds' in r]
640
+ content_lengths = [r.get('content_length', 0) for r in valid_results if 'content_length' in r]
641
+
642
+ if not processing_times:
643
+ return {}
 
 
 
 
 
 
 
 
 
644
 
645
  return {
646
+ 'avg_processing_time': sum(processing_times) / len(processing_times),
647
+ 'max_processing_time': max(processing_times),
648
+ 'min_processing_time': min(processing_times),
649
+ 'avg_content_length': sum(content_lengths) / len(content_lengths) if content_lengths else 0,
650
+ 'total_processing_time': sum(processing_times)
651
  }
652
 
653
  def _get_timestamp(self) -> str:
654
  """Get current timestamp"""
655
+ return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
656
+
657
+
658
+ # Example usage and testing utilities
659
+ class GEOScorerTester:
660
+ """Testing utilities for GEOScorer"""
661
+
662
+ @staticmethod
663
+ def create_test_content() -> List[Dict[str, Any]]:
664
+ """Create test content for validation"""
665
+ return [
666
+ {
667
+ 'title': 'How to Optimize Content for AI Search',
668
+ 'content': 'AI search engines are revolutionizing how people find information. To optimize your content for AI-powered search, focus on creating comprehensive, factual, and well-structured content that directly answers user questions. Use semantic keywords, provide clear context, and ensure your content is authoritative and cite-worthy.',
669
+ 'url': 'https://example.com/ai-search-optimization'
670
+ },
671
+ {
672
+ 'title': 'Best Practices for GEO',
673
+ 'content': 'Generative Engine Optimization (GEO) requires a different approach than traditional SEO. Focus on conversational readiness, semantic richness, and multi-query coverage. Ensure your content provides complete answers and is structured in a way that AI systems can easily understand and cite.',
674
+ 'url': 'https://example.com/geo-best-practices'
675
+ }
676
+ ]
677
+
678
+ @staticmethod
679
+ def run_basic_test(scorer: GEOScorer) -> Dict[str, Any]:
680
+ """Run basic functionality test"""
681
+ test_content = GEOScorerTester.create_test_content()
682
+ results = scorer.analyze_multiple_pages(test_content, detailed=False)
683
+ aggregate = scorer.calculate_aggregate_scores(results)
684
+ stats = scorer.get_performance_stats()
685
+
686
+ return {
687
+ 'individual_results': results,
688
+ 'aggregate_results': aggregate,
689
+ 'performance_stats': stats
690
+ }