"""Access-key validation and one-time-key (OTK) administration.

Keys are stored as a JSON document of the form ``{"otk": {key: record}}``,
kept in Redis when ``REDIS_URL`` is set and in ``KEYS_FILE`` otherwise.
Every access attempt is written to ``access.log`` inside ``OUTPUT_DIR``.
"""

import functools
import hmac
import ipaddress
import json
import logging
import os
import random
import secrets
import string
from datetime import datetime, timedelta, timezone
from typing import Optional

import psutil
import redis
import requests

from constants import KEYS_FILE, MASTER_KEY, OUTPUT_DIR

# Alphabet used for the random part of generated one-time keys.
_OTK_ALPHABET = string.ascii_uppercase + string.digits


def to_aware(dt: datetime) -> datetime:
    """Return *dt* as a timezone-aware datetime.

    Naive values are interpreted as UTC; aware values are returned unchanged.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt


@functools.lru_cache(maxsize=1024)
def get_geo_location(ip: str) -> str:
    """Return a log suffix such as ``" (City, Country)"`` for a public IP.

    The lookup is best-effort: an empty string is returned for anything that
    is not a syntactically valid, publicly routable address, and for any
    lookup failure. It never raises, so a geo outage cannot break key
    validation.

    NOTE: results are memoised per IP, including empty results produced by a
    failed lookup.
    """
    # Validate before building the URL: `ip` may originate from request
    # metadata and must not be interpolated into a URL unchecked.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return ""
    # Loopback/private/reserved addresses have no geo data; skip the request.
    if not address.is_global:
        return ""

    try:
        res = requests.get(
            f"http://ip-api.com/json/{address}?fields=status,country,city",
            timeout=2,
        )
        if not res.ok:
            return ""
        data = res.json()
    except Exception as exc:  # deliberate best-effort boundary
        access_logger.debug("Geo lookup failed for %s: %s", address, exc)
        return ""

    if not isinstance(data, dict) or data.get("status") != "success":
        return ""
    city = data.get("city", "")
    country = data.get("country", "")
    if city and country:
        return f" ({city}, {country})"
    if country:
        return f" ({country})"
    return ""


# Set up access logger
access_logger = logging.getLogger("wealth_access")
access_logger.setLevel(logging.INFO)
handler = logging.FileHandler(os.path.join(OUTPUT_DIR, "access.log"), encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
access_logger.addHandler(handler)

# Initialize Redis client if REDIS_URL is provided
redis_url = os.getenv("REDIS_URL")
redis_client = None
if redis_url:
    try:
        # NOTE(review): from_url presumably builds a lazy client without
        # contacting the server, so the success message below may only mean
        # the URL parsed -- confirm against the redis-py documentation.
        redis_client = redis.from_url(redis_url, decode_responses=True)
        access_logger.info("Successfully connected to Redis for access key storage.")
    except Exception as e:
        access_logger.error(f"Failed to connect to Redis: {e}")


def _normalize_store(data) -> dict:
    """Return *data* as a key store that is guaranteed to hold an "otk" dict."""
    if not isinstance(data, dict):
        return {"otk": {}}
    if not isinstance(data.get("otk"), dict):
        data["otk"] = {}
    return data


def _is_master_key(candidate) -> bool:
    """Return True when *candidate* equals ``MASTER_KEY``.

    The comparison is constant-time. An empty or non-string value on either
    side never matches, so an unset master key cannot grant access.
    """
    if not isinstance(candidate, str) or not isinstance(MASTER_KEY, str):
        return False
    if not candidate or not MASTER_KEY:
        return False
    return hmac.compare_digest(
        candidate.encode("utf-8"), MASTER_KEY.encode("utf-8")
    )


def _expiry_of(record: dict) -> Optional[datetime]:
    """Return the aware expiry time of a key record, or None if malformed.

    ``expires_at`` is used when present; otherwise the key expires one hour
    after ``created_at``.
    """
    try:
        if "expires_at" in record:
            return to_aware(datetime.fromisoformat(record["expires_at"]))
        created_at = to_aware(datetime.fromisoformat(record["created_at"]))
        return created_at + timedelta(hours=1)
    except (KeyError, TypeError, ValueError):
        return None


def _random_otk() -> str:
    """Return a new ``OTK-XXXXXXXX`` key drawn from a cryptographic RNG."""
    return "OTK-" + "".join(secrets.choice(_OTK_ALPHABET) for _ in range(8))


def load_keys() -> dict:
    """Load the key store.

    Redis is consulted first when configured; if it is unavailable, errors,
    or holds no data, the local JSON file is used. A missing or unreadable
    file yields an empty store.

    Returns:
        A dict that always contains an ``"otk"`` dict.
    """
    if redis_client:
        try:
            raw = redis_client.get("access_keys")
            if raw:
                return _normalize_store(json.loads(raw))
        except Exception as e:
            access_logger.error(f"Redis read error: {e}")

    # Fallback to local JSON
    try:
        with open(KEYS_FILE, 'r', encoding='utf-8') as f:
            return _normalize_store(json.load(f))
    except FileNotFoundError:
        return {"otk": {}}
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        access_logger.error(f"Key file read error: {e}")
        return {"otk": {}}


def save_keys(data) -> None:
    """Persist the key store to Redis, or to the local JSON file.

    The file is written only when Redis is not configured or the Redis write
    fails.

    Raises:
        OSError: if the fallback file cannot be written.
    """
    if redis_client:
        try:
            redis_client.set("access_keys", json.dumps(data))
            return
        except Exception as e:
            access_logger.error(f"Redis write error: {e}")

    # Fallback to local JSON
    with open(KEYS_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)


def validate_key(key: str, ip: str = "Unknown", silent: bool = False) -> bool:
    """Check whether *key* currently grants access and log the attempt.

    Args:
        key: The key presented by the caller; surrounding whitespace is
            ignored.
        ip: The caller's address, used for logging and usage tracking.
        silent: Accepted for backward compatibility; not consulted here.

    Returns:
        True for the master key or for a stored key that is not revoked and
        not expired; False otherwise. An empty *key* returns False without
        logging.
    """
    if not key:
        return False

    key = key.strip()

    # Check Master Key
    if _is_master_key(key):
        geo = get_geo_location(ip)
        msg = f"✅ **MASTER_KEY** used by IP: `{ip}`{geo}"
        access_logger.info(msg)
        return True

    # Check Time-Based Keys
    data = load_keys()
    record = data["otk"].get(key)
    # Fail closed: anything other than an explicit `false` counts as revoked.
    if isinstance(record, dict) and record.get("revoked", False) is False:
        expires_at = _expiry_of(record)
        if expires_at is not None and datetime.now(timezone.utc) <= expires_at:
            # Record usage on first use and whenever the client IP changes.
            if "used_at" not in record or record.get("used_by_ip") != ip:
                # Stored in UTC so it agrees with created_at / expires_at.
                record["used_at"] = datetime.now(timezone.utc).isoformat()
                record["used_by_ip"] = ip
                save_keys(data)

            geo = get_geo_location(ip)
            msg = f"✅ **Time-Key** '{key}' used by IP: `{ip}`{geo}"
            access_logger.info(msg)
            return True

    geo = get_geo_location(ip)
    msg = f"⛔ **DENIED:** Invalid, expired, or revoked key '{key}' attempted by IP: `{ip}`{geo}"
    access_logger.warning(msg)
    return False


def generate_otk(admin_key: str, new_key: Optional[str] = None, hours: int = 1) -> Optional[str]:
    """Create (or overwrite) a time-limited key.

    Args:
        admin_key: Must equal the master key.
        new_key: Explicit key name; a random ``OTK-XXXXXXXX`` key is
            generated when omitted or blank. Surrounding whitespace is
            removed because validation strips the key it is given.
        hours: Validity period in hours, counted from now.

    Returns:
        The stored key, or None when *admin_key* is not the master key.
    """
    if not _is_master_key(admin_key):
        return None

    data = load_keys()
    new_key = new_key.strip() if new_key else ""
    if not new_key:
        new_key = _random_otk()
        # Never clobber an existing record with a random collision.
        while new_key in data["otk"]:
            new_key = _random_otk()

    now = datetime.now(timezone.utc)
    data["otk"][new_key] = {
        "created_at": now.isoformat(),
        "expires_at": (now + timedelta(hours=hours)).isoformat(),
        "revoked": False,
    }
    save_keys(data)
    access_logger.info(f"ADMIN: New OTK generated: '{new_key}', valid for {hours} hours.")
    return new_key


def revoke_otk(admin_key: str, target_key: str) -> bool:
    """Mark a stored key as revoked.

    Args:
        admin_key: Must equal the master key.
        target_key: The key to revoke; matched exactly first, then with
            surrounding whitespace removed.

    Returns:
        True when a key was found and revoked, False otherwise.
    """
    if not _is_master_key(admin_key):
        return False
    if not isinstance(target_key, str) or not target_key:
        return False

    data = load_keys()
    # dict.fromkeys de-duplicates while keeping the exact match first.
    for candidate in dict.fromkeys((target_key, target_key.strip())):
        record = data["otk"].get(candidate)
        if isinstance(record, dict):
            record["revoked"] = True
            save_keys(data)
            access_logger.info(f"ADMIN: OTK revoked: '{candidate}'")
            return True
    return False


def get_all_keys(admin_key: str) -> dict:
    """Return every stored key record, or an empty dict without admin rights."""
    if not _is_master_key(admin_key):
        return {}
    return load_keys().get("otk", {})