Discord_Bot / main_working_backup.py
Dr-P's picture
Create main_working_backup.py
1280f6f verified
import os
import json
import time
import random
import sqlite3
import asyncio
from typing import Any, Dict, Optional
import socket
import socket
import aiohttp
import dns.resolver
from aiohttp.abc import AbstractResolver
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from nacl.signing import VerifyKey
from nacl.exceptions import BadSignatureError
# =========
# ENV VARS
# =========
# Required Discord credentials: a missing variable raises KeyError at import time.
DISCORD_APPLICATION_ID = os.environ["DISCORD_APPLICATION_ID"]
DISCORD_PUBLIC_KEY = os.environ["DISCORD_PUBLIC_KEY"]
DISCORD_BOT_TOKEN = os.environ["DISCORD_BOT_TOKEN"]
# Optional: when set, register_commands targets this guild's command list
# instead of the application-wide one.
DISCORD_TEST_GUILD_ID = os.getenv("DISCORD_TEST_GUILD_ID")
# Shared secret checked by the /register route; empty string means "not configured".
ADMIN_REGISTER_TOKEN = os.getenv("ADMIN_REGISTER_TOKEN", "")
DISCORD_API_BASE = "https://discord.com/api/v10"
# Topic stored for a channel when its state row is first created.
DEFAULT_TOPIC = os.getenv("DEFAULT_TOPIC", "controller vs mouse")
# Length of each round's voting window, in seconds.
VOTE_SECONDS = int(os.getenv("VOTE_SECONDS", "20"))
# Use persistent /data if available at runtime; otherwise local file.
DB_PATH = "/data/battle.db" if os.path.isdir("/data") else "battle.db"
app = FastAPI(title="Discord Trash Talk Arena")
# Key used by verify_discord_request to check interaction request signatures.
verify_key = VerifyKey(bytes.fromhex(DISCORD_PUBLIC_KEY))
# Shared HTTP session; assigned in the startup hook and closed in the shutdown hook.
discord_session: aiohttp.ClientSession | None = None
# Single shared connection is fine for this MVP in one worker.
# check_same_thread=False allows use from threads other than the creating one.
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
# Rows can be indexed by column name (e.g. row["topic"]) and passed to dict().
conn.row_factory = sqlite3.Row
class DnsPythonResolver(AbstractResolver):
    """aiohttp resolver that queries explicit nameservers through dnspython.

    The underlying resolver is built with ``configure=False`` so it uses only
    the nameservers assigned here.
    """

    def __init__(self, nameservers=None):
        """Create the resolver; falls back to 8.8.8.8 and 1.1.1.1."""
        self._resolver = dns.resolver.Resolver(configure=False)
        self._resolver.nameservers = nameservers or ["8.8.8.8", "1.1.1.1"]

    async def resolve(self, host, port=0, family=socket.AF_INET):
        """Resolve ``host`` and return a list of address-info dicts.

        AAAA records are queried only when ``family`` is ``AF_INET6``; every
        other family value falls through to an A-record (IPv4) lookup.
        The blocking DNS query runs in a worker thread.
        """
        if family == socket.AF_INET6:
            record_type, result_family = "AAAA", socket.AF_INET6
        else:
            # Default to IPv4 because Discord is fine over IPv4 and this is simpler.
            record_type, result_family = "A", socket.AF_INET

        def lookup():
            entries = []
            for rdata in self._resolver.resolve(host, record_type):
                entries.append(
                    {
                        "hostname": host,
                        "host": rdata.address,
                        "port": port,
                        "family": result_family,
                        "proto": 0,
                        "flags": socket.AI_NUMERICHOST,
                    }
                )
            return entries

        return await asyncio.to_thread(lookup)

    async def close(self):
        """Nothing to release; present to satisfy the resolver interface."""
        return None
