Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| import random | |
| import string | |
| import psutil | |
| from datetime import datetime, timedelta, timezone | |
| import requests | |
| import functools | |
| def to_aware(dt: datetime) -> datetime: | |
| if dt.tzinfo is None: | |
| return dt.replace(tzinfo=timezone.utc) | |
| return dt | |
| import redis | |
| from constants import KEYS_FILE, MASTER_KEY, OUTPUT_DIR | |
| def get_geo_location(ip: str) -> str: | |
| if ip in ("127.0.0.1", "::1", "localhost", "Unknown"): | |
| return "" | |
| try: | |
| res = requests.get(f"http://ip-api.com/json/{ip}?fields=status,country,city", timeout=2) | |
| if res.ok: | |
| data = res.json() | |
| if data.get("status") == "success": | |
| city = data.get("city", "") | |
| country = data.get("country", "") | |
| if city and country: | |
| return f" ({city}, {country})" | |
| elif country: | |
| return f" ({country})" | |
| except Exception: | |
| pass | |
| return "" | |
| # Set up access logger | |
| access_logger = logging.getLogger("wealth_access") | |
| access_logger.setLevel(logging.INFO) | |
| handler = logging.FileHandler(os.path.join(OUTPUT_DIR, "access.log"), encoding='utf-8') | |
| formatter = logging.Formatter('%(asctime)s - %(message)s') | |
| handler.setFormatter(formatter) | |
| access_logger.addHandler(handler) | |
| # Initialize Redis client if REDIS_URL is provided | |
| redis_url = os.getenv("REDIS_URL") | |
| redis_client = None | |
| if redis_url: | |
| try: | |
| redis_client = redis.from_url(redis_url, decode_responses=True) | |
| access_logger.info("Successfully connected to Redis for access key storage.") | |
| except Exception as e: | |
| access_logger.error(f"Failed to connect to Redis: {e}") | |
| def load_keys(): | |
| if redis_client: | |
| try: | |
| data = redis_client.get("access_keys") | |
| if data: | |
| return json.loads(data) | |
| except Exception as e: | |
| access_logger.error(f"Redis read error: {e}") | |
| # Fallback to local JSON | |
| if not os.path.exists(KEYS_FILE): | |
| return {"otk": {}} | |
| try: | |
| with open(KEYS_FILE, 'r') as f: | |
| return json.load(f) | |
| except: | |
| return {"otk": {}} | |
| def save_keys(data): | |
| if redis_client: | |
| try: | |
| redis_client.set("access_keys", json.dumps(data)) | |
| return | |
| except Exception as e: | |
| access_logger.error(f"Redis write error: {e}") | |
| # Fallback to local JSON | |
| with open(KEYS_FILE, 'w') as f: | |
| json.dump(data, f, indent=4) | |
| def validate_key(key: str, ip: str = "Unknown", silent: bool = False) -> bool: | |
| if not key: | |
| return False | |
| key = key.strip() | |
| # Check Master Key | |
| if key == MASTER_KEY: | |
| geo = get_geo_location(ip) | |
| msg = f"✅ **MASTER_KEY** used by IP: `{ip}`{geo}" | |
| access_logger.info(msg) | |
| return True | |
| # Check Time-Based Keys | |
| data = load_keys() | |
| if key in data["otk"] and data["otk"][key].get("revoked", False) == False: | |
| key_data = data["otk"][key] | |
| created_at = to_aware(datetime.fromisoformat(key_data["created_at"])) | |
| # If expires_at is present use it, else default to 1 hour (3600s) from creation | |
| if "expires_at" in key_data: | |
| expires_at = to_aware(datetime.fromisoformat(key_data["expires_at"])) | |
| else: | |
| expires_at = created_at + timedelta(hours=1) | |
| if datetime.now(timezone.utc) <= expires_at: | |
| # It's valid. Check if this is a new IP. | |
| is_first_use = "used_at" not in key_data | |
| if is_first_use or key_data.get("used_by_ip") != ip: | |
| key_data["used_at"] = datetime.now().isoformat() | |
| key_data["used_by_ip"] = ip | |
| save_keys(data) | |
| geo = get_geo_location(ip) | |
| msg = f"✅ **Time-Key** '{key}' used by IP: `{ip}`{geo}" | |
| access_logger.info(msg) | |
| return True | |
| geo = get_geo_location(ip) | |
| msg = f"⛔ **DENIED:** Invalid, expired, or revoked key '{key}' attempted by IP: `{ip}`{geo}" | |
| access_logger.warning(msg) | |
| return False | |
| def generate_otk(admin_key: str, new_key: str = None, hours: int = 1) -> str: | |
| if admin_key != MASTER_KEY: | |
| return None | |
| if not new_key: | |
| new_key = "OTK-" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) | |
| data = load_keys() | |
| data["otk"][new_key] = { | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| "expires_at": (datetime.now(timezone.utc) + timedelta(hours=hours)).isoformat(), | |
| "revoked": False | |
| } | |
| save_keys(data) | |
| access_logger.info(f"ADMIN: New OTK generated: '{new_key}', valid for {hours} hours.") | |
| return new_key | |
| def revoke_otk(admin_key: str, target_key: str) -> bool: | |
| if admin_key != MASTER_KEY: | |
| return False | |
| data = load_keys() | |
| if target_key in data["otk"]: | |
| data["otk"][target_key]["revoked"] = True | |
| save_keys(data) | |
| access_logger.info(f"ADMIN: OTK revoked: '{target_key}'") | |
| return True | |
| return False | |
| def get_all_keys(admin_key: str) -> dict: | |
| if admin_key != MASTER_KEY: | |
| return {} | |
| return load_keys().get("otk", {}) | |