Files changed (8)
  1. app.py +104 -976
  2. demo.json +0 -58
  3. requirements.txt +0 -6
  4. utils/chunker.py +0 -1314
  5. utils/export.py +0 -1896
  6. utils/optimizer.py +0 -354
  7. utils/parser.py +0 -549
  8. utils/scorer.py +0 -484
app.py CHANGED
@@ -1,983 +1,111 @@
- """
- Main Streamlit Application - GEO SEO AI Optimizer with RAG-Enhanced Content Optimization
- Entry point for the application with UI components
- """
-
- import streamlit as st
  import os
  import tempfile
- import json
- from typing import Dict, Any, List
- import time
-
- # Import our custom modules
- from utils.parser import PDFParser, TextParser, WebpageParser
- from utils.scorer import GEOScorer
- from utils.optimizer import ContentOptimizer  # This will be your enhanced version
- from utils.chunker import VectorChunker
- from utils.export import ResultExporter

- # Import LangChain components
- from langchain_groq import ChatGroq
  from langchain_community.embeddings import HuggingFaceEmbeddings
- from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
- from langchain_core.messages import AIMessage, HumanMessage
-
- class GEOSEOApp:
-     """Main application class that orchestrates all components"""
-
-     def __init__(self):
-         self.setup_config()
-         self.setup_models()
-         self.setup_parsers()
-         self.setup_components()
-
-     def setup_config(self):
-         """Initialize configuration and API keys"""
-         self.groq_api_key = os.getenv("GROQ_API_KEY", "your-groq-api-key")
-         self.hf_api_key = os.getenv("HUGGINGFACE_API_KEY", "your-huggingface-api-key")
-
-         # Create data directory if it doesn't exist
-         os.makedirs("data/uploaded_files", exist_ok=True)
-
-     def setup_models(self):
-         """Initialize LLM and embedding models"""
-         self.llm = ChatGroq(
-             api_key=self.groq_api_key,
-             model_name="llama-3.1-8b-instant",
-             temperature=0.1
-         )
-
-         self.embeddings = HuggingFaceEmbeddings(
-             model_name="sentence-transformers/all-MiniLM-L6-v2",
-             model_kwargs={"device": "cpu"}
-             # model_name="sentence-transformers/all-MiniLM-L6-v2",
-             # model_kwargs={"device": "cpu"},
-             # cache_folder="./hf_caches",
-         )
-
-     def setup_parsers(self):
-         """Initialize content parsers"""
-         self.pdf_parser = PDFParser()
-         self.text_parser = TextParser()
-         self.webpage_parser = WebpageParser()
-
-     def setup_components(self):
-         """Initialize processing components with RAG integration"""
-         self.geo_scorer = GEOScorer(self.llm)
-         self.vector_chunker = VectorChunker(self.embeddings)
-
-         # Enhanced content optimizer with RAG capabilities
-         self.content_optimizer = ContentOptimizer(self.llm, self.vector_chunker)
-
-         self.result_exporter = ResultExporter()
-
-     def run(self):
-         """Main application runner"""
-         st.set_page_config(
-             page_title="GEO SEO AI Optimizer",
-             page_icon="🚀",
-             layout="wide"
-         )
-
-         st.title("🚀 GEO SEO AI Optimizer")
-         st.markdown("*Optimize your content for AI search engines and LLM systems with RAG-enhanced analysis*")
-
-         # Sidebar
-         self.render_sidebar()
-
-         # Main tabs
-         tab1, tab2, tab3, tab4 = st.tabs([
-             "🌐 Website GEO Analysis",
-             "🔧 GEO Content Enhancement",
-             "📄 Document Q&A",
-             "🧠 Generate GEO Content",
-         ])
-
-         with tab1:
-             self.render_website_analysis_tab()
-
-         with tab2:
-             self.render_geo_content_enhancement_tab()
-
-         with tab3:
-             self.render_document_qa_tab()
-         with tab4:
-             self.render_generate_geo_content_tab()
-
-
-     def render_sidebar(self):
-         """Render sidebar with information and controls"""
-         st.sidebar.title("🛠️ GEO Tools")
-         st.sidebar.markdown("- 🌐 Website GEO Analysis")
-         st.sidebar.markdown("- 🔧 RAG-Enhanced Content Optimization")
-         st.sidebar.markdown("- 📊 AI-First SEO Scoring")
-         st.sidebar.markdown("- 📄 Document Q&A with RAG")
-         st.sidebar.markdown("- 🧠 Generate GEO Content")
-
-         st.sidebar.markdown("---")
-         st.sidebar.markdown("### 📖 GEO Metrics")
-         st.sidebar.markdown("**AI Search Visibility**: How likely AI engines will surface your content")
-         st.sidebar.markdown("**Query Intent Matching**: How well content matches user queries")
-         st.sidebar.markdown("**Conversational Readiness**: Suitability for AI chat responses")
-         st.sidebar.markdown("**Citation Worthiness**: Probability of being cited by AI")
-         st.sidebar.markdown("**Context Completeness**: How self-contained the content is")
-         st.sidebar.markdown("**Semantic Richness**: Depth of topic coverage")
-
-         st.sidebar.markdown("---")
-         st.sidebar.markdown("### 🧠 RAG Enhancement")
-         st.sidebar.markdown("- **Knowledge Base**: GEO best practices")
-         st.sidebar.markdown("- **Contextual Analysis**: AI-informed optimization")
-         st.sidebar.markdown("- **Entity Extraction**: AI-powered entity recognition")
-         st.sidebar.markdown("- **Competitive Analysis**: Gap identification")
-
-     def render_geo_content_enhancement_tab(self):
-         """Render GEO Content Enhancement tab with RAG integration"""
-         st.header("🔧 GEO Content Enhancement with RAG")
-         st.markdown("Analyze and optimize your content using AI-powered Generative Engine Optimization with RAG-enhanced knowledge base.")
-
-         # Content input
-         input_text = st.text_area(
-             "Enter content to analyze and enhance:",
-             height=200,
-             key="geo_enhancement_input",
-             help="Paste your content here for GEO optimization using RAG-enhanced analysis"
-         )
-
-         # GEO Optimization type selector
-         st.markdown("### ⚙️ GEO Optimization Settings")
-         col1, col2 = st.columns(2)
-
-         with col1:
-             optimization_type = st.selectbox(
-                 "Select GEO Optimization Type:",
-                 options=[
-                     "geo_standard",
-                     # "competitive_geo",
-                     # "geo_readability",
-                     # "geo_entity_extraction",
-                     # "geo_variations",
-                     # "geo_batch_optimize"
-                 ],
-                 format_func=lambda x: {
-                     "geo_standard": "🔧 Standard GEO Enhancement",
-                     # "competitive_geo": "📊 Competitive GEO Analysis",
-                     # "geo_readability": "📖 GEO Readability Analysis",
-                     # "geo_entity_extraction": "🏷️ GEO Entity Extraction",
-                     # "geo_variations": "🔄 GEO Content Variations",
-                     # "geo_batch_optimize": "📦 Batch GEO Optimization"
-                 }[x],
-                 index=0,
-                 help="Choose the type of GEO optimization powered by RAG analysis"
-             )
-
-         with col2:
-             # Additional options based on optimization type
-             if optimization_type in ["geo_standard", "competitive_geo"]:
-                 analyze_only = st.checkbox("Analysis", value=True)
-                 include_rag_context = st.checkbox("Include RAG context details", value=True)
-             # elif optimization_type == "geo_variations":
-             #     num_variations = st.slider("Number of variations", min_value=1, max_value=3, value=2)
-             #     analyze_only = False
-             #     include_rag_context = True
-             # elif optimization_type == "geo_batch_optimize":
-             #     st.info("For batch optimization, separate multiple content pieces with '---' divider")
-             #     analyze_only = False
-             #     include_rag_context = True
-             else:
-                 analyze_only = False
-                 include_rag_context = True
-
-         # Show description based on optimization type
-         optimization_descriptions = {
-             "geo_standard": "🔧 RAG-enhanced GEO optimization focusing on AI search visibility, conversational readiness, and citation worthiness using knowledge base guidance.",
-             # "competitive_geo": "📊 Competitive GEO analysis against best practices with gap identification and actionable recommendations using RAG context.",
-             # "geo_readability": "📖 Detailed readability analysis specifically optimized for AI systems and LLM consumption patterns.",
-             # "geo_entity_extraction": "🏷️ AI-powered extraction of key entities, topics, and concepts relevant for GEO optimization.",
-             # "geo_variations": "🔄 Generate multiple GEO-optimized variations (FAQ, conversational, authoritative) using RAG knowledge.",
-             # "geo_batch_optimize": "📦 Process multiple content pieces simultaneously with consistent GEO optimization."
-         }
-
-         st.info(f"**{optimization_descriptions[optimization_type]}**")
-
-         # Knowledge base status
-         if hasattr(self.content_optimizer, 'geo_knowledge'):
-             st.success(f"✅ RAG Knowledge Base Loaded: {len(self.content_optimizer.geo_knowledge)} GEO best practice documents")
-         else:
-             st.warning("⚠️ RAG Knowledge Base not available - falling back to standard optimization")
-
-         # Submit button
-         if st.button("🚀 Process Content with GEO+RAG", key="geo_enhancement_submit"):
-             if not input_text.strip():
-                 st.warning("Please enter some content to analyze.")
-                 return
-
-             try:
-                 with st.spinner(f"Processing content with {optimization_type} using RAG-enhanced GEO analysis..."):
-                     # Handle different GEO optimization types
-                     if optimization_type == "geo_standard":
-                         result = self.content_optimizer.optimize_content_with_rag(
-                             input_text,
-                             optimization_type="geo_standard",
-                             analyze_only=analyze_only
-                         )
-
-                     elif optimization_type == "competitive_geo":
-                         result = self.content_optimizer.optimize_content_with_rag(
-                             input_text,
-                             optimization_type="competitive_geo",
-                             analyze_only=analyze_only
-                         )
-
-                     elif optimization_type == "geo_readability":
-                         result = self.content_optimizer.analyze_geo_readability(input_text)
-
-                     elif optimization_type == "geo_entity_extraction":
-                         result = self.content_optimizer.extract_geo_entities(input_text)
-
-                     elif optimization_type == "geo_variations":
-                         result = self.content_optimizer.generate_geo_variations(
-                             input_text,
-                             num_variations=num_variations
-                         )
-
-                     elif optimization_type == "geo_batch_optimize":
-                         # Split content by '---' separator
-                         content_pieces = [piece.strip() for piece in input_text.split('---') if piece.strip()]
-                         if len(content_pieces) > 1:
-                             result = self.content_optimizer.batch_optimize_with_rag(content_pieces)
-                         else:
-                             st.warning("For batch optimization, please separate content pieces with '---'")
-                             return
-
-                 if isinstance(result, list):
-                     # Handle list results (variations, batch)
-                     if any(r.get("error") for r in result):
-                         failed_results = [r for r in result if r.get("error")]
-                         st.error(f"Some processing failed: {len(failed_results)} out of {len(result)} items")
-                     else:
-                         st.success("All content processed successfully!")
-                 elif result.get("error"):
-                     st.error(f"Processing failed: {result['error']}")
-                     return
-                 else:
-                     st.success(f"{optimization_type.replace('_', ' ').title()} completed successfully!")
-
-                 # Display results based on optimization type
-                 self.display_geo_enhancement_results(result, optimization_type, input_text, include_rag_context)
-
-             except Exception as e:
-                 st.error(f"An error occurred: {str(e)}")
-
-     def display_geo_enhancement_results(self, result, optimization_type, original_text, include_rag_context=True):
-         """Display results based on GEO optimization type"""
-
-         if optimization_type == "geo_batch_optimize":
-             self.display_geo_batch_results(result)
-         elif optimization_type == "geo_variations":
-             self.display_geo_variation_results(result)
-         elif optimization_type == "geo_readability":
-             self.display_geo_readability_results(result)
-         elif optimization_type == "geo_entity_extraction":
-             self.display_geo_entity_results(result)
-         else:
-             self.display_standard_geo_results(result, optimization_type, include_rag_context)
-
-         # Export functionality
-         self.display_geo_export_options(result, optimization_type, original_text)
-
-     def display_standard_geo_results(self, result, optimization_type, include_rag_context):
-         """Display results for standard and competitive GEO optimizations"""
-         st.markdown("### 📊 GEO Analysis Results")
-
-         # Show GEO scores if available
-         geo_analysis = result.get("geo_analysis", {})
-         if geo_analysis:
-             st.markdown("#### 🎯 GEO Performance Metrics")
-
-             col1, col2, col3 = st.columns(3)
-             with col1:
-                 current_score = geo_analysis.get("current_geo_score", 0)
-                 st.metric("Overall GEO Score", f"{current_score}/10")
-
-             with col2:
-                 ai_visibility = geo_analysis.get("ai_search_visibility", 0)
-                 st.metric("AI Search Visibility", f"{ai_visibility}/10")
-
-             with col3:
-                 citation_worthy = geo_analysis.get("citation_worthiness", 0)
-                 st.metric("Citation Worthiness", f"{citation_worthy}/10")
-
-             # Second row of metrics
-             col1, col2, col3 = st.columns(3)
-             with col1:
-                 query_matching = geo_analysis.get("query_intent_matching", 0)
-                 st.metric("Query Intent Match", f"{query_matching}/10")
-
-             with col2:
-                 conversational = geo_analysis.get("conversational_readiness", 0)
-                 st.metric("Conversational Ready", f"{conversational}/10")
-
-             with col3:
-                 context_complete = geo_analysis.get("context_completeness", 0)
-                 st.metric("Context Complete", f"{context_complete}/10")
-
-         # Show optimization opportunities
-         opportunities = result.get("optimization_opportunities", [])
-         if opportunities:
-             st.markdown("#### 🚀 Optimization Opportunities")
-
-             high_priority = [opp for opp in opportunities if opp.get('priority') == 'high']
-             medium_priority = [opp for opp in opportunities if opp.get('priority') == 'medium']
-
-             if high_priority:
-                 st.markdown("##### 🔴 High Priority")
-                 for opp in high_priority:
-                     st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', '')}")
-                     if opp.get('expected_impact'):
-                         st.write(f"*Expected Impact: {opp.get('expected_impact')}*")
-                     st.write("---")
-
-             if medium_priority:
-                 st.markdown("##### 🟡 Medium Priority")
-                 for opp in medium_priority:
-                     st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', '')}")
-                     if opp.get('expected_impact'):
-                         st.write(f"*Expected Impact: {opp.get('expected_impact')}*")
-                     st.write("---")
-
-         # Show GEO keywords and entities
-         geo_keywords = result.get("geo_keywords", {})
-         if geo_keywords:
-             st.markdown("#### 🔑 GEO Keywords & Entities")
-
-             col1, col2 = st.columns(2)
-             with col1:
-                 primary_entities = geo_keywords.get("primary_entities", [])
-                 if primary_entities:
-                     st.write("**Primary Entities:**")
-                     st.write(", ".join(primary_entities))
-
-                 semantic_terms = geo_keywords.get("semantic_terms", [])
-                 if semantic_terms:
-                     st.write("**Semantic Terms:**")
-                     st.write(", ".join(semantic_terms))
-
-             with col2:
-                 question_patterns = geo_keywords.get("question_patterns", [])
-                 if question_patterns:
-                     st.write("**Question Patterns:**")
-                     for q in question_patterns:
-                         st.write(f"• {q}")
-
-                 related_concepts = geo_keywords.get("related_concepts", [])
-                 if related_concepts:
-                     st.write("**Related Concepts:**")
-                     st.write(", ".join(related_concepts))
-
-         # Show optimized content
-         optimized_content = result.get("optimized_content", {})
-         if optimized_content:
-             enhanced_text = optimized_content.get("enhanced_text", "")
-             if enhanced_text:
-                 st.markdown("#### ✨ GEO-Optimized Content")
-                 st.text_area(
-                     "Enhanced version:",
-                     value=enhanced_text,
-                     height=250,
-                     key="geo_optimized_output"
-                 )
-
-                 # Show structural improvements
-                 structural_improvements = optimized_content.get("structural_improvements", [])
-                 if structural_improvements:
-                     st.markdown("**Structural Improvements:**")
-                     for improvement in structural_improvements:
-                         st.write(f"• {improvement}")
-
-                 # Show semantic enhancements
-                 semantic_enhancements = optimized_content.get("semantic_enhancements", [])
-                 if semantic_enhancements:
-                     st.markdown("**Semantic Enhancements:**")
-                     for enhancement in semantic_enhancements:
-                         st.write(f"• {enhancement}")
-
-         # Show competitive analysis if available
-         if "competitive_gaps" in result:
-             st.markdown("#### 📊 Competitive GEO Analysis")
-             competitive_gaps = result["competitive_gaps"]
-
-             col1, col2 = st.columns(2)
-             with col1:
-                 missing_questions = competitive_gaps.get("missing_question_patterns", [])
-                 if missing_questions:
-                     st.write("**Missing Question Patterns:**")
-                     for q in missing_questions:
-                         st.write(f"• {q}")
-
-                 entity_gaps = competitive_gaps.get("entity_gaps", [])
-                 if entity_gaps:
-                     st.write("**Entity Gaps:**")
-                     st.write(", ".join(entity_gaps))
-
-             with col2:
-                 semantic_opportunities = competitive_gaps.get("semantic_opportunities", [])
-                 if semantic_opportunities:
-                     st.write("**Semantic Opportunities:**")
-                     st.write(", ".join(semantic_opportunities))
-
-                 structural_weaknesses = competitive_gaps.get("structural_weaknesses", [])
-                 if structural_weaknesses:
-                     st.write("**Structural Weaknesses:**")
-                     for weakness in structural_weaknesses:
-                         st.write(f"• {weakness}")
-
-         # Show recommendations
-         recommendations = result.get("recommendations", [])
-         if recommendations:
-             st.markdown("#### 💡 GEO Recommendations")
-             for i, rec in enumerate(recommendations, 1):
-                 st.write(f"**{i}.** {rec}")
-
-         # RAG context information
-         if include_rag_context and result.get("rag_enhanced"):
-             with st.expander("🧠 RAG Enhancement Details"):
-                 st.write("**RAG Status:** ✅ Knowledge base successfully applied")
-                 st.write(f"**Knowledge Sources:** {result.get('knowledge_sources', 'Multiple')} GEO best practice documents")
-                 st.write(f"**Enhancement Type:** {result.get('optimization_type', 'Standard')}")
-
-                 if result.get('parsing_error'):
-                     st.warning(f"**Parsing Note:** {result['parsing_error']}")
-
-     def display_geo_batch_results(self, results):
-         """Display batch GEO optimization results"""
-         st.markdown("### 📦 Batch GEO Processing Results")
-
-         successful_results = [r for r in results if not r.get('error')]
-         failed_results = [r for r in results if r.get('error')]
-
-         col1, col2, col3 = st.columns(3)
-         with col1:
-             st.metric("Total Pieces", len(results))
-         with col2:
-             st.metric("Successful", len(successful_results))
-         with col3:
-             st.metric("Failed", len(failed_results))
-
-         # Show individual results
-         for result in results:
-             idx = result.get('batch_index', 0)
-             st.markdown(f"#### Content Piece {idx + 1}")
-
-             if result.get('error'):
-                 st.error(f"Processing failed: {result['error']}")
-             else:
-                 # Show GEO scores
-                 geo_analysis = result.get("geo_analysis", {})
-                 if geo_analysis:
-                     col1, col2, col3 = st.columns(3)
-                     with col1:
-                         st.metric("GEO Score", f"{geo_analysis.get('current_geo_score', 0):.1f}")
-                     with col2:
-                         st.metric("AI Visibility", f"{geo_analysis.get('ai_search_visibility', 0):.1f}")
-                     with col3:
-                         st.metric("Citation Worthy", f"{geo_analysis.get('citation_worthiness', 0):.1f}")
-
-                 # Show optimized content if available
-                 optimized_content = result.get("optimized_content", {})
-                 enhanced_text = optimized_content.get("enhanced_text", "")
-                 if enhanced_text:
-                     with st.expander("View GEO-optimized content"):
-                         st.text_area("", value=enhanced_text[:500] + "...", height=150, key=f"batch_geo_output_{idx}")
-
-             st.write("---")
-
-     def display_geo_variation_results(self, variations):
-         """Display GEO content variation results"""
-         st.markdown("### 🔄 GEO Content Variations")
-
-         for i, variation in enumerate(variations):
-             if variation.get('error'):
-                 st.error(f"Variation {i+1} failed: {variation['error']}")
-                 continue
-
-             variation_type = variation.get('variation_type', f'Variation {i+1}')
-             st.markdown(f"#### {variation_type.replace('_', ' ').title()} Version")
-
-             # Show GEO improvements
-             geo_improvements = variation.get('geo_improvements', [])
-             if geo_improvements:
-                 st.write("**GEO Improvements:**")
-                 for improvement in geo_improvements:
-                     st.write(f"• {improvement}")
-
-             # Show target AI systems
-             target_ai_systems = variation.get('target_ai_systems', [])
-             if target_ai_systems:
-                 st.write(f"**Optimized For:** {', '.join(target_ai_systems)}")
-
-             # Show expected benefits
-             expected_benefits = variation.get('expected_geo_benefits', [])
-             if expected_benefits:
-                 st.write("**Expected GEO Benefits:**")
-                 for benefit in expected_benefits:
-                     st.write(f"• {benefit}")
-
-             # Show optimized content
-             optimized_content = variation.get('optimized_content', '')
-             if optimized_content:
-                 st.text_area(
-                     f"{variation_type} content:",
-                     value=optimized_content,
-                     height=200,
-                     key=f"geo_variation_{i}"
-                 )
-
-             st.write("---")
-
-     def display_geo_readability_results(self, result):
-         """Display GEO readability analysis results"""
-         st.markdown("### 📖 GEO Readability Analysis")
-
-         # Basic GEO metrics
-         geo_metrics = result.get('geo_readability_metrics', {})
-         if geo_metrics:
-             st.markdown("#### 📊 GEO Content Metrics")
-             col1, col2, col3, col4 = st.columns(4)
-
-             with col1:
-                 st.metric("Total Words", geo_metrics.get('total_words', 0))
-             with col2:
-                 st.metric("Questions", geo_metrics.get('questions_count', 0))
-             with col3:
-                 st.metric("Headings", geo_metrics.get('headings_count', 0))
-             with col4:
-                 st.metric("Lists", geo_metrics.get('lists_count', 0))
-
-             # Second row
-             col1, col2, col3, col4 = st.columns(4)
-             with col1:
-                 st.metric("Entity Mentions", geo_metrics.get('entity_mentions', 0))
-             with col2:
-                 st.metric("Data Points", geo_metrics.get('numeric_data_points', 0))
-             with col3:
-                 st.metric("Paragraphs", geo_metrics.get('total_paragraphs', 0))
-             with col4:
-                 geo_score = result.get('geo_readability_score', 0)
-                 st.metric("GEO Readability", f"{geo_score}/10")
-
-         # AI optimization indicators
-         ai_indicators = result.get('ai_optimization_indicators', {})
-         if ai_indicators:
-             st.markdown("#### 🤖 AI Optimization Indicators")
-             col1, col2 = st.columns(2)
-
-             with col1:
-                 question_ratio = ai_indicators.get('question_ratio', 0)
-                 st.metric("Question Ratio", f"{question_ratio:.2%}")
-                 structure_score = ai_indicators.get('structure_score', 0)
-                 st.metric("Structure Score", f"{structure_score:.1f}/10")
-
-             with col2:
-                 entity_density = ai_indicators.get('entity_density', 0)
-                 st.metric("Entity Density", f"{entity_density:.2%}")
-                 data_richness = ai_indicators.get('data_richness', 0)
-                 st.metric("Data Richness", f"{data_richness:.2%}")
-
-         # GEO recommendations
-         geo_recommendations = result.get('geo_recommendations', [])
-         if geo_recommendations:
-             st.markdown("#### 💡 GEO Optimization Recommendations")
-             for i, rec in enumerate(geo_recommendations, 1):
-                 st.write(f"**{i}.** {rec}")
-
-     def display_geo_entity_results(self, result):
-         """Display GEO entity extraction results"""
-         st.markdown("### 🏷️ GEO Entity Analysis")
-
-         if result.get('error'):
-             st.error(f"Entity extraction failed: {result['error']}")
-             return
-
-         geo_entities = result.get('geo_entities', {})
-         if geo_entities:
-             # Display extracted entities
-             for entity_type, entity_data in geo_entities.items():
-                 if entity_data:
-                     st.markdown(f"#### {entity_type.replace('_', ' ').title()}")
-                     st.write(entity_data)
-                     st.write("---")
-
-         # Extraction metadata
-         extraction_success = result.get('extraction_success', False)
-         if extraction_success:
-             st.success("✅ Entity extraction completed successfully")
-             st.write(f"**Content Length:** {result.get('content_length', 0)} characters")
-             st.write(f"**Extraction Method:** {result.get('extraction_method', 'Unknown')}")
-
-     def display_geo_export_options(self, result, optimization_type, original_text):
-         """Display export options for GEO results"""
-         st.markdown("### 📥 Export GEO Results")
-
-         # Prepare export data
-         export_data = {
-             'timestamp': time.time(),
-             'optimization_type': optimization_type,
-             'original_text': original_text,
-             'original_word_count': len(original_text.split()),
-             'geo_results': result,
-             'rag_enhanced': result.get('rag_enhanced', False) if not isinstance(result, list) else any(r.get('rag_enhanced', False) for r in result),
-             'knowledge_sources': result.get('knowledge_sources', 0) if not isinstance(result, list) else 'multiple'
-         }
-
-         # Serialize data to JSON
-         export_json = json.dumps(export_data, indent=2, default=str)
-
-         # Add download button
-         st.download_button(
-             label="📥 Download GEO Analysis Report",
-             data=export_json,
-             file_name=f"geo_{optimization_type}_analysis_{int(time.time())}.json",
-             mime="application/json"
-         )

-     # Keep existing methods for other tabs (render_document_qa_tab, render_website_analysis_tab, etc.)
-     # ... (rest of the methods remain the same as in your original code)
-
-     def render_document_qa_tab(self):
-         """Render Document Q&A tab"""
-         st.header("📄 Document Question Answering")
-         st.markdown("Upload documents or paste text to ask questions using RAG.")
-
-         # File upload
-         uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])
-
-         # Text input
-         pasted_text = st.text_area("Or paste text directly:", height=150)
-
-         # Question input
-         user_query = st.text_input("Ask a question about the content:")
-
-         # Submit button
-         if st.button("🔍 Ask Question", key="qa_submit"):
-             if not user_query.strip():
-                 st.warning("Please enter a question.")
-                 return
-
-             try:
-                 # Parse content
-                 documents = []
-
-                 if uploaded_file:
-                     with st.spinner("Processing PDF..."):
-                         # Save uploaded file temporarily
-                         temp_path = self.save_uploaded_file(uploaded_file)
-                         documents = self.pdf_parser.parse(temp_path)
-                         os.unlink(temp_path)  # Clean up
-
-                 elif pasted_text.strip():
-                     with st.spinner("Processing text..."):
-                         documents = self.text_parser.parse(pasted_text)
-
-                 else:
-                     st.warning("Please upload a PDF or paste some text.")
-                     return
-
-                 # Create vector store and answer question
-                 with st.spinner("Creating embeddings and searching..."):
-                     qa_chain = self.vector_chunker.create_qa_chain(documents, self.llm)
-                     result = qa_chain({"query": user_query})
-
-                 # Display results
-                 st.markdown("### 💬 Answer")
-                 st.write(result["result"])
-
-                 # Show sources
-                 with st.expander("📄 Source Documents"):
-                     for i, doc in enumerate(result.get("source_documents", [])):
-                         st.write(f"**Source {i+1}:**")
-                         content = doc.page_content
-                         st.write(content[:500] + "..." if len(content) > 500 else content)
-                         if hasattr(doc, 'metadata') and doc.metadata:
-                             st.write(f"*Metadata: {doc.metadata}*")
-                         st.write("---")
-
-             except Exception as e:
-                 st.error(f"An error occurred: {str(e)}")
-
-     def render_website_analysis_tab(self):
-         """Render Website GEO Analysis tab"""
-         st.header("🌐 Website GEO Analysis")
-         st.markdown("Analyze websites for Generative Engine Optimization (GEO) performance.")
-
-         # URL input
-         col1, col2 = st.columns([3, 1])
-
-         with col1:
-             website_url = st.text_input(
-                 "Enter website URL:",
-                 placeholder="https://example.com"
-             )
-
-         with col2:
-             max_pages = st.selectbox("Pages to analyze:", [1, 3, 5], index=0)
-
-         # Analysis options
-         col1, col2 = st.columns(2)
-         with col1:
-             include_subpages = st.checkbox("Include subpages", value=False)
-         with col2:
-             detailed_analysis = st.checkbox("Detailed analysis", value=True)
-
-         # Submit button
-         if st.button("🌐 Analyze Website", key="website_analyze"):
-             if not website_url.strip():
-                 st.warning("Please enter a website URL.")
-                 return
-
-             try:
-                 # Normalize URL
-                 if not website_url.startswith(('http://', 'https://')):
-                     website_url = 'https://' + website_url
-
-                 with st.spinner(f"Analyzing website: {website_url}"):
-                     # Parse website content
-                     pages_data = self.webpage_parser.parse_website(
-                         website_url,
-                         max_pages=max_pages,
-                         include_subpages=include_subpages
-                     )
-
-                     if not pages_data:
-                         st.error("Could not extract content from the website.")
-                         return
-
-                     st.success(f"Successfully extracted content from {len(pages_data)} page(s)")
-
-                     # Analyze GEO scores
-                     with st.spinner("Calculating GEO scores..."):
-                         geo_results = []
-
-                         for i, page_data in enumerate(pages_data):
-                             with st.spinner(f"Analyzing page {i+1}/{len(pages_data)}..."):
-                                 analysis = self.geo_scorer.analyze_page_geo(
-                                     page_data['content'],
-                                     page_data['title'],
-                                     detailed=detailed_analysis
-                                 )
-
-                                 if not analysis.get('error'):
-                                     analysis['page_data'] = page_data
-                                     geo_results.append(analysis)
-                                 else:
-                                     st.warning(f"Could not analyze page {i+1}: {analysis['error']}")
-
-                     if not geo_results:
-                         st.error("Could not analyze any pages from the website.")
-                         return
-
-                     # Display results
-                     self.display_geo_results(geo_results, website_url)
-
-                     # Export functionality
-                     st.markdown("### 📥 Export Results")
-                     if st.button("📊 Generate Full Report"):
-                         report_data = self.result_exporter.export_geo_results(
-                             geo_results,
-                             website_url
-                         )
-
-                         st.download_button(
-                             label="Download GEO Report",
-                             data=json.dumps(report_data, indent=2),
-                             file_name=f"geo_analysis_{website_url.replace('https://', '').replace('/', '_')}.json",
-                             mime="application/json"
-                         )
-
-             except Exception as e:
-                 st.error(f"An error occurred during website analysis: {str(e)}")
-
-     def display_geo_results(self, geo_results: List[Dict], website_url: str):
-         """Display GEO analysis results"""
-         st.markdown("## 📊 GEO Analysis Results")
-
-         # Calculate average scores
-         avg_scores = self.calculate_average_scores(geo_results)
-         overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
-
-         # Main score display
-         col1, col2, col3 = st.columns([1, 2, 1])
-         with col2:
-             st.metric(
-                 "Overall GEO Score",
-                 f"{overall_avg:.1f}/10",
-                 delta=f"{overall_avg - 7.0:.1f}" if overall_avg != 7.0 else None
-             )
-
-         # Individual metrics
-         st.markdown("### 📈 Detailed GEO Metrics")
-
-         # First row of metrics
-         col1, col2, col3, col4 = st.columns(4)
-         metrics_row1 = [
-             ("AI Search Visibility", "ai_search_visibility"),
-             ("Query Intent Match", "query_intent_matching"),
-             ("Factual Accuracy", "factual_accuracy"),
-             ("Conversational Ready", "conversational_readiness")
-         ]
-
-         for i, (display_name, key) in enumerate(metrics_row1):
-             with [col1, col2, col3, col4][i]:
-                 score = avg_scores.get(key, 0)
-                 st.metric(display_name, f"{score:.1f}")
-
-         # Second row of metrics
-         col1, col2, col3, col4 = st.columns(4)
-         metrics_row2 = [
-             ("Semantic Richness", "semantic_richness"),
-             ("Context Complete", "context_completeness"),
-             ("Citation Worthy", "citation_worthiness"),
-             ("Multi-Query Cover", "multi_query_coverage")
-         ]
-
-         for i, (display_name, key) in enumerate(metrics_row2):
-             with [col1, col2, col3, col4][i]:
-                 score = avg_scores.get(key, 0)
-                 st.metric(display_name, f"{score:.1f}")
-
-         # Recommendations
-         self.display_recommendations(geo_results)
-
-         # Detailed page analysis
-         with st.expander("📋 Detailed Page Analysis"):
-             for i, analysis in enumerate(geo_results):
-                 page_data = analysis.get('page_data', {})
-                 st.markdown(f"#### Page {i+1}: {page_data.get('title', 'Unknown Title')}")
-                 st.write(f"**URL**: {page_data.get('url', 'Unknown')}")
-                 st.write(f"**Word Count**: {page_data.get('word_count', 0)}")
-
-                 # Show topics and entities if available
-                 if 'primary_topics' in analysis:
-                     st.write(f"**Topics**: {', '.join(analysis['primary_topics'])}")
-
-                 if 'entities' in analysis:
-                     st.write(f"**Entities**: {', '.join(analysis['entities'])}")
-
-                 # Show page-specific scores
-                 if 'geo_scores' in analysis:
-                     scores = analysis['geo_scores']
-                     score_text = ", ".join([f"{k}: {v:.1f}" for k, v in scores.items()])
-                     st.write(f"**Scores**: {score_text}")
-
-                 st.write("---")
-
-     def display_recommendations(self, geo_results: List[Dict]):
-         """Display optimization recommendations"""
-         st.markdown("### 💡 Optimization Recommendations")
-
-         # Collect all recommendations
-         all_recommendations = []
-         all_opportunities = []
-
-         for analysis in geo_results:
-             all_recommendations.extend(analysis.get('recommendations', []))
-             all_opportunities.extend(analysis.get('optimization_opportunities', []))
-
-         # Remove duplicates and display
-         unique_recommendations = list(set(all_recommendations))
-
-         if unique_recommendations:
-             for i, rec in enumerate(unique_recommendations[:5], 1):
-                 st.write(f"**{i}.** {rec}")
-
-         # Priority opportunities
-         if all_opportunities:
-             st.markdown("#### 🚀 Priority Optimizations")
-
-             high_priority = [opp for opp in all_opportunities if opp.get('priority') == 'high']
-             medium_priority = [opp for opp in all_opportunities if opp.get('priority') == 'medium']
-
-             if high_priority:
-                 st.markdown("##### 🔴 High Priority")
-                 for opp in high_priority[:3]:
-                     st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', 'No description')}")
-
-             if medium_priority:
-                 st.markdown("##### 🟡 Medium Priority")
-                 for opp in medium_priority[:3]:
-                     st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', 'No description')}")
-
-     def calculate_average_scores(self, geo_results: List[Dict]) -> Dict[str, float]:
-         """Calculate average GEO scores across all pages"""
-         if not geo_results:
-             return {}
-
-         # Get all score keys from the first result
-         score_keys = list(geo_results[0].get('geo_scores', {}).keys())
-         avg_scores = {}
-
-         for key in score_keys:
-             scores = [
-                 result['geo_scores'][key]
-                 for result in geo_results
-                 if 'geo_scores' in result and key in result['geo_scores']
-             ]
-             avg_scores[key] = sum(scores) / len(scores) if scores else 0
-
-         return avg_scores
-
-     def save_uploaded_file(self, uploaded_file) -> str:
-         """Save uploaded file to temporary location"""
          with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
              tmp_file.write(uploaded_file.read())
-             return tmp_file.name
-
-     def render_generate_geo_content_tab(self):
-         """Tab to generate fresh GEO-optimized content using system prompts"""
-         st.header("🧠 Generate GEO Content")
-         st.markdown("Use this tool to generate AI-optimized content from scratch based on your topic or query.")
-
-         # User input
-         user_prompt = st.text_area("Describe the content you want (e.g., topic, style, target audience):", height=150)
-
-         # Continue chat option
-         if "chat_history" not in st.session_state:
-             st.session_state.chat_history = []
-
-         if st.button("🧠 Generate Content"):
-             if not user_prompt.strip():
-                 st.warning("Please enter a topic or description.")
-                 return
-
-             # Add user message to chat history
-             st.session_state.chat_history.append(HumanMessage(content=user_prompt))
-
-             # Define system prompt for GEO content generation
-             system_prompt = (
-                 "You are a Generative Engine Optimization (GEO) content creation specialist. "
-                 "Create content that is highly optimized for AI systems, LLMs, and generative search engines. "
-                 "Ensure the content includes rich semantics, clear structure, relevant keywords, and is suitable for conversational use, citations, and AI summaries."
-             )
-             st.session_state.chat_history.insert(0, SystemMessagePromptTemplate.from_template(system_prompt).format())
-
-             with st.spinner("Generating GEO-optimized content..."):
-                 response = self.llm.invoke(st.session_state.chat_history)
-                 st.session_state.chat_history.append(AIMessage(content=response.content))
-                 st.success("✅ Content generated successfully!")
-
-         # Display chat history
-         for msg in st.session_state.chat_history:
-             if isinstance(msg, HumanMessage):
-                 st.markdown(f"**🧑 You:** {msg.content}")
-             elif isinstance(msg, AIMessage):
-                 st.markdown(f"**🤖 Assistant:** {msg.content}")
-
-
- def main():
-     """Main entry point"""
-     app = GEOSEOApp()
-     app.run()
-
-
- if __name__ == "__main__":
-     main()
 
  import os
  import tempfile
+ import streamlit as st

+ from langchain_community.document_loaders import PyPDFLoader
+ from langchain_community.vectorstores import FAISS
  from langchain_community.embeddings import HuggingFaceEmbeddings
+ from langchain.chains import RetrievalQA
+ from langchain.prompts import PromptTemplate
+ from langchain.schema import Document
+ # from langchain_groq import GroqLLM
+ from langchain_groq import ChatGroq

+ # --- Environment Variables ---
+ GROQ_API_KEY = os.getenv("GROQ_API_KEY", "your-groq-api-key")
+ HUGGINGFACE_API_KEY = os.getenv("HUGGINGFACE_API_KEY", "your-huggingface-api-key")
+
+ # --- Initialize Groq LLM ---
+ # llm = GroqLLM(
+ #     api_key=GROQ_API_KEY,
+ #     model="llama3-8b-8192",
+ #     temperature=0.1
+ # )
+ llm = ChatGroq(
+     api_key=GROQ_API_KEY,
+     model_name="llama3-8b-8192",  # Note: it's `model_name` not `model`
+     temperature=0.1
+ )
+
+ # --- HuggingFace Embeddings ---
+ embedding = HuggingFaceEmbeddings(
+     model_name="sentence-transformers/all-MiniLM-L6-v2",
+     cache_folder="./hf_cache",
+     # huggingfacehub_api_token=HUGGINGFACE_API_KEY
+ )
+ # embedding = HuggingFaceEmbeddings(
+ #     model_name="sentence-transformers/all-MiniLM-L6-v2"
+ # )
+
+ # --- Streamlit UI ---
+ st.title("📄📥 Chat with PDF or Text using Groq + RAG")
+
+ # Option to upload PDF
+ uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])
+
+ # Option to paste raw text
+ pasted_text = st.text_area("Or paste some text below:")
+
+ # User's question
+ user_query = st.text_input("Ask a question about the content")
+
+ # Submit button
+ submit_button = st.button("Submit")
+
+ if submit_button:
+     documents = []
+
+     # Handle uploaded PDF
+     if uploaded_file:
          with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
              tmp_file.write(uploaded_file.read())
+             tmp_path = tmp_file.name
+
+         loader = PyPDFLoader(tmp_path)
+         documents = loader.load_and_split()
+
+     # Handle pasted text if no PDF
+     elif pasted_text.strip():
+         documents = [Document(page_content=pasted_text)]
+
+     else:
+         st.warning("Please upload a PDF or paste some text.")
+         st.stop()
+
+     # Create vector store
+     vectorstore = FAISS.from_documents(documents, embedding)
+     retriever = vectorstore.as_retriever()
+
+     # Optional custom prompt
+     prompt_template = PromptTemplate(
+         input_variables=["context", "question"],
+         template="""
+ You are an AI assistant. Use the following context to answer the question.
+ Be concise, accurate, and helpful.
+
+ Context: {context}
+ Question: {question}
+ Answer:"""
+     )
+
+     # QA Chain
+     qa_chain = RetrievalQA.from_chain_type(
+         llm=llm,
+         chain_type="stuff",
+         retriever=retriever,
+         return_source_documents=True,
+         chain_type_kwargs={"prompt": prompt_template}
+     )
+
+     # Run QA
+     result = qa_chain({"query": user_query})
+
+     # Show result
+     st.markdown("### 💬 Answer")
+     st.write(result["result"])
+
+     # Show sources (only if from PDF)
+     if uploaded_file:
+         with st.expander("📄 Sources"):
+             for i, doc in enumerate(result["source_documents"]):
+                 st.write(f"**Page {i+1}** {doc.metadata.get('source', 'Unknown')}")
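Note on the rewritten app.py: newer LangChain releases deprecate calling a chain directly, as in `qa_chain({"query": user_query})` above, in favor of `Chain.invoke`. A minimal standalone sketch of the same retrieval flow with the newer call style (assumes the packages pinned in requirements.txt and a valid GROQ_API_KEY; the helper name `answer_question` is illustrative, not part of this change):

import os

from langchain.chains import RetrievalQA
from langchain.schema import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_groq import ChatGroq


def answer_question(texts, question):
    """Build a throwaway FAISS index over `texts` and answer a single question."""
    docs = [Document(page_content=t) for t in texts]
    embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    retriever = FAISS.from_documents(docs, embedding).as_retriever()
    llm = ChatGroq(api_key=os.environ["GROQ_API_KEY"], model_name="llama3-8b-8192", temperature=0.1)
    qa_chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)
    # `.invoke` replaces the deprecated dict-style call used in the script above.
    return qa_chain.invoke({"query": question})["result"]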
 
demo.json DELETED
@@ -1,58 +0,0 @@
- {
-   "website_url": "https://example.com",
-   "geo_results": [
-     {
-       "page_data": {
-         "url": "https://example.com/page1",
-         "title": "Example Page 1",
-         "word_count": 500
-       },
-       "geo_scores": {
-         "ai_search_visibility": 7.5,
-         "query_intent_matching": 8.0,
-         "factual_accuracy": 9.0,
-         "conversational_readiness": 6.5,
-         "semantic_richness": 7.0,
-         "context_completeness": 8.5,
-         "citation_worthiness": 7.8,
-         "multi_query_coverage": 6.0
-       },
-       "overall_geo_score": 7.5,
-       "primary_topics": ["SEO", "AI Optimization"],
-       "entities": ["Google", "OpenAI"],
-       "recommendations": [
-         "Add more semantic keywords",
-         "Improve conversational flow"
-       ],
-       "optimization_opportunities": [
-         {
-           "type": "semantic_enhancement",
-           "description": "Add more related terms",
-           "priority": "high"
-         }
-       ]
-     }
-   ],
-   "enhancement_results": {
-     "original_content": "Sample content for enhancement.",
-     "analysis_date": "2024-06-01T12:00:00",
-     "clarity_score": 8.5,
-     "structure_score": 7.0,
-     "answerability_score": 9.0,
-     "keywords": ["example", "installation", "setup"],
-     "optimized_content": "Enhanced sample content.",
-     "improvements_made": ["Improved clarity", "Added keywords"]
-   },
-   "qa_results": [
-     {
-       "query": "What is SEO?",
-       "result": "SEO stands for Search Engine Optimization.",
-       "sources": [
-         {
-           "content": "SEO stands for Search Engine Optimization...",
-           "metadata": {"source": "example.com/page1"}
-         }
-       ]
-     }
-   ]
- }
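demo.json documented the shape of an exported analysis report. For anyone still consuming reports in that shape after this deletion, a minimal sketch of reading one (assumes a local copy of the file; the loop is illustrative, not part of the codebase):

import json

with open("demo.json") as f:
    report = json.load(f)

# Print each page's overall score alongside the mean of its individual GEO metrics.
for page in report["geo_results"]:
    scores = page["geo_scores"]
    mean_metric = sum(scores.values()) / len(scores)
    print(page["page_data"]["title"], page["overall_geo_score"], round(mean_metric, 1))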
 
requirements.txt CHANGED
@@ -8,9 +8,3 @@ faiss-cpu
  transformers
  sentence-transformers
  pypdf
- beautifulsoup4
- requests
- numpy
- pandas
- openpyxl
- torch
 
utils/chunker.py DELETED
@@ -1,1314 +0,0 @@
1
- """
2
- Vector Chunking and RAG Module
3
- Handles document chunking, vector embeddings, and RAG question-answering
4
- """
5
-
6
- import os
7
- import json
8
- import numpy as np
9
- from typing import Dict, Any, List, Optional, Tuple
10
- from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
11
- from langchain.schema import Document
12
- from langchain_community.vectorstores import FAISS, Chroma
13
- from langchain.chains import RetrievalQA, ConversationalRetrievalChain
14
- from langchain.memory import ConversationBufferMemory
15
- from langchain.prompts import PromptTemplate
16
- import tempfile
17
- import shutil
18
-
19
-
20
- class VectorChunker:
21
- """Main class for document chunking and vector operations"""
22
-
23
- def __init__(self, embeddings_model, chunk_size: int = 1000, chunk_overlap: int = 200):
24
- self.embeddings = embeddings_model
25
- self.chunk_size = chunk_size
26
- self.chunk_overlap = chunk_overlap
27
- self.setup_text_splitters()
28
- self.vector_stores = {} # Cache for vector stores
29
-
30
- def setup_text_splitters(self):
31
- """Initialize different text splitting strategies"""
32
-
33
- # Default recursive splitter
34
- self.recursive_splitter = RecursiveCharacterTextSplitter(
35
- chunk_size=self.chunk_size,
36
- chunk_overlap=self.chunk_overlap,
37
- length_function=len,
38
- separators=["\n\n", "\n", " ", ""]
39
- )
40
-
41
- # Character-based splitter
42
- self.character_splitter = CharacterTextSplitter(
43
- chunk_size=self.chunk_size,
44
- chunk_overlap=self.chunk_overlap,
45
- separator="\n\n"
46
- )
47
-
48
- # Semantic splitter for better context preservation
49
- self.semantic_splitter = RecursiveCharacterTextSplitter(
50
- chunk_size=800, # Smaller chunks for better semantic coherence
51
- chunk_overlap=150,
52
- length_function=len,
53
- separators=["\n\n", "\n", ". ", " ", ""]
54
- )
55
-
56
- def chunk_documents(self, documents: List[Document], strategy: str = "recursive") -> List[Document]:
57
- """
58
- Chunk documents using specified strategy
59
-
60
- Args:
61
- documents (List[Document]): List of documents to chunk
62
- strategy (str): Chunking strategy ("recursive", "character", "semantic")
63
-
64
- Returns:
65
- List[Document]: List of chunked documents
66
- """
67
- try:
68
- # Choose splitter based on strategy
69
- if strategy == "character":
70
- splitter = self.character_splitter
71
- elif strategy == "semantic":
72
- splitter = self.semantic_splitter
73
- else:
74
- splitter = self.recursive_splitter
75
-
76
- # Split documents
77
- chunked_docs = []
78
-
79
- for doc in documents:
80
- chunks = splitter.split_documents([doc])
81
-
82
- # Add chunk metadata
83
- for i, chunk in enumerate(chunks):
84
- chunk.metadata.update({
85
- 'chunk_index': i,
86
- 'total_chunks': len(chunks),
87
- 'chunk_strategy': strategy,
88
- 'original_source': doc.metadata.get('source', 'unknown'),
89
- 'chunk_size': len(chunk.page_content),
90
- 'chunk_word_count': len(chunk.page_content.split())
91
- })
92
-
93
- chunked_docs.extend(chunks)
94
-
95
- return chunked_docs
96
-
97
- except Exception as e:
98
- raise Exception(f"Document chunking failed: {str(e)}")
99
-
100
- def create_vector_store(self, documents: List[Document], store_type: str = "faiss",
101
- persist_directory: Optional[str] = None) -> Any:
102
- """
103
- Create vector store from documents
104
-
105
- Args:
106
- documents (List[Document]): Documents to vectorize
107
- store_type (str): Type of vector store ("faiss", "chroma")
108
- persist_directory (str): Optional directory to persist the store
109
-
110
- Returns:
111
- Vector store instance
112
- """
113
- try:
114
- if not documents:
115
- raise ValueError("No documents provided for vector store creation")
116
-
117
- if store_type.lower() == "chroma":
118
- if persist_directory:
119
- vector_store = Chroma.from_documents(
120
- documents=documents,
121
- embedding=self.embeddings,
122
- persist_directory=persist_directory
123
- )
124
- vector_store.persist()
125
- else:
126
- vector_store = Chroma.from_documents(
127
- documents=documents,
128
- embedding=self.embeddings
129
- )
130
- else: # Default to FAISS
131
- vector_store = FAISS.from_documents(
132
- documents=documents,
133
- embedding=self.embeddings
134
- )
135
-
136
- # Save FAISS index if persist directory provided
137
- if persist_directory:
138
- os.makedirs(persist_directory, exist_ok=True)
139
- vector_store.save_local(persist_directory)
140
-
141
- return vector_store
142
-
143
- except Exception as e:
144
- raise Exception(f"Vector store creation failed: {str(e)}")
145
-
146
- def create_qa_chain(self, documents: List[Document], llm, chain_type: str = "stuff") -> RetrievalQA:
147
- """
148
- Create a Question-Answering chain from documents
149
-
150
- Args:
151
- documents (List[Document]): Documents for the knowledge base
152
- llm: Language model for answering questions
153
- chain_type (str): Type of QA chain ("stuff", "map_reduce", "refine")
154
-
155
- Returns:
156
- RetrievalQA: Configured QA chain
157
- """
158
- try:
159
- # Chunk documents
160
- chunked_docs = self.chunk_documents(documents, strategy="semantic")
161
-
162
- # Create vector store
163
- vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
164
-
165
- # Create retriever
166
- retriever = vector_store.as_retriever(
167
- search_type="similarity",
168
- search_kwargs={"k": 4} # Retrieve top 4 most relevant chunks
169
- )
170
-
171
- # Custom prompt for GEO-focused QA
172
- qa_prompt_template = """Use the following pieces of context to answer the question at the end.
173
- If you don't know the answer, just say that you don't know, don't try to make up an answer.
174
- Focus on providing clear, accurate, and complete answers that would be suitable for AI search engines.
175
-
176
- Context:
177
- {context}
178
-
179
- Question: {question}
180
-
181
- Answer:"""
182
-
183
- qa_prompt = PromptTemplate(
184
- template=qa_prompt_template,
185
- input_variables=["context", "question"]
186
- )
187
-
188
- # Create QA chain
189
- qa_chain = RetrievalQA.from_chain_type(
190
- llm=llm,
191
- chain_type=chain_type,
192
- retriever=retriever,
193
- return_source_documents=True,
194
- chain_type_kwargs={"prompt": qa_prompt}
195
- )
196
-
197
- return qa_chain
198
-
199
- except Exception as e:
200
- raise Exception(f"QA chain creation failed: {str(e)}")
201
-
202
- def create_conversational_chain(self, documents: List[Document], llm) -> ConversationalRetrievalChain:
203
- """
204
- Create a conversational retrieval chain with memory
205
-
206
- Args:
207
- documents (List[Document]): Documents for the knowledge base
208
- llm: Language model for conversation
209
-
210
- Returns:
211
- ConversationalRetrievalChain: Configured conversational chain
212
- """
213
- try:
214
- # Chunk documents
215
- chunked_docs = self.chunk_documents(documents, strategy="semantic")
216
-
217
- # Create vector store
218
- vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
219
-
220
- # Create retriever
221
- retriever = vector_store.as_retriever(
222
- search_type="similarity",
223
- search_kwargs={"k": 3}
224
- )
225
-
226
- # Create memory
227
- memory = ConversationBufferMemory(
228
- memory_key="chat_history",
229
- return_messages=True,
230
- output_key="answer"
231
- )
232
-
233
- # Custom prompt for conversational QA
234
- condense_question_prompt = """Given the following conversation and a follow up question,
235
- rephrase the follow up question to be a standalone question that can be understood without the chat history.
236
-
237
- Chat History:
238
- {chat_history}
239
- Follow Up Input: {question}
240
- Standalone question:"""
241
-
242
- # Create conversational chain
243
- conv_chain = ConversationalRetrievalChain.from_llm(
244
- llm=llm,
245
- retriever=retriever,
246
- memory=memory,
247
- return_source_documents=True,
248
- condense_question_prompt=PromptTemplate.from_template(condense_question_prompt)
249
- )
250
-
251
- return conv_chain
252
-
253
- except Exception as e:
254
- raise Exception(f"Conversational chain creation failed: {str(e)}")
255
-
256
- def semantic_search(self, query: str, documents: List[Document], top_k: int = 5) -> List[Dict[str, Any]]:
257
- """
258
- Perform semantic search on documents
259
-
260
- Args:
261
- query (str): Search query
262
- documents (List[Document]): Documents to search
263
- top_k (int): Number of top results to return
264
-
265
- Returns:
266
- List[Dict]: Search results with scores
267
- """
268
- try:
269
- # Chunk documents
270
- chunked_docs = self.chunk_documents(documents, strategy="semantic")
271
-
272
- # Create vector store
273
- vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
274
-
275
- # Perform similarity search with scores
276
- results = vector_store.similarity_search_with_score(query, k=top_k)
277
-
278
- # Format results
279
- formatted_results = []
280
- for doc, score in results:
281
- result = {
282
- 'content': doc.page_content,
283
- 'metadata': doc.metadata,
284
- 'similarity_score': float(score),
285
- 'relevance_rank': len(formatted_results) + 1
286
- }
287
- formatted_results.append(result)
288
-
289
- return formatted_results
290
-
291
- except Exception as e:
292
- raise Exception(f"Semantic search failed: {str(e)}")
293
-
294
- def analyze_document_similarity(self, documents: List[Document]) -> Dict[str, Any]:
- """
- Analyze similarity between documents
-
- Args:
- documents (List[Document]): Documents to analyze
-
- Returns:
- Dict: Similarity analysis results
- """
- try:
- if len(documents) < 2:
- return {'error': 'Need at least 2 documents for similarity analysis'}
-
- # Chunk documents
- chunked_docs = self.chunk_documents(documents, strategy="semantic")
-
- # Create embeddings for each document
- doc_embeddings = []
- doc_metadata = []
-
- for doc in chunked_docs:
- # Get embedding for the document
- embedding = self.embeddings.embed_query(doc.page_content)
- doc_embeddings.append(embedding)
- doc_metadata.append({
- 'content_preview': doc.page_content[:200] + "...",
- 'metadata': doc.metadata,
- 'length': len(doc.page_content)
- })
-
- # Calculate pairwise similarities
- similarities = []
- embeddings_array = np.array(doc_embeddings)
-
- for i in range(len(embeddings_array)):
- for j in range(i + 1, len(embeddings_array)):
- # Calculate cosine similarity
- similarity = np.dot(embeddings_array[i], embeddings_array[j]) / (
- np.linalg.norm(embeddings_array[i]) * np.linalg.norm(embeddings_array[j])
- )
-
- similarities.append({
- 'doc_1_index': i,
- 'doc_2_index': j,
- 'similarity_score': float(similarity),
- 'doc_1_preview': doc_metadata[i]['content_preview'],
- 'doc_2_preview': doc_metadata[j]['content_preview']
- })
-
- # Sort by similarity score
- similarities.sort(key=lambda x: x['similarity_score'], reverse=True)
-
- # Calculate statistics
- similarity_scores = [s['similarity_score'] for s in similarities]
-
- return {
- 'total_comparisons': len(similarities),
- 'average_similarity': np.mean(similarity_scores),
- 'max_similarity': max(similarity_scores),
- 'min_similarity': min(similarity_scores),
- 'similarity_distribution': {
- 'high_similarity': len([s for s in similarity_scores if s > 0.8]),
- 'medium_similarity': len([s for s in similarity_scores if 0.5 < s <= 0.8]),
- 'low_similarity': len([s for s in similarity_scores if s <= 0.5])
- },
- 'top_similar_pairs': similarities[:5],
- 'most_dissimilar_pairs': similarities[-3:]
- }
-
- except Exception as e:
- return {'error': f"Similarity analysis failed: {str(e)}"}
-
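Editor's note: the cosine similarity computed above is just the dot product of the two embeddings divided by the product of their norms; a tiny worked example with made-up vectors:

import numpy as np

a = np.array([1.0, 0.0, 1.0])
b = np.array([1.0, 1.0, 0.0])
cos = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
print(round(float(cos), 3))  # 1 / (sqrt(2) * sqrt(2)) = 0.5; parallel vectors give 1.0, orthogonal give 0.0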
- def extract_key_passages(self, documents: List[Document], queries: List[str],
- passages_per_query: int = 3) -> Dict[str, List[Dict[str, Any]]]:
- """
- Extract key passages from documents based on multiple queries
-
- Args:
- documents (List[Document]): Documents to search
- queries (List[str]): List of queries to search for
- passages_per_query (int): Number of passages to extract per query
-
- Returns:
- Dict: Key passages organized by query
- """
- try:
- # Chunk documents
- chunked_docs = self.chunk_documents(documents, strategy="semantic")
-
- # Create vector store
- vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
-
- key_passages = {}
-
- for query in queries:
- # Search for relevant passages
- results = vector_store.similarity_search_with_score(query, k=passages_per_query)
-
- passages = []
- for doc, score in results:
- passage = {
- 'content': doc.page_content,
- 'relevance_score': float(score),
- 'metadata': doc.metadata,
- 'word_count': len(doc.page_content.split()),
- 'query_match': query
- }
- passages.append(passage)
-
- key_passages[query] = passages
-
- return key_passages
-
- except Exception as e:
- return {'error': f"Key passage extraction failed: {str(e)}"}
-
- def optimize_chunking_strategy(self, documents: List[Document],
- test_queries: List[str]) -> Dict[str, Any]:
- """
- Test different chunking strategies and recommend the best one
-
- Args:
- documents (List[Document]): Documents to test
- test_queries (List[str]): Queries to test retrieval performance
-
- Returns:
- Dict: Optimization results and recommendations
- """
- try:
- strategies = ["recursive", "character", "semantic"]
- strategy_results = {}
-
- for strategy in strategies:
- try:
- # Test this strategy
- chunked_docs = self.chunk_documents(documents, strategy=strategy)
- vector_store = self.create_vector_store(chunked_docs, store_type="faiss")
-
- # Test retrieval performance
- retrieval_scores = []
-
- for query in test_queries:
- results = vector_store.similarity_search_with_score(query, k=3)
-
- # Calculate average relevance score
- if results:
- avg_score = sum(score for _, score in results) / len(results)
- retrieval_scores.append(float(avg_score))
-
- # Calculate strategy metrics
- avg_retrieval_score = np.mean(retrieval_scores) if retrieval_scores else 0
- total_chunks = len(chunked_docs)
- avg_chunk_size = np.mean([len(doc.page_content) for doc in chunked_docs])
-
- strategy_results[strategy] = {
- 'average_retrieval_score': avg_retrieval_score,
- 'total_chunks': total_chunks,
- 'average_chunk_size': avg_chunk_size,
- 'retrieval_scores': retrieval_scores,
- 'chunk_size_distribution': {
- 'min': min(len(doc.page_content) for doc in chunked_docs),
- 'max': max(len(doc.page_content) for doc in chunked_docs),
- 'std': float(np.std([len(doc.page_content) for doc in chunked_docs]))
- }
- }
-
- except Exception as e:
- strategy_results[strategy] = {'error': f"Strategy test failed: {str(e)}"}
-
- # Determine best strategy
- valid_strategies = {k: v for k, v in strategy_results.items() if 'error' not in v}
-
- if valid_strategies:
- # similarity_search_with_score on a FAISS store returns L2 distances, so lower is better
- best_strategy = min(valid_strategies.keys(),
- key=lambda k: valid_strategies[k]['average_retrieval_score'])
-
- recommendation = {
- 'recommended_strategy': best_strategy,
- 'reason': f"Lowest average retrieval distance: {valid_strategies[best_strategy]['average_retrieval_score']:.4f}",
- 'all_results': strategy_results,
- 'performance_summary': {
- strategy: result.get('average_retrieval_score', 0)
- for strategy, result in valid_strategies.items()
- }
- }
- else:
- recommendation = {
- 'recommended_strategy': 'recursive', # Default fallback
- 'reason': 'All strategies failed, using default',
- 'all_results': strategy_results
- }
-
- return recommendation
-
- except Exception as e:
- return {'error': f"Chunking optimization failed: {str(e)}"}
-
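Editor's note: the score direction matters when reading these results — LangChain's FAISS store returns raw L2 distances from similarity_search_with_score, where smaller means more similar. A self-contained check (model name as used elsewhere in this app; sample texts invented):

from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

emb = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vs = FAISS.from_texts(["GEO targets AI answer engines.", "Bananas are yellow."], emb)
for doc, dist in vs.similarity_search_with_score("What is GEO?", k=2):
    print(round(float(dist), 3), doc.page_content)  # the on-topic text returns the smaller distance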
- def create_document_summary(self, documents: List[Document], llm,
- summary_type: str = "extractive") -> Dict[str, Any]:
- """
- Create document summaries using the chunked content
-
- Args:
- documents (List[Document]): Documents to summarize
- llm: Language model for summarization
- summary_type (str): Type of summary ("extractive", "abstractive")
-
- Returns:
- Dict: Summary results
- """
- try:
- # Chunk documents for better processing
- chunked_docs = self.chunk_documents(documents, strategy="semantic")
-
- if summary_type == "extractive":
- # Extract key sentences/chunks
- return self._create_extractive_summary(chunked_docs)
- else:
- # Generate abstractive summary using LLM
- return self._create_abstractive_summary(chunked_docs, llm)
-
- except Exception as e:
- return {'error': f"Document summarization failed: {str(e)}"}
-
- def _create_extractive_summary(self, chunked_docs: List[Document]) -> Dict[str, Any]:
- """Create extractive summary by selecting key chunks"""
- try:
- # Simple extractive approach: select chunks with highest semantic density
- chunk_scores = []
-
- for doc in chunked_docs:
- content = doc.page_content
- # Simple scoring based on content characteristics
- word_count = len(content.split())
- sentence_count = len([s for s in content.split('.') if s.strip()])
-
- # Score based on information density
- density_score = word_count / max(sentence_count, 1)
-
- # Bonus for chunks with questions, definitions, or lists
- structure_bonus = 0
- if '?' in content:
- structure_bonus += 1
- if any(word in content.lower() for word in ['define', 'definition', 'means', 'refers to']):
- structure_bonus += 2
- if content.count('\n•') > 0 or content.count('1.') > 0:
- structure_bonus += 1
-
- total_score = density_score + structure_bonus
- chunk_scores.append((doc, total_score))
-
- # Sort by score and select top chunks for summary
- chunk_scores.sort(key=lambda x: x[1], reverse=True)
- top_chunks = chunk_scores[:min(5, len(chunk_scores))]
-
- summary_content = []
- for doc, score in top_chunks:
- summary_content.append({
- 'content': doc.page_content,
- 'score': score,
- 'metadata': doc.metadata
- })
-
- return {
- 'summary_type': 'extractive',
- 'key_chunks': summary_content,
- 'total_chunks_analyzed': len(chunked_docs),
- 'chunks_selected': len(top_chunks)
- }
-
- except Exception as e:
- return {'error': f"Extractive summary failed: {str(e)}"}
-
- def _create_abstractive_summary(self, chunked_docs: List[Document], llm) -> Dict[str, Any]:
- """Create abstractive summary using language model"""
- try:
- # Combine content from top chunks
- combined_content = "\n\n".join([doc.page_content for doc in chunked_docs[:10]])
-
- summary_prompt = f"""Please provide a comprehensive summary of the following content.
- Focus on the main topics, key insights, and important details that would be valuable for AI search engines.
-
- Content:
- {combined_content[:5000]}
-
- Summary:"""
-
- from langchain.prompts import ChatPromptTemplate
-
- # Pass the content through a template variable; inlining it would make any
- # literal braces in the document text be parsed as (missing) template fields.
- prompt_template = ChatPromptTemplate.from_messages([
- ("system", "You are a professional content summarizer. Create clear, informative summaries."),
- ("user", "{summary_request}")
- ])
-
- chain = prompt_template | llm
- result = chain.invoke({"summary_request": summary_prompt})
-
- summary_text = result.content if hasattr(result, 'content') else str(result)
-
- return {
- 'summary_type': 'abstractive',
- 'summary': summary_text,
- 'source_chunks': len(chunked_docs),
- 'content_length_processed': len(combined_content)
- }
-
- except Exception as e:
- return {'error': f"Abstractive summary failed: {str(e)}"}
-
- def save_vector_store(self, vector_store, directory_path: str, store_type: str = "faiss") -> bool:
- """
- Save vector store to disk
-
- Args:
- vector_store: Vector store instance to save
- directory_path (str): Directory to save the store
- store_type (str): Type of vector store
-
- Returns:
- bool: Success status
- """
- try:
- os.makedirs(directory_path, exist_ok=True)
-
- if store_type.lower() == "faiss":
- vector_store.save_local(directory_path)
- elif store_type.lower() == "chroma":
- # Chroma stores are typically persisted during creation
- pass
-
- return True
-
- except Exception as e:
- print(f"Failed to save vector store: {str(e)}")
- return False
-
- def load_vector_store(self, directory_path: str, store_type: str = "faiss"):
- """
- Load vector store from disk
-
- Args:
- directory_path (str): Directory containing the saved store
- store_type (str): Type of vector store
-
- Returns:
- Vector store instance or None if failed
- """
- try:
- if not os.path.exists(directory_path):
- return None
-
- if store_type.lower() == "faiss":
- vector_store = FAISS.load_local(
- directory_path,
- self.embeddings,
- allow_dangerous_deserialization=True
- )
- return vector_store
- elif store_type.lower() == "chroma":
- vector_store = Chroma(
- persist_directory=directory_path,
- embedding_function=self.embeddings
- )
- return vector_store
-
- return None
-
- except Exception as e:
- print(f"Failed to load vector store: {str(e)}")
- return None
-
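Editor's note: a save/load round trip for the two methods above might look like this (paths and sample text are illustrative; assumes the module is importable):

# Hypothetical round trip; not part of the deleted file.
from langchain.schema import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from utils.chunker import VectorChunker

emb = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
ck = VectorChunker(emb)
chunks = ck.chunk_documents([Document(page_content="Some content worth indexing.")])
store = ck.create_vector_store(chunks, store_type="faiss")
ck.save_vector_store(store, "data/vector_store", store_type="faiss")
assert ck.load_vector_store("data/vector_store", store_type="faiss") is not None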
- def get_chunking_stats(self, documents: List[Document], strategy: str = "recursive") -> Dict[str, Any]:
- """
- Get detailed statistics about document chunking
-
- Args:
- documents (List[Document]): Documents to analyze
- strategy (str): Chunking strategy to use
-
- Returns:
- Dict: Detailed chunking statistics
- """
- try:
- # Chunk documents
- chunked_docs = self.chunk_documents(documents, strategy=strategy)
-
- # Calculate statistics
- chunk_sizes = [len(doc.page_content) for doc in chunked_docs]
- word_counts = [len(doc.page_content.split()) for doc in chunked_docs]
-
- stats = {
- 'strategy_used': strategy,
- 'original_documents': len(documents),
- 'total_chunks': len(chunked_docs),
- 'chunk_size_stats': {
- 'min': min(chunk_sizes) if chunk_sizes else 0,
- 'max': max(chunk_sizes) if chunk_sizes else 0,
- 'mean': np.mean(chunk_sizes) if chunk_sizes else 0,
- 'median': np.median(chunk_sizes) if chunk_sizes else 0,
- 'std': np.std(chunk_sizes) if chunk_sizes else 0
- },
- 'word_count_stats': {
- 'min': min(word_counts) if word_counts else 0,
- 'max': max(word_counts) if word_counts else 0,
- 'mean': np.mean(word_counts) if word_counts else 0,
- 'median': np.median(word_counts) if word_counts else 0,
- 'std': np.std(word_counts) if word_counts else 0
- },
- 'chunk_distribution': {
- 'very_small': len([s for s in chunk_sizes if s < 200]),
- 'small': len([s for s in chunk_sizes if 200 <= s < 500]),
- 'medium': len([s for s in chunk_sizes if 500 <= s < 1000]),
- 'large': len([s for s in chunk_sizes if 1000 <= s < 2000]),
- 'very_large': len([s for s in chunk_sizes if s >= 2000])
- },
- 'overlap_efficiency': self._calculate_overlap_efficiency(chunked_docs),
- 'content_coverage': self._calculate_content_coverage(documents, chunked_docs)
- }
-
- return stats
-
- except Exception as e:
- return {'error': f"Chunking statistics failed: {str(e)}"}
-
- def _calculate_overlap_efficiency(self, chunked_docs: List[Document]) -> float:
- """Calculate efficiency of chunk overlaps"""
- try:
- if len(chunked_docs) < 2:
- return 1.0
-
- total_content_length = sum(len(doc.page_content) for doc in chunked_docs)
- unique_content = set()
-
- # Rough estimate of content uniqueness
- for doc in chunked_docs:
- words = doc.page_content.split()
- for i in range(0, len(words), 10): # Sample every 10th word
- unique_content.add(' '.join(words[i:i+10]))
-
- # Efficiency as ratio of unique content to total content
- efficiency = len(unique_content) * 10 / total_content_length if total_content_length > 0 else 0
- return min(efficiency, 1.0)
-
- except Exception:
- return 0.5 # Default neutral efficiency
-
- def _calculate_content_coverage(self, original_docs: List[Document],
- chunked_docs: List[Document]) -> float:
- """Calculate how well chunks cover original content"""
- try:
- original_content = ' '.join([doc.page_content for doc in original_docs])
- chunked_content = ' '.join([doc.page_content for doc in chunked_docs])
-
- # Simple coverage metric based on length
- coverage = len(chunked_content) / len(original_content) if original_content else 0
- return min(coverage, 1.0)
-
- except Exception:
- return 0.0
-
-
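Editor's note: the statistics helper above can be exercised in a few lines; a hedged sketch (repeated text is a stand-in for a real document):

from langchain.schema import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from utils.chunker import VectorChunker

ck = VectorChunker(HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"))
stats = ck.get_chunking_stats([Document(page_content="A long document. " * 400)], strategy="recursive")
print(stats['total_chunks'], stats['chunk_size_stats'])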
- class ChunkingOptimizer:
- """Helper class for optimizing chunking parameters"""
-
- def __init__(self, embeddings_model):
- self.embeddings = embeddings_model
-
- def optimize_chunk_size(self, documents: List[Document], test_queries: List[str],
- size_range: Tuple[int, int] = (200, 2000),
- step_size: int = 200) -> Dict[str, Any]:
- """
- Find optimal chunk size for given documents and queries
-
- Args:
- documents (List[Document]): Documents to test
- test_queries (List[str]): Queries for testing retrieval
- size_range (Tuple[int, int]): Range of chunk sizes to test
- step_size (int): Step size for testing
-
- Returns:
- Dict: Optimization results with recommended chunk size
- """
- try:
- results = {}
- min_size, max_size = size_range
-
- for chunk_size in range(min_size, max_size + 1, step_size):
- # Test this chunk size
- chunker = VectorChunker(self.embeddings, chunk_size=chunk_size)
-
- try:
- chunked_docs = chunker.chunk_documents(documents)
- vector_store = chunker.create_vector_store(chunked_docs)
-
- # Test retrieval performance
- retrieval_scores = []
- for query in test_queries:
- search_results = vector_store.similarity_search_with_score(query, k=3)
- if search_results:
- avg_score = sum(score for _, score in search_results) / len(search_results)
- retrieval_scores.append(float(avg_score))
-
- avg_performance = np.mean(retrieval_scores) if retrieval_scores else 0
-
- results[chunk_size] = {
- 'average_retrieval_score': avg_performance,
- 'total_chunks': len(chunked_docs),
- 'retrieval_scores': retrieval_scores
- }
-
- except Exception as e:
- results[chunk_size] = {'error': str(e)}
-
- # Find optimal chunk size
- valid_results = {k: v for k, v in results.items() if 'error' not in v}
-
- if valid_results:
- # Lower average distance = better retrieval (FAISS returns L2 distances)
- optimal_size = min(valid_results.keys(),
- key=lambda k: valid_results[k]['average_retrieval_score'])
-
- return {
- 'optimal_chunk_size': optimal_size,
- 'optimal_performance': valid_results[optimal_size]['average_retrieval_score'],
- 'all_results': results,
- 'performance_trend': self._analyze_performance_trend(valid_results),
- 'recommendation': f"Use chunk size {optimal_size} for best retrieval performance"
- }
- else:
- return {
- 'error': 'No valid chunk sizes could be tested',
- 'all_results': results
- }
-
- except Exception as e:
- return {'error': f"Chunk size optimization failed: {str(e)}"}
-
- def _analyze_performance_trend(self, results: Dict[int, Dict[str, Any]]) -> Dict[str, Any]:
- """Analyze performance trend across different chunk sizes"""
- try:
- sizes = sorted(results.keys())
- performances = [results[size]['average_retrieval_score'] for size in sizes]
-
- # Find trend direction
- if len(performances) >= 2:
- trend_direction = "increasing" if performances[-1] > performances[0] else "decreasing"
- # Scores are distances, so the best ("peak") configuration has the smallest value
- peak_performance = min(performances)
- peak_size = sizes[performances.index(peak_performance)]
-
- return {
- 'trend_direction': trend_direction,
- 'peak_performance': peak_performance,
- 'peak_size': peak_size,
- 'performance_range': max(performances) - min(performances),
- 'stable_performance': max(performances) - min(performances) < 0.1
- }
- else:
- return {'error': 'Insufficient data for trend analysis'}
-
- except Exception:
- return {'error': 'Trend analysis failed'}
-
-
- class RAGPipeline:
- """Complete RAG pipeline for document question-answering"""
-
- def __init__(self, embeddings_model, llm):
- self.embeddings = embeddings_model
- self.llm = llm
- self.chunker = VectorChunker(embeddings_model)
- self.vector_stores = {}
- self.qa_chains = {}
-
- def create_pipeline(self, documents: List[Document], pipeline_id: str,
- chunking_strategy: str = "semantic") -> Dict[str, Any]:
- """
- Create a complete RAG pipeline for documents
-
- Args:
- documents (List[Document]): Documents to process
- pipeline_id (str): Unique identifier for this pipeline
- chunking_strategy (str): Strategy for document chunking
-
- Returns:
- Dict: Pipeline creation results
- """
- try:
- # Step 1: Chunk documents
- chunked_docs = self.chunker.chunk_documents(documents, strategy=chunking_strategy)
-
- # Step 2: Create vector store
- vector_store = self.chunker.create_vector_store(chunked_docs, store_type="faiss")
-
- # Step 3: Create QA chain
- qa_chain = self.chunker.create_qa_chain(documents, self.llm)
-
- # Store pipeline components
- self.vector_stores[pipeline_id] = vector_store
- self.qa_chains[pipeline_id] = qa_chain
-
- # Pipeline statistics
- stats = {
- 'pipeline_id': pipeline_id,
- 'documents_processed': len(documents),
- 'chunks_created': len(chunked_docs),
- 'chunking_strategy': chunking_strategy,
- 'vector_store_type': 'faiss',
- 'embedding_model': str(self.embeddings),
- 'created_at': self._get_timestamp()
- }
-
- return {
- 'success': True,
- 'pipeline_stats': stats,
- 'chunking_info': self.chunker.get_chunking_stats(documents, chunking_strategy)
- }
-
- except Exception as e:
- return {'error': f"Pipeline creation failed: {str(e)}"}
-
- def query_pipeline(self, pipeline_id: str, query: str,
- return_sources: bool = True) -> Dict[str, Any]:
- """
- Query a created RAG pipeline
-
- Args:
- pipeline_id (str): ID of the pipeline to query
- query (str): Question to ask
- return_sources (bool): Whether to return source documents
-
- Returns:
- Dict: Query results with answer and sources
- """
- try:
- if pipeline_id not in self.qa_chains:
- return {'error': f"Pipeline '{pipeline_id}' not found"}
-
- qa_chain = self.qa_chains[pipeline_id]
-
- # Execute query
- result = qa_chain({"query": query})
-
- # Format response
- response = {
- 'query': query,
- 'answer': result.get('result', 'No answer generated'),
- 'pipeline_id': pipeline_id,
- 'query_timestamp': self._get_timestamp()
- }
-
- # Add source documents if requested
- if return_sources and 'source_documents' in result:
- sources = []
- for i, doc in enumerate(result['source_documents']):
- source = {
- 'source_index': i,
- 'content': doc.page_content,
- 'metadata': doc.metadata,
- 'relevance_rank': i + 1
- }
- sources.append(source)
-
- response['sources'] = sources
- response['num_sources'] = len(sources)
-
- return response
-
- except Exception as e:
- return {'error': f"Pipeline query failed: {str(e)}"}
-
- def batch_query_pipeline(self, pipeline_id: str, queries: List[str]) -> List[Dict[str, Any]]:
- """
- Execute multiple queries on a pipeline
-
- Args:
- pipeline_id (str): ID of the pipeline to query
- queries (List[str]): List of questions to ask
-
- Returns:
- List[Dict]: List of query results
- """
- results = []
-
- for i, query in enumerate(queries):
- try:
- result = self.query_pipeline(pipeline_id, query, return_sources=False)
- result['batch_index'] = i
- results.append(result)
-
- except Exception as e:
- results.append({
- 'batch_index': i,
- 'query': query,
- 'error': f"Batch query failed: {str(e)}"
- })
-
- return results
-
- def evaluate_pipeline(self, pipeline_id: str, test_queries: List[str],
- expected_answers: List[str] = None) -> Dict[str, Any]:
- """
- Evaluate pipeline performance on test queries
-
- Args:
- pipeline_id (str): ID of the pipeline to evaluate
- test_queries (List[str]): Test questions
- expected_answers (List[str]): Optional expected answers for comparison
-
- Returns:
- Dict: Evaluation results
- """
- try:
- if pipeline_id not in self.qa_chains:
- return {'error': f"Pipeline '{pipeline_id}' not found"}
-
- evaluation_results = []
- response_times = []
-
- import time # hoisted above the loop; re-importing on every iteration was redundant
-
- for i, query in enumerate(test_queries):
- start_time = time.time()
-
- # Execute query
- result = self.query_pipeline(pipeline_id, query, return_sources=True)
-
- end_time = time.time()
- response_time = end_time - start_time
- response_times.append(response_time)
-
- # Evaluate result
- eval_result = {
- 'query_index': i,
- 'query': query,
- 'answer_generated': not result.get('error'),
- 'response_time': response_time,
- 'answer_length': len(result.get('answer', '')),
- 'sources_returned': result.get('num_sources', 0)
- }
-
- # If expected answer provided, calculate similarity
- if expected_answers and i < len(expected_answers):
- expected = expected_answers[i]
- generated = result.get('answer', '')
-
- # Simple similarity metric
- similarity = self._calculate_answer_similarity(expected, generated)
- eval_result['answer_similarity'] = similarity
- eval_result['expected_answer'] = expected
-
- evaluation_results.append(eval_result)
-
- # Calculate aggregate metrics
- successful_queries = len([r for r in evaluation_results if r['answer_generated']])
- avg_response_time = np.mean(response_times) if response_times else 0
-
- if expected_answers:
- similarities = [r.get('answer_similarity', 0) for r in evaluation_results
- if 'answer_similarity' in r]
- avg_similarity = np.mean(similarities) if similarities else 0
- else:
- avg_similarity = None
-
- return {
- 'pipeline_id': pipeline_id,
- 'total_queries': len(test_queries),
- 'successful_queries': successful_queries,
- 'success_rate': successful_queries / len(test_queries) if test_queries else 0,
- 'average_response_time': avg_response_time,
- 'average_answer_similarity': avg_similarity,
- 'detailed_results': evaluation_results,
- 'evaluation_timestamp': self._get_timestamp()
- }
-
- except Exception as e:
- return {'error': f"Pipeline evaluation failed: {str(e)}"}
-
- def _calculate_answer_similarity(self, expected: str, generated: str) -> float:
- """Calculate similarity between expected and generated answers"""
- try:
- # Simple word overlap similarity
- expected_words = set(expected.lower().split())
- generated_words = set(generated.lower().split())
-
- if not expected_words and not generated_words:
- return 1.0
-
- intersection = expected_words.intersection(generated_words)
- union = expected_words.union(generated_words)
-
- return len(intersection) / len(union) if union else 0.0
-
- except Exception:
- return 0.0
-
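Editor's note: the word-overlap metric above is a Jaccard index over lowercased word sets; worked numbers:

expected = set("the cat sat".lower().split())    # {'the', 'cat', 'sat'}
generated = set("the cat ran".lower().split())   # {'the', 'cat', 'ran'}
print(len(expected & generated) / len(expected | generated))  # 2 shared / 4 total = 0.5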
- def get_pipeline_info(self, pipeline_id: str) -> Dict[str, Any]:
- """Get information about a specific pipeline"""
- try:
- if pipeline_id not in self.qa_chains:
- return {'error': f"Pipeline '{pipeline_id}' not found"}
-
- # Get vector store info
- vector_store = self.vector_stores.get(pipeline_id)
- if vector_store:
- try:
- # Try to get vector store statistics
- total_vectors = vector_store.index.ntotal if hasattr(vector_store, 'index') else 'unknown'
- except Exception: # a bare except would also swallow KeyboardInterrupt/SystemExit
- total_vectors = 'unknown'
- else:
- total_vectors = 'unknown'
-
- return {
- 'pipeline_id': pipeline_id,
- 'has_qa_chain': pipeline_id in self.qa_chains,
- 'has_vector_store': pipeline_id in self.vector_stores,
- 'total_vectors': total_vectors,
- 'embedding_model': str(self.embeddings),
- 'llm_model': str(self.llm)
- }
-
- except Exception as e:
- return {'error': f"Failed to get pipeline info: {str(e)}"}
-
- def list_pipelines(self) -> Dict[str, Any]:
- """List all created pipelines"""
- return {
- 'total_pipelines': len(self.qa_chains),
- 'pipeline_ids': list(self.qa_chains.keys()),
- 'vector_stores': list(self.vector_stores.keys())
- }
-
- def delete_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
- """Delete a pipeline and free resources"""
- try:
- deleted_components = []
-
- if pipeline_id in self.qa_chains:
- del self.qa_chains[pipeline_id]
- deleted_components.append('qa_chain')
-
- if pipeline_id in self.vector_stores:
- del self.vector_stores[pipeline_id]
- deleted_components.append('vector_store')
-
- if deleted_components:
- return {
- 'success': True,
- 'pipeline_id': pipeline_id,
- 'deleted_components': deleted_components
- }
- else:
- return {'error': f"Pipeline '{pipeline_id}' not found"}
-
- except Exception as e:
- return {'error': f"Pipeline deletion failed: {str(e)}"}
-
- def export_pipeline_config(self, pipeline_id: str) -> Dict[str, Any]:
- """Export pipeline configuration for recreation"""
- try:
- if pipeline_id not in self.qa_chains:
- return {'error': f"Pipeline '{pipeline_id}' not found"}
-
- config = {
- 'pipeline_id': pipeline_id,
- 'embedding_model_name': getattr(self.embeddings, 'model_name', 'unknown'),
- 'llm_model_name': getattr(self.llm, 'model_name', 'unknown'),
- 'chunker_config': {
- 'chunk_size': self.chunker.chunk_size,
- 'chunk_overlap': self.chunker.chunk_overlap
- },
- 'export_timestamp': self._get_timestamp(),
- 'vector_store_type': 'faiss'
- }
-
- return config
-
- except Exception as e:
- return {'error': f"Pipeline export failed: {str(e)}"}
-
- def _get_timestamp(self) -> str:
- """Get current timestamp"""
- from datetime import datetime
- return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
-
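Editor's note: pulling the class together, an end-to-end sketch (the Groq key comes from the environment as elsewhere in this app; the sample document is invented):

import os
from langchain.schema import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_groq import ChatGroq
from utils.chunker import RAGPipeline

emb = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
llm = ChatGroq(api_key=os.getenv("GROQ_API_KEY"), model_name="llama-3.1-8b-instant")
rag = RAGPipeline(emb, llm)
rag.create_pipeline([Document(page_content="GEO is AI-first SEO.")], "demo")
print(rag.query_pipeline("demo", "What is GEO?").get('answer'))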
- # Utility functions for the module
-
- def optimize_rag_pipeline(documents: List[Document], embeddings_model, llm,
- test_queries: List[str]) -> Dict[str, Any]:
- """
- Optimize RAG pipeline configuration for given documents and queries
-
- Args:
- documents (List[Document]): Documents to optimize for
- embeddings_model: Embedding model to use
- llm: Language model to use
- test_queries (List[str]): Test queries for optimization
-
- Returns:
- Dict: Optimization recommendations
- """
- try:
- # Test different chunking strategies
- chunker = VectorChunker(embeddings_model)
- chunking_results = chunker.optimize_chunking_strategy(documents, test_queries)
-
- # Test different chunk sizes
- optimizer = ChunkingOptimizer(embeddings_model)
- size_results = optimizer.optimize_chunk_size(documents, test_queries)
-
- # Create optimized pipeline
- best_strategy = chunking_results.get('recommended_strategy', 'semantic')
- best_size = size_results.get('optimal_chunk_size', 1000)
-
- # Create optimized chunker
- optimized_chunker = VectorChunker(
- embeddings_model,
- chunk_size=best_size,
- chunk_overlap=best_size // 5 # 20% overlap
- )
-
- # Test the optimized configuration
- pipeline = RAGPipeline(embeddings_model, llm)
- pipeline.chunker = optimized_chunker
-
- test_pipeline_id = "optimization_test"
- creation_result = pipeline.create_pipeline(documents, test_pipeline_id, best_strategy)
-
- if not creation_result.get('error'):
- evaluation_result = pipeline.evaluate_pipeline(test_pipeline_id, test_queries)
- pipeline.delete_pipeline(test_pipeline_id) # Clean up
- else:
- evaluation_result = {'error': 'Could not evaluate optimized pipeline'}
-
- return {
- 'optimization_complete': True,
- 'recommended_config': {
- 'chunking_strategy': best_strategy,
- 'chunk_size': best_size,
- 'chunk_overlap': best_size // 5
- },
- 'chunking_optimization': chunking_results,
- 'size_optimization': size_results,
- 'performance_evaluation': evaluation_result,
- 'recommendations': [
- f"Use {best_strategy} chunking strategy",
- f"Set chunk size to {best_size} characters",
- f"Use {best_size // 5} character overlap",
- "Monitor and adjust based on query performance"
- ]
- }
-
- except Exception as e:
- return {'error': f"RAG optimization failed: {str(e)}"}
-
-
- def create_demo_rag_system(sample_documents: List[Document], embeddings_model, llm) -> Dict[str, Any]:
- """
- Create a demonstration RAG system with sample documents
-
- Args:
- sample_documents (List[Document]): Sample documents for demo
- embeddings_model: Embedding model
- llm: Language model
-
- Returns:
- Dict: Demo system information and sample interactions
- """
- try:
- # Create RAG pipeline
- pipeline = RAGPipeline(embeddings_model, llm)
- demo_id = "demo_system"
-
- # Create the pipeline
- creation_result = pipeline.create_pipeline(sample_documents, demo_id, "semantic")
-
- if creation_result.get('error'):
- return {'error': f"Demo system creation failed: {creation_result['error']}"}
-
- # Sample queries for demonstration
- demo_queries = [
- "What is the main topic of these documents?",
- "Can you summarize the key points?",
- "What are the most important concepts mentioned?"
- ]
-
- # Execute demo queries
- demo_results = []
- for query in demo_queries:
- result = pipeline.query_pipeline(demo_id, query, return_sources=True)
- demo_results.append(result)
-
- # Get system statistics
- pipeline_info = pipeline.get_pipeline_info(demo_id)
-
- return {
- 'demo_system_created': True,
- 'pipeline_id': demo_id,
- 'creation_stats': creation_result,
- 'pipeline_info': pipeline_info,
- 'demo_queries': demo_queries,
- 'demo_results': demo_results,
- 'usage_instructions': [
- f"Use pipeline.query_pipeline('{demo_id}', 'your question') to ask questions",
- "The system will return answers with source document references",
- "Sources show which parts of the documents were used for the answer"
- ]
- }
-
- except Exception as e:
- return {'error': f"Demo system creation failed: {str(e)}"}
-
-
- # Export the main classes for use in other modules
- __all__ = [
- 'VectorChunker',
- 'ChunkingOptimizer',
- 'RAGPipeline',
- 'optimize_rag_pipeline',
- 'create_demo_rag_system'
- ]
utils/export.py DELETED
@@ -1,1896 +0,0 @@
- """
- Results Export and Reporting Module
- Handles export of analysis results, reports, and data for external use
- """
-
- import json
- import csv
- import io
- import zipfile
- import tempfile
- import os
- from datetime import datetime
- from typing import Dict, Any, List, Optional, Union
- import pandas as pd
- from dataclasses import dataclass, asdict
-
-
- @dataclass
- class GEOReport:
- """Data class for GEO analysis reports"""
- website_url: str
- analysis_date: str
- overall_score: float
- pages_analyzed: int
- geo_scores: Dict[str, float]
- recommendations: List[str]
- optimization_opportunities: List[Dict[str, Any]]
- competitive_position: str
-
- def to_dict(self) -> Dict[str, Any]:
- """Convert report to dictionary"""
- return asdict(self)
-
-
- @dataclass
- class ContentAnalysis:
- """Data class for content optimization analysis"""
- original_content: str
- analysis_date: str
- clarity_score: float
- structure_score: float
- answerability_score: float
- keywords: List[str]
- optimized_content: Optional[str]
- improvements_made: List[str]
-
- def to_dict(self) -> Dict[str, Any]:
- """Convert analysis to dictionary"""
- return asdict(self)
-
-
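Editor's note: since both dataclasses round-trip through asdict(), building and serializing a report is one step; a sketch with invented field values:

from datetime import datetime
from utils.export import GEOReport

report = GEOReport(
    website_url="https://example.com",
    analysis_date=datetime.now().isoformat(),
    overall_score=7.4,
    pages_analyzed=12,
    geo_scores={"clarity": 7.1, "structuredness": 7.8},
    recommendations=["Add an FAQ section"],
    optimization_opportunities=[],
    competitive_position="above average",
)
print(report.to_dict()["overall_score"])  # asdict() converts the dataclass to a plain dict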
- class ResultExporter:
- """Main class for exporting analysis results and generating reports"""
-
- def __init__(self):
- self.export_formats = ['json', 'csv', 'html', 'pdf', 'xlsx']
- self.supported_types = ['geo_analysis', 'content_optimization', 'qa_results', 'batch_analysis']
-
- def export_geo_results(self, geo_results: List[Dict[str, Any]],
- website_url: str, format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]:
- """
- Export GEO analysis results in specified format
-
- Args:
- geo_results (List[Dict]): List of GEO analysis results
- website_url (str): URL of analyzed website
- format_type (str): Export format ('json', 'csv', 'html', 'xlsx')
-
- Returns:
- Union[str, bytes, Dict]: Exported data in requested format
- """
- try:
- # Prepare consolidated data
- export_data = self._prepare_geo_export_data(geo_results, website_url)
-
- if format_type.lower() == 'json':
- return self._export_geo_json(export_data)
- elif format_type.lower() == 'csv':
- return self._export_geo_csv(export_data)
- elif format_type.lower() == 'html':
- return self._export_geo_html(export_data)
- elif format_type.lower() == 'xlsx':
- return self._export_geo_excel(export_data)
- elif format_type.lower() == 'pdf':
- return self._export_geo_pdf(export_data)
- else:
- raise ValueError(f"Unsupported export format: {format_type}")
-
- except Exception as e:
- return {'error': f"Export failed: {str(e)}"}
-
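Editor's note: dispatch is purely on the format string, so callers choose the output type at the call site; a hedged sketch (the result shape here is a minimal stand-in for real scorer output):

from utils.export import ResultExporter

exporter = ResultExporter()
geo_results = [{"geo_scores": {"clarity": 7.0}, "recommendations": ["Use headings"]}]
html_report = exporter.export_geo_results(geo_results, "https://example.com", format_type="html")
with open("geo_report.html", "w", encoding="utf-8") as f:
    f.write(html_report)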
- def export_enhancement_results(self, enhancement_result: Dict[str, Any],
- format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]:
- """
- Export content enhancement results
-
- Args:
- enhancement_result (Dict): Content enhancement analysis result
- format_type (str): Export format
-
- Returns:
- Union[str, bytes, Dict]: Exported data
- """
- try:
- # Prepare data for export
- export_data = self._prepare_enhancement_export_data(enhancement_result)
-
- if format_type.lower() == 'json':
- return json.dumps(export_data, indent=2, ensure_ascii=False)
- elif format_type.lower() == 'html':
- return self._export_enhancement_html(export_data)
- elif format_type.lower() == 'csv':
- return self._export_enhancement_csv(export_data)
- else:
- return json.dumps(export_data, indent=2, ensure_ascii=False)
-
- except Exception as e:
- return {'error': f"Enhancement export failed: {str(e)}"}
-
- def export_qa_results(self, qa_results: List[Dict[str, Any]],
- format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]:
- """
- Export Q&A session results
-
- Args:
- qa_results (List[Dict]): List of Q&A interactions
- format_type (str): Export format
-
- Returns:
- Union[str, bytes, Dict]: Exported data
- """
- try:
- export_data = {
- 'qa_session': {
- 'session_date': datetime.now().isoformat(),
- 'total_questions': len(qa_results),
- 'interactions': qa_results
- },
- 'summary': {
- 'successful_answers': len([r for r in qa_results if not r.get('error')]),
- 'average_response_length': self._calculate_avg_response_length(qa_results),
- 'most_common_topics': self._extract_common_topics(qa_results)
- }
- }
-
- if format_type.lower() == 'json':
- return json.dumps(export_data, indent=2, ensure_ascii=False)
- elif format_type.lower() == 'html':
- return self._export_qa_html(export_data)
- elif format_type.lower() == 'csv':
- return self._export_qa_csv(export_data)
- else:
- return json.dumps(export_data, indent=2, ensure_ascii=False)
-
- except Exception as e:
- return {'error': f"Q&A export failed: {str(e)}"}
-
- def create_comprehensive_report(self, analysis_data: Dict[str, Any],
- report_type: str = 'full') -> Dict[str, Any]:
- """
- Create comprehensive analysis report
-
- Args:
- analysis_data (Dict): Combined analysis data from multiple sources
- report_type (str): Type of report ('full', 'summary', 'executive')
-
- Returns:
- Dict: Comprehensive report data
- """
- try:
- report = {
- 'report_metadata': {
- 'generated_at': datetime.now().isoformat(),
- 'report_type': report_type,
- 'generator': 'GEO SEO AI Optimizer',
- 'version': '1.0'
- }
- }
-
- if report_type == 'executive':
- report.update(self._create_executive_summary(analysis_data))
- elif report_type == 'summary':
- report.update(self._create_summary_report(analysis_data))
- else: # full report
- report.update(self._create_full_report(analysis_data))
-
- return report
-
- except Exception as e:
- return {'error': f"Report creation failed: {str(e)}"}
-
- def export_batch_results(self, batch_results: List[Dict[str, Any]],
- batch_metadata: Dict[str, Any],
- format_type: str = 'xlsx') -> Union[str, bytes, Dict[str, Any]]:
- """
- Export batch analysis results
-
- Args:
- batch_results (List[Dict]): List of batch analysis results
- batch_metadata (Dict): Metadata about the batch process
- format_type (str): Export format
-
- Returns:
- Union[str, bytes, Dict]: Exported batch data
- """
- try:
- export_data = {
- 'batch_metadata': batch_metadata,
- 'batch_results': batch_results,
- 'batch_summary': self._create_batch_summary(batch_results),
- 'export_timestamp': datetime.now().isoformat()
- }
-
- if format_type.lower() == 'xlsx':
- return self._export_batch_excel(export_data)
- elif format_type.lower() == 'json':
- return json.dumps(export_data, indent=2, ensure_ascii=False)
- elif format_type.lower() == 'csv':
- return self._export_batch_csv(export_data)
- else:
- return json.dumps(export_data, indent=2, ensure_ascii=False)
-
- except Exception as e:
- return {'error': f"Batch export failed: {str(e)}"}
-
- def create_export_package(self, analysis_data: Dict[str, Any],
- package_name: str = "geo_analysis") -> bytes:
- """
- Create a ZIP package with multiple export formats
-
- Args:
- analysis_data (Dict): Analysis data to package
- package_name (str): Name for the package
-
- Returns:
- bytes: ZIP file content
- """
- try:
- # Create temporary directory
- with tempfile.TemporaryDirectory() as temp_dir:
- zip_path = os.path.join(temp_dir, f"{package_name}.zip")
-
- with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
- # Add JSON export
- json_data = json.dumps(analysis_data, indent=2, ensure_ascii=False)
- zip_file.writestr(f"{package_name}.json", json_data)
-
- # Add HTML report
- if 'geo_results' in analysis_data:
- html_data = self._export_geo_html(analysis_data)
- zip_file.writestr(f"{package_name}_report.html", html_data)
-
- # Add CSV data
- if 'geo_results' in analysis_data:
- csv_data = self._export_geo_csv(analysis_data)
- zip_file.writestr(f"{package_name}_data.csv", csv_data)
-
- # Add README
- readme_content = self._generate_package_readme(analysis_data)
- zip_file.writestr("README.txt", readme_content)
-
- # Read the ZIP file
- with open(zip_path, 'rb') as packaged: # new name; zip_file above is the ZipFile handle
- return packaged.read()
-
- except Exception as e:
- raise Exception(f"Package creation failed: {str(e)}")
-
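Editor's note: because the method returns raw bytes, the ZIP can be written straight to disk or handed to a download button; a minimal sketch (package name and payload invented):

from utils.export import ResultExporter

exporter = ResultExporter()
blob = exporter.create_export_package({"geo_results": []}, package_name="site_audit")
with open("site_audit.zip", "wb") as f:
    f.write(blob)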
- def _prepare_geo_export_data(self, geo_results: List[Dict[str, Any]], website_url: str) -> Dict[str, Any]:
- """Prepare GEO data for export"""
- try:
- # Calculate aggregate metrics
- valid_results = [r for r in geo_results if 'geo_scores' in r and not r.get('error')]
-
- if not valid_results:
- return {
- 'error': 'No valid GEO results to export',
- 'website_url': website_url,
- 'export_timestamp': datetime.now().isoformat()
- }
-
- # Aggregate scores
- all_scores = {}
- for result in valid_results:
- for metric, score in result.get('geo_scores', {}).items():
- if metric not in all_scores:
- all_scores[metric] = []
- all_scores[metric].append(score)
-
- avg_scores = {metric: sum(scores) / len(scores) for metric, scores in all_scores.items()}
- overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
-
- # Collect recommendations
- all_recommendations = []
- all_opportunities = []
-
- for result in valid_results:
- all_recommendations.extend(result.get('recommendations', []))
- all_opportunities.extend(result.get('optimization_opportunities', []))
-
- # Remove duplicates
- unique_recommendations = list(set(all_recommendations))
-
- return {
- 'website_analysis': {
- 'url': website_url,
- 'analysis_date': datetime.now().isoformat(),
- 'pages_analyzed': len(valid_results),
- 'overall_geo_score': round(overall_avg, 2)
- },
- 'aggregate_scores': avg_scores,
- 'individual_page_results': valid_results,
- 'recommendations': unique_recommendations[:10], # Top 10
- 'optimization_opportunities': all_opportunities,
- 'performance_insights': self._generate_performance_insights(avg_scores, overall_avg),
- 'export_metadata': {
- 'exported_by': 'GEO SEO AI Optimizer',
- 'export_timestamp': datetime.now().isoformat(),
- 'data_format': 'GEO Analysis Results v1.0'
- }
- }
-
- except Exception as e:
- return {'error': f"Data preparation failed: {str(e)}"}
-
- def _prepare_enhancement_export_data(self, enhancement_result: Dict[str, Any]) -> Dict[str, Any]:
- """Prepare content enhancement data for export"""
- try:
- scores = enhancement_result.get('scores', {})
-
- return {
- 'content_analysis': {
- 'analysis_date': datetime.now().isoformat(),
- 'original_content_length': enhancement_result.get('original_length', 0),
- 'original_word_count': enhancement_result.get('original_word_count', 0),
- 'analysis_type': enhancement_result.get('optimization_type', 'standard')
- },
- 'performance_scores': {
- 'clarity': scores.get('clarity', 0),
- 'structure': scores.get('structuredness', 0),
- 'answerability': scores.get('answerability', 0),
- 'overall_average': sum(scores.values()) / len(scores) if scores else 0
- },
- 'optimization_results': {
- 'keywords_identified': enhancement_result.get('keywords', []),
- 'optimized_content': enhancement_result.get('optimized_text', ''),
- 'improvements_made': enhancement_result.get('optimization_suggestions', []),
- 'analyze_only': enhancement_result.get('analyze_only', False)
- },
- 'export_metadata': {
- 'exported_by': 'GEO SEO AI Optimizer',
- 'export_timestamp': datetime.now().isoformat(),
- 'data_format': 'Content Enhancement Results v1.0'
- }
- }
-
- except Exception as e:
- return {'error': f"Enhancement data preparation failed: {str(e)}"}
-
- def _export_geo_json(self, data: Dict[str, Any]) -> str:
- """Export GEO data as JSON"""
- return json.dumps(data, indent=2, ensure_ascii=False)
-
- def _export_geo_csv(self, data: Dict[str, Any]) -> str:
- """Export GEO data as CSV"""
- try:
- output = io.StringIO()
-
- # Write report header
- writer = csv.writer(output)
- writer.writerow(['GEO Analysis Results'])
- writer.writerow(['Website:', data.get('website_analysis', {}).get('url', 'Unknown')])
- writer.writerow(['Analysis Date:', data.get('website_analysis', {}).get('analysis_date', 'Unknown')])
- writer.writerow(['Overall Score:', data.get('website_analysis', {}).get('overall_geo_score', 0)])
- writer.writerow([])
-
- # Write aggregate scores
- writer.writerow(['Metric', 'Score'])
- for metric, score in data.get('aggregate_scores', {}).items():
- writer.writerow([metric.replace('_', ' ').title(), round(score, 2)])
-
- writer.writerow([])
- writer.writerow(['Recommendations'])
- for i, rec in enumerate(data.get('recommendations', []), 1):
- writer.writerow([f"{i}.", rec])
-
- # Individual page results
- if data.get('individual_page_results'):
- writer.writerow([])
- writer.writerow(['Individual Page Results'])
-
- # Header for page results (built unconditionally so the row loop below cannot hit a NameError)
- first_result = data['individual_page_results'][0]
- headers = ['Page Index', 'Page URL', 'Page Title'] + list(first_result.get('geo_scores', {}).keys())
- writer.writerow(headers)
-
398
- for i, result in enumerate(data['individual_page_results']):
399
- page_data = result.get('page_data', {})
400
- scores = result.get('geo_scores', {})
401
-
402
- row = [
403
- i + 1,
404
- page_data.get('url', 'Unknown'),
405
- page_data.get('title', 'Unknown')
406
- ] + [round(scores.get(metric, 0), 2) for metric in headers[3:]]
407
-
408
- writer.writerow(row)
409
-
410
- return output.getvalue()
411
-
412
- except Exception as e:
413
- return f"CSV export error: {str(e)}"
414
-
415
- def _export_geo_html(self, data: Dict[str, Any]) -> str:
416
- """Export GEO data as HTML report"""
417
- try:
418
- website_info = data.get('website_analysis', {})
419
- scores = data.get('aggregate_scores', {})
420
- recommendations = data.get('recommendations', [])
421
-
422
- html_content = f"""
423
- <!DOCTYPE html>
424
- <html lang="en">
425
- <head>
426
- <meta charset="UTF-8">
427
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
428
- <title>GEO Analysis Report - {website_info.get('url', 'Website')}</title>
429
- <style>
430
- body {{
431
- font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
432
- line-height: 1.6;
433
- color: #333;
434
- max-width: 1200px;
435
- margin: 0 auto;
436
- padding: 20px;
437
- background-color: #f5f5f5;
438
- }}
439
- .header {{
440
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
441
- color: white;
442
- padding: 30px;
443
- border-radius: 10px;
444
- margin-bottom: 30px;
445
- text-align: center;
446
- }}
447
- .header h1 {{
448
- margin: 0;
449
- font-size: 2.5em;
450
- }}
451
- .summary-cards {{
452
- display: grid;
453
- grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
454
- gap: 20px;
455
- margin-bottom: 30px;
456
- }}
457
- .card {{
458
- background: white;
459
- padding: 20px;
460
- border-radius: 10px;
461
- box-shadow: 0 4px 6px rgba(0,0,0,0.1);
462
- text-align: center;
463
- }}
464
- .card h3 {{
465
- margin-top: 0;
466
- color: #667eea;
467
- }}
468
- .score {{
469
- font-size: 2em;
470
- font-weight: bold;
471
- color: #333;
472
- }}
473
- .scores-grid {{
474
- display: grid;
475
- grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
476
- gap: 20px;
477
- margin-bottom: 30px;
478
- }}
479
- .score-item {{
480
- background: white;
481
- padding: 15px;
482
- border-radius: 8px;
483
- box-shadow: 0 2px 4px rgba(0,0,0,0.1);
484
- display: flex;
485
- justify-content: space-between;
486
- align-items: center;
487
- }}
488
- .score-bar {{
489
- width: 100px;
490
- height: 10px;
491
- background: #e0e0e0;
492
- border-radius: 5px;
493
- overflow: hidden;
494
- }}
495
- .score-fill {{
496
- height: 100%;
497
- background: linear-gradient(90deg, #ff6b6b, #ffa500, #4ecdc4);
498
- transition: width 0.3s ease;
499
- }}
500
- .recommendations {{
501
- background: white;
502
- padding: 30px;
503
- border-radius: 10px;
504
- box-shadow: 0 4px 6px rgba(0,0,0,0.1);
505
- margin-bottom: 30px;
506
- }}
507
- .recommendations h2 {{
508
- color: #667eea;
509
- border-bottom: 2px solid #667eea;
510
- padding-bottom: 10px;
511
- }}
512
- .rec-item {{
513
- padding: 10px 0;
514
- border-bottom: 1px solid #eee;
515
- }}
516
- .footer {{
517
- text-align: center;
518
- color: #666;
519
- margin-top: 40px;
520
- padding-top: 20px;
521
- border-top: 1px solid #ddd;
522
- }}
523
- </style>
524
- </head>
525
- <body>
526
- <div class="header">
527
- <h1>🚀 GEO Analysis Report</h1>
528
- <p>Generative Engine Optimization Performance Analysis</p>
529
- <p><strong>Website:</strong> {website_info.get('url', 'Not specified')}</p>
530
- <p><strong>Analysis Date:</strong> {website_info.get('analysis_date', 'Not specified')}</p>
531
- </div>
532
-
533
- <div class="summary-cards">
534
- <div class="card">
535
- <h3>Overall GEO Score</h3>
536
- <div class="score">{website_info.get('overall_geo_score', 0)}/10</div>
537
- </div>
538
- <div class="card">
539
- <h3>Pages Analyzed</h3>
540
- <div class="score">{website_info.get('pages_analyzed', 0)}</div>
541
- </div>
542
- <div class="card">
543
- <h3>Recommendations</h3>
544
- <div class="score">{len(recommendations)}</div>
545
- </div>
546
- </div>
547
-
548
- <h2>📊 Detailed GEO Metrics</h2>
549
- <div class="scores-grid">
550
- """
551
-
552
- # Add individual scores
553
- for metric, score in scores.items():
554
- metric_display = metric.replace('_', ' ').title()
555
- score_percentage = min(score * 10, 100) # Convert to percentage
556
-
557
- html_content += f"""
558
- <div class="score-item">
559
- <div>
560
- <strong>{metric_display}</strong><br>
561
- <span style="color: #666;">{score:.1f}/10</span>
562
- </div>
563
- <div class="score-bar">
564
- <div class="score-fill" style="width: {score_percentage}%;"></div>
565
- </div>
566
- </div>
567
- """
568
-
569
- html_content += """
570
- </div>
571
-
572
- <div class="recommendations">
573
- <h2>💡 Optimization Recommendations</h2>
574
- """
575
-
576
- # Add recommendations
577
- for i, rec in enumerate(recommendations, 1):
578
- html_content += f'<div class="rec-item"><strong>{i}.</strong> {rec}</div>'
579
-
580
- html_content += f"""
581
- </div>
582
-
583
- <div class="footer">
584
- <p>Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
585
- <p>This report provides AI-first SEO optimization insights for better generative engine performance.</p>
586
- </div>
587
- </body>
588
- </html>
589
- """
590
-
591
- return html_content
592
-
593
- except Exception as e:
594
- return f"<html><body><h1>HTML Export Error</h1><p>{str(e)}</p></body></html>"
595
-
596
-     def _export_geo_excel(self, data: Dict[str, Any]) -> bytes:
-         """Export GEO data as Excel file"""
-         try:
-             output = io.BytesIO()
- 
-             with pd.ExcelWriter(output, engine='openpyxl') as writer:
-                 # Summary sheet
-                 summary_data = {
-                     'Metric': ['Website URL', 'Analysis Date', 'Pages Analyzed', 'Overall Score'],
-                     'Value': [
-                         data.get('website_analysis', {}).get('url', 'Unknown'),
-                         data.get('website_analysis', {}).get('analysis_date', 'Unknown'),
-                         data.get('website_analysis', {}).get('pages_analyzed', 0),
-                         data.get('website_analysis', {}).get('overall_geo_score', 0)
-                     ]
-                 }
-                 pd.DataFrame(summary_data).to_excel(writer, sheet_name='Summary', index=False)
- 
-                 # Scores sheet
-                 scores_data = []
-                 for metric, score in data.get('aggregate_scores', {}).items():
-                     scores_data.append({
-                         'Metric': metric.replace('_', ' ').title(),
-                         'Score': round(score, 2),
-                         'Performance': self._get_performance_level(score)
-                     })
- 
-                 pd.DataFrame(scores_data).to_excel(writer, sheet_name='GEO Scores', index=False)
- 
-                 # Recommendations sheet
-                 rec_data = []
-                 for i, rec in enumerate(data.get('recommendations', []), 1):
-                     rec_data.append({
-                         'Priority': i,
-                         'Recommendation': rec,
-                         'Category': self._categorize_recommendation(rec)
-                     })
- 
-                 if rec_data:
-                     pd.DataFrame(rec_data).to_excel(writer, sheet_name='Recommendations', index=False)
- 
-                 # Individual pages sheet
-                 if data.get('individual_page_results'):
-                     pages_data = []
-                     for i, result in enumerate(data['individual_page_results']):
-                         page_data = result.get('page_data', {})
-                         scores = result.get('geo_scores', {})
- 
-                         page_row = {
-                             'Page_Index': i + 1,
-                             'URL': page_data.get('url', 'Unknown'),
-                             'Title': page_data.get('title', 'Unknown'),
-                             'Word_Count': page_data.get('word_count', 0)
-                         }
- 
-                         # Add all GEO scores
-                         for metric, score in scores.items():
-                             page_row[metric.replace('_', ' ').title()] = round(score, 2)
- 
-                         pages_data.append(page_row)
- 
-                     pd.DataFrame(pages_data).to_excel(writer, sheet_name='Individual Pages', index=False)
- 
-             output.seek(0)
-             return output.getvalue()
- 
-         except Exception as e:
-             # Return error as text file if Excel creation fails
-             error_content = f"Excel export failed: {str(e)}\n\nData:\n{json.dumps(data, indent=2)}"
-             return error_content.encode('utf-8')
- 
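A quick sanity check on the workbook bytes returned above is to read them straight back with pandas; a minimal sketch, assuming `openpyxl` is installed (the writer above already requires it) and that `exporter` and `geo_data` are stand-ins for a `ResultExporter` instance and its input dict:

```python
import io

import pandas as pd

# Hypothetical round-trip check on _export_geo_excel's output:
# parse every sheet back out of the returned bytes and confirm
# the fixed sheet names written above are present.
workbook_bytes = exporter._export_geo_excel(geo_data)
sheets = pd.read_excel(io.BytesIO(workbook_bytes), sheet_name=None)  # dict of DataFrames
assert 'Summary' in sheets and 'GEO Scores' in sheets
print({name: frame.shape for name, frame in sheets.items()})
```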
-     def _export_enhancement_html(self, data: Dict[str, Any]) -> str:
-         """Export content enhancement results as HTML"""
-         try:
-             analysis = data.get('content_analysis', {})
-             scores = data.get('performance_scores', {})
-             optimization = data.get('optimization_results', {})
- 
-             html_content = f"""
-         <!DOCTYPE html>
-         <html lang="en">
-         <head>
-             <meta charset="UTF-8">
-             <meta name="viewport" content="width=device-width, initial-scale=1.0">
-             <title>Content Enhancement Report</title>
-             <style>
-                 body {{
-                     font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
-                     line-height: 1.6;
-                     color: #333;
-                     max-width: 1000px;
-                     margin: 0 auto;
-                     padding: 20px;
-                     background-color: #f8f9fa;
-                 }}
-                 .header {{
-                     background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
-                     color: white;
-                     padding: 30px;
-                     border-radius: 10px;
-                     margin-bottom: 30px;
-                     text-align: center;
-                 }}
-                 .scores {{
-                     display: grid;
-                     grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
-                     gap: 20px;
-                     margin-bottom: 30px;
-                 }}
-                 .score-card {{
-                     background: white;
-                     padding: 20px;
-                     border-radius: 10px;
-                     box-shadow: 0 4px 6px rgba(0,0,0,0.1);
-                     text-align: center;
-                 }}
-                 .content-section {{
-                     background: white;
-                     padding: 30px;
-                     border-radius: 10px;
-                     box-shadow: 0 4px 6px rgba(0,0,0,0.1);
-                     margin-bottom: 20px;
-                 }}
-                 .keywords {{
-                     display: flex;
-                     flex-wrap: wrap;
-                     gap: 10px;
-                     margin-top: 15px;
-                 }}
-                 .keyword {{
-                     background: #e9ecef;
-                     padding: 5px 10px;
-                     border-radius: 20px;
-                     font-size: 0.9em;
-                 }}
-                 .optimized-content {{
-                     background: #f8f9fa;
-                     padding: 20px;
-                     border-left: 4px solid #28a745;
-                     border-radius: 5px;
-                     font-style: italic;
-                 }}
-             </style>
-         </head>
-         <body>
-             <div class="header">
-                 <h1>🔧 Content Enhancement Report</h1>
-                 <p>AI-Optimized Content Analysis Results</p>
-                 <p><strong>Analysis Date:</strong> {analysis.get('analysis_date', 'Unknown')}</p>
-             </div>
- 
-             <div class="scores">
-                 <div class="score-card">
-                     <h3>Clarity Score</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #28a745;">
-                         {scores.get('clarity', 0):.1f}/10
-                     </div>
-                 </div>
-                 <div class="score-card">
-                     <h3>Structure Score</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #28a745;">
-                         {scores.get('structure', 0):.1f}/10
-                     </div>
-                 </div>
-                 <div class="score-card">
-                     <h3>Answerability Score</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #28a745;">
-                         {scores.get('answerability', 0):.1f}/10
-                     </div>
-                 </div>
-                 <div class="score-card">
-                     <h3>Overall Average</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #28a745;">
-                         {scores.get('overall_average', 0):.1f}/10
-                     </div>
-                 </div>
-             </div>
- 
-             <div class="content-section">
-                 <h2>🔑 Identified Keywords</h2>
-                 <div class="keywords">
-                     {' '.join([f'<span class="keyword">{keyword}</span>' for keyword in optimization.get('keywords_identified', [])])}
-                 </div>
-             </div>
- 
-             {'<div class="content-section"><h2>✨ Optimized Content</h2><div class="optimized-content">' + optimization.get('optimized_content', '') + '</div></div>' if optimization.get('optimized_content') and not optimization.get('analyze_only') else ''}
- 
-             <div class="content-section">
-                 <h2>💡 Improvements Made</h2>
-                 <ul>
-                     {' '.join([f'<li>{improvement}</li>' for improvement in optimization.get('improvements_made', [])])}
-                 </ul>
-             </div>
- 
-             <div style="text-align: center; color: #666; margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd;">
-                 <p>Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
-             </div>
-         </body>
-         </html>
-             """
- 
-             return html_content
- 
-         except Exception as e:
-             return f"<html><body><h1>Enhancement HTML Export Error</h1><p>{str(e)}</p></body></html>"
- 
-     def _export_enhancement_csv(self, data: Dict[str, Any]) -> str:
-         """Export content enhancement results as CSV"""
-         try:
-             output = io.StringIO()
-             writer = csv.writer(output)
- 
-             # Header information
-             analysis = data.get('content_analysis', {})
-             scores = data.get('performance_scores', {})
-             optimization = data.get('optimization_results', {})
- 
-             writer.writerow(['Content Enhancement Analysis Report'])
-             writer.writerow(['Analysis Date:', analysis.get('analysis_date', 'Unknown')])
-             writer.writerow(['Original Content Length:', analysis.get('original_content_length', 0)])
-             writer.writerow(['Original Word Count:', analysis.get('original_word_count', 0)])
-             writer.writerow([])
- 
-             # Performance scores
-             writer.writerow(['Performance Scores'])
-             writer.writerow(['Metric', 'Score'])
-             for metric, score in scores.items():
-                 writer.writerow([metric.replace('_', ' ').title(), round(score, 2)])
- 
-             writer.writerow([])
-             writer.writerow(['Keywords Identified'])
-             for keyword in optimization.get('keywords_identified', []):
-                 writer.writerow([keyword])
- 
-             writer.writerow([])
-             writer.writerow(['Improvements Made'])
-             for improvement in optimization.get('improvements_made', []):
-                 writer.writerow([improvement])
- 
-             return output.getvalue()
- 
-         except Exception as e:
-             return f"Enhancement CSV export error: {str(e)}"
- 
-     def _export_qa_html(self, data: Dict[str, Any]) -> str:
-         """Export Q&A results as HTML"""
-         try:
-             session = data.get('qa_session', {})
-             summary = data.get('summary', {})
-             interactions = session.get('interactions', [])
- 
-             html_content = f"""
-         <!DOCTYPE html>
-         <html lang="en">
-         <head>
-             <meta charset="UTF-8">
-             <meta name="viewport" content="width=device-width, initial-scale=1.0">
-             <title>Q&A Session Report</title>
-             <style>
-                 body {{
-                     font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
-                     line-height: 1.6;
-                     color: #333;
-                     max-width: 1000px;
-                     margin: 0 auto;
-                     padding: 20px;
-                     background-color: #f8f9fa;
-                 }}
-                 .header {{
-                     background: linear-gradient(135deg, #6f42c1 0%, #e83e8c 100%);
-                     color: white;
-                     padding: 30px;
-                     border-radius: 10px;
-                     margin-bottom: 30px;
-                     text-align: center;
-                 }}
-                 .summary {{
-                     display: grid;
-                     grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
-                     gap: 20px;
-                     margin-bottom: 30px;
-                 }}
-                 .summary-card {{
-                     background: white;
-                     padding: 20px;
-                     border-radius: 10px;
-                     box-shadow: 0 4px 6px rgba(0,0,0,0.1);
-                     text-align: center;
-                 }}
-                 .qa-item {{
-                     background: white;
-                     padding: 20px;
-                     border-radius: 10px;
-                     box-shadow: 0 4px 6px rgba(0,0,0,0.1);
-                     margin-bottom: 20px;
-                 }}
-                 .question {{
-                     background: #e9ecef;
-                     padding: 15px;
-                     border-left: 4px solid #6f42c1;
-                     border-radius: 5px;
-                     margin-bottom: 15px;
-                 }}
-                 .answer {{
-                     padding: 15px;
-                     border-left: 4px solid #28a745;
-                     border-radius: 5px;
-                     background: #f8f9fa;
-                 }}
-                 .sources {{
-                     margin-top: 15px;
-                     padding: 10px;
-                     background: #fff3cd;
-                     border-radius: 5px;
-                     font-size: 0.9em;
-                 }}
-             </style>
-         </head>
-         <body>
-             <div class="header">
-                 <h1>💬 Q&A Session Report</h1>
-                 <p>Document Question & Answer Analysis</p>
-                 <p><strong>Session Date:</strong> {session.get('session_date', 'Unknown')}</p>
-             </div>
- 
-             <div class="summary">
-                 <div class="summary-card">
-                     <h3>Total Questions</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #6f42c1;">
-                         {session.get('total_questions', 0)}
-                     </div>
-                 </div>
-                 <div class="summary-card">
-                     <h3>Successful Answers</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #28a745;">
-                         {summary.get('successful_answers', 0)}
-                     </div>
-                 </div>
-                 <div class="summary-card">
-                     <h3>Avg Response Length</h3>
-                     <div style="font-size: 2em; font-weight: bold; color: #17a2b8;">
-                         {summary.get('average_response_length', 0):.0f}
-                     </div>
-                 </div>
-             </div>
- 
-             <h2>📝 Q&A Interactions</h2>
-             """
- 
-             # Add individual Q&A items
-             for i, interaction in enumerate(interactions, 1):
-                 question = interaction.get('query', 'No question')
-                 answer = interaction.get('result', interaction.get('answer', 'No answer'))
-                 sources = interaction.get('sources', [])
- 
-                 html_content += f"""
-             <div class="qa-item">
-                 <h3>Question {i}</h3>
-                 <div class="question">
-                     <strong>Q:</strong> {question}
-                 </div>
-                 <div class="answer">
-                     <strong>A:</strong> {answer}
-                 </div>
-                 """
- 
-                 if sources:
-                     html_content += '<div class="sources"><strong>Sources:</strong><ul>'
-                     for source in sources[:3]:  # Limit to first 3 sources
-                         content_preview = source.get('content', '')[:200] + '...' if len(source.get('content', '')) > 200 else source.get('content', '')
-                         html_content += f'<li>{content_preview}</li>'
-                     html_content += '</ul></div>'
- 
-                 html_content += '</div>'
- 
-             html_content += f"""
- 
-             <div style="text-align: center; color: #666; margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd;">
-                 <p>Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
-             </div>
-         </body>
-         </html>
-             """
- 
-             return html_content
- 
-         except Exception as e:
-             return f"<html><body><h1>Q&A HTML Export Error</h1><p>{str(e)}</p></body></html>"
- 
-     def _export_qa_csv(self, data: Dict[str, Any]) -> str:
-         """Export Q&A results as CSV"""
-         try:
-             output = io.StringIO()
-             writer = csv.writer(output)
- 
-             session = data.get('qa_session', {})
-             summary = data.get('summary', {})
-             interactions = session.get('interactions', [])
- 
-             # Header
-             writer.writerow(['Q&A Session Report'])
-             writer.writerow(['Session Date:', session.get('session_date', 'Unknown')])
-             writer.writerow(['Total Questions:', session.get('total_questions', 0)])
-             writer.writerow(['Successful Answers:', summary.get('successful_answers', 0)])
-             writer.writerow([])
- 
-             # Q&A data
-             writer.writerow(['Question Index', 'Question', 'Answer', 'Has Sources', 'Answer Length'])
- 
-             for i, interaction in enumerate(interactions, 1):
-                 question = interaction.get('query', 'No question')
-                 answer = interaction.get('result', interaction.get('answer', 'No answer'))
-                 has_sources = 'Yes' if interaction.get('sources') else 'No'
-                 answer_length = len(answer) if answer else 0
- 
-                 writer.writerow([i, question, answer, has_sources, answer_length])
- 
-             return output.getvalue()
- 
-         except Exception as e:
-             return f"Q&A CSV export error: {str(e)}"
- 
-     def _export_batch_excel(self, data: Dict[str, Any]) -> bytes:
-         """Export batch results as Excel file"""
-         try:
-             output = io.BytesIO()
- 
-             with pd.ExcelWriter(output, engine='openpyxl') as writer:
-                 # Batch metadata sheet
-                 metadata = data.get('batch_metadata', {})
-                 metadata_df = pd.DataFrame([
-                     {'Property': k, 'Value': v} for k, v in metadata.items()
-                 ])
-                 metadata_df.to_excel(writer, sheet_name='Batch Metadata', index=False)
- 
-                 # Batch summary sheet
-                 summary = data.get('batch_summary', {})
-                 summary_df = pd.DataFrame([
-                     {'Metric': k, 'Value': v} for k, v in summary.items()
-                 ])
-                 summary_df.to_excel(writer, sheet_name='Batch Summary', index=False)
- 
-                 # Individual results sheet
-                 results = data.get('batch_results', [])
-                 if results:
-                     # Flatten results for tabular format
-                     flattened_results = []
-                     for i, result in enumerate(results):
-                         flat_result = {'Batch_Index': i}
-                         self._flatten_dict(result, flat_result)
-                         flattened_results.append(flat_result)
- 
-                     results_df = pd.DataFrame(flattened_results)
-                     results_df.to_excel(writer, sheet_name='Batch Results', index=False)
- 
-             output.seek(0)
-             return output.getvalue()
- 
-         except Exception as e:
-             error_content = f"Batch Excel export failed: {str(e)}\n\nData:\n{json.dumps(data, indent=2)}"
-             return error_content.encode('utf-8')
- 
-     def _export_batch_csv(self, data: Dict[str, Any]) -> str:
-         """Export batch results as CSV"""
-         try:
-             output = io.StringIO()
-             writer = csv.writer(output)
- 
-             # Batch metadata
-             metadata = data.get('batch_metadata', {})
-             writer.writerow(['Batch Analysis Results'])
-             writer.writerow(['Export Timestamp:', data.get('export_timestamp', 'Unknown')])
-             writer.writerow([])
- 
-             writer.writerow(['Batch Metadata'])
-             for key, value in metadata.items():
-                 writer.writerow([key, value])
- 
-             writer.writerow([])
- 
-             # Batch summary
-             summary = data.get('batch_summary', {})
-             writer.writerow(['Batch Summary'])
-             for key, value in summary.items():
-                 writer.writerow([key, value])
- 
-             writer.writerow([])
- 
-             # Individual results (simplified)
-             results = data.get('batch_results', [])
-             if results:
-                 writer.writerow(['Individual Results'])
-                 writer.writerow(['Index', 'Status', 'Summary'])
- 
-                 for i, result in enumerate(results):
-                     status = 'Success' if not result.get('error') else 'Error'
-                     summary_text = str(result)[:100] + '...' if len(str(result)) > 100 else str(result)
-                     writer.writerow([i, status, summary_text])
- 
-             return output.getvalue()
- 
-         except Exception as e:
-             return f"Batch CSV export error: {str(e)}"
- 
-     def _export_geo_pdf(self, data: Dict[str, Any]) -> bytes:
-         """Export GEO data as PDF (placeholder - would need reportlab)"""
-         try:
-             # For now, return HTML content as bytes
-             # In a full implementation, you'd use reportlab or weasyprint
-             html_content = self._export_geo_html(data)
-             return html_content.encode('utf-8')
- 
-         except Exception as e:
-             error_content = f"PDF export not fully implemented. Error: {str(e)}"
-             return error_content.encode('utf-8')
- 
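Since `_export_geo_html` already builds a complete HTML report, WeasyPrint is the shorter of the two routes the placeholder above mentions. A hedged sketch of what a full implementation might look like (assuming `weasyprint` is installed, which this repo's requirements do not pin):

```python
from weasyprint import HTML  # assumed extra dependency, not in requirements.txt

def _export_geo_pdf(self, data):
    """Render the existing HTML report to real PDF bytes."""
    html_content = self._export_geo_html(data)
    # write_pdf() with no target argument returns the PDF document as bytes
    return HTML(string=html_content).write_pdf()
```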
-     def _create_executive_summary(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
-         """Create executive summary report"""
-         try:
-             geo_results = analysis_data.get('geo_results', [])
-             enhancement_results = analysis_data.get('enhancement_results', {})
-             qa_results = analysis_data.get('qa_results', [])
- 
-             # Calculate key metrics
-             overall_performance = self._calculate_overall_performance(analysis_data)
- 
-             return {
-                 'executive_summary': {
-                     'overall_performance_score': overall_performance,
-                     'key_findings': self._extract_key_findings(analysis_data),
-                     'priority_recommendations': self._get_priority_recommendations(analysis_data),
-                     'roi_potential': self._estimate_roi_potential(overall_performance),
-                     'implementation_timeline': self._suggest_implementation_timeline(analysis_data),
-                     'resource_requirements': self._estimate_resource_requirements(analysis_data)
-                 }
-             }
- 
-         except Exception as e:
-             return {'error': f"Executive summary creation failed: {str(e)}"}
- 
-     def _create_summary_report(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
-         """Create summary report"""
-         try:
-             return {
-                 'summary_report': {
-                     'analysis_overview': self._create_analysis_overview(analysis_data),
-                     'performance_metrics': self._summarize_performance_metrics(analysis_data),
-                     'improvement_opportunities': self._identify_improvement_opportunities(analysis_data),
-                     'competitive_position': self._assess_competitive_position(analysis_data),
-                     'next_steps': self._recommend_next_steps(analysis_data)
-                 }
-             }
- 
-         except Exception as e:
-             return {'error': f"Summary report creation failed: {str(e)}"}
- 
-     def _create_full_report(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
-         """Create full detailed report"""
-         try:
-             return {
-                 'full_report': {
-                     'executive_summary': self._create_executive_summary(analysis_data).get('executive_summary', {}),
-                     'detailed_analysis': {
-                         'geo_analysis_details': analysis_data.get('geo_results', []),
-                         'content_optimization_details': analysis_data.get('enhancement_results', {}),
-                         'qa_performance_details': analysis_data.get('qa_results', [])
-                     },
-                     'methodology': self._document_methodology(),
-                     'data_sources': self._document_data_sources(analysis_data),
-                     'limitations': self._document_limitations(),
-                     'appendices': self._create_appendices(analysis_data)
-                 }
-             }
- 
-         except Exception as e:
-             return {'error': f"Full report creation failed: {str(e)}"}
- 
-     def _create_batch_summary(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]:
-         """Create summary of batch processing results"""
-         try:
-             total_items = len(batch_results)
-             successful_items = len([r for r in batch_results if not r.get('error')])
-             failed_items = total_items - successful_items
- 
-             return {
-                 'total_items': total_items,
-                 'successful_items': successful_items,
-                 'failed_items': failed_items,
-                 'success_rate': (successful_items / total_items * 100) if total_items > 0 else 0,
-                 'processing_status': 'Completed',
-                 'average_processing_time': self._calculate_avg_processing_time(batch_results),
-                 'common_errors': self._identify_common_errors(batch_results)
-             }
- 
-         except Exception as e:
-             return {'error': f"Batch summary creation failed: {str(e)}"}
- 
-     def _generate_performance_insights(self, scores: Dict[str, float], overall_avg: float) -> List[str]:
-         """Generate performance insights from scores"""
-         insights = []
- 
-         try:
-             # Overall performance insight
-             if overall_avg >= 8.0:
-                 insights.append("Excellent overall GEO performance - content is well-optimized for AI search engines")
-             elif overall_avg >= 6.0:
-                 insights.append("Good GEO performance with room for improvement in specific areas")
-             elif overall_avg >= 4.0:
-                 insights.append("Moderate GEO performance - significant optimization opportunities exist")
-             else:
-                 insights.append("Low GEO performance - comprehensive optimization needed")
- 
-             # Specific metric insights
-             for metric, score in scores.items():
-                 if score < 5.0:
-                     metric_name = metric.replace('_', ' ').title()
-                     insights.append(f"Low {metric_name} score ({score:.1f}) needs immediate attention")
-                 elif score >= 8.5:
-                     metric_name = metric.replace('_', ' ').title()
-                     insights.append(f"Excellent {metric_name} score ({score:.1f}) - maintain current approach")
- 
-             return insights[:5]  # Return top 5 insights
- 
-         except Exception:
-             return ["Unable to generate performance insights"]
- 
-     def _generate_package_readme(self, analysis_data: Dict[str, Any]) -> str:
-         """Generate README file for export package"""
-         try:
-             readme_content = f"""
- GEO SEO AI Optimizer - Analysis Package
- =======================================
- 
- Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 
- This package contains the complete analysis results from the GEO SEO AI Optimizer tool.
- 
- Files Included:
- - JSON file: Complete raw data in JSON format
- - HTML file: Visual report for web viewing
- - CSV file: Tabular data for spreadsheet analysis
- - README.txt: This file
- 
- About GEO (Generative Engine Optimization):
- GEO is the practice of optimizing content for AI-powered search engines and
- language models. Unlike traditional SEO, GEO focuses on:
- 
- - AI search visibility
- - Query intent matching
- - Conversational readiness
- - Citation worthiness
- - Semantic richness
- - Context completeness
- 
- How to Use These Files:
- 1. Open the HTML file in a web browser for a visual report
- 2. Import the CSV file into Excel or Google Sheets for analysis
- 3. Use the JSON file for programmatic processing or integration
- 
- For more information about GEO optimization, visit the tool documentation.
- 
- Generated by: GEO SEO AI Optimizer v1.0
- """
-             return readme_content
- 
-         except Exception as e:
-             return f"README generation failed: {str(e)}"
- 
-     # Helper methods for data processing and analysis
- 
-     def _get_performance_level(self, score: float) -> str:
-         """Get performance level description for a score"""
-         if score >= 8.0:
-             return "Excellent"
-         elif score >= 6.0:
-             return "Good"
-         elif score >= 4.0:
-             return "Fair"
-         else:
-             return "Needs Improvement"
- 
-     def _categorize_recommendation(self, recommendation: str) -> str:
-         """Categorize a recommendation based on content"""
-         rec_lower = recommendation.lower()
- 
-         if any(word in rec_lower for word in ['structure', 'heading', 'format']):
-             return "Content Structure"
-         elif any(word in rec_lower for word in ['keyword', 'semantic', 'topic']):
-             return "SEO & Keywords"
-         elif any(word in rec_lower for word in ['clarity', 'readability', 'language']):
-             return "Content Quality"
-         elif any(word in rec_lower for word in ['technical', 'schema', 'markup']):
-             return "Technical SEO"
-         else:
-             return "General"
- 
-     def _calculate_avg_response_length(self, qa_results: List[Dict[str, Any]]) -> float:
-         """Calculate average response length for Q&A results"""
-         try:
-             response_lengths = []
-             for result in qa_results:
-                 answer = result.get('result', result.get('answer', ''))
-                 if answer and not result.get('error'):
-                     response_lengths.append(len(answer))
- 
-             return sum(response_lengths) / len(response_lengths) if response_lengths else 0
- 
-         except Exception:
-             return 0
- 
-     def _extract_common_topics(self, qa_results: List[Dict[str, Any]]) -> List[str]:
-         """Extract common topics from Q&A results"""
-         try:
-             # Simple topic extraction based on question keywords
-             topics = {}
- 
-             for result in qa_results:
-                 question = result.get('query', result.get('question', ''))
-                 if question:
-                     words = question.lower().split()
-                     for word in words:
-                         if len(word) > 4:  # Focus on longer words
-                             topics[word] = topics.get(word, 0) + 1
- 
-             # Return top 5 most common topics
-             sorted_topics = sorted(topics.items(), key=lambda x: x[1], reverse=True)
-             return [topic for topic, count in sorted_topics[:5]]
- 
-         except Exception:
-             return []
- 
-     def _flatten_dict(self, d: Dict[str, Any], parent_dict: Dict[str, Any], parent_key: str = '') -> None:
-         """Flatten nested dictionary for tabular export"""
-         try:
-             for key, value in d.items():
-                 new_key = f"{parent_key}_{key}" if parent_key else key
- 
-                 if isinstance(value, dict):
-                     self._flatten_dict(value, parent_dict, new_key)
-                 elif isinstance(value, list):
-                     parent_dict[new_key] = json.dumps(value)  # Convert lists to JSON strings
-                 else:
-                     parent_dict[new_key] = value
- 
-         except Exception:
-             pass  # Skip problematic keys
- 
-     def _calculate_overall_performance(self, analysis_data: Dict[str, Any]) -> float:
-         """Calculate overall performance score across all analyses"""
-         try:
-             scores = []
- 
-             # GEO scores
-             geo_results = analysis_data.get('geo_results', [])
-             for result in geo_results:
-                 if 'geo_scores' in result:
-                     geo_score_values = list(result['geo_scores'].values())
-                     if geo_score_values:
-                         scores.append(sum(geo_score_values) / len(geo_score_values))
- 
-             # Enhancement scores
-             enhancement = analysis_data.get('enhancement_results', {})
-             if 'scores' in enhancement:
-                 enh_scores = list(enhancement['scores'].values())
-                 if enh_scores:
-                     scores.append(sum(enh_scores) / len(enh_scores))
- 
-             return sum(scores) / len(scores) if scores else 0
- 
-         except Exception:
-             return 0
- 
-     def _extract_key_findings(self, analysis_data: Dict[str, Any]) -> List[str]:
-         """Extract key findings from analysis data"""
-         findings = []
- 
-         try:
-             # Add findings based on performance scores
-             overall_perf = self._calculate_overall_performance(analysis_data)
- 
-             if overall_perf >= 8.0:
-                 findings.append("Content demonstrates excellent AI search optimization")
-             elif overall_perf <= 4.0:
-                 findings.append("Significant optimization opportunities identified")
- 
-             # Add more specific findings based on data
-             geo_results = analysis_data.get('geo_results', [])
-             if geo_results:
-                 findings.append(f"Analyzed {len(geo_results)} pages for GEO performance")
- 
-             enhancement = analysis_data.get('enhancement_results', {})
-             if enhancement and 'keywords' in enhancement:
-                 findings.append(f"Identified {len(enhancement['keywords'])} key optimization terms")
- 
-             return findings[:5]  # Return top 5 findings
- 
-         except Exception:
-             return ["Unable to extract key findings"]
- 
-     def _get_priority_recommendations(self, analysis_data: Dict[str, Any]) -> List[str]:
-         """Get priority recommendations from analysis"""
-         try:
-             recommendations = []
- 
-             # Collect all recommendations from different analyses
-             geo_results = analysis_data.get('geo_results', [])
-             for result in geo_results:
-                 recommendations.extend(result.get('recommendations', []))
- 
-             # Remove duplicates and return top priorities
-             unique_recs = list(set(recommendations))
-             return unique_recs[:3]  # Top 3 priority recommendations
- 
-         except Exception:
-             return ["Review and implement GEO best practices"]
- 
-     def _estimate_roi_potential(self, performance_score: float) -> str:
-         """Estimate ROI potential based on performance score"""
-         if performance_score <= 4.0:
-             return "High - Significant improvement potential"
-         elif performance_score <= 6.0:
-             return "Medium - Moderate improvement opportunities"
-         else:
-             return "Low - Already well-optimized"
- 
-     def _suggest_implementation_timeline(self, analysis_data: Dict[str, Any]) -> str:
-         """Suggest implementation timeline"""
-         try:
-             overall_perf = self._calculate_overall_performance(analysis_data)
- 
-             if overall_perf <= 4.0:
-                 return "3-6 months for comprehensive optimization"
-             elif overall_perf <= 6.0:
-                 return "1-3 months for targeted improvements"
-             else:
-                 return "Ongoing maintenance and monitoring"
- 
-         except Exception:
-             return "Timeline assessment unavailable"
- 
-     def _estimate_resource_requirements(self, analysis_data: Dict[str, Any]) -> Dict[str, str]:
-         """Estimate resource requirements"""
-         return {
-             'content_team': 'Required for content optimization',
-             'technical_team': 'Required for technical implementations',
-             'timeline': self._suggest_implementation_timeline(analysis_data),
-             'budget': 'Varies based on scope of optimizations'
-         }
- 
-     def _create_analysis_overview(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
-         """Create analysis overview"""
-         try:
-             return {
-                 'analyses_performed': list(analysis_data.keys()),
-                 'total_items_analyzed': sum(len(v) if isinstance(v, list) else 1 for v in analysis_data.values()),
-                 'analysis_scope': 'Comprehensive GEO and content optimization analysis',
-                 'key_focus_areas': ['AI Search Optimization', 'Content Enhancement', 'Performance Analysis']
-             }
- 
-         except Exception:
-             return {'error': 'Overview creation failed'}
- 
-     def _summarize_performance_metrics(self, analysis_data: Dict[str, Any]) -> Dict[str, float]:
-         """Summarize performance metrics"""
-         try:
-             return {
-                 'overall_performance': self._calculate_overall_performance(analysis_data),
-                 'optimization_potential': 10 - self._calculate_overall_performance(analysis_data),
-                 'completion_rate': 100.0  # Assuming analysis completed successfully
-             }
- 
-         except Exception:
-             return {}
- 
-     def _identify_improvement_opportunities(self, analysis_data: Dict[str, Any]) -> List[str]:
-         """Identify improvement opportunities"""
-         return self._get_priority_recommendations(analysis_data)
- 
-     def _assess_competitive_position(self, analysis_data: Dict[str, Any]) -> str:
-         """Assess competitive position"""
-         try:
-             overall_perf = self._calculate_overall_performance(analysis_data)
- 
-             if overall_perf >= 8.0:
-                 return "Strong - Above average GEO performance"
-             elif overall_perf >= 6.0:
-                 return "Competitive - Meeting industry standards"
-             elif overall_perf >= 4.0:
-                 return "Below Average - Improvement needed"
-             else:
-                 return "Weak - Significant optimization required"
- 
-         except Exception:
-             return "Assessment unavailable"
- 
-     def _recommend_next_steps(self, analysis_data: Dict[str, Any]) -> List[str]:
-         """Recommend next steps"""
-         steps = [
-             "Review detailed analysis results",
-             "Prioritize recommendations by impact",
-             "Develop implementation plan",
-             "Monitor performance improvements"
-         ]
- 
-         # Add specific steps based on performance
-         overall_perf = self._calculate_overall_performance(analysis_data)
-         if overall_perf <= 4.0:
-             steps.insert(1, "Focus on fundamental GEO optimization")
- 
-         return steps
- 
-     def _document_methodology(self) -> Dict[str, str]:
-         """Document analysis methodology"""
-         return {
-             'geo_analysis': 'AI-powered content analysis using specialized GEO metrics',
-             'content_optimization': 'LLM-based content enhancement and scoring',
-             'performance_scoring': 'Multi-dimensional scoring system for AI search optimization',
-             'data_collection': 'Automated content parsing and analysis',
-             'validation': 'Cross-referenced metrics and quality assurance checks'
-         }
- 
-     def _document_data_sources(self, analysis_data: Dict[str, Any]) -> List[str]:
-         """Document data sources used in analysis"""
-         sources = []
- 
-         if 'geo_results' in analysis_data:
-             sources.append("Website content analysis")
-         if 'enhancement_results' in analysis_data:
-             sources.append("Content optimization analysis")
-         if 'qa_results' in analysis_data:
-             sources.append("Document Q&A interactions")
- 
-         sources.extend([
-             "AI-powered content scoring",
-             "GEO performance metrics",
-             "Industry best practices database"
-         ])
- 
-         return sources
- 
-     def _document_limitations(self) -> List[str]:
-         """Document analysis limitations"""
-         return [
-             "Analysis based on current content snapshot",
-             "Performance may vary with search engine algorithm updates",
-             "Recommendations require human review for implementation",
-             "Results depend on quality of input content",
-             "AI model performance may vary across different content types"
-         ]
- 
-     def _create_appendices(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]:
-         """Create report appendices"""
-         try:
-             return {
-                 'technical_details': {
-                     'models_used': ['GPT-based content analysis', 'Semantic similarity scoring'],
-                     'processing_time': 'Variable based on content volume',
-                     'confidence_intervals': 'Scores provided with ±0.5 accuracy'
-                 },
-                 'glossary': {
-                     'GEO': 'Generative Engine Optimization - optimization for AI search engines',
-                     'AI Search Visibility': 'Likelihood of content appearing in AI search results',
-                     'Citation Worthiness': 'Probability of content being cited by AI systems',
-                     'Conversational Readiness': 'Suitability for AI chat responses'
-                 },
-                 'references': [
-                     'GEO Best Practices Guide',
-                     'AI Search Engine Optimization Standards',
-                     'Content Performance Benchmarks'
-                 ]
-             }
- 
-         except Exception:
-             return {}
- 
-     def _calculate_avg_processing_time(self, batch_results: List[Dict[str, Any]]) -> float:
-         """Calculate average processing time for batch results"""
-         try:
-             processing_times = []
- 
-             for result in batch_results:
-                 if 'processing_time' in result:
-                     processing_times.append(result['processing_time'])
- 
-             return sum(processing_times) / len(processing_times) if processing_times else 0
- 
-         except Exception:
-             return 0
- 
-     def _identify_common_errors(self, batch_results: List[Dict[str, Any]]) -> List[str]:
-         """Identify common errors in batch processing"""
-         try:
-             error_counts = {}
- 
-             for result in batch_results:
-                 if result.get('error'):
-                     error_msg = str(result['error'])[:50]  # First 50 chars
-                     error_counts[error_msg] = error_counts.get(error_msg, 0) + 1
- 
-             # Return top 3 most common errors
-             sorted_errors = sorted(error_counts.items(), key=lambda x: x[1], reverse=True)
-             return [error for error, count in sorted_errors[:3]]
- 
-         except Exception:
-             return []
- 
- 
- class DataValidator:
-     """Helper class for validating export data"""
- 
-     @staticmethod
-     def validate_geo_data(geo_results: List[Dict[str, Any]]) -> Dict[str, Any]:
-         """Validate GEO analysis data structure"""
-         validation_result = {
-             'valid': True,
-             'errors': [],
-             'warnings': []
-         }
- 
-         try:
-             if not geo_results:
-                 validation_result['errors'].append("No GEO results provided")
-                 validation_result['valid'] = False
-                 return validation_result
- 
-             for i, result in enumerate(geo_results):
-                 # Check required fields
-                 if 'geo_scores' not in result:
-                     validation_result['warnings'].append(f"Result {i} missing geo_scores")
- 
-                 if 'page_data' not in result:
-                     validation_result['warnings'].append(f"Result {i} missing page_data")
- 
-                 # Validate score ranges
-                 if 'geo_scores' in result:
-                     for metric, score in result['geo_scores'].items():
-                         if not isinstance(score, (int, float)) or score < 0 or score > 10:
-                             validation_result['errors'].append(f"Invalid score for {metric} in result {i}")
-                             validation_result['valid'] = False
- 
-             return validation_result
- 
-         except Exception as e:
-             validation_result['errors'].append(f"Validation failed: {str(e)}")
-             validation_result['valid'] = False
-             return validation_result
- 
-     @staticmethod
-     def validate_enhancement_data(enhancement_result: Dict[str, Any]) -> Dict[str, Any]:
-         """Validate content enhancement data structure"""
-         validation_result = {
-             'valid': True,
-             'errors': [],
-             'warnings': []
-         }
- 
-         try:
-             # Check for required fields
-             if 'scores' not in enhancement_result:
-                 validation_result['warnings'].append("Enhancement result missing scores")
- 
-             # Validate score structure
-             if 'scores' in enhancement_result:
-                 scores = enhancement_result['scores']
-                 required_scores = ['clarity', 'structuredness', 'answerability']
- 
-                 for req_score in required_scores:
-                     if req_score not in scores:
-                         validation_result['warnings'].append(f"Missing {req_score} score")
-                     elif not isinstance(scores[req_score], (int, float)):
-                         validation_result['errors'].append(f"Invalid {req_score} score type")
-                         validation_result['valid'] = False
- 
-             return validation_result
- 
-         except Exception as e:
-             validation_result['errors'].append(f"Enhancement validation failed: {str(e)}")
-             validation_result['valid'] = False
-             return validation_result
- 
- 
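The validator's contract is a plain dict with `valid`, `errors`, and `warnings` keys, so a caller gates an export on it roughly like this; a minimal sketch with illustrative variable names:

```python
# geo_results: the list of per-page result dicts described above (hypothetical variable)
validation = DataValidator.validate_geo_data(geo_results)
if not validation['valid']:
    raise ValueError(f"GEO data rejected: {validation['errors']}")
for warning in validation['warnings']:
    print(f"warning: {warning}")  # non-fatal issues, e.g. a result missing page_data
```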
- class ExportManager:
-     """High-level export management class"""
- 
-     def __init__(self):
-         self.exporter = ResultExporter()
-         self.validator = DataValidator()
-         self.export_history = []
- 
-     def export_with_validation(self, data: Dict[str, Any], data_type: str,
-                                format_type: str = 'json') -> Dict[str, Any]:
-         """Export data with validation"""
-         try:
-             # Validate data first
-             if data_type == 'geo_analysis':
-                 validation = self.validator.validate_geo_data(data.get('geo_results', []))
-             elif data_type == 'content_optimization':
-                 validation = self.validator.validate_enhancement_data(data)
-             else:
-                 validation = {'valid': True, 'errors': [], 'warnings': []}
- 
-             # Proceed with export if validation passes
-             if validation['valid']:
-                 if data_type == 'geo_analysis':
-                     result = self.exporter.export_geo_results(
-                         data.get('geo_results', []),
-                         data.get('website_url', 'unknown'),
-                         format_type
-                     )
-                 elif data_type == 'content_optimization':
-                     result = self.exporter.export_enhancement_results(data, format_type)
-                 else:
-                     result = json.dumps(data, indent=2, ensure_ascii=False)
- 
-                 # Log export
-                 self.export_history.append({
-                     'timestamp': datetime.now().isoformat(),
-                     'data_type': data_type,
-                     'format_type': format_type,
-                     'validation_warnings': validation.get('warnings', []),
-                     'success': True
-                 })
- 
-                 return {
-                     'success': True,
-                     'data': result,
-                     'validation': validation
-                 }
-             else:
-                 return {
-                     'success': False,
-                     'error': 'Data validation failed',
-                     'validation': validation
-                 }
- 
-         except Exception as e:
-             self.export_history.append({
-                 'timestamp': datetime.now().isoformat(),
-                 'data_type': data_type,
-                 'format_type': format_type,
-                 'success': False,
-                 'error': str(e)
-             })
- 
-             return {
-                 'success': False,
-                 'error': f"Export failed: {str(e)}"
-             }
- 
-     def get_export_history(self) -> List[Dict[str, Any]]:
-         """Get export history"""
-         return self.export_history
- 
-     def clear_export_history(self) -> None:
-         """Clear export history"""
-         self.export_history.clear()
- 
-     def get_supported_formats(self) -> Dict[str, List[str]]:
-         """Get supported export formats by data type"""
-         return {
-             'geo_analysis': ['json', 'csv', 'html', 'xlsx', 'pdf'],
-             'content_optimization': ['json', 'html', 'csv'],
-             'qa_results': ['json', 'html', 'csv'],
-             'batch_analysis': ['json', 'xlsx', 'csv']
-         }
- 
-     def create_multi_format_export(self, data: Dict[str, Any], data_type: str,
-                                    formats: List[str] = None) -> Dict[str, Any]:
-         """Create export in multiple formats"""
-         if formats is None:
-             formats = ['json', 'html', 'csv']
- 
-         results = {}
- 
-         for format_type in formats:
-             try:
-                 export_result = self.export_with_validation(data, data_type, format_type)
-                 if export_result['success']:
-                     results[format_type] = export_result['data']
-                 else:
-                     results[format_type] = {'error': export_result['error']}
- 
-             except Exception as e:
-                 results[format_type] = {'error': str(e)}
- 
-         return {
-             'multi_format_export': results,
-             'formats_generated': list(results.keys()),
-             'successful_formats': [fmt for fmt, data in results.items() if 'error' not in data]
-         }
- 
- 
- # Utility functions for the export module
- 
- def create_export_template(data_type: str) -> Dict[str, Any]:
-     """Create export template for different data types"""
-     templates = {
-         'geo_analysis': {
-             'website_url': 'https://example.com',
-             'geo_results': [
-                 {
-                     'page_data': {
-                         'url': 'https://example.com/page1',
-                         'title': 'Example Page',
-                         'word_count': 500
-                     },
-                     'geo_scores': {
-                         'ai_search_visibility': 7.5,
-                         'query_intent_matching': 6.8,
-                         'conversational_readiness': 8.2,
-                         'citation_worthiness': 7.1
-                     },
-                     'recommendations': [
-                         'Improve content structure',
-                         'Add more specific examples'
-                     ]
-                 }
-             ]
-         },
-         'content_optimization': {
-             'scores': {
-                 'clarity': 7.5,
-                 'structuredness': 6.8,
-                 'answerability': 8.2
-             },
-             'keywords': ['example', 'optimization', 'content'],
-             'optimized_text': 'This is the optimized version of the content...',
-             'optimization_suggestions': [
-                 'Improve sentence structure',
-                 'Add more specific keywords'
-             ]
-         },
-         'qa_results': [
-             {
-                 'query': 'What is the main topic?',
-                 'result': 'The main topic is content optimization for AI systems.',
-                 'sources': [
-                     {
-                         'content': 'Source document content...',
-                         'metadata': {'source': 'document1.pdf'}
-                     }
-                 ]
-             }
-         ]
-     }
- 
-     return templates.get(data_type, {})
- 
- 
- def export_demo_data() -> Dict[str, Any]:
-     """Export demonstration data for testing"""
-     demo_data = {
-         'geo_analysis_demo': create_export_template('geo_analysis'),
-         'content_optimization_demo': create_export_template('content_optimization'),
-         'qa_results_demo': create_export_template('qa_results')
-     }
- 
-     return demo_data
- 
- 
- # Export the main classes and functions
- __all__ = [
-     'ResultExporter',
-     'GEOReport',
-     'ContentAnalysis',
-     'DataValidator',
-     'ExportManager',
-     'create_export_template',
-     'export_demo_data'
- ]
- 
- 
- # Example usage for testing
- if __name__ == "__main__":
-     # Create exporter instance
-     exporter = ResultExporter()
- 
-     # Test with demo data
-     demo_geo_data = create_export_template('geo_analysis')
- 
-     # Export in different formats
-     json_export = exporter.export_geo_results(
-         demo_geo_data['geo_results'],
-         demo_geo_data['website_url'],
-         'json'
-     )
- 
-     html_export = exporter.export_geo_results(
-         demo_geo_data['geo_results'],
-         demo_geo_data['website_url'],
-         'html'
-     )
- 
-     print("JSON Export:", json_export[:200] + "..." if len(str(json_export)) > 200 else json_export)
-     print("\nHTML Export:", html_export[:200] + "..." if len(str(html_export)) > 200 else html_export)
- 
-     # Test enhancement export
-     demo_enhancement = create_export_template('content_optimization')
-     enhancement_export = exporter.export_enhancement_results(demo_enhancement, 'json')
- 
-     print("\nEnhancement Export:", enhancement_export[:200] + "..." if len(str(enhancement_export)) > 200 else enhancement_export)
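The `__main__` block exercises `ResultExporter` directly; for completeness, a sketch of driving the higher-level `ExportManager` with the same demo template (the output file name is illustrative):

```python
manager = ExportManager()
demo = create_export_template('geo_analysis')

# Single validated export
result = manager.export_with_validation(demo, 'geo_analysis', format_type='html')
if result['success']:
    with open('geo_report.html', 'w', encoding='utf-8') as f:
        f.write(result['data'])

# Fan out to several formats at once; failures land per-format as {'error': ...}
bundle = manager.create_multi_format_export(demo, 'geo_analysis', formats=['json', 'csv'])
print(bundle['successful_formats'])
```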
utils/optimizer.py DELETED
@@ -1,354 +0,0 @@
- # Enhanced Content Optimization Module with RAG for GEO
- # Integrates RAG functionality for better Generative Engine Optimization
- 
- import json
- import re
- from typing import Dict, Any, List, Optional
- from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
- from langchain.schema import Document
- 
- 
- class ContentOptimizer:
-     """Enhanced Content Optimizer with RAG capabilities for GEO"""
- 
-     def __init__(self, llm, vector_chunker=None):
-         self.llm = llm
-         self.vector_chunker = vector_chunker
-         self.setup_prompts()
-         self.setup_geo_knowledge_base()
- 
-     def setup_geo_knowledge_base(self):
-         """Initialize GEO best practices knowledge base"""
-         self.geo_knowledge = [
-             """
-             Generative Engine Optimization (GEO) Best Practices:
- 
-             1. Structure for AI Consumption:
-                - Use clear headings and subheadings
-                - Include bullet points and numbered lists
-                - Provide direct, concise answers to common questions
-                - Use schema markup when possible
- 
-             2. Content Format for LLMs:
-                - Answer questions directly in the first sentence
-                - Use "what, why, how" question patterns
-                - Include relevant entities and proper nouns
-                - Maintain factual accuracy with citations
- 
-             3. Semantic Optimization:
-                - Include related terms and synonyms
-                - Use entity-rich content (people, places, organizations)
-                - Connect concepts with clear relationships
-                - Optimize for topic clusters, not just keywords
-             """,
- 
-             """
-             AI Search Visibility Optimization:
- 
-             1. Query Intent Matching:
-                - Address user intent explicitly
-                - Use natural language patterns
-                - Include question-answer pairs
-                - Optimize for conversational queries
- 
-             2. Citation Worthiness:
-                - Include authoritative sources and data
-                - Use specific facts and statistics
-                - Provide expert opinions and insights
-                - Maintain consistent tone and expertise
- 
-             3. Multi-Query Coverage:
-                - Address related questions in the same content
-                - Use comprehensive topic coverage
-                - Include long-tail and specific queries
-                - Provide context for complex topics
-             """,
- 
-             """
-             Content Structure for AI Systems:
- 
-             1. Information Architecture:
-                - Lead with key information
-                - Use inverted pyramid structure
-                - Include table of contents for long content
-                - Break complex topics into digestible sections
- 
-             2. Conversational Readiness:
-                - Write in active voice
-                - Use clear, direct language
-                - Include transitional phrases
-                - Optimize sentence length (12-20 words)
- 
-             3. Context Completeness:
-                - Define technical terms
-                - Provide background information
-                - Include relevant examples
-                - Connect to broader topic context
-             """
-         ]
- 
-     def setup_prompts(self):
-         """Initialize optimization prompts with RAG integration"""
-         self.rag_enhancement_prompt = """
-         You are a Generative Engine Optimization (GEO) specialist with access to best practices knowledge.
- 
-         Based on the provided GEO knowledge and the user's content, optimize the content for:
-         1. AI search engines (ChatGPT, Claude, Gemini)
-         2. LLM-based question answering systems
-         3. Conversational AI interfaces
-         4. Citation and reference systems
- 
-         Use the knowledge base to inform your optimization decisions.
- 
-         Knowledge Base Context:
-         {context}
- 
-         Original Content:
-         {content}
- 
-         Provide comprehensive GEO optimization in JSON format:
-         ```json
-         {{
-             "geo_analysis": {{
-                 "current_geo_score": 7.5,
-                 "ai_search_visibility": 8.0,
-                 "query_intent_matching": 7.0,
-                 "conversational_readiness": 8.5,
-                 "citation_worthiness": 6.5,
-                 "context_completeness": 7.5
-             }},
-             "optimization_opportunities": [
-                 {{
-                     "type": "Structure Enhancement",
-                     "description": "Add clear headings and Q&A format",
-                     "priority": "high",
-                     "expected_impact": "Improve AI parsing by 25%"
-                 }}
-             ],
-             "optimized_content": {{
-                 "enhanced_text": "Your optimized content here...",
-                 "structural_improvements": ["Added FAQ section", "Improved headings"],
-                 "semantic_enhancements": ["Added related terms", "Improved entity density"]
-             }},
-             "geo_keywords": {{
-                 "primary_entities": ["entity1", "entity2"],
-                 "semantic_terms": ["term1", "term2"],
-                 "question_patterns": ["What is...", "How does..."],
-                 "related_concepts": ["concept1", "concept2"]
-             }},
-             "recommendations": [
-                 "Add more specific examples",
-                 "Include authoritative citations",
-                 "Improve conversational flow"
-             ]
-         }}
-         ```
-         """.strip()
- 
-         self.competitive_geo_prompt = """
-         Analyze the content against GEO best practices and identify competitive optimization opportunities.
- 
-         GEO Knowledge Base:
-         {context}
- 
-         Content to Analyze:
-         {content}
- 
-         Provide competitive GEO analysis:
-         ```json
-         {{
-             "competitive_gaps": {{
-                 "missing_question_patterns": ["What questions aren't covered"],
-                 "entity_gaps": ["Important entities not mentioned"],
-                 "semantic_opportunities": ["Related terms to include"],
-                 "structural_weaknesses": ["Formatting issues for AI"]
-             }},
-             "benchmark_comparison": {{
-                 "current_performance": {{
-                     "ai_answerability": 6.5,
-                     "semantic_richness": 7.0,
-                     "structural_clarity": 8.0
-                 }},
-                 "optimization_potential": {{
-                     "ai_answerability": 9.0,
-                     "semantic_richness": 8.5,
-                     "structural_clarity": 9.5
-                 }}
-             }},
-             "action_plan": [
-                 {{
-                     "priority": "high",
-                     "action": "Add FAQ section",
-                     "rationale": "Improves direct question answering"
-                 }}
-             ]
-         }}
-         ```
-         """.strip()
- 
- 
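Both prompts above force the reply into a ```json fence, so `_parse_optimization_result` (defined further down the file, past this excerpt) has to dig that fence out of the raw completion. A hedged sketch of one common way to do it, not necessarily what this module actually implemented:

```python
import json
import re

def parse_fenced_json(raw: str) -> dict:
    """Pull the first ```json ... ``` block out of an LLM reply,
    falling back to treating the whole reply as JSON."""
    match = re.search(r"```(?:json)?\s*(\{.*\})\s*```", raw, re.DOTALL)
    candidate = match.group(1) if match else raw
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return {"error": "Could not parse optimization result", "raw_response": raw}
```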
-     def optimize_content_with_rag(self, content: str, optimization_type: str = "geo_standard", analyze_only: bool = False) -> Dict[str, Any]:
-         try:
-             knowledge_docs = [Document(page_content=k, metadata={"source": "geo_best_practices"}) for k in self.geo_knowledge]
-             context = "\n\n".join(self.geo_knowledge)
- 
-             if self.vector_chunker:
-                 qa_chain = self.vector_chunker.create_qa_chain(knowledge_docs, self.llm)
-                 geo_query = f"How to optimize this type of content for AI search engines: {content[:500]}"
-                 context_result = qa_chain({"query": geo_query})
-                 context = context_result.get("result", context)
- 
-             return self._competitive_geo_optimization(content, context) if optimization_type == "competitive_geo" else self._standard_geo_optimization(content, context, analyze_only)
- 
-         except Exception as e:
-             return {"error": f"RAG-enhanced optimization failed: {str(e)}"}
- 
-     def _standard_geo_optimization(self, content: str, context: str, analyze_only: bool) -> Dict[str, Any]:
-         try:
-             prompt = ChatPromptTemplate.from_messages([
-                 SystemMessagePromptTemplate.from_template(self.rag_enhancement_prompt),
-                 HumanMessagePromptTemplate.from_template("Optimize this content using GEO best practices.")
-             ])
-             result = (prompt | self.llm).invoke({"context": context, "content": content[:5000]})
-             parsed = self._parse_optimization_result(getattr(result, 'content', str(result)))
-             parsed.update({
-                 'optimization_type': 'geo_standard',
-                 'rag_enhanced': True,
-                 'analyze_only': analyze_only,
-                 'original_length': len(content),
-                 'knowledge_sources': len(self.geo_knowledge)
-             })
-             return parsed
-         except Exception as e:
-             return {"error": f"Standard GEO optimization failed: {str(e)}"}
- 
-     def _competitive_geo_optimization(self, content: str, context: str) -> Dict[str, Any]:
-         try:
-             prompt = ChatPromptTemplate.from_messages([
-                 SystemMessagePromptTemplate.from_template(self.competitive_geo_prompt),
-                 HumanMessagePromptTemplate.from_template("Perform competitive GEO analysis.")
-             ])
-             result = (prompt | self.llm).invoke({"context": context, "content": content[:5000]})
-             parsed = self._parse_optimization_result(getattr(result, 'content', str(result)))
-             parsed.update({
-                 'optimization_type': 'competitive_geo',
-                 'rag_enhanced': True,
-                 'competitive_analysis': True
-             })
-             return parsed
-         except Exception as e:
-             return {"error": f"Competitive GEO optimization failed: {str(e)}"}
- 
-     def batch_optimize_with_rag(self, content_list: List[str], optimization_type: str = "geo_standard") -> List[Dict[str, Any]]:
-         results = []
-         for i, content in enumerate(content_list):
-             try:
-                 result = self.optimize_content_with_rag(content, optimization_type)
-                 result['batch_index'] = i
-                 results.append(result)
-             except Exception as e:
-                 results.append({
-                     'batch_index': i,
-                     'error': f"Batch GEO optimization failed: {str(e)}"
-                 })
-         return results
- 
- def analyze_geo_readability(self, content: str) -> Dict[str, Any]:
257
- try:
258
- words = content.split()
259
- sentences = [s.strip() for s in re.split(r'[.!?]+', content) if s.strip()]
260
- paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
261
-
262
- metrics = {
263
- 'questions': len(re.findall(r'\?', content)),
264
- 'headings': len(re.findall(r'^#+\s', content, re.MULTILINE)),
265
- 'lists': len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE)),
266
- 'entities': len(re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', content)),
267
- 'numbers': len(re.findall(r'\b\d+\.?\d*\b', content)),
268
- 'sentence_count': len(sentences),
269
- 'word_count': len(words)
270
- }
271
-
272
- geo_score = self._calculate_geo_readability_score({
273
- 'avg_words_per_sentence': metrics['word_count'] / metrics['sentence_count'] if metrics['sentence_count'] else 0,
274
- 'questions_ratio': metrics['questions'] / metrics['sentence_count'] if metrics['sentence_count'] else 0,
275
- 'structure_elements': metrics['headings'] + metrics['lists'],
276
- 'entity_density': metrics['entities'] / metrics['word_count'] if metrics['word_count'] else 0,
277
- 'numeric_data': metrics['numbers'] / metrics['word_count'] if metrics['word_count'] else 0
278
- })
279
-
280
- return {
281
- 'geo_readability_metrics': metrics,
282
- 'geo_readability_score': geo_score,
283
- 'geo_recommendations': self._generate_geo_recommendations(metrics)
284
- }
285
- except Exception as e:
286
- return {'error': f"GEO readability analysis failed: {str(e)}"}
287
-
288
- def _calculate_geo_readability_score(self, m: Dict[str, float]) -> float:
289
- try:
290
- score = (
291
- max(0, 10 - abs(m['avg_words_per_sentence'] - 15) * 0.3) * 0.2 +
292
- min(10, m['questions_ratio'] * 50) * 0.25 +
293
- min(10, m['structure_elements'] * 1.5) * 0.25 +
294
- min(10, m['entity_density'] * 100) * 0.15 +
295
- min(10, m['numeric_data'] * 200) * 0.15
296
- )
297
- return round(score, 1)
298
- except Exception:
299
- return 5.0
300
-
301
- def _generate_geo_recommendations(self, m: Dict[str, int]) -> List[str]:
302
- r = []
303
- if m['questions'] == 0:
304
- r.append("Add FAQ section or question-based headings.")
305
- if m['headings'] < 2:
306
- r.append("Use more structured headings.")
307
- if m['lists'] == 0:
308
- r.append("Include bullet points or numbered lists.")
309
- if m['entities'] < 5:
310
- r.append("Add named or topical entities.")
311
- if m['questions'] / m['sentence_count'] < 0.1:
312
- r.append("Transform statements into Q&A pairs.")
313
- return r
314
-
315
- def _clean_json_string(self, json_str: str) -> str:
316
- json_str = json_str.replace("...", "")
317
- json_str = re.sub(r",\s*([}\]])", r"\\1", json_str)
318
- json_str = json_str.strip('`')
319
- return json_str
320
-
321
- def _parse_optimization_result(self, response_text: str) -> Dict[str, Any]:
322
- try:
323
- start = response_text.find('{')
324
- end = response_text.rfind('}') + 1
325
- if start != -1 and end != -1:
326
- json_str = self._clean_json_string(response_text[start:end])
327
- return json.loads(json_str)
328
- return {
329
- 'raw_response': response_text,
330
- 'parsing_error': 'No JSON structure found in response',
331
- 'geo_analysis': {},
332
- 'recommendations': []
333
- }
334
- except json.JSONDecodeError as e:
335
- return {
336
- 'raw_response': response_text,
337
- 'parsing_error': f'JSON decode error: {str(e)}',
338
- 'geo_analysis': {},
339
- 'recommendations': []
340
- }
341
- except Exception as e:
342
- return {
343
- 'raw_response': response_text,
344
- 'parsing_error': f'Unexpected error: {str(e)}',
345
- 'geo_analysis': {},
346
- 'recommendations': []
347
- }
348
-
349
- # Legacy support methods
350
- def optimize_content(self, content: str, analyze_only: bool = False, include_keywords: bool = True, optimization_type: str = "standard") -> Dict[str, Any]:
351
- return self.optimize_content_with_rag(content, optimization_type, analyze_only)
352
-
353
- def analyze_content_readability(self, content: str) -> Dict[str, Any]:
354
- return self.analyze_geo_readability(content)
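Before it was deleted, the optimizer's readability heuristic combined five signals into one weighted 0-10 score. A minimal, self-contained sketch of that same formula, useful for checking the deletion against git history; the function name `geo_readability_score` is illustrative, not part of the removed API:

```python
import re

def geo_readability_score(content: str) -> float:
    """Recompute the removed weighted GEO readability heuristic."""
    words = content.split()
    sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
    headings = len(re.findall(r'^#+\s', content, re.MULTILINE))
    lists = len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE))
    entities = len(re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', content))
    numbers = len(re.findall(r'\b\d+\.?\d*\b', content))
    wc, sc = len(words), len(sentences)
    avg_wps = wc / sc if sc else 0          # average words per sentence
    q_ratio = content.count('?') / sc if sc else 0
    score = (
        max(0, 10 - abs(avg_wps - 15) * 0.3) * 0.20 +   # reward ~15-word sentences
        min(10, q_ratio * 50) * 0.25 +                   # question density
        min(10, (headings + lists) * 1.5) * 0.25 +       # structural elements
        min(10, (entities / wc if wc else 0) * 100) * 0.15 +
        min(10, (numbers / wc if wc else 0) * 200) * 0.15
    )
    return round(score, 1)

print(geo_readability_score(
    "# FAQ\n\nWhat is GEO? GEO tunes content for AI search.\n"
    "- Entities matter.\n- 42 data points help."
))
```

The weights (0.20/0.25/0.25/0.15/0.15) sum to 1, so each component is already capped at 10 before weighting and the result stays on a 0-10 scale.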
utils/parser.py DELETED
@@ -1,549 +0,0 @@
- """
- Content Parsing Module
- Handles extraction of content from PDFs, text, and webpages
- """
-
- import requests
- from bs4 import BeautifulSoup
- from urllib.parse import urljoin, urlparse
- from typing import List, Dict, Any, Optional
- import time
- from langchain_community.document_loaders import PyPDFLoader
- from langchain.schema import Document
-
-
- class BaseParser:
-     """Base class for all content parsers"""
-
-     def __init__(self):
-         self.supported_formats = []
-
-     def parse(self, source: str) -> List[Document]:
-         """Parse content from source and return LangChain Documents"""
-         raise NotImplementedError("Subclasses must implement parse method")
-
-     def validate_source(self, source: str) -> bool:
-         """Validate if the source can be processed"""
-         return True
-
-
- class PDFParser(BaseParser):
-     """Parser for PDF documents"""
-
-     def __init__(self):
-         super().__init__()
-         self.supported_formats = ['.pdf']
-
-     def parse(self, pdf_path: str) -> List[Document]:
-         """
-         Parse PDF file and return list of Document objects
-
-         Args:
-             pdf_path (str): Path to the PDF file
-
-         Returns:
-             List[Document]: List of parsed documents with metadata
-         """
-         try:
-             loader = PyPDFLoader(pdf_path)
-             documents = loader.load_and_split()
-
-             # Add additional metadata
-             for i, doc in enumerate(documents):
-                 doc.metadata.update({
-                     'source_type': 'pdf',
-                     'page_number': i + 1,
-                     'total_pages': len(documents),
-                     'parser': 'PDFParser'
-                 })
-
-             return documents
-
-         except Exception as e:
-             raise Exception(f"Error parsing PDF: {str(e)}")
-
-     def get_pdf_metadata(self, pdf_path: str) -> Dict[str, Any]:
-         """Extract metadata from PDF file"""
-         try:
-             loader = PyPDFLoader(pdf_path)
-             documents = loader.load()
-
-             total_pages = len(documents)
-             total_words = sum(len(doc.page_content.split()) for doc in documents)
-
-             return {
-                 'total_pages': total_pages,
-                 'total_words': total_words,
-                 'average_words_per_page': total_words / total_pages if total_pages > 0 else 0,
-                 'file_type': 'PDF',
-                 'parser_used': 'PyPDFLoader'
-             }
-
-         except Exception as e:
-             return {'error': f"Could not extract metadata: {str(e)}"}
-
-
- class TextParser(BaseParser):
-     """Parser for plain text content"""
-
-     def __init__(self):
-         super().__init__()
-         self.supported_formats = ['.txt', 'plain_text']
-         self.chunk_size = 1000  # Default chunk size for long texts
-
-     def parse(self, text_content: str, chunk_size: int = None) -> List[Document]:
-         """
-         Parse text content and return list of Document objects
-
-         Args:
-             text_content (str): Raw text content
-             chunk_size (int): Optional chunk size for splitting long texts
-
-         Returns:
-             List[Document]: List of documents, potentially chunked
-         """
-         try:
-             if not text_content.strip():
-                 raise ValueError("Empty text content provided")
-
-             chunk_size = chunk_size or self.chunk_size
-
-             # If text is short, return as single document
-             if len(text_content) <= chunk_size:
-                 doc = Document(
-                     page_content=text_content,
-                     metadata={
-                         'source_type': 'text',
-                         'word_count': len(text_content.split()),
-                         'char_count': len(text_content),
-                         'chunk_index': 0,
-                         'total_chunks': 1,
-                         'parser': 'TextParser'
-                     }
-                 )
-                 return [doc]
-
-             # Split long text into chunks
-             chunks = self._split_text_into_chunks(text_content, chunk_size)
-             documents = []
-
-             for i, chunk in enumerate(chunks):
-                 doc = Document(
-                     page_content=chunk,
-                     metadata={
-                         'source_type': 'text',
-                         'word_count': len(chunk.split()),
-                         'char_count': len(chunk),
-                         'chunk_index': i,
-                         'total_chunks': len(chunks),
-                         'parser': 'TextParser'
-                     }
-                 )
-                 documents.append(doc)
-
-             return documents
-
-         except Exception as e:
-             raise Exception(f"Error parsing text: {str(e)}")
-
-     def _split_text_into_chunks(self, text: str, chunk_size: int) -> List[str]:
-         """Split text into chunks while preserving sentence boundaries"""
-         sentences = text.split('. ')
-         chunks = []
-         current_chunk = ""
-
-         for sentence in sentences:
-             # Add sentence to current chunk if it fits
-             test_chunk = current_chunk + sentence + ". "
-
-             if len(test_chunk) <= chunk_size:
-                 current_chunk = test_chunk
-             else:
-                 # Start new chunk if current chunk has content
-                 if current_chunk.strip():
-                     chunks.append(current_chunk.strip())
-                 current_chunk = sentence + ". "
-
-         # Add final chunk if it has content
-         if current_chunk.strip():
-             chunks.append(current_chunk.strip())
-
-         return chunks
-
-     def analyze_text_structure(self, text_content: str) -> Dict[str, Any]:
-         """Analyze the structure and characteristics of text content"""
-         try:
-             lines = text_content.split('\n')
-             words = text_content.split()
-             # Keep only non-empty sentences so every metric uses the same denominator
-             sentences = [s for s in text_content.split('.') if s.strip()]
-
-             # Count different elements
-             paragraphs = [p.strip() for p in text_content.split('\n\n') if p.strip()]
-
-             return {
-                 'total_words': len(words),
-                 'total_sentences': len(sentences),
-                 'total_lines': len(lines),
-                 'total_paragraphs': len(paragraphs),
-                 'average_words_per_sentence': len(words) / len(sentences) if sentences else 0,
-                 'average_sentences_per_paragraph': len(sentences) / len(paragraphs) if paragraphs else 0,
-                 'character_count': len(text_content),
-                 'reading_time_minutes': len(words) / 200,  # Assuming 200 words per minute
-                 'complexity_score': self._calculate_text_complexity(text_content)
-             }
-
-         except Exception as e:
-             return {'error': f"Could not analyze text structure: {str(e)}"}
-
-     def _calculate_text_complexity(self, text: str) -> float:
-         """Calculate a simple text complexity score"""
-         words = text.split()
-         sentences = [s for s in text.split('.') if s.strip()]
-
-         if not sentences:
-             return 0.0
-
-         # Average words per sentence (higher = more complex)
-         avg_words_per_sentence = len(words) / len(sentences)
-
-         # Average characters per word (higher = more complex)
-         avg_chars_per_word = sum(len(word) for word in words) / len(words) if words else 0
-
-         # Simple complexity score (normalized to 1-10 scale)
-         complexity = (avg_words_per_sentence * 0.1) + (avg_chars_per_word * 0.5)
-         return min(complexity, 10.0)
-
-
- class WebpageParser(BaseParser):
-     """Parser for web content"""
-
-     def __init__(self):
-         super().__init__()
-         self.supported_formats = ['http', 'https']
-         self.headers = {
-             'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
-         }
-         self.timeout = 10
-         self.max_retries = 3
-
-     def parse_website(self, url: str, max_pages: int = 1, include_subpages: bool = False) -> List[Dict[str, Any]]:
-         """
-         Parse website content and return structured data
-
-         Args:
-             url (str): Website URL to parse
-             max_pages (int): Maximum number of pages to parse
-             include_subpages (bool): Whether to include subpages
-
-         Returns:
-             List[Dict]: List of page data with content and metadata
-         """
-         try:
-             pages_data = []
-             urls_to_process = [url]
-             processed_urls = set()
-
-             # If including subpages, find additional URLs
-             if include_subpages and max_pages > 1:
-                 subpage_urls = self._find_subpages(url, max_pages - 1)
-                 urls_to_process.extend(subpage_urls)
-
-             # Process each URL
-             for current_url in urls_to_process[:max_pages]:
-                 if current_url in processed_urls:
-                     continue
-
-                 page_data = self._parse_single_page(current_url)
-                 if page_data:
-                     pages_data.append(page_data)
-                     processed_urls.add(current_url)
-
-                 # Add small delay to be respectful
-                 time.sleep(1)
-
-             return pages_data
-
-         except Exception as e:
-             raise Exception(f"Error parsing website: {str(e)}")
-
-     def _parse_single_page(self, url: str) -> Optional[Dict[str, Any]]:
-         """Parse a single webpage and extract content (None when no response was obtained)"""
-         try:
-             # Make request with retries
-             response = None
-             for attempt in range(self.max_retries):
-                 try:
-                     response = requests.get(url, headers=self.headers, timeout=self.timeout)
-                     response.raise_for_status()
-                     break
-                 except requests.RequestException as e:
-                     if attempt == self.max_retries - 1:
-                         raise e
-                     time.sleep(2 ** attempt)  # Exponential backoff
-
-             if not response:
-                 return None
-
-             # Parse HTML content
-             soup = BeautifulSoup(response.content, 'html.parser')
-
-             # Remove unwanted elements
-             for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
-                 element.decompose()
-
-             # Extract main content
-             main_content = self._extract_main_content(soup)
-
-             # Extract metadata
-             title = self._extract_title(soup)
-             description = self._extract_description(soup)
-             headings = self._extract_headings(soup)
-             links = self._extract_links(soup, url)
-
-             # Clean and process text
-             cleaned_text = self._clean_text_content(main_content)
-
-             return {
-                 'url': url,
-                 'title': title,
-                 'description': description,
-                 'content': cleaned_text,
-                 'headings': headings,
-                 'internal_links': links['internal'],
-                 'external_links': links['external'],
-                 'word_count': len(cleaned_text.split()),
-                 'char_count': len(cleaned_text),
-                 'meta_keywords': self._extract_meta_keywords(soup),
-                 'images': self._extract_images(soup, url),
-                 'parser': 'WebpageParser',
-                 'parsed_at': time.strftime('%Y-%m-%d %H:%M:%S')
-             }
-
-         except Exception as e:
-             return {'url': url, 'error': f"Failed to parse page: {str(e)}"}
-
-     def _extract_main_content(self, soup: BeautifulSoup) -> str:
-         """Extract the main content from the page"""
-         # Try to find main content in order of preference
-         content_selectors = [
-             'main',
-             'article',
-             '[role="main"]',
-             '.content',
-             '.main-content',
-             '#content',
-             '#main',
-             '.post-content',
-             '.entry-content'
-         ]
-
-         for selector in content_selectors:
-             element = soup.select_one(selector)
-             if element:
-                 return element.get_text(separator=' ', strip=True)
-
-         # Fallback to body content
-         body = soup.find('body')
-         if body:
-             return body.get_text(separator=' ', strip=True)
-
-         return soup.get_text(separator=' ', strip=True)
-
-     def _extract_title(self, soup: BeautifulSoup) -> str:
-         """Extract page title"""
-         title_tag = soup.find('title')
-         if title_tag:
-             return title_tag.get_text().strip()
-
-         # Fallback to h1
-         h1 = soup.find('h1')
-         if h1:
-             return h1.get_text().strip()
-
-         return "No Title Found"
-
-     def _extract_description(self, soup: BeautifulSoup) -> str:
-         """Extract meta description"""
-         meta_desc = soup.find('meta', attrs={'name': 'description'})
-         if meta_desc and meta_desc.get('content'):
-             return meta_desc['content'].strip()
-
-         # Fallback to Open Graph description
-         og_desc = soup.find('meta', attrs={'property': 'og:description'})
-         if og_desc and og_desc.get('content'):
-             return og_desc['content'].strip()
-
-         return "No Description Found"
-
-     def _extract_headings(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
-         """Extract all headings with their hierarchy"""
-         headings = []
-
-         for i in range(1, 7):  # h1 to h6
-             for heading in soup.find_all(f'h{i}'):
-                 text = heading.get_text(strip=True)
-                 if text:
-                     headings.append({
-                         'level': i,
-                         'text': text,
-                         'id': heading.get('id', ''),
-                         'class': heading.get('class', [])
-                     })
-
-         return headings
-
-     def _extract_links(self, soup: BeautifulSoup, base_url: str) -> Dict[str, List[str]]:
-         """Extract internal and external links"""
-         internal_links = []
-         external_links = []
-         base_domain = urlparse(base_url).netloc
-
-         for link in soup.find_all('a', href=True):
-             href = link['href']
-             full_url = urljoin(base_url, href)
-             parsed_url = urlparse(full_url)
-
-             if parsed_url.netloc == base_domain:
-                 internal_links.append(full_url)
-             elif parsed_url.netloc:  # External link with domain
-                 external_links.append(full_url)
-
-         return {
-             'internal': list(set(internal_links)),
-             'external': list(set(external_links))
-         }
-
-     def _extract_meta_keywords(self, soup: BeautifulSoup) -> List[str]:
-         """Extract meta keywords if available"""
-         meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
-         if meta_keywords and meta_keywords.get('content'):
-             keywords = meta_keywords['content'].split(',')
-             return [kw.strip() for kw in keywords if kw.strip()]
-         return []
-
-     def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]:
-         """Extract image information"""
-         images = []
-
-         for img in soup.find_all('img'):
-             src = img.get('src')
-             if src:
-                 full_url = urljoin(base_url, src)
-                 images.append({
-                     'src': full_url,
-                     'alt': img.get('alt', ''),
-                     'title': img.get('title', '')
-                 })
-
-         return images
-
-     def _clean_text_content(self, text: str) -> str:
-         """Clean and normalize text content"""
-         if not text:
-             return ""
-
-         # Split into lines and clean each line
-         lines = text.split('\n')
-         cleaned_lines = []
-
-         for line in lines:
-             line = line.strip()
-             if line and len(line) > 1:  # Skip empty lines and single characters
-                 cleaned_lines.append(line)
-
-         # Join lines with single spaces
-         cleaned_text = ' '.join(cleaned_lines)
-
-         # Remove multiple spaces
-         while '  ' in cleaned_text:
-             cleaned_text = cleaned_text.replace('  ', ' ')
-
-         return cleaned_text
-
-     def _find_subpages(self, url: str, max_subpages: int) -> List[str]:
-         """Find subpages from the main page"""
-         try:
-             response = requests.get(url, headers=self.headers, timeout=self.timeout)
-             response.raise_for_status()
-
-             soup = BeautifulSoup(response.content, 'html.parser')
-             base_domain = urlparse(url).netloc
-             subpages = set()
-
-             # Find internal links
-             for link in soup.find_all('a', href=True):
-                 href = link['href']
-                 full_url = urljoin(url, href)
-                 parsed_url = urlparse(full_url)
-
-                 # Only include internal links from same domain
-                 if (parsed_url.netloc == base_domain and
-                         full_url != url and
-                         not any(ext in full_url.lower() for ext in ['.pdf', '.jpg', '.png', '.gif', '.zip'])):
-                     subpages.add(full_url)
-
-                     if len(subpages) >= max_subpages:
-                         break
-
-             return list(subpages)[:max_subpages]
-
-         except Exception:
-             return []
-
-     def validate_url(self, url: str) -> bool:
-         """Validate if URL is accessible"""
-         try:
-             response = requests.head(url, headers=self.headers, timeout=5)
-             return response.status_code == 200
-         except requests.RequestException:  # bare except narrowed to the relevant errors
-             return False
-
-     def get_website_info(self, url: str) -> Dict[str, Any]:
-         """Get basic information about a website"""
-         try:
-             response = requests.get(url, headers=self.headers, timeout=self.timeout)
-             response.raise_for_status()
-
-             soup = BeautifulSoup(response.content, 'html.parser')
-
-             return {
-                 'url': url,
-                 'title': self._extract_title(soup),
-                 'description': self._extract_description(soup),
-                 'meta_keywords': self._extract_meta_keywords(soup),
-                 'has_robots_meta': bool(soup.find('meta', attrs={'name': 'robots'})),
-                 'has_viewport_meta': bool(soup.find('meta', attrs={'name': 'viewport'})),
-                 'language': soup.html.get('lang', 'unknown') if soup.html else 'unknown',  # lang lives on the <html> tag, not the soup root
-                 'status_code': response.status_code,
-                 'content_type': response.headers.get('content-type', 'unknown'),
-                 'server': response.headers.get('server', 'unknown')
-             }
-
-         except Exception as e:
-             return {'url': url, 'error': f"Could not get website info: {str(e)}"}
-
-
- class ParserFactory:
-     """Factory class to create appropriate parsers"""
-
-     @staticmethod
-     def get_parser(source_type: str):
-         """Get the appropriate parser for the source type"""
-         parsers = {
-             'pdf': PDFParser(),
-             'text': TextParser(),
-             'webpage': WebpageParser(),
-             'url': WebpageParser()
-         }
-
-         return parsers.get(source_type.lower())
-
-     @staticmethod
-     def detect_source_type(source: str) -> str:
-         """Detect the type of content source"""
-         if source.startswith(('http://', 'https://')):
-             return 'webpage'
-         elif source.endswith('.pdf'):
-             return 'pdf'
-         else:
-             return 'text'
 
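The deleted `TextParser._split_text_into_chunks` used greedy sentence-boundary packing: keep appending sentences to the open chunk until the next one would exceed the size limit, then flush. A minimal standalone sketch of that logic (the free function `split_into_chunks` is illustrative, not the removed method itself):

```python
from typing import List

def split_into_chunks(text: str, chunk_size: int = 1000) -> List[str]:
    """Greedy sentence-boundary chunking, mirroring the deleted TextParser logic."""
    chunks: List[str] = []
    current = ""
    for sentence in text.split('. '):
        candidate = current + sentence + ". "
        if len(candidate) <= chunk_size:
            current = candidate               # sentence still fits in the open chunk
        else:
            if current.strip():
                chunks.append(current.strip())
            current = sentence + ". "         # overflow sentence starts a new chunk
    if current.strip():
        chunks.append(current.strip())        # flush the final partial chunk
    return chunks

print(split_into_chunks("First sentence. Second sentence. Third sentence.", chunk_size=30))
```

Note one inherited quirk: splitting on `'. '` leaves the trailing period on the last fragment, so the final chunk can end in a doubled period, exactly as the original method did.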
utils/scorer.py DELETED
@@ -1,484 +0,0 @@
- """
- GEO Scoring Module
- Analyzes content for Generative Engine Optimization (GEO) performance
- """
-
- import json
- from typing import Dict, Any, List
- from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
-
-
- class GEOScorer:
-     """Main class for calculating GEO scores and analysis"""
-
-     def __init__(self, llm):
-         self.llm = llm
-         self.setup_prompts()
-
-     def setup_prompts(self):
-         """Initialize prompts for different types of analysis"""
-
-         # Main GEO analysis prompt
-         self.geo_analysis_prompt = (
-             "You are a Generative Engine Optimization (GEO) Specialist. Your task is to critically analyze the input content for its effectiveness in AI-powered search engines and large language model (LLM) systems. "
-             "Evaluate the content using the following GEO criteria, assigning a score from 1 to 10 for each: \n\n"
-             "1. AI Search Visibility - How likely is the content to be surfaced by AI search engines?\n"
-             "2. Query Intent Matching - How well does the content align with common user queries?\n"
-             "3. Factual Accuracy & Authority - How trustworthy and authoritative is the information?\n"
-             "4. Conversational Readiness - Is the content well-suited for AI chat responses?\n"
-             "5. Semantic Richness - Does the content effectively use relevant semantic keywords?\n"
-             "6. Context Completeness - Is the content self-contained and does it provide complete answers?\n"
-             "7. Citation Worthiness - How likely is the content to be cited by AI systems?\n"
-             "8. Multi-Query Coverage - Does the content address multiple related questions?\n\n"
-             "Also provide:\n"
-             "- Key topics and entities mentioned\n"
-             "- Missing information or content gaps\n"
-             "- Specific optimization opportunities\n"
-             "- Actionable enhancement recommendations\n\n"
-             "Respond strictly in JSON format using the structure below (double curly braces shown here to escape string formatting, do NOT include them in actual output):\n\n"
-             "{{\n"
-             "  \"geo_scores\": {{\n"
-             "    \"ai_search_visibility\": 0.0,\n"
-             "    \"query_intent_matching\": 0.0,\n"
-             "    \"factual_accuracy\": 0.0,\n"
-             "    \"conversational_readiness\": 0.0,\n"
-             "    \"semantic_richness\": 0.0,\n"
-             "    \"context_completeness\": 0.0,\n"
-             "    \"citation_worthiness\": 0.0,\n"
-             "    \"multi_query_coverage\": 0.0\n"
-             "  }},\n"
-             "  \"overall_geo_score\": 0.0,\n"
-             "  \"primary_topics\": [\"topic1\", \"topic2\"],\n"
-             "  \"entities\": [\"entity1\", \"entity2\"],\n"
-             "  \"missing_gaps\": [\"gap1\", \"gap2\"],\n"
-             "  \"optimization_opportunities\": [\n"
-             "    {{\n"
-             "      \"type\": \"semantic_enhancement\",\n"
-             "      \"description\": \"Describe the improvement opportunity\",\n"
-             "      \"priority\": \"high\"\n"
-             "    }}\n"
-             "  ],\n"
-             "  \"recommendations\": [\n"
-             "    \"Write clear and specific suggestions to improve the content\"\n"
-             "  ]\n"
-             "}}"
-         )
-
-         # Quick scoring prompt for faster analysis
-         self.quick_score_prompt = (
-             "You are an AI Search Optimization Analyst. Evaluate the given content and provide a quick scoring based on key criteria.\n"
-             "Rate each of the following from 1 to 10:\n"
-             "1. AI Search Visibility\n"
-             "2. Query Intent Matching\n"
-             "3. Conversational Readiness\n"
-             "4. Citation Worthiness\n\n"
-             "Respond strictly in JSON format using the structure below:\n"
-             "{{\n"
-             "  \"scores\": {{\n"
-             "    \"ai_search_visibility\": 0.0,\n"
-             "    \"query_intent_matching\": 0.0,\n"
-             "    \"conversational_readiness\": 0.0,\n"
-             "    \"citation_worthiness\": 0.0\n"
-             "  }},\n"
-             "  \"overall_score\": 0.0,\n"
-             "  \"top_recommendation\": \"Provide the most critical improvement needed\"\n"
-             "}}"
-         )
-
-         # Competitive analysis prompt
-         self.competitive_prompt = (
-             "Compare these content pieces for GEO performance. Identify which performs better for AI search and why.\n"
-             "Content A: {content_a}\n"
-             "Content B: {content_b}\n"
-             "Provide analysis in JSON:\n"
-             "{{\n"
-             "  \"winner\": \"A\" or \"B\",\n"
-             "  \"score_comparison\": {{\n"
-             "    \"content_a_score\": 7.5,\n"
-             "    \"content_b_score\": 8.2\n"
-             "  }},\n"
-             "  \"key_differences\": [\"difference1\", \"difference2\"],\n"
-             "  \"improvement_suggestions\": {{\n"
-             "    \"content_a\": [\"suggestion1\"],\n"
-             "    \"content_b\": [\"suggestion1\"]\n"
-             "  }}\n"
-             "}}"
-         )
-
-     def analyze_page_geo(self, content: str, title: str, detailed: bool = True) -> Dict[str, Any]:
-         """
-         Analyze a single page for GEO performance
-         """
-         try:
-             # Choose prompt based on detail level
-             if detailed:
-                 system_prompt = self.geo_analysis_prompt
-                 user_message = f"Title: {title}\n\nContent: {content[:8000]}"
-             else:
-                 system_prompt = self.quick_score_prompt
-                 user_message = f"Title: {title}\n\nContent: {content[:4000]}"
-
-             # Escape literal braces so arbitrary page content is not parsed
-             # as template variables by ChatPromptTemplate.
-             user_message = user_message.replace('{', '{{').replace('}', '}}')
-
-             # Build prompt and run analysis
-             prompt_template = ChatPromptTemplate.from_messages([
-                 SystemMessagePromptTemplate.from_template(system_prompt),
-                 HumanMessagePromptTemplate.from_template(user_message)
-             ])
-             chain = prompt_template | self.llm
-             result = chain.invoke({})  # No variables needed
-
-             # Extract and parse result
-             result_content = result.content if hasattr(result, 'content') else str(result)
-             parsed_result = self._parse_llm_response(result_content)
-
-             # Add metadata
-             parsed_result.update({
-                 'analyzed_title': title,
-                 'content_length': len(content),
-                 'word_count': len(content.split()),
-                 'analysis_type': 'detailed' if detailed else 'quick'
-             })
-
-             return parsed_result
-
-         except Exception as e:
-             return {'error': f"GEO analysis failed: {str(e)}"}
-
-     def analyze_multiple_pages(self, pages_data: List[Dict[str, Any]], detailed: bool = True) -> List[Dict[str, Any]]:
-         """
-         Analyze multiple pages and return consolidated results
-
-         Args:
-             pages_data (List[Dict]): List of page data with content and metadata
-             detailed (bool): Whether to perform detailed analysis
-
-         Returns:
-             List[Dict]: List of GEO analysis results
-         """
-         results = []
-
-         for i, page_data in enumerate(pages_data):
-             try:
-                 content = page_data.get('content', '')
-                 title = page_data.get('title', f'Page {i+1}')
-
-                 analysis = self.analyze_page_geo(content, title, detailed)
-
-                 # Add page-specific metadata
-                 analysis.update({
-                     'page_url': page_data.get('url', ''),
-                     'page_index': i,
-                     'source_word_count': page_data.get('word_count', 0)
-                 })
-
-                 results.append(analysis)
-
-             except Exception as e:
-                 results.append({
-                     'page_index': i,
-                     'page_url': page_data.get('url', ''),
-                     'error': f"Analysis failed: {str(e)}"
-                 })
-
-         return results
-
-     def compare_content_geo(self, content_a: str, content_b: str, titles: tuple = None) -> Dict[str, Any]:
-         """
-         Compare two pieces of content for GEO performance
-
-         Args:
-             content_a (str): First content to compare
-             content_b (str): Second content to compare
-             titles (tuple): Optional titles for the content pieces
-
-         Returns:
-             Dict: Comparison analysis results
-         """
-         try:
-             title_a, title_b = titles if titles else ("Content A", "Content B")
-
-             # Format the competitive analysis prompt
-             formatted_prompt = self.competitive_prompt.format(
-                 content_a=f"Title: {title_a}\nContent: {content_a[:4000]}",
-                 content_b=f"Title: {title_b}\nContent: {content_b[:4000]}"
-             )
-
-             # Invoke the LLM directly: after .format() the prompt contains literal
-             # JSON braces that ChatPromptTemplate would misread as template variables.
-             result = self.llm.invoke(formatted_prompt)
-             result_content = result.content if hasattr(result, 'content') else str(result)
-
-             return self._parse_llm_response(result_content)
-
-         except Exception as e:
-             return {'error': f"Comparison analysis failed: {str(e)}"}
-
-     def calculate_aggregate_scores(self, individual_results: List[Dict[str, Any]]) -> Dict[str, Any]:
-         """
-         Calculate aggregate GEO scores from multiple page analyses
-
-         Args:
-             individual_results (List[Dict]): List of individual page analysis results
-
-         Returns:
-             Dict: Aggregate scores and insights
-         """
-         try:
-             valid_results = [r for r in individual_results if 'geo_scores' in r and not r.get('error')]
-
-             if not valid_results:
-                 return {'error': 'No valid results to aggregate'}
-
-             # Calculate average scores
-             score_keys = list(valid_results[0]['geo_scores'].keys())
-             avg_scores = {}
-
-             for key in score_keys:
-                 scores = [r['geo_scores'][key] for r in valid_results if key in r['geo_scores']]
-                 avg_scores[key] = sum(scores) / len(scores) if scores else 0
-
-             overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0
-
-             # Collect all recommendations and opportunities
-             all_recommendations = []
-             all_opportunities = []
-             all_topics = []
-             all_entities = []
-
-             for result in valid_results:
-                 all_recommendations.extend(result.get('recommendations', []))
-                 all_opportunities.extend(result.get('optimization_opportunities', []))
-                 all_topics.extend(result.get('primary_topics', []))
-                 all_entities.extend(result.get('entities', []))
-
-             # Remove duplicates and prioritize
-             unique_recommendations = list(set(all_recommendations))
-             unique_topics = list(set(all_topics))
-             unique_entities = list(set(all_entities))
-
-             # Find highest and lowest performing areas
-             best_score = max(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
-             worst_score = min(avg_scores.items(), key=lambda x: x[1]) if avg_scores else ('none', 0)
-
-             return {
-                 'aggregate_scores': avg_scores,
-                 'overall_score': overall_avg,
-                 'pages_analyzed': len(valid_results),
-                 'best_performing_metric': {
-                     'metric': best_score[0],
-                     'score': best_score[1]
-                 },
-                 'lowest_performing_metric': {
-                     'metric': worst_score[0],
-                     'score': worst_score[1]
-                 },
-                 'consolidated_recommendations': unique_recommendations[:10],
-                 'all_topics': unique_topics,
-                 'all_entities': unique_entities,
-                 'high_priority_opportunities': [
-                     opp for opp in all_opportunities
-                     if opp.get('priority') == 'high'
-                 ][:5],
-                 'score_distribution': self._calculate_score_distribution(avg_scores)
-             }
-
-         except Exception as e:
-             return {'error': f"Aggregation failed: {str(e)}"}
-
-     def generate_geo_report(self, analysis_results: Dict[str, Any], website_url: str = None) -> Dict[str, Any]:
-         """
-         Generate a comprehensive GEO report
-
-         Args:
-             analysis_results (Dict): Results from aggregate analysis
-             website_url (str): Optional website URL for context
-
-         Returns:
-             Dict: Comprehensive GEO report
-         """
-         try:
-             report = {
-                 'report_metadata': {
-                     'generated_at': self._get_timestamp(),
-                     'website_url': website_url,
-                     'analysis_type': 'GEO Performance Report'
-                 },
-                 'executive_summary': self._generate_executive_summary(analysis_results),
-                 'detailed_scores': analysis_results.get('aggregate_scores', {}),
-                 'performance_insights': self._generate_performance_insights(analysis_results),
-                 'actionable_recommendations': self._prioritize_recommendations(
-                     analysis_results.get('consolidated_recommendations', [])
-                 ),
-                 'optimization_roadmap': self._create_optimization_roadmap(analysis_results),
-                 'competitive_position': self._assess_competitive_position(analysis_results),
-                 'technical_details': {
-                     'pages_analyzed': analysis_results.get('pages_analyzed', 0),
-                     'overall_score': analysis_results.get('overall_score', 0),
-                     'score_distribution': analysis_results.get('score_distribution', {})
-                 }
-             }
-
-             return report
-
-         except Exception as e:
-             return {'error': f"Report generation failed: {str(e)}"}
-
-     def _parse_llm_response(self, response_text: str) -> Dict[str, Any]:
-         """Parse LLM response and extract JSON content"""
-         try:
-             # Find JSON content in the response
-             json_start = response_text.find('{')
-             json_end = response_text.rfind('}') + 1
-
-             if json_start != -1 and json_end > 0:  # rfind returns -1 when absent, so json_end == 0 means no closing brace
-                 json_str = response_text[json_start:json_end]
-                 return json.loads(json_str)
-             else:
-                 # If no JSON found, return the raw response
-                 return {'raw_response': response_text, 'parsing_error': 'No JSON found'}
-
-         except json.JSONDecodeError as e:
-             return {'raw_response': response_text, 'parsing_error': f'JSON decode error: {str(e)}'}
-         except Exception as e:
-             return {'raw_response': response_text, 'parsing_error': f'Unexpected error: {str(e)}'}
-
-     def _calculate_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
-         """Calculate distribution of scores for insights"""
-         if not scores:
-             return {}
-
-         score_values = list(scores.values())
-
-         return {
-             'highest_score': max(score_values),
-             'lowest_score': min(score_values),
-             'average_score': sum(score_values) / len(score_values),
-             'score_range': max(score_values) - min(score_values),
-             'scores_above_7': len([s for s in score_values if s >= 7.0]),
-             'scores_below_5': len([s for s in score_values if s < 5.0])
-         }
-
-     def _generate_executive_summary(self, analysis_results: Dict[str, Any]) -> str:
-         """Generate executive summary based on analysis results"""
-         overall_score = analysis_results.get('overall_score', 0)
-         pages_analyzed = analysis_results.get('pages_analyzed', 0)
-
-         if overall_score >= 8.0:
-             performance = "excellent"
-         elif overall_score >= 6.5:
-             performance = "good"
-         elif overall_score >= 5.0:
-             performance = "moderate"
-         else:
-             performance = "needs improvement"
-
-         return f"Analysis of {pages_analyzed} pages shows {performance} GEO performance with an overall score of {overall_score:.1f}/10. Key opportunities exist in {analysis_results.get('lowest_performing_metric', {}).get('metric', 'multiple areas')}."
-
-     def _generate_performance_insights(self, analysis_results: Dict[str, Any]) -> List[str]:
-         """Generate performance insights based on analysis"""
-         insights = []
-
-         best_metric = analysis_results.get('best_performing_metric', {})
-         worst_metric = analysis_results.get('lowest_performing_metric', {})
-
-         if best_metric.get('score', 0) >= 8.0:
-             insights.append(f"Strong performance in {best_metric.get('metric', 'unknown')} (score: {best_metric.get('score', 0):.1f})")
-
-         if worst_metric.get('score', 10) < 6.0:
-             insights.append(f"Significant improvement needed in {worst_metric.get('metric', 'unknown')} (score: {worst_metric.get('score', 0):.1f})")
-
-         score_dist = analysis_results.get('score_distribution', {})
-         if score_dist.get('score_range', 0) > 3.0:
-             insights.append("High variability in scores indicates inconsistent optimization across metrics")
-
-         return insights
-
-     def _prioritize_recommendations(self, recommendations: List[str]) -> List[Dict[str, Any]]:
-         """Prioritize recommendations based on impact potential"""
-         prioritized = []
-
-         # Simple prioritization based on keywords
-         high_impact_keywords = ['semantic', 'structure', 'authority', 'factual']
-         medium_impact_keywords = ['readability', 'clarity', 'format']
-
-         for i, rec in enumerate(recommendations):
-             priority = 'low'
-             if any(keyword in rec.lower() for keyword in high_impact_keywords):
-                 priority = 'high'
-             elif any(keyword in rec.lower() for keyword in medium_impact_keywords):
-                 priority = 'medium'
-
-             prioritized.append({
-                 'recommendation': rec,
-                 'priority': priority,
-                 'order': i + 1
-             })
-
-         # Sort by priority
-         priority_order = {'high': 1, 'medium': 2, 'low': 3}
-         prioritized.sort(key=lambda x: priority_order[x['priority']])
-
-         return prioritized
-
-     def _create_optimization_roadmap(self, analysis_results: Dict[str, Any]) -> Dict[str, List[str]]:
-         """Create a phased optimization roadmap"""
-         roadmap = {
-             'immediate_actions': [],
-             'short_term_goals': [],
-             'long_term_strategy': []
-         }
-
-         overall_score = analysis_results.get('overall_score', 0)
-         worst_metric = analysis_results.get('lowest_performing_metric', {})
-
-         # Immediate actions based on worst performing metric
-         if worst_metric.get('score', 10) < 5.0:
-             roadmap['immediate_actions'].append(f"Address critical issues in {worst_metric.get('metric', 'low-scoring areas')}")
-
-         # Short-term goals
-         if overall_score < 7.0:
-             roadmap['short_term_goals'].append("Improve overall GEO score to above 7.0")
-             roadmap['short_term_goals'].append("Enhance content structure and semantic richness")
-
-         # Long-term strategy
-         roadmap['long_term_strategy'].append("Establish consistent GEO optimization process")
-         roadmap['long_term_strategy'].append("Monitor and track AI search performance")
-
-         return roadmap
-
-     def _assess_competitive_position(self, analysis_results: Dict[str, Any]) -> Dict[str, Any]:
-         """Assess competitive position based on scores"""
-         overall_score = analysis_results.get('overall_score', 0)
-
-         if overall_score >= 8.5:
-             position = "market_leader"
-             description = "Content is highly optimized for AI search engines"
-         elif overall_score >= 7.0:
-             position = "competitive"
-             description = "Content performs well but has room for improvement"
-         elif overall_score >= 5.5:
-             position = "average"
-             description = "Content meets basic standards but lacks optimization"
-         else:
-             position = "needs_work"
-             description = "Content requires significant optimization for AI search"
-
-         return {
-             'position': position,
-             'description': description,
-             'score': overall_score,
-             'percentile_estimate': min(overall_score * 10, 100)  # Rough percentile estimate
-         }
-
-     def _get_timestamp(self) -> str:
-         """Get current timestamp"""
-         from datetime import datetime
-         return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
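The core of the deleted `calculate_aggregate_scores` is a per-metric average across pages plus best/worst extraction. A minimal sketch under the same result shape ( `aggregate_geo_scores` and the sample `pages` data are illustrative, not part of the removed module):

```python
from statistics import mean

def aggregate_geo_scores(results: list) -> dict:
    """Average per-metric GEO scores across pages, mirroring the deleted aggregation."""
    valid = [r for r in results if 'geo_scores' in r and not r.get('error')]
    if not valid:
        return {'error': 'No valid results to aggregate'}
    metrics = valid[0]['geo_scores'].keys()
    avg = {k: mean(r['geo_scores'][k] for r in valid if k in r['geo_scores'])
           for k in metrics}
    best, worst = max(avg, key=avg.get), min(avg, key=avg.get)
    return {
        'aggregate_scores': avg,
        'overall_score': mean(avg.values()),
        'best_performing_metric': {'metric': best, 'score': avg[best]},
        'lowest_performing_metric': {'metric': worst, 'score': avg[worst]},
    }

pages = [
    {'geo_scores': {'ai_search_visibility': 7.0, 'citation_worthiness': 5.0}},
    {'geo_scores': {'ai_search_visibility': 9.0, 'citation_worthiness': 6.0}},
]
print(aggregate_geo_scores(pages))
```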