# =========
# DB SETUP
# =========
def init_db() -> None:
    """Create the bot's four tables if they do not exist yet.

    Tables: ``channel_state``, ``rounds``, ``votes`` and ``wins``. All
    statements are executed inside a single ``with conn`` block.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS channel_state (
            channel_id TEXT PRIMARY KEY,
            active INTEGER NOT NULL DEFAULT 0,
            paused INTEGER NOT NULL DEFAULT 0,
            topic TEXT NOT NULL DEFAULT '',
            coach_a TEXT NOT NULL DEFAULT '',
            coach_b TEXT NOT NULL DEFAULT '',
            total_rounds INTEGER NOT NULL DEFAULT 3,
            current_round INTEGER NOT NULL DEFAULT 0,
            updated_at INTEGER NOT NULL DEFAULT 0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS rounds (
            channel_id TEXT NOT NULL,
            round_no INTEGER NOT NULL,
            a_text TEXT NOT NULL,
            b_text TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'open',
            PRIMARY KEY (channel_id, round_no)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS votes (
            channel_id TEXT NOT NULL,
            round_no INTEGER NOT NULL,
            user_id TEXT NOT NULL,
            winner TEXT NOT NULL,
            PRIMARY KEY (channel_id, round_no, user_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS wins (
            bot TEXT NOT NULL,
            topic TEXT NOT NULL,
            text TEXT NOT NULL,
            win_count INTEGER NOT NULL DEFAULT 1,
            PRIMARY KEY (bot, topic, text)
        )
        """,
    )
    with conn:
        for ddl in schema_statements:
            conn.execute(ddl)
def ensure_channel_state(channel_id: str) -> None:
    """Insert a default state row for ``channel_id`` unless one already exists.

    New rows start inactive and unpaused, with ``DEFAULT_TOPIC``, empty
    coaching, three total rounds, and ``updated_at`` set to the current time.
    """
    insert_sql = """
        INSERT OR IGNORE INTO channel_state (
            channel_id, active, paused, topic, coach_a, coach_b, total_rounds, current_round, updated_at
        )
        VALUES (?, 0, 0, ?, '', '', 3, 0, ?)
        """
    created_at = int(time.time())
    with conn:
        conn.execute(insert_sql, (channel_id, DEFAULT_TOPIC, created_at))
def get_state(channel_id: str) -> Dict[str, Any]:
    """Return the ``channel_state`` row for ``channel_id`` as a plain dict.

    The row is created with defaults first if it does not exist.
    """
    ensure_channel_state(channel_id)
    cursor = conn.execute(
        "SELECT * FROM channel_state WHERE channel_id = ?",
        (channel_id,),
    )
    state_row = cursor.fetchone()
    return dict(state_row)
# Columns of channel_state that update_state is allowed to write. Keyword
# names are interpolated into the SQL text, so they must come from this set.
_CHANNEL_STATE_COLUMNS = frozenset(
    {
        "active",
        "paused",
        "topic",
        "coach_a",
        "coach_b",
        "total_rounds",
        "current_round",
        "updated_at",
    }
)


def update_state(channel_id: str, **kwargs: Any) -> None:
    """Update columns of the ``channel_state`` row for ``channel_id``.

    Each keyword argument names a column and gives its new value;
    ``updated_at`` is always set to the current time. With no keyword
    arguments the call is a no-op. The row is created with defaults first
    if it does not exist.

    Raises:
        ValueError: if a keyword is not a writable ``channel_state`` column.
            Column names cannot be bound as SQL parameters, so they are
            validated before being placed in the statement text.
    """
    if not kwargs:
        return
    unknown = sorted(set(kwargs) - _CHANNEL_STATE_COLUMNS)
    if unknown:
        raise ValueError(f"Unknown channel_state column(s): {', '.join(unknown)}")
    ensure_channel_state(channel_id)
    # Copy rather than mutate kwargs; updated_at overrides any caller value.
    fields = {**kwargs, "updated_at": int(time.time())}
    assignments = ", ".join(f"{column} = ?" for column in fields)
    with conn:
        conn.execute(
            f"UPDATE channel_state SET {assignments} WHERE channel_id = ?",
            [*fields.values(), channel_id],
        )
def set_round(channel_id: str, round_no: int, a_text: str, b_text: str, status: str = "open") -> None:
    """Store both bots' lines for a round, replacing any existing row.

    The row is keyed by ``(channel_id, round_no)``; ``status`` defaults to
    ``"open"``.
    """
    upsert_sql = """
        INSERT OR REPLACE INTO rounds (channel_id, round_no, a_text, b_text, status)
        VALUES (?, ?, ?, ?, ?)
        """
    round_values = (channel_id, round_no, a_text, b_text, status)
    with conn:
        conn.execute(upsert_sql, round_values)
