Spaces:
Runtime error
Runtime error
# Standard library
import gc
import logging
import re
import time

# Third-party
import requests
import streamlit as st
import torch
from bs4 import BeautifulSoup, Comment
from fake_useragent import UserAgent
from googlesearch import search
from retrying import retry
from transformers import pipeline, AutoTokenizer
# --- Configuration ---
# Model Options (Ensure keys clearly indicate resource needs)
# Maps a human-readable label (shown in the sidebar selectbox) to the
# Hugging Face model id passed to transformers.pipeline().
MODEL_OPTIONS = {
    # Lighter Models (More likely to work on free tiers)
    "Mistral-7B-Instruct (Fast, Med RAM)": "mistralai/Mistral-7B-Instruct-v0.2",
    "Gemma-7B-IT (Google, Med RAM)": "google/gemma-7b-it",
    "Phi-3-Mini-4k-Instruct (Microsoft, Small, Good)": "microsoft/Phi-3-mini-4k-instruct",  # Requires trust_remote_code
    # Medium Models (May require upgraded tiers / more RAM/GPU)
    "Llama-3-8B-Instruct (Meta, High Quality, High RAM/GPU)": "meta-llama/Meta-Llama-3-8B-Instruct",
    "Phi-3-Medium-4k-Instruct (Microsoft, Strong, High RAM/GPU)": "microsoft/Phi-3-medium-4k-instruct",  # Requires trust_remote_code
    "Qwen1.5-14B-Chat (Alibaba, Strong, High RAM/GPU)": "Qwen/Qwen1.5-14B-Chat",
    # Larger Models (Very likely require significant paid resources)
    "DeepSeek-Coder-V2-Instruct (DeepSeek, High RAM/GPU)": "deepseek-ai/DeepSeek-Coder-V2-Instruct",  # Requires trust_remote_code
}
DEFAULT_MODEL_KEY = "Mistral-7B-Instruct (Fast, Med RAM)"  # Start with a lighter default selection

# Scraping & Generation Defaults
DEFAULT_NUM_RESULTS = 4  # Reduced default slightly; how many Google results to scrape
REQUEST_TIMEOUT = 15  # Per-request timeout in seconds for competitor page fetches
MAX_COMPETITOR_TEXT_LENGTH = 5500  # Competitor text is truncated to this many chars before prompting
DEFAULT_MAX_GENERATION_TOKENS = 2800  # Default max_new_tokens for the main article generation

# Retry settings (used by the retrying decorator on fetch_url_content)
RETRY_WAIT_FIXED = 2000  # milliseconds between retry attempts
RETRY_STOP_MAX_ATTEMPT = 3  # total attempts before giving up

