c2sentinel / examples /advanced_usage.py
danielostrow's picture
Upload folder using huggingface_hub
3751c05 verified
raw
history blame
8.16 kB
#!/usr/bin/env python3
"""
C2Sentinel Advanced Usage Example
Demonstrates context enrichment, whitelist/blacklist management,
batch analysis, log parsing, and reconnaissance features.
"""
from c2sentinel import C2Sentinel, ConnectionContext
def _print_header(title):
    """Print a section banner: ``title`` between two 60-character rules."""
    rule = "=" * 60
    print(rule)
    print(title)
    print(rule)


def _periodic_connections(start, count, interval, dst_ip, dst_port,
                          bytes_sent=200, bytes_recv=500):
    """Build ``count`` identical connection records spaced ``interval`` apart.

    Args:
        start: Timestamp of the first record.
        count: Number of records to generate.
        interval: Gap added to the timestamp for each successive record.
        dst_ip: Destination IP written into every record.
        dst_port: Destination port written into every record.
        bytes_sent: Constant ``bytes_sent`` value for every record.
        bytes_recv: Constant ``bytes_recv`` value for every record.

    Returns:
        A list of dicts with the keys ``timestamp``, ``dst_ip``, ``dst_port``,
        ``bytes_sent`` and ``bytes_recv``, in that order.
    """
    return [
        {
            'timestamp': start + (i * interval),
            'dst_ip': dst_ip,
            'dst_port': dst_port,
            'bytes_sent': bytes_sent,
            'bytes_recv': bytes_recv,
        }
        for i in range(count)
    ]


def _demo_context_enrichment(sentinel, timestamp):
    """Analyze one connection series without and then with a ConnectionContext."""
    _print_header("Context Enrichment")

    # Create connections that might look suspicious: fixed size, one per minute.
    connections = _periodic_connections(timestamp, 10, 60, '10.0.0.50', 443)

    # Analyze without context
    result_no_ctx = sentinel.analyze(connections)
    print(f"Without context: is_c2={result_no_ctx.is_c2}, prob={result_no_ctx.c2_probability:.2f}")

    # Analyze with context indicating this is a known monitoring agent
    context = ConnectionContext(
        process_name='prometheus',
        known_good=True,
        ip_reputation=0.95,
        dns_queries=['metrics.internal.company.com'],
    )
    result_with_ctx = sentinel.analyze(connections, context=context)
    print(f"With context: is_c2={result_with_ctx.is_c2}, prob={result_with_ctx.c2_probability:.2f}")
    print(f"Context applied: {result_with_ctx.context_applied}")
    print()


def _demo_whitelist_blacklist(sentinel, timestamp):
    """Register whitelist/blacklist entries, then analyze traffic to one of each."""
    _print_header("Whitelist and Blacklist")

    # Add trusted infrastructure to whitelist
    sentinel.add_whitelist(
        ips=['8.8.8.8', '1.1.1.1'],
        domains=['google.com', 'cloudflare.com'],
    )
    # Add known malicious indicators to blacklist
    sentinel.add_blacklist(
        ips=['10.10.10.10'],
        domains=['malware.example.com'],
    )

    # Test whitelisted IP
    dns_connections = _periodic_connections(
        timestamp, 10, 5, '8.8.8.8', 53, bytes_sent=50, bytes_recv=200,
    )
    result = sentinel.analyze(dns_connections)
    print(f"Whitelisted DNS (8.8.8.8): is_c2={result.is_c2}")

    # Test blacklisted IP
    blacklist_connections = _periodic_connections(timestamp, 10, 60, '10.10.10.10', 443)
    result = sentinel.analyze(blacklist_connections)
    print(f"Blacklisted IP (10.10.10.10): is_c2={result.is_c2}, prob={result.c2_probability:.2f}")
    print()


def _demo_batch_analysis(sentinel, timestamp):
    """Analyze three connection groups in one batch call.

    Returns:
        The beacon-style connection group, so that later sections can reuse
        exactly the same records.
    """
    _print_header("Batch Analysis")

    # Group 1: Normal web browsing (variable sizes, multiple destinations)
    web_destinations = ['93.184.216.34', '151.101.1.140', '172.217.14.206']
    web_group = [
        {
            'timestamp': timestamp + (i * 10) + j,
            'dst_ip': dest,
            'dst_port': 443,
            'bytes_sent': 100 + (j * 50),
            'bytes_recv': 5000 + (j * 1000),
        }
        for i, dest in enumerate(web_destinations)
        for j in range(3)
    ]

    # Group 2: Potential C2 beacon
    beacon_group = _periodic_connections(timestamp, 10, 60, '45.33.32.156', 8080)

    # Group 3: Database connection pool (sub-second spacing, growing payloads)
    db_group = [
        {
            'timestamp': timestamp + (i * 0.5),
            'dst_ip': '10.0.1.100',
            'dst_port': 5432,
            'bytes_sent': 100 + (i * 10),
            'bytes_recv': 2000 + (i * 500),
        }
        for i in range(15)
    ]

    # Analyze all groups at once
    results = sentinel.analyze_batch([web_group, beacon_group, db_group])
    for number, result in enumerate(results, start=1):
        print(f"Group {number}: is_c2={result.is_c2}, prob={result.c2_probability:.2f}, "
              f"pattern={result.matched_legitimate_pattern or 'None'}")
    print()
    return beacon_group