def get_round(channel_id: str, round_no: int) -> Optional[Dict[str, Any]]:
    """Return the stored round as a dict, or ``None`` when no such round exists."""
    cursor = conn.execute(
        "SELECT * FROM rounds WHERE channel_id = ? AND round_no = ?",
        (channel_id, round_no),
    )
    round_row = cursor.fetchone()
    if not round_row:
        return None
    return dict(round_row)
def close_round(channel_id: str, round_no: int) -> None:
    """Mark the given round's status as ``'closed'``."""
    round_key = (channel_id, round_no)
    with conn:
        conn.execute(
            "UPDATE rounds SET status = 'closed' WHERE channel_id = ? AND round_no = ?",
            round_key,
        )
def save_vote(channel_id: str, round_no: int, user_id: str, winner: str) -> None:
    """Record ``user_id``'s vote for a round.

    A user has at most one vote per round: voting again overwrites the
    previously stored ``winner``.
    """
    upsert_sql = """
        INSERT INTO votes (channel_id, round_no, user_id, winner)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(channel_id, round_no, user_id)
        DO UPDATE SET winner = excluded.winner
        """
    vote_values = (channel_id, round_no, user_id, winner)
    with conn:
        conn.execute(upsert_sql, vote_values)
def tally_votes(channel_id: str, round_no: int) -> Dict[str, int]:
    """Count the votes cast in a round, grouped by chosen winner.

    The result always contains the keys ``"A"`` and ``"B"`` (zero when
    nobody voted for that bot).
    """
    count_sql = """
        SELECT winner, COUNT(*) AS c
        FROM votes
        WHERE channel_id = ? AND round_no = ?
        GROUP BY winner
        """
    grouped = conn.execute(count_sql, (channel_id, round_no)).fetchall()
    totals = {"A": 0, "B": 0}
    totals.update((record["winner"], record["c"]) for record in grouped)
    return totals
def save_winning_line(bot_name: str, topic: str, text: str) -> None:
    """Record a round-winning line for a bot and topic.

    A new ``(bot, topic, text)`` row starts with ``win_count`` 1; an
    existing row has its ``win_count`` incremented.
    """
    upsert_sql = """
        INSERT INTO wins (bot, topic, text, win_count)
        VALUES (?, ?, ?, 1)
        ON CONFLICT(bot, topic, text)
        DO UPDATE SET win_count = win_count + 1
        """
    win_key = (bot_name, topic, text)
    with conn:
        conn.execute(upsert_sql, win_key)
def top_examples(bot_name: str, topic: str, limit: int = 2) -> list[str]:
    """Return up to ``limit`` stored winning lines for a bot and topic.

    Lines are ordered by ``win_count``, highest first.
    """
    select_sql = """
        SELECT text
        FROM wins
        WHERE bot = ? AND topic = ?
        ORDER BY win_count DESC
        LIMIT ?
        """
    best_rows = conn.execute(select_sql, (bot_name, topic, limit)).fetchall()
    return [record["text"] for record in best_rows]
# =========
# DISCORD HELPERS
# =========
async def discord_request(method: str, path: str, json_body: Optional[dict] = None) -> Optional[dict]:
    """Send an authenticated request to the Discord API.

    Args:
        method: HTTP method name.
        path: Path appended to ``DISCORD_API_BASE``.
        json_body: Optional JSON payload.

    Returns:
        The decoded JSON body, ``{"raw": text}`` when the body is not valid
        JSON, or ``None`` when the body is empty or whitespace.

    Raises:
        RuntimeError: if the shared session has not been created, or the
            response status is 400 or higher.
    """
    session = discord_session
    if session is None:
        raise RuntimeError("Discord session is not initialized")
    request_headers = {
        "Authorization": f"Bot {DISCORD_BOT_TOKEN}",
        "Content-Type": "application/json",
    }
    async with session.request(
        method,
        f"{DISCORD_API_BASE}{path}",
        headers=request_headers,
        json=json_body,
    ) as response:
        body_text = await response.text()
        if response.status >= 400:
            raise RuntimeError(f"Discord API error {response.status}: {body_text}")
        if not body_text.strip():
            return None
        try:
            return json.loads(body_text)
        except json.JSONDecodeError:
            return {"raw": body_text}
