# app.py — On-Demand SEO Content Generator (revision c0388eb)
import streamlit as st
import requests
from bs4 import BeautifulSoup, Comment
from googlesearch import search
from fake_useragent import UserAgent
from transformers import pipeline, AutoTokenizer
import torch
import time
import logging
import re
from retrying import retry
import gc
# --- Configuration ---
# Model Options (Ensure keys clearly indicate resource needs)
# Maps the human-readable label shown in the sidebar selectbox to the
# Hugging Face model ID passed to transformers.pipeline().
MODEL_OPTIONS = {
    # Lighter Models (More likely to work on free tiers)
    "Mistral-7B-Instruct (Fast, Med RAM)": "mistralai/Mistral-7B-Instruct-v0.2",
    "Gemma-7B-IT (Google, Med RAM)": "google/gemma-7b-it",
    "Phi-3-Mini-4k-Instruct (Microsoft, Small, Good)": "microsoft/Phi-3-mini-4k-instruct", # Requires trust_remote_code
    # Medium Models (May require upgraded tiers / more RAM/GPU)
    "Llama-3-8B-Instruct (Meta, High Quality, High RAM/GPU)": "meta-llama/Meta-Llama-3-8B-Instruct",
    "Phi-3-Medium-4k-Instruct (Microsoft, Strong, High RAM/GPU)": "microsoft/Phi-3-medium-4k-instruct", # Requires trust_remote_code
    "Qwen1.5-14B-Chat (Alibaba, Strong, High RAM/GPU)": "Qwen/Qwen1.5-14B-Chat",
    # Larger Models (Very likely require significant paid resources)
    "DeepSeek-Coder-V2-Instruct (DeepSeek, High RAM/GPU)": "deepseek-ai/DeepSeek-Coder-V2-Instruct", # Requires trust_remote_code
}
DEFAULT_MODEL_KEY = "Mistral-7B-Instruct (Fast, Med RAM)" # Start with a lighter default selection
# Scraping & Generation Defaults
DEFAULT_NUM_RESULTS = 4 # Reduced default slightly
REQUEST_TIMEOUT = 15  # seconds per HTTP request in fetch_url_content
MAX_COMPETITOR_TEXT_LENGTH = 5500  # max chars of competitor text fed into the prompt
DEFAULT_MAX_GENERATION_TOKENS = 2800  # default max_new_tokens for the LLM
# Retry settings (used by the `retrying` decorator on fetch_url_content)
RETRY_WAIT_FIXED = 2000  # ms between attempts
RETRY_STOP_MAX_ATTEMPT = 3  # total attempts per URL
# Tone & Audience Options (populate the sidebar selectboxes)
TONE_OPTIONS = ["Conversational", "Professional", "Authoritative", "Technical", "Friendly", "Engaging", "Educational", "Persuasive"]
AUDIENCE_OPTIONS = ["Beginners", "General Audience", "Experts", "Professionals (Specific Field)", "Customers", "Students", "Decision Makers"]
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s')
logger = logging.getLogger(__name__)
# --- State Management ---
# Seed every session-state key with its default exactly once per session so
# later code can read them without guarding against KeyError. Streamlit
# re-runs this script on every interaction; the `not in` check makes the
# assignment a no-op after the first run.
_STATE_DEFAULTS = {
    # Model-related state
    'current_model_pipeline': None,
    'current_model_id': "",
    # Data related state
    'scraped_urls': [],
    'competitor_analysis_text': "",
    'generated_content': "",
    'internal_link_suggestions': "",
    'last_keyword': "",
    'last_website_url': "",
    '_internal_last_scrape_keyword': "",
}
for _state_key, _default_value in _STATE_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _default_value
# --- Helper Functions ---
def clear_gpu_memory():
    """Attempts to clear GPU memory cache and run garbage collection."""
    logger.info("Attempting to clear GPU memory...")
    if not torch.cuda.is_available():
        # CPU-only environment: just drop the pipeline reference and collect garbage.
        logger.info("No GPU available, skipping memory clearing.")
        st.session_state.current_model_pipeline = None
        gc.collect()
        return
    try:
        # Drop the reference FIRST so the collector can actually free the model tensors.
        st.session_state.current_model_pipeline = None
        gc.collect()
        torch.cuda.empty_cache()  # ask PyTorch to return cached blocks to the driver
        gc.collect()
        logger.info("GPU memory cache cleared and garbage collected.")
        st.toast("Cleared GPU memory.", icon="🧹")
    except Exception as e:
        logger.error(f"Error clearing GPU memory: {e}", exc_info=True)
        st.toast(f"Error clearing GPU memory: {e}", icon="❌")
