math-backend / access_manager.py
engineportf's picture
Upload folder using huggingface_hub
558db1e verified
Raw
History Blame Contribute Delete
9.42 kB
import os
import json
import logging
import random
import string
import psutil
from datetime import datetime
from constants import KEYS_FILE, MASTER_KEY, OUTPUT_DIR
# Set up access logger
access_logger = logging.getLogger("wealth_access")
access_logger.setLevel(logging.INFO)
handler = logging.FileHandler(os.path.join(OUTPUT_DIR, "access.log"))
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
access_logger.addHandler(handler)
from database import get_pg_engine, ApiKey
from sqlalchemy.orm import sessionmaker
def get_db_session():
engine = get_pg_engine()
Session = sessionmaker(bind=engine)
return Session()
def send_telegram_alert(message: str):
token = os.getenv("TELEGRAM_BOT_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID")
if not token or not chat_id:
return # Skip if Telegram isn't configured
try:
import urllib.request
import urllib.parse
import requests
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": message,
"parse_mode": "Markdown"
}
requests.post(url, json=payload, timeout=5)
except Exception as e:
access_logger.error(f"Failed to send Telegram alert: {e}")
def validate_key(key: str, ip: str = "Unknown", silent: bool = False) -> bool:
if not key:
return False
key = key.strip()
# Check Master Key
if key == MASTER_KEY:
msg = f"βœ… **MASTER_KEY** used by IP: `{ip}`"
access_logger.info(msg)
if not silent:
send_telegram_alert(msg)
return True
# Check Time-Based Keys
session = get_db_session()
try:
api_key = session.query(ApiKey).filter_by(key=key).first()
if api_key and api_key.revoked == "false":
if datetime.now() <= api_key.expires_at:
is_first_use = api_key.used_at is None
if is_first_use or api_key.used_by_ip != ip:
api_key.used_at = datetime.now()
api_key.used_by_ip = ip
session.commit()
msg = f"βœ… **Time-Key** '{key}' used by IP: `{ip}`"
access_logger.info(msg)
if not silent:
send_telegram_alert(msg)
return True
finally:
session.close()
msg = f"β›” **DENIED:** Invalid, expired, or revoked key '{key}' attempted by IP: `{ip}`"
access_logger.warning(msg)
if not silent:
send_telegram_alert(msg)
return False
def generate_otk(admin_key: str, new_key: str = None, hours: int = 1) -> bool:
if admin_key != MASTER_KEY:
return False
if not new_key:
new_key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
from datetime import timedelta
session = get_db_session()
try:
new_api_key = ApiKey(
key=new_key,
created_at=datetime.now(),
expires_at=datetime.now() + timedelta(hours=hours),
revoked="false"
)
session.add(new_api_key)
session.commit()
finally:
session.close()
access_logger.info(f"ADMIN: New OTK generated: '{new_key}', valid for {hours} hours.")
return new_key
def handle_telegram_command(text: str):
text = text.strip()
if text.startswith("/generate"):
parts = text.split()
hours = 1
if len(parts) > 1 and parts[1].isdigit():
hours = int(parts[1])
key = generate_otk(MASTER_KEY, hours=hours)
send_telegram_alert(f"βœ… Key `{key}` generated successfully. It will be valid for {hours} hour(s).")
elif text.startswith("/status"):
session = get_db_session()
try:
active_keys = session.query(ApiKey).filter(ApiKey.revoked == "false", ApiKey.expires_at >= datetime.now()).count()
finally:
session.close()
send_telegram_alert(f"🟒 **Engine Online**\n\nActive Keys: {active_keys}")
elif text.startswith("/logs"):
log_path = os.path.join(OUTPUT_DIR, "access.log")
if os.path.exists(log_path):
with open(log_path, "r", encoding="utf-8") as f:
lines = f.readlines()[-5:]
log_text = "".join(lines) if lines else "No logs yet."
send_telegram_alert(f"πŸ“„ **Last 5 Access Logs:**\n```\n{log_text}\n```")
else:
send_telegram_alert("No logs found.")
elif text.startswith("/revoke_all"):
session = get_db_session()
count = 0
try:
keys_to_revoke = session.query(ApiKey).filter(ApiKey.revoked == "false", ApiKey.expires_at >= datetime.now()).all()
for k in keys_to_revoke:
k.revoked = "true"
count += 1
session.commit()
finally:
session.close()
send_telegram_alert(f"πŸ›‘ **LOCKDOWN INITIATED**\n{count} active keys have been revoked.")
elif text.startswith("/metrics"):
try:
ram = psutil.virtual_memory()
cpu = psutil.cpu_percent(interval=0.1)
# Find DB size
db_size_mb = 0
db_path = os.path.join(OUTPUT_DIR, "portfolio_db.sqlite3")
if os.path.exists(db_path):
db_size_mb = os.path.getsize(db_path) / (1024 * 1024)
msg = f"πŸ“Š **System Metrics**\n\n"
msg += f"πŸ–₯️ CPU Usage: {cpu}%\n"
msg += f"🧠 RAM Usage: {ram.percent}% ({ram.used / (1024*1024):.0f}MB / {ram.total / (1024*1024):.0f}MB)\n"
msg += f"πŸ’½ DB Size: {db_size_mb:.2f} MB"
send_telegram_alert(msg)
except Exception as e:
send_telegram_alert(f"Failed to fetch metrics: {e}")
elif text.startswith("/clear_cache"):
try:
from database import get_pg_engine
from sqlalchemy import text
engine = get_pg_engine()
with engine.connect() as conn:
conn.execute(text("DELETE FROM daily_prices"))
conn.execute(text("DELETE FROM daily_yields"))
conn.commit()
send_telegram_alert("🧹 **Cache Cleared**\nAll historical price data has been purged. The next optimization will trigger a fresh yfinance download.")
except Exception as e:
send_telegram_alert(f"Failed to clear cache: {e}")
elif text.startswith("/database"):
try:
from database import get_pg_engine
from sqlalchemy import text
engine = get_pg_engine()
with engine.connect() as conn:
prices_count = conn.execute(text("SELECT COUNT(*) FROM daily_prices")).scalar()
yields_count = conn.execute(text("SELECT COUNT(*) FROM daily_yields")).scalar()
db_size_mb = 0
db_path = os.path.join(OUTPUT_DIR, "portfolio_db.sqlite3")
if os.path.exists(db_path):
db_size_mb = os.path.getsize(db_path) / (1024 * 1024)
send_telegram_alert(
f"πŸ—„οΈ **Database Diagnostics**\n\n"
f"πŸ“ˆ Daily Prices: {prices_count:,} rows\n"
f"πŸ“‰ Daily Yields: {yields_count:,} rows\n"
f"πŸ’Ύ Disk Size: {db_size_mb:.2f} MB"
)
except Exception as e:
send_telegram_alert(f"Database query failed: {e}")
elif text.startswith("/run"):
parts = text.split(maxsplit=1)
if len(parts) > 1:
tickers = parts[1].upper()
send_telegram_alert(f"πŸš€ **Starting Engine Run**\nTickers: {tickers}\n*Engine will report back when finished.*")
# Usually queued to a Celery worker or async thread in production
else:
send_telegram_alert("Usage: `/run AAPL,MSFT,TSLA`")
elif text.startswith("/report"):
parts = text.split()
if len(parts) > 1:
task_id = parts[1]
send_telegram_alert(f"πŸ“„ Fetching report for Task ID: `{task_id}`...")
else:
send_telegram_alert("Usage: `/report [task_id]`")
elif text.startswith("/alerts"):
send_telegram_alert("πŸ”” **Alert Daemon**\nMarket drop monitoring is active. Will notify on 5%+ drops.")
elif text.startswith("/help"):
help_text = (
"πŸ€– **Wealth Engine Master Bot**\n\n"
"πŸ” `/generate [hours]` - Create an Access Key\n"
"🟒 `/status` - Check active keys\n"
"▢️ `/run [tickers]` - Run optimization for tickers\n"
"πŸ“„ `/report [task]` - Get report link\n"
"πŸ”” `/alerts` - Check active alerts\n"
"πŸ“Š `/metrics` - Server RAM & CPU\n"
"πŸ—„οΈ `/database` - View DB row counts\n"
"🧹 `/clear_cache` - Purge historical data\n"
"πŸ“„ `/logs` - View last 5 logins\n"
"πŸ›‘ `/revoke_all` - Destroy all keys"
)
send_telegram_alert(help_text)
else:
send_telegram_alert("❓ **Unknown Command**\nType `/help` for a list of available commands.")