# (page-scrape residue: Hugging Face "Spaces … Sleeping" status banner removed)
# Standard library
import ast
import asyncio
import os
import re
import sys
import time

# Third-party
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from playwright.async_api import async_playwright
from pytrends.request import TrendReq
from shiny import reactive, render, ui

# Local: make ../code importable before importing from it
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "code")))
from llm_connect import get_response

load_dotenv()
# --- Shopify Admin API configuration ---
SHOPIFY_STORE = "ultima-supply.myshopify.com"  # store domain used in API URLs
SHOPIFY_TOKEN = os.getenv("SHOPIFY_TOKEN")  # Admin API access token, loaded from .env
SHOPIFY_API_VERSION = "2024-04"  # pinned Admin REST API version
BLOG_ID = "73667707064"  # default blog that publish_blog_post() posts into
# === Async JS-rendered scraping ===
async def scrape_div_content_from_url(url: str) -> str:
    """Render *url* in headless Chromium and return the article text.

    Collects the text of every ``<div class="article-body">`` in the
    fully-rendered page (waiting for network idle, so JS-injected content
    is included) and joins the pieces with blank lines.  Returns "" when
    no such div exists or when rendering/scraping fails for any reason.
    """
    try:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            page = await browser.new_page()
            await page.goto(url, wait_until="networkidle")
            rendered_html = await page.content()
            await browser.close()

        article_divs = BeautifulSoup(rendered_html, "html.parser").find_all(
            "div", class_="article-body"
        )
        if not article_divs:
            print("[WARN] No <div class='article-body'> found.")
            return ""
        return "\n\n".join(
            block.get_text(separator=" ", strip=True) for block in article_divs
        )
    except Exception as e:
        # Best-effort scraper: any failure is logged and reported as "no text".
        print(f"[ERROR] Failed to render or scrape: {e}")
        return ""
# === Async keyword + scrape + fallback logic ===
def _extract_bracketed_list(raw: str) -> list:
    """Best-effort parse of a Python list literal embedded in LLM output.

    Finds the first ``[...]`` span (DOTALL, so multi-line lists work) and
    ``ast.literal_eval``'s it.  Returns [] when no bracket pair is found,
    the span is not a valid literal, or the literal is not a list —
    callers treat an empty list as "fall back to another strategy".
    """
    match = re.search(r"\[.*?\]", raw, re.DOTALL)
    if not match:
        return []
    try:
        parsed = ast.literal_eval(match.group(0))
    except (ValueError, SyntaxError) as e:
        # literal_eval raises these for malformed/non-literal input.
        print(f"[WARN] ast.literal_eval failed: {e}")
        return []
    return parsed if isinstance(parsed, list) else []


