JRNET / core /outline_config.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
Outline VPN Configuration Manager
"""
import os
import json
from dataclasses import dataclass
from typing import List, Optional, Dict
@dataclass
class OutlineConfig:
port: int = 443
cipher: str = "chacha20-ietf-poly1305"
timeout: int = 600
workers: int = 4
bind_address: str = "0.0.0.0"
access_key_salt: str = os.urandom(32).hex()
@dataclass
class UserConfig:
user_id: str
access_key: str
data_limit: Optional[int] = None
expiry_date: Optional[str] = None
is_active: bool = True
last_connection: Optional[str] = None
bandwidth_usage: int = 0
class OutlineManager:
def __init__(self, config_path: str = "config/outline_config.json"):
self.config_path = config_path
self.config = OutlineConfig()
self.users: List[UserConfig] = []
self._load_config()
def _load_config(self):
"""Load configuration from file or create default"""
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
data = json.load(f)
self.config = OutlineConfig(**data.get('server', {}))
self.users = [UserConfig(**u) for u in data.get('users', [])]
else:
self.save_config()
def save_config(self):
"""Save current configuration to file"""
data = {
'server': self.config.__dict__,
'users': [u.__dict__ for u in self.users]
}
with open(self.config_path, 'w') as f:
json.dump(data, f, indent=4)
def add_user(self, user_id: str, data_limit: Optional[int] = None) -> UserConfig:
"""Add a new user and generate their access key"""
# Check if user already exists
if any(u.user_id == user_id for u in self.users):
raise ValueError(f"User {user_id} already exists")
access_key = self._generate_access_key(user_id)
user = UserConfig(
user_id=user_id,
access_key=access_key,
data_limit=data_limit
)
self.users.append(user)
self.save_config()
return user
def remove_user(self, user_id: str) -> bool:
"""Remove a user by their ID"""
initial_length = len(self.users)
self.users = [u for u in self.users if u.user_id != user_id]
if len(self.users) < initial_length:
self.save_config()
return True
return False
def get_user_by_key(self, access_key: str) -> Optional[UserConfig]:
"""Find user by their access key"""
for user in self.users:
if user.access_key == access_key and user.is_active:
return user
return None
def update_user_bandwidth(self, user_id: str, bytes_used: int):
"""Update user's bandwidth usage"""
for user in self.users:
if user.user_id == user_id:
user.bandwidth_usage += bytes_used
if user.data_limit and user.bandwidth_usage >= user.data_limit:
user.is_active = False
self.save_config()
break
def _generate_access_key(self, user_id: str) -> str:
"""Generate a unique access key for a user"""
import hashlib
key = hashlib.sha256(f"{user_id}{self.config.access_key_salt}".encode()).hexdigest()
return key[:32] # Return first 32 chars as access key
def get_server_stats(self) -> Dict:
"""Get server statistics"""
return {
"total_users": len(self.users),
"active_users": sum(1 for u in self.users if u.is_active),
"total_bandwidth": sum(u.bandwidth_usage for u in self.users)
}
def generate_openvpn_certificates(config_id: str) -> Dict:
"""Placeholder for OpenVPN certificate generation"""
return {"cert": "placeholder_cert", "key": "placeholder_key"}
def generate_openvpn_config(config_id: str, server_ip: str) -> str:
"""Placeholder for OpenVPN config generation"""
return f"client\ndev tun\nproto udp\nremote {server_ip} 1194\nresolv-retry infinite\nnobind\npersist-key\npersist-tun\nremote-cert-tls server\ncipher AES-256-CBC\nverb 3"
def generate_wireguard_keys(config_id: str) -> Dict:
"""Placeholder for WireGuard key generation"""
return {"server_public": "placeholder_server_public", "client_private": "placeholder_client_private", "client_public": "placeholder_client_public"}