# Spaces:
# Runtime error
# Runtime error
"""
Main Streamlit Application - GEO SEO AI Optimizer
Entry point for the application with UI components
"""
| import streamlit as st | |
| import os | |
| import tempfile | |
| import json | |
| from typing import Dict, Any, List | |
| import time # Add this if not present | |
| # Import our custom modules | |
| from utils.parser import PDFParser, TextParser, WebpageParser | |
| from utils.scorer import GEOScorer | |
| from utils.optimizer import ContentOptimizer | |
| from utils.chunker import VectorChunker | |
| from utils.export import ResultExporter | |
| # Import LangChain components | |
| from langchain_groq import ChatGroq | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
class GEOSEOApp:
    """Main application class that orchestrates all components.

    Builds the Groq chat model, the HuggingFace embedding model, the content
    parsers and the scoring/optimizing/chunking/export helpers, then renders
    three Streamlit tabs: website GEO analysis, content enhancement and
    document Q&A.

    The batch, content-variation and entity-extraction modes are currently
    not offered in the UI; their display helpers are kept so the modes can be
    re-enabled without touching this class's interface.
    """

    def __init__(self):
        """Load configuration, then build models, parsers and components."""
        self.setup_config()
        self.setup_models()
        self.setup_parsers()
        self.setup_components()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------

    def setup_config(self):
        """Initialize configuration and API keys.

        Keys are read from the GROQ_API_KEY and HUGGINGFACE_API_KEY
        environment variables; a placeholder string is used when unset.
        """
        self.groq_api_key = os.getenv("GROQ_API_KEY", "your-groq-api-key")
        self.hf_api_key = os.getenv("HUGGINGFACE_API_KEY", "your-huggingface-api-key")
        # Create data directory if it doesn't exist
        os.makedirs("data/uploaded_files", exist_ok=True)

    def setup_models(self):
        """Initialize LLM and embedding models."""
        self.llm = ChatGroq(
            api_key=self.groq_api_key,
            model_name="llama3-8b-8192",
            temperature=0.1
        )
        # The device is passed through model_kwargs so the model loads on CPU.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            cache_folder="./hf_cache",
            model_kwargs={'device': 'cpu'}
        )

    def setup_parsers(self):
        """Initialize content parsers."""
        self.pdf_parser = PDFParser()
        self.text_parser = TextParser()
        self.webpage_parser = WebpageParser()

    def setup_components(self):
        """Initialize processing components."""
        self.geo_scorer = GEOScorer(self.llm)
        self.content_optimizer = ContentOptimizer(self.llm)
        self.vector_chunker = VectorChunker(self.embeddings)
        self.result_exporter = ResultExporter()

    # ------------------------------------------------------------------
    # Small pure helpers (no Streamlit calls)
    # ------------------------------------------------------------------

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for int/float values, excluding booleans."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _format_score(value) -> str:
        """Format a score with one decimal; fall back to str() for non-numbers."""
        if GEOSEOApp._is_number(value):
            return f"{value:.1f}"
        return str(value)

    @staticmethod
    def _join_items(items) -> str:
        """Join items with ', ' without failing on non-string members.

        A plain string is returned unchanged (joining it would split it into
        characters); None yields an empty string.
        """
        if items is None:
            return ""
        if isinstance(items, str):
            return items
        return ", ".join(str(item) for item in items)

    @staticmethod
    def _unique_in_order(items) -> list:
        """Drop duplicates while keeping first-seen order.

        Uses equality rather than hashing so unhashable items (e.g. dicts)
        are accepted; the lists handled here are short.
        """
        unique = []
        for item in items:
            if item not in unique:
                unique.append(item)
        return unique

    @staticmethod
    def _report_filename(website_url: str) -> str:
        """Build a filesystem-friendly report name from a URL.

        The scheme (anything before '://') is dropped and every character
        other than letters, digits, '.', '_' and '-' becomes '_'.
        """
        _scheme, separator, remainder = website_url.partition("://")
        target = remainder if separator else website_url
        safe = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in target)
        return f"geo_analysis_{safe}.json"

    # ------------------------------------------------------------------
    # Page layout
    # ------------------------------------------------------------------

    def run(self):
        """Main application runner: page config, sidebar and the three tabs."""
        st.set_page_config(
            page_title="GEO SEO AI Optimizer",
            page_icon="π",
            layout="wide"
        )
        st.title("π GEO SEO AI Optimizer")
        st.markdown("*Optimize your content for AI search engines and LLM systems*")

        self.render_sidebar()

        website_tab, enhancement_tab, qa_tab = st.tabs([
            "π Website GEO Analysis",
            "π§ Content Enhancement",
            "π Document Q&A",
        ])
        with website_tab:
            self.render_website_analysis_tab()
        with enhancement_tab:
            self.render_content_enhancement_tab()
        with qa_tab:
            self.render_document_qa_tab()

    def render_sidebar(self):
        """Render sidebar with information and controls."""
        sidebar = st.sidebar
        sidebar.title("π οΈ GEO Tools")
        for line in (
            "- π Document Q&A with RAG",
            "- π§ Content Enhancement",
            "- π Website GEO Analysis",
            "- π AI-First SEO Scoring",
            "---",
            "### π§ Configuration",
            "Set your API keys:",
        ):
            sidebar.markdown(line)
        sidebar.code("export GROQ_API_KEY='your-key'")
        for line in (
            "---",
            "### π GEO Metrics",
            "**AI Search Visibility**: How likely AI engines will surface your content",
            "**Query Intent Matching**: How well content matches user queries",
            "**Conversational Readiness**: Suitability for AI chat responses",
            "**Citation Worthiness**: Probability of being cited by AI",
            "---",
            "### βΉοΈ Components",
            "- **Parser**: Extract content from various sources",
            "- **Scorer**: Analyze GEO performance",
            "- **Optimizer**: Enhance content for AI",
            "- **Chunker**: Create vector embeddings",
            "- **Exporter**: Generate reports",
        ):
            sidebar.markdown(line)

    # ------------------------------------------------------------------
    # Document Q&A tab
    # ------------------------------------------------------------------

    def render_document_qa_tab(self):
        """Render Document Q&A tab.

        Accepts either an uploaded PDF or pasted text (the PDF wins when both
        are given), builds a QA chain over it and shows the answer together
        with the source documents the chain reports.
        """
        st.header("π Document Question Answering")
        st.markdown("Upload documents or paste text to ask questions using RAG.")

        uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])
        pasted_text = st.text_area("Or paste text directly:", height=150)
        user_query = st.text_input("Ask a question about the content:")

        if not st.button("π Ask Question", key="qa_submit"):
            return
        if not user_query.strip():
            st.warning("Please enter a question.")
            return

        try:
            if uploaded_file:
                with st.spinner("Processing PDF..."):
                    temp_path = self.save_uploaded_file(uploaded_file)
                    try:
                        documents = self.pdf_parser.parse(temp_path)
                    finally:
                        # Remove the temporary copy even when parsing fails.
                        os.unlink(temp_path)
            elif pasted_text.strip():
                with st.spinner("Processing text..."):
                    documents = self.text_parser.parse(pasted_text)
            else:
                st.warning("Please upload a PDF or paste some text.")
                return

            with st.spinner("Creating embeddings and searching..."):
                qa_chain = self.vector_chunker.create_qa_chain(documents, self.llm)
                result = qa_chain({"query": user_query})

            st.markdown("### π¬ Answer")
            st.write(result["result"])

            with st.expander("π Source Documents"):
                for number, doc in enumerate(result.get("source_documents", []), 1):
                    st.write(f"**Source {number}:**")
                    content = doc.page_content
                    # Long passages are cut to a 500-character preview.
                    preview = content[:500] + "..." if len(content) > 500 else content
                    st.write(preview)
                    if getattr(doc, 'metadata', None):
                        st.write(f"*Metadata: {doc.metadata}*")
                    st.write("---")
        except Exception as e:
            # UI boundary: report the failure instead of crashing the page.
            st.error(f"An error occurred: {str(e)}")

    # ------------------------------------------------------------------
    # Content enhancement tab
    # ------------------------------------------------------------------

    def render_content_enhancement_tab(self):
        """Render Content Enhancement tab with optimization type selector."""
        st.header("π§ Content Enhancement")
        st.markdown("Analyze and optimize your content for better AI/LLM performance.")

        input_text = st.text_area(
            "Enter content to analyze and enhance:",
            height=200,
            key="enhancement_input"
        )

        # Insertion order of this mapping is the order shown in the selector.
        mode_labels = {
            "standard": "π§ Standard Enhancement",
            "seo": "π SEO-Focused Optimization",
            "competitive": "π Competitive Analysis",
            "voice_search": "π€ Voice Search Optimization",
            "readability_analysis": "π Readability Analysis",
        }
        mode_descriptions = {
            "standard": "General content enhancement focusing on clarity, structure, and AI answerability.",
            "seo": "SEO-focused optimization for AI search engines with semantic keyword analysis.",
            "competitive": "Competitive analysis against AI search best practices with gap identification.",
            "voice_search": "Optimization for voice search and conversational AI systems.",
            "readability_analysis": "Detailed readability analysis specifically for AI systems.",
        }

        st.markdown("### βοΈ Optimization Settings")
        type_col, options_col = st.columns(2)
        with type_col:
            optimization_type = st.selectbox(
                "Select Optimization Type:",
                options=list(mode_labels),
                format_func=mode_labels.get,
                index=0,
                help="Choose the type of optimization to apply to your content"
            )
        with options_col:
            # Every mode currently offered shows the same two options.
            analyze_only = st.checkbox("Analysis only (no rewriting)", value=False)
            include_keywords = st.checkbox("Include keyword suggestions", value=True)

        st.info(f"**{mode_descriptions[optimization_type]}**")

        if not st.button("π Process Content", key="enhancement_submit"):
            return
        if not input_text.strip():
            st.warning("Please enter some content to analyze.")
            return

        optimizer = self.content_optimizer
        # One callable per mode; only the selected one is invoked.
        handlers = {
            "standard": lambda: optimizer.optimize_content(
                input_text,
                analyze_only=analyze_only,
                include_keywords=include_keywords,
                optimization_type="standard"
            ),
            "seo": lambda: optimizer.optimize_content(
                input_text,
                analyze_only=analyze_only,
                include_keywords=include_keywords,
                optimization_type="seo"
            ),
            "competitive": lambda: optimizer.optimize_content(
                input_text,
                optimization_type="competitive"
            ),
            "voice_search": lambda: optimizer.optimize_for_voice_search(input_text),
            "readability_analysis": lambda: optimizer.analyze_content_readability(input_text),
        }

        try:
            with st.spinner(f"Processing content with {optimization_type} optimization..."):
                result = handlers[optimization_type]()
                if result.get("error"):
                    st.error(f"Processing failed: {result['error']}")
                    return
                self.display_enhancement_results(result, optimization_type, input_text)
        except Exception as e:
            # UI boundary: report the failure instead of crashing the page.
            st.error(f"An error occurred: {str(e)}")

    def display_enhancement_results(self, result, optimization_type, original_text):
        """Display results based on optimization type, then the export section."""
        # Underscores are replaced first so "voice_search" reads "Voice Search".
        readable_type = optimization_type.replace('_', ' ').title()
        st.success(f"{readable_type} optimization completed successfully!")

        if optimization_type == "readability_analysis":
            self.display_readability_results(result)
        elif optimization_type == "voice_search":
            self.display_voice_search_results(result)
        else:
            self.display_standard_results(result, optimization_type)

        self.display_export_options(result, optimization_type, original_text)

    def _render_core_scores(self, scores):
        """Show the clarity / structure / answerability metrics side by side."""
        columns = st.columns(3)
        fields = (
            ("Clarity", "clarity"),
            ("Structure", "structuredness"),
            ("Answerability", "answerability"),
        )
        for column, (label, key) in zip(columns, fields):
            with column:
                st.metric(label, f"{scores.get(key, 0)}/10")

    def display_standard_results(self, result, optimization_type):
        """Display results for standard, SEO, and competitive optimizations.

        Only the sections whose keys are present in ``result`` are rendered.
        """
        st.markdown("### π Analysis Results")

        scores = result.get("scores", {})
        if scores:
            self._render_core_scores(scores)

        if "seo_analysis" in result:
            st.markdown("#### π SEO Analysis")
            seo_data = result["seo_analysis"]
            if "readability_score" in seo_data:
                st.metric("Readability Score", f"{seo_data['readability_score']}/10")
            if "semantic_gaps" in seo_data:
                st.write("**Semantic Gaps:**", self._join_items(seo_data["semantic_gaps"]))

        if "competitive_analysis" in result:
            st.markdown("#### π Competitive Analysis")
            for key, value in result["competitive_analysis"].items():
                heading = f"**{key.replace('_', ' ').title()}:**"
                if isinstance(value, list):
                    st.write(heading, self._join_items(value))
                else:
                    st.write(heading, value)

        keywords = result.get("keywords", [])
        if keywords:
            st.markdown("#### π Key Terms")
            st.write(self._join_items(keywords))

        # The rewritten text may sit under "optimized_text", or under
        # "optimized_content" either as a dict with "enhanced_content" or as
        # a plain string.
        optimized_content = result.get("optimized_text")
        if not optimized_content:
            nested = result.get("optimized_content")
            if isinstance(nested, dict):
                optimized_content = nested.get("enhanced_content", "")
            elif isinstance(nested, str):
                optimized_content = nested
        if optimized_content:
            st.markdown("#### β¨ Optimized Content")
            st.text_area(
                "Enhanced version:",
                value=optimized_content,
                height=200,
                key="optimized_output"
            )

        recommendations = result.get("recommendations", [])
        if recommendations:
            st.markdown("#### π‘ Recommendations")
            for number, rec in enumerate(recommendations, 1):
                st.write(f"**{number}.** {rec}")

    def display_batch_results(self, results):
        """Display batch optimization results (mode currently not offered in the UI)."""
        st.markdown("### π¦ Batch Processing Results")

        failed_count = sum(1 for entry in results if entry.get('error'))
        total_col, ok_col, failed_col = st.columns(3)
        with total_col:
            st.metric("Total Pieces", len(results))
        with ok_col:
            st.metric("Successful", len(results) - failed_count)
        with failed_col:
            st.metric("Failed", failed_count)

        for position, result in enumerate(results):
            # Fall back to the list position when the entry has no batch_index.
            idx = result.get('batch_index', position)
            st.markdown(f"#### Content Piece {idx + 1}")
            if result.get('error'):
                st.error(f"Processing failed: {result['error']}")
            else:
                scores = result.get("scores", {})
                if scores:
                    self._render_core_scores(scores)
                optimized = result.get("optimized_text", "")
                if optimized:
                    with st.expander("View optimized content"):
                        # Keyed by position so widget keys never collide.
                        st.text_area("", value=optimized, height=150, key=f"batch_output_{position}")
            st.write("---")

    def display_variation_results(self, variations):
        """Display content variation results (mode currently not offered in the UI)."""
        st.markdown("### π Content Variations")
        for i, variation in enumerate(variations):
            if variation.get('error'):
                st.error(f"Variation {i+1} failed: {variation['error']}")
                continue

            variation_type = variation.get('variation_type', f'Variation {i+1}')
            st.markdown(f"#### {variation_type.title()} Version")

            target_use_case = variation.get('target_use_case', '')
            if target_use_case:
                st.info(f"**Target Use Case:** {target_use_case}")

            key_changes = variation.get('key_changes', [])
            if key_changes:
                st.write("**Key Changes:**")
                for change in key_changes:
                    st.write(f"β’ {change}")

            optimized_content = variation.get('optimized_content', '')
            if optimized_content:
                st.text_area(
                    f"{variation_type} content:",
                    value=optimized_content,
                    height=150,
                    key=f"variation_{i}"
                )
            st.write("---")

    def display_readability_results(self, result):
        """Display readability analysis results."""
        st.markdown("### π Readability Analysis")

        basic_metrics = result.get('basic_metrics', {})
        if basic_metrics:
            st.markdown("#### π Basic Metrics")
            cells = (
                ("Total Words", basic_metrics.get('total_words', 0)),
                ("Sentences", basic_metrics.get('total_sentences', 0)),
                ("Paragraphs", basic_metrics.get('total_paragraphs', 0)),
                ("AI Readability", f"{result.get('ai_readability_score', 0)}/10"),
            )
            for column, (label, value) in zip(st.columns(4), cells):
                with column:
                    st.metric(label, value)

        complexity = result.get('complexity_indicators', {})
        if complexity:
            st.markdown("#### π― Complexity Analysis")
            long_col, complex_col = st.columns(2)
            with long_col:
                long_pct = self._format_score(complexity.get('long_sentences_percentage', 0))
                st.metric("Long Sentences", f"{long_pct}%")
            with complex_col:
                complex_pct = self._format_score(complexity.get('complex_words_percentage', 0))
                st.metric("Complex Words", f"{complex_pct}%")

        recommendations = result.get('recommendations', [])
        if recommendations:
            st.markdown("#### π‘ Readability Recommendations")
            for number, rec in enumerate(recommendations, 1):
                st.write(f"**{number}.** {rec}")

    def display_entity_results(self, result):
        """Display entity extraction results (mode currently not offered in the UI)."""
        st.markdown("### π·οΈ Entity Analysis")

        joined_sections = (
            ('named_entities', "#### π₯ Named Entities"),
            ('key_topics', "#### π Key Topics"),
            ('technical_terms', "#### π§ Technical Terms"),
            ('semantic_keywords', "#### π Semantic Keywords"),
        )
        for key, heading in joined_sections:
            values = result.get(key, [])
            if values:
                st.markdown(heading)
                st.write(self._join_items(values))

        questions = result.get('question_opportunities', [])
        if questions:
            st.markdown("#### β Question Opportunities")
            for q in questions:
                st.write(f"β’ {q}")

    def display_voice_search_results(self, result):
        """Display voice search optimization results."""
        st.markdown("### π€ Voice Search Optimization")

        # Compare against None so a genuine score of 0 is still shown.
        conv_score = result.get('conversational_score')
        if conv_score is not None:
            st.metric("Conversational Score", f"{conv_score}/10")

        qa_pairs = result.get('question_answer_pairs', [])
        if qa_pairs:
            st.markdown("#### β Question-Answer Pairs")
            for qa in qa_pairs:
                st.write(f"**Q:** {qa.get('question', '')}")
                st.write(f"**A:** {qa.get('answer', '')}")
                st.write("---")

        snippets = result.get('featured_snippet_candidates', [])
        if snippets:
            st.markdown("#### π Featured Snippet Candidates")
            for number, snippet in enumerate(snippets, 1):
                st.write(f"**{number}.** {snippet}")

        voice_content = result.get('voice_optimized_content', '')
        if voice_content:
            st.markdown("#### π€ Voice-Optimized Content")
            st.text_area("Conversational version:", value=voice_content, height=200, key="voice_output")

    def display_export_options(self, result, optimization_type, original_text):
        """Display export options for results.

        The download button is rendered directly. A second st.button nested
        inside the "Process Content" branch could never fire: clicking it
        reruns the script with the outer button reset to False.
        """
        st.markdown("### π₯ Export Results")
        generated_at = time.time()
        export_data = {
            'timestamp': generated_at,
            'optimization_type': optimization_type,
            'original_text': original_text,
            'original_word_count': len(original_text.split()),
            'results': result
        }
        st.download_button(
            label="Download Analysis Report",
            # default=str is deliberate: a value json cannot encode is written
            # as text rather than aborting the export.
            data=json.dumps(export_data, indent=2, default=str),
            file_name=f"{optimization_type}_analysis_{int(generated_at)}.json",
            mime="application/json",
            key="export_button"
        )

    # ------------------------------------------------------------------
    # Website GEO analysis tab
    # ------------------------------------------------------------------

    def render_website_analysis_tab(self):
        """Render Website GEO Analysis tab.

        Parses the given site, scores each extracted page, shows the combined
        results and offers them as a JSON download.
        """
        st.header("π Website GEO Analysis")
        st.markdown("Analyze websites for Generative Engine Optimization (GEO) performance.")

        url_col, pages_col = st.columns([3, 1])
        with url_col:
            website_url = st.text_input(
                "Enter website URL:",
                placeholder="https://example.com"
            )
        with pages_col:
            max_pages = st.selectbox("Pages to analyze:", [1, 3, 5], index=0)

        subpages_col, detail_col = st.columns(2)
        with subpages_col:
            include_subpages = st.checkbox("Include subpages", value=False)
        with detail_col:
            detailed_analysis = st.checkbox("Detailed analysis", value=True)

        if not st.button("π Analyze Website", key="website_analyze"):
            return

        # Strip first so surrounding whitespace cannot defeat the scheme check.
        website_url = website_url.strip()
        if not website_url:
            st.warning("Please enter a website URL.")
            return
        if not website_url.startswith(('http://', 'https://')):
            website_url = 'https://' + website_url

        try:
            with st.spinner(f"Analyzing website: {website_url}"):
                pages_data = self.webpage_parser.parse_website(
                    website_url,
                    max_pages=max_pages,
                    include_subpages=include_subpages
                )
            if not pages_data:
                st.error("Could not extract content from the website.")
                return
            st.success(f"Successfully extracted content from {len(pages_data)} page(s)")

            geo_results = []
            with st.spinner("Calculating GEO scores..."):
                page_total = len(pages_data)
                for number, page_data in enumerate(pages_data, 1):
                    with st.spinner(f"Analyzing page {number}/{page_total}..."):
                        analysis = self.geo_scorer.analyze_page_geo(
                            page_data['content'],
                            page_data.get('title', ''),
                            detailed=detailed_analysis
                        )
                    if analysis.get('error'):
                        st.warning(f"Could not analyze page {number}: {analysis['error']}")
                        continue
                    analysis['page_data'] = page_data
                    geo_results.append(analysis)

            if not geo_results:
                st.error("Could not analyze any pages from the website.")
                return

            self.display_geo_results(geo_results, website_url)

            # Rendered directly: a button nested inside the "Analyze Website"
            # branch would never register a click (the rerun resets the outer
            # button to False).
            st.markdown("### π₯ Export Results")
            report_data = self.result_exporter.export_geo_results(
                geo_results,
                website_url
            )
            st.download_button(
                label="Download GEO Report",
                data=json.dumps(report_data, indent=2, default=str),
                file_name=self._report_filename(website_url),
                mime="application/json"
            )
        except Exception as e:
            # UI boundary: report the failure instead of crashing the page.
            st.error(f"An error occurred during website analysis: {str(e)}")

    def _render_metric_row(self, avg_scores, metrics):
        """Render one row of averaged metrics, one column per (label, key) pair."""
        for column, (display_name, key) in zip(st.columns(len(metrics)), metrics):
            with column:
                st.metric(display_name, self._format_score(avg_scores.get(key, 0)))

    def display_geo_results(self, geo_results: List[Dict], website_url: str):
        """Display GEO analysis results.

        Shows the overall average, eight named metrics in two rows, the
        collected recommendations and a per-page breakdown.
        """
        st.markdown("## π GEO Analysis Results")

        avg_scores = self.calculate_average_scores(geo_results)
        overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

        _left, center, _right = st.columns([1, 2, 1])
        with center:
            st.metric(
                "Overall GEO Score",
                f"{overall_avg:.1f}/10",
                # The delta is shown relative to a reference score of 7.0.
                delta=f"{overall_avg - 7.0:.1f}" if overall_avg != 7.0 else None
            )

        st.markdown("### π Detailed GEO Metrics")
        self._render_metric_row(avg_scores, [
            ("AI Search Visibility", "ai_search_visibility"),
            ("Query Intent Match", "query_intent_matching"),
            ("Factual Accuracy", "factual_accuracy"),
            ("Conversational Ready", "conversational_readiness"),
        ])
        self._render_metric_row(avg_scores, [
            ("Semantic Richness", "semantic_richness"),
            ("Context Complete", "context_completeness"),
            ("Citation Worthy", "citation_worthiness"),
            ("Multi-Query Cover", "multi_query_coverage"),
        ])

        self.display_recommendations(geo_results)

        with st.expander("π Detailed Page Analysis"):
            for number, analysis in enumerate(geo_results, 1):
                page_data = analysis.get('page_data', {})
                st.markdown(f"#### Page {number}: {page_data.get('title', 'Unknown Title')}")
                st.write(f"**URL**: {page_data.get('url', 'Unknown')}")
                st.write(f"**Word Count**: {page_data.get('word_count', 0)}")
                if 'primary_topics' in analysis:
                    st.write(f"**Topics**: {self._join_items(analysis['primary_topics'])}")
                if 'entities' in analysis:
                    st.write(f"**Entities**: {self._join_items(analysis['entities'])}")
                if 'geo_scores' in analysis:
                    score_text = ", ".join(
                        f"{k}: {self._format_score(v)}"
                        for k, v in analysis['geo_scores'].items()
                    )
                    st.write(f"**Scores**: {score_text}")
                st.write("---")

    def display_recommendations(self, geo_results: List[Dict]):
        """Display optimization recommendations.

        Shows up to five distinct recommendations in first-seen order, then up
        to three high-priority and three medium-priority opportunities.
        """
        st.markdown("### π‘ Optimization Recommendations")

        all_recommendations = []
        all_opportunities = []
        for analysis in geo_results:
            all_recommendations.extend(analysis.get('recommendations', []))
            all_opportunities.extend(analysis.get('optimization_opportunities', []))

        # Order-preserving de-duplication keeps the list stable across reruns.
        for number, rec in enumerate(self._unique_in_order(all_recommendations)[:5], 1):
            st.write(f"**{number}.** {rec}")

        # Entries that are not dicts carry no priority and are ignored.
        opportunities = [opp for opp in all_opportunities if isinstance(opp, dict)]
        if not opportunities:
            return

        st.markdown("#### π Priority Optimizations")
        for priority, heading in (
            ('high', "##### π΄ High Priority"),
            ('medium', "##### π‘ Medium Priority"),
        ):
            selected = [opp for opp in opportunities if opp.get('priority') == priority]
            if not selected:
                continue
            st.markdown(heading)
            for opp in selected[:3]:
                st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', 'No description')}")

    def calculate_average_scores(self, geo_results: List[Dict]) -> Dict[str, float]:
        """Calculate average GEO scores across all pages.

        Every key found in any page's 'geo_scores' mapping is averaged over
        the pages that provide a numeric value for it. Pages without
        'geo_scores' and non-numeric values are skipped; a key with no numeric
        value at all is left out. Returns {} for empty input.
        """
        collected: Dict[str, List[float]] = {}
        for analysis in geo_results:
            page_scores = analysis.get('geo_scores')
            if not isinstance(page_scores, dict):
                continue
            for key, value in page_scores.items():
                if self._is_number(value):
                    collected.setdefault(key, []).append(value)
        return {key: sum(values) / len(values) for key, values in collected.items()}

    def save_uploaded_file(self, uploaded_file) -> str:
        """Save uploaded file to temporary location and return its path.

        The caller is responsible for deleting the file. getvalue() is
        preferred because it returns the whole buffer regardless of the
        current read position; read() is the fallback for other file-likes.
        """
        if hasattr(uploaded_file, "getvalue"):
            payload = uploaded_file.getvalue()
        else:
            payload = uploaded_file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_file.write(payload)
            return tmp_file.name
def main():
    """Create the application object and hand control to its runner."""
    GEOSEOApp().run()


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    main()