def reset_app_data():
    """Clears stored scraping and generation results, keeps model loaded."""
    st.session_state.scraped_urls = []
    # Every text-valued piece of run data resets to the empty string.
    for _text_key in ('competitor_analysis_text', 'generated_content',
                      'internal_link_suggestions', 'last_keyword',
                      '_internal_last_scrape_keyword'):
        st.session_state[_text_key] = ""
    logger.info("App data state reset (scraped/generated content).")
    st.toast("Cleared scraped data and generated content.", icon="🗑️")
# --- Model Loading (On Demand) ---
def load_model(model_id_to_load):
    """Loads the selected model into a text-generation pipeline, unloading any previous one.

    Args:
        model_id_to_load: Hugging Face model ID chosen in the sidebar.

    Returns:
        bool: True if the requested model ended up loaded, False on any failure.
    """
    # If the requested model is already loaded, do nothing.
    if st.session_state.get('current_model_id') == model_id_to_load and st.session_state.get('current_model_pipeline') is not None:
        logger.info(f"Model {model_id_to_load} is already loaded.")
        st.toast(f"{model_id_to_load} is already loaded.", icon="✅")
        return True
    # Unload previous model if one exists and is different.
    if st.session_state.get('current_model_pipeline') is not None:
        logger.info(f"Unloading previous model: {st.session_state.current_model_id}")
        st.toast(f"Unloading {st.session_state.current_model_id}...", icon="🧹")
        clear_gpu_memory()  # sets pipeline to None and clears the CUDA cache
        st.session_state.current_model_id = ""  # Clear model ID state
    # Load the new model.
    st.toast(f"Loading {model_id_to_load}... This may take time & RAM/GPU.", icon="⏳")
    logger.info(f"Attempting to load LLM pipeline for model: {model_id_to_load}")
    success = False
    try:
        # Prefer bf16 on GPUs that support it, fp16 on other GPUs, fp32 on CPU.
        dtype = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else torch.float16 if torch.cuda.is_available() else torch.float32
        logger.info(f"Using dtype: {dtype}")
        # Models that ship custom modeling code and therefore need trust_remote_code=True.
        trust_code_models = [
            "microsoft/Phi-3-mini-4k-instruct",
            "microsoft/Phi-3-medium-4k-instruct",
            "deepseek-ai/DeepSeek-Coder-V2-Instruct",
            # Add others if needed
        ]
        trust_code = model_id_to_load in trust_code_models
        logger.info(f"Trust remote code for {model_id_to_load}: {trust_code}")
        # Display spinner during the actual loading.
        with st.spinner(f"Loading {model_id_to_load} into memory..."):
            pipeline_instance = pipeline(
                "text-generation",
                model=model_id_to_load,
                trust_remote_code=trust_code,
                device_map="auto",
                torch_dtype=dtype,
            )
        # Some tokenizers ship without a pad token; fall back to EOS so
        # generation does not crash on padding.
        if pipeline_instance.tokenizer.pad_token_id is None:
            pipeline_instance.tokenizer.pad_token_id = pipeline_instance.tokenizer.eos_token_id
            if hasattr(pipeline_instance.model, 'config'):
                pipeline_instance.model.config.pad_token_id = pipeline_instance.tokenizer.eos_token_id
            logger.warning(f"Set pad_token_id to eos_token_id for {model_id_to_load}")
        logger.info(f"LLM pipeline loaded successfully for {model_id_to_load}.")
        st.session_state.current_model_pipeline = pipeline_instance
        st.session_state.current_model_id = model_id_to_load
        st.toast(f"Model {model_id_to_load} loaded!", icon="✅")
        success = True
    except ImportError as e:
        logger.error(f"ImportError loading {model_id_to_load}: {e}. Missing dependency?", exc_info=True)
        st.error(f"Load Error: Missing library for {model_id_to_load}? Check logs. Details: {e}")
    except Exception as e:
        logger.error(f"Failed to load {model_id_to_load}: {e}", exc_info=True)
        st.error(f"Failed to load {model_id_to_load}. Error: {e}. Check resource limits (RAM/GPU) & logs.")
        clear_gpu_memory()  # Attempt to clean up if loading failed
        st.session_state.current_model_id = ""  # Ensure state reflects failure
    # BUG FIX: this used to be `finally: return success`, which silently
    # swallows any in-flight exception (including KeyboardInterrupt/SystemExit).
    # Returning after the try/except preserves identical success semantics safely.
    return success
