JerryCoder commited on
Commit
5e04af4
·
verified ·
1 Parent(s): 92a4a3b

Delete bot.py

Browse files
Files changed (1) hide show
  1. bot.py +0 -168
bot.py DELETED
@@ -1,168 +0,0 @@
1
- import os
2
- import io
3
- import sys
4
- import zipfile
5
- import base64
6
- import random
7
- import shutil
8
- import subprocess
9
- import tempfile
10
- from pathlib import Path
11
-
12
- from fastapi import FastAPI, Request
13
- from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
14
- from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
15
-
16
- from motor.motor_asyncio import AsyncIOMotorClient
17
- import uvicorn
18
- from config import BOT_TOKEN, ADMIN_ID, MONGO_URI, DB_NAME
19
-
20
- # --- MongoDB Setup ---
21
- mongo_client = AsyncIOMotorClient(MONGO_URI)
22
- db = mongo_client[DB_NAME]
23
- users_col = db["users"]
24
- bans_col = db["banned_users"]
25
- stats_col = db["stats"]
26
-
27
- async def init_stats():
28
- if not await stats_col.find_one({"_id":"encryption_count"}):
29
- await stats_col.insert_one({"_id":"encryption_count","count":0})
30
-
31
- # --- Helpers ---
32
- def make_zip_bytes(filename: str, content: bytes) -> bytes:
33
- bio = io.BytesIO()
34
- with zipfile.ZipFile(bio, "w", compression=zipfile.ZIP_DEFLATED) as zf:
35
- zf.writestr(filename, content)
36
- return bio.getvalue()
37
-
38
- def rand_bytes(n): return os.urandom(n)
39
- def bytes_to_c_array_literal(b: bytes) -> str:
40
- return ",".join(str(x) for x in b)
41
-
42
- def xor_string(s: str, key: int = None) -> str:
43
- if key is None: key = random.randint(1,255)
44
- arr = [ord(c)^key for c in s]
45
- return f"(lambda s:''.join(chr(B^{key}) for B in s))([{','.join(str(x) for x in arr)}])"
46
-
47
- def generate_junk_data(min_kb=50,max_kb=70):
48
- junk_size = random.randint(min_kb*1024,max_kb*1024)
49
- chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?"
50
- return ''.join(random.choice(chars) for _ in range(junk_size))
51
-
52
- # --- C Template ---
53
- C_TEMPLATE = """/*KEY_BYTES*/"""
54
- SETUP_PY = """# Your setup.py content"""
55
-
56
- def compile_c_extension(build_dir: Path):
57
- (build_dir/"setup.py").write_text(SETUP_PY)
58
- proc = subprocess.run([sys.executable,"setup.py","build_ext","--inplace"],
59
- cwd=str(build_dir), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=30)
60
- candidates = list(build_dir.glob("aes_helper*.so")) + list(build_dir.glob("build/lib.*/*.so"))
61
- if not candidates:
62
- raise RuntimeError("Compile fail:\n"+proc.stdout.decode())
63
- return candidates[0]
64
-
65
- def build_wrapper(j_b64, so_b64, so_filename):
66
- lines = [
67
- "wulcan = '.WulcanPy'",
68
- "import os as O, sys as S, base64 as B, tempfile as T",
69
- f"WL='{j_b64}'",
70
- f"M={repr(so_b64)}",
71
- f"D=O.path.join(T.gettempdir(),{xor_string('ninja_tmp')});O.makedirs(D,exist_ok=True)",
72
- f"P=O.path.join(D,{xor_string(so_filename)})",
73
- "open(P,'wb').write(B.b64decode(M))",
74
- "import importlib.machinery as L, importlib.util as U",
75
- "loader=L.ExtensionFileLoader('aes_helper',P);spec=U.spec_from_loader(loader.name,loader);mod=U.module_from_spec(spec);loader.exec_module(mod)",
76
- "import sys, io, zipfile, runpy",
77
- "try:",
78
- "\tc=B.b64decode(WL);z=mod.decrypt(c)",
79
- "\tt=T.mkdtemp(prefix='nin_');zipfile.ZipFile(io.BytesIO(z)).extractall(t)",
80
- "\tS.path.insert(0,t)",
81
- "\te=[f for f in O.listdir(t) if f.endswith('.py')][0]",
82
- "\tep=O.path.join(t,e)",
83
- "\tS.argv=[ep]+S.argv[1:]",
84
- "\trunpy.run_path(ep,run_name='__main__')",
85
- "except Exception as E: print('ninja_error',E)",
86
- "finally: O.remove(P) if O.path.exists(P) else None",
87
- f"JUNK={repr(generate_junk_data())}"
88
- ]
89
- return "\n".join(lines)
90
-
91
- # --- Process File ---
92
- async def process_file(input_file: Path, update: Update, context: ContextTypes.DEFAULT_TYPE):
93
- user = update.effective_user
94
- tmpdir = Path(tempfile.mkdtemp())
95
- tmp_file = tmpdir/input_file.name
96
- tmp_file.write_bytes(input_file.read_bytes())
97
- py_bytes = tmp_file.read_bytes()
98
- zip_b = make_zip_bytes(input_file.name, py_bytes)
99
- key, iv = rand_bytes(32), rand_bytes(16)
100
- c_src = C_TEMPLATE.replace("/*KEY_BYTES*/", bytes_to_c_array_literal(key)).replace("/*IV_BYTES*/", bytes_to_c_array_literal(iv))
101
- (tmpdir/"aes_helper.c").write_text(c_src)
102
- so_path = compile_c_extension(tmpdir)
103
- import importlib.util
104
- spec = importlib.util.spec_from_file_location("aes_helper",so_path)
105
- mod = importlib.util.module_from_spec(spec)
106
- spec.loader.exec_module(mod)
107
- cipher = mod.encrypt(zip_b)
108
- cipher_b64 = base64.b64encode(cipher).decode()
109
- so_b64 = base64.b64encode(so_path.read_bytes()).decode()
110
- wrapper = build_wrapper(cipher_b64, so_b64, so_path.name)
111
- out_name = input_file.stem+"_enc.py"
112
- out_path = tmpdir/out_name
113
- out_path.write_text(wrapper)
114
- await context.bot.send_document(chat_id=update.effective_chat.id, document=open(out_path,'rb'), filename=out_name)
115
- await stats_col.update_one({"_id":"encryption_count"},{"$inc":{"count":1}})
116
- shutil.rmtree(tmpdir, ignore_errors=True)
117
-
118
- # --- Handlers ---
119
- async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
120
- user = update.effective_user
121
- if not await users_col.find_one({"user_id":user.id}):
122
- await users_col.insert_one({"user_id":user.id,"name":user.full_name,"username":user.username or "None","language":user.language_code or "Unknown"})
123
- keyboard=[[InlineKeyboardButton("▶ Join Channel",url="https://t.me/kuttuxd17")]]
124
- await update.message.reply_text("Welcome to Secure File Encryptor Bot!",reply_markup=InlineKeyboardMarkup(keyboard))
125
-
126
- async def encode(update: Update, context: ContextTypes.DEFAULT_TYPE):
127
- user = update.effective_user
128
- if await bans_col.find_one({"user_id":user.id}):
129
- await update.message.reply_text("You are banned.")
130
- return
131
- await update.message.reply_text("Please upload a .py file.")
132
-
133
- async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
134
- await process_file(Path(update.message.document.file_name), update, context)
135
-
136
- # --- Admin Commands ---
137
- async def total_users(update: Update, context: ContextTypes.DEFAULT_TYPE):
138
- if update.effective_user.id != ADMIN_ID: return
139
- count = await users_col.count_documents({})
140
- await update.message.reply_text(f"Total users: {count}")
141
-
142
- async def total_enc(update: Update, context: ContextTypes.DEFAULT_TYPE):
143
- if update.effective_user.id != ADMIN_ID: return
144
- stats = await stats_col.find_one({"_id":"encryption_count"})
145
- await update.message.reply_text(f"Total encryptions: {stats['count'] if stats else 0}")
146
-
147
- # --- FastAPI App ---
148
- app = FastAPI()
149
- bot_app = Application.builder().token(BOT_TOKEN).build()
150
-
151
- # Register handlers
152
- bot_app.add_handler(CommandHandler("start", start))
153
- bot_app.add_handler(CommandHandler("encode", encode))
154
- bot_app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
155
- bot_app.add_handler(CommandHandler("totalusers", total_users))
156
- bot_app.add_handler(CommandHandler("totalenc", total_enc))
157
-
158
- @app.post("/webhook")
159
- async def webhook_endpoint(request: Request):
160
- update_data = await request.json()
161
- update = Update.de_json(update_data, bot_app.bot)
162
- await bot_app.update_queue.put(update)
163
- return {"ok": True}
164
-
165
- if __name__ == "__main__":
166
- import asyncio
167
- asyncio.run(init_stats())
168
- uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))