""" Results Export and Reporting Module Handles export of analysis results, reports, and data for external use """ import json import csv import io import zipfile import tempfile import os from datetime import datetime from typing import Dict, Any, List, Optional, Union import pandas as pd from dataclasses import dataclass, asdict @dataclass class GEOReport: """Data class for GEO analysis reports""" website_url: str analysis_date: str overall_score: float pages_analyzed: int geo_scores: Dict[str, float] recommendations: List[str] optimization_opportunities: List[Dict[str, Any]] competitive_position: str def to_dict(self) -> Dict[str, Any]: """Convert report to dictionary""" return asdict(self) @dataclass class ContentAnalysis: """Data class for content optimization analysis""" original_content: str analysis_date: str clarity_score: float structure_score: float answerability_score: float keywords: List[str] optimized_content: Optional[str] improvements_made: List[str] def to_dict(self) -> Dict[str, Any]: """Convert analysis to dictionary""" return asdict(self) class ResultExporter: """Main class for exporting analysis results and generating reports""" def __init__(self): self.export_formats = ['json', 'csv', 'html', 'pdf', 'xlsx'] self.supported_types = ['geo_analysis', 'content_optimization', 'qa_results', 'batch_analysis'] def export_geo_results(self, geo_results: List[Dict[str, Any]], website_url: str, format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]: """ Export GEO analysis results in specified format Args: geo_results (List[Dict]): List of GEO analysis results website_url (str): URL of analyzed website format_type (str): Export format ('json', 'csv', 'html', 'xlsx') Returns: Union[str, bytes, Dict]: Exported data in requested format """ try: # Prepare consolidated data export_data = self._prepare_geo_export_data(geo_results, website_url) if format_type.lower() == 'json': return self._export_geo_json(export_data) elif format_type.lower() == 'csv': 
return self._export_geo_csv(export_data) elif format_type.lower() == 'html': return self._export_geo_html(export_data) elif format_type.lower() == 'xlsx': return self._export_geo_excel(export_data) elif format_type.lower() == 'pdf': return self._export_geo_pdf(export_data) else: raise ValueError(f"Unsupported export format: {format_type}") except Exception as e: return {'error': f"Export failed: {str(e)}"} def export_enhancement_results(self, enhancement_result: Dict[str, Any], format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]: """ Export content enhancement results Args: enhancement_result (Dict): Content enhancement analysis result format_type (str): Export format Returns: Union[str, bytes, Dict]: Exported data """ try: # Prepare data for export export_data = self._prepare_enhancement_export_data(enhancement_result) if format_type.lower() == 'json': return json.dumps(export_data, indent=2, ensure_ascii=False) elif format_type.lower() == 'html': return self._export_enhancement_html(export_data) elif format_type.lower() == 'csv': return self._export_enhancement_csv(export_data) else: return json.dumps(export_data, indent=2, ensure_ascii=False) except Exception as e: return {'error': f"Enhancement export failed: {str(e)}"} def export_qa_results(self, qa_results: List[Dict[str, Any]], format_type: str = 'json') -> Union[str, bytes, Dict[str, Any]]: """ Export Q&A session results Args: qa_results (List[Dict]): List of Q&A interactions format_type (str): Export format Returns: Union[str, bytes, Dict]: Exported data """ try: export_data = { 'qa_session': { 'session_date': datetime.now().isoformat(), 'total_questions': len(qa_results), 'interactions': qa_results }, 'summary': { 'successful_answers': len([r for r in qa_results if not r.get('error')]), 'average_response_length': self._calculate_avg_response_length(qa_results), 'most_common_topics': self._extract_common_topics(qa_results) } } if format_type.lower() == 'json': return json.dumps(export_data, 
indent=2, ensure_ascii=False) elif format_type.lower() == 'html': return self._export_qa_html(export_data) elif format_type.lower() == 'csv': return self._export_qa_csv(export_data) else: return json.dumps(export_data, indent=2, ensure_ascii=False) except Exception as e: return {'error': f"Q&A export failed: {str(e)}"} def create_comprehensive_report(self, analysis_data: Dict[str, Any], report_type: str = 'full') -> Dict[str, Any]: """ Create comprehensive analysis report Args: analysis_data (Dict): Combined analysis data from multiple sources report_type (str): Type of report ('full', 'summary', 'executive') Returns: Dict: Comprehensive report data """ try: report = { 'report_metadata': { 'generated_at': datetime.now().isoformat(), 'report_type': report_type, 'generator': 'GEO SEO AI Optimizer', 'version': '1.0' } } if report_type == 'executive': report.update(self._create_executive_summary(analysis_data)) elif report_type == 'summary': report.update(self._create_summary_report(analysis_data)) else: # full report report.update(self._create_full_report(analysis_data)) return report except Exception as e: return {'error': f"Report creation failed: {str(e)}"} def export_batch_results(self, batch_results: List[Dict[str, Any]], batch_metadata: Dict[str, Any], format_type: str = 'xlsx') -> Union[str, bytes, Dict[str, Any]]: """ Export batch analysis results Args: batch_results (List[Dict]): List of batch analysis results batch_metadata (Dict): Metadata about the batch process format_type (str): Export format Returns: Union[str, bytes, Dict]: Exported batch data """ try: export_data = { 'batch_metadata': batch_metadata, 'batch_results': batch_results, 'batch_summary': self._create_batch_summary(batch_results), 'export_timestamp': datetime.now().isoformat() } if format_type.lower() == 'xlsx': return self._export_batch_excel(export_data) elif format_type.lower() == 'json': return json.dumps(export_data, indent=2, ensure_ascii=False) elif format_type.lower() == 'csv': 
return self._export_batch_csv(export_data) else: return json.dumps(export_data, indent=2, ensure_ascii=False) except Exception as e: return {'error': f"Batch export failed: {str(e)}"} def create_export_package(self, analysis_data: Dict[str, Any], package_name: str = "geo_analysis") -> bytes: """ Create a ZIP package with multiple export formats Args: analysis_data (Dict): Analysis data to package package_name (str): Name for the package Returns: bytes: ZIP file content """ try: # Create temporary directory with tempfile.TemporaryDirectory() as temp_dir: zip_path = os.path.join(temp_dir, f"{package_name}.zip") with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file: # Add JSON export json_data = json.dumps(analysis_data, indent=2, ensure_ascii=False) zip_file.writestr(f"{package_name}.json", json_data) # Add HTML report if 'geo_results' in analysis_data: html_data = self._export_geo_html(analysis_data) zip_file.writestr(f"{package_name}_report.html", html_data) # Add CSV data if 'geo_results' in analysis_data: csv_data = self._export_geo_csv(analysis_data) zip_file.writestr(f"{package_name}_data.csv", csv_data) # Add README readme_content = self._generate_package_readme(analysis_data) zip_file.writestr("README.txt", readme_content) # Read the ZIP file with open(zip_path, 'rb') as zip_file: return zip_file.read() except Exception as e: raise Exception(f"Package creation failed: {str(e)}") def _prepare_geo_export_data(self, geo_results: List[Dict[str, Any]], website_url: str) -> Dict[str, Any]: """Prepare GEO data for export""" try: # Calculate aggregate metrics valid_results = [r for r in geo_results if 'geo_scores' in r and not r.get('error')] if not valid_results: return { 'error': 'No valid GEO results to export', 'website_url': website_url, 'export_timestamp': datetime.now().isoformat() } # Aggregate scores all_scores = {} for result in valid_results: for metric, score in result.get('geo_scores', {}).items(): if metric not in all_scores: 
all_scores[metric] = [] all_scores[metric].append(score) avg_scores = {metric: sum(scores) / len(scores) for metric, scores in all_scores.items()} overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0 # Collect recommendations all_recommendations = [] all_opportunities = [] for result in valid_results: all_recommendations.extend(result.get('recommendations', [])) all_opportunities.extend(result.get('optimization_opportunities', [])) # Remove duplicates unique_recommendations = list(set(all_recommendations)) return { 'website_analysis': { 'url': website_url, 'analysis_date': datetime.now().isoformat(), 'pages_analyzed': len(valid_results), 'overall_geo_score': round(overall_avg, 2) }, 'aggregate_scores': avg_scores, 'individual_page_results': valid_results, 'recommendations': unique_recommendations[:10], # Top 10 'optimization_opportunities': all_opportunities, 'performance_insights': self._generate_performance_insights(avg_scores, overall_avg), 'export_metadata': { 'exported_by': 'GEO SEO AI Optimizer', 'export_timestamp': datetime.now().isoformat(), 'data_format': 'GEO Analysis Results v1.0' } } except Exception as e: return {'error': f"Data preparation failed: {str(e)}"} def _prepare_enhancement_export_data(self, enhancement_result: Dict[str, Any]) -> Dict[str, Any]: """Prepare content enhancement data for export""" try: scores = enhancement_result.get('scores', {}) return { 'content_analysis': { 'analysis_date': datetime.now().isoformat(), 'original_content_length': enhancement_result.get('original_length', 0), 'original_word_count': enhancement_result.get('original_word_count', 0), 'analysis_type': enhancement_result.get('optimization_type', 'standard') }, 'performance_scores': { 'clarity': scores.get('clarity', 0), 'structure': scores.get('structuredness', 0), 'answerability': scores.get('answerability', 0), 'overall_average': sum(scores.values()) / len(scores) if scores else 0 }, 'optimization_results': { 'keywords_identified': 
enhancement_result.get('keywords', []), 'optimized_content': enhancement_result.get('optimized_text', ''), 'improvements_made': enhancement_result.get('optimization_suggestions', []), 'analyze_only': enhancement_result.get('analyze_only', False) }, 'export_metadata': { 'exported_by': 'GEO SEO AI Optimizer', 'export_timestamp': datetime.now().isoformat(), 'data_format': 'Content Enhancement Results v1.0' } } except Exception as e: return {'error': f"Enhancement data preparation failed: {str(e)}"} def _export_geo_json(self, data: Dict[str, Any]) -> str: """Export GEO data as JSON""" return json.dumps(data, indent=2, ensure_ascii=False) def _export_geo_csv(self, data: Dict[str, Any]) -> str: """Export GEO data as CSV""" try: output = io.StringIO() # Write aggregate scores writer = csv.writer(output) writer.writerow(['GEO Analysis Results']) writer.writerow(['Website:', data.get('website_analysis', {}).get('url', 'Unknown')]) writer.writerow(['Analysis Date:', data.get('website_analysis', {}).get('analysis_date', 'Unknown')]) writer.writerow(['Overall Score:', data.get('website_analysis', {}).get('overall_geo_score', 0)]) writer.writerow([]) # Write aggregate scores writer.writerow(['Metric', 'Score']) for metric, score in data.get('aggregate_scores', {}).items(): writer.writerow([metric.replace('_', ' ').title(), round(score, 2)]) writer.writerow([]) writer.writerow(['Recommendations']) for i, rec in enumerate(data.get('recommendations', []), 1): writer.writerow([f"{i}.", rec]) # Individual page results if data.get('individual_page_results'): writer.writerow([]) writer.writerow(['Individual Page Results']) # Header for page results first_result = data['individual_page_results'][0] if 'geo_scores' in first_result: headers = ['Page Index', 'Page URL', 'Page Title'] + list(first_result['geo_scores'].keys()) writer.writerow(headers) for i, result in enumerate(data['individual_page_results']): page_data = result.get('page_data', {}) scores = result.get('geo_scores', {}) 
row = [ i + 1, page_data.get('url', 'Unknown'), page_data.get('title', 'Unknown') ] + [round(scores.get(metric, 0), 2) for metric in headers[3:]] writer.writerow(row) return output.getvalue() except Exception as e: return f"CSV export error: {str(e)}" def _export_geo_html(self, data: Dict[str, Any]) -> str: """Export GEO data as HTML report""" try: website_info = data.get('website_analysis', {}) scores = data.get('aggregate_scores', {}) recommendations = data.get('recommendations', []) html_content = f""" GEO Analysis Report - {website_info.get('url', 'Website')}

🚀 GEO Analysis Report

Generative Engine Optimization Performance Analysis

Website: {website_info.get('url', 'Not specified')}

Analysis Date: {website_info.get('analysis_date', 'Not specified')}

Overall GEO Score

{website_info.get('overall_geo_score', 0)}/10

Pages Analyzed

{website_info.get('pages_analyzed', 0)}

Recommendations

{len(recommendations)}

📊 Detailed GEO Metrics

""" # Add individual scores for metric, score in scores.items(): metric_display = metric.replace('_', ' ').title() score_percentage = min(score * 10, 100) # Convert to percentage html_content += f"""
{metric_display}
{score:.1f}/10
""" html_content += """

