Spaces:
Runtime error
Runtime error
| """ | |
| Results Export and Reporting Module | |
| Handles export of analysis results, reports, and data for external use | |
| """ | |
| import json | |
| import csv | |
| import io | |
| import zipfile | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Optional, Union | |
| import pandas as pd | |
| from dataclasses import dataclass, asdict | |
@dataclass
class GEOReport:
    """Data class for GEO analysis reports.

    The ``@dataclass`` decorator is required: it generates the keyword
    constructor for the fields below and makes ``dataclasses.asdict`` (used
    by ``to_dict``) accept instances of this class.
    """

    website_url: str
    analysis_date: str
    overall_score: float
    pages_analyzed: int
    geo_scores: Dict[str, float]  # metric name -> score
    recommendations: List[str]
    optimization_opportunities: List[Dict[str, Any]]
    competitive_position: str

    def to_dict(self) -> Dict[str, Any]:
        """Convert the report to a plain dictionary keyed by field name."""
        return asdict(self)
@dataclass
class ContentAnalysis:
    """Data class for content optimization analysis.

    The ``@dataclass`` decorator is required: it generates the keyword
    constructor for the fields below and makes ``dataclasses.asdict`` (used
    by ``to_dict``) accept instances of this class.
    """

    original_content: str
    analysis_date: str
    clarity_score: float
    structure_score: float
    answerability_score: float
    keywords: List[str]
    optimized_content: Optional[str]  # may be None
    improvements_made: List[str]

    def to_dict(self) -> Dict[str, Any]:
        """Convert the analysis to a plain dictionary keyed by field name."""
        return asdict(self)
| class ResultExporter: | |
| """Main class for exporting analysis results and generating reports""" | |
| def __init__(self): | |
| self.export_formats = ['json', 'csv', 'html', 'pdf', 'xlsx'] | |
| self.supported_types = ['geo_analysis', 'content_optimization', 'qa_results', 'batch_analysis'] | |
| def export_geo_results(self, geo_results: List[Dict[str, Any]], | |
| website_url: str, format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]: | |
| """ | |
| Export GEO analysis results in specified format | |
| Args: | |
| geo_results (List[Dict]): List of GEO analysis results | |
| website_url (str): URL of analyzed website | |
| format_type (str): Export format ('json', 'csv', 'html', 'xlsx') | |
| Returns: | |
| Union[str, bytes, Dict]: Exported data in requested format | |
| """ | |
| try: | |
| # Prepare consolidated data | |
| export_data = self._prepare_geo_export_data(geo_results, website_url) | |
| if format_type.lower() == 'json': | |
| return self._export_geo_json(export_data) | |
| elif format_type.lower() == 'csv': | |
| return self._export_geo_csv(export_data) | |
| elif format_type.lower() == 'html': | |
| return self._export_geo_html(export_data) | |
| elif format_type.lower() == 'xlsx': | |
| return self._export_geo_excel(export_data) | |
| elif format_type.lower() == 'pdf': | |
| return self._export_geo_pdf(export_data) | |
| else: | |
| raise ValueError(f"Unsupported export format: {format_type}") | |
| except Exception as e: | |
| return {'error': f"Export failed: {str(e)}"} | |
| def export_enhancement_results(self, enhancement_result: Dict[str, Any], | |
| format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]: | |
| """ | |
| Export content enhancement results | |
| Args: | |
| enhancement_result (Dict): Content enhancement analysis result | |
| format_type (str): Export format | |
| Returns: | |
| Union[str, bytes, Dict]: Exported data | |
| """ | |
| try: | |
| # Prepare data for export | |
| export_data = self._prepare_enhancement_export_data(enhancement_result) | |
| if format_type.lower() == 'json': | |
| return json.dumps(export_data, indent=2, ensure_ascii=False) | |
| elif format_type.lower() == 'html': | |
| return self._export_enhancement_html(export_data) | |
| elif format_type.lower() == 'csv': | |
| return self._export_enhancement_csv(export_data) | |
| else: | |
| return json.dumps(export_data, indent=2, ensure_ascii=False) | |
| except Exception as e: | |
| return {'error': f"Enhancement export failed: {str(e)}"} | |
def export_qa_results(self, qa_results: List[Dict[str, Any]],
                      format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]:
    """
    Export Q&A session results.

    Args:
        qa_results (List[Dict]): List of Q&A interactions
        format_type (str): Export format, matched case-insensitively; 'html'
            and 'csv' have dedicated exporters, anything else yields JSON

    Returns:
        Union[str, bytes, Dict]: Exported data, or a dict with a single
            'error' key when anything fails.
    """
    try:
        session_block = {
            'session_date': datetime.now().isoformat(),
            'total_questions': len(qa_results),
            'interactions': qa_results,
        }
        summary_block = {
            # An interaction counts as answered when its 'error' entry is absent or falsy.
            'successful_answers': sum(1 for entry in qa_results if not entry.get('error')),
            'average_response_length': self._calculate_avg_response_length(qa_results),
            'most_common_topics': self._extract_common_topics(qa_results),
        }
        export_data = {'qa_session': session_block, 'summary': summary_block}

        requested = format_type.lower()
        if requested == 'html':
            return self._export_qa_html(export_data)
        if requested == 'csv':
            return self._export_qa_csv(export_data)
        # 'json' and every unrecognized format share the JSON output.
        return json.dumps(export_data, indent=2, ensure_ascii=False)
    except Exception as e:
        return {'error': f"Q&A export failed: {str(e)}"}
def create_comprehensive_report(self, analysis_data: Dict[str, Any],
                                report_type: str = 'full') -> Dict[str, Any]:
    """
    Create a comprehensive analysis report.

    Args:
        analysis_data (Dict): Combined analysis data from multiple sources
        report_type (str): Type of report ('full', 'summary', 'executive');
            any value other than 'executive' or 'summary' produces the full report

    Returns:
        Dict: Report metadata merged with the sections of the chosen report
            type, or a dict with a single 'error' key when anything fails.
    """
    try:
        metadata = {
            'generated_at': datetime.now().isoformat(),
            'report_type': report_type,
            'generator': 'GEO SEO AI Optimizer',
            'version': '1.0',
        }
        report = {'report_metadata': metadata}

        if report_type == 'executive':
            sections = self._create_executive_summary(analysis_data)
        elif report_type == 'summary':
            sections = self._create_summary_report(analysis_data)
        else:
            sections = self._create_full_report(analysis_data)

        report.update(sections)
        return report
    except Exception as e:
        return {'error': f"Report creation failed: {str(e)}"}
def export_batch_results(self, batch_results: List[Dict[str, Any]],
                         batch_metadata: Dict[str, Any],
                         format_type: str = 'xlsx') -> Union[str, bytes, Dict[str, Any]]:
    """
    Export batch analysis results.

    Args:
        batch_results (List[Dict]): List of batch analysis results
        batch_metadata (Dict): Metadata about the batch process
        format_type (str): Export format, matched case-insensitively; 'xlsx'
            and 'csv' have dedicated exporters, anything else yields JSON

    Returns:
        Union[str, bytes, Dict]: Exported batch data, or a dict with a single
            'error' key when anything fails.
    """
    try:
        export_data = dict(
            batch_metadata=batch_metadata,
            batch_results=batch_results,
            batch_summary=self._create_batch_summary(batch_results),
            export_timestamp=datetime.now().isoformat(),
        )
        requested = format_type.lower()
        if requested == 'xlsx':
            return self._export_batch_excel(export_data)
        if requested == 'csv':
            return self._export_batch_csv(export_data)
        # 'json' and every unrecognized format share the JSON output.
        return json.dumps(export_data, indent=2, ensure_ascii=False)
    except Exception as e:
        return {'error': f"Batch export failed: {str(e)}"}
