# Project_base / bot.py
# Uploaded by: Shinhati2023
# Commit: "Update bot.py" (877fc1f, verified)
import os
import re
import json
import logging
from collections import defaultdict
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
MessageHandler,
CommandHandler,
ContextTypes,
filters,
)
# ------------------ CONFIG ------------------
# The bot token is read from the BOT_TOKEN environment variable (None if unset).
TOKEN = os.environ.get("BOT_TOKEN")
# Relative path of the JSON file used to persist the bot's state.
DATA_FILE = "bot_data.json"

# Root logging: INFO and above, "timestamp - logger - level - message" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# ------------------ ABUSIVE WORD FILTER ------------------
# Terms the bot treats as abusive. Matching is case-insensitive and limited to
# whole words/phrases (see ``abusive_regex`` below).
ABUSIVE_WORDS = [
    "fuck",
    "shit",
    "bitch",
    "asshole",
    "cunt",
    "motherfucker",
    "dick",
    "pussy",
    "slut",
    "whore",
    "bastard",
    "mumu",
    "ode",
    "werey",
    "weyre",
    "olodo",
    "ashawo",
    "oloshi",
    "agbaya",
    "oloriburuku",
    "shege",
    "dan iska",
    "apoda",
    "obun",
    "didirin",
    "ewu",
    "ndisime",
]

