# app: Ultima Supply blog generator — scrape a URL, derive SEO keywords,
# generate a blog post with an LLM, and publish it to Shopify.
import ast
import asyncio
import os
import re
import sys
import time

import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from playwright.async_api import async_playwright
from pytrends.request import TrendReq
from shiny import reactive, render, ui

# Make ../code importable so the project-local LLM helper resolves.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "code")))
from llm_connect import get_response

load_dotenv()

SHOPIFY_STORE = "ultima-supply.myshopify.com"
SHOPIFY_TOKEN = os.getenv("SHOPIFY_TOKEN")
SHOPIFY_API_VERSION = "2024-04"
BLOG_ID = "73667707064"
# === Async JS-rendered scraping ===
async def scrape_div_content_from_url(url: str) -> str:
    """Render *url* in headless Chromium and return the concatenated text
    of every <div class="article-body"> in the final (JS-rendered) DOM.

    Returns an empty string when no matching div exists or when rendering
    or parsing fails; errors are logged, never raised.
    """
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                # "networkidle" waits for JS-driven content to settle.
                await page.goto(url, wait_until="networkidle")
                html = await page.content()
            finally:
                # FIX: close the browser even when navigation fails; the
                # original only closed it on the success path, leaking the
                # Chromium process on any goto/content error.
                await browser.close()
        soup = BeautifulSoup(html, "html.parser")
        divs = soup.find_all("div", class_="article-body")
        if not divs:
            print("[WARN] No <div class='article-body'> found.")
            return ""
        texts = [div.get_text(separator=" ", strip=True) for div in divs]
        return "\n\n".join(texts)
    except Exception as e:
        print(f"[ERROR] Failed to render or scrape: {e}")
        return ""
# === Helper: semantic keyword validation (the "double check") ===
def filter_irrelevant_keywords(keywords: list, article_text: str) -> list:
    """Ask the LLM to drop keywords irrelevant to *article_text*.

    Parameters
    ----------
    keywords : list
        Candidate keyword strings to validate.
    article_text : str
        Article content; only the first 1500 chars are sent as context.

    Returns
    -------
    list
        Cleaned, lower-cased keywords — or the original *keywords*
        unchanged when the LLM call fails or returns nothing usable.
    """
    print(f"[INFO] Validating {len(keywords)} keywords for relevance...")
    validation_prompt = (
        f"Role: You are an elite SEO Editor.\n"
        f"Task: Review the list of keywords below against the provided Article Content.\n"
        f"Action: REMOVE any keywords that are irrelevant, hallucinatory, or completely off-topic.\n"
        f"Criteria: Keep specific, long-tail, and topically related keywords. Remove generic terms.\n\n"
        f"--- KEYWORDS TO REVIEW ---\n"
        f"{', '.join(keywords)}\n\n"
        f"--- ARTICLE CONTEXT ---\n"
        f"{article_text[:1500]}\n\n"
        f"OUTPUT FORMAT:\n"
        f"Return the CLEANED list as a simple BULLET LIST (one per line).\n"
        f"Example:\n- keyword one\n- keyword two"
    )
    try:
        validated_raw = get_response(
            input=validation_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.1,
            max_tokens=1000
        )
        # Robust line-by-line parsing (avoids ast-eval SyntaxErrors).
        clean_list = []
        for line in validated_raw.split("\n"):
            # Strip bullets (*, -) and surrounding whitespace.
            clean_item = line.lstrip("*- ").strip().lower()
            # Skip empty lines and conversational filler ("Here are ...").
            if clean_item and len(clean_item) > 2 and "here are" not in clean_item:
                clean_list.append(clean_item)
        # FIX: if parsing produced nothing (model refused or answered
        # off-format), fall back to the originals instead of silently
        # returning an empty "validated" list.
        if not clean_list:
            print("[WARN] Validation returned no usable keywords. Returning originals.")
            return keywords
        dropped_count = len(keywords) - len(clean_list)
        if dropped_count > 0:
            print(f"[INFO] Validation removed {dropped_count} irrelevant keywords.")
        return clean_list
    except Exception as e:
        print(f"[WARN] Validation failed: {e}. Returning originals.")
        return keywords
