Update utils/scorer.py

#3
opened by Alpha108
Files changed (1)
  1. utils/scorer.py +277 -441
utils/scorer.py CHANGED
@@ -1,494 +1,330 @@
 """
-GEO Scoring Module
-Analyzes content for Generative Engine Optimization (GEO) performance
 """
 
-import json
-from typing import Dict, Any, List
-from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
 
-
-class GEOScorer:
-    """Main class for calculating GEO scores and analysis"""
-
-    def __init__(self, llm):
-        self.llm = llm
-        self.setup_prompts()
 
-    def setup_prompts(self):
-        """Initialize prompts for different types of analysis"""
-
-        # Main GEO analysis prompt
-        self.geo_analysis_prompt = """You are a Generative Engine Optimizer (GEO) specialist. Analyze the provided content for its effectiveness in AI-powered search engines and LLM systems.
-
-Evaluate the content based on these GEO criteria (score 1-10 each):
-
-1. **AI Search Visibility**: How likely is this content to be surfaced by AI search engines?
-2. **Query Intent Matching**: How well does the content match common user queries?
-3. **Factual Accuracy & Authority**: How trustworthy and authoritative is the information?
-4. **Conversational Readiness**: How suitable is the content for AI chat responses?
-5. **Semantic Richness**: How well does the content use relevant semantic keywords?
-6. **Context Completeness**: Does the content provide complete, self-contained answers?
-7. **Citation Worthiness**: How likely are AI systems to cite this content?
-8. **Multi-Query Coverage**: Does the content answer multiple related questions?
-
-Also identify:
-- Primary topics and entities
-- Missing information gaps
-- Optimization opportunities
-- Specific enhancement recommendations
-
-Format your response as JSON (do NOT use curly braces for variables):
-
-{
-    "geo_scores": {
-        "ai_search_visibility": 7.5,
-        "query_intent_matching": 8.0,
-        "factual_accuracy": 9.0,
-        "conversational_readiness": 6.5,
-        "semantic_richness": 7.0,
-        "context_completeness": 8.5,
-        "citation_worthiness": 7.8,
-        "multi_query_coverage": 6.0
-    },
-    "overall_geo_score": 7.5,
-    "primary_topics": ["topic1", "topic2"],
-    "entities": ["entity1", "entity2"],
-    "missing_gaps": ["gap1", "gap2"],
-    "optimization_opportunities": [
-        {
-            "type": "semantic_enhancement",
-            "description": "Add more related terms",
-            "priority": "high"
-        }
-    ],
-    "recommendations": [
-        "Specific actionable recommendation 1",
-        "Specific actionable recommendation 2"
-    ]
-}
-"""
-
-        # Quick scoring prompt for faster analysis
-        self.quick_score_prompt = """Analyze this content for AI search optimization. Provide scores (1-10) for:
-
-1. AI Search Visibility
-2. Query Intent Matching
-3. Conversational Readiness
-4. Citation Worthiness
-
-Respond in JSON format:
-```json
-{
-    "scores": {
-        "ai_search_visibility": 7.5,
-        "query_intent_matching": 8.0,
-        "conversational_readiness": 6.5,
-        "citation_worthiness": 7.8
-    },
-    "overall_score": 7.5,
-    "top_recommendation": "Most important improvement needed"
-}
-```"""
-
-        # Competitive analysis prompt
-        self.competitive_prompt = """Compare these content pieces for GEO performance. Identify which performs better for AI search and why.
-
-Content A: {content_a}
-
-Content B: {content_b}
-
-Provide analysis in JSON:
-```json
-{
-    "winner": "A" or "B",
-    "score_comparison": {
-        "content_a_score": 7.5,
-        "content_b_score": 8.2
-    },
-    "key_differences": ["difference1", "difference2"],
-    "improvement_suggestions": {
-        "content_a": ["suggestion1"],
-        "content_b": ["suggestion1"]
-    }
-}
-```"""
 
-    def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
         """
-        Analyze a single page for GEO performance
         """
         try:
-            # Choose prompt based on detail level
-            if detailed:
-                system_prompt = self.geo_analysis_prompt
-                user_message = f"Title: {title}\n\nContent: {content[:8000]}"
             else:
-                system_prompt = self.quick_score_prompt
-                user_message = f"Title: {title}\n\nContent: {content[:4000]}"
-
-            # Build prompt and run analysis
-            prompt_template = ChatPromptTemplate.from_messages([
-                SystemMessagePromptTemplate.from_template(system_prompt),
-                HumanMessagePromptTemplate.from_template(user_message)
-            ])
-            # ("user", user_message)
-            # ("system", system_prompt),
-            chain = prompt_template | self.llm
-            result = chain.invoke({})  # No variables needed
-
-            # Extract and parse result
-            result_content = result.content if hasattr(result, 'content') else str(result)
-            parsed_result = self._parse_llm_response(result_content)
-
-            # Add metadata
-            parsed_result.update({
-                'analyzed_title': title,
-                'content_length': len(content),
-                'word_count': len(content.split()),
-                'analysis_type': 'detailed' if detailed else 'quick'
-            })
-
-            return parsed_result
-
         except Exception as e:
-            return {'error': f"GEO analysis failed: {str(e)}"}
 
-    def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
-        """
-        Analyze multiple pages and return consolidated results
 
-        Args:
-            pages_data (List[Dict]): List of page data with content and metadata
-            detailed (bool): Whether to perform detailed analysis
-
-        Returns:
-            List[Dict]: List of GEO analysis results
-        """
-        results = []
 
-        for i, page_data in enumerate(pages_data):
-            try:
-                content = page_data.get('content', '')
-                title = page_data.get('title', f'Page {i+1}')
-
-                analysis = self.analyze_page_geo(content, title, detailed)
-
-                # Add page-specific metadata
-                analysis.update({
-                    'page_url': page_data.get('url', ''),
-                    'page_index': i,
-                    'source_word_count': page_data.get('word_count', 0)
-                })
-
-                results.append(analysis)
-
-            except Exception as e:
-                results.append({
-                    'page_index': i,
-                    'page_url': page_data.get('url', ''),
-                    'error': f"Analysis failed: {str(e)}"
-                })
 
-        return results
 
-    def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]:
-        """
-        Compare two pieces of content for GEO performance
 
-        Args:
-            content_a (str): First content to compare
-            content_b (str): Second content to compare
-            titles (tuple): Optional titles for the content pieces
-
-        Returns:
-            Dict: Comparison analysis results
-        """
-        try:
-            title_a, title_b = titles if titles else ("Content A", "Content B")
-
-            prompt_template = ChatPromptTemplate.from_messages([
-                ("system", self.competitive_prompt),
-                ("user", "")
-            ])
-
-            # Format the competitive analysis prompt
-            formatted_prompt = self.competitive_prompt.format(
-                content_a=f"Title: {title_a}\nContent: {content_a[:4000]}",
-                content_b=f"Title: {title_b}\nContent: {content_b[:4000]}"
-            )
-
-            chain = ChatPromptTemplate.from_messages([
-                ("system", formatted_prompt),
-                ("user", "Perform the comparison analysis.")
-            ]) | self.llm
 
-            result = chain.invoke({})
-            result_content = result.content if hasattr(result, 'content') else str(result)
 
-            return self._parse_llm_response(result_content)
 
-        except Exception as e:
-            return {'error': f"Comparison analysis failed: {str(e)}"}
 
-    def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
         """
-        Calculate aggregate GEO scores from multiple page analyses
 
         Args:
-            individual_results (List[Dict]): List of individual page analysis results
 
         Returns:
-            Dict: Aggregate scores and insights
         """
         try:
-            valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')]
-
-            if not valid_results:
-                return {'error': 'No valid results to aggregate'}
-
-            # Calculate average scores
-            score_keys = list(valid_results[0]['geo_scores'].keys())
-            avg_scores = {}
 
-            for key in score_keys:
-                scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']]
-                avg_scores[key] = sum(scores) / len(scores) if scores else 0
 
-            overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
 
-            # Collect all recommendations and opportunities
-            all_recommendations = []
-            all_opportunities = []
-            all_topics = []
-            all_entities = []
 
-            for result in valid_results:
-                all_recommendations.extend(result.get('recommendations', []))
-                all_opportunities.extend(result.get('optimization_opportunities', []))
-                all_topics.extend(result.get('primary_topics', []))
-                all_entities.extend(result.get('entities', []))
 
-            # Remove duplicates and prioritize
-            unique_recommendations = list(set(all_recommendations))
-            unique_topics = list(set(all_topics))
-            unique_entities = list(set(all_entities))
 
-            # Find highest and lowest performing areas
-            best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
-            worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
 
            return {
-                'aggregate_scores': avg_scores,
-                'overall_score': overall_avg,
-                'pages_analyzed': len(valid_results),
-                'best_performing_metric': {
-                    'metric': best_score[0],
-                    'score': best_score[1]
-                },
-                'lowest_performing_metric': {
-                    'metric': worst_score[0],
-                    'score': worst_score[1]
-                },
-                'consolidated_recommendations': unique_recommendations[:10],
-                'all_topics': unique_topics,
-                'all_entities': unique_entities,
-                'high_priority_opportunities': [
-                    opp for opp in all_opportunities
-                    if opp.get('priority') == 'high'
-                ][:5],
-                'score_distribution': self._calculate_score_distribution(avg_scores)
            }
-
-        except Exception as e:
-            return {'error': f"Aggregation failed: {str(e)}"}
 
-    def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]:
-        """
-        Generate a comprehensive GEO report
 
-        Args:
-            analysis_results (Dict): Results from aggregate analysis
-            website_url (str): Optional website URL for context
-
-        Returns:
-            Dict: Comprehensive GEO report
-        """
-        try:
-            report = {
-                'report_metadata': {
-                    'generated_at': self._get_timestamp(),
-                    'website_url': website_url,
-                    'analysis_type': 'GEO Performance Report'
-                },
-                'executive_summary': self._generate_executive_summary(analysis_results),
-                'detailed_scores': analysis_results.get('aggregate_scores', {}),
-                'performance_insights': self._generate_performance_insights(analysis_results),
-                'actionable_recommendations': self._prioritize_recommendations(
-                    analysis_results.get('consolidated_recommendations', [])
-                ),
-                'optimization_roadmap': self._create_optimization_roadmap(analysis_results),
-                'competitive_position': self._assess_competitive_position(analysis_results),
-                'technical_details': {
-                    'pages_analyzed': analysis_results.get('pages_analyzed', 0),
-                    'overall_score': analysis_results.get('overall_score', 0),
-                    'score_distribution': analysis_results.get('score_distribution', {})
-                }
            }
 
-            return report
 
-        except Exception as e:
-            return {'error': f"Report generation failed: {str(e)}"}
-
-    def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
-        """Parse LLM response and extract JSON content"""
-        try:
-            # Find JSON content in the response
-            json_start = response_text.find('{')
-            json_end = response_text.rfind('}') + 1
 
-            if json_start != -1 and json_end != -1:
-                json_str = response_text[json_start:json_end]
-                return json.loads(json_str)
            else:
-                # If no JSON found, return the raw response
-                return {'raw_response': response_text, 'parsing_error': 'No JSON found'}
 
-        except json.JSONDecodeError as e:
-            return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(e)}'}
-        except Exception as e:
-            return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'}
 
-    def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
-        """Calculate distribution of scores for insights"""
-        if not scores:
-            return {}
-
-        score_values = list(scores.values())
-
-        return {
-            'highest_score': max(score_values),
-            'lowest_score': min(score_values),
-            'average_score': sum(score_values) / len(score_values),
-            'score_range': max(score_values) - min(score_values),
-            'scores_above_7': len([s for s in score_values if s >= 7.0]),
-            'scores_below_5': len([s for s in score_values if s < 5.0])
        }
 
-    def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str:
-        """Generate executive summary based on analysis results"""
-        overall_score = analysis_results.get('overall_score', 0)
-        pages_analyzed = analysis_results.get('pages_analyzed', 0)
-
-        if overall_score >= 8.0:
-            performance = "excellent"
-        elif overall_score >= 6.5:
-            performance = "good"
-        elif overall_score >= 5.0:
-            performance = "moderate"
-        else:
-            performance = "needs improvement"
-
-        return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}."
 
-    def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]:
-        """Generate performance insights based on analysis"""
-        insights = []
-
-        best_metric = analysis_results.get('best_performing_metric', {})
-        worst_metric = analysis_results.get('lowest_performing_metric', {})
-
-        if best_metric.get('score', 0) >= 8.0:
-            insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})")
-
-        if worst_metric.get('score', 10) < 6.0:
-            insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})")
-
-        score_dist = analysis_results.get('score_distribution', {})
-        if score_dist.get('score_range', 0) > 3.0:
-            insights.append("High variability in scores indicates inconsistent optimization across metrics")
-
-        return insights
 
-    def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]:
-        """Prioritize recommendations based on impact potential"""
-        prioritized = []
-
-        # Simple prioritization based on keywords
-        high_impact_keywords = ['semantic', 'structure', 'authority', 'factual']
-        medium_impact_keywords = ['readability', 'clarity', 'format']
-
-        for i, rec in enumerate(recommendations):
-            priority = 'low'
-            if any(keyword in rec.lower() for keyword in high_impact_keywords):
-                priority = 'high'
-            elif any(keyword in rec.lower() for keyword in medium_impact_keywords):
-                priority = 'medium'
-
-            prioritized.append({
-                'recommendation': rec,
-                'priority': priority,
-                'order': i + 1
-            })
-
-        # Sort by priority
-        priority_order = {'high': 1, 'medium': 2, 'low': 3}
-        prioritized.sort(key=lambda x: priority_order[x['priority']])
-
-        return prioritized
 
-    def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]:
-        """Create a phased optimization roadmap"""
-        roadmap = {
-            'immediate_actions': [],
-            'short_term_goals': [],
-            'long_term_strategy': []
-        }
-
-        overall_score = analysis_results.get('overall_score', 0)
-        worst_metric = analysis_results.get('lowest_performing_metric', {})
-
-        # Immediate actions based on worst performing metric
-        if worst_metric.get('score', 10) < 5.0:
-            roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}")
-
-        # Short-term goals
-        if overall_score < 7.0:
-            roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0")
-            roadmap['short_term_goals'].append("Enhance content structure and semantic richness")
-
-        # Long-term strategy
-        roadmap['long_term_strategy'].append("Establish consistent GEO optimization process")
-        roadmap['long_term_strategy'].append("Monitor and track AI search performance")
-
-        return roadmap
 
-    def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
-        """Assess competitive position based on scores"""
-        overall_score = analysis_results.get('overall_score', 0)
-
-        if overall_score >= 8.5:
-            position = "market_leader"
-            description = "Content is highly optimized for AI search engines"
-        elif overall_score >= 7.0:
-            position = "competitive"
-            description = "Content performs well but has room for improvement"
-        elif overall_score >= 5.5:
-            position = "average"
-            description = "Content meets basic standards but lacks optimization"
-        else:
-            position = "needs_work"
-            description = "Content requires significant optimization for AI search"
-
-        return {
-            'position': position,
-            'description': description,
-            'score': overall_score,
-            'percentile_estimate': min(overall_score * 10, 100)  # Rough percentile estimate
-        }
 
-    def _get_timestamp(self) -> str:
-        """Get current timestamp"""
-        from datetime import datetime
-        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 """
+GEO Scorer Data Integration Fix
+Handles various data formats from web scrapers and ensures compatibility
 """
 
+import logging
+from typing import Dict, Any, List, Union, Optional
 
 
+class GEODataAdapter:
+    """Adapter to handle different data formats from web scrapers"""
 
+    def __init__(self, logger: Optional[logging.Logger] = None):
+        self.logger = logger or logging.getLogger(__name__)
 
+    def normalize_scraped_data(self, scraped_data: Union[Dict, List]) -> List[Dict[str, Any]]:
        """
+        Normalize scraped data to the format expected by GEOScorer
+
+        Args:
+            scraped_data: Raw data from web scraper (various formats)
+
+        Returns:
+            List[Dict]: Normalized data ready for GEO analysis
        """
        try:
+            # Handle different input formats
+            if isinstance(scraped_data, dict):
+                # Single page data
+                normalized = [self._normalize_single_page(scraped_data)]
+            elif isinstance(scraped_data, list):
+                # Multiple pages
+                normalized = [self._normalize_single_page(page) for page in scraped_data]
            else:
+                raise ValueError(f"Unsupported data type: {type(scraped_data)}")
+
+            # Filter out invalid entries
+            valid_pages = [page for page in normalized if page.get('content')]
+
+            self.logger.info(f"Normalized {len(valid_pages)} valid pages from {len(normalized) if isinstance(normalized, list) else 1} total")
+
+            return valid_pages
+
        except Exception as e:
+            self.logger.error(f"Data normalization failed: {e}")
+            return []
 
+    def _normalize_single_page(self, page_data: Dict[str, Any]) -> Dict[str, Any]:
+        """Normalize a single page's data structure"""
 
+        # Common field mappings from different scrapers
+        content_fields = ['content', 'text', 'body', 'html_content', 'page_content', 'main_content']
+        title_fields = ['title', 'page_title', 'heading', 'h1', 'name']
+        url_fields = ['url', 'link', 'page_url', 'source_url', 'href']
 
+        # Extract content (try multiple possible field names)
+        content = ""
+        for field in content_fields:
+            if field in page_data and page_data[field]:
+                content = str(page_data[field])
+                break
+
+        # Extract title
+        title = "Untitled Page"
+        for field in title_fields:
+            if field in page_data and page_data[field]:
+                title = str(page_data[field])
+                break
 
+        # Extract URL
+        url = ""
+        for field in url_fields:
+            if field in page_data and page_data[field]:
+                url = str(page_data[field])
+                break
+
+        # Create normalized structure
+        normalized = {
+            'content': content,
+            'title': title,
+            'url': url,
+            'word_count': len(content.split()) if content else 0,
+            'original_data': page_data  # Keep original for debugging
+        }
+
+        # Add any additional metadata
+        metadata_fields = ['description', 'keywords', 'author', 'date', 'meta_description']
+        for field in metadata_fields:
+            if field in page_data:
+                normalized[field] = page_data[field]
+
+        return normalized
 
+    def validate_normalized_data(self, normalized_data: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """Validate normalized data and provide diagnostics"""
 
+        validation_results = {
+            'total_pages': len(normalized_data),
+            'valid_pages': 0,
+            'invalid_pages': 0,
+            'issues': [],
+            'summary': {}
+        }
+
+        for i, page in enumerate(normalized_data):
+            issues = []
 
+            # Check required fields
+            if not page.get('content'):
+                issues.append(f"Page {i}: Missing or empty content")
+            elif len(page['content'].strip()) < 50:
+                issues.append(f"Page {i}: Content too short ({len(page['content'])} chars)")
 
+            if not page.get('title'):
+                issues.append(f"Page {i}: Missing title")
 
+            if issues:
+                validation_results['invalid_pages'] += 1
+                validation_results['issues'].extend(issues)
+            else:
+                validation_results['valid_pages'] += 1
+
+        # Generate summary
+        content_lengths = [len(page.get('content', '')) for page in normalized_data if page.get('content')]
+        if content_lengths:
+            validation_results['summary'] = {
+                'avg_content_length': sum(content_lengths) / len(content_lengths),
+                'min_content_length': min(content_lengths),
+                'max_content_length': max(content_lengths),
+                'pages_with_titles': len([p for p in normalized_data if p.get('title') and p['title'] != 'Untitled Page']),
+                'pages_with_urls': len([p for p in normalized_data if p.get('url')])
+            }
+
+        return validation_results
+
+
+class GEOScorerWithAdapter(GEOScorer):
+    """Extended GEOScorer with built-in data adaptation"""
 
+    def __init__(self, llm, config: Optional[GEOConfig] = None, logger: Optional[logging.Logger] = None):
+        super().__init__(llm, config, logger)
+        self.data_adapter = GEODataAdapter(logger)
+
+    def analyze_scraped_data(self, scraped_data: Union[Dict, List], detailed: bool = True) -> Dict[str, Any]:
        """
+        Analyze scraped data with automatic format detection and normalization
 
        Args:
+            scraped_data: Raw scraped data in any format
+            detailed: Whether to perform detailed analysis
 
        Returns:
+            Dict: Complete analysis results with diagnostics
        """
+        self.logger.info("Starting analysis of scraped data")
+
        try:
+            # Step 1: Normalize the data
+            normalized_data = self.data_adapter.normalize_scraped_data(scraped_data)
 
+            if not normalized_data:
+                return {
+                    'error': 'No valid data could be extracted from scraped content',
+                    'error_type': 'data_normalization',
+                    'original_data_type': str(type(scraped_data)),
+                    'original_data_sample': str(scraped_data)[:200] if scraped_data else None
+                }
 
+            # Step 2: Validate normalized data
+            validation_results = self.data_adapter.validate_normalized_data(normalized_data)
 
+            # Step 3: Analyze valid pages
+            analysis_results = self.analyze_multiple_pages(normalized_data, detailed)
 
+            # Step 4: Calculate aggregate scores
+            aggregate_results = self.calculate_aggregate_scores(analysis_results)
 
+            # Step 5: Combine all results
+            complete_results = {
+                'data_validation': validation_results,
+                'individual_analyses': analysis_results,
+                'aggregate_scores': aggregate_results,
+                'processing_summary': {
+                    'pages_scraped': validation_results['total_pages'],
+                    'pages_analyzed': len([r for r in analysis_results if not r.get('error')]),
+                    'overall_success_rate': validation_results['valid_pages'] / max(validation_results['total_pages'], 1),
+                    'analysis_type': 'detailed' if detailed else 'quick'
+                }
+            }
 
+            self.logger.info(f"Analysis completed: {complete_results['processing_summary']}")
+            return complete_results
 
+        except Exception as e:
+            self.logger.error(f"Scraped data analysis failed: {e}")
            return {
+                'error': f'Analysis failed: {str(e)}',
+                'error_type': 'system',
+                'original_data_type': str(type(scraped_data)),
+                'traceback': str(e)
            }
+
+
+# Debugging utility functions
+def debug_scraped_data(scraped_data: Union[Dict, List]) -> Dict[str, Any]:
+    """
+    Debug utility to understand the structure of scraped data
 
+    Args:
+        scraped_data: The raw scraped data causing issues
 
+    Returns:
+        Dict: Detailed breakdown of the data structure
+    """
+    debug_info = {
+        'data_type': str(type(scraped_data)),
+        'data_structure': {},
+        'sample_content': {},
+        'recommendations': []
+    }
+
+    try:
+        if isinstance(scraped_data, dict):
+            debug_info['data_structure'] = {
+                'is_dict': True,
+                'keys': list(scraped_data.keys()),
+                'key_count': len(scraped_data.keys())
            }
 
+            # Sample first few key-value pairs
+            for i, (key, value) in enumerate(list(scraped_data.items())[:5]):
+                debug_info['sample_content'][key] = {
+                    'type': str(type(value)),
+                    'length': len(str(value)) if value else 0,
+                    'sample': str(value)[:100] if value else None
+                }
 
+            # Check for common content fields
+            content_fields = ['content', 'text', 'body', 'html_content', 'page_content']
+            found_content_fields = [field for field in content_fields if field in scraped_data]
 
+            if found_content_fields:
+                debug_info['recommendations'].append(f"Found potential content fields: {found_content_fields}")
            else:
+                debug_info['recommendations'].append("No standard content fields found. Check field names.")
 
+        elif isinstance(scraped_data, list):
+            debug_info['data_structure'] = {
+                'is_list': True,
+                'length': len(scraped_data),
+                'first_item_type': str(type(scraped_data[0])) if scraped_data else 'empty'
+            }
+
+            if scraped_data and isinstance(scraped_data[0], dict):
+                debug_info['sample_content']['first_item_keys'] = list(scraped_data[0].keys())
+
+        else:
+            debug_info['recommendations'].append(f"Unexpected data type: {type(scraped_data)}")
+
+    except Exception as e:
+        debug_info['error'] = f"Debug analysis failed: {str(e)}"
 
+    return debug_info
+
+
+def create_test_scraped_data() -> List[Dict[str, Any]]:
+    """Create test data in various formats that scrapers might return"""
+
+    # Format 1: Standard format
+    format1 = {
+        'content': 'This is the main content of the page about AI optimization.',
+        'title': 'AI Optimization Guide',
+        'url': 'https://example.com/ai-guide'
+    }
+
+    # Format 2: Different field names
+    format2 = {
+        'text': 'Content about machine learning best practices.',
+        'page_title': 'ML Best Practices',
+        'link': 'https://example.com/ml-practices'
+    }
+
+    # Format 3: Nested structure
+    format3 = {
+        'page_data': {
+            'body': 'Deep learning techniques for content optimization.',
+            'heading': 'Deep Learning Guide'
+        },
+        'metadata': {
+            'source_url': 'https://example.com/deep-learning'
        }
+    }
 
+    return [format1, format2, format3]
+
+
+# Usage example and testing
+def test_data_integration():
+    """Test the data integration fixes"""
 
+    # Test with various data formats
+    test_data = create_test_scraped_data()
 
+    # Debug the data first
+    for i, data in enumerate(test_data):
+        print(f"\n--- Debug Info for Test Data {i+1} ---")
+        debug_info = debug_scraped_data(data)
+        print(f"Data type: {debug_info['data_type']}")
+        print(f"Keys: {debug_info['data_structure'].get('keys', 'N/A')}")
+        print(f"Recommendations: {debug_info['recommendations']}")
 
+    # Test normalization
+    adapter = GEODataAdapter()
+    normalized = adapter.normalize_scraped_data(test_data)
 
+    print(f"\n--- Normalization Results ---")
+    print(f"Original items: {len(test_data)}")
+    print(f"Normalized items: {len(normalized)}")
+
+    for i, item in enumerate(normalized):
+        print(f"Item {i+1}: Title='{item['title']}', Content length={len(item['content'])}")
 
+    # Test validation
+    validation = adapter.validate_normalized_data(normalized)
+    print(f"\n--- Validation Results ---")
+    print(f"Valid pages: {validation['valid_pages']}/{validation['total_pages']}")
+    print(f"Issues: {validation['issues']}")
+
+
+if __name__ == "__main__":
+    test_data_integration()
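
Note for review: `GEOScorerWithAdapter` subclasses `GEOScorer` and annotates `config` with `GEOConfig`, but this revision deletes the `GEOScorer` definition from `utils/scorer.py` and adds no import, so the module would raise `NameError` at import time. The `super().__init__(llm, config, logger)` call also assumes a three-argument `GEOScorer.__init__`, whereas the removed class accepted only `llm`. A minimal sketch of the kind of import block the file would need; the module paths here are assumptions, not something this diff establishes:

```python
# Hypothetical imports -- adjust to wherever GEOScorer and GEOConfig
# actually live after this refactor. Without them (or without keeping the
# original class in this file), `class GEOScorerWithAdapter(GEOScorer)`
# fails with NameError as soon as the module is imported.
from utils.geo_scorer import GEOScorer, GEOConfig  # assumed module path
```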
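The normalization and validation steps run standalone, so the adapter can be exercised without an LLM. A minimal sketch, assuming the new helpers are importable as `utils.scorer` from the project root:

```python
import logging

# Assumed import path: the file this PR modifies, treated as a package module.
from utils.scorer import GEODataAdapter, create_test_scraped_data

logging.basicConfig(level=logging.INFO)

adapter = GEODataAdapter()
pages = create_test_scraped_data()

# Normalize the mixed-format test pages, then run the diagnostics pass.
normalized = adapter.normalize_scraped_data(pages)
report = adapter.validate_normalized_data(normalized)

print(f"valid: {report['valid_pages']}/{report['total_pages']}")
for issue in report['issues']:
    print("issue:", issue)
```

With the test data above, expect the short `format2` text to be flagged by the 50-character minimum in `validate_normalized_data`.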
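Also worth noting: `_normalize_single_page` only inspects top-level keys, so the nested `format3` payload (content under `page_data.body`) normalizes to empty content and is filtered out by `normalize_scraped_data`; the third test format never reaches validation. If nested scraper output is meant to be supported, one option is to flatten a single level before normalizing. `flatten_page` below is a hypothetical pre-processing helper, not part of this diff:

```python
from typing import Any, Dict

def flatten_page(page: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: merge one level of nested dicts into the top
    level so the adapter's field-name search can see keys like 'body',
    'heading', or 'source_url'. Top-level keys win on conflicts."""
    flat: Dict[str, Any] = {}
    for key, value in page.items():
        if isinstance(value, dict):
            for inner_key, inner_value in value.items():
                flat.setdefault(inner_key, inner_value)
        else:
            flat[key] = value  # plain top-level field, keep as-is
    return flat

# Usage: adapter.normalize_scraped_data([flatten_page(p) for p in pages])
```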