"""Main Streamlit application for the GEO SEO AI Optimizer.

Entry point for the application: builds the parsers, scorer, optimizer,
vector chunker and exporter, and renders the Streamlit UI around them.
"""

import json
import os
import re
import tempfile
import time
from typing import Any, Dict, List

import streamlit as st

# Import our custom modules
from utils.parser import PDFParser, TextParser, WebpageParser
from utils.scorer import GEOScorer
from utils.optimizer import ContentOptimizer
from utils.chunker import VectorChunker
from utils.export import ResultExporter

# Import LangChain components
from langchain_groq import ChatGroq
from langchain_community.embeddings import HuggingFaceEmbeddings


class GEOSEOApp:
    """Main application class that orchestrates all components.

    Constructing an instance initialises configuration, the LLM and embedding
    models, the content parsers and the processing components. Call
    :meth:`run` to render the UI.
    """

    # Model identifiers passed to ChatGroq / HuggingFaceEmbeddings.
    LLM_MODEL_NAME = "llama3-8b-8192"
    EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

    # Reference value the overall GEO score is compared against; the overall
    # metric shows its delta relative to this number.
    GEO_SCORE_BASELINE = 7.0

    # Maximum number of characters of each source document shown in the Q&A tab.
    SOURCE_PREVIEW_CHARS = 500

    # Optimization types offered in the selector (insertion order = UI order).
    # NOTE: "competitive", "voice_search", "batch_optimize",
    # "content_variations" and "entity_extraction" are currently not offered.
    OPTIMIZATION_LABELS = {
        "standard": "🔧 Standard Enhancement",
        "seo": "🌐 SEO-Focused Optimization",
        "readability_analysis": "📖 Readability Analysis",
    }
    OPTIMIZATION_DESCRIPTIONS = {
        "standard": "General content enhancement focusing on clarity, structure, and AI answerability.",
        "seo": "SEO-focused optimization for AI search engines with semantic keyword analysis.",
        "readability_analysis": "Detailed readability analysis specifically for AI systems.",
    }
    # Types for which the "analysis only" / "keyword suggestions" toggles are shown.
    ANALYSIS_OPTION_TYPES = ("standard", "seo", "competitive", "readability_analysis")

    def __init__(self):
        self.setup_config()
        self.setup_models()
        self.setup_parsers()
        self.setup_components()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------

    def setup_config(self):
        """Read API keys from the environment and create the data directory.

        Missing environment variables fall back to placeholder strings.
        NOTE(review): presumably a placeholder key only fails once the first
        LLM request is made -- confirm and consider validating up front.
        """
        self.groq_api_key = os.getenv("GROQ_API_KEY", "your-groq-api-key")
        self.hf_api_key = os.getenv("HUGGINGFACE_API_KEY", "your-huggingface-api-key")

        # Create data directory if it doesn't exist
        os.makedirs("data/uploaded_files", exist_ok=True)

    def setup_models(self):
        """Initialize the chat LLM and the embedding model."""
        self.llm = ChatGroq(
            api_key=self.groq_api_key,
            model_name=self.LLM_MODEL_NAME,
            temperature=0.1,
        )

        # The device is passed through `model_kwargs` rather than as a
        # top-level `device` argument.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.EMBEDDING_MODEL_NAME,
            model_kwargs={"device": "cpu"},
            cache_folder="./hf_cache",
        )

    def setup_parsers(self):
        """Initialize content parsers."""
        self.pdf_parser = PDFParser()
        self.text_parser = TextParser()
        self.webpage_parser = WebpageParser()

    def setup_components(self):
        """Initialize processing components (scorer, optimizer, chunker, exporter)."""
        self.geo_scorer = GEOScorer(self.llm)
        self.content_optimizer = ContentOptimizer(self.llm)
        self.vector_chunker = VectorChunker(self.embeddings)
        self.result_exporter = ResultExporter()

    # ------------------------------------------------------------------
    # Page layout
    # ------------------------------------------------------------------

    def run(self):
        """Render the page: configuration, title, sidebar and the three tabs."""
        st.set_page_config(
            page_title="GEO SEO AI Optimizer",
            page_icon="🚀",
            layout="wide",
        )

        st.title("🚀 GEO SEO AI Optimizer")
        st.markdown("*Optimize your content for AI search engines and LLM systems*")

        self.render_sidebar()

        website_tab, enhancement_tab, qa_tab = st.tabs([
            "🌐 Website GEO Analysis",
            "🔧 Content Enhancement",
            "📄 Document Q&A",
        ])

        with website_tab:
            self.render_website_analysis_tab()
        with enhancement_tab:
            self.render_content_enhancement_tab()
        with qa_tab:
            self.render_document_qa_tab()

    def render_sidebar(self):
        """Render sidebar with information and controls."""
        sidebar = st.sidebar

        sidebar.title("🛠️ GEO Tools")
        sidebar.markdown("- 📄 Document Q&A with RAG")
        sidebar.markdown("- 🔧 Content Enhancement")
        sidebar.markdown("- 🌐 Website GEO Analysis")
        sidebar.markdown("- 📊 AI-First SEO Scoring")

        sidebar.markdown("---")
        sidebar.markdown("### 🔧 Configuration")
        sidebar.markdown("Set your API keys:")
        sidebar.code("export GROQ_API_KEY='your-key'")

        sidebar.markdown("---")
        sidebar.markdown("### 📖 GEO Metrics")
        sidebar.markdown("**AI Search Visibility**: How likely AI engines will surface your content")
        sidebar.markdown("**Query Intent Matching**: How well content matches user queries")
        sidebar.markdown("**Conversational Readiness**: Suitability for AI chat responses")
        sidebar.markdown("**Citation Worthiness**: Probability of being cited by AI")

        sidebar.markdown("---")
        sidebar.markdown("### ℹ️ Components")
        sidebar.markdown("- **Parser**: Extract content from various sources")
        sidebar.markdown("- **Scorer**: Analyze GEO performance")
        sidebar.markdown("- **Optimizer**: Enhance content for AI")
        sidebar.markdown("- **Chunker**: Create vector embeddings")
        sidebar.markdown("- **Exporter**: Generate reports")

    # ------------------------------------------------------------------
    # Document Q&A tab
    # ------------------------------------------------------------------

    def render_document_qa_tab(self):
        """Render the Document Q&A tab.

        Accepts either an uploaded PDF or pasted text (the PDF wins when both
        are given), builds a QA chain over the parsed documents and shows the
        answer together with its source documents.
        """
        st.header("📄 Document Question Answering")
        st.markdown("Upload documents or paste text to ask questions using RAG.")

        uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])
        pasted_text = st.text_area("Or paste text directly:", height=150)
        user_query = st.text_input("Ask a question about the content:")

        if not st.button("🔍 Ask Question", key="qa_submit"):
            return

        if not user_query.strip():
            st.warning("Please enter a question.")
            return

        try:
            if uploaded_file:
                with st.spinner("Processing PDF..."):
                    documents = self._parse_uploaded_pdf(uploaded_file)
            elif pasted_text.strip():
                with st.spinner("Processing text..."):
                    documents = self.text_parser.parse(pasted_text)
            else:
                st.warning("Please upload a PDF or paste some text.")
                return

            with st.spinner("Creating embeddings and searching..."):
                qa_chain = self.vector_chunker.create_qa_chain(documents, self.llm)
                result = qa_chain({"query": user_query})

            st.markdown("### 💬 Answer")
            st.write(result["result"])

            with st.expander("📄 Source Documents"):
                for number, doc in enumerate(result.get("source_documents", []), 1):
                    st.write(f"**Source {number}:**")
                    content = doc.page_content
                    if len(content) > self.SOURCE_PREVIEW_CHARS:
                        content = content[:self.SOURCE_PREVIEW_CHARS] + "..."
                    st.write(content)
                    metadata = getattr(doc, "metadata", None)
                    if metadata:
                        st.write(f"*Metadata: {metadata}*")
                    st.write("---")

        except Exception as e:
            # Top-level UI boundary: surface the failure instead of crashing the page.
            st.error(f"An error occurred: {str(e)}")

    def _parse_uploaded_pdf(self, uploaded_file):
        """Parse an uploaded PDF via a temporary file that is always removed."""
        temp_path = self.save_uploaded_file(uploaded_file)
        try:
            return self.pdf_parser.parse(temp_path)
        finally:
            # Clean up even when parsing fails, so temp files do not accumulate.
            os.unlink(temp_path)

    # ------------------------------------------------------------------
    # Content enhancement tab
    # ------------------------------------------------------------------

    def render_content_enhancement_tab(self):
        """Render the Content Enhancement tab with the optimization type selector."""
        st.header("🔧 Content Enhancement")
        st.markdown("Analyze and optimize your content for better AI/LLM performance.")

        input_text = st.text_area(
            "Enter content to analyze and enhance:",
            height=200,
            key="enhancement_input",
        )

        st.markdown("### ⚙️ Optimization Settings")
        type_col, options_col = st.columns(2)

        with type_col:
            optimization_type = st.selectbox(
                "Select Optimization Type:",
                options=list(self.OPTIMIZATION_LABELS),
                format_func=lambda key: self.OPTIMIZATION_LABELS[key],
                index=0,
                help="Choose the type of optimization to apply to your content",
            )

        with options_col:
            if optimization_type in self.ANALYSIS_OPTION_TYPES:
                analyze_only = st.checkbox("Analysis only (no rewriting)", value=False)
                include_keywords = st.checkbox("Include keyword suggestions", value=True)
            else:
                analyze_only = False
                include_keywords = True

        st.info(f"**{self.OPTIMIZATION_DESCRIPTIONS[optimization_type]}**")

        if not st.button("🚀 Process Content", key="enhancement_submit"):
            return

        if not input_text.strip():
            st.warning("Please enter some content to analyze.")
            return

        try:
            with st.spinner(f"Processing content with {optimization_type} optimization..."):
                result = self._run_optimization(
                    optimization_type, input_text, analyze_only, include_keywords
                )

            if isinstance(result, dict) and result.get("error"):
                st.error(f"Processing failed: {result['error']}")
                return

            self.display_enhancement_results(result, optimization_type, input_text)

        except Exception as e:
            # Top-level UI boundary: surface the failure instead of crashing the page.
            st.error(f"An error occurred: {str(e)}")

    def _run_optimization(self, optimization_type, input_text, analyze_only, include_keywords):
        """Call the optimizer method that corresponds to ``optimization_type``.

        Raises:
            ValueError: if ``optimization_type`` has no handler.
        """
        if optimization_type in ("standard", "seo"):
            return self.content_optimizer.optimize_content(
                input_text,
                analyze_only=analyze_only,
                include_keywords=include_keywords,
                optimization_type=optimization_type,
            )
        if optimization_type == "readability_analysis":
            return self.content_optimizer.analyze_content_readability(input_text)
        raise ValueError(f"Unsupported optimization type: {optimization_type}")

    def display_enhancement_results(self, result, optimization_type, original_text):
        """Display results with the renderer matching ``optimization_type``.

        Types without a dedicated renderer use :meth:`display_standard_results`.
        Export options are always shown afterwards.
        """
        readable_type = optimization_type.replace("_", " ").title()
        st.success(f"{readable_type} optimization completed successfully!")

        # Only "readability_analysis" is reachable from the selector today; the
        # other entries keep the existing renderers wired for their types.
        dedicated_renderers = {
            "readability_analysis": self.display_readability_results,
            "batch_optimize": self.display_batch_results,
            "content_variations": self.display_variation_results,
            "entity_extraction": self.display_entity_results,
        }
        renderer = dedicated_renderers.get(optimization_type)
        if renderer is not None:
            renderer(result)
        else:
            self.display_standard_results(result, optimization_type)

        self.display_export_options(result, optimization_type, original_text)

    @staticmethod
    def _render_quality_scores(scores: Dict[str, Any]) -> None:
        """Render the clarity / structure / answerability metrics in three columns."""
        metric_specs = (
            ("Clarity", "clarity"),
            ("Structure", "structuredness"),
            ("Answerability", "answerability"),
        )
        for column, (label, key) in zip(st.columns(3), metric_specs):
            with column:
                st.metric(label, f"{scores.get(key, 0)}/10")

    @staticmethod
    def _render_numbered(items) -> None:
        """Write ``items`` as a bold-numbered list starting at 1."""
        for number, item in enumerate(items, 1):
            st.write(f"**{number}.** {item}")

    def display_standard_results(self, result, optimization_type):
        """Display results for standard, SEO, and competitive optimizations."""
        st.markdown("### 📊 Analysis Results")

        scores = result.get("scores", {})
        if scores:
            self._render_quality_scores(scores)

        if "seo_analysis" in result:
            st.markdown("#### 🌐 SEO Analysis")
            seo_data = result["seo_analysis"]
            if "readability_score" in seo_data:
                st.metric("Readability Score", f"{seo_data['readability_score']}/10")
            if "semantic_gaps" in seo_data:
                st.write("**Semantic Gaps:**", ", ".join(map(str, seo_data["semantic_gaps"])))

        if "competitive_analysis" in result:
            st.markdown("#### 📊 Competitive Analysis")
            for key, value in result["competitive_analysis"].items():
                heading = f"**{key.replace('_', ' ').title()}:**"
                if isinstance(value, list):
                    st.write(heading, ", ".join(map(str, value)))
                else:
                    st.write(heading, value)

        keywords = result.get("keywords", [])
        if keywords:
            st.markdown("#### 🔑 Key Terms")
            st.write(", ".join(map(str, keywords)))

        # The optimized text is either under "optimized_text" or under
        # "optimized_content", which may be a dict or a plain string.
        optimized_content = result.get("optimized_text")
        if not optimized_content:
            fallback = result.get("optimized_content")
            if isinstance(fallback, dict):
                optimized_content = fallback.get("enhanced_content", "")
            elif isinstance(fallback, str):
                optimized_content = fallback
        if optimized_content:
            st.markdown("#### ✨ Optimized Content")
            st.text_area(
                "Enhanced version:",
                value=optimized_content,
                height=200,
                key="optimized_output",
            )

        recommendations = result.get("recommendations", [])
        if recommendations:
            st.markdown("#### 💡 Recommendations")
            self._render_numbered(recommendations)

    def display_batch_results(self, results):
        """Display batch optimization results (one section per content piece)."""
        st.markdown("### 📦 Batch Processing Results")

        failed_count = sum(1 for item in results if item.get("error"))
        summary = (
            ("Total Pieces", len(results)),
            ("Successful", len(results) - failed_count),
            ("Failed", failed_count),
        )
        for column, (label, value) in zip(st.columns(3), summary):
            with column:
                st.metric(label, value)

        for position, item in enumerate(results):
            # Fall back to the list position so widget keys stay unique when
            # results carry no "batch_index".
            idx = item.get("batch_index", position)
            st.markdown(f"#### Content Piece {idx + 1}")

            if item.get("error"):
                st.error(f"Processing failed: {item['error']}")
            else:
                scores = item.get("scores", {})
                if scores:
                    self._render_quality_scores(scores)

                optimized = item.get("optimized_text", "")
                if optimized:
                    with st.expander("View optimized content"):
                        # A non-empty label avoids Streamlit's empty-label warning.
                        st.text_area(
                            f"Optimized content (piece {idx + 1})",
                            value=optimized,
                            height=150,
                            key=f"batch_output_{idx}",
                        )

            st.write("---")

    def display_variation_results(self, variations):
        """Display content variation results."""
        st.markdown("### 🔄 Content Variations")

        for i, variation in enumerate(variations):
            if variation.get("error"):
                st.error(f"Variation {i+1} failed: {variation['error']}")
                continue

            variation_type = variation.get("variation_type", f"Variation {i+1}")
            st.markdown(f"#### {variation_type.title()} Version")

            target_use_case = variation.get("target_use_case", "")
            if target_use_case:
                st.info(f"**Target Use Case:** {target_use_case}")

            key_changes = variation.get("key_changes", [])
            if key_changes:
                st.write("**Key Changes:**")
                for change in key_changes:
                    st.write(f"• {change}")

            optimized_content = variation.get("optimized_content", "")
            if optimized_content:
                st.text_area(
                    f"{variation_type} content:",
                    value=optimized_content,
                    height=150,
                    key=f"variation_{i}",
                )

            st.write("---")

    def display_readability_results(self, result):
        """Display readability analysis results."""
        st.markdown("### 📖 Readability Analysis")

        basic_metrics = result.get("basic_metrics", {})
        if basic_metrics:
            st.markdown("#### 📊 Basic Metrics")
            metric_values = (
                ("Total Words", basic_metrics.get("total_words", 0)),
                ("Sentences", basic_metrics.get("total_sentences", 0)),
                ("Paragraphs", basic_metrics.get("total_paragraphs", 0)),
                ("AI Readability", f"{result.get('ai_readability_score', 0)}/10"),
            )
            for column, (label, value) in zip(st.columns(4), metric_values):
                with column:
                    st.metric(label, value)

        complexity = result.get("complexity_indicators", {})
        if complexity:
            st.markdown("#### 🎯 Complexity Analysis")
            long_col, complex_col = st.columns(2)
            with long_col:
                st.metric(
                    "Long Sentences",
                    f"{complexity.get('long_sentences_percentage', 0):.1f}%",
                )
            with complex_col:
                st.metric(
                    "Complex Words",
                    f"{complexity.get('complex_words_percentage', 0):.1f}%",
                )

        recommendations = result.get("recommendations", [])
        if recommendations:
            st.markdown("#### 💡 Readability Recommendations")
            self._render_numbered(recommendations)

    def display_entity_results(self, result):
        """Display entity extraction results."""
        st.markdown("### 🏷️ Entity Analysis")

        joined_sections = (
            ("named_entities", "#### 👥 Named Entities"),
            ("key_topics", "#### 📋 Key Topics"),
            ("technical_terms", "#### 🔧 Technical Terms"),
            ("semantic_keywords", "#### 🔍 Semantic Keywords"),
        )
        for key, heading in joined_sections:
            values = result.get(key, [])
            if values:
                st.markdown(heading)
                st.write(", ".join(map(str, values)))

        questions = result.get("question_opportunities", [])
        if questions:
            st.markdown("#### ❓ Question Opportunities")
            for question in questions:
                st.write(f"• {question}")

    def display_export_options(self, result, optimization_type, original_text):
        """Offer the analysis as a downloadable JSON report."""
        st.markdown("### 📥 Export Results")

        # Take the timestamp once so the payload and the file name agree.
        timestamp = time.time()
        export_data = {
            "timestamp": timestamp,
            "optimization_type": optimization_type,
            "original_text": original_text,
            "original_word_count": len(original_text.split()),
            "results": result,
        }
        export_json = json.dumps(export_data, indent=2)

        st.download_button(
            label="📥 Download Analysis Report",
            data=export_json,
            file_name=f"{optimization_type}_analysis_{int(timestamp)}.json",
            mime="application/json",
        )

    # ------------------------------------------------------------------
    # Website GEO analysis tab
    # ------------------------------------------------------------------

    def render_website_analysis_tab(self):
        """Render the Website GEO Analysis tab.

        Crawls the given URL, scores each extracted page, shows the aggregated
        results and offers the full report as a JSON download.
        """
        st.header("🌐 Website GEO Analysis")
        st.markdown("Analyze websites for Generative Engine Optimization (GEO) performance.")

        url_col, pages_col = st.columns([3, 1])
        with url_col:
            website_url = st.text_input(
                "Enter website URL:",
                placeholder="https://example.com",
            )
        with pages_col:
            max_pages = st.selectbox("Pages to analyze:", [1, 3, 5], index=0)

        subpages_col, detail_col = st.columns(2)
        with subpages_col:
            include_subpages = st.checkbox("Include subpages", value=False)
        with detail_col:
            detailed_analysis = st.checkbox("Detailed analysis", value=True)

        if not st.button("🌐 Analyze Website", key="website_analyze"):
            return

        # Strip before validating so stray whitespace never ends up in the URL.
        website_url = website_url.strip()
        if not website_url:
            st.warning("Please enter a website URL.")
            return

        try:
            if not website_url.startswith(("http://", "https://")):
                website_url = "https://" + website_url

            with st.spinner(f"Analyzing website: {website_url}"):
                pages_data = self.webpage_parser.parse_website(
                    website_url,
                    max_pages=max_pages,
                    include_subpages=include_subpages,
                )

            if not pages_data:
                st.error("Could not extract content from the website.")
                return

            st.success(f"Successfully extracted content from {len(pages_data)} page(s)")

            with st.spinner("Calculating GEO scores..."):
                geo_results = self._score_pages(pages_data, detailed_analysis)

            if not geo_results:
                st.error("Could not analyze any pages from the website.")
                return

            self.display_geo_results(geo_results, website_url)

            # The download button is rendered directly: a second st.button
            # nested under the "Analyze" button could never fire, because
            # clicking it reruns the script with the outer button unset.
            st.markdown("### 📥 Export Results")
            report_data = self.result_exporter.export_geo_results(geo_results, website_url)
            st.download_button(
                label="Download GEO Report",
                data=json.dumps(report_data, indent=2),
                file_name=self._report_file_name(website_url),
                mime="application/json",
            )

        except Exception as e:
            # Top-level UI boundary: surface the failure instead of crashing the page.
            st.error(f"An error occurred during website analysis: {str(e)}")

    def _score_pages(self, pages_data, detailed_analysis) -> List[Dict]:
        """Run the GEO scorer on every page; pages that report an error are skipped."""
        geo_results = []
        total = len(pages_data)
        for number, page_data in enumerate(pages_data, 1):
            with st.spinner(f"Analyzing page {number}/{total}..."):
                analysis = self.geo_scorer.analyze_page_geo(
                    page_data["content"],
                    page_data.get("title", ""),
                    detailed=detailed_analysis,
                )
            if analysis.get("error"):
                st.warning(f"Could not analyze page {number}: {analysis['error']}")
                continue
            analysis["page_data"] = page_data
            geo_results.append(analysis)
        return geo_results

    @staticmethod
    def _report_file_name(website_url: str) -> str:
        """Build a filesystem-safe report file name from ``website_url``."""
        without_scheme = re.sub(r"^https?://", "", website_url.strip(), flags=re.IGNORECASE)
        slug = re.sub(r"[^A-Za-z0-9._-]+", "_", without_scheme).strip("_")
        return f"geo_analysis_{slug or 'website'}.json"

    @staticmethod
    def _format_score(value: Any) -> str:
        """Format ``value`` with one decimal, or as plain text if it is not numeric."""
        try:
            return f"{value:.1f}"
        except (TypeError, ValueError):
            return str(value)

    @staticmethod
    def _render_metric_row(metric_specs, avg_scores: Dict[str, float]) -> None:
        """Render one row of ``(label, score_key)`` metrics, one column each."""
        for column, (label, key) in zip(st.columns(len(metric_specs)), metric_specs):
            with column:
                st.metric(label, f"{avg_scores.get(key, 0):.1f}")

    def display_geo_results(self, geo_results: List[Dict], website_url: str):
        """Display aggregated GEO scores, recommendations and per-page details."""
        st.markdown("## 📊 GEO Analysis Results")

        avg_scores = self.calculate_average_scores(geo_results)
        overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

        _, center_col, _ = st.columns([1, 2, 1])
        with center_col:
            delta = None
            if overall_avg != self.GEO_SCORE_BASELINE:
                delta = f"{overall_avg - self.GEO_SCORE_BASELINE:.1f}"
            st.metric("Overall GEO Score", f"{overall_avg:.1f}/10", delta=delta)

        st.markdown("### 📈 Detailed GEO Metrics")
        self._render_metric_row(
            [
                ("AI Search Visibility", "ai_search_visibility"),
                ("Query Intent Match", "query_intent_matching"),
                ("Factual Accuracy", "factual_accuracy"),
                ("Conversational Ready", "conversational_readiness"),
            ],
            avg_scores,
        )
        self._render_metric_row(
            [
                ("Semantic Richness", "semantic_richness"),
                ("Context Complete", "context_completeness"),
                ("Citation Worthy", "citation_worthiness"),
                ("Multi-Query Cover", "multi_query_coverage"),
            ],
            avg_scores,
        )

        self.display_recommendations(geo_results)

        with st.expander("📋 Detailed Page Analysis"):
            for number, analysis in enumerate(geo_results, 1):
                page_data = analysis.get("page_data", {})
                st.markdown(f"#### Page {number}: {page_data.get('title', 'Unknown Title')}")
                st.write(f"**URL**: {page_data.get('url', 'Unknown')}")
                st.write(f"**Word Count**: {page_data.get('word_count', 0)}")

                if "primary_topics" in analysis:
                    st.write(f"**Topics**: {', '.join(map(str, analysis['primary_topics']))}")
                if "entities" in analysis:
                    st.write(f"**Entities**: {', '.join(map(str, analysis['entities']))}")

                if "geo_scores" in analysis:
                    score_text = ", ".join(
                        f"{name}: {self._format_score(value)}"
                        for name, value in analysis["geo_scores"].items()
                    )
                    st.write(f"**Scores**: {score_text}")

                st.write("---")

    def display_recommendations(self, geo_results: List[Dict]):
        """Display up to five unique recommendations and the priority opportunities."""
        st.markdown("### 💡 Optimization Recommendations")

        all_recommendations = []
        all_opportunities = []
        for analysis in geo_results:
            all_recommendations.extend(analysis.get("recommendations", []))
            all_opportunities.extend(analysis.get("optimization_opportunities", []))

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # five recommendations shown are the same on every rerun.
        unique_recommendations = list(dict.fromkeys(all_recommendations))
        if unique_recommendations:
            self._render_numbered(unique_recommendations[:5])

        if not all_opportunities:
            return

        st.markdown("#### 🚀 Priority Optimizations")
        priority_sections = (
            ("high", "##### 🔴 High Priority"),
            ("medium", "##### 🟡 Medium Priority"),
        )
        for priority, heading in priority_sections:
            matching = [opp for opp in all_opportunities if opp.get("priority") == priority]
            if not matching:
                continue
            st.markdown(heading)
            for opp in matching[:3]:
                st.write(
                    f"**{opp.get('type', 'Optimization')}**: "
                    f"{opp.get('description', 'No description')}"
                )

    def calculate_average_scores(self, geo_results: List[Dict]) -> Dict[str, float]:
        """Calculate the average of every GEO score across all pages.

        Score keys are collected from all results (in first-seen order), and
        each key is averaged over the pages that provide a numeric value for
        it. A key with no numeric values averages to 0.

        Args:
            geo_results: Page analyses; each may carry a ``geo_scores`` dict.

        Returns:
            Mapping of score key to its average; empty when ``geo_results`` is
            empty or no page has scores.
        """
        values_by_key: Dict[str, List[float]] = {}
        for analysis in geo_results or []:
            page_scores = analysis.get("geo_scores") or {}
            for key, value in page_scores.items():
                bucket = values_by_key.setdefault(key, [])
                if isinstance(value, (int, float)):
                    bucket.append(value)

        return {
            key: (sum(values) / len(values) if values else 0)
            for key, values in values_by_key.items()
        }

    def save_uploaded_file(self, uploaded_file, suffix: str = ".pdf") -> str:
        """Save an uploaded file to a temporary location and return its path.

        The caller is responsible for deleting the returned file.

        Args:
            uploaded_file: File-like object providing ``getvalue()`` or ``read()``.
            suffix: Extension given to the temporary file.
        """
        # getvalue() returns the whole buffer regardless of the current stream
        # position; read() would return nothing if the stream was already consumed.
        if hasattr(uploaded_file, "getvalue"):
            payload = uploaded_file.getvalue()
        else:
            payload = uploaded_file.read()

        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(payload)
            return tmp_file.name


def main():
    """Main entry point."""
    # NOTE(review): Streamlit re-executes the script on each interaction, so
    # the app (including its models) is rebuilt every rerun -- consider caching.
    app = GEOSEOApp()
    app.run()


if __name__ == "__main__":
    main()