import streamlit as st
import requests
from bs4 import BeautifulSoup, Comment
from googlesearch import search
from fake_useragent import UserAgent
from transformers import pipeline, AutoTokenizer
import torch
import time
import logging
import re
from retrying import retry
import gc

# --- Configuration ---

# Model options. Keys spell out approximate resource needs so users can pick
# something their hosting tier can actually load.
MODEL_OPTIONS = {
    # Lighter models (more likely to work on free tiers)
    "Mistral-7B-Instruct (Fast, Med RAM)": "mistralai/Mistral-7B-Instruct-v0.2",
    "Gemma-7B-IT (Google, Med RAM)": "google/gemma-7b-it",
    "Phi-3-Mini-4k-Instruct (Microsoft, Small, Good)": "microsoft/Phi-3-mini-4k-instruct",  # requires trust_remote_code
    # Medium models (may require upgraded tiers / more RAM/GPU)
    "Llama-3-8B-Instruct (Meta, High Quality, High RAM/GPU)": "meta-llama/Meta-Llama-3-8B-Instruct",
    "Phi-3-Medium-4k-Instruct (Microsoft, Strong, High RAM/GPU)": "microsoft/Phi-3-medium-4k-instruct",  # requires trust_remote_code
    "Qwen1.5-14B-Chat (Alibaba, Strong, High RAM/GPU)": "Qwen/Qwen1.5-14B-Chat",
    # Larger models (very likely require significant paid resources)
    "DeepSeek-Coder-V2-Instruct (DeepSeek, High RAM/GPU)": "deepseek-ai/DeepSeek-Coder-V2-Instruct",  # requires trust_remote_code
}
DEFAULT_MODEL_KEY = "Mistral-7B-Instruct (Fast, Med RAM)"  # start with a lighter default

# Scraping & generation defaults
DEFAULT_NUM_RESULTS = 4          # competitors scraped per keyword
REQUEST_TIMEOUT = 15             # seconds per HTTP request
MAX_COMPETITOR_TEXT_LENGTH = 5500  # chars of competitor text fed into the prompt
DEFAULT_MAX_GENERATION_TOKENS = 2800

# Retry settings for the `retrying` decorator (wait is in milliseconds)
RETRY_WAIT_FIXED = 2000
RETRY_STOP_MAX_ATTEMPT = 3

# Tone & audience options
TONE_OPTIONS = ["Conversational", "Professional", "Authoritative", "Technical",
                "Friendly", "Engaging", "Educational", "Persuasive"]
AUDIENCE_OPTIONS = ["Beginners", "General Audience", "Experts",
                    "Professionals (Specific Field)", "Customers", "Students",
                    "Decision Makers"]

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s',
)
logger = logging.getLogger(__name__)

# --- State Management ---
# Initialize every session-state key up front so later code can read them
# without defensive checks. Only missing keys are set, so reruns keep state.
_STATE_DEFAULTS = {
    'current_model_pipeline': None,   # loaded transformers pipeline (or None)
    'current_model_id': "",           # HF model id currently loaded
    'scraped_urls': [],
    'competitor_analysis_text': "",
    'generated_content': "",
    'internal_link_suggestions': "",
    'last_keyword': "",
    'last_website_url': "",
    '_internal_last_scrape_keyword': "",  # marker: keyword of the last successful scrape
}
for _key, _default in _STATE_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# --- Helper Functions ---

def clear_gpu_memory():
    """Drop the current pipeline reference and release cached GPU memory.

    Safe to call with no GPU present: it still clears the pipeline
    reference and runs garbage collection.
    """
    logger.info("Attempting to clear GPU memory...")
    if torch.cuda.is_available():
        try:
            # Remove the reference FIRST so the model tensors become collectable.
            st.session_state.current_model_pipeline = None
            gc.collect()                 # run Python garbage collection
            torch.cuda.empty_cache()     # ask PyTorch to release cached blocks
            gc.collect()                 # run GC again
            logger.info("GPU memory cache cleared and garbage collected.")
            st.toast("Cleared GPU memory.", icon="🧹")
        except Exception as e:
            logger.error(f"Error clearing GPU memory: {e}", exc_info=True)
            st.toast(f"Error clearing GPU memory: {e}", icon="❌")
    else:
        logger.info("No GPU available, skipping memory clearing.")
        st.session_state.current_model_pipeline = None  # still drop the reference
        gc.collect()