async def send_channel_message(channel_id: str, content: str, components: Optional[list] = None) -> None:
    """Post a message to a channel, optionally with message components.

    Every ``@`` character is removed from ``content`` and mention parsing is
    disabled via ``allowed_mentions``. ``components`` is attached only when
    it is non-empty.
    """
    sanitized = content.replace("@", "")
    message: Dict[str, Any] = {
        "content": sanitized,
        "allowed_mentions": {"parse": []},
    }
    if components:
        message["components"] = components
    endpoint = f"/channels/{channel_id}/messages"
    await discord_request("POST", endpoint, message)
def vote_components(channel_id: str, round_no: int) -> list:
    """Build the action row holding the two vote buttons for a round.

    Each button's ``custom_id`` is ``vote|<bot>|<channel_id>|<round_no>``,
    the format the interaction handler splits on ``|``.
    """

    def button(style: int, label: str, bot: str) -> dict:
        return {
            "type": 2,  # button
            "style": style,
            "label": label,
            "custom_id": f"vote|{bot}|{channel_id}|{round_no}",
        }

    action_row = {
        "type": 1,  # action row
        "components": [
            button(1, "🔥 Vote Bot A", "A"),  # blurple
            button(2, "💀 Vote Bot B", "B"),  # gray
        ],
    }
    return [action_row]
# =========
# COMMAND REGISTRATION
# =========
async def register_commands() -> dict:
    """Overwrite the bot's slash commands with the set defined here.

    The command list is sent with a single PUT. When
    ``DISCORD_TEST_GUILD_ID`` is set the guild-scoped commands path is
    used; otherwise the application-wide path is used.

    Returns:
        A dict with ``ok``, ``registered_count`` (length of the API result,
        0 when it is empty) and ``scope`` (the path that was used).
    """

    def text_option(description: str) -> list:
        # One required option named "value"; type 3 is shared by all text commands.
        return [
            {
                "name": "value",
                "description": description,
                "type": 3,
                "required": True,
            }
        ]

    rounds_option = {
        "name": "rounds",
        "description": "Number of rounds (1-5)",
        "type": 4,
        "required": False,
        "min_value": 1,
        "max_value": 5,
    }
    command_specs = [
        {
            "name": "fight",
            "description": "Start a trash talk battle in this channel",
            "options": [rounds_option],
        },
        {
            "name": "pause",
            "description": "Pause the current battle after the active round finishes voting",
        },
        {
            "name": "continue",
            "description": "Resume the paused battle",
        },
        {
            "name": "end",
            "description": "End the current battle in this channel",
        },
        {
            "name": "topic",
            "description": "Set the debate topic for this channel",
            "options": text_option("Topic text"),
        },
        {
            "name": "coach_a",
            "description": "Set coaching for Bot A",
            "options": text_option("Coaching text"),
        },
        {
            "name": "coach_b",
            "description": "Set coaching for Bot B",
            "options": text_option("Coaching text"),
        },
    ]
    scope_path = (
        f"/applications/{DISCORD_APPLICATION_ID}/guilds/{DISCORD_TEST_GUILD_ID}/commands"
        if DISCORD_TEST_GUILD_ID
        else f"/applications/{DISCORD_APPLICATION_ID}/commands"
    )
    registered = await discord_request("PUT", scope_path, command_specs)
    print("Registered commands:", json.dumps(registered, indent=2))
    return {"ok": True, "registered_count": len(registered or []), "scope": scope_path}
# =========
# STARTUP
# =========
@app.on_event("startup")
async def startup_event() -> None:
    """Initialise the database and HTTP session, then register commands.

    Command registration is attempted up to five times with a five-second
    pause between attempts. Failures are printed, not raised, so startup
    completes even if every attempt fails.
    """
    global discord_session
    init_db()
    dns_servers = ["8.8.8.8", "1.1.1.1"]
    discord_session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(
            resolver=DnsPythonResolver(nameservers=dns_servers),
            use_dns_cache=False,
            ssl=True,
            family=socket.AF_INET,
        )
    )
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            result = await register_commands()
        except Exception as exc:
            print(f"Command registration attempt {attempt} failed: {exc}")
            # No point sleeping after the final attempt.
            if attempt < max_attempts:
                await asyncio.sleep(5)
        else:
            print(f"Command registration succeeded on attempt {attempt}: {result}")
            break
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """Close the shared Discord HTTP session, if one was created."""
    global discord_session
    if discord_session is None:
        return
    await discord_session.close()
    discord_session = None