# Tone & Audience Options shown in the sidebar selectboxes
TONE_OPTIONS = ["Conversational", "Professional", "Authoritative", "Technical", "Friendly", "Engaging", "Educational", "Persuasive"]
AUDIENCE_OPTIONS = ["Beginners", "General Audience", "Experts", "Professionals (Specific Field)", "Customers", "Students", "Decision Makers"]

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s')
logger = logging.getLogger(__name__)
# --- State Management ---
# Seed every session-state key this app relies on, exactly once per session.
for _state_key, _default_value in (
    # Model related state
    ('current_model_pipeline', None),
    ('current_model_id', ""),
    # Data related state
    ('scraped_urls', []),
    ('competitor_analysis_text', ""),
    ('generated_content', ""),
    ('internal_link_suggestions', ""),
    ('last_keyword', ""),
    ('last_website_url', ""),
    ('_internal_last_scrape_keyword', ""),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _default_value
| # --- Helper Functions --- | |
def clear_gpu_memory():
    """Drop the cached pipeline reference and try to release GPU memory."""
    logger.info("Attempting to clear GPU memory...")
    if not torch.cuda.is_available():
        # CPU-only environment: just drop the reference and collect garbage.
        logger.info("No GPU available, skipping memory clearing.")
        st.session_state.current_model_pipeline = None  # Still clear the reference
        gc.collect()
        return
    try:
        # Remove the pipeline reference BEFORE collecting so the tensors
        # become unreachable and their CUDA memory can actually be freed.
        st.session_state.current_model_pipeline = None
        gc.collect()
        torch.cuda.empty_cache()  # Ask PyTorch to release cached memory
        gc.collect()
        logger.info("GPU memory cache cleared and garbage collected.")
        st.toast("Cleared GPU memory.", icon="π§Ή")
    except Exception as e:
        logger.error(f"Error clearing GPU memory: {e}", exc_info=True)
        st.toast(f"Error clearing GPU memory: {e}", icon="β")
def reset_app_data():
    """Clear stored scraping and generation results while keeping the model loaded."""
    st.session_state.scraped_urls = []
    # All remaining result slots are plain strings; blank them in one pass.
    for _text_key in ('competitor_analysis_text', 'generated_content',
                      'internal_link_suggestions', 'last_keyword',
                      '_internal_last_scrape_keyword'):
        st.session_state[_text_key] = ""
    logger.info("App data state reset (scraped/generated content).")
    st.toast("Cleared scraped data and generated content.", icon="ποΈ")
| # --- Model Loading (On Demand) --- | |
def load_model(model_id_to_load):
    """Load `model_id_to_load` as a text-generation pipeline, unloading any
    previously loaded model first.

    On success the pipeline is stored in st.session_state.current_model_pipeline
    and its id in st.session_state.current_model_id.

    Returns:
        bool: True if the model is (or already was) loaded, False on failure.
    """
    # If the requested model is already loaded, do nothing
    if st.session_state.get('current_model_id') == model_id_to_load and st.session_state.get('current_model_pipeline') is not None:
        logger.info(f"Model {model_id_to_load} is already loaded.")
        st.toast(f"{model_id_to_load} is already loaded.", icon="β ")
        return True
    # Unload previous model if one exists and is different
    if st.session_state.get('current_model_pipeline') is not None:
        logger.info(f"Unloading previous model: {st.session_state.current_model_id}")
        st.toast(f"Unloading {st.session_state.current_model_id}...", icon="π§Ή")
        clear_gpu_memory()  # This sets pipeline to None and clears cache
        st.session_state.current_model_id = ""  # Clear model ID state
    # Load the new model
    st.toast(f"Loading {model_id_to_load}... This may take time & RAM/GPU.", icon="β³")
    logger.info(f"Attempting to load LLM pipeline for model: {model_id_to_load}")
    success = False
    try:
        # Prefer bf16 on GPUs that support it, fp16 on other GPUs, fp32 on CPU.
        if torch.cuda.is_available():
            dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        else:
            dtype = torch.float32
        logger.info(f"Using dtype: {dtype}")
        # Models whose custom code must be trusted for transformers to load them.
        trust_code_models = [
            "microsoft/Phi-3-mini-4k-instruct",
            "microsoft/Phi-3-medium-4k-instruct",
            "deepseek-ai/DeepSeek-Coder-V2-Instruct",
            # Add others if needed
        ]
        trust_code = model_id_to_load in trust_code_models
        logger.info(f"Trust remote code for {model_id_to_load}: {trust_code}")
        # Display spinner during the actual loading
        with st.spinner(f"Loading {model_id_to_load} into memory..."):
            pipeline_instance = pipeline(
                "text-generation",
                model=model_id_to_load,
                trust_remote_code=trust_code,
                device_map="auto",
                torch_dtype=dtype,
            )
        # Some tokenizers ship without a pad token; fall back to EOS so
        # generation does not crash on padding.
        if pipeline_instance.tokenizer.pad_token_id is None:
            pipeline_instance.tokenizer.pad_token_id = pipeline_instance.tokenizer.eos_token_id
            if hasattr(pipeline_instance.model, 'config'):
                pipeline_instance.model.config.pad_token_id = pipeline_instance.tokenizer.eos_token_id
            logger.warning(f"Set pad_token_id to eos_token_id for {model_id_to_load}")
        logger.info(f"LLM pipeline loaded successfully for {model_id_to_load}.")
        st.session_state.current_model_pipeline = pipeline_instance
        st.session_state.current_model_id = model_id_to_load
        st.toast(f"Model {model_id_to_load} loaded!", icon="β ")
        success = True
    except ImportError as e:
        logger.error(f"ImportError loading {model_id_to_load}: {e}. Missing dependency?", exc_info=True)
        st.error(f"Load Error: Missing library for {model_id_to_load}? Check logs. Details: {e}")
    except Exception as e:
        logger.error(f"Failed to load {model_id_to_load}: {e}", exc_info=True)
        st.error(f"Failed to load {model_id_to_load}. Error: {e}. Check resource limits (RAM/GPU) & logs.")
        clear_gpu_memory()  # Attempt to clean up if loading failed
        st.session_state.current_model_id = ""  # Ensure state reflects failure
    # FIX: previously `return success` sat inside a `finally:` block, which
    # silently swallows any exception escaping the handlers above (including
    # KeyboardInterrupt). A plain return preserves normal exception propagation.
    return success
| # --- User Agent Caching --- | |
def get_user_agent():
    """Return a FakeUserAgent instance, or None if initialization fails."""
    logger.info("Initializing FakeUserAgent.")
    # Static Chrome UA used whenever the live UA database cannot be fetched.
    fallback_ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    try:
        return UserAgent(fallback=fallback_ua)
    except Exception as e:
        logger.error(f"Failed to initialize FakeUserAgent: {e}", exc_info=True)
        st.error(f"Could not initialize User Agent generator. Error: {e}")
        return None
| # --- Core Functions (Scraping, Prompt Building, Generation Logic) --- | |
| # These functions (get_top_urls, scrape_page_content, clean_text, fetch_url_content, | |
| # build_content_generation_prompt, build_internal_link_prompt, run_llm_generation) | |
| # remain largely the same as the previous version, as they were already quite robust. | |
| # Ensure `run_llm_generation` correctly uses the pipeline passed to it (which it did). | |
| # --- (Include the definitions for the core functions here - unchanged from previous version) --- | |
def _is_request_error(exception):
    """Retry predicate: only retry network/HTTP errors, not programming errors."""
    return isinstance(exception, requests.exceptions.RequestException)

@retry(wait_fixed=RETRY_WAIT_FIXED, stop_max_attempt_number=RETRY_STOP_MAX_ATTEMPT,
       retry_on_exception=_is_request_error)
def fetch_url_content(url, headers):
    """GET `url` with retries; return the Response for HTML pages <= 10 MB, else None.

    Retries up to RETRY_STOP_MAX_ATTEMPT times with a fixed RETRY_WAIT_FIXED ms
    wait on request errors; the final exception propagates to the caller
    (scrape_page_content catches RequestException).

    FIX: the original body logged `fetch_url_content.retry.attempt_number`,
    an attribute the `retrying` library never attaches (that is tenacity's API),
    so every call raised AttributeError — and the @retry decorator that the
    RETRY_* constants were written for was missing entirely.
    """
    logger.info(f"Fetching {url} (timeout {REQUEST_TIMEOUT}s, up to {RETRY_STOP_MAX_ATTEMPT} attempts)")
    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    if 'text/html' not in response.headers.get('Content-Type', ''):
        logger.warning(f"Skipping URL {url} - Not HTML")
        return None
    if len(response.content) > 10 * 1024 * 1024:  # 10 MB limit
        logger.warning(f"Skipping URL {url} - Content too large")
        return None
    return response
def clean_text(text):
    """Normalise scraped text: collapse whitespace and keep only substantive lines.

    A line survives only if it is at least 20 characters, at least 3 words,
    and contains none of the common boilerplate markers (cookie banners,
    social prompts, navigation links, etc.).
    """
    collapsed = re.sub(r'\n+', '\n', re.sub(r'\s{2,}', ' ', text))
    boilerplate = (
        'copyright Β©', 'all rights reserved', 'privacy policy', 'terms of use', 'terms and conditions',
        'cookie policy', 'subscribe', 'sign up', 'log in', 'advertisement', 'share this', 'related posts',
        'leave a reply', 'comment', 'posted on', 'by author', 'tags:', 'categories:', 'follow us', 'read more',
        'click here', 'learn more', 'next article', 'previous article', 'you may also like', 'related topics'
    )

    def _is_substantive(candidate):
        # Long enough, wordy enough, and free of any boilerplate marker.
        lowered = candidate.lower()
        return (len(candidate) >= 20
                and len(candidate.split()) >= 3
                and not any(marker in lowered for marker in boilerplate))

    kept = [stripped
            for stripped in (line.strip() for line in collapsed.split('\n'))
            if _is_substantive(stripped)]
    return '\n'.join(kept).strip()
def scrape_page_content(url, user_agent, scrape_status_ui):
    """Fetch `url`, strip non-content markup, and return cleaned article text.

    Progress and warnings are reported through `scrape_status_ui` (a Streamlit
    placeholder/container). Returns "" on any failure or when no usable
    content is found.
    """
    if not user_agent: logger.error("User Agent missing."); return ""
    # Browser-like headers (random UA per request) to reduce bot blocking.
    headers = {
        'User-Agent': user_agent.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5', 'Referer': 'https://www.google.com/',
        'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1'
    }
    try:
        response = fetch_url_content(url, headers)
        if response is None: scrape_status_ui.warning(f"β οΈ Skip/Fail fetch: {url}", icon="πΈοΈ"); return ""
        soup = BeautifulSoup(response.content, 'lxml')
        # Remove scripts, chrome, forms etc. before extracting text.
        tags_to_remove = ["script", "style", "nav", "footer", "aside", "form", "header", "noscript", "button", "input", "select", "textarea", "figure", "figcaption", "iframe", "svg", "path", "meta", "link"]
        for element in soup(tags_to_remove): element.decompose()
        # Strip HTML comments as well.
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): comment.extract()
        # Heuristically locate the main article container; fall back to <body>.
        main_content = (soup.find('main') or soup.find('article') or soup.find(role='main') or
                        soup.find('div', class_=re.compile(r'(content|main|body|post|entry|article)', re.I)) or
                        soup.find('div', id=re.compile(r'(content|main|body|post|entry|article)', re.I)))
        target_soup = main_content if main_content else soup.body
        if not target_soup: logger.warning(f"No body/main: {url}"); scrape_status_ui.warning(f"β οΈ No body/main: {url}", icon="πΈοΈ"); return ""
        # Collect text-bearing elements; block-level tags get a newline so the
        # later line-based cleaning in clean_text() can operate.
        texts = target_soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'td', 'th', 'blockquote', 'span'])
        content_parts = []
        for elem in texts:
            if elem.find_parent(tags_to_remove): continue
            elem_text = elem.get_text(separator=' ', strip=True)
            if len(elem_text) > 10 and len(elem_text.split()) > 1:
                if elem.name in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'blockquote', 'tr', 'div']: # Added div for structure
                    content_parts.append(elem_text + "\n")
                else: content_parts.append(elem_text + " ")
        content = "".join(content_parts)
        cleaned_content = clean_text(content)
        # Under ~150 chars is treated as a thin/failed scrape but still returned.
        if len(cleaned_content) < 150: logger.warning(f"Low content ({len(cleaned_content)} chars): {url}"); scrape_status_ui.warning(f"β οΈ Low content: {url}", icon="πΈοΈ")
        else: logger.info(f"Scraped {len(cleaned_content)} chars: {url}"); scrape_status_ui.success(f"β Scraped: {url} ({len(cleaned_content)} chars)", icon="πΈοΈ")
        time.sleep(0.6)  # politeness delay between page fetches
        return cleaned_content
    except requests.exceptions.RequestException as e: logger.warning(f"Final scrape fail: {url}. Err: {e}"); scrape_status_ui.error(f"β Fail scrape: {url} ({e})", icon="πΈοΈ"); return ""
    except Exception as e: logger.error(f"Unexpected scrape error: {url}: {e}", exc_info=True); scrape_status_ui.error(f"β Error scraping: {url} (Logs)", icon="πΈοΈ"); return ""
