"""
Comprehensive audit system for BLUX-cA.

Records all agent decisions, actions, and system events with structured metadata.
Supports multiple storage backends and provides query/analytics capabilities.
"""

from __future__ import annotations

import hashlib
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Union
from uuid import uuid4

from cryptography.fernet import Fernet


class AuditLevel(str, Enum):
    """Audit entry severity levels."""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    SECURITY = "SECURITY"
    DECISION = "DECISION"
    ACTION = "ACTION"


class AuditCategory(str, Enum):
    """Categories for audit entries."""

    SYSTEM = "SYSTEM"
    USER_INTERACTION = "USER_INTERACTION"
    AGENT_DECISION = "AGENT_DECISION"
    MEMORY_OPERATION = "MEMORY_OPERATION"
    CONSTITUTION_CHECK = "CONSTITUTION_CHECK"
    DIMENSION_ANALYSIS = "DIMENSION_ANALYSIS"
    STATE_TRANSITION = "STATE_TRANSITION"
    SAFETY_CHECK = "SAFETY_CHECK"
    PERFORMANCE = "PERFORMANCE"
    CONFIGURATION = "CONFIGURATION"


@dataclass
class AuditEntry:
    """Structured audit entry with comprehensive metadata."""

    id: str = field(default_factory=lambda: str(uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)
    level: AuditLevel = AuditLevel.INFO
    category: AuditCategory = AuditCategory.SYSTEM
    component: str = "unknown"
    operation: str = ""
    description: str = ""

    # Optional correlation/context fields
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    agent_name: Optional[str] = None
    input_hash: Optional[str] = None
    recovery_state: Optional[str] = None

    # Free-form structured payloads
    details: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary."""
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        data['level'] = self.level.value
        data['category'] = self.category.value
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> AuditEntry:
        """Create an entry from a dictionary produced by to_dict()."""
        data = data.copy()
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        data['level'] = AuditLevel(data['level'])
        data['category'] = AuditCategory(data['category'])
        return cls(**data)

    def get_hash(self) -> str:
        """Get a content hash for deduplication."""
        # Use .value explicitly so the hash is stable across Python versions,
        # which format str-mixin enums differently.
        content = (
            f"{self.timestamp}{self.level.value}{self.category.value}"
            f"{self.component}{self.operation}{self.description}"
        )
        return hashlib.sha256(content.encode()).hexdigest()

    def summarize(self) -> str:
        """Get a human-readable one-line summary."""
        return (
            f"[{self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}] {self.level.value}: "
            f"{self.category.value}/{self.component} - {self.description}"
        )


class AuditStorage(ABC):
    """Abstract base class for audit storage backends."""

    @abstractmethod
    def store(self, entry: AuditEntry) -> bool:
        """Store an audit entry."""

    @abstractmethod
    def retrieve(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        level: Optional[AuditLevel] = None,
        category: Optional[AuditCategory] = None,
        component: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[AuditEntry]:
        """Retrieve audit entries matching criteria."""

    @abstractmethod
    def count(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        level: Optional[AuditLevel] = None,
        category: Optional[AuditCategory] = None,
        component: Optional[str] = None
    ) -> int:
        """Count audit entries matching criteria."""

    @abstractmethod
    def cleanup(self, older_than_days: int = 30) -> int:
        """Clean up old audit entries."""

    @abstractmethod
    def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics."""
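

# A minimal sketch of a custom backend (illustrative only, not shipped here):
# any class implementing the five abstract methods can be passed to AuditTrail
# as storage_backend, e.g. a write-only backend that forwards to stdlib logging:
#
#     class LoggingAuditStorage(AuditStorage):
#         def store(self, entry: AuditEntry) -> bool:
#             logging.getLogger("audit").info(entry.summarize())
#             return True
#
#         def retrieve(self, **kwargs) -> List[AuditEntry]:
#             return []  # write-only: nothing to read back
#
#         def count(self, **kwargs) -> int:
#             return 0
#
#         def cleanup(self, older_than_days: int = 30) -> int:
#             return 0
#
#         def get_stats(self) -> Dict[str, Any]:
#             return {"storage_type": "logging"}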


class MemoryAuditStorage(AuditStorage):
    """In-memory audit storage (default, for testing/small deployments)."""

    def __init__(self, max_entries: int = 10000):
        self.entries: List[AuditEntry] = []
        self.max_entries = max_entries
        self.logger = logging.getLogger(__name__)

    def store(self, entry: AuditEntry) -> bool:
        try:
            self.entries.append(entry)

            # Keep at most max_entries, discarding the oldest first.
            if len(self.entries) > self.max_entries:
                removed = len(self.entries) - self.max_entries
                self.entries = self.entries[removed:]
                self.logger.debug(f"Trimmed {removed} old audit entries")

            return True
        except Exception as e:
            self.logger.error(f"Failed to store audit entry: {e}")
            return False

    def retrieve(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        level: Optional[AuditLevel] = None,
        category: Optional[AuditCategory] = None,
        component: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[AuditEntry]:
        # Entries are scanned newest-first, so `limit` returns the most recent matches.
        filtered = []

        for entry in reversed(self.entries):
            if start_time and entry.timestamp < start_time:
                continue
            if end_time and entry.timestamp > end_time:
                continue
            if level and entry.level != level:
                continue
            if category and entry.category != category:
                continue
            if component and entry.component != component:
                continue

            filtered.append(entry)

            if limit and len(filtered) >= limit:
                break

        return filtered

    def count(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        level: Optional[AuditLevel] = None,
        category: Optional[AuditCategory] = None,
        component: Optional[str] = None
    ) -> int:
        return len(self.retrieve(start_time, end_time, level, category, component, limit=None))

    def cleanup(self, older_than_days: int = 30) -> int:
        cutoff = datetime.now() - timedelta(days=older_than_days)
        initial_count = len(self.entries)
        self.entries = [e for e in self.entries if e.timestamp >= cutoff]
        removed = initial_count - len(self.entries)

        if removed > 0:
            self.logger.info(f"Cleaned up {removed} audit entries older than {older_than_days} days")

        return removed

    def get_stats(self) -> Dict[str, Any]:
        return {
            "storage_type": "memory",
            "total_entries": len(self.entries),
            "max_entries": self.max_entries,
            "levels": {level.value: self.count(level=level) for level in AuditLevel},
            "categories": {cat.value: self.count(category=cat) for cat in AuditCategory},
        }
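

# Usage sketch (illustrative): the memory backend behaves like a bounded ring
# buffer, so only the newest max_entries survive, and retrieval is newest-first.
#
#     storage = MemoryAuditStorage(max_entries=2)
#     for i in range(3):
#         storage.store(AuditEntry(description=f"event {i}"))
#     assert storage.count() == 2                                   # "event 0" was trimmed
#     assert storage.retrieve(limit=1)[0].description == "event 2"  # newest first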


class FileAuditStorage(AuditStorage):
    """File-based audit storage (JSON Lines format, optionally encrypted)."""

    def __init__(self, filepath: Union[str, Path], encrypt: bool = False):
        self.filepath = Path(filepath)
        self.encrypt = encrypt
        self.encryption_key = None
        self.logger = logging.getLogger(__name__)

        # Create the target directory before any key/log files are written.
        self.filepath.parent.mkdir(parents=True, exist_ok=True)

        if encrypt:
            # The key lives alongside the log file; anyone who can read the
            # key file can decrypt the log, so protect the directory accordingly.
            key_file = self.filepath.parent / f"{self.filepath.name}.key"
            if key_file.exists():
                with open(key_file, 'rb') as f:
                    self.encryption_key = f.read()
            else:
                self.encryption_key = Fernet.generate_key()
                with open(key_file, 'wb') as f:
                    f.write(self.encryption_key)

    def store(self, entry: AuditEntry) -> bool:
        try:
            entry_dict = entry.to_dict()
            line = json.dumps(entry_dict, ensure_ascii=False)

            if self.encrypt and self.encryption_key:
                fernet = Fernet(self.encryption_key)
                line = fernet.encrypt(line.encode()).decode()

            with open(self.filepath, 'a', encoding='utf-8') as f:
                f.write(line + '\n')

            return True
        except Exception as e:
            self.logger.error(f"Failed to store audit entry to file: {e}")
            return False

    def _read_entries(self) -> Generator[AuditEntry, None, None]:
        """Yield entries from the file in write (oldest-first) order."""
        if not self.filepath.exists():
            return

        try:
            # Build the Fernet once rather than per line.
            fernet = Fernet(self.encryption_key) if self.encrypt and self.encryption_key else None

            with open(self.filepath, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue

                    try:
                        if fernet:
                            line = fernet.decrypt(line.encode()).decode()

                        entry_dict = json.loads(line)
                        yield AuditEntry.from_dict(entry_dict)
                    except Exception as e:
                        # Covers JSON errors, enum/timestamp errors, and
                        # cryptography.fernet.InvalidToken on corrupt lines.
                        self.logger.warning(f"Failed to parse audit entry: {e}")
                        continue
        except Exception as e:
            self.logger.error(f"Failed to read audit file: {e}")

    def retrieve(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        level: Optional[AuditLevel] = None,
        category: Optional[AuditCategory] = None,
        component: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[AuditEntry]:
        # Note: the file is scanned oldest-first, so `limit` returns the
        # earliest matches (unlike MemoryAuditStorage, which is newest-first).
        filtered = []

        for entry in self._read_entries():
            if start_time and entry.timestamp < start_time:
                continue
            if end_time and entry.timestamp > end_time:
                continue
            if level and entry.level != level:
                continue
            if category and entry.category != category:
                continue
            if component and entry.component != component:
                continue

            filtered.append(entry)

            if limit and len(filtered) >= limit:
                break

        return filtered

    def count(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        level: Optional[AuditLevel] = None,
        category: Optional[AuditCategory] = None,
        component: Optional[str] = None
    ) -> int:
        return sum(1 for _ in self.retrieve(start_time, end_time, level, category, component, limit=None))

    def cleanup(self, older_than_days: int = 30) -> int:
        cutoff = datetime.now() - timedelta(days=older_than_days)
        temp_file = self.filepath.with_suffix('.tmp')
        removed = 0

        try:
            fernet = Fernet(self.encryption_key) if self.encrypt and self.encryption_key else None

            with open(temp_file, 'w', encoding='utf-8') as out_f:
                for entry in self._read_entries():
                    if entry.timestamp >= cutoff:
                        line = json.dumps(entry.to_dict(), ensure_ascii=False)

                        if fernet:
                            line = fernet.encrypt(line.encode()).decode()

                        out_f.write(line + '\n')
                    else:
                        removed += 1

            # Swap the rewritten file into place.
            temp_file.replace(self.filepath)

            if removed > 0:
                self.logger.info(f"Cleaned up {removed} audit entries older than {older_than_days} days")

            return removed
        except Exception as e:
            self.logger.error(f"Failed to clean up audit file: {e}")
            if temp_file.exists():
                temp_file.unlink()
            return 0

    def get_stats(self) -> Dict[str, Any]:
        stats = {
            "storage_type": "file",
            "filepath": str(self.filepath),
            "encrypted": self.encrypt,
            "file_size": self.filepath.stat().st_size if self.filepath.exists() else 0,
        }

        # Sample up to 1000 entries so stats stay cheap on large files.
        level_counts = {level.value: 0 for level in AuditLevel}
        category_counts = {cat.value: 0 for cat in AuditCategory}

        for entry in self.retrieve(limit=1000):
            level_counts[entry.level.value] += 1
            category_counts[entry.category.value] += 1

        stats["level_counts"] = level_counts
        stats["category_counts"] = category_counts

        return stats
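

# Usage sketch (illustrative; the path is hypothetical): with encrypt=True a
# Fernet key is created next to the log on first use, and the same key file
# must be present to read the log back.
#
#     storage = FileAuditStorage("logs/audit.jsonl", encrypt=True)
#     storage.store(AuditEntry(description="boot"))
#     entries = storage.retrieve(limit=10)  # decrypts transparently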


class AuditTrail:
    """
    Comprehensive audit trail system for BLUX-cA.

    Records all system activities with structured metadata and provides
    query, analytics, and export capabilities.
    """

    def __init__(
        self,
        storage_backend: Optional[AuditStorage] = None,
        component_name: str = "BLUX-cA",
        enable_audit: bool = True,
        retention_days: int = 30
    ):
        """
        Initialize audit trail.

        Args:
            storage_backend: AuditStorage implementation (defaults to MemoryAuditStorage)
            component_name: Name of the component being audited
            enable_audit: Whether auditing is enabled
            retention_days: Days to retain audit entries before cleanup
        """
        self.storage = storage_backend or MemoryAuditStorage()
        self.component_name = component_name
        self.enable_audit = enable_audit
        self.retention_days = retention_days
        self.logger = logging.getLogger(__name__)

        # Lightweight operational counters surfaced via get_status()/get_summary_stats().
        self.performance_stats = {
            "entries_logged": 0,
            "queries_performed": 0,
            "last_cleanup": None,
            "errors": 0,
        }

    def log(
        self,
        level: AuditLevel,
        category: AuditCategory,
        operation: str,
        description: str,
        details: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        agent_name: Optional[str] = None,
        input_hash: Optional[str] = None,
        recovery_state: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        component: Optional[str] = None
    ) -> Optional[AuditEntry]:
        """
        Log an audit entry.

        Returns:
            AuditEntry if logged, None if auditing is disabled or storage failed
        """
        if not self.enable_audit:
            return None

        try:
            entry = AuditEntry(
                level=level,
                category=category,
                component=component or self.component_name,
                operation=operation,
                description=description,
                user_id=user_id,
                session_id=session_id,
                agent_name=agent_name,
                input_hash=input_hash,
                recovery_state=recovery_state,
                details=details or {},
                metadata=metadata or {},
            )

            if self.storage.store(entry):
                self.performance_stats["entries_logged"] += 1

                # Mirror significant entries to standard logging. SECURITY is
                # not a stdlib log level, so fall back to warning for it.
                if level in (AuditLevel.ERROR, AuditLevel.SECURITY, AuditLevel.WARNING):
                    log_method = getattr(self.logger, level.value.lower(), self.logger.warning)
                    log_method(f"Audit: {entry.summarize()}")

                return entry
            else:
                self.performance_stats["errors"] += 1
                return None

        except Exception as e:
            self.performance_stats["errors"] += 1
            self.logger.error(f"Failed to create audit entry: {e}")
            return None

    # --- Convenience loggers ---

    def log_decision(
        self,
        decision: str,
        rationale: str,
        user_input: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        agent_name: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ) -> Optional[AuditEntry]:
        """Log an agent decision."""
        input_hash = None
        if user_input:
            input_hash = hashlib.sha256(user_input.encode()).hexdigest()

        return self.log(
            level=AuditLevel.DECISION,
            category=AuditCategory.AGENT_DECISION,
            operation="decision_making",
            description=f"Agent decision: {decision}",
            details={
                "decision": decision,
                "rationale": rationale,
                "input_preview": user_input[:100] if user_input else None,
                **(details or {})
            },
            user_id=user_id,
            session_id=session_id,
            agent_name=agent_name,
            input_hash=input_hash
        )

    def log_user_interaction(
        self,
        user_input: str,
        response: str,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        agent_name: Optional[str] = None,
        recovery_state: Optional[str] = None,
        clarity_scores: Optional[Dict[str, float]] = None
    ) -> Optional[AuditEntry]:
        """Log a user interaction."""
        input_hash = hashlib.sha256(user_input.encode()).hexdigest()

        return self.log(
            level=AuditLevel.INFO,
            category=AuditCategory.USER_INTERACTION,
            operation="user_interaction",
            description="User interaction processed",
            details={
                "input_preview": user_input[:200],
                "response_preview": response[:200],
                "input_length": len(user_input),
                "response_length": len(response),
                "clarity_scores": clarity_scores or {},
            },
            user_id=user_id,
            session_id=session_id,
            agent_name=agent_name,
            input_hash=input_hash,
            recovery_state=recovery_state
        )

    def log_state_transition(
        self,
        from_state: str,
        to_state: str,
        reason: str,
        confidence: float,
        session_id: Optional[str] = None,
        agent_name: Optional[str] = None
    ) -> Optional[AuditEntry]:
        """Log a recovery state transition."""
        return self.log(
            level=AuditLevel.INFO,
            category=AuditCategory.STATE_TRANSITION,
            operation="state_transition",
            description=f"Recovery state transition: {from_state} → {to_state}",
            details={
                "from_state": from_state,
                "to_state": to_state,
                "reason": reason,
                "confidence": confidence,
            },
            session_id=session_id,
            agent_name=agent_name,
            recovery_state=to_state
        )

    def log_safety_check(
        self,
        check_type: str,
        result: str,
        details: Dict[str, Any],
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        agent_name: Optional[str] = None
    ) -> Optional[AuditEntry]:
        """Log a safety/guardrail check."""
        return self.log(
            level=AuditLevel.SECURITY if result == "violation" else AuditLevel.INFO,
            category=AuditCategory.SAFETY_CHECK,
            operation="safety_check",
            description=f"Safety check: {check_type} - {result}",
            details={
                "check_type": check_type,
                "result": result,
                **details,
            },
            user_id=user_id,
            session_id=session_id,
            agent_name=agent_name
        )

    # --- Queries ---

    def get_recent_entries(self, limit: int = 100) -> List[AuditEntry]:
        """Get the most recent audit entries."""
        self.performance_stats["queries_performed"] += 1
        return self.storage.retrieve(limit=limit)

    def get_entries_by_time(
        self,
        start_time: datetime,
        end_time: Optional[datetime] = None
    ) -> List[AuditEntry]:
        """Get entries within a time range."""
        self.performance_stats["queries_performed"] += 1
        return self.storage.retrieve(start_time=start_time, end_time=end_time)

    def get_entries_by_level(self, level: AuditLevel, limit: int = 100) -> List[AuditEntry]:
        """Get entries by severity level."""
        self.performance_stats["queries_performed"] += 1
        return self.storage.retrieve(level=level, limit=limit)

    def get_entries_by_category(self, category: AuditCategory, limit: int = 100) -> List[AuditEntry]:
        """Get entries by category."""
        self.performance_stats["queries_performed"] += 1
        return self.storage.retrieve(category=category, limit=limit)

    def search_entries(
        self,
        search_text: str,
        field: str = "description",
        limit: int = 100
    ) -> List[AuditEntry]:
        """Search entries by text in a field: "description", "component", "operation", or "all"."""
        self.performance_stats["queries_performed"] += 1
        # Over-fetch so filtering still has a chance to fill `limit`.
        all_entries = self.storage.retrieve(limit=limit * 2)

        filtered = []
        search_text_lower = search_text.lower()

        for entry in all_entries:
            if field == "description" and search_text_lower in entry.description.lower():
                filtered.append(entry)
            elif field == "component" and search_text_lower in entry.component.lower():
                filtered.append(entry)
            elif field == "operation" and search_text_lower in entry.operation.lower():
                filtered.append(entry)
            elif field == "all":
                if (search_text_lower in entry.description.lower() or
                        search_text_lower in entry.component.lower() or
                        search_text_lower in entry.operation.lower()):
                    filtered.append(entry)

            if len(filtered) >= limit:
                break

        return filtered

    # --- Analytics, export, and maintenance ---

    def get_summary_stats(self) -> Dict[str, Any]:
        """Get summary statistics."""
        stats = self.storage.get_stats()
        stats.update({
            "performance": self.performance_stats,
            "component": self.component_name,
            "enabled": self.enable_audit,
            "retention_days": self.retention_days,
        })
        return stats

    def export_entries(
        self,
        output_format: str = "json",
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: Optional[int] = None
    ) -> Union[str, List[Dict[str, Any]]]:
        """Export audit entries as a list of dicts ("json") or a CSV string ("csv")."""
        entries = self.storage.retrieve(start_time=start_time, end_time=end_time, limit=limit)

        if output_format == "json":
            return [entry.to_dict() for entry in entries]
        elif output_format == "csv":
            # Minimal CSV writer; embedded quotes are doubled per RFC 4180.
            csv_lines = ["timestamp,level,category,component,operation,description"]
            for entry in entries:
                description = entry.description.replace('"', '""')
                csv_lines.append(
                    f'"{entry.timestamp.isoformat()}","{entry.level.value}",'
                    f'"{entry.category.value}","{entry.component}","{entry.operation}",'
                    f'"{description}"'
                )
            return "\n".join(csv_lines)
        else:
            raise ValueError(f"Unsupported export format: {output_format}")

    def cleanup_old_entries(self) -> int:
        """Clean up entries older than the retention period."""
        removed = self.storage.cleanup(self.retention_days)
        if removed > 0:
            self.performance_stats["last_cleanup"] = datetime.now().isoformat()
            self.logger.info(f"Cleaned up {removed} old audit entries")
        return removed

    def enable(self) -> None:
        """Enable auditing."""
        self.enable_audit = True
        self.logger.info("Audit trail enabled")

    def disable(self) -> None:
        """Disable auditing."""
        self.enable_audit = False
        self.logger.info("Audit trail disabled")

    def set_retention_days(self, days: int) -> None:
        """Set the retention period in days."""
        self.retention_days = days
        self.logger.info(f"Audit retention set to {days} days")

    def get_status(self) -> Dict[str, Any]:
        """Get audit trail status."""
        return {
            "enabled": self.enable_audit,
            "storage_type": self.storage.__class__.__name__,
            "entries_logged": self.performance_stats["entries_logged"],
            "retention_days": self.retention_days,
            "last_cleanup": self.performance_stats["last_cleanup"],
        }
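

# Quick sketch of the convenience loggers (illustrative values only):
#
#     trail = AuditTrail()
#     trail.log_state_transition(
#         from_state="CRISIS", to_state="STABILIZING",
#         reason="two consecutive calm turns", confidence=0.8,
#         session_id="s-123",
#     )
#     errors = trail.get_entries_by_level(AuditLevel.ERROR, limit=10)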


def create_audit_trail(
    storage_type: str = "memory",
    filepath: Optional[str] = None,
    encrypt: bool = False,
    component_name: str = "BLUX-cA",
    max_entries: int = 10000,
    retention_days: int = 30
) -> AuditTrail:
    """
    Create an audit trail with the specified storage backend.

    Args:
        storage_type: "memory" or "file"
        filepath: Required for file storage
        encrypt: Encrypt file storage
        component_name: Name of the component being audited
        max_entries: Cap for memory storage
        retention_days: Days to retain entries

    Returns:
        Configured AuditTrail instance
    """
    if storage_type == "memory":
        storage = MemoryAuditStorage(max_entries=max_entries)
    elif storage_type == "file":
        if not filepath:
            raise ValueError("Filepath required for file storage")
        storage = FileAuditStorage(filepath=filepath, encrypt=encrypt)
    else:
        raise ValueError(f"Unknown storage type: {storage_type}")

    return AuditTrail(
        storage_backend=storage,
        component_name=component_name,
        retention_days=retention_days
    )
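

if __name__ == "__main__":
    # Smoke-test sketch (assumes nothing beyond this module): exercise the
    # memory backend end to end and print summary stats.
    trail = create_audit_trail(storage_type="memory", component_name="demo")
    trail.log_decision(
        decision="escalate",
        rationale="confidence below threshold",
        user_input="example input",
        session_id="demo-session",
    )
    trail.log_safety_check(
        check_type="content_filter",
        result="pass",
        details={"rule": "demo"},
    )
    for entry in trail.get_recent_entries(limit=5):
        print(entry.summarize())
    print(trail.get_summary_stats())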