import os import json import logging import random import string import psutil from datetime import datetime from constants import KEYS_FILE, MASTER_KEY, OUTPUT_DIR # Set up access logger access_logger = logging.getLogger("wealth_access") access_logger.setLevel(logging.INFO) handler = logging.FileHandler(os.path.join(OUTPUT_DIR, "access.log")) formatter = logging.Formatter('%(asctime)s - %(message)s') handler.setFormatter(formatter) access_logger.addHandler(handler) from database import get_pg_engine, ApiKey from sqlalchemy.orm import sessionmaker def get_db_session(): engine = get_pg_engine() Session = sessionmaker(bind=engine) return Session() def send_telegram_alert(message: str): token = os.getenv("TELEGRAM_BOT_TOKEN") chat_id = os.getenv("TELEGRAM_CHAT_ID") if not token or not chat_id: return # Skip if Telegram isn't configured try: import urllib.request import urllib.parse import requests url = f"https://api.telegram.org/bot{token}/sendMessage" payload = { "chat_id": chat_id, "text": message, "parse_mode": "Markdown" } requests.post(url, json=payload, timeout=5) except Exception as e: access_logger.error(f"Failed to send Telegram alert: {e}") def validate_key(key: str, ip: str = "Unknown", silent: bool = False) -> bool: if not key: return False key = key.strip() # Check Master Key if key == MASTER_KEY: msg = f"โœ… **MASTER_KEY** used by IP: `{ip}`" access_logger.info(msg) if not silent: send_telegram_alert(msg) return True # Check Time-Based Keys session = get_db_session() try: api_key = session.query(ApiKey).filter_by(key=key).first() if api_key and api_key.revoked == "false": if datetime.now() <= api_key.expires_at: is_first_use = api_key.used_at is None if is_first_use or api_key.used_by_ip != ip: api_key.used_at = datetime.now() api_key.used_by_ip = ip session.commit() msg = f"โœ… **Time-Key** '{key}' used by IP: `{ip}`" access_logger.info(msg) if not silent: send_telegram_alert(msg) return True finally: session.close() msg = f"โ›” **DENIED:** Invalid, expired, or revoked key '{key}' attempted by IP: `{ip}`" access_logger.warning(msg) if not silent: send_telegram_alert(msg) return False def generate_otk(admin_key: str, new_key: str = None, hours: int = 1) -> bool: if admin_key != MASTER_KEY: return False if not new_key: new_key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) from datetime import timedelta session = get_db_session() try: new_api_key = ApiKey( key=new_key, created_at=datetime.now(), expires_at=datetime.now() + timedelta(hours=hours), revoked="false" ) session.add(new_api_key) session.commit() finally: session.close() access_logger.info(f"ADMIN: New OTK generated: '{new_key}', valid for {hours} hours.") return new_key def handle_telegram_command(text: str): text = text.strip() if text.startswith("/generate"): parts = text.split() hours = 1 if len(parts) > 1 and parts[1].isdigit(): hours = int(parts[1]) key = generate_otk(MASTER_KEY, hours=hours) send_telegram_alert(f"โœ… Key `{key}` generated successfully. It will be valid for {hours} hour(s).") elif text.startswith("/status"): session = get_db_session() try: active_keys = session.query(ApiKey).filter(ApiKey.revoked == "false", ApiKey.expires_at >= datetime.now()).count() finally: session.close() send_telegram_alert(f"๐ŸŸข **Engine Online**\n\nActive Keys: {active_keys}") elif text.startswith("/logs"): log_path = os.path.join(OUTPUT_DIR, "access.log") if os.path.exists(log_path): with open(log_path, "r", encoding="utf-8") as f: lines = f.readlines()[-5:] log_text = "".join(lines) if lines else "No logs yet." send_telegram_alert(f"๐Ÿ“„ **Last 5 Access Logs:**\n```\n{log_text}\n```") else: send_telegram_alert("No logs found.") elif text.startswith("/revoke_all"): session = get_db_session() count = 0 try: keys_to_revoke = session.query(ApiKey).filter(ApiKey.revoked == "false", ApiKey.expires_at >= datetime.now()).all() for k in keys_to_revoke: k.revoked = "true" count += 1 session.commit() finally: session.close() send_telegram_alert(f"๐Ÿ›‘ **LOCKDOWN INITIATED**\n{count} active keys have been revoked.") elif text.startswith("/metrics"): try: ram = psutil.virtual_memory() cpu = psutil.cpu_percent(interval=0.1) # Find DB size db_size_mb = 0 db_path = os.path.join(OUTPUT_DIR, "portfolio_db.sqlite3") if os.path.exists(db_path): db_size_mb = os.path.getsize(db_path) / (1024 * 1024) msg = f"๐Ÿ“Š **System Metrics**\n\n" msg += f"๐Ÿ–ฅ๏ธ CPU Usage: {cpu}%\n" msg += f"๐Ÿง  RAM Usage: {ram.percent}% ({ram.used / (1024*1024):.0f}MB / {ram.total / (1024*1024):.0f}MB)\n" msg += f"๐Ÿ’ฝ DB Size: {db_size_mb:.2f} MB" send_telegram_alert(msg) except Exception as e: send_telegram_alert(f"Failed to fetch metrics: {e}") elif text.startswith("/clear_cache"): try: from database import get_pg_engine from sqlalchemy import text engine = get_pg_engine() with engine.connect() as conn: conn.execute(text("DELETE FROM daily_prices")) conn.execute(text("DELETE FROM daily_yields")) conn.commit() send_telegram_alert("๐Ÿงน **Cache Cleared**\nAll historical price data has been purged. The next optimization will trigger a fresh yfinance download.") except Exception as e: send_telegram_alert(f"Failed to clear cache: {e}") elif text.startswith("/database"): try: from database import get_pg_engine from sqlalchemy import text engine = get_pg_engine() with engine.connect() as conn: prices_count = conn.execute(text("SELECT COUNT(*) FROM daily_prices")).scalar() yields_count = conn.execute(text("SELECT COUNT(*) FROM daily_yields")).scalar() db_size_mb = 0 db_path = os.path.join(OUTPUT_DIR, "portfolio_db.sqlite3") if os.path.exists(db_path): db_size_mb = os.path.getsize(db_path) / (1024 * 1024) send_telegram_alert( f"๐Ÿ—„๏ธ **Database Diagnostics**\n\n" f"๐Ÿ“ˆ Daily Prices: {prices_count:,} rows\n" f"๐Ÿ“‰ Daily Yields: {yields_count:,} rows\n" f"๐Ÿ’พ Disk Size: {db_size_mb:.2f} MB" ) except Exception as e: send_telegram_alert(f"Database query failed: {e}") elif text.startswith("/run"): parts = text.split(maxsplit=1) if len(parts) > 1: tickers = parts[1].upper() send_telegram_alert(f"๐Ÿš€ **Starting Engine Run**\nTickers: {tickers}\n*Engine will report back when finished.*") # Usually queued to a Celery worker or async thread in production else: send_telegram_alert("Usage: `/run AAPL,MSFT,TSLA`") elif text.startswith("/report"): parts = text.split() if len(parts) > 1: task_id = parts[1] send_telegram_alert(f"๐Ÿ“„ Fetching report for Task ID: `{task_id}`...") else: send_telegram_alert("Usage: `/report [task_id]`") elif text.startswith("/alerts"): send_telegram_alert("๐Ÿ”” **Alert Daemon**\nMarket drop monitoring is active. Will notify on 5%+ drops.") elif text.startswith("/help"): help_text = ( "๐Ÿค– **Wealth Engine Master Bot**\n\n" "๐Ÿ” `/generate [hours]` - Create an Access Key\n" "๐ŸŸข `/status` - Check active keys\n" "โ–ถ๏ธ `/run [tickers]` - Run optimization for tickers\n" "๐Ÿ“„ `/report [task]` - Get report link\n" "๐Ÿ”” `/alerts` - Check active alerts\n" "๐Ÿ“Š `/metrics` - Server RAM & CPU\n" "๐Ÿ—„๏ธ `/database` - View DB row counts\n" "๐Ÿงน `/clear_cache` - Purge historical data\n" "๐Ÿ“„ `/logs` - View last 5 logins\n" "๐Ÿ›‘ `/revoke_all` - Destroy all keys" ) send_telegram_alert(help_text) else: send_telegram_alert("โ“ **Unknown Command**\nType `/help` for a list of available commands.")