# =========
# CORE GENERATOR
# Replace this function later with your real local model runtime.
# =========
def coach_style(text: str) -> str:
    """Map free-form coaching text to a style name.

    Matching is a case-insensitive substring search, checked in priority
    order: ``smart``, then ``absurd``, then ``aggressive``. Returns
    ``"default"`` when no keyword is present.
    """
    lowered = text.lower()
    keyword_styles = (
        ("smart", ("smart", "clever")),
        ("absurd", ("absurd", "weird")),
        ("aggressive", ("aggressive", "toxic")),
    )
    for style, keywords in keyword_styles:
        if any(keyword in lowered for keyword in keywords):
            return style
    return "default"
def generate_line(bot_name: str, topic: str, coach: str, opponent_line: str = "") -> str:
    """Produce one trash-talk line for a bot.

    When the bot has stored winning lines for ``topic``, the top one is
    reused with probability 0.25. Otherwise a line is picked at random from
    the pool selected by the bot and by ``coach_style(coach)``. Bot ``"A"``
    uses the A pools; any other ``bot_name`` uses the B pools.
    ``opponent_line`` is accepted but not used.
    """
    memories = top_examples(bot_name, topic)
    if memories and random.random() < 0.25:
        return memories[0]

    pools = {
        "A": {
            "default": [
                f"{topic}? You debate like your headset is running on low battery.",
                f"Your {topic} take has the survival instincts of a flashbang.",
                f"I've seen tutorial bots sound tougher than your {topic} opinion.",
                f"That {topic} take just got benched by common sense.",
            ],
            "smart": [
                f"Your {topic} argument has the structural integrity of lag.",
                f"Even patch notes have more coherence than your {topic} take.",
                f"Your {topic} logic got spawn-killed by basic reasoning.",
            ],
            "absurd": [
                f"Your {topic} take wears socks in the shower and calls it meta.",
                f"That {topic} opinion sounds like it was forged in microwave ranked.",
                f"Your {topic} argument smells like expired gamer fuel.",
            ],
            "aggressive": [
                f"Your {topic} take is getting folded like a bad loadout.",
                f"I'd roast your {topic} take softer, but it started it.",
                f"Your {topic} opinion just rage-quit mid sentence.",
            ],
        },
        "B": {
            "default": [
                f"Funny speech. Your {topic} take still loses on contact.",
                "You talk loud for someone getting outplayed by punctuation.",
                "That roast had ping spikes; try again with better aim.",
                "You sound like confidence with no patch notes.",
            ],
            "smart": [
                f"Counterpoint: your {topic} logic has negative KD.",
                "Your argument collapses faster than your map awareness.",
                f"You're debating {topic} like evidence is optional DLC.",
            ],
            "absurd": [
                "Your roast arrived in clown armor and still missed.",
                "That comeback moonwalked into traffic and blamed the minimap.",
                "You sound like a loading screen with delusions.",
            ],
            "aggressive": [
                "All that energy and you still got diffed by one sentence.",
                "Your comeback tripped over spawn protection.",
                "You swing hard for someone getting read like patch notes.",
            ],
        },
    }
    side = "A" if bot_name == "A" else "B"
    return random.choice(pools[side][coach_style(coach)])