def reset_app_data():
    """Clear stored scraping and generation results; keep any loaded model."""
    st.session_state.scraped_urls = []
    st.session_state.competitor_analysis_text = ""
    st.session_state.generated_content = ""
    st.session_state.internal_link_suggestions = ""
    st.session_state.last_keyword = ""
    st.session_state._internal_last_scrape_keyword = ""
    logger.info("App data state reset (scraped/generated content).")
    st.toast("Cleared scraped data and generated content.", icon="🗑️")


# --- Model Loading (On Demand) ---

def load_model(model_id_to_load):
    """Load `model_id_to_load` into a text-generation pipeline.

    Unloads any previously loaded (different) model first. Returns True on
    success, False on failure; session state is updated either way.
    """
    # If the requested model is already loaded, do nothing.
    if (st.session_state.get('current_model_id') == model_id_to_load
            and st.session_state.get('current_model_pipeline') is not None):
        logger.info(f"Model {model_id_to_load} is already loaded.")
        st.toast(f"{model_id_to_load} is already loaded.", icon="✅")
        return True

    # Unload the previous model if one exists and is different.
    if st.session_state.get('current_model_pipeline') is not None:
        logger.info(f"Unloading previous model: {st.session_state.current_model_id}")
        st.toast(f"Unloading {st.session_state.current_model_id}...", icon="🧹")
        clear_gpu_memory()  # sets the pipeline to None and clears cache
        st.session_state.current_model_id = ""

    st.toast(f"Loading {model_id_to_load}... This may take time & RAM/GPU.", icon="⏳")
    logger.info(f"Attempting to load LLM pipeline for model: {model_id_to_load}")
    success = False
    try:
        # Prefer bf16 on GPUs that support it, else fp16 on GPU, else fp32 on CPU.
        if torch.cuda.is_available():
            dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        else:
            dtype = torch.float32
        logger.info(f"Using dtype: {dtype}")

        # Models whose repos ship custom code and need trust_remote_code=True.
        trust_code_models = [
            "microsoft/Phi-3-mini-4k-instruct",
            "microsoft/Phi-3-medium-4k-instruct",
            "deepseek-ai/DeepSeek-Coder-V2-Instruct",
            # Add others if needed
        ]
        trust_code = model_id_to_load in trust_code_models
        logger.info(f"Trust remote code for {model_id_to_load}: {trust_code}")

        # Display a spinner during the actual loading.
        with st.spinner(f"Loading {model_id_to_load} into memory..."):
            pipeline_instance = pipeline(
                "text-generation",
                model=model_id_to_load,
                trust_remote_code=trust_code,
                device_map="auto",
                torch_dtype=dtype,
            )

        # Some tokenizers ship without a pad token; fall back to EOS.
        if pipeline_instance.tokenizer.pad_token_id is None:
            pipeline_instance.tokenizer.pad_token_id = pipeline_instance.tokenizer.eos_token_id
            if hasattr(pipeline_instance.model, 'config'):
                pipeline_instance.model.config.pad_token_id = pipeline_instance.tokenizer.eos_token_id
            logger.warning(f"Set pad_token_id to eos_token_id for {model_id_to_load}")

        logger.info(f"LLM pipeline loaded successfully for {model_id_to_load}.")
        st.session_state.current_model_pipeline = pipeline_instance
        st.session_state.current_model_id = model_id_to_load
        st.toast(f"Model {model_id_to_load} loaded!", icon="✅")
        success = True
    except ImportError as e:
        logger.error(f"ImportError loading {model_id_to_load}: {e}. Missing dependency?", exc_info=True)
        st.error(f"Load Error: Missing library for {model_id_to_load}? Check logs. Details: {e}")
    except Exception as e:
        logger.error(f"Failed to load {model_id_to_load}: {e}", exc_info=True)
        st.error(f"Failed to load {model_id_to_load}. Error: {e}. Check resource limits (RAM/GPU) & logs.")
        clear_gpu_memory()  # attempt to clean up after a failed load
        st.session_state.current_model_id = ""  # ensure state reflects failure
    # BUGFIX: the return used to sit in a `finally:` block; a `return` inside
    # `finally` silently swallows any in-flight exception (e.g. KeyboardInterrupt,
    # SystemExit) and always returns. Returning here preserves normal semantics.
    return success


