# Provenance: uploaded by darkfire514 ("Upload 160 files", commit 399b80c, verified).
import asyncio
import logging
import sys
from typing import Callable, Awaitable, Dict, Optional

from ..types import SecurityPolicy, BackendType
# Signature of a confirmation callback: it is awaited with the message to show
# and resolves to a bool that is returned as the allow/deny decision.
PromptFunc = Callable[[str], Awaitable[bool]]
# ANSI color codes
class Colors:
    """Namespace of ANSI escape sequences used when building terminal output."""

    # Attribute control: clear all styling / switch on bold.
    RESET: str = "\033[0m"
    BOLD: str = "\033[1m"

    # Foreground colors (SGR codes in the 90-97 range).
    RED: str = "\033[91m"
    YELLOW: str = "\033[93m"
    GREEN: str = "\033[92m"
    CYAN: str = "\033[96m"
    GRAY: str = "\033[90m"
    WHITE: str = "\033[97m"
class SecurityPolicyManager:
    """Resolve the security policy for a backend and confirm risky actions.

    A policy registered for a specific backend takes precedence over the
    global policy; when neither is set, a default-constructed
    ``SecurityPolicy`` is used.  When a policy check fails, the configured
    prompt callback decides whether the action may proceed.  Any exception
    raised by that callback is treated as a denial.
    """

    # Limits applied when rendering a rejected command in the prompt.
    _MAX_DISPLAY_LINES = 10   # flagged lines shown before truncating
    _MAX_LINE_LENGTH = 80     # longer lines are cut and suffixed with "..."
    _FALLBACK_LINE_COUNT = 5  # leading lines shown when no token matches
    _MAX_TOKENS_SHOWN = 5     # dangerous tokens listed in the message

    def __init__(self, prompt: PromptFunc | None = None):
        """Create a manager.

        Args:
            prompt: Async callback that receives the confirmation message and
                returns True to allow.  When omitted (or falsy) the built-in
                terminal prompt is used.
        """
        self._policies: Dict[BackendType, SecurityPolicy] = {}
        self._global_policy: Optional[SecurityPolicy] = None
        self._prompt: PromptFunc | None = prompt or self._default_cli_prompt

    async def _default_cli_prompt(self, message: str) -> bool:
        """Print *message* to the terminal and read a yes/no answer from stdin.

        Returns:
            True only when the answer, stripped and lowercased, is ``y`` or
            ``yes``; every other input (including an empty read) denies.
        """
        # Function-scope import kept from the original code.
        from openspace.utils.display import colorize, print_separator

        print()
        print_separator(70, 'y', 2)
        print(f" {colorize('⚠️ Security Policy Warning', color=Colors.RED, bold=True)}")
        print_separator(70, 'y', 2)
        print(f" {message}")
        print_separator(70, 'gr', 2)
        print(f" {colorize('[y/yes]', color=Colors.GREEN)} Allow | {colorize('[n/no]', color=Colors.RED)} Deny")
        print_separator(70, 'gr', 2)
        print(f" {colorize('Your choice:', bold=True)} ", end="", flush=True)

        # readline() blocks, so it runs in the default executor instead of
        # stalling the event loop.
        loop = asyncio.get_running_loop()
        answer = await loop.run_in_executor(None, sys.stdin.readline)
        allowed = answer.strip().lower() in {"y", "yes"}

        if allowed:
            print(f" {colorize('✓ Allowed', color=Colors.GREEN)}\n")
        else:
            print(f" {colorize('✗ Denied', color=Colors.RED)}\n")
        return allowed

    def set_global_policy(self, policy: SecurityPolicy) -> None:
        """Set the policy used for backends without a dedicated policy."""
        self._global_policy = policy

    def set_backend_policy(self, backend_type: BackendType, policy: SecurityPolicy) -> None:
        """Register *policy* for *backend_type*, replacing any previous one."""
        self._policies[backend_type] = policy

    def get_policy(self, backend_type: BackendType) -> SecurityPolicy:
        """Return the effective policy for *backend_type*.

        Lookup order: backend-specific policy, then the global policy, then a
        default-constructed ``SecurityPolicy``.
        """
        # Compare against None rather than relying on truthiness, so that a
        # registered policy object that happens to evaluate falsy is honored.
        backend_policy = self._policies.get(backend_type)
        if backend_policy is not None:
            return backend_policy
        if self._global_policy is not None:
            return self._global_policy
        return SecurityPolicy()

    async def _ask_user(self, message: str) -> bool:
        """Ask the prompt callback to confirm; deny when it is missing or fails.

        Returns:
            The callback's result, or False when no callback is set or the
            callback raises an ``Exception``.
        """
        if not self._prompt:
            return False
        try:
            return await self._prompt(message)
        except Exception:
            # Fail closed, but leave a trace instead of hiding the failure.
            logging.getLogger(__name__).exception(
                "Security prompt failed; denying by default"
            )
            return False

    @classmethod
    def _find_flagged_lines(cls, command: str, dangerous_tokens) -> list:
        """Return ``(line_number, stripped_line)`` pairs that contain a token.

        Matching is case-insensitive on both sides.  When no line matches,
        the first ``_FALLBACK_LINE_COUNT`` lines are returned instead so the
        user still sees some context.
        """
        lowered_tokens = [token.lower() for token in dangerous_tokens]
        lines = command.split('\n')
        flagged = [
            (number, line.strip())
            for number, line in enumerate(lines, start=1)
            if any(token in line.lower() for token in lowered_tokens)
        ]
        if flagged:
            return flagged
        return [
            (number, line.strip())
            for number, line in enumerate(lines[:cls._FALLBACK_LINE_COUNT], start=1)
        ]

    @classmethod
    def _format_flagged_lines(cls, flagged) -> list:
        """Render flagged lines as display strings, truncating length and count."""
        # NOTE(review): command text is echoed as-is; control or escape
        # characters embedded in it are not neutralized before display.
        rendered = []
        for number, text in flagged[:cls._MAX_DISPLAY_LINES]:
            if len(text) > cls._MAX_LINE_LENGTH:
                text = text[:cls._MAX_LINE_LENGTH - 3] + "..."
            rendered.append(f" L{number}: {text}")
        if len(flagged) > cls._MAX_DISPLAY_LINES:
            rendered.append(" ... (more lines)")
        return rendered

    async def check_command_allowed(self, backend_type: BackendType, command: str) -> bool:
        """Return True when *command* passes the policy or the user allows it.

        When the policy check fails, a message listing the detected tokens
        and the affected lines is built and passed to the prompt callback.
        """
        policy = self.get_policy(backend_type)
        if policy.check(command=command):
            return True

        dangerous_tokens = list(policy.find_dangerous_tokens(command))
        flagged = self._find_flagged_lines(command, dangerous_tokens)
        formatted_cmd_lines = self._format_flagged_lines(flagged)

        # Show which dangerous commands were detected.
        dangerous_list = ', '.join(
            f"{Colors.RED}{tok}{Colors.RESET}"
            for tok in dangerous_tokens[:self._MAX_TOKENS_SHOWN]
        )

        from openspace.utils.display import Box, BoxStyle, colorize

        # Build the boxed rendering of the affected lines.
        box = Box(width=66, style=BoxStyle.SQUARE, color='gr')
        cmd_box = [box.top_line(2), box.empty_line(2)]
        cmd_box.extend(box.text_line(line, indent=2) for line in formatted_cmd_lines)
        cmd_box.extend([box.empty_line(2), box.bottom_line(2)])

        message = (
            f"\n{colorize('Potentially dangerous command detected', color=Colors.WHITE)}\n\n"
            f"Backend: {colorize(backend_type.value, color=Colors.CYAN)}\n"
            f"Dangerous commands: {dangerous_list}\n\n"
            f"Affected lines:\n"
            + "\n".join(cmd_box) + "\n\n"
            f"{colorize('This command may contain risky operations. Continue?', color=Colors.YELLOW)}"
        )
        return await self._ask_user(message)

    async def check_domain_allowed(self, backend_type: BackendType, domain: str) -> bool:
        """Return True when *domain* passes the policy or the user allows it."""
        policy = self.get_policy(backend_type)
        if policy.check(domain=domain):
            return True

        message = (
            f"\n{Colors.WHITE}Unauthorized domain access detected{Colors.RESET}\n\n"
            f"Backend: {Colors.CYAN}{backend_type.value}{Colors.RESET}\n"
            f"Domain: {Colors.YELLOW}{domain}{Colors.RESET}\n\n"
            f"{Colors.YELLOW}This domain is not in the allowed list. Continue?{Colors.RESET}"
        )
        return await self._ask_user(message)