Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| import random | |
| import string | |
| import psutil | |
| from datetime import datetime | |
| from constants import KEYS_FILE, MASTER_KEY, OUTPUT_DIR | |
| # Set up access logger | |
| access_logger = logging.getLogger("wealth_access") | |
| access_logger.setLevel(logging.INFO) | |
| handler = logging.FileHandler(os.path.join(OUTPUT_DIR, "access.log")) | |
| formatter = logging.Formatter('%(asctime)s - %(message)s') | |
| handler.setFormatter(formatter) | |
| access_logger.addHandler(handler) | |
| from database import get_pg_engine, ApiKey | |
| from sqlalchemy.orm import sessionmaker | |
| def get_db_session(): | |
| engine = get_pg_engine() | |
| Session = sessionmaker(bind=engine) | |
| return Session() | |
| def send_telegram_alert(message: str): | |
| token = os.getenv("TELEGRAM_BOT_TOKEN") | |
| chat_id = os.getenv("TELEGRAM_CHAT_ID") | |
| if not token or not chat_id: | |
| return # Skip if Telegram isn't configured | |
| try: | |
| import urllib.request | |
| import urllib.parse | |
| import requests | |
| url = f"https://api.telegram.org/bot{token}/sendMessage" | |
| payload = { | |
| "chat_id": chat_id, | |
| "text": message, | |
| "parse_mode": "Markdown" | |
| } | |
| requests.post(url, json=payload, timeout=5) | |
| except Exception as e: | |
| access_logger.error(f"Failed to send Telegram alert: {e}") | |
| def validate_key(key: str, ip: str = "Unknown", silent: bool = False) -> bool: | |
| if not key: | |
| return False | |
| key = key.strip() | |
| # Check Master Key | |
| if key == MASTER_KEY: | |
| msg = f"β **MASTER_KEY** used by IP: `{ip}`" | |
| access_logger.info(msg) | |
| if not silent: | |
| send_telegram_alert(msg) | |
| return True | |
| # Check Time-Based Keys | |
| session = get_db_session() | |
| try: | |
| api_key = session.query(ApiKey).filter_by(key=key).first() | |
| if api_key and api_key.revoked == "false": | |
| if datetime.now() <= api_key.expires_at: | |
| is_first_use = api_key.used_at is None | |
| if is_first_use or api_key.used_by_ip != ip: | |
| api_key.used_at = datetime.now() | |
| api_key.used_by_ip = ip | |
| session.commit() | |
| msg = f"β **Time-Key** '{key}' used by IP: `{ip}`" | |
| access_logger.info(msg) | |
| if not silent: | |
| send_telegram_alert(msg) | |
| return True | |
| finally: | |
| session.close() | |
| msg = f"β **DENIED:** Invalid, expired, or revoked key '{key}' attempted by IP: `{ip}`" | |
| access_logger.warning(msg) | |
| if not silent: | |
| send_telegram_alert(msg) | |
| return False | |
| def generate_otk(admin_key: str, new_key: str = None, hours: int = 1) -> bool: | |
| if admin_key != MASTER_KEY: | |
| return False | |
| if not new_key: | |
| new_key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) | |
| from datetime import timedelta | |
| session = get_db_session() | |
| try: | |
| new_api_key = ApiKey( | |
| key=new_key, | |
| created_at=datetime.now(), | |
| expires_at=datetime.now() + timedelta(hours=hours), | |
| revoked="false" | |
| ) | |
| session.add(new_api_key) | |
| session.commit() | |
| finally: | |
| session.close() | |
| access_logger.info(f"ADMIN: New OTK generated: '{new_key}', valid for {hours} hours.") | |
| return new_key | |
| def handle_telegram_command(text: str): | |
| text = text.strip() | |
| if text.startswith("/generate"): | |
| parts = text.split() | |
| hours = 1 | |
| if len(parts) > 1 and parts[1].isdigit(): | |
| hours = int(parts[1]) | |
| key = generate_otk(MASTER_KEY, hours=hours) | |
| send_telegram_alert(f"β Key `{key}` generated successfully. It will be valid for {hours} hour(s).") | |
| elif text.startswith("/status"): | |
| session = get_db_session() | |
| try: | |
| active_keys = session.query(ApiKey).filter(ApiKey.revoked == "false", ApiKey.expires_at >= datetime.now()).count() | |
| finally: | |
| session.close() | |
| send_telegram_alert(f"π’ **Engine Online**\n\nActive Keys: {active_keys}") | |
| elif text.startswith("/logs"): | |
| log_path = os.path.join(OUTPUT_DIR, "access.log") | |
| if os.path.exists(log_path): | |
| with open(log_path, "r", encoding="utf-8") as f: | |
| lines = f.readlines()[-5:] | |
| log_text = "".join(lines) if lines else "No logs yet." | |
| send_telegram_alert(f"π **Last 5 Access Logs:**\n```\n{log_text}\n```") | |
| else: | |
| send_telegram_alert("No logs found.") | |
| elif text.startswith("/revoke_all"): | |
| session = get_db_session() | |
| count = 0 | |
| try: | |
| keys_to_revoke = session.query(ApiKey).filter(ApiKey.revoked == "false", ApiKey.expires_at >= datetime.now()).all() | |
| for k in keys_to_revoke: | |
| k.revoked = "true" | |
| count += 1 | |
| session.commit() | |
| finally: | |
| session.close() | |
| send_telegram_alert(f"π **LOCKDOWN INITIATED**\n{count} active keys have been revoked.") | |
| elif text.startswith("/metrics"): | |
| try: | |
| ram = psutil.virtual_memory() | |
| cpu = psutil.cpu_percent(interval=0.1) | |
| # Find DB size | |
| db_size_mb = 0 | |
| db_path = os.path.join(OUTPUT_DIR, "portfolio_db.sqlite3") | |
| if os.path.exists(db_path): | |
| db_size_mb = os.path.getsize(db_path) / (1024 * 1024) | |
| msg = f"π **System Metrics**\n\n" | |
| msg += f"π₯οΈ CPU Usage: {cpu}%\n" | |
| msg += f"π§ RAM Usage: {ram.percent}% ({ram.used / (1024*1024):.0f}MB / {ram.total / (1024*1024):.0f}MB)\n" | |
| msg += f"π½ DB Size: {db_size_mb:.2f} MB" | |
| send_telegram_alert(msg) | |
| except Exception as e: | |
| send_telegram_alert(f"Failed to fetch metrics: {e}") | |
| elif text.startswith("/clear_cache"): | |
| try: | |
| from database import get_pg_engine | |
| from sqlalchemy import text | |
| engine = get_pg_engine() | |
| with engine.connect() as conn: | |
| conn.execute(text("DELETE FROM daily_prices")) | |
| conn.execute(text("DELETE FROM daily_yields")) | |
| conn.commit() | |
| send_telegram_alert("π§Ή **Cache Cleared**\nAll historical price data has been purged. The next optimization will trigger a fresh yfinance download.") | |
| except Exception as e: | |
| send_telegram_alert(f"Failed to clear cache: {e}") | |
| elif text.startswith("/database"): | |
| try: | |
| from database import get_pg_engine | |
| from sqlalchemy import text | |
| engine = get_pg_engine() | |
| with engine.connect() as conn: | |
| prices_count = conn.execute(text("SELECT COUNT(*) FROM daily_prices")).scalar() | |
| yields_count = conn.execute(text("SELECT COUNT(*) FROM daily_yields")).scalar() | |
| db_size_mb = 0 | |
| db_path = os.path.join(OUTPUT_DIR, "portfolio_db.sqlite3") | |
| if os.path.exists(db_path): | |
| db_size_mb = os.path.getsize(db_path) / (1024 * 1024) | |
| send_telegram_alert( | |
| f"ποΈ **Database Diagnostics**\n\n" | |
| f"π Daily Prices: {prices_count:,} rows\n" | |
| f"π Daily Yields: {yields_count:,} rows\n" | |
| f"πΎ Disk Size: {db_size_mb:.2f} MB" | |
| ) | |
| except Exception as e: | |
| send_telegram_alert(f"Database query failed: {e}") | |
| elif text.startswith("/run"): | |
| parts = text.split(maxsplit=1) | |
| if len(parts) > 1: | |
| tickers = parts[1].upper() | |
| send_telegram_alert(f"π **Starting Engine Run**\nTickers: {tickers}\n*Engine will report back when finished.*") | |
| # Usually queued to a Celery worker or async thread in production | |
| else: | |
| send_telegram_alert("Usage: `/run AAPL,MSFT,TSLA`") | |
| elif text.startswith("/report"): | |
| parts = text.split() | |
| if len(parts) > 1: | |
| task_id = parts[1] | |
| send_telegram_alert(f"π Fetching report for Task ID: `{task_id}`...") | |
| else: | |
| send_telegram_alert("Usage: `/report [task_id]`") | |
| elif text.startswith("/alerts"): | |
| send_telegram_alert("π **Alert Daemon**\nMarket drop monitoring is active. Will notify on 5%+ drops.") | |
| elif text.startswith("/help"): | |
| help_text = ( | |
| "π€ **Wealth Engine Master Bot**\n\n" | |
| "π `/generate [hours]` - Create an Access Key\n" | |
| "π’ `/status` - Check active keys\n" | |
| "βΆοΈ `/run [tickers]` - Run optimization for tickers\n" | |
| "π `/report [task]` - Get report link\n" | |
| "π `/alerts` - Check active alerts\n" | |
| "π `/metrics` - Server RAM & CPU\n" | |
| "ποΈ `/database` - View DB row counts\n" | |
| "π§Ή `/clear_cache` - Purge historical data\n" | |
| "π `/logs` - View last 5 logins\n" | |
| "π `/revoke_all` - Destroy all keys" | |
| ) | |
| send_telegram_alert(help_text) | |
| else: | |
| send_telegram_alert("β **Unknown Command**\nType `/help` for a list of available commands.") | |