# --- User Agent Caching ---

@st.cache_resource
def get_user_agent():
    """Return a process-cached FakeUserAgent instance, or None if init fails."""
    logger.info("Initializing FakeUserAgent.")
    try:
        return UserAgent(fallback='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36')
    except Exception as e:
        logger.error(f"Failed to initialize FakeUserAgent: {e}", exc_info=True)
        st.error(f"Could not initialize User Agent generator. Error: {e}")
        return None
# --- Core Functions (Scraping, Prompt Building, Generation Logic) ---

@retry(wait_fixed=RETRY_WAIT_FIXED, stop_max_attempt_number=RETRY_STOP_MAX_ATTEMPT,
       retry_on_exception=lambda e: isinstance(e, (requests.exceptions.Timeout,
                                                   requests.exceptions.ConnectionError,
                                                   requests.exceptions.HTTPError)))
def fetch_url_content(url, headers):
    """GET `url`; return the Response, or None for non-HTML / oversized pages.

    The `retrying` decorator retries (fixed wait) on timeout, connection and
    HTTP-status errors; other exceptions propagate immediately.
    """
    # BUGFIX: functions wrapped by `retrying.retry` do NOT expose a `.retry`
    # attribute, so the previous `fetch_url_content.retry.attempt_number`
    # raised AttributeError on every call and broke all scraping.
    logger.info(f"Fetching {url} (up to {RETRY_STOP_MAX_ATTEMPT} attempts)")
    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    if 'text/html' not in response.headers.get('Content-Type', ''):
        logger.warning(f"Skipping URL {url} - Not HTML")
        return None
    if len(response.content) > 10 * 1024 * 1024:  # 10 MB limit
        logger.warning(f"Skipping URL {url} - Content too large")
        return None
    return response


def clean_text(text):
    """Collapse whitespace and drop short/boilerplate lines (nav, legal, CTAs)."""
    text = re.sub(r'\s{2,}', ' ', text)
    text = re.sub(r'\n+', '\n', text)
    lines = text.split('\n')
    cleaned_lines = []
    min_line_length = 20
    min_words_per_line = 3
    # Lines containing any of these phrases are treated as boilerplate.
    skip_phrases = [
        'copyright Β©', 'all rights reserved', 'privacy policy', 'terms of use',
        'terms and conditions', 'cookie policy', 'subscribe', 'sign up',
        'log in', 'advertisement', 'share this', 'related posts',
        'leave a reply', 'comment', 'posted on', 'by author', 'tags:',
        'categories:', 'follow us', 'read more', 'click here', 'learn more',
        'next article', 'previous article', 'you may also like', 'related topics'
    ]
    for line in lines:
        stripped_line = line.strip()
        lower_line = stripped_line.lower()
        if len(stripped_line) >= min_line_length and \
           len(stripped_line.split()) >= min_words_per_line and \
           not any(phrase in lower_line for phrase in skip_phrases):
            cleaned_lines.append(stripped_line)
    text = '\n'.join(cleaned_lines)
    return text.strip()


