Spaces:
Running
Running
| import os | |
| import requests | |
| import random | |
| import threading | |
| import time | |
| import schedule | |
| import tweepy | |
| import gradio as gr | |
| import json | |
| # Secrets from HF Space environment | |
| HF_TOKEN = os.environ['HF_TOKEN'] | |
| CONSUMER_KEY = os.environ['CONSUMER_KEY'] | |
| CONSUMER_SECRET = os.environ['CONSUMER_SECRET'] | |
| ACCESS_TOKEN = os.environ['ACCESS_TOKEN'] | |
| ACCESS_SECRET = os.environ['ACCESS_SECRET'] | |
| # LLM API setup | |
| API_URL = "https://router.huggingface.co/v1/chat/completions" | |
| headers = { | |
| "Authorization": f"Bearer {HF_TOKEN}", | |
| } | |
| def query(payload): | |
| try: | |
| response = requests.post(API_URL, headers=headers, json=payload, timeout=120) | |
| response.raise_for_status() | |
| json_response = response.json() | |
| return json_response | |
| except requests.exceptions.HTTPError as http_err: | |
| error_detail = f"HTTP error: {http_err} - Status: {response.status_code}" | |
| try: | |
| error_body = response.json() | |
| error_detail += f" | Details: {error_body}" | |
| except: | |
| error_detail += f" | Raw response: {response.text[:300]}" | |
| print(error_detail) | |
| raise ValueError(error_detail) | |
| except requests.exceptions.RequestException as req_err: | |
| error_msg = f"Request failed: {req_err}" | |
| print(error_msg) | |
| raise ValueError(error_msg) | |
| except ValueError as json_err: | |
| error_msg = f"JSON decode error: {json_err} | Raw: {response.text[:300]}" | |
| print(error_msg) | |
| raise ValueError(error_msg) | |
| # Topics for posts | |
| topics = [ | |
| "GeoAI as a field and it's application", | |
| "application of Geology with Tech for exploration", | |
| "application of AI in geological exploration", | |
| "Entrepreneurship" | |
| ] | |
| # Function to generate a high-quality post using LLM | |
| def generate_post(topic=None): | |
| if topic is None or topic == "Random": | |
| topic = random.choice(topics) | |
| prompt = f"Generate a high-quality, educational, and informative X (Twitter) post under 280 characters showcasing expertise in {topic}. Make it engaging, insightful, and professional. Include relevant hashtags and hashtags shouldn't be more than 2 or not even at all in some post also avoid using - in the post so it looks natural." | |
| payload = { | |
| "messages": [{"role": "user", "content": prompt}], | |
| "model": "deepseek-ai/DeepSeek-V3.2:novita", | |
| "max_tokens": 200, | |
| "temperature": 0.8 | |
| } | |
| try: | |
| response = query(payload) | |
| print("API Response:", response) | |
| if "choices" in response and response["choices"]: | |
| post_content = response["choices"][0]["message"]["content"].strip() | |
| elif "error" in response: | |
| raise ValueError(f"API returned error: {response['error']}") | |
| elif isinstance(response, dict) and "content" in response: | |
| post_content = response["content"].strip() | |
| else: | |
| raise KeyError("Unexpected response format - no 'choices' or parsable content") | |
| except Exception as e: | |
| error_msg = f"Failed to generate post: {str(e)}" | |
| print(error_msg) | |
| post_content = f"[Generation failed: {str(e)} — please check logs or try again later.]" | |
| if len(post_content) > 280: | |
| post_content = post_content[:277] + "..." | |
| return post_content | |
| # Tweepy v2 Client setup | |
| client = tweepy.Client( | |
| consumer_key=CONSUMER_KEY, | |
| consumer_secret=CONSUMER_SECRET, | |
| access_token=ACCESS_TOKEN, | |
| access_token_secret=ACCESS_SECRET | |
| ) | |
| # Global queue and scheduler state | |
| post_queue = [] | |
| posted_history = [] | |
| scheduler_running = False | |
| scheduler_thread = None | |
| next_post_time = None | |
| # Persistence functions for queue | |
| def save_queue(): | |
| with open('queue.json', 'w') as f: | |
| json.dump(post_queue, f) | |
| def load_queue(): | |
| global post_queue | |
| if os.path.exists('queue.json'): | |
| with open('queue.json', 'r') as f: | |
| post_queue = json.load(f) | |
| # Persistence functions for posted history | |
| def save_posted(): | |
| with open('posted.json', 'w') as f: | |
| json.dump(posted_history, f) | |
| def load_posted(): | |
| global posted_history | |
| if os.path.exists('posted.json'): | |
| with open('posted.json', 'r') as f: | |
| posted_history = json.load(f) | |
| # Load at startup | |
| load_queue() | |
| load_posted() | |
| # Function to post to X | |
| def post_to_x(content): | |
| try: | |
| response = client.create_tweet(text=content) | |
| tweet_id = response.data['id'] | |
| return f"Posted! https://x.com/user/status/{tweet_id}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| # Scheduler job — posts one item from queue | |
| def scheduled_post_job(): | |
| global post_queue, posted_history, next_post_time | |
| if post_queue: | |
| content = post_queue.pop(0) | |
| result = post_to_x(content) | |
| save_queue() | |
| if "Posted!" in result: | |
| posted_history.append({ | |
| "content": content, | |
| "result": result, | |
| "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") | |
| }) | |
| save_posted() | |
| print(f"[Scheduler] {result}\nContent: {content}") | |
| if post_queue: | |
| next_post_time = time.time() + 7200 | |
| else: | |
| next_post_time = None | |
| return result, "\n\n".join(post_queue) if post_queue else "Queue is empty." | |
| else: | |
| print("[Scheduler] Queue is empty.") | |
| next_post_time = None | |
| return "No post scheduled — queue is empty.", "Queue is empty." | |
| # Background scheduler loop | |
| def run_scheduler(): | |
| global scheduler_running | |
| schedule.every(2).hours.do(scheduled_post_job) | |
| while scheduler_running: | |
| schedule.run_pending() | |
| time.sleep(30) | |
| # Start scheduler | |
| def start_scheduler(): | |
| global scheduler_running, scheduler_thread, next_post_time | |
| if not scheduler_running: | |
| scheduler_running = True | |
| scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) | |
| scheduler_thread.start() | |
| if post_queue: | |
| scheduled_post_job() | |
| if post_queue: | |
| next_post_time = time.time() + 7200 | |
| return "Scheduler started! Posts every 2 hours from the queue." | |
| return "Scheduler is already running." | |
| # Stop scheduler | |
| def stop_scheduler(): | |
| global scheduler_running, next_post_time | |
| scheduler_running = False | |
| next_post_time = None | |
| return "Scheduler stopped." | |
| # Clear queue function | |
| def clear_queue(): | |
| global post_queue, next_post_time | |
| post_queue.clear() | |
| next_post_time = None | |
| save_queue() | |
| return "Queue cleared.", "Queue is empty." | |
| # Refresh queue display | |
| def refresh_queue(): | |
| return "\n\n".join(post_queue) if post_queue else "Queue is empty." | |
| # Refresh posted history display | |
| def refresh_posted(): | |
| if posted_history: | |
| return "\n\n".join([f"{p['timestamp']} - {p['result']}\n{p['content']}" for p in posted_history]) | |
| else: | |
| return "No posts yet." | |
| # Get time until next post | |
| def get_timer(): | |
| global next_post_time | |
| if next_post_time is None or not scheduler_running: | |
| return "No upcoming posts or scheduler stopped." | |
| remaining = next_post_time - time.time() | |
| if remaining <= 0: | |
| return "Posting soon..." | |
| hours, rem = divmod(remaining, 3600) | |
| mins, secs = divmod(rem, 60) | |
| return f"{int(hours):02d}:{int(mins):02d}:{int(secs):02d}" | |
| # Gradio functions | |
| def generate_new_post(topic, num_posts): | |
| contents = [generate_post(topic) for _ in range(num_posts)] | |
| return "\n\n---\n\n".join(contents), gr.update(visible=True) | |
| def add_to_queue(content): | |
| global post_queue, next_post_time | |
| posts = [p.strip() for p in content.split("---") if p.strip()] | |
| added = False | |
| for p in posts: | |
| if p and p not in post_queue: | |
| post_queue.append(p) | |
| added = True | |
| if added: | |
| save_queue() | |
| if scheduler_running and next_post_time is None and post_queue: | |
| next_post_time = time.time() + 7200 | |
| return ( | |
| "", | |
| gr.update(visible=False), | |
| "\n\n".join(post_queue) if post_queue else "Queue is empty." | |
| ) | |
| def refresh_all(): | |
| return refresh_queue(), refresh_posted(), get_timer() | |
| # Gradio Interface | |
| with gr.Blocks(title="X Post Generator & Scheduler") as demo: | |
| gr.Markdown("# AI/Tech/Startups X Post Generator & Queue Scheduler") | |
| gr.Markdown("Generate → Review → Add to queue → Start scheduler for automated 2-hour posting.") | |
| with gr.Row(): | |
| topic_input = gr.Dropdown(choices=["Random"] + topics, label="Topic", allow_custom_value=True) | |
| num_posts_input = gr.Number(label="Number of Posts", value=1, minimum=1, maximum=20, step=1) | |
| generate_btn = gr.Button("Generate New Post(s)") | |
| current_post = gr.Textbox(label="Generated Post(s) (Review and edit before adding; separate multiples with ---)", lines=6, interactive=True) | |
| add_btn = gr.Button("➕ Add to Queue", visible=False) | |
| queue_display = gr.Textbox( | |
| label="Post Queue (will be posted in order, every 2 hours)", | |
| value=refresh_queue(), | |
| lines=12, | |
| interactive=False | |
| ) | |
| posted_display = gr.Textbox( | |
| label="Posted History", | |
| value=refresh_posted(), | |
| lines=12, | |
| interactive=False | |
| ) | |
| timer_display = gr.Textbox(label="Time Until Next Post", value=get_timer(), interactive=False) | |
| with gr.Row(): | |
| start_btn = gr.Button("Start Scheduler") | |
| stop_btn = gr.Button("Stop Scheduler") | |
| clear_queue_btn = gr.Button("Clear Queue") | |
| refresh_btn = gr.Button("Refresh Queue & History") | |
| status_box = gr.Textbox(label="Status", value="Ready", interactive=False) | |
| # Invisible timer to update countdown every 1 second | |
| timer = gr.Timer(value=1, active=True) | |
| # Event bindings | |
| generate_btn.click( | |
| generate_new_post, | |
| inputs=[topic_input, num_posts_input], | |
| outputs=[current_post, add_btn] | |
| ) | |
| add_btn.click( | |
| add_to_queue, | |
| inputs=current_post, | |
| outputs=[current_post, add_btn, queue_display] | |
| ) | |
| start_btn.click( | |
| start_scheduler, | |
| outputs=status_box | |
| ) | |
| stop_btn.click( | |
| stop_scheduler, | |
| outputs=status_box | |
| ) | |
| clear_queue_btn.click( | |
| clear_queue, | |
| outputs=[status_box, queue_display] | |
| ) | |
| refresh_btn.click( | |
| refresh_all, | |
| outputs=[queue_display, posted_display, timer_display] | |
| ) | |
| timer.tick( | |
| get_timer, | |
| outputs=timer_display | |
| ) | |
| # Automatically start scheduler on app load | |
| demo.load( | |
| start_scheduler, | |
| outputs=status_box | |
| ) | |
| demo.load( | |
| get_timer, | |
| outputs=timer_display | |
| ) | |
| demo.launch() |