# --- User Agent Caching ---
@st.cache_resource
def get_user_agent():
    """Build (once per server process) a FakeUserAgent for randomized request headers.

    Returns None if initialization fails, which callers must check before scraping.
    """
    logger.info("Initializing FakeUserAgent.")
    fallback_ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    try:
        return UserAgent(fallback=fallback_ua)
    except Exception as e:
        logger.error(f"Failed to initialize FakeUserAgent: {e}", exc_info=True)
        st.error(f"Could not initialize User Agent generator. Error: {e}")
        return None
# --- Core Functions (Scraping, Prompt Building, Generation Logic) ---
@retry(wait_fixed=RETRY_WAIT_FIXED, stop_max_attempt_number=RETRY_STOP_MAX_ATTEMPT,
       retry_on_exception=lambda e: isinstance(e, (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.HTTPError)))
def fetch_url_content(url, headers):
    """Fetch a URL and return the Response, or None for non-HTML/oversized pages.

    The `retrying` decorator retries on timeouts, connection errors and HTTP
    errors, up to RETRY_STOP_MAX_ATTEMPT attempts, waiting RETRY_WAIT_FIXED ms
    between attempts.
    """
    # BUG FIX: the previous log line accessed `fetch_url_content.retry.attempt_number`,
    # an attribute that functions wrapped by the `retrying` decorator do not
    # expose — it raised AttributeError on every call. Log without the counter.
    logger.info(f"Fetching {url} (up to {RETRY_STOP_MAX_ATTEMPT} attempts)")
    response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    if 'text/html' not in response.headers.get('Content-Type', ''):
        logger.warning(f"Skipping URL {url} - Not HTML")
        return None
    if len(response.content) > 10 * 1024 * 1024:  # 10 MB limit: skip huge downloads
        logger.warning(f"Skipping URL {url} - Content too large")
        return None
    return response
def clean_text(text):
    """Normalize whitespace and strip boilerplate lines from scraped page text.

    A line survives only if it is at least 20 characters, has at least 3 words,
    and contains none of the common navigation/footer phrases. Surviving lines
    are returned joined by single newlines.
    """
    # Collapse runs of whitespace to one space, then squeeze repeated newlines.
    text = re.sub(r'\s{2,}', ' ', text)
    text = re.sub(r'\n+', '\n', text)
    min_line_length = 20
    min_words_per_line = 3
    skip_phrases = [
        'copyright ©', 'all rights reserved', 'privacy policy', 'terms of use', 'terms and conditions',
        'cookie policy', 'subscribe', 'sign up', 'log in', 'advertisement', 'share this', 'related posts',
        'leave a reply', 'comment', 'posted on', 'by author', 'tags:', 'categories:', 'follow us', 'read more',
        'click here', 'learn more', 'next article', 'previous article', 'you may also like', 'related topics'
    ]

    def _keep(line):
        # Substantial, boilerplate-free lines only.
        lowered = line.lower()
        return (len(line) >= min_line_length
                and len(line.split()) >= min_words_per_line
                and not any(phrase in lowered for phrase in skip_phrases))

    kept = [stripped for raw in text.split('\n') if _keep(stripped := raw.strip())]
    return '\n'.join(kept).strip()