def get_top_urls(keyword, num_results):
    """Google-search `keyword` and return up to `num_results` result URLs.

    Surfaces a Streamlit error/warning and returns [] when the search fails,
    is rate-limited, times out, or yields nothing.
    """
    logger.info(f"Fetching top {num_results} URLs for keyword: '{keyword}'")
    try:
        found_urls = list(search(keyword, num_results=num_results, sleep_interval=2.5, lang="en", timeout=15))
        logger.info(f"Found URLs: {found_urls}")
        if not found_urls:
            st.warning(f"β οΈ No Google search results found for '{keyword}'.")
            return []
        return found_urls
    except Exception as e:
        error_message = str(e)
        logger.error(f"GSearch Error: {error_message}", exc_info=True)
        # Distinguish rate limiting and timeouts from generic failures.
        if "429" in error_message:
            st.error(f"β Google search blocked (429). WAIT before retrying.")
        elif "timed out" in error_message:
            st.error(f"β Google search timed out.")
        else:
            st.error(f"β GSearch Error: {error_message[:100]}...")
        return []
def build_content_generation_prompt(keyword, competitor_texts, tone, audience, model_id):
    """Assemble the system+user chat messages for the main article generation."""
    logger.info(f"Build content gen prompt. Tone: {tone}, Audience: {audience}. Comp length: {len(competitor_texts)}")
    # Cap competitor context so the prompt fits comfortably in the model window.
    competitor_summary = competitor_texts
    if len(competitor_summary) > MAX_COMPETITOR_TEXT_LENGTH:
        competitor_summary = competitor_summary[:MAX_COMPETITOR_TEXT_LENGTH] + "... [Truncated]"
        logger.warning(f"Comp text truncated.")
    system_prompt = f"""You are an expert SEO Content Strategist & world-class Copywriter. Task: Analyze competitor text & generate a significantly superior, comprehensive, user-first article for keyword '{keyword}', targeting '{audience}' audience with '{tone}' tone. Focus on quality, depth, clarity, fulfilling user intent better than competition."""
    user_prompt = f"""**Keyword:** "{keyword}"
**Audience:** {audience}
**Tone:** {tone}
**Objective:** Generate exceptional, SEO-optimized article for "{keyword}" designed to outperform top content via superior value, insights, UX.
**Competitor Analysis Context (Analyze for topics, depth, strengths, WEAKNESSES/GAPS):**
--- BEGIN COMPETITOR ---
{competitor_summary}
--- END COMPETITOR ---
**Content Gen Instructions:**
1. **Value & Depth:** Be demonstrably better. Deeper, clearer, actionable advice, unique perspectives/data, fill gaps. Address user intent exhaustively.
2. **User-First & Humanized:** Write for '{audience}' in '{tone}'. Clear, concise, short paras, varied sentences, engaging Qs. Logical flow, readable.
3. **Structure (Strict Markdown):** Compelling H2 Title. Engaging Intro (50-150 words): Hook, purpose/value, outline. Logical Sections (H2)/Sub-sections (H3): Descriptive, keyword-aware headings. Readability: Bullets (`* `), Numbered lists (`1. `), **Bold** (strategic). Comprehensive Body: Expand beyond competitors. Strong Conclusion: Summarize takeaways, final insight/CTA.
4. **SEO (Natural):** Weave "{keyword}" & LSI terms into title, headings, intro, body, conclusion. Prioritize relevance/clarity over density. NO keyword stuffing.
5. **Originality & Credibility:** 100% unique. Use comp text ONLY for analysis. NO plagiarism. Factual accuracy.
6. **Negative Constraints:** DO NOT: Rehash competitors; use preambles/sign-offs; use excessive jargon (unless 'Experts'); write long paragraphs; stuff keywords; invent facts.
**Output:** ONLY the Markdown article, starting with H2 title."""
    logger.info(f"Content prompt done for {model_id}.")
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
def build_internal_link_prompt(generated_content, keyword, website_url):
    """Assemble chat messages asking the LLM for internal-link opportunities."""
    logger.info(f"Build internal link prompt for URL: {website_url}")
    system_msg = {
        "role": "system",
        "content": "You are an SEO assistant specialized in identifying internal linking opportunities.",
    }
    # Only the first ~8000 chars of the article are analyzed to bound prompt size.
    user_msg = {
        "role": "user",
        "content": f"""**Website Base URL:** {website_url}
**Main Topic of Article:** "{keyword}"
**Task:** Review the article below. Identify 3-5 phrases/sentences for internal links relevant to {website_url}.
**For each opportunity, provide:**
1. Exact anchor text phrase/sentence from article.
2. Brief description of the *type* of relevant content needed (e.g., "detailed guide on [sub-topic]", "service page for [service]").
**IMPORTANT:** Do NOT invent URLs. Describe the *type* of page. Choose natural anchor text. Focus on value. Format as Markdown numbered list.
**Article Content (Analyze first ~8000 chars):**
--- BEGIN ARTICLE ---
{generated_content[:8000]}
--- END ARTICLE ---""",
    }
    return [system_msg, user_msg]