async def get_keywords_and_content(url: str, top_n=5, llm_n=25):
    """Scrape *url* and build a deduplicated list of up to 30 SEO keywords.

    Pipeline:
      1. LLM condenses the article into 5-7 topic phrases.
      2. PyTrends expands those phrases into keyword suggestions.
      3. LLM filters the suggestions for relevance to the article.
      4. If filtering yields nothing, the LLM generates *llm_n* keywords directly.
      5. If fewer than 30 keywords remain, the LLM pads the list up to 30.

    Returns ``(keywords, scraped_text)``; ``([], "")`` when scraping fails.
    *top_n* only limits how many fallback keywords are echoed to the log.
    """
    scraped_text = await scrape_div_content_from_url(url)
    if not scraped_text:
        print("[ERROR] No scraped content. Cannot proceed.")
        return [], ""

    # === Step 1: Extract condensed topic keywords ===
    try:
        condensed_prompt = (
            "From the content below, extract 5 to 7 mid-specific Google search phrases that reflect real user intent. "
            "They should describe product types, use cases, or collector topics — not brand names alone. "
            "Avoid single-word topics and overly broad terms like 'pokemon'. Each phrase should be 2–5 words, lowercase, and ASCII only.\n\n"
            "You MUST return ONLY a valid Python list of strings. Do not use bullet points, newlines, or any explanation. "
            "Your response must look exactly like this format:\n"
            "['phrase one', 'phrase two', 'phrase three']\n\n"
            f"Content:\n{scraped_text}"
        )
        condensed_topic_raw = get_response(
            input=condensed_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.6,
            max_tokens=100,
            model_name="gemini-2.0-flash-lite"
        )
        print(condensed_topic_raw)
        condensed_topic = _extract_bracketed_list(condensed_topic_raw)
        if not condensed_topic:
            # Safe generic seed keeps the PyTrends step alive.
            condensed_topic = ["trading cards"]
        print(f"[INFO] Condensed topic keywords: {condensed_topic}")
    except Exception as e:
        print(f"[WARN] Could not infer topics: {e}")
        condensed_topic = ["trading cards"]

    # === Step 2: Pull suggestions from PyTrends ===
    all_suggestions = set()
    try:
        pytrends = TrendReq(hl="en-US", tz=360, timeout=10)
        for topic in condensed_topic:
            # Throttle to avoid Google rate limiting.  asyncio.sleep (not
            # time.sleep) so the running event loop is not blocked.
            await asyncio.sleep(5)
            suggestions = pytrends.suggestions(keyword=topic)
            if suggestions:
                titles = [s["title"] for s in suggestions]
                all_suggestions.update(titles)
                print(f"[INFO] Suggestions for '{topic}': {titles[:3]}")
    except Exception as e:
        print(f"[WARN] PyTrends suggestions failed: {e}")
    all_suggestions = list(all_suggestions)

    # === Step 3: Let Gemini filter suggestions for relevance ===
    filtered_keywords = []
    if all_suggestions:
        filter_prompt = (
            f"The following article was scraped:\n\n{scraped_text[:1500]}\n\n"
            f"Here is a list of keyword suggestions:\n{all_suggestions}\n\n"
            "Return only the keywords that are clearly relevant to the article topic. "
            "Return a valid Python list of strings only. No explanation, bullets, or formatting."
        )
        raw_filtered = get_response(
            input=filter_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.3,
            max_tokens=200
        )
        # Shared parser handles multi-line lists (the old inline regex had
        # no DOTALL, so any list spanning lines was silently dropped).
        filtered_keywords = _extract_bracketed_list(raw_filtered)

    # === Step 4: Fallback to Gemini keyword generation if needed ===
    if not filtered_keywords:
        fallback_prompt = (
            f"You are an SEO expert. Generate {llm_n} niche-relevant SEO keywords "
            f"based on this content:\n\n{scraped_text}\n\n"
            "Return a comma-separated list of lowercase 2–5 word search phrases. No formatting."
        )
        fallback_keywords_raw = get_response(
            input=fallback_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.7,
            max_tokens=400
        )
        filtered_keywords = [kw.strip() for kw in fallback_keywords_raw.split(",") if kw.strip()]
        print(f"[INFO] Fallback keywords used: {filtered_keywords[:top_n]}")

    # === Step 5: Enforce minimum of 30 keywords ===
    combined_keywords = list(dict.fromkeys(filtered_keywords))  # dedupe, keep order
    if len(combined_keywords) < 30:
        needed = 30 - len(combined_keywords)
        print(f"[INFO] Need {needed} more keywords to reach 30. Using Gemini to pad.")
        pad_prompt = (
            f"The following article content is missing SEO keyword coverage:\n\n"
            f"{scraped_text}\n\n"
            f"Generate exactly {needed} additional SEO keyword phrases. "
            "Each keyword must be:\n"
            "- 2 to 5 words long\n"
            "- lowercase only\n"
            "- written in ASCII (no symbols or accents)\n"
            "- clearly relevant to the article\n"
            "- not overlapping with any common generic terms like 'pokemon'\n\n"
            "You MUST return a valid Python list of strings. DO NOT include any explanation, extra text, markdown, or formatting.\n"
            "Format example:\n"
            "['keyword one', 'keyword two', 'keyword three']"
        )
        pad_raw = get_response(
            input=pad_prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.7,
            max_tokens=200
        )
        pad_keywords = _extract_bracketed_list(pad_raw)
        combined_keywords = list(dict.fromkeys(combined_keywords + pad_keywords))
        print(f"[INFO] Padded {len(pad_keywords)} keywords:", pad_keywords)

    return combined_keywords[:30], scraped_text
# === Shopify publisher ===
def publish_blog_post(title: str, html_body: str, blog_id: str = BLOG_ID):
    """Create an article on the Shopify blog via the Admin REST API.

    Returns ``(True, parsed_json)`` when Shopify answers HTTP 201 (created),
    otherwise ``(False, response_text)``.
    """
    endpoint = (
        f"https://{SHOPIFY_STORE}/admin/api/{SHOPIFY_API_VERSION}"
        f"/blogs/{blog_id}/articles.json"
    )
    payload = {"article": {"title": title, "body_html": html_body}}
    resp = requests.post(
        endpoint,
        json=payload,
        headers={
            "X-Shopify-Access-Token": SHOPIFY_TOKEN,
            "Content-Type": "application/json",
        },
    )
    if resp.status_code == 201:
        return True, resp.json()
    return False, resp.text