def scrape_page_content(url, user_agent, scrape_status_ui):
    """Download one competitor URL and return its cleaned main-content text.

    Args:
        url: Page URL to scrape.
        user_agent: fake_useragent.UserAgent used to randomize the UA header.
        scrape_status_ui: Streamlit placeholder receiving per-URL status messages.

    Returns:
        Cleaned text of the page's main content, or "" on any failure or skip.
    """
    if not user_agent: logger.error("User Agent missing."); return ""
    # Browser-like headers reduce the chance of being blocked by the target site.
    headers = {
        'User-Agent': user_agent.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5', 'Referer': 'https://www.google.com/',
        'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1'
    }
    try:
        response = fetch_url_content(url, headers)
        if response is None: scrape_status_ui.warning(f"⚠️ Skip/Fail fetch: {url}", icon="🕸️"); return ""
        soup = BeautifulSoup(response.content, 'lxml')
        # Remove non-content elements (and HTML comments) before extracting text.
        tags_to_remove = ["script", "style", "nav", "footer", "aside", "form", "header", "noscript", "button", "input", "select", "textarea", "figure", "figcaption", "iframe", "svg", "path", "meta", "link"]
        for element in soup(tags_to_remove): element.decompose()
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)): comment.extract()
        # Prefer an explicit main/article container; fall back to common content
        # div class/id names, then to the whole <body>.
        main_content = (soup.find('main') or soup.find('article') or soup.find(role='main') or
                        soup.find('div', class_=re.compile(r'(content|main|body|post|entry|article)', re.I)) or
                        soup.find('div', id=re.compile(r'(content|main|body|post|entry|article)', re.I)))
        target_soup = main_content if main_content else soup.body
        if not target_soup: logger.warning(f"No body/main: {url}"); scrape_status_ui.warning(f"⚠️ No body/main: {url}", icon="🕸️"); return ""
        texts = target_soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'td', 'th', 'blockquote', 'span'])
        content_parts = []
        for elem in texts:
            # Defensive: skip anything still nested under a stripped-tag ancestor.
            if elem.find_parent(tags_to_remove): continue
            elem_text = elem.get_text(separator=' ', strip=True)
            if len(elem_text) > 10 and len(elem_text.split()) > 1:
                # Block-level elements get a trailing newline so clean_text can filter per line.
                if elem.name in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'blockquote', 'tr', 'div']: # Added div for structure
                    content_parts.append(elem_text + "\n")
                else: content_parts.append(elem_text + " ")
        content = "".join(content_parts)
        cleaned_content = clean_text(content)
        if len(cleaned_content) < 150: logger.warning(f"Low content ({len(cleaned_content)} chars): {url}"); scrape_status_ui.warning(f"⚠️ Low content: {url}", icon="🕸️")
        else: logger.info(f"Scraped {len(cleaned_content)} chars: {url}"); scrape_status_ui.success(f"✅ Scraped: {url} ({len(cleaned_content)} chars)", icon="🕸️")
        time.sleep(0.6)  # polite delay between consecutive requests
        return cleaned_content
    except requests.exceptions.RequestException as e: logger.warning(f"Final scrape fail: {url}. Err: {e}"); scrape_status_ui.error(f"❌ Fail scrape: {url} ({e})", icon="🕸️"); return ""
    except Exception as e: logger.error(f"Unexpected scrape error: {url}: {e}", exc_info=True); scrape_status_ui.error(f"❌ Error scraping: {url} (Logs)", icon="🕸️"); return ""
def get_top_urls(keyword, num_results):
    """Return the top `num_results` Google result URLs for `keyword`, or [] on failure."""
    logger.info(f"Fetching top {num_results} URLs for keyword: '{keyword}'")
    try:
        urls = list(search(keyword, num_results=num_results, sleep_interval=2.5, lang="en", timeout=15))
    except Exception as e:
        error_message = str(e)
        logger.error(f"GSearch Error: {error_message}", exc_info=True)
        # Surface the most common failure modes with targeted messages.
        if "429" in error_message:
            st.error(f"❌ Google search blocked (429). WAIT before retrying.")
        elif "timed out" in error_message:
            st.error(f"❌ Google search timed out.")
        else:
            st.error(f"❌ GSearch Error: {error_message[:100]}...")
        return []
    logger.info(f"Found URLs: {urls}")
    if not urls:
        st.warning(f"⚠️ No Google search results found for '{keyword}'.")
        return []
    return urls
