# Source: agentic-browser/src/security/ethical_guardian.py
# Uploaded by anu151105 in commit 24a7f55 ("Initial deployment of Agentic Browser")
"""
Ethical Guardian module for the Security & Ethics components.
This module ensures that the agent operates according to ethical guidelines
and complies with privacy regulations.
"""
import asyncio
import json
import logging
import os
import re
import time
from typing import Dict, List, Any, Optional, Union, Tuple
# Module-level logger used throughout this file. basicConfig() installs a
# default INFO-level handler on the root logger; it is a no-op when the root
# logger already has handlers configured.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class EthicalGuardian:
    """
    Ensures the agent operates according to ethical guidelines.

    This class validates tasks, enforces privacy protections, and ensures
    compliance with regulations like GDPR, CCPA, etc.

    Validation is layered: keyword rules and the blocked-domain list always
    run, while the LLM-based review only runs once ``initialize()`` has
    created ``llm_client``. If ``initialize()`` was never called, or failed
    before the client was created, ``validate_task`` relies on the
    rule-based checks alone.
    """

    # Matches http(s) URLs up to the next whitespace/quote/angle bracket. The
    # host is extracted with urllib afterwards so that userinfo
    # ("user@host"), ports and letter case cannot hide a blocked domain.
    _URL_PATTERN = re.compile(r"https?://[^\s<>\"']+", re.IGNORECASE)

    # Runs of whitespace/hyphens are folded to "_" when normalizing labels.
    _SEPARATORS = re.compile(r"[\s\-]+")

    # Data types that may never be collected (normalized form).
    _SENSITIVE_DATA_TYPES = frozenset({
        "password", "credit_card", "social_security", "health",
        "biometric", "political", "religious", "sexual_orientation",
    })

    # The only purposes for which data collection is approved.
    _VALID_PURPOSES = frozenset({
        "task_execution", "debug", "performance_improvement", "error_recovery",
    })

    # Retention period used for data types without an explicit policy.
    _DEFAULT_RETENTION_SECONDS = 30 * 24 * 60 * 60  # 30 days

    def __init__(self):
        """Initialize the EthicalGuardian with the built-in default policies."""
        self.llm_client = None
        self.ethics_model = os.environ.get("ETHICS_MODEL", "gpt-4-turbo")

        # Rules and policies (populated by _load_default_guidelines below)
        self.ethical_guidelines = []
        self.privacy_policies = []
        self.blocked_domains = []
        self.data_retention_policies = {}
        # NOTE: these thresholds are stored but not consulted by any method
        # of this class.
        self.risk_thresholds = {
            "low": 0.3,
            "medium": 0.6,
            "high": 0.8
        }

        # Load default guidelines
        self._load_default_guidelines()
        logger.info("EthicalGuardian instance created")

    async def initialize(self) -> bool:
        """
        Create the LLM client and load optional custom guidelines.

        Custom guidelines are read from the JSON file named by the
        ``ETHICAL_GUIDELINES_PATH`` environment variable (if it exists) and
        are appended to the defaults. The file must contain a JSON object
        with optional ``ethical_guidelines``, ``privacy_policies`` and
        ``blocked_domains`` lists.

        Returns:
            bool: True on success, False if any step raised (the error is
            logged rather than propagated).
        """
        try:
            import openai
            self.llm_client = openai.AsyncClient(
                api_key=os.environ.get("OPENAI_API_KEY")
            )

            # Load custom guidelines from environment if available
            custom_guidelines_path = os.environ.get("ETHICAL_GUIDELINES_PATH")
            if custom_guidelines_path and os.path.exists(custom_guidelines_path):
                with open(custom_guidelines_path, "r", encoding="utf-8") as f:
                    custom_guidelines = json.load(f)
                if not isinstance(custom_guidelines, dict):
                    raise ValueError("Custom guidelines file must contain a JSON object")
                self.ethical_guidelines.extend(custom_guidelines.get("ethical_guidelines") or [])
                self.privacy_policies.extend(custom_guidelines.get("privacy_policies") or [])
                self.blocked_domains.extend(custom_guidelines.get("blocked_domains") or [])

            logger.info("EthicalGuardian initialized successfully")
            return True
        except Exception as e:
            logger.error("Error initializing ethical guardian: %s", e)
            return False

    async def validate_task(self, task_description: str) -> Tuple[bool, Optional[str]]:
        """
        Validate if a task is ethically permissible.

        Args:
            task_description: Description of the task to validate

        Returns:
            Tuple[bool, Optional[str]]: (is_valid, reason if invalid)
        """
        # Basic rule-based checks
        basic_check = self._check_against_rules(task_description)
        if not basic_check[0]:
            logger.warning("Task rejected by rule-based check: %s", basic_check[1])
            return basic_check

        # Domain check for blocked sites
        domain_check = self._check_blocked_domains(task_description)
        if not domain_check[0]:
            logger.warning("Task rejected due to blocked domain: %s", domain_check[1])
            return domain_check

        # LLM-based ethical analysis for complex cases
        if self.llm_client:
            analysis = await self._analyze_task_ethics(task_description)
            # Fail closed: only an explicit boolean True approves the task. A
            # string such as "false" is truthy and must not count as approval.
            if analysis.get("is_ethical") is not True:
                reasoning = analysis.get("reasoning") or "Could not confirm ethical compliance"
                logger.warning("Task rejected by ethical analysis: %s", reasoning)
                return False, reasoning

        return True, None

    @staticmethod
    def _first_violation(task_lower: str, policies: List[Any]) -> Optional[str]:
        """Return the message of the first policy matched by task_lower, else None."""
        for policy in policies:
            if not isinstance(policy, dict):
                continue  # ignore malformed custom entries instead of crashing
            rule = str(policy.get("rule") or "").lower()
            raw_terms = policy.get("terms") or []
            if isinstance(raw_terms, str):
                raw_terms = [raw_terms]
            # Lower-case terms so custom entries match case-insensitively, and
            # drop empty terms: "" is a substring of every task.
            terms = [str(term).lower() for term in raw_terms if term]
            # NOTE(review): matching is by plain substring, so short terms
            # also hit inside longer words (e.g. "harm" in "pharmacy"). Kept
            # as-is because narrowing it could let real violations through.
            if (rule and rule in task_lower) or any(term in task_lower for term in terms):
                return policy.get("message") or (
                    f"Task violates policy: {policy.get('rule') or 'unspecified rule'}"
                )
        return None

    def _check_against_rules(self, task_description: str) -> Tuple[bool, Optional[str]]:
        """
        Check a task against predefined ethical rules.

        Ethical guidelines are checked first, then privacy policies; the
        message of the first matching entry is returned.

        Args:
            task_description: Description of the task to check

        Returns:
            Tuple[bool, Optional[str]]: (is_valid, reason if invalid)
        """
        task_lower = task_description.lower()

        message = self._first_violation(task_lower, self.ethical_guidelines)
        if message is None:
            message = self._first_violation(task_lower, self.privacy_policies)

        if message is not None:
            return False, message
        return True, None

    def _find_blocked_entry(self, text_lower: str) -> Optional[str]:
        """Return the first blocked-domain entry contained in text_lower, else None."""
        for blocked in self.blocked_domains:
            entry = str(blocked).strip().lower()
            # An empty entry would match every string, so skip it.
            if entry and entry in text_lower:
                return entry
        return None

    def _check_blocked_domains(self, task_description: str) -> Tuple[bool, Optional[str]]:
        """
        Check if a task involves blocked domains.

        Only http(s) URLs in the task text are inspected. Host names are
        compared case-insensitively and without userinfo or port.

        Args:
            task_description: Description of the task to check

        Returns:
            Tuple[bool, Optional[str]]: (is_valid, reason if invalid)
        """
        from urllib.parse import unquote, urlsplit

        for url in self._URL_PATTERN.findall(task_description):
            try:
                host = urlsplit(url).hostname or ""
            except ValueError:
                # Malformed authority (e.g. unbalanced IPv6 brackets): fall
                # back to the raw authority so it is still checked.
                host = url.split("//", 1)[1].split("/", 1)[0]
            # Decode percent-escapes so "%2E" cannot stand in for ".".
            domain = unquote(host).lower()
            if self._find_blocked_entry(domain) is not None:
                return False, f"Domain '{domain}' is blocked by policy"

        return True, None

    def _build_ethics_prompt(self, task_description: str) -> str:
        """Assemble the user prompt sent to the ethics model for a task."""
        guidelines_text = "\n".join(
            f"- {g.get('rule', '')}" for g in self.ethical_guidelines if isinstance(g, dict)
        )
        privacy_text = "\n".join(
            f"- {p.get('rule', '')}" for p in self.privacy_policies if isinstance(p, dict)
        )
        # The leading and trailing empty items give the prompt its opening
        # and closing newline.
        return "\n".join([
            "",
            "You are an ethical evaluation system for an AI agent. Assess if this task is ethically permissible:",
            f"Task: {task_description}",
            "Ethical guidelines:",
            guidelines_text,
            "Privacy policies:",
            privacy_text,
            "Perform the following analysis:",
            "1. Identify any ethical concerns with the task",
            "2. Check for privacy implications",
            "3. Assess potential for harm or misuse",
            "4. Evaluate legal compliance",
            "5. Consider data protection requirements",
            "Return your analysis as a JSON object with these fields:",
            "- is_ethical: boolean indicating if task is ethically permissible",
            "- risk_level: string (\"low\", \"medium\", \"high\")",
            "- concerns: array of specific concerns",
            "- reasoning: detailed explanation of your assessment",
            "",
        ])

    async def _analyze_task_ethics(self, task_description: str) -> Dict[str, Any]:
        """
        Perform a detailed ethical analysis of a task using LLMs.

        Never raises: any failure (request error, invalid JSON, unexpected
        response shape) yields a rejecting, high-risk analysis.

        Args:
            task_description: Description of the task to analyze

        Returns:
            Dict: Analysis results. ``is_ethical`` is always a bool and
            ``reasoning`` is always a non-empty string.
        """
        try:
            prompt = self._build_ethics_prompt(task_description)

            response = await self.llm_client.chat.completions.create(
                model=self.ethics_model,
                messages=[
                    {"role": "system", "content": "You are an AI ethics evaluation system that assesses whether tasks comply with ethical guidelines and privacy policies. You are thorough, cautious, and prioritize safety and compliance."},
                    {"role": "user", "content": prompt}
                ],
                response_format={"type": "json_object"}
            )

            analysis = json.loads(response.choices[0].message.content)
            if not isinstance(analysis, dict):
                raise ValueError("Ethical analysis response is not a JSON object")

            # Ensure required fields. A missing or non-boolean verdict is
            # treated as "not confirmed" rather than trusted for truthiness.
            if not isinstance(analysis.get("is_ethical"), bool):
                analysis["is_ethical"] = False
                analysis["reasoning"] = "Could not confirm ethical compliance"

            reasoning = analysis.get("reasoning")
            if not isinstance(reasoning, str) or not reasoning.strip():
                analysis["reasoning"] = "No reasoning provided by ethical analysis"

            return analysis

        except Exception as e:
            logger.error("Error in ethical analysis: %s", e)
            # Default to cautious approach on error
            return {
                "is_ethical": False,
                "risk_level": "high",
                "concerns": ["Error in ethical analysis"],
                "reasoning": f"Could not complete ethical analysis due to error: {str(e)}"
            }

    @classmethod
    def _normalize_label(cls, value: str) -> str:
        """Lower-case a label and fold whitespace/hyphen runs into underscores."""
        return cls._SEPARATORS.sub("_", value.strip().lower())

    def validate_data_collection(self, data_type: str, purpose: str) -> Tuple[bool, Optional[str]]:
        """
        Validate if data collection is permissible.

        Both arguments are compared after normalization (case-insensitive,
        spaces and hyphens treated as underscores), so "Credit Card" is
        recognized as the restricted type "credit_card".

        Args:
            data_type: Type of data to collect
            purpose: Purpose of data collection

        Returns:
            Tuple[bool, Optional[str]]: (is_valid, reason if invalid)
        """
        # Fail closed on values that cannot be classified at all.
        if not isinstance(data_type, str):
            return False, f"Collection of {data_type} data is restricted by policy"
        if not isinstance(purpose, str):
            return False, f"Purpose '{purpose}' is not an approved data collection purpose"

        if self._normalize_label(data_type) in self._SENSITIVE_DATA_TYPES:
            return False, f"Collection of {data_type} data is restricted by policy"

        if self._normalize_label(purpose) not in self._VALID_PURPOSES:
            return False, f"Purpose '{purpose}' is not an approved data collection purpose"

        return True, None

    async def validate_action(self, action: Dict) -> Tuple[bool, Optional[str]]:
        """
        Validate if an action is ethically permissible.

        Only "form_fill" and "api_call" actions are inspected; every other
        action type (including "click") is accepted.

        Args:
            action: Action configuration to validate

        Returns:
            Tuple[bool, Optional[str]]: (is_valid, reason if invalid)
        """
        # `or ""` covers an explicit `"type": None`.
        action_type = str(action.get("type") or "").lower()

        # For form filling, check what data is being entered
        if action_type == "form_fill":
            fields = action.get("fields") or []
            if isinstance(fields, dict):
                # presumably a name -> field-spec mapping; inspect the specs
                fields = list(fields.values())
            for field in fields:
                if isinstance(field, dict) and field.get("sensitive"):
                    # Special validation for sensitive data
                    return False, "Action involves entering sensitive data"

        # For API calls, check the endpoint against the blocked domains
        if action_type == "api_call":
            endpoint = str(action.get("endpoint") or "").lower()
            if self._find_blocked_entry(endpoint) is not None:
                return False, "API endpoint contains blocked domain"

        return True, None

    def check_data_retention(self, data_type: str) -> int:
        """
        Get the retention period for a type of data.

        Args:
            data_type: Type of data to check

        Returns:
            int: Retention period in seconds (30 days when no policy is
            defined for data_type)
        """
        return self.data_retention_policies.get(data_type, self._DEFAULT_RETENTION_SECONDS)

    def _load_default_guidelines(self):
        """Load default ethical guidelines and privacy policies.

        Replaces (does not extend) the guideline, policy, blocked-domain and
        retention attributes with the built-in defaults.
        """
        self.ethical_guidelines = [
            {
                "rule": "Do not engage in illegal activities",
                "terms": ["illegal", "unlawful", "crime", "criminal"],
                "message": "Cannot perform illegal activities"
            },
            {
                "rule": "Do not harm individuals or groups",
                "terms": ["harm", "hurt", "damage", "attack"],
                "message": "Cannot perform actions that might harm individuals or groups"
            },
            {
                "rule": "Do not access unauthorized systems or data",
                "terms": ["hack", "breach", "unauthorized", "crack", "steal"],
                "message": "Cannot access unauthorized systems or data"
            },
            {
                "rule": "Do not create or distribute malicious content",
                "terms": ["malware", "virus", "phishing", "scam"],
                "message": "Cannot create or distribute malicious content"
            },
            {
                "rule": "Do not impersonate individuals or organizations",
                "terms": ["impersonate", "pretend", "fake"],
                "message": "Cannot impersonate individuals or organizations"
            }
        ]

        self.privacy_policies = [
            {
                "rule": "Do not collect data beyond what's necessary for the task",
                "terms": ["collect", "gather", "harvest"],
                "message": "Cannot collect data beyond what's necessary for the task"
            },
            {
                "rule": "Do not store sensitive personal information",
                "terms": ["password", "credit card", "ssn", "social security"],
                "message": "Cannot store sensitive personal information"
            },
            {
                "rule": "Respect user consent for data processing",
                "terms": ["consent", "permission"],
                "message": "Must respect user consent for data processing"
            },
            {
                "rule": "Comply with GDPR and other privacy regulations",
                "terms": ["gdpr", "ccpa", "privacy regulation"],
                "message": "Must comply with applicable privacy regulations"
            }
        ]

        # Entries are matched as substrings of the host, so a bare keyword
        # such as "darknet" blocks any host containing it.
        self.blocked_domains = [
            "malware.com",
            "phishing.org",
            "darknet",
            "hacking.net"
        ]

        self.data_retention_policies = {
            "browsing_history": 30 * 24 * 60 * 60,  # 30 days in seconds
            "form_data": 7 * 24 * 60 * 60,  # 7 days in seconds
            "user_preferences": 365 * 24 * 60 * 60,  # 1 year in seconds
            "error_logs": 90 * 24 * 60 * 60  # 90 days in seconds
        }