# =========
# BATTLE LOOP
# =========
async def _run_battle_rounds(channel_id: str, rounds: int) -> None:
    """Announce the battle, then play, collect votes for, and score each round."""
    topic = get_state(channel_id)["topic"]
    await send_channel_message(
        channel_id,
        f"💥 **Battle starting**\n"
        f"Topic: **{topic}**\n"
        f"Rounds: **{rounds}**\n"
        f"Use `/pause`, `/continue`, `/end`, `/coach_a`, `/coach_b`, or `/topic` between rounds."
    )
    for round_no in range(1, rounds + 1):
        state = get_state(channel_id)
        if not state["active"]:
            await send_channel_message(channel_id, "🛑 Battle ended.")
            return
        # While paused, poll once a second; an /end during the pause still stops the battle.
        while state["paused"]:
            await asyncio.sleep(1)
            state = get_state(channel_id)
            if not state["active"]:
                await send_channel_message(channel_id, "🛑 Battle ended.")
                return
        # Topic and coaching are re-read every round so changes apply to the next round.
        topic = state["topic"]
        coach_a = state["coach_a"]
        coach_b = state["coach_b"]
        update_state(channel_id, current_round=round_no)
        a_text = generate_line("A", topic, coach_a)
        b_text = generate_line("B", topic, coach_b, opponent_line=a_text)
        set_round(channel_id, round_no, a_text, b_text, status="open")
        await send_channel_message(
            channel_id,
            (
                f"## Round {round_no}\n"
                f"**Topic:** {topic}\n\n"
                f"**Bot A:** {a_text}\n"
                f"**Bot B:** {b_text}\n\n"
                f"Vote below. Voting closes in **{VOTE_SECONDS} seconds**."
            ),
            components=vote_components(channel_id, round_no),
        )
        await asyncio.sleep(VOTE_SECONDS)
        # The round may have been ended manually.
        state = get_state(channel_id)
        if not state["active"]:
            close_round(channel_id, round_no)
            await send_channel_message(channel_id, "🛑 Battle ended.")
            return
        close_round(channel_id, round_no)
        counts = tally_votes(channel_id, round_no)
        if counts["A"] > counts["B"]:
            save_winning_line("A", topic, a_text)
            result_line = f"🏆 **Round {round_no} winner: Bot A** ({counts['A']} - {counts['B']})"
        elif counts["B"] > counts["A"]:
            save_winning_line("B", topic, b_text)
            result_line = f"🏆 **Round {round_no} winner: Bot B** ({counts['B']} - {counts['A']})"
        else:
            result_line = f"🤝 **Round {round_no} is a tie** ({counts['A']} - {counts['B']})"
        await send_channel_message(channel_id, result_line)
        if round_no < rounds:
            await send_channel_message(channel_id, "⏭️ Next round starting soon...")
    update_state(channel_id, active=0, paused=0)
    await send_channel_message(channel_id, "✅ **Battle finished.**")


async def run_battle(channel_id: str, rounds: int) -> None:
    """Run a full battle of ``rounds`` rounds in ``channel_id``.

    Each round generates a line per bot, posts them with vote buttons,
    waits ``VOTE_SECONDS``, closes the round and announces the result. The
    winner's line is saved; ties save nothing. The battle stops early when
    the channel's ``active`` flag is cleared and waits while ``paused`` is
    set.

    If anything raises (for example a Discord API error), the channel's
    ``active`` and ``paused`` flags are cleared before the exception is
    re-raised. Otherwise the channel would stay marked active and ``/fight``
    would keep refusing to start a new battle.
    """
    try:
        await _run_battle_rounds(channel_id, rounds)
    except Exception as exc:
        print(f"Battle in channel {channel_id} failed: {exc}")
        update_state(channel_id, active=0, paused=0)
        raise
# =========
# DISCORD INTERACTION PARSING
# =========
def verify_discord_request(signature: str, timestamp: str, body: bytes) -> None:
    """Verify the signature Discord attached to an interaction request.

    The signed message is the timestamp header followed by the raw body.

    Args:
        signature: Hex-encoded signature from ``X-Signature-Ed25519``.
        timestamp: Value of ``X-Signature-Timestamp``.
        body: Raw request body bytes.

    Raises:
        HTTPException: 401 when the signature is invalid or malformed.
    """
    try:
        verify_key.verify(timestamp.encode() + body, bytes.fromhex(signature))
    except (BadSignatureError, ValueError) as exc:
        # ValueError covers a header that is not valid hex (bytes.fromhex) and,
        # in PyNaCl, a signature of the wrong length. Both are client errors
        # and must yield 401 rather than an unhandled 500.
        raise HTTPException(status_code=401, detail="Invalid request signature") from exc