def build_content_generation_prompt(keyword, competitor_texts, tone, audience, model_id):
    """Assemble the chat messages (system + user) for the main article generation.

    Competitor text is truncated to MAX_COMPETITOR_TEXT_LENGTH characters so the
    prompt stays within the model's context budget.
    """
    logger.info(f"Build content gen prompt. Tone: {tone}, Audience: {audience}. Comp length: {len(competitor_texts)}")
    if len(competitor_texts) > MAX_COMPETITOR_TEXT_LENGTH:
        logger.warning(f"Comp text truncated.")
        competitor_summary = competitor_texts[:MAX_COMPETITOR_TEXT_LENGTH] + "... [Truncated]"
    else:
        competitor_summary = competitor_texts
    system_prompt = f"""You are an expert SEO Content Strategist & world-class Copywriter. Task: Analyze competitor text & generate a significantly superior, comprehensive, user-first article for keyword '{keyword}', targeting '{audience}' audience with '{tone}' tone. Focus on quality, depth, clarity, fulfilling user intent better than competition."""
    user_prompt = f"""**Keyword:** "{keyword}"
**Audience:** {audience}
**Tone:** {tone}
**Objective:** Generate exceptional, SEO-optimized article for "{keyword}" designed to outperform top content via superior value, insights, UX.
**Competitor Analysis Context (Analyze for topics, depth, strengths, WEAKNESSES/GAPS):**
--- BEGIN COMPETITOR ---
{competitor_summary}
--- END COMPETITOR ---
**Content Gen Instructions:**
1. **Value & Depth:** Be demonstrably better. Deeper, clearer, actionable advice, unique perspectives/data, fill gaps. Address user intent exhaustively.
2. **User-First & Humanized:** Write for '{audience}' in '{tone}'. Clear, concise, short paras, varied sentences, engaging Qs. Logical flow, readable.
3. **Structure (Strict Markdown):** Compelling H2 Title. Engaging Intro (50-150 words): Hook, purpose/value, outline. Logical Sections (H2)/Sub-sections (H3): Descriptive, keyword-aware headings. Readability: Bullets (`* `), Numbered lists (`1. `), **Bold** (strategic). Comprehensive Body: Expand beyond competitors. Strong Conclusion: Summarize takeaways, final insight/CTA.
4. **SEO (Natural):** Weave "{keyword}" & LSI terms into title, headings, intro, body, conclusion. Prioritize relevance/clarity over density. NO keyword stuffing.
5. **Originality & Credibility:** 100% unique. Use comp text ONLY for analysis. NO plagiarism. Factual accuracy.
6. **Negative Constraints:** DO NOT: Rehash competitors; use preambles/sign-offs; use excessive jargon (unless 'Experts'); write long paragraphs; stuff keywords; invent facts.
**Output:** ONLY the Markdown article, starting with H2 title."""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    logger.info(f"Content prompt done for {model_id}.")
    return messages