# One alternation of all escaped terms, wrapped in word boundaries so that a
# term embedded in a longer word (e.g. "ode" in "model") does not match.
abusive_regex = re.compile(
    r"(?i)\b({})\b".format("|".join(map(re.escape, ABUSIVE_WORDS)))
)
# ------------------ PERSISTENT STORAGE ------------------
def load_data():
    """Load the persisted bot state from ``DATA_FILE``.

    Returns:
        dict: A mapping with at least the keys ``"groups"`` (list) and
        ``"warnings"`` (dict). A missing file, malformed JSON, or a top-level
        value that is not an object yields fresh empty state; a missing or
        wrongly typed key is replaced by its empty default.
    """
    defaults = {"groups": [], "warnings": {}}
    try:
        with open(DATA_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been saved yet.
        return defaults
    except ValueError as e:
        # Covers json.JSONDecodeError and UnicodeDecodeError. Starting with
        # empty state is preferable to crashing the bot at import time.
        logging.getLogger(__name__).error(
            "Ignoring unreadable data file %s: %s", DATA_FILE, e
        )
        return defaults

    if not isinstance(loaded, dict):
        logging.getLogger(__name__).error(
            "Ignoring data file %s: top-level JSON value is not an object",
            DATA_FILE,
        )
        return defaults

    # Guarantee both keys exist with the container type the handlers index.
    for key, empty in defaults.items():
        if not isinstance(loaded.get(key), type(empty)):
            loaded[key] = empty
    return loaded
def save_data(data):
    """Persist *data* to ``DATA_FILE`` as JSON.

    The JSON is first written to ``DATA_FILE + ".tmp"`` and then moved over
    ``DATA_FILE`` with ``os.replace``. If serialisation or the write fails, the
    previously saved file is left untouched instead of being truncated.

    Args:
        data: JSON-serialisable state (the ``data_store`` mapping).

    Raises:
        TypeError: If *data* contains a value ``json`` cannot serialise.
        OSError: If the temporary file cannot be written or moved.
    """
    tmp_path = f"{DATA_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    # Swap in the complete file only after it has been fully written.
    os.replace(tmp_path, DATA_FILE)
# In-memory bot state, loaded once when the module is imported. Handlers mutate
# it and call save_data(data_store) to persist the change.
data_store = load_data()
# Structure:
# data_store["groups"] = [group_ids]   (ids of group/supergroup chats seen so far)
# data_store["warnings"] = { "chat_id:user_id": warning_count }
# ------------------ ABUSE HANDLER ------------------
async def track_groups_and_check_abuse(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Remember group chats and moderate abusive text messages.

    The id of every group or supergroup a text message arrives from is added
    to ``data_store["groups"]`` the first time it is seen. If the text matches
    ``abusive_regex`` the message is deleted (best effort) and the sender's
    warning count for this chat is incremented:

    * 1st offence - a warning is posted.
    * 2nd offence - a final warning is posted.
    * 3rd offence and later - the sender is banned from the chat.

    The removal notice is posted, and the counter reset, only after the ban
    actually succeeded. A failed ban is logged and the counter is kept, so the
    next offence tries the ban again.

    Args:
        update: Incoming update; ignored unless it carries a text message.
        context: Handler context; ``context.bot`` performs the ban.
    """
    message = update.message
    if not message or not message.text:
        return
    chat = message.chat

    # Track groups
    if chat.type in ["group", "supergroup"] and chat.id not in data_store["groups"]:
        data_store["groups"].append(chat.id)
        save_data(data_store)

    # Check abuse
    if not abusive_regex.search(message.text):
        return

    try:
        await message.delete()
    except Exception as e:
        # Best effort: the bot may lack permission to delete messages.
        logger.warning("Failed to delete message: %s", e)

    user = message.from_user
    if user is None:
        # No identifiable sender, so there is nobody to warn or ban.
        return

    # Increase warning count
    key = f"{chat.id}:{user.id}"
    count = data_store["warnings"].get(key, 0) + 1
    data_store["warnings"][key] = count
    save_data(data_store)

    if count == 1:
        await chat.send_message(
            f"⚠️ {user.first_name}, first warning. Do not use abusive language."
        )
        return
    if count == 2:
        await chat.send_message(
            f"⚠️ {user.first_name}, FINAL warning. Next time you will be removed."
        )
        return

    # Ban first, announce afterwards: the notice must not claim a removal
    # that did not happen (e.g. the bot is not an admin).
    try:
        await context.bot.ban_chat_member(chat.id, user.id)
    except Exception as e:
        logger.error("Failed to ban user: %s", e)
        return

    await chat.send_message(
        f"🚫 {user.first_name} has been removed for repeated abusive language."
    )
    # Start from a clean slate should the user ever be allowed back in.
    data_store["warnings"][key] = 0
    save_data(data_store)
# ------------------ PRIVATE BROADCAST ------------------
async def vikingkull_omega(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Broadcast a text or photo sent in a private chat to every stored group.

    The leading ``/vikingkull_omega`` token (optionally followed by
    ``@botname``) is stripped from the message text or photo caption; the
    remainder is sent to each id in ``data_store["groups"]``. For a photo the
    last entry of ``message.photo`` is forwarded by file id.

    A group is dropped from the stored list only when Telegram reports that the
    bot can no longer reach it (``Forbidden`` or "chat not found"). A migrated
    group is re-stored under its new id. Any other failure, such as a timeout
    or text rejected for its content, is logged and the group is kept.

    Args:
        update: Incoming update; ignored unless it carries a new message in a
            private chat.
        context: Handler context; ``context.bot`` sends the broadcast.
    """
    # Local import: only this handler needs the Telegram error classes.
    from telegram.error import BadRequest, ChatMigrated, Forbidden

    message = update.message
    # update.message is None for edited messages, which must not re-broadcast.
    if message is None or message.chat.type != "private":
        return
    # NOTE(review): there is no sender check here - any user who messages the
    # bot privately can broadcast to all stored groups. Confirm this is intended.
    if not data_store["groups"]:
        await message.reply_text("No groups stored yet.")
        return

    photo_id = None
    if message.photo:
        photo_id = message.photo[-1].file_id
        raw_text = message.caption or ""
    else:
        raw_text = message.text or ""
    # Strip only the leading command token so that later mentions of the
    # command in the body, and a trailing "@botname", are handled correctly.
    post_text = re.sub(r"^/vikingkull_omega(?:@\w+)?", "", raw_text, count=1).strip()

    if not post_text and not photo_id:
        await message.reply_text(
            "Usage:\n"
            "/vikingkull_omega your message\n"
            "OR send photo with caption."
        )
        return

    success = 0
    # Iterate over a copy: the stored list is edited while broadcasting.
    for group_id in list(data_store["groups"]):
        try:
            if photo_id:
                await context.bot.send_photo(
                    chat_id=group_id,
                    photo=photo_id,
                    caption=post_text,
                )
            else:
                await context.bot.send_message(
                    chat_id=group_id,
                    text=post_text,
                )
        except ChatMigrated as e:
            # Keep the group under its new id so the next broadcast reaches it.
            logger.warning("Group %s migrated to %s", group_id, e.new_chat_id)
            stored = data_store["groups"]
            stored.remove(group_id)
            if e.new_chat_id not in stored:
                stored.append(e.new_chat_id)
            save_data(data_store)
        except Forbidden as e:
            # The bot was removed from, or blocked in, this chat.
            logger.warning("Removing invalid group %s: %s", group_id, e)
            data_store["groups"].remove(group_id)
            save_data(data_store)
        except BadRequest as e:
            if "chat not found" in str(e).lower():
                logger.warning("Removing invalid group %s: %s", group_id, e)
                data_store["groups"].remove(group_id)
                save_data(data_store)
            else:
                # Most likely a problem with the content (e.g. text too
                # long), which says nothing about the group itself.
                logger.warning("Broadcast to group %s rejected: %s", group_id, e)
        except Exception as e:
            # Timeouts, rate limits, network errors: keep the group.
            logger.warning("Broadcast to group %s failed: %s", group_id, e)
        else:
            success += 1

    await message.reply_text(f"✅ Broadcast sent to {success} group(s).")
# ------------------ DAILY COMMAND ------------------
async def daily_broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Re-post the arguments of ``/daily`` as a "Daily Update" announcement.

    In groups and supergroups the command is honoured only when the sender's
    chat-member status is ``"administrator"`` or ``"creator"``; other senders
    are silently ignored. The command message itself is deleted (best effort)
    before the announcement is posted to the same chat.

    Args:
        update: Incoming update; ignored unless it carries a new message.
        context: Handler context; ``context.args`` holds the announcement
            words and ``context.bot`` is used for the membership lookup.
    """
    message = update.message
    if message is None:
        # Edited commands arrive without update.message; nothing to do.
        return
    chat = message.chat

    # Admin-only in groups
    if chat.type in ["group", "supergroup"]:
        sender = message.from_user
        if sender is None:
            return
        member = await context.bot.get_chat_member(chat.id, sender.id)
        if member.status not in ["administrator", "creator"]:
            return

    if not context.args:
        await message.reply_text("Usage: /daily Your message here")
        return
    text = " ".join(context.args)

    try:
        await message.delete()
    except Exception as e:
        # Best effort: the bot may lack permission to delete messages.
        logger.warning("Failed to delete message: %s", e)

    await chat.send_message(
        f"📢 Daily Update\n\n{text}"
    )
# ------------------ MAIN ------------------
def main():
    """Build the application, register all handlers and start polling.

    Raises:
        ValueError: If ``TOKEN`` is empty, i.e. the ``BOT_TOKEN`` environment
            variable was not set.
    """
    if not TOKEN:
        raise ValueError("BOT_TOKEN environment variable not set.")

    app = ApplicationBuilder().token(TOKEN).build()
    app.add_handler(CommandHandler("daily", daily_broadcast))
    app.add_handler(CommandHandler("vikingkull_omega", vikingkull_omega))
    # CommandHandler only looks at message text, so a photo whose *caption*
    # starts with the command would never reach vikingkull_omega. Route new
    # private photo messages with such a caption to it explicitly.
    app.add_handler(
        MessageHandler(
            filters.UpdateType.MESSAGE
            & filters.ChatType.PRIVATE
            & filters.PHOTO
            & filters.CaptionRegex(r"^/vikingkull_omega\b"),
            vikingkull_omega,
        )
    )
    app.add_handler(
        MessageHandler(filters.TEXT & ~filters.COMMAND, track_groups_and_check_abuse)
    )

    logger.info("Bot is running...")
    app.run_polling()
# Start the bot only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()