# === Async keyword + scrape + fallback logic ===
def _extract_condensed_topics(scraped_text: str) -> list:
    """Ask the LLM for ~5 main topics of *scraped_text*; parse the bullet
    list, with a comma-split fallback, defaulting to ["trading cards"]."""
    try:
        condensed_prompt = (
            "You are an SEO expert. Identify exactly 5 distinct main topics from the text below.\n"
            "Format: Return a BULLET LIST only.\n"
            "Rules: NO intro text. NO numbering. NO explanations.\n"
            f"TEXT TO ANALYZE:\n{scraped_text[:3000]}"
        )
        condensed_topic_raw = get_response(
            input=condensed_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.3,  # lower temp = less chatty
            max_tokens=200
        )
        topics = []
        for line in condensed_topic_raw.split("\n"):
            clean = line.replace("*", "").replace("-", "").strip().lower()
            if clean and "here are" not in clean:
                topics.append(clean)
        # Bullet parsing found almost nothing: the model probably answered
        # as a comma-separated line instead.
        if len(topics) < 2:
            topics = [k.strip() for k in condensed_topic_raw.split(",") if k.strip()]
        return topics
    except Exception:
        # FIX: the original bound the exception to an unused variable.
        return ["trading cards"]


def _fetch_trend_suggestions(topics: list) -> set:
    """Query Google Trends suggestions for up to 3 topics.

    Initializes pytrends with retries=0 to bypass the internal urllib3
    retry config (the 'method_whitelist' crash) and retries manually,
    backing off longer on HTTP 429.
    """
    all_suggestions = set()
    try:
        pytrends = TrendReq(hl="en-US", tz=360, timeout=10, retries=0)
    except Exception as e:
        print(f"[ERROR] Could not initialize PyTrends: {e}")
        return all_suggestions
    for topic in topics[:3]:
        print(f"[INFO] Querying PyTrends for: '{topic}'...")
        for attempt in range(3):
            try:
                # Throttle to prevent 429 Too Many Requests.
                time.sleep(2)
                suggestions = pytrends.suggestions(keyword=topic)
                if suggestions:
                    titles = [s["title"].lower().strip() for s in suggestions]
                    print(f" -> Found {len(titles)} suggestions: {titles}")
                    all_suggestions.update(titles)
                else:
                    print(" -> No suggestions found.")
                break  # success or no data — stop retrying this topic
            except Exception as inner_e:
                if "429" in str(inner_e):
                    print(f" -> [WARN] Rate limited on '{topic}'. Waiting 5s...")
                    time.sleep(5)
                else:
                    print(f" -> [WARN] Failed for '{topic}' (Attempt {attempt+1}/3): {inner_e}")
                # FIX: report giving up on the last attempt for BOTH error
                # paths (originally unreachable for the 429 branch).
                if attempt == 2:
                    print(" -> Giving up on this keyword.")
    return all_suggestions


def _generate_padding_keywords(scraped_text: str, needed: int) -> list:
    """Ask the LLM for *needed* extra long-tail keywords and clean them:
    strip bullets/numbering, drop parentheticals and chatty/meta lines."""
    pad_prompt = (
        f"Generate exactly {needed} NEW, DISTINCT long-tail SEO keywords based on this text.\n"
        f"STRICT OUTPUT RULES:\n"
        f"1. Return ONLY a raw bullet list (one keyword per line).\n"
        f"2. DO NOT write 'Here are the keywords'.\n"
        f"3. DO NOT add parentheses or explanations like '(best for beginners)'.\n"
        f"4. Just the keywords.\n\n"
        f"Context:\n{scraped_text[:2500]}"
    )
    pad_raw = get_response(
        input=pad_prompt,
        template=lambda x: x.strip(),
        llm="gemini",
        md=False,
        temperature=0.5,  # lower temp prevents hallucinated explanations
        max_tokens=1000
    )
    pad_keywords = []
    for line in pad_raw.split("\n"):
        # Remove bullets and list numbering.
        clean_line = line.strip().lstrip("*-+1234567890. ").strip()
        # Remove parenthetical explanations, e.g. "op13 cards (rare)" -> "op13 cards".
        clean_line = re.sub(r"\(.*?\)", "", clean_line).strip()
        # Filter out chatty lines.
        if (len(clean_line) > 3
                and "here are" not in clean_line.lower()
                and "formatted as" not in clean_line.lower()
                and ":" not in clean_line):
            pad_keywords.append(clean_line.lower())
    return pad_keywords


