Spaces:
Sleeping
Sleeping
| from shiny import reactive, render, ui | |
| import os | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from dotenv import load_dotenv, find_dotenv | |
| import tweepy | |
| import time | |
| from llm_connect import get_response | |
| # Load environment variables | |
| load_dotenv(find_dotenv(), override=True) | |
| # Load credentials | |
| API_KEY = os.getenv("TWITTER_ACC_API_KEY") | |
| API_SECRET = os.getenv("TWITTER_ACC_API_SECRET") | |
| ACCESS_TOKEN = os.getenv("TWITTER_ACC_ACCESS_TOKEN") | |
| ACCESS_TOKEN_SECRET = os.getenv("TWITTER_ACC_ACCESS_TOKEN_SECRET") | |
| generated_tweet = reactive.Value("") | |
| def scrape_shopify_blog(url: str) -> str: | |
| try: | |
| resp = requests.get(url) | |
| soup = BeautifulSoup(resp.content, 'html.parser') | |
| section = soup.find( | |
| 'div', | |
| class_='w940 align-center size-content' | |
| ) | |
| return section.get_text(strip=True, separator=' ') if section else "" | |
| except Exception as e: | |
| return f"β Error scraping blog: {e}" | |
| def generate_tweet_from_text(text: str, url: str='https://ultimasupply.com/blogs/news', max_len: int = 250) -> str: | |
| # Step 1: Generate tweet from blog content | |
| gen_prompt = ( | |
| "You are a social media manager for a hobby e-commerce company called 'Ultima Supply'.\n" | |
| f"Write a single engaging Twitter post (max {max_len} characters) summarizing the following Shopify blog content:\n\n" | |
| f"{text}\n\n" | |
| f"The tweet MUST include this URL: {url}\n" | |
| "Use casual, fun language. Add emojis and 3β5 SEO-relevant hashtags. Keep it under the character limit and make it attention-grabbing." | |
| ) | |
| tweet = get_response( | |
| input=gen_prompt, | |
| template=lambda x: x.strip(), | |
| llm='gemini', | |
| md=False, | |
| temperature=0.6, | |
| max_tokens=100 | |
| ).strip() | |
| print(f'Base Tweet: {tweet}') | |
| # Step 2: If too long, ask Gemini to shorten it | |
| if len(tweet) <= max_len: | |
| return tweet | |
| else: | |
| time.sleep(5) | |
| shorten_prompt = ( | |
| f"You are shortening this tweet to {max_len} characters or fewer.\n" | |
| f"YOU MUST KEEP ALL of the original SEO hashtags and the Shopify blog link intact in the final output.\n" | |
| f"DO NOT remove or modify the link or any hashtags.\n\n" | |
| f"Original tweet:\n{tweet}\n" | |
| ) | |
| shortened = get_response( | |
| input=shorten_prompt, | |
| template=lambda x: x.strip(), | |
| llm='gemini', | |
| md=False, | |
| temperature=0.5, | |
| max_tokens=75 | |
| ).strip() | |
| return shortened # Final hard cap just in case | |
| def generate_tweet_from_topic(topic: str, max_len: int = 250) -> str: | |
| # Step 1: Generate tweet from topic | |
| gen_prompt = ( | |
| f"You are a social media manager for a hobby e-commerce brand called 'Ultima Supply'.\n" | |
| f"Write a SHORT SINGLE SENTENCE Twitter post (max {max_len} characters) about: '{topic}'.\n" | |
| f"Use casual, fun language. Include emojis and 3-5 SEO-relevant hashtags." | |
| ) | |
| tweet = get_response( | |
| input=gen_prompt, | |
| template=lambda x: x.strip(), | |
| llm='gemini', | |
| md=False, | |
| temperature=0.6, | |
| max_tokens=100 | |
| ).strip() | |
| print(f'Base Tweet: {tweet}') | |
| # Step 2: If too long, regenerate with shortening prompt | |
| if len(tweet) <= max_len: | |
| return tweet | |
| else: | |
| shorten_prompt = ( | |
| f"You are shortening this tweet to {max_len} characters or fewer.\n" | |
| f"YOU MUST KEEP ALL of the original SEO hashtags and the Shopify blog link intact in the final output.\n" | |
| f"DO NOT remove or modify the link or any hashtags.\n\n" | |
| f"Original tweet:\n{tweet}\n" | |
| ) | |
| shortened = get_response( | |
| input=shorten_prompt, | |
| template=lambda x: x.strip(), | |
| llm='gemini', | |
| md=False, | |
| temperature=0.5, | |
| max_tokens=75 | |
| ).strip() | |
| return shortened # Ensure hard cutoff if still too long | |
| def tweet_input_handler(url: str, topic: str) -> str: | |
| if url.strip(): | |
| raw_text = scrape_shopify_blog(url) | |
| if not raw_text or raw_text.startswith("β"): | |
| return "β οΈ Failed to extract content from blog." | |
| return generate_tweet_from_text(raw_text) | |
| elif topic.strip(): | |
| return generate_tweet_from_topic(topic) | |
| else: | |
| return "β οΈ Provide either a blog URL or a topic." | |
| # Create Tweepy v2 Client | |
| def get_tweepy_client(): | |
| return tweepy.Client( | |
| consumer_key=API_KEY, | |
| consumer_secret=API_SECRET, | |
| access_token=ACCESS_TOKEN, | |
| access_token_secret=ACCESS_TOKEN_SECRET | |
| ) | |
| # Post tweet using Tweepy v2 Client | |
| def post_tweet_tweepy(tweet_text: str) -> str: | |
| try: | |
| client = get_tweepy_client() | |
| response = client.create_tweet(text=tweet_text) | |
| tweet_id = response.data.get("id") | |
| print("β Tweet posted:", tweet_id) | |
| return f"β Tweet posted (ID: {tweet_id})" | |
| except Exception as e: | |
| print("β Failed to post tweet:", e) | |
| return f"β Failed to post tweet: {e}" | |
| # Check auth by attempting to fetch own user (optional) | |
| def check_twitter_connection() -> str: | |
| try: | |
| client = get_tweepy_client() | |
| user_data = client.get_me() | |
| if user_data and user_data.data: | |
| username = user_data.data.username | |
| print(f"β Connected to Twitter as @{username}") | |
| return f"β Connected to Twitter as @{username}" | |
| else: | |
| return "β Unable to fetch Twitter account info." | |
| except Exception as e: | |
| print("β Connection check failed:", e) | |
| return f"β Twitter auth error: {e}" | |
| # ---- EXPORT THIS FOR app.py TO CALL ---- | |
| def server(input, output, session): | |
| def tweet_draft(): | |
| url = input.shopify_url().strip() | |
| topic = input.tweet_topic().strip() | |
| tweet = tweet_input_handler(url, topic) | |
| print("π Generated Tweet:\n", tweet) | |
| if tweet.startswith("β οΈ"): | |
| return ui.HTML(f"<p><strong>{tweet}</strong></p>") | |
| generated_tweet.set(tweet) | |
| return ui.HTML(f"<p><strong>Generated Tweet:</strong><br>{tweet}</p>") | |
| def tweet_post_status(): | |
| status = check_twitter_connection() | |
| if status.startswith("β"): | |
| return status | |
| tweet = generated_tweet() | |
| print("π€ Attempting to post tweet:", tweet) | |
| if not tweet: | |
| return "β οΈ No tweet generated yet." | |
| return post_tweet_tweepy(tweet) | |