def build_internal_link_prompt(generated_content, keyword, website_url):
    """Assemble chat messages asking the LLM for internal-link anchor suggestions.

    Only the first ~8000 characters of the article are included to bound prompt size.
    """
    logger.info(f"Build internal link prompt for URL: {website_url}")
    system_prompt = "You are an SEO assistant specialized in identifying internal linking opportunities."
    user_prompt = f"""**Website Base URL:** {website_url}
**Main Topic of Article:** "{keyword}"
**Task:** Review the article below. Identify 3-5 phrases/sentences for internal links relevant to {website_url}.
**For each opportunity, provide:**
1. Exact anchor text phrase/sentence from article.
2. Brief description of the *type* of relevant content needed (e.g., "detailed guide on [sub-topic]", "service page for [service]").
**IMPORTANT:** Do NOT invent URLs. Describe the *type* of page. Choose natural anchor text. Focus on value. Format as Markdown numbered list.
**Article Content (Analyze first ~8000 chars):**
--- BEGIN ARTICLE ---
{generated_content[:8000]}
--- END ARTICLE ---"""
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
def run_llm_generation(pipe, messages, max_tokens):
    """Run the chat `messages` through the loaded pipeline and return the assistant text.

    Args:
        pipe: Loaded transformers text-generation pipeline (may be None).
        messages: Chat-format list of {"role", "content"} dicts.
        max_tokens: Cap on newly generated tokens.

    Returns:
        The extracted assistant response string, or None on any failure.
    """
    if pipe is None: st.error("❌ LLM Pipeline missing."); return None
    model_id = pipe.model.name_or_path
    logger.info(f"Running generation: {model_id}. Max tokens: {max_tokens}.")
    start_time = time.time()
    try:
        # Sampling settings; pad/eos pinned to the tokenizer's EOS token.
        gen_args = {"max_new_tokens": max_tokens, "temperature": 0.7, "top_p": 0.95, "top_k": 40,
                    "do_sample": True, "pad_token_id": pipe.tokenizer.eos_token_id, "eos_token_id": pipe.tokenizer.eos_token_id}
        logger.info(f"Gen args: {gen_args}")
        results = pipe(messages, **gen_args)
        # --- Robust Extraction ---
        # Pipelines may return either the chat list (assistant turn appended) or a
        # flat string containing prompt + completion; both shapes are handled below.
        assistant_response = None
        if results and results[0] and 'generated_text' in results[0]:
            output_data = results[0]['generated_text']
            if isinstance(output_data, list): assistant_message = next((msg['content'] for msg in reversed(output_data) if msg['role'] == 'assistant'), None); assistant_response = assistant_message
            elif isinstance(output_data, str):
                # Flat string: cut everything up to (and including) the last user prompt.
                last_prompt_content = messages[-1]['content']
                last_prompt_index = output_data.rfind(last_prompt_content)
                if last_prompt_index != -1: potential_response = output_data[last_prompt_index + len(last_prompt_content):].strip()
                else: potential_response = output_data
                # Strip any leading role markers / special tokens the model echoed.
                assistant_response = re.sub(r"^(assistant|ASSISTANT|</s>|<\|im_end\|>|<\|assistant\|>)\s*[:\n]*", "", potential_response, flags=re.IGNORECASE | re.DOTALL).strip()
            else: logger.error(f"Unexpected output format: {type(output_data)}")
        else: logger.error(f"Unexpected LLM output structure: {results}")
        # --- Validation ---
        if assistant_response:
            duration = time.time() - start_time; logger.info(f"Gen success ({model_id}) {duration:.2f}s. Len: {len(assistant_response)}.")
            # Remove a wrapping ```markdown code fence if the model added one.
            assistant_response = re.sub(r"^```markdown\n", "", assistant_response).strip(); assistant_response = re.sub(r"\n```$", "", assistant_response).strip()
            if len(assistant_response) < 30: logger.warning(f"Gen output very short ({len(assistant_response)})."); st.warning("⚠️ Gen output very short.")
            return assistant_response
        else: logger.error(f"Failed parse assistant response. Output: {results}"); st.error("❌ Failed parse LLM response. Check logs."); return None
    except torch.cuda.OutOfMemoryError: logger.error(f"OOM Error ({model_id})!", exc_info=True); st.error(f"❌ OOM Error ({model_id}). Try smaller model/less tokens/restart."); clear_gpu_memory(); return None
    except Exception as e: logger.error(f"Unhandled gen error ({model_id}): {e}", exc_info=True); st.error(f"❌ Unexpected gen error: {e}"); return None