async def get_keywords_and_content(url: str, top_n=5, llm_n=25):
    """Scrape *url* and build a validated SEO keyword list.

    Pipeline: scrape -> condensed topics (LLM) -> PyTrends suggestions ->
    LLM padding when short -> semantic validation with a safety fallback.

    Returns
    -------
    tuple
        (validated_keywords[:30], scraped_text), or ([], "") when the
        page yields no scrapeable content.

    NOTE(review): top_n and llm_n are currently unused; kept for
    interface compatibility with existing callers — confirm.
    """
    scraped_text = await scrape_div_content_from_url(url)
    if not scraped_text:
        print("[ERROR] No scraped content. Cannot proceed.")
        return [], ""

    # Step 1: condensed topic extraction.
    condensed_topic = _extract_condensed_topics(scraped_text)

    # Step 2: Google Trends suggestions for the top topics.
    print(f"[INFO] Starting PyTrends for topics: {condensed_topic[:3]}")
    all_suggestions = _fetch_trend_suggestions(condensed_topic)
    # FIX: the original converted the set to a list twice (L151 and L158);
    # once is enough.
    combined_keywords = list(all_suggestions)
    if not combined_keywords:
        print("[INFO] PyTrends returned 0 results. Switching to LLM Fallback.")
    else:
        print(f"[INFO] PyTrends successful. Total keywords: {len(combined_keywords)}")

    # Step 3/4: pad with LLM-generated keywords when PyTrends came up short.
    if len(combined_keywords) < 30:
        # Over-provision to 35 so later validation can drop a few.
        needed = 35 - len(combined_keywords)
        pad_keywords = _generate_padding_keywords(scraped_text, needed)
        combined_keywords = list(set(combined_keywords + pad_keywords))

    # Step 5: semantic validation (double check) when there is enough to spare.
    if len(combined_keywords) > 10:
        validated_keywords = filter_irrelevant_keywords(combined_keywords, scraped_text)
    else:
        validated_keywords = combined_keywords
    # Fallback if validation was too aggressive.
    if len(validated_keywords) < 10:
        validated_keywords = combined_keywords

    print(f"[INFO] Final validated count: {len(validated_keywords)}")
    return validated_keywords[:30], scraped_text
# === Shopify publisher ===
def publish_blog_post(title: str, html_body: str, blog_id: str = BLOG_ID):
    """Create an article on the configured Shopify blog via the Admin REST API.

    Parameters
    ----------
    title : str
        Article title.
    html_body : str
        Full HTML body of the article.
    blog_id : str
        Shopify blog id to post under (defaults to BLOG_ID).

    Returns
    -------
    tuple
        (True, parsed JSON response) on HTTP 201 Created, otherwise
        (False, raw response text).
    """
    url = f"https://{SHOPIFY_STORE}/admin/api/{SHOPIFY_API_VERSION}/blogs/{blog_id}/articles.json"
    headers = {
        "X-Shopify-Access-Token": SHOPIFY_TOKEN,
        "Content-Type": "application/json"
    }
    data = {
        "article": {
            "author": "Ultima Supply Writer: (Bingus)",
            "title": title,
            "body_html": html_body,
        }
    }
    # FIX: always pass a timeout — requests.post() without one can hang
    # the server handler indefinitely if Shopify stalls.
    response = requests.post(url, json=data, headers=headers, timeout=30)
    if response.status_code == 201:
        return True, response.json()
    return False, response.text