def run_llm_generation(pipe, messages, max_tokens):
    """Run chat-style generation on `pipe` and return the assistant's reply text.

    Args:
        pipe: transformers text-generation pipeline (or None).
        messages: chat messages list of {"role", "content"} dicts.
        max_tokens: max_new_tokens budget for this call.

    Returns:
        Extracted/cleaned assistant response string, or None on any failure.
    """
    if pipe is None: st.error("β LLM Pipeline missing."); return None
    model_id = pipe.model.name_or_path
    logger.info(f"Running generation: {model_id}. Max tokens: {max_tokens}.")
    start_time = time.time()
    try:
        # Sampling settings: moderately creative but bounded; pad/eos pinned to
        # the tokenizer's EOS id.
        gen_args = {"max_new_tokens": max_tokens, "temperature": 0.7, "top_p": 0.95, "top_k": 40,
                    "do_sample": True, "pad_token_id": pipe.tokenizer.eos_token_id, "eos_token_id": pipe.tokenizer.eos_token_id}
        logger.info(f"Gen args: {gen_args}")
        results = pipe(messages, **gen_args)
        # --- Robust Extraction ---
        # Pipelines may return either a chat-style list of message dicts or a
        # single flat string that still contains the prompt; handle both shapes.
        assistant_response = None
        if results and results[0] and 'generated_text' in results[0]:
            output_data = results[0]['generated_text']
            # Chat shape: take the last message with role == 'assistant'.
            if isinstance(output_data, list): assistant_message = next((msg['content'] for msg in reversed(output_data) if msg['role'] == 'assistant'), None); assistant_response = assistant_message
            elif isinstance(output_data, str):
                # Flat-string shape: cut everything up to and including the last
                # user prompt, then strip leading role markers / template tokens.
                last_prompt_content = messages[-1]['content']
                last_prompt_index = output_data.rfind(last_prompt_content)
                if last_prompt_index != -1: potential_response = output_data[last_prompt_index + len(last_prompt_content):].strip()
                else: potential_response = output_data
                assistant_response = re.sub(r"^(assistant|ASSISTANT|</s>|<\|im_end\|>|<\|assistant\|>)\s*[:\n]*", "", potential_response, flags=re.IGNORECASE | re.DOTALL).strip()
            else: logger.error(f"Unexpected output format: {type(output_data)}")
        else: logger.error(f"Unexpected LLM output structure: {results}")
        # --- Validation ---
        if assistant_response:
            duration = time.time() - start_time; logger.info(f"Gen success ({model_id}) {duration:.2f}s. Len: {len(assistant_response)}.")
            # Drop the markdown code fence some models wrap their answer in.
            assistant_response = re.sub(r"^```markdown\n", "", assistant_response).strip(); assistant_response = re.sub(r"\n```$", "", assistant_response).strip()
            if len(assistant_response) < 30: logger.warning(f"Gen output very short ({len(assistant_response)})."); st.warning("β οΈ Gen output very short.")
            return assistant_response
        else: logger.error(f"Failed parse assistant response. Output: {results}"); st.error("β Failed parse LLM response. Check logs."); return None
    except torch.cuda.OutOfMemoryError: logger.error(f"OOM Error ({model_id})!", exc_info=True); st.error(f"β OOM Error ({model_id}). Try smaller model/less tokens/restart."); clear_gpu_memory(); return None
    except Exception as e: logger.error(f"Unhandled gen error ({model_id}): {e}", exc_info=True); st.error(f"β Unexpected gen error: {e}"); return None
