Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Kasper Discord Bot | |
| Single-file Discord bot with: | |
| - Prefix commands + slash commands (hybrid where practical) | |
| - Welcome and instructions embeds | |
| - Button-based self-role toggle for role name "Kasper" | |
| - Multi-server JSON config persistence | |
| - Retirement / contribution calculators | |
| - Watchlist management | |
| - Placeholder stock alert system and daily task loop | |
| Setup: | |
| 1. Copy .env.example to .env and fill values, or paste values directly below. | |
| 2. Install requirements.txt | |
| 3. Run: python bot.py | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import math | |
| import os | |
| from dataclasses import dataclass, asdict | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import discord | |
| from discord import app_commands | |
| from discord.ext import commands, tasks | |
| from dotenv import load_dotenv | |
| # ========================= | |
| # Editable placeholders | |
| # ========================= | |
| BOT_NAME = "Kasper" | |
| DEFAULT_ROLE_NAME = "Kasper" | |
| COMMAND_PREFIX = "!" | |
| # Either use .env or paste directly here. | |
| DISCORD_BOT_TOKEN = "PASTE_BOT_TOKEN_HERE" | |
| DISCORD_APPLICATION_ID = "PASTE_APPLICATION_ID_HERE" | |
| # Optional default IDs. You can override per server with commands. | |
| DEFAULT_ALERT_CHANNEL_ID = 0 | |
| DEFAULT_ALERT_ROLE_ID = 0 | |
| DEFAULT_ADMIN_ROLE_IDS: List[int] = [] | |
| # Local storage | |
| DATA_DIR = Path("./kasper_data") | |
| CONFIG_PATH = DATA_DIR / "guild_config.json" | |
| WATCHLIST_PATH = DATA_DIR / "watchlists.json" | |
| ALERT_LOG_PATH = DATA_DIR / "alert_log.json" | |
| # ========================= | |
| # Environment / logging | |
| # ========================= | |
| load_dotenv() | |
| TOKEN = os.getenv("DISCORD_BOT_TOKEN", DISCORD_BOT_TOKEN) | |
| APP_ID_RAW = os.getenv("DISCORD_APPLICATION_ID", DISCORD_APPLICATION_ID) | |
| APPLICATION_ID = int(APP_ID_RAW) if APP_ID_RAW.isdigit() else None | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", | |
| ) | |
| log = logging.getLogger(BOT_NAME.lower()) | |
| DATA_DIR.mkdir(parents=True, exist_ok=True) | |
| # ========================= | |
| # JSON storage helpers | |
| # ========================= | |
| def _load_json(path: Path, default: Any) -> Any: | |
| if not path.exists(): | |
| return default | |
| try: | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| except Exception as exc: | |
| log.warning("Failed to load %s: %s", path, exc) | |
| return default | |
| def _save_json(path: Path, data: Any) -> None: | |
| path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") | |
| guild_config: Dict[str, Dict[str, Any]] = _load_json(CONFIG_PATH, {}) | |
| watchlists: Dict[str, List[str]] = _load_json(WATCHLIST_PATH, {}) | |
| alert_log: List[Dict[str, Any]] = _load_json(ALERT_LOG_PATH, []) | |
| def save_all() -> None: | |
| _save_json(CONFIG_PATH, guild_config) | |
| _save_json(WATCHLIST_PATH, watchlists) | |
| _save_json(ALERT_LOG_PATH, alert_log) | |
| # ========================= | |
| # Config helpers | |
| # ========================= | |
| def get_guild_key(guild_id: int) -> str: | |
| return str(guild_id) | |
| def get_guild_config(guild_id: int) -> Dict[str, Any]: | |
| key = get_guild_key(guild_id) | |
| if key not in guild_config: | |
| guild_config[key] = { | |
| "alert_channel_id": DEFAULT_ALERT_CHANNEL_ID, | |
| "alert_role_id": DEFAULT_ALERT_ROLE_ID, | |
| "admin_role_ids": DEFAULT_ADMIN_ROLE_IDS.copy(), | |
| "role_name": DEFAULT_ROLE_NAME, | |
| "scanner_enabled": True, | |
| "last_scanner_run": None, | |
| } | |
| save_all() | |
| return guild_config[key] | |
| def get_watchlist(guild_id: int) -> List[str]: | |
| key = get_guild_key(guild_id) | |
| if key not in watchlists: | |
| watchlists[key] = [] | |
| save_all() | |
| return watchlists[key] | |
| def user_is_adminish(member: discord.Member, cfg: Dict[str, Any]) -> bool: | |
| if member.guild_permissions.administrator: | |
| return True | |
| admin_role_ids = set(cfg.get("admin_role_ids", [])) | |
| if admin_role_ids and any(role.id in admin_role_ids for role in member.roles): | |
| return True | |
| return False | |
| def find_alert_role(guild: discord.Guild, cfg: Dict[str, Any]) -> Optional[discord.Role]: | |
| role_id = cfg.get("alert_role_id", 0) | |
| if role_id: | |
| role = guild.get_role(int(role_id)) | |
| if role: | |
| return role | |
| role_name = cfg.get("role_name", DEFAULT_ROLE_NAME) | |
| return discord.utils.get(guild.roles, name=role_name) | |
| def find_alert_channel(guild: discord.Guild, cfg: Dict[str, Any]) -> Optional[discord.TextChannel]: | |
| channel_id = cfg.get("alert_channel_id", 0) | |
| if channel_id: | |
| channel = guild.get_channel(int(channel_id)) | |
| if isinstance(channel, discord.TextChannel): | |
| return channel | |
| return None | |
| # ========================= | |
| # Calculator logic | |
| # ========================= | |
| class Phase: | |
| years: int | |
| monthly_contribution: float | |
| lump_sum_at_start: float = 0.0 | |
| label: str = "" | |
| class ScenarioResult: | |
| annual_return: float | |
| ending_value: float | |
| total_contributed: float | |
| profit: float | |
| annual_snapshots: List[Tuple[int, float]] | |
| DEFAULT_RETURNS = [0.10, 0.14, 0.18] | |
| def fmt_money(v: float) -> str: | |
| return f"${v:,.2f}" | |
| def fmt_pct(v: float) -> str: | |
| return f"{v*100:.2f}%" | |
| def simulate_portfolio(initial_investment: float, phases: List[Phase], annual_return: float) -> ScenarioResult: | |
| monthly_rate = (1 + annual_return) ** (1 / 12) - 1 | |
| balance = float(initial_investment) | |
| total_contributed = float(initial_investment) | |
| annual_snapshots: List[Tuple[int, float]] = [] | |
| current_year = 0 | |
| for phase in phases: | |
| if phase.lump_sum_at_start: | |
| balance += phase.lump_sum_at_start | |
| total_contributed += phase.lump_sum_at_start | |
| for month_idx in range(1, phase.years * 12 + 1): | |
| balance += phase.monthly_contribution | |
| total_contributed += phase.monthly_contribution | |
| balance *= (1 + monthly_rate) | |
| if month_idx % 12 == 0: | |
| current_year += 1 | |
| annual_snapshots.append((current_year, balance)) | |
| return ScenarioResult( | |
| annual_return=annual_return, | |
| ending_value=balance, | |
| total_contributed=total_contributed, | |
| profit=balance - total_contributed, | |
| annual_snapshots=annual_snapshots, | |
| ) | |
| def future_value_constant_monthly(initial_investment: float, monthly_contribution: float, years: int, annual_return: float) -> float: | |
| monthly_rate = (1 + annual_return) ** (1 / 12) - 1 | |
| balance = initial_investment | |
| for _ in range(years * 12): | |
| balance += monthly_contribution | |
| balance *= (1 + monthly_rate) | |
| return balance | |
| def required_monthly_contribution(target_value: float, years: int, annual_return: float, initial_investment: float = 0.0) -> float: | |
| low, high = 0.0, max(target_value, 1.0) | |
| for _ in range(200): | |
| mid = (low + high) / 2 | |
| fv = future_value_constant_monthly(initial_investment, mid, years, annual_return) | |
| if fv >= target_value: | |
| high = mid | |
| else: | |
| low = mid | |
| return high | |
| def parse_returns(raw: str) -> List[float]: | |
| values: List[float] = [] | |
| for part in raw.split(","): | |
| part = part.strip().replace("%", "") | |
| if not part: | |
| continue | |
| values.append(float(part) / 100) | |
| return values or DEFAULT_RETURNS | |
| def build_projection_embed(initial: float, phases: List[Phase], returns: List[float]) -> discord.Embed: | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Projection Results", | |
| description="Scenario modeling using monthly compounding and phased contributions.", | |
| color=discord.Color.blurple(), | |
| timestamp=datetime.now(timezone.utc), | |
| ) | |
| phase_lines = [] | |
| total_years = 0 | |
| for idx, phase in enumerate(phases, start=1): | |
| total_years += phase.years | |
| label = f" — {phase.label}" if phase.label else "" | |
| phase_lines.append( | |
| f"**Phase {idx}{label}**\n" | |
| f"Years: {phase.years}\n" | |
| f"Monthly: {fmt_money(phase.monthly_contribution)}\n" | |
| f"Lump at start: {fmt_money(phase.lump_sum_at_start)}" | |
| ) | |
| embed.add_field(name="Inputs", value=f"Initial: {fmt_money(initial)}\nTotal years: {total_years}", inline=False) | |
| embed.add_field(name="Phases", value="\n\n".join(phase_lines)[:1024], inline=False) | |
| result_lines = [] | |
| for annual_return in returns: | |
| result = simulate_portfolio(initial, phases, annual_return) | |
| gain_pct = (result.profit / result.total_contributed) if result.total_contributed else 0 | |
| result_lines.append( | |
| f"**{fmt_pct(annual_return)}** → End: {fmt_money(result.ending_value)} | " | |
| f"Contributed: {fmt_money(result.total_contributed)} | Profit: {fmt_money(result.profit)} | Gain: {fmt_pct(gain_pct)}" | |
| ) | |
| embed.add_field(name="Results", value="\n".join(result_lines)[:1024], inline=False) | |
| embed.set_footer(text=f"{BOT_NAME} calculator") | |
| return embed | |
| def build_target_embed(target: float, years: int, initial: float, returns: List[float]) -> discord.Embed: | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Required Contribution Calculator", | |
| description="Required constant monthly contribution to hit the selected target.", | |
| color=discord.Color.green(), | |
| timestamp=datetime.now(timezone.utc), | |
| ) | |
| embed.add_field(name="Inputs", value=f"Target: {fmt_money(target)}\nYears: {years}\nInitial: {fmt_money(initial)}", inline=False) | |
| lines = [] | |
| for annual_return in returns: | |
| monthly = required_monthly_contribution(target, years, annual_return, initial) | |
| lines.append(f"**{fmt_pct(annual_return)}** → {fmt_money(monthly)} / month") | |
| embed.add_field(name="Required Monthly Contribution", value="\n".join(lines), inline=False) | |
| embed.set_footer(text=f"{BOT_NAME} calculator") | |
| return embed | |
| # ========================= | |
| # UI Views / Modals | |
| # ========================= | |
| class RoleToggleView(discord.ui.View): | |
| def __init__(self): | |
| super().__init__(timeout=None) | |
| async def toggle_role(self, interaction: discord.Interaction, button: discord.ui.Button) -> None: | |
| if interaction.guild is None or not isinstance(interaction.user, discord.Member): | |
| await interaction.response.send_message("This button only works inside a server.", ephemeral=True) | |
| return | |
| cfg = get_guild_config(interaction.guild.id) | |
| role = find_alert_role(interaction.guild, cfg) | |
| if role is None: | |
| await interaction.response.send_message( | |
| f"I couldn't find the **{cfg.get('role_name', DEFAULT_ROLE_NAME)}** role in this server. " | |
| f"An admin can run `/setalertroleid` or create the role first.", | |
| ephemeral=True, | |
| ) | |
| return | |
| member = interaction.user | |
| try: | |
| if role in member.roles: | |
| await member.remove_roles(role, reason=f"{BOT_NAME} self-role toggle") | |
| await interaction.response.send_message(f"Removed **{role.name}** from you.", ephemeral=True) | |
| else: | |
| await member.add_roles(role, reason=f"{BOT_NAME} self-role toggle") | |
| await interaction.response.send_message(f"Added **{role.name}** to you.", ephemeral=True) | |
| except discord.Forbidden: | |
| await interaction.response.send_message( | |
| "I do not have permission to manage that role. Move my role above the Kasper role and grant Manage Roles.", | |
| ephemeral=True, | |
| ) | |
| class ProjectionModal(discord.ui.Modal, title="Kasper Projection Calculator"): | |
| initial = discord.ui.TextInput(label="Initial investment", placeholder="1000", default="1000") | |
| phase1_years = discord.ui.TextInput(label="Phase 1 years", placeholder="5", default="5") | |
| phase1_monthly = discord.ui.TextInput(label="Phase 1 monthly contribution", placeholder="250", default="250") | |
| phase2_years = discord.ui.TextInput(label="Phase 2 years", placeholder="5", default="5", required=False) | |
| phase2_monthly = discord.ui.TextInput(label="Phase 2 monthly contribution", placeholder="500", default="500", required=False) | |
| async def on_submit(self, interaction: discord.Interaction) -> None: | |
| try: | |
| initial = float(str(self.initial)) | |
| p1y = int(str(self.phase1_years)) | |
| p1m = float(str(self.phase1_monthly)) | |
| phases = [Phase(years=p1y, monthly_contribution=p1m, label="Phase 1")] | |
| if str(self.phase2_years).strip() and str(self.phase2_monthly).strip(): | |
| p2y = int(str(self.phase2_years)) | |
| p2m = float(str(self.phase2_monthly)) | |
| phases.append(Phase(years=p2y, monthly_contribution=p2m, label="Phase 2")) | |
| embed = build_projection_embed(initial, phases, DEFAULT_RETURNS) | |
| await interaction.response.send_message(embed=embed, ephemeral=True) | |
| except ValueError: | |
| await interaction.response.send_message("Invalid numeric input. Please use numbers only.", ephemeral=True) | |
| class TargetModal(discord.ui.Modal, title="Kasper Target Calculator"): | |
| target = discord.ui.TextInput(label="Target portfolio value", placeholder="2000000", default="2000000") | |
| years = discord.ui.TextInput(label="Years to invest", placeholder="30", default="30") | |
| initial = discord.ui.TextInput(label="Initial investment", placeholder="0", default="0", required=False) | |
| returns = discord.ui.TextInput(label="Return scenarios (%)", placeholder="10,14,18", default="10,14,18", required=False) | |
| async def on_submit(self, interaction: discord.Interaction) -> None: | |
| try: | |
| target = float(str(self.target)) | |
| years = int(str(self.years)) | |
| initial = float(str(self.initial) or 0) | |
| returns = parse_returns(str(self.returns) or "10,14,18") | |
| embed = build_target_embed(target, years, initial, returns) | |
| await interaction.response.send_message(embed=embed, ephemeral=True) | |
| except ValueError: | |
| await interaction.response.send_message("Invalid numeric input. Please use numbers only.", ephemeral=True) | |
| class CalculatorHubView(discord.ui.View): | |
| def __init__(self): | |
| super().__init__(timeout=300) | |
| async def projection(self, interaction: discord.Interaction, button: discord.ui.Button) -> None: | |
| await interaction.response.send_modal(ProjectionModal()) | |
| async def target(self, interaction: discord.Interaction, button: discord.ui.Button) -> None: | |
| await interaction.response.send_modal(TargetModal()) | |
| # ========================= | |
| # Bot setup | |
| # ========================= | |
| intents = discord.Intents.default() | |
| intents.guilds = True | |
| intents.members = True | |
| intents.message_content = True | |
| bot = commands.Bot( | |
| command_prefix=COMMAND_PREFIX, | |
| intents=intents, | |
| application_id=APPLICATION_ID, | |
| help_command=None, | |
| ) | |
| # ========================= | |
| # Embeds / text builders | |
| # ========================= | |
| def welcome_embed() -> discord.Embed: | |
| embed = discord.Embed( | |
| title=f"Welcome to {BOT_NAME}", | |
| description=( | |
| f"**{BOT_NAME}** is a long-term investing and retirement-planning Discord bot built to help users " | |
| "analyze contribution plans, compare outcome scenarios, manage stock watchlists, and receive structured alerts." | |
| ), | |
| color=discord.Color.gold(), | |
| timestamp=datetime.now(timezone.utc), | |
| ) | |
| embed.add_field( | |
| name="Overarching Goal", | |
| value=( | |
| "Help users build a disciplined long-term investing process with retirement-focused scenario planning, " | |
| "watchlist tracking, and explainable alert workflows." | |
| ), | |
| inline=False, | |
| ) | |
| embed.add_field( | |
| name="Core Features", | |
| value=( | |
| "• Portfolio projection calculator\n" | |
| "• Target contribution calculator\n" | |
| "• Multi-server Kasper alert role toggle\n" | |
| "• Watchlist management\n" | |
| "• Alert channel / role configuration\n" | |
| "• Placeholder daily scanner loop for future stock intelligence\n" | |
| "• Clean embeds, slash commands, and prefix commands" | |
| ), | |
| inline=False, | |
| ) | |
| embed.add_field( | |
| name="Best Use", | |
| value=( | |
| "Use Kasper to model retirement contribution plans, manage compounder watchlists, and later plug in a deeper stock-scoring system." | |
| ), | |
| inline=False, | |
| ) | |
| embed.set_footer(text=f"Use /instructions or !instructions to see the full command list.") | |
| return embed | |
| def instructions_embed() -> discord.Embed: | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Instructions", | |
| description="Every major command and what it does.", | |
| color=discord.Color.blurple(), | |
| timestamp=datetime.now(timezone.utc), | |
| ) | |
| cmd_text = ( | |
| "**Public commands**\n" | |
| "`/welcome` or `!welcome` — Explain what Kasper is, its goal, and major features.\n" | |
| "`/instructions` or `!instructions` — Show the full command guide.\n" | |
| "`/role` or `!role` — Post a button that lets a user toggle the **Kasper** alert role.\n" | |
| "`/calculator` or `!calculator` — Open clean calculator buttons for projections and target planning.\n" | |
| "`/project` or `!project` — Quick projection command using phase inputs.\n" | |
| "`/targetcalc` or `!targetcalc` — Calculate required monthly contribution to hit a goal.\n" | |
| "`/listwatch` or `!listwatch` — Show the current server watchlist.\n" | |
| "`/serverinfo` or `!serverinfo` — Show alert channel, role, and scanner status for this server.\n\n" | |
| "**Watchlist commands**\n" | |
| "`/addwatch <ticker>` or `!addwatch <ticker>` — Add a ticker to this server's watchlist.\n" | |
| "`/removewatch <ticker>` or `!removewatch <ticker>` — Remove a ticker from the watchlist.\n\n" | |
| "**Admin commands**\n" | |
| "`/setalertchannel <channel>` or `!setalertchannel #channel` — Choose where alerts should be posted.\n" | |
| "`/setalertroleid <role_id>` or `!setalertroleid <role_id>` — Set the server's alert role by ID.\n" | |
| "`/setadminroles <ids>` or `!setadminroles 123,456` — Define which roles may use Kasper admin commands.\n" | |
| "`/setscanner <on|off>` or `!setscanner on/off` — Enable or disable the placeholder scanner loop for this server.\n" | |
| "`/testalert` or `!testalert` — Send a sample alert to the configured alert channel.\n" | |
| "`/sync` or `!sync` — Force slash-command sync (admin only).\n" | |
| ) | |
| embed.add_field(name="Commands", value=cmd_text[:1024], inline=False) | |
| embed.add_field( | |
| name="Role Button Behavior", | |
| value=( | |
| "The role button privately confirms whether the **Kasper** role was added or removed. " | |
| "It does not spam the alert channel." | |
| ), | |
| inline=False, | |
| ) | |
| embed.add_field( | |
| name="Multi-server behavior", | |
| value=( | |
| "Kasper keeps per-server config in local JSON storage. The shared role name stays **Kasper**, but each server can set its own role ID and alert channel." | |
| ), | |
| inline=False, | |
| ) | |
| embed.set_footer(text="Use /calculator for the clean modal UI.") | |
| return embed | |
| def role_embed(guild: discord.Guild) -> discord.Embed: | |
| cfg = get_guild_config(guild.id) | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Alerts Role", | |
| description=( | |
| f"Press the button below to add or remove the **{cfg.get('role_name', DEFAULT_ROLE_NAME)}** role for this server.\n" | |
| "This role is intended for alert pings and notification opt-in." | |
| ), | |
| color=discord.Color.purple(), | |
| ) | |
| return embed | |
| def server_info_embed(guild: discord.Guild) -> discord.Embed: | |
| cfg = get_guild_config(guild.id) | |
| alert_channel = find_alert_channel(guild, cfg) | |
| alert_role = find_alert_role(guild, cfg) | |
| embed = discord.Embed(title=f"{BOT_NAME} Server Info", color=discord.Color.teal()) | |
| embed.add_field(name="Alert channel", value=alert_channel.mention if alert_channel else "Not configured", inline=False) | |
| embed.add_field(name="Alert role", value=alert_role.mention if alert_role else f"Not found ({cfg.get('role_name', DEFAULT_ROLE_NAME)})", inline=False) | |
| embed.add_field(name="Admin role IDs", value=", ".join(map(str, cfg.get("admin_role_ids", []))) or "None", inline=False) | |
| embed.add_field(name="Scanner enabled", value=str(cfg.get("scanner_enabled", True)), inline=False) | |
| return embed | |
| def watchlist_embed(guild: discord.Guild) -> discord.Embed: | |
| tickers = get_watchlist(guild.id) | |
| embed = discord.Embed(title=f"{BOT_NAME} Watchlist", color=discord.Color.orange()) | |
| if tickers: | |
| embed.description = "\n".join(f"• {t}" for t in sorted(tickers)) | |
| else: | |
| embed.description = "This server watchlist is empty. Add names with `/addwatch` or `!addwatch`." | |
| return embed | |
| # ========================= | |
| # Common response helper | |
| # ========================= | |
| async def respond(ctx: commands.Context, *, embed: Optional[discord.Embed] = None, content: Optional[str] = None, view: Optional[discord.ui.View] = None, ephemeral: bool = False): | |
| if ctx.interaction: | |
| if ctx.interaction.response.is_done(): | |
| return await ctx.interaction.followup.send(content=content, embed=embed, view=view, ephemeral=ephemeral) | |
| return await ctx.interaction.response.send_message(content=content, embed=embed, view=view, ephemeral=ephemeral) | |
| return await ctx.send(content=content, embed=embed, view=view) | |
| # ========================= | |
| # Events | |
| # ========================= | |
| async def on_ready() -> None: | |
| bot.add_view(RoleToggleView()) | |
| if not daily_scanner.is_running(): | |
| daily_scanner.start() | |
| log.info("%s is ready as %s (%s)", BOT_NAME, bot.user, bot.user.id if bot.user else "unknown") | |
| # ========================= | |
| # Public commands | |
| # ========================= | |
| async def welcome(ctx: commands.Context) -> None: | |
| await respond(ctx, embed=welcome_embed()) | |
| async def instructions(ctx: commands.Context) -> None: | |
| await respond(ctx, embed=instructions_embed()) | |
| async def role(ctx: commands.Context) -> None: | |
| if not ctx.guild: | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| await respond(ctx, embed=role_embed(ctx.guild), view=RoleToggleView()) | |
| async def serverinfo(ctx: commands.Context) -> None: | |
| if not ctx.guild: | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| await respond(ctx, embed=server_info_embed(ctx.guild)) | |
| async def calculator(ctx: commands.Context) -> None: | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Calculator Hub", | |
| description="Use the buttons below to open a cleaner input UI.", | |
| color=discord.Color.blurple(), | |
| ) | |
| await respond(ctx, embed=embed, view=CalculatorHubView(), ephemeral=True) | |
| async def project( | |
| ctx: commands.Context, | |
| initial: float, | |
| phase1_years: int, | |
| phase1_monthly: float, | |
| phase2_years: Optional[int] = 0, | |
| phase2_monthly: Optional[float] = 0.0, | |
| phase2_lump: Optional[float] = 0.0, | |
| returns: Optional[str] = "10,14,18", | |
| ) -> None: | |
| phases = [Phase(years=phase1_years, monthly_contribution=phase1_monthly, label="Phase 1")] | |
| if phase2_years and phase2_years > 0: | |
| phases.append(Phase(years=phase2_years, monthly_contribution=phase2_monthly or 0.0, lump_sum_at_start=phase2_lump or 0.0, label="Phase 2")) | |
| embed = build_projection_embed(initial, phases, parse_returns(returns or "10,14,18")) | |
| await respond(ctx, embed=embed) | |
| async def targetcalc( | |
| ctx: commands.Context, | |
| target: float, | |
| years: int, | |
| initial: Optional[float] = 0.0, | |
| returns: Optional[str] = "10,14,18", | |
| ) -> None: | |
| embed = build_target_embed(target, years, initial or 0.0, parse_returns(returns or "10,14,18")) | |
| await respond(ctx, embed=embed) | |
| async def addwatch(ctx: commands.Context, ticker: str) -> None: | |
| if not ctx.guild: | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| ticker = ticker.upper().strip() | |
| wl = get_watchlist(ctx.guild.id) | |
| if ticker not in wl: | |
| wl.append(ticker) | |
| wl.sort() | |
| save_all() | |
| await respond(ctx, embed=watchlist_embed(ctx.guild)) | |
| async def removewatch(ctx: commands.Context, ticker: str) -> None: | |
| if not ctx.guild: | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| ticker = ticker.upper().strip() | |
| wl = get_watchlist(ctx.guild.id) | |
| if ticker in wl: | |
| wl.remove(ticker) | |
| save_all() | |
| await respond(ctx, embed=watchlist_embed(ctx.guild)) | |
| async def listwatch(ctx: commands.Context) -> None: | |
| if not ctx.guild: | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| await respond(ctx, embed=watchlist_embed(ctx.guild)) | |
| # ========================= | |
| # Admin commands | |
| # ========================= | |
| async def setalertchannel(ctx: commands.Context, channel: discord.TextChannel) -> None: | |
| if not ctx.guild or not isinstance(ctx.author, discord.Member): | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| cfg = get_guild_config(ctx.guild.id) | |
| if not user_is_adminish(ctx.author, cfg): | |
| await respond(ctx, content="You do not have permission to use this command.", ephemeral=True) | |
| return | |
| cfg["alert_channel_id"] = channel.id | |
| save_all() | |
| await respond(ctx, content=f"Alert channel set to {channel.mention}.") | |
| async def setalertroleid(ctx: commands.Context, role_id: str) -> None: | |
| if not ctx.guild or not isinstance(ctx.author, discord.Member): | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| cfg = get_guild_config(ctx.guild.id) | |
| if not user_is_adminish(ctx.author, cfg): | |
| await respond(ctx, content="You do not have permission to use this command.", ephemeral=True) | |
| return | |
| if not role_id.isdigit(): | |
| await respond(ctx, content="Role ID must be numeric.", ephemeral=True) | |
| return | |
| cfg["alert_role_id"] = int(role_id) | |
| save_all() | |
| role = ctx.guild.get_role(int(role_id)) | |
| await respond(ctx, content=f"Alert role ID saved. Resolved role: {role.mention if role else 'not currently found'}." ) | |
| async def setadminroles(ctx: commands.Context, role_ids: str) -> None: | |
| if not ctx.guild or not isinstance(ctx.author, discord.Member): | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| cfg = get_guild_config(ctx.guild.id) | |
| if not (ctx.author.guild_permissions.administrator or user_is_adminish(ctx.author, cfg)): | |
| await respond(ctx, content="You do not have permission to use this command.", ephemeral=True) | |
| return | |
| parsed = [] | |
| for item in role_ids.split(","): | |
| item = item.strip() | |
| if item: | |
| if not item.isdigit(): | |
| await respond(ctx, content=f"Invalid role ID: {item}", ephemeral=True) | |
| return | |
| parsed.append(int(item)) | |
| cfg["admin_role_ids"] = parsed | |
| save_all() | |
| await respond(ctx, content=f"Admin role IDs updated: {', '.join(map(str, parsed)) or 'None'}") | |
| async def setscanner(ctx: commands.Context, state: str) -> None: | |
| if not ctx.guild or not isinstance(ctx.author, discord.Member): | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| cfg = get_guild_config(ctx.guild.id) | |
| if not user_is_adminish(ctx.author, cfg): | |
| await respond(ctx, content="You do not have permission to use this command.", ephemeral=True) | |
| return | |
| state = state.lower().strip() | |
| if state not in {"on", "off"}: | |
| await respond(ctx, content="Use `on` or `off`.", ephemeral=True) | |
| return | |
| cfg["scanner_enabled"] = state == "on" | |
| save_all() | |
| await respond(ctx, content=f"Scanner enabled: {cfg['scanner_enabled']}") | |
| async def testalert(ctx: commands.Context) -> None: | |
| if not ctx.guild or not isinstance(ctx.author, discord.Member): | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| cfg = get_guild_config(ctx.guild.id) | |
| if not user_is_adminish(ctx.author, cfg): | |
| await respond(ctx, content="You do not have permission to use this command.", ephemeral=True) | |
| return | |
| channel = find_alert_channel(ctx.guild, cfg) | |
| role = find_alert_role(ctx.guild, cfg) | |
| if not channel: | |
| await respond(ctx, content="Alert channel is not configured.", ephemeral=True) | |
| return | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Test Alert", | |
| description="This is a sample alert message from Kasper.", | |
| color=discord.Color.red(), | |
| timestamp=datetime.now(timezone.utc), | |
| ) | |
| embed.add_field(name="Why this exists", value="Use this to verify channel and role setup.", inline=False) | |
| mention = role.mention if role else "" | |
| await channel.send(content=mention or None, embed=embed) | |
| await respond(ctx, content=f"Sent test alert to {channel.mention}.") | |
| async def sync(ctx: commands.Context) -> None: | |
| if not ctx.guild or not isinstance(ctx.author, discord.Member): | |
| await respond(ctx, content="Use this command inside a server.") | |
| return | |
| cfg = get_guild_config(ctx.guild.id) | |
| if not user_is_adminish(ctx.author, cfg): | |
| await respond(ctx, content="You do not have permission to use this command.", ephemeral=True) | |
| return | |
| synced = await bot.tree.sync() | |
| await respond(ctx, content=f"Synced {len(synced)} slash commands.") | |
| # ========================= | |
| # Placeholder scanner | |
| # ========================= | |
| async def post_scan_alert(guild: discord.Guild, tickers: List[str]) -> None: | |
| cfg = get_guild_config(guild.id) | |
| channel = find_alert_channel(guild, cfg) | |
| if not channel: | |
| return | |
| role = find_alert_role(guild, cfg) | |
| embed = discord.Embed( | |
| title=f"{BOT_NAME} Daily Watchlist Scan", | |
| description="Placeholder scan result. Replace this section with real free-data stock logic later.", | |
| color=discord.Color.orange(), | |
| timestamp=datetime.now(timezone.utc), | |
| ) | |
| embed.add_field(name="Watchlist checked", value=", ".join(tickers[:50]) if tickers else "No tickers", inline=False) | |
| embed.add_field( | |
| name="Next step", | |
| value="Swap in your real SEC + price-data scoring pipeline here while keeping the same alert shell.", | |
| inline=False, | |
| ) | |
| mention = role.mention if role else None | |
| await channel.send(content=mention, embed=embed) | |
| alert_log.append({ | |
| "guild_id": guild.id, | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "type": "daily_placeholder_scan", | |
| "tickers": tickers, | |
| }) | |
| save_all() | |
| async def daily_scanner() -> None: | |
| await bot.wait_until_ready() | |
| for guild in bot.guilds: | |
| cfg = get_guild_config(guild.id) | |
| if not cfg.get("scanner_enabled", True): | |
| continue | |
| tickers = get_watchlist(guild.id) | |
| if not tickers: | |
| continue | |
| try: | |
| await post_scan_alert(guild, tickers) | |
| cfg["last_scanner_run"] = datetime.now(timezone.utc).isoformat() | |
| save_all() | |
| except Exception as exc: | |
| log.exception("Scanner failed for guild %s: %s", guild.id, exc) | |
| # ========================= | |
| # Error handling | |
| # ========================= | |
| async def on_command_error(ctx: commands.Context, error: commands.CommandError) -> None: | |
| if isinstance(error, commands.CommandNotFound): | |
| return | |
| if isinstance(error, commands.MissingRequiredArgument): | |
| await ctx.send(f"Missing argument: {error.param.name}") | |
| return | |
| log.exception("Command error: %s", error) | |
| try: | |
| await ctx.send(f"Something went wrong: {error}") | |
| except Exception: | |
| pass | |
| # ========================= | |
| # Startup checks / run | |
| # ========================= | |
| if __name__ == "__main__": | |
| if not TOKEN or TOKEN == "PASTE_BOT_TOKEN_HERE": | |
| raise SystemExit("Set DISCORD_BOT_TOKEN in .env or paste it into bot.py before running.") | |
| if APPLICATION_ID is None: | |
| raise SystemExit("Set a numeric DISCORD_APPLICATION_ID in .env or bot.py before running.") | |
| save_all() | |
| bot.run(TOKEN) | |