JRNET / core /outline_config.py
Factor Studios
Upload 96 files
6a5b8d8 verified
raw
history blame
4.64 kB
"""
Outline VPN Configuration Manager
"""
import json
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class OutlineConfig:
    """Server-level settings for the Outline VPN service.

    Every field has a default, so ``OutlineConfig()`` yields a usable
    configuration. Fields can be overridden by keyword, e.g.
    ``OutlineConfig(port=8443)``.
    """

    # Port number (defaults to 443).
    port: int = 443
    # Cipher identifier string.
    cipher: str = "chacha20-ietf-poly1305"
    # NOTE(review): unit is not established in this file — presumably seconds.
    timeout: int = 600
    # NOTE(review): presumably the number of worker processes/threads — confirm
    # against the code that consumes this value.
    workers: int = 4
    # Address to bind to; "0.0.0.0" is the IPv4 wildcard address.
    bind_address: str = "0.0.0.0"
    # Hex-encoded 32 random bytes (64 hex characters). A default_factory is
    # used so each instance gets its own fresh salt; a plain default would be
    # evaluated once at class-definition time and silently shared by every
    # default-constructed instance in the process.
    access_key_salt: str = field(default_factory=lambda: os.urandom(32).hex())
@dataclass
class UserConfig:
    """Record describing a single VPN user.

    ``user_id`` and ``access_key`` are required; every other field has a
    default, so a new user can be created from just those two values.
    """

    # --- required identity fields ---
    user_id: str
    access_key: str

    # --- optional limits (None means "not set") ---
    # NOTE(review): unit of data_limit is presumably bytes, since it is
    # compared against bandwidth_usage elsewhere in this file — confirm.
    data_limit: Optional[int] = None
    # NOTE(review): stored as a string; the expected date format is not
    # established in this file.
    expiry_date: Optional[str] = None

    # --- mutable state ---
    is_active: bool = True
    last_connection: Optional[str] = None
    bandwidth_usage: int = 0
class OutlineManager:
    """Manage the server settings and user list, persisted as one JSON file.

    The file holds two top-level keys: ``server`` (the server settings) and
    ``users`` (a list of user records). Every mutating method rewrites the
    whole file via :meth:`save_config`.
    """

    def __init__(self, config_path: str = "config/outline_config.json"):
        """Start from defaults, then load the config file (or create it).

        Args:
            config_path: Path of the JSON config file. May be a bare
                filename, in which case it is resolved against the current
                working directory.
        """
        self.config_path = config_path
        self.config = OutlineConfig()
        self.users: List[UserConfig] = []
        self._load_config()

    def _load_config(self):
        """Load configuration from file, or write the current defaults.

        Raises:
            json.JSONDecodeError: If the existing file is not valid JSON.
            TypeError: If the stored data contains keys the config/user
                dataclasses do not accept.
        """
        # A bare filename has an empty dirname and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        config_dir = os.path.dirname(self.config_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        if not os.path.exists(self.config_path):
            self.save_config()
            return

        with open(self.config_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.config = OutlineConfig(**data.get("server", {}))
        self.users = [UserConfig(**u) for u in data.get("users", [])]

    def save_config(self):
        """Write the current server settings and all users to the config file."""
        data = {
            "server": vars(self.config),
            "users": [vars(u) for u in self.users],
        }
        # json.dump escapes non-ASCII by default, so the explicit encoding
        # only pins down what was previously the platform default.
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)

    def add_user(self, user_id: str, data_limit: Optional[int] = None) -> "UserConfig":
        """Add a new user, generate their access key and persist the change.

        Args:
            user_id: Unique identifier for the new user.
            data_limit: Optional usage cap; ``None`` means no cap.

        Returns:
            The newly created user record.

        Raises:
            ValueError: If a user with ``user_id`` already exists.
        """
        if any(u.user_id == user_id for u in self.users):
            raise ValueError(f"User {user_id} already exists")

        user = UserConfig(
            user_id=user_id,
            access_key=self._generate_access_key(user_id),
            data_limit=data_limit,
        )
        self.users.append(user)
        self.save_config()
        return user

    def remove_user(self, user_id: str) -> bool:
        """Remove every user whose ID equals ``user_id``.

        Returns:
            ``True`` if at least one user was removed (and the file was
            rewritten), ``False`` if no user matched.
        """
        remaining = [u for u in self.users if u.user_id != user_id]
        removed = len(remaining) < len(self.users)
        self.users = remaining
        if removed:
            self.save_config()
        return removed

    def get_user_by_key(self, access_key: str) -> Optional["UserConfig"]:
        """Return the first *active* user with this access key, else ``None``."""
        for user in self.users:
            if user.access_key == access_key and user.is_active:
                return user
        return None

    def update_user_bandwidth(self, user_id: str, bytes_used: int):
        """Add ``bytes_used`` to a user's usage and persist the change.

        The user is deactivated once usage reaches their ``data_limit``.
        Unknown ``user_id`` values are ignored and nothing is written.
        """
        for user in self.users:
            if user.user_id != user_id:
                continue
            user.bandwidth_usage += bytes_used
            # Truthiness check is deliberate legacy behaviour: a limit of
            # None *or* 0 is treated as "no limit".
            if user.data_limit and user.bandwidth_usage >= user.data_limit:
                user.is_active = False
            self.save_config()
            break

    def _generate_access_key(self, user_id: str) -> str:
        """Derive a 32-character hex access key for ``user_id``.

        The key is the first 32 hex characters of
        SHA-256(user_id + access_key_salt). It is deterministic: re-adding
        the same ``user_id`` under the same salt yields the same key.
        """
        import hashlib

        material = f"{user_id}{self.config.access_key_salt}".encode()
        return hashlib.sha256(material).hexdigest()[:32]

    def get_server_stats(self) -> Dict:
        """Return user counts and the summed bandwidth usage of all users."""
        return {
            "total_users": len(self.users),
            "active_users": sum(1 for u in self.users if u.is_active),
            "total_bandwidth": sum(u.bandwidth_usage for u in self.users),
        }
def generate_openvpn_certificates(config_id: str) -> Dict:
    """Placeholder for OpenVPN certificate generation.

    No certificate is generated: ``config_id`` is ignored and fixed
    placeholder strings are returned under the ``cert`` and ``key`` keys.
    """
    placeholder = dict(cert="placeholder_cert", key="placeholder_key")
    return placeholder
def generate_openvpn_config(config_id: str, server_ip: str) -> str:
    """Placeholder for OpenVPN config generation.

    Returns a fixed client configuration whose ``remote`` directive points
    at ``server_ip`` on port 1194. ``config_id`` is ignored. Directives are
    newline-separated with no trailing newline.
    """
    directives = (
        "client",
        "dev tun",
        "proto udp",
        f"remote {server_ip} 1194",
        "resolv-retry infinite",
        "nobind",
        "persist-key",
        "persist-tun",
        "remote-cert-tls server",
        "cipher AES-256-CBC",
        "verb 3",
    )
    return "\n".join(directives)
def generate_wireguard_keys(config_id: str) -> Dict:
    """Placeholder for WireGuard key generation.

    No keys are generated: ``config_id`` is ignored and fixed placeholder
    strings are returned for the server public key and the client key pair.
    """
    keys = {}
    keys["server_public"] = "placeholder_server_public"
    keys["client_private"] = "placeholder_client_private"
    keys["client_public"] = "placeholder_client_public"
    return keys