def create_export_package(self, analysis_data: Dict[str, Any],
                          package_name: str = "geo_analysis") -> bytes:
    """
    Create a ZIP package with multiple export formats.

    The archive is assembled in memory. ``package_name`` is used only for
    the names of the archive members, never as a filesystem path, so a name
    containing path separators can neither fail on a missing directory nor
    write outside a temporary folder.

    Args:
        analysis_data (Dict): Analysis data to package
        package_name (str): Name prefix for the files inside the package

    Returns:
        bytes: ZIP file content

    Raises:
        Exception: If any step fails; the original exception is attached as
            ``__cause__``.
    """
    try:
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
            # The JSON dump of the raw data is always included.
            archive.writestr(
                f"{package_name}.json",
                json.dumps(analysis_data, indent=2, ensure_ascii=False),
            )
            # HTML report and CSV data are added only when GEO results are present.
            if 'geo_results' in analysis_data:
                archive.writestr(f"{package_name}_report.html", self._export_geo_html(analysis_data))
                archive.writestr(f"{package_name}_data.csv", self._export_geo_csv(analysis_data))
            archive.writestr("README.txt", self._generate_package_readme(analysis_data))
        # Read the bytes only after the archive is closed, i.e. once the
        # central directory has been written.
        return buffer.getvalue()
    except Exception as e:
        raise Exception(f"Package creation failed: {str(e)}") from e
| def _prepare_geo_export_data(self, geo_results: List[Dict[str, Any]], website_url: str) -> Dict[str, Any]: | |
| """Prepare GEO data for export""" | |
| try: | |
| # Calculate aggregate metrics | |
| valid_results = [r for r in geo_results if 'geo_scores' in r and not r.get('error')] | |
| if not valid_results: | |
| return { | |
| 'error': 'No valid GEO results to export', | |
| 'website_url': website_url, | |
| 'export_timestamp': datetime.now().isoformat() | |
| } | |
| # Aggregate scores | |
| all_scores = {} | |
| for result in valid_results: | |
| for metric, score in result.get('geo_scores', {}).items(): | |
| if metric not in all_scores: | |
| all_scores[metric] = [] | |
| all_scores[metric].append(score) | |
| avg_scores = {metric: sum(scores) / len(scores) for metric, scores in all_scores.items()} | |
| overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0 | |
| # Collect recommendations | |
| all_recommendations = [] | |
| all_opportunities = [] | |
| for result in valid_results: | |
| all_recommendations.extend(result.get('recommendations', [])) | |
| all_opportunities.extend(result.get('optimization_opportunities', [])) | |
| # Remove duplicates | |
| unique_recommendations = list(set(all_recommendations)) | |
| return { | |
| 'website_analysis': { | |
| 'url': website_url, | |
| 'analysis_date': datetime.now().isoformat(), | |
| 'pages_analyzed': len(valid_results), | |
| 'overall_geo_score': round(overall_avg, 2) | |
| }, | |
| 'aggregate_scores': avg_scores, | |
| 'individual_page_results': valid_results, | |
| 'recommendations': unique_recommendations[:10], # Top 10 | |
| 'optimization_opportunities': all_opportunities, | |
| 'performance_insights': self._generate_performance_insights(avg_scores, overall_avg), | |
| 'export_metadata': { | |
| 'exported_by': 'GEO SEO AI Optimizer', | |
| 'export_timestamp': datetime.now().isoformat(), | |
| 'data_format': 'GEO Analysis Results v1.0' | |
| } | |
| } | |
| except Exception as e: | |
| return {'error': f"Data preparation failed: {str(e)}"} | |
| def _prepare_enhancement_export_data(self, enhancement_result: Dict[str, Any]) -> Dict[str, Any]: | |
| """Prepare content enhancement data for export""" | |
| try: | |
| scores = enhancement_result.get('scores', {}) | |
| return { | |
| 'content_analysis': { | |
| 'analysis_date': datetime.now().isoformat(), | |
| 'original_content_length': enhancement_result.get('original_length', 0), | |
| 'original_word_count': enhancement_result.get('original_word_count', 0), | |
| 'analysis_type': enhancement_result.get('optimization_type', 'standard') | |
| }, | |
| 'performance_scores': { | |
| 'clarity': scores.get('clarity', 0), | |
| 'structure': scores.get('structuredness', 0), | |
| 'answerability': scores.get('answerability', 0), | |
| 'overall_average': sum(scores.values()) / len(scores) if scores else 0 | |
| }, | |
| 'optimization_results': { | |
| 'keywords_identified': enhancement_result.get('keywords', []), | |
| 'optimized_content': enhancement_result.get('optimized_text', ''), | |
| 'improvements_made': enhancement_result.get('optimization_suggestions', []), | |
| 'analyze_only': enhancement_result.get('analyze_only', False) | |
| }, | |
| 'export_metadata': { | |
| 'exported_by': 'GEO SEO AI Optimizer', | |
| 'export_timestamp': datetime.now().isoformat(), | |
| 'data_format': 'Content Enhancement Results v1.0' | |
| } | |
| } | |
| except Exception as e: | |
| return {'error': f"Enhancement data preparation failed: {str(e)}"} | |
| def _export_geo_json(self, data: Dict[str, Any]) -> str: | |
| """Export GEO data as JSON""" | |
| return json.dumps(data, indent=2, ensure_ascii=False) | |
| def _export_geo_csv(self, data: Dict[str, Any]) -> str: | |
| """Export GEO data as CSV""" | |
| try: | |
| output = io.StringIO() | |
| # Write aggregate scores | |
| writer = csv.writer(output) | |
| writer.writerow(['GEO Analysis Results']) | |
| writer.writerow(['Website:', data.get('website_analysis', {}).get('url', 'Unknown')]) | |
| writer.writerow(['Analysis Date:', data.get('website_analysis', {}).get('analysis_date', 'Unknown')]) | |
| writer.writerow(['Overall Score:', data.get('website_analysis', {}).get('overall_geo_score', 0)]) | |
| writer.writerow([]) | |
| # Write aggregate scores | |
| writer.writerow(['Metric', 'Score']) | |
| for metric, score in data.get('aggregate_scores', {}).items(): | |
| writer.writerow([metric.replace('_', ' ').title(), round(score, 2)]) | |
| writer.writerow([]) | |
| writer.writerow(['Recommendations']) | |
| for i, rec in enumerate(data.get('recommendations', []), 1): | |
| writer.writerow([f"{i}.", rec]) | |
| # Individual page results | |
| if data.get('individual_page_results'): | |
| writer.writerow([]) | |
| writer.writerow(['Individual Page Results']) | |
| # Header for page results | |
| first_result = data['individual_page_results'][0] | |
| if 'geo_scores' in first_result: | |
| headers = ['Page Index', 'Page URL', 'Page Title'] + list(first_result['geo_scores'].keys()) | |
| writer.writerow(headers) | |
| for i, result in enumerate(data['individual_page_results']): | |
| page_data = result.get('page_data', {}) | |
| scores = result.get('geo_scores', {}) | |
| row = [ | |
| i + 1, | |
| page_data.get('url', 'Unknown'), | |
| page_data.get('title', 'Unknown') | |
| ] + [round(scores.get(metric, 0), 2) for metric in headers[3:]] | |
| writer.writerow(row) | |
| return output.getvalue() | |
| except Exception as e: | |
| return f"CSV export error: {str(e)}" | |
| def _export_geo_html(self, data: Dict[str, Any]) -> str: | |
| """Export GEO data as HTML report""" | |
| try: | |
| website_info = data.get('website_analysis', {}) | |
| scores = data.get('aggregate_scores', {}) | |
| recommendations = data.get('recommendations', []) | |
| html_content = f""" | |
| <!DOCTYPE html> | |
| <html lang="en"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <title>GEO Analysis Report - {website_info.get('url', 'Website')}</title> | |
| <style> | |
| body {{ | |
| font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
| line-height: 1.6; | |
| color: #333; | |
| max-width: 1200px; | |
| margin: 0 auto; | |
| padding: 20px; | |
| background-color: #f5f5f5; | |
| }} | |
| .header {{ | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| color: white; | |
| padding: 30px; | |
| border-radius: 10px; | |
| margin-bottom: 30px; | |
| text-align: center; | |
| }} | |
| .header h1 {{ | |
| margin: 0; | |
| font-size: 2.5em; | |
| }} | |
| .summary-cards {{ | |
| display: grid; | |
| grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); | |
| gap: 20px; | |
| margin-bottom: 30px; | |
| }} | |
| .card {{ | |
| background: white; | |
| padding: 20px; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
| text-align: center; | |
| }} | |
| .card h3 {{ | |
| margin-top: 0; | |
| color: #667eea; | |
| }} | |
| .score {{ | |
| font-size: 2em; | |
| font-weight: bold; | |
| color: #333; | |
| }} | |
| .scores-grid {{ | |
| display: grid; | |
| grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); | |
| gap: 20px; | |
| margin-bottom: 30px; | |
| }} | |
| .score-item {{ | |
| background: white; | |
| padding: 15px; | |
| border-radius: 8px; | |
| box-shadow: 0 2px 4px rgba(0,0,0,0.1); | |
| display: flex; | |
| justify-content: space-between; | |
| align-items: center; | |
| }} | |
| .score-bar {{ | |
| width: 100px; | |
| height: 10px; | |
| background: #e0e0e0; | |
| border-radius: 5px; | |
| overflow: hidden; | |
| }} | |
| .score-fill {{ | |
| height: 100%; | |
| background: linear-gradient(90deg, #ff6b6b, #ffa500, #4ecdc4); | |
| transition: width 0.3s ease; | |
| }} | |
| .recommendations {{ | |
| background: white; | |
| padding: 30px; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
| margin-bottom: 30px; | |
| }} | |
| .recommendations h2 {{ | |
| color: #667eea; | |
| border-bottom: 2px solid #667eea; | |
| padding-bottom: 10px; | |
| }} | |
| .rec-item {{ | |
| padding: 10px 0; | |
| border-bottom: 1px solid #eee; | |
| }} | |
| .footer {{ | |
| text-align: center; | |
| color: #666; | |
| margin-top: 40px; | |
| padding-top: 20px; | |
| border-top: 1px solid #ddd; | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="header"> | |
| <h1>🚀 GEO Analysis Report</h1> | |
| <p>Generative Engine Optimization Performance Analysis</p> | |
| <p><strong>Website:</strong> {website_info.get('url', 'Not specified')}</p> | |
| <p><strong>Analysis Date:</strong> {website_info.get('analysis_date', 'Not specified')}</p> | |
| </div> | |
| <div class="summary-cards"> | |
| <div class="card"> | |
| <h3>Overall GEO Score</h3> | |
| <div class="score">{website_info.get('overall_geo_score', 0)}/10</div> | |
| </div> | |
| <div class="card"> | |
| <h3>Pages Analyzed</h3> | |
| <div class="score">{website_info.get('pages_analyzed', 0)}</div> | |
| </div> | |
| <div class="card"> | |
| <h3>Recommendations</h3> | |
| <div class="score">{len(recommendations)}</div> | |
| </div> | |
| </div> | |
| <h2>📊 Detailed GEO Metrics</h2> | |
| <div class="scores-grid"> | |
| """ | |
| # Add individual scores | |
| for metric, score in scores.items(): | |
| metric_display = metric.replace('_', ' ').title() | |
| score_percentage = min(score * 10, 100) # Convert to percentage | |
| html_content += f""" | |
| <div class="score-item"> | |
| <div> | |
| <strong>{metric_display}</strong><br> | |
| <span style="color: #666;">{score:.1f}/10</span> | |
| </div> | |
| <div class="score-bar"> | |
| <div class="score-fill" style="width: {score_percentage}%;"></div> | |
| </div> | |
| </div> | |
| """ | |
| html_content += """ | |
| </div> | |
| <div class="recommendations"> | |
| <h2>💡 Optimization Recommendations</h2> | |
| """ | |
| # Add recommendations | |
| for i, rec in enumerate(recommendations, 1): | |
| html_content += f'<div class="rec-item"><strong>{i}.</strong> {rec}</div>' | |
| html_content += f""" | |
| </div> | |
| <div class="footer"> | |
| <p>Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p> | |
| <p>This report provides AI-first SEO optimization insights for better generative engine performance.</p> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| return html_content | |
| except Exception as e: | |
| return f"<html><body><h1>HTML Export Error</h1><p>{str(e)}</p></body></html>" | |
| def _export_geo_excel(self, data: Dict[str, Any]) -> bytes: | |
| """Export GEO data as Excel file""" | |
| try: | |
| output = io.BytesIO() | |
| with pd.ExcelWriter(output, engine='openpyxl') as writer: | |
| # Summary sheet | |
| summary_data = { | |
| 'Metric': ['Website URL', 'Analysis Date', 'Pages Analyzed', 'Overall Score'], | |
| 'Value': [ | |
| data.get('website_analysis', {}).get('url', 'Unknown'), | |
| data.get('website_analysis', {}).get('analysis_date', 'Unknown'), | |
| data.get('website_analysis', {}).get('pages_analyzed', 0), | |
| data.get('website_analysis', {}).get('overall_geo_score', 0) | |
| ] | |
| } | |
| pd.DataFrame(summary_data).to_excel(writer, sheet_name='Summary', index=False) | |
| # Scores sheet | |
| scores_data = [] | |
| for metric, score in data.get('aggregate_scores', {}).items(): | |
| scores_data.append({ | |
| 'Metric': metric.replace('_', ' ').title(), | |
| 'Score': round(score, 2), | |
| 'Performance': self._get_performance_level(score) | |
| }) | |
| pd.DataFrame(scores_data).to_excel(writer, sheet_name='GEO Scores', index=False) | |
| # Recommendations sheet | |
| rec_data = [] | |
| for i, rec in enumerate(data.get('recommendations', []), 1): | |
| rec_data.append({ | |
| 'Priority': i, | |
| 'Recommendation': rec, | |
| 'Category': self._categorize_recommendation(rec) | |
| }) | |
| if rec_data: | |
| pd.DataFrame(rec_data).to_excel(writer, sheet_name='Recommendations', index=False) | |
| # Individual pages sheet | |
| if data.get('individual_page_results'): | |
| pages_data = [] | |
| for i, result in enumerate(data['individual_page_results']): | |
| page_data = result.get('page_data', {}) | |
| scores = result.get('geo_scores', {}) | |
| page_row = { | |
| 'Page_Index': i + 1, | |
| 'URL': page_data.get('url', 'Unknown'), | |
| 'Title': page_data.get('title', 'Unknown'), | |
| 'Word_Count': page_data.get('word_count', 0) | |
| } | |
| # Add all GEO scores | |
| for metric, score in scores.items(): | |
| page_row[metric.replace('_', ' ').title()] = round(score, 2) | |
| pages_data.append(page_row) | |
| pd.DataFrame(pages_data).to_excel(writer, sheet_name='Individual Pages', index=False) | |
| output.seek(0) | |
| return output.getvalue() | |
| except Exception as e: | |
| # Return error as text file if Excel creation fails | |
| error_content = f"Excel export failed: {str(e)}\n\nData:\n{json.dumps(data, indent=2)}" | |
| return error_content.encode('utf-8') | |
| def _export_enhancement_html(self, data: Dict[str, Any]) -> str: | |
| """Export content enhancement results as HTML""" | |
| try: | |
| analysis = data.get('content_analysis', {}) | |
| scores = data.get('performance_scores', {}) | |
| optimization = data.get('optimization_results', {}) | |
| html_content = f""" | |
| <!DOCTYPE html> | |
| <html lang="en"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <title>Content Enhancement Report</title> | |
| <style> | |
| body {{ | |
| font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
| line-height: 1.6; | |
| color: #333; | |
| max-width: 1000px; | |
| margin: 0 auto; | |
| padding: 20px; | |
| background-color: #f8f9fa; | |
| }} | |
| .header {{ | |
| background: linear-gradient(135deg, #28a745 0%, #20c997 100%); | |
| color: white; | |
| padding: 30px; | |
| border-radius: 10px; | |
| margin-bottom: 30px; | |
| text-align: center; | |
| }} | |
| .scores {{ | |
| display: grid; | |
| grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
| gap: 20px; | |
| margin-bottom: 30px; | |
| }} | |
| .score-card {{ | |
| background: white; | |
| padding: 20px; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
| text-align: center; | |
| }} | |
| .content-section {{ | |
| background: white; | |
| padding: 30px; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
| margin-bottom: 20px; | |
| }} | |
| .keywords {{ | |
| display: flex; | |
| flex-wrap: wrap; | |
| gap: 10px; | |
| margin-top: 15px; | |
| }} | |
| .keyword {{ | |
| background: #e9ecef; | |
| padding: 5px 10px; | |
| border-radius: 20px; | |
| font-size: 0.9em; | |
| }} | |
| .optimized-content {{ | |
| background: #f8f9fa; | |
| padding: 20px; | |
| border-left: 4px solid #28a745; | |
| border-radius: 5px; | |
| font-style: italic; | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="header"> | |
| <h1>🔧 Content Enhancement Report</h1> | |
| <p>AI-Optimized Content Analysis Results</p> | |
| <p><strong>Analysis Date:</strong> {analysis.get('analysis_date', 'Unknown')}</p> | |
| </div> | |
| <div class="scores"> | |
| <div class="score-card"> | |
| <h3>Clarity Score</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #28a745;"> | |
| {scores.get('clarity', 0):.1f}/10 | |
| </div> | |
| </div> | |
| <div class="score-card"> | |
| <h3>Structure Score</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #28a745;"> | |
| {scores.get('structure', 0):.1f}/10 | |
| </div> | |
| </div> | |
| <div class="score-card"> | |
| <h3>Answerability Score</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #28a745;"> | |
| {scores.get('answerability', 0):.1f}/10 | |
| </div> | |
| </div> | |
| <div class="score-card"> | |
| <h3>Overall Average</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #28a745;"> | |
| {scores.get('overall_average', 0):.1f}/10 | |
| </div> | |
| </div> | |
| </div> | |
| <div class="content-section"> | |
| <h2>🔑 Identified Keywords</h2> | |
| <div class="keywords"> | |
| {' '.join([f'<span class="keyword">{keyword}</span>' for keyword in optimization.get('keywords_identified', [])])} | |
| </div> | |
| </div> | |
| {'<div class="content-section"><h2>✨ Optimized Content</h2><div class="optimized-content">' + optimization.get('optimized_content', '') + '</div></div>' if optimization.get('optimized_content') and not optimization.get('analyze_only') else ''} | |
| <div class="content-section"> | |
| <h2>💡 Improvements Made</h2> | |
| <ul> | |
| {' '.join([f'<li>{improvement}</li>' for improvement in optimization.get('improvements_made', [])])} | |
| </ul> | |
| </div> | |
| <div style="text-align: center; color: #666; margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd;"> | |
| <p>Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| return html_content | |
| except Exception as e: | |
| return f"<html><body><h1>Enhancement HTML Export Error</h1><p>{str(e)}</p></body></html>" | |
| def _export_enhancement_csv(self, data: Dict[str, Any]) -> str: | |
| """Export content enhancement results as CSV""" | |
| try: | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| # Header information | |
| analysis = data.get('content_analysis', {}) | |
| scores = data.get('performance_scores', {}) | |
| optimization = data.get('optimization_results', {}) | |
| writer.writerow(['Content Enhancement Analysis Report']) | |
| writer.writerow(['Analysis Date:', analysis.get('analysis_date', 'Unknown')]) | |
| writer.writerow(['Original Content Length:', analysis.get('original_content_length', 0)]) | |
| writer.writerow(['Original Word Count:', analysis.get('original_word_count', 0)]) | |
| writer.writerow([]) | |
| # Performance scores | |
| writer.writerow(['Performance Scores']) | |
| writer.writerow(['Metric', 'Score']) | |
| for metric, score in scores.items(): | |
| writer.writerow([metric.replace('_', ' ').title(), round(score, 2)]) | |
| writer.writerow([]) | |
| writer.writerow(['Keywords Identified']) | |
| for keyword in optimization.get('keywords_identified', []): | |
| writer.writerow([keyword]) | |
| writer.writerow([]) | |
| writer.writerow(['Improvements Made']) | |
| for improvement in optimization.get('improvements_made', []): | |
| writer.writerow([improvement]) | |
| return output.getvalue() | |
| except Exception as e: | |
| return f"Enhancement CSV export error: {str(e)}" | |
| def _export_qa_html(self, data: Dict[str, Any]) -> str: | |
| """Export Q&A results as HTML""" | |
| try: | |
| session = data.get('qa_session', {}) | |
| summary = data.get('summary', {}) | |
| interactions = session.get('interactions', []) | |
| html_content = f""" | |
| <!DOCTYPE html> | |
| <html lang="en"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <title>Q&A Session Report</title> | |
| <style> | |
| body {{ | |
| font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
| line-height: 1.6; | |
| color: #333; | |
| max-width: 1000px; | |
| margin: 0 auto; | |
| padding: 20px; | |
| background-color: #f8f9fa; | |
| }} | |
| .header {{ | |
| background: linear-gradient(135deg, #6f42c1 0%, #e83e8c 100%); | |
| color: white; | |
| padding: 30px; | |
| border-radius: 10px; | |
| margin-bottom: 30px; | |
| text-align: center; | |
| }} | |
| .summary {{ | |
| display: grid; | |
| grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
| gap: 20px; | |
| margin-bottom: 30px; | |
| }} | |
| .summary-card {{ | |
| background: white; | |
| padding: 20px; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
| text-align: center; | |
| }} | |
| .qa-item {{ | |
| background: white; | |
| padding: 20px; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 6px rgba(0,0,0,0.1); | |
| margin-bottom: 20px; | |
| }} | |
| .question {{ | |
| background: #e9ecef; | |
| padding: 15px; | |
| border-left: 4px solid #6f42c1; | |
| border-radius: 5px; | |
| margin-bottom: 15px; | |
| }} | |
| .answer {{ | |
| padding: 15px; | |
| border-left: 4px solid #28a745; | |
| border-radius: 5px; | |
| background: #f8f9fa; | |
| }} | |
| .sources {{ | |
| margin-top: 15px; | |
| padding: 10px; | |
| background: #fff3cd; | |
| border-radius: 5px; | |
| font-size: 0.9em; | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="header"> | |
| <h1>💬 Q&A Session Report</h1> | |
| <p>Document Question & Answer Analysis</p> | |
| <p><strong>Session Date:</strong> {session.get('session_date', 'Unknown')}</p> | |
| </div> | |
| <div class="summary"> | |
| <div class="summary-card"> | |
| <h3>Total Questions</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #6f42c1;"> | |
| {session.get('total_questions', 0)} | |
| </div> | |
| </div> | |
| <div class="summary-card"> | |
| <h3>Successful Answers</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #28a745;"> | |
| {summary.get('successful_answers', 0)} | |
| </div> | |
| </div> | |
| <div class="summary-card"> | |
| <h3>Avg Response Length</h3> | |
| <div style="font-size: 2em; font-weight: bold; color: #17a2b8;"> | |
| {summary.get('average_response_length', 0):.0f} | |
| </div> | |
| </div> | |
| </div> | |
| <h2>📝 Q&A Interactions</h2> | |
| """ | |
| # Add individual Q&A items | |
| for i, interaction in enumerate(interactions, 1): | |
| question = interaction.get('query', 'No question') | |
| answer = interaction.get('result', interaction.get('answer', 'No answer')) | |
| sources = interaction.get('sources', []) | |
| html_content += f""" | |
| <div class="qa-item"> | |
| <h3>Question {i}</h3> | |
| <div class="question"> | |
| <strong>Q:</strong> {question} | |
| </div> | |
| <div class="answer"> | |
| <strong>A:</strong> {answer} | |
| </div> | |
| """ | |
| if sources: | |
| html_content += '<div class="sources"><strong>Sources:</strong><ul>' | |
| for source in sources[:3]: # Limit to first 3 sources | |
| content_preview = source.get('content', '')[:200] + '...' if len(source.get('content', '')) > 200 else source.get('content', '') | |
| html_content += f'<li>{content_preview}</li>' | |
| html_content += '</ul></div>' | |
| html_content += '</div>' | |
| html_content += f""" | |
| <div style="text-align: center; color: #666; margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd;"> | |
| <p>Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| return html_content | |
| except Exception as e: | |
| return f"<html><body><h1>Q&A HTML Export Error</h1><p>{str(e)}</p></body></html>" | |
| def _export_qa_csv(self, data: Dict[str, Any]) -> str: | |
| """Export Q&A results as CSV""" | |
| try: | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| session = data.get('qa_session', {}) | |
| summary = data.get('summary', {}) | |
| interactions = session.get('interactions', []) | |
| # Header | |
| writer.writerow(['Q&A Session Report']) | |
| writer.writerow(['Session Date:', session.get('session_date', 'Unknown')]) | |
| writer.writerow(['Total Questions:', session.get('total_questions', 0)]) | |
| writer.writerow(['Successful Answers:', summary.get('successful_answers', 0)]) | |
| writer.writerow([]) | |
| # Q&A data | |
| writer.writerow(['Question Index', 'Question', 'Answer', 'Has Sources', 'Answer Length']) | |
| for i, interaction in enumerate(interactions, 1): | |
| question = interaction.get('query', 'No question') | |
| answer = interaction.get('result', interaction.get('answer', 'No answer')) | |
| has_sources = 'Yes' if interaction.get('sources') else 'No' | |
| answer_length = len(answer) if answer else 0 | |
| writer.writerow([i, question, answer, has_sources, answer_length]) | |
| return output.getvalue() | |
| except Exception as e: | |
| return f"Q&A CSV export error: {str(e)}" | |
| def _export_batch_excel(self, data: Dict[str, Any]) -> bytes: | |
| """Export batch results as Excel file""" | |
| try: | |
| output = io.BytesIO() | |
| with pd.ExcelWriter(output, engine='openpyxl') as writer: | |
| # Batch metadata sheet | |
| metadata = data.get('batch_metadata', {}) | |
| metadata_df = pd.DataFrame([ | |
| {'Property': k, 'Value': v} for k, v in metadata.items() | |
| ]) | |
| metadata_df.to_excel(writer, sheet_name='Batch Metadata', index=False) | |
| # Batch summary sheet | |
| summary = data.get('batch_summary', {}) | |
| summary_df = pd.DataFrame([ | |
| {'Metric': k, 'Value': v} for k, v in summary.items() | |
| ]) | |
| summary_df.to_excel(writer, sheet_name='Batch Summary', index=False) | |
| # Individual results sheet | |
| results = data.get('batch_results', []) | |
| if results: | |
| # Flatten results for tabular format | |
| flattened_results = [] | |
| for i, result in enumerate(results): | |
| flat_result = {'Batch_Index': i} | |
| self._flatten_dict(result, flat_result) | |
| flattened_results.append(flat_result) | |
| results_df = pd.DataFrame(flattened_results) | |
| results_df.to_excel(writer, sheet_name='Batch Results', index=False) | |
| output.seek(0) | |
| return output.getvalue() | |
| except Exception as e: | |
| error_content = f"Batch Excel export failed: {str(e)}\n\nData:\n{json.dumps(data, indent=2)}" | |
| return error_content.encode('utf-8') | |
| def _export_batch_csv(self, data: Dict[str, Any]) -> str: | |
| """Export batch results as CSV""" | |
| try: | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| # Batch metadata | |
| metadata = data.get('batch_metadata', {}) | |
| writer.writerow(['Batch Analysis Results']) | |
| writer.writerow(['Export Timestamp:', data.get('export_timestamp', 'Unknown')]) | |
| writer.writerow([]) | |
| writer.writerow(['Batch Metadata']) | |
| for key, value in metadata.items(): | |
| writer.writerow([key, value]) | |
| writer.writerow([]) | |
| # Batch summary | |
| summary = data.get('batch_summary', {}) | |
| writer.writerow(['Batch Summary']) | |
| for key, value in summary.items(): | |
| writer.writerow([key, value]) | |
| writer.writerow([]) | |
| # Individual results (simplified) | |
| results = data.get('batch_results', []) | |
| if results: | |
| writer.writerow(['Individual Results']) | |
| writer.writerow(['Index', 'Status', 'Summary']) | |
| for i, result in enumerate(results): | |
| status = 'Success' if not result.get('error') else 'Error' | |
| summary_text = str(result)[:100] + '...' if len(str(result)) > 100 else str(result) | |
| writer.writerow([i, status, summary_text]) | |
| return output.getvalue() | |
| except Exception as e: | |
| return f"Batch CSV export error: {str(e)}" | |
| def _export_geo_pdf(self, data: Dict[str, Any]) -> bytes: | |
| """Export GEO data as PDF (placeholder - would need reportlab)""" | |
| try: | |
| # For now, return HTML content as bytes | |
| # In a full implementation, you'd use reportlab or weasyprint | |
| html_content = self._export_geo_html(data) | |
| return html_content.encode('utf-8') | |
| except Exception as e: | |
| error_content = f"PDF export not fully implemented. Error: {str(e)}" | |
| return error_content.encode('utf-8') | |
| def _create_executive_summary(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create executive summary report""" | |
| try: | |
| geo_results = analysis_data.get('geo_results', []) | |
| enhancement_results = analysis_data.get('enhancement_results', {}) | |
| qa_results = analysis_data.get('qa_results', []) | |
| # Calculate key metrics | |
| overall_performance = self._calculate_overall_performance(analysis_data) | |
| return { | |
| 'executive_summary': { | |
| 'overall_performance_score': overall_performance, | |
| 'key_findings': self._extract_key_findings(analysis_data), | |
| 'priority_recommendations': self._get_priority_recommendations(analysis_data), | |
| 'roi_potential': self._estimate_roi_potential(overall_performance), | |
| 'implementation_timeline': self._suggest_implementation_timeline(analysis_data), | |
| 'resource_requirements': self._estimate_resource_requirements(analysis_data) | |
| } | |
| } | |
| except Exception as e: | |
| return {'error': f"Executive summary creation failed: {str(e)}"} | |
| def _create_summary_report(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create summary report""" | |
| try: | |
| return { | |
| 'summary_report': { | |
| 'analysis_overview': self._create_analysis_overview(analysis_data), | |
| 'performance_metrics': self._summarize_performance_metrics(analysis_data), | |
| 'improvement_opportunities': self._identify_improvement_opportunities(analysis_data), | |
| 'competitive_position': self._assess_competitive_position(analysis_data), | |
| 'next_steps': self._recommend_next_steps(analysis_data) | |
| } | |
| } | |
| except Exception as e: | |
| return {'error': f"Summary report creation failed: {str(e)}"} | |
| def _create_full_report(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create full detailed report""" | |
| try: | |
| return { | |
| 'full_report': { | |
| 'executive_summary': self._create_executive_summary(analysis_data).get('executive_summary', {}), | |
| 'detailed_analysis': { | |
| 'geo_analysis_details': analysis_data.get('geo_results', []), | |
| 'content_optimization_details': analysis_data.get('enhancement_results', {}), | |
| 'qa_performance_details': analysis_data.get('qa_results', []) | |
| }, | |
| 'methodology': self._document_methodology(), | |
| 'data_sources': self._document_data_sources(analysis_data), | |
| 'limitations': self._document_limitations(), | |
| 'appendices': self._create_appendices(analysis_data) | |
| } | |
| } | |
| except Exception as e: | |
| return {'error': f"Full report creation failed: {str(e)}"} | |
| def _create_batch_summary(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Create summary of batch processing results""" | |
| try: | |
| total_items = len(batch_results) | |
| successful_items = len([r for r in batch_results if not r.get('error')]) | |
| failed_items = total_items - successful_items | |
| return { | |
| 'total_items': total_items, | |
| 'successful_items': successful_items, | |
| 'failed_items': failed_items, | |
| 'success_rate': (successful_items / total_items * 100) if total_items > 0 else 0, | |
| 'processing_status': 'Completed', | |
| 'average_processing_time': self._calculate_avg_processing_time(batch_results), | |
| 'common_errors': self._identify_common_errors(batch_results) | |
| } | |
| except Exception as e: | |
| return {'error': f"Batch summary creation failed: {str(e)}"} | |
| def _generate_performance_insights(self, scores: Dict[str, float], overall_avg: float) -> List[str]: | |
| """Generate performance insights from scores""" | |
| insights = [] | |
| try: | |
| # Overall performance insight | |
| if overall_avg >= 8.0: | |
| insights.append("Excellent overall GEO performance - content is well-optimized for AI search engines") | |
| elif overall_avg >= 6.0: | |
| insights.append("Good GEO performance with room for improvement in specific areas") | |
| elif overall_avg >= 4.0: | |
| insights.append("Moderate GEO performance - significant optimization opportunities exist") | |
| else: | |
| insights.append("Low GEO performance - comprehensive optimization needed") | |
| # Specific metric insights | |
| for metric, score in scores.items(): | |
| if score < 5.0: | |
| metric_name = metric.replace('_', ' ').title() | |
| insights.append(f"Low {metric_name} score ({score:.1f}) needs immediate attention") | |
| elif score >= 8.5: | |
| metric_name = metric.replace('_', ' ').title() | |
| insights.append(f"Excellent {metric_name} score ({score:.1f}) - maintain current approach") | |
| return insights[:5] # Return top 5 insights | |
| except Exception: | |
| return ["Unable to generate performance insights"] | |
| def _generate_package_readme(self, analysis_data: Dict[str, Any]) -> str: | |
| """Generate README file for export package""" | |
| try: | |
| readme_content = f""" | |
| GEO SEO AI Optimizer - Analysis Package | |
| ====================================== | |
| Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | |
| This package contains the complete analysis results from the GEO SEO AI Optimizer tool. | |
| Files Included: | |
| - JSON file: Complete raw data in JSON format | |
| - HTML file: Visual report for web viewing | |
| - CSV file: Tabular data for spreadsheet analysis | |
| - README.txt: This file | |
| About GEO (Generative Engine Optimization): | |
| GEO is the practice of optimizing content for AI-powered search engines and | |
| language models. Unlike traditional SEO, GEO focuses on: | |
| - AI search visibility | |
| - Query intent matching | |
| - Conversational readiness | |
| - Citation worthiness | |
| - Semantic richness | |
| - Context completeness | |
| How to Use These Files: | |
| 1. Open the HTML file in a web browser for a visual report | |
| 2. Import the CSV file into Excel or Google Sheets for analysis | |
| 3. Use the JSON file for programmatic processing or integration | |
| For more information about GEO optimization, visit the tool documentation. | |
| Generated by: GEO SEO AI Optimizer v1.0 | |
| """ | |
| return readme_content | |
| except Exception as e: | |
| return f"README generation failed: {str(e)}" | |
| # Helper methods for data processing and analysis | |
| def _get_performance_level(self, score: float) -> str: | |
| """Get performance level description for a score""" | |
| if score >= 8.0: | |
| return "Excellent" | |
| elif score >= 6.0: | |
| return "Good" | |
| elif score >= 4.0: | |
| return "Fair" | |
| else: | |
| return "Needs Improvement" | |
| def _categorize_recommendation(self, recommendation: str) -> str: | |
| """Categorize a recommendation based on content""" | |
| rec_lower = recommendation.lower() | |
| if any(word in rec_lower for word in ['structure', 'heading', 'format']): | |
| return "Content Structure" | |
| elif any(word in rec_lower for word in ['keyword', 'semantic', 'topic']): | |
| return "SEO & Keywords" | |
| elif any(word in rec_lower for word in ['clarity', 'readability', 'language']): | |
| return "Content Quality" | |
| elif any(word in rec_lower for word in ['technical', 'schema', 'markup']): | |
| return "Technical SEO" | |
| else: | |
| return "General" | |
| def _calculate_avg_response_length(self, qa_results: List[Dict[str, Any]]) -> float: | |
| """Calculate average response length for Q&A results""" | |
| try: | |
| response_lengths = [] | |
| for result in qa_results: | |
| answer = result.get('result', result.get('answer', '')) | |
| if answer and not result.get('error'): | |
| response_lengths.append(len(answer)) | |
| return sum(response_lengths) / len(response_lengths) if response_lengths else 0 | |
| except Exception: | |
| return 0 | |
| def _extract_common_topics(self, qa_results: List[Dict[str, Any]]) -> List[str]: | |
| """Extract common topics from Q&A results""" | |
| try: | |
| # Simple topic extraction based on question keywords | |
| topics = {} | |
| for result in qa_results: | |
| question = result.get('query', result.get('question', '')) | |
| if question: | |
| words = question.lower().split() | |
| for word in words: | |
| if len(word) > 4: # Focus on longer words | |
| topics[word] = topics.get(word, 0) + 1 | |
| # Return top 5 most common topics | |
| sorted_topics = sorted(topics.items(), key=lambda x: x[1], reverse=True) | |
| return [topic for topic, count in sorted_topics[:5]] | |
| except Exception: | |
| return [] | |
| def _flatten_dict(self, d: Dict[str, Any], parent_dict: Dict[str, Any], parent_key: str = '') -> None: | |
| """Flatten nested dictionary for tabular export""" | |
| try: | |
| for key, value in d.items(): | |
| new_key = f"{parent_key}_{key}" if parent_key else key | |
| if isinstance(value, dict): | |
| self._flatten_dict(value, parent_dict, new_key) | |
| elif isinstance(value, list): | |
| parent_dict[new_key] = json.dumps(value) # Convert lists to JSON strings | |
| else: | |
| parent_dict[new_key] = value | |
| except Exception: | |
| pass # Skip problematic keys | |
| def _calculate_overall_performance(self, analysis_data: Dict[str, Any]) -> float: | |
| """Calculate overall performance score across all analyses""" | |
| try: | |
| scores = [] | |
| # GEO scores | |
| geo_results = analysis_data.get('geo_results', []) | |
| for result in geo_results: | |
| if 'geo_scores' in result: | |
| geo_score_values = list(result['geo_scores'].values()) | |
| if geo_score_values: | |
| scores.append(sum(geo_score_values) / len(geo_score_values)) | |
| # Enhancement scores | |
| enhancement = analysis_data.get('enhancement_results', {}) | |
| if 'scores' in enhancement: | |
| enh_scores = list(enhancement['scores'].values()) | |
| if enh_scores: | |
| scores.append(sum(enh_scores) / len(enh_scores)) | |
| return sum(scores) / len(scores) if scores else 0 | |
| except Exception: | |
| return 0 | |
| def _extract_key_findings(self, analysis_data: Dict[str, Any]) -> List[str]: | |
| """Extract key findings from analysis data""" | |
| findings = [] | |
| try: | |
| # Add findings based on performance scores | |
| overall_perf = self._calculate_overall_performance(analysis_data) | |
| if overall_perf >= 8.0: | |
| findings.append("Content demonstrates excellent AI search optimization") | |
| elif overall_perf <= 4.0: | |
| findings.append("Significant optimization opportunities identified") | |
| # Add more specific findings based on data | |
| geo_results = analysis_data.get('geo_results', []) | |
| if geo_results: | |
| findings.append(f"Analyzed {len(geo_results)} pages for GEO performance") | |
| enhancement = analysis_data.get('enhancement_results', {}) | |
| if enhancement and 'keywords' in enhancement: | |
| findings.append(f"Identified {len(enhancement['keywords'])} key optimization terms") | |
| return findings[:5] # Return top 5 findings | |
| except Exception: | |
| return ["Unable to extract key findings"] | |
| def _get_priority_recommendations(self, analysis_data: Dict[str, Any]) -> List[str]: | |
| """Get priority recommendations from analysis""" | |
| try: | |
| recommendations = [] | |
| # Collect all recommendations from different analyses | |
| geo_results = analysis_data.get('geo_results', []) | |
| for result in geo_results: | |
| recommendations.extend(result.get('recommendations', [])) | |
| # Remove duplicates and return top priorities | |
| unique_recs = list(set(recommendations)) | |
| return unique_recs[:3] # Top 3 priority recommendations | |
| except Exception: | |
| return ["Review and implement GEO best practices"] | |
| def _estimate_roi_potential(self, performance_score: float) -> str: | |
| """Estimate ROI potential based on performance score""" | |
| if performance_score <= 4.0: | |
| return "High - Significant improvement potential" | |
| elif performance_score <= 6.0: | |
| return "Medium - Moderate improvement opportunities" | |
| else: | |
| return "Low - Already well-optimized" | |
| def _suggest_implementation_timeline(self, analysis_data: Dict[str, Any]) -> str: | |
| """Suggest implementation timeline""" | |
| try: | |
| overall_perf = self._calculate_overall_performance(analysis_data) | |
| if overall_perf <= 4.0: | |
| return "3-6 months for comprehensive optimization" | |
| elif overall_perf <= 6.0: | |
| return "1-3 months for targeted improvements" | |
| else: | |
| return "Ongoing maintenance and monitoring" | |
| except Exception: | |
| return "Timeline assessment unavailable" | |
| def _estimate_resource_requirements(self, analysis_data: Dict[str, Any]) -> Dict[str, str]: | |
| """Estimate resource requirements""" | |
| return { | |
| 'content_team': 'Required for content optimization', | |
| 'technical_team': 'Required for technical implementations', | |
| 'timeline': self._suggest_implementation_timeline(analysis_data), | |
| 'budget': 'Varies based on scope of optimizations' | |
| } | |
| def _create_analysis_overview(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create analysis overview""" | |
| try: | |
| return { | |
| 'analyses_performed': list(analysis_data.keys()), | |
| 'total_items_analyzed': sum(len(v) if isinstance(v, list) else 1 for v in analysis_data.values()), | |
| 'analysis_scope': 'Comprehensive GEO and content optimization analysis', | |
| 'key_focus_areas': ['AI Search Optimization', 'Content Enhancement', 'Performance Analysis'] | |
| } | |
| except Exception: | |
| return {'error': 'Overview creation failed'} | |
| def _summarize_performance_metrics(self, analysis_data: Dict[str, Any]) -> Dict[str, float]: | |
| """Summarize performance metrics""" | |
| try: | |
| return { | |
| 'overall_performance': self._calculate_overall_performance(analysis_data), | |
| 'optimization_potential': 10 - self._calculate_overall_performance(analysis_data), | |
| 'completion_rate': 100.0 # Assuming analysis completed successfully | |
| } | |
| except Exception: | |
| return {} | |
| def _identify_improvement_opportunities(self, analysis_data: Dict[str, Any]) -> List[str]: | |
| """Identify improvement opportunities""" | |
| return self._get_priority_recommendations(analysis_data) | |
| def _assess_competitive_position(self, analysis_data: Dict[str, Any]) -> str: | |
| """Assess competitive position""" | |
| try: | |
| overall_perf = self._calculate_overall_performance(analysis_data) | |
| if overall_perf >= 8.0: | |
| return "Strong - Above average GEO performance" | |
| elif overall_perf >= 6.0: | |
| return "Competitive - Meeting industry standards" | |
| elif overall_perf >= 4.0: | |
| return "Below Average - Improvement needed" | |
| else: | |
| return "Weak - Significant optimization required" | |
| except Exception: | |
| return "Assessment unavailable" | |
| def _recommend_next_steps(self, analysis_data: Dict[str, Any]) -> List[str]: | |
| """Recommend next steps""" | |
| steps = [ | |
| "Review detailed analysis results", | |
| "Prioritize recommendations by impact", | |
| "Develop implementation plan", | |
| "Monitor performance improvements" | |
| ] | |
| # Add specific steps based on performance | |
| overall_perf = self._calculate_overall_performance(analysis_data) | |
| if overall_perf <= 4.0: | |
| steps.insert(1, "Focus on fundamental GEO optimization") | |
| return steps | |
| def _document_methodology(self) -> Dict[str, str]: | |
| """Document analysis methodology""" | |
| return { | |
| 'geo_analysis': 'AI-powered content analysis using specialized GEO metrics', | |
| 'content_optimization': 'LLM-based content enhancement and scoring', | |
| 'performance_scoring': 'Multi-dimensional scoring system for AI search optimization', | |
| 'data_collection': 'Automated content parsing and analysis', | |
| 'validation': 'Cross-referenced metrics and quality assurance checks' | |
| } | |
| def _document_data_sources(self, analysis_data: Dict[str, Any]) -> List[str]: | |
| """Document data sources used in analysis""" | |
| sources = [] | |
| if 'geo_results' in analysis_data: | |
| sources.append("Website content analysis") | |
| if 'enhancement_results' in analysis_data: | |
| sources.append("Content optimization analysis") | |
| if 'qa_results' in analysis_data: | |
| sources.append("Document Q&A interactions") | |
| sources.extend([ | |
| "AI-powered content scoring", | |
| "GEO performance metrics", | |
| "Industry best practices database" | |
| ]) | |
| return sources | |
| def _document_limitations(self) -> List[str]: | |
| """Document analysis limitations""" | |
| return [ | |
| "Analysis based on current content snapshot", | |
| "Performance may vary with search engine algorithm updates", | |
| "Recommendations require human review for implementation", | |
| "Results depend on quality of input content", | |
| "AI model performance may vary across different content types" | |
| ] | |
| def _create_appendices(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Create report appendices""" | |
| try: | |
| return { | |
| 'technical_details': { | |
| 'models_used': ['GPT-based content analysis', 'Semantic similarity scoring'], | |
| 'processing_time': 'Variable based on content volume', | |
| 'confidence_intervals': 'Scores provided with ±0.5 accuracy' | |
| }, | |
| 'glossary': { | |
| 'GEO': 'Generative Engine Optimization - optimization for AI search engines', | |
| 'AI Search Visibility': 'Likelihood of content appearing in AI search results', | |
| 'Citation Worthiness': 'Probability of content being cited by AI systems', | |
| 'Conversational Readiness': 'Suitability for AI chat responses' | |
| }, | |
| 'references': [ | |
| 'GEO Best Practices Guide', | |
| 'AI Search Engine Optimization Standards', | |
| 'Content Performance Benchmarks' | |
| ] | |
| } | |
| except Exception: | |
| return {} | |
| def _calculate_avg_processing_time(self, batch_results: List[Dict[str, Any]]) -> float: | |
| """Calculate average processing time for batch results""" | |
| try: | |
| processing_times = [] | |
| for result in batch_results: | |
| if 'processing_time' in result: | |
| processing_times.append(result['processing_time']) | |
| return sum(processing_times) / len(processing_times) if processing_times else 0 | |
| except Exception: | |
| return 0 | |
| def _identify_common_errors(self, batch_results: List[Dict[str, Any]]) -> List[str]: | |
| """Identify common errors in batch processing""" | |
| try: | |
| error_counts = {} | |
| for result in batch_results: | |
| if result.get('error'): | |
| error_msg = str(result['error'])[:50] # First 50 chars | |
| error_counts[error_msg] = error_counts.get(error_msg, 0) + 1 | |
| # Return top 3 most common errors | |
| sorted_errors = sorted(error_counts.items(), key=lambda x: x[1], reverse=True) | |
| return [error for error, count in sorted_errors[:3]] | |
| except Exception: | |
| return [] | |
class DataValidator:
    """Helper class for validating export data.

    Both validators are static: they take only the data to check, so they
    can be called on the class or on an instance (``ExportManager`` calls
    them through ``self.validator``). Each returns a dict with a boolean
    'valid' flag plus 'errors' and 'warnings' lists; warnings never make the
    data invalid.
    """

    @staticmethod
    def validate_geo_data(geo_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Validate GEO analysis data structure.

        Empty input is an error. A result missing 'geo_scores' or 'page_data'
        only produces a warning. Every score must be an int or float in the
        range 0-10, otherwise the data is invalid.

        Args:
            geo_results: One dict per analysed page.

        Returns:
            ``{'valid': bool, 'errors': [...], 'warnings': [...]}``.
        """
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }
        try:
            if not geo_results:
                validation_result['errors'].append("No GEO results provided")
                validation_result['valid'] = False
                return validation_result

            for i, result in enumerate(geo_results):
                # Check required fields
                if 'geo_scores' not in result:
                    validation_result['warnings'].append(f"Result {i} missing geo_scores")
                if 'page_data' not in result:
                    validation_result['warnings'].append(f"Result {i} missing page_data")

                # Validate score ranges
                if 'geo_scores' in result:
                    for metric, score in result['geo_scores'].items():
                        if not isinstance(score, (int, float)) or score < 0 or score > 10:
                            validation_result['errors'].append(f"Invalid score for {metric} in result {i}")
                            validation_result['valid'] = False
            return validation_result
        except Exception as e:
            # Malformed input (e.g. non-dict results) is reported, not raised.
            validation_result['errors'].append(f"Validation failed: {str(e)}")
            validation_result['valid'] = False
            return validation_result

    @staticmethod
    def validate_enhancement_data(enhancement_result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate content enhancement data structure.

        A missing 'scores' entry, or a missing 'clarity', 'structuredness' or
        'answerability' score, only produces a warning. A score of the wrong
        type (not int or float) makes the data invalid.

        Args:
            enhancement_result: Content-optimization result to check.

        Returns:
            ``{'valid': bool, 'errors': [...], 'warnings': [...]}``.
        """
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }
        try:
            # Check for required fields
            if 'scores' not in enhancement_result:
                validation_result['warnings'].append("Enhancement result missing scores")

            # Validate score structure
            if 'scores' in enhancement_result:
                scores = enhancement_result['scores']
                required_scores = ['clarity', 'structuredness', 'answerability']
                for req_score in required_scores:
                    if req_score not in scores:
                        validation_result['warnings'].append(f"Missing {req_score} score")
                    elif not isinstance(scores[req_score], (int, float)):
                        validation_result['errors'].append(f"Invalid {req_score} score type")
                        validation_result['valid'] = False
            return validation_result
        except Exception as e:
            # Malformed input is reported, not raised.
            validation_result['errors'].append(f"Enhancement validation failed: {str(e)}")
            validation_result['valid'] = False
            return validation_result
class ExportManager:
    """High-level export management class.

    Wraps a ``ResultExporter`` and a ``DataValidator``: data is validated
    first and exported only when valid. Exports that succeed and exports
    that raise are recorded in ``export_history``.
    """

    def __init__(self):
        self.exporter = ResultExporter()
        self.validator = DataValidator()
        self.export_history = []

    def export_with_validation(self, data: Dict[str, Any], data_type: str,
                               format_type: str = 'json') -> Dict[str, Any]:
        """Validate ``data`` and export it when validation passes.

        'geo_analysis' and 'content_optimization' data is validated and
        exported through the dedicated exporter methods. Any other data type
        skips validation and is serialised as indented JSON regardless of
        ``format_type``.

        Args:
            data: Payload to export.
            data_type: Kind of payload, e.g. 'geo_analysis'.
            format_type: Requested export format.

        Returns:
            ``{'success': True, 'data': ..., 'validation': ...}`` on success;
            ``{'success': False, 'error': ..., 'validation': ...}`` when
            validation fails; ``{'success': False, 'error': 'Export failed: ...'}``
            when anything raises.
        """
        try:
            # Validate data first
            if data_type == 'geo_analysis':
                validation = self.validator.validate_geo_data(data.get('geo_results', []))
            elif data_type == 'content_optimization':
                validation = self.validator.validate_enhancement_data(data)
            else:
                validation = {'valid': True, 'errors': [], 'warnings': []}

            if not validation['valid']:
                return {
                    'success': False,
                    'error': 'Data validation failed',
                    'validation': validation
                }

            # Proceed with export now that validation passed
            if data_type == 'geo_analysis':
                result = self.exporter.export_geo_results(
                    data.get('geo_results', []),
                    data.get('website_url', 'unknown'),
                    format_type
                )
            elif data_type == 'content_optimization':
                result = self.exporter.export_enhancement_results(data, format_type)
            else:
                result = json.dumps(data, indent=2, ensure_ascii=False)

            # Log export
            self.export_history.append({
                'timestamp': datetime.now().isoformat(),
                'data_type': data_type,
                'format_type': format_type,
                'validation_warnings': validation.get('warnings', []),
                'success': True
            })
            return {
                'success': True,
                'data': result,
                'validation': validation
            }
        except Exception as e:
            self.export_history.append({
                'timestamp': datetime.now().isoformat(),
                'data_type': data_type,
                'format_type': format_type,
                'success': False,
                'error': str(e)
            })
            return {
                'success': False,
                'error': f"Export failed: {str(e)}"
            }

    def get_export_history(self) -> List[Dict[str, Any]]:
        """Get export history (the live list, not a copy)."""
        return self.export_history

    def clear_export_history(self) -> None:
        """Clear export history"""
        self.export_history.clear()

    def get_supported_formats(self) -> Dict[str, List[str]]:
        """Get supported export formats by data type"""
        return {
            'geo_analysis': ['json', 'csv', 'html', 'xlsx', 'pdf'],
            'content_optimization': ['json', 'html', 'csv'],
            'qa_results': ['json', 'html', 'csv'],
            'batch_analysis': ['json', 'xlsx', 'csv']
        }

    def create_multi_format_export(self, data: Dict[str, Any], data_type: str,
                                   formats: Optional[List[str]] = None) -> Dict[str, Any]:
        """Create export in multiple formats.

        Args:
            data: Payload to export.
            data_type: Kind of payload, e.g. 'geo_analysis'.
            formats: Formats to generate; defaults to JSON, HTML and CSV.

        Returns:
            A dict with the per-format results ('multi_format_export'), the
            formats attempted ('formats_generated') and the formats that
            exported without error ('successful_formats'). A failed format
            maps to ``{'error': ...}``.
        """
        if formats is None:
            formats = ['json', 'html', 'csv']

        results = {}
        succeeded = set()
        for format_type in formats:
            try:
                export_result = self.export_with_validation(data, data_type, format_type)
                if export_result['success']:
                    payload = export_result['data']
                    results[format_type] = payload
                    # Success is tracked explicitly: testing `'error' in payload`
                    # on a str export is a substring match (any report text
                    # containing "error" would count as failed) and raises
                    # TypeError on bytes exports. Only a dict payload carrying
                    # an 'error' key is an exporter-reported failure.
                    if isinstance(payload, dict) and 'error' in payload:
                        succeeded.discard(format_type)
                    else:
                        succeeded.add(format_type)
                else:
                    results[format_type] = {'error': export_result['error']}
                    succeeded.discard(format_type)
            except Exception as e:
                results[format_type] = {'error': str(e)}
                succeeded.discard(format_type)

        return {
            'multi_format_export': results,
            'formats_generated': list(results.keys()),
            'successful_formats': [fmt for fmt in results if fmt in succeeded]
        }
| # Utility functions for the export module | |
def create_export_template(data_type: str) -> Dict[str, Any]:
    """Create export template for different data types.

    Known types are 'geo_analysis', 'content_optimization' and 'qa_results'.
    Note that the 'qa_results' template is a list of interactions rather
    than a dict. Templates are rebuilt on every call, so callers may modify
    the result freely.

    Args:
        data_type: Name of the template to build.

    Returns:
        The example payload for ``data_type``, or an empty dict for an
        unknown type.
    """
    geo_page = {
        'page_data': {
            'url': 'https://example.com/page1',
            'title': 'Example Page',
            'word_count': 500,
        },
        'geo_scores': {
            'ai_search_visibility': 7.5,
            'query_intent_matching': 6.8,
            'conversational_readiness': 8.2,
            'citation_worthiness': 7.1,
        },
        'recommendations': [
            'Improve content structure',
            'Add more specific examples',
        ],
    }
    geo_analysis = {
        'website_url': 'https://example.com',
        'geo_results': [geo_page],
    }

    content_optimization = {
        'scores': {
            'clarity': 7.5,
            'structuredness': 6.8,
            'answerability': 8.2,
        },
        'keywords': ['example', 'optimization', 'content'],
        'optimized_text': 'This is the optimized version of the content...',
        'optimization_suggestions': [
            'Improve sentence structure',
            'Add more specific keywords',
        ],
    }

    qa_interaction = {
        'query': 'What is the main topic?',
        'result': 'The main topic is content optimization for AI systems.',
        'sources': [
            {
                'content': 'Source document content...',
                'metadata': {'source': 'document1.pdf'},
            }
        ],
    }

    templates = {
        'geo_analysis': geo_analysis,
        'content_optimization': content_optimization,
        'qa_results': [qa_interaction],
    }
    if data_type in templates:
        return templates[data_type]
    return {}
def export_demo_data() -> Dict[str, Any]:
    """Export demonstration data for testing.

    Returns:
        A dict with one ``<data_type>_demo`` entry per template type
        ('geo_analysis', 'content_optimization', 'qa_results'), each holding
        the payload built by ``create_export_template``.
    """
    template_types = ('geo_analysis', 'content_optimization', 'qa_results')
    return {
        f'{data_type}_demo': create_export_template(data_type)
        for data_type in template_types
    }
# Public API of the module: the names exported by a star-import.
__all__ = [
    # Exporter and the report/analysis record classes
    "ResultExporter",
    "GEOReport",
    "ContentAnalysis",
    # Validation and high-level export management
    "DataValidator",
    "ExportManager",
    # Module-level helper functions
    "create_export_template",
    "export_demo_data",
]
# Manual smoke test: export the demo templates and print a preview of each.
if __name__ == "__main__":

    def _preview(export, limit=200):
        """Return ``export`` cut to ``limit`` characters plus '...' when its text is longer."""
        return export[:limit] + "..." if len(str(export)) > limit else export

    # Create exporter instance
    exporter = ResultExporter()

    # GEO demo data, exported as JSON and as HTML
    demo_geo_data = create_export_template('geo_analysis')
    geo_pages = demo_geo_data['geo_results']
    geo_url = demo_geo_data['website_url']
    json_export = exporter.export_geo_results(geo_pages, geo_url, 'json')
    html_export = exporter.export_geo_results(geo_pages, geo_url, 'html')
    print("JSON Export:", _preview(json_export))
    print("\nHTML Export:", _preview(html_export))

    # Test enhancement export
    demo_enhancement = create_export_template('content_optimization')
    enhancement_export = exporter.export_enhancement_results(demo_enhancement, 'json')
    print("\nEnhancement Export:", _preview(enhancement_export))