# === SHINY SERVER ===
def server(input, output, session):
    """Shiny server: scrape a URL, generate an SEO blog post, publish to Shopify.

    NOTE(review): blog_result, keywords_used, and post_to_shopify are defined
    but never registered with Shiny here — no @render.ui/@output or
    @reactive.effect/@reactive.event decorators are visible.  Presumably
    decorators were lost in formatting; confirm against the original app.
    """
    # Cross-callback state shared between the generator and the publisher.
    related_keywords = reactive.Value([])
    generated_blog = reactive.Value(("", ""))  # (title, html_content)

    async def blog_result():
        # Scrapes the entered URL, derives keywords, asks the LLM for a
        # title and an HTML post, stashes the result, and returns status HTML.
        url = input.url()
        if not url:
            return ui.HTML("<p><strong>⚠️ Please enter a URL.</strong></p>")
        keywords, scraped = await get_keywords_and_content(url)
        related_keywords.set(keywords)
        keyword_str = ", ".join(keywords)
        # Title generation from scraped text
        infer_topic_prompt = (
            f"Based on the following article content:\n\n{scraped[:2000]}\n\n"
            f"Return a short, descriptive blog post title (max 70 characters)."
            f"Return ONLY the TITLE"
        )
        seo_title = get_response(
            input=infer_topic_prompt,
            template=lambda x: x.strip().replace('"', ''),
            llm="gemini",
            md=False,
            temperature=0.5,
            max_tokens=20
        )
        # Blog generation with injected SEO
        prompt = (
            f"You are a content writer for a collectibles brand called 'Ultima Supply'.\n"
            f"Given the following scraped content:\n\n{scraped}\n\n"
            f"Adapt this into an engaging, original, and heavily detailed SEO-optimized blog post.\n"
            f"Inject the following SEO keywords naturally and organically throughout the content:\n{keyword_str}\n\n"
            f"Use proper HTML structure: <h1> for the title, <h2> for section headers, and <p> for all paragraphs.\n"
            f"Do NOT include any markdown, code blocks, or triple backticks. Do NOT use ```html or any formatting fences.\n"
            f"Just return the raw HTML.\n\n"
            f"DO NOT include any hyperlinks or images inside the body of the blog post.\n"
            f"At the very end, add a single call-to-action in a new <p> tag:\n"
            f"Visit <a href='https://ultima-supply.myshopify.com'>Ultima Supply</a> to explore more collectibles."
        )
        blog_html = get_response(
            input=prompt,
            template=lambda x: x.strip(),
            llm="gemini",
            md=False,
            temperature=0.9,
            max_tokens=5000
        )
        # Strip markdown code fences the model sometimes emits despite the prompt.
        blog_html = re.sub(r"```[a-zA-Z]*\n?", "", blog_html).strip()
        blog_html = blog_html.replace("```", "").strip()
        generated_blog.set((seo_title, blog_html))
        return ui.HTML(
            f"<p><strong>✅ Blog generated with title:</strong> {seo_title}</p>"
            f"<p>Click 'Post to Shopify' to publish.</p>{blog_html}"
        )

    def keywords_used():
        # Renders the keyword list captured by the last blog_result() run.
        kws = related_keywords()
        if not kws:
            return ui.HTML("<p><strong>No SEO keywords retrieved yet.</strong></p>")
        return ui.HTML(
            f"<p><strong>✅ SEO Keywords Injected ({len(kws)}):</strong></p><ul>"
            + "".join(f"<li>{kw}</li>" for kw in kws) +
            "</ul>"
        )

    def post_to_shopify():
        # Publishes the stashed (title, html) pair; no-op with a warning
        # notification when nothing has been generated yet.
        seo_title, html = generated_blog()
        if not html:
            ui.notification_show("⚠️ No blog generated yet.", type="warning")
            return
        success, response = publish_blog_post(title=seo_title, html_body=html)
        if success:
            ui.notification_show("✅ Blog posted to Shopify successfully!", type="message")
        else:
            ui.notification_show(f"❌ Failed to publish: {response}", type="error")