Spaces:
Runtime error
Runtime error
| """ | |
| Main Streamlit Application - GEO SEO AI Optimizer | |
| Entry point for the application with UI components | |
| """ | |
import json
import os
import tempfile
import time
from typing import Dict, Any, List

import streamlit as st

# Import LangChain components
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_groq import ChatGroq

# Import our custom modules
from utils.audio_utils import transcribe_audio
from utils.chunker import VectorChunker
from utils.export import ResultExporter
from utils.lang_utils import detect_language, translate_text
from utils.optimizer import ContentOptimizer
from utils.parser import PDFParser, TextParser, WebpageParser
from utils.scorer import GEOScorer
| st.title("Multilingual and Voice-to-Text Support") | |
| # Upload options | |
| uploaded_audio = st.file_uploader("Upload Audio (MP3)", type=["mp3"]) | |
| user_text = st.text_area("Or Paste Your Text") | |
| input_text = "" | |
| # If audio is uploaded | |
| if uploaded_audio: | |
| with st.spinner("Transcribing audio..."): | |
| input_text = transcribe_audio(uploaded_audio) | |
| st.success("Audio transcribed successfully!") | |
| st.write("Transcribed Text:") | |
| st.write(input_text) | |
| # If text is given | |
| elif user_text: | |
| input_text = user_text | |
| # If input is received | |
| if input_text: | |
| lang = detect_language(input_text) | |
| st.write(f"Detected Language: `{lang}`") | |
| if lang != "en": | |
| translated = translate_text(input_text) | |
| st.write("π Translated to English:") | |
| st.write(translated) | |
| final_text = translated | |
| else: | |
| final_text = input_text | |
| # Now pass final_text to your main logic (scoring, summarizing, etc.) | |
| st.write("β You can now proceed with the optimized or summarized output.") | |
class GEOSEOApp:
    """Main application class that orchestrates all components.

    Construction builds the LLM client, the embedding model, the content
    parsers and the scoring / optimizing / chunking / exporting helpers.
    ``run`` then renders the Streamlit page: a sidebar plus three tabs
    (website GEO analysis, content enhancement, document Q&A).

    NOTE(review): several icon glyphs in the UI strings look like mis-decoded
    emoji; they are kept exactly as found — confirm against the deployed file.
    """

    def __init__(self):
        """Run the four setup steps in dependency order."""
        self.setup_config()
        self.setup_models()
        self.setup_parsers()
        self.setup_components()

    def setup_config(self):
        """Initialize configuration and API keys."""
        # Placeholder strings are used when the environment variables are unset.
        self.groq_api_key = os.getenv("GROQ_API_KEY", "your-groq-api-key")
        self.hf_api_key = os.getenv("HUGGINGFACE_API_KEY", "your-huggingface-api-key")
        # Create data directory if it doesn't exist
        os.makedirs("data/uploaded_files", exist_ok=True)

    def setup_models(self):
        """Initialize LLM and embedding models."""
        self.llm = ChatGroq(
            api_key=self.groq_api_key,
            model_name="llama3-8b-8192",
            temperature=0.1,
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            cache_folder="./hf_cache",
        )

    def setup_parsers(self):
        """Initialize content parsers."""
        self.pdf_parser = PDFParser()
        self.text_parser = TextParser()
        self.webpage_parser = WebpageParser()

    def setup_components(self):
        """Initialize processing components (requires models to be set up)."""
        self.geo_scorer = GEOScorer(self.llm)
        self.content_optimizer = ContentOptimizer(self.llm)
        self.vector_chunker = VectorChunker(self.embeddings)
        self.result_exporter = ResultExporter()

    def run(self):
        """Main application runner: page config, header, sidebar and tabs."""
        try:
            st.set_page_config(
                page_title="GEO SEO AI Optimizer",
                page_icon="π",
                layout="wide",
            )
        except st.errors.StreamlitAPIException:
            # Streamlit versions that require set_page_config to be the first
            # command raise here when something was already rendered (this
            # file calls st.title at import time). Keep the default page
            # settings rather than aborting the whole app.
            pass

        st.title("π GEO SEO AI Optimizer")
        st.markdown("*Optimize your content for AI search engines and LLM systems*")

        # Sidebar
        self.render_sidebar()

        # Main tabs
        website_tab, enhancement_tab, qa_tab = st.tabs([
            "π Website GEO Analysis",
            "π§ Content Enhancement",
            "π Document Q&A",
        ])
        with website_tab:
            self.render_website_analysis_tab()
        with enhancement_tab:
            self.render_content_enhancement_tab()
        with qa_tab:
            self.render_document_qa_tab()

    def render_sidebar(self):
        """Render sidebar with information and controls."""
        sidebar = st.sidebar
        sidebar.title("π οΈ GEO Tools")
        sidebar.markdown("- π Document Q&A with RAG")
        sidebar.markdown("- π§ Content Enhancement")
        sidebar.markdown("- π Website GEO Analysis")
        sidebar.markdown("- π AI-First SEO Scoring")
        sidebar.markdown("---")
        sidebar.markdown("### π§ Configuration")
        sidebar.markdown("Set your API keys:")
        sidebar.code("export GROQ_API_KEY='your-key'")
        sidebar.markdown("---")
        sidebar.markdown("### π GEO Metrics")
        sidebar.markdown("**AI Search Visibility**: How likely AI engines will surface your content")
        sidebar.markdown("**Query Intent Matching**: How well content matches user queries")
        sidebar.markdown("**Conversational Readiness**: Suitability for AI chat responses")
        sidebar.markdown("**Citation Worthiness**: Probability of being cited by AI")
        sidebar.markdown("---")
        sidebar.markdown("### βΉοΈ Components")
        sidebar.markdown("- **Parser**: Extract content from various sources")
        sidebar.markdown("- **Scorer**: Analyze GEO performance")
        sidebar.markdown("- **Optimizer**: Enhance content for AI")
        sidebar.markdown("- **Chunker**: Create vector embeddings")
        sidebar.markdown("- **Exporter**: Generate reports")

    def render_document_qa_tab(self):
        """Render Document Q&A tab.

        Accepts either an uploaded PDF or pasted text (the PDF wins when both
        are present), builds a QA chain over it and shows the answer together
        with up to 500 characters of each source document.
        """
        st.header("π Document Question Answering")
        st.markdown("Upload documents or paste text to ask questions using RAG.")

        # File upload
        uploaded_file = st.file_uploader("Upload a PDF file", type=["pdf"])
        # Text input
        pasted_text = st.text_area("Or paste text directly:", height=150)
        # Question input
        user_query = st.text_input("Ask a question about the content:")

        # Submit button
        if not st.button("π Ask Question", key="qa_submit"):
            return
        if not user_query.strip():
            st.warning("Please enter a question.")
            return

        try:
            # Parse content
            if uploaded_file:
                with st.spinner("Processing PDF..."):
                    # Save uploaded file temporarily
                    temp_path = self.save_uploaded_file(uploaded_file)
                    try:
                        documents = self.pdf_parser.parse(temp_path)
                    finally:
                        # Remove the temp file even when parsing fails.
                        os.unlink(temp_path)
            elif pasted_text.strip():
                with st.spinner("Processing text..."):
                    documents = self.text_parser.parse(pasted_text)
            else:
                st.warning("Please upload a PDF or paste some text.")
                return

            # Create vector store and answer question
            with st.spinner("Creating embeddings and searching..."):
                qa_chain = self.vector_chunker.create_qa_chain(documents, self.llm)
                result = qa_chain({"query": user_query})

            # Display results
            st.markdown("### π¬ Answer")
            st.write(result["result"])

            # Show sources
            with st.expander("π Source Documents"):
                for number, doc in enumerate(result.get("source_documents", []), 1):
                    st.write(f"**Source {number}:**")
                    content = doc.page_content
                    # Long sources are truncated to a 500-character preview.
                    preview = content[:500] + "..." if len(content) > 500 else content
                    st.write(preview)
                    if getattr(doc, "metadata", None):
                        st.write(f"*Metadata: {doc.metadata}*")
                    st.write("---")
        except Exception as e:
            # UI boundary: report the failure in the page instead of crashing.
            st.error(f"An error occurred: {str(e)}")

    def render_content_enhancement_tab(self):
        """Render Content Enhancement tab.

        Sends the entered text to the content optimizer and displays the
        returned scores, keywords and optimized text, followed by a download
        button for the exported report.
        """
        st.header("π§ Content Enhancement")
        st.markdown("Analyze and optimize your content for better AI/LLM performance.")

        # Content input
        input_text = st.text_area(
            "Enter content to analyze and enhance:",
            height=200,
            key="enhancement_input",
        )

        # Analysis options
        left, right = st.columns(2)
        with left:
            analyze_only = st.checkbox("Analysis only (no rewriting)", value=False)
        with right:
            include_keywords = st.checkbox("Include keyword suggestions", value=True)

        # Submit button
        if not st.button("π§ Analyze & Enhance", key="enhancement_submit"):
            return
        if not input_text.strip():
            st.warning("Please enter some content to analyze.")
            return

        try:
            with st.spinner("Analyzing content..."):
                # Run content analysis and optimization
                result = self.content_optimizer.optimize_content(
                    input_text,
                    analyze_only=analyze_only,
                    include_keywords=include_keywords,
                )

            if result.get("error"):
                st.error(f"Analysis failed: {result['error']}")
                return

            # Display results
            # NOTE(review): the success banner is only shown in analysis-only
            # mode, exactly as in the original — confirm this is intended.
            if analyze_only:
                st.success("Content analysis and enhancement completed successfully!")
            st.markdown("### π Analysis Results")

            # Show scores
            scores = result.get("scores", {})
            if scores:
                score_layout = [
                    ("Clarity", "clarity"),
                    ("Structure", "structuredness"),
                    ("Answerability", "answerability"),
                ]
                for column, (label, key) in zip(st.columns(3), score_layout):
                    with column:
                        st.metric(label, f"{scores.get(key, 0)}/10")

            # Show keywords
            keywords = result.get("keywords", [])
            if keywords:
                st.markdown("#### π Key Terms")
                st.write(", ".join(keywords))

            # Show optimized content (rendered even when empty, as before)
            optimized_text = result.get("optimized_text", "")
            st.markdown("#### β¨ Optimized Content")
            st.text_area(
                "Enhanced version:",
                value=optimized_text,
                height=200,
                key="optimized_output",
            )

            # Export option. The download button is rendered directly: a
            # second st.button nested in this branch could never fire, because
            # clicking it reruns the script with the outer button reset.
            export_data = self.result_exporter.export_enhancement_results(result)
            st.download_button(
                label="Download Analysis Report",
                data=json.dumps(export_data, indent=2),
                file_name=f"content_analysis_{int(time.time())}.json",
                mime="application/json",
            )
        except Exception as e:
            # UI boundary: report the failure in the page instead of crashing.
            st.error(f"An error occurred: {str(e)}")

    def render_website_analysis_tab(self):
        """Render Website GEO Analysis tab.

        Parses the given website, scores each extracted page with the GEO
        scorer, displays the aggregated results and offers the exported
        report as a JSON download.
        """
        st.header("π Website GEO Analysis")
        st.markdown("Analyze websites for Generative Engine Optimization (GEO) performance.")

        # URL input
        url_col, pages_col = st.columns([3, 1])
        with url_col:
            website_url = st.text_input(
                "Enter website URL:",
                placeholder="https://example.com",
            )
        with pages_col:
            max_pages = st.selectbox("Pages to analyze:", [1, 3, 5], index=0)

        # Analysis options
        left, right = st.columns(2)
        with left:
            include_subpages = st.checkbox("Include subpages", value=False)
        with right:
            detailed_analysis = st.checkbox("Detailed analysis", value=True)

        # Submit button
        if not st.button("π Analyze Website", key="website_analyze"):
            return
        if not website_url.strip():
            st.warning("Please enter a website URL.")
            return

        try:
            # Normalize URL: default to https when no scheme was typed.
            if not website_url.startswith(('http://', 'https://')):
                website_url = 'https://' + website_url

            with st.spinner(f"Analyzing website: {website_url}"):
                # Parse website content
                pages_data = self.webpage_parser.parse_website(
                    website_url,
                    max_pages=max_pages,
                    include_subpages=include_subpages,
                )

            if not pages_data:
                st.error("Could not extract content from the website.")
                return
            st.success(f"Successfully extracted content from {len(pages_data)} page(s)")

            # Analyze GEO scores
            geo_results = []
            total_pages = len(pages_data)
            with st.spinner("Calculating GEO scores..."):
                for number, page_data in enumerate(pages_data, 1):
                    with st.spinner(f"Analyzing page {number}/{total_pages}..."):
                        analysis = self.geo_scorer.analyze_page_geo(
                            page_data['content'],
                            page_data['title'],
                            detailed=detailed_analysis,
                        )
                    if analysis.get('error'):
                        # A failed page is reported and skipped, not fatal.
                        st.warning(f"Could not analyze page {number}: {analysis['error']}")
                        continue
                    analysis['page_data'] = page_data
                    geo_results.append(analysis)

            if not geo_results:
                st.error("Could not analyze any pages from the website.")
                return

            # Display results
            self.display_geo_results(geo_results, website_url)

            # Export functionality. Rendered directly for the same reason as
            # in the enhancement tab: a nested st.button can never be True.
            st.markdown("### π₯ Export Results")
            report_data = self.result_exporter.export_geo_results(
                geo_results,
                website_url,
            )
            st.download_button(
                label="Download GEO Report",
                data=json.dumps(report_data, indent=2),
                file_name=f"geo_analysis_{self._url_to_filename(website_url)}.json",
                mime="application/json",
            )
        except Exception as e:
            # UI boundary: report the failure in the page instead of crashing.
            st.error(f"An error occurred during website analysis: {str(e)}")

    @staticmethod
    def _url_to_filename(url: str) -> str:
        """Return *url* without its scheme, with unsafe characters replaced by ``_``.

        Letters, digits, ``.``, ``_`` and ``-`` are kept; everything else
        (slashes, colons, query characters, ...) becomes an underscore.
        """
        without_scheme = url.split("://", 1)[-1]
        return "".join(
            ch if ch.isalnum() or ch in "._-" else "_" for ch in without_scheme
        )

    def display_geo_results(self, geo_results: List[Dict], website_url: str):
        """Display GEO analysis results.

        Args:
            geo_results: Per-page analysis dicts; the keys read here are
                ``geo_scores``, ``page_data``, ``primary_topics`` and
                ``entities``.
            website_url: The analyzed URL (not used by the rendering itself).
        """
        st.markdown("## π GEO Analysis Results")

        # Calculate average scores
        avg_scores = self.calculate_average_scores(geo_results)
        overall_avg = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0

        # Main score display, centered in the middle column
        _, center, _ = st.columns([1, 2, 1])
        with center:
            st.metric(
                "Overall GEO Score",
                f"{overall_avg:.1f}/10",
                # The delta is shown relative to a fixed 7.0 reference value.
                delta=f"{overall_avg - 7.0:.1f}" if overall_avg != 7.0 else None,
            )

        # Individual metrics, four per row
        st.markdown("### π Detailed GEO Metrics")
        metric_rows = [
            [
                ("AI Search Visibility", "ai_search_visibility"),
                ("Query Intent Match", "query_intent_matching"),
                ("Factual Accuracy", "factual_accuracy"),
                ("Conversational Ready", "conversational_readiness"),
            ],
            [
                ("Semantic Richness", "semantic_richness"),
                ("Context Complete", "context_completeness"),
                ("Citation Worthy", "citation_worthiness"),
                ("Multi-Query Cover", "multi_query_coverage"),
            ],
        ]
        for row in metric_rows:
            for column, (display_name, key) in zip(st.columns(4), row):
                with column:
                    st.metric(display_name, f"{avg_scores.get(key, 0):.1f}")

        # Recommendations
        self.display_recommendations(geo_results)

        # Detailed page analysis
        with st.expander("π Detailed Page Analysis"):
            for number, analysis in enumerate(geo_results, 1):
                page_data = analysis.get('page_data', {})
                st.markdown(f"#### Page {number}: {page_data.get('title', 'Unknown Title')}")
                st.write(f"**URL**: {page_data.get('url', 'Unknown')}")
                st.write(f"**Word Count**: {page_data.get('word_count', 0)}")

                # Show topics and entities if available
                if 'primary_topics' in analysis:
                    st.write(f"**Topics**: {', '.join(analysis['primary_topics'])}")
                if 'entities' in analysis:
                    st.write(f"**Entities**: {', '.join(analysis['entities'])}")

                # Show page-specific scores
                if 'geo_scores' in analysis:
                    score_text = ", ".join(
                        f"{k}: {v:.1f}" for k, v in analysis['geo_scores'].items()
                    )
                    st.write(f"**Scores**: {score_text}")
                st.write("---")

    def display_recommendations(self, geo_results: List[Dict]):
        """Display optimization recommendations.

        Shows up to five distinct recommendations (in first-seen order) and up
        to three high-priority and three medium-priority opportunities.
        """
        st.markdown("### π‘ Optimization Recommendations")

        # Collect all recommendations
        all_recommendations = []
        all_opportunities = []
        for analysis in geo_results:
            all_recommendations.extend(analysis.get('recommendations', []))
            all_opportunities.extend(analysis.get('optimization_opportunities', []))

        # Remove duplicates while keeping first-seen order, so the five items
        # shown are the same on every rerun (a set would reorder them).
        unique_recommendations = list(dict.fromkeys(all_recommendations))
        for number, rec in enumerate(unique_recommendations[:5], 1):
            st.write(f"**{number}.** {rec}")

        # Priority opportunities
        if not all_opportunities:
            return
        st.markdown("#### π Priority Optimizations")
        priority_sections = [
            ("high", "##### π΄ High Priority"),
            ("medium", "##### π‘ Medium Priority"),
        ]
        for priority, heading in priority_sections:
            matching = [opp for opp in all_opportunities if opp.get('priority') == priority]
            if not matching:
                continue
            st.markdown(heading)
            for opp in matching[:3]:
                st.write(
                    f"**{opp.get('type', 'Optimization')}**: "
                    f"{opp.get('description', 'No description')}"
                )

    def calculate_average_scores(self, geo_results: List[Dict]) -> Dict[str, float]:
        """Calculate average GEO scores across all pages.

        Args:
            geo_results: Per-page analysis dicts; scores are read from each
                entry's ``geo_scores`` mapping when present.

        Returns:
            One average per score key seen on any page, in first-seen order.
            A key is averaged only over the pages that report it. Empty input
            yields an empty dict.
        """
        if not geo_results:
            return {}

        per_page_scores = [result.get('geo_scores') or {} for result in geo_results]

        # Union of keys over all pages, so a first page without scores (or
        # with fewer metrics) does not hide metrics reported by later pages.
        score_keys = dict.fromkeys(key for scores in per_page_scores for key in scores)

        avg_scores = {}
        for key in score_keys:
            values = [scores[key] for scores in per_page_scores if key in scores]
            avg_scores[key] = sum(values) / len(values) if values else 0
        return avg_scores

    def save_uploaded_file(self, uploaded_file) -> str:
        """Save uploaded file to temporary location.

        Args:
            uploaded_file: A binary file-like object.

        Returns:
            Path of a new ``.pdf`` temp file; the caller must delete it.
        """
        # Prefer getvalue(): it returns the whole buffer even if the object
        # was already read, whereas read() would then return nothing.
        if hasattr(uploaded_file, "getvalue"):
            data = uploaded_file.getvalue()
        else:
            data = uploaded_file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_file.write(data)
            return tmp_file.name
def main():
    """Build the application object and start rendering the page."""
    GEOSEOApp().run()


# Only start the app when this file is executed as the entry script.
if __name__ == "__main__":
    main()