def _demo_recon(sentinel, beacon_group):
    """Show the recon helpers: IP analysis, pattern analysis and IOC generation."""
    _print_header("Reconnaissance Features")

    # IP Analysis
    print("\nIP Analysis:")
    target_ip = '104.16.132.229'
    ip_info = sentinel.recon.analyze_ip(target_ip)
    print(f" IP: {target_ip}")
    print(f" Valid: {ip_info['is_valid']}")
    print(f" Private: {ip_info['is_private']}")
    print(f" CDN: {ip_info['is_cdn']}")
    if ip_info['cdn_provider']:
        print(f" CDN Provider: {ip_info['cdn_provider']}")

    # Connection Pattern Analysis
    print("\nConnection Pattern Analysis:")
    patterns = sentinel.recon.analyze_connection_patterns(beacon_group)
    print(f" Mean Interval: {patterns['timing']['mean_interval']:.2f}s")
    print(f" Interval CV: {patterns['timing']['interval_cv']:.4f}")
    print(f" Mean Bytes Sent: {patterns['volume']['mean_bytes_sent']:.0f}")
    print(f" Single Destination: {patterns['behavioral']['single_destination']}")

    # IOC Generation (only if C2 detected)
    print("\nIOC Generation:")
    beacon_result = sentinel.analyze(beacon_group)
    if beacon_result.is_c2:
        iocs = sentinel.recon.generate_iocs(beacon_group, beacon_result.to_dict())
        print(f" IPs: {iocs['ips']}")
        print(f" Ports: {iocs['ports']}")
        print(f" Timing Signature: {iocs['timing_signatures']}")
    print()


def _demo_log_parsing(sentinel):
    """Feed raw JSON log lines to ``analyze_logs`` and print one result per key."""
    _print_header("Log File Parsing")

    # Example with JSON log format
    json_logs = [
        '{"timestamp": 1705600000, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}',
        '{"timestamp": 1705600060, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}',
        '{"timestamp": 1705600120, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}',
        '{"timestamp": 1705600180, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}',
        '{"timestamp": 1705600240, "dst_ip": "10.0.0.1", "dst_port": 443, "bytes_sent": 200, "bytes_recv": 500}',
    ]
    results = sentinel.analyze_logs(json_logs, group_by_dst=True)
    print(f"Analyzed {len(json_logs)} log lines")
    for dst, result in results.items():
        print(f" {dst}: is_c2={result.is_c2}, prob={result.c2_probability:.2f}")
    print()


def _demo_result_details(sentinel, beacon_group):
    """Print every field of an analysis result, abbreviating lists over 3 items."""
    _print_header("Full Result Object")

    result = sentinel.analyze(beacon_group)
    for key, value in result.to_dict().items():
        if isinstance(value, list) and len(value) > 3:
            # Long lists are shown as their first two items plus a count.
            print(f" {key}: [{value[0]}, {value[1]}, ... ({len(value)} items)]")
        else:
            print(f" {key}: {value}")


def main():
    """Run every advanced-usage demo section in order and print the results.

    A single sentinel is loaded once and shared by all sections. The order of
    the sections is kept fixed because the whitelist/blacklist calls are made
    on that same object before the later sections use it, and because the
    beacon group built for the batch demo is reused by the recon and
    result-detail sections.
    """
    # Load the model
    sentinel = C2Sentinel.load('c2_sentinel')

    # Common base timestamp for all synthetic connection records.
    timestamp = 1705600000

    _demo_context_enrichment(sentinel, timestamp)
    _demo_whitelist_blacklist(sentinel, timestamp)
    beacon_group = _demo_batch_analysis(sentinel, timestamp)
    _demo_recon(sentinel, beacon_group)
    _demo_log_parsing(sentinel)
    _demo_result_details(sentinel, beacon_group)
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()