# --- Streamlit App UI ---
st.set_page_config(layout="wide", page_title="On-Demand SEO Content Gen")
# --- Sidebar: model selection/loading, generation settings, linking, actions ---
with st.sidebar:
    st.header("⚙️ Configuration")
    # Model Selection & Loading Area
    st.subheader("1. Select & Load Model")
    selected_model_key = st.selectbox(
        "Choose Language Model:",
        options=list(MODEL_OPTIONS.keys()),
        index=list(MODEL_OPTIONS.keys()).index(DEFAULT_MODEL_KEY),
        key="model_selector", # Key for potential state access
        help="Choose AI model. Performance & resources vary. Load required."
    )
    selected_model_id = MODEL_OPTIONS[selected_model_key]
    # Display current status and load button
    load_button_placeholder = st.empty() # Placeholder for dynamic button text/state
    model_status_placeholder = st.empty() # Placeholder for status message
    if st.session_state.get('current_model_id') == selected_model_id and st.session_state.get('current_model_pipeline') is not None:
        # The selected model is the one currently loaded.
        model_status_placeholder.success(f"✅ Loaded: `{selected_model_id}`")
        load_button_text = f"Switch from {selected_model_key}" # Or "Reload"
    elif st.session_state.get('current_model_pipeline') is not None:
        # Some other model is loaded; offer to swap.
        model_status_placeholder.warning(f"⚠️ Loaded: `{st.session_state.current_model_id}`\nSelected: `{selected_model_id}`")
        load_button_text = f"Unload Current & Load {selected_model_key}"
    else:
        model_status_placeholder.info("ℹ️ No model loaded.")
        load_button_text = f"Load Selected: {selected_model_key}"
    if load_button_placeholder.button(load_button_text, key="load_model"):
        load_model(selected_model_id)
        # Rerun to update status placeholders immediately after load attempt
        st.rerun()
    st.markdown("---")
    # Content Settings
    st.subheader("2. Content Settings")
    with st.expander("Adjust Content Parameters", expanded=False):
        num_results = st.slider("Competitors to Analyze:", min_value=1, max_value=8, value=DEFAULT_NUM_RESULTS, step=1)
        selected_tone = st.selectbox("Content Tone:", options=TONE_OPTIONS, index=TONE_OPTIONS.index("Engaging"))
        selected_audience = st.selectbox("Target Audience:", options=AUDIENCE_OPTIONS, index=AUDIENCE_OPTIONS.index("General Audience"))
        max_gen_tokens = st.number_input("Max Generation Tokens:", min_value=500, max_value=8192, value=DEFAULT_MAX_GENERATION_TOKENS, step=100)
    # Internal Linking
    st.subheader("3. Internal Linking (Optional)")
    with st.expander("Configure Link Suggestions", expanded=False):
        website_url = st.text_input("Your Website URL:", placeholder="https://www.example.com", value=st.session_state.get("last_website_url", ""), key="website_url_input")
        # Update state immediately on change if needed, or just read before use
        st.session_state.last_website_url = website_url
    st.markdown("---")
    st.header("ℹ️ App Info & Actions")
    st.info(f"""
- **Status:** {'Model Loaded' if st.session_state.current_model_pipeline else 'No Model Loaded'}
- **Competitors:** Top {num_results}
- **Max Generation:** ~{max_gen_tokens} tokens
""")
    st.warning("""
- **Load Model First:** Select a model and click 'Load' before generating.
- **Resource Use:** Models need significant RAM/GPU. Loading WILL fail if resources are insufficient.
- **Review Output:** AI provides drafts. ALWAYS review, edit, fact-check.
""")
    if st.button("Clear Scraped/Generated Data", key="clear_data"):
        reset_app_data()