def interaction_user_id(payload: dict) -> str:
    """Return the id of the user who triggered the interaction.

    Prefers ``payload["member"]["user"]`` when present, otherwise falls back
    to the top-level ``payload["user"]``.
    """
    if "member" in payload:
        member = payload["member"]
        if "user" in member:
            return member["user"]["id"]
    return payload["user"]["id"]
def interaction_channel_id(payload: dict) -> str:
    """Return the channel id the interaction came from.

    Prefers ``payload["channel"]["id"]`` when present, otherwise falls back
    to ``payload["channel_id"]``.
    """
    if "channel" in payload:
        channel = payload["channel"]
        if "id" in channel:
            return channel["id"]
    return payload["channel_id"]
def options_to_dict(data: dict) -> dict:
    """Flatten a command's ``options`` list into a ``{name: value}`` dict.

    Returns an empty dict when ``data`` has no ``options`` key. If a name
    repeats, the later value wins.
    """
    return {option["name"]: option["value"] for option in data.get("options", [])}
def msg_response(content: str, ephemeral: bool = True) -> JSONResponse:
    """Build a type-4 interaction response carrying a text message.

    Mention parsing is disabled. When ``ephemeral`` is true the message
    ``flags`` field is set to 64.
    """
    message_data = {
        "content": content,
        "allowed_mentions": {"parse": []},
    }
    if ephemeral:
        message_data["flags"] = 64
    return JSONResponse({"type": 4, "data": message_data})
# =========
# ROUTES
# =========
@app.get("/")
async def root() -> dict:
    """Return a static payload identifying the service."""
    service_info = {"ok": True, "service": "discord-trash-talk-arena"}
    return service_info
@app.get("/health")
async def health() -> dict:
    """Report the database path in use and the command registration target.

    The target is the test guild id when configured, otherwise ``"global"``.
    """
    commands_target = DISCORD_TEST_GUILD_ID or "global"
    return {
        "ok": True,
        "db_path": DB_PATH,
        "guild_commands_target": commands_target,
    }
@app.get("/dnscheck")
async def dnscheck():
    """Diagnostic route: resolve discord.com over IPv4 with the custom resolver.

    On success returns the number of records and the first one; on any
    exception returns its type name and message instead of raising.
    """
    try:
        checker = DnsPythonResolver(nameservers=["8.8.8.8", "1.1.1.1"])
        records = await checker.resolve("discord.com", 443, socket.AF_INET)
    except Exception as e:
        return {
            "ok": False,
            "error_type": type(e).__name__,
            "error": str(e)
        }
    first_record = records[0] if records else None
    return {
        "ok": True,
        "resolved_count": len(records),
        "sample": first_record
    }
@app.get("/discordcheck")
async def discordcheck():
    """Diagnostic route: GET Discord's gateway endpoint through the shared session.

    Returns the HTTP status and the first 300 characters of the body. A
    missing session or any exception is reported in the payload rather than
    raised.
    """
    session = discord_session
    if session is None:
        return {"ok": False, "error": "discord_session not initialized"}
    try:
        async with session.get("https://discord.com/api/v10/gateway") as response:
            body_text = await response.text()
            status_code = response.status
            return {
                "ok": True,
                "status": status_code,
                "body_preview": body_text[:300]
            }
    except Exception as e:
        return {
            "ok": False,
            "error_type": type(e).__name__,
            "error": str(e)
        }
@app.get("/register")
async def manual_register(token: str):
    """Re-register the slash commands on demand, guarded by a shared secret.

    Args:
        token: Must equal ``ADMIN_REGISTER_TOKEN``.

    Returns:
        The summary dict produced by ``register_commands``.

    Raises:
        HTTPException: 500 when ``ADMIN_REGISTER_TOKEN`` is not configured,
            403 when ``token`` does not match.
    """
    import hmac

    if not ADMIN_REGISTER_TOKEN:
        raise HTTPException(status_code=500, detail="ADMIN_REGISTER_TOKEN is not configured")
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Encoded to bytes because compare_digest rejects
    # non-ASCII str arguments.
    token_matches = hmac.compare_digest(
        token.encode("utf-8"),
        ADMIN_REGISTER_TOKEN.encode("utf-8"),
    )
    if not token_matches:
        raise HTTPException(status_code=403, detail="Forbidden")
    return await register_commands()
# Strong references to running battle tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_battle_tasks: set = set()