def scrape_page_content(url, user_agent, scrape_status_ui):
    """Fetch `url` and return cleaned main-content text ('' on any failure).

    Progress/warnings are written to `scrape_status_ui` (a Streamlit element).
    """
    if not user_agent:
        logger.error("User Agent missing.")
        return ""
    headers = {
        'User-Agent': user_agent.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Referer': 'https://www.google.com/',
        'DNT': '1',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    try:
        response = fetch_url_content(url, headers)
        if response is None:
            scrape_status_ui.warning(f"⚠️ Skip/Fail fetch: {url}", icon="🕸️")
            return ""
        soup = BeautifulSoup(response.content, 'lxml')

        # Strip non-content elements before extracting text.
        tags_to_remove = ["script", "style", "nav", "footer", "aside", "form",
                          "header", "noscript", "button", "input", "select",
                          "textarea", "figure", "figcaption", "iframe", "svg",
                          "path", "meta", "link"]
        for element in soup(tags_to_remove):
            element.decompose()
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Prefer an explicit main/article container; fall back to <body>.
        main_content = (soup.find('main') or soup.find('article')
                        or soup.find(role='main')
                        or soup.find('div', class_=re.compile(r'(content|main|body|post|entry|article)', re.I))
                        or soup.find('div', id=re.compile(r'(content|main|body|post|entry|article)', re.I)))
        target_soup = main_content if main_content else soup.body
        if not target_soup:
            logger.warning(f"No body/main: {url}")
            scrape_status_ui.warning(f"⚠️ No body/main: {url}", icon="🕸️")
            return ""

        texts = target_soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
                                      'li', 'td', 'th', 'blockquote', 'span'])
        content_parts = []
        for elem in texts:
            # Belt-and-braces: skip anything still nested under an unwanted tag
            # (those tags were decomposed above, so this rarely fires).
            if elem.find_parent(tags_to_remove):
                continue
            elem_text = elem.get_text(separator=' ', strip=True)
            if len(elem_text) > 10 and len(elem_text.split()) > 1:
                # Block-level elements get their own line; inline text is joined.
                if elem.name in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li',
                                 'blockquote', 'tr', 'div']:
                    content_parts.append(elem_text + "\n")
                else:
                    content_parts.append(elem_text + " ")
        content = "".join(content_parts)
        cleaned_content = clean_text(content)

        if len(cleaned_content) < 150:
            logger.warning(f"Low content ({len(cleaned_content)} chars): {url}")
            scrape_status_ui.warning(f"⚠️ Low content: {url}", icon="🕸️")
        else:
            logger.info(f"Scraped {len(cleaned_content)} chars: {url}")
            scrape_status_ui.success(f"✅ Scraped: {url} ({len(cleaned_content)} chars)", icon="🕸️")
        time.sleep(0.6)  # politeness delay between pages
        return cleaned_content
    except requests.exceptions.RequestException as e:
        logger.warning(f"Final scrape fail: {url}. Err: {e}")
        scrape_status_ui.error(f"❌ Fail scrape: {url} ({e})", icon="🕸️")
        return ""
    except Exception as e:
        logger.error(f"Unexpected scrape error: {url}: {e}", exc_info=True)
        scrape_status_ui.error(f"❌ Error scraping: {url} (Logs)", icon="🕸️")
        return ""


def get_top_urls(keyword, num_results):
    """Return the top `num_results` Google result URLs for `keyword` ([] on failure)."""
    logger.info(f"Fetching top {num_results} URLs for keyword: '{keyword}'")
    try:
        urls = list(search(keyword, num_results=num_results, sleep_interval=2.5,
                           lang="en", timeout=15))
        logger.info(f"Found URLs: {urls}")
        if not urls:
            st.warning(f"⚠️ No Google search results found for '{keyword}'.")
            return []
        return urls
    except Exception as e:
        error_message = str(e)
        logger.error(f"GSearch Error: {error_message}", exc_info=True)
        if "429" in error_message:
            st.error(f"❌ Google search blocked (429). WAIT before retrying.")
        elif "timed out" in error_message:
            st.error(f"❌ Google search timed out.")
        else:
            st.error(f"❌ GSearch Error: {error_message[:100]}...")
        return []