| # --- Streamlit App UI --- | |
st.set_page_config(layout="wide", page_title="On-Demand SEO Content Gen")

# --- Sidebar ---
with st.sidebar:
    st.header("βοΈ Configuration")
    # Model Selection & Loading Area
    st.subheader("1. Select & Load Model")
    selected_model_key = st.selectbox(
        "Choose Language Model:",
        options=list(MODEL_OPTIONS.keys()),
        index=list(MODEL_OPTIONS.keys()).index(DEFAULT_MODEL_KEY),
        key="model_selector",  # Key for potential state access
        help="Choose AI model. Performance & resources vary. Load required."
    )
    selected_model_id = MODEL_OPTIONS[selected_model_key]
    # Display current status and load button
    load_button_placeholder = st.empty()  # Placeholder for dynamic button text/state
    model_status_placeholder = st.empty()  # Placeholder for status message
    # Three states: selected model already loaded / a different model loaded /
    # nothing loaded — each gets its own status line and button label.
    if st.session_state.get('current_model_id') == selected_model_id and st.session_state.get('current_model_pipeline') is not None:
        model_status_placeholder.success(f"β Loaded: `{selected_model_id}`")
        load_button_text = f"Switch from {selected_model_key}"  # Or "Reload"
    elif st.session_state.get('current_model_pipeline') is not None:
        model_status_placeholder.warning(f"β οΈ Loaded: `{st.session_state.current_model_id}`\nSelected: `{selected_model_id}`")
        load_button_text = f"Unload Current & Load {selected_model_key}"
    else:
        model_status_placeholder.info("βΉοΈ No model loaded.")
        load_button_text = f"Load Selected: {selected_model_key}"
    if load_button_placeholder.button(load_button_text, key="load_model"):
        load_model(selected_model_id)
        # Rerun to update status placeholders immediately after load attempt
        st.rerun()
    st.markdown("---")
    # Content Settings
    st.subheader("2. Content Settings")
    with st.expander("Adjust Content Parameters", expanded=False):
        num_results = st.slider("Competitors to Analyze:", min_value=1, max_value=8, value=DEFAULT_NUM_RESULTS, step=1)
        selected_tone = st.selectbox("Content Tone:", options=TONE_OPTIONS, index=TONE_OPTIONS.index("Engaging"))
        selected_audience = st.selectbox("Target Audience:", options=AUDIENCE_OPTIONS, index=AUDIENCE_OPTIONS.index("General Audience"))
        max_gen_tokens = st.number_input("Max Generation Tokens:", min_value=500, max_value=8192, value=DEFAULT_MAX_GENERATION_TOKENS, step=100)
    # Internal Linking
    st.subheader("3. Internal Linking (Optional)")
    with st.expander("Configure Link Suggestions", expanded=False):
        website_url = st.text_input("Your Website URL:", placeholder="https://www.example.com", value=st.session_state.get("last_website_url", ""), key="website_url_input")
        # Update state immediately on change if needed, or just read before use
        st.session_state.last_website_url = website_url
    st.markdown("---")
    st.header("βΉοΈ App Info & Actions")
    st.info(f"""
    - **Status:** {'Model Loaded' if st.session_state.current_model_pipeline else 'No Model Loaded'}
    - **Competitors:** Top {num_results}
    - **Max Generation:** ~{max_gen_tokens} tokens
    """)
    st.warning("""
    - **Load Model First:** Select a model and click 'Load' before generating.
    - **Resource Use:** Models need significant RAM/GPU. Loading WILL fail if resources are insufficient.
    - **Review Output:** AI provides drafts. ALWAYS review, edit, fact-check.
    """)
    if st.button("Clear Scraped/Generated Data", key="clear_data"):
        reset_app_data()
