clawdbot / main.py
Bjo53's picture
Update main.py
af28d9f verified
import logging
import asyncio
import os
import sys
import json
import subprocess
import threading
import time
from urllib.parse import urlparse
from flask import Flask, request, Response
# ── 3rd Party ──
from aiogram import Bot, Dispatcher, types, F
from aiogram.filters import CommandStart
from aiogram.client.session.aiohttp import AiohttpSession
from aiogram.client.telegram import TelegramAPIServer
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# ╔═══════════════════════════════════════════════════════════════════╗
# β•‘ πŸŒ‰ THE CURL BRIDGE (BODY-LOSS FIX) β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
BRIDGE_PORT = 7860
PROXY_TARGET = "https://lucky-hat-e0d0.brukg9419.workers.dev"
CLOUDFLARE_IP = "104.21.28.169"
app = Flask(__name__)
# Health Check
@app.route('/')
def health_check():
return "βœ… PEKKA Bridge is Running!", 200
def run_curl(method, url, data=None):
parsed = urlparse(url)
domain = parsed.hostname
# 1. Prepare Command
cmd = [
"curl",
"-X", method, url,
"--resolve", f"{domain}:443:{CLOUDFLARE_IP}",
"-H", "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"-H", "Content-Type: application/json",
"-k", "-s", "--max-time", "30"
]
# 2. Prepare Data (The Fix: Use stdin)
input_str = None
if data:
# Tell curl to read from stdin
cmd.append("--data-binary")
cmd.append("@-")
input_str = json.dumps(data)
try:
# 3. Execute with Input Piped
result = subprocess.run(
cmd,
input=input_str, # Pass JSON string here
capture_output=True,
text=True,
timeout=35
)
if not result.stdout:
print(f"❌ [BRIDGE ERROR] Empty Curl Output. Stderr: {result.stderr}")
return json.dumps({"ok": False, "description": "Empty Curl Response", "error_code": 500})
return result.stdout
except Exception as e:
print(f"❌ [BRIDGE EXCEPTION] {e}")
return json.dumps({"ok": False, "description": str(e), "error_code": 500})
@app.route('/bot<token>/<method>', methods=['POST', 'GET'])
def proxy(token, method):
real_url = f"{PROXY_TARGET}/bot{token}/{method}"
# ⚠️ CRITICAL FIX: Force parse JSON even if header is slightly different
data = request.get_json(force=True, silent=True)
if not data:
# Fallback: check form data or args if JSON failed
data = request.form.to_dict() or request.args.to_dict()
# Debug: Print first 50 chars of data to prove we have it
# print(f"πŸ”„ Proxying {method} | Data size: {len(str(data))}")
response_text = run_curl(request.method, real_url, data)
return Response(response_text, mimetype='application/json')
def start_bridge():
print(f"πŸš€ Starting Bridge on 0.0.0.0:{BRIDGE_PORT}")
app.run(host="0.0.0.0", port=BRIDGE_PORT, threaded=True)
threading.Thread(target=start_bridge, daemon=True).start()
time.sleep(3)
# ╔═══════════════════════════════════════════════════════════════════╗
# β•‘ πŸ€– YOUR PEKKA BOT β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
TELEGRAM_TOKEN = "8484056866:AAFdmEZCmedznAA2DWRkyFLjS6uTirVFMEQ"
session = AiohttpSession(
api=TelegramAPIServer.from_base(f"http://127.0.0.1:{BRIDGE_PORT}")
)
bot = Bot(token=TELEGRAM_TOKEN, session=session)
dp = Dispatcher()
scheduler = AsyncIOScheduler()
@dp.message(CommandStart())
async def handle_start(message: types.Message):
print(f"πŸ“© Received /start from {message.from_user.id}")
try:
await message.answer("βœ… PEKKA is Online (Body-Loss Fixed)!")
print("πŸ“€ Reply Sent!")
except Exception as e:
print(f"❌ Send Failed: {e}")
async def on_startup():
print("πŸ€– PEKKA AGENT IS STARTING...")
scheduler.start()
async def main():
await on_startup()
try:
await bot.delete_webhook(drop_pending_updates=True)
except Exception as e:
print(f"⚠️ Webhook warning (safe): {e}")
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())