def build_content_generation_prompt(keyword, competitor_texts, tone, audience, model_id):
    """Build the chat-format messages list for the main article generation."""
    logger.info(f"Build content gen prompt. Tone: {tone}, Audience: {audience}. "
                f"Comp length: {len(competitor_texts)}")
    # Truncate competitor text so the prompt stays within model context limits.
    if len(competitor_texts) > MAX_COMPETITOR_TEXT_LENGTH:
        competitor_summary = competitor_texts[:MAX_COMPETITOR_TEXT_LENGTH] + "... [Truncated]"
        logger.warning(f"Comp text truncated.")
    else:
        competitor_summary = competitor_texts

    system_prompt = f"""You are an expert SEO Content Strategist & world-class Copywriter.
Task: Analyze competitor text & generate a significantly superior, comprehensive, user-first article for keyword '{keyword}', targeting '{audience}' audience with '{tone}' tone.
Focus on quality, depth, clarity, fulfilling user intent better than competition."""

    user_prompt = f"""**Keyword:** "{keyword}"
**Audience:** {audience}
**Tone:** {tone}
**Objective:** Generate exceptional, SEO-optimized article for "{keyword}" designed to outperform top content via superior value, insights, UX.

**Competitor Analysis Context (Analyze for topics, depth, strengths, WEAKNESSES/GAPS):**
--- BEGIN COMPETITOR ---
{competitor_summary}
--- END COMPETITOR ---

**Content Gen Instructions:**
1. **Value & Depth:** Be demonstrably better. Deeper, clearer, actionable advice, unique perspectives/data, fill gaps. Address user intent exhaustively.
2. **User-First & Humanized:** Write for '{audience}' in '{tone}'. Clear, concise, short paras, varied sentences, engaging Qs. Logical flow, readable.
3. **Structure (Strict Markdown):** Compelling H2 Title. Engaging Intro (50-150 words): Hook, purpose/value, outline. Logical Sections (H2)/Sub-sections (H3): Descriptive, keyword-aware headings. Readability: Bullets (`* `), Numbered lists (`1. `), **Bold** (strategic). Comprehensive Body: Expand beyond competitors. Strong Conclusion: Summarize takeaways, final insight/CTA.
4. **SEO (Natural):** Weave "{keyword}" & LSI terms into title, headings, intro, body, conclusion. Prioritize relevance/clarity over density. NO keyword stuffing.
5. **Originality & Credibility:** 100% unique. Use comp text ONLY for analysis. NO plagiarism. Factual accuracy.
6. **Negative Constraints:** DO NOT: Rehash competitors; use preambles/sign-offs; use excessive jargon (unless 'Experts'); write long paragraphs; stuff keywords; invent facts.

**Output:** ONLY the Markdown article, starting with H2 title."""

    messages = [{"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}]
    logger.info(f"Content prompt done for {model_id}.")
    return messages


def build_internal_link_prompt(generated_content, keyword, website_url):
    """Build the chat-format messages list for internal-link suggestions."""
    logger.info(f"Build internal link prompt for URL: {website_url}")
    system_prompt = "You are an SEO assistant specialized in identifying internal linking opportunities."
    user_prompt = f"""**Website Base URL:** {website_url}
**Main Topic of Article:** "{keyword}"
**Task:** Review the article below. Identify 3-5 phrases/sentences for internal links relevant to {website_url}.
**For each opportunity, provide:**
1. Exact anchor text phrase/sentence from article.
2. Brief description of the *type* of relevant content needed (e.g., "detailed guide on [sub-topic]", "service page for [service]").
**IMPORTANT:** Do NOT invent URLs. Describe the *type* of page. Choose natural anchor text. Focus on value. Format as Markdown numbered list.
**Article Content (Analyze first ~8000 chars):**
--- BEGIN ARTICLE ---
{generated_content[:8000]}
--- END ARTICLE ---"""
    messages = [{"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}]
    return messages


def run_llm_generation(pipe, messages, max_tokens):
    """Run `pipe` on chat `messages`; return the assistant text, or None on failure.

    Handles both chat-style (list of role dicts) and plain-string pipeline
    outputs, strips common assistant prefixes and Markdown fences.
    """
    if pipe is None:
        st.error("❌ LLM Pipeline missing.")
        return None
    model_id = pipe.model.name_or_path
    logger.info(f"Running generation: {model_id}. Max tokens: {max_tokens}.")
    start_time = time.time()
    try:
        gen_args = {"max_new_tokens": max_tokens, "temperature": 0.7,
                    "top_p": 0.95, "top_k": 40, "do_sample": True,
                    "pad_token_id": pipe.tokenizer.eos_token_id,
                    "eos_token_id": pipe.tokenizer.eos_token_id}
        logger.info(f"Gen args: {gen_args}")
        results = pipe(messages, **gen_args)

        # --- Robust extraction of the assistant's reply ---
        assistant_response = None
        if results and results[0] and 'generated_text' in results[0]:
            output_data = results[0]['generated_text']
            if isinstance(output_data, list):
                # Chat format: take the last assistant message.
                assistant_response = next(
                    (msg['content'] for msg in reversed(output_data)
                     if msg['role'] == 'assistant'),
                    None)
            elif isinstance(output_data, str):
                # Plain-string format: drop everything up to the echoed prompt.
                last_prompt_content = messages[-1]['content']
                last_prompt_index = output_data.rfind(last_prompt_content)
                if last_prompt_index != -1:
                    potential_response = output_data[last_prompt_index + len(last_prompt_content):].strip()
                else:
                    potential_response = output_data
                # BUGFIX: the old pattern contained an empty alternative (`||`)
                # which made the group match nothing; an explicit optional
                # group expresses the same intent without the dead branch.
                assistant_response = re.sub(
                    r"^(assistant|ASSISTANT|<\|im_end\|>|<\|assistant\|>)?\s*[:\n]*",
                    "", potential_response,
                    flags=re.IGNORECASE | re.DOTALL).strip()
            else:
                logger.error(f"Unexpected output format: {type(output_data)}")
        else:
            logger.error(f"Unexpected LLM output structure: {results}")

        # --- Validation ---
        if assistant_response:
            duration = time.time() - start_time
            logger.info(f"Gen success ({model_id}) {duration:.2f}s. Len: {len(assistant_response)}.")
            # Strip Markdown code fences some models wrap the article in.
            assistant_response = re.sub(r"^```markdown\n", "", assistant_response).strip()
            assistant_response = re.sub(r"\n```$", "", assistant_response).strip()
            if len(assistant_response) < 30:
                logger.warning(f"Gen output very short ({len(assistant_response)}).")
                st.warning("⚠️ Gen output very short.")
            return assistant_response
        else:
            logger.error(f"Failed parse assistant response. Output: {results}")
            st.error("❌ Failed parse LLM response. Check logs.")
            return None
    except torch.cuda.OutOfMemoryError:
        logger.error(f"OOM Error ({model_id})!", exc_info=True)
        st.error(f"❌ OOM Error ({model_id}). Try smaller model/less tokens/restart.")
        clear_gpu_memory()
        return None
    except Exception as e:
        logger.error(f"Unhandled gen error ({model_id}): {e}", exc_info=True)
        st.error(f"❌ Unexpected gen error: {e}")
        return None


# --- Streamlit App UI ---
st.set_page_config(layout="wide", page_title="On-Demand SEO Content Gen")

# --- Sidebar ---
with st.sidebar:
    st.header("⚙️ Configuration")

    # Model selection & loading area
    st.subheader("1. Select & Load Model")
    selected_model_key = st.selectbox(
        "Choose Language Model:",
        options=list(MODEL_OPTIONS.keys()),
        index=list(MODEL_OPTIONS.keys()).index(DEFAULT_MODEL_KEY),
        key="model_selector",  # key for potential state access
        help="Choose AI model. Performance & resources vary. Load required."
    )
    selected_model_id = MODEL_OPTIONS[selected_model_key]

    # Display current status and load button.
    load_button_placeholder = st.empty()   # placeholder for dynamic button text/state
    model_status_placeholder = st.empty()  # placeholder for status message
    if (st.session_state.get('current_model_id') == selected_model_id
            and st.session_state.get('current_model_pipeline') is not None):
        model_status_placeholder.success(f"✅ Loaded: `{selected_model_id}`")
        load_button_text = f"Switch from {selected_model_key}"  # or "Reload"
    elif st.session_state.get('current_model_pipeline') is not None:
        model_status_placeholder.warning(
            f"⚠️ Loaded: `{st.session_state.current_model_id}`\nSelected: `{selected_model_id}`")
        load_button_text = f"Unload Current & Load {selected_model_key}"
    else:
        model_status_placeholder.info("ℹ️ No model loaded.")
        load_button_text = f"Load Selected: {selected_model_key}"

    if load_button_placeholder.button(load_button_text, key="load_model"):
        load_model(selected_model_id)
        # Rerun so status placeholders refresh immediately after the attempt.
        st.rerun()

    st.markdown("---")

    # Content settings
    st.subheader("2. Content Settings")
    with st.expander("Adjust Content Parameters", expanded=False):
        num_results = st.slider("Competitors to Analyze:", min_value=1, max_value=8,
                                value=DEFAULT_NUM_RESULTS, step=1)
        selected_tone = st.selectbox("Content Tone:", options=TONE_OPTIONS,
                                     index=TONE_OPTIONS.index("Engaging"))
        selected_audience = st.selectbox("Target Audience:", options=AUDIENCE_OPTIONS,
                                         index=AUDIENCE_OPTIONS.index("General Audience"))
        max_gen_tokens = st.number_input("Max Generation Tokens:", min_value=500,
                                         max_value=8192,
                                         value=DEFAULT_MAX_GENERATION_TOKENS, step=100)

    # Internal linking
    st.subheader("3. Internal Linking (Optional)")
    with st.expander("Configure Link Suggestions", expanded=False):
        website_url = st.text_input("Your Website URL:",
                                    placeholder="https://www.example.com",
                                    value=st.session_state.get("last_website_url", ""),
                                    key="website_url_input")
        # Persist immediately so the value survives reruns.
        st.session_state.last_website_url = website_url

    st.markdown("---")
    st.header("ℹ️ App Info & Actions")
    st.info(f"""
- **Status:** {'Model Loaded' if st.session_state.current_model_pipeline else 'No Model Loaded'}
- **Competitors:** Top {num_results}
- **Max Generation:** ~{max_gen_tokens} tokens
""")
    st.warning("""
- **Load Model First:** Select a model and click 'Load' before generating.
- **Resource Use:** Models need significant RAM/GPU. Loading WILL fail if resources are insufficient.
- **Review Output:** AI provides drafts. ALWAYS review, edit, fact-check.
""")
    if st.button("Clear Scraped/Generated Data", key="clear_data"):
        reset_app_data()

# --- Main App Area ---
st.title("✨ On-Demand SEO Content Generator ✨")
st.markdown(f"Load your chosen AI model, then generate SEO-focused content.")

# User input area
st.subheader("Keyword & Generation")
keyword = st.text_input("Enter Primary Target Keyword:",
                        placeholder="e.g., vertical hydroponics guide",
                        value=st.session_state.get("last_keyword", ""),
                        key="keyword_input")

# Disable the generate button until a model is loaded.
generate_button_disabled = st.session_state.current_model_pipeline is None
generate_button_help = ("Load a model from the sidebar first."
                        if generate_button_disabled
                        else "Analyze competitors and generate article.")
analyze_button = st.button(
    "Analyze Competitors & Generate Content",
    type="primary",
    key="generate_button",
    disabled=generate_button_disabled,
    help=generate_button_help
)
st.markdown("---")

# --- Main Workflow Triggered by Button ---
if analyze_button:
    # Double check model is loaded (though the button should be disabled).
    if not st.session_state.current_model_pipeline:
        st.error("❌ Cannot generate: No model loaded. Please use the sidebar.")
        st.stop()
    if not keyword:
        st.warning("⚠️ Please enter a keyword.")
        st.stop()

    st.session_state.last_keyword = keyword  # store keyword for potential reuse
    ua = get_user_agent()  # ensure user agent is ready
    if not ua:
        st.error("❌ User Agent failed. Cannot scrape.")
        st.stop()

    # Reset previous generation results for this run.
    st.session_state.generated_content = ""
    st.session_state.internal_link_suggestions = ""

    # --- Step 1: Scrape competitors (with status updates) ---
    # Re-scrape only when the keyword changed or no text is cached.
    if (keyword != st.session_state.get('_internal_last_scrape_keyword', None)
            or not st.session_state.competitor_analysis_text):
        logger.info(f"Scraping needed for '{keyword}'.")
        st.session_state.competitor_analysis_text = ""  # clear old text
        st.session_state.scraped_urls = []
        st.session_state['_internal_last_scrape_keyword'] = ""  # reset marker until success

        scrape_container = st.container()
        with scrape_container:
            st.info(f"🕸️ Fetching URLs and Scraping Top {num_results} Competitors...")
            progress_text = "Scraping progress..."
            scrape_progress_bar = st.progress(0, text=progress_text)
            status_area = st.container()  # container for multiple status lines

            urls = get_top_urls(keyword, num_results)
            st.session_state.scraped_urls = urls

            if urls:
                all_texts = []
                scraped_count = 0
                for i, url in enumerate(urls):
                    with status_area:  # show per-URL status in the designated area
                        scrape_status_ui = st.empty()
                        content = scrape_page_content(url, ua, scrape_status_ui)
                        if content:
                            all_texts.append(content)
                            scraped_count += 1
                    scrape_progress_bar.progress((i + 1) / len(urls),
                                                 text=f"Processed URL {i+1}/{len(urls)}...")
                    time.sleep(0.1)  # UI update breather

                st.session_state.competitor_analysis_text = \
                    "\n\n --- ARTICLE SEPARATOR --- \n\n".join(all_texts)
                st.session_state['_internal_last_scrape_keyword'] = keyword  # mark scrape done

                if st.session_state.competitor_analysis_text:
                    scrape_container.success(
                        f"✅ Scraped {scraped_count}/{len(urls)} pages. "
                        f"Analysis text: {len(st.session_state.competitor_analysis_text)} chars.")
                else:
                    scrape_container.error("❌ Failed to scrape sufficient content. Cannot generate article.")
                    st.stop()
            else:
                scrape_container.error("❌ Could not retrieve competitor URLs. Cannot proceed.")
                st.stop()
    else:
        st.success(f"✔️ Using previously scraped data for '{keyword}'. "
                   f"({len(st.session_state.competitor_analysis_text)} chars).")

    # --- Step 2: Generate main content ---
    st.info(f"✍️ Generating Content with {st.session_state.current_model_id}...")
    generation_status = st.status("Sending request to LLM...")
    with generation_status:
        st.write(f"**Tone:** {selected_tone}, **Audience:** {selected_audience}, "
                 f"**Max Tokens:** {max_gen_tokens}")
        gen_prompt = build_content_generation_prompt(
            keyword,
            st.session_state.competitor_analysis_text,
            selected_tone,
            selected_audience,
            st.session_state.current_model_id
        )
        generated_content = run_llm_generation(st.session_state.current_model_pipeline,
                                               gen_prompt, max_gen_tokens)
        st.session_state.generated_content = generated_content
        if generated_content:
            generation_status.update(label="✅ Content Generation Complete!", state="complete")
        else:
            generation_status.update(label="❌ Content Generation Failed.", state="error")
            st.stop()  # stop if main content fails

# --- Display Outputs (outside the button-click conditional) ---
if st.session_state.generated_content:
    st.markdown("---")
    st.subheader("📝 Generated SEO Content")
    st.markdown(st.session_state.generated_content)
    st.text_area("Copyable Markdown:", st.session_state.generated_content,
                 height=400, key="generated_content_area_display")

    # --- Internal Linking Section ---
    if st.session_state.last_website_url:  # only show if a URL was provided
        st.markdown("---")
        st.subheader("🔗 Internal Linking Suggestions")
        if st.button("Suggest Internal Links", key="suggest_links_button_display"):
            link_status = st.status(
                f"Analyzing content for link opportunities ({st.session_state.current_model_id})...")
            with link_status:
                st.write(f"Website context: {st.session_state.last_website_url}")
                link_prompt = build_internal_link_prompt(
                    st.session_state.generated_content, keyword,
                    st.session_state.last_website_url)
                # Suggestions are short; cap tokens well below article generation.
                link_suggestions = run_llm_generation(
                    st.session_state.current_model_pipeline, link_prompt,
                    max_tokens=500)
                st.session_state.internal_link_suggestions = link_suggestions
                if link_suggestions:
                    link_status.update(label="✅ Link suggestions generated!", state="complete")
                else:
                    link_status.update(label="❌ Failed to generate link suggestions.", state="error")

        # Display suggestions if they exist in state.
        if st.session_state.internal_link_suggestions:
            st.markdown(st.session_state.internal_link_suggestions)
            st.info("ℹ️ AI suggestions only. Verify relevance and find actual URLs on your site.")
    else:
        st.markdown("---")
        st.info("Provide your website URL in the sidebar to enable internal link suggestions after generating content.")