# === SHINY SERVER ===
def server(input, output, session):
    """Shiny server wiring for the blog-generation app.

    Holds two reactive values — the validated keyword list and the
    generated (title, html) pair — and three handlers: blog_result,
    keywords_used, post_to_shopify.

    NOTE(review): none of the handlers below carry @render.*/@reactive.*
    decorators in this file — presumably they are registered elsewhere
    (or were lost in transit); confirm against the app entry point,
    otherwise they never fire.
    """
    related_keywords = reactive.Value([])
    generated_blog = reactive.Value(("", ""))  # (title, html_content)

    async def blog_result():
        """Scrape input.url(), generate keywords + a blog post, return preview HTML."""
        url = input.url()
        if not url:
            return ui.HTML("<p><strong>⚠️ Please enter a URL.</strong></p>")
        keywords, scraped = await get_keywords_and_content(url)
        # FIX: the original called time.sleep(3) here, which blocks the
        # entire asyncio event loop inside an async handler; awaiting a
        # non-blocking sleep keeps the same 3s pacing without freezing
        # the app for every session.
        await asyncio.sleep(3)
        related_keywords.set(keywords)
        keyword_str = ", ".join(keywords)
        # Title generation with a strict prompt to suppress chatty output.
        infer_topic_prompt = (
            f"Write ONE catchy, click-worthy H1 Blog Title for the content below.\n"
            f"STRICT RULES:\n"
            f"- Return ONLY the title string.\n"
            f"- Do NOT write 'Title:' or 'Here is a title'.\n"
            f"- Do NOT use quotation marks.\n"
            f"- Max 15 words.\n\n"
            f"Content:\n{scraped[:2000]}"
        )
        seo_title_raw = get_response(
            input=infer_topic_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.7,
            max_tokens=60
        )
        # Cleanup: remove quotes and a "Title:" prefix if the LLM ignored the rules.
        seo_title = seo_title_raw.replace('"', '').replace("Title:", "").strip()
        # If it gave multiple options (detected by newlines), take the first one.
        if "\n" in seo_title:
            seo_title = seo_title.split("\n")[0].strip()
        # Blog generation with the SEO keywords injected into the prompt.
        prompt = (
            f"You are a content writer for a collectibles brand called 'Ultima Supply'.\n"
            f"Given the following scraped content:\n\n{scraped}\n\n"
            f"Rewrite this in an engaging, original, and heavily detailed SEO-optimized blog post.\n"
            f"Naturally and organically integrate the following SEO keywords throughout the content:\n{keyword_str}\n\n"
            f"⚠️ STRICT FORMATTING RULES (must be followed exactly):\n"
            f"- Use <h1> for the blog title\n"
            f"- Use <h2> for section headers\n"
            f"- Use <p> for all paragraphs\n"
            f"- Avoid using all caps\n"
            f"- NO Markdown, NO triple backticks, NO code blocks, NO formatting fences\n"
            f"- DO NOT include any hyperlinks, URLs, web addresses, or references to any external sites or brands — no exceptions\n"
            f"- DO NOT include any <a> tags except for the final line below\n\n"
            f"✅ FINAL LINE ONLY:\n"
            f"Add this exact call-to-action at the very end of the post inside its own <p> tag:\n"
            f"Visit <a href='https://ultima-supply.myshopify.com'>Ultima Supply</a> to explore more collectibles."
        )
        blog_html = get_response(
            input=prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.9,
            max_tokens=5000
        )
        # Strip any markdown fences the model emitted despite the rules.
        blog_html = re.sub(r"```[a-zA-Z]*\n?", "", blog_html).strip()
        blog_html = blog_html.replace("```", "").strip()
        generated_blog.set((seo_title, blog_html))
        return ui.HTML(
            f"<p><strong>✅ Blog generated with title:</strong> {seo_title}</p>"
            f"<p>Click 'Post to Shopify' to publish.</p>{blog_html}"
        )

    def keywords_used():
        """Render the injected SEO keywords as an HTML bullet list."""
        kws = related_keywords()
        if not kws:
            return ui.HTML("<p><strong>No SEO keywords retrieved yet.</strong></p>")
        return ui.HTML(
            f"<p><strong>✅ SEO Keywords Injected ({len(kws)}):</strong></p><ul>"
            + "".join(f"<li>{kw}</li>" for kw in kws) +
            "</ul>"
        )

    def post_to_shopify():
        """Publish the most recently generated blog post to Shopify, with UI notifications."""
        seo_title, html = generated_blog()
        if not html:
            ui.notification_show("⚠️ No blog generated yet.", type="warning")
            return
        success, response = publish_blog_post(title=seo_title, html_body=html)
        if success:
            ui.notification_show("✅ Blog posted to Shopify successfully!", type="message")
        else:
            ui.notification_show(f"❌ Failed to publish: {response}", type="error")