| """ | |
| Main Streamlit Application - GEO SEO AI Optimizer with RAG-Enhanced Content Optimization | |
| Entry point for the application with UI components | |
| """ | |
| import streamlit as st | |
| import os | |
| import tempfile | |
| import json | |
| from typing import Dict, Any, List | |
| import time | |
| # Import our custom modules | |
| from utils.parser import PDFParser, TextParser, WebpageParser | |
| from utils.scorer import GEOScorer | |
from utils.optimizer import ContentOptimizer  # RAG-enhanced optimizer
from utils.chunker import VectorChunker
from utils.export import ResultExporter

# Import LangChain components
from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

class GEOSEOApp:
    """Main application class that orchestrates all components"""

    def __init__(self):
        self.setup_config()
        self.setup_models()
        self.setup_parsers()
        self.setup_components()

    def setup_config(self):
        """Initialize configuration and API keys"""
        self.groq_api_key = os.getenv("GROQ_API_KEY", "your-groq-api-key")
        self.hf_api_key = os.getenv("HUGGINGFACE_API_KEY", "your-huggingface-api-key")
        # Create data directory if it doesn't exist
        os.makedirs("data/uploaded_files", exist_ok=True)

    def setup_models(self):
        """Initialize LLM and embedding models"""
        self.llm = ChatGroq(
            api_key=self.groq_api_key,
            model_name="llama-3.1-8b-instant",
            temperature=0.1
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"},
            # cache_folder="./hf_caches",  # optionally cache model weights locally
        )
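        # all-MiniLM-L6-v2 produces 384-dimensional sentence embeddings and is
        # small enough to run comfortably on CPU, which suits a hosted Space.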

    def setup_parsers(self):
        """Initialize content parsers"""
        self.pdf_parser = PDFParser()
        self.text_parser = TextParser()
        self.webpage_parser = WebpageParser()

    def setup_components(self):
        """Initialize processing components with RAG integration"""
        self.geo_scorer = GEOScorer(self.llm)
        self.vector_chunker = VectorChunker(self.embeddings)
        # Enhanced content optimizer with RAG capabilities
        self.content_optimizer = ContentOptimizer(self.llm, self.vector_chunker)
        self.result_exporter = ResultExporter()
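        # The optimizer receives the chunker so it can embed and retrieve GEO
        # best-practice snippets at optimization time (implemented in
        # utils/optimizer.py, which is not shown in this file).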

    def run(self):
        """Main application runner"""
        st.set_page_config(
            page_title="GEO SEO AI Optimizer",
            page_icon="🌐",
            layout="wide"
        )
        st.title("🌐 GEO SEO AI Optimizer")
        st.markdown("*Optimize your content for AI search engines and LLM systems with RAG-enhanced analysis*")

        # Sidebar
        self.render_sidebar()

        # Main tabs
        tab1, tab2, tab3, tab4 = st.tabs([
            "🌐 Website GEO Analysis",
            "🧠 GEO Content Enhancement",
            "📄 Document Q&A",
            "🧠 Generate GEO Content",
        ])
        with tab1:
            self.render_website_analysis_tab()
        with tab2:
            self.render_geo_content_enhancement_tab()
        with tab3:
            self.render_document_qa_tab()
        with tab4:
            self.render_generate_geo_content_tab()

    def render_sidebar(self):
        """Render sidebar with information and controls"""
        st.sidebar.title("🛠️ GEO Tools")
        st.sidebar.markdown("- 🌐 Website GEO Analysis")
        st.sidebar.markdown("- 🧠 RAG-Enhanced Content Optimization")
        st.sidebar.markdown("- 📊 AI-First SEO Scoring")
        st.sidebar.markdown("- 📄 Document Q&A with RAG")
        st.sidebar.markdown("- 🧠 Generate GEO Content")
        st.sidebar.markdown("---")
        st.sidebar.markdown("### 📊 GEO Metrics")
        st.sidebar.markdown("**AI Search Visibility**: How likely AI engines are to surface your content")
        st.sidebar.markdown("**Query Intent Matching**: How well content matches user queries")
        st.sidebar.markdown("**Conversational Readiness**: Suitability for AI chat responses")
        st.sidebar.markdown("**Citation Worthiness**: Probability of being cited by AI")
        st.sidebar.markdown("**Context Completeness**: How self-contained the content is")
        st.sidebar.markdown("**Semantic Richness**: Depth of topic coverage")
        st.sidebar.markdown("---")
        st.sidebar.markdown("### 🧠 RAG Enhancement")
        st.sidebar.markdown("- **Knowledge Base**: GEO best practices")
        st.sidebar.markdown("- **Contextual Analysis**: AI-informed optimization")
        st.sidebar.markdown("- **Entity Extraction**: AI-powered entity recognition")
        st.sidebar.markdown("- **Competitive Analysis**: Gap identification")

    def render_geo_content_enhancement_tab(self):
        """Render GEO Content Enhancement tab with RAG integration"""
        st.header("🧠 GEO Content Enhancement with RAG")
        st.markdown("Analyze and optimize your content using AI-powered Generative Engine Optimization with a RAG-enhanced knowledge base.")

        # Content input
        input_text = st.text_area(
            "Enter content to analyze and enhance:",
            height=200,
            key="geo_enhancement_input",
            help="Paste your content here for GEO optimization using RAG-enhanced analysis"
        )

        # GEO Optimization type selector
        st.markdown("### ⚙️ GEO Optimization Settings")
        col1, col2 = st.columns(2)
        with col1:
            optimization_type = st.selectbox(
                "Select GEO Optimization Type:",
                options=[
                    "geo_standard",
                    # "competitive_geo",
                    # "geo_readability",
                    # "geo_entity_extraction",
                    # "geo_variations",
                    # "geo_batch_optimize"
                ],
                format_func=lambda x: {
                    "geo_standard": "🧠 Standard GEO Enhancement",
                    # "competitive_geo": "📊 Competitive GEO Analysis",
                    # "geo_readability": "📖 GEO Readability Analysis",
                    # "geo_entity_extraction": "🏷️ GEO Entity Extraction",
                    # "geo_variations": "🔄 GEO Content Variations",
                    # "geo_batch_optimize": "📦 Batch GEO Optimization"
                }[x],
                index=0,
                help="Choose the type of GEO optimization powered by RAG analysis"
            )
        with col2:
            # Additional options based on optimization type
            if optimization_type in ["geo_standard", "competitive_geo"]:
                analyze_only = st.checkbox("Analysis only (skip rewriting)", value=True)
                include_rag_context = st.checkbox("Include RAG context details", value=True)
            # elif optimization_type == "geo_variations":
            #     num_variations = st.slider("Number of variations", min_value=1, max_value=3, value=2)
            #     analyze_only = False
            #     include_rag_context = True
            # elif optimization_type == "geo_batch_optimize":
            #     st.info("For batch optimization, separate multiple content pieces with '---' divider")
            #     analyze_only = False
            #     include_rag_context = True
            else:
                analyze_only = False
                include_rag_context = True
                num_variations = 2  # safe default; avoids a NameError if geo_variations is re-enabled

        # Show description based on optimization type
        optimization_descriptions = {
            "geo_standard": "🧠 RAG-enhanced GEO optimization focusing on AI search visibility, conversational readiness, and citation worthiness using knowledge base guidance.",
            # "competitive_geo": "📊 Competitive GEO analysis against best practices with gap identification and actionable recommendations using RAG context.",
            # "geo_readability": "📖 Detailed readability analysis specifically optimized for AI systems and LLM consumption patterns.",
            # "geo_entity_extraction": "🏷️ AI-powered extraction of key entities, topics, and concepts relevant for GEO optimization.",
            # "geo_variations": "🔄 Generate multiple GEO-optimized variations (FAQ, conversational, authoritative) using RAG knowledge.",
            # "geo_batch_optimize": "📦 Process multiple content pieces simultaneously with consistent GEO optimization."
        }
        st.info(f"**{optimization_descriptions[optimization_type]}**")

        # Knowledge base status
        if hasattr(self.content_optimizer, 'geo_knowledge'):
            st.success(f"✅ RAG Knowledge Base Loaded: {len(self.content_optimizer.geo_knowledge)} GEO best practice documents")
        else:
            st.warning("⚠️ RAG Knowledge Base not available - falling back to standard optimization")

        # Submit button
        if st.button("🚀 Process Content with GEO+RAG", key="geo_enhancement_submit"):
            if not input_text.strip():
                st.warning("Please enter some content to analyze.")
                return

            try:
                with st.spinner(f"Processing content with {optimization_type} using RAG-enhanced GEO analysis..."):
                    # Handle different GEO optimization types
                    if optimization_type == "geo_standard":
                        result = self.content_optimizer.optimize_content_with_rag(
                            input_text,
                            optimization_type="geo_standard",
                            analyze_only=analyze_only
                        )
                    elif optimization_type == "competitive_geo":
                        result = self.content_optimizer.optimize_content_with_rag(
                            input_text,
                            optimization_type="competitive_geo",
                            analyze_only=analyze_only
                        )
                    elif optimization_type == "geo_readability":
                        result = self.content_optimizer.analyze_geo_readability(input_text)
                    elif optimization_type == "geo_entity_extraction":
                        result = self.content_optimizer.extract_geo_entities(input_text)
                    elif optimization_type == "geo_variations":
                        result = self.content_optimizer.generate_geo_variations(
                            input_text,
                            num_variations=num_variations
                        )
                    elif optimization_type == "geo_batch_optimize":
                        # Split content by '---' separator
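                        # Expected input, e.g.:
                        #   First piece of content
                        #   ---
                        #   Second piece of content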
                        content_pieces = [piece.strip() for piece in input_text.split('---') if piece.strip()]
                        if len(content_pieces) > 1:
                            result = self.content_optimizer.batch_optimize_with_rag(content_pieces)
                        else:
                            st.warning("For batch optimization, please separate content pieces with '---'")
                            return

                if isinstance(result, list):
                    # Handle list results (variations, batch)
                    if any(r.get("error") for r in result):
                        failed_results = [r for r in result if r.get("error")]
                        st.error(f"Some processing failed: {len(failed_results)} out of {len(result)} items")
                    else:
                        st.success("All content processed successfully!")
                elif result.get("error"):
                    st.error(f"Processing failed: {result['error']}")
                    return
                else:
                    st.success(f"{optimization_type.replace('_', ' ').title()} completed successfully!")

                # Display results based on optimization type
                self.display_geo_enhancement_results(result, optimization_type, input_text, include_rag_context)

            except Exception as e:
                st.error(f"An error occurred: {str(e)}")

    def display_geo_enhancement_results(self, result, optimization_type, original_text, include_rag_context=True):
        """Display results based on GEO optimization type"""
        if optimization_type == "geo_batch_optimize":
            self.display_geo_batch_results(result)
        elif optimization_type == "geo_variations":
            self.display_geo_variation_results(result)
        elif optimization_type == "geo_readability":
            self.display_geo_readability_results(result)
        elif optimization_type == "geo_entity_extraction":
            self.display_geo_entity_results(result)
        else:
            self.display_standard_geo_results(result, optimization_type, include_rag_context)

        # Export functionality
        self.display_geo_export_options(result, optimization_type, original_text)

    def display_standard_geo_results(self, result, optimization_type, include_rag_context):
        """Display results for standard and competitive GEO optimizations"""
        st.markdown("### 📊 GEO Analysis Results")

        # Show GEO scores if available
        geo_analysis = result.get("geo_analysis", {})
        if geo_analysis:
            st.markdown("#### 🎯 GEO Performance Metrics")
            col1, col2, col3 = st.columns(3)
            with col1:
                current_score = geo_analysis.get("current_geo_score", 0)
                st.metric("Overall GEO Score", f"{current_score}/10")
            with col2:
                ai_visibility = geo_analysis.get("ai_search_visibility", 0)
                st.metric("AI Search Visibility", f"{ai_visibility}/10")
            with col3:
                citation_worthy = geo_analysis.get("citation_worthiness", 0)
                st.metric("Citation Worthiness", f"{citation_worthy}/10")

            # Second row of metrics
            col1, col2, col3 = st.columns(3)
            with col1:
                query_matching = geo_analysis.get("query_intent_matching", 0)
                st.metric("Query Intent Match", f"{query_matching}/10")
            with col2:
                conversational = geo_analysis.get("conversational_readiness", 0)
                st.metric("Conversational Ready", f"{conversational}/10")
            with col3:
                context_complete = geo_analysis.get("context_completeness", 0)
                st.metric("Context Complete", f"{context_complete}/10")

        # Show optimization opportunities
        opportunities = result.get("optimization_opportunities", [])
        if opportunities:
            st.markdown("#### 📈 Optimization Opportunities")
            high_priority = [opp for opp in opportunities if opp.get('priority') == 'high']
            medium_priority = [opp for opp in opportunities if opp.get('priority') == 'medium']
            if high_priority:
                st.markdown("##### 🔴 High Priority")
                for opp in high_priority:
                    st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', '')}")
                    if opp.get('expected_impact'):
                        st.write(f"*Expected Impact: {opp.get('expected_impact')}*")
                    st.write("---")
            if medium_priority:
                st.markdown("##### 🟡 Medium Priority")
                for opp in medium_priority:
                    st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', '')}")
                    if opp.get('expected_impact'):
                        st.write(f"*Expected Impact: {opp.get('expected_impact')}*")
                    st.write("---")
| # Show GEO keywords and entities | |
| geo_keywords = result.get("geo_keywords", {}) | |
| if geo_keywords: | |
| st.markdown("#### π GEO Keywords & Entities") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| primary_entities = geo_keywords.get("primary_entities", []) | |
| if primary_entities: | |
| st.write("**Primary Entities:**") | |
| st.write(", ".join(primary_entities)) | |
| semantic_terms = geo_keywords.get("semantic_terms", []) | |
| if semantic_terms: | |
| st.write("**Semantic Terms:**") | |
| st.write(", ".join(semantic_terms)) | |
| with col2: | |
| question_patterns = geo_keywords.get("question_patterns", []) | |
| if question_patterns: | |
| st.write("**Question Patterns:**") | |
| for q in question_patterns: | |
| st.write(f"β’ {q}") | |
| related_concepts = geo_keywords.get("related_concepts", []) | |
| if related_concepts: | |
| st.write("**Related Concepts:**") | |
| st.write(", ".join(related_concepts)) | |
| # Show optimized content | |
| optimized_content = result.get("optimized_content", {}) | |
| if optimized_content: | |
| enhanced_text = optimized_content.get("enhanced_text", "") | |
| if enhanced_text: | |
| st.markdown("#### β¨ GEO-Optimized Content") | |
| st.text_area( | |
| "Enhanced version:", | |
| value=enhanced_text, | |
| height=250, | |
| key="geo_optimized_output" | |
| ) | |
| # Show structural improvements | |
| structural_improvements = optimized_content.get("structural_improvements", []) | |
| if structural_improvements: | |
| st.markdown("**Structural Improvements:**") | |
| for improvement in structural_improvements: | |
| st.write(f"β’ {improvement}") | |
| # Show semantic enhancements | |
| semantic_enhancements = optimized_content.get("semantic_enhancements", []) | |
| if semantic_enhancements: | |
| st.markdown("**Semantic Enhancements:**") | |
| for enhancement in semantic_enhancements: | |
| st.write(f"β’ {enhancement}") | |

        # Show competitive analysis if available
        if "competitive_gaps" in result:
            st.markdown("#### 📊 Competitive GEO Analysis")
            competitive_gaps = result["competitive_gaps"]
            col1, col2 = st.columns(2)
            with col1:
                missing_questions = competitive_gaps.get("missing_question_patterns", [])
                if missing_questions:
                    st.write("**Missing Question Patterns:**")
                    for q in missing_questions:
                        st.write(f"• {q}")
                entity_gaps = competitive_gaps.get("entity_gaps", [])
                if entity_gaps:
                    st.write("**Entity Gaps:**")
                    st.write(", ".join(entity_gaps))
            with col2:
                semantic_opportunities = competitive_gaps.get("semantic_opportunities", [])
                if semantic_opportunities:
                    st.write("**Semantic Opportunities:**")
                    st.write(", ".join(semantic_opportunities))
                structural_weaknesses = competitive_gaps.get("structural_weaknesses", [])
                if structural_weaknesses:
                    st.write("**Structural Weaknesses:**")
                    for weakness in structural_weaknesses:
                        st.write(f"• {weakness}")

        # Show recommendations
        recommendations = result.get("recommendations", [])
        if recommendations:
            st.markdown("#### 💡 GEO Recommendations")
            for i, rec in enumerate(recommendations, 1):
                st.write(f"**{i}.** {rec}")

        # RAG context information
        if include_rag_context and result.get("rag_enhanced"):
            with st.expander("🧠 RAG Enhancement Details"):
                st.write("**RAG Status:** ✅ Knowledge base successfully applied")
                st.write(f"**Knowledge Sources:** {result.get('knowledge_sources', 'Multiple')} GEO best practice documents")
                st.write(f"**Enhancement Type:** {result.get('optimization_type', 'Standard')}")
                if result.get('parsing_error'):
                    st.warning(f"**Parsing Note:** {result['parsing_error']}")

    def display_geo_batch_results(self, results):
        """Display batch GEO optimization results"""
        st.markdown("### 📦 Batch GEO Processing Results")
        successful_results = [r for r in results if not r.get('error')]
        failed_results = [r for r in results if r.get('error')]

        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Pieces", len(results))
        with col2:
            st.metric("Successful", len(successful_results))
        with col3:
            st.metric("Failed", len(failed_results))

        # Show individual results
        for result in results:
            idx = result.get('batch_index', 0)
            st.markdown(f"#### Content Piece {idx + 1}")
            if result.get('error'):
                st.error(f"Processing failed: {result['error']}")
            else:
                # Show GEO scores
                geo_analysis = result.get("geo_analysis", {})
                if geo_analysis:
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.metric("GEO Score", f"{geo_analysis.get('current_geo_score', 0):.1f}")
                    with col2:
                        st.metric("AI Visibility", f"{geo_analysis.get('ai_search_visibility', 0):.1f}")
                    with col3:
                        st.metric("Citation Worthy", f"{geo_analysis.get('citation_worthiness', 0):.1f}")

                # Show optimized content if available
                optimized_content = result.get("optimized_content", {})
                enhanced_text = optimized_content.get("enhanced_text", "")
                if enhanced_text:
                    with st.expander("View GEO-optimized content"):
                        # Streamlit warns on empty labels; hide the label instead
                        st.text_area(
                            "Optimized content preview",
                            value=enhanced_text[:500] + "...",
                            height=150,
                            key=f"batch_geo_output_{idx}",
                            label_visibility="collapsed"
                        )
            st.write("---")

    def display_geo_variation_results(self, variations):
        """Display GEO content variation results"""
        st.markdown("### 🔄 GEO Content Variations")
        for i, variation in enumerate(variations):
            if variation.get('error'):
                st.error(f"Variation {i+1} failed: {variation['error']}")
                continue

            variation_type = variation.get('variation_type', f'Variation {i+1}')
            st.markdown(f"#### {variation_type.replace('_', ' ').title()} Version")

            # Show GEO improvements
            geo_improvements = variation.get('geo_improvements', [])
            if geo_improvements:
                st.write("**GEO Improvements:**")
                for improvement in geo_improvements:
                    st.write(f"• {improvement}")

            # Show target AI systems
            target_ai_systems = variation.get('target_ai_systems', [])
            if target_ai_systems:
                st.write(f"**Optimized For:** {', '.join(target_ai_systems)}")

            # Show expected benefits
            expected_benefits = variation.get('expected_geo_benefits', [])
            if expected_benefits:
                st.write("**Expected GEO Benefits:**")
                for benefit in expected_benefits:
                    st.write(f"• {benefit}")

            # Show optimized content
            optimized_content = variation.get('optimized_content', '')
            if optimized_content:
                st.text_area(
                    f"{variation_type} content:",
                    value=optimized_content,
                    height=200,
                    key=f"geo_variation_{i}"
                )
            st.write("---")

    def display_geo_readability_results(self, result):
        """Display GEO readability analysis results"""
        st.markdown("### 📖 GEO Readability Analysis")

        # Basic GEO metrics
        geo_metrics = result.get('geo_readability_metrics', {})
        if geo_metrics:
            st.markdown("#### 📊 GEO Content Metrics")
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Words", geo_metrics.get('total_words', 0))
            with col2:
                st.metric("Questions", geo_metrics.get('questions_count', 0))
            with col3:
                st.metric("Headings", geo_metrics.get('headings_count', 0))
            with col4:
                st.metric("Lists", geo_metrics.get('lists_count', 0))

            # Second row
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Entity Mentions", geo_metrics.get('entity_mentions', 0))
            with col2:
                st.metric("Data Points", geo_metrics.get('numeric_data_points', 0))
            with col3:
                st.metric("Paragraphs", geo_metrics.get('total_paragraphs', 0))
            with col4:
                geo_score = result.get('geo_readability_score', 0)
                st.metric("GEO Readability", f"{geo_score}/10")

        # AI optimization indicators
        ai_indicators = result.get('ai_optimization_indicators', {})
        if ai_indicators:
            st.markdown("#### 🤖 AI Optimization Indicators")
            col1, col2 = st.columns(2)
            with col1:
                question_ratio = ai_indicators.get('question_ratio', 0)
                st.metric("Question Ratio", f"{question_ratio:.2%}")
                structure_score = ai_indicators.get('structure_score', 0)
                st.metric("Structure Score", f"{structure_score:.1f}/10")
            with col2:
                entity_density = ai_indicators.get('entity_density', 0)
                st.metric("Entity Density", f"{entity_density:.2%}")
                data_richness = ai_indicators.get('data_richness', 0)
                st.metric("Data Richness", f"{data_richness:.2%}")

        # GEO recommendations
        geo_recommendations = result.get('geo_recommendations', [])
        if geo_recommendations:
            st.markdown("#### 💡 GEO Optimization Recommendations")
            for i, rec in enumerate(geo_recommendations, 1):
                st.write(f"**{i}.** {rec}")

    def display_geo_entity_results(self, result):
        """Display GEO entity extraction results"""
        st.markdown("### 🏷️ GEO Entity Analysis")
        if result.get('error'):
            st.error(f"Entity extraction failed: {result['error']}")
            return

        geo_entities = result.get('geo_entities', {})
        if geo_entities:
            # Display extracted entities
            for entity_type, entity_data in geo_entities.items():
                if entity_data:
                    st.markdown(f"#### {entity_type.replace('_', ' ').title()}")
                    st.write(entity_data)
                    st.write("---")

        # Extraction metadata
        extraction_success = result.get('extraction_success', False)
        if extraction_success:
            st.success("✅ Entity extraction completed successfully")
            st.write(f"**Content Length:** {result.get('content_length', 0)} characters")
            st.write(f"**Extraction Method:** {result.get('extraction_method', 'Unknown')}")

    def display_geo_export_options(self, result, optimization_type, original_text):
        """Display export options for GEO results"""
        st.markdown("### 📥 Export GEO Results")

        # Prepare export data
        export_data = {
            'timestamp': time.time(),
            'optimization_type': optimization_type,
            'original_text': original_text,
            'original_word_count': len(original_text.split()),
            'geo_results': result,
            'rag_enhanced': result.get('rag_enhanced', False) if not isinstance(result, list) else any(r.get('rag_enhanced', False) for r in result),
            'knowledge_sources': result.get('knowledge_sources', 0) if not isinstance(result, list) else 'multiple'
        }

        # Serialize data to JSON
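        # default=str is a catch-all for values json can't serialize natively
        # (e.g. datetimes or numpy scalars coming back from the optimizer).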
        export_json = json.dumps(export_data, indent=2, default=str)

        # Add download button
        st.download_button(
            label="📥 Download GEO Analysis Report",
            data=export_json,
            file_name=f"geo_{optimization_type}_analysis_{int(time.time())}.json",
            mime="application/json"
        )

    def render_document_qa_tab(self):
        """Render Document Q&A tab"""
        st.header("📄 Document Question Answering")
        st.markdown("Upload documents or paste text to ask questions using RAG.")

        # File upload
        uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])

        # Text input
        pasted_text = st.text_area("Or paste text directly:", height=150)

        # Question input
        user_query = st.text_input("Ask a question about the content:")

        # Submit button
        if st.button("🔍 Ask Question", key="qa_submit"):
            if not user_query.strip():
                st.warning("Please enter a question.")
                return

            try:
                # Parse content
                documents = []
                if uploaded_file:
                    with st.spinner("Processing PDF..."):
                        # Save uploaded file temporarily
                        temp_path = self.save_uploaded_file(uploaded_file)
                        documents = self.pdf_parser.parse(temp_path)
                        os.unlink(temp_path)  # Clean up
                elif pasted_text.strip():
                    with st.spinner("Processing text..."):
                        documents = self.text_parser.parse(pasted_text)
                else:
                    st.warning("Please upload a PDF or paste some text.")
                    return

                # Create vector store and answer question
                with st.spinner("Creating embeddings and searching..."):
                    qa_chain = self.vector_chunker.create_qa_chain(documents, self.llm)
                    # invoke() is the current LangChain entry point; calling
                    # the chain object directly is deprecated in recent releases
                    result = qa_chain.invoke({"query": user_query})

                # Display results
                st.markdown("### 💬 Answer")
                st.write(result["result"])

                # Show sources
                with st.expander("📄 Source Documents"):
                    for i, doc in enumerate(result.get("source_documents", [])):
                        st.write(f"**Source {i+1}:**")
                        content = doc.page_content
                        st.write(content[:500] + "..." if len(content) > 500 else content)
                        if hasattr(doc, 'metadata') and doc.metadata:
                            st.write(f"*Metadata: {doc.metadata}*")
                        st.write("---")
            except Exception as e:
                st.error(f"An error occurred: {str(e)}")

    def render_website_analysis_tab(self):
        """Render Website GEO Analysis tab"""
        st.header("🌐 Website GEO Analysis")
        st.markdown("Analyze websites for Generative Engine Optimization (GEO) performance.")

        # URL input
        col1, col2 = st.columns([3, 1])
        with col1:
            website_url = st.text_input(
                "Enter website URL:",
                placeholder="https://example.com"
            )
        with col2:
            max_pages = st.selectbox("Pages to analyze:", [1, 3, 5], index=0)

        # Analysis options
        col1, col2 = st.columns(2)
        with col1:
            include_subpages = st.checkbox("Include subpages", value=False)
        with col2:
            detailed_analysis = st.checkbox("Detailed analysis", value=True)

        # Submit button
        if st.button("🔍 Analyze Website", key="website_analyze"):
            if not website_url.strip():
                st.warning("Please enter a website URL.")
                return

            try:
                # Normalize URL
                if not website_url.startswith(('http://', 'https://')):
                    website_url = 'https://' + website_url

                with st.spinner(f"Analyzing website: {website_url}"):
                    # Parse website content
                    pages_data = self.webpage_parser.parse_website(
                        website_url,
                        max_pages=max_pages,
                        include_subpages=include_subpages
                    )
                    if not pages_data:
                        st.error("Could not extract content from the website.")
                        return

                    st.success(f"Successfully extracted content from {len(pages_data)} page(s)")

                    # Analyze GEO scores
                    with st.spinner("Calculating GEO scores..."):
                        geo_results = []
                        for i, page_data in enumerate(pages_data):
                            with st.spinner(f"Analyzing page {i+1}/{len(pages_data)}..."):
                                analysis = self.geo_scorer.analyze_page_geo(
                                    page_data['content'],
                                    page_data['title'],
                                    detailed=detailed_analysis
                                )
                                if not analysis.get('error'):
                                    analysis['page_data'] = page_data
                                    geo_results.append(analysis)
                                else:
                                    st.warning(f"Could not analyze page {i+1}: {analysis['error']}")

                    if not geo_results:
                        st.error("Could not analyze any pages from the website.")
                        return

                # Display results
                self.display_geo_results(geo_results, website_url)

                # Export functionality
                st.markdown("### 📥 Export Results")
                # Note: a nested st.button here would never fire, because
                # clicking it reruns the script and the outer "Analyze Website"
                # button resets to False. Offer the download directly instead.
                report_data = self.result_exporter.export_geo_results(
                    geo_results,
                    website_url
                )
                safe_name = website_url.replace('https://', '').replace('http://', '').replace('/', '_')
                st.download_button(
                    label="Download GEO Report",
                    data=json.dumps(report_data, indent=2),
                    file_name=f"geo_analysis_{safe_name}.json",
                    mime="application/json"
                )
            except Exception as e:
                st.error(f"An error occurred during website analysis: {str(e)}")

    def display_geo_results(self, geo_results: List[Dict], website_url: str):
        """Display GEO analysis results"""
        st.markdown("## 📊 GEO Analysis Results")

        # Calculate average scores
        avg_scores = self.calculate_average_scores(geo_results)
        overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

        # Main score display
        col1, col2, col3 = st.columns([1, 2, 1])
        with col2:
            st.metric(
                "Overall GEO Score",
                f"{overall_avg:.1f}/10",
                delta=f"{overall_avg - 7.0:.1f}" if overall_avg != 7.0 else None
            )
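            # The delta is measured against a fixed 7.0 baseline, presumably
            # the threshold treated as a "good" GEO score.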

        # Individual metrics
        st.markdown("### 📊 Detailed GEO Metrics")

        # First row of metrics
        col1, col2, col3, col4 = st.columns(4)
        metrics_row1 = [
            ("AI Search Visibility", "ai_search_visibility"),
            ("Query Intent Match", "query_intent_matching"),
            ("Factual Accuracy", "factual_accuracy"),
            ("Conversational Ready", "conversational_readiness")
        ]
        for i, (display_name, key) in enumerate(metrics_row1):
            with [col1, col2, col3, col4][i]:
                score = avg_scores.get(key, 0)
                st.metric(display_name, f"{score:.1f}")

        # Second row of metrics
        col1, col2, col3, col4 = st.columns(4)
        metrics_row2 = [
            ("Semantic Richness", "semantic_richness"),
            ("Context Complete", "context_completeness"),
            ("Citation Worthy", "citation_worthiness"),
            ("Multi-Query Cover", "multi_query_coverage")
        ]
        for i, (display_name, key) in enumerate(metrics_row2):
            with [col1, col2, col3, col4][i]:
                score = avg_scores.get(key, 0)
                st.metric(display_name, f"{score:.1f}")

        # Recommendations
        self.display_recommendations(geo_results)

        # Detailed page analysis
        with st.expander("📄 Detailed Page Analysis"):
            for i, analysis in enumerate(geo_results):
                page_data = analysis.get('page_data', {})
                st.markdown(f"#### Page {i+1}: {page_data.get('title', 'Unknown Title')}")
                st.write(f"**URL**: {page_data.get('url', 'Unknown')}")
                st.write(f"**Word Count**: {page_data.get('word_count', 0)}")

                # Show topics and entities if available
                if 'primary_topics' in analysis:
                    st.write(f"**Topics**: {', '.join(analysis['primary_topics'])}")
                if 'entities' in analysis:
                    st.write(f"**Entities**: {', '.join(analysis['entities'])}")

                # Show page-specific scores
                if 'geo_scores' in analysis:
                    scores = analysis['geo_scores']
                    score_text = ", ".join([f"{k}: {v:.1f}" for k, v in scores.items()])
                    st.write(f"**Scores**: {score_text}")
                st.write("---")

    def display_recommendations(self, geo_results: List[Dict]):
        """Display optimization recommendations"""
        st.markdown("### 💡 Optimization Recommendations")

        # Collect all recommendations
        all_recommendations = []
        all_opportunities = []
        for analysis in geo_results:
            all_recommendations.extend(analysis.get('recommendations', []))
            all_opportunities.extend(analysis.get('optimization_opportunities', []))

        # Remove duplicates and display
        unique_recommendations = list(set(all_recommendations))
        if unique_recommendations:
            for i, rec in enumerate(unique_recommendations[:5], 1):
                st.write(f"**{i}.** {rec}")

        # Priority opportunities
        if all_opportunities:
            st.markdown("#### 🎯 Priority Optimizations")
            high_priority = [opp for opp in all_opportunities if opp.get('priority') == 'high']
            medium_priority = [opp for opp in all_opportunities if opp.get('priority') == 'medium']
            if high_priority:
                st.markdown("##### 🔴 High Priority")
                for opp in high_priority[:3]:
                    st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', 'No description')}")
            if medium_priority:
                st.markdown("##### 🟡 Medium Priority")
                for opp in medium_priority[:3]:
                    st.write(f"**{opp.get('type', 'Optimization')}**: {opp.get('description', 'No description')}")

    def calculate_average_scores(self, geo_results: List[Dict]) -> Dict[str, float]:
        """Calculate average GEO scores across all pages"""
        if not geo_results:
            return {}

        # Get all score keys from the first result
        score_keys = list(geo_results[0].get('geo_scores', {}).keys())
        avg_scores = {}
        for key in score_keys:
            scores = [
                result['geo_scores'][key]
                for result in geo_results
                if 'geo_scores' in result and key in result['geo_scores']
            ]
            avg_scores[key] = sum(scores) / len(scores) if scores else 0
        return avg_scores
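    # Pages missing a given score key are skipped rather than counted as zero,
    # so each average reflects only the pages that were actually scored on it.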

    def save_uploaded_file(self, uploaded_file) -> str:
        """Save an uploaded file to a temporary location; the caller is responsible for deleting it."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_file.write(uploaded_file.read())
            return tmp_file.name

    def render_generate_geo_content_tab(self):
        """Tab to generate fresh GEO-optimized content using a system prompt"""
        st.header("🧠 Generate GEO Content")
        st.markdown("Use this tool to generate AI-optimized content from scratch based on your topic or query.")

        # User input
        user_prompt = st.text_area("Describe the content you want (e.g., topic, style, target audience):", height=150)

        # Continue chat option
        if "chat_history" not in st.session_state:
            st.session_state.chat_history = []

        if st.button("🧠 Generate Content"):
            if not user_prompt.strip():
                st.warning("Please enter a topic or description.")
                return

            # Add user message to chat history
            st.session_state.chat_history.append(HumanMessage(content=user_prompt))

            # Define system prompt for GEO content generation
            system_prompt = (
                "You are a Generative Engine Optimization (GEO) content creation specialist. "
                "Create content that is highly optimized for AI systems, LLMs, and generative search engines. "
                "Ensure the content includes rich semantics, clear structure, relevant keywords, and is suitable for conversational use, citations, and AI summaries."
            )
            # Prepend a plain SystemMessage once. The original code inserted a
            # formatted SystemMessagePromptTemplate on every click, which
            # duplicated the system prompt in the history.
            if not any(isinstance(msg, SystemMessage) for msg in st.session_state.chat_history):
                st.session_state.chat_history.insert(0, SystemMessage(content=system_prompt))

            with st.spinner("Generating GEO-optimized content..."):
                response = self.llm.invoke(st.session_state.chat_history)
                st.session_state.chat_history.append(AIMessage(content=response.content))
            st.success("✅ Content generated successfully!")

        # Display chat history
        for msg in st.session_state.chat_history:
            if isinstance(msg, HumanMessage):
                st.markdown(f"**🧑 You:** {msg.content}")
            elif isinstance(msg, AIMessage):
                st.markdown(f"**🤖 Assistant:** {msg.content}")

def main():
    """Main entry point"""
    app = GEOSEOApp()
    app.run()


if __name__ == "__main__":
    main()