# --- Main App Area: keyword input and the generate trigger ---
st.title("✨ On-Demand SEO Content Generator ✨")
st.markdown(f"Load your chosen AI model, then generate SEO-focused content.")
# User Input Area
st.subheader("Keyword & Generation")
keyword = st.text_input("Enter Primary Target Keyword:", placeholder="e.g., vertical hydroponics guide", value=st.session_state.get("last_keyword", ""), key="keyword_input")
# Disable button if model not loaded
generate_button_disabled = st.session_state.current_model_pipeline is None
generate_button_help = "Load a model from the sidebar first." if generate_button_disabled else "Analyze competitors and generate article."
analyze_button = st.button(
    "Analyze Competitors & Generate Content",
    type="primary",
    key="generate_button",
    disabled=generate_button_disabled,
    help=generate_button_help
)
st.markdown("---")
# --- Main Workflow Triggered by Button ---
# Step 0 validates preconditions, Step 1 scrapes competitors (reusing cached
# results when the keyword is unchanged), Step 2 runs the LLM generation.
if analyze_button:
    # Double check model is loaded (though button should be disabled)
    if not st.session_state.current_model_pipeline:
        st.error("❌ Cannot generate: No model loaded. Please use the sidebar.")
        st.stop()
    if not keyword:
        st.warning("⚠️ Please enter a keyword.")
        st.stop()
    st.session_state.last_keyword = keyword # Store keyword for potential reuse
    ua = get_user_agent() # Ensure user agent is ready
    if not ua: st.error("❌ User Agent failed. Cannot scrape."); st.stop()
    # Reset previous generation results for this run
    st.session_state.generated_content = ""
    st.session_state.internal_link_suggestions = ""
    # --- Step 1: Scrape Competitors (with status updates) ---
    # Re-scrape only when the keyword changed or no analysis text is cached.
    if keyword != st.session_state.get('_internal_last_scrape_keyword', None) or not st.session_state.competitor_analysis_text:
        logger.info(f"Scraping needed for '{keyword}'.")
        st.session_state.competitor_analysis_text = "" # Clear old text
        st.session_state.scraped_urls = []
        st.session_state['_internal_last_scrape_keyword'] = "" # Reset marker until success
        scrape_container = st.container()
        with scrape_container:
            st.info(f"🕸️ Fetching URLs and Scraping Top {num_results} Competitors...")
            progress_text = "Scraping progress..."
            scrape_progress_bar = st.progress(0, text=progress_text)
            status_area = st.container() # Use container for multiple status lines
            urls = get_top_urls(keyword, num_results)
            st.session_state.scraped_urls = urls
            if urls:
                all_texts = []
                scraped_count = 0
                for i, url in enumerate(urls):
                    with status_area: # Show status within the designated area
                        scrape_status_ui = st.empty() # Placeholder for single URL status
                        content = scrape_page_content(url, ua, scrape_status_ui)
                        if content:
                            all_texts.append(content)
                            scraped_count += 1
                    scrape_progress_bar.progress((i + 1) / len(urls), text=f"Processed URL {i+1}/{len(urls)}...")
                    time.sleep(0.1) # UI update breather
                # Join per-page texts with an explicit separator the prompt can reference.
                st.session_state.competitor_analysis_text = "\n\n --- ARTICLE SEPARATOR --- \n\n".join(all_texts)
                st.session_state['_internal_last_scrape_keyword'] = keyword # Mark scrape success for this keyword
                if st.session_state.competitor_analysis_text:
                    scrape_container.success(f"✅ Scraped {scraped_count}/{len(urls)} pages. Analysis text: {len(st.session_state.competitor_analysis_text)} chars.")
                else:
                    scrape_container.error("❌ Failed to scrape sufficient content. Cannot generate article.")
                    st.stop()
            else:
                scrape_container.error("❌ Could not retrieve competitor URLs. Cannot proceed.")
                st.stop()
    else:
        st.success(f"✔️ Using previously scraped data for '{keyword}'. ({len(st.session_state.competitor_analysis_text)} chars).")
    # --- Step 2: Generate Main Content ---
    st.info(f"✍️ Generating Content with {st.session_state.current_model_id}...")
    generation_status = st.status("Sending request to LLM...")
    with generation_status:
        st.write(f"**Tone:** {selected_tone}, **Audience:** {selected_audience}, **Max Tokens:** {max_gen_tokens}")
        gen_prompt = build_content_generation_prompt(
            keyword, st.session_state.competitor_analysis_text, selected_tone, selected_audience, st.session_state.current_model_id
        )
        generated_content = run_llm_generation(st.session_state.current_model_pipeline, gen_prompt, max_gen_tokens)
        st.session_state.generated_content = generated_content
        if generated_content:
            generation_status.update(label="✅ Content Generation Complete!", state="complete")
        else:
            generation_status.update(label="❌ Content Generation Failed.", state="error")
            st.stop() # Stop if main content fails
# --- Display Outputs (Outside the button click conditional) ---
# Runs on every rerun so results persist after interaction with other widgets.
if st.session_state.generated_content:
    st.markdown("---")
    st.subheader("📝 Generated SEO Content")
    st.markdown(st.session_state.generated_content)
    st.text_area("Copyable Markdown:", st.session_state.generated_content, height=400, key="generated_content_area_display")
    # --- Internal Linking Section ---
    if st.session_state.last_website_url: # Only show if URL was provided
        st.markdown("---")
        st.subheader("🔗 Internal Linking Suggestions")
        if st.button("Suggest Internal Links", key="suggest_links_button_display"):
            link_status = st.status(f"Analyzing content for link opportunities ({st.session_state.current_model_id})...")
            with link_status:
                st.write(f"Website context: {st.session_state.last_website_url}")
                link_prompt = build_internal_link_prompt(st.session_state.generated_content, keyword, st.session_state.last_website_url)
                link_suggestions = run_llm_generation(st.session_state.current_model_pipeline, link_prompt, max_tokens=500) # Use fewer tokens
                st.session_state.internal_link_suggestions = link_suggestions
                if link_suggestions: link_status.update(label="✅ Link suggestions generated!", state="complete")
                else: link_status.update(label="❌ Failed to generate link suggestions.", state="error")
        # Display suggestions if they exist in state
        if st.session_state.internal_link_suggestions:
            st.markdown(st.session_state.internal_link_suggestions)
            st.info("ℹ️ AI suggestions only. Verify relevance and find actual URLs on your site.")
    else:
        st.markdown("---")
        st.info("Provide your website URL in the sidebar to enable internal link suggestions after generating content.")