# Text-setting commands: command name -> (channel_state column, reply template).
_TEXT_COMMANDS = {
    "topic": ("topic", "Topic set to: {}"),
    "coach_a": ("coach_a", "Bot A coaching updated: {}"),
    "coach_b": ("coach_b", "Bot B coaching updated: {}"),
}


def _handle_command(payload: dict) -> JSONResponse:
    """Execute a slash command and return the reply to send back."""
    data = payload["data"]
    command_name = data["name"]
    channel_id = interaction_channel_id(payload)
    ensure_channel_state(channel_id)

    if command_name == "fight":
        if get_state(channel_id)["active"]:
            return msg_response("A battle is already running in this channel.")
        rounds = int(options_to_dict(data).get("rounds", 3))
        rounds = max(1, min(rounds, 5))
        update_state(
            channel_id,
            active=1,
            paused=0,
            total_rounds=rounds,
            current_round=0,
        )
        battle = asyncio.create_task(run_battle(channel_id, rounds))
        # Hold the task until it completes, then drop the reference.
        _battle_tasks.add(battle)
        battle.add_done_callback(_battle_tasks.discard)
        return msg_response(f"Starting a {rounds}-round battle in this channel.", ephemeral=False)

    if command_name in ("pause", "continue", "end"):
        if not get_state(channel_id)["active"]:
            return msg_response("No active battle in this channel.")
        if command_name == "pause":
            update_state(channel_id, paused=1)
            return msg_response(
                "Battle paused. The current round vote window will finish, then the loop will wait."
            )
        if command_name == "continue":
            update_state(channel_id, paused=0)
            return msg_response("Battle resumed.")
        update_state(channel_id, active=0, paused=0)
        return msg_response("Battle will stop.")

    if command_name in _TEXT_COMMANDS:
        column, reply_template = _TEXT_COMMANDS[command_name]
        value = options_to_dict(data)["value"].strip()
        update_state(channel_id, **{column: value})
        return msg_response(reply_template.format(value))

    return msg_response(f"Unknown command: {command_name}")


def _handle_vote(payload: dict) -> JSONResponse:
    """Record a vote from a ``vote|<bot>|<channel_id>|<round_no>`` button."""
    parts = payload["data"]["custom_id"].split("|")
    if len(parts) != 4 or parts[0] != "vote":
        return msg_response("Unknown button action.")
    _, winner, channel_id, round_no_text = parts
    # Only the two bots are valid choices; anything else would add a stray
    # key to the tally.
    if winner not in ("A", "B"):
        return msg_response("Unknown button action.")
    try:
        round_no = int(round_no_text)
    except ValueError:
        # A non-numeric round would otherwise surface as an unhandled 500.
        return msg_response("Unknown button action.")
    user_id = interaction_user_id(payload)
    round_row = get_round(channel_id, round_no)
    if not round_row:
        return msg_response("That round does not exist.")
    if round_row["status"] != "open":
        return msg_response("Voting for that round is closed.")
    save_vote(channel_id, round_no, user_id, winner)
    return msg_response(f"Vote recorded for Bot {winner}.", ephemeral=True)


@app.post("/interactions")
async def interactions(request: Request):
    """Discord interactions webhook.

    Verifies the request signature, then dispatches on the payload's
    ``type``: 1 is answered with a type-1 response, 2 is handled as a slash
    command, 3 as a button click. Any other type gets an "unsupported"
    message.

    Raises:
        HTTPException: 401 when the signature headers are missing or the
            signature does not verify.
    """
    raw_body = await request.body()
    signature = request.headers.get("X-Signature-Ed25519")
    timestamp = request.headers.get("X-Signature-Timestamp")
    if not signature or not timestamp:
        raise HTTPException(status_code=401, detail="Missing signature headers")
    verify_discord_request(signature, timestamp, raw_body)
    payload = json.loads(raw_body.decode("utf-8"))
    interaction_type = payload["type"]
    # Discord PING
    if interaction_type == 1:
        return JSONResponse({"type": 1})
    # Slash commands
    if interaction_type == 2:
        return _handle_command(payload)
    # Button clicks
    if interaction_type == 3:
        return _handle_vote(payload)
    return msg_response("Unsupported interaction type.")