from shiny import reactive, render, ui import os import tweepy from dotenv import load_dotenv from llm_connect import get_response # your existing function load_dotenv() # Twitter API v2 credentials BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN") API_KEY = os.getenv("TWITTER_ACC_API_KEY") API_SECRET = os.getenv("TWITTER_ACC_API_SECRET") ACCESS_TOKEN = os.getenv("TWITTER_ACC_ACCESS_TOKEN") ACCESS_TOKEN_SECRET = os.getenv("TWITTER_ACC_ACCESS_TOKEN_SECRET") client = tweepy.Client( bearer_token=BEARER_TOKEN, consumer_key=API_KEY, consumer_secret=API_SECRET, access_token=ACCESS_TOKEN, access_token_secret=ACCESS_TOKEN_SECRET ) generated_tweet = reactive.Value("") def generate_tweet_from_topic(topic: str) -> str: prompt = ( f"You are a social media manager for a hobby e-commerce company called 'Ultima Supply'.\n" f"Write a short, engaging Twitter post (max 500 characters) about: '{topic}'.\n" f"Include emojis or 3-5 SEO relevant hashtags. Use casual, fun language." ) return get_response( input=prompt, template=lambda x: x, llm='gemini', md=False, temperature=0.9, max_tokens=500 ) def post_tweet(text: str) -> str: try: response = client.create_tweet(text=text) return f"✅ Tweet posted (ID: {response.data['id']})" except Exception as e: return f"❌ Failed to post tweet: {e}" def server(input, output, session): post_status = reactive.Value("") @output @render.ui @reactive.event(input.gen_btn_twt) def tweet_draft(): topic = input.tweet_topic() if not topic.strip(): return ui.HTML("

⚠️ Enter a topic first.

") tweet = generate_tweet_from_topic(topic) generated_tweet.set(tweet) return ui.HTML(f"

Generated Tweet:
{tweet}

") @output @render.text def tweet_post_status(): return post_status() @reactive.effect @reactive.event(input.post_btn_twt) def _(): tweet = generated_tweet() if not tweet: post_status.set("⚠️ No tweet generated yet.") else: result = post_tweet(tweet) post_status.set(result)