| # --- Main App Area --- | |
st.title("β¨ On-Demand SEO Content Generator β¨")
st.markdown(f"Load your chosen AI model, then generate SEO-focused content.")

# User Input Area
st.subheader("Keyword & Generation")
keyword = st.text_input("Enter Primary Target Keyword:", placeholder="e.g., vertical hydroponics guide", value=st.session_state.get("last_keyword", ""), key="keyword_input")

# Disable button if model not loaded
generate_button_disabled = st.session_state.current_model_pipeline is None
generate_button_help = "Load a model from the sidebar first." if generate_button_disabled else "Analyze competitors and generate article."
analyze_button = st.button(
    "Analyze Competitors & Generate Content",
    type="primary",
    key="generate_button",
    disabled=generate_button_disabled,
    help=generate_button_help
)
st.markdown("---")
| # --- Main Workflow Triggered by Button --- | |
if analyze_button:
    # Double check model is loaded (though button should be disabled)
    if not st.session_state.current_model_pipeline:
        st.error("β Cannot generate: No model loaded. Please use the sidebar.")
        st.stop()
    if not keyword:
        st.warning("β οΈ Please enter a keyword.")
        st.stop()
    st.session_state.last_keyword = keyword  # Store keyword for potential reuse
    ua = get_user_agent()  # Ensure user agent is ready
    if not ua: st.error("β User Agent failed. Cannot scrape."); st.stop()
    # Reset previous generation results for this run
    st.session_state.generated_content = ""
    st.session_state.internal_link_suggestions = ""
    # --- Step 1: Scrape Competitors (with status updates) ---
    # Re-scrape only if the keyword changed or no competitor text is cached.
    if keyword != st.session_state.get('_internal_last_scrape_keyword', None) or not st.session_state.competitor_analysis_text:
        logger.info(f"Scraping needed for '{keyword}'.")
        st.session_state.competitor_analysis_text = ""  # Clear old text
        st.session_state.scraped_urls = []
        st.session_state['_internal_last_scrape_keyword'] = ""  # Reset marker until success
        scrape_container = st.container()
        with scrape_container:
            st.info(f"πΈοΈ Fetching URLs and Scraping Top {num_results} Competitors...")
            progress_text = "Scraping progress..."
            scrape_progress_bar = st.progress(0, text=progress_text)
            status_area = st.container()  # Use container for multiple status lines
            urls = get_top_urls(keyword, num_results)
            st.session_state.scraped_urls = urls
            if urls:
                all_texts = []
                scraped_count = 0
                for i, url in enumerate(urls):
                    with status_area:  # Show status within the designated area
                        scrape_status_ui = st.empty()  # Placeholder for single URL status
                        content = scrape_page_content(url, ua, scrape_status_ui)
                    if content:
                        all_texts.append(content)
                        scraped_count += 1
                    scrape_progress_bar.progress((i + 1) / len(urls), text=f"Processed URL {i+1}/{len(urls)}...")
                    time.sleep(0.1)  # UI update breather
                st.session_state.competitor_analysis_text = "\n\n --- ARTICLE SEPARATOR --- \n\n".join(all_texts)
                st.session_state['_internal_last_scrape_keyword'] = keyword  # Mark scrape success for this keyword
                if st.session_state.competitor_analysis_text:
                    scrape_container.success(f"β Scraped {scraped_count}/{len(urls)} pages. Analysis text: {len(st.session_state.competitor_analysis_text)} chars.")
                else:
                    scrape_container.error("β Failed to scrape sufficient content. Cannot generate article.")
                    st.stop()
            else:
                scrape_container.error("β Could not retrieve competitor URLs. Cannot proceed.")
                st.stop()
    else:
        st.success(f"βοΈ Using previously scraped data for '{keyword}'. ({len(st.session_state.competitor_analysis_text)} chars).")
    # --- Step 2: Generate Main Content ---
    st.info(f"βοΈ Generating Content with {st.session_state.current_model_id}...")
    generation_status = st.status("Sending request to LLM...")
    with generation_status:
        st.write(f"**Tone:** {selected_tone}, **Audience:** {selected_audience}, **Max Tokens:** {max_gen_tokens}")
        gen_prompt = build_content_generation_prompt(
            keyword, st.session_state.competitor_analysis_text, selected_tone, selected_audience, st.session_state.current_model_id
        )
        generated_content = run_llm_generation(st.session_state.current_model_pipeline, gen_prompt, max_gen_tokens)
        st.session_state.generated_content = generated_content
    if generated_content:
        generation_status.update(label="β Content Generation Complete!", state="complete")
    else:
        generation_status.update(label="β Content Generation Failed.", state="error")
        st.stop()  # Stop if main content fails
