portfolio-engine / access_manager.py
engineportf's picture
Initial Deployment from Local Engine
208fbf8 verified
Raw
History Blame Contribute Delete
5.49 kB
import os
import json
import logging
import random
import string
import psutil
from datetime import datetime, timedelta, timezone
import requests
import functools
def to_aware(dt: datetime) -> datetime:
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
import redis
from constants import KEYS_FILE, MASTER_KEY, OUTPUT_DIR
@functools.lru_cache(maxsize=1024)
def get_geo_location(ip: str) -> str:
if ip in ("127.0.0.1", "::1", "localhost", "Unknown"):
return ""
try:
res = requests.get(f"http://ip-api.com/json/{ip}?fields=status,country,city", timeout=2)
if res.ok:
data = res.json()
if data.get("status") == "success":
city = data.get("city", "")
country = data.get("country", "")
if city and country:
return f" ({city}, {country})"
elif country:
return f" ({country})"
except Exception:
pass
return ""
# Set up access logger
access_logger = logging.getLogger("wealth_access")
access_logger.setLevel(logging.INFO)
handler = logging.FileHandler(os.path.join(OUTPUT_DIR, "access.log"), encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
access_logger.addHandler(handler)
# Initialize Redis client if REDIS_URL is provided
redis_url = os.getenv("REDIS_URL")
redis_client = None
if redis_url:
try:
redis_client = redis.from_url(redis_url, decode_responses=True)
access_logger.info("Successfully connected to Redis for access key storage.")
except Exception as e:
access_logger.error(f"Failed to connect to Redis: {e}")
def load_keys():
if redis_client:
try:
data = redis_client.get("access_keys")
if data:
return json.loads(data)
except Exception as e:
access_logger.error(f"Redis read error: {e}")
# Fallback to local JSON
if not os.path.exists(KEYS_FILE):
return {"otk": {}}
try:
with open(KEYS_FILE, 'r') as f:
return json.load(f)
except:
return {"otk": {}}
def save_keys(data):
if redis_client:
try:
redis_client.set("access_keys", json.dumps(data))
return
except Exception as e:
access_logger.error(f"Redis write error: {e}")
# Fallback to local JSON
with open(KEYS_FILE, 'w') as f:
json.dump(data, f, indent=4)
def validate_key(key: str, ip: str = "Unknown", silent: bool = False) -> bool:
if not key:
return False
key = key.strip()
# Check Master Key
if key == MASTER_KEY:
geo = get_geo_location(ip)
msg = f"✅ **MASTER_KEY** used by IP: `{ip}`{geo}"
access_logger.info(msg)
return True
# Check Time-Based Keys
data = load_keys()
if key in data["otk"] and data["otk"][key].get("revoked", False) == False:
key_data = data["otk"][key]
created_at = to_aware(datetime.fromisoformat(key_data["created_at"]))
# If expires_at is present use it, else default to 1 hour (3600s) from creation
if "expires_at" in key_data:
expires_at = to_aware(datetime.fromisoformat(key_data["expires_at"]))
else:
expires_at = created_at + timedelta(hours=1)
if datetime.now(timezone.utc) <= expires_at:
# It's valid. Check if this is a new IP.
is_first_use = "used_at" not in key_data
if is_first_use or key_data.get("used_by_ip") != ip:
key_data["used_at"] = datetime.now().isoformat()
key_data["used_by_ip"] = ip
save_keys(data)
geo = get_geo_location(ip)
msg = f"✅ **Time-Key** '{key}' used by IP: `{ip}`{geo}"
access_logger.info(msg)
return True
geo = get_geo_location(ip)
msg = f"⛔ **DENIED:** Invalid, expired, or revoked key '{key}' attempted by IP: `{ip}`{geo}"
access_logger.warning(msg)
return False
def generate_otk(admin_key: str, new_key: str = None, hours: int = 1) -> str:
if admin_key != MASTER_KEY:
return None
if not new_key:
new_key = "OTK-" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
data = load_keys()
data["otk"][new_key] = {
"created_at": datetime.now(timezone.utc).isoformat(),
"expires_at": (datetime.now(timezone.utc) + timedelta(hours=hours)).isoformat(),
"revoked": False
}
save_keys(data)
access_logger.info(f"ADMIN: New OTK generated: '{new_key}', valid for {hours} hours.")
return new_key
def revoke_otk(admin_key: str, target_key: str) -> bool:
if admin_key != MASTER_KEY:
return False
data = load_keys()
if target_key in data["otk"]:
data["otk"][target_key]["revoked"] = True
save_keys(data)
access_logger.info(f"ADMIN: OTK revoked: '{target_key}'")
return True
return False
def get_all_keys(admin_key: str) -> dict:
if admin_key != MASTER_KEY:
return {}
return load_keys().get("otk", {})