"""Flask admin console for the Icebox AI Telegram bot.

Provides a password-protected dashboard (usage stats, broadcast history,
scheduled messages, recent users), routes to broadcast / edit / delete
Telegram messages, and a background job that sends scheduled messages.
Data is stored in Supabase; messages go out through the Telegram Bot API.
"""
import os
import asyncio
import hmac
import json
import re
from datetime import datetime, timedelta, timezone
from functools import wraps

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from flask import Flask, render_template_string, request, redirect, url_for, flash, session
from supabase import create_client, Client

# Load environment variables
load_dotenv()

# Configuration
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "1sampai8")
TELEGRAM_API_BASE = os.getenv("TELEGRAM_API_BASE_URL", "https://api.telegram.org").rstrip('/')
# Tolerate a base URL that already ends in "/bot"; the request URLs add it.
if TELEGRAM_API_BASE.endswith('/bot'):
    TELEGRAM_API_BASE = TELEGRAM_API_BASE[:-4]

# Supabase Client
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "your-secret-key-change-this")

# The fallbacks above are kept for backward compatibility, but they are
# publicly known values: anyone can log in or forge a session cookie with them.
if "ADMIN_PASSWORD" not in os.environ:
    print("WARNING: ADMIN_PASSWORD is not set; using the built-in default password.")
if "FLASK_SECRET_KEY" not in os.environ:
    print("WARNING: FLASK_SECRET_KEY is not set; sessions are signed with a placeholder key.")

# Initialize Scheduler
scheduler = BackgroundScheduler(timezone='UTC')

# HTML Templates
# NOTE(review): both templates contain text and Jinja expressions only, with no
# HTML markup (no <form>/<input>/<table> elements). It looks like the tags were
# stripped at some point; restore the original markup from version control.
# The form fields the routes below read are: password, target, message,
# chat_ids, send_menu, name, schedule_time, is_enabled.
LOGIN_TEMPLATE = """ Admin Login - Icebox AI

🔐 Admin Login

{% if error %}

{{ error }}

{% endif %}
"""

DASHBOARD_TEMPLATE = """ Admin Console - Icebox AI

🤖 Icebox AI Admin

Logout
{% with messages = get_flashed_messages(with_categories=true) %} {% for category, message in messages %}
{{ message }}
{% endfor %} {% endwith %}

{{ stats.total_users }}

Total Users

{{ stats.active_users_7d }}

Active Users (7d)

{{ stats.paid_users or 0 }}

Premium Users

{{ stats.total_generations }}

Total Generations

{{ stats.gen_today or 0 }}

Generations Today

📢 Send Broadcast

📝 Message History

{% if message_logs %} {% for msg in message_logs %}

{{ msg.message_content[:60] }}{% if msg.message_content|length > 60 %}...{% endif %}

{{ msg.target_type }} {{ msg.success_count }} sent, {{ msg.fail_count }} failed {% if msg.sent_message_ids %} ✓ Editable {% endif %}

{{ msg.sent_at }}
{% endfor %} {% else %}

No broadcast history yet.

{% endif %}

⏰ Scheduled Messages

{% if schedules %} {% for sched in schedules %}

{{ sched.name }}

{{ sched.message_content[:50] }}...

{{ 'Active' if sched.is_enabled else 'Disabled' }} {{ sched.schedule_time }} UTC
{% endfor %} {% else %}

No scheduled messages.

{% endif %}

👥 Recent Active Users

User Chat ID Tier Generated Last Active
{% for user in users_list %}
{{ user.first_name }} {% if user.username %}
@{{ user.username }}{% endif %}
{{ user.chat_id }} {{ user.tier }} {{ user.total_images_generated }} {{ user.last_active }}
{% endfor %}
"""


