threads-bot-v1 / main.py
afluakd9's picture
Update main.py
19f65ad verified
import os
import requests
import time
import asyncio
import re
import hashlib
import logging
import sys
from datetime import datetime, timedelta, timezone
from telethon import TelegramClient, events
from telethon.sessions import StringSession
from flask import Flask
from threading import Thread
# --- LOGGING SETUP ---
logging.basicConfig(format='[%(levelname)s] %(asctime)s - %(message)s', level=logging.INFO)
# --- 1. ENVIRONMENT VARIABLES ---
try:
API_ID = int(os.environ.get("API_ID", 0))
API_HASH = os.environ.get("API_HASH", "")
SESSION_STRING = os.environ.get("SESSION_STRING", "")
TARGET_CHAT_ID = int(os.environ.get("TARGET_CHAT_ID", 0))
THREADS_ACCESS_TOKEN = os.environ.get("THREADS_ACCESS_TOKEN", "")
THREADS_USER_ID = os.environ.get("THREADS_USER_ID", "")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
except ValueError:
logging.error("❌ ERROR: Check your Environment Variables.")
sys.exit(1)
HF_API_URL = "https://api-inference.huggingface.co/models/microsoft/Phi-3-mini-4k-instruct"
COOLDOWN_SECONDS = 300
last_post_time = 0
processed_posts = []
IST = timezone(timedelta(hours=5, minutes=30))
app = Flask('')
@app.route('/')
def home(): return "Bot Running 🟢"
def run_http(): app.run(host='0.0.0.0', port=7860)
def keep_alive():
t = Thread(target=run_http)
t.daemon = True
t.start()
# --- 2. AI SALES OPTIMIZER ---
def ai_optimize_post(text):
if not HF_TOKEN: return text
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
# Sales Prompt
prompt = (f"<|user|>\n"
f"You are a professional affiliate marketer. Rewrite this deal description into a high-converting Threads post.\n"
f"Rules:\n"
f"1. Start with a catchy hook (e.g., '🔥 Loot Alert!', '⚡️ Price Drop').\n"
f"2. Mention the Product and Discount clearly.\n"
f"3. Create urgency ('Limited Time', 'Hurry').\n"
f"4. Add 3 hashtags like #Loot #Deals #India.\n"
f"5. End with 'Check it out 👇'.\n"
f"6. NO LINKS in your output. NO BOLD text (**).\n"
f"7. Keep it under 350 characters.\n\n"
f"Deal: {text}\n<|end|>\n<|assistant|>")
payload = {"inputs": prompt, "parameters": {"max_new_tokens": 300, "return_full_text": False}}
try:
response = requests.post(HF_API_URL, headers=headers, json=payload, timeout=20)
res_json = response.json()
if isinstance(res_json, list) and len(res_json) > 0:
return res_json[0]['generated_text'].strip().replace('"', '').replace('**', '')
except: pass
return text
# --- 3. THREADS PUBLISHER ---
def post_to_threads(text, parent_id=None):
try:
url = f"https://graph.threads.net/v1.0/{THREADS_USER_ID}/threads"
params = {'access_token': THREADS_ACCESS_TOKEN, 'text': text[:500], 'media_type': 'TEXT'}
if parent_id: params['reply_to_id'] = parent_id
res = requests.post(url, params=params).json()
if 'id' in res:
requests.post(f"https://graph.threads.net/v1.0/{THREADS_USER_ID}/threads_publish",
params={'creation_id': res['id'], 'access_token': THREADS_ACCESS_TOKEN})
return res['id']
except: pass
return None
# --- 4. MAIN HANDLER ---
client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH)
# മാറ്റം 1: ഇവിടെ 'chats=TARGET_CHAT_ID' ഒഴിവാക്കി. എല്ലാ മെസ്സേജും പിടിക്കുന്നു.
@client.on(events.NewMessage())
async def handler(event):
global last_post_time
# മാറ്റം 2: വരുന്ന മെസ്സേജ് നമ്മുടെ ചാനലിൽ നിന്നാണോ എന്ന് ഇവിടെ നോക്കുന്നു
if event.chat_id != TARGET_CHAT_ID:
return # അല്ലെങ്കിൽ ഒഴിവാക്കുന്നു
# Time Check (6 AM - 12 AM)
if datetime.now(IST).hour < 6: return
# Link Check
raw_text = event.text or ""
links = re.findall(r'(https?://\S+)', raw_text)
if not links: return
# Cooldown Check
if time.time() - last_post_time < COOLDOWN_SECONDS:
logging.info("⏳ Cooldown active... Skipping.")
return
# Duplicate Check
post_hash = hashlib.md5(raw_text[:40].encode()).hexdigest()
if post_hash in processed_posts: return
processed_posts.append(post_hash)
if len(processed_posts) > 20: processed_posts.pop(0)
logging.info(f"📩 Processing Deal: {raw_text[:30]}...")
desc = re.sub(r'(https?://\S+)', '', raw_text).strip()
short_links = [l for l in links if len(l) < 150]
long_links = [l for l in links if len(l) >= 150]
# AI Processing
optimized_desc = ai_optimize_post(desc)
# Final Post Creation
final_post = optimized_desc
if short_links: final_post += "\n\n🔗 Buy Now:\n" + "\n".join(short_links)
if "#" not in final_post: final_post += "\n\n#Deals #Loot"
parent_id = post_to_threads(final_post)
if parent_id:
last_post_time = time.time()
logging.info(f"✅ Posted! ID: {parent_id}")
for l in long_links:
post_to_threads(f"🔗 Link: {l}", parent_id=parent_id)
time.sleep(5)
async def start_bot():
logging.info("--- BOT STARTED ---")
await client.start()
logging.info("--- Connected to Telegram ---")
await client.run_until_disconnected()
if __name__ == '__main__':
keep_alive()
loop = asyncio.get_event_loop()
loop.run_until_complete(start_bot())