💡 Optimization Recommendations

""" # Add recommendations for i, rec in enumerate(recommendations, 1): html_content += f'
{i}. {rec}
' html_content += f"""
""" return html_content except Exception as e: return f"

HTML Export Error

{str(e)}

" def _export_geo_excel(self, data: Dict[str, Any]) -> bytes: """Export GEO data as Excel file""" try: output = io.BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: # Summary sheet summary_data = { 'Metric': ['Website URL', 'Analysis Date', 'Pages Analyzed', 'Overall Score'], 'Value': [ data.get('website_analysis', {}).get('url', 'Unknown'), data.get('website_analysis', {}).get('analysis_date', 'Unknown'), data.get('website_analysis', {}).get('pages_analyzed', 0), data.get('website_analysis', {}).get('overall_geo_score', 0) ] } pd.DataFrame(summary_data).to_excel(writer, sheet_name='Summary', index=False) # Scores sheet scores_data = [] for metric, score in data.get('aggregate_scores', {}).items(): scores_data.append({ 'Metric': metric.replace('_', ' ').title(), 'Score': round(score, 2), 'Performance': self._get_performance_level(score) }) pd.DataFrame(scores_data).to_excel(writer, sheet_name='GEO Scores', index=False) # Recommendations sheet rec_data = [] for i, rec in enumerate(data.get('recommendations', []), 1): rec_data.append({ 'Priority': i, 'Recommendation': rec, 'Category': self._categorize_recommendation(rec) }) if rec_data: pd.DataFrame(rec_data).to_excel(writer, sheet_name='Recommendations', index=False) # Individual pages sheet if data.get('individual_page_results'): pages_data = [] for i, result in enumerate(data['individual_page_results']): page_data = result.get('page_data', {}) scores = result.get('geo_scores', {}) page_row = { 'Page_Index': i + 1, 'URL': page_data.get('url', 'Unknown'), 'Title': page_data.get('title', 'Unknown'), 'Word_Count': page_data.get('word_count', 0) } # Add all GEO scores for metric, score in scores.items(): page_row[metric.replace('_', ' ').title()] = round(score, 2) pages_data.append(page_row) pd.DataFrame(pages_data).to_excel(writer, sheet_name='Individual Pages', index=False) output.seek(0) return output.getvalue() except Exception as e: # Return error as text file if Excel creation fails 
error_content = f"Excel export failed: {str(e)}\n\nData:\n{json.dumps(data, indent=2)}" return error_content.encode('utf-8') def _export_enhancement_html(self, data: Dict[str, Any]) -> str: """Export content enhancement results as HTML""" try: analysis = data.get('content_analysis', {}) scores = data.get('performance_scores', {}) optimization = data.get('optimization_results', {}) html_content = f""" Content Enhancement Report