| # --- Display Outputs (Outside the button click conditional) --- | |
if st.session_state.generated_content:
    st.markdown("---")
    st.subheader("π Generated SEO Content")
    st.markdown(st.session_state.generated_content)
    st.text_area("Copyable Markdown:", st.session_state.generated_content, height=400, key="generated_content_area_display")
    # --- Internal Linking Section ---
    if st.session_state.last_website_url:  # Only show if URL was provided
        st.markdown("---")
        st.subheader("π Internal Linking Suggestions")
        if st.button("Suggest Internal Links", key="suggest_links_button_display"):
            link_status = st.status(f"Analyzing content for link opportunities ({st.session_state.current_model_id})...")
            with link_status:
                st.write(f"Website context: {st.session_state.last_website_url}")
                link_prompt = build_internal_link_prompt(st.session_state.generated_content, keyword, st.session_state.last_website_url)
                link_suggestions = run_llm_generation(st.session_state.current_model_pipeline, link_prompt, max_tokens=500)  # Use fewer tokens
                st.session_state.internal_link_suggestions = link_suggestions
            if link_suggestions:
                link_status.update(label="β Link suggestions generated!", state="complete")
            else:
                link_status.update(label="β Failed to generate link suggestions.", state="error")
        # Display suggestions if they exist in state
        if st.session_state.internal_link_suggestions:
            st.markdown(st.session_state.internal_link_suggestions)
            st.info("βΉοΈ AI suggestions only. Verify relevance and find actual URLs on your site.")
    else:
        st.markdown("---")
        st.info("Provide your website URL in the sidebar to enable internal link suggestions after generating content.")