def login_required(f):
    """Decorator: redirect to the login page unless the session is logged in."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('logged_in'):
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function


# --- Helper Functions ---

def _redact_token(text) -> str:
    """Return *text* as a string with the bot token replaced by '***'.

    Telegram request URLs embed the token, and `requests` exceptions often
    include the URL, so error text must be scrubbed before it is shown/logged.
    """
    text = str(text)
    if TELEGRAM_TOKEN:
        text = text.replace(TELEGRAM_TOKEN, '***')
    return text


def _utc_iso(moment: datetime) -> str:
    """Format an aware datetime as an ISO-8601 UTC string with a 'Z' suffix."""
    return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def _parse_timestamp(value):
    """Parse an ISO-8601 timestamp into an aware UTC datetime, or None.

    `datetime.fromisoformat` on Python < 3.11 rejects a trailing 'Z' and
    fractional seconds that are not exactly 3 or 6 digits, so both are
    normalized first. Naive values are assumed to be UTC.
    """
    if not value:
        return None
    if isinstance(value, datetime):
        parsed = value
    else:
        text = str(value).strip()
        if text[-1:] in ('Z', 'z'):
            text = text[:-1] + '+00:00'
        # Pad/truncate the fractional part to exactly six digits.
        match = re.match(r'^(.*?\d{2}:\d{2}:\d{2})\.(\d+)(.*)$', text)
        if match:
            head, fraction, tail = match.groups()
            text = f"{head}.{fraction[:6].ljust(6, '0')}{tail}"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _already_sent_today(last_sent, now: datetime) -> bool:
    """Return True if *last_sent* falls on the same UTC date as *now*."""
    sent_at = _parse_timestamp(last_sent)
    return sent_at is not None and sent_at.date() == now.date()


def _normalize_schedule_time(raw) -> str:
    """Validate a schedule time and return it zero-padded (HH:MM or HH:MM:SS).

    The scheduler matches on the 'HH:MM' prefix, so a value such as '9:00'
    would otherwise never fire. Raises ValueError for anything else.
    """
    match = re.match(r'^(\d{1,2}):(\d{2})(?::(\d{2}))?$', (raw or '').strip())
    if not match:
        raise ValueError("schedule_time must be in HH:MM format")
    hour, minute = int(match.group(1)), int(match.group(2))
    second = match.group(3)
    if hour > 23 or minute > 59 or (second is not None and int(second) > 59):
        raise ValueError("schedule_time is out of range")
    normalized = f"{hour:02d}:{minute:02d}"
    return f"{normalized}:{second}" if second is not None else normalized


def normalize_chat_ids(target, chat_ids_input):
    """Return a list of chat IDs based on target selection.

    target == 'specific': parse the comma-separated *chat_ids_input*
    (raises ValueError on a non-integer entry).
    target == 'active': users whose last_active is within the last 7 days.
    anything else: all users.
    Duplicates are dropped (first occurrence wins) so nobody is messaged
    twice and each chat keeps a single stored message id.
    """
    if target == 'specific':
        ids = [int(cid.strip()) for cid in chat_ids_input.split(',') if cid.strip()]
    elif target == 'active':
        # Active in last 7 days (UTC, consistent with the scheduler)
        cutoff = _utc_iso(datetime.now(timezone.utc) - timedelta(days=7))
        res = supabase.table("telegram_users").select("chat_id").gte("last_active", cutoff).execute()
        ids = [u['chat_id'] for u in res.data]
    else:
        # All users
        res = supabase.table("telegram_users").select("chat_id").execute()
        ids = [u['chat_id'] for u in res.data]
    return list(dict.fromkeys(ids))


def send_telegram_message_with_menu(chat_id: int, message: str, send_menu: bool = True) -> tuple[bool, str, int]:
    """Send message and optionally the main menu.

    Returns: (success, error_message, message_id). message_id is 0 when the
    main message failed or the response carried no id. If the main message
    succeeds but the menu fails, success is still True and error_message
    describes the menu failure.

    Telegram Markdown syntax:
    - *bold* (not **bold**)
    - _italic_
    - `code`
    - [link text](url)
    """
    try:
        # 1. Send Main Message
        url = f"{TELEGRAM_API_BASE}/bot{TELEGRAM_TOKEN}/sendMessage"
        data = {
            "chat_id": chat_id,
            "text": message,
            "parse_mode": "Markdown"
        }
        response = requests.post(url, json=data, timeout=10)

        if response.status_code != 200:
            if response.status_code == 404:
                return False, f"404 Not Found at {_redact_token(url)}. Check Base URL or Token.", 0
            return False, f"Main msg failed: {response.text}", 0

        # Extract message_id from response
        result = response.json()
        message_id = result.get('result', {}).get('message_id', 0)

        if send_menu:
            # 2. Add Main Menu
            menu_keyboard = {
                "inline_keyboard": [
                    [{"text": "Generate Image", "callback_data": "generate_mode"}],
                    [{"text": "My Profile", "callback_data": "profile"}],
                    [{"text": "Help & Support", "callback_data": "help"}]
                ]
            }
            menu_data = {
                "chat_id": chat_id,
                "text": "👇 *Quick Actions:*",
                "parse_mode": "Markdown",
                "reply_markup": menu_keyboard
            }
            menu_res = requests.post(url, json=menu_data, timeout=5)
            if menu_res.status_code != 200:
                return True, f"Msg sent, but menu failed: {menu_res.text}", message_id

        return True, "", message_id
    except Exception as e:
        # requests errors usually quote the URL, which contains the token.
        return False, _redact_token(e), 0


def edit_telegram_message(chat_id: int, message_id: int, new_text: str) -> tuple[bool, str]:
    """Edit existing Telegram message using editMessageText API.

    Returns (success, error_message).
    NOTE(review): Telegram limits which messages can still be edited; check
    the Bot API documentation for the current rules rather than assuming a
    fixed 48-hour window.
    """
    try:
        url = f"{TELEGRAM_API_BASE}/bot{TELEGRAM_TOKEN}/editMessageText"
        data = {
            "chat_id": chat_id,
            "message_id": message_id,
            "text": new_text,
            "parse_mode": "Markdown"
        }
        response = requests.post(url, json=data, timeout=10)
        if response.status_code == 200:
            return True, ""
        return False, response.text
    except Exception as e:
        return False, _redact_token(e)


def delete_telegram_message(chat_id: int, message_id: int) -> tuple[bool, str]:
    """Delete Telegram message using deleteMessage API.

    Returns (success, error_message).
    Note: Can only delete messages sent by the bot within 48 hours.
    """
    try:
        url = f"{TELEGRAM_API_BASE}/bot{TELEGRAM_TOKEN}/deleteMessage"
        data = {
            "chat_id": chat_id,
            "message_id": message_id
        }
        response = requests.post(url, json=data, timeout=10)
        if response.status_code == 200:
            return True, ""
        return False, response.text
    except Exception as e:
        return False, _redact_token(e)


def _send_to_all(chat_ids, message, send_menu):
    """Send *message* to every chat ID and tally the outcome.

    Returns (success_count, fail_count, last_error, sent_message_ids), where
    sent_message_ids maps str(chat_id) -> Telegram message_id for later
    edit/delete.
    """
    success = 0
    fail = 0
    last_error = ""
    sent_message_ids = {}
    for chat_id in chat_ids:
        is_ok, err_msg, msg_id = send_telegram_message_with_menu(chat_id, message, send_menu)
        if is_ok:
            success += 1
            if msg_id:
                sent_message_ids[str(chat_id)] = msg_id
        else:
            fail += 1
            last_error = err_msg
    return success, fail, last_error, sent_message_ids


def _run_schedule_if_due(sched, now, current_time_str):
    """Broadcast one schedule row if it is due this minute and not yet sent today."""
    schedule_time = str(sched.get('schedule_time') or '')
    if not schedule_time.startswith(current_time_str):
        return

    # Check if already sent today (simple debounce)
    if _already_sent_today(sched.get('last_sent_at'), now):
        return

    # Execute Broadcast logic
    chat_ids = normalize_chat_ids(sched.get('target_type', 'all'), "")
    msg_content = sched['message_content']
    success, fail, last_error, sent_message_ids = _send_to_all(chat_ids, msg_content, True)

    # Update Schedule
    supabase.table("scheduled_messages").update({
        "last_sent_at": now.isoformat()
    }).eq("id", sched['id']).execute()

    # Log it (best effort: a logging failure must not fail the schedule)
    try:
        supabase.table("admin_message_logs").insert({
            "target_type": sched.get('target_type'),
            "message_content": msg_content,
            "total_recipients": len(chat_ids),
            "success_count": success,
            "fail_count": fail,
            "created_by": "scheduler",
            "sent_message_ids": sent_message_ids
        }).execute()
    except Exception as log_err:
        print(f"Logging error: {log_err}")

    print(f"Executed schedule {sched.get('name')}: {success} sent. Last Error: {last_error}")


def check_and_send_scheduled_messages():
    """Background task to check and send scheduled messages.

    Runs once a minute. A schedule fires when its schedule_time starts with
    the current UTC 'HH:MM' and it has not already been sent today (UTC).
    Each schedule is handled independently so one bad row cannot block the
    others.
    """
    with app.app_context():
        now = datetime.now(timezone.utc)
        current_time_str = now.strftime('%H:%M')

        try:
            # Fetch active schedules
            res = supabase.table("scheduled_messages").select("*").eq("is_enabled", True).execute()
            schedules = res.data or []
        except Exception as e:
            print(f"Scheduler Error: {e}")
            return

        for sched in schedules:
            try:
                _run_schedule_if_due(sched, now, current_time_str)
            except Exception as e:
                print(f"Scheduler Error: {e}")


# Start Scheduler
# NOTE(review): this runs at import time, so every process that imports this
# module starts its own scheduler; confirm the app is deployed as a single
# process, otherwise scheduled messages may be sent more than once.
if not scheduler.running:
    scheduler.add_job(check_and_send_scheduled_messages, 'interval', minutes=1)
    scheduler.start()


# --- Routes ---

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login page; on POST, check the password and start a session."""
    error = None
    if request.method == 'POST':
        supplied = request.form.get('password', '')
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if hmac.compare_digest(supplied.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8')):
            session['logged_in'] = True
            return redirect(url_for('dashboard'))
        error = 'Invalid password'
    return render_template_string(LOGIN_TEMPLATE, error=error)


@app.route('/logout')
def logout():
    """End the admin session and return to the login page."""
    session.pop('logged_in', None)
    return redirect(url_for('login'))


@app.route('/', methods=['GET'])
@login_required
def dashboard():
    """Render the dashboard: stats, message history, schedules, recent users."""
    stats = {}
    message_logs = []
    schedules = []
    users_list = []

    try:
        now = datetime.now(timezone.utc)

        # 1. Stats
        total_users = supabase.table("telegram_users").select("id", count="exact").execute().count
        active_7d = supabase.table("telegram_users").select("id", count="exact").gte(
            "last_active", _utc_iso(now - timedelta(days=7))
        ).execute().count
        paid_users = supabase.table("telegram_users").select("id", count="exact").eq("tier", "paid").execute().count

        total_gen = supabase.table("image_generation_logs").select("id", count="exact").execute().count
        gen_today = supabase.table("image_generation_logs").select("id", count="exact").gte(
            "created_at", now.strftime('%Y-%m-%d')
        ).execute().count

        stats = {
            "total_users": total_users,
            "active_users_7d": active_7d,
            "paid_users": paid_users,
            "total_generations": total_gen,
            "gen_today": gen_today
        }

        # 2. Message Logs
        try:
            res_logs = supabase.table("admin_message_logs").select("*").order("sent_at", desc=True).limit(10).execute()
            message_logs = res_logs.data
        except Exception:
            message_logs = []  # Table might not exist yet

        # 3. Schedules
        try:
            res_sched = supabase.table("scheduled_messages").select("*").order("created_at", desc=True).execute()
            schedules = res_sched.data
        except Exception:
            schedules = []

        # 4. Users List
        users_list = supabase.table("telegram_users").select("*").order("last_active", desc=True).limit(50).execute().data

    except Exception as e:
        flash(f"Error loading dashboard: {e}", "error")

    return render_template_string(
        DASHBOARD_TEMPLATE,
        stats=stats,
        message_logs=message_logs,
        schedules=schedules,
        users_list=users_list
    )


@app.route('/broadcast', methods=['POST'])
@login_required
def broadcast():
    """Send a message to the selected audience and log the result."""
    target = request.form.get('target', 'all')
    message = request.form.get('message', '').strip()
    chat_ids_input = request.form.get('chat_ids', '')
    send_menu = 'send_menu' in request.form

    if not message:
        flash('Message cannot be empty', 'error')
        return redirect(url_for('dashboard'))

    try:
        # Get recipients
        chat_ids = normalize_chat_ids(target, chat_ids_input)

        if not chat_ids:
            flash('No users found for target audience', 'error')
            return redirect(url_for('dashboard'))

        success_count, fail_count, last_error, sent_message_ids = _send_to_all(chat_ids, message, send_menu)

        # Log to DB with message_ids
        try:
            supabase.table("admin_message_logs").insert({
                "target_type": target,
                "target_chat_ids": chat_ids_input if target == 'specific' else None,
                "message_content": message,
                "total_recipients": len(chat_ids),
                "success_count": success_count,
                "fail_count": fail_count,
                "created_by": "admin",
                "sent_message_ids": sent_message_ids
            }).execute()
        except Exception as log_err:
            print(f"Logging error: {log_err}")

        msg_text = f'Broadcast complete! Sent: {success_count}, Failed: {fail_count}'
        if fail_count > 0:
            msg_text += f" (Last Error: {last_error})"

        flash(msg_text, 'success' if fail_count == 0 else 'warning')

    except Exception as e:
        flash(f'Error: {str(e)}', 'error')

    return redirect(url_for('dashboard'))


# NOTE(review): the URL variable was missing from the rules below (they read
# '/message//resend' etc.) although every view takes `id`; it is restored as a
# plain string converter. Confirm whether ids are integers or UUIDs. The
# parameter name `id` shadows the builtin but is kept so existing
# url_for(..., id=...) calls keep working.
@app.route('/message/<id>/resend', methods=['POST'])
@login_required
def resend_message(id):
    """Edit already sent Telegram messages with new content."""
    try:
        # Get new content from form
        new_content = request.form.get('message', '').strip()

        # Fetch message log
        res = supabase.table("admin_message_logs").select("*").eq("id", id).execute()
        if not res.data:
            flash("Message log not found", "error")
            return redirect(url_for('dashboard'))

        msg = res.data[0]
        sent_ids = msg.get('sent_message_ids') or {}

        if not new_content:
            # If no new content provided, use existing content
            new_content = msg['message_content']

        if not sent_ids:
            flash("No message IDs stored - cannot edit (message may be too old)", "warning")
            return redirect(url_for('dashboard'))

        # Edit each sent message
        success = 0
        fail = 0
        last_error = ""

        for chat_id_str, message_id in sent_ids.items():
            is_ok, err_msg = edit_telegram_message(int(chat_id_str), message_id, new_content)
            if is_ok:
                success += 1
            else:
                fail += 1
                last_error = err_msg

        # Update message content in DB
        supabase.table("admin_message_logs").update({
            "message_content": new_content
        }).eq("id", id).execute()

        msg_text = f"Edited {success}/{len(sent_ids)} messages"
        if fail > 0:
            msg_text += f" ({fail} failed: {last_error})"
        flash(msg_text, "success" if fail == 0 else "warning")

    except Exception as e:
        flash(f"Error editing: {e}", "error")

    return redirect(url_for('dashboard'))


@app.route('/message/<id>/delete', methods=['POST'])
@login_required
def delete_message(id):
    """Delete messages from Telegram and then from database."""
    try:
        # Fetch message first to get message_ids
        res = supabase.table("admin_message_logs").select("*").eq("id", id).execute()

        if res.data:
            msg = res.data[0]
            # The column may be NULL for logs without stored ids.
            sent_ids = msg.get('sent_message_ids') or {}

            # Delete from Telegram
            deleted = 0
            for chat_id_str, message_id in sent_ids.items():
                is_ok, _ = delete_telegram_message(int(chat_id_str), message_id)
                if is_ok:
                    deleted += 1

            # Delete from database
            supabase.table("admin_message_logs").delete().eq("id", id).execute()

            if sent_ids:
                flash(f"Deleted from Telegram: {deleted}/{len(sent_ids)}, DB log removed", "success")
            else:
                flash("DB log removed (no Telegram messages to delete)", "success")
        else:
            flash("Message log not found", "error")

    except Exception as e:
        flash(f"Error deleting: {e}", "error")

    return redirect(url_for('dashboard'))


@app.route('/message/<id>/update', methods=['POST'])
@login_required
def update_message(id):
    """Update message content in DB only (for scheduled editing before resend)."""
    new_content = request.form.get('message', '').strip()
    if not new_content:
        flash('Message cannot be empty', 'error')
        return redirect(url_for('dashboard'))

    try:
        supabase.table("admin_message_logs").update({"message_content": new_content}).eq("id", id).execute()
        flash("Message log updated (not yet sent to Telegram)", "success")
    except Exception as e:
        flash(f"Error updating message: {e}", "error")
    return redirect(url_for('dashboard'))


@app.route('/schedule/new', methods=['POST'])
@login_required
def new_schedule():
    """Create a scheduled message from the submitted form."""
    try:
        data = {
            "name": request.form['name'],
            "message_content": request.form['message'],
            "schedule_time": _normalize_schedule_time(request.form['schedule_time']),  # Format HH:MM
            "is_enabled": 'is_enabled' in request.form,
            "target_type": "all"  # Default to all for now
        }
        supabase.table("scheduled_messages").insert(data).execute()
        flash("Schedule created!", "success")
    except Exception as e:
        flash(f"Error creating schedule: {e}", "error")
    return redirect(url_for('dashboard'))


@app.route('/schedule/<id>/update', methods=['POST'])
@login_required
def update_schedule(id):
    """Update name, content, time and enabled flag of a scheduled message."""
    try:
        data = {
            "name": request.form['name'],
            "message_content": request.form['message'],
            "schedule_time": _normalize_schedule_time(request.form['schedule_time']),
            "is_enabled": 'is_enabled' in request.form
        }
        supabase.table("scheduled_messages").update(data).eq("id", id).execute()
        flash("Schedule updated!", "success")
    except Exception as e:
        flash(f"Error updating schedule: {e}", "error")
    return redirect(url_for('dashboard'))


@app.route('/schedule/<id>/toggle', methods=['POST'])
@login_required
def toggle_schedule(id):
    """Flip the is_enabled flag of a scheduled message."""
    try:
        # Get current status
        rows = supabase.table("scheduled_messages").select("is_enabled").eq("id", id).execute().data
        if not rows:
            flash("Schedule not found", "error")
            return redirect(url_for('dashboard'))
        new_status = not rows[0]['is_enabled']
        supabase.table("scheduled_messages").update({"is_enabled": new_status}).eq("id", id).execute()
    except Exception as e:
        flash(f"Error updating schedule: {e}", "error")
    return redirect(url_for('dashboard'))


@app.route('/schedule/<id>/delete', methods=['POST'])
@login_required
def delete_schedule(id):
    """Delete a scheduled message."""
    try:
        supabase.table("scheduled_messages").delete().eq("id", id).execute()
        flash("Schedule deleted", "success")
    except Exception as e:
        flash(f"Error deleting schedule: {e}", "error")
    return redirect(url_for('dashboard'))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=False)