🔧 Content Enhancement Report

AI-Optimized Content Analysis Results

Analysis Date: {analysis.get('analysis_date', 'Unknown')}

Clarity Score

{scores.get('clarity', 0):.1f}/10

Structure Score

{scores.get('structure', 0):.1f}/10

Answerability Score

{scores.get('answerability', 0):.1f}/10

Overall Average

{scores.get('overall_average', 0):.1f}/10

🔑 Identified Keywords

{' '.join([f'{keyword}' for keyword in optimization.get('keywords_identified', [])])}
{'

✨ Optimized Content

' + optimization.get('optimized_content', '') + '
' if optimization.get('optimized_content') and not optimization.get('analyze_only') else ''}

💡 Improvements Made

Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

""" return html_content except Exception as e: return f"

Enhancement HTML Export Error

{str(e)}

" def _export_enhancement_csv(self, data: Dict[str, Any]) -> str: """Export content enhancement results as CSV""" try: output = io.StringIO() writer = csv.writer(output) # Header information analysis = data.get('content_analysis', {}) scores = data.get('performance_scores', {}) optimization = data.get('optimization_results', {}) writer.writerow(['Content Enhancement Analysis Report']) writer.writerow(['Analysis Date:', analysis.get('analysis_date', 'Unknown')]) writer.writerow(['Original Content Length:', analysis.get('original_content_length', 0)]) writer.writerow(['Original Word Count:', analysis.get('original_word_count', 0)]) writer.writerow([]) # Performance scores writer.writerow(['Performance Scores']) writer.writerow(['Metric', 'Score']) for metric, score in scores.items(): writer.writerow([metric.replace('_', ' ').title(), round(score, 2)]) writer.writerow([]) writer.writerow(['Keywords Identified']) for keyword in optimization.get('keywords_identified', []): writer.writerow([keyword]) writer.writerow([]) writer.writerow(['Improvements Made']) for improvement in optimization.get('improvements_made', []): writer.writerow([improvement]) return output.getvalue() except Exception as e: return f"Enhancement CSV export error: {str(e)}" def _export_qa_html(self, data: Dict[str, Any]) -> str: """Export Q&A results as HTML""" try: session = data.get('qa_session', {}) summary = data.get('summary', {}) interactions = session.get('interactions', []) html_content = f""" Q&A Session Report

💬 Q&A Session Report

Document Question & Answer Analysis

Session Date: {session.get('session_date', 'Unknown')}

Total Questions

{session.get('total_questions', 0)}

Successful Answers

{summary.get('successful_answers', 0)}

Avg Response Length

{summary.get('average_response_length', 0):.0f}

📝 Q&A Interactions

""" # Add individual Q&A items for i, interaction in enumerate(interactions, 1): question = interaction.get('query', 'No question') answer = interaction.get('result', interaction.get('answer', 'No answer')) sources = interaction.get('sources', []) html_content += f"""

Question {i}

Q: {question}
A: {answer}
""" if sources: html_content += '
Sources:
' html_content += '
' html_content += f"""

Generated by GEO SEO AI Optimizer | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

""" return html_content except Exception as e: return f"

Q&A HTML Export Error

{str(e)}

" def _export_qa_csv(self, data: Dict[str, Any]) -> str: """Export Q&A results as CSV""" try: output = io.StringIO() writer = csv.writer(output) session = data.get('qa_session', {}) summary = data.get('summary', {}) interactions = session.get('interactions', []) # Header writer.writerow(['Q&A Session Report']) writer.writerow(['Session Date:', session.get('session_date', 'Unknown')]) writer.writerow(['Total Questions:', session.get('total_questions', 0)]) writer.writerow(['Successful Answers:', summary.get('successful_answers', 0)]) writer.writerow([]) # Q&A data writer.writerow(['Question Index', 'Question', 'Answer', 'Has Sources', 'Answer Length']) for i, interaction in enumerate(interactions, 1): question = interaction.get('query', 'No question') answer = interaction.get('result', interaction.get('answer', 'No answer')) has_sources = 'Yes' if interaction.get('sources') else 'No' answer_length = len(answer) if answer else 0 writer.writerow([i, question, answer, has_sources, answer_length]) return output.getvalue() except Exception as e: return f"Q&A CSV export error: {str(e)}" def _export_batch_excel(self, data: Dict[str, Any]) -> bytes: """Export batch results as Excel file""" try: output = io.BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: # Batch metadata sheet metadata = data.get('batch_metadata', {}) metadata_df = pd.DataFrame([ {'Property': k, 'Value': v} for k, v in metadata.items() ]) metadata_df.to_excel(writer, sheet_name='Batch Metadata', index=False) # Batch summary sheet summary = data.get('batch_summary', {}) summary_df = pd.DataFrame([ {'Metric': k, 'Value': v} for k, v in summary.items() ]) summary_df.to_excel(writer, sheet_name='Batch Summary', index=False) # Individual results sheet results = data.get('batch_results', []) if results: # Flatten results for tabular format flattened_results = [] for i, result in enumerate(results): flat_result = {'Batch_Index': i} self._flatten_dict(result, flat_result) 
flattened_results.append(flat_result) results_df = pd.DataFrame(flattened_results) results_df.to_excel(writer, sheet_name='Batch Results', index=False) output.seek(0) return output.getvalue() except Exception as e: error_content = f"Batch Excel export failed: {str(e)}\n\nData:\n{json.dumps(data, indent=2)}" return error_content.encode('utf-8') def _export_batch_csv(self, data: Dict[str, Any]) -> str: """Export batch results as CSV""" try: output = io.StringIO() writer = csv.writer(output) # Batch metadata metadata = data.get('batch_metadata', {}) writer.writerow(['Batch Analysis Results']) writer.writerow(['Export Timestamp:', data.get('export_timestamp', 'Unknown')]) writer.writerow([]) writer.writerow(['Batch Metadata']) for key, value in metadata.items(): writer.writerow([key, value]) writer.writerow([]) # Batch summary summary = data.get('batch_summary', {}) writer.writerow(['Batch Summary']) for key, value in summary.items(): writer.writerow([key, value]) writer.writerow([]) # Individual results (simplified) results = data.get('batch_results', []) if results: writer.writerow(['Individual Results']) writer.writerow(['Index', 'Status', 'Summary']) for i, result in enumerate(results): status = 'Success' if not result.get('error') else 'Error' summary_text = str(result)[:100] + '...' if len(str(result)) > 100 else str(result) writer.writerow([i, status, summary_text]) return output.getvalue() except Exception as e: return f"Batch CSV export error: {str(e)}" def _export_geo_pdf(self, data: Dict[str, Any]) -> bytes: """Export GEO data as PDF (placeholder - would need reportlab)""" try: # For now, return HTML content as bytes # In a full implementation, you'd use reportlab or weasyprint html_content = self._export_geo_html(data) return html_content.encode('utf-8') except Exception as e: error_content = f"PDF export not fully implemented. 
Error: {str(e)}" return error_content.encode('utf-8') def _create_executive_summary(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: """Create executive summary report""" try: geo_results = analysis_data.get('geo_results', []) enhancement_results = analysis_data.get('enhancement_results', {}) qa_results = analysis_data.get('qa_results', []) # Calculate key metrics overall_performance = self._calculate_overall_performance(analysis_data) return { 'executive_summary': { 'overall_performance_score': overall_performance, 'key_findings': self._extract_key_findings(analysis_data), 'priority_recommendations': self._get_priority_recommendations(analysis_data), 'roi_potential': self._estimate_roi_potential(overall_performance), 'implementation_timeline': self._suggest_implementation_timeline(analysis_data), 'resource_requirements': self._estimate_resource_requirements(analysis_data) } } except Exception as e: return {'error': f"Executive summary creation failed: {str(e)}"} def _create_summary_report(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: """Create summary report""" try: return { 'summary_report': { 'analysis_overview': self._create_analysis_overview(analysis_data), 'performance_metrics': self._summarize_performance_metrics(analysis_data), 'improvement_opportunities': self._identify_improvement_opportunities(analysis_data), 'competitive_position': self._assess_competitive_position(analysis_data), 'next_steps': self._recommend_next_steps(analysis_data) } } except Exception as e: return {'error': f"Summary report creation failed: {str(e)}"} def _create_full_report(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: """Create full detailed report""" try: return { 'full_report': { 'executive_summary': self._create_executive_summary(analysis_data).get('executive_summary', {}), 'detailed_analysis': { 'geo_analysis_details': analysis_data.get('geo_results', []), 'content_optimization_details': analysis_data.get('enhancement_results', {}), 
'qa_performance_details': analysis_data.get('qa_results', []) }, 'methodology': self._document_methodology(), 'data_sources': self._document_data_sources(analysis_data), 'limitations': self._document_limitations(), 'appendices': self._create_appendices(analysis_data) } } except Exception as e: return {'error': f"Full report creation failed: {str(e)}"} def _create_batch_summary(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]: """Create summary of batch processing results""" try: total_items = len(batch_results) successful_items = len([r for r in batch_results if not r.get('error')]) failed_items = total_items - successful_items return { 'total_items': total_items, 'successful_items': successful_items, 'failed_items': failed_items, 'success_rate': (successful_items / total_items * 100) if total_items > 0 else 0, 'processing_status': 'Completed', 'average_processing_time': self._calculate_avg_processing_time(batch_results), 'common_errors': self._identify_common_errors(batch_results) } except Exception as e: return {'error': f"Batch summary creation failed: {str(e)}"} def _generate_performance_insights(self, scores: Dict[str, float], overall_avg: float) -> List[str]: """Generate performance insights from scores""" insights = [] try: # Overall performance insight if overall_avg >= 8.0: insights.append("Excellent overall GEO performance - content is well-optimized for AI search engines") elif overall_avg >= 6.0: insights.append("Good GEO performance with room for improvement in specific areas") elif overall_avg >= 4.0: insights.append("Moderate GEO performance - significant optimization opportunities exist") else: insights.append("Low GEO performance - comprehensive optimization needed") # Specific metric insights for metric, score in scores.items(): if score < 5.0: metric_name = metric.replace('_', ' ').title() insights.append(f"Low {metric_name} score ({score:.1f}) needs immediate attention") elif score >= 8.5: metric_name = metric.replace('_', ' 
').title() insights.append(f"Excellent {metric_name} score ({score:.1f}) - maintain current approach") return insights[:5] # Return top 5 insights except Exception: return ["Unable to generate performance insights"] def _generate_package_readme(self, analysis_data: Dict[str, Any]) -> str: """Generate README file for export package""" try: readme_content = f""" GEO SEO AI Optimizer - Analysis Package ====================================== Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} This package contains the complete analysis results from the GEO SEO AI Optimizer tool. Files Included: - JSON file: Complete raw data in JSON format - HTML file: Visual report for web viewing - CSV file: Tabular data for spreadsheet analysis - README.txt: This file About GEO (Generative Engine Optimization): GEO is the practice of optimizing content for AI-powered search engines and language models. Unlike traditional SEO, GEO focuses on: - AI search visibility - Query intent matching - Conversational readiness - Citation worthiness - Semantic richness - Context completeness How to Use These Files: 1. Open the HTML file in a web browser for a visual report 2. Import the CSV file into Excel or Google Sheets for analysis 3. Use the JSON file for programmatic processing or integration For more information about GEO optimization, visit the tool documentation. 
Generated by: GEO SEO AI Optimizer v1.0 """ return readme_content except Exception as e: return f"README generation failed: {str(e)}" # Helper methods for data processing and analysis def _get_performance_level(self, score: float) -> str: """Get performance level description for a score""" if score >= 8.0: return "Excellent" elif score >= 6.0: return "Good" elif score >= 4.0: return "Fair" else: return "Needs Improvement" def _categorize_recommendation(self, recommendation: str) -> str: """Categorize a recommendation based on content""" rec_lower = recommendation.lower() if any(word in rec_lower for word in ['structure', 'heading', 'format']): return "Content Structure" elif any(word in rec_lower for word in ['keyword', 'semantic', 'topic']): return "SEO & Keywords" elif any(word in rec_lower for word in ['clarity', 'readability', 'language']): return "Content Quality" elif any(word in rec_lower for word in ['technical', 'schema', 'markup']): return "Technical SEO" else: return "General" def _calculate_avg_response_length(self, qa_results: List[Dict[str, Any]]) -> float: """Calculate average response length for Q&A results""" try: response_lengths = [] for result in qa_results: answer = result.get('result', result.get('answer', '')) if answer and not result.get('error'): response_lengths.append(len(answer)) return sum(response_lengths) / len(response_lengths) if response_lengths else 0 except Exception: return 0 def _extract_common_topics(self, qa_results: List[Dict[str, Any]]) -> List[str]: """Extract common topics from Q&A results""" try: # Simple topic extraction based on question keywords topics = {} for result in qa_results: question = result.get('query', result.get('question', '')) if question: words = question.lower().split() for word in words: if len(word) > 4: # Focus on longer words topics[word] = topics.get(word, 0) + 1 # Return top 5 most common topics sorted_topics = sorted(topics.items(), key=lambda x: x[1], reverse=True) return [topic for topic, 
count in sorted_topics[:5]] except Exception: return [] def _flatten_dict(self, d: Dict[str, Any], parent_dict: Dict[str, Any], parent_key: str = '') -> None: """Flatten nested dictionary for tabular export""" try: for key, value in d.items(): new_key = f"{parent_key}_{key}" if parent_key else key if isinstance(value, dict): self._flatten_dict(value, parent_dict, new_key) elif isinstance(value, list): parent_dict[new_key] = json.dumps(value) # Convert lists to JSON strings else: parent_dict[new_key] = value except Exception: pass # Skip problematic keys def _calculate_overall_performance(self, analysis_data: Dict[str, Any]) -> float: """Calculate overall performance score across all analyses""" try: scores = [] # GEO scores geo_results = analysis_data.get('geo_results', []) for result in geo_results: if 'geo_scores' in result: geo_score_values = list(result['geo_scores'].values()) if geo_score_values: scores.append(sum(geo_score_values) / len(geo_score_values)) # Enhancement scores enhancement = analysis_data.get('enhancement_results', {}) if 'scores' in enhancement: enh_scores = list(enhancement['scores'].values()) if enh_scores: scores.append(sum(enh_scores) / len(enh_scores)) return sum(scores) / len(scores) if scores else 0 except Exception: return 0 def _extract_key_findings(self, analysis_data: Dict[str, Any]) -> List[str]: """Extract key findings from analysis data""" findings = [] try: # Add findings based on performance scores overall_perf = self._calculate_overall_performance(analysis_data) if overall_perf >= 8.0: findings.append("Content demonstrates excellent AI search optimization") elif overall_perf <= 4.0: findings.append("Significant optimization opportunities identified") # Add more specific findings based on data geo_results = analysis_data.get('geo_results', []) if geo_results: findings.append(f"Analyzed {len(geo_results)} pages for GEO performance") enhancement = analysis_data.get('enhancement_results', {}) if enhancement and 'keywords' in 
enhancement: findings.append(f"Identified {len(enhancement['keywords'])} key optimization terms") return findings[:5] # Return top 5 findings except Exception: return ["Unable to extract key findings"] def _get_priority_recommendations(self, analysis_data: Dict[str, Any]) -> List[str]: """Get priority recommendations from analysis""" try: recommendations = [] # Collect all recommendations from different analyses geo_results = analysis_data.get('geo_results', []) for result in geo_results: recommendations.extend(result.get('recommendations', [])) # Remove duplicates and return top priorities unique_recs = list(set(recommendations)) return unique_recs[:3] # Top 3 priority recommendations except Exception: return ["Review and implement GEO best practices"] def _estimate_roi_potential(self, performance_score: float) -> str: """Estimate ROI potential based on performance score""" if performance_score <= 4.0: return "High - Significant improvement potential" elif performance_score <= 6.0: return "Medium - Moderate improvement opportunities" else: return "Low - Already well-optimized" def _suggest_implementation_timeline(self, analysis_data: Dict[str, Any]) -> str: """Suggest implementation timeline""" try: overall_perf = self._calculate_overall_performance(analysis_data) if overall_perf <= 4.0: return "3-6 months for comprehensive optimization" elif overall_perf <= 6.0: return "1-3 months for targeted improvements" else: return "Ongoing maintenance and monitoring" except Exception: return "Timeline assessment unavailable" def _estimate_resource_requirements(self, analysis_data: Dict[str, Any]) -> Dict[str, str]: """Estimate resource requirements""" return { 'content_team': 'Required for content optimization', 'technical_team': 'Required for technical implementations', 'timeline': self._suggest_implementation_timeline(analysis_data), 'budget': 'Varies based on scope of optimizations' } def _create_analysis_overview(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: 
"""Create analysis overview""" try: return { 'analyses_performed': list(analysis_data.keys()), 'total_items_analyzed': sum(len(v) if isinstance(v, list) else 1 for v in analysis_data.values()), 'analysis_scope': 'Comprehensive GEO and content optimization analysis', 'key_focus_areas': ['AI Search Optimization', 'Content Enhancement', 'Performance Analysis'] } except Exception: return {'error': 'Overview creation failed'} def _summarize_performance_metrics(self, analysis_data: Dict[str, Any]) -> Dict[str, float]: """Summarize performance metrics""" try: return { 'overall_performance': self._calculate_overall_performance(analysis_data), 'optimization_potential': 10 - self._calculate_overall_performance(analysis_data), 'completion_rate': 100.0 # Assuming analysis completed successfully } except Exception: return {} def _identify_improvement_opportunities(self, analysis_data: Dict[str, Any]) -> List[str]: """Identify improvement opportunities""" return self._get_priority_recommendations(analysis_data) def _assess_competitive_position(self, analysis_data: Dict[str, Any]) -> str: """Assess competitive position""" try: overall_perf = self._calculate_overall_performance(analysis_data) if overall_perf >= 8.0: return "Strong - Above average GEO performance" elif overall_perf >= 6.0: return "Competitive - Meeting industry standards" elif overall_perf >= 4.0: return "Below Average - Improvement needed" else: return "Weak - Significant optimization required" except Exception: return "Assessment unavailable" def _recommend_next_steps(self, analysis_data: Dict[str, Any]) -> List[str]: """Recommend next steps""" steps = [ "Review detailed analysis results", "Prioritize recommendations by impact", "Develop implementation plan", "Monitor performance improvements" ] # Add specific steps based on performance overall_perf = self._calculate_overall_performance(analysis_data) if overall_perf <= 4.0: steps.insert(1, "Focus on fundamental GEO optimization") return steps def 
_document_methodology(self) -> Dict[str, str]: """Document analysis methodology""" return { 'geo_analysis': 'AI-powered content analysis using specialized GEO metrics', 'content_optimization': 'LLM-based content enhancement and scoring', 'performance_scoring': 'Multi-dimensional scoring system for AI search optimization', 'data_collection': 'Automated content parsing and analysis', 'validation': 'Cross-referenced metrics and quality assurance checks' } def _document_data_sources(self, analysis_data: Dict[str, Any]) -> List[str]: """Document data sources used in analysis""" sources = [] if 'geo_results' in analysis_data: sources.append("Website content analysis") if 'enhancement_results' in analysis_data: sources.append("Content optimization analysis") if 'qa_results' in analysis_data: sources.append("Document Q&A interactions") sources.extend([ "AI-powered content scoring", "GEO performance metrics", "Industry best practices database" ]) return sources def _document_limitations(self) -> List[str]: """Document analysis limitations""" return [ "Analysis based on current content snapshot", "Performance may vary with search engine algorithm updates", "Recommendations require human review for implementation", "Results depend on quality of input content", "AI model performance may vary across different content types" ] def _create_appendices(self, analysis_data: Dict[str, Any]) -> Dict[str, Any]: """Create report appendices""" try: return { 'technical_details': { 'models_used': ['GPT-based content analysis', 'Semantic similarity scoring'], 'processing_time': 'Variable based on content volume', 'confidence_intervals': 'Scores provided with ±0.5 accuracy' }, 'glossary': { 'GEO': 'Generative Engine Optimization - optimization for AI search engines', 'AI Search Visibility': 'Likelihood of content appearing in AI search results', 'Citation Worthiness': 'Probability of content being cited by AI systems', 'Conversational Readiness': 'Suitability for AI chat responses' }, 
                'references': [
                    'GEO Best Practices Guide',
                    'AI Search Engine Optimization Standards',
                    'Content Performance Benchmarks'
                ]
            }
        except Exception:
            return {}

    def _calculate_avg_processing_time(self, batch_results: List[Dict[str, Any]]) -> float:
        """Calculate average processing time for batch results"""
        # Averages only results that report a 'processing_time'; returns 0
        # when none do (or on any unexpected data shape).
        try:
            processing_times = []
            for result in batch_results:
                if 'processing_time' in result:
                    processing_times.append(result['processing_time'])
            return sum(processing_times) / len(processing_times) if processing_times else 0
        except Exception:
            return 0

    def _identify_common_errors(self, batch_results: List[Dict[str, Any]]) -> List[str]:
        """Identify common errors in batch processing"""
        # Tallies error messages truncated to 50 chars (so near-identical
        # messages group together) and returns the three most frequent.
        try:
            error_counts = {}
            for result in batch_results:
                if result.get('error'):
                    error_msg = str(result['error'])[:50]  # First 50 chars
                    error_counts[error_msg] = error_counts.get(error_msg, 0) + 1
            # Return top 3 most common errors
            sorted_errors = sorted(error_counts.items(), key=lambda x: x[1], reverse=True)
            return [error for error, count in sorted_errors[:3]]
        except Exception:
            return []


class DataValidator:
    """Helper class for validating export data"""

    @staticmethod
    def validate_geo_data(geo_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Validate GEO analysis data structure"""
        # Returns {'valid': bool, 'errors': [...], 'warnings': [...]}.
        # Missing fields are soft warnings; out-of-range or non-numeric
        # scores are hard errors that mark the whole payload invalid.
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }
        try:
            if not geo_results:
                validation_result['errors'].append("No GEO results provided")
                validation_result['valid'] = False
                return validation_result
            for i, result in enumerate(geo_results):
                # Check required fields
                if 'geo_scores' not in result:
                    validation_result['warnings'].append(f"Result {i} missing geo_scores")
                if 'page_data' not in result:
                    validation_result['warnings'].append(f"Result {i} missing page_data")
                # Validate score ranges (scores are expected in 0-10)
                if 'geo_scores' in result:
                    for metric, score in result['geo_scores'].items():
                        if not isinstance(score, (int, float)) or score < 0 or score > 10:
                            validation_result['errors'].append(f"Invalid score for {metric} in result {i}")
                            validation_result['valid'] = False
            return validation_result
        except Exception as e:
            validation_result['errors'].append(f"Validation failed: {str(e)}")
            validation_result['valid'] = False
            return validation_result

    @staticmethod
    def validate_enhancement_data(enhancement_result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate content enhancement data structure"""
        # Same result shape as validate_geo_data: missing scores are
        # warnings, wrongly typed scores are errors.
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }
        try:
            # Check for required fields
            if 'scores' not in enhancement_result:
                validation_result['warnings'].append("Enhancement result missing scores")
            # Validate score structure
            if 'scores' in enhancement_result:
                scores = enhancement_result['scores']
                required_scores = ['clarity', 'structuredness', 'answerability']
                for req_score in required_scores:
                    if req_score not in scores:
                        validation_result['warnings'].append(f"Missing {req_score} score")
                    elif not isinstance(scores[req_score], (int, float)):
                        validation_result['errors'].append(f"Invalid {req_score} score type")
                        validation_result['valid'] = False
            return validation_result
        except Exception as e:
            validation_result['errors'].append(f"Enhancement validation failed: {str(e)}")
            validation_result['valid'] = False
            return validation_result


class ExportManager:
    """High-level export management class"""

    def __init__(self):
        # Composition: exporting and validation are delegated to helpers;
        # export_history accumulates an audit trail of export attempts.
        self.exporter = ResultExporter()
        self.validator = DataValidator()
        self.export_history = []

    def export_with_validation(self, data: Dict[str, Any], data_type: str, format_type: str = 'json') -> Dict[str, Any]:
        """Export data with validation"""
        try:
            # Validate data first; unknown data types pass through unvalidated
            if data_type == 'geo_analysis':
                validation = self.validator.validate_geo_data(data.get('geo_results', []))
            elif data_type == 'content_optimization':
                validation = self.validator.validate_enhancement_data(data)
            else:
                validation = {'valid': True, 'errors': [], 'warnings': []}
            # Proceed with export if validation passes
            if validation['valid']:
                if data_type == 'geo_analysis':
                    result = self.exporter.export_geo_results(
data.get('geo_results', []), data.get('website_url', 'unknown'), format_type ) elif data_type == 'content_optimization': result = self.exporter.export_enhancement_results(data, format_type) else: result = json.dumps(data, indent=2, ensure_ascii=False) # Log export self.export_history.append({ 'timestamp': datetime.now().isoformat(), 'data_type': data_type, 'format_type': format_type, 'validation_warnings': validation.get('warnings', []), 'success': True }) return { 'success': True, 'data': result, 'validation': validation } else: return { 'success': False, 'error': 'Data validation failed', 'validation': validation } except Exception as e: self.export_history.append({ 'timestamp': datetime.now().isoformat(), 'data_type': data_type, 'format_type': format_type, 'success': False, 'error': str(e) }) return { 'success': False, 'error': f"Export failed: {str(e)}" } def get_export_history(self) -> List[Dict[str, Any]]: """Get export history""" return self.export_history def clear_export_history(self) -> None: """Clear export history""" self.export_history.clear() def get_supported_formats(self) -> Dict[str, List[str]]: """Get supported export formats by data type""" return { 'geo_analysis': ['json', 'csv', 'html', 'xlsx', 'pdf'], 'content_optimization': ['json', 'html', 'csv'], 'qa_results': ['json', 'html', 'csv'], 'batch_analysis': ['json', 'xlsx', 'csv'] } def create_multi_format_export(self, data: Dict[str, Any], data_type: str, formats: List[str] = None) -> Dict[str, Any]: """Create export in multiple formats""" if formats is None: formats = ['json', 'html', 'csv'] results = {} for format_type in formats: try: export_result = self.export_with_validation(data, data_type, format_type) if export_result['success']: results[format_type] = export_result['data'] else: results[format_type] = {'error': export_result['error']} except Exception as e: results[format_type] = {'error': str(e)} return { 'multi_format_export': results, 'formats_generated': list(results.keys()), 
'successful_formats': [fmt for fmt, data in results.items() if 'error' not in data] } # Utility functions for the export module def create_export_template(data_type: str) -> Dict[str, Any]: """Create export template for different data types""" templates = { 'geo_analysis': { 'website_url': 'https://example.com', 'geo_results': [ { 'page_data': { 'url': 'https://example.com/page1', 'title': 'Example Page', 'word_count': 500 }, 'geo_scores': { 'ai_search_visibility': 7.5, 'query_intent_matching': 6.8, 'conversational_readiness': 8.2, 'citation_worthiness': 7.1 }, 'recommendations': [ 'Improve content structure', 'Add more specific examples' ] } ] }, 'content_optimization': { 'scores': { 'clarity': 7.5, 'structuredness': 6.8, 'answerability': 8.2 }, 'keywords': ['example', 'optimization', 'content'], 'optimized_text': 'This is the optimized version of the content...', 'optimization_suggestions': [ 'Improve sentence structure', 'Add more specific keywords' ] }, 'qa_results': [ { 'query': 'What is the main topic?', 'result': 'The main topic is content optimization for AI systems.', 'sources': [ { 'content': 'Source document content...', 'metadata': {'source': 'document1.pdf'} } ] } ] } return templates.get(data_type, {}) def export_demo_data() -> Dict[str, Any]: """Export demonstration data for testing""" demo_data = { 'geo_analysis_demo': create_export_template('geo_analysis'), 'content_optimization_demo': create_export_template('content_optimization'), 'qa_results_demo': create_export_template('qa_results') } return demo_data # Export the main classes and functions __all__ = [ 'ResultExporter', 'GEOReport', 'ContentAnalysis', 'DataValidator', 'ExportManager', 'create_export_template', 'export_demo_data' ] # Example usage for testing if __name__ == "__main__": # Create exporter instance exporter = ResultExporter() # Test with demo data demo_geo_data = create_export_template('geo_analysis') # Export in different formats json_export = exporter.export_geo_results( 
demo_geo_data['geo_results'], demo_geo_data['website_url'], 'json' ) html_export = exporter.export_geo_results( demo_geo_data['geo_results'], demo_geo_data['website_url'], 'html' ) print("JSON Export:", json_export[:200] + "..." if len(str(json_export)) > 200 else json_export) print("\nHTML Export:", html_export[:200] + "..." if len(str(html_export)) > 200 else html_export) # Test enhancement export demo_enhancement = create_export_template('content_optimization') enhancement_export = exporter.export_enhancement_results(demo_enhancement, 'json') print("\nEnhancement Export:", enhancement_export[:200] + "..." if len(str(